1
1
from pyres .exceptions import NoQueueError
2
2
from pyres .job import Job
3
3
from pyres import ResQ , Stat
4
+ import logging
4
5
import signal
5
6
import datetime
6
7
import os , sys
@@ -80,7 +81,7 @@ def schedule_shutdown(self, signum, frame):
80
81
81
82
def kill_child (self , signum , frame ):
82
83
if self .child :
83
- print "Killing child at %s" % self .child
84
+ logging . info ( "Killing child at %s" % self .child )
84
85
os .kill (self .child , signal .SIGKILL )
85
86
86
87
def __str__ (self ):
@@ -105,24 +106,25 @@ def work(self, interval=5):
105
106
self .startup ()
106
107
while True :
107
108
if self ._shutdown :
108
- print 'shutdown scheduled'
109
+ logging . info ( 'shutdown scheduled' )
109
110
break
110
111
job = self .reserve ()
111
112
if job :
112
- print "got: %s" % job
113
+ logging .info ('picked up job' )
114
+ logging .debug ('job details: %s' % job )
113
115
self .child = os .fork ()
114
116
if self .child :
115
- print 'Forked %s at %s' % (self .child , datetime .datetime .now ())
117
+ logging . info ( 'Forked %s at %s' % (self .child , datetime .datetime .now () ))
116
118
try :
117
119
os .waitpid (self .child , 0 )
118
120
except OSError , ose :
119
121
import errno
120
122
if ose .errno != errno .EINTR :
121
123
raise ose
122
124
#os.wait()
123
- print 'Done waiting'
125
+ logging . debug ( 'done waiting')
124
126
else :
125
- print 'Processing %s since %s' % (job ._queue , datetime .datetime .now ())
127
+ logging . info ( 'Processing %s since %s' % (job ._queue , datetime .datetime .now () ))
126
128
self .process (job )
127
129
os ._exit (0 )
128
130
self .child = None
@@ -140,36 +142,37 @@ def process(self, job=None):
140
142
job .perform ()
141
143
except Exception , e :
142
144
exceptionType , exceptionValue , exceptionTraceback = sys .exc_info ()
143
- print "%s failed: %s" % (job , e )
145
+ logging . error ( "%s failed: %s" % (job , e ) )
144
146
job .fail (exceptionTraceback )
145
147
self .failed ()
146
148
else :
147
- print "done: %s" % job
149
+ logging .info ('completed job' )
150
+ logging .debug ('job details: %s' % job )
148
151
finally :
149
152
self .done_working ()
150
153
151
154
def reserve (self ):
152
155
for q in self .queues :
153
- print "Checking %s" % q
156
+ logging . debug ( 'checking queue %s' % q )
154
157
job = Job .reserve (q , self .resq , self .__str__ ())
155
158
if job :
156
- print " Found job on %s" % q
159
+ logging . info ( ' Found job on %s' % q )
157
160
return job
158
161
159
162
def working_on (self , job ):
160
- print 'marking as working on'
163
+ logging . debug ( 'marking as working on' )
161
164
data = {
162
165
'queue' : job ._queue ,
163
166
'run_at' : str (datetime .datetime .now ()),
164
167
'payload' : job ._payload
165
168
}
166
169
data = json .dumps (data )
167
170
self .resq .redis ["resque:worker:%s" % str (self )] = data
168
- print "worker:%s" % str (self )
169
- print self .resq .redis ["resque:worker:%s" % str (self )]
171
+ logging . debug ( "worker:%s" % str (self ) )
172
+ logging . debug ( self .resq .redis ["resque:worker:%s" % str (self )])
170
173
171
174
def done_working (self ):
172
- print 'done working'
175
+ logging . info ( 'done working' )
173
176
self .processed ()
174
177
self .resq .redis .delete ("resque:worker:%s" % str (self ))
175
178
0 commit comments