Skip to content

Commit cd6d44f

Browse files
committed
Merge remote-tracking branch 'tenderlove/consumers' into consumers
2 parents c0823bf + 5fd389b commit cd6d44f

File tree

2 files changed

+35
-14
lines changed

2 files changed

+35
-14
lines changed

lib/resque/consumer.rb

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
11
module Resque
22
class Consumer
33
class Latch # :nodoc:
4-
def initialize
5-
@mutex = Mutex.new
6-
@cond = ConditionVariable.new
4+
def initialize(count = 1)
5+
@count = count
6+
@lock = Monitor.new
7+
@cv = @lock.new_cond
78
end
89

910
def release
10-
@mutex.synchronize { @cond.broadcast }
11+
@lock.synchronize do
12+
@count -= 1 if @count > 0
13+
@cv.broadcast if @count.zero?
14+
end
1115
end
1216

1317
def await
14-
@mutex.synchronize { @cond.wait @mutex }
18+
@lock.synchronize do
19+
@cv.wait_while { @count > 0 }
20+
end
1521
end
1622
end
1723

@@ -57,7 +63,7 @@ def resume
5763
@paused = false
5864
@latch.release
5965
end
60-
66+
6167
def shutdown
6268
@should_shutdown = true
6369
end

test/consumer_test.rb

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,20 @@ def run
1313
self.class.ran << self
1414
end
1515
end
16-
16+
17+
class Resumer
18+
LATCHES = {}
19+
20+
def initialize(latch)
21+
@latch_id = latch.object_id
22+
LATCHES[@latch_id] = latch
23+
end
24+
25+
def run
26+
LATCHES[@latch_id].release
27+
end
28+
end
29+
1730
before do
1831
Actionable.ran.clear
1932
end
@@ -28,7 +41,7 @@ def run
2841
assert_raises Timeout::Error do
2942
Timeout.timeout(1) { c.consume }
3043
end
31-
44+
3245
assert_equal 1, Actionable.ran.length
3346
assert q.empty?
3447
end
@@ -59,17 +72,19 @@ def run
5972
q.pop until q.empty?
6073

6174
c = Consumer.new(q, 1)
75+
consumed = Consumer::Latch.new
6276

6377
c.pause
6478
t = Thread.new { c.consume }
65-
# wait until queue blocks
66-
sleep 1
79+
Thread.pass until c.paused?
6780

68-
q << Actionable.new
81+
# A job that unblocks the main thread
82+
q << Resumer.new(consumed)
6983
c.resume
70-
# wait until queue blocks
71-
Thread.pass until t.status != "sleep"
72-
sleep 1
84+
85+
# wait until consumed
86+
consumed.await
87+
7388
assert_equal 0, q.length, 'all jobs should be consumed'
7489
t.kill
7590
end

0 commit comments

Comments
 (0)