From 874adce8f4efdda653c1e60d5b353a3bc816af93 Mon Sep 17 00:00:00 2001 From: jaseg Date: Wed, 27 Mar 2019 18:28:57 +0900 Subject: gerboweb: Initial commit The functionality is there, no design yet --- gerboweb/job_queue.py | 88 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 gerboweb/job_queue.py (limited to 'gerboweb/job_queue.py') diff --git a/gerboweb/job_queue.py b/gerboweb/job_queue.py new file mode 100644 index 0000000..e48379d --- /dev/null +++ b/gerboweb/job_queue.py @@ -0,0 +1,88 @@ + +import json +import sqlite3 + +class JobQueue: + def __init__(self, dbfile): + self.dbfile = dbfile + self.db = sqlite3.connect(dbfile, check_same_thread=False) + self.db.row_factory = sqlite3.Row + with self.db as conn: + conn.execute('''CREATE TABLE IF NOT EXISTS jobs + (id INTEGER PRIMARY KEY, + type TEXT, + params TEXT, + client TEXT, + result TEXT DEFAULT NULL, + created DATETIME DEFAULT CURRENT_TIMESTAMP, + consumed DATETIME DEFAULT NULL, + aborted DATETIME DEFAULT NULL, + finished DATETIME DEFAULT NULL);''') + + def enqueue(self, task_type:str, client, **params): + """ Enqueue a job of the given type with the given params. Returns the new job ID. """ + with self.db as conn: + return conn.execute('INSERT INTO jobs(type, client, params) VALUES (?, ?, ?)', + (task_type, client, json.dumps(params))).lastrowid + + def check_result(slef, job_id): + with self.db as conn: + job = conn.execute('SELECT * FROM jobs WHERE id=?', (job_id,)).fetchone() + if job is None: + raise IndexError('Job id not found') + return job.result + + def drop(self, job_id): + with self.db as conn: + return conn.execute('DELETE FROM jobs WHERE id=?', (job_id,)).rowcount > 0 + + def pop(self, task_type): + """ Fetch the next job of the given type. Returns a sqlite3.Row object of the job or None if no jobs of the given + type are queued. """ + with self.db as conn: + job = conn.execute('SELECT * FROM jobs WHERE type=? AND consumed IS NULL ORDER BY created ASC LIMIT 1', + (task_type,)).fetchone() + if job is None: + return None + + # Atomically commit to this job + conn.execute('UPDATE jobs SET consumed=datetime("now") WHERE id=?', (job['id'],)) + + return Job(self.db, job) + + def job_iter(self, task_type): + return iter(lambda: self.pop(task_type), None) + + def __getitem__(self, key): + """ Return the job with the given ID, or raise a KeyError if the key cannot be found. """ + with self.db as conn: + job = conn.execute('SELECT * FROM jobs WHERE id=?', (key,)).fetchone() + if job is None: + raise KeyError(f'Unknown job ID "{key}"') + + return Job(self.db, job) + +class Job(dict): + def __init__(self, db, row): + super().__init__(json.loads(row['params'])) + self._db = db + self._row = row + self.id = row['id'] + self.type = row['type'] + self.client = row['client'] + self.created = row['created'] + self.consumed = row['consumed'] + self.finished = row['finished'] + self.result = None + + def __enter__(self): + return self + + def __exit__(self, _exc_type, _exc_val, _exc_tb): + with self._db as conn: + conn.execute('UPDATE jobs SET finished=datetime("now"), result=? WHERE id=?', (self.result, self.id,)) + + def abort(self): + with self._db as conn: + conn.execute('UPDATE jobs SET aborted=datetime("now") WHERE id=?', (self.id,)) + -- cgit From a846d39bc88e0c03402b20790b04762666bc055f Mon Sep 17 00:00:00 2001 From: jaseg Date: Tue, 2 Apr 2019 04:34:57 +0900 Subject: gerboweb: Fix job queue handling --- gerboweb/job_queue.py | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) (limited to 'gerboweb/job_queue.py') diff --git a/gerboweb/job_queue.py b/gerboweb/job_queue.py index e48379d..76f17dc 100644 --- a/gerboweb/job_queue.py +++ b/gerboweb/job_queue.py @@ -25,22 +25,11 @@ class JobQueue: return conn.execute('INSERT INTO jobs(type, client, params) VALUES (?, ?, ?)', (task_type, client, json.dumps(params))).lastrowid - def check_result(slef, job_id): - with self.db as conn: - job = conn.execute('SELECT * FROM jobs WHERE id=?', (job_id,)).fetchone() - if job is None: - raise IndexError('Job id not found') - return job.result - - def drop(self, job_id): - with self.db as conn: - return conn.execute('DELETE FROM jobs WHERE id=?', (job_id,)).rowcount > 0 - def pop(self, task_type): """ Fetch the next job of the given type. Returns a sqlite3.Row object of the job or None if no jobs of the given type are queued. """ with self.db as conn: - job = conn.execute('SELECT * FROM jobs WHERE type=? AND consumed IS NULL ORDER BY created ASC LIMIT 1', + job = conn.execute('SELECT * FROM jobs WHERE type=? AND consumed IS NULL AND aborted IS NULL ORDER BY created ASC LIMIT 1', (task_type,)).fetchone() if job is None: return None @@ -73,7 +62,7 @@ class Job(dict): self.created = row['created'] self.consumed = row['consumed'] self.finished = row['finished'] - self.result = None + self.result = row['result'] def __enter__(self): return self @@ -82,7 +71,7 @@ class Job(dict): with self._db as conn: conn.execute('UPDATE jobs SET finished=datetime("now"), result=? WHERE id=?', (self.result, self.id,)) - def abort(self): - with self._db as conn: + def abort(self, job_id): + with self.db as conn: conn.execute('UPDATE jobs SET aborted=datetime("now") WHERE id=?', (self.id,)) -- cgit From a54abadf01e9e543b416ed09e8500ed71170342c Mon Sep 17 00:00:00 2001 From: jaseg Date: Wed, 3 Apr 2019 23:53:04 +0900 Subject: gerbolyze fixes, clippy experiments --- gerboweb/job_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'gerboweb/job_queue.py') diff --git a/gerboweb/job_queue.py b/gerboweb/job_queue.py index 76f17dc..62ba398 100644 --- a/gerboweb/job_queue.py +++ b/gerboweb/job_queue.py @@ -13,7 +13,7 @@ class JobQueue: type TEXT, params TEXT, client TEXT, - result TEXT DEFAULT NULL, + result INTEGER DEFAULT NULL, created DATETIME DEFAULT CURRENT_TIMESTAMP, consumed DATETIME DEFAULT NULL, aborted DATETIME DEFAULT NULL, -- cgit