Skip to content

Commit 335c8b6

Browse files
author
Aaron Patterson and Terence Lee
committed
don't need a separate redis connection for MultiQueue if we have a Redis connection pool
1 parent e14e9b6 commit 335c8b6

File tree

3 files changed

+18
-17
lines changed

3 files changed

+18
-17
lines changed

lib/resque/multi_queue.rb

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,18 @@ class MultiQueue
1111

1212
###
1313
# Create a new MultiQueue using the +queues+ from the +redis+ connection
14-
def initialize(queues, redis, pool = Resque.pool)
14+
def initialize(queues, pool = Resque.pool)
1515
super()
1616

1717
@queues = queues # since ruby 1.8 doesn't have Ordered Hashes
1818
@queue_hash = {}
19-
@redis = redis
2019
@pool = pool
20+
@pool.with_connection { |redis|
21+
@namespace = redis.is_a?(Redis::Namespace) ? redis.namespace : nil
22+
}
2123

2224
queues.each do |queue|
23-
key = @redis.is_a?(Redis::Namespace) ? "#{@redis.namespace}:" : ""
24-
key += queue.redis_name
25+
key = [@namespace, queue.redis_name].compact.join(':')
2526
@queue_hash[key] = queue
2627

2728
end
@@ -52,7 +53,7 @@ def pop(non_block = false)
5253
synchronize do
5354
value = @pool.with_connection {|pool| pool.blpop(*(queue_names + [:timeout => 1])) } until value
5455
queue_name, payload = value
55-
queue = @queue_hash["#{@redis.namespace}:#{queue_name}"]
56+
queue = @queue_hash["#{@namespace}:#{queue_name}"]
5657
[queue, queue.decode(payload)]
5758
end
5859
else
@@ -72,7 +73,8 @@ def poll(timeout)
7273
return unless payload
7374

7475
synchronize do
75-
queue = @queue_hash["#{@redis.namespace}:#{queue_name}"]
76+
key = [@namespace, queue_name].compact.join ':'
77+
queue = @queue_hash[key]
7678
[queue, queue.decode(payload)]
7779
end
7880
else

lib/resque/worker.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ def perform(job)
207207
def reserve(interval = 5.0)
208208
interval = interval.to_i
209209
multi_queue = MultiQueue.new(
210-
queues.map {|queue| Queue.new(queue, Resque.pool, Resque.coder) }, Resque.redis,
210+
queues.map {|queue| Queue.new(queue, Resque.pool, Resque.coder) },
211211
Resque.pool)
212212

213213
if interval < 1

test/multi_queue_test.rb

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,25 @@
11
require "test_helper"
22

33
describe "Resque::MultiQueue" do
4-
let(:redis) { Resque.redis }
54
let(:pool) { Resque.pool }
65
let(:coder) { Resque::JsonCoder.new }
76

87
it "poll times out and returns nil" do
98
foo = Resque::Queue.new 'foo', pool
109
bar = Resque::Queue.new 'bar', pool
11-
queue = Resque::MultiQueue.new([foo, bar], redis, pool)
10+
queue = Resque::MultiQueue.new([foo, bar], pool)
1211
assert_nil queue.poll(1)
1312
end
1413

1514
it "poll is a no-op when queues are empty" do
16-
queue = Resque::MultiQueue.new([], redis)
15+
queue = Resque::MultiQueue.new([])
1716
assert_nil queue.poll(1)
1817
end
1918

2019
it "blocks on pop" do
2120
foo = Resque::Queue.new 'foo', pool, coder
2221
bar = Resque::Queue.new 'bar', pool, coder
23-
queue = Resque::MultiQueue.new([foo, bar], redis, pool)
22+
queue = Resque::MultiQueue.new([foo, bar], pool)
2423
t = Thread.new { queue.pop }
2524

2625
job = { 'class' => 'GoodJob', 'args' => [35, 'tar'] }
@@ -32,7 +31,7 @@
3231
it "nonblocking pop works" do
3332
foo = Resque::Queue.new 'foo', pool, coder
3433
bar = Resque::Queue.new 'bar', pool, coder
35-
queue = Resque::MultiQueue.new([foo, bar], redis, pool)
34+
queue = Resque::MultiQueue.new([foo, bar], pool)
3635

3736
job = { 'class' => 'GoodJob', 'args' => [35, 'tar'] }
3837
bar << job
@@ -43,7 +42,7 @@
4342
it "nonblocking pop doesn't block" do
4443
foo = Resque::Queue.new 'foo', pool, coder
4544
bar = Resque::Queue.new 'bar', pool, coder
46-
queue = Resque::MultiQueue.new([foo, bar], redis, pool)
45+
queue = Resque::MultiQueue.new([foo, bar], pool)
4746

4847
assert_raises ThreadError do
4948
queue.pop(true)
@@ -53,7 +52,7 @@
5352
it "blocks forever on pop" do
5453
foo = Resque::Queue.new 'foo', pool, coder
5554
bar = Resque::Queue.new 'bar', pool, coder
56-
queue = Resque::MultiQueue.new([foo, bar], redis, pool)
55+
queue = Resque::MultiQueue.new([foo, bar], pool)
5756
assert_raises Timeout::Error do
5857
Timeout::timeout(2) { queue.pop }
5958
end
@@ -64,7 +63,7 @@
6463
bar = Resque::Queue.new 'bar', pool, coder
6564
baz = Resque::Queue.new 'baz', pool, coder
6665
queues = [foo, bar, baz]
67-
queue = Resque::MultiQueue.new(queues, redis, pool)
66+
queue = Resque::MultiQueue.new(queues, pool)
6867
job = { 'class' => 'GoodJob', 'args' => [35, 'tar'] }
6968

7069
queues.each {|q| q << job }
@@ -82,7 +81,7 @@
8281
bar = Resque::Queue.new 'bar', pool, coder
8382
baz = Resque::Queue.new 'baz', pool, coder
8483
queues = [foo, bar, baz]
85-
queue = Resque::MultiQueue.new(queues, redis, pool)
84+
queue = Resque::MultiQueue.new(queues, pool)
8685
job = { 'class' => 'GoodJob', 'args' => [35, 'tar'] }
8786

8887
queues.each {|q| q << job }
@@ -96,7 +95,7 @@
9695
end
9796

9897
it "blocking pop is a no-op if queues are empty" do
99-
queue = Resque::MultiQueue.new([], redis)
98+
queue = Resque::MultiQueue.new([])
10099
assert_raises Timeout::Error do
101100
Timeout.timeout(2) { queue.pop }
102101
end

0 commit comments

Comments
 (0)