Skip to content

Commit 3f1029c

Browse files
author
Matt George
committed
added more test coverage
Also fixed some of the code that the new tests uncovered as crap.
1 parent eca7c5e commit 3f1029c

File tree

3 files changed

+119
-27
lines changed

3 files changed

+119
-27
lines changed

src/pyres/__init__.py

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,12 @@ def info(self):
8585
pending += self.size(q)
8686
return {
8787
'pending' : pending,
88-
'processed' : Stat['processed'],
89-
'queues' : queues.size,
90-
'workers' : len(self.workers),
91-
'working' : len(self.working),
92-
'failed' : Stat['failed'],
93-
'servers' : [redis.server]
88+
'processed' : Stat('processed',self).get(),
89+
'queues' : len(self.queues()),
90+
'workers' : len(self.workers()),
91+
#'working' : len(self.working()),
92+
'failed' : Stat('failed',self).get(),
93+
'servers' : ['%s:%s' % (self.redis.host, self.redis.port)]
9494
}
9595

9696
def keys(self):
@@ -104,7 +104,8 @@ def __str__(self):
104104
return "PyRes Client connected to %s" % self.redis.server
105105

106106
def workers(self):
107-
raise NotImplementedError
107+
from pyres.worker import Worker
108+
return Worker.all(self)
108109

109110
def working(self):
110111
raise NotImplementedError
@@ -120,11 +121,11 @@ def decode(cls, item):
120121
return ret
121122
return None
122123

123-
@staticmethod
124-
def enqueue(klass, *args):
124+
@classmethod
125+
def _enqueue(cls, klass, *args):
125126
queue = getattr(klass,'queue', None)
126127
#print cls._res
127-
_self = ResQ()
128+
_self = cls()
128129
if queue:
129130
class_name = '%s.%s' % (klass.__module__, klass.__name__)
130131
#print class_name
@@ -137,15 +138,11 @@ def __init__(self, name, resq):
137138
self.key = "stat:%s" % self.name
138139
self.resq = resq
139140

140-
def __getitem__(self, name):
141-
val = self.get(name)
142-
if val is None:
143-
raise KeyError
144-
return val
145-
146141
def get(self):
147142
val = self.resq.redis.get(self.key)
148-
return int(val)
143+
if val:
144+
return int(val)
145+
return 0
149146

150147
def incr(self, ammount=1):
151148
self.resq.redis.incr(self.key, ammount)

src/pyres/worker.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,11 @@ def run(cls, queues, server):
149149
worker.work()
150150

151151
@classmethod
152-
def all(cls, host):
153-
resq = ResQ(host)
152+
def all(cls, host="localhost:6379"):
153+
if isinstance(host,basestring):
154+
resq = ResQ(host)
155+
elif isinstance(host, ResQ):
156+
resq = host
154157
return resq.redis.smembers('workers')
155158

156159
@classmethod

src/tests.py

Lines changed: 100 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import os
33
from pyres import ResQ, str_to_class
44
from pyres.job import Job
5-
from pyres.worker import Worker, JuniorWorker
5+
from pyres.worker import Worker
66
class Basic(object):
77
queue = 'basic'
88

@@ -11,7 +11,24 @@ def perform(name):
1111
s = "name:%s" % name
1212
print s
1313
return s
14+
15+
class TestProcess(object):
16+
queue = 'high'
17+
18+
@staticmethod
19+
def perform():
20+
import time
21+
time.sleep(.5)
22+
return 'Done Sleeping'
23+
24+
25+
class ErrorObjcet(object):
26+
queue = 'basic'
1427

28+
@staticmethod
29+
def perform():
30+
raise Exception("Could not finish job")
31+
1532
def test_str_to_class():
1633
ret = str_to_class('tests.Basic')
1734
assert ret
@@ -32,7 +49,7 @@ class ResQTests(PyResTests):
3249
def test_enqueue(self):
3350
self.resq.enqueue(Basic,"test1")
3451
self.resq.enqueue(Basic,"test2")
35-
ResQ.enqueue(Basic, "test3")
52+
ResQ._enqueue(Basic, "test3")
3653
assert self.redis.llen("queue:basic") == 3
3754
assert self.redis.sismember('queues','basic')
3855

