Skip to content

Commit a297dba

Browse files
committed
Adding basic test for workers with timeout values
1 parent 921db92 commit a297dba

File tree

3 files changed

+29
-4
lines changed

3 files changed

+29
-4
lines changed

pyres/worker.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,14 @@ class and passes a comma-separated list of queues to listen on.::
2525

2626
job_class = Job
2727

28-
def __init__(self, queues=(), server="localhost:6379", password=None):
28+
def __init__(self, queues=(), server="localhost:6379", password=None, timeout=None):
2929
self.queues = queues
3030
self.validate_queues()
3131
self._shutdown = False
3232
self.child = None
3333
self.pid = os.getpid()
3434
self.hostname = os.uname()[1]
35+
self.timeout = timeout
3536

3637
if isinstance(server, basestring):
3738
self.resq = ResQ(server=server, password=password)
@@ -305,8 +306,7 @@ def worker_pids(self):
305306

306307
@classmethod
307308
def run(cls, queues, server="localhost:6379", interval=None, timeout=None):
308-
worker = cls(queues=queues, server=server)
309-
worker.timeout = timeout
309+
worker = cls(queues=queues, server=server, timeout=timeout)
310310
if interval is not None:
311311
worker.work(interval)
312312
else:

tests/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@ def perform(fail_until):
3737
else:
3838
return True
3939

40+
class TimeoutJob(object):
41+
queue = 'basic'
42+
43+
@staticmethod
44+
def perform(wait_for):
45+
import time
46+
time.sleep(wait_for)
47+
return "Done Sleeping"
4048

4149
class TestProcess(object):
4250
queue = 'high'

tests/test_worker.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from tests import PyResTests, Basic, TestProcess, ErrorObject, RetryOnExceptionJob
1+
from tests import PyResTests, Basic, TestProcess, ErrorObject, RetryOnExceptionJob, TimeoutJob
22
from pyres import ResQ
33
from pyres.job import Job
44
from pyres.scheduler import Scheduler
@@ -202,6 +202,23 @@ def test_retry_on_exception(self):
202202
assert True == worker.process()
203203
assert worker.get_failed() == 0
204204

205+
def test_kills_stale_workers_after_timeout(self):
206+
import signal
207+
timeout = 1
208+
209+
worker = Worker(['basic'], timeout=timeout)
210+
self.resq.enqueue(TimeoutJob, timeout + 1)
211+
212+
child = os.fork()
213+
if child:
214+
assert worker.get_failed() == 0
215+
time.sleep(timeout + 2)
216+
os.kill(child, signal.SIGKILL)
217+
os.waitpid(-1, os.WNOHANG)
218+
assert worker.get_failed() == 1
219+
else:
220+
worker.work()
221+
205222
def test_retries_give_up_eventually(self):
206223
now = datetime.datetime.now()
207224
self.set_current_time(now)

0 commit comments

Comments
 (0)