Skip to content

Commit c7d2021

Browse files
committed
block when popping jobs in Worker
pairing with @tenderlove
1 parent 1d89dc6 commit c7d2021

File tree

2 files changed

+18
-5
lines changed

2 files changed

+18
-5
lines changed

lib/resque/worker.rb

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ def work(interval = 5.0, &block)
128128
loop do
129129
break if shutdown?
130130

131-
if not paused? and job = reserve
131+
if not paused? and job = reserve(interval)
132132
log "got: #{job.inspect}"
133133
job.worker = self
134134
run_hook :before_fork, job
@@ -148,9 +148,8 @@ def work(interval = 5.0, &block)
148148
@child = nil
149149
else
150150
break if interval.zero?
151-
log! "Sleeping for #{interval} seconds"
151+
log! "Timed out after #{interval} seconds"
152152
procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}"
153-
sleep interval
154153
end
155154
end
156155

@@ -192,12 +191,18 @@ def perform(job)
192191

193192
# Attempts to grab a job off one of the provided queues. Returns
194193
# nil if no job can be found.
195-
def reserve
194+
def reserve(interval = 5.0)
195+
interval = interval.to_i
196196
multi_queue = MultiQueue.new(
197197
queues.map {|queue| Queue.new(queue, Resque.redis, Resque.coder) },
198198
Resque.redis)
199199

200-
queue, job = multi_queue.pop(true)
200+
if interval < 1
201+
queue, job = multi_queue.pop(true)
202+
else
203+
queue, job = multi_queue.poll(interval.to_i)
204+
end
205+
201206
log! "Found job on #{queue}"
202207
return Job.new(queue.name, job)
203208
rescue ThreadError

test/worker_test.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,14 @@ def self.exception
264264
assert_equal 3, @worker.processed
265265
end
266266

267+
it "reserve blocks when the queue is empty" do
268+
worker = Resque::Worker.new(:timeout)
269+
270+
assert_raises Timeout::Error do
271+
Timeout.timeout(1) { worker.reserve(5) }
272+
end
273+
end
274+
267275
it "keeps track of how many failures it has seen" do
268276
Resque::Job.create(:jobs, BadJob)
269277
Resque::Job.create(:jobs, BadJob)

0 commit comments

Comments
 (0)