@@ -56,6 +73,39 @@ def test_peek(self):
5673
self.resq.enqueue(Basic,"test1")
5774
self.resq.enqueue(Basic,"test2")
5875
assert len(self.resq.peek('basic',0,20)) == 2
76+
77+
def test_size(self):
78+
self.resq.enqueue(Basic,"test1")
79+
self.resq.enqueue(Basic,"test2")
80+
assert self.resq.size('basic') == 2
81+
assert self.resq.size('noq') == 0
82+
83+
def test_redis_property(self):
84+
from redis import Redis
85+
rq = ResQ(server="localhost:6379")
86+
red = Redis()
87+
rq2 = ResQ(server=red)
88+
self.assertRaises(Exception, rq.redis,[Basic])
89+
90+
def test_info(self):
91+
self.resq.enqueue(Basic,"test1")
92+
self.resq.enqueue(TestProcess)
93+
info = self.resq.info()
94+
assert info['queues'] == 2
95+
assert info['servers'] == ['localhost:6379']
96+
assert info['workers'] == 0
97+
worker = Worker(['basic'])
98+
worker.register_worker()
99+
info = self.resq.info()
100+
assert info['workers'] == 1
101+
102+
def test_workers(self):
103+
worker = Worker(['basic'])
104+
worker.register_worker()
105+
name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic')
106+
assert len(self.resq.workers()) == 1
107+
assert name in self.resq.workers()
108+
59109

60110
class JobTests(PyResTests):
61111
def test_reserve(self):
@@ -67,7 +117,10 @@ def test_reserve(self):
67117
def test_perform(self):
68118
self.resq.enqueue(Basic,"test1")
69119
job = Job.reserve('basic',self.resq)
120+
self.resq.enqueue(TestProcess)
121+
job2 = Job.reserve('high', self.resq)
70122
assert job.perform() == "name:test1"
123+
assert job2.perform()
71124

72125
def test_fail(self):
73126
self.resq.enqueue(Basic,"test1")
@@ -79,11 +132,21 @@ def test_fail(self):
79132

80133
class WorkerTests(PyResTests):
81134
def test_worker_init(self):
82-
try:
83-
worker = Worker([])
84-
except:
85-
assert True
86-
135+
from pyres.exceptions import NoQueueError
136+
self.assertRaises(NoQueueError, Worker,[])
137+
self.assertRaises(Exception, Worker,['test'],TestProcess())
138+
139+
def test_startup(self):
140+
worker = Worker(['basic'])
141+
worker.startup()
142+
name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic')
143+
assert self.redis.sismember('workers',name)
144+
import signal
145+
assert signal.getsignal(signal.SIGTERM) == worker.shutdown_all
146+
assert signal.getsignal(signal.SIGINT) == worker.shutdown_all
147+
assert signal.getsignal(signal.SIGQUIT) == worker.schedule_shutdown
148+
assert signal.getsignal(signal.SIGUSR1) == worker.kill_child
149+
87150
def test_register(self):
88151
worker = Worker(['basic'])
89152
worker.register_worker()
@@ -139,4 +202,33 @@ def test_process(self):
139202
assert not self.redis.get('worker:%s' % worker)
140203
assert not self.redis.get("stat:failed")
141204
assert not self.redis.get("stat:failed:%s" % name)
142-
205+
self.resq.enqueue(Basic,"test1")
206+
worker.process()
207+
assert not self.redis.get('worker:%s' % worker)
208+
assert not self.redis.get("stat:failed")
209+
assert not self.redis.get("stat:failed:%s" % name)
210+
211+
212+
def test_signals(self):
213+
worker = Worker(['basic'])
214+
worker.startup()
215+
import inspect, signal
216+
frame = inspect.currentframe()
217+
worker.schedule_shutdown(frame, signal.SIGQUIT)
218+
assert worker._shutdown
219+
del worker
220+
worker = Worker(['high'])
221+
#self.resq.enqueue(TestSleep)
222+
#worker.work()
223+
#assert worker.child
224+
assert not worker.kill_child(frame, signal.SIGUSR1)
225+
226+
def test_job_failure(self):
227+
self.resq.enqueue(ErrorObjcet)
228+
worker = Worker(['basic'])
229+
#worker.process()
230+
name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic')
231+
#assert not self.redis.get('worker:%s' % worker)
232+
#assert self.redis.get("stat:failed")
233+
#assert self.redis.get("stat:failed:%s" % name)
234+
assert False

0 commit comments

Comments
 (0)