Skip to content

Commit a8615e6

Browse files
committed
Merge branch 'develop'
2 parents 4ac39d5 + b1e5c82 commit a8615e6

18 files changed

+515
-478
lines changed

pyres/__init__.py

Lines changed: 61 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,22 @@ def setup_logging(log_level=logging.INFO, filename=None, stream=sys.stderr):
1818
handler = WatchedFileHandler(filename)
1919
except:
2020
from logging.handlers import RotatingFileHandler
21-
handler = RotatingFileHandler(filename,maxBytes=52428800, backupCount=7)
21+
handler = RotatingFileHandler(filename,maxBytes=52428800,
22+
backupCount=7)
2223
else:
23-
handler = logging.StreamHandler(strm=stream)
24+
handler = logging.StreamHandler(stream)
2425
handler.setFormatter(logging.Formatter(
2526
'%(asctime)s %(levelname)-8s %(message)s', '%Y-%m-%d %H:%M:%S'))
2627
logger.addHandler(handler)
2728

2829
def my_import(name):
29-
"""Helper function for walking import calls when searching for classes by string names."""
30-
mod = __import__(name)
31-
components = name.split('.')
32-
for comp in components[1:]:
33-
mod = getattr(mod, comp)
30+
"""Helper function for walking import calls when searching for classes by
31+
string names.
32+
"""
33+
mod = __import__(name)
34+
components = name.split('.')
35+
for comp in components[1:]:
36+
mod = getattr(mod, comp)
3437
return mod
3538

3639
def safe_str_to_class(s):
@@ -63,34 +66,34 @@ def str_to_class(s):
6366
class ResQ(object):
6467
"""The ResQ class defines the Redis server object to which we will
6568
enqueue jobs into various queues.
66-
69+
6770
The ``__init__`` takes these keyword arguments:
68-
71+
6972
``server`` -- IP address and port of the Redis server to which you want to connect. Default is `localhost:6379`.
70-
73+
7174
``password`` -- The password, if required, of your Redis server. Default is "None".
72-
75+
7376
``timeout`` -- The timeout keyword is in the signature, but is unused. Default is "None".
74-
77+
7578
``retry_connection`` -- This keyword is in the signature but is deprecated. Default is "True".
76-
77-
79+
80+
7881
Both ``timeout`` and ``retry_connection`` will be removed as the python-redis client
79-
no longer uses them.
80-
82+
no longer uses them.
83+
8184
Example usage::
8285
8386
>>> from pyres import *
8487
>>> r = ResQ(server="192.168.1.10:6379", password="some_pwd")
8588
# Assuming redis is running on default port with no password
86-
89+
8790
**r** is a resque object on which we can enqueue tasks.::
8891
8992
>>>> r.enqueue(SomeClass, args)
9093
91-
SomeClass can be any python class with a *perform* method and a *queue*
94+
SomeClass can be any python class with a *perform* method and a *queue*
9295
attribute on it.
93-
96+
9497
"""
9598
def __init__(self, server="localhost:6379", password=None):
9699
self.redis = server
@@ -144,9 +147,9 @@ def _set_redis(self, server):
144147
redis = property(_get_redis, _set_redis)
145148

