Skip to content

Commit 1b692b2

Browse files
author
Matt George
committed
Merge branch 'retry_queues'
2 parents df8c76a + 85de502 commit 1b692b2

File tree

6 files changed

+112
-13
lines changed

6 files changed

+112
-13
lines changed

pyres/__init__.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,11 @@ def enqueue(self, klass, *args):
142142
else:
143143
logging.warning("unable to enqueue job with class %s" % str(klass))
144144

145-
def enqueue_from_string(self, klass_as_string, queue, *args):
146-
self.push(queue, {'class':klass_as_string,'args':args})
145+
def enqueue_from_string(self, klass_as_string, queue, *args, **kwargs):
146+
payload = {'class':klass_as_string, 'queue': queue, 'args':args}
147+
if 'first_attempt' in kwargs:
148+
payload['first_attempt'] = kwargs['first_attempt']
149+
self.push(queue, payload)
147150
logging.info("enqueued '%s' job" % klass_as_string)
148151
if args:
149152
logging.debug("job arguments: %s" % str(args))
@@ -201,12 +204,15 @@ def close(self):
201204
"""
202205
self.redis.disconnect()
203206

204-
def enqueue_at(self, datetime, klass, *args):
207+
def enqueue_at(self, datetime, klass, *args, **kwargs):
205208
class_name = '%s.%s' % (klass.__module__, klass.__name__)
206209
logging.info("enqueued '%s' job for execution at %s" % (class_name, datetime))
207210
if args:
208211
logging.debug("job arguments are: %s" % str(args))
209-
self.delayed_push(datetime, {'class':class_name,'queue': klass.queue, 'args':args})
212+
payload = {'class':class_name, 'queue': klass.queue, 'args':args}
213+
if 'first_attempt' in kwargs:
214+
payload['first_attempt'] = kwargs['first_attempt']
215+
self.delayed_push(datetime, payload)
210216

211217
def delayed_push(self, datetime, item):
212218
key = int(time.mktime(datetime.timetuple()))
@@ -227,7 +233,7 @@ def delayed_timestamp_size(self, timestamp):
227233
return self.redis.llen("resque:delayed:%s" % timestamp)
228234

229235
def next_delayed_timestamp(self):
230-
key = int(time.mktime(datetime.datetime.now().timetuple()))
236+
key = int(time.mktime(ResQ._current_time().timetuple()))
231237
array = self.redis.zrangebyscore('resque:delayed_queue_schedule', '-inf', key)
232238
timestamp = None
233239
if array:
@@ -265,6 +271,10 @@ def _enqueue(cls, klass, *args):
265271
class_name = '%s.%s' % (klass.__module__, klass.__name__)
266272
_self.push(queue, {'class':class_name,'args':args})
267273

274+
@staticmethod
275+
def _current_time():
276+
return datetime.datetime.now()
277+
268278

269279
class Stat(object):
270280
"""A Stat class which shows the current status of the queue.

pyres/job.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import datetime, timedelta
12
from pyres import ResQ, str_to_class, safe_str_to_class
23
from pyres import failure
34

@@ -35,10 +36,11 @@ def perform(self):
3536
payload_class = safe_str_to_class(payload_class_str)
3637
payload_class.resq = self.resq
3738
args = self._payload.get("args", None)
38-
if args:
39+
try:
3940
return payload_class.perform(*args)
40-
else:
41-
return payload_class.perform()
41+
except:
42+
if not self.retry(payload_class, args):
43+
raise
4244

4345
def fail(self, exception):
4446
"""This method provides a way to fail a job and will use whatever failure backend
@@ -48,7 +50,22 @@ def fail(self, exception):
4850
fail = failure.create(exception, self._queue, self._payload, self._worker)
4951
fail.save(self.resq)
5052
return fail
51-
53+
54+
def retry(self, payload_class, args):
55+
retry_every = getattr(payload_class, 'retry_every', None)
56+
retry_timeout = getattr(payload_class, 'retry_timeout', 0)
57+
58+
if retry_every:
59+
now = ResQ._current_time()
60+
first_attempt = self._payload.get("first_attempt", now)
61+
retry_until = first_attempt + timedelta(seconds=retry_timeout)
62+
retry_at = now + timedelta(seconds=retry_every)
63+
if retry_at < retry_until:
64+
self.resq.enqueue_at(retry_at, payload_class, *args,
65+
**{'first_attempt':first_attempt})
66+
return True
67+
return False
68+
5269
@classmethod
5370
def reserve(cls, queue, res, worker=None):
5471
"""Reserve a job on the queue. This marks this job so that other workers

