Skip to content

Commit de677af

Browse files
mjrusso authored and Matt George committed
prune dead workers
1 parent 85c2feb commit de677af

File tree

2 files changed

+40
-3
lines changed

2 files changed

+40
-3
lines changed

pyres/worker.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import os, sys
88
import time
99
import json_parser as json
10+
import commands
1011

1112
class Worker(object):
1213
"""Defines a worker. The ``pyres_worker`` script instantiates this Worker class and
@@ -22,6 +23,7 @@ def __init__(self, queues=[], server="localhost:6379", password=None):
2223
self._shutdown = False
2324
self.child = None
2425
self.pid = os.getpid()
26+
self.hostname = os.uname()[1]
2527
if isinstance(server,basestring):
2628
self.resq = ResQ(server=server, password=password)
2729
elif isinstance(server, ResQ):
@@ -62,8 +64,21 @@ def unregister_worker(self):
6264
Stat("processed:%s" % self, self.resq).clear()
6365
Stat("failed:%s" % self, self.resq).clear()
6466

67+
def prune_dead_workers(self):
    """Unregister workers recorded for this host whose process is gone.

    Each registered worker id is read as ``host:pid:queues``. Workers whose
    host differs from ``self.hostname`` are left alone. Workers on this
    host whose pid is not among the pids returned by ``worker_pids`` are
    logged and unregistered.
    """
    all_workers = Worker.all(self.resq)
    # Build the set once so every membership test below is O(1).
    live_pids = set(self.worker_pids())
    for worker in all_workers:
        # Split at most twice: a queue name may itself contain ':' and has
        # to stay inside the third field instead of breaking the unpacking.
        parts = worker.id.split(':', 2)
        if len(parts) != 3:
            # Not an id this method can interpret; keep it registered
            # rather than guess which host or process it belongs to.
            continue
        host, pid, _queues = parts
        if host != self.hostname or pid in live_pids:
            continue
        logging.warning("pruning dead worker: %s", worker)
        worker.unregister_worker()
78+
6579
def startup(self):
    """Run the start-up steps in order: install the signal handlers, prune
    dead workers, then register this worker."""
    # Each step is looked up right before it is called, in this exact order.
    for step in ('register_signal_handlers',
                 'prune_dead_workers',
                 'register_worker'):
        getattr(self, step)()
6883

6984
def register_signal_handlers(self):
@@ -83,12 +98,11 @@ def kill_child(self, signum, frame):
8398
if self.child:
8499
logging.info("Killing child at %s" % self.child)
85100
os.kill(self.child, signal.SIGKILL)
86-
101+
87102
# Diff view of Worker.__str__: the bare numbers are the diff gutter (old and
# new line numbers); a "-" suffix marks a line this commit removes and a "+"
# suffix marks a line it adds.
def __str__(self):
88103
# A truthy ``id`` attribute, when present, is returned as-is.
if getattr(self,'id', None):
89104
return self.id
# Removed by this commit: the hostname was read from os.uname() on each call.
90-
hostname = os.uname()[1]
91-
return '%s:%s:%s' % (hostname, self.pid, ','.join(self.queues))
# Added by this commit: the same "host:pid:queue1,queue2" string, now built
# from self.hostname.
105+
return '%s:%s:%s' % (self.hostname, self.pid, ','.join(self.queues))
92106

93107
def work(self, interval=5):
94108
"""Invoked by ``run`` method. ``work`` listens on a list of queues and sleeps
@@ -204,6 +218,13 @@ def processing(self):
204218

205219
def state(self):
    """Return ``'working'`` when the ``resque:worker:<worker>`` key exists
    in redis, and ``'idle'`` otherwise."""
    key = 'resque:worker:%s' % self
    if self.resq.redis.exists(key):
        return 'working'
    return 'idle'
221+
222+
def worker_pids(self):
    """Return the pids (as strings) of the pyres workers on this machine.

    Used when pruning dead workers. The process table is read with
    ``ps -A -o pid,command``; every line that mentions ``pyres_worker``
    contributes its first column. The filtering happens here rather than
    in a ``grep`` pipeline, so no helper process shows up in the result.

    Raises OSError if ``ps`` cannot be executed.
    """
    # Imported locally so the method does not rely on the file's import block.
    import subprocess

    ps = subprocess.Popen(['ps', '-A', '-o', 'pid,command'],
                          stdout=subprocess.PIPE,
                          universal_newlines=True)
    listing = ps.communicate()[0]
    pids = []
    for line in listing.splitlines():
        if 'pyres_worker' not in line:
            continue
        # ps pads the pid column, so short pids are preceded by spaces.
        # split() without an argument skips that padding, where
        # split(' ')[0] would yield an empty string instead of the pid.
        fields = line.split()
        if fields:
            pids.append(fields[0])
    return pids
207228

208229
@classmethod
209230
def run(cls, queues, server, interval):

tests/test_worker.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,3 +156,19 @@ def test_state(self):
156156
worker.done_working()
157157
assert worker.state() == 'idle'
158158

159+
def test_prune_dead_workers(self):
    # Stale entries for this host must be pruned; entries recorded for a
    # different host must be left untouched.
    local_host = os.uname()[1]
    registry = 'resque:workers'
    stale_pids = ('1', '2', '3')

    # we haven't registered this worker, so the assertion below holds
    worker = Worker(['basic'])
    assert self.redis.scard(registry) == 0

    for pid in stale_pids:
        self.redis.sadd(registry, "%s:%s:%s" % (local_host, pid, 'basic'))
    assert self.redis.scard(registry) == 3
    worker.prune_dead_workers()
    assert self.redis.scard(registry) == 0

    for pid in stale_pids:
        self.redis.sadd(registry, "%s:%s:%s" % ('host-that-does-not-exist', pid, 'basic'))
    worker.prune_dead_workers()
    # the assertion below should hold, because the workers we registered
    # above are on a different host, and thus should not be pruned by this
    # process
    assert self.redis.scard(registry) == 3

0 commit comments

Comments
 (0)