146149
def enqueue(self, klass, *args):
147-
"""Enqueue a job into a specific queue. Make sure the class you are passing
148-
has **queue** attribute and a **perform** method on it.
149-
150+
"""Enqueue a job into a specific queue. Make sure the class you are
151+
passing has **queue** attribute and a **perform** method on it.
152+
150153
"""
151154
queue = getattr(klass,'queue', None)
152155
if queue:
@@ -170,14 +173,14 @@ def enqueue_from_string(self, klass_as_string, queue, *args, **kwargs):
170173
logging.debug("job arguments: %s" % str(args))
171174
else:
172175
logging.debug("no arguments passed in.")
173-
176+
174177
def queues(self):
175178
return self.redis.smembers("resque:queues") or []
176-
179+
177180
def info(self):
178-
"""Returns a dictionary of the current status of the pending jobs,
181+
"""Returns a dictionary of the current status of the pending jobs,
179182
processed, no. of queues, no. of workers, no. of failed jobs.
180-
183+
181184
"""
182185
pending = 0
183186
for q in self.queues():
@@ -191,77 +194,81 @@ def info(self):
191194
'failed' : Stat('failed',self).get(),
192195
'servers' : ['%s:%s' % (self.redis.host, self.redis.port)]
193196
}
194-
197+
195198
def keys(self):
196-
return [key.replace('resque:','') for key in self.redis.keys('resque:*')]
197-
199+
return [key.replace('resque:','')
200+
for key in self.redis.keys('resque:*')]
201+
198202
def reserve(self, queue):
199203
from pyres.job import Job
200204
return Job.reserve(queue, self)
201-
205+
202206
def __str__(self):
203207
return "PyRes Client connected to %s" % self.redis.server
204-
208+
205209
def workers(self):
206210
from pyres.worker import Worker
207211
return Worker.all(self)
208-
212+
209213
def working(self):
210214
from pyres.worker import Worker
211215
return Worker.working(self)
212-
216+
213217
def remove_queue(self, queue):
214218
if queue in self._watched_queues:
215219
self._watched_queues.remove(queue)
216220
self.redis.srem('resque:queues',queue)
217221
del self.redis['resque:queue:%s' % queue]
218-
222+
219223
def close(self):
220224
"""Close the underlying redis connection.
221-
225+
222226
"""
223227
self.redis.disconnect()
224-
228+
225229
def enqueue_at(self, datetime, klass, *args, **kwargs):
226230
class_name = '%s.%s' % (klass.__module__, klass.__name__)
227-
logging.info("enqueued '%s' job for execution at %s" % (class_name, datetime))
231+
logging.info("enqueued '%s' job for execution at %s" % (class_name,
232+
datetime))
228233
if args:
229234
logging.debug("job arguments are: %s" % str(args))
230235
payload = {'class':class_name, 'queue': klass.queue, 'args':args}
231236
if 'first_attempt' in kwargs:
232237
payload['first_attempt'] = kwargs['first_attempt']
233238
self.delayed_push(datetime, payload)
234-
239+
235240
def delayed_push(self, datetime, item):
236241
key = int(time.mktime(datetime.timetuple()))
237242
self.redis.rpush('resque:delayed:%s' % key, ResQ.encode(item))
238243
self.redis.zadd('resque:delayed_queue_schedule', key, key)
239-
244+
240245
def delayed_queue_peek(self, start, count):
241-
return [int(item) for item in self.redis.zrange('resque:delayed_queue_schedule', start, start+count) or []]
242-
246+
return [int(item) for item in self.redis.zrange(
247+
'resque:delayed_queue_schedule', start, start+count) or []]
248+
243249
def delayed_timestamp_peek(self, timestamp, start, count):
244250
return self.list_range('resque:delayed:%s' % timestamp, start, count)
245-
251+
246252
def delayed_queue_schedule_size(self):
247253
size = 0
248254
length = self.redis.zcard('resque:delayed_queue_schedule')
249255
for i in self.redis.zrange('resque:delayed_queue_schedule',0,length):
250256
size += self.delayed_timestamp_size(i)
251257
return size
252-
258+
253259
def delayed_timestamp_size(self, timestamp):
254260
#key = int(time.mktime(timestamp.timetuple()))
255261
return self.redis.llen("resque:delayed:%s" % timestamp)
256-
262+
257263
def next_delayed_timestamp(self):
258264
key = int(time.mktime(ResQ._current_time().timetuple()))
259-
array = self.redis.zrangebyscore('resque:delayed_queue_schedule', '-inf', key)
265+
array = self.redis.zrangebyscore('resque:delayed_queue_schedule',
266+
'-inf', key)
260267
timestamp = None
261268
if array:
262269
timestamp = array[0]
263270
return timestamp
264-
271+
265272
def next_item_for_timestamp(self, timestamp):
266273
#key = int(time.mktime(timestamp.timetuple()))
267274
key = "resque:delayed:%s" % timestamp
@@ -273,7 +280,7 @@ def next_item_for_timestamp(self, timestamp):
273280
self.redis.delete(key)
274281
self.redis.zrem('resque:delayed_queue_schedule', timestamp)
275282
return item
276-
283+
277284
@classmethod
278285
def encode(cls, item):
279286
return json.dumps(item)
@@ -284,7 +291,7 @@ def decode(cls, item):
284291
ret = json.loads(item)
285292
return ret
286293
return None
287-
294+
288295
@classmethod
289296
def _enqueue(cls, klass, *args):
290297
queue = getattr(klass,'queue', None)
@@ -300,25 +307,25 @@ def _current_time():
300307

301308
class Stat(object):
302309
"""A Stat class which shows the current status of the queue.
303-
310+
304311
"""
305312
def __init__(self, name, resq):
306313
self.name = name
307314
self.key = "resque:stat:%s" % self.name
308315
self.resq = resq
309-
316+
310317
def get(self):
311318
val = self.resq.redis.get(self.key)
312319
if val:
313320
return int(val)
314321
return 0
315-
322+
316323
def incr(self, ammount=1):
317324
self.resq.redis.incr(self.key, ammount)
318-
325+
319326
def decr(self, ammount=1):
320327
self.resq.redis.decr(self.key, ammount)
321-
328+
322329
def clear(self):
323330
self.resq.redis.delete(self.key)
324-
331+

0 commit comments

Comments
 (0)