File tree Expand file tree Collapse file tree 2 files changed +20
-5
lines changed
lib/resque/plugins/round_robin Expand file tree Collapse file tree 2 files changed +20
-5
lines changed Original file line number Diff line number Diff line change @@ -9,20 +9,35 @@ def filter_busy_queues qs
9
9
def rotated_queues
10
10
@n ||= 0
11
11
@n += 1
12
- rot_queues = queues
12
+ rot_queues = queues # since we rely on the resque-dynamic-queues plugin, this is all the queues, expanded out
13
13
if rot_queues . size > 0
14
- rot_queues . rotate ( @n % rot_queues . size )
14
+ @n = @n % rot_queues . size
15
+ rot_queues . rotate ( @n )
15
16
else
16
17
rot_queues
17
18
end
18
19
end
19
20
21
+ def queue_depth queuename
22
+ busy_queues = Resque ::Worker . working . map { |worker | worker . job [ "queue" ] } . compact
23
+ # find the queuename, count it.
24
+ busy_queues . select { |q | q == queuename } . size
25
+ end
26
+
27
+ def should_work_on_queue? queuename
28
+ return true if @queues . include? '*' # workers with QUEUES=* are special and are not subject to queue depth setting
29
+ max = 1 # by default, workers are limited to 1 per queue
30
+ max = ENV [ "RESQUE_QUEUE_DEPTH" ] . to_i if ENV [ "RESQUE_QUEUE_DEPTH" ] . present?
31
+ return true if max == 0 # 0 means no limiting
32
+ return true if queue_depth ( queuename ) < max
33
+ false
34
+ end
35
+
20
36
def reserve_with_round_robin
21
37
qs = rotated_queues
22
- qs = filter_busy_queues qs
23
38
qs . each do |queue |
24
39
log! "Checking #{ queue } "
25
- if job = Resque ::Job . reserve ( queue )
40
+ if should_work_on_queue? ( queue ) && job = Resque ::Job . reserve ( queue )
26
41
log! "Found job on #{ queue } "
27
42
return job
28
43
end
Original file line number Diff line number Diff line change 1
1
module Resque
2
2
module Plugins
3
3
module RoundRobin
4
- VERSION = "0.0.2 "
4
+ VERSION = "0.1.0 "
5
5
end
6
6
end
7
7
end
You can’t perform that action at this time.
0 commit comments