Skip to content

Commit 6da2ee0

Browse files
author
Matt George
committed
adding the basics, still quite rough
1 parent a1ba344 commit 6da2ee0

File tree

11 files changed

+1585
-0
lines changed

11 files changed

+1585
-0
lines changed

README.markdown

Whitespace-only changes.

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
multiprocessing==2.6.2.1

setup.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from distutils.core import setup
2+
3+
setup(name='PyRes',
4+
version='0.1',
5+
description='Python Resque Job',
6+
author='Matt George',
7+
author_email='[email protected]',
8+
url='',
9+
packages=['pyres'],
10+
package_dir = {'': 'src'}
11+
)

src/pyres/__init__.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
from redis import Redis
2+
import simplejson
3+
4+
import types
5+
def str_to_class(s):
6+
lst = s.split(".")
7+
klass = lst[-1]
8+
mod_list = lst[:-1]
9+
module = ".".join(mod_list)
10+
try:
11+
mod = __import__(module)
12+
if hasattr(mod, klass):
13+
return getattr(mod, klass)
14+
else:
15+
return None
16+
except ImportError:
17+
return None
18+
19+
class ResQ(object):
20+
_res = None
21+
def __init__(self, server="localhost:6379"):
22+
host, port = server.split(':')
23+
self._redis = Redis(host=host, port=int(port))
24+
self._watched_queues = {}
25+
26+
def push(self, queue, item):
27+
self.watch_queue(queue)
28+
self._redis.push("queue:%s" % queue, ResQ.encode(item))
29+
30+
def pop(self, queue):
31+
ret = self._redis.pop("queue:%s" % queue)
32+
if ret:
33+
return ResQ.decode(ret)
34+
return ret
35+
36+
def size(self, queue):
37+
return int(self._redis.llen("queue:%s" % queue))
38+
39+
def watch_queue(self, queue):
40+
if self._watched_queues.has_key(queue):
41+
return
42+
else:
43+
if self._redis.sadd('queues',str(queue)):
44+
self._watched_queues[queue] = queue
45+
def peek(self, queue, start=0, count=1):
46+
return self.list_range('queue:%s' % queue, start, count)
47+
48+
def list_range(self, key, start, count):
49+
items = self._redis.lrange(key,start,start+count-1)
50+
ret_list = []
51+
for i in items:
52+
ret_list.append(ResQ.decode(i))
53+
return ret_list
54+
@classmethod
55+
def encode(cls, item):
56+
return simplejson.dumps(item)
57+
58+
@classmethod
59+
def decode(cls, item):
60+
ret = simplejson.loads(item)
61+
return ret
62+
63+
@classmethod
64+
def enqueue(cls, klass, *args):
65+
queue = getattr(klass,'queue', None)
66+
#print cls._res
67+
resq = cls()
68+
if queue:
69+
class_name = '%s.%s' % (klass.__module__, klass.__name__)
70+
#print class_name
71+
resq.push(queue, {'klass':class_name,'args':args})
72+
#Job.create(queue, klass,*args)
73+
74+
@classmethod
75+
def queues(cls, server="localhost:6379"):
76+
resq = cls(server)
77+
return resq._redis.smembers("queues")
78+
79+
class Stat(object):
80+
def __init__(self, name, resq):
81+
self.name = name
82+
self.key = "stat:%s" % self.name
83+
self.resq = resq
84+
85+
def get(self):
86+
val = self.resq._redis.get(self.key)
87+
return int(val)
88+
89+
def incr(self, ammount=1):
90+
self.resq._redis.incr(self.key, ammount)
91+
92+
def decr(self, ammount=1):
93+
self.resq._redis.decr(self.key, ammount)
94+
95+
def clear(self):
96+
self.resq._redis.delete(self.key)
97+

src/pyres/exceptions.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
class NoQueueError(Exception):
2+
pass

src/pyres/failure.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import datetime
2+
from pyres import ResQ
3+
class Failure(object):
4+
def __init__(self, exp, queue, payload):
5+
self._exception = exp
6+
#self._worker = worker
7+
self._queue = queue
8+
self._payload = payload
9+
10+
def save(self, resq):
11+
data = {
12+
'failed_at' : str(datetime.datetime.now()),
13+
'payload' : self._payload,
14+
'error' : self._exception,
15+
'queue' : self._queue
16+
}
17+
data = ResQ.encode(data)
18+
resq._redis.push('failed', data)
19+
20+
@classmethod
21+
def count(cls, resq):
22+
return int(resq._redis.llen('failed'))
23+
24+
@classmethod
25+
def create(cls, options={}):
26+
pass
27+

src/pyres/job.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from pyres import ResQ, str_to_class
2+
from pyres.failure import Failure
3+
class Job(object):
4+
def __init__(self, queue, payload, resq):
5+
self._queue = queue
6+
self._payload = payload
7+
self.resq = resq
8+
9+
def perform(self):
10+
payload_class_str = self._payload["klass"]
11+
payload_class = str_to_class(payload_class_str)
12+
args = self._payload.get("args", None)
13+
if args:
14+
return payload_class.perform(*args)
15+
else:
16+
return payload_class.perform()
17+
18+
def fail(self, exception):
19+
#Failure.create(exception)
20+
failure = Failure(exception, self._queue, self._payload)
21+
failure.save(self.resq)
22+
23+
@classmethod
24+
def reserve(cls, queue, res):
25+
payload = res.pop(queue)
26+
print "payload: %s" % payload
27+
if payload:
28+
return cls(queue, payload, res)

src/pyres/manager.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
from pyres.exceptions import NoQueueError
2+
class Manager(object):
3+
def __init__(self, queues, host):
4+
self.queues = queues
5+
self._host = host
6+
self.resq = ResQ(host)
7+
self.validate_queues()
8+
9+
def __str__(self):
10+
import os;
11+
hostname = os.uname()[1]
12+
pid = os.getpid()
13+
return 'Manager:%s:%s:%s' % (hostname, pid, ','.join(self.queues))
14+
15+
def validate_queues(self):
16+
if not self.queues:
17+
raise NoQueueError("Please give each worker at least one queue.")
18+
19+
def work(self):
20+
self.startup()
21+
while True:
22+
pass
23+
24+
def startup(self):
25+
self.register_manager()
26+
self.register_signals()
27+
28+
def register_manager(self):
29+
self.resq._redis.sadd('managers',str(self))
30+
31+
def unregister_manager(self):
32+
self.resq._redis.srem('managers',str(self))
33+
34+
def register_signals(self):
35+
signal.signal(signal.SIGTERM, self.shutdown_all)
36+
signal.signal(signal.SIGINT, self.shutdown_all)
37+
signal.signal(signal.SIGQUIT, self.schedule_shutdown)
38+
signal.signal(signal.SIGUSR1, self.kill_children)
39+
40+
def shutdown_all(self, signum, frame):
41+
self.schedule_shutdown(signum, frame)
42+
self.kill_children(signum, frame)
43+
44+
def schedule_shutdown(self, signum, frame):
45+
self._shutdown = True
46+
47+
def kill_children(self):
48+
for child in self._children:
49+
child.terminate()
50+
51+
def start_child(self, queue):
52+
from pyres.worker import JuniorWorker
53+
p = Process(target=JuniorWorker.run, args=([queue], self._host))
54+
self.children.append(p)
55+
p.start()
56+
return True
57+
58+
@classmethod
59+
def run(cls, queues=[], host="localhost:6379"):
60+
manager = cls(queues, host)
61+
manager.work()

0 commit comments

Comments
 (0)