Skip to content

Commit 93a4065

Browse files
author
Whit Morriss
committed
Merge branch 'master' of https://github.com/binarydud/pyres
2 parents 622815b + 1cf7a1d commit 93a4065

19 files changed

+453
-247
lines changed

AUTHORS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@
55
* dsc
66
* Alex Ezell
77
* Michael Russo
8+
* Whit Morris
89

910
Inspired by Resque, by Chris Wanstrath

HISTORY.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
## 0.9.1 (2010-10-15)
2+
* fixing issues #45, #46.
3+
* #45 - resweb not working in chrome
4+
* #46 - delayed_queue_schedule_size() returns incorrect value
5+
* updated version requirement for redis-py
6+
* added Failure docs from Alex._
7+
8+
## 0.9 (2010-08-05)
9+
* added better logging to the project
10+
111
## 0.8 (2010-04-24)
212
* added the pyres_manager and the horde module. This allows a more prefork like model for processing jobs.
313
* setproctitle usage. Allows better process titles when viewing via ps

docs/source/failures.rst

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
Failures
2+
===============
3+
4+
Pyres provides a ``BaseBackend`` for handling failed jobs. You can subclass
5+
this backend to store failed jobs in any system you like.
6+
7+
Currently, the only provided backend is a ``RedisBackend`` which will store
8+
your failed jobs into a special *failed* queue for later processing or reenqueueing.
9+
10+
Here's a simple example::
11+
12+
>>> from pyres import failure
13+
>>> from pyres.job import Job
14+
>>> from pyres import ResQ
15+
>>> r = Resq()
16+
>>> job = Job.reserve('basic',r)
17+
>>> job.fail("problem")

pyres/__init__.py

Lines changed: 67 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
__version__ = '0.9'
1+
__version__ = '0.9.1'
22

33
from redis import Redis
44
import pyres.json_parser as json
@@ -18,19 +18,22 @@ def setup_logging(log_level=logging.INFO, filename=None, stream=sys.stderr):
1818
handler = WatchedFileHandler(filename)
1919
except:
2020
from logging.handlers import RotatingFileHandler
21-
handler = RotatingFileHandler(filename,maxBytes=52428800, backupCount=7)
21+
handler = RotatingFileHandler(filename,maxBytes=52428800,
22+
backupCount=7)
2223
else:
23-
handler = logging.StreamHandler(strm=stream)
24+
handler = logging.StreamHandler(stream)
2425
handler.setFormatter(logging.Formatter(
2526
'%(asctime)s %(levelname)-8s %(message)s', '%Y-%m-%d %H:%M:%S'))
2627
logger.addHandler(handler)
2728

2829
def my_import(name):
29-
"""Helper function for walking import calls when searching for classes by string names."""
30-
mod = __import__(name)
31-
components = name.split('.')
32-
for comp in components[1:]:
33-
mod = getattr(mod, comp)
30+
"""Helper function for walking import calls when searching for classes by
31+
string names.
32+
"""
33+
mod = __import__(name)
34+
components = name.split('.')
35+
for comp in components[1:]:
36+
mod = getattr(mod, comp)
3437
return mod
3538

3639
def safe_str_to_class(s):
@@ -63,34 +66,34 @@ def str_to_class(s):
6366
class ResQ(object):
6467
"""The ResQ class defines the Redis server object to which we will
6568
enqueue jobs into various queues.
66-
69+
6770
The ``__init__`` takes these keyword arguments:
68-
71+
6972
``server`` -- IP address and port of the Redis server to which you want to connect. Default is `localhost:6379`.
70-
73+
7174
``password`` -- The password, if required, of your Redis server. Default is "None".
72-
75+
7376
``timeout`` -- The timeout keyword is in the signature, but is unused. Default is "None".
74-
77+
7578
``retry_connection`` -- This keyword is in the signature but is deprecated. Default is "True".
76-
77-
79+
80+
7881
Both ``timeout`` and ``retry_connection`` will be removed as the python-redis client
79-
no longer uses them.
80-
82+
no longer uses them.
83+
8184
Example usage::
8285
8386
>>> from pyres import *
8487
>>> r = ResQ(server="192.168.1.10:6379", password="some_pwd")
8588
# Assuming redis is running on default port with no password
86-
89+
8790
**r** is a resque object on which we can enqueue tasks.::
8891
8992
>>>> r.enqueue(SomeClass, args)
9093
91-
SomeClass can be any python class with a *perform* method and a *queue*
94+
SomeClass can be any python class with a *perform* method and a *queue*
9295
attribute on it.
93-
96+
9497
"""
9598
def __init__(self, server="localhost:6379", password=None):
9699
self.redis = server
@@ -144,9 +147,9 @@ def _set_redis(self, server):
144147
redis = property(_get_redis, _set_redis)
145148

