forked from resque/resque
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathresque.rb
More file actions
491 lines (421 loc) · 13.5 KB
/
resque.rb
File metadata and controls
491 lines (421 loc) · 13.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
require 'logger'
require 'redis/namespace'
require 'resque/version'
require 'resque/errors'
require 'resque/failure'
require 'resque/failure/base'
require 'resque/helpers'
require 'resque/stat'
require 'resque/logging'
require 'resque/job'
require 'resque/worker'
require 'resque/plugin'
require 'resque/queue'
require 'resque/multi_queue'
require 'resque/coder'
require 'resque/json_coder'
require 'resque/vendor/utf8_util'
module Resque
include Helpers
extend self
# Configures the Redis connection Resque talks to.
#
# `server` may be any of:
#   1. A 'hostname:port' String
#   2. A 'hostname:port:db' String (to select the Redis db)
#   3. A 'hostname:port/namespace' String (to set the Redis namespace)
#   4. A Redis URL String 'redis://host:port'
#   5. An instance of `Redis`, `Redis::Client`, `Redis::DistRedis`,
#      or `Redis::Namespace`.
#
# Strings and bare connections are wrapped in a Redis::Namespace (:resque
# unless the string names one); a Redis::Namespace is used as given.
# Every call also replaces the per-name Resque::Queue cache so that queues
# are built against the new connection.
def redis=(server)
  @redis =
    case server
    when String
      namespace = nil
      connection =
        if server['redis://']
          Redis.connect(:url => server, :thread_safe => true)
        else
          # "host:port[:db][/namespace]" - peel the namespace off first.
          address, namespace = server.split('/', 2)
          host, port, db = address.split(':')
          Redis.new(:host => host, :port => port,
                    :thread_safe => true, :db => db)
        end
      Redis::Namespace.new(namespace || :resque, :redis => connection)
    when Redis::Namespace
      server
    else
      Redis::Namespace.new(:resque, :redis => server)
    end

  # Queue objects are created lazily, one per name, on first lookup.
  @queues = Hash.new { |cache, name|
    cache[name] = Resque::Queue.new(name, @redis, coder)
  }
end
# The object used for encode/decode across Resque. Assign a different one
# through `coder=` to override it everywhere.
#
# Falls back to a memoized JsonCoder (JSON, for backwards compatibility)
# when nothing has been assigned.
def coder
  return @coder if @coder

  @coder = JsonCoder.new
end
# Writer for the coder object; `coder` only falls back to a JsonCoder while
# nothing has been assigned here.
attr_writer :coder
# Returns the current Redis connection.
#
# If none has been configured yet, one is set up through `redis=` first:
# `Redis.connect` when the Redis class offers it, otherwise the
# "localhost:6379" address string.
def redis
  if @redis
    @redis
  else
    default_server = Redis.respond_to?(:connect) ? Redis.connect(:thread_safe => true) : "localhost:6379"
    self.redis = default_server
    self.redis
  end
end
# Returns an identifier for the Redis server(s) behind the connection.
#
# Three connection shapes are probed, in order:
#   * responds to `server` (1.x versions of redis-rb) - its value is returned
#   * responds to `nodes` (distributed) - node ids joined with ', '
#   * anything else - the id of its `client`
def redis_id
  connection = redis
  return connection.server if connection.respond_to?(:server)
  return connection.nodes.map { |node| node.id }.join(', ') if connection.respond_to?(:nodes)

  connection.client.id
end
# Set or retrieve the current logger object. The last line of this file
# assigns a Logger writing to STDOUT as the default.
attr_accessor :logger
# The `before_first_fork` hook runs in the **parent** process, only once,
# before forking to run the first job. Be careful - any changes you make
# will be permanent for the lifespan of the worker.
#
# With a block: registers the block as a hook.
# Without a block: returns all registered before_first_fork hooks.
def before_first_fork(&block)
  if block
    register_hook(:before_first_fork, block)
  else
    hooks(:before_first_fork)
  end
end

# Assignment form: registers `block` as a before_first_fork proc
# (register_hook treats nil as "clear the hooks of this name").
def before_first_fork=(block)
  register_hook(:before_first_fork, block)
end
# The `before_fork` hook runs in the **parent** process before every job,
# so be careful - any changes you make will be permanent for the lifespan
# of the worker.
#
# With a block: registers the block as a hook.
# Without a block: returns all registered before_fork hooks.
def before_fork(&block)
  if block
    register_hook(:before_fork, block)
  else
    hooks(:before_fork)
  end
end

# Assignment form: registers `block` as a before_fork proc
# (register_hook treats nil as "clear the hooks of this name").
def before_fork=(block)
  register_hook(:before_fork, block)
