@@ -17,10 +17,10 @@ def setproctitle(name):
17
17
class Worker (object ):
18
18
"""Defines a worker. The ``pyres_worker`` script instantiates this Worker class and
19
19
passes a comma-separated list of queues to listen on.::
20
-
20
+
21
21
>>> from pyres.worker import Worker
22
22
>>> Worker.run([queue1, queue2], server="localhost:6379")
23
-
23
+
24
24
"""
25
25
def __init__ (self , queues = [], server = "localhost:6379" , password = None ):
26
26
self .queues = queues
@@ -35,40 +35,40 @@ def __init__(self, queues=[], server="localhost:6379", password=None):
35
35
self .resq = server
36
36
else :
37
37
raise Exception ("Bad server argument" )
38
-
39
-
38
+
39
+
40
40
def validate_queues (self ):
41
41
"""Checks if a worker is given at least one queue to work on."""
42
42
if not self .queues :
43
43
raise NoQueueError ("Please give each worker at least one queue." )
44
-
44
+
45
45
def register_worker (self ):
46
46
self .resq .redis .sadd ('resque:workers' ,str (self ))
47
47
#self.resq._redis.add("worker:#{self}:started", Time.now.to_s)
48
48
self .started = datetime .datetime .now ()
49
-
49
+
50
50
def _set_started (self , dt ):
51
51
if dt :
52
52
key = int (time .mktime (dt .timetuple ()))
53
53
self .resq .redis .set ("resque:worker:%s:started" % self , key )
54
54
else :
55
55
self .resq .redis .delete ("resque:worker:%s:started" % self )
56
-
56
+
57
57
def _get_started (self ):
58
58
datestring = self .resq .redis .get ("resque:worker:%s:started" % self )
59
59
#ds = None
60
60
#if datestring:
61
61
# ds = datetime.datetime.strptime(datestring, '%Y-%m-%d %H:%M:%S')
62
62
return datestring
63
-
63
+
64
64
started = property (_get_started , _set_started )
65
-
65
+
66
66
def unregister_worker (self ):
67
67
self .resq .redis .srem ('resque:workers' ,str (self ))
68
68
self .started = None
69
69
Stat ("processed:%s" % self , self .resq ).clear ()
70
70
Stat ("failed:%s" % self , self .resq ).clear ()
71
-
71
+
72
72
def prune_dead_workers (self ):
73
73
all_workers = Worker .all (self .resq )
74
74
known_workers = self .worker_pids ()
@@ -85,20 +85,20 @@ def startup(self):
85
85
self .register_signal_handlers ()
86
86
self .prune_dead_workers ()
87
87
self .register_worker ()
88
-
88
+
89
89
def register_signal_handlers (self ):
90
90
signal .signal (signal .SIGTERM , self .shutdown_all )
91
91
signal .signal (signal .SIGINT , self .shutdown_all )
92
92
signal .signal (signal .SIGQUIT , self .schedule_shutdown )
93
93
signal .signal (signal .SIGUSR1 , self .kill_child )
94
-
94
+
95
95
def shutdown_all (self , signum , frame ):
96
96
self .schedule_shutdown (signum , frame )
97
97
self .kill_child (signum , frame )
98
-
98
+
99
99
def schedule_shutdown (self , signum , frame ):
100
100
self ._shutdown = True
101
-
101
+
102
102
def kill_child (self , signum , frame ):
103
103
if self .child :
104
104
logging .info ("Killing child at %s" % self .child )
@@ -108,19 +108,19 @@ def __str__(self):
108
108
if getattr (self ,'id' , None ):
109
109
return self .id
110
110
return '%s:%s:%s' % (self .hostname , self .pid , ',' .join (self .queues ))
111
-
111
+
112
112
def work (self , interval = 5 ):
113
113
"""Invoked by ``run`` method. ``work`` listens on a list of queues and sleeps
114
- for ``interval`` time.
115
-
114
+ for ``interval`` time.
115
+
116
116
``interval`` -- Number of seconds the worker will wait until processing the next job. Default is "5".
117
-
117
+
118
118
Whenever a worker finds a job on the queue it first calls ``reserve`` on
119
- that job to make sure another worker won't run it, then *forks* itself to
119
+ that job to make sure another worker won't run it, then *forks* itself to
120
120
work on that job.
121
-
121
+
122
122
Finally, the ``process`` method actually processes the job by eventually calling the Job instance's ``perform`` method.
123
-
123
+
124
124
"""
125
125
setproctitle ('pyres_worker-%s: Starting' % __version__ )
126
126
self .startup ()
@@ -157,7 +157,7 @@ def work(self, interval=5):
157
157
setproctitle ("pyres_worker-%s: Waiting for %s " % (__version__ , ',' .join (self .queues )))
158
158
time .sleep (interval )
159
159
self .unregister_worker ()
160
-
160
+
161
161
def process (self , job = None ):
162
162
if not job :
163
163
job = self .reserve ()
@@ -174,15 +174,15 @@ def process(self, job=None):
174
174
logging .debug ('job details: %s' % job )
175
175
finally :
176
176
self .done_working ()
177
-
177
+
178
178
def reserve (self ):
179
179
for q in self .queues :
180
180
logging .debug ('checking queue %s' % q )
181
181
job = Job .reserve (q , self .resq , self .__str__ ())
182
182
if job :
183
183
logging .info ('Found job on %s' % q )
184
184
return job
185
-
185
+
186
186
def working_on (self , job ):
187
187
logging .debug ('marking as working on' )
188
188
data = {
@@ -194,38 +194,38 @@ def working_on(self, job):
194
194
self .resq .redis ["resque:worker:%s" % str (self )] = data
195
195
logging .debug ("worker:%s" % str (self ))
196
196
logging .debug (self .resq .redis ["resque:worker:%s" % str (self )])
197
-
197
+
198
198
def done_working (self ):
199
199
logging .info ('done working' )
200
200
self .processed ()
201
201
self .resq .redis .delete ("resque:worker:%s" % str (self ))
202
-
202
+
203
203
def processed (self ):
204
204
total_processed = Stat ("processed" , self .resq )
205
205
worker_processed = Stat ("processed:%s" % str (self ), self .resq )
206
206
total_processed .incr ()
207
207
worker_processed .incr ()
208
-
208
+
209
209
def get_processed (self ):
210
210
return Stat ("processed:%s" % str (self ), self .resq ).get ()
211
-
211
+
212
212
def failed (self ):
213
213
Stat ("failed" , self .resq ).incr ()
214
214
Stat ("failed:%s" % self , self .resq ).incr ()
215
-
215
+
216
216
def get_failed (self ):
217
217
return Stat ("failed:%s" % self , self .resq ).get ()
218
-
218
+
219
219
def job (self ):
220
220
data = self .resq .redis .get ("resque:worker:%s" % self )
221
221
if data :
222
222
return ResQ .decode (data )
223
223
return {}
224
224
225
-
225
+
226
226
def processing (self ):
227
227
return self .job ()
228
-
228
+
229
229
def state (self ):
230
230
return 'working' if self .resq .redis .exists ('resque:worker:%s' % self ) else 'idle'
231
231
@@ -235,32 +235,32 @@ def worker_pids(self):
235
235
return map (lambda l : l .strip ().split (' ' )[0 ],
236
236
commands .getoutput ("ps -A -o pid,command | \
237
237
grep pyres_worker" ).split ("\n " ))
238
-
238
+
239
239
@classmethod
240
240
def run (cls , queues , server = "localhost:6379" , interval = None ):
241
241
worker = cls (queues = queues , server = server )
242
242
if interval is not None :
243
- worker .work (interval )
243
+ worker .work (interval )
244
244
else :
245
245
worker .work ()
246
-
246
+
247
247
@classmethod
248
248
def all (cls , host = "localhost:6379" ):
249
249
if isinstance (host ,basestring ):
250
250
resq = ResQ (host )
251
251
elif isinstance (host , ResQ ):
252
252
resq = host
253
-
253
+
254
254
return [Worker .find (w ,resq ) for w in resq .redis .smembers ('resque:workers' ) or []]
255
-
255
+
256
256
@classmethod
257
257
def working (cls , host ):
258
258
if isinstance (host , basestring ):
259
259
resq = ResQ (host )
260
260
elif isinstance (host , ResQ ):
261
261
resq = host
262
262
total = []
263
- for key in Worker .all (host ):
263
+ for key in Worker .all (host ):
264
264
total .append ('resque:worker:%s' % key )
265
265
names = []
266
266
for key in total :
@@ -269,7 +269,7 @@ def working(cls, host):
269
269
w = Worker .find (key [14 :], resq ) #resque:worker:
270
270
names .append (w )
271
271
return names
272
-
272
+
273
273
@classmethod
274
274
def find (cls , worker_id , resq ):
275
275
if Worker .exists (worker_id , resq ):
@@ -279,11 +279,11 @@ def find(cls, worker_id, resq):
279
279
return worker
280
280
else :
281
281
return None
282
-
282
+
283
283
@classmethod
284
284
def exists (cls , worker_id , resq ):
285
285
return resq .redis .sismember ('resque:workers' , worker_id )
286
-
286
+
287
287
288
288
if __name__ == "__main__" :
289
289
from optparse import OptionParser
0 commit comments