Skip to content

Commit a94eee3

Browse files
author
Matt George
committed
first working commit of horde work.
With this commit, the horde is working. However, there are no tests. On a positive note, completely un-scientific benchmarks show that the horde much faster than the older worker model.
1 parent 7d7ebba commit a94eee3

File tree

3 files changed

+229
-43
lines changed

3 files changed

+229
-43
lines changed

pyres/horde.py

Lines changed: 130 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,37 +3,39 @@
33
except:
44
import sys
55
sys.exit("multiprocessing was not available")
6-
import os, datetime, time, signal, sys
7-
from pyres import ResQ
8-
from pyres.exceptions import NoQueueError
6+
7+
import time, os, signal
98
from pyres.worker import Worker
9+
from pyres import ResQ
10+
from pyres.utils import OrderedDict
11+
import datetime
12+
1013
class Minion(multiprocessing.Process, Worker):
11-
def __init__(self, conn, queues=[]):
12-
self.conn = conn
14+
def __init__(self, queues, server, password):
15+
multiprocessing.Process.__init__(self, name='Minion')
1316
self.queues = queues
14-
super(Minion,self).__init__(name='Minion')
15-
16-
def _check_message(self):
17-
if self.conn.poll():
18-
message = self.conn.recv()
19-
self.process_message(message)
20-
21-
def process_message(self, message):
22-
pass
17+
self.validate_queues()
18+
self._shutdown = False
19+
self.child = None
20+
if isinstance(server,basestring):
21+
self.resq = ResQ(server=server, password=password)
22+
elif isinstance(server, ResQ):
23+
self.resq = server
24+
else:
25+
raise Exception("Bad server argument")
26+
#Worker.__init__(self, queues=queues, server="localhost:6379", password=None)
27+
#super(Minion,self).__init__(name='Minion')
2328

2429
def work(self, interval=5):
2530
self.startup()
2631
while True:
27-
self._check_messages()
2832
if self._shutdown:
2933
print 'shutdown scheduled'
3034
break
3135
job = self.reserve()
3236
if job:
3337
self.process(job)
3438
else:
35-
if interval == 0:
36-
break
3739
time.sleep(interval)
3840
self.unregister_worker()
3941

@@ -43,54 +45,141 @@ def run(self):
4345
# job = self.q.get()
4446
# print 'pid: %s is running %s ' % (self.pid,job)
4547

46-
class Khan(Worker):
48+
49+
class Khan(object):
4750
_command_map = {
48-
'ADD': '_add_minion',
51+
'ADD': 'add_minion',
4952
'REMOVE': '_remove_minion',
5053
'SHUTDOWN': '_schedule_shutdown'
5154
}
52-
_workers = {}
53-
def __init__(self, pool_size=5, queue_list=[], server='localhost:6379', password=None):
55+
_workers = OrderedDict()
56+
def __init__(self, pool_size=5, queues=[], server='localhost:6379', password=None):
57+
#super(Khan,self).__init__(queues=queues,server=server,password=password)
58+
self._shutdown = False
5459
self.pool_size = pool_size
55-
self.resq = ResQ(server, password=password)
56-
self._workers = list()
60+
self.queues = queues
61+
self.server = server
62+
self.password = password
63+
self.pid = os.getpid()
64+
if isinstance(server,basestring):
65+
self.resq = ResQ(server=server, password=password)
66+
elif isinstance(server, ResQ):
67+
self.resq = server
68+
else:
69+
raise Exception("Bad server argument")
70+
#self._workers = list()
71+
72+
def startup(self):
73+
self.register_signal_handlers()
74+
self.register_worker()
75+
76+
def register_signal_handlers(self):
77+
signal.signal(signal.SIGTERM, self.schedule_shutdown)
78+
signal.signal(signal.SIGINT, self.schedule_shutdown)
79+
signal.signal(signal.SIGQUIT, self.schedule_shutdown)
80+
signal.signal(signal.SIGUSR1, self.kill_child)
81+
signal.signal(signal.SIGUSR1, self.add_child)
82+
83+
def _schedule_shutdown(self):
84+
self.schedule_shutdown(None, None)
85+
86+
def schedule_shutdown(self, signum, frame):
87+
print 'Shutdown scheduled'
88+
self._shutdown = True
89+
90+
def kill_child(self, signum, frame):
91+
self._remove_minion()
92+
93+
def add_child(self, signum, frame):
94+
self.add_minion()
5795

