Skip to content

Commit c449d9f

Browse files
committed
use the resque queue class to power the Rescue class methods
1 parent 779e5d3 commit c449d9f

File tree

3 files changed

+52
-7
lines changed

3 files changed

+52
-7
lines changed

lib/resque.rb

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
require 'resque/job'
1313
require 'resque/worker'
1414
require 'resque/plugin'
15+
require 'resque/queue'
1516

1617
module Resque
1718
include Helpers
@@ -43,6 +44,9 @@ def redis=(server)
4344
else
4445
@redis = Redis::Namespace.new(:resque, :redis => server)
4546
end
47+
@queues = Hash.new { |h,name|
48+
h[name] = Resque::Queue.new(name, @redis, self)
49+
}
4650
end
4751

4852
# Returns the current Redis connection. If none has been created, will
@@ -137,20 +141,24 @@ def to_s
137141
# Returns nothing
138142
def push(queue, item)
139143
watch_queue(queue)
140-
redis.rpush "queue:#{queue}", encode(item)
144+
@queues[queue] << item
141145
end
142146

143147
# Pops a job off a queue. Queue name should be a string.
144148
#
145149
# Returns a Ruby object.
146150
def pop(queue)
147-
decode redis.lpop("queue:#{queue}")
151+
begin
152+
@queues[queue].pop(true)
153+
rescue ThreadError
154+
nil
155+
end
148156
end
149157

150158
# Returns an integer representing the size of a queue.
151159
# Queue name should be a string.
152160
def size(queue)
153-
redis.llen("queue:#{queue}").to_i
161+
@queues[queue].size
154162
end
155163

156164
# Returns an array of items currently queued. Queue name should be
@@ -162,7 +170,7 @@ def size(queue)
162170
# To get the 3rd page of a 30 item, paginatied list one would use:
163171
# Resque.peek('my_list', 59, 30)
164172
def peek(queue, start = 0, count = 1)
165-
list_range("queue:#{queue}", start, count)
173+
@queues[queue].slice start, count
166174
end
167175

168176
# Does the dirty work of fetching a range of items from a Redis list
@@ -185,7 +193,7 @@ def queues
185193
# Given a queue name, completely deletes the queue.
186194
def remove_queue(queue)
187195
redis.srem(:queues, queue.to_s)
188-
redis.del("queue:#{queue}")
196+
@queues[queue].destroy
189197
end
190198

191199
# Used internally to keep track of which queues we've created.

lib/resque/helpers.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def redis
2121
def encode(object)
2222
::MultiJson.encode(object)
2323
end
24+
alias :dump :encode
2425

2526
# Given a string, returns a Ruby object.
2627
def decode(object)
@@ -32,6 +33,7 @@ def decode(object)
3233
raise DecodeException, e.message, e.backtrace
3334
end
3435
end
36+
alias :load :decode
3537

3638
# Given a word with dashes, returns a camel cased version of it.
3739
#

lib/resque/queue.rb

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,24 @@
44
require 'mutex_m'
55

66
module Resque
7+
###
8+
# A queue interface that quacks like Queue from Ruby's stdlib.
79
class Queue
810
include Mutex_m
911

1012
VERSION = '1.0.0'
1113

14+
###
15+
# Create a new Queue object with +name+ on +redis+ connection, and using
16+
# the +coder+ for encoding and decoding objects that are stored in redis.
1217
def initialize name, redis, coder = Marshal
1318
super()
1419
@name = "queue:#{name}"
1520
@redis = redis
1621
@coder = coder
1722
end
1823

24+
# Add +object+ to the queue
1925
def push object
2026
synchronize do
2127
@redis.rpush @name, encode(object)
@@ -25,30 +31,59 @@ def push object
2531
alias :<< :push
2632
alias :enq :push
2733

34+
# Returns a list of objects in the queue. This method is *not* available
35+
# on the stdlib Queue.
36+
def slice start, length
37+
if length == 1
38+
synchronize do
39+
decode @redis.lindex @name, start
40+
end
41+
else
42+
synchronize do
43+
Array(@redis.lrange(@name, start, start + length - 1)).map do |item|
44+
decode item
45+
end
46+
end
47+
end
48+
end
49+
50+
# Pop an item off the queue. This method will block until an item is
51+
# available.
52+
#
53+
# Pass +true+ for a non-blocking pop. If nothing is read on a non-blocking
54+
# pop, a ThreadError is raised.
2855
def pop non_block = false
2956
if non_block
3057
synchronize do
31-
value = @redis.rpop(@name)
58+
value = @redis.lpop(@name)
3259
raise ThreadError unless value
3360
decode value
3461
end
3562
else
3663
synchronize do
37-
value = @redis.brpop(@name, 1) until value
64+
value = @redis.blpop(@name, 1) until value
3865
decode value.last
3966
end
4067
end
4168
end
4269

70+
# Get the length of the queue
4371
def length
4472
@redis.llen @name
4573
end
4674
alias :size :length
4775

76+
# Is the queue empty?
4877
def empty?
4978
size == 0
5079
end
5180

81+
# Deletes this Queue from redis. This method is *not* available on the
82+
# stdlib Queue.
83+
def destroy
84+
@redis.del @name
85+
end
86+
5287
private
5388
def encode object
5489
@coder.dump object

0 commit comments

Comments
 (0)