@@ -154,7 +154,6 @@ def __init__(self, pool_size=5, queues=[], server='localhost:6379', password=Non
154
154
self ._workers = OrderedDict ()
155
155
self .server = server
156
156
self .password = password
157
-
158
157
#self._workers = list()
159
158
160
159
def setup_resq (self ):
@@ -196,15 +195,12 @@ def add_child(self, signum, frame):
196
195
def register_khan (self ):
197
196
if not hasattr (self , 'resq' ):
198
197
self .setup_resq ()
199
-
200
198
self .resq .redis .sadd ('resque:khans' ,str (self ))
201
199
self .started = datetime .datetime .now ()
202
200
203
201
def _check_commands (self ):
204
202
if not self ._shutdown :
205
203
logging .debug ('Checking commands' )
206
- command_key = 'resque:khan:%s' % self
207
-
208
204
command = self .resq .redis .lpop ('resque:khan:%s' % str (self ))
209
205
logging .debug ('COMMAND FOUND: %s ' % command )
210
206
if command :
@@ -223,15 +219,17 @@ def process_command(self, command):
223
219
224
220
def add_minion (self ):
225
221
m = self ._add_minion ()
226
- m .start ()
222
+ self .resq .redis .srem ('resque:khans' ,str (self ))
223
+ self .pool_size += 1
224
+ self .resq .redis .sadd ('resque:khans' ,str (self ))
227
225
228
226
def _add_minion (self ):
229
227
logging .info ('Adding minion' )
230
- #parent_conn, child_conn = multiprocessing.Pipe()
231
228
m = Minion (self .queues , self .server , self .password )
232
- #m.start()
229
+ m .start ()
230
+ self ._workers [m .pid ] = m
231
+ logging .info ('minion added at: %s' % m .pid )
233
232
return m
234
- #self._workers.append(m)
235
233
236
234
def _shutdown_minions (self ):
237
235
"""
@@ -246,20 +244,23 @@ def _remove_minion(self, pid=None):
246
244
# m = self._workers.pop(pid)
247
245
pid , m = self ._workers .popitem (False )
248
246
m .terminate ()
247
+ self .resq .redis .srem ('resque:khans' ,str (self ))
248
+ self .pool_size -= 1
249
+ self .resq .redis .sadd ('resque:khans' ,str (self ))
249
250
return m
250
251
251
252
def unregister_khan (self ):
252
253
logging .debug ('unregistering khan' )
253
254
self .resq .redis .srem ('resque:khans' ,str (self ))
254
255
self .started = None
255
256
257
+ def setup_minions (self ):
258
+ for i in range (self .pool_size ):
259
+ self ._add_minion ()
260
+
256
261
def work (self , interval = 2 ):
257
262
self .startup ()
258
- for i in range (self .pool_size ):
259
- m = self ._add_minion ()
260
- m .start ()
261
- self ._workers [m .pid ] = m
262
- logging .info ('minion added at %s' % m .pid )
263
+ self .setup_minions ()
263
264
self .setup_resq ()
264
265
self .register_khan ()
265
266
while True :
@@ -275,7 +276,7 @@ def work(self, interval=2):
275
276
276
277
def __str__ (self ):
277
278
hostname = os .uname ()[1 ]
278
- return '%s:%s' % (hostname , self .pid )
279
+ return '%s:%s:%s ' % (hostname , self .pid , self . pool_size )
279
280
280
281
@classmethod
281
282
def run (cls , pool_size = 5 , queues = [], server = 'localhost:6379' ):
0 commit comments