Skip to content

Commit 2463471

Browse files
author
Matt George
committed
Merge branch '0.7.0-fixes'
2 parents 85c2feb + fd0cf7f commit 2463471

File tree

6 files changed

+49
-23
lines changed

6 files changed

+49
-23
lines changed

pyres/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ def info(self):
170170
}
171171

172172
def keys(self):
173-
return [key.replace('resque:','') for key in self.redis.keys('*')]
173+
return [key.replace('resque:','') for key in self.redis.keys('resque:*')]
174174

175175
def reserve(self, queue):
176176
from pyres.job import Job

pyres/failure/base.py

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,15 @@ def __init__(self, exp, queue, payload, worker=None):
2020
excc, _, tb = sys.exc_info()
2121

2222
self._exception = excc
23-
self._traceback = tb
23+
self._traceback = traceback.format_exc()
2424
self._worker = worker
2525
self._queue = queue
2626
self._payload = payload
2727

2828

2929
def _parse_traceback(self, trace):
3030
"""Return the given traceback string formatted for a notification."""
31-
reversed_backtrace = list(
32-
reversed(traceback.extract_tb(trace))
33-
)
34-
p_traceback = []
35-
for filename, lineno, funcname, text in reversed_backtrace:
36-
p_traceback.append("%s:%s:%d:in `%s`" % (text, filename, lineno, funcname))
37-
#p_traceback = [ "%s:%d:in `%s'" % (filename, lineno, funcname)
38-
# for filename, lineno, funcname, _
39-
# in traceback.extract_tb(trace) ]
40-
#p_traceback.reverse()
41-
42-
return p_traceback
31+
return trace
4332

4433
def _parse_message(self, exc):
4534
"""Return a message for a notification from the given exception."""

pyres/worker.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import os, sys
88
import time
99
import json_parser as json
10+
import commands
1011

1112
class Worker(object):
1213
"""Defines a worker. The ``pyres_worker`` script instantiates this Worker class and
@@ -22,6 +23,7 @@ def __init__(self, queues=[], server="localhost:6379", password=None):
2223
self._shutdown = False
2324
self.child = None
2425
self.pid = os.getpid()
26+
self.hostname = os.uname()[1]
2527
if isinstance(server,basestring):
2628
self.resq = ResQ(server=server, password=password)
2729
elif isinstance(server, ResQ):
@@ -62,8 +64,21 @@ def unregister_worker(self):
6264
Stat("processed:%s" % self, self.resq).clear()
6365
Stat("failed:%s" % self, self.resq).clear()
6466

67+
def prune_dead_workers(self):
    """Unregister workers recorded for this host whose process is gone.

    Each worker returned by ``Worker.all(self.resq)`` is identified by an
    id of the form ``host:pid:queues``.  Workers registered from another
    host are skipped; workers on this host whose pid is not reported by
    ``self.worker_pids()`` are logged and unregistered.
    """
    # Build the set once so every membership test below is O(1).
    live_pids = set(self.worker_pids())
    for worker in Worker.all(self.resq):
        # Split on the first two colons only: the trailing queue list may
        # itself contain ':' and must not break the three-way unpacking.
        host, pid, _queues = worker.id.split(':', 2)
        if host != self.hostname:
            continue
        if pid in live_pids:
            continue
        logging.warning("pruning dead worker: %s", worker)
        worker.unregister_worker()
78+
6579
def startup(self):
    """Run the worker's start-up steps, in order.

    Installs the signal handlers, prunes dead workers, and finally
    registers this worker.
    """
    self.register_signal_handlers()
    # Stale registrations are cleared before this worker adds its own.
    self.prune_dead_workers()
    self.register_worker()
6883

6984
def register_signal_handlers(self):
@@ -83,12 +98,11 @@ def kill_child(self, signum, frame):
8398
if self.child:
8499
logging.info("Killing child at %s" % self.child)
85100
os.kill(self.child, signal.SIGKILL)
86-
101+
87102
def __str__(self):
88103
if getattr(self,'id', None):
89104
return self.id
90-
hostname = os.uname()[1]
91-
return '%s:%s:%s' % (hostname, self.pid, ','.join(self.queues))
105+
return '%s:%s:%s' % (self.hostname, self.pid, ','.join(self.queues))
92106

93107
def work(self, interval=5):
94108
"""Invoked by ``run`` method. ``work`` listens on a list of queues and sleeps
@@ -204,6 +218,13 @@ def processing(self):
204218

