Skip to content

Commit 48c1054

Browse files
committed
Merge branch 'feature/failure-backends' into develop
2 parents 89b05b9 + 4ac39d5 commit 48c1054

File tree

5 files changed

+204
-30
lines changed

5 files changed

+204
-30
lines changed

pyres/failure/__init__.py

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,28 @@
1-
import datetime
2-
from pyres import ResQ
3-
import sys, traceback
41
from pyres.failure.redis import RedisBackend
52

6-
_backend = RedisBackend
3+
backend = RedisBackend
74

85
def create(*args, **kwargs):
9-
return _backend(*args, **kwargs)
6+
return backend(*args, **kwargs)
107

118
def count(resq):
12-
return _backend.count(resq)
9+
return backend.count(resq)
1310

1411
def all(resq, start, count):
15-
return _backend.all(resq, start, count)
12+
return backend.all(resq, start, count)
1613

1714
def clear(resq):
18-
return _backend.clear(resq)
15+
return backend.clear(resq)
1916

2017
def requeue(resq, failure_object):
2118
queue = failure_object._queue
2219
payload = failure_object._payload
2320
return resq.push(queue, payload)
24-
21+
2522
def retry(resq, queue, payload):
2623
job = resq.decode(payload)
2724
resq.push(queue, job['payload'])
2825
return delete(resq, payload)
29-
26+
3027
def delete(resq, payload):
31-
return resq.redis.lrem(name = 'resque:failed', num = 1, value = payload)
32-
28+
return resq.redis.lrem(name='resque:failed', num=1, value=payload)

pyres/failure/mail.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import smtplib
2+
3+
from textwrap import dedent
4+
from email.mime.text import MIMEText
5+
6+
from base import BaseBackend
7+
8+
class MailBackend(BaseBackend):
9+
"""Extends ``BaseBackend`` to provide support for emailing failures.
10+
Intended to be used with the MultipleBackend:
11+
12+
from pyres import failure
13+
14+
from pyres.failure.mail import MailBackend
15+
from pyres.failure.multiple import MultipleBackend
16+
from pyres.failure.redis import RedisBackend
17+
18+
class EmailFailure(MailBackend):
19+
subject = 'Pyres Failure on {queue}'
20+
from_user = 'My Email User <[email protected]>'
21+
recipients = ['Me <[email protected]>']
22+
23+
smtp_host = 'mail.mydomain.tld'
24+
smtp_port = 25
25+
smtp_tls = True
26+
27+
smtp_user = 'mailuser'
28+
smtp_password = 'm41lp455w0rd'
29+
30+
failure.backend = MultipleBackend
31+
failure.backend.classes = [RedisBackend, EmailFailure]
32+
33+
34+
Additional notes:
35+
- The following tokens are available in subject: queue, worker, exception
36+
37+
- Override the create_message method to provide an alternate body. It
38+
should return one of the message types from email.mime.*
39+
"""
40+
subject = 'Pyres Failure on {queue}'
41+
42+
recipients = []
43+
from_user = None
44+
smtp_host = None
45+
smtp_port = 25
46+
47+
smtp_tls = False
48+
49+
smtp_user = None
50+
smtp_password = None
51+
52+
def save(self, resq=None):
53+
if not self.recipients or not self.smtp_host or not self.from_user:
54+
return
55+
56+
message = self.create_message()
57+
subject = self.format_subject()
58+
59+
message['Subject'] = subject
60+
message['From'] = self.from_user
61+
message['To'] = ", ".join(self.recipients)
62+
63+
self.send_message(message)
64+
65+
def format_subject(self):
66+
return self.subject.format(queue=self._queue,
67+
worker=self._worker,
68+
exception=self._exception)
69+
70+
def create_message(self):
71+
"""Returns a message body to send in this email. Should be from email.mime.*"""
72+
73+
body = dedent("""\
74+
Received exception {exception} on {queue} from worker {worker}:
75+
76+
{traceback}
77+
78+
Payload:
79+
{payload}
80+
81+
""").format(exception=self._exception,
82+
traceback=self._traceback,
83+
queue=self._queue,
84+
payload=self._payload,
85+
worker=self._worker)
86+
87+
return MIMEText(body)
88+
89+
def send_message(self, message):
90+
smtp = smtplib.SMTP(self.smtp_host, self.smtp_port)
91+
92+
try:
93+
smtp.ehlo()
94+
95+
if self.smtp_tls:
96+
smtp.starttls()
97+
98+
if self.smtp_user:
99+
smtp.login(self.smtp_user, self.smtp_password)
100+
101+
smtp.sendmail(self.from_user, self.recipients, message.as_string())
102+
finally:
103+
smtp.close()

pyres/failure/multiple.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
from base import BaseBackend
2+
from redis import RedisBackend
3+
4+
class MultipleBackend(BaseBackend):
5+
"""Extends ``BaseBackend`` to provide support for delegating calls to multiple
6+
backends. Queries are delegated to the first backend in the list. Defaults to
7+
only the RedisBackend.
8+
9+
To use:
10+
11+
from pyres import failure
12+
13+
from pyres.failure.base import BaseBackend
14+
from pyres.failure.multiple import MultipleBackend
15+
from pyres.failure.redis import RedisBackend
16+
17+
class CustomBackend(BaseBackend):
18+
def save(self, resq):
19+
print('Custom backend')
20+
21+
failure.backend = MultipleBackend
22+
failure.backend.classes = [RedisBackend, CustomBackend]
23+
"""
24+
classes = []
25+
26+
def __init__(self, *args):
27+
if not self.classes:
28+
self.classes = [RedisBackend]
29+
30+
self.backends = [klass(*args) for klass in self.classes]
31+
BaseBackend.__init__(self, *args)
32+
33+
@classmethod
34+
def count(cls, resq):
35+
first = MultipleBackend.classes[0]
36+
return first.count(resq)
37+
38+
@classmethod
39+
def all(cls, resq, start=0, count=1):
40+
first = MultipleBackend.classes[0]
41+
return first.all(resq, start, count)
42+
43+
@classmethod
44+
def clear(cls, resq):
45+
first = MultipleBackend.classes[0]
46+
return first.clear(resq)
47+
48+
def save(self, resq=None):
49+
map(lambda x: x.save(resq), self.backends)

