Skip to content

Commit eb2cbf2

Browse files
committed
Adding test to check whether pyres identifies a crashed process as a
failure (currently failing). The biggest problem this test exposes is that, even after the child process has crashed and the worker is ready to process new jobs, the entry associating the job with the worker remains in Redis. The commit also refactors the test for timeout errors on stale workers, making it simpler.
1 parent 8762b63 commit eb2cbf2

File tree

2 files changed

+26
-11
lines changed

2 files changed

+26
-11
lines changed

tests/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,16 @@ def perform(wait_for):
4646
time.sleep(wait_for)
4747
return "Done Sleeping"
4848

49+
class CrashJob(object):
50+
queue = 'basic'
51+
52+
@staticmethod
53+
def perform():
54+
# Dangerous, this will cause a hard crash of the python process
55+
import ctypes
56+
ctypes.string_at(1)
57+
return "Never got here"
58+
4959
class TestProcess(object):
5060
queue = 'high'
5161

tests/test_worker.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from tests import PyResTests, Basic, TestProcess, ErrorObject, RetryOnExceptionJob, TimeoutJob
1+
from tests import PyResTests, Basic, TestProcess, ErrorObject, RetryOnExceptionJob, TimeoutJob, CrashJob
22
from pyres import ResQ
33
from pyres.job import Job
44
from pyres.scheduler import Scheduler
@@ -203,21 +203,26 @@ def test_retry_on_exception(self):
203203
assert worker.get_failed() == 0
204204

205205
def test_kills_stale_workers_after_timeout(self):
206-
import signal
207206
timeout = 1
208207

209208
worker = Worker(['basic'], timeout=timeout)
210209
self.resq.enqueue(TimeoutJob, timeout + 1)
211210

212-
child = os.fork()
213-
if child:
214-
assert worker.get_failed() == 0
215-
time.sleep(timeout + 2)
216-
os.kill(child, signal.SIGKILL)
217-
os.waitpid(-1, os.WNOHANG)
218-
assert worker.get_failed() == 1
219-
else:
220-
worker.work()
211+
assert worker.get_failed() == 0
212+
worker.fork_worker(worker.reserve())
213+
assert worker.get_failed() == 1
214+
215+
def test_detect_crashed_workers_as_failures(self):
216+
worker = Worker(['basic'])
217+
self.resq.enqueue(CrashJob)
218+
219+
assert worker.job() == {}
220+
assert worker.get_failed() == 0
221+
222+
worker.fork_worker(worker.reserve())
223+
224+
assert worker.job() == {}
225+
assert worker.get_failed() == 1
221226

222227
def test_retries_give_up_eventually(self):
223228
now = datetime.datetime.now()

0 commit comments

Comments
 (0)