Skip to content

Commit 12cf070

Browse files
committed
don't BLPOP each queue individually; do them all at once
Fixes issue binarymatt#85
1 parent 2adc3f0 commit 12cf070

File tree

5 files changed

+46
-26
lines changed

5 files changed

+46
-26
lines changed

pyres/__init__.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -149,13 +149,16 @@ def push(self, queue, item):
149149
self.watch_queue(queue)
150150
self.redis.rpush("resque:queue:%s" % queue, ResQ.encode(item))
151151

152-
def pop(self, queue, timeout=10):
153-
ret = self.redis.blpop("resque:queue:%s" % queue, timeout=timeout)
152+
def pop(self, queues, timeout=10):
153+
if isinstance(queues, basestring):
154+
queues = [queues]
155+
ret = self.redis.blpop(["resque:queue:%s" % q for q in queues],
156+
timeout=timeout)
154157
if ret:
155-
if isinstance(ret, tuple):
156-
q, ret = ret
157-
return ResQ.decode(ret)
158-
return ret
158+
key, ret = ret
159+
return key[13:], ResQ.decode(ret) # trim "resque:queue:"
160+
else:
161+
return None, None
159162

160163
def size(self, queue):
161164
return int(self.redis.llen("resque:queue:%s" % queue))
@@ -254,9 +257,9 @@ def keys(self):
254257
return [key.replace('resque:','')
255258
for key in self.redis.keys('resque:*')]
256259

257-
def reserve(self, queue):
260+
def reserve(self, queues):
258261
from pyres.job import Job
259-
return Job.reserve(queue, self)
262+
return Job.reserve(queues, self)
260263

261264
def __str__(self):
262265
return "PyRes Client connected to %s" % self.redis.server

pyres/horde.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,11 @@ def __str__(self):
7373
return '%s:%s:%s' % (self.hostname, self.pid, ','.join(self.queues))
7474

7575
def reserve(self):
76-
for q in self.queues:
77-
self.logger.debug('checking queue: %s' % q)
78-
job = Job.reserve(q, self.resq, self.__str__())
79-
if job:
80-
self.logger.info('Found job on %s' % q)
81-
return job
76+
self.logger.debug('checking queues: %s' % self.queues)
77+
job = Job.reserve(self.queues, self.resq, self.__str__())
78+
if job:
79+
self.logger.info('Found job on %s' % job._queue)
80+
return job
8281

8382
def process(self, job):
8483
if not job:

pyres/job.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,13 @@ def retry(self, payload_class, args):
8383
return False
8484

8585
@classmethod
86-
def reserve(cls, queue, res, worker=None, timeout=10):
87-
"""Reserve a job on the queue. This marks this job so that other workers
88-
will not pick it up.
86+
def reserve(cls, queues, res, worker=None, timeout=10):
87+
"""Reserve a job on one of the queues. This marks this job so
88+
that other workers will not pick it up.
8989
9090
"""
91-
payload = res.pop(queue, timeout=timeout)
91+
if isinstance(queues, basestring):
92+
queues = [queues]
93+
queue, payload = res.pop(queues, timeout=timeout)
9294
if payload:
9395
return cls(queue, payload, res, worker)

pyres/worker.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -221,12 +221,11 @@ def process(self, job=None):
221221
self.done_working()
222222

223223
def reserve(self, timeout=10):
224-
for q in self.queues:
225-
logger.debug('checking queue %s' % q)
226-
job = self.job_class.reserve(q, self.resq, self.__str__(), timeout=timeout)
227-
if job:
228-
logger.info('Found job on %s' % q)
229-
return job
224+
logger.debug('checking queues %s' % self.queues)
225+
job = self.job_class.reserve(self.queues, self.resq, self.__str__(), timeout=timeout)
226+
if job:
227+
logger.info('Found job on %s' % job._queue)
228+
return job
230229

231230
def working_on(self, job):
232231
logger.debug('marking as working on')

tests/test_resq.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,27 @@ def test_pop(self):
2222
self.resq.push('pushq','content-newqueue')
2323
self.resq.push('pushq','content2-newqueue')
2424
assert self.redis.llen('resque:queue:pushq') == 2
25-
assert self.resq.pop('pushq') == 'content-newqueue'
25+
assert self.resq.pop('pushq') == ('pushq', 'content-newqueue')
2626
assert self.redis.llen('resque:queue:pushq') == 1
27-
assert self.resq.pop('pushq') == 'content2-newqueue'
27+
assert self.resq.pop(['pushq']) == ('pushq', 'content2-newqueue')
2828
assert self.redis.llen('resque:queue:pushq') == 0
29+
30+
def test_pop_two_queues(self):
31+
self.resq.push('pushq1', 'content-q1-1')
32+
self.resq.push('pushq1', 'content-q1-2')
33+
self.resq.push('pushq2', 'content-q2-1')
34+
assert self.redis.llen('resque:queue:pushq1') == 2
35+
assert self.redis.llen('resque:queue:pushq2') == 1
36+
assert self.resq.pop(['pushq1', 'pushq2']) == ('pushq1', 'content-q1-1')
37+
assert self.redis.llen('resque:queue:pushq1') == 1
38+
assert self.redis.llen('resque:queue:pushq2') == 1
39+
assert self.resq.pop(['pushq2', 'pushq1']) == ('pushq2', 'content-q2-1')
40+
assert self.redis.llen('resque:queue:pushq1') == 1
41+
assert self.redis.llen('resque:queue:pushq2') == 0
42+
assert self.resq.pop(['pushq2', 'pushq1']) == ('pushq1', 'content-q1-2')
43+
assert self.redis.llen('resque:queue:pushq1') == 0
44+
assert self.redis.llen('resque:queue:pushq2') == 0
45+
assert self.resq.pop(['pushq1', 'pushq2'], timeout=1) == (None, None)
2946

3047
def test_peek(self):
3148
self.resq.enqueue(Basic,"test1")

0 commit comments

Comments
 (0)