Skip to content

Commit 3c39af5

Browse files
committed
queues now use ConnectionPool
1 parent 46ce3a5 commit 3c39af5

File tree

6 files changed

+57
-55
lines changed

6 files changed

+57
-55
lines changed

lib/resque/multi_queue.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ class MultiQueue
1111

1212
###
1313
# Create a new MultiQueue using the +queues+ from the +redis+ connection
14-
def initialize(queues, redis = Resque.redis)
14+
def initialize(queues, pool = Resque.pool)
1515
super()
1616

1717
@queues = queues # since ruby 1.8 doesn't have Ordered Hashes
1818
@queue_hash = {}
19-
@redis = redis
19+
@pool = pool
2020

2121
queues.each do |queue|
2222
@queue_hash[queue.redis_name] = queue
@@ -45,7 +45,7 @@ def pop(non_block = false)
4545
else
4646
queue_names = @queues.map {|queue| queue.redis_name }
4747
synchronize do
48-
value = @redis.blpop(*(queue_names + [1])) until value
48+
value = @pool.with_connection {|pool| pool.blpop(*(queue_names + [1])) } until value
4949
queue_name, payload = value
5050
queue = @queue_hash[queue_name]
5151
[queue, queue.decode(payload)]
@@ -59,7 +59,7 @@ def pop(non_block = false)
5959
# the timeout expires.
6060
def poll(timeout)
6161
queue_names = @queues.map {|queue| queue.redis_name }
62-
queue_name, payload = @redis.blpop(*(queue_names + [timeout]))
62+
queue_name, payload = @pool.with_connection {|pool| pool.blpop(*(queue_names + [timeout])) }
6363
return unless payload
6464

6565
synchronize do

lib/resque/queue.rb

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,26 @@ class Queue
1818
###
1919
# Create a new Queue object with +name+ on +redis+ connection, and using
2020
# the +coder+ for encoding and decoding objects that are stored in redis.
21-
def initialize name, redis = Resque.redis, coder = Marshal
21+
def initialize name, pool = Resque.pool, coder = Marshal
2222
super()
2323
@name = name
2424
@redis_name = "queue:#{@name}"
25-
@redis = redis
25+
@pool = pool
2626
@coder = coder
2727
@destroyed = false
2828

29-
@redis.sadd(:queues, @name)
29+
@pool.with_connection do |conn|
30+
conn.sadd(:queues, @name)
31+
end
3032
end
3133

3234
# Add +object+ to the queue
3335
# If trying to push to an already destroyed queue, it will raise a Resque::QueueDestroyed exception
3436
def push object
3537
raise QueueDestroyed if destroyed?
3638

37-
synchronize do
38-
@redis.rpush @redis_name, encode(object)
39+
@pool.with_connection do |conn|
40+
conn.rpush @redis_name, synchronize {encode(object) }
3941
end
4042
end
4143

@@ -47,13 +49,15 @@ def push object
4749
def slice start, length
4850
if length == 1
4951
synchronize do
50-
decode @redis.lindex @redis_name, start
52+
decode(@pool.with_connection {|conn| conn.lindex(@redis_name , start) })
5153
end
5254
else
53-
synchronize do
54-
Array(@redis.lrange(@redis_name, start, start + length - 1)).map do |item|
55-
decode item
55+
Array(
56+
@pool.with_connection do |conn|
57+
conn.lrange(@redis_name, start, start + length - 1)
5658
end
59+
).map do |item|
60+
synchronize {decode item }
5761
end
5862
end
5963
end
@@ -65,16 +69,12 @@ def slice start, length
6569
# pop, a ThreadError is raised.
6670
def pop non_block = false
6771
if non_block
68-
synchronize do
69-
value = @redis.lpop(@redis_name)
70-
raise ThreadError unless value
71-
decode value
72-
end
72+
value = @pool.with_connection {|pool| pool.lpop(@redis_name) }
73+
raise ThreadError unless value
74+
synchronize {decode value }
7375
else
74-
synchronize do
75-
value = @redis.blpop(@redis_name, 1) until value
76-
decode value.last
77-
end
76+
value = @pool.with_connection {|pool| pool.blpop(@redis_name, 1) } until value
77+
synchronize {decode value.last }
7878
end
7979
end
8080

@@ -83,7 +83,7 @@ def pop non_block = false
8383
# Blocks for +timeout+ seconds if the queue is empty, and returns nil if
8484
# the timeout expires.
8585
def poll(timeout)
86-
queue_name, payload = @redis.blpop(@redis_name, timeout)
86+
queue_name, payload = @pool.with_connection {|pool| pool.blpop(@redis_name, timeout) }
8787
return unless payload
8888

8989
synchronize do
@@ -93,7 +93,7 @@ def poll(timeout)
9393

9494
# Get the length of the queue
9595
def length
96-
@redis.llen @redis_name
96+
@pool.with_connection {|pool| pool.llen @redis_name }
9797
end
9898
alias :size :length
9999

@@ -109,8 +109,10 @@ def empty?
109109
# B and you delete Queue A, pushing to Queue B will have unknown side
110110
# effects. Queue A will be marked destroyed, but Queue B will not.
111111
def destroy
112-
@redis.del @redis_name
113-
@redis.srem(:queues, @name)
112+
@pool.with_connection do |conn|
113+
conn.del @redis_name
114+
conn.srem(:queues, @name)
115+
end
114116
@destroyed = true
115117
end
116118

lib/resque/worker.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,8 @@ def perform(job)
204204
def reserve(interval = 5.0)
205205
interval = interval.to_i
206206
multi_queue = MultiQueue.new(
207-
queues.map {|queue| Queue.new(queue, Resque.redis, Resque.coder) },
208-
Resque.redis)
207+
queues.map {|queue| Queue.new(queue, Resque.pool, Resque.coder) },
208+
Resque.pool)
209209

