Skip to content

Commit af82eed

Browse files
committed
Raise a Resque-specific error that a job can catch when it is being terminated.
This allows a Job class to rescue Resque::TermException to do quick, clean shutdown work. It inherits from SignalException because a TERM signal is still what causes it.
1 parent 13e64fa commit af82eed

File tree

3 files changed

+67
-60
lines changed

3 files changed

+67
-60
lines changed

lib/resque/errors.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,7 @@ class NoClassError < RuntimeError; end
77

88
# Raised when a worker was killed while processing a job.
99
class DirtyExit < RuntimeError; end
10+
11+
# Raised when the child process is TERM'd, so that a job can rescue this to do shutdown work.
12+
class TermException < SignalException; end
1013
end

lib/resque/worker.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ def register_signal_handlers
296296
end
297297

298298
def unregister_signal_handlers
299-
trap('TERM', 'DEFAULT')
299+
trap('TERM') { raise TermException.new("SIGTERM") }
300300
trap('INT', 'DEFAULT')
301301

302302
begin

test/worker_test.rb

Lines changed: 63 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -438,71 +438,75 @@ def self.perform
438438
assert_not_equal original_connection, Resque.redis.client.connection.instance_variable_get("@sock")
439439
end
440440

441-
{
442-
'cleanup occurs in allotted time' => nil,
443-
'cleanup takes too long' => 2
444-
}.each do |scenario,rescue_time|
445-
test "SIGTERM when #{scenario}" do
446-
begin
447-
class LongRunningJob
448-
@queue = :long_running_job
449-
def self.perform( run_time, rescue_time=nil )
450-
Resque.redis.client.reconnect # get its own connection
451-
Resque.redis.rpush( 'sigterm-test:start', Process.pid )
452-
sleep run_time
453-
Resque.redis.rpush( 'sigterm-test:result', 'Finished Normally' )
454-
rescue SignalException => e
455-
Resque.redis.rpush( 'sigterm-test:result', %Q(Caught SignalException: #{e.inspect}))
456-
sleep rescue_time unless rescue_time.nil?
457-
ensure
458-
Resque.redis.rpush( 'sigterm-test:final', 'exiting.' )
441+
[SignalException, Resque::TermException].each do |exception|
442+
{
443+
'cleanup occurs in allotted time' => nil,
444+
'cleanup takes too long' => 2
445+
}.each do |scenario,rescue_time|
446+
test "SIGTERM when #{scenario} while catching #{exception}" do
447+
begin
448+
eval("class LongRunningJob; @@exception = #{exception}; end")
449+
class LongRunningJob
450+
@queue = :long_running_job
451+
452+
def self.perform( run_time, rescue_time=nil )
453+
Resque.redis.client.reconnect # get its own connection
454+
Resque.redis.rpush( 'sigterm-test:start', Process.pid )
455+
sleep run_time
456+
Resque.redis.rpush( 'sigterm-test:result', 'Finished Normally' )
457+
rescue @@exception => e
458+
Resque.redis.rpush( 'sigterm-test:result', %Q(Caught SignalException: #{e.inspect}))
459+
sleep rescue_time unless rescue_time.nil?
460+
ensure
461+
Resque.redis.rpush( 'sigterm-test:final', 'exiting.' )
462+
end
459463
end
460-
end
461464

462-
Resque.enqueue( LongRunningJob, 5, rescue_time )
465+
Resque.enqueue( LongRunningJob, 5, rescue_time )
463466

464-
worker_pid = Kernel.fork do
465-
# ensure we actually fork
466-
$TESTING = false
467-
# reconnect since we just forked
468-
Resque.redis.client.reconnect
467+
worker_pid = Kernel.fork do
468+
# ensure we actually fork
469+
$TESTING = false
470+
# reconnect since we just forked
471+
Resque.redis.client.reconnect
469472

470-
worker = Resque::Worker.new(:long_running_job)
471-
worker.term_timeout = 1
472-
worker.term_child = 1
473+
worker = Resque::Worker.new(:long_running_job)
474+
worker.term_timeout = 1
475+
worker.term_child = 1
473476

474-
worker.work(0)
475-
exit!
476-
end
477+
worker.work(0)
478+
exit!
479+
end
477480

478-
# ensure the worker is started
479-
start_status = Resque.redis.blpop( 'sigterm-test:start', 5 )
480-
assert_not_nil start_status
481-
child_pid = start_status[1].to_i
482-
assert_operator child_pid, :>, 0
483-
484-
# send signal to abort the worker
485-
Process.kill('TERM', worker_pid)
486-
Process.waitpid(worker_pid)
487-
488-
# wait to see how it all came down
489-
result = Resque.redis.blpop( 'sigterm-test:result', 5 )
490-
assert_not_nil result
491-
assert !result[1].start_with?('Finished Normally'), 'Job Finished normally. Sleep not long enough?'
492-
assert result[1].start_with? 'Caught SignalException', 'Signal exception not raised in child.'
493-
494-
# ensure that the child pid is no longer running
495-
child_still_running = !(`ps -p #{child_pid.to_s} -o pid=`).empty?
496-
assert !child_still_running
497-
498-
# see if post-cleanup occurred. This should happen IFF the rescue_time is less than the term_timeout
499-
post_cleanup_occurred = Resque.redis.lpop( 'sigterm-test:final' )
500-
assert post_cleanup_occurred, 'post cleanup did not occur. SIGKILL sent too early?' if rescue_time.nil?
501-
assert !post_cleanup_occurred, 'post cleanup occurred. SIGKILL sent too late?' unless rescue_time.nil?
502-
503-
ensure
504-
remaining_keys = Resque.redis.keys('sigterm-test:*') || []
505-
Resque.redis.del(*remaining_keys) unless remaining_keys.empty?
481+
# ensure the worker is started
482+
start_status = Resque.redis.blpop( 'sigterm-test:start', 5 )
483+
assert_not_nil start_status
484+
child_pid = start_status[1].to_i
485+
assert_operator child_pid, :>, 0
486+
487+
# send signal to abort the worker
488+
Process.kill('TERM', worker_pid)
489+
Process.waitpid(worker_pid)
490+
491+
# wait to see how it all came down
492+
result = Resque.redis.blpop( 'sigterm-test:result', 5 )
493+
assert_not_nil result
494+
assert !result[1].start_with?('Finished Normally'), 'Job Finished normally. Sleep not long enough?'
495+
assert result[1].start_with? 'Caught SignalException', 'Signal exception not raised in child.'
496+
497+
# ensure that the child pid is no longer running
498+
child_still_running = !(`ps -p #{child_pid.to_s} -o pid=`).empty?
499+
assert !child_still_running
500+
501+
# see if post-cleanup occurred. This should happen IFF the rescue_time is less than the term_timeout
502+
post_cleanup_occurred = Resque.redis.lpop( 'sigterm-test:final' )
503+
assert post_cleanup_occurred, 'post cleanup did not occur. SIGKILL sent too early?' if rescue_time.nil?
504+
assert !post_cleanup_occurred, 'post cleanup occurred. SIGKILL sent too late?' unless rescue_time.nil?
505+
506+
ensure
507+
remaining_keys = Resque.redis.keys('sigterm-test:*') || []
508+
Resque.redis.del(*remaining_keys) unless remaining_keys.empty?
509+
end
506510
end
507511
end
508512
end

0 commit comments

Comments
 (0)