Skip to content

Commit dd122b4

Browse files
author
Matt George
committed
Merge branch 'redis_upgrade'
2 parents 16ba89a + dfda467 commit dd122b4

File tree

9 files changed

+71
-35
lines changed

9 files changed

+71
-35
lines changed

pyres/__init__.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
from redis import Redis
44
import pyres.json_parser as json
55

6-
import types
7-
86
def my_import(name):
97
mod = __import__(name)
108
components = name.split('.')
@@ -60,18 +58,18 @@ class ResQ(object):
6058
def __init__(self, server="localhost:6379", password=None,
6159
timeout=None, retry_connection=True):
6260
self.timeout = timeout
63-
self.retry_connection = retry_connection
61+
#self.retry_connection = retry_connection
6462
self.redis = server
6563
if password:
6664
self.redis.auth(password)
6765
self._watched_queues = set()
6866

6967
def push(self, queue, item):
7068
self.watch_queue(queue)
71-
self.redis.push("resque:queue:%s" % queue, ResQ.encode(item))
69+
self.redis.rpush("resque:queue:%s" % queue, ResQ.encode(item))
7270

7371
def pop(self, queue):
74-
ret = self.redis.pop("resque:queue:%s" % queue)
72+
ret = self.redis.lpop("resque:queue:%s" % queue)
7573
if ret:
7674
return ResQ.decode(ret)
7775
return ret
@@ -90,7 +88,7 @@ def peek(self, queue, start=0, count=1):
9088
return self.list_range('resque:queue:%s' % queue, start, count)
9189

9290
def list_range(self, key, start, count):
93-
items = self.redis.lrange(key, start,start+count-1)
91+
items = self.redis.lrange(key, start,start+count-1) or []
9492
ret_list = []
9593
for i in items:
9694
ret_list.append(ResQ.decode(i))
@@ -103,9 +101,7 @@ def _set_redis(self, server):
103101
if isinstance(server, basestring):
104102
self.dsn = server
105103
host, port = server.split(':')
106-
self._redis = Redis(host=host, port=int(port),
107-
retry_connection=self.retry_connection,
108-
timeout=self.timeout)
104+
self._redis = Redis(host=host, port=int(port))
109105
elif isinstance(server, Redis):
110106
self.dsn = '%s:%s' % (server.host,server.port)
111107
self._redis = server
@@ -129,7 +125,7 @@ def enqueue_from_string(self, klass_as_string, queue, *args):
129125
self.push(queue, {'class':klass_as_string,'args':args})
130126

131127
def queues(self):
132-
return self.redis.smembers("resque:queues")
128+
return self.redis.smembers("resque:queues") or []
133129

