Skip to content

Commit 2551545

Browse files
author
Matt George
committed
Merge commit 'acc9414ab51a9692e5660dbaad175564d6e67582' into refactor_merge
Conflicts: pyres/worker.py src/resweb/server.py
2 parents 8e51e4f + acc9414 commit 2551545

File tree

3 files changed

+22
-18
lines changed

3 files changed

+22
-18
lines changed

pyres/__init__.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,26 +26,26 @@ def __init__(self, server="localhost:6379"):
2626

2727
def push(self, queue, item):
2828
self.watch_queue(queue)
29-
self.redis.push("queue:%s" % queue, ResQ.encode(item))
29+
self.redis.push("resque:queue:%s" % queue, ResQ.encode(item))
3030

3131
def pop(self, queue):
32-
ret = self.redis.pop("queue:%s" % queue)
32+
ret = self.redis.pop("resque:queue:%s" % queue)
3333
if ret:
3434
return ResQ.decode(ret)
3535
return ret
3636

3737
def size(self, queue):
38-
return int(self.redis.llen("queue:%s" % queue))
38+
return int(self.redis.llen("resque:queue:%s" % queue))
3939

4040
def watch_queue(self, queue):
4141
if queue in self._watched_queues:
4242
return
4343
else:
44-
if self.redis.sadd('queues',str(queue)):
44+
if self.redis.sadd('resque:queues',str(queue)):
4545
self._watched_queues.add(queue)
4646

4747
def peek(self, queue, start=0, count=1):
48-
return self.list_range('queue:%s' % queue, start, count)
48+
return self.list_range('resque:queue:%s' % queue, start, count)
4949

5050
def list_range(self, key, start, count):
5151
items = self.redis.lrange(key,start,start+count-1)
@@ -81,7 +81,7 @@ def enqueue_from_string(self, klass_as_string, queue, *args):
8181
self.push(queue, {'class':klass_as_string,'args':args})
8282

8383
def queues(self):
84-
return self.redis.smembers("queues")
84+
return self.redis.smembers("resque:queues")
8585

8686
def info(self):
8787
pending = 0
@@ -118,8 +118,8 @@ def working(self):
118118
def remove_queue(self, queue):
119119
if queue in self._watched_queues:
120120
self._watched_queues.remove(queue)
121-
self.redis.srem('queues',queue)
122-
del self.redis['queue:%s' % queue]
121+
self.redis.srem('resque:queues',queue)
122+
del self.redis['resque:queue:%s' % queue]
123123

124124
@classmethod
125125
def encode(cls, item):
@@ -146,7 +146,7 @@ def _enqueue(cls, klass, *args):
146146
class Stat(object):
147147
def __init__(self, name, resq):
148148
self.name = name
149-
self.key = "stat:%s" % self.name
149+
self.key = "resque:stat:%s" % self.name
150150
self.resq = resq
151151

152152
def get(self):

pyres/extensions.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
except:
44
import sys
55
sys.exit("multiprocessing was not available")
6+
import os, datetime, time, signal
7+
from pyres import ResQ
68

79
from pyres.exceptions import NoQueueError
810
from pyres.worker import Worker

pyres/worker.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import os, sys
77
import time
88
import simplejson
9+
910
class Worker(object):
1011
def __init__(self, queues=[], server="localhost:6379"):
1112
self.queues = queues
@@ -26,7 +27,7 @@ def validate_queues(self):
2627
raise NoQueueError("Please give each worker at least one queue.")
2728

2829
def register_worker(self):
29-
self.resq.redis.sadd('workers',str(self))
30+
self.resq.redis.sadd('resque:workers',str(self))
3031
#self.resq._redis.add("worker:#{self}:started", Time.now.to_s)
3132
self.started = datetime.datetime.now()
3233
#Stat.clear("processed:#{self}")
@@ -47,7 +48,7 @@ def _get_started(self):
4748
started = property(_get_started, _set_started)
4849

4950
def unregister_worker(self):
50-
self.resq.redis.srem('workers',str(self))
51+
self.resq.redis.srem('resque:workers',str(self))
5152
self.started = None
5253

5354
def startup(self):
@@ -141,14 +142,14 @@ def working_on(self, job):
141142
'payload': job._payload
142143
}
143144
data = simplejson.dumps(data)
144-
self.resq.redis["worker:%s" % str(self)] = data
145+
self.resq.redis["resque:worker:%s" % str(self)] = data
145146
print "worker:%s" % str(self)
146147
print self.resq.redis["worker:%s" % str(self)]
147148

148149
def done_working(self):
149150
print 'done working'
150151
self.processed()
151-
self.resq.redis.delete("worker:%s" % str(self))
152+
self.resq.redis.delete("resque:worker:%s" % str(self))
152153

153154
def processed(self):
154155
total_processed = Stat("processed", self.resq)
@@ -167,7 +168,7 @@ def failed(self):
167168
def get_failed(self):
168169
return Stat("failed:%s" % self, self.resq).get()
169170
def job(self):
170-
data = self.resq.redis.get("worker:%s" % self)
171+
data = self.resq.redis.get("resque:worker:%s" % self)
171172
if data:
172173
return ResQ.decode(data)
173174
return {}
@@ -190,7 +191,7 @@ def all(cls, host="localhost:6379"):
190191
resq = ResQ(host)
191192
elif isinstance(host, ResQ):
192193
resq = host
193-
return [Worker.find(w,resq) for w in resq.redis.smembers('workers')]
194+
return [Worker.find(w,resq) for w in resq.redis.smembers('resque:workers')]
194195

195196
@classmethod
196197
def working(cls, host):
@@ -200,7 +201,8 @@ def working(cls, host):
200201
resq = host
201202
total = []
202203
for key in Worker.all(host):
203-
total.append('worker:%s' % key)
204+
if Worker.exists(key, resq):
205+
total.append('resque:worker:%s' % key)
204206
names = []
205207
for key in total:
206208
value = resq.redis.get(key)
@@ -221,7 +223,7 @@ def find(cls, worker_id, resq):
221223

222224
@classmethod
223225
def exists(cls, worker_id, resq):
224-
return resq.redis.sismember('workers', worker_id)
226+
return resq.redis.sismember('resque:workers', worker_id)
225227

226228

227229
if __name__ == "__main__":
@@ -236,4 +238,4 @@ def exists(cls, worker_id, resq):
236238
queues = options.queue_list.split(',')
237239
import sys
238240
sys.path.insert(0,'/Users/mgeorge/dev/pyres/src')
239-
Worker.run(queues, options.server)
241+
Worker.run(queues, options.server)

0 commit comments

Comments
 (0)