Skip to content

Commit ae414df

Browse files
committed
* global pop timeout
* ThreadedPool * add connection factory pairing with @tenderlove and @therealadam
1 parent 5144c70 commit ae414df

File tree

4 files changed

+90
-7
lines changed

4 files changed

+90
-7
lines changed

lib/resque.rb

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,17 @@
1717
require 'resque/coder'
1818
require 'resque/multi_json_coder'
1919
require 'resque/consumer'
20+
require 'resque/threaded_pool'
2021

2122
module Resque
2223
include Helpers
2324
extend self
2425

26+
@consumer_timeout = 5
27+
class << self
28+
attr_accessor :consumer_timeout
29+
end
30+
2531
# Accepts:
2632
# 1. A 'hostname:port' String
2733
# 2. A 'hostname:port:db' String (to select the Redis db)
@@ -30,6 +36,14 @@ module Resque
3036
# 5. An instance of `Redis`, `Redis::Client`, `Redis::DistRedis`,
3137
# or `Redis::Namespace`.
3238
def redis=(server)
39+
@server = server
40+
@redis = create_connection(server)
41+
@queues = Hash.new { |h,name|
42+
h[name] = Resque::Queue.new(name, @redis, coder)
43+
}
44+
end
45+
46+
def create_connection(server)
3347
case server
3448
when String
3549
if server =~ /redis\:\/\//
@@ -42,15 +56,16 @@ def redis=(server)
4256
end
4357
namespace ||= :resque
4458

45-
@redis = Redis::Namespace.new(namespace, :redis => redis)
59+
Redis::Namespace.new(namespace, :redis => redis)
4660
when Redis::Namespace
47-
@redis = server
61+
server
4862
else
49-
@redis = Redis::Namespace.new(:resque, :redis => server)
63+
Redis::Namespace.new(:resque, :redis => server)
5064
end
51-
@queues = Hash.new { |h,name|
52-
h[name] = Resque::Queue.new(name, @redis, coder)
53-
}
65+
end
66+
67+
def server
68+
@server
5469
end
5570

5671
# Encapsulation of encode/decode. Overwrite this to use it across Resque.

lib/resque/consumer.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def await
1515
end
1616
end
1717

18-
def initialize(queue, timeout = 5)
18+
def initialize(queue, timeout = Resque.consumer_timeout)
1919
@queue = queue
2020
@should_pause = false
2121
@should_shutdown = false

lib/resque/threaded_pool.rb

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
module Resque
2+
class ThreadedPool
3+
4+
def initialize(queue, pool)
5+
@queue = queue
6+
@pool = pool
7+
@threads = []
8+
@consumers = []
9+
end
10+
11+
12+
def start
13+
@consumers.clear
14+
@threads = @pool.times.map {
15+
c = Consumer.new(@queue)
16+
@consumers << c
17+
Thread.new { c.consume }
18+
}
19+
end
20+
21+
def stop
22+
@consumers.each { |c| c.shutdown }
23+
@threads.each { |t| t.join }
24+
end
25+
end
26+
end

test/threaded_pool_test.rb

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
require "test_helper"
2+
3+
Thread.abort_on_exception = true
4+
5+
module Resque
6+
7+
describe "ThreadedPool" do
8+
9+
class Actionable
10+
@@ran = []
11+
12+
def self.ran
13+
@@ran
14+
end
15+
16+
def run
17+
self.class.ran << self
18+
end
19+
end
20+
21+
before do
22+
@write = Queue.new(:foo)
23+
conn = Resque.create_connection(Resque.server)
24+
@read = Queue.new(:foo, conn)
25+
@tp = ThreadedPool.new(@read, 5)
26+
end
27+
28+
it "processes work" do
29+
Resque.consumer_timeout = 1
30+
5.times { @write << Actionable.new }
31+
@tp.start
32+
sleep 1
33+
@tp.stop
34+
assert @write.empty?
35+
end
36+
37+
it "recovers from blowed-up jobs" do
38+
skip
39+
end
40+
41+
end
42+
end

0 commit comments

Comments
 (0)