134130
def info(self):
135131
"""

pyres/failure/redis.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def save(self, resq=None):
1515
if self._worker:
1616
data['worker'] = self._worker
1717
data = ResQ.encode(data)
18-
resq.redis.push('resque:failed', data)
18+
resq.redis.rpush('resque:failed', data)
1919

2020
@classmethod
2121
def count(cls, resq):

pyres/worker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,8 @@ def all(cls, host="localhost:6379"):
212212
resq = ResQ(host)
213213
elif isinstance(host, ResQ):
214214
resq = host
215-
return [Worker.find(w,resq) for w in resq.redis.smembers('resque:workers')]
215+
216+
return [Worker.find(w,resq) for w in resq.redis.smembers('resque:workers') or []]
216217

217218
@classmethod
218219
def working(cls, host):

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
simplejson==2.0.9
22
itty==0.6.2
3-
redis==0.6.0
3+
redis==1.34.1
44
pystache==0.1.0

resweb/views.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -313,12 +313,14 @@ def key_type(self):
313313
def items(self):
314314
items = []
315315
if self.key_type() == 'list':
316-
for k in self.resq.redis.lrange('resque:'+self.stat_id,0,20):
316+
lst = self.resq.redis.lrange('resque:'+self.stat_id,0,20) or []
317+
for k in lst:
317318
items.append({
318319
'row':str(k)
319320
})
320321
elif self.key_type() == 'set':
321-
for k in self.resq.redis.smembers('resque:'+self.stat_id):
322+
st = self.resq.redis.smembers('resque:'+self.stat_id) or set([])
323+
for k in st:
322324
items.append({
323325
'row':str(k)
324326
})

tests/__init__.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,26 @@ def perform(sleep_time):
4343
import time
4444
time.sleep(sleep_time)
4545
print 'Done Sleeping'
46+
4647
def test_str_to_class():
4748
ret = str_to_class('tests.Basic')
4849
assert ret
50+
assert ret == Basic
51+
assert str_to_class('hello.World') == None
52+
53+
def test_safe_str_to_class():
54+
from pyres import safe_str_to_class
55+
assert safe_str_to_class('tests.Basic') == Basic
56+
assert safe_str_to_class('test.Mine') == None
57+
assert safe_str_to_class('hello.World') == None
4958

5059
class PyResTests(unittest.TestCase):
5160
def setUp(self):
5261
self.resq = ResQ()
5362
self.redis = self.resq.redis
54-
self.redis.flush(True)
63+
self.redis.flushall()
5564

5665
def tearDown(self):
57-
self.redis.flush(True)
66+
self.redis.flushall()
5867
del self.redis
5968
del self.resq

tests/test_resq.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def test_redis_property(self):
4242
from redis import Redis
4343
rq = ResQ(server="localhost:6379")
4444
red = Redis()
45-
rq2 = ResQ(server=red)
45+
#rq2 = ResQ(server=red)
4646
self.assertRaises(Exception, rq.redis,[Basic])
4747

4848
def test_info(self):
@@ -89,3 +89,14 @@ def test_keys(self):
8989
self.resq.enqueue_from_string('tests.Basic','basic','test1')
9090
assert 'queue:basic' in self.resq.keys()
9191
assert 'queues' in self.resq.keys()
92+
93+
def test_queues(self):
94+
assert self.resq.queues() == []
95+
self.resq.enqueue_from_string('tests.Basic','basic','test1')
96+
assert len(self.resq.queues()) == 1
97+
self.resq.enqueue_from_string('tests.Basic','basic','test1')
98+
assert len(self.resq.queues()) == 1
99+
self.resq.enqueue_from_string('tests.Basic','basic2','test1')
100+
assert len(self.resq.queues()) == 2
101+
assert 'test' not in self.resq.queues()
102+
assert 'basic' in self.resq.queues()

tests/test_stats.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,22 @@ class StatTests(PyResTests):
44
def test_incr(self):
55
stat_obj = Stat('test_stat', self.resq)
66
stat_obj.incr()
7-
assert self.redis.get('resque:stat:test_stat') == 1
7+
assert self.redis.get('resque:stat:test_stat') == str(1)
88
stat_obj.incr()
9-
assert self.redis.get('resque:stat:test_stat') == 2
9+
assert self.redis.get('resque:stat:test_stat') == str(2)
1010
stat_obj.incr(2)
11-
assert self.redis.get('resque:stat:test_stat') == 4
11+
assert self.redis.get('resque:stat:test_stat') == str(4)
1212

1313
def test_decr(self):
1414
stat_obj = Stat('test_stat', self.resq)
1515
stat_obj.incr()
1616
stat_obj.incr()
17-
assert self.redis.get('resque:stat:test_stat') == 2
17+
assert self.redis.get('resque:stat:test_stat') == str(2)
1818
stat_obj.decr()
19-
assert self.redis.get('resque:stat:test_stat') == 1
19+
assert self.redis.get('resque:stat:test_stat') == str(1)
2020
stat_obj.incr()
2121
stat_obj.decr(2)
22-
assert self.redis.get('resque:stat:test_stat') == 0
22+
assert self.redis.get('resque:stat:test_stat') == str(0)
2323

2424
def test_get(self):
2525
stat_obj = Stat('test_stat', self.resq)

tests/test_worker.py

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,23 +48,27 @@ def test_processed(self):
4848
worker.processed()
4949
assert self.redis.exists("resque:stat:processed")
5050
assert self.redis.exists("resque:stat:processed:%s" % name)
51-
assert self.redis.get("resque:stat:processed") == 1
52-
assert self.redis.get("resque:stat:processed:%s" % name) == 1
51+
assert self.redis.get("resque:stat:processed") == str(1)
52+
assert self.redis.get("resque:stat:processed:%s" % name) == str(1)
53+
assert worker.get_processed() == 1
5354
worker.processed()
54-
assert self.redis.get("resque:stat:processed") == 2
55-
assert self.redis.get("resque:stat:processed:%s" % name) == 2
55+
assert self.redis.get("resque:stat:processed") == str(2)
56+
assert self.redis.get("resque:stat:processed:%s" % name) == str(2)
57+
assert worker.get_processed() == 2
5658

5759
def test_failed(self):
5860
name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic')
5961
worker = Worker(['basic'])
6062
worker.failed()
6163
assert self.redis.exists("resque:stat:failed")
6264
assert self.redis.exists("resque:stat:failed:%s" % name)
63-
assert self.redis.get("resque:stat:failed") == 1
64-
assert self.redis.get("resque:stat:failed:%s" % name) == 1
65+
assert self.redis.get("resque:stat:failed") == str(1)
66+
assert self.redis.get("resque:stat:failed:%s" % name) == str(1)
67+
assert worker.get_failed() == 1
6568
worker.failed()
66-
assert self.redis.get("resque:stat:failed") == 2
67-
assert self.redis.get("resque:stat:failed:%s" % name) == 2
69+
assert self.redis.get("resque:stat:failed") == str(2)
70+
assert self.redis.get("resque:stat:failed:%s" % name) == str(2)
71+
assert worker.get_failed() == 2
6872

6973
def test_process(self):
7074
name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic')
@@ -102,8 +106,8 @@ def test_job_failure(self):
102106
worker.process()
103107
name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic')
104108
assert not self.redis.get('resque:worker:%s' % worker)
105-
assert self.redis.get("resque:stat:failed") == 1
106-
assert self.redis.get("resque:stat:failed:%s" % name) == 1
109+
assert self.redis.get("resque:stat:failed") == str(1)
110+
assert self.redis.get("resque:stat:failed:%s" % name) == str(1)
107111

108112
def test_get_job(self):
109113
worker = Worker(['basic'])
@@ -112,6 +116,7 @@ def test_get_job(self):
112116
worker.working_on(job)
113117
name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic')
114118
assert worker.job() == ResQ.decode(self.redis.get('resque:worker:%s' % name))
119+
assert worker.processing() == ResQ.decode(self.redis.get('resque:worker:%s' % name))
115120
worker.done_working()
116121
w2 = Worker(['basic'])
117122
print w2.job()
@@ -138,4 +143,16 @@ def test_started(self):
138143
assert self.redis.get('resque:worker:%s:started' % name) == dt.strftime('%Y-%m-%d %H:%M:%S')
139144
assert worker.started == datetime.datetime.strptime(dt.strftime('%Y-%m-%d %H:%M:%S'),'%Y-%m-%d %H:%M:%S')
140145
worker.started = None
141-
assert not self.redis.exists('resque:worker:%s:started' % name)
146+
assert not self.redis.exists('resque:worker:%s:started' % name)
147+
148+
def test_state(self):
149+
worker = Worker(['basic'])
150+
assert worker.state() == 'idle'
151+
self.resq.enqueue_from_string('tests.Basic','basic','test1')
152+
worker.register_worker()
153+
job = Job.reserve('basic', self.resq)
154+
worker.working_on(job)
155+
assert worker.state() == 'working'
156+
worker.done_working()
157+
assert worker.state() == 'idle'
158+

0 commit comments

Comments
 (0)