Skip to content

Commit 5a370b3

Browse files
author
Matt George
committed
Updated the ResQ class to better reflect resque
Also added __getattr__ to the stats class and a property for the redis object in ResQ
1 parent a87127d commit 5a370b3

File tree

6 files changed

+134
-63
lines changed

6 files changed

+134
-63
lines changed

src/pyres/__init__.py

Lines changed: 87 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,88 +17,142 @@ def str_to_class(s):
1717
return None
1818

1919
class ResQ(object):
20-
_res = None
20+
2121
def __init__(self, server="localhost:6379"):
22-
self._server = server
23-
host, port = server.split(':')
24-
self._redis = Redis(host=host, port=int(port))
22+
self.redis = server
2523
self._watched_queues = {}
26-
24+
2725
def push(self, queue, item):
2826
self.watch_queue(queue)
29-
self._redis.push("queue:%s" % queue, ResQ.encode(item))
30-
27+
self.redis.push("queue:%s" % queue, ResQ.encode(item))
28+
3129
def pop(self, queue):
32-
ret = self._redis.pop("queue:%s" % queue)
30+
ret = self.redis.pop("queue:%s" % queue)
3331
if ret:
3432
return ResQ.decode(ret)
3533
return ret
36-
34+
3735
def size(self, queue):
38-
return int(self._redis.llen("queue:%s" % queue))
39-
36+
return int(self.redis.llen("queue:%s" % queue))
37+
4038
def watch_queue(self, queue):
4139
if self._watched_queues.has_key(queue):
4240
return
4341
else:
44-
if self._redis.sadd('queues',str(queue)):
42+
if self.redis.sadd('queues',str(queue)):
4543
self._watched_queues[queue] = queue
44+
4645
def peek(self, queue, start=0, count=1):
4746
return self.list_range('queue:%s' % queue, start, count)
48-
47+
4948
def list_range(self, key, start, count):
50-
items = self._redis.lrange(key,start,start+count-1)
49+
items = self.redis.lrange(key,start,start+count-1)
5150
ret_list = []
5251
for i in items:
5352
ret_list.append(ResQ.decode(i))
5453
return ret_list
54+
55+
def _get_redis(self):
56+
return self._redis
57+
58+
def _set_redis(self, server):
59+
if isinstance(server, basestring):
60+
self.dsn = server
61+
host, port = server.split(':')
62+
self._redis = Redis(host=host, port=int(port))
63+
elif isinstance(server, Redis):
64+
self.dsn = '%s:%s' % (server.host,server.port)
65+
self._redis = server
66+
else:
67+
raise Exception("I don't know what to do with %s" % str(server))
68+
redis = property(_get_redis, _set_redis)
69+
70+
def enqueue(self, klass, *args):
71+
queue = getattr(klass,'queue', None)
72+
#print cls._res
73+
if queue:
74+
class_name = '%s.%s' % (klass.__module__, klass.__name__)
75+
#print class_name
76+
self.push(queue, {'class':class_name,'args':args})
77+
#Job.create(queue, klass,*args)
78+
79+
def queues(self):
80+
return self.redis.smembers("queues")
81+
82+
def info(self):
83+
pending = 0
84+
for q in self.queues():
85+
pending += self.size(q)
86+
return {
87+
'pending' : pending,
88+
'processed' : Stat['processed'],
89+
'queues' : queues.size,
90+
'workers' : len(self.workers),
91+
'working' : len(self.working),
92+
'failed' : Stat['failed'],
93+
'servers' : [redis.server]
94+
}
95+
96+
def keys(self):
97+
raise NotImplementedError
98+
99+
def reserve(self, queue):
100+
from pyres.job import Job
101+
return Job.reserve(queue, self)
102+
103+
def __str__(self):
104+
return "PyRes Client connected to %s" % self.redis.server
105+
106+
def workers(self):
107+
raise NotImplementedError
108+
109+
def working(self):
110+
raise NotImplementedError
111+
55112
@classmethod
56113
def encode(cls, item):
57114
return simplejson.dumps(item)
58-
115+
59116
@classmethod
60117
def decode(cls, item):
61118
if item:
62119
ret = simplejson.loads(item)
63120
return ret
64121
return None
65122