146149
def enqueue(self, klass, *args):
147-
"""Enqueue a job into a specific queue. Make sure the class you are passing
148-
has **queue** attribute and a **perform** method on it.
149-
150+
"""Enqueue a job into a specific queue. Make sure the class you are
151+
passing has **queue** attribute and a **perform** method on it.
152+
150153
"""
151154
queue = getattr(klass,'queue', None)
152155
if queue:
@@ -170,14 +173,14 @@ def enqueue_from_string(self, klass_as_string, queue, *args, **kwargs):
170173
logging.debug("job arguments: %s" % str(args))
171174
else:
172175
logging.debug("no arguments passed in.")
173-
176+
174177
def queues(self):
175178
return self.redis.smembers("resque:queues") or []
176-
179+
177180
def info(self):
178-
"""Returns a dictionary of the current status of the pending jobs,
181+
"""Returns a dictionary of the current status of the pending jobs,
179182
processed, no. of queues, no. of workers, no. of failed jobs.
180-
183+
181184
"""
182185
pending = 0
183186
for q in self.queues():
@@ -191,73 +194,81 @@ def info(self):
191194
'failed' : Stat('failed',self).get(),
192195
'servers' : ['%s:%s' % (self.redis.host, self.redis.port)]
193196
}
194-
197+
195198
def keys(self):
196-
return [key.replace('resque:','') for key in self.redis.keys('resque:*')]
197-
199+
return [key.replace('resque:','')
200+
for key in self.redis.keys('resque:*')]
201+
198202
def reserve(self, queue):
199203
from pyres.job import Job
200204
return Job.reserve(queue, self)
201-
205+
202206
def __str__(self):
203207
return "PyRes Client connected to %s" % self.redis.server
204-
208+
205209
def workers(self):
206210
from pyres.worker import Worker
207211
return Worker.all(self)
208-
212+
209213
def working(self):
210214
from pyres.worker import Worker
211215
return Worker.working(self)
212-
216+
213217
def remove_queue(self, queue):
214218
if queue in self._watched_queues:
215219
self._watched_queues.remove(queue)
216220
self.redis.srem('resque:queues',queue)
217221
del self.redis['resque:queue:%s' % queue]
218-
222+
219223
def close(self):
220224
"""Close the underlying redis connection.
221-
225+
222226
"""
223227
self.redis.disconnect()
224-
228+
225229
def enqueue_at(self, datetime, klass, *args, **kwargs):
226230
class_name = '%s.%s' % (klass.__module__, klass.__name__)
227-
logging.info("enqueued '%s' job for execution at %s" % (class_name, datetime))
231+
logging.info("enqueued '%s' job for execution at %s" % (class_name,
232+
datetime))
228233
if args:
229234
logging.debug("job arguments are: %s" % str(args))
230235
payload = {'class':class_name, 'queue': klass.queue, 'args':args}
231236
if 'first_attempt' in kwargs:
232237
payload['first_attempt'] = kwargs['first_attempt']
233238
self.delayed_push(datetime, payload)
234-
239+
235240
def delayed_push(self, datetime, item):
236241
key = int(time.mktime(datetime.timetuple()))
237242
self.redis.rpush('resque:delayed:%s' % key, ResQ.encode(item))
238243
self.redis.zadd('resque:delayed_queue_schedule', key, key)
239-
244+
240245
def delayed_queue_peek(self, start, count):
241-
return [int(item) for item in self.redis.zrange('resque:delayed_queue_schedule', start, start+count) or []]
242-
246+
return [int(item) for item in self.redis.zrange(
247+
'resque:delayed_queue_schedule', start, start+count) or []]
248+
243249
def delayed_timestamp_peek(self, timestamp, start, count):
244250
return self.list_range('resque:delayed:%s' % timestamp, start, count)
245-
251+
246252
def delayed_queue_schedule_size(self):
247-
return self.redis.zcard('resque:delayed_queue_schedule')
248-
253+
size = 0
254+
length = self.redis.zcard('resque:delayed_queue_schedule')
255+
for i in self.redis.zrange('resque:delayed_queue_schedule',0,length):
256+
size += self.delayed_timestamp_size(i)
257+
return size
258+
249259
def delayed_timestamp_size(self, timestamp):
250260
#key = int(time.mktime(timestamp.timetuple()))
251261
return self.redis.llen("resque:delayed:%s" % timestamp)
252-
262+
253263
def next_delayed_timestamp(self):
254264
key = int(time.mktime(ResQ._current_time().timetuple()))
255-
array = self.redis.zrangebyscore('resque:delayed_queue_schedule', '-inf', key)
265+
array = self.redis.zrangebyscore('resque:delayed_queue_schedule',
266+
'-inf', key)
256267
timestamp = None
257268
if array:
258269
timestamp = array[0]
259270
return timestamp
260-
271+
261272
def next_item_for_timestamp(self, timestamp):
262273
#key = int(time.mktime(timestamp.timetuple()))
263274
key = "resque:delayed:%s" % timestamp
@@ -269,7 +280,7 @@ def next_item_for_timestamp(self, timestamp):
269280
self.redis.delete(key)
270281
self.redis.zrem('resque:delayed_queue_schedule', timestamp)
271282
return item
272-
283+
273284
@classmethod
274285
def encode(cls, item):
275286
return json.dumps(item)
@@ -280,7 +291,7 @@ def decode(cls, item):
280291
ret = json.loads(item)
281292
return ret
282293
return None
283-
294+
284295
@classmethod
285296
def _enqueue(cls, klass, *args):
286297
queue = getattr(klass,'queue', None)
@@ -296,25 +307,25 @@ def _current_time():
296307

