Skip to content

Commit 240f841

Browse files
author
Matt George
committed
updated scheduler
1 parent f15345e commit 240f841

File tree

3 files changed

+33
-6
lines changed

3 files changed

+33
-6
lines changed

pyres/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ def enqueue_at(self, timestamp, klass, *args):
183183

184184
def delayed_push(self, timestamp, item):
185185
key = int(time.mktime(timestamp.timetuple()))
186-
self.redis.push('resque:delayed:%s' % key, ResQ.encode(item))
186+
self.redis.rpush('resque:delayed:%s' % key, ResQ.encode(item))
187187
self.redis.zadd('resque:delayed_queue_schedule', key, key)
188188

189189
def delayed_queue_peek(self, start, count):
@@ -207,7 +207,7 @@ def next_delayed_timestamp(self):
207207
def next_item_for_timestamp(self, timestamp):
208208
#key = int(time.mktime(timestamp.timetuple()))
209209
key = "resque:delayed:%s" % timestamp
210-
ret = self.redis.pop(key)
210+
ret = self.redis.lpop(key)
211211
item = None
212212
if ret:
213213
item = ResQ.decode(ret)

pyres/scheduler.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,27 @@ def __init__(self, server="localhost:6379", password=None):
1515
raise Exception("Bad server argument")
1616

1717
def register_signal_handlers(self):
18+
print 'registering signals'
1819
signal.signal(signal.SIGTERM, self.schedule_shutdown)
1920
signal.signal(signal.SIGINT, self.schedule_shutdown)
2021
signal.signal(signal.SIGQUIT, self.schedule_shutdown)
2122

22-
def schedule_shutdown(self):
23+
def schedule_shutdown(self, signal, frame):
24+
print 'shutting down started'
2325
self._shutdown = True
2426

25-
def run(self):
27+
def __call__(self):
28+
print 'starting up'
2629
self.register_signal_handlers()
2730
#self.load_schedule()
31+
print 'looking for delayed items'
2832
while True:
2933
if self._shutdown is True:
3034
break
3135
self.handle_delayed_items()
36+
print 'sleeping'
3237
time.sleep(5)
38+
print 'shutting down complete'
3339

3440
def next_timestamp(self):
3541
while True:
@@ -50,12 +56,18 @@ def next_item(self, timestamp):
5056

5157
def handle_delayed_items(self):
5258
for timestamp in self.next_timestamp():
59+
print 'handling timestamp: %s' % timestamp
5360
for item in self.next_item(timestamp):
54-
print 'queueing item'
61+
print 'queueing item %s' % item
5562
klass = item['class']
5663
queue = item['queue']
5764
args = item['args']
58-
self.resq.enqueue_from_string(klass, queue, args)
65+
self.resq.enqueue_from_string(klass, queue, *args)
5966

6067

68+
@classmethod
69+
def run(cls, server, password=None):
70+
sched = cls(server=server, password=password)
71+
sched()
6172

73+

scripts/pyres_scheduler

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#!/usr/bin/env python
2+
from pyres.scheduler import Scheduler
3+
def main():
4+
from optparse import OptionParser
5+
usage = "usage: %prog [options] arg1"
6+
parser = OptionParser(usage=usage)
7+
#parser.add_option("-q", dest="queue_list")
8+
parser.add_option("--host", dest="host", default="localhost")
9+
parser.add_option("--port",dest="port",type="int", default=6379)
10+
(options,args) = parser.parse_args()
11+
server = '%s:%s' % (options.host,options.port)
12+
Scheduler.run(server)
13+
14+
if __name__ == "__main__":
15+
main()

0 commit comments

Comments
 (0)