7
7
import time , os , signal
8
8
import datetime
9
9
import logging
10
-
10
+ import logging . handlers
11
11
from pyres import ResQ , Stat
12
12
from pyres .exceptions import NoQueueError
13
13
from pyres .utils import OrderedDict
14
14
from pyres .job import Job
15
15
import pyres .json_parser as json
16
16
17
+ def setup_logging (namespace = '' , log_level = logging .INFO , log_file = None ):
18
+
19
+ logger = multiprocessing .get_logger ()
20
+ #logger = multiprocessing.log_to_stderr()
21
+ logger .setLevel (log_level )
22
+ format = '%(asctime)s %(levelname)s ' + namespace + ': %(message)s'
23
+ if log_file :
24
+ handler = logging .handlers .RotatingFileHandler (log_file , maxBytes = 104857600 , backupCount = 5 )
25
+ else :
26
+ handler = logging .StreamHandler ()
27
+ handler .setFormatter (logging .Formatter ((format )))
28
+ logger .addHandler (handler )
29
+ return logger
30
+
17
31
class Minion (multiprocessing .Process ):
18
- def __init__ (self , queues , server , password ):
19
- #super(Minion,self).__init__(name='Minion')
32
+ def __init__ (self , queues , server , password , log_level = logging .INFO , log_file = None ):
20
33
multiprocessing .Process .__init__ (self , name = 'Minion' )
21
34
22
- format = '%(asctime)s %(levelname)s %(filename)s-%(lineno)d: %(message)s'
23
- logHandler = logging .StreamHandler ()
24
- logHandler .setFormatter (logging .Formatter (format ))
25
- self .logger = multiprocessing .get_logger ()
26
- self .logger .addHandler (logHandler )
27
- self .logger .setLevel (logging .DEBUG )
35
+ # format = '%(asctime)s %(levelname)s %(filename)s-%(lineno)d: %(message)s'
36
+ # logHandler = logging.StreamHandler()
37
+ # logHandler.setFormatter(logging.Formatter(format))
38
+ # self.logger = multiprocessing.get_logger()
39
+ # self.logger.addHandler(logHandler)
40
+ # self.logger.setLevel(logging.DEBUG)
28
41
29
42
self .queues = queues
30
43
self ._shutdown = False
31
44
self .hostname = os .uname ()[1 ]
32
45
self .server = server
33
46
self .password = password
34
47
48
+ self .log_level = log_level
49
+ self .log_file = log_file
50
+
35
51
def prune_dead_workers (self ):
36
52
pass
37
53
@@ -68,15 +84,15 @@ def process(self, job):
68
84
return
69
85
try :
70
86
self .working_on (job )
71
- return job .perform ()
87
+ job .perform ()
72
88
except Exception , e :
73
89
exceptionType , exceptionValue , exceptionTraceback = sys .exc_info ()
74
90
self .logger .error ("%s failed: %s" % (job , e ))
75
91
job .fail (exceptionTraceback )
76
92
self .failed ()
77
93
else :
78
- self .logger .info ( 'completed job' )
79
- self .logger .debug ( 'job details : %s' % job )
94
+ self .logger .debug ( "Hells yeah" )
95
+ self .logger .info ( 'completed job : %s' % job )
80
96
finally :
81
97
self .done_working ()
82
98
@@ -100,7 +116,7 @@ def processed(self):
100
116
total_processed .incr ()
101
117
102
118
def done_working (self ):
103
- self .logger .info ('done working' )
119
+ self .logger .debug ('done working' )
104
120
self .processed ()
105
121
self .resq .redis .delete ("resque:minion:%s" % str (self ))
106
122
@@ -130,6 +146,8 @@ def run(self):
130
146
self .resq = self .server
131
147
else :
132
148
raise Exception ("Bad server argument" )
149
+ namespace = 'minion:%s' % self .pid
150
+ self .logger = setup_logging (namespace , self .log_level , self .log_file )
133
151
self .work ()
134
152
#while True:
135
153
# job = self.q.get()
@@ -142,7 +160,7 @@ class Khan(object):
142
160
'REMOVE' : '_remove_minion' ,
143
161
'SHUTDOWN' : '_schedule_shutdown'
144
162
}
145
- def __init__ (self , pool_size = 5 , queues = [], server = 'localhost:6379' , password = None ):
163
+ def __init__ (self , pool_size = 5 , queues = [], server = 'localhost:6379' , password = None , logging_level = logging . INFO , log_file = None ):
146
164
#super(Khan,self).__init__(queues=queues,server=server,password=password)
147
165
self ._shutdown = False
148
166
self .pool_size = int (pool_size )
@@ -154,6 +172,9 @@ def __init__(self, pool_size=5, queues=[], server='localhost:6379', password=Non
154
172
self ._workers = OrderedDict ()
155
173
self .server = server
156
174
self .password = password
175
+ self .logging_level = logging_level
176
+ self .log_file = log_file
177
+ self .logger = setup_logging ('khan' , self .logging_level , self .log_file )
157
178
#self._workers = list()
158
179
159
180
def setup_resq (self ):
@@ -171,6 +192,7 @@ def validate_queues(self):
171
192
172
193
def startup (self ):
173
194
self .register_signal_handlers ()
195
+
174
196
175
197
def register_signal_handlers (self ):
176
198
signal .signal (signal .SIGTERM , self .schedule_shutdown )
@@ -183,7 +205,7 @@ def _schedule_shutdown(self):
183
205
self .schedule_shutdown (None , None )
184
206
185
207
def schedule_shutdown (self , signum , frame ):
186
- logging .info ('Khan Shutdown scheduled' )
208
+ self . logger .info ('Khan Shutdown scheduled' )
187
209
self ._shutdown = True
188
210
189
211
def kill_child (self , signum , frame ):
@@ -200,16 +222,16 @@ def register_khan(self):
200
222
201
223
def _check_commands (self ):
202
224
if not self ._shutdown :
203
- logging .debug ('Checking commands' )
225
+ self . logger .debug ('Checking commands' )
204
226
command = self .resq .redis .lpop ('resque:khan:%s' % str (self ))
205
- logging .debug ('COMMAND FOUND: %s ' % command )
227
+ self . logger .debug ('COMMAND FOUND: %s ' % command )
206
228
if command :
207
229
import pdb ;pdb .set_trace ()
208
230
self .process_command (command )
209
231
self ._check_commands ()
210
232
211
233
def process_command (self , command ):
212
- logging .info ('Processing Command' )
234
+ self . logger .info ('Processing Command' )
213
235
#available commands, shutdown, add 1, remove 1
214
236
command_item = self ._command_map .get (command , None )
215
237
if command_item :
@@ -224,11 +246,11 @@ def add_minion(self):
224
246
self .resq .redis .sadd ('resque:khans' ,str (self ))
225
247
226
248
def _add_minion (self ):
227
- logging .info ('Adding minion' )
228
- m = Minion (self .queues , self .server , self .password )
249
+ self . logger .info ('Adding minion' )
250
+ m = Minion (self .queues , self .server , self .password , log_level = self . logging_level )
229
251
m .start ()
230
252
self ._workers [m .pid ] = m
231
- logging .info ('minion added at: %s' % m .pid )
253
+ self . logger .info ('minion added at: %s' % m .pid )
232
254
return m
233
255
234
256
def _shutdown_minions (self ):
@@ -250,7 +272,7 @@ def _remove_minion(self, pid=None):
250
272
return m
251
273
252
274
def unregister_khan (self ):
253
- logging .debug ('unregistering khan' )
275
+ self . logger .debug ('unregistering khan' )
254
276
self .resq .redis .srem ('resque:khans' ,str (self ))
255
277
self .started = None
256
278
@@ -279,8 +301,8 @@ def __str__(self):
279
301
return '%s:%s:%s' % (hostname , self .pid , self .pool_size )
280
302
281
303
@classmethod
282
- def run (cls , pool_size = 5 , queues = [], server = 'localhost:6379' ):
283
- worker = cls (pool_size = pool_size , queues = queues , server = server )
304
+ def run (cls , pool_size = 5 , queues = [], server = 'localhost:6379' , logging_level = logging . INFO ):
305
+ worker = cls (pool_size = pool_size , queues = queues , server = server , logging_level = logging_level )
284
306
worker .work ()
285
307
286
308
#if __name__ == "__main__":
0 commit comments