297308
class Stat(object):
298309
"""A Stat class which shows the current status of the queue.
299-
310+
300311
"""
301312
def __init__(self, name, resq):
302313
self.name = name
303314
self.key = "resque:stat:%s" % self.name
304315
self.resq = resq
305-
316+
306317
def get(self):
307318
val = self.resq.redis.get(self.key)
308319
if val:
309320
return int(val)
310321
return 0
311-
322+
312323
def incr(self, ammount=1):
313324
self.resq.redis.incr(self.key, ammount)
314-
325+
315326
def decr(self, ammount=1):
316327
self.resq.redis.decr(self.key, ammount)
317-
328+
318329
def clear(self):
319330
self.resq.redis.delete(self.key)
320-
331+

pyres/failure/__init__.py

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,28 @@
1-
import datetime
2-
from pyres import ResQ
3-
import sys, traceback
41
from pyres.failure.redis import RedisBackend
52

6-
_backend = RedisBackend
3+
backend = RedisBackend
74

85
def create(*args, **kwargs):
9-
return _backend(*args, **kwargs)
6+
return backend(*args, **kwargs)
107

118
def count(resq):
12-
return _backend.count(resq)
9+
return backend.count(resq)
1310

1411
def all(resq, start, count):
15-
return _backend.all(resq, start, count)
12+
return backend.all(resq, start, count)
1613

1714
def clear(resq):
18-
return _backend.clear(resq)
15+
return backend.clear(resq)
1916

2017
def requeue(resq, failure_object):
2118
queue = failure_object._queue
2219
payload = failure_object._payload
2320
return resq.push(queue, payload)
24-
21+
2522
def retry(resq, queue, payload):
2623
job = resq.decode(payload)
2724
resq.push(queue, job['payload'])
2825
return delete(resq, payload)
29-
26+
3027
def delete(resq, payload):
31-
return resq.redis.lrem(name = 'resque:failed', num = 1, value = payload)
32-
28+
return resq.redis.lrem(name='resque:failed', num=1, value=payload)

0 commit comments

Comments
 (0)