Skip to content

Commit 52a29d8

Browse files
author
Matt George
committed
Merge branch 'refactor_paths'
2 parents 3868f2c + 64ff2bb commit 52a29d8

40 files changed

+451
-390
lines changed

AUTHORS.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
## Authors
2+
* Matt George
3+
* Chris Song
4+
5+
Inspired by Resque, by Chris Wanstrath

HISTORY.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
## 0.2.1 (2009-12-10)
2+
3+
* updated setup.py
4+
* refactored package for better testing
5+
* resque namespacing by fakechris
6+
* smarter import from string by fakechris
7+
18
## 0.2.0 (2009-12-09)
29

310
* Better web interface via resweb

coverage.report

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
Name Stmts Exec Cover Missing
2+
------------------------------------------------
3+
pyres 138 131 94% 26, 39, 98, 133-134, 144-145
4+
pyres.exceptions 2 2 100%
5+
pyres.failure 23 22 95% 41
6+
pyres.job 23 23 100%
7+
pyres.worker 189 146 77% 66, 74, 84-112, 161, 179, 186, 230-241
8+
------------------------------------------------
9+
TOTAL 375 324 86%
10+
----------------------------------------------------------------------
11+
Ran 32 tests in 0.884s

src/pyres/__init__.py renamed to pyres/__init__.py

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,31 @@
1-
__version__ = '0.2.0'
1+
__version__ = '0.2.1'
22

33
from redis import Redis
44
import simplejson
55

66
import types
7+
8+
def my_import(name):
9+
mod = __import__(name)
10+
components = name.split('.')
11+
for comp in components[1:]:
12+
mod = getattr(mod, comp)
13+
return mod
14+
15+
def safe_str_to_class(s):
16+
lst = s.split(".")
17+
klass = lst[-1]
18+
mod_list = lst[:-1]
19+
module = ".".join(mod_list)
20+
try:
21+
mod = my_import(module)
22+
if hasattr(mod, klass):
23+
return getattr(mod, klass)
24+
else:
25+
return None
26+
except ImportError:
27+
return None
28+
729
def str_to_class(s):
830
lst = s.split(".")
931
klass = lst[-1]
@@ -20,32 +42,37 @@ def str_to_class(s):
2042

2143
class ResQ(object):
2244

23-
def __init__(self, server="localhost:6379"):
45+
def __init__(self, server="localhost:6379", password=None,
46+
timeout=None, retry_connection=True):
47+
self.timeout = timeout
48+
self.retry_connection = retry_connection
2449
self.redis = server
50+
if password:
51+
self.redis.auth(password)
2552
self._watched_queues = set()
2653

2754
def push(self, queue, item):
2855
self.watch_queue(queue)
29-
self.redis.push("queue:%s" % queue, ResQ.encode(item))
56+
self.redis.push("resque:queue:%s" % queue, ResQ.encode(item))
3057

3158
def pop(self, queue):
32-
ret = self.redis.pop("queue:%s" % queue)
59+
ret = self.redis.pop("resque:queue:%s" % queue)
3360
if ret:
3461
return ResQ.decode(ret)
3562
return ret
3663

3764
def size(self, queue):
38-
return int(self.redis.llen("queue:%s" % queue))
65+
return int(self.redis.llen("resque:queue:%s" % queue))
3966

4067
def watch_queue(self, queue):
4168
if queue in self._watched_queues:
4269
return
4370
else:
44-
if self.redis.sadd('queues',str(queue)):
71+
if self.redis.sadd('resque:queues',str(queue)):
4572
self._watched_queues.add(queue)
4673

4774
def peek(self, queue, start=0, count=1):
48-
return self.list_range('queue:%s' % queue, start, count)
75+
return self.list_range('resque:queue:%s' % queue, start, count)
4976

5077
def list_range(self, key, start, count):
5178
items = self.redis.lrange(key,start,start+count-1)
@@ -61,7 +88,9 @@ def _set_redis(self, server):
6188
if isinstance(server, basestring):
6289
self.dsn = server
6390
host, port = server.split(':')
64-
self._redis = Redis(host=host, port=int(port))
91+
self._redis = Redis(host=host, port=int(port),
92+
retry_connection=self.retry_connection,
93+
timeout=self.timeout)
6594
elif isinstance(server, Redis):
6695
self.dsn = '%s:%s' % (server.host,server.port)
6796
self._redis = server
@@ -81,7 +110,7 @@ def enqueue_from_string(self, klass_as_string, queue, *args):
81110
self.push(queue, {'class':klass_as_string,'args':args})
82111

83112
def queues(self):
84-
return self.redis.smembers("queues")
113+
return self.redis.smembers("resque:queues")
85114

86115
def info(self):
87116
pending = 0
@@ -118,8 +147,8 @@ def working(self):
118147
def remove_queue(self, queue):
119148
if queue in self._watched_queues:
120149
self._watched_queues.remove(queue)
121-
self.redis.srem('queues',queue)
122-
del self.redis['queue:%s' % queue]
150+
self.redis.srem('resque:queues',queue)
151+
del self.redis['resque:queue:%s' % queue]
123152

