Commit 587e78e

Author: Matt George
Merge branch 'for_the_horde'
2 parents: 001f614 + b6b6036

9 files changed: +530, -9 lines


pyres/__init__.py

Lines changed: 2 additions & 2 deletions
@@ -1,4 +1,4 @@
-__version__ = '0.7.5.1'
+__version__ = '0.8'
 
 from redis import Redis
 import pyres.json_parser as json
@@ -258,7 +258,7 @@ def encode(cls, item):
 
     @classmethod
     def decode(cls, item):
-        if item:
+        if isinstance(item, basestring):
             ret = json.loads(item)
             return ret
         return None
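
The decode change above tightens the guard: json.loads is now only attempted when the stored value is actually a string, so non-string values fall through to None instead of raising. A minimal sketch of the resulting behaviour, assuming decode is the ResQ classmethod shown in this hunk (the sample values are hypothetical):

    from pyres import ResQ

    ResQ.decode('{"queue": "default"}')  # json.loads -> {'queue': 'default'}
    ResQ.decode(None)                    # not a string -> None (same as before)
    ResQ.decode({'already': 'decoded'})  # old code: json.loads raises TypeError
                                         # new code: isinstance check fails -> None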

pyres/horde.py

Lines changed: 322 additions & 0 deletions
@@ -0,0 +1,322 @@
import sys
try:
    import multiprocessing
except:
    sys.exit("multiprocessing was not available")

import time, os, signal
import datetime
import logging
import logging.handlers
from pyres import ResQ, Stat
from pyres.exceptions import NoQueueError
from pyres.utils import OrderedDict
from pyres.job import Job
import pyres.json_parser as json

def setup_logging(namespace='', log_level=logging.INFO, log_file=None):

    logger = multiprocessing.get_logger()
    #logger = multiprocessing.log_to_stderr()
    logger.setLevel(log_level)
    format = '%(asctime)s %(levelname)s '+namespace+': %(message)s'
    if log_file:
        handler = logging.handlers.RotatingFileHandler(log_file, maxBytes=104857600, backupCount=5)
    else:
        handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter((format)))
    logger.addHandler(handler)
    return logger

class Minion(multiprocessing.Process):
    def __init__(self, queues, server, password, log_level=logging.INFO, log_file=None):
        multiprocessing.Process.__init__(self, name='Minion')

        #format = '%(asctime)s %(levelname)s %(filename)s-%(lineno)d: %(message)s'
        #logHandler = logging.StreamHandler()
        #logHandler.setFormatter(logging.Formatter(format))
        #self.logger = multiprocessing.get_logger()
        #self.logger.addHandler(logHandler)
        #self.logger.setLevel(logging.DEBUG)

        self.queues = queues
        self._shutdown = False
        self.hostname = os.uname()[1]
        self.server = server
        self.password = password

        self.log_level = log_level
        self.log_file = log_file

    def prune_dead_workers(self):
        pass

    def schedule_shutdown(self, signum, frame):
        self._shutdown = True

    def register_signal_handlers(self):
        signal.signal(signal.SIGTERM, self.schedule_shutdown)
        signal.signal(signal.SIGINT, self.schedule_shutdown)
        signal.signal(signal.SIGQUIT, self.schedule_shutdown)

    def register_minion(self):
        self.resq.redis.sadd('resque:minions',str(self))
        self.started = datetime.datetime.now()

    def startup(self):
        self.register_signal_handlers()
        self.prune_dead_workers()
        self.register_minion()

    def __str__(self):
        return '%s:%s:%s' % (self.hostname, self.pid, ','.join(self.queues))

    def reserve(self):
        for q in self.queues:
            self.logger.debug('checking queue: %s' % q)
            job = Job.reserve(q, self.resq, self.__str__())
            if job:
                self.logger.info('Found job on %s' % q)
                return job

    def process(self, job):
        if not job:
            return
        try:
            self.working_on(job)
            job.perform()
        except Exception, e:
            exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
            self.logger.error("%s failed: %s" % (job, e))
            job.fail(exceptionTraceback)
            self.failed()
        else:
            self.logger.debug("Hells yeah")
            self.logger.info('completed job: %s' % job)
        finally:
            self.done_working()

    def working_on(self, job):
        self.logger.debug('marking as working on')
        data = {
            'queue': job._queue,
            'run_at': int(time.mktime(datetime.datetime.now().timetuple())),
            'payload': job._payload
        }
        data = json.dumps(data)
        self.resq.redis["resque:minion:%s" % str(self)] = data
        self.logger.debug("minion:%s" % str(self))
        #self.logger.debug(self.resq.redis["resque:minion:%s" % str(self)])

    def failed(self):
        Stat("failed", self.resq).incr()

    def processed(self):
        total_processed = Stat("processed", self.resq)
        total_processed.incr()

    def done_working(self):
        self.logger.debug('done working')
        self.processed()
        self.resq.redis.delete("resque:minion:%s" % str(self))

    def unregister_minion(self):
        self.resq.redis.srem('resque:minions',str(self))
        self.started = None

    def work(self, interval=5):

        self.startup()
        while True:
            if self._shutdown:
                self.logger.info('shutdown scheduled')
                break
            job = self.reserve()
            if job:
                self.process(job)
            else:
                time.sleep(interval)
        self.unregister_minion()

    def run(self):

        if isinstance(self.server,basestring):
            self.resq = ResQ(server=self.server, password=self.password)
        elif isinstance(self.server, ResQ):
            self.resq = self.server
        else:
            raise Exception("Bad server argument")
        namespace = 'minion:%s' % self.pid
        self.logger = setup_logging(namespace, self.log_level, self.log_file)
        self.work()
        #while True:
        #    job = self.q.get()
        #    print 'pid: %s is running %s ' % (self.pid,job)


class Khan(object):
    _command_map = {
        'ADD': 'add_minion',
        'REMOVE': '_remove_minion',
        'SHUTDOWN': '_schedule_shutdown'
    }
    def __init__(self, pool_size=5, queues=[], server='localhost:6379', password=None, logging_level=logging.INFO, log_file=None):
        #super(Khan,self).__init__(queues=queues,server=server,password=password)
        self._shutdown = False
        self.pool_size = int(pool_size)
        self.queues = queues
        self.server = server
        self.password = password
        self.pid = os.getpid()
        self.validate_queues()
        self._workers = OrderedDict()
        self.server = server
        self.password = password
        self.logging_level = logging_level
        self.log_file = log_file
        self.logger = setup_logging('khan', self.logging_level, self.log_file)
        #self._workers = list()

    def setup_resq(self):
        if isinstance(self.server,basestring):
            self.resq = ResQ(server=self.server, password=self.password)
        elif isinstance(self.server, ResQ):
            self.resq = self.server
        else:
            raise Exception("Bad server argument")

    def validate_queues(self):
        "Checks if a worker is given atleast one queue to work on."
        if not self.queues:
            raise NoQueueError("Please give each worker at least one queue.")

    def startup(self):
        self.register_signal_handlers()


    def register_signal_handlers(self):
        signal.signal(signal.SIGTERM, self.schedule_shutdown)
        signal.signal(signal.SIGINT, self.schedule_shutdown)
        signal.signal(signal.SIGQUIT, self.schedule_shutdown)
        signal.signal(signal.SIGUSR1, self.kill_child)
        signal.signal(signal.SIGUSR1, self.add_child)

    def _schedule_shutdown(self):
        self.schedule_shutdown(None, None)

    def schedule_shutdown(self, signum, frame):
        self.logger.info('Khan Shutdown scheduled')
        self._shutdown = True

    def kill_child(self, signum, frame):
        self._remove_minion()

    def add_child(self, signum, frame):
        self.add_minion()

    def register_khan(self):
        if not hasattr(self, 'resq'):
            self.setup_resq()
        self.resq.redis.sadd('resque:khans',str(self))
        self.started = datetime.datetime.now()

    def _check_commands(self):
        if not self._shutdown:
            self.logger.debug('Checking commands')
            command = self.resq.redis.lpop('resque:khan:%s' % str(self))
            self.logger.debug('COMMAND FOUND: %s ' % command)
            if command:
                import pdb;pdb.set_trace()
                self.process_command(command)
                self._check_commands()

    def process_command(self, command):
        self.logger.info('Processing Command')
        #available commands, shutdown, add 1, remove 1
        command_item = self._command_map.get(command, None)
        if command_item:
            fn = getattr(self, command_item)
            if fn:
                fn()

    def add_minion(self):
        m = self._add_minion()
        self.resq.redis.srem('resque:khans',str(self))
        self.pool_size += 1
        self.resq.redis.sadd('resque:khans',str(self))

    def _add_minion(self):
        self.logger.info('Adding minion')
        m = Minion(self.queues, self.server, self.password, log_level=self.logging_level)
        m.start()
        self._workers[m.pid] = m
        self.logger.info('minion added at: %s' % m.pid)
        return m

    def _shutdown_minions(self):
        """
        send the SIGNINT signal to each worker in the pool.
        """
        for minion in self._workers.values():
            minion.terminate()
            minion.join()

    def _remove_minion(self, pid=None):
        #if pid:
        #    m = self._workers.pop(pid)
        pid, m = self._workers.popitem(False)
        m.terminate()
        self.resq.redis.srem('resque:khans',str(self))
        self.pool_size -= 1
        self.resq.redis.sadd('resque:khans',str(self))
        return m

    def unregister_khan(self):
        self.logger.debug('unregistering khan')
        self.resq.redis.srem('resque:khans',str(self))
        self.started = None

    def setup_minions(self):
        for i in range(self.pool_size):
            self._add_minion()

    def work(self, interval=2):
        self.startup()
        self.setup_minions()
        self.setup_resq()
        self.register_khan()
        while True:
            self._check_commands()
            if self._shutdown:
                #send signals to each child
                self._shutdown_minions()
                break
            #get job
            else:
                time.sleep(interval)
        self.unregister_khan()

    def __str__(self):
        hostname = os.uname()[1]
        return '%s:%s:%s' % (hostname, self.pid, self.pool_size)

    @classmethod
    def run(cls, pool_size=5, queues=[], server='localhost:6379', logging_level=logging.INFO):
        worker = cls(pool_size=pool_size, queues=queues, server=server, logging_level=logging_level)
        worker.work()

#if __name__ == "__main__":
#    k = Khan()
#    k.run()

if __name__ == "__main__":
    from optparse import OptionParser
    parser = OptionParser(usage="%prog [options] queue list")
    parser.add_option("-s", dest="server", default="localhost:6379")
    (options,args) = parser.parse_args()
    if len(args) < 1:
        parser.print_help()
        parser.error("Please give the horde at least one queue.")
    Khan.run(pool_size=2, queues=args, server=options.server)
    #khan.run()
    #Worker.run(queues, options.server)
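
The new pyres/horde.py adds a multiprocessing-based worker pool: a Khan manager forks pool_size Minion processes, each reserving and performing jobs from the given queues, while the manager polls the Redis list resque:khan:<hostname>:<pid>:<pool_size> for ADD, REMOVE, or SHUTDOWN commands. A minimal usage sketch, assuming a Redis server on localhost:6379 and a hypothetical 'default' queue (note that _check_commands still contains a pdb.set_trace() breakpoint, so a found command drops the manager into the debugger):

    from pyres.horde import Khan

    # Fork two Minion processes working the 'default' queue; blocks until a
    # SHUTDOWN command or a termination signal arrives.
    Khan.run(pool_size=2, queues=['default'], server='localhost:6379')

Commands can then be pushed onto the manager's command list from another process, for example with redis-cli: RPUSH resque:khan:<hostname>:<pid>:<pool_size> ADD, where the key suffix follows Khan.__str__.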

pyres/json_parser.py

Lines changed: 3 additions & 2 deletions
@@ -1,8 +1,9 @@
 from datetime import datetime
 try:
-    import simplejson as json
-except ImportError:
+    #import simplejson as json
     import json
+except ImportError:
+    import simplejson as json
 
 
 DATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
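
The import order is flipped so the standard-library json module (available since Python 2.6) is preferred and simplejson is only used as a fallback. The rest of this commit keeps going through this module's dumps/loads interface, as in Minion.working_on and ResQ.decode; a small round-trip sketch (the payload dict is purely illustrative):

    import pyres.json_parser as json

    payload = {'queue': 'default', 'run_at': 1234567890, 'args': [1, 2]}
    encoded = json.dumps(payload)  # string stored in Redis
    decoded = json.loads(encoded)  # back to a dict when the payload is decoded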