end
# The `after_fork` hook runs in the child process and is passed the
# current job. Any changes you make, therefore, only live as long as the
# job currently being processed.
#
# With a block: registers the block as a hook.
# Without a block: returns all registered after_fork hooks.
def after_fork(&block)
  if block
    register_hook(:after_fork, block)
  else
    hooks(:after_fork)
  end
end

# Assignment form: registers `block` as an after_fork proc
# (register_hook treats nil as "clear the hooks of this name").
def after_fork=(block)
  register_hook(:after_fork, block)
end
# The `before_pause` hook runs in the parent process before the worker
# has paused processing (via #pause_processing or SIGUSR2).
#
# With a block: registers the block as a hook.
# Without a block: returns all registered before_pause hooks.
def before_pause(&block)
  if block
    register_hook(:before_pause, block)
  else
    hooks(:before_pause)
  end
end

# Assignment form: registers `block` as a before_pause proc
# (register_hook treats nil as "clear the hooks of this name").
def before_pause=(block)
  register_hook(:before_pause, block)
end
# The `after_pause` hook runs in the parent process after the worker has
# paused (via SIGCONT).
#
# With a block: registers the block as a hook.
# Without a block: returns all registered after_pause hooks.
def after_pause(&block)
  if block
    register_hook(:after_pause, block)
  else
    hooks(:after_pause)
  end
end

# Assignment form: registers `block` as an after_pause proc
# (register_hook treats nil as "clear the hooks of this name").
def after_pause=(block)
  register_hook(:after_pause, block)
end
# The `before_perform` hook runs in the child process before the job code
# is performed. It runs before any Job.before_perform hook.
#
# With a block: registers the block as a hook.
# Without a block: returns all registered before_perform hooks.
def before_perform(&block)
  if block
    register_hook(:before_perform, block)
  else
    hooks(:before_perform)
  end
end

# Assignment form: registers `block` as a before_perform proc
# (register_hook treats nil as "clear the hooks of this name").
def before_perform=(block)
  register_hook(:before_perform, block)
end
# The `after_perform` hook runs in the child process after the job code
# has performed. It runs after any Job.after_perform hook.
#
# With a block: registers the block as a hook.
# Without a block: returns all registered after_perform hooks.
def after_perform(&block)
  if block
    register_hook(:after_perform, block)
  else
    hooks(:after_perform)
  end
end

# Assignment form: registers `block` as an after_perform proc
# (register_hook treats nil as "clear the hooks of this name").
def after_perform=(block)
  register_hook(:after_perform, block)
end
# Human-readable description of this client, naming the Redis server(s)
# reported by `redis_id`.
def to_s
  connection_id = redis_id
  "Resque Client connected to #{connection_id}"
end
# If 'inline' is true, Resque will call a job's #perform method inline,
# without queuing it into Redis and without any Resque callbacks.
# If 'inline' is false, Resque jobs will be put on their queue as usual.
attr_accessor :inline
# `inline?` is a predicate-style alias for the `inline` reader.
alias :inline? :inline
#
# queue manipulation
#
# Pushes a job onto a queue. Queue name should be a string and the
# item should be any JSON-able Ruby object.
#
# Resque workers generally expect the `item` to be a hash with the following
# keys:
#
#   class - The String name of the job to run.
#   args  - An Array of arguments to pass the job. Usually passed
#           via `class.to_class.perform(*args)`.
#
# Example
#
#   Resque.push('archive', 'class' => 'Archive', 'args' => [ 35, 'tar' ])
#
# The return value is whatever the queue object's `<<` returns; callers
# should not rely on it.
def push(queue, item)
  target = queue(queue)
  target << item
end
# Pops a job off a queue. Queue name should be a string.
#
# The queue's `pop` is called with `true`; a ThreadError raised by that
# call is rescued and turned into nil.
#
# Returns a Ruby object, or nil when the pop raised ThreadError.
def pop(queue)
  queue(queue).pop(true)
rescue ThreadError
  nil
end
# Reports how many items a queue currently holds, as returned by the
# queue object's `size`. Queue name should be a string.
def size(queue)
  target = queue(queue)
  target.size
end
# Returns an array of items currently queued. Queue name should be
# a string.
#
# start and count should be integers and can be used for pagination.
# start is the index of the first item to return (the default of 0 is the
# first item), count is how many items to return.
#
# To get the 3rd page of a 30 item, paginated list one would use:
#   Resque.peek('my_list', 60, 30)
#
# Whatever the queue's `slice` hands back is normalized to an Array:
# nil becomes [], array-likes go through `to_ary`, and any other single
# object is wrapped in a one-element array.
def peek(queue, start = 0, count = 1)
  items = queue(queue).slice(start, count)
  return [] if items.nil?
  return [items] unless items.respond_to?(:to_ary)

  items.to_ary || [items]