58-
def _check_command(self):
96+
def register_khan(self):
97+
self.resq.redis.sadd('resque:khans',str(self))
98+
self.started = datetime.datetime.now()
99+
100+
def _check_commands(self):
59101
if not self._shutdown:
60-
command = self.resq.redis.pop('resque:khan:%s' % self)
102+
print 'Checking commands'
103+
command_key = 'resque:khan:%s' % self
104+
command = self.resq.redis.pop(command_key)
105+
print 'COMMAND', command
61106
if command:
62107
self.process_command(command)
63-
self._check_command()
108+
self._check_commands()
64109

65110
def process_command(self, command):
111+
print 'Processing Command'
66112
#available commands, shutdown, add 1, remove 1
67113
command = self._command_map.get(command, None)
68114
if command:
69115
fn = getattr(self, command)
70116
if fn:
71117
fn()
72118

73-
def _add_minion(self):
74-
parent_conn, child_conn = multiprocessing.Pipe()
75-
m = Minion(child_conn, q)
119+
def add_minion(self):
120+
print 'Adding minion'
121+
m = self._add_minion()
76122
m.start()
77-
self._workers.append((parent_conn, m))
78123

79-
def _remove_minion(self):
80-
self._workers
81-
def run():
124+
def _add_minion(self):
125+
print 'Adding mminion'
126+
#parent_conn, child_conn = multiprocessing.Pipe()
127+
m = Minion(self.queues, self.server, self.password)
128+
#m.start()
129+
self._workers[m.pid] = m
130+
return m
131+
#self._workers.append(m)
132+
133+
def _shutdown_minions(self):
134+
"""
135+
send the SIGNINT signal to each worker in the pool.
136+
"""
137+
for minion in self._workers.values():
138+
minion.terminate()
139+
minion.join()
140+
141+
def _remove_minion(self, pid=None):
142+
#if pid:
143+
# m = self._workers.pop(pid)
144+
pid, m = self._workers.popitem(False)
145+
m.terminate()
146+
return m
147+
148+
def register_worker(self):
149+
self.resq.redis.sadd('resque:khans',str(self))
150+
#self.resq._redis.add("worker:#{self}:started", Time.now.to_s)
151+
self.started = datetime.datetime.now()
152+
153+
def unregister_worker(self):
154+
print 'Unregistering'
155+
self.resq.redis.srem('resque:khans',str(self))
156+
self.started = None
157+
158+
def work(self, interval=5):
82159
self.startup()
83-
q = multiprocessing.Queue()
84-
for i in range(pool_size):
85-
self._add_minion()
160+
for i in range(self.pool_size):
161+
m = self._add_minion()
162+
m.start()
163+
86164
while True:
87165
self._check_commands()
88166
if self._shutdown:
89167
#send signals to each child
168+
self._shutdown_minions()
90169
break
91-
time.sleep(5)
170+
#get job
171+
else:
172+
time.sleep(interval)
92173
self.unregister_worker()
93-
174+
175+
def __str__(self):
176+
hostname = os.uname()[1]
177+
return '%s:%s' % (hostname, self.pid)
178+
179+
@classmethod
180+
def run(cls, pool_size=5, queues=[], server='localhost:6379'):
181+
worker = cls(pool_size=pool_size, queues=queues, server=server)
182+
worker.work()
94183

95184
#if __name__ == "__main__":
96185
# k = Khan()
@@ -104,6 +193,6 @@ def run():
104193
if len(args) < 1:
105194
parser.print_help()
106195
parser.error("Please give the horde at least one queue.")
107-
khan = Khan(queue_list=args, server=options.dest)
108-
khan.run()
109-
#Worker.run(queues, options.server)
196+
Khan.run(pool_size=2, queues=args, server=options.server)
197+
#khan.run()
198+
#Worker.run(queues, options.server)