66-
@classmethod
67-
def enqueue(cls, klass, *args):
123+
@staticmethod
124+
def enqueue(klass, *args):
68125
queue = getattr(klass,'queue', None)
69126
#print cls._res
70-
resq = cls()
127+
_self = ResQ()
71128
if queue:
72129
class_name = '%s.%s' % (klass.__module__, klass.__name__)
73130
#print class_name
74-
resq.push(queue, {'class':class_name,'args':args})
131+
_self.push(queue, {'class':class_name,'args':args})
75132
#Job.create(queue, klass,*args)
76-
77-
@classmethod
78-
def queues(cls, server="localhost:6379"):
79-
resq = cls(server)
80-
return resq._redis.smembers("queues")
81-
82-
#@classmethod
83-
#def working(cls, server="localhost:6379"):
84-
# resq = cls(server)
85-
# return Worker.working(resq)
133+
86134
class Stat(object):
87135
def __init__(self, name, resq):
88136
self.name = name
89137
self.key = "stat:%s" % self.name
90138
self.resq = resq
91139

140+
def __getitem__(self, name):
141+
val = self.get(name)
142+
if val is None:
143+
raise KeyError
144+
return val
145+
92146
def get(self):
93-
val = self.resq._redis.get(self.key)
147+
val = self.resq.redis.get(self.key)
94148
return int(val)
95149

96150
def incr(self, ammount=1):
97-
self.resq._redis.incr(self.key, ammount)
151+
self.resq.redis.incr(self.key, ammount)
98152

99153
def decr(self, ammount=1):
100-
self.resq._redis.decr(self.key, ammount)
154+
self.resq.redis.decr(self.key, ammount)
101155

102156
def clear(self):
103-
self.resq._redis.delete(self.key)
157+
self.resq.redis.delete(self.key)
104158

src/pyres/failure.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ def save(self, resq):
1515
'queue' : self._queue
1616
}
1717
data = ResQ.encode(data)
18-
resq._redis.push('failed', data)
18+
resq.redis.push('failed', data)
1919

2020
@classmethod
2121
def count(cls, resq):
22-
return int(resq._redis.llen('failed'))
22+
return int(resq.redis.llen('failed'))
2323

2424
@classmethod
2525
def create(cls, options={}):

src/pyres/job.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,5 @@ def fail(self, exception):
2323
@classmethod
2424
def reserve(cls, queue, res):
2525
payload = res.pop(queue)
26-
print "payload: %s" % payload
2726
if payload:
2827
return cls(queue, payload, res)

src/pyres/manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ def startup(self):
2626
self.register_signals()
2727

2828
def register_manager(self):
29-
self.resq._redis.sadd('managers',str(self))
29+
self.resq.redis.sadd('managers',str(self))
3030

3131
def unregister_manager(self):
32-
self.resq._redis.srem('managers',str(self))
32+
self.resq.redis.srem('managers',str(self))
3333

3434
def register_signals(self):
3535
signal.signal(signal.SIGTERM, self.shutdown_all)

src/pyres/worker.py

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,31 @@
77
import time
88
import simplejson
99
class Worker(object):
10-
def __init__(self, queues=[], host="localhost:6379"):
10+
def __init__(self, queues=[], server="localhost:6379"):
1111
self.queues = queues
1212
self.validate_queues()
1313
self._shutdown = False
1414
self.child = None
15-
self.resq = ResQ(host)
15+
if isinstance(server,basestring):
16+
self.resq = ResQ(server)
17+
elif isinstance(server, ResQ):
18+
self.resq = server
19+
else:
20+
raise Exception("Bad server argument")
21+
1622

1723
def validate_queues(self):
1824
if not self.queues:
1925
raise NoQueueError("Please give each worker at least one queue.")
2026

2127
def register_worker(self):
22-
23-
self.resq._redis.sadd('workers',str(self))
28+
self.resq.redis.sadd('workers',str(self))
2429
#self.resq._redis.add("worker:#{self}:started", Time.now.to_s)
2530
#Stat.clear("processed:#{self}")
2631
#Stat.clear("failed:#{self}")
2732

2833
def unregister_worker(self):
29-
self.resq._redis.srem('workers',str(self))
34+
self.resq.redis.srem('workers',str(self))
3035

3136
def startup(self):
3237
self.register_signal_handlers()
@@ -103,14 +108,14 @@ def working_on(self, job):
103108
'payload': job._payload
104109
}
105110
data = simplejson.dumps(data)
106-
self.resq._redis.set("worker:%s" % str(self), data)
111+
self.resq.redis["worker:%s" % str(self)] = data
107112

108113
def job(self):
109-
return ResQ.decode(self.resq._redis.get("worker:%s" % self)) or {}
114+
return ResQ.decode(self.resq.redis.get("worker:%s" % self)) or {}
110115

111116
def done_working(self):
112117
self.processed()
113-
self.resq._redis.delete("worker:%s" % str(self))
118+
self.resq.redis.delete("worker:%s" % str(self))
114119

115120
def processed(self):
116121
total_processed = Stat("processed", self.resq)
@@ -125,7 +130,7 @@ def failed(self):
125130
stat.incr()
126131