124153
@classmethod
125154
def encode(cls, item):
@@ -146,7 +175,7 @@ def _enqueue(cls, klass, *args):
146175
class Stat(object):
147176
def __init__(self, name, resq):
148177
self.name = name
149-
self.key = "stat:%s" % self.name
178+
self.key = "resque:stat:%s" % self.name
150179
self.resq = resq
151180

152181
def get(self):
File renamed without changes.

src/pyres/extensions.py renamed to pyres/extensions.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
except:
44
import sys
55
sys.exit("multiprocessing was not available")
6+
import os, datetime, time, signal
7+
from pyres import ResQ
68

79
from pyres.exceptions import NoQueueError
810
from pyres.worker import Worker

src/pyres/failure.py renamed to pyres/failure.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
from pyres import ResQ
33
import sys, traceback
44
class Failure(object):
5-
def __init__(self, exp, queue, payload):
5+
def __init__(self, exp, queue, payload, worker=None):
66
excc, _, tb = sys.exc_info()
77

88
self._exception = excc
99
self._traceback = tb
10-
#self._worker = worker
10+
self._worker = worker
1111
self._queue = queue
1212
self._payload = payload
1313

@@ -30,13 +30,14 @@ def save(self, resq):
3030
'payload' : self._payload,
3131
'error' : self._parse_message(self._exception),
3232
'backtrace' : self._parse_traceback(self._traceback),
33-
'queue' : self._queue
33+
'queue' : self._queue,
34+
'worker' : self._worker
3435
}
3536
data = ResQ.encode(data)
36-
resq.redis.push('failed', data)
37+
resq.redis.push('resque:failed', data)
3738

3839
@classmethod
3940
def count(cls, resq):
40-
return int(resq.redis.llen('failed'))
41+
return int(resq.redis.llen('resque:failed'))
4142

4243

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1-
from pyres import ResQ, str_to_class
1+
from pyres import ResQ, str_to_class, safe_str_to_class
22
from pyres.failure import Failure
33
class Job(object):
4-
def __init__(self, queue, payload, resq):
4+
def __init__(self, queue, payload, resq, worker=None):
55
self._queue = queue
66
self._payload = payload
77
self.resq = resq
8+
self._worker = worker
89

910
def perform(self):
1011
payload_class_str = self._payload["class"]
11-
payload_class = str_to_class(payload_class_str)
12+
payload_class = safe_str_to_class(payload_class_str)
1213
args = self._payload.get("args", None)
1314
if args:
1415
return payload_class.perform(*args)
@@ -17,11 +18,11 @@ def perform(self):
1718

1819
def fail(self, exception):
1920
#Failure.create(exception)
20-
failure = Failure(exception, self._queue, self._payload)
21+
failure = Failure(exception, self._queue, self._payload, self._worker)
2122
failure.save(self.resq)
2223

2324
@classmethod
24-
def reserve(cls, queue, res):
25+
def reserve(cls, queue, res, worker=None):
2526
payload = res.pop(queue)
2627
if payload:
27-
return cls(queue, payload, res)
28+
return cls(queue, payload, res, worker)

src/pyres/worker.py renamed to pyres/worker.py

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,16 @@
66
import os, sys
77
import time
88
import simplejson
9+
910
class Worker(object):
10-
def __init__(self, queues=[], server="localhost:6379"):
11+
def __init__(self, queues=[], server="localhost:6379", password=None):
1112
self.queues = queues
1213
self.validate_queues()
1314
self._shutdown = False
1415
self.child = None
1516
self.pid = os.getpid()
1617
if isinstance(server,basestring):
17-
self.resq = ResQ(server)
18+
self.resq = ResQ(server=server, password=password)
1819
elif isinstance(server, ResQ):
1920
self.resq = server
2021
else:
@@ -26,28 +27,29 @@ def validate_queues(self):
2627
raise NoQueueError("Please give each worker at least one queue.")
2728

2829
def register_worker(self):
29-
self.resq.redis.sadd('workers',str(self))
30+
self.resq.redis.sadd('resque:workers',str(self))
3031
#self.resq._redis.add("worker:#{self}:started", Time.now.to_s)
3132
self.started = datetime.datetime.now()
3233
#Stat.clear("processed:#{self}")
3334
#Stat.clear("failed:#{self}")
3435

