diff options
author | jaseg <git@jaseg.net> | 2019-03-27 18:28:57 +0900 |
---|---|---|
committer | jaseg <git@jaseg.net> | 2019-03-27 18:28:57 +0900 |
commit | 874adce8f4efdda653c1e60d5b353a3bc816af93 (patch) | |
tree | f4926c974ee807ffbd60a08814341466f755f14b /gerboweb/job_queue.py | |
parent | a006deb18830997529044e898282e2d9735b632d (diff) | |
download | gerbolyze-874adce8f4efdda653c1e60d5b353a3bc816af93.tar.gz gerbolyze-874adce8f4efdda653c1e60d5b353a3bc816af93.tar.bz2 gerbolyze-874adce8f4efdda653c1e60d5b353a3bc816af93.zip |
gerboweb: Initial commit
The functionality is there, no design yet
Diffstat (limited to 'gerboweb/job_queue.py')
-rw-r--r-- | gerboweb/job_queue.py | 88 |
1 files changed, 88 insertions, 0 deletions
diff --git a/gerboweb/job_queue.py b/gerboweb/job_queue.py new file mode 100644 index 0000000..e48379d --- /dev/null +++ b/gerboweb/job_queue.py @@ -0,0 +1,88 @@ + +import json +import sqlite3 + +class JobQueue: + def __init__(self, dbfile): + self.dbfile = dbfile + self.db = sqlite3.connect(dbfile, check_same_thread=False) + self.db.row_factory = sqlite3.Row + with self.db as conn: + conn.execute('''CREATE TABLE IF NOT EXISTS jobs + (id INTEGER PRIMARY KEY, + type TEXT, + params TEXT, + client TEXT, + result TEXT DEFAULT NULL, + created DATETIME DEFAULT CURRENT_TIMESTAMP, + consumed DATETIME DEFAULT NULL, + aborted DATETIME DEFAULT NULL, + finished DATETIME DEFAULT NULL);''') + + def enqueue(self, task_type:str, client, **params): + """ Enqueue a job of the given type with the given params. Returns the new job ID. """ + with self.db as conn: + return conn.execute('INSERT INTO jobs(type, client, params) VALUES (?, ?, ?)', + (task_type, client, json.dumps(params))).lastrowid + + def check_result(slef, job_id): + with self.db as conn: + job = conn.execute('SELECT * FROM jobs WHERE id=?', (job_id,)).fetchone() + if job is None: + raise IndexError('Job id not found') + return job.result + + def drop(self, job_id): + with self.db as conn: + return conn.execute('DELETE FROM jobs WHERE id=?', (job_id,)).rowcount > 0 + + def pop(self, task_type): + """ Fetch the next job of the given type. Returns a sqlite3.Row object of the job or None if no jobs of the given + type are queued. """ + with self.db as conn: + job = conn.execute('SELECT * FROM jobs WHERE type=? AND consumed IS NULL ORDER BY created ASC LIMIT 1', + (task_type,)).fetchone() + if job is None: + return None + + # Atomically commit to this job + conn.execute('UPDATE jobs SET consumed=datetime("now") WHERE id=?', (job['id'],)) + + return Job(self.db, job) + + def job_iter(self, task_type): + return iter(lambda: self.pop(task_type), None) + + def __getitem__(self, key): + """ Return the job with the given ID, or raise a KeyError if the key cannot be found. """ + with self.db as conn: + job = conn.execute('SELECT * FROM jobs WHERE id=?', (key,)).fetchone() + if job is None: + raise KeyError(f'Unknown job ID "{key}"') + + return Job(self.db, job) + +class Job(dict): + def __init__(self, db, row): + super().__init__(json.loads(row['params'])) + self._db = db + self._row = row + self.id = row['id'] + self.type = row['type'] + self.client = row['client'] + self.created = row['created'] + self.consumed = row['consumed'] + self.finished = row['finished'] + self.result = None + + def __enter__(self): + return self + + def __exit__(self, _exc_type, _exc_val, _exc_tb): + with self._db as conn: + conn.execute('UPDATE jobs SET finished=datetime("now"), result=? WHERE id=?', (self.result, self.id,)) + + def abort(self): + with self._db as conn: + conn.execute('UPDATE jobs SET aborted=datetime("now") WHERE id=?', (self.id,)) + |