aboutsummaryrefslogtreecommitdiff
path: root/gerboweb/job_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'gerboweb/job_queue.py')
-rw-r--r--gerboweb/job_queue.py88
1 files changed, 88 insertions, 0 deletions
diff --git a/gerboweb/job_queue.py b/gerboweb/job_queue.py
new file mode 100644
index 0000000..e48379d
--- /dev/null
+++ b/gerboweb/job_queue.py
@@ -0,0 +1,88 @@
+
+import json
+import sqlite3
+
+class JobQueue:
+ def __init__(self, dbfile):
+ self.dbfile = dbfile
+ self.db = sqlite3.connect(dbfile, check_same_thread=False)
+ self.db.row_factory = sqlite3.Row
+ with self.db as conn:
+ conn.execute('''CREATE TABLE IF NOT EXISTS jobs
+ (id INTEGER PRIMARY KEY,
+ type TEXT,
+ params TEXT,
+ client TEXT,
+ result TEXT DEFAULT NULL,
+ created DATETIME DEFAULT CURRENT_TIMESTAMP,
+ consumed DATETIME DEFAULT NULL,
+ aborted DATETIME DEFAULT NULL,
+ finished DATETIME DEFAULT NULL);''')
+
+ def enqueue(self, task_type:str, client, **params):
+ """ Enqueue a job of the given type with the given params. Returns the new job ID. """
+ with self.db as conn:
+ return conn.execute('INSERT INTO jobs(type, client, params) VALUES (?, ?, ?)',
+ (task_type, client, json.dumps(params))).lastrowid
+
+ def check_result(slef, job_id):
+ with self.db as conn:
+ job = conn.execute('SELECT * FROM jobs WHERE id=?', (job_id,)).fetchone()
+ if job is None:
+ raise IndexError('Job id not found')
+ return job.result
+
+ def drop(self, job_id):
+ with self.db as conn:
+ return conn.execute('DELETE FROM jobs WHERE id=?', (job_id,)).rowcount > 0
+
+ def pop(self, task_type):
+ """ Fetch the next job of the given type. Returns a sqlite3.Row object of the job or None if no jobs of the given
+ type are queued. """
+ with self.db as conn:
+ job = conn.execute('SELECT * FROM jobs WHERE type=? AND consumed IS NULL ORDER BY created ASC LIMIT 1',
+ (task_type,)).fetchone()
+ if job is None:
+ return None
+
+ # Atomically commit to this job
+ conn.execute('UPDATE jobs SET consumed=datetime("now") WHERE id=?', (job['id'],))
+
+ return Job(self.db, job)
+
+ def job_iter(self, task_type):
+ return iter(lambda: self.pop(task_type), None)
+
+ def __getitem__(self, key):
+ """ Return the job with the given ID, or raise a KeyError if the key cannot be found. """
+ with self.db as conn:
+ job = conn.execute('SELECT * FROM jobs WHERE id=?', (key,)).fetchone()
+ if job is None:
+ raise KeyError(f'Unknown job ID "{key}"')
+
+ return Job(self.db, job)
+
+class Job(dict):
+ def __init__(self, db, row):
+ super().__init__(json.loads(row['params']))
+ self._db = db
+ self._row = row
+ self.id = row['id']
+ self.type = row['type']
+ self.client = row['client']
+ self.created = row['created']
+ self.consumed = row['consumed']
+ self.finished = row['finished']
+ self.result = None
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, _exc_type, _exc_val, _exc_tb):
+ with self._db as conn:
+ conn.execute('UPDATE jobs SET finished=datetime("now"), result=? WHERE id=?', (self.result, self.id,))
+
+ def abort(self):
+ with self._db as conn:
+ conn.execute('UPDATE jobs SET aborted=datetime("now") WHERE id=?', (self.id,))
+