1
+ import sys
1
2
try :
2
3
import multiprocessing
3
4
except :
4
- import sys
5
5
sys .exit ("multiprocessing was not available" )
6
6
7
7
import time , os , signal
8
8
import datetime
9
9
import logging
10
10
11
- from pyres .worker import Worker
12
11
from pyres import ResQ
13
12
from pyres .exceptions import NoQueueError
14
13
from pyres .utils import OrderedDict
14
+ from pyres .job import Job
15
+ import pyres .json_parser as json
15
16
16
-
17
- class Minion (multiprocessing .Process , Worker ):
17
+ class Minion (multiprocessing .Process ):
18
18
def __init__ (self , queues , server , password ):
19
+ #super(Minion,self).__init__(name='Minion')
19
20
multiprocessing .Process .__init__ (self , name = 'Minion' )
21
+
22
+ format = '%(asctime)s %(levelname)s %(filename)s-%(lineno)d: %(message)s'
23
+ logHandler = logging .StreamHandler ()
24
+ logHandler .setFormatter (logging .Formatter (format ))
25
+ self .logger = multiprocessing .get_logger ()
26
+ self .logger .addHandler (logHandler )
27
+ self .logger .setLevel (logging .DEBUG )
28
+
20
29
self .queues = queues
21
- self .validate_queues ()
22
30
self ._shutdown = False
23
- self .child = None
24
31
self .hostname = os .uname ()[1 ]
25
- if isinstance (server ,basestring ):
26
- self .resq = ResQ (server = server , password = password )
27
- elif isinstance (server , ResQ ):
28
- self .resq = server
29
- else :
30
- raise Exception ("Bad server argument" )
32
+ self .server = server
33
+ self .password = password
34
+
31
35
#Worker.__init__(self, queues=queues, server="localhost:6379", password=None)
32
- #super(Minion,self).__init__(name='Minion')
36
+ #
37
+
33
38
def prune_dead_workers (self ):
34
39
pass
35
40
41
+ def schedule_shutdown (self , signum , frame ):
42
+ self ._shutdown = True
43
+
44
+ def register_signal_handlers (self ):
45
+ signal .signal (signal .SIGTERM , self .schedule_shutdown )
46
+ signal .signal (signal .SIGINT , self .schedule_shutdown )
47
+ signal .signal (signal .SIGQUIT , self .schedule_shutdown )
48
+
49
+ def register_minion (self ):
50
+ pass
51
+
52
+ def startup (self ):
53
+ self .register_signal_handlers ()
54
+ self .prune_dead_workers ()
55
+ self .register_minion ()
56
+
57
+ def __str__ (self ):
58
+ return '%s:%s:%s' % (self .hostname , self .pid , ',' .join (self .queues ))
59
+
60
+ def reserve (self ):
61
+ for q in self .queues :
62
+ self .logger .debug ('checking queue: %s' % q )
63
+ job = Job .reserve (q , self .resq , self .__str__ ())
64
+ if job :
65
+ self .logger .info ('Found job on %s' % q )
66
+ return job
67
+
68
+ def process (self , job ):
69
+ if not job :
70
+ return
71
+ try :
72
+ self .working_on (job )
73
+ return job .perform ()
74
+ except Exception , e :
75
+ exceptionType , exceptionValue , exceptionTraceback = sys .exc_info ()
76
+ self .logger .error ("%s failed: %s" % (job , e ))
77
+ job .fail (exceptionTraceback )
78
+ self .failed ()
79
+ else :
80
+ self .logger .info ('completed job' )
81
+ self .logger .debug ('job details: %s' % job )
82
+ finally :
83
+ self .done_working ()
84
+
85
+ def working_on (self , job ):
86
+ self .logger .debug ('marking as working on' )
87
+ data = {
88
+ 'queue' : job ._queue ,
89
+ 'run_at' : int (time .mktime (datetime .datetime .now ().timetuple ())),
90
+ 'payload' : job ._payload
91
+ }
92
+ data = json .dumps (data )
93
+ self .resq .redis ["resque:minion:%s" % str (self )] = data
94
+ self .logger .debug ("minion:%s" % str (self ))
95
+ #self.logger.debug(self.resq.redis["resque:minion:%s" % str(self)])
96
+
97
+ def failed (self ):
98
+ pass
99
+
100
+ def done_working (self ):
101
+ self .logger .info ('done working' )
102
+ #self.processed()
103
+ self .resq .redis .delete ("resque:minion:%s" % str (self ))
104
+
105
+ def unregister_worker (self ):
106
+ pass
107
+
36
108
def work (self , interval = 5 ):
109
+
37
110
self .startup ()
38
111
while True :
39
112
if self ._shutdown :
40
- logging .info ('shutdown scheduled' )
113
+ self . logger .info ('shutdown scheduled' )
41
114
break
42
115
job = self .reserve ()
43
116
if job :
@@ -47,6 +120,13 @@ def work(self, interval=5):
47
120
self .unregister_worker ()
48
121
49
122
def run (self ):
123
+
124
+ if isinstance (self .server ,basestring ):
125
+ self .resq = ResQ (server = self .server , password = self .password )
126
+ elif isinstance (self .server , ResQ ):
127
+ self .resq = self .server
128
+ else :
129
+ raise Exception ("Bad server argument" )
50
130
self .work ()
51
131
#while True:
52
132
# job = self.q.get()
@@ -69,14 +149,18 @@ def __init__(self, pool_size=5, queues=[], server='localhost:6379', password=Non
69
149
self .pid = os .getpid ()
70
150
self .validate_queues ()
71
151
self ._workers = OrderedDict ()
72
- if isinstance (server ,basestring ):
73
- self .resq = ResQ (server = server , password = password )
74
- elif isinstance (server , ResQ ):
75
- self .resq = server
76
- else :
77
- raise Exception ("Bad server argument" )
152
+ self .server = server
153
+ self .password = password
154
+
78
155
#self._workers = list()
79
156
157
+ def setup_resq (self ):
158
+ if isinstance (self .server ,basestring ):
159
+ self .resq = ResQ (server = self .server , password = self .password )
160
+ elif isinstance (self .server , ResQ ):
161
+ self .resq = self .server
162
+ else :
163
+ raise Exception ("Bad server argument" )
80
164
def validate_queues (self ):
81
165
"Checks if a worker is given atleast one queue to work on."
82
166
if not self .queues :
@@ -114,18 +198,20 @@ def _check_commands(self):
114
198
if not self ._shutdown :
115
199
logging .debug ('Checking commands' )
116
200
command_key = 'resque:khan:%s' % self
117
- command = self .resq .redis .lpop (command_key )
118
- logging .debug ('COMMAND FOUND:' , command )
201
+
202
+ command = self .resq .redis .lpop ('resque:khan:%s' % str (self ))
203
+ logging .debug ('COMMAND FOUND: %s ' % command )
119
204
if command :
205
+ import pdb ;pdb .set_trace ()
120
206
self .process_command (command )
121
207
self ._check_commands ()
122
208
123
209
def process_command (self , command ):
124
- print 'Processing Command'
210
+ logging . info ( 'Processing Command' )
125
211
#available commands, shutdown, add 1, remove 1
126
- command = self ._command_map .get (command , None )
127
- if command :
128
- fn = getattr (self , command )
212
+ command_item = self ._command_map .get (command , None )
213
+ if command_item :
214
+ fn = getattr (self , command_item )
129
215
if fn :
130
216
fn ()
131
217
@@ -138,8 +224,6 @@ def _add_minion(self):
138
224
#parent_conn, child_conn = multiprocessing.Pipe()
139
225
m = Minion (self .queues , self .server , self .password )
140
226
#m.start()
141
- self ._workers [m .pid ] = m
142
- logging .info ('minion added at %s' % m .pid )
143
227
return m
144
228
#self._workers.append(m)
145
229
@@ -160,21 +244,23 @@ def _remove_minion(self, pid=None):
160
244
161
245
def register_worker (self ):
162
246
logging .debug ('registering khan' )
163
- self .resq .redis .sadd ('resque:khans' ,str (self ))
247
+ # self.resq.redis.sadd('resque:khans',str(self))
164
248
#self.resq._redis.add("worker:#{self}:started", Time.now.to_s)
165
- self .started = datetime .datetime .now ()
249
+ # self.started = datetime.datetime.now()
166
250
167
251
def unregister_worker (self ):
168
252
logging .debug ('unregistering khan' )
169
253
self .resq .redis .srem ('resque:khans' ,str (self ))
170
254
self .started = None
171
255
172
- def work (self , interval = 5 ):
256
+ def work (self , interval = 2 ):
173
257
self .startup ()
174
258
for i in range (self .pool_size ):
175
259
m = self ._add_minion ()
176
260
m .start ()
177
-
261
+ self ._workers [m .pid ] = m
262
+ logging .info ('minion added at %s' % m .pid )
263
+ self .setup_resq ()
178
264
while True :
179
265
self ._check_commands ()
180
266
if self ._shutdown :
0 commit comments