Skip to content

Commit 605d8f8

Browse files
author
Matt George
committed
updates to the horde package
1 parent fca54dc commit 605d8f8

File tree

3 files changed

+56
-11
lines changed

3 files changed

+56
-11
lines changed

pyres/horde.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@
55
sys.exit("multiprocessing was not available")
66

77
import time, os, signal
8+
import datetime
9+
import logging
10+
811
from pyres.worker import Worker
912
from pyres import ResQ
1013
from pyres.exceptions import NoQueueError
1114
from pyres.utils import OrderedDict
12-
import datetime
15+
1316

1417
class Minion(multiprocessing.Process, Worker):
1518
def __init__(self, queues, server, password):
@@ -18,6 +21,7 @@ def __init__(self, queues, server, password):
1821
self.validate_queues()
1922
self._shutdown = False
2023
self.child = None
24+
self.hostname = os.uname()[1]
2125
if isinstance(server,basestring):
2226
self.resq = ResQ(server=server, password=password)
2327
elif isinstance(server, ResQ):
@@ -26,12 +30,14 @@ def __init__(self, queues, server, password):
2630
raise Exception("Bad server argument")
2731
#Worker.__init__(self, queues=queues, server="localhost:6379", password=None)
2832
#super(Minion,self).__init__(name='Minion')
33+
def prune_dead_workers(self):
34+
pass
2935

3036
def work(self, interval=5):
3137
self.startup()
3238
while True:
3339
if self._shutdown:
34-
print 'shutdown scheduled'
40+
logging.info('shutdown scheduled')
3541
break
3642
job = self.reserve()
3743
if job:
@@ -53,7 +59,6 @@ class Khan(object):
5359
'REMOVE': '_remove_minion',
5460
'SHUTDOWN': '_schedule_shutdown'
5561
}
56-
_workers = OrderedDict()
5762
def __init__(self, pool_size=5, queues=[], server='localhost:6379', password=None):
5863
#super(Khan,self).__init__(queues=queues,server=server,password=password)
5964
self._shutdown = False
@@ -63,6 +68,7 @@ def __init__(self, pool_size=5, queues=[], server='localhost:6379', password=Non
6368
self.password = password
6469
self.pid = os.getpid()
6570
self.validate_queues()
71+
self._workers = OrderedDict()
6672
if isinstance(server,basestring):
6773
self.resq = ResQ(server=server, password=password)
6874
elif isinstance(server, ResQ):
@@ -91,7 +97,7 @@ def _schedule_shutdown(self):
9197
self.schedule_shutdown(None, None)
9298

9399
def schedule_shutdown(self, signum, frame):
94-
print 'Shutdown scheduled'
100+
logging.info('Khan Shutdown scheduled')
95101
self._shutdown = True
96102

97103
def kill_child(self, signum, frame):
@@ -106,10 +112,10 @@ def register_khan(self):
106112

107113
def _check_commands(self):
108114
if not self._shutdown:
109-
print 'Checking commands'
115+
logging.debug('Checking commands')
110116
command_key = 'resque:khan:%s' % self
111-
command = self.resq.redis.pop(command_key)
112-
print 'COMMAND', command
117+
command = self.resq.redis.lpop(command_key)
118+
logging.debug('COMMAND FOUND:', command)
113119
if command:
114120
self.process_command(command)
115121
self._check_commands()
@@ -124,16 +130,16 @@ def process_command(self, command):
124130
fn()
125131

126132
def add_minion(self):
127-
print 'Adding minion'
128133
m = self._add_minion()
129134
m.start()
130135

131136
def _add_minion(self):
132-
print 'Adding mminion'
137+
logging.info('Adding minion')
133138
#parent_conn, child_conn = multiprocessing.Pipe()
134139
m = Minion(self.queues, self.server, self.password)
135140
#m.start()
136141
self._workers[m.pid] = m
142+
logging.info('minion added at %s' % m.pid)
137143
return m
138144
#self._workers.append(m)
139145

@@ -153,12 +159,13 @@ def _remove_minion(self, pid=None):
153159
return m
154160

155161
def register_worker(self):
162+
logging.debug('registering khan')
156163
self.resq.redis.sadd('resque:khans',str(self))
157164
#self.resq._redis.add("worker:#{self}:started", Time.now.to_s)
158165
self.started = datetime.datetime.now()
159166

160167
def unregister_worker(self):
161-
print 'Unregistering'
168+
logging.debug('unregistering khan')
162169
self.resq.redis.srem('resque:khans',str(self))
163170
self.started = None
164171

pyres/worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ def worker_pids(self):
227227
grep pyres_worker").split("\n"))
228228

229229
@classmethod
230-
def run(cls, queues, server="localhost:6379", interval):
230+
def run(cls, queues, server="localhost:6379", interval=None):
231231
worker = cls(queues=queues, server=server)
232232
if interval is not None:
233233
worker.work(interval)

scripts/pyres_manager

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#!/usr/bin/env python
2+
3+
import logging
4+
5+
from optparse import OptionParser
6+
from pyres.horde import Khan
7+
8+
9+
10+
def main():
11+
12+
usage = "usage: %prog [options] arg1"
13+
parser = OptionParser(usage=usage)
14+
#parser.add_option("-q", dest="queue_list")
15+
parser.add_option("--host", dest="host", default="localhost")
16+
parser.add_option("--port",dest="port",type="int", default=6379)
17+
parser.add_option("-i", '--interval', dest='interval', default=None, help='the default time interval to sleep between runs')
18+
parser.add_option('-l', '--log-level', dest='log_level', default='info', help='log level. Valid values are "debug", "info", "warning", "error", "critical", in decreasing order of verbosity. Defaults to "info" if parameter not specified.')
19+
parser.add_option("--pool", type="int", dest="pool_size", default=1, help="Number of minions to spawn under the manager.")
20+
(options,args) = parser.parse_args()
21+
22+
if len(args) != 1:
23+
parser.print_help()
24+
parser.error("Argument must be a comma seperated list of queues")
25+
26+
log_level = getattr(logging, options.log_level.upper(), 'INFO')
27+
logging.basicConfig(level=log_level, format="%(asctime)s: %(levelname)s: %(message)s")
28+
29+
interval = options.interval
30+
if interval is not None:
31+
interval = float(interval)
32+
33+
queues = args[0].split(',')
34+
server = '%s:%s' % (options.host,options.port)
35+
Khan.run(pool_size=options.pool_size, queues=queues, server=server)
36+
37+
if __name__ == "__main__":
38+
main()

0 commit comments

Comments
 (0)