Skip to content

Commit d57db77

Browse files
committed
introduce a multi_queue object
1 parent 0ca46ce commit d57db77

File tree

4 files changed

+98
-1
lines changed

4 files changed

+98
-1
lines changed

lib/resque.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
require 'resque/worker'
1414
require 'resque/plugin'
1515
require 'resque/queue'
16+
require 'resque/multi_queue'
1617
require 'resque/coder'
1718
require 'resque/multi_json_coder'
1819

lib/resque/multi_queue.rb

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
require 'redis'
2+
require 'redis-namespace'
3+
require 'thread'
4+
require 'mutex_m'
5+
6+
module Resque
7+
###
8+
# Holds multiple queues, allowing you to pop the first available job
9+
class MultiQueue
10+
include Mutex_m
11+
12+
###
13+
# Create a new MultiQueue using the +queues+ from the +redis+ connection
14+
def initialize(queues, redis)
15+
super()
16+
17+
@queues = {}
18+
@redis = redis
19+
20+
queues.each do |queue|
21+
key = @redis.is_a?(Redis::Namespace) ? "#{@redis.namespace}:" : ""
22+
key += queue.redis_name
23+
@queues[key] = queue
24+
end
25+
end
26+
27+
# Pop an item off one of the queues. This method will block until an item
28+
# is available.
29+
#
30+
# Pass +true+ for a non-blocking pop. If nothing is read on a non-blocking
31+
# pop, a ThreadError is raised.
32+
def pop(non_block = false)
33+
if non_block
34+
synchronize do
35+
value = nil
36+
37+
@queues.values.each do |queue|
38+
begin
39+
return queue.pop(true)
40+
rescue ThreadError
41+
end
42+
end
43+
44+
raise ThreadError
45+
end
46+
else
47+
queue_names = @queues.values.map {|queue| queue.redis_name }
48+
synchronize do
49+
value = @redis.blpop(*(queue_names + [1])) until value
50+
queue_name, payload = value
51+
@queues[queue_name].coder.decode(payload)
52+
end
53+
end
54+
end
55+
end
56+
end

lib/resque/queue.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ module Resque
99
class Queue
1010
include Mutex_m
1111

12-
attr_reader :name, :redis_name
12+
attr_reader :name, :redis_name, :coder
1313

1414
###
1515
# Create a new Queue object with +name+ on +redis+ connection, and using

test/multi_queue_test.rb

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
require "test_helper"
2+
3+
describe "Resque::MulitQueue" do
4+
let(:redis) { Resque.redis }
5+
let(:coder) { Resque::MultiJsonCoder.new }
6+
7+
before do
8+
redis.flushall
9+
end
10+
11+
it "blocks on pop" do
12+
foo = Resque::Queue.new 'foo', redis, coder
13+
bar = Resque::Queue.new 'bar', redis, coder
14+
queue = Resque::MultiQueue.new([foo, bar], redis)
15+
t = Thread.new { queue.pop }
16+
17+
job = Resque::Job.new(:bar, :class => 'GoodJob')
18+
bar << job
19+
assert_equal coder.decode(job.to_json), t.join.value
20+
end
21+
22+
it "nonblocking pop works" do
23+
foo = Resque::Queue.new 'foo', redis, coder
24+
bar = Resque::Queue.new 'bar', redis, coder
25+
queue = Resque::MultiQueue.new([foo, bar], redis)
26+
27+
job = Resque::Job.new(:bar, :class => 'GoodJob')
28+
bar << job
29+
assert_equal coder.decode(job.to_json), queue.pop(true)
30+
end
31+
32+
it "blocks forever on pop" do
33+
foo = Resque::Queue.new 'foo', redis, coder
34+
bar = Resque::Queue.new 'bar', redis, coder
35+
queue = Resque::MultiQueue.new([foo, bar], redis)
36+
assert_raises Timeout::Error do
37+
Timeout::timeout(2) { queue.pop }
38+
end
39+
end
40+
end

0 commit comments

Comments
 (0)