Skip to content

Commit 29128a9

Browse files
committed
Merge pull request binarymatt#107 from cezarsa/master
Handling crashes and unexpected exits in the child process
2 parents 9afa179 + 56d9c08 commit 29128a9

File tree

4 files changed

+198
-83
lines changed

4 files changed

+198
-83
lines changed

pyres/exceptions.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
class NoQueueError(Exception):
22
pass
33

4-
class TimeoutError(RuntimeError):
4+
class JobError(RuntimeError):
5+
pass
6+
7+
class TimeoutError(JobError):
8+
pass
9+
10+
class CrashError(JobError):
511
pass

pyres/worker.py

Lines changed: 100 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import commands
77
import random
88

9-
from pyres.exceptions import NoQueueError, TimeoutError
9+
from pyres.exceptions import NoQueueError, JobError, TimeoutError, CrashError
1010
from pyres.job import Job
1111
from pyres import ResQ, Stat, __version__
1212

@@ -128,8 +128,6 @@ def work(self, interval=5):
128128
that job to make sure another worker won't run it, then *forks* itself to
129129
work on that job.
130130
131-
Finally, the ``process`` method actually processes the job by eventually calling the Job instance's ``perform`` method.
132-
133131
"""
134132
self._setproctitle("Starting")
135133
self.startup()
@@ -142,63 +140,7 @@ def work(self, interval=5):
142140
job = self.reserve(interval)
143141

144142
if job:
145-
logger.debug('picked up job')
146-
logger.debug('job details: %s' % job)
147-
self.before_fork(job)
148-
self.child = os.fork()
149-
if self.child:
150-
self._setproctitle("Forked %s at %s" %
151-
(self.child,
152-
datetime.datetime.now()))
153-
logger.info('Forked %s at %s' % (self.child,
154-
datetime.datetime.now()))
155-
156-
try:
157-
start = datetime.datetime.now()
158-
159-
# waits for the result or times out
160-
while True:
161-
result = os.waitpid(self.child, os.WNOHANG)
162-
if result != (0, 0):
163-
break
164-
time.sleep(0.5)
165-
166-
now = datetime.datetime.now()
167-
if self.timeout and ((now - start).seconds > self.timeout):
168-
os.kill(self.child, signal.SIGKILL)
169-
os.waitpid(-1, os.WNOHANG)
170-
raise TimeoutError("Timed out after %d seconds" % self.timeout)
171-
172-
except OSError as ose:
173-
import errno
174-
175-
if ose.errno != errno.EINTR:
176-
raise ose
177-
178-
except TimeoutError as e:
179-
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
180-
logger.exception("%s timed out: %s" % (job, e))
181-
job.fail(exceptionTraceback)
182-
self.failed()
183-
self.done_working()
184-
185-
logger.debug('done waiting')
186-
else:
187-
self._setproctitle("Processing %s since %s" %
188-
(job._queue,
189-
datetime.datetime.now()))
190-
logger.info('Processing %s since %s' %
191-
(job._queue, datetime.datetime.now()))
192-
self.after_fork(job)
193-
194-
# re-seed the Python PRNG after forking, otherwise
195-
# all job process will share the same sequence of
196-
# random numbers
197-
random.seed()
198-
199-
self.process(job)
200-
os._exit(0)
201-
self.child = None
143+
self.fork_worker(job)
202144
else:
203145
if interval == 0:
204146
break
@@ -207,6 +149,81 @@ def work(self, interval=5):
207149
#time.sleep(interval)
208150
self.unregister_worker()
209151

152+
def fork_worker(self, job):
153+
"""Invoked by ``work`` method. ``fork_worker`` does the actual forking to create the child
154+
process that will process the job. It's also responsible for monitoring the child process
155+
and handling hangs and crashes.
156+
157+
Finally, the ``process`` method actually processes the job by eventually calling the Job
158+
instance's ``perform`` method.
159+
160+
"""
161+
logger.debug('picked up job')
162+
logger.debug('job details: %s' % job)
163+
self.before_fork(job)
164+
self.child = os.fork()
165+
if self.child:
166+
self._setproctitle("Forked %s at %s" %
167+
(self.child,
168+
datetime.datetime.now()))
169+
logger.info('Forked %s at %s' % (self.child,
170+
datetime.datetime.now()))
171+
172+
try:
173+
start = datetime.datetime.now()
174+
175+
# waits for the result or times out
176+
while True:
177+
pid, status = os.waitpid(self.child, os.WNOHANG)
178+
if pid != 0:
179+
if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
180+
break
181+
if os.WIFSTOPPED(status):
182+
logger.warning("Process stopped by signal %d" % os.WSTOPSIG(status))
183+
else:
184+
if os.WIFSIGNALED(status):
185+
raise CrashError("Unexpected exit by signal %d" % os.WTERMSIG(status))
186+
raise CrashError("Unexpected exit status %d" % os.WEXITSTATUS(status))
187+
188+
time.sleep(0.5)
189+
190+
now = datetime.datetime.now()
191+
if self.timeout and ((now - start).seconds > self.timeout):
192+
os.kill(self.child, signal.SIGKILL)
193+
os.waitpid(-1, os.WNOHANG)
194+
raise TimeoutError("Timed out after %d seconds" % self.timeout)
195+
196+
except OSError as ose:
197+
import errno
198+
199+
if ose.errno != errno.EINTR:
200+
raise ose
201+
except JobError:
202+
self._handle_job_exception(job)
203+
finally:
204+
# If the child process' job called os._exit manually we need to
205+
# finish the clean up here.
206+
if self.job():
207+
self.done_working()
208+
209+
logger.debug('done waiting')
210+
else:
211+
self._setproctitle("Processing %s since %s" %
212+
(job._queue,
213+
datetime.datetime.now()))
214+
logger.info('Processing %s since %s' %
215+
(job._queue, datetime.datetime.now()))
216+
self.after_fork(job)
217+
218+
# re-seed the Python PRNG after forking, otherwise
219+
# all job process will share the same sequence of
220+
# random numbers
221+
random.seed()
222+
223+
self.process(job)
224+
os._exit(0)
225+
self.child = None
226+
210227
def before_fork(self, job):
211228
"""
212229
hook for making changes immediately before forking to process
@@ -227,21 +244,33 @@ def before_process(self, job):
227244
def process(self, job=None):
228245
if not job:
229246
job = self.reserve()
247+
248+
job_failed = False
230249
try:
231-
self.working_on(job)
232-
job = self.before_process(job)
233-
return job.perform()
234-
except Exception, e:
235-
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
236-
logger.exception("%s failed: %s" % (job, e))
237-
job.fail(exceptionTraceback)
238-
self.failed()
239-
else:
240-
logger.info('completed job')
241-
logger.debug('job details: %s' % job)
250+
try:
251+
self.working_on(job)
252+
job = self.before_process(job)
253+
return job.perform()
254+
except Exception:
255+
job_failed = True
256+
self._handle_job_exception(job)
257+
except SystemExit, e:
258+
if e.code != 0:
259+
job_failed = True
260+
self._handle_job_exception(job)
261+
262+
if not job_failed:
263+
logger.info('completed job')
264+
logger.debug('job details: %s' % job)
242265
finally:
243266
self.done_working()
244267

