Skip to content

Commit 1f43c46

Browse files
author
Matt George
committed
added extensions module
1 parent 5a370b3 commit 1f43c46

File tree

2 files changed

+114
-23
lines changed

2 files changed

+114
-23
lines changed

src/pyres/extensions.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
try:
2+
import multiprocessing
3+
except:
4+
import sys
5+
sys.exit("multiprocessing was not available")
6+
7+
from pyres.exceptions import NoQueueError
8+
from pyres.worker import Worker
9+
class JuniorWorker(Worker):
10+
def work(self, interval=5):
11+
self.startup()
12+
while True:
13+
if self._shutdown:
14+
break
15+
job = self.reserve()
16+
if job:
17+
print "got: %s" % job
18+
self.child = os.fork()
19+
if self.child:
20+
print 'Forked %s at %s' % (self.child, datetime.datetime.now())
21+
os.waitpid(self.child, 0)
22+
else:
23+
print 'Processing %s since %s' % (job._queue, datetime.datetime.now())
24+
self.process(job)
25+
os._exit(0)
26+
self.child = None
27+
else:
28+
break
29+
30+
self.unregister_worker()
31+
32+
33+
class Manager(object):
34+
def __init__(self, queues, host, max_children=10):
35+
self.queues = queues
36+
self._host = host
37+
self.max_children = max_children
38+
self._shutdown = False
39+
self.children = []
40+
self.resq = ResQ(host)
41+
self.validate_queues()
42+
self.reports = {}
43+
44+
def __str__(self):
45+
import os;
46+
hostname = os.uname()[1]
47+
pid = os.getpid()
48+
return 'Manager:%s:%s:%s' % (hostname, pid, ','.join(self.queues))
49+
50+
def validate_queues(self):
51+
if not self.queues:
52+
raise NoQueueError("Please give each worker at least one queue.")
53+
54+
def check_rising(self, queue, size):
55+
if queue in self.reports:
56+
time = time.time()
57+
old_size = self.reports[queue][0]
58+
old_time = self.reports[queue][1]
59+
if time > old_time + 5 and size > old_size + 20:
60+
return True
61+
else:
62+
self.reports[queue] = (size, time.time())
63+
return False
64+
65+
def work(self):
66+
self.startup()
67+
while True:
68+
if self._shutdown:
69+
break
70+
#check to see if stuff is still going
71+
for queue in self.queues:
72+
#check queue size
73+
size = self.resq.size(queue)
74+
if check_rising(queue,size):
75+
if len(self.children) < self.max_children:
76+
self.start_child(queue)
77+
78+
def startup(self):
79+
self.register_manager()
80+
self.register_signals()
81+
82+
def register_manager(self):
83+
self.resq.redis.sadd('managers',str(self))
84+
85+
def unregister_manager(self):
86+
self.resq.redis.srem('managers',str(self))
87+
88+
def register_signals(self):
89+
signal.signal(signal.SIGTERM, self.shutdown_all)
90+
signal.signal(signal.SIGINT, self.shutdown_all)
91+
signal.signal(signal.SIGQUIT, self.schedule_shutdown)
92+
signal.signal(signal.SIGUSR1, self.kill_children)
93+
94+
def shutdown_all(self, signum, frame):
95+
self.schedule_shutdown(signum, frame)
96+
self.kill_children(signum, frame)
97+
98+
def schedule_shutdown(self, signum, frame):
99+
self._shutdown = True
100+
101+
def kill_children(self):
102+
for child in self.children:
103+
child.terminate()
104+
105+
def start_child(self, queue):
106+
p = multiprocessing.Process(target=JuniorWorker.run, args=([queue], self._host))
107+
self.children.append(p)
108+
p.start()
109+
return True
110+
111+
@classmethod
112+
def run(cls, queues=[], host="localhost:6379"):
113+
manager = cls(queues, host)
114+
manager.work()

src/pyres/worker.py

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -178,29 +178,6 @@ def find(cls, worker_id, resq):
178178
def exists(cls, worker_id, resq):
179179
return resq.redis.sismember('workers', worker_id)
180180

181-
class JuniorWorker(Worker):
182-
def work(self, interval=5):
183-
self.startup()
184-
while True:
185-
if self._shutdown:
186-
break
187-
job = self.reserve()
188-
if job:
189-
print "got: %s" % job
190-
self.child = os.fork()
191-
if self.child:
192-
print 'Forked %s at %s' % (self.child, datetime.datetime.now())
193-
os.waitpid(self.child, 0)
194-
else:
195-
print 'Processing %s since %s' % (job._queue, datetime.datetime.now())
196-
self.process(job)
197-
os._exit(0)
198-
self.child = None
199-
else:
200-
break
201-
202-
self.unregister_worker()
203-
204181

205182
if __name__ == "__main__":
206183
from optparse import OptionParser

0 commit comments

Comments
 (0)