Skip to content

Commit 56d9c08

Browse files
committed
Better handling crashes and unexpected exits in the forked worker process.
Fixes all the previously added tests.
1 parent 614d067 commit 56d9c08

File tree

2 files changed

+49
-23
lines changed

2 files changed

+49
-23
lines changed

pyres/exceptions.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
class NoQueueError(Exception):
22
pass
33

4-
class TimeoutError(RuntimeError):
4+
class JobError(RuntimeError):
5+
pass
6+
7+
class TimeoutError(JobError):
8+
pass
9+
10+
class CrashError(JobError):
511
pass

pyres/worker.py

Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import commands
77
import random
88

9-
from pyres.exceptions import NoQueueError, TimeoutError
9+
from pyres.exceptions import NoQueueError, JobError, TimeoutError, CrashError
1010
from pyres.job import Job
1111
from pyres import ResQ, Stat, __version__
1212

@@ -174,9 +174,17 @@ def fork_worker(self, job):
174174

175175
# waits for the result or times out
176176
while True:
177-
result = os.waitpid(self.child, os.WNOHANG)
178-
if result != (0, 0):
179-
break
177+
pid, status = os.waitpid(self.child, os.WNOHANG)
178+
if pid != 0:
179+
if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
180+
break
181+
if os.WIFSTOPPED(status):
182+
logger.warning("Process stopped by signal %d" % os.WSTOPSIG(status))
183+
else:
184+
if os.WIFSIGNALED(status):
185+
raise CrashError("Unexpected exit by signal %d" % os.WTERMSIG(status))
186+
raise CrashError("Unexpected exit status %d" % os.WEXITSTATUS(status))
187+
180188
time.sleep(0.5)
181189

182190
now = datetime.datetime.now()
@@ -190,13 +198,13 @@ def fork_worker(self, job):
190198

191199
if ose.errno != errno.EINTR:
192200
raise ose
193-
194-
except TimeoutError as e:
195-
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
196-
logger.exception("%s timed out: %s" % (job, e))
197-
job.fail(exceptionTraceback)
198-
self.failed()
199-
self.done_working()
201+
except JobError:
202+
self._handle_job_exception(job)
203+
finally:
204+
# If the child process' job called os._exit manually we need to
205+
# finish the clean up here.
206+
if self.job():
207+
self.done_working()
200208

201209
logger.debug('done waiting')
202210
else:
@@ -236,21 +244,33 @@ def before_process(self, job):
236244
def process(self, job=None):
237245
if not job:
238246
job = self.reserve()
247+
248+
job_failed = False
239249
try:
240-
self.working_on(job)
241-
job = self.before_process(job)
242-
return job.perform()
243-
except Exception, e:
244-
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
245-
logger.exception("%s failed: %s" % (job, e))
246-
job.fail(exceptionTraceback)
247-
self.failed()
248-
else:
249-
logger.info('completed job')
250-
logger.debug('job details: %s' % job)
250+
try:
251+
self.working_on(job)
252+
job = self.before_process(job)
253+
return job.perform()
254+
except Exception:
255+
job_failed = True
256+
self._handle_job_exception(job)
257+
except SystemExit, e:
258+
if e.code != 0:
259+
job_failed = True
260+
self._handle_job_exception(job)
261+
262+
if not job_failed:
263+
logger.info('completed job')
264+
logger.debug('job details: %s' % job)
251265
finally:
252266
self.done_working()
253267

268+
def _handle_job_exception(self, job):
269+
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
270+
logger.exception("%s failed: %s" % (job, exceptionValue))
271+
job.fail(exceptionTraceback)
272+
self.failed()
273+
254274
def reserve(self, timeout=10):
255275
logger.debug('checking queues %s' % self.queues)
256276
job = self.job_class.reserve(self.queues, self.resq, self.__str__(), timeout=timeout)

0 commit comments

Comments
 (0)