Skip to content

Commit 34020ea

Browse files
author
Matt George
committed
added the first parts of delayed tasks
1 parent 16ba89a commit 34020ea

File tree

3 files changed

+164
-1
lines changed

3 files changed

+164
-1
lines changed

pyres/__init__.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
from redis import Redis
44
import pyres.json_parser as json
55

6-
import types
6+
#import types
7+
import time, datetime
78

89
def my_import(name):
910
mod = __import__(name)
@@ -179,6 +180,45 @@ def close(self):
179180
"""
180181
self.redis.disconnect()
181182

183+
def enqueue_at(self, timestamp, klass, *args):
184+
class_name = '%s.%s' % (klass.__module__, klass.__name__)
185+
self.delayed_push(timestamp, {'class':class_name,'queue': klass.queue, 'args':args})
186+
187+
def delayed_push(self, timestamp, item):
188+
key = int(time.mktime(timestamp.timetuple()))
189+
self.redis.push('resque:delayed:%s' % key, ResQ.encode(item))
190+
self.redis.zadd('resque:delayed_queue_schedule', key, key)
191+
192+
def delayed_queue_peek(self, start, count):
193+
return [int(item) for item in self.redis.zrange('resque:delayed_queue_schedule', start, start+count)]
194+
195+
def delayed_queue_schedule_size(self):
196+
return self.redis.zcard('resque:delayed_queue_schedule')
197+
198+
def delayed_timestamp_size(self, timestamp):
199+
key = int(time.mktime(timestamp.timetuple()))
200+
return self.redis.llen("resque:delayed:%i" % key)
201+
202+
def next_delayed_timestamp(self):
203+
key = int(time.mktime(datetime.datetime.now().timetuple()))
204+
array = self.redis.zrangebyscore('resque:delayed_queue_schedule', '-inf', key)
205+
timestamp = None
206+
if array:
207+
timestamp = array[0]
208+
return timestamp
209+
210+
def next_item_for_timestamp(self, timestamp):
211+
#key = int(time.mktime(timestamp.timetuple()))
212+
key = "resque:delayed:%s" % timestamp
213+
ret = self.redis.pop(key)
214+
item = None
215+
if ret:
216+
item = ResQ.decode(ret)
217+
if self.redis.llen(key) == 0:
218+
self.redis.delete(key)
219+
self.redis.zrem('resque:delayed_queue_schedule', timestamp)
220+
return item
221+
182222
@classmethod
183223
def encode(cls, item):
184224
return json.dumps(item)

pyres/scheduler.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import signal
2+
import time
3+
4+
from pyres import ResQ
5+
6+
class Scheduler(object):
7+
8+
def __init__(self, server="localhost:6379", password=None):
9+
self._shutdown = False
10+
if isinstance(server,basestring):
11+
self.resq = ResQ(server=server, password=password)
12+
elif isinstance(server, ResQ):
13+
self.resq = server
14+
else:
15+
raise Exception("Bad server argument")
16+
17+
def register_signal_handlers(self):
18+
signal.signal(signal.SIGTERM, self.schedule_shutdown)
19+
signal.signal(signal.SIGINT, self.schedule_shutdown)
20+
signal.signal(signal.SIGQUIT, self.schedule_shutdown)
21+
22+
def schedule_shutdown(self):
23+
self._shutdown = True
24+
25+
def run(self):
26+
self.register_signal_handlers()
27+
#self.load_schedule()
28+
while True:
29+
if self._shutdown is True:
30+
break
31+
self.handle_delayed_items()
32+
time.sleep(5)
33+
34+
def next_timestamp(self):
35+
while True
36+
timestamp = self.resq.next_delayed_timestamp()
37+
if timestamp:
38+
yield timestamp
39+
else:
40+
break
41+
42+
43+
def next_item(self, timestamp):
44+
while True:
45+
item = self.resq.next_item_for_timestamp(timestamp)
46+
if item:
47+
yield item
48+
else:
49+
break
50+
51+
def handle_delayed_items(self):
52+
for timestamp in self.next_timestamp():
53+
for item in self.next_item(timestamp):
54+
print 'queueing item'
55+
klass = item['class']
56+
queue = item['queue']
57+
args = item['args']
58+
self.resq.enqueue_from_string(klass, queue, args)
59+
60+
61+

tests/test_schedule.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
from tests import PyResTests, Basic, TestProcess, ErrorObject
2+
from pyres import ResQ
3+
from pyres.job import Job
4+
from pyres.scheduler import Scheduler
5+
import os
6+
import datetime
7+
import time
8+
class ScheduleTests(PyResTests):
9+
def test_enqueue_at(self):
10+
d = datetime.datetime.now() + datetime.timedelta(days=1)
11+
d2 = d + datetime.timedelta(days=1)
12+
key = int(time.mktime(d.timetuple()))
13+
key2 = int(time.mktime(d2.timetuple()))
14+
self.resq.enqueue_at(d, Basic,"test1")
15+
self.resq.enqueue_at(d, Basic,"test2")
16+
assert self.redis.llen("resque:delayed:%s" % key) == 2
17+
assert len(self.redis.zrange('resque:delayed_queue_schedule',0,20)) == 1
18+
self.resq.enqueue_at(d2, Basic,"test1")
19+
assert self.redis.llen("resque:delayed:%s" % key2) == 1
20+
assert len(self.redis.zrange('resque:delayed_queue_schedule',0,20)) == 2
21+
22+
def test_delayed_queue_schedule_size(self):
23+
d = datetime.datetime.now() + datetime.timedelta(days=1)
24+
d2 = d + datetime.timedelta(days=1)
25+
key = int(time.mktime(d.timetuple()))
26+
key2 = int(time.mktime(d2.timetuple()))
27+
self.resq.enqueue_at(d, Basic,"test1")
28+
self.resq.enqueue_at(d2, Basic,"test1")
29+
assert self.resq.delayed_queue_schedule_size() == 2
30+
31+
def test_delayed_timestamp_size(self):
32+
d = datetime.datetime.now() + datetime.timedelta(days=1)
33+
d2 = d + datetime.timedelta(days=1)
34+
key = int(time.mktime(d.timetuple()))
35+
key2 = int(time.mktime(d2.timetuple()))
36+
self.resq.enqueue_at(d, Basic,"test1")
37+
assert self.resq.delayed_timestamp_size(d) == 1
38+
self.resq.enqueue_at(d, Basic,"test1")
39+
assert self.resq.delayed_timestamp_size(d) == 2
40+
41+
def test_next_delayed_timestamp(self):
42+
d = datetime.datetime.now() + datetime.timedelta(days=-1)
43+
d2 = d + datetime.timedelta(days=-2)
44+
key = int(time.mktime(d.timetuple()))
45+
key2 = int(time.mktime(d2.timetuple()))
46+
self.resq.enqueue_at(d, Basic,"test1")
47+
self.resq.enqueue_at(d2, Basic,"test1")
48+
item = self.resq.next_delayed_timestamp()
49+
assert item == key2
50+
51+
def test_next_item_for_timestamp(self):
52+
d = datetime.datetime.now() + datetime.timedelta(days=-1)
53+
d2 = d + datetime.timedelta(days=-2)
54+
#key = int(time.mktime(d.timetuple()))
55+
#key2 = int(time.mktime(d2.timetuple()))
56+
self.resq.enqueue_at(d, Basic,"test1")
57+
self.resq.enqueue_at(d2, Basic,"test1")
58+
timestamp = self.resq.next_delayed_timestamp()
59+
item = self.resq.next_item_for_timestamp(timestamp)
60+
assert isinstance(item, dict)
61+
assert self.redis.zcard('resque:delayed_queue_schedule') == 1
62+

0 commit comments

Comments
 (0)