205219
def state(self):
    """Return ``'working'`` or ``'idle'`` for this worker.

    The worker is reported as working when the Redis key
    ``resque:worker:<worker>`` exists, and as idle otherwise.
    """
    busy_key = 'resque:worker:%s' % self
    if self.resq.redis.exists(busy_key):
        return 'working'
    return 'idle'
221+
222+
def worker_pids(self, ps_output=None):
    """Return the pids (as strings) of the ``pyres_worker`` processes
    on this machine.  Used when pruning dead workers.

    ``ps_output`` is optional.  When omitted, the process table is read by
    running ``ps -A -o pid,command | grep pyres_worker``; when given, that
    text is parsed instead.  Every non-blank line is expected to start
    with a pid.
    """
    if ps_output is None:
        ps_output = commands.getoutput(
            "ps -A -o pid,command | grep pyres_worker")
    pids = []
    for line in ps_output.split("\n"):
        # ps pads the pid column, so short pids are preceded by spaces.
        # split() with no argument ignores that padding, whereas
        # split(' ')[0] would yield '' and the live worker would be
        # treated as dead by the caller.
        fields = line.split()
        if fields:
            pids.append(fields[0])
    return pids
207228

208229
@classmethod
209230
def run(cls, queues, server, interval):

resweb/server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def working(request):
3030
def queues(request):
3131
return str(Queues(HOST).render())
3232

33-
@get('/queues/(?P<queue_id>\w+)/')
33+
@get('/queues/(?P<queue_id>\w.+)/')
3434
def queue(request, queue_id):
3535
start = int(request.GET.get('start',0))
3636
return str(Queue(HOST, queue_id, start).render())

resweb/views.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,9 @@ def failed_jobs(self):
212212
for job in failure.all(self.resq, self._start, self._start + 20):
213213
item = job
214214
item['worker_url'] = '/workers/%s/' % job['worker']
215-
item['payload_args'] = ','.join(job['payload']['args'])
215+
item['payload_args'] = str(job['payload']['args'])
216216
item['payload_class'] = job['payload']['class']
217-
item['traceback'] = '\n'.join(job['backtrace'])
217+
item['traceback'] = job['backtrace']
218218
jobs.append(item)
219219
return jobs
220220

@@ -287,7 +287,7 @@ def key_info(self):
287287

288288
stats.append({
289289
'key': str(key),
290-
'type': str(self.resq.redis.get_type('resque:'+key)),
290+
'type': str(self.resq.redis.type('resque:'+key)),
291291
'size': str(redis_size(key, self.resq))
292292
})
293293
return stats
@@ -308,7 +308,7 @@ def key(self):
308308
return str(self.stat_id)
309309

310310
def key_type(self):
311-
return str(self.resq.redis.get_type(self.stat_id))
311+
return str(self.resq.redis.type('resque:'+ str(self.stat_id)))
312312

313313
def items(self):
314314
items = []
@@ -407,7 +407,7 @@ def runat(self):
407407
"""
408408
pass
409409
def redis_size(key, resq):
410-
key_type = resq.redis.get_type('resque:'+key)
410+
key_type = resq.redis.type('resque:'+key)
411411
item = 0
412412
if key_type == 'list':
413413
item = resq.redis.llen('resque:'+key)

tests/test_worker.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,3 +156,19 @@ def test_state(self):
156156
worker.done_working()
157157
assert worker.state() == 'idle'
158158

159+
def test_prune_dead_workers(self):
    """Stale workers on this host are pruned; other hosts' workers are kept."""
    def add_workers(host):
        # Record three worker ids (pids '1'..'3', queue 'basic') for host.
        for pid in ('1', '2', '3'):
            self.redis.sadd('resque:workers', "%s:%s:%s" % (host, pid, 'basic'))

    # This worker is never registered, so the worker set starts out empty.
    worker = Worker(['basic'])
    assert self.redis.scard('resque:workers') == 0

    add_workers(os.uname()[1])
    assert self.redis.scard('resque:workers') == 3
    worker.prune_dead_workers()
    assert self.redis.scard('resque:workers') == 0

    add_workers('host-that-does-not-exist')
    worker.prune_dead_workers()
    # These entries belong to a different host, so this process must
    # leave them alone.
    assert self.redis.scard('resque:workers') == 3

0 commit comments

Comments
 (0)