Skip to content

Commit 779e5d3

Browse files
committed
introduce a queue object
1 parent 864bdea commit 779e5d3

File tree

2 files changed

+141
-0
lines changed

2 files changed

+141
-0
lines changed

lib/resque/queue.rb

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
require 'redis'
2+
require 'redis-namespace'
3+
require 'thread'
4+
require 'mutex_m'
5+
6+
module Resque
7+
class Queue
8+
include Mutex_m
9+
10+
VERSION = '1.0.0'
11+
12+
def initialize name, redis, coder = Marshal
13+
super()
14+
@name = "queue:#{name}"
15+
@redis = redis
16+
@coder = coder
17+
end
18+
19+
def push object
20+
synchronize do
21+
@redis.rpush @name, encode(object)
22+
end
23+
end
24+
25+
alias :<< :push
26+
alias :enq :push
27+
28+
def pop non_block = false
29+
if non_block
30+
synchronize do
31+
value = @redis.rpop(@name)
32+
raise ThreadError unless value
33+
decode value
34+
end
35+
else
36+
synchronize do
37+
value = @redis.brpop(@name, 1) until value
38+
decode value.last
39+
end
40+
end
41+
end
42+
43+
def length
44+
@redis.llen @name
45+
end
46+
alias :size :length
47+
48+
def empty?
49+
size == 0
50+
end
51+
52+
private
53+
def encode object
54+
@coder.dump object
55+
end
56+
57+
def decode object
58+
@coder.load object
59+
end
60+
end
61+
end

test/redis_queue_test.rb

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
require 'test_helper'
2+
require 'resque/queue'
3+
4+
module Resque
5+
class TestQueue < MiniTest::Unit::TestCase
6+
class Thing
7+
attr_reader :inside
8+
9+
def initialize
10+
@inside = "x"
11+
end
12+
13+
def == other
14+
super || @inside == other.inside
15+
end
16+
end
17+
18+
def test_sanity
19+
queue = q
20+
x = Thing.new
21+
queue.push x
22+
assert_equal x, queue.pop
23+
end
24+
25+
def test_pop_blocks
26+
queue1 = q
27+
queue2 = q
28+
29+
t = Thread.new { queue1.pop }
30+
x = Thing.new
31+
32+
queue2.push x
33+
assert_equal x, t.join.value
34+
end
35+
36+
def test_nonblock_pop
37+
queue1 = q
38+
39+
assert_raises ThreadError do
40+
queue1.pop(true)
41+
end
42+
end
43+
44+
def test_pop_blocks_forever
45+
queue1 = q
46+
assert_raises Timeout::Error do
47+
Timeout.timeout(2) { queue1.pop }
48+
end
49+
end
50+
51+
def test_size
52+
queue = q
53+
assert_equal 0, queue.size
54+
55+
queue << Thing.new
56+
assert_equal 1, queue.size
57+
ensure
58+
queue.pop
59+
end
60+
61+
def test_empty?
62+
queue = q
63+
assert queue.empty?
64+
65+
queue << Thing.new
66+
refute queue.empty?
67+
ensure
68+
queue.pop
69+
end
70+
71+
def q
72+
Queue.new 'foo', backend
73+
end
74+
75+
def backend
76+
redis = Redis.new(:host => "127.0.0.1", :port => 9736)
77+
Redis::Namespace.new :resque, :redis => redis
78+
end
79+
end
80+
end

0 commit comments

Comments
 (0)