268+
def _handle_job_exception(self, job):
269+
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
270+
logger.exception("%s failed: %s" % (job, exceptionValue))
271+
job.fail(exceptionTraceback)
272+
self.failed()
273+
245274
def reserve(self, timeout=10):
246275
logger.debug('checking queues %s' % self.queues)
247276
job = self.job_class.reserve(self.queues, self.resq, self.__str__(), timeout=timeout)

tests/__init__.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,33 @@ def perform(wait_for):
4646
time.sleep(wait_for)
4747
return "Done Sleeping"
4848

49+
class CrashJob(object):
50+
queue = 'basic'
51+
52+
@staticmethod
53+
def perform():
54+
# Dangerous, this will cause a hard crash of the python process
55+
import ctypes
56+
ctypes.string_at(1)
57+
return "Never got here"
58+
59+
class PrematureExitJob(object):
60+
queue = 'basic'
61+
62+
@staticmethod
63+
def perform(exit_code):
64+
import sys
65+
sys.exit(exit_code)
66+
return "Never got here"
67+
68+
class PrematureHardExitJob(object):
69+
queue = 'basic'
70+
71+
@staticmethod
72+
def perform(exit_code):
73+
os._exit(exit_code)
74+
return "Never got here"
75+
4976
class TestProcess(object):
5077
queue = 'high'
5178

