Skip to content

Commit 95e65ab

Browse files
author
Aaron Patterson and Terence Lee
committed
add pause and resume to ThreadedConsumerPool
1 parent 950edfc commit 95e65ab

File tree

2 files changed

+39
-7
lines changed

2 files changed

+39
-7
lines changed

lib/resque/threaded_consumer_pool.rb

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
11
module Resque
22
class ThreadedConsumerPool
3-
43
def initialize(queue, size)
54
@queue = queue
65
@size = size
76
@threads = []
87
@consumers = []
98
end
109

11-
1210
def start
1311
stop
1412
@consumers.clear
1513
@threads = @size.times.map {
16-
c = Consumer.new(@queue)
14+
c = build_consumer @queue
1715
@consumers << c
1816
Thread.new { c.consume }
1917
}
@@ -34,5 +32,19 @@ def term
3432
def kill
3533
@threads.each { |t| t.kill }
3634
end
35+
36+
def pause
37+
@consumers.each { |c| c.pause }
38+
end
39+
40+
def resume
41+
@consumers.each { |c| c.resume }
42+
end
43+
44+
private
45+
46+
def build_consumer(queue)
47+
Consumer.new queue
48+
end
3749
end
3850
end

test/threaded_consumer_pool_test.rb

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,6 @@ def run
6464
TermJob.clear
6565
end
6666

67-
after do
68-
@tp.join
69-
end
70-
7167
it "processes work" do
7268
Resque.consumer_timeout = 1
7369
5.times { @write << Actionable.new }
@@ -87,6 +83,7 @@ def run
8783
sleep 1
8884
@tp.stop
8985
assert @write.empty?
86+
@tp.join
9087
end
9188

9289
it "terms the consumers" do
@@ -119,5 +116,28 @@ def run
119116
assert TermJob.termed.empty?
120117
assert @read.empty?
121118
end
119+
120+
it "pauses and resumes" do
121+
paused = []
122+
resumed = []
123+
124+
@tp = Class.new(ThreadedConsumerPool) {
125+
define_method(:build_consumer) { |q|
126+
super(q).extend(Module.new {
127+
define_method(:pause) { paused << self; super() }
128+
define_method(:resume) { resumed << self; super() }
129+
})
130+
}
131+
}.new(@read, 1)
132+
133+
@tp.start
134+
@tp.pause
135+
assert_equal 1, paused.length
136+
137+
@tp.resume
138+
assert_equal 1, resumed.length
139+
@tp.stop
140+
@tp.join
141+
end
122142
end
123143
end

0 commit comments

Comments
 (0)