
Commit 171fbe5

Author: Whit Morriss (committed)
Merge branch 'master' of https://github.com/chawco/pyres into experiment
2 parents 93a4065 + 48afb8a commit 171fbe5

10 files changed (+299, -259 lines)


pyres/__init__.py

Lines changed: 2 additions & 2 deletions
@@ -105,8 +105,8 @@ def push(self, queue, item):
         self.watch_queue(queue)
         self.redis.rpush("resque:queue:%s" % queue, ResQ.encode(item))
 
-    def pop(self, queue):
-        ret = self.redis.lpop("resque:queue:%s" % queue)
+    def pop(self, queue, timeout=10):
+        ret = self.redis.blpop("resque:queue:%s" % queue, timeout=timeout)
         if ret:
             return ResQ.decode(ret)
         return ret
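
For context, a minimal sketch (not part of the commit) of how the new blocking pop might be exercised, assuming a Redis server on localhost:6379 and a hypothetical queue named "basic". Note that redis-py's blpop returns a (key, value) pair rather than the bare value lpop returned, so the decode step may need to account for that.

    # Sketch only: exercising the new blocking pop (Python 2, as in pyres).
    from pyres import ResQ

    resq = ResQ(server="localhost:6379")
    resq.push("basic", {"class": "MyJob", "args": [1, 2]})   # hypothetical payload

    # pop() now blocks for up to `timeout` seconds instead of returning
    # immediately, so an idle consumer no longer has to poll in a tight loop.
    item = resq.pop("basic", timeout=10)
    print item   # decoded payload, or the empty result if the queue stayed empty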

pyres/extensions.py

Lines changed: 37 additions & 28 deletions
@@ -1,11 +1,16 @@
+import os
+import datetime
+import time
+import signal
+
 try:
     import multiprocessing
 except:
     import sys
     sys.exit("multiprocessing was not available")
-import os, datetime, time, signal
+
 from pyres import ResQ
-
+
 from pyres.exceptions import NoQueueError
 from pyres.worker import Worker
 
@@ -18,20 +23,23 @@ def work(self, interval=5):
             job = self.reserve()
             if job:
                 print "got: %s" % job
+
                 self.child = os.fork()
+
                 if self.child:
-                    print 'Forked %s at %s' % (self.child, datetime.datetime.now())
+                    print 'Forked %s at %s' % (self.child,
+                                               datetime.datetime.now())
                     os.waitpid(self.child, 0)
                 else:
-                    print 'Processing %s since %s' % (job._queue, datetime.datetime.now())
+                    print 'Processing %s since %s' % (job._queue,
+                                                      datetime.datetime.now())
                     self.process(job)
                     os._exit(0)
                 self.child = None
             else:
                 break
-
+
         self.unregister_worker()
-
 
 class Manager(object):
     def __init__(self, queues, host, max_children=10):
@@ -43,28 +51,27 @@ def __init__(self, queues, host, max_children=10):
         self.resq = ResQ(host)
         self.validate_queues()
         self.reports = {}
-
+
     def __str__(self):
-        import os;
         hostname = os.uname()[1]
         pid = os.getpid()
         return 'Manager:%s:%s:%s' % (hostname, pid, ','.join(self.queues))
-
+
     def validate_queues(self):
         if not self.queues:
             raise NoQueueError("Please give each worker at least one queue.")
-
+
     def check_rising(self, queue, size):
         if queue in self.reports:
-            time = time.time()
+            new_time = time.time()
             old_size = self.reports[queue][0]
             old_time = self.reports[queue][1]
-            if time > old_time + 5 and size > old_size + 20:
+            if new_time > old_time + 5 and size > old_size + 20:
                 return True
         else:
             self.reports[queue] = (size, time.time())
         return False
-
+
     def work(self):
         self.startup()
         while True:
@@ -74,44 +81,46 @@ def work(self):
             for queue in self.queues:
                 #check queue size
                 size = self.resq.size(queue)
-                if check_rising(queue,size):
+
+                if self.check_rising(queue, size):
                     if len(self.children) < self.max_children:
                         self.start_child(queue)
-
+
     def startup(self):
         self.register_manager()
         self.register_signals()
-
+
     def register_manager(self):
-        self.resq.redis.sadd('managers',str(self))
-
+        self.resq.redis.sadd('managers', str(self))
+
     def unregister_manager(self):
-        self.resq.redis.srem('managers',str(self))
-
+        self.resq.redis.srem('managers', str(self))
+
     def register_signals(self):
         signal.signal(signal.SIGTERM, self.shutdown_all)
         signal.signal(signal.SIGINT, self.shutdown_all)
         signal.signal(signal.SIGQUIT, self.schedule_shutdown)
         signal.signal(signal.SIGUSR1, self.kill_children)
-
+
     def shutdown_all(self, signum, frame):
         self.schedule_shutdown(signum, frame)
         self.kill_children(signum, frame)
-
+
     def schedule_shutdown(self, signum, frame):
         self._shutdown = True
-
+
     def kill_children(self):
         for child in self.children:
             child.terminate()
-
+
     def start_child(self, queue):
-        p = multiprocessing.Process(target=JuniorWorker.run, args=([queue], self._host))
+        p = multiprocessing.Process(target=JuniorWorker.run, args=([queue],
+                                                                    self._host))
         self.children.append(p)
         p.start()
         return True
-
+
     @classmethod
-    def run(cls, queues=[], host="localhost:6379"):
+    def run(cls, queues=(), host="localhost:6379"):
         manager = cls(queues, host)
-        manager.work()
+        manager.work()
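
As a usage sketch (not part of the commit), the Manager above could be started roughly like this; the queue names and host are assumptions for illustration:

    # Sketch only: launching the queue-watching Manager from pyres.extensions.
    from pyres.extensions import Manager

    if __name__ == '__main__':
        # Watches the listed queues and forks JuniorWorker children (capped at
        # max_children) whenever check_rising() sees a queue growing quickly.
        Manager.run(queues=['basic', 'email'], host='localhost:6379')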

pyres/failure/redis.py

Lines changed: 6 additions & 4 deletions
@@ -1,10 +1,12 @@
 import datetime, time
+from base64 import b64encode
+
 from base import BaseBackend
 from pyres import ResQ
 
 class RedisBackend(BaseBackend):
     """Extends the ``BaseBackend`` to provide a Redis backend for failed jobs."""
-
+
     def save(self, resq=None):
         """Saves the failed Job into a "failed" Redis queue preserving all its original enqueud info."""
         if not resq:
@@ -20,15 +22,15 @@ def save(self, resq=None):
         data['worker'] = self._worker
         data = ResQ.encode(data)
         resq.redis.rpush('resque:failed', data)
-
+
     @classmethod
     def count(cls, resq):
         return int(resq.redis.llen('resque:failed'))
 
     @classmethod
     def all(cls, resq, start=0, count=1):
-        from base64 import b64encode
         items = resq.redis.lrange('resque:failed', start, count) or []
+
         ret_list = []
         for i in items:
             failure = ResQ.decode(i)
@@ -39,4 +41,4 @@ def all(cls, resq, start=0, count=1):
     @classmethod
     def clear(cls, resq):
         return resq.redis.delete('resque:failed')
-
+
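
A small sketch (not part of the commit) of reading back what this backend stores, assuming a local Redis and that some jobs have already failed:

    # Sketch only: inspecting the resque:failed list written by RedisBackend.
    from pyres import ResQ
    from pyres.failure.redis import RedisBackend

    resq = ResQ(server='localhost:6379')
    print RedisBackend.count(resq)              # length of resque:failed
    for failed in RedisBackend.all(resq, 0, 10):
        print failed                            # decoded failure data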

pyres/job.py

Lines changed: 19 additions & 14 deletions
@@ -1,5 +1,5 @@
-from datetime import datetime, timedelta
-from pyres import ResQ, str_to_class, safe_str_to_class
+from datetime import timedelta
+from pyres import ResQ, safe_str_to_class
 from pyres import failure
 from pyres.failure.redis import RedisBackend
 
@@ -8,14 +8,17 @@ class Job(object):
 
     The ``__init__`` takes these keyword arguments:
 
-        ``queue`` -- A string defining the queue to which this Job will be added.
+        ``queue`` -- A string defining the queue to which this Job will be
+                     added.
 
-        ``payload`` -- A dictionary which contains the string name of a class which extends this Job and
-                       a list of args which will be passed to that class.
+        ``payload`` -- A dictionary which contains the string name of a class
+                       which extends this Job and a list of args which will be
+                       passed to that class.
 
         ``resq`` -- An instance of the ResQ class.
 
-        ``worker`` -- The name of a specific worker if you'd like this Job to be done by that worker. Default is "None".
+        ``worker`` -- The name of a specific worker if you'd like this Job to be
+                      done by that worker. Default is "None".
 
     """
     def __init__(self, queue, payload, resq, worker=None):
@@ -33,26 +36,28 @@ def __str__(self):
             self._queue, self._payload['class'], repr(self._payload['args']))
 
     def perform(self):
-        """This method converts payload into args and calls the ``perform`` method
-        on the payload class.
+        """This method converts payload into args and calls the ``perform``
+        method on the payload class.
 
         """
         payload_class_str = self._payload["class"]
         payload_class = safe_str_to_class(payload_class_str)
         payload_class.resq = self.resq
-        args = self._payload.get("args", None)
+        args = self._payload.get("args")
+
         try:
             return payload_class.perform(*args)
         except:
             if not self.retry(payload_class, args):
                 raise
 
     def fail(self, exception):
-        """This method provides a way to fail a job and will use whatever failure backend
-        you've provided. The default is the ``RedisBackend``.
+        """This method provides a way to fail a job and will use whatever
+        failure backend you've provided. The default is the ``RedisBackend``.
 
         """
-        fail = failure.create(exception, self._queue, self._payload, self._worker)
+        fail = failure.create(exception, self._queue, self._payload,
+                              self._worker)
         fail.save(self.resq)
         return fail
 
@@ -72,11 +77,11 @@ def retry(self, payload_class, args):
         return False
 
     @classmethod
-    def reserve(cls, queue, res, worker=None):
+    def reserve(cls, queue, res, worker=None, timeout=10):
         """Reserve a job on the queue. This marks this job so that other workers
         will not pick it up.
 
         """
-        payload = res.pop(queue)
+        payload = res.pop(queue, timeout=timeout)
         if payload:
             return cls(queue, payload, res, worker)
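
As a sketch (not part of the commit) of how the new timeout flows from Job.reserve() through ResQ.pop(); the queue name and host are assumptions:

    # Sketch only: a minimal consume loop using the new blocking reserve().
    from pyres import ResQ
    from pyres.job import Job

    resq = ResQ(server='localhost:6379')
    while True:
        job = Job.reserve('basic', resq, timeout=5)   # blocks up to 5 seconds
        if job:
            job.perform()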

pyres/json_parser.py

Lines changed: 1 addition & 2 deletions
@@ -1,15 +1,14 @@
 from datetime import datetime
+
 try:
     #import simplejson as json
     import json
 except ImportError:
     import simplejson as json
 
-
 DATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
 DATE_PREFIX = '@D:'
 
-
 class CustomJSONEncoder(json.JSONEncoder):
 
     def default(self, o):
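
A brief sketch (not part of the commit) of pushing a datetime through the custom encoder; the exact output string depends on how default() uses DATE_PREFIX and DATE_FORMAT, which the diff only hints at:

    # Sketch only: serializing a datetime with the custom JSON encoder.
    import datetime
    import json

    from pyres.json_parser import CustomJSONEncoder

    payload = {'scheduled_at': datetime.datetime(2010, 6, 1, 12, 0, 0)}
    print json.dumps(payload, cls=CustomJSONEncoder)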

pyres/scheduler.py

Lines changed: 24 additions & 22 deletions
@@ -4,78 +4,80 @@
 
 from pyres import ResQ
 
+logger = logging.getLogger(__name__)
+
 class Scheduler(object):
-
+
     def __init__(self, server="localhost:6379", password=None):
         """
         >>> from pyres.scheduler import Scheduler
         >>> scheduler = Scheduler('localhost:6379')
         """
         self._shutdown = False
-        if isinstance(server,basestring):
+        if isinstance(server, basestring):
             self.resq = ResQ(server=server, password=password)
         elif isinstance(server, ResQ):
             self.resq = server
         else:
             raise Exception("Bad server argument")
-
+
     def register_signal_handlers(self):
-        logging.info('registering signals')
+        logger.info('registering signals')
         signal.signal(signal.SIGTERM, self.schedule_shutdown)
         signal.signal(signal.SIGINT, self.schedule_shutdown)
         signal.signal(signal.SIGQUIT, self.schedule_shutdown)
-
+
     def schedule_shutdown(self, signal, frame):
-        logging.info('shutting down started')
+        logger.info('shutting down started')
        self._shutdown = True
-
+
     def __call__(self):
-        logging.info('starting up')
+        logger.info('starting up')
         self.register_signal_handlers()
         #self.load_schedule()
-        logging.info('looking for delayed items')
+        logger.info('looking for delayed items')
         while True:
-            if self._shutdown is True:
+            if self._shutdown:
                 break
             self.handle_delayed_items()
-            logging.debug('sleeping')
+            logger.debug('sleeping')
             time.sleep(5)
-        logging.info('shutting down complete')
-
+        logger.info('shutting down complete')
+
     def next_timestamp(self):
         while True:
             timestamp = self.resq.next_delayed_timestamp()
             if timestamp:
                 yield timestamp
             else:
                 break
-
-
+
+
     def next_item(self, timestamp):
         while True:
             item = self.resq.next_item_for_timestamp(timestamp)
             if item:
                 yield item
             else:
                 break
-
+
     def handle_delayed_items(self):
         for timestamp in self.next_timestamp():
-            logging.info('handling timestamp: %s' % timestamp)
+            logger.info('handling timestamp: %s' % timestamp)
             for item in self.next_item(timestamp):
-                logging.debug('queueing item %s' % item)
+                logger.debug('queueing item %s' % item)
                 klass = item['class']
                 queue = item['queue']
                 args = item['args']
                 kwargs = {}
                 if 'first_attempt' in item:
                     kwargs['first_attempt'] = item['first_attempt']
                 self.resq.enqueue_from_string(klass, queue, *args, **kwargs)
-
-
+
+
     @classmethod
     def run(cls, server, password=None):
         sched = cls(server=server, password=password)
         sched()
-
-
+
+
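
Finally, a sketch (not part of the commit) of running the scheduler with the new module-level logger made visible; the host is an assumption:

    # Sketch only: run the delayed-item scheduler and surface its log output.
    import logging
    from pyres.scheduler import Scheduler

    logging.basicConfig(level=logging.DEBUG)   # shows logger.debug('sleeping'), etc.
    Scheduler.run('localhost:6379')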

0 commit comments
