Skip to content

Commit 0a8bead

Browse files
committed
Merge pull request resque#561 from tenderlove/queue
Implement a Resque::Queue class
2 parents 4dbddcb + 157e9c2 commit 0a8bead

File tree

4 files changed

+189
-5
lines changed

4 files changed

+189
-5
lines changed

lib/resque.rb

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
require 'resque/job'
1313
require 'resque/worker'
1414
require 'resque/plugin'
15+
require 'resque/queue'
1516

1617
module Resque
1718
include Helpers
@@ -43,6 +44,9 @@ def redis=(server)
4344
else
4445
@redis = Redis::Namespace.new(:resque, :redis => server)
4546
end
47+
@queues = Hash.new { |h,name|
48+
h[name] = Resque::Queue.new(name, @redis, self)
49+
}
4650
end
4751

4852
# Returns the current Redis connection. If none has been created, will
@@ -137,20 +141,24 @@ def to_s
137141
# Returns nothing
138142
def push(queue, item)
139143
watch_queue(queue)
140-
redis.rpush "queue:#{queue}", encode(item)
144+
@queues[queue] << item
141145
end
142146

143147
# Pops a job off a queue. Queue name should be a string.
144148
#
145149
# Returns a Ruby object.
146150
def pop(queue)
147-
decode redis.lpop("queue:#{queue}")
151+
begin
152+
@queues[queue].pop(true)
153+
rescue ThreadError
154+
nil
155+
end
148156
end
149157

150158
# Returns an integer representing the size of a queue.
151159
# Queue name should be a string.
152160
def size(queue)
153-
redis.llen("queue:#{queue}").to_i
161+
@queues[queue].size
154162
end
155163

156164
# Returns an array of items currently queued. Queue name should be
@@ -162,7 +170,7 @@ def size(queue)
162170
# To get the 3rd page of a 30 item, paginatied list one would use:
163171
# Resque.peek('my_list', 59, 30)
164172
def peek(queue, start = 0, count = 1)
165-
list_range("queue:#{queue}", start, count)
173+
@queues[queue].slice start, count
166174
end
167175

168176
# Does the dirty work of fetching a range of items from a Redis list
@@ -185,7 +193,7 @@ def queues
185193
# Given a queue name, completely deletes the queue.
186194
def remove_queue(queue)
187195
redis.srem(:queues, queue.to_s)
188-
redis.del("queue:#{queue}")
196+
@queues[queue].destroy
189197
end
190198

191199
# Used internally to keep track of which queues we've created.

lib/resque/helpers.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def redis
2121
def encode(object)
2222
::MultiJson.encode(object)
2323
end
24+
alias :dump :encode
2425

2526
# Given a string, returns a Ruby object.
2627
def decode(object)
@@ -32,6 +33,7 @@ def decode(object)
3233
raise DecodeException, e.message, e.backtrace
3334
end
3435
end
36+
alias :load :decode
3537

3638
# Given a word with dashes, returns a camel cased version of it.
3739
#

lib/resque/queue.rb

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
require 'redis'
2+
require 'redis-namespace'
3+
require 'thread'
4+
require 'mutex_m'
5+
6+
module Resque
7+
###
8+
# A queue interface that quacks like Queue from Ruby's stdlib.
9+
class Queue
10+
include Mutex_m
11+
12+
###
13+
# Create a new Queue object with +name+ on +redis+ connection, and using
14+
# the +coder+ for encoding and decoding objects that are stored in redis.
15+
def initialize name, redis, coder = Marshal
16+
super()
17+
@name = "queue:#{name}"
18+
@redis = redis
19+
@coder = coder
20+
end
21+
22+
# Add +object+ to the queue
23+
def push object
24+
synchronize do
25+
@redis.rpush @name, encode(object)
26+
end
27+
end
28+
29+
alias :<< :push
30+
alias :enq :push
31+
32+
# Returns a list of objects in the queue. This method is *not* available
33+
# on the stdlib Queue.
34+
def slice start, length
35+
if length == 1
36+
synchronize do
37+
decode @redis.lindex @name, start
38+
end
39+
else
40+
synchronize do
41+
Array(@redis.lrange(@name, start, start + length - 1)).map do |item|
42+
decode item
43+
end
44+
end
45+
end
46+
end
47+
48+
# Pop an item off the queue. This method will block until an item is
49+
# available.
50+
#
51+
# Pass +true+ for a non-blocking pop. If nothing is read on a non-blocking
52+
# pop, a ThreadError is raised.
53+
def pop non_block = false
54+
if non_block
55+
synchronize do
56+
value = @redis.lpop(@name)
57+
raise ThreadError unless value
58+
decode value
59+
end
60+
else
61+
synchronize do
62+
value = @redis.blpop(@name, 1) until value
63+
decode value.last
64+
end
65+
end
66+
end
67+
68+
# Get the length of the queue
69+
def length
70+
@redis.llen @name
71+
end
72+
alias :size :length
73+
74+
# Is the queue empty?
75+
def empty?
76+
size == 0
77+
end
78+
79+
# Deletes this Queue from redis. This method is *not* available on the
80+
# stdlib Queue.
81+
def destroy
82+
@redis.del @name
83+
end
84+
85+
private
86+
def encode object
87+
@coder.dump object
88+
end
89+
90+
def decode object
91+
@coder.load object
92+
end
93+
end
94+
end

test/redis_queue_test.rb

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
require 'test_helper'
2+
require 'resque/queue'
3+
4+
module Resque
5+
class TestQueue < MiniTest::Unit::TestCase
6+
class Thing
7+
attr_reader :inside
8+
9+
def initialize
10+
@inside = "x"
11+
end
12+
13+
def == other
14+
super || @inside == other.inside
15+
end
16+
end
17+
18+
def test_sanity
19+
queue = q
20+
x = Thing.new
21+
queue.push x
22+
assert_equal x, queue.pop
23+
end
24+
25+
def test_pop_blocks
26+
queue1 = q
27+
queue2 = q
28+
29+
t = Thread.new { queue1.pop }
30+
x = Thing.new
31+
32+
queue2.push x
33+
assert_equal x, t.join.value
34+
end
35+
36+
def test_nonblock_pop
37+
queue1 = q
38+
39+
assert_raises ThreadError do
40+
queue1.pop(true)
41+
end
42+
end
43+
44+
def test_pop_blocks_forever
45+
queue1 = q
46+
assert_raises Timeout::Error do
47+
Timeout.timeout(2) { queue1.pop }
48+
end
49+
end
50+
51+
def test_size
52+
queue = q
53+
assert_equal 0, queue.size
54+
55+
queue << Thing.new
56+
assert_equal 1, queue.size
57+
ensure
58+
queue.pop
59+
end
60+
61+
def test_empty?
62+
queue = q
63+
assert queue.empty?
64+
65+
queue << Thing.new
66+
refute queue.empty?
67+
ensure
68+
queue.pop
69+
end
70+
71+
def q
72+
Queue.new 'foo', backend
73+
end
74+
75+
def backend
76+
redis = Redis.new(:host => "127.0.0.1", :port => 9736)
77+
Redis::Namespace.new :resque, :redis => redis
78+
end
79+
end
80+
end

0 commit comments

Comments
 (0)