forked from resque/resque
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer_test.rb
More file actions
151 lines (120 loc) · 3.02 KB
/
consumer_test.rb
File metadata and controls
151 lines (120 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
require "test_helper"
module Resque
describe "Consumer" do
# Test job that records every instance whose #run has been called, so the
# specs can check how many jobs were actually executed.
class Actionable
  # Log of executed jobs, held in a class variable and therefore shared by
  # all Actionable instances.
  @@ran = []

  class << self
    # Returns the shared array of instances that have been run. Callers
    # mutate it directly (for example, clearing it between specs).
    def ran
      @@ran
    end
  end

  # Appends this instance to the shared run log.
  def run
    self.class.ran.push(self)
  end
end
# Test job whose #run always fails; used to exercise the failure path.
class FailingJob
  # Always raises a RuntimeError with the message "fuuuu".
  def run
    raise RuntimeError, "fuuuu"
  end
end
# Test job that releases a latch when it is run.
class Resumer
  # Registry of latches keyed by their object_id. Entries are never removed.
  LATCHES = {}

  # Registers +latch+ in LATCHES and remembers only its object_id on the
  # instance.
  # NOTE(review): presumably the id is kept instead of the latch itself so
  # the job can be serialized onto the queue - confirm against Queue.
  def initialize(latch)
    @latch_id = latch.object_id
    LATCHES.store(@latch_id, latch)
  end

  # Looks the latch up by id and releases it.
  def run
    latch = LATCHES[@latch_id]
    latch.release
  end
end
# Test job that shuts a consumer down when it is run.
class Poison
  # Registry of consumers keyed by their object_id. Entries are never removed.
  CONSUMERS = {}

  # Registers +consumer+ in CONSUMERS and remembers only its object_id on
  # the instance.
  # NOTE(review): presumably the id is kept instead of the consumer itself
  # so the job can be serialized onto the queue - confirm against Queue.
  def initialize(consumer)
    @consumer_id = consumer.object_id
    CONSUMERS.store(@consumer_id, consumer)
  end

  # Looks the consumer up by id and calls #shutdown on it.
  def run
    consumer = CONSUMERS[@consumer_id]
    consumer.shutdown
  end
end
# Empty the shared Actionable run log before every spec so counts start at zero.
before { Actionable.ran.clear }
it "consumes jobs" do
  queue = Queue.new(:foo)
  queue << Actionable.new
  consumer = Consumer.new(queue)

  # The spec expects consume to still be running when the one-second timeout
  # fires; assert_raises lets us express that without begin / rescue.
  assert_raises(Timeout::Error) { Timeout.timeout(1) { consumer.consume } }

  # Exactly one job ran, and nothing is left on the queue.
  assert_equal 1, Actionable.ran.length
  assert queue.empty?
end
it "pauses" do
  queue = Queue.new(:foo)
  consumer = Consumer.new(queue, 1)
  worker = Thread.new { consumer.consume }

  # Spin until the consumer thread is asleep, i.e. blocked on the queue.
  Thread.pass until worker.status == "sleep"

  consumer.pause
  sleep 2.1 # let the in-flight poll time out
  assert consumer.paused?

  queue << Actionable.new
  # Sleep past the poll timeout again: the job must still be sitting on the
  # queue because a paused consumer should not pick it up.
  sleep 2
  assert_equal 1, queue.length

  queue << Poison.new(consumer)
end
it "resumes" do
  queue = Queue.new(:foo)
  # Drain anything already sitting on the queue before starting.
  queue.pop until queue.empty?

  consumer = Consumer.new(queue, 1)
  consumed = Consumer::Latch.new

  # Pause before starting the consumer thread, then wait until the consumer
  # reports that it is paused. The thread handle is not needed afterwards,
  # so it is deliberately not stored in a local.
  consumer.pause
  Thread.new { consumer.consume }
  Thread.pass until consumer.paused?

  # A job that unblocks the main thread once it has been run.
  queue << Resumer.new(consumed)
  consumer.resume

  # Block until the Resumer job has released the latch.
  consumed.await
  assert_equal 0, queue.length, 'all jobs should be consumed'

  queue << Poison.new(consumer) # gracefully shutdown the consumer
end
it "shuts down" do
  queue = Queue.new(:foo)
  consumer = Consumer.new(queue, 1)
  worker = Thread.new { consumer.consume }

  # Spin until the consumer thread is asleep, i.e. blocked on the queue.
  Thread.pass until worker.status == "sleep"

  consumer.shutdown
  sleep 2 # sleep past the poll timeout

  queue << Actionable.new
  sleep 2 # sleep past the poll timeout again

  # The job pushed after shutdown must still be on the queue.
  assert_equal 1, queue.length
  assert consumer.shutdown?

  # Drain the queue, then leave a poison object behind.
  queue.pop until queue.empty?
  queue << Consumer::POISON
end
it "shuts down with a poison object" do
  queue = Queue.new(:foo)
  queue << Consumer::POISON
  queue << Actionable.new
  consumer = Consumer.new(queue, 1)
  worker = Thread.new { consumer.consume }

  # Poll until the consumer thread has finished; Timeout raises if that
  # takes longer than one second.
  Timeout.timeout(1) do
    sleep(0.1) while worker.alive?
  end

  # The job queued behind the poison object was left alone and never ran.
  assert_equal 1, queue.length
  assert_equal 0, Actionable.ran.length
end
it "gracefully handles a job failure" do
  queue = Queue.new(:foo)
  consumer = Consumer.new(queue, 1)

  # A failing job followed by a Poison job that shuts the consumer down, so
  # the consume call below can return on this thread.
  queue << FailingJob.new
  queue << Poison.new(consumer)

  consumer.consume

  assert_equal 1, Stat["failed"], 'should have one fail'
end
end
end