tests/test_worker.py

Lines changed: 64 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from tests import PyResTests, Basic, TestProcess, ErrorObject, RetryOnExceptionJob, TimeoutJob
1+
from tests import PyResTests, Basic, TestProcess, ErrorObject, RetryOnExceptionJob, TimeoutJob, CrashJob, PrematureExitJob, PrematureHardExitJob
22
from pyres import ResQ
33
from pyres.job import Job
44
from pyres.scheduler import Scheduler
@@ -203,21 +203,74 @@ def test_retry_on_exception(self):
203203
assert worker.get_failed() == 0
204204

205205
def test_kills_stale_workers_after_timeout(self):
206-
import signal
207206
timeout = 1
208207

209208
worker = Worker(['basic'], timeout=timeout)
210209
self.resq.enqueue(TimeoutJob, timeout + 1)
211210

212-
child = os.fork()
213-
if child:
214-
assert worker.get_failed() == 0
215-
time.sleep(timeout + 2)
216-
os.kill(child, signal.SIGKILL)
217-
os.waitpid(-1, os.WNOHANG)
218-
assert worker.get_failed() == 1
219-
else:
220-
worker.work()
211+
assert worker.get_failed() == 0
212+
worker.fork_worker(worker.reserve())
213+
assert worker.get_failed() == 1
214+
215+
def test_detect_crashed_workers_as_failures(self):
216+
worker = Worker(['basic'])
217+
self.resq.enqueue(CrashJob)
218+
219+
assert worker.job() == {}
220+
assert worker.get_failed() == 0
221+
222+
worker.fork_worker(worker.reserve())
223+
224+
assert worker.job() == {}
225+
assert worker.get_failed() == 1
226+
227+
def test_detect_non_0_sys_exit_as_failure(self):
228+
worker = Worker(['basic'])
229+
self.resq.enqueue(PrematureExitJob, 9)
230+
231+
assert worker.job() == {}
232+
assert worker.get_failed() == 0
233+
234+
worker.fork_worker(worker.reserve())
235+
236+
assert worker.job() == {}
237+
assert worker.get_failed() == 1
238+
239+
def test_detect_code_0_sys_exit_as_success(self):
240+
worker = Worker(['basic'])
241+
self.resq.enqueue(PrematureExitJob, 0)
242+
243+
assert worker.job() == {}
244+
assert worker.get_failed() == 0
245+
246+
worker.fork_worker(worker.reserve())
247+
248+
assert worker.job() == {}
249+
assert worker.get_failed() == 0
250+
251+
def test_detect_non_0_os_exit_as_failure(self):
252+
worker = Worker(['basic'])
253+
self.resq.enqueue(PrematureHardExitJob, 9)
254+
255+
assert worker.job() == {}
256+
assert worker.get_failed() == 0
257+
258+
worker.fork_worker(worker.reserve())
259+
260+
assert worker.job() == {}
261+
assert worker.get_failed() == 1
262+
263+
def test_detect_code_0_os_exit_as_success(self):
264+
worker = Worker(['basic'])
265+
self.resq.enqueue(PrematureHardExitJob, 0)
266+
267+
assert worker.job() == {}
268+
assert worker.get_failed() == 0
269+
270+
worker.fork_worker(worker.reserve())
271+
272+
assert worker.job() == {}
273+
assert worker.get_failed() == 0
221274

222275
def test_retries_give_up_eventually(self):
223276
now = datetime.datetime.now()

0 commit comments

Comments
 (0)