Skip to content

Commit 25174ef

Browse files
committed
Merge pull request resque#693 from 37signals/1-x-stable
Backported fixes from 37s changes in master
2 parents 39aa466 + c014c52 commit 25174ef

File tree

3 files changed

+81
-15
lines changed

3 files changed

+81
-15
lines changed

lib/resque/worker.rb

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ def work(interval = 5.0, &block)
149149
else
150150
unregister_signal_handlers if will_fork? && term_child
151151
procline "Processing #{job.queue} since #{Time.now.to_i}"
152-
redis.client.reconnect # Don't share connection with parent
152+
reconnect
153153
perform(job, &block)
154154
exit! if will_fork?
155155
end
@@ -164,8 +164,9 @@ def work(interval = 5.0, &block)
164164
end
165165
end
166166

167-
ensure
168167
unregister_worker
168+
rescue Exception => exception
169+
unregister_worker(exception)
169170
end
170171

171172
# DEPRECATED. Processes a single job. If none is given, it will
@@ -218,6 +219,24 @@ def reserve
218219
raise e
219220
end
220221

222+
# Reconnect to Redis to avoid sharing a connection with the parent,
223+
# retry up to 3 times with increasing delay before giving up.
224+
def reconnect
225+
tries = 0
226+
begin
227+
redis.client.reconnect
228+
rescue Redis::BaseConnectionError
229+
if (tries += 1) <= 3
230+
log "Error reconnecting to Redis; retrying"
231+
sleep(tries)
232+
retry
233+
else
234+
log "Error reconnecting to Redis; quitting"
235+
raise
236+
end
237+
end
238+
end
239+
221240
# Returns a list of queues to use when searching for a job.
222241
# A splat ("*") means you want every queue (in alpha order) - this
223242
# can be useful for dynamically adding new queues.
@@ -425,15 +444,15 @@ def run_hook(name, *args)
425444
end
426445

427446
# Unregisters ourself as a worker. Useful when shutting down.
428-
def unregister_worker
447+
def unregister_worker(exception = nil)
429448
# If we're still processing a job, make sure it gets logged as a
430449
# failure.
431450
if (hash = processing) && !hash.empty?
432451
job = Job.new(hash['queue'], hash['payload'])
433452
# Ensure the proper worker is attached to this job, even if
434453
# it's not the precise instance that died.
435454
job.worker = self
436-
job.fail(DirtyExit.new)
455+
job.fail(exception || DirtyExit.new)
437456
end
438457

439458
redis.srem(:workers, self)

test/test_helper.rb

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -161,14 +161,18 @@ def now
161161
self.fake_time = nil
162162
end
163163

164-
def capture_stderr
165-
# The output stream must be an IO-like object. In this case we capture it in
166-
# an in-memory IO object so we can return the string value. You can assign any
167-
# IO object here.
168-
previous_stderr, $stderr = $stderr, StringIO.new
164+
# From minitest/unit
165+
def capture_io
166+
require 'stringio'
167+
168+
orig_stdout, orig_stderr = $stdout, $stderr
169+
captured_stdout, captured_stderr = StringIO.new, StringIO.new
170+
$stdout, $stderr = captured_stdout, captured_stderr
171+
169172
yield
170-
$stderr.string
173+
174+
return captured_stdout.string, captured_stderr.string
171175
ensure
172-
# Restore the previous value of stderr (typically equal to STDERR).
173-
$stderr = previous_stderr
176+
$stdout = orig_stdout
177+
$stderr = orig_stderr
174178
end

test/worker_test.rb

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,20 @@
3232
end
3333
end
3434

35-
test "fails uncompleted jobs on exit" do
35+
test "fails uncompleted jobs with DirtyExit by default on exit" do
3636
job = Resque::Job.new(:jobs, {'class' => 'GoodJob', 'args' => "blah"})
3737
@worker.working_on(job)
3838
@worker.unregister_worker
3939
assert_equal 1, Resque::Failure.count
40+
assert_equal('Resque::DirtyExit', Resque::Failure.all['exception'])
41+
end
42+
43+
test "fails uncompleted jobs with worker exception on exit" do
44+
job = Resque::Job.new(:jobs, {'class' => 'GoodJob', 'args' => "blah"})
45+
@worker.working_on(job)
46+
@worker.unregister_worker(StandardError.new)
47+
assert_equal 1, Resque::Failure.count
48+
assert_equal('StandardError', Resque::Failure.all['exception'])
4049
end
4150

4251
class ::SimpleJobWithFailureHandling
@@ -464,6 +473,40 @@ def self.perform
464473
assert_not_equal original_connection, Resque.redis.client.connection.instance_variable_get("@sock")
465474
end
466475

476+
test "tries to reconnect three times before giving up" do
477+
begin
478+
class Redis::Client
479+
alias_method :original_reconnect, :reconnect
480+
481+
def reconnect
482+
raise Redis::BaseConnectionError
483+
end
484+
end
485+
486+
class Resque::Worker
487+
alias_method :original_sleep, :sleep
488+
489+
def sleep(duration = nil)
490+
# noop
491+
end
492+
end
493+
494+
@worker.very_verbose = true
495+
stdout, stderr = capture_io { @worker.work(0) }
496+
497+
assert_equal 3, stdout.scan(/retrying/).count
498+
assert_equal 1, stdout.scan(/quitting/).count
499+
ensure
500+
class Redis::Client
501+
alias_method :reconnect, :original_reconnect
502+
end
503+
504+
class Resque::Worker
505+
alias_method :sleep, :original_sleep
506+
end
507+
end
508+
end
509+
467510
if !defined?(RUBY_ENGINE) || defined?(RUBY_ENGINE) && RUBY_ENGINE != "jruby"
468511
test "old signal handling is the default" do
469512
rescue_time = nil
@@ -599,14 +642,14 @@ def self.perform( run_time, rescue_time=nil )
599642
end
600643

601644
test "displays warning when not using term_child" do
602-
stderr = capture_stderr { @worker.work(0) }
645+
stdout, stderr = capture_io { @worker.work(0) }
603646

604647
assert stderr.match(/^WARNING:/)
605648
end
606649

607650
test "it does not display warning when using term_child" do
608651
@worker.term_child = "1"
609-
stderr = capture_stderr { @worker.work(0) }
652+
stdout, stderr = capture_io { @worker.work(0) }
610653

611654
assert !stderr.match(/^WARNING:/)
612655
end

0 commit comments

Comments
 (0)