Skip to content

Commit 5da5543

Browse files
committed
Merge pull request binarymatt#95 from joeshaw/joeshaw-timestamps
add enqueue & perform timestamps, add job hooks for accessing them
2 parents 5dd360a + 7867e34 commit 5da5543

File tree

2 files changed

+41
-3
lines changed

2 files changed

+41
-3
lines changed

pyres/__init__.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,8 @@ def enqueue(self, klass, *args):
212212
queue = getattr(klass,'queue', None)
213213
if queue:
214214
class_name = '%s.%s' % (klass.__module__, klass.__name__)
215-
self.push(queue, {'class':class_name,'args':args})
215+
self.push(queue, {'class':class_name,'args':args,
216+
'enqueue_timestamp':time.time()})
216217
logging.info("enqueued '%s' job on queue %s" % (class_name, queue))
217218
if args:
218219
logging.debug("job arguments: %s" % str(args))
@@ -222,7 +223,8 @@ def enqueue(self, klass, *args):
222223
logging.warning("unable to enqueue job with class %s" % str(klass))
223224

224225
def enqueue_from_string(self, klass_as_string, queue, *args, **kwargs):
225-
payload = {'class':klass_as_string, 'queue': queue, 'args':args}
226+
payload = {'class':klass_as_string, 'queue': queue, 'args':args,
227+
'enqueue_timestamp':time.time()}
226228
if 'first_attempt' in kwargs:
227229
payload['first_attempt'] = kwargs['first_attempt']
228230
self.push(queue, payload)
@@ -356,7 +358,8 @@ def _enqueue(cls, klass, *args):
356358
_self = cls()
357359
if queue:
358360
class_name = '%s.%s' % (klass.__module__, klass.__name__)
359-
_self.push(queue, {'class':class_name,'args':args})
361+
_self.push(queue, {'class':class_name,'args':args,
362+
'enqueue_timestamp': time.time()})
360363

361364
@staticmethod
362365
def _current_time():

pyres/job.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import time
12
from datetime import timedelta
23
from pyres import ResQ, safe_str_to_class
34
from pyres import failure
@@ -31,6 +32,8 @@ def __init__(self, queue, payload, resq, worker=None):
3132
self.resq = resq
3233
self._worker = worker
3334

35+
self.enqueue_timestamp = self._payload.get("enqueue_timestamp")
36+
3437
# Set the default back end, jobs can override when we import them
3538
# inside perform().
3639
failure.backend = RedisBackend
@@ -43,6 +46,20 @@ def perform(self):
4346
"""This method converts payload into args and calls the ``perform``
4447
method on the payload class.
4548
49+
Before calling ``perform``, a ``before_perform`` class method
50+
is called, if it exists. It takes a dictionary as an argument;
51+
currently the only things stored on the dictionary are the
52+
args passed into ``perform`` and a timestamp of when the job
53+
was enqueued.
54+
55+
Similarly, an ``after_perform`` class method is called after
56+
``perform`` is finished. The metadata dictionary contains the
57+
same data, plus a timestamp of when the job was performed, a
58+
``failed`` boolean value, and if it did fail, a ``retried``
59+
boolean value. This method is called after retry, and is
60+
called regardless of whether an exception is ultimately thrown
61+
by the perform method.
62+
4663
#@ add entry_point loading
4764
4865
"""
@@ -51,11 +68,29 @@ def perform(self):
5168
payload_class.resq = self.resq
5269
args = self._payload.get("args")
5370

71+
metadata = dict(args=args)
72+
if self.enqueue_timestamp:
73+
metadata["enqueue_timestamp"] = self.enqueue_timestamp
74+
75+
before_perform = getattr(payload_class, "before_perform", None)
76+
if before_perform:
77+
before_perform(metadata)
78+
79+
metadata["failed"] = False
80+
metadata["perform_timestamp"] = time.time()
5481
try:
5582
return payload_class.perform(*args)
5683
except:
84+
metadata["failed"] = True
5785
if not self.retry(payload_class, args):
86+
metadata["retried"] = False
5887
raise
88+
else:
89+
metadata["retried"] = True
90+
finally:
91+
after_perform = getattr(payload_class, "after_perform", None)
92+
if after_perform:
93+
after_perform(metadata)
5994

6095
def fail(self, exception):
6196
"""This method provides a way to fail a job and will use whatever

0 commit comments

Comments
 (0)