3536
def _set_started(self, time):
3637
if time:
37-
self.resq.redis.set("worker:%s:started" % self, time.strftime('%Y-%m-%d %H:%M:%S'))
38+
self.resq.redis.set("resque:worker:%s:started" % self, time.strftime('%Y-%m-%d %H:%M:%S'))
3839
else:
39-
self.resq.redis.delete("worker:%s:started" % self)
40-
40+
self.resq.redis.delete("resque:worker:%s:started" % self)
41+
4142
def _get_started(self):
42-
datestring = self.resq.redis.get("worker:%s:started" % self)
43+
datestring = self.resq.redis.get("resque:worker:%s:started" % self)
4344
ds = None
4445
if datestring:
4546
ds = datetime.datetime.strptime(datestring, '%Y-%m-%d %H:%M:%S')
4647
return ds
48+
4749
started = property(_get_started, _set_started)
4850

4951
def unregister_worker(self):
50-
self.resq.redis.srem('workers',str(self))
52+
self.resq.redis.srem('resque:workers',str(self))
5153
self.started = None
5254

5355
def startup(self):
@@ -71,14 +73,13 @@ def kill_child(self, signum, frame):
7173
if self.child:
7274
print "Killing child at %s" % self.child
7375
os.kill(self.child, signal.SIGKILL)
74-
76+
7577
def __str__(self):
7678
if getattr(self,'id', None):
7779
return self.id
7880
hostname = os.uname()[1]
79-
#pid = os.getpid()
8081
return '%s:%s:%s' % (hostname, self.pid, ','.join(self.queues))
81-
82+
8283
def work(self, interval=5):
8384
self.startup()
8485
while True:
@@ -128,7 +129,7 @@ def process(self, job=None):
128129
def reserve(self):
129130
for q in self.queues:
130131
print "Checking %s" % q
131-
job = Job.reserve(q, self.resq)
132+
job = Job.reserve(q, self.resq, self.__str__())
132133
if job:
133134
print "Found job on %s" % q
134135
return job
@@ -141,14 +142,14 @@ def working_on(self, job):
141142
'payload': job._payload
142143
}
143144
data = simplejson.dumps(data)
144-
self.resq.redis["worker:%s" % str(self)] = data
145+
self.resq.redis["resque:worker:%s" % str(self)] = data
145146
print "worker:%s" % str(self)
146-
print self.resq.redis["worker:%s" % str(self)]
147+
print self.resq.redis["resque:worker:%s" % str(self)]
147148

148149
def done_working(self):
149150
print 'done working'
150151
self.processed()
151-
self.resq.redis.delete("worker:%s" % str(self))
152+
self.resq.redis.delete("resque:worker:%s" % str(self))
152153

153154
def processed(self):
154155
total_processed = Stat("processed", self.resq)
@@ -164,20 +165,21 @@ def failed(self):
164165
stat = Stat("failed:%s" % self, self.resq)
165166
total_failed.incr()
166167
stat.incr()
168+
167169
def get_failed(self):
168170
return Stat("failed:%s" % self, self.resq).get()
169171
def job(self):
170-
data = self.resq.redis.get("worker:%s" % self)
172+
data = self.resq.redis.get("resque:worker:%s" % self)
171173
if data:
172174
return ResQ.decode(data)
173175
return {}
176+
174177

175178
def processing(self):
176179
return self.job()
177180

178181
def state(self):
179-
return 'working' if self.resq.redis.exists('worker:%s' % self) else 'idle'
180-
182+
return 'working' if self.resq.redis.exists('resque:worker:%s' % self) else 'idle'
181183

182184
@classmethod
183185
def run(cls, queues, server):
@@ -190,7 +192,7 @@ def all(cls, host="localhost:6379"):
190192
resq = ResQ(host)
191193
elif isinstance(host, ResQ):
192194
resq = host
193-
return [Worker.find(w,resq) for w in resq.redis.smembers('workers')]
195+
return [Worker.find(w,resq) for w in resq.redis.smembers('resque:workers')]
194196

195197
@classmethod
196198
def working(cls, host):
@@ -199,13 +201,13 @@ def working(cls, host):
199201
elif isinstance(host, ResQ):
200202
resq = host
201203
total = []
202-
for key in Worker.all(host):
203-
total.append('worker:%s' % key)
204+
for key in Worker.all(host):
205+
total.append('resque:worker:%s' % key)
204206
names = []
205207
for key in total:
206208
value = resq.redis.get(key)
207209
if value:
208-
w = Worker.find(key[7:], resq)
210+
w = Worker.find(key[14:], resq) #resque:worker:
209211
names.append(w)
210212
return names
211213

@@ -221,7 +223,7 @@ def find(cls, worker_id, resq):
221223

222224
@classmethod
223225
def exists(cls, worker_id, resq):
224-
return resq.redis.sismember('workers', worker_id)
226+
return resq.redis.sismember('resque:workers', worker_id)
225227

226228

227229
if __name__ == "__main__":
@@ -236,4 +238,4 @@ def exists(cls, worker_id, resq):
236238
queues = options.queue_list.split(',')
237239
import sys
238240
sys.path.insert(0,'/Users/mgeorge/dev/pyres/src')
239-
Worker.run(queues, options.server)
241+
Worker.run(queues, options.server)
File renamed without changes.

0 commit comments

Comments
 (0)