Skip to content

Commit 571c160

Browse files
author
Matt George
committed
Merge commit 'ee677c0236f116847f7e6852cedb9d21c6b90f76' into refactor_merge
Conflicts: HISTORY.md pyres/__init__.py
2 parents 3759f82 + ee677c0 commit 571c160

File tree

4 files changed

+41
-12
lines changed

4 files changed

+41
-12
lines changed

pyres/__init__.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,28 @@
44
import simplejson
55

66
import types
7+
8+
def my_import(name):
9+
mod = __import__(name)
10+
components = name.split('.')
11+
for comp in components[1:]:
12+
mod = getattr(mod, comp)
13+
return mod
14+
15+
def safe_str_to_class(s):
16+
lst = s.split(".")
17+
klass = lst[-1]
18+
mod_list = lst[:-1]
19+
module = ".".join(mod_list)
20+
try:
21+
mod = my_import(module)
22+
if hasattr(mod, klass):
23+
return getattr(mod, klass)
24+
else:
25+
return None
26+
except ImportError:
27+
return None
28+
729
def str_to_class(s):
830
lst = s.split(".")
931
klass = lst[-1]
@@ -20,7 +42,10 @@ def str_to_class(s):
2042

2143
class ResQ(object):
2244

23-
def __init__(self, server="localhost:6379", password=None):
45+
def __init__(self, server="localhost:6379", password=None,
46+
timeout=None, retry_connection=True):
47+
self.timeout = timeout
48+
self.retry_connection = retry_connection
2449
self.redis = server
2550
if password:
2651
self.redis.auth(password)
@@ -63,7 +88,9 @@ def _set_redis(self, server):
6388
if isinstance(server, basestring):
6489
self.dsn = server
6590
host, port = server.split(':')
66-
self._redis = Redis(host=host, port=int(port))
91+
self._redis = Redis(host=host, port=int(port),
92+
retry_connection=self.retry_connection,
93+
timeout=self.timeout)
6794
elif isinstance(server, Redis):
6895
self.dsn = '%s:%s' % (server.host,server.port)
6996
self._redis = server

pyres/failure.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
from pyres import ResQ
33
import sys, traceback
44
class Failure(object):
5-
def __init__(self, exp, queue, payload):
5+
def __init__(self, exp, worker, queue, payload):
66
excc, _, tb = sys.exc_info()
77

88
self._exception = excc
99
self._traceback = tb
10-
#self._worker = worker
10+
self._worker = worker
1111
self._queue = queue
1212
self._payload = payload
1313

@@ -30,7 +30,8 @@ def save(self, resq):
3030
'payload' : self._payload,
3131
'error' : self._parse_message(self._exception),
3232
'backtrace' : self._parse_traceback(self._traceback),
33-
'queue' : self._queue
33+
'queue' : self._queue,
34+
'worker' : self._worker
3435
}
3536
data = ResQ.encode(data)
3637
resq.redis.push('resque:failed', data)

pyres/job.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1-
from pyres import ResQ, str_to_class
1+
from pyres import ResQ, str_to_class, safe_str_to_class
22
from pyres.failure import Failure
33
class Job(object):
4-
def __init__(self, queue, payload, resq):
4+
def __init__(self, queue, payload, resq, worker):
55
self._queue = queue
66
self._payload = payload
77
self.resq = resq
8+
self._worker = worker
89

910
def perform(self):
1011
payload_class_str = self._payload["class"]
11-
payload_class = str_to_class(payload_class_str)
12+
payload_class = safe_str_to_class(payload_class_str)
1213
args = self._payload.get("args", None)
1314
if args:
1415
return payload_class.perform(*args)
@@ -17,11 +18,11 @@ def perform(self):
1718

1819
def fail(self, exception):
1920
#Failure.create(exception)
20-
failure = Failure(exception, self._queue, self._payload)
21+
failure = Failure(exception, self._worker, self._queue, self._payload)
2122
failure.save(self.resq)
2223

2324
@classmethod
24-
def reserve(cls, queue, res):
25+
def reserve(cls, queue, res, worker):
2526
payload = res.pop(queue)
2627
if payload:
27-
return cls(queue, payload, res)
28+
return cls(queue, payload, res, worker)

pyres/worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ def process(self, job=None):
129129
def reserve(self):
130130
for q in self.queues:
131131
print "Checking %s" % q
132-
job = Job.reserve(q, self.resq)
132+
job = Job.reserve(q, self.resq, self.__str__())
133133
if job:
134134
print "Found job on %s" % q
135135
return job

0 commit comments

Comments
 (0)