Skip to content

Commit 408b68a

Browse files
committed
Merge pull request binarymatt#105 from heynemann/master
Implementing timeout for workers
2 parents e6655b0 + a297dba commit 408b68a

File tree

5 files changed

+61
-8
lines changed

5 files changed

+61
-8
lines changed

pyres/exceptions.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
11
class NoQueueError(Exception):
    """Queue-related error for pyres.

    NOTE(review): presumably raised when a worker is started without any
    queues to listen on -- confirm against the call sites.
    """
3+
4+
class TimeoutError(RuntimeError):
    """Raised by the worker when a forked job exceeds its timeout.

    NOTE(review): on Python 3 this name shadows the builtin ``TimeoutError``
    (an ``OSError`` subclass); always import it explicitly from
    ``pyres.exceptions``.
    """

pyres/scripts.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ def pyres_worker():
106106
parser.add_option('-l', '--log-level', dest='log_level', default='info', help='log level. Valid values are "debug", "info", "warning", "error", "critical", in decreasing order of verbosity. Defaults to "info" if parameter not specified.')
107107
parser.add_option('-f', dest='logfile', help='If present, a logfile will be used. "stderr", "stdout", and "syslog" are all special values.')
108108
parser.add_option('-p', dest='pidfile', help='If present, a pidfile will be used.')
109+
parser.add_option("-t", '--timeout', dest='timeout', default=None, help='the timeout in seconds for this worker')
109110
(options,args) = parser.parse_args()
110111

111112
if len(args) != 1:
@@ -120,6 +121,8 @@ def pyres_worker():
120121
if interval is not None:
121122
interval = int(interval)
122123

124+
# Leave the timeout disabled (None) when -t/--timeout was not supplied.
# The old "cond and a or b" form fell through to int(None) in that case
# and raised TypeError.
timeout = None if options.timeout is None else int(options.timeout)
125+
123126
queues = args[0].split(',')
124127
server = '%s:%s' % (options.host,options.port)
125-
Worker.run(queues, server, interval)
128+
Worker.run(queues, server, interval, timeout=timeout)

pyres/worker.py

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import commands
77
import random
88

9-
from pyres.exceptions import NoQueueError
9+
from pyres.exceptions import NoQueueError, TimeoutError
1010
from pyres.job import Job
1111
from pyres import ResQ, Stat, __version__
1212

@@ -25,13 +25,14 @@ class and passes a comma-separated list of queues to listen on.::
2525

2626
job_class = Job
2727

28-
def __init__(self, queues=(), server="localhost:6379", password=None):
28+
def __init__(self, queues=(), server="localhost:6379", password=None, timeout=None):
2929
self.queues = queues
3030
self.validate_queues()
3131
self._shutdown = False
3232
self.child = None
3333
self.pid = os.getpid()
3434
self.hostname = os.uname()[1]
35+
self.timeout = timeout
3536

3637
if isinstance(server, basestring):
3738
self.resq = ResQ(server=server, password=password)
@@ -153,13 +154,34 @@ def work(self, interval=5):
153154
datetime.datetime.now()))
154155

155156
try:
156-
os.waitpid(self.child, 0)
157+
start = datetime.datetime.now()
158+
159+
# waits for the result or times out
160+
while True:
161+
result = os.waitpid(self.child, os.WNOHANG)
162+
if result != (0, 0):
163+
break
164+
time.sleep(0.5)
165+
166+
now = datetime.datetime.now()
167+
if self.timeout and ((now - start).seconds > self.timeout):
168+
os.kill(self.child, signal.SIGKILL)
169+
os.waitpid(-1, os.WNOHANG)
170+
raise TimeoutError("Timed out after %d seconds" % self.timeout)
171+
157172
except OSError as ose:
158173
import errno
159174

160175
if ose.errno != errno.EINTR:
161176
raise ose
162-
#os.wait()
177+
178+
except TimeoutError as e:
179+
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
180+
logger.exception("%s timed out: %s" % (job, e))
181+
job.fail(exceptionTraceback)
182+
self.failed()
183+
self.done_working()
184+
163185
logger.debug('done waiting')
164186
else:
165187
self._setproctitle("Processing %s since %s" %
@@ -283,8 +305,8 @@ def worker_pids(self):
283305
grep pyres_worker").split("\n"))
284306

285307
@classmethod
286-
def run(cls, queues, server="localhost:6379", interval=None):
287-
worker = cls(queues=queues, server=server)
308+
def run(cls, queues, server="localhost:6379", interval=None, timeout=None):
309+
worker = cls(queues=queues, server=server, timeout=timeout)
288310
if interval is not None:
289311
worker.work(interval)
290312
else:

tests/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@ def perform(fail_until):
3737
else:
3838
return True
3939

40+
class TimeoutJob(object):
    """Test job that sleeps for a caller-chosen time, to exercise timeouts."""

    # Queue this job is enqueued on.
    queue = 'basic'

    @staticmethod
    def perform(wait_for):
        """Sleep for ``wait_for`` seconds, then return ``"Done Sleeping"``."""
        # Imported locally so the job does not depend on module-level imports.
        import time
        time.sleep(wait_for)
        return "Done Sleeping"
4048

4149
class TestProcess(object):
4250
queue = 'high'

tests/test_worker.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from tests import PyResTests, Basic, TestProcess, ErrorObject, RetryOnExceptionJob
1+
from tests import PyResTests, Basic, TestProcess, ErrorObject, RetryOnExceptionJob, TimeoutJob
22
from pyres import ResQ
33
from pyres.job import Job
44
from pyres.scheduler import Scheduler
@@ -202,6 +202,23 @@ def test_retry_on_exception(self):
202202
assert True == worker.process()
203203
assert worker.get_failed() == 0
204204

205+
def test_kills_stale_workers_after_timeout(self):
    """A job that outlives the worker timeout is recorded as failed.

    Runs a worker with a 1-second timeout in a forked child, against a
    queued ``TimeoutJob`` that sleeps longer than the timeout. The parent
    then checks that the worker's failed counter went from 0 to 1.
    """
    import signal
    timeout = 1

    worker = Worker(['basic'], timeout=timeout)
    # NOTE(review): the worker compares whole elapsed seconds with
    # ``> timeout``, so the kill lands at about timeout + 1 seconds --
    # the same moment this job's sleep ends. Consider a longer sleep to
    # avoid a race.
    self.resq.enqueue(TimeoutJob, timeout + 1)

    child = os.fork()
    if child:
        assert worker.get_failed() == 0
        time.sleep(timeout + 2)
        os.kill(child, signal.SIGKILL)
        # Reap this specific child and block until it is gone; a
        # non-blocking wait right after kill() can return before the
        # process has exited and leave it unreaped.
        os.waitpid(child, 0)
        assert worker.get_failed() == 1
    else:
        try:
            worker.work()
        finally:
            # The forked child must never fall through into the test
            # runner if work() returns or raises.
            os._exit(0)
221+
205222
def test_retries_give_up_eventually(self):
206223
now = datetime.datetime.now()
207224
self.set_current_time(now)

0 commit comments

Comments
 (0)