@@ -128,8 +128,6 @@ def work(self, interval=5):
128128 that job to make sure another worker won't run it, then *forks* itself to
129129 work on that job.
130130
131- Finally, the ``process`` method actually processes the job by eventually calling the Job instance's ``perform`` method.
132-
133131 """
134132 self ._setproctitle ("Starting" )
135133 self .startup ()
@@ -142,63 +140,7 @@ def work(self, interval=5):
142140 job = self .reserve (interval )
143141
144142 if job :
145- logger .debug ('picked up job' )
146- logger .debug ('job details: %s' % job )
147- self .before_fork (job )
148- self .child = os .fork ()
149- if self .child :
150- self ._setproctitle ("Forked %s at %s" %
151- (self .child ,
152- datetime .datetime .now ()))
153- logger .info ('Forked %s at %s' % (self .child ,
154- datetime .datetime .now ()))
155-
156- try :
157- start = datetime .datetime .now ()
158-
159- # waits for the result or times out
160- while True :
161- result = os .waitpid (self .child , os .WNOHANG )
162- if result != (0 , 0 ):
163- break
164- time .sleep (0.5 )
165-
166- now = datetime .datetime .now ()
167- if self .timeout and ((now - start ).seconds > self .timeout ):
168- os .kill (self .child , signal .SIGKILL )
169- os .waitpid (- 1 , os .WNOHANG )
170- raise TimeoutError ("Timed out after %d seconds" % self .timeout )
171-
172- except OSError as ose :
173- import errno
174-
175- if ose .errno != errno .EINTR :
176- raise ose
177-
178- except TimeoutError as e :
179- exceptionType , exceptionValue , exceptionTraceback = sys .exc_info ()
180- logger .exception ("%s timed out: %s" % (job , e ))
181- job .fail (exceptionTraceback )
182- self .failed ()
183- self .done_working ()
184-
185- logger .debug ('done waiting' )
186- else :
187- self ._setproctitle ("Processing %s since %s" %
188- (job ._queue ,
189- datetime .datetime .now ()))
190- logger .info ('Processing %s since %s' %
191- (job ._queue , datetime .datetime .now ()))
192- self .after_fork (job )
193-
194- # re-seed the Python PRNG after forking, otherwise
195- # all job process will share the same sequence of
196- # random numbers
197- random .seed ()
198-
199- self .process (job )
200- os ._exit (0 )
201- self .child = None
143+ self .fork_worker (job )
202144 else :
203145 if interval == 0 :
204146 break
@@ -207,6 +149,73 @@ def work(self, interval=5):
207149 #time.sleep(interval)
208150 self .unregister_worker ()
209151
152+ def fork_worker (self , job ):
153+ """Invoked by ``work`` method. ``fork_worker`` does the actual forking to create the child
154+ process that will process the job. It's also responsible for monitoring the child process
155+ and handling hangs and crashes.
156+
157+ Finally, the ``process`` method actually processes the job by eventually calling the Job
158+ instance's ``perform`` method.
159+
160+ """
161+ logger .debug ('picked up job' )
162+ logger .debug ('job details: %s' % job )
163+ self .before_fork (job )
164+ self .child = os .fork ()
165+ if self .child :
166+ self ._setproctitle ("Forked %s at %s" %
167+ (self .child ,
168+ datetime .datetime .now ()))
169+ logger .info ('Forked %s at %s' % (self .child ,
170+ datetime .datetime .now ()))
171+
172+ try :
173+ start = datetime .datetime .now ()
174+
175+ # waits for the result or times out
176+ while True :
177+ result = os .waitpid (self .child , os .WNOHANG )
178+ if result != (0 , 0 ):
179+ break
180+ time .sleep (0.5 )
181+
182+ now = datetime .datetime .now ()
183+ if self .timeout and ((now - start ).seconds > self .timeout ):
184+ os .kill (self .child , signal .SIGKILL )
185+ os .waitpid (- 1 , os .WNOHANG )
186+ raise TimeoutError ("Timed out after %d seconds" % self .timeout )
187+
188+ except OSError as ose :
189+ import errno
190+
191+ if ose .errno != errno .EINTR :
192+ raise ose
193+
194+ except TimeoutError as e :
195+ exceptionType , exceptionValue , exceptionTraceback = sys .exc_info ()
196+ logger .exception ("%s timed out: %s" % (job , e ))
197+ job .fail (exceptionTraceback )
198+ self .failed ()
199+ self .done_working ()
200+
201+ logger .debug ('done waiting' )
202+ else :
203+ self ._setproctitle ("Processing %s since %s" %
204+ (job ._queue ,
205+ datetime .datetime .now ()))
206+ logger .info ('Processing %s since %s' %
207+ (job ._queue , datetime .datetime .now ()))
208+ self .after_fork (job )
209+
210+ # re-seed the Python PRNG after forking, otherwise
211+ # all job process will share the same sequence of
212+ # random numbers
213+ random .seed ()
214+
215+ self .process (job )
216+ os ._exit (0 )
217+ self .child = None
218+
210219 def before_fork (self , job ):
211220 """
212221 hook for making changes immediately before forking to process
0 commit comments