File tree Expand file tree Collapse file tree 2 files changed +37
-5
lines changed
Expand file tree Collapse file tree 2 files changed +37
-5
lines changed Original file line number Diff line number Diff line change @@ -16,17 +16,24 @@ def await
1616 end
1717
1818 def initialize ( queue , timeout = 5 )
19- @queue = queue
20- @should_pause = false
21- @paused = false
22- @timeout = timeout
23- @latch = Latch . new
19+ @queue = queue
20+ @should_pause = false
21+ @should_shutdown = false
22+ @paused = false
23+ @shutdown = false
24+ @timeout = timeout
25+ @latch = Latch . new
2426 end
2527
2628 def consume
2729 loop do
2830 suspend if @should_pause
2931
32+ if @should_shutdown
33+ @shutdown = true
34+ break
35+ end
36+
3037 queue , job = @queue . poll ( @timeout )
3138 next unless job
3239 job . run
@@ -41,13 +48,18 @@ def paused?
4148 @paused
4249 end
4350
51+ def shutdown?
52+ @shutdown
53+ end
54+
4455 def resume
4556 @should_pause = false
4657 @paused = false
4758 @latch . release
4859 end
4960
5061 def shutdown
62+ @should_shutdown = true
5163 end
5264
5365 private
Original file line number Diff line number Diff line change @@ -71,6 +71,26 @@ def run
7171 Thread . pass until t . status != "sleep"
7272 sleep 1
7373 assert_equal 0 , q . length , 'all jobs should be consumed'
74+ t . kill
75+ end
76+
77+ it "shuts down" do
78+ q = Queue . new ( :foo )
79+ c = Consumer . new ( q , 1 )
80+ t = Thread . new { c . consume }
81+ # wait until queue blocks
82+ Thread . pass until t . status == "sleep"
83+ c . shutdown
84+
85+ # sleep past the poll timeout
86+ sleep 2
87+ q << Actionable . new
88+ # sleep past the poll timeout
89+ sleep 2
90+ assert_equal 1 , q . length
91+ assert c . shutdown?
92+ q . pop until q . empty?
93+ t . kill
7494 end
7595 end
7696end
You can’t perform that action at this time.
0 commit comments