pyres/scheduler.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,15 @@ def handle_delayed_items(self):
6767
klass = item['class']
6868
queue = item['queue']
6969
args = item['args']
70-
self.resq.enqueue_from_string(klass, queue, *args)
70+
kwargs = {}
71+
if 'first_attempt' in item:
72+
kwargs['first_attempt'] = item['first_attempt']
73+
self.resq.enqueue_from_string(klass, queue, *args, **kwargs)
7174

7275

7376
@classmethod
7477
def run(cls, server, password=None):
7578
sched = cls(server=server, password=password)
7679
sched()
7780

78-
81+

pyres/worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ def process(self, job=None):
154154
job = self.reserve()
155155
try:
156156
self.working_on(job)
157-
job.perform()
157+
return job.perform()
158158
except Exception, e:
159159
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
160160
logging.error("%s failed: %s" % (job, e))

tests/__init__.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,20 @@ class ReturnAllArgsJob(object):
2525
def perform(*args):
2626
return args
2727

28+
29+
class RetryOnExceptionJob(object):
30+
queue = 'basic'
31+
retry_every = 5
32+
retry_timeout = 15
33+
34+
@staticmethod
35+
def perform(fail_until):
36+
if ResQ._current_time() < fail_until:
37+
raise Exception("Don't blame me! I'm supposed to fail!")
38+
else:
39+
return True
40+
41+
2842
class TestProcess(object):
2943
queue = 'high'
3044

tests/test_worker.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1-
from tests import PyResTests, Basic, TestProcess, ErrorObject
1+
from tests import PyResTests, Basic, TestProcess, ErrorObject, RetryOnExceptionJob
22
from pyres import ResQ
33
from pyres.job import Job
4+
from pyres.scheduler import Scheduler
45
from pyres.worker import Worker
56
import os
67
import time
8+
import datetime
9+
10+
711
class WorkerTests(PyResTests):
812
def test_worker_init(self):
913
from pyres.exceptions import NoQueueError
@@ -173,3 +177,54 @@ def test_prune_dead_workers(self):
173177
# the assertion below should hold, because the workers we registered above are on a
174178
# different host, and thus should not be pruned by this process
175179
assert self.redis.scard('resque:workers') == 3
180+
181+
def test_retry_on_exception(self):
182+
now = datetime.datetime.now()
183+
self.set_current_time(now)
184+
worker = Worker(['basic'])
185+
scheduler = Scheduler()
186+
187+
# queue up a job that will fail for 30 seconds
188+
self.resq.enqueue(RetryOnExceptionJob,
189+
now + datetime.timedelta(seconds=30))
190+
worker.process()
191+
assert worker.get_failed() == 0
192+
193+
# check it retries the first time
194+
self.set_current_time(now + datetime.timedelta(seconds=5))
195+
scheduler.handle_delayed_items()
196+
assert None == worker.process()
197+
assert worker.get_failed() == 0
198+
199+
# check it runs fine when it's stopped crashing
200+
self.set_current_time(now + datetime.timedelta(seconds=60))
201+
scheduler.handle_delayed_items()
202+
assert True == worker.process()
203+
assert worker.get_failed() == 0
204+
205+
def test_retries_give_up_eventually(self):
206+
now = datetime.datetime.now()
207+
self.set_current_time(now)
208+
worker = Worker(['basic'])
209+
scheduler = Scheduler()
210+
211+
# queue up a job that will fail for 60 seconds
212+
self.resq.enqueue(RetryOnExceptionJob,
213+
now + datetime.timedelta(seconds=60))
214+
worker.process()
215+
assert worker.get_failed() == 0
216+
217+
# check it retries the first time
218+
self.set_current_time(now + datetime.timedelta(seconds=5))
219+
scheduler.handle_delayed_items()
220+
assert None == worker.process()
221+
assert worker.get_failed() == 0
222+
223+
# check it fails when we've been trying too long
224+
self.set_current_time(now + datetime.timedelta(seconds=20))
225+
scheduler.handle_delayed_items()
226+
assert None == worker.process()
227+
assert worker.get_failed() == 1
228+
229+
def set_current_time(self, time):
230+
ResQ._current_time = staticmethod(lambda: time)

0 commit comments

Comments
 (0)