
Commit 7d51b4e

Merge remote branch 'themonkey/master'
2 parents 90808f6 + 26269ad commit 7d51b4e

File tree

12 files changed: 347 additions, 269 deletions

CHANGES.txt

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+2011-03-01 whit <[email protected]>
+
+    * Added hooks for the worker to allow worker subclasses to insert
+      code before and after forking
+
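
A sketch of how a Worker subclass could take advantage of such hooks — the hook names before_fork and after_fork below are assumptions, since pyres/worker.py is not among the hunks shown in this section:

    from pyres.worker import Worker

    class LoggingWorker(Worker):
        # Hypothetical hook names; whichever hooks the worker actually
        # exposes would be invoked around os.fork() in the work loop.
        def before_fork(self, job):
            print "about to fork for %s" % job

        def after_fork(self, job):
            print "child process handling %s" % job
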
pyres/__init__.py

Lines changed: 4 additions & 2 deletions
@@ -105,9 +105,11 @@ def push(self, queue, item):
         self.watch_queue(queue)
         self.redis.rpush("resque:queue:%s" % queue, ResQ.encode(item))
 
-    def pop(self, queue):
-        ret = self.redis.lpop("resque:queue:%s" % queue)
+    def pop(self, queue, timeout=10):
+        ret = self.redis.blpop("resque:queue:%s" % queue, timeout=timeout)
         if ret:
+            if isinstance(ret, tuple):
+                q, ret = ret
             return ResQ.decode(ret)
         return ret
 
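
The isinstance check is needed because redis-py's blpop returns a (key, value) pair when it pops an item, whereas the old lpop call returned the raw value alone; on timeout blpop returns None, so the existing "if ret:" guard still covers the empty case. A rough sketch of the difference, assuming a local Redis server and a queue named "high":

    import redis

    r = redis.Redis()

    # Old behaviour: lpop returns the raw encoded value, or None when empty.
    value = r.lpop("resque:queue:high")

    # New behaviour: blpop blocks for up to `timeout` seconds and returns a
    # (key, value) tuple, or None if the timeout expires.
    ret = r.blpop("resque:queue:high", timeout=10)
    if ret:
        _queue_key, value = ret

The practical effect is that pop(), and therefore Job.reserve() and the worker loop, now block on Redis for up to ten seconds instead of polling an empty queue.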

pyres/extensions.py

Lines changed: 37 additions & 28 deletions
@@ -1,11 +1,16 @@
+import os
+import datetime
+import time
+import signal
+
 try:
     import multiprocessing
 except:
     import sys
     sys.exit("multiprocessing was not available")
-import os, datetime, time, signal
+
 from pyres import ResQ
-
+
 from pyres.exceptions import NoQueueError
 from pyres.worker import Worker
 
@@ -18,20 +23,23 @@ def work(self, interval=5):
             job = self.reserve()
             if job:
                 print "got: %s" % job
+
                 self.child = os.fork()
+
                 if self.child:
-                    print 'Forked %s at %s' % (self.child, datetime.datetime.now())
+                    print 'Forked %s at %s' % (self.child,
+                                               datetime.datetime.now())
                     os.waitpid(self.child, 0)
                 else:
-                    print 'Processing %s since %s' % (job._queue, datetime.datetime.now())
+                    print 'Processing %s since %s' % (job._queue,
+                                                      datetime.datetime.now())
                     self.process(job)
                     os._exit(0)
                 self.child = None
             else:
                 break
-
+
         self.unregister_worker()
-
 
 class Manager(object):
     def __init__(self, queues, host, max_children=10):
@@ -43,28 +51,27 @@ def __init__(self, queues, host, max_children=10):
         self.resq = ResQ(host)
         self.validate_queues()
         self.reports = {}
-
+
     def __str__(self):
-        import os;
         hostname = os.uname()[1]
         pid = os.getpid()
         return 'Manager:%s:%s:%s' % (hostname, pid, ','.join(self.queues))
-
+
     def validate_queues(self):
         if not self.queues:
             raise NoQueueError("Please give each worker at least one queue.")
-
+
     def check_rising(self, queue, size):
         if queue in self.reports:
-            time = time.time()
+            new_time = time.time()
             old_size = self.reports[queue][0]
             old_time = self.reports[queue][1]
-            if time > old_time + 5 and size > old_size + 20:
+            if new_time > old_time + 5 and size > old_size + 20:
                 return True
         else:
             self.reports[queue] = (size, time.time())
         return False
-
+
     def work(self):
         self.startup()
         while True:
@@ -74,44 +81,46 @@ def work(self):
             for queue in self.queues:
                 #check queue size
                 size = self.resq.size(queue)
-                if check_rising(queue,size):
+
+                if self.check_rising(queue, size):
                     if len(self.children) < self.max_children:
                         self.start_child(queue)
-
+
     def startup(self):
         self.register_manager()
         self.register_signals()
-
+
     def register_manager(self):
-        self.resq.redis.sadd('managers',str(self))
-
+        self.resq.redis.sadd('managers', str(self))
+
     def unregister_manager(self):
-        self.resq.redis.srem('managers',str(self))
-
+        self.resq.redis.srem('managers', str(self))
+
     def register_signals(self):
         signal.signal(signal.SIGTERM, self.shutdown_all)
         signal.signal(signal.SIGINT, self.shutdown_all)
         signal.signal(signal.SIGQUIT, self.schedule_shutdown)
         signal.signal(signal.SIGUSR1, self.kill_children)
-
+
     def shutdown_all(self, signum, frame):
         self.schedule_shutdown(signum, frame)
        self.kill_children(signum, frame)
-
+
     def schedule_shutdown(self, signum, frame):
         self._shutdown = True
-
+
     def kill_children(self):
         for child in self.children:
             child.terminate()
-
+
     def start_child(self, queue):
-        p = multiprocessing.Process(target=JuniorWorker.run, args=([queue], self._host))
+        p = multiprocessing.Process(target=JuniorWorker.run, args=([queue],
+                                                                    self._host))
         self.children.append(p)
         p.start()
         return True
-
+
     @classmethod
-    def run(cls, queues=[], host="localhost:6379"):
+    def run(cls, queues=(), host="localhost:6379"):
         manager = cls(queues, host)
-        manager.work()
+        manager.work()
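
The check_rising change fixes a name-shadowing bug: assigning to a local variable called time inside the method makes Python treat time as a local name for the whole function body, so the time.time() call on the right-hand side raises UnboundLocalError before the assignment ever completes. A minimal illustration of the pitfall, independent of pyres:

    import time

    def broken():
        # `time` is assigned below, so Python treats it as a local name
        # throughout this function; this call raises UnboundLocalError.
        now = time.time()
        time = now

    def fixed():
        # A differently named local leaves the module reference untouched.
        new_time = time.time()
        return new_time

The same file also fixes the call site in Manager.work(), which previously invoked check_rising as a bare function instead of self.check_rising.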

pyres/failure/redis.py

Lines changed: 6 additions & 4 deletions
@@ -1,10 +1,12 @@
 import datetime, time
+from base64 import b64encode
+
 from base import BaseBackend
 from pyres import ResQ
 
 class RedisBackend(BaseBackend):
     """Extends the ``BaseBackend`` to provide a Redis backend for failed jobs."""
-
+
     def save(self, resq=None):
         """Saves the failed Job into a "failed" Redis queue preserving all its original enqueud info."""
         if not resq:
@@ -20,15 +22,15 @@ def save(self, resq=None):
         data['worker'] = self._worker
         data = ResQ.encode(data)
         resq.redis.rpush('resque:failed', data)
-
+
     @classmethod
     def count(cls, resq):
         return int(resq.redis.llen('resque:failed'))
 
     @classmethod
     def all(cls, resq, start=0, count=1):
-        from base64 import b64encode
         items = resq.redis.lrange('resque:failed', start, count) or []
+
         ret_list = []
         for i in items:
             failure = ResQ.decode(i)
@@ -39,4 +41,4 @@ def all(cls, resq, start=0, count=1):
     @classmethod
     def clear(cls, resq):
         return resq.redis.delete('resque:failed')
-
+
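
For reference, a rough usage sketch of the failure backend's classmethods, assuming a Redis server on localhost:6379 and at least one failed job:

    from pyres import ResQ
    from pyres.failure.redis import RedisBackend

    resq = ResQ("localhost:6379")

    print RedisBackend.count(resq)          # length of the resque:failed list
    for failed in RedisBackend.all(resq, start=0, count=10):
        print failed                        # decoded failure payload dicts
    # RedisBackend.clear(resq)              # delete the failed queue outright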

pyres/job.py

Lines changed: 26 additions & 15 deletions
@@ -1,23 +1,30 @@
-from datetime import datetime, timedelta
-from pyres import ResQ, str_to_class, safe_str_to_class
+from datetime import timedelta
+from pyres import ResQ, safe_str_to_class
 from pyres import failure
 from pyres.failure.redis import RedisBackend
 
+
 class Job(object):
     """Every job on the ResQ is an instance of the *Job* class.
 
     The ``__init__`` takes these keyword arguments:
 
-        ``queue`` -- A string defining the queue to which this Job will be added.
+        ``queue`` -- A string defining the queue to which this Job will be
+                     added.
 
-        ``payload`` -- A dictionary which contains the string name of a class which extends this Job and
-                       a list of args which will be passed to that class.
+        ``payload`` -- A dictionary which contains the string name of a class
+                       which extends this Job and a list of args which will be
+                       passed to that class.
 
         ``resq`` -- An instance of the ResQ class.
 
-        ``worker`` -- The name of a specific worker if you'd like this Job to be done by that worker. Default is "None".
+        ``worker`` -- The name of a specific worker if you'd like this Job to be
+                      done by that worker. Default is "None".
 
     """
+
+    safe_str_to_class = staticmethod(safe_str_to_class)
+
     def __init__(self, queue, payload, resq, worker=None):
         self._queue = queue
         self._payload = payload
@@ -33,26 +40,30 @@ def __str__(self):
             self._queue, self._payload['class'], repr(self._payload['args']))
 
     def perform(self):
-        """This method converts payload into args and calls the ``perform`` method
-        on the payload class.
+        """This method converts payload into args and calls the ``perform``
+        method on the payload class.
+
+        #@ add entry_point loading
 
         """
         payload_class_str = self._payload["class"]
-        payload_class = safe_str_to_class(payload_class_str)
+        payload_class = self.safe_str_to_class(payload_class_str)
         payload_class.resq = self.resq
-        args = self._payload.get("args", None)
+        args = self._payload.get("args")
+
         try:
             return payload_class.perform(*args)
         except:
             if not self.retry(payload_class, args):
                 raise
 
     def fail(self, exception):
-        """This method provides a way to fail a job and will use whatever failure backend
-        you've provided. The default is the ``RedisBackend``.
+        """This method provides a way to fail a job and will use whatever
+        failure backend you've provided. The default is the ``RedisBackend``.
 
         """
-        fail = failure.create(exception, self._queue, self._payload, self._worker)
+        fail = failure.create(exception, self._queue, self._payload,
+                              self._worker)
         fail.save(self.resq)
         return fail
 
@@ -72,11 +83,11 @@ def retry(self, payload_class, args):
         return False
 
     @classmethod
-    def reserve(cls, queue, res, worker=None):
+    def reserve(cls, queue, res, worker=None, timeout=10):
         """Reserve a job on the queue. This marks this job so that other workers
         will not pick it up.
 
         """
-        payload = res.pop(queue)
+        payload = res.pop(queue, timeout=timeout)
         if payload:
             return cls(queue, payload, res, worker)
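
Binding safe_str_to_class as a staticmethod attribute on Job means perform() now resolves the payload class through self, so a Job subclass can swap in its own lookup strategy (the "#@ add entry_point loading" note hints at one such use). A hedged sketch of such an override — the subclass, the entry-point group name, and the payload naming scheme are all illustrative, not part of this commit:

    import pkg_resources

    from pyres.job import Job

    class EntryPointJob(Job):
        # Hypothetical: resolve "distribution:entry_point" payload class
        # names via setuptools entry points instead of dotted import paths.
        @staticmethod
        def safe_str_to_class(name):
            dist, _, entry = name.partition(":")
            return pkg_resources.load_entry_point(dist, "pyres.jobs", entry)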

pyres/json_parser.py

Lines changed: 1 addition & 2 deletions
@@ -1,15 +1,14 @@
 from datetime import datetime
+
 try:
     #import simplejson as json
     import json
 except ImportError:
     import simplejson as json
 
-
 DATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
 DATE_PREFIX = '@D:'
 
-
 class CustomJSONEncoder(json.JSONEncoder):
 
     def default(self, o):
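
DATE_PREFIX marks datetime values so they can be told apart from ordinary strings after a round trip through JSON. The encoder's body is not part of this hunk, but a prefix-based scheme like this one typically looks roughly as follows (illustrative, not the module's exact code):

    import json
    from datetime import datetime

    DATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
    DATE_PREFIX = '@D:'

    class PrefixDateEncoder(json.JSONEncoder):
        # Serialize datetimes as '@D:' plus a formatted string so a matching
        # decoder can spot and revive them.
        def default(self, o):
            if isinstance(o, datetime):
                return DATE_PREFIX + o.strftime(DATE_FORMAT)
            return json.JSONEncoder.default(self, o)

    print json.dumps({'run_at': datetime(2011, 3, 1, 12, 0)},
                     cls=PrefixDateEncoder)
    # {"run_at": "@D:2011-03-01T12:00:00"}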
