Skip to content

Commit 950edfc

Browse files
author
Aaron Patterson and Terence Lee
committed
add kill / term to ThreadedConsumerPool
1 parent e88c176 commit 950edfc

File tree

2 files changed

+74
-0
lines changed

2 files changed

+74
-0
lines changed

lib/resque/threaded_consumer_pool.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,13 @@ def stop
2626
def join
2727
@threads.each { |t| t.join }
2828
end
29+
30+
def term
31+
@threads.each { |t| t.raise(TermException.new("SIGTERM")) }
32+
end
33+
34+
def kill
35+
@threads.each { |t| t.kill }
36+
end
2937
end
3038
end

test/threaded_consumer_pool_test.rb

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,46 @@ def run
2222
end
2323
end
2424

25+
class TermJob
26+
LATCHES = {}
27+
28+
@@termed = []
29+
30+
def self.clear
31+
@@termed = []
32+
end
33+
34+
def self.termed
35+
@@termed
36+
end
37+
38+
attr_reader :latch_id
39+
40+
def initialize(latch)
41+
@latch_id = latch.object_id
42+
LATCHES[@latch_id] = latch
43+
end
44+
45+
def == other
46+
return super unless other.is_a?(TermJob)
47+
@latch_id == other.latch_id
48+
end
49+
50+
def run
51+
begin
52+
LATCHES[@latch_id].release
53+
sleep
54+
rescue Resque::TermException
55+
@@termed << self
56+
end
57+
end
58+
end
59+
2560
before do
2661
@write = Queue.new(:foo)
2762
@read = Queue.new(:foo, Resque.pool)
2863
@tp = ThreadedConsumerPool.new(@read, 5)
64+
TermJob.clear
2965
end
3066

3167
after do
@@ -53,5 +89,35 @@ def run
5389
assert @write.empty?
5490
end
5591

92+
it "terms the consumers" do
93+
@tp = ThreadedConsumerPool.new(@read, 1)
94+
latch = Consumer::Latch.new
95+
job = TermJob.new latch
96+
@write << job
97+
98+
@tp.start
99+
latch.await # sleep until latch#release is called
100+
@tp.term
101+
@tp.stop
102+
@tp.join
103+
104+
assert_equal job, TermJob.termed.first
105+
assert @read.empty?
106+
end
107+
108+
it "kills running jobs" do
109+
@tp = ThreadedConsumerPool.new(@read, 1)
110+
latch = Consumer::Latch.new
111+
job = TermJob.new latch
112+
@write << job
113+
114+
@tp.start
115+
latch.await
116+
@tp.kill
117+
@tp.join
118+
119+
assert TermJob.termed.empty?
120+
assert @read.empty?
121+
end
56122
end
57123
end

0 commit comments

Comments
 (0)