Skip to content

Commit 551b47b

Browse files
committed
Adding job to the error queue after a timeout.
1 parent 75760da commit 551b47b

File tree

2 files changed

+17
-5
lines changed

2 files changed

+17
-5
lines changed

pyres/exceptions.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
11
class NoQueueError(Exception):
22
pass
3+
4+
class TimeoutError(RuntimeError):
5+
pass

pyres/worker.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import commands
77
import random
88

9-
from pyres.exceptions import NoQueueError
9+
from pyres.exceptions import NoQueueError, TimeoutError
1010
from pyres.job import Job
1111
from pyres import ResQ, Stat, __version__
1212

@@ -155,24 +155,33 @@ def work(self, interval=5):
155155
try:
156156
start = datetime.datetime.now()
157157
result = (0, 0)
158-
timed_out = False
159158

160159
# waits for the result or times out
161-
while result == (0, 0) and not timed_out:
160+
while True:
162161
result = os.waitpid(self.child, os.WNOHANG)
163-
now = datetime.datetime.now()
162+
if result != (0, 0):
163+
break
164+
time.sleep(0.5)
164165

166+
now = datetime.datetime.now()
165167
if self.timeout and ((now - start).seconds > self.timeout):
166168
os.kill(self.child, signal.SIGKILL)
167169
os.waitpid(-1, os.WNOHANG)
168-
timed_out = True
170+
raise TimeoutError("Timed out after %d seconds" % self.timeout)
169171

170172
except OSError as ose:
171173
import errno
172174

173175
if ose.errno != errno.EINTR:
174176
raise ose
175177

178+
except TimeoutError as e:
179+
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
180+
logger.exception("%s timed out: %s" % (job, e))
181+
job.fail(exceptionTraceback)
182+
self.failed()
183+
self.done_working()
184+
176185
logger.debug('done waiting')
177186
else:
178187
self._setproctitle("Processing %s since %s" %

0 commit comments

Comments
 (0)