Skip to content

Commit 4e54774

Browse files
author
Whit Morriss
committed
Make it possible to define job class as part of worker class. Add a hook method for inspecting/manipulating a job before it is processed.
1 parent fbe0de4 commit 4e54774

File tree

1 file changed

+17
-7
lines changed

1 file changed

+17
-7
lines changed

pyres/worker.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,7 @@
99
from pyres.job import Job
1010
from pyres import ResQ, Stat, __version__
1111

12-
try:
13-
from setproctitle import setproctitle
14-
except:
15-
def setproctitle(name):
16-
pass
12+
1713

1814
logger = logging.getLogger(__name__)
1915

@@ -25,6 +21,9 @@ class and passes a comma-separated list of queues to listen on.::
2521
>>> Worker.run([queue1, queue2], server="localhost:6379")
2622
2723
"""
24+
25+
job_class = Job
26+
2827
def __init__(self, queues=(), server="localhost:6379", password=None):
2928
self.queues = queues
3029
self.validate_queues()
@@ -140,7 +139,7 @@ def work(self, interval=5):
140139
if job:
141140
logger.info('picked up job')
142141
logger.debug('job details: %s' % job)
143-
self.child = os.fork()
142+
self.child = os.fork()
144143
if self.child:
145144
setproctitle("pyres_worker%s: Forked %s at %s" %
146145
(__version__,
@@ -176,11 +175,15 @@ def work(self, interval=5):
176175
#time.sleep(interval)
177176
self.unregister_worker()
178177

178+
def before_process(self, job):
179+
return job
180+
179181
def process(self, job=None):
180182
if not job:
181183
job = self.reserve()
182184
try:
183185
self.working_on(job)
186+
job = self.before_process(job)
184187
return job.perform()
185188
except Exception, e:
186189
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
@@ -196,7 +199,7 @@ def process(self, job=None):
196199
def reserve(self, timeout=10):
197200
for q in self.queues:
198201
logger.debug('checking queue %s' % q)
199-
job = Job.reserve(q, self.resq, self.__str__(), timeout=timeout)
202+
job = self.job_class.reserve(q, self.resq, self.__str__(), timeout=timeout)
200203
if job:
201204
logger.info('Found job on %s' % q)
202205
return job
@@ -306,6 +309,13 @@ def exists(cls, worker_id, resq):
306309
return resq.redis.sismember('resque:workers', worker_id)
307310

308311

312+
try:
313+
from setproctitle import setproctitle
314+
except ImportError:
315+
def setproctitle(name):
316+
pass
317+
318+
309319
if __name__ == "__main__":
310320
from optparse import OptionParser
311321
parser = OptionParser()

0 commit comments

Comments
 (0)