1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
import json
import sqlite3
class JobQueue:
def __init__(self, dbfile):
self.dbfile = dbfile
self.db = sqlite3.connect(dbfile, check_same_thread=False)
self.db.row_factory = sqlite3.Row
with self.db as conn:
conn.execute('''CREATE TABLE IF NOT EXISTS jobs
(id INTEGER PRIMARY KEY,
type TEXT,
params TEXT,
client TEXT,
result TEXT DEFAULT NULL,
created DATETIME DEFAULT CURRENT_TIMESTAMP,
consumed DATETIME DEFAULT NULL,
aborted DATETIME DEFAULT NULL,
finished DATETIME DEFAULT NULL);''')
def enqueue(self, task_type:str, client, **params):
""" Enqueue a job of the given type with the given params. Returns the new job ID. """
with self.db as conn:
return conn.execute('INSERT INTO jobs(type, client, params) VALUES (?, ?, ?)',
(task_type, client, json.dumps(params))).lastrowid
def check_result(slef, job_id):
with self.db as conn:
job = conn.execute('SELECT * FROM jobs WHERE id=?', (job_id,)).fetchone()
if job is None:
raise IndexError('Job id not found')
return job.result
def drop(self, job_id):
with self.db as conn:
return conn.execute('DELETE FROM jobs WHERE id=?', (job_id,)).rowcount > 0
def pop(self, task_type):
""" Fetch the next job of the given type. Returns a sqlite3.Row object of the job or None if no jobs of the given
type are queued. """
with self.db as conn:
job = conn.execute('SELECT * FROM jobs WHERE type=? AND consumed IS NULL ORDER BY created ASC LIMIT 1',
(task_type,)).fetchone()
if job is None:
return None
# Atomically commit to this job
conn.execute('UPDATE jobs SET consumed=datetime("now") WHERE id=?', (job['id'],))
return Job(self.db, job)
def job_iter(self, task_type):
return iter(lambda: self.pop(task_type), None)
def __getitem__(self, key):
""" Return the job with the given ID, or raise a KeyError if the key cannot be found. """
with self.db as conn:
job = conn.execute('SELECT * FROM jobs WHERE id=?', (key,)).fetchone()
if job is None:
raise KeyError(f'Unknown job ID "{key}"')
return Job(self.db, job)
class Job(dict):
def __init__(self, db, row):
super().__init__(json.loads(row['params']))
self._db = db
self._row = row
self.id = row['id']
self.type = row['type']
self.client = row['client']
self.created = row['created']
self.consumed = row['consumed']
self.finished = row['finished']
self.result = None
def __enter__(self):
return self
def __exit__(self, _exc_type, _exc_val, _exc_tb):
with self._db as conn:
conn.execute('UPDATE jobs SET finished=datetime("now"), result=? WHERE id=?', (self.result, self.id,))
def abort(self):
with self._db as conn:
conn.execute('UPDATE jobs SET aborted=datetime("now") WHERE id=?', (self.id,))
|