Skip to content

Commit 7867e34

Browse files
committed
add enqueue & perform timestamps, add job hooks for accessing them
Classes which implement the perform() method can now also implement before_perform() and after_perform() methods which take a metadata dict. These contain the args passed in, as well as timestamps for when the job was enqueued and when it was performed, and whether the job failed and was retried.
1 parent 2642587 commit 7867e34

File tree

2 files changed

+41
-3
lines changed

2 files changed

+41
-3
lines changed

pyres/__init__.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,8 @@ def enqueue(self, klass, *args):
212212
queue = getattr(klass,'queue', None)
213213
if queue:
214214
class_name = '%s.%s' % (klass.__module__, klass.__name__)
215-
self.push(queue, {'class':class_name,'args':args})
215+
self.push(queue, {'class':class_name,'args':args,
216+
'enqueue_timestamp':time.time()})
216217
logging.info("enqueued '%s' job on queue %s" % (class_name, queue))
217218
if args:
218219
logging.debug("job arguments: %s" % str(args))
@@ -222,7 +223,8 @@ def enqueue(self, klass, *args):
222223
logging.warning("unable to enqueue job with class %s" % str(klass))
223224

224225
def enqueue_from_string(self, klass_as_string, queue, *args, **kwargs):
225-
payload = {'class':klass_as_string, 'queue': queue, 'args':args}
226+
payload = {'class':klass_as_string, 'queue': queue, 'args':args,
227+
'enqueue_timestamp':time.time()}
226228
if 'first_attempt' in kwargs:
227229
payload['first_attempt'] = kwargs['first_attempt']
228230
self.push(queue, payload)
@@ -356,7 +358,8 @@ def _enqueue(cls, klass, *args):
356358
_self = cls()
357359
if queue:
358360
class_name = '%s.%s' % (klass.__module__, klass.__name__)
359-
_self.push(queue, {'class':class_name,'args':args})
361+
_self.push(queue, {'class':class_name,'args':args,
362+
'enqueue_timestamp': time.time()})
360363

361364
@staticmethod
362365
def _current_time():

pyres/job.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import time
12
from datetime import timedelta
23
from pyres import ResQ, safe_str_to_class
34
from pyres import failure
@@ -31,6 +32,8 @@ def __init__(self, queue, payload, resq, worker=None):
3132
self.resq = resq
3233
self._worker = worker
3334

35+
self.enqueue_timestamp = self._payload.get("enqueue_timestamp")
36+
3437
# Set the default back end, jobs can override when we import them
3538
# inside perform().
3639
failure.backend = RedisBackend
@@ -43,6 +46,20 @@ def perform(self):
4346
"""This method converts payload into args and calls the ``perform``
4447
method on the payload class.
4548
49+
Before calling ``perform``, a ``before_perform`` class method
50+
is called, if it exists. It takes a dictionary as an argument;
51+
currently the only things stored on the dictionary are the
52+
args passed into ``perform`` and a timestamp of when the job
53+
was enqueued.
54+
55+
Similarly, an ``after_perform`` class method is called after
56+
``perform`` is finished. The metadata dictionary contains the
57+
same data, plus a timestamp of when the job was performed, a
58+
``failed`` boolean value, and if it did fail, a ``retried``
59+
boolean value. This method is called after retry, and is
60+
called regardless of whether an exception is ultimately thrown
61+
by the perform method.
62+
4663
#@ add entry_point loading
4764
4865
"""
@@ -51,11 +68,29 @@ def perform(self):
5168
payload_class.resq = self.resq
5269
args = self._payload.get("args")
5370

71+
metadata = dict(args=args)
72+
if self.enqueue_timestamp:
73+
metadata["enqueue_timestamp"] = self.enqueue_timestamp
74+
75+
before_perform = getattr(payload_class, "before_perform", None)
76+
if before_perform:
77+
before_perform(metadata)
78+
79+
metadata["failed"] = False
80+
metadata["perform_timestamp"] = time.time()
5481
try:
5582
return payload_class.perform(*args)
5683
except:
84+
metadata["failed"] = True
5785
if not self.retry(payload_class, args):
86+
metadata["retried"] = False
5887
raise
88+
else:
89+
metadata["retried"] = True
90+
finally:
91+
after_perform = getattr(payload_class, "after_perform", None)
92+
if after_perform:
93+
after_perform(metadata)
5994

6095
def fail(self, exception):
6196
"""This method provides a way to fail a job and will use whatever

0 commit comments

Comments
 (0)