127132
def job(self):
128-
data = self.resq._redis.get("worker:%s" % self)
133+
data = self.resq.redis.get("worker:%s" % self)
129134
if data:
130135
return ResQ.decode(data)
131136
return {}
@@ -139,13 +144,15 @@ def __str__(self):
139144
return '%s:%s:%s' % (hostname, pid, ','.join(self.queues))
140145

141146
@classmethod
142-
def run(cls, queues, host):
143-
worker = cls(queues=queues, host=host)
147+
def run(cls, queues, server):
148+
worker = cls(queues=queues, host=server)
144149
worker.work()
150+
145151
@classmethod
146152
def all(cls, host):
147153
resq = ResQ(host)
148-
return resq._redis.smembers('workers')
154+
return resq.redis.smembers('workers')
155+
149156
@classmethod
150157
def working(cls, host):
151158
resq = ResQ(host)
@@ -169,7 +176,7 @@ def find(cls, worker_id, resq):
169176

170177
@classmethod
171178
def exists(cls, worker_id, resq):
172-
return resq._redis.sismember('workers', worker_id)
179+
return resq.redis.sismember('workers', worker_id)
173180

174181
class JuniorWorker(Worker):
175182
def work(self, interval=5):

src/tests.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ class Basic(object):
88

99
@staticmethod
1010
def perform(name):
11-
return "name:%s" % name
11+
s = "name:%s" % name
12+
print s
13+
return s
1214

1315
def test_str_to_class():
1416
ret = str_to_class('tests.Basic')
@@ -17,19 +19,21 @@ def test_str_to_class():
1719
class PyResTests(unittest.TestCase):
1820
def setUp(self):
1921
self.resq = ResQ()
20-
self.redis = self.resq._redis
22+
self.redis = self.resq.redis
2123
self.redis.flush(True)
2224

2325
def tearDown(self):
26+
self.redis.flush(True)
2427
del self.redis
2528
del self.resq
2629

2730

2831
class ResQTests(PyResTests):
2932
def test_enqueue(self):
30-
ResQ.enqueue(Basic,"test1")
31-
ResQ.enqueue(Basic,"test2")
32-
assert self.redis.llen("queue:basic") == 2
33+
self.resq.enqueue(Basic,"test1")
34+
self.resq.enqueue(Basic,"test2")
35+
ResQ.enqueue(Basic, "test3")
36+
assert self.redis.llen("queue:basic") == 3
3337
assert self.redis.sismember('queues','basic')
3438

3539
def test_push(self):
@@ -49,24 +53,24 @@ def test_pop(self):
4953
assert self.redis.llen('queue:pushq') == 0
5054

5155
def test_peek(self):
52-
ResQ.enqueue(Basic,"test1")
53-
ResQ.enqueue(Basic,"test2")
56+
self.resq.enqueue(Basic,"test1")
57+
self.resq.enqueue(Basic,"test2")
5458
assert len(self.resq.peek('basic',0,20)) == 2
5559

5660
class JobTests(PyResTests):
5761
def test_reserve(self):
58-
ResQ.enqueue(Basic,"test1")
62+
self.resq.enqueue(Basic,"test1")
5963
job = Job.reserve('basic', self.resq)
6064
assert job._queue == 'basic'
6165
assert job._payload
6266

6367
def test_perform(self):
64-
ResQ.enqueue(Basic,"test1")
68+
self.resq.enqueue(Basic,"test1")
6569
job = Job.reserve('basic',self.resq)
6670
assert job.perform() == "name:test1"
6771

6872
def test_fail(self):
69-
ResQ.enqueue(Basic,"test1")
73+
self.resq.enqueue(Basic,"test1")
7074
job = Job.reserve('basic',self.resq)
7175
assert self.redis.llen('failed') == 0
7276
job.fail("problem")
@@ -96,7 +100,7 @@ def test_unregister(self):
96100

97101
def test_working_on(self):
98102
name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic')
99-
ResQ.enqueue(Basic,"test1")
103+
self.resq.enqueue(Basic,"test1")
100104
job = Job.reserve('basic', self.resq)
101105
worker = Worker(['basic'])
102106
worker.working_on(job)
@@ -127,5 +131,12 @@ def test_failed(self):
127131
assert self.redis.get("stat:failed:%s" % name) == 2
128132

129133
def test_process(self):
130-
assert False
134+
name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic')
135+
self.resq.enqueue(Basic,"test1")
136+
job = Job.reserve('basic', self.resq)
137+
worker = Worker(['basic'])
138+
worker.process(job)
139+
assert not self.redis.get('worker:%s' % worker)
140+
assert not self.redis.get("stat:failed")
141+
assert not self.redis.get("stat:failed:%s" % name)
131142

0 commit comments

Comments
 (0)