end
# Does the dirty work of fetching a range of items from a Redis list
# and converting them into Ruby objects with `decode`.
#
# With count == 1 a single decoded item is returned (via LINDEX);
# for any other count an Array of decoded items is returned (via LRANGE
# from start through start + count - 1).
def list_range(key, start = 0, count = 1)
  return decode(redis.lindex(key, start)) if count == 1

  last = start + count - 1
  Array(redis.lrange(key, start, last)).map { |raw| decode(raw) }
end
# Returns an array of all known Resque queues as strings, read from the
# :queues set in Redis. Always an Array, even when Redis returns nil.
def queues
  names = redis.smembers(:queues)
  Array(names)
end
# Given a queue name, completely deletes the queue: the queue object is
# destroyed and then dropped from the in-memory queue cache.
#
# Returns the cache entry that was removed.
def remove_queue(queue)
  cache_key = queue.to_s
  queue(queue).destroy
  @queues.delete(cache_key)
end
# Return the Resque::Queue object for a given name. The name is
# stringified, so :jobs and 'jobs' resolve to the same queue.
def queue(name)
  # The queue cache is only built by `redis=`. If no connection has been
  # configured yet, asking for `redis` sets up the default one (and with it
  # the cache) instead of failing with NoMethodError on nil.
  redis unless @queues
  @queues[name.to_s]
end
#
# job shortcuts
#
# Conveniently adds a job to a queue.
#
# The class you pass must be a real Ruby class (not a string or
# reference) which either:
#
#   a) has a @queue ivar set
#   b) responds to `queue`
#
# The queue is inferred from whichever of those applies, and the work is
# then handed to `enqueue_to`.
#
# If no queue can be inferred this method will raise a `Resque::NoQueueError`
#
# Returns true if the job was queued, nil if the job was rejected by a
# before_enqueue hook.
#
# This method is considered part of the `stable` API.
def enqueue(klass, *args)
  target_queue = queue_from_class(klass)
  enqueue_to(target_queue, klass, *args)
end
# Just like `enqueue` but allows you to specify the queue you want to
# use. Runs hooks.
#
# `queue` should be the String name of the queue you're targeting.
#
# Every before_enqueue hook is run (none is skipped); the job is only
# created when no hook returned exactly `false`. After creation the
# after_enqueue hooks run.
#
# Returns true if the job was queued, nil if the job was rejected by a
# before_enqueue hook.
#
# This method is considered part of the `stable` API.
def enqueue_to(queue, klass, *args)
  validate(klass, queue)

  verdicts = Plugin.before_enqueue_hooks(klass).map { |hook| klass.send(hook, *args) }
  return nil if verdicts.any? { |verdict| verdict == false }

  Job.create(queue, klass, *args)
  Plugin.after_enqueue_hooks(klass).each { |hook| klass.send(hook, *args) }
  true
end
# Conveniently removes a job from a queue.
#
# The class you pass must be a real Ruby class (not a string or
# reference) which either:
#
#   a) has a @queue ivar set
#   b) responds to `queue`
#
# The queue is inferred from whichever of those applies.
#
# If no queue can be inferred this method will raise a `Resque::NoQueueError`
#
# If no args are given, this method will dequeue *all* jobs matching
# the provided class. See `Resque::Job.destroy` for more
# information.
#
# Every before_dequeue hook is run; nothing is destroyed (and nil is
# returned) when any of them returns exactly `false`. Otherwise the
# after_dequeue hooks run once the jobs have been destroyed.
#
# Returns the number of jobs destroyed.
#
# Example:
#
#   # Removes all jobs of class `UpdateNetworkGraph`
#   Resque.dequeue(GitHub::Jobs::UpdateNetworkGraph)
#
#   # Removes all jobs of class `UpdateNetworkGraph` with matching args.
#   Resque.dequeue(GitHub::Jobs::UpdateNetworkGraph, 'repo:135325')
#
# This method is considered part of the `stable` API.
def dequeue(klass, *args)
  verdicts = Plugin.before_dequeue_hooks(klass).map { |hook| klass.send(hook, *args) }
  return if verdicts.any? { |verdict| verdict == false }

  removed = Job.destroy(queue_from_class(klass), klass, *args)
  Plugin.after_dequeue_hooks(klass).each { |hook| klass.send(hook, *args) }
  removed
