Skip to content

Commit 68165f1

Browse files
author
Matt George
committed
Merge branch 'scheduled_tasks'
Conflicts: pyres/__init__.py
2 parents 2463471 + 1a6ce15 commit 68165f1

File tree

10 files changed

+389
-7
lines changed

10 files changed

+389
-7
lines changed

pyres/__init__.py

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
from redis import Redis
44
import pyres.json_parser as json
55

6+
import time, datetime
7+
68
import logging
79

810
def my_import(name):
@@ -19,12 +21,12 @@ def safe_str_to_class(s):
1921
klass = lst[-1]
2022
mod_list = lst[:-1]
2123
module = ".".join(mod_list)
22-
mod = my_import(module)
23-
if hasattr(mod, klass):
24-
return getattr(mod, klass)
25-
else:
24+
mod = my_import(module)
25+
if hasattr(mod, klass):
26+
return getattr(mod, klass)
27+
else:
2628
raise ImportError('')
27-
29+
2830
def str_to_class(s):
2931
"""Alternate helper function to map string class names to module classes."""
3032
lst = s.split(".")
@@ -199,6 +201,51 @@ def close(self):
199201
"""
200202
self.redis.disconnect()
201203

204+
def enqueue_at(self, datetime, klass, *args):
205+
class_name = '%s.%s' % (klass.__module__, klass.__name__)
206+
logging.info("enqueued '%s' job for execution at %s" % (class_name, datetime))
207+
if args:
208+
logging.debug("job arguments are: %s" % str(args))
209+
self.delayed_push(datetime, {'class':class_name,'queue': klass.queue, 'args':args})
210+
211+
def delayed_push(self, datetime, item):
212+
key = int(time.mktime(datetime.timetuple()))
213+
self.redis.rpush('resque:delayed:%s' % key, ResQ.encode(item))
214+
self.redis.zadd('resque:delayed_queue_schedule', key, key)
215+
216+
def delayed_queue_peek(self, start, count):
217+
return [int(item) for item in self.redis.zrange('resque:delayed_queue_schedule', start, start+count) or []]
218+
219+
def delayed_timestamp_peek(self, timestamp, start, count):
220+
return self.list_range('resque:delayed:%s' % timestamp, start, count)
221+
222+
def delayed_queue_schedule_size(self):
223+
return self.redis.zcard('resque:delayed_queue_schedule')
224+
225+
def delayed_timestamp_size(self, timestamp):
226+
#key = int(time.mktime(timestamp.timetuple()))
227+
return self.redis.llen("resque:delayed:%s" % timestamp)
228+
229+
def next_delayed_timestamp(self):
230+
key = int(time.mktime(datetime.datetime.now().timetuple()))
231+
array = self.redis.zrangebyscore('resque:delayed_queue_schedule', '-inf', key)
232+
timestamp = None
233+
if array:
234+
timestamp = array[0]
235+
return timestamp
236+
237+
def next_item_for_timestamp(self, timestamp):
238+
#key = int(time.mktime(timestamp.timetuple()))
239+
key = "resque:delayed:%s" % timestamp
240+
ret = self.redis.lpop(key)
241+
item = None
242+
if ret:
243+
item = ResQ.decode(ret)
244+
if self.redis.llen(key) == 0:
245+
self.redis.delete(key)
246+
self.redis.zrem('resque:delayed_queue_schedule', timestamp)
247+
return item
248+
202249
@classmethod
203250
def encode(cls, item):
204251
return json.dumps(item)

pyres/scheduler.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import signal
2+
import time
3+
import logging
4+
5+
from pyres import ResQ
6+
7+
class Scheduler(object):
8+
9+
def __init__(self, server="localhost:6379", password=None):
10+
"""
11+
>>> from pyres.scheduler import Scheduler
12+
>>> scheduler = Scheduler('localhost:6379')
13+
"""
14+
self._shutdown = False
15+
if isinstance(server,basestring):
16+
self.resq = ResQ(server=server, password=password)
17+
elif isinstance(server, ResQ):
18+
self.resq = server
19+
else:
20+
raise Exception("Bad server argument")
21+
22+
def register_signal_handlers(self):
23+
logging.info('registering signals')
24+
signal.signal(signal.SIGTERM, self.schedule_shutdown)
25+
signal.signal(signal.SIGINT, self.schedule_shutdown)
26+
signal.signal(signal.SIGQUIT, self.schedule_shutdown)
27+
28+
def schedule_shutdown(self, signal, frame):
29+
logging.info('shutting down started')
30+
self._shutdown = True
31+
32+
def __call__(self):
33+
logging.info('starting up')
34+
self.register_signal_handlers()
35+
#self.load_schedule()
36+
logging.info('looking for delayed items')
37+
while True:
38+
if self._shutdown is True:
39+
break
40+
self.handle_delayed_items()
41+
logging.debug('sleeping')
42+
time.sleep(5)
43+
logging.info('shutting down complete')
44+
45+
def next_timestamp(self):
46+
while True:
47+
timestamp = self.resq.next_delayed_timestamp()
48+
if timestamp:
49+
yield timestamp
50+
else:
51+
break
52+
53+
54+
def next_item(self, timestamp):
55+
while True:
56+
item = self.resq.next_item_for_timestamp(timestamp)
57+
if item:
58+
yield item
59+
else:
60+
break
61+
62+
def handle_delayed_items(self):
63+
for timestamp in self.next_timestamp():
64+
logging.info('handling timestamp: %s' % timestamp)
65+
for item in self.next_item(timestamp):
66+
logging.debug('queueing item %s' % item)
67+
klass = item['class']
68+
queue = item['queue']
69+
args = item['args']
70+
self.resq.enqueue_from_string(klass, queue, *args)
71+
72+
73+
@classmethod
74+
def run(cls, server, password=None):
75+
sched = cls(server=server, password=password)
76+
sched()
77+
78+

resweb/server.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
Failed,
1212
Stats,
1313
Stat,
14-
Worker
14+
Worker,
15+
Delayed,
16+
DelayedTimestamp
1517
)
1618

1719
HOST = ResQ("localhost:6379")
@@ -61,6 +63,18 @@ def stats(request, key):
6163
def stat(request, stat_id):
6264
return str(Stat(HOST, stat_id).render())
6365

66+
@get('/delayed/')
67+
def delayed(request):
68+
start = request.GET.get('start',0)
69+
start = int(start)
70+
return str(Delayed(HOST, start).render())
71+
72+
@get('/delayed/(?P<timestamp>\w.+)')
73+
def delayed_timestamp(request, timestamp):
74+
start = request.GET.get('start',0)
75+
start = int(start)
76+
return str(DelayedTimestamp(HOST, timestamp, start).render())
77+
6478
@get('/media/(?P<filename>.+)')
6579
def my_media(request, filename):
6680
print filename

resweb/templates/delayed.mustache

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{{>header}}
2+
<h1>Delayed Jobs</h1>
3+
4+
<p class='intro'>
5+
This list below contains the timestamps for scheduled delayed jobs.
6+
</p>
7+
8+
<p class='sub'>
9+
Showing {{start}} to {{end}} of <b>{{size}}</b> timestamps
10+
</p>
11+
12+
13+
<table>
14+
<tr>
15+
<th>Timestamp</th>
16+
<th>Job count</th>
17+
</tr>
18+
{{#jobs}}
19+
<tr>
20+
<td><a href="/delayed/{{timestamp}}/">{{formated_time}}</a></td>
21+
<td>{{size}}</td>
22+
</tr>
23+
{{/jobs}}
24+
</table>
25+
26+
{{#pagination}}
27+
<li>
28+
{{#current}}<a href='{{link}}'>{{/current}}{{link_name}}{{#current}}</a>{{/current}}
29+
</li>
30+
{{/pagination}}
31+
{{>footer}}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{{>header}}
2+
3+
<h1>Delayed jobs scheduled for {{formated_timestamp}}</h1>
4+
5+
<p class='sub'>Showing {{start}} to {{end}} of <b>{{size}}</b> jobs</p>
6+
7+
<table class='jobs'>
8+
<tr>
9+
<th>Class</th>
10+
<th>Args</th>
11+
</tr>
12+
{{#jobs}}
13+
<tr>
14+
<td class='class'>{{class}}</td>
15+
<td class='args'>{{args}}</td>
16+
</tr>
17+
{{/jobs}}
18+
{{#no_jobs}}
19+
<tr>
20+
<td class='no-data' colspan='2'>There are no pending jobs scheduled for this time.</td>
21+
</tr>
22+
{{/no_jobs}}
23+
</table>
24+
25+
{{#pagination}}
26+
<li>
27+
{{#current}}<a href='{{link}}'>{{/current}}{{link_name}}{{#current}}</a>{{/current}}
28+
</li>
29+
{{/pagination}}
30+
{{>footer}}

resweb/templates/header.mustache

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
<li><a href='/failed/'>Failed</a></li>
1515
<li><a href='/queues/'>Queues</a></li>
1616
<li><a href='/workers/'>Workers</a></li>
17+
<li><a href='/delayed/'>Delayed</a></li>
1718
<li><a href='/stats/'>Stats</a></li>
1819
</ul>
1920
</div>

resweb/views.py

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,30 @@
11
import pystache
22

3-
from pyres import ResQ, __version__
3+
from pyres import __version__
44
from pyres.worker import Worker as Wrkr
55
from pyres import failure
66
import os
7+
import datetime
8+
79
TEMPLATE_PATH = os.path.join(os.path.dirname(__file__), 'templates')
810
class ResWeb(pystache.View):
911
template_path = TEMPLATE_PATH
1012
def __init__(self, host):
1113
super(ResWeb, self).__init__()
1214
self.resq = host
15+
1316
def media_folder(self):
1417
return '/media/'
18+
1519
def close(self):
1620
self.resq.close()
1721

1822
def address(self):
1923
return '%s:%s' % (self.resq.redis.host,self.resq.redis.port)
24+
2025
def version(self):
2126
return str(__version__)
27+
2228
def pages(self, start, size, link_function, width=20):
2329
pages = []
2430

@@ -406,6 +412,80 @@ def runat(self):
406412
item['nodata'] = not item['data']
407413
"""
408414
pass
415+
416+
class Delayed(ResWeb):
417+
def __init__(self, host, start=0):
418+
self._start = start
419+
super(Delayed, self).__init__(host)
420+
421+
def start(self):
422+
return str(self._start)
423+
424+
def end(self):
425+
return str(self._start + 20)
426+
427+
def size(self):
428+
item = self.resq.delayed_queue_schedule_size() or 0
429+
return str(item)
430+
431+
def jobs(self):
432+
jobs = []
433+
for timestamp in self.resq.delayed_queue_peek(self.start(), self.end()):
434+
t = datetime.datetime.fromtimestamp(float(timestamp))
435+
item = dict(timestamp=str(timestamp))
436+
item['size'] = str(self.resq.delayed_timestamp_size(timestamp))
437+
438+
item['formated_time'] = str(t)
439+
440+
jobs.append(item)
441+
return jobs
442+
443+
def pagination(self):
444+
return self.pages(self._start, int(self.size()), self.link_func)
445+
446+
def link_func(self, start):
447+
return '/delayed/?start=%s' % start
448+
449+
class DelayedTimestamp(ResWeb):
450+
def __init__(self, host, timestamp, start=0):
451+
self._start = start
452+
self._timestamp = timestamp
453+
super(DelayedTimestamp, self).__init__(host)
454+
455+
def formated_timestamp(self):
456+
return str(datetime.datetime.fromtimestamp(float(self._timestamp)))
457+
458+
def start(self):
459+
return str(self._start)
460+
461+
def end(self):
462+
return str(self._start + 20)
463+
464+
def size(self):
465+
item = self.resq.delayed_timestamp_size(self._timestamp) or 0
466+
return str(item)
467+
468+
def jobs(self):
469+
jobs = []
470+
for job in self.resq.delayed_timestamp_peek(self._timestamp, int(self.start()), int(self.end())):
471+
item = {
472+
'class': str(job['class']),
473+
'args': str(job['args'])
474+
}
475+
jobs.append(item)
476+
return jobs
477+
478+
def no_jobs(self):
479+
if int(self.size()) > 0:
480+
return False
481+
return True
482+
483+
def pagination(self):
484+
return self.pages(self._start, int(self.size()), self.link_func)
485+
486+
def link_func(self, start):
487+
return '/delayed/?start=%s' % start
488+
409489
def redis_size(key, resq):
410490
key_type = resq.redis.type('resque:'+key)
411491
item = 0

scripts/pyres_scheduler

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#!/usr/bin/env python
2+
from pyres.scheduler import Scheduler
3+
import logging
4+
5+
def main():
6+
from optparse import OptionParser
7+
usage = "usage: %prog [options] arg1"
8+
parser = OptionParser(usage=usage)
9+
#parser.add_option("-q", dest="queue_list")
10+
parser.add_option("--host", dest="host", default="localhost")
11+
parser.add_option("--port",dest="port",type="int", default=6379)
12+
parser.add_option('-l', '--log-level', dest='log_level', default='info', help='log level. Valid values are "debug", "info", "warning", "error", "critical", in decreasing order of verbosity. Defaults to "info" if parameter not specified.')
13+
(options,args) = parser.parse_args()
14+
log_level = getattr(logging, options.log_level.upper(),'INFO')
15+
logging.basicConfig(level=log_level, format="%(module)s: %(asctime)s: %(levelname)s: %(message)s")
16+
server = '%s:%s' % (options.host, options.port)
17+
Scheduler.run(server)
18+
19+
if __name__ == "__main__":
20+
main()

0 commit comments

Comments
 (0)