210210
if interval < 1
211211
begin

test/multi_queue_test.rb

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,24 @@
22

33
describe "Resque::MulitQueue" do
44
let(:redis) { Resque.redis }
5+
let(:pool) { Resque.pool }
56
let(:coder) { Resque::MultiJsonCoder.new }
67

78
before do
89
redis.flushall
910
end
1011

1112
it "poll times out and returns nil" do
12-
foo = Resque::Queue.new 'foo', redis
13-
bar = Resque::Queue.new 'bar', redis
14-
queue = Resque::MultiQueue.new([foo, bar], redis)
13+
foo = Resque::Queue.new 'foo', pool
14+
bar = Resque::Queue.new 'bar', pool
15+
queue = Resque::MultiQueue.new([foo, bar], pool)
1516
assert_nil queue.poll(1)
1617
end
1718

1819
it "blocks on pop" do
19-
foo = Resque::Queue.new 'foo', redis, coder
20-
bar = Resque::Queue.new 'bar', redis, coder
21-
queue = Resque::MultiQueue.new([foo, bar], redis)
20+
foo = Resque::Queue.new 'foo', pool, coder
21+
bar = Resque::Queue.new 'bar', pool, coder
22+
queue = Resque::MultiQueue.new([foo, bar], pool)
2223
t = Thread.new { queue.pop }
2324

2425
job = { 'class' => 'GoodJob', 'args' => [35, 'tar'] }
@@ -28,9 +29,9 @@
2829
end
2930

3031
it "nonblocking pop works" do
31-
foo = Resque::Queue.new 'foo', redis, coder
32-
bar = Resque::Queue.new 'bar', redis, coder
33-
queue = Resque::MultiQueue.new([foo, bar], redis)
32+
foo = Resque::Queue.new 'foo', pool, coder
33+
bar = Resque::Queue.new 'bar', pool, coder
34+
queue = Resque::MultiQueue.new([foo, bar], pool)
3435

3536
job = { 'class' => 'GoodJob', 'args' => [35, 'tar'] }
3637
bar << job
@@ -39,30 +40,30 @@
3940
end
4041

4142
it "nonblocking pop doesn't block" do
42-
foo = Resque::Queue.new 'foo', redis, coder
43-
bar = Resque::Queue.new 'bar', redis, coder
44-
queue = Resque::MultiQueue.new([foo, bar], redis)
43+
foo = Resque::Queue.new 'foo', pool, coder
44+
bar = Resque::Queue.new 'bar', pool, coder
45+
queue = Resque::MultiQueue.new([foo, bar], pool)
4546

4647
assert_raises ThreadError do
4748
queue.pop(true)
4849
end
4950
end
5051

5152
it "blocks forever on pop" do
52-
foo = Resque::Queue.new 'foo', redis, coder
53-
bar = Resque::Queue.new 'bar', redis, coder
54-
queue = Resque::MultiQueue.new([foo, bar], redis)
53+
foo = Resque::Queue.new 'foo', pool, coder
54+
bar = Resque::Queue.new 'bar', pool, coder
55+
queue = Resque::MultiQueue.new([foo, bar], pool)
5556
assert_raises Timeout::Error do
5657
Timeout::timeout(2) { queue.pop }
5758
end
5859
end
5960

6061
it "blocking pop processes queues in the order given" do
61-
foo = Resque::Queue.new 'foo', redis, coder
62-
bar = Resque::Queue.new 'bar', redis, coder
63-
baz = Resque::Queue.new 'baz', redis, coder
62+
foo = Resque::Queue.new 'foo', pool, coder
63+
bar = Resque::Queue.new 'bar', pool, coder
64+
baz = Resque::Queue.new 'baz', pool, coder
6465
queues = [foo, bar, baz]
65-
queue = Resque::MultiQueue.new(queues, redis)
66+
queue = Resque::MultiQueue.new(queues, pool)
6667
job = { 'class' => 'GoodJob', 'args' => [35, 'tar'] }
6768

6869
queues.each {|q| q << job }
@@ -76,11 +77,11 @@
7677
end
7778

7879
it "nonblocking pop processes queues in the order given" do
79-
foo = Resque::Queue.new 'foo', redis, coder
80-
bar = Resque::Queue.new 'bar', redis, coder
81-
baz = Resque::Queue.new 'baz', redis, coder
80+
foo = Resque::Queue.new 'foo', pool, coder
81+
bar = Resque::Queue.new 'bar', pool, coder
82+
baz = Resque::Queue.new 'baz', pool, coder
8283
queues = [foo, bar, baz]
83-
queue = Resque::MultiQueue.new(queues, redis)
84+
queue = Resque::MultiQueue.new(queues, pool)
8485
job = { 'class' => 'GoodJob', 'args' => [35, 'tar'] }
8586

8687
queues.each {|q| q << job }

test/redis_queue_test.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,10 +144,10 @@ def == other
144144
end
145145

146146
def q
147-
Resque::Queue.new 'foo', Resque.redis
147+
Resque::Queue.new 'foo'
148148
end
149149

150150
def pipe
151-
[Resque::Queue.new('foo', Resque.create_connection), Resque::Queue.new('foo', Resque.create_connection)]
151+
[Resque::Queue.new('foo'), Resque::Queue.new('foo')]
152152
end
153153
end

test/threaded_pool_test.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ def run
1818

1919
before do
2020
@write = Queue.new(:foo)
21-
conn = Resque.create_connection
22-
@read = Queue.new(:foo, conn)
21+
@read = Queue.new(:foo, Resque.pool)
2322
@tp = ThreadedPool.new(@read, 5)
2423
end
2524

0 commit comments

Comments
 (0)