end
# Given a class, try to extrapolate an appropriate queue.
#
# A truthy @queue class instance variable wins. Otherwise the class's
# `queue` method is consulted when it responds to one; when it does not,
# the result is false.
def queue_from_class(klass)
  from_ivar = klass.instance_variable_get(:@queue)
  return from_ivar if from_ivar

  klass.respond_to?(:queue) && klass.queue
end
# Thin wrapper that forwards `queue` to `Job.reserve`.
#
# This method will return a `Resque::Job` object or a non-true value
# depending on whether a job can be obtained. You should pass it the
# precise name of a queue: case matters.
#
# This method is considered part of the `stable` API.
def reserve(queue)
Job.reserve(queue)
end
# Validates that the given klass could be a valid Resque job.
#
# `queue` is optional; when it is omitted (or falsy) the queue is inferred
# with `queue_from_class`.
#
# If no queue can be inferred this method will raise a `Resque::NoQueueError`
#
# If the given klass is nil (or otherwise stringifies to an empty string)
# this method will raise a `Resque::NoClassError`
#
# The queue check runs first, so a job missing both reports the queue.
def validate(klass, queue = nil)
  queue ||= queue_from_class(klass)

  raise NoQueueError.new("Jobs must be placed onto a queue.") unless queue
  raise NoClassError.new("Jobs must be given a class.") if klass.to_s.empty?
end
#
# worker shortcuts
#
# A shortcut to Worker.all; returns whatever that call returns.
def workers
Worker.all
end
# A shortcut to Worker.working; returns whatever that call returns.
def working
Worker.working
end
# A shortcut to unregister_worker, useful for command line tools.
#
# Looks the worker up with Resque::Worker.find(worker_id) and calls
# `unregister_worker` on the result.
def remove_worker(worker_id)
  Resque::Worker.find(worker_id).unregister_worker
end
#
# stats
#
# Returns a hash, similar to redis-rb's #info, of interesting stats:
#
#   :pending     - sum of `size` over every known queue
#   :processed   - Stat[:processed]
#   :queues      - number of known queues
#   :workers     - number of workers
#   :working     - number of workers currently working
#   :failed      - Stat[:failed]
#   :servers     - one-element array holding `redis_id`
#   :environment - RAILS_ENV, else RACK_ENV, else 'development'
def info
  pending      = queues.inject(0) { |total, name| total + size(name) }
  processed    = Stat[:processed]
  queue_count  = queues.size
  worker_count = workers.size.to_i
  busy_count   = working.size
  failed       = Stat[:failed]
  servers      = [redis_id]
  environment  = ENV['RAILS_ENV'] || ENV['RACK_ENV'] || 'development'

  {
    :pending     => pending,
    :processed   => processed,
    :queues      => queue_count,
    :workers     => worker_count,
    :working     => busy_count,
    :failed      => failed,
    :servers     => servers,
    :environment => environment
  }
end
# Returns an array of all known Resque keys in Redis, with a leading
# "<namespace>:" removed from each key that carries one. Redis' KEYS
# operation is O(N) for the keyspace, so be careful - this can be slow for
# big databases.
def keys
  prefix = "#{redis.namespace}:"
  redis.keys("*").map do |key|
    # Only a *leading* namespace is stripped. A plain `sub` would also eat
    # the first "<namespace>:" found in the middle of a key and so mangle
    # names such as "stat:resque:failed".
    key.start_with?(prefix) ? key[prefix.length..-1] : key
  end
end
private
# Register a new proc as a hook. If the block is nil this is the
# equivalent of removing all hooks of the given name.
#
# `name` is the hook that the block should be registered with.
#
# Returns the list of hooks now registered under `name` (or the result of
# `clear_hooks` when the block is nil).
def register_hook(name, block)
  if block.nil?
    clear_hooks(name)
  else
    @hooks ||= {}
    (@hooks[name] ||= []) << block
  end
end
# Clear all hooks given a hook name.
#
# Does nothing (and returns the falsy registry) when no hook has ever been
# registered; otherwise resets the named list and returns the new empty
# array.
def clear_hooks(name)
  return @hooks unless @hooks

  @hooks[name] = []
end
# Retrieve registered hooks.
#
# Called without a name (or with nil), returns the whole registry: a Hash
# of hook name => Array of procs, or {} when nothing has been registered.
#
# Called with a name, returns the Array of procs registered under that
# name, or [] when there are none.
#
# These used to be two separate `def hooks` definitions; Ruby keeps only
# the last one, so the zero-argument form raised ArgumentError. A single
# method with an optional argument serves both uses.
def hooks(name = nil)
  return @hooks || {} if name.nil?

  (@hooks && @hooks[name]) || []
end
end
# Log to STDOUT by default. Assign a different object to Resque.logger to
# replace this default logger.
Resque.logger = Logger.new(STDOUT)