Skip to content

Commit a5f94cf

Browse files
author
Matt George
committed
added some fixes and the web interface for
delayed tasks
1 parent 240f841 commit a5f94cf

File tree

7 files changed

+170
-11
lines changed

7 files changed

+170
-11
lines changed

pyres/__init__.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -177,24 +177,27 @@ def close(self):
177177
"""
178178
self.redis.disconnect()
179179

180-
def enqueue_at(self, timestamp, klass, *args):
180+
def enqueue_at(self, datetime, klass, *args):
181181
class_name = '%s.%s' % (klass.__module__, klass.__name__)
182-
self.delayed_push(timestamp, {'class':class_name,'queue': klass.queue, 'args':args})
182+
self.delayed_push(datetime, {'class':class_name,'queue': klass.queue, 'args':args})
183183

184-
def delayed_push(self, timestamp, item):
185-
key = int(time.mktime(timestamp.timetuple()))
184+
def delayed_push(self, datetime, item):
185+
key = int(time.mktime(datetime.timetuple()))
186186
self.redis.rpush('resque:delayed:%s' % key, ResQ.encode(item))
187187
self.redis.zadd('resque:delayed_queue_schedule', key, key)
188188

189189
def delayed_queue_peek(self, start, count):
190-
return [int(item) for item in self.redis.zrange('resque:delayed_queue_schedule', start, start+count)]
190+
return [int(item) for item in self.redis.zrange('resque:delayed_queue_schedule', start, start+count) or []]
191191

192+
def delayed_timestamp_peek(self, timestamp, start, count):
193+
return self.list_range('resque:delayed:%s' % timestamp, start, count)
194+
192195
def delayed_queue_schedule_size(self):
193196
return self.redis.zcard('resque:delayed_queue_schedule')
194197

195198
def delayed_timestamp_size(self, timestamp):
196-
key = int(time.mktime(timestamp.timetuple()))
197-
return self.redis.llen("resque:delayed:%i" % key)
199+
#key = int(time.mktime(timestamp.timetuple()))
200+
return self.redis.llen("resque:delayed:%s" % timestamp)
198201

199202
def next_delayed_timestamp(self):
200203
key = int(time.mktime(datetime.datetime.now().timetuple()))

resweb/server.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
Failed,
1212
Stats,
1313
Stat,
14-
Worker
14+
Worker,
15+
Delayed,
16+
DelayedTimestamp
1517
)
1618

1719
HOST = ResQ("localhost:6379")
@@ -61,6 +63,18 @@ def stats(request, key):
6163
def stat(request, stat_id):
6264
return str(Stat(HOST, stat_id).render())
6365

66+
@get('/delayed/')
67+
def delayed(request):
68+
start = request.GET.get('start',0)
69+
start = int(start)
70+
return str(Delayed(HOST, start).render())
71+
72+
@get('/delayed/(?P<timestamp>\w.+)')
73+
def delayed_timestamp(request, timestamp):
74+
start = request.GET.get('start',0)
75+
start = int(start)
76+
return str(DelayedTimestamp(HOST, timestamp, start).render())
77+
6478
@get('/media/(?P<filename>.+)')
6579
def my_media(request, filename):
6680
print filename

resweb/templates/delayed.mustache

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{{>header}}
2+
<h1>Delayed Jobs</h1>
3+
4+
<p class='intro'>
5+
This list below contains the timestamps for scheduled delayed jobs.
6+
</p>
7+
8+
<p class='sub'>
9+
Showing {{start}} to {{end}} of <b>{{size}}</b> timestamps
10+
</p>
11+
12+
13+
<table>
14+
<tr>
15+
<th>Timestamp</th>
16+
<th>Job count</th>
17+
</tr>
18+
{{#jobs}}
19+
<tr>
20+
<td><a href="/delayed/{{timestamp}}/">{{formated_time}}</a></td>
21+
<td>{{size}}</td>
22+
</tr>
23+
{{/jobs}}
24+
</table>
25+
26+
{{#pagination}}
27+
<li>
28+
{{#current}}<a href='{{link}}'>{{/current}}{{link_name}}{{#current}}</a>{{/current}}
29+
</li>
30+
{{/pagination}}
31+
{{>footer}}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{{>header}}
2+
3+
<h1>Delayed jobs scheduled for {{formated_timestamp}}</h1>
4+
5+
<p class='sub'>Showing {{start}} to {{end}} of <b>{{size}}</b> jobs</p>
6+
7+
<table class='jobs'>
8+
<tr>
9+
<th>Class</th>
10+
<th>Args</th>
11+
</tr>
12+
{{#jobs}}
13+
<tr>
14+
<td class='class'>{{class}}</td>
15+
<td class='args'>{{args}}</td>
16+
</tr>
17+
{{/jobs}}
18+
{{#no_jobs}}
19+
<tr>
20+
<td class='no-data' colspan='2'>There are no pending jobs scheduled for this time.</td>
21+
</tr>
22+
{{/no_jobs}}
23+
</table>
24+
25+
{{#pagination}}
26+
<li>
27+
{{#current}}<a href='{{link}}'>{{/current}}{{link_name}}{{#current}}</a>{{/current}}
28+
</li>
29+
{{/pagination}}
30+
{{>footer}}

resweb/templates/header.mustache

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
<li><a href='/failed/'>Failed</a></li>
1515
<li><a href='/queues/'>Queues</a></li>
1616
<li><a href='/workers/'>Workers</a></li>
17+
<li><a href='/delayed/'>Delayed</a></li>
1718
<li><a href='/stats/'>Stats</a></li>
1819
</ul>
1920
</div>

resweb/views.py

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,30 @@
11
import pystache
22

3-
from pyres import ResQ, __version__
3+
from pyres import __version__
44
from pyres.worker import Worker as Wrkr
55
from pyres import failure
66
import os
7+
import datetime
8+
79
TEMPLATE_PATH = os.path.join(os.path.dirname(__file__), 'templates')
810
class ResWeb(pystache.View):
911
template_path = TEMPLATE_PATH
1012
def __init__(self, host):
1113
super(ResWeb, self).__init__()
1214
self.resq = host
15+
1316
def media_folder(self):
1417
return '/media/'
18+
1519
def close(self):
1620
self.resq.close()
1721

1822
def address(self):
1923
return '%s:%s' % (self.resq.redis.host,self.resq.redis.port)
24+
2025
def version(self):
2126
return str(__version__)
27+
2228
def pages(self, start, size, link_function, width=20):
2329
pages = []
2430

@@ -406,6 +412,80 @@ def runat(self):
406412
item['nodata'] = not item['data']
407413
"""
408414
pass
415+
416+
class Delayed(ResWeb):
417+
def __init__(self, host, start=0):
418+
self._start = start
419+
super(Delayed, self).__init__(host)
420+
421+
def start(self):
422+
return str(self._start)
423+
424+
def end(self):
425+
return str(self._start + 20)
426+
427+
def size(self):
428+
item = self.resq.delayed_queue_schedule_size() or 0
429+
return str(item)
430+
431+
def jobs(self):
432+
jobs = []
433+
for timestamp in self.resq.delayed_queue_peek(self.start(), self.end()):
434+
t = datetime.datetime.fromtimestamp(float(timestamp))
435+
item = dict(timestamp=str(timestamp))
436+
item['size'] = str(self.resq.delayed_timestamp_size(timestamp))
437+
438+
item['formated_time'] = str(t)
439+
440+
jobs.append(item)
441+
return jobs
442+
443+
def pagination(self):
444+
return self.pages(self._start, int(self.size()), self.link_func)
445+
446+
def link_func(self, start):
447+
return '/delayed/?start=%s' % start
448+
449+
class DelayedTimestamp(ResWeb):
450+
def __init__(self, host, timestamp, start=0):
451+
self._start = start
452+
self._timestamp = timestamp
453+
super(DelayedTimestamp, self).__init__(host)
454+
455+
def formated_timestamp(self):
456+
return str(datetime.datetime.fromtimestamp(float(self._timestamp)))
457+
458+
def start(self):
459+
return str(self._start)
460+
461+
def end(self):
462+
return str(self._start + 20)
463+
464+
def size(self):
465+
item = self.resq.delayed_timestamp_size(self._timestamp) or 0
466+
return str(item)
467+
468+
def jobs(self):
469+
jobs = []
470+
for job in self.resq.delayed_timestamp_peek(self._timestamp, int(self.start()), int(self.end())):
471+
item = {
472+
'class': str(job['class']),
473+
'args': str(job['args'])
474+
}
475+
jobs.append(item)
476+
return jobs
477+
478+
def no_jobs(self):
479+
if int(self.size()) > 0:
480+
return False
481+
return True
482+
483+
def pagination(self):
484+
return self.pages(self._start, int(self.size()), self.link_func)
485+
486+
def link_func(self, start):
487+
return '/delayed/?start=%s' % start
488+
409489
def redis_size(key, resq):
410490
key_type = resq.redis.get_type('resque:'+key)
411491
item = 0

tests/test_schedule.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ def test_delayed_timestamp_size(self):
3434
key = int(time.mktime(d.timetuple()))
3535
key2 = int(time.mktime(d2.timetuple()))
3636
self.resq.enqueue_at(d, Basic,"test1")
37-
assert self.resq.delayed_timestamp_size(d) == 1
37+
assert self.resq.delayed_timestamp_size(key) == 1
3838
self.resq.enqueue_at(d, Basic,"test1")
39-
assert self.resq.delayed_timestamp_size(d) == 2
39+
assert self.resq.delayed_timestamp_size(key) == 2
4040

4141
def test_next_delayed_timestamp(self):
4242
d = datetime.datetime.now() + datetime.timedelta(days=-1)

0 commit comments

Comments
 (0)