forked from resque/resque
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paththreaded_consumer_pool_test.rb
More file actions
143 lines (115 loc) · 2.67 KB
/
threaded_consumer_pool_test.rb
File metadata and controls
143 lines (115 loc) · 2.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
require "test_helper"
module Resque
describe "ThreadedConsumerPool" do
class Actionable
@@ran = []
def self.ran
@@ran
end
def run
self.class.ran << self
end
end
class FailingJob
def run
raise 'fuuu'
end
end
class TermJob
LATCHES = {}
@@termed = []
def self.clear
@@termed = []
end
def self.termed
@@termed
end
attr_reader :latch_id
def initialize(latch)
@latch_id = latch.object_id
LATCHES[@latch_id] = latch
end
def == other
return super unless other.is_a?(TermJob)
@latch_id == other.latch_id
end
def run
begin
LATCHES[@latch_id].release
sleep
rescue Resque::TermException
@@termed << self
end
end
end
before do
@write = Queue.new(:foo)
@read = Queue.new(:foo, Resque.pool)
@tp = ThreadedConsumerPool.new(@read, 5)
TermJob.clear
end
it "processes work" do
Resque.consumer_timeout = 1
5.times { @write << Actionable.new }
@tp.start
sleep 1
@tp.stop
assert @write.empty?
end
it "recovers from blowed-up jobs" do
Resque.consumer_timeout = 1
@tp = ThreadedConsumerPool.new(@read, 1)
@write << FailingJob.new
@write << Actionable.new
@tp.start
sleep 1
@tp.stop
assert @write.empty?
@tp.join
end
it "terms the consumers" do
@tp = ThreadedConsumerPool.new(@read, 1)
latch = Consumer::Latch.new
job = TermJob.new latch
@write << job
@tp.start
latch.await # sleep until latch#release is called
@tp.term
@tp.stop
@tp.join
assert_equal job, TermJob.termed.first
assert @read.empty?
end
it "kills running jobs" do
@tp = ThreadedConsumerPool.new(@read, 1)
latch = Consumer::Latch.new
job = TermJob.new latch
@write << job
@tp.start
latch.await
@tp.kill
@tp.join
assert TermJob.termed.empty?
assert @read.empty?
end
it "pauses and resumes" do
paused = []
resumed = []
@tp = Class.new(ThreadedConsumerPool) {
define_method(:build_consumer) { |q|
super(q).extend(Module.new {
define_method(:pause) { paused << self; super() }
define_method(:resume) { resumed << self; super() }
})
}
}.new(@read, 1)
@tp.start
@tp.pause
assert_equal 1, paused.length
@tp.resume
assert_equal 1, resumed.length
@tp.stop
@tp.join
end
end
end