pyres/job.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,41 @@
11
from datetime import datetime, timedelta
22
from pyres import ResQ, str_to_class, safe_str_to_class
33
from pyres import failure
4+
from pyres.failure.redis import RedisBackend
45

56
class Job(object):
67
"""Every job on the ResQ is an instance of the *Job* class.
7-
8+
89
The ``__init__`` takes these keyword arguments:
9-
10+
1011
``queue`` -- A string defining the queue to which this Job will be added.
11-
12+
1213
``payload`` -- A dictionary which contains the string name of a class which extends this Job and
1314
a list of args which will be passed to that class.
14-
15+
1516
``resq`` -- An instance of the ResQ class.
16-
17+
1718
``worker`` -- The name of a specific worker if you'd like this Job to be done by that worker. Default is "None".
18-
19+
1920
"""
2021
def __init__(self, queue, payload, resq, worker=None):
2122
self._queue = queue
2223
self._payload = payload
2324
self.resq = resq
2425
self._worker = worker
25-
26+
27+
# Set the default back end, jobs can override when we import them
28+
# inside perform().
29+
failure.backend = RedisBackend
30+
2631
def __str__(self):
2732
return "(Job{%s} | %s | %s)" % (
2833
self._queue, self._payload['class'], repr(self._payload['args']))
29-
34+
3035
def perform(self):
3136
"""This method converts payload into args and calls the ``perform`` method
3237
on the payload class.
33-
38+
3439
"""
3540
payload_class_str = self._payload["class"]
3641
payload_class = safe_str_to_class(payload_class_str)
@@ -41,11 +46,11 @@ def perform(self):
4146
except:
4247
if not self.retry(payload_class, args):
4348
raise
44-
49+
4550
def fail(self, exception):
4651
"""This method provides a way to fail a job and will use whatever failure backend
4752
you've provided. The default is the ``RedisBackend``.
48-
53+
4954
"""
5055
fail = failure.create(exception, self._queue, self._payload, self._worker)
5156
fail.save(self.resq)
@@ -70,7 +75,7 @@ def retry(self, payload_class, args):
7075
def reserve(cls, queue, res, worker=None):
7176
"""Reserve a job on the queue. This marks this job so that other workers
7277
will not pick it up.
73-
78+
7479
"""
7580
payload = res.pop(queue)
7681
if payload:

tests/test_failure.py

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,35 @@
11
from tests import PyResTests, Basic, TestProcess, ErrorObject
22
from pyres import failure
3+
from pyres.failure.base import BaseBackend
4+
from pyres.failure.multiple import MultipleBackend
5+
from pyres.failure.redis import RedisBackend
36
from pyres.job import Job
7+
48
class FailureTests(PyResTests):
59
def test_count(self):
610
self.resq.enqueue(Basic,"test1")
711
job = Job.reserve('basic',self.resq)
812
job.fail("problem")
913
assert failure.count(self.resq) == 1
1014
assert self.redis.llen('resque:failed') == 1
11-
15+
1216
def test_create(self):
1317
self.resq.enqueue(Basic,"test1")
1418
job = Job.reserve('basic',self.resq)
1519
e = Exception('test')
1620
fail = failure.create(e, 'basic', job._payload)
1721
assert isinstance(fail._payload, dict)
18-
fail.save()
22+
fail.save(self.resq)
1923
assert failure.count(self.resq) == 1
2024
assert self.redis.llen('resque:failed') == 1
21-
25+
2226
def test_all(self):
2327
self.resq.enqueue(Basic,"test1")
2428
job = Job.reserve('basic',self.resq)
2529
e = Exception('problem')
2630
job.fail(e)
2731
assert len(failure.all(self.resq, 0, 20)) == 1
28-
32+
2933
def test_clear(self):
3034
self.resq.enqueue(Basic,"test1")
3135
job = Job.reserve('basic',self.resq)
@@ -34,7 +38,7 @@ def test_clear(self):
3438
assert self.redis.llen('resque:failed') == 1
3539
failure.clear(self.resq)
3640
assert self.redis.llen('resque:failed') == 0
37-
41+
3842
def test_requeue(self):
3943
self.resq.enqueue(Basic,"test1")
4044
job = Job.reserve('basic',self.resq)
@@ -45,4 +49,21 @@ def test_requeue(self):
4549
assert self.resq.size('basic') == 1
4650
job = Job.reserve('basic',self.resq)
4751
assert job._queue == 'basic'
48-
assert job._payload == {'class':'tests.Basic','args':['test1']}
52+
assert job._payload == {'class':'tests.Basic','args':['test1']}
53+
54+
# Test the MultipleBackend, basically just repeat the above tests, ensuring that
55+
# we've delegated to the methods appropriately
56+
57+
class TestBackend(BaseBackend):
58+
def save(self, resq):
59+
resq.redis.set('testbackend:called', 1)
60+
61+
failure.backend = MultipleBackend
62+
failure.backend.classes = [RedisBackend, TestBackend]
63+
64+
class MultipleFailureTests(FailureTests):
65+
def test_create(self):
66+
# Run the parent test
67+
FailureTests.test_create(self)
68+
# But also ensure the other backends were called
69+
assert int(self.redis.get('testbackend:called')) == 1

0 commit comments

Comments
 (0)