Skip to content

Commit d79bd49

Browse files
committed
merge except tempalte change
1 parent ea65c2c commit d79bd49

File tree

2 files changed

+40
-14
lines changed

2 files changed

+40
-14
lines changed

src/pyres/worker.py

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ def __init__(self, queues=[], server="localhost:6379"):
1313
self.validate_queues()
1414
self._shutdown = False
1515
self.child = None
16+
self.pid = os.getpid()
1617
if isinstance(server,basestring):
1718
self.resq = ResQ(server)
1819
elif isinstance(server, ResQ):
@@ -55,19 +56,33 @@ def kill_child(self, signum, frame):
5556
if self.child:
5657
print "Killing child at %s" % self.child
5758
os.kill(self.child, signal.SIGKILL)
58-
59+
60+
def __str__(self):
61+
if getattr(self,'id', None):
62+
return self.id
63+
hostname = os.uname()[1]
64+
return '%s:%s:%s' % (hostname, self.pid, ','.join(self.queues))
65+
5966
def work(self, interval=5):
6067
self.startup()
6168
while True:
6269
if self._shutdown:
70+
print 'shutdown scheduled'
6371
break
6472
job = self.reserve()
6573
if job:
6674
print "got: %s" % job
6775
self.child = os.fork()
6876
if self.child:
6977
print 'Forked %s at %s' % (self.child, datetime.datetime.now())
70-
os.waitpid(self.child, 0)
78+
try:
79+
os.waitpid(self.child, 0)
80+
except OSError, ose:
81+
import errno
82+
if ose.errno != errno.EINTR:
83+
raise ose
84+
#os.wait()
85+
print 'Done waiting'
7186
else:
7287
print 'Processing %s since %s' % (job._queue, datetime.datetime.now())
7388
self.process(job)
@@ -103,15 +118,19 @@ def reserve(self):
103118
return job
104119

105120
def working_on(self, job):
121+
print 'marking as working on'
106122
data = {
107123
'queue': job._queue,
108124
'run_at': str(datetime.datetime.now()),
109125
'payload': job._payload
110126
}
111127
data = simplejson.dumps(data)
112128
self.resq.redis["resque:worker:%s" % str(self)] = data
129+
print "worker:%s" % str(self)
130+
print self.resq.redis["resque:worker:%s" % str(self)]
113131

114132
def done_working(self):
133+
print 'done working'
115134
self.processed()
116135
self.resq.redis.delete("resque:worker:%s" % str(self))
117136

@@ -132,18 +151,16 @@ def job(self):
132151
if data:
133152
return ResQ.decode(data)
134153
return {}
154+
155+
def processing(self):
156+
return self.job()
135157

136-
def __str__(self):
137-
if getattr(self,'id', None):
138-
return self.id
139-
import os;
140-
hostname = os.uname()[1]
141-
pid = os.getpid()
142-
return '%s:%s:%s' % (hostname, pid, ','.join(self.queues))
158+
def state(self):
159+
return 'working' if self.resq.redis.exists('resque:worker:%s' % self) else 'idle'
143160

144161
@classmethod
145162
def run(cls, queues, server):
146-
worker = cls(queues=queues, host=server)
163+
worker = cls(queues=queues, server=server)
147164
worker.work()
148165

149166
@classmethod
@@ -152,7 +169,7 @@ def all(cls, host="localhost:6379"):
152169
resq = ResQ(host)
153170
elif isinstance(host, ResQ):
154171
resq = host
155-
return resq.redis.smembers('resque:workers')
172+
return [Worker.find(w,resq) for w in resq.redis.smembers('resque:workers')]
156173

157174
@classmethod
158175
def working(cls, host):
@@ -161,9 +178,8 @@ def working(cls, host):
161178
elif isinstance(host, ResQ):
162179
resq = host
163180
total = []
164-
for key in Worker.all(host):
165-
if Worker.exists(key, resq):
166-
total.append('resque:worker:%s' % key)
181+
for key in Worker.all(host):
182+
total.append('resque:worker:%s' % key)
167183
names = []
168184
for key in total:
169185
value = resq.redis.get(key)
@@ -194,6 +210,7 @@ def exists(cls, worker_id, resq):
194210
parser.add_option("-s", dest="server", default="localhost:6379")
195211
(options,args) = parser.parse_args()
196212
if not options.queue_list:
213+
parser.print_hel()
197214
parser.error("Please give each worker at least one queue.")
198215
queues = options.queue_list.split(',')
199216
import sys

src/tests.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,15 @@ class ErrorObject(object):
3030
def perform():
3131
raise Exception("Could not finish job")
3232

33+
class LongObject(object):
34+
queue = 'long_runnning'
35+
36+
@staticmethod
37+
def perform(sleep_time):
38+
import time
39+
time.sleep(sleep_time)
40+
print 'Done Sleeping'
41+
3342
def test_str_to_class():
3443
ret = str_to_class('tests.Basic')
3544
assert ret

0 commit comments

Comments
 (0)