Skip to content

Commit edfb5e6

Browse files
Kevin McConnell authored and Matt George committed
Allow jobs to specify retry settings
Jobs can specify how often, and for how long, they should be automatically retried before being considered an error. This change relies on the scheduler service to perform the actual retrying.

An example class:

    class ExampleJob(object):
        queue = 'basic'
        retry_every = 30     # retry every 30 seconds
        retry_timeout = 600  # give up trying after 10 minutes

        @staticmethod
        def perform():
            # do something...
1 parent df8c76a commit edfb5e6

File tree

6 files changed

+112
-13
lines changed

6 files changed

+112
-13
lines changed

pyres/__init__.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,11 @@ def enqueue(self, klass, *args):
142142
else:
143143
logging.warning("unable to enqueue job with class %s" % str(klass))
144144

145-
def enqueue_from_string(self, klass_as_string, queue, *args):
146-
self.push(queue, {'class':klass_as_string,'args':args})
145+
def enqueue_from_string(self, klass_as_string, queue, *args, **kwargs):
146+
payload = {'class':klass_as_string, 'queue': queue, 'args':args}
147+
if 'first_attempt' in kwargs:
148+
payload['first_attempt'] = kwargs['first_attempt']
149+
self.push(queue, payload)
147150
logging.info("enqueued '%s' job" % klass_as_string)
148151
if args:
149152
logging.debug("job arguments: %s" % str(args))
@@ -201,12 +204,15 @@ def close(self):
201204
"""
202205
self.redis.disconnect()
203206

204-
def enqueue_at(self, datetime, klass, *args):
207+
def enqueue_at(self, datetime, klass, *args, **kwargs):
205208
class_name = '%s.%s' % (klass.__module__, klass.__name__)
206209
logging.info("enqueued '%s' job for execution at %s" % (class_name, datetime))
207210
if args:
208211
logging.debug("job arguments are: %s" % str(args))
209-
self.delayed_push(datetime, {'class':class_name,'queue': klass.queue, 'args':args})
212+
payload = {'class':class_name, 'queue': klass.queue, 'args':args}
213+
if 'first_attempt' in kwargs:
214+
payload['first_attempt'] = kwargs['first_attempt']
215+
self.delayed_push(datetime, payload)
210216

211217
def delayed_push(self, datetime, item):
212218
key = int(time.mktime(datetime.timetuple()))
@@ -227,7 +233,7 @@ def delayed_timestamp_size(self, timestamp):
227233
return self.redis.llen("resque:delayed:%s" % timestamp)
228234

229235
def next_delayed_timestamp(self):
230-
key = int(time.mktime(datetime.datetime.now().timetuple()))
236+
key = int(time.mktime(ResQ._current_time().timetuple()))
231237
array = self.redis.zrangebyscore('resque:delayed_queue_schedule', '-inf', key)
232238
timestamp = None
233239
if array:
@@ -265,6 +271,10 @@ def _enqueue(cls, klass, *args):
265271
class_name = '%s.%s' % (klass.__module__, klass.__name__)
266272
_self.push(queue, {'class':class_name,'args':args})
267273

274+
@staticmethod
275+
def _current_time():
276+
return datetime.datetime.now()
277+
268278

269279
class Stat(object):
270280
"""A Stat class which shows the current status of the queue.

pyres/job.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import datetime, timedelta
12
from pyres import ResQ, str_to_class, safe_str_to_class
23
from pyres import failure
34

@@ -35,10 +36,11 @@ def perform(self):
3536
payload_class = safe_str_to_class(payload_class_str)
3637
payload_class.resq = self.resq
3738
args = self._payload.get("args", None)
38-
if args:
39+
try:
3940
return payload_class.perform(*args)
40-
else:
41-
return payload_class.perform()
41+
except:
42+
if not self.retry(payload_class, args):
43+
raise
4244

4345
def fail(self, exception):
4446
"""This method provides a way to fail a job and will use whatever failure backend
@@ -48,7 +50,22 @@ def fail(self, exception):
4850
fail = failure.create(exception, self._queue, self._payload, self._worker)
4951
fail.save(self.resq)
5052
return fail
51-
53+
54+
def retry(self, payload_class, args):
55+
retry_every = getattr(payload_class, 'retry_every', None)
56+
retry_timeout = getattr(payload_class, 'retry_timeout', 0)
57+
58+
if retry_every:
59+
now = ResQ._current_time()
60+
first_attempt = self._payload.get("first_attempt", now)
61+
retry_until = first_attempt + timedelta(seconds=retry_timeout)
62+
retry_at = now + timedelta(seconds=retry_every)
63+
if retry_at < retry_until:
64+
self.resq.enqueue_at(ResQ._current_time(), payload_class, *args,
65+
**{'first_attempt':first_attempt})
66+
return True
67+
return False
68+
5269
@classmethod
5370
def reserve(cls, queue, res, worker=None):
5471
"""Reserve a job on the queue. This marks this job so that other workers

pyres/scheduler.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,15 @@ def handle_delayed_items(self):
6767
klass = item['class']
6868
queue = item['queue']
6969
args = item['args']
70-
self.resq.enqueue_from_string(klass, queue, *args)
70+
kwargs = {}
71+
if 'first_attempt' in item:
72+
kwargs['first_attempt'] = item['first_attempt']
73+
self.resq.enqueue_from_string(klass, queue, *args, **kwargs)
7174

7275

7376
@classmethod
7477
def run(cls, server, password=None):
7578
sched = cls(server=server, password=password)
7679
sched()
7780

78-
81+

pyres/worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ def process(self, job=None):
154154
job = self.reserve()
155155
try:
156156
self.working_on(job)
157-
job.perform()
157+
return job.perform()
158158
except Exception, e:
159159
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
160160
logging.error("%s failed: %s" % (job, e))

tests/__init__.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,20 @@ class ReturnAllArgsJob(object):
2525
def perform(*args):
2626
return args
2727

28+
29+
class RetryOnExceptionJob(object):
30+
queue = 'basic'
31+
retry_every = 5
32+
retry_timeout = 15
33+
34+
@staticmethod
35+
def perform(fail_until):
36+
if ResQ._current_time() < fail_until:
37+
raise Exception("Don't blame me! I'm supposed to fail!")
38+
else:
39+
return True
40+
41+
2842
class TestProcess(object):
2943
queue = 'high'
3044

tests/test_worker.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1-
from tests import PyResTests, Basic, TestProcess, ErrorObject
1+
from tests import PyResTests, Basic, TestProcess, ErrorObject, RetryOnExceptionJob
22
from pyres import ResQ
33
from pyres.job import Job
4+
from pyres.scheduler import Scheduler
45
from pyres.worker import Worker
56
import os
67
import time
8+
import datetime
9+
10+
711
class WorkerTests(PyResTests):
812
def test_worker_init(self):
913
from pyres.exceptions import NoQueueError
@@ -173,3 +177,54 @@ def test_prune_dead_workers(self):
173177
# the assertion below should hold, because the workers we registered above are on a
174178
# different host, and thus should not be pruned by this process
175179
assert self.redis.scard('resque:workers') == 3
180+
181+
def test_retry_on_exception(self):
182+
now = datetime.datetime.now()
183+
self.set_current_time(now)
184+
worker = Worker(['basic'])
185+
scheduler = Scheduler()
186+
187+
# queue up a job that will fail for 30 seconds
188+
self.resq.enqueue(RetryOnExceptionJob,
189+
now + datetime.timedelta(seconds=30))
190+
worker.process()
191+
assert worker.get_failed() == 0
192+
193+
# check it retries the first time
194+
self.set_current_time(now + datetime.timedelta(seconds=5))
195+
scheduler.handle_delayed_items()
196+
assert None == worker.process()
197+
assert worker.get_failed() == 0
198+
199+
# check it runs fine when it's stopped crashing
200+
self.set_current_time(now + datetime.timedelta(seconds=60))
201+
scheduler.handle_delayed_items()
202+
assert True == worker.process()
203+
assert worker.get_failed() == 0
204+
205+
def test_retries_give_up_eventually(self):
206+
now = datetime.datetime.now()
207+
self.set_current_time(now)
208+
worker = Worker(['basic'])
209+
scheduler = Scheduler()
210+
211+
# queue up a job that will fail for 60 seconds
212+
self.resq.enqueue(RetryOnExceptionJob,
213+
now + datetime.timedelta(seconds=60))
214+
worker.process()
215+
assert worker.get_failed() == 0
216+
217+
# check it retries the first time
218+
self.set_current_time(now + datetime.timedelta(seconds=5))
219+
scheduler.handle_delayed_items()
220+
assert None == worker.process()
221+
assert worker.get_failed() == 0
222+
223+
# check it fails when we've been trying too long
224+
self.set_current_time(now + datetime.timedelta(seconds=20))
225+
scheduler.handle_delayed_items()
226+
assert None == worker.process()
227+
assert worker.get_failed() == 1
228+
229+
def set_current_time(self, time):
230+
ResQ._current_time = staticmethod(lambda: time)

0 commit comments

Comments
 (0)