pyres/utils.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
from UserDict import DictMixin
2+
3+
class OrderedDict(dict, DictMixin):
4+
5+
def __init__(self, *args, **kwds):
6+
if len(args) > 1:
7+
raise TypeError('expected at most 1 arguments, got %d' % len(args))
8+
try:
9+
self.__end
10+
except AttributeError:
11+
self.clear()
12+
self.update(*args, **kwds)
13+
14+
def clear(self):
15+
self.__end = end = []
16+
end += [None, end, end] # sentinel node for doubly linked list
17+
self.__map = {} # key --> [key, prev, next]
18+
dict.clear(self)
19+
20+
def __setitem__(self, key, value):
21+
if key not in self:
22+
end = self.__end
23+
curr = end[1]
24+
curr[2] = end[1] = self.__map[key] = [key, curr, end]
25+
dict.__setitem__(self, key, value)
26+
27+
def __delitem__(self, key):
28+
dict.__delitem__(self, key)
29+
key, prev, next = self.__map.pop(key)
30+
prev[2] = next
31+
next[1] = prev
32+
33+
def __iter__(self):
34+
end = self.__end
35+
curr = end[2]
36+
while curr is not end:
37+
yield curr[0]
38+
curr = curr[2]
39+
40+
def __reversed__(self):
41+
end = self.__end
42+
curr = end[1]
43+
while curr is not end:
44+
yield curr[0]
45+
curr = curr[1]
46+
47+
def popitem(self, last=True):
48+
if not self:
49+
raise KeyError('dictionary is empty')
50+
key = reversed(self).next() if last else iter(self).next()
51+
value = self.pop(key)
52+
return key, value
53+
54+
def __reduce__(self):
55+
items = [[k, self[k]] for k in self]
56+
tmp = self.__map, self.__end
57+
del self.__map, self.__end
58+
inst_dict = vars(self).copy()
59+
self.__map, self.__end = tmp
60+
if inst_dict:
61+
return (self.__class__, (items,), inst_dict)
62+
return self.__class__, (items,)
63+
64+
def keys(self):
65+
return list(self)
66+
67+
setdefault = DictMixin.setdefault
68+
update = DictMixin.update
69+
pop = DictMixin.pop
70+
values = DictMixin.values
71+
items = DictMixin.items
72+
iterkeys = DictMixin.iterkeys
73+
itervalues = DictMixin.itervalues
74+
iteritems = DictMixin.iteritems
75+
76+
def __repr__(self):
77+
if not self:
78+
return '%s()' % (self.__class__.__name__,)
79+
return '%s(%r)' % (self.__class__.__name__, self.items())
80+
81+
def copy(self):
82+
return self.__class__(self)
83+
84+
@classmethod
85+
def fromkeys(cls, iterable, value=None):
86+
d = cls()
87+
for key in iterable:
88+
d[key] = value
89+
return d
90+
91+
def __eq__(self, other):
92+
if isinstance(other, OrderedDict):
93+
return len(self)==len(other) and \
94+
all(p==q for p, q in zip(self.items(), other.items()))
95+
return dict.__eq__(self, other)
96+
97+
def __ne__(self, other):
98+
return not self == other

pyres/worker.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ def register_worker(self):
4141
self.resq.redis.sadd('resque:workers',str(self))
4242
#self.resq._redis.add("worker:#{self}:started", Time.now.to_s)
4343
self.started = datetime.datetime.now()
44-
4544

4645
def _set_started(self, dt):
4746
if dt:
@@ -228,7 +227,7 @@ def worker_pids(self):
228227
grep pyres_worker").split("\n"))
229228

230229
@classmethod
231-
def run(cls, queues, server, interval):
230+
def run(cls, queues, server="localhost:6379", interval):
232231
worker = cls(queues=queues, server=server)
233232
if interval is not None:
234233
worker.work(interval)

0 commit comments

Comments
 (0)