Skip to content

Commit 0609cae

Browse files
committed
saving into extentions branch.
1 parent a346a39 commit 0609cae

File tree

7 files changed

+54
-6
lines changed

7 files changed

+54
-6
lines changed

pyres/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ def _load_after_fork(mod):
9696
_after_forks = map(_load_after_fork, _modules)
9797
return _before_jobs, _after_jobs, _before_forks, _after_forks
9898

99-
_before_jobs, _after_jobs, _before_forks, _after_forks = _load_extensions()
99+
#_before_jobs, _after_jobs, _before_forks, _after_forks = _load_extensions()
100100

101101
class ResQ(object):
102102
"""The ResQ class defines the Redis server object to which we will

pyres/ext.py

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
import pkgutil
2-
import pyresext
2+
import logging
3+
#import pyresext
34

45
def simple_extensions():
6+
import pyresext
57
extension_modules = [name for _, name, _ in pkgutil.iter_modules([pyresext.__name__])]
68
def _load_before_job(mod):
79
func = getattr(mod,'before_job', None)
810
if func:
911
return func
12+
1013
def _load_after_job(mod):
1114
func = getattr(mod,'after_job', None)
1215
if func:
@@ -21,7 +24,34 @@ def _load_after_fork(mod):
2124
func = getattr(mod,'after_fork', None)
2225
if func:
2326
return func
24-
27+
28+
before_job_functions = map(_load_before_job, extension_modules)
29+
after_job_functions = map(_load_after_job, extension_modules)
30+
before_fork_functions = map(_load_before_fork, extension_modules)
31+
after_fork_functions = map(_load_after_fork, extension_modules)
32+
33+
return before_job_functions, after_job_functions, before_fork_functions, after_fork_functions
34+
35+
before_job_functions, after_job_functions, before_fork_functions, after_fork_functions = simple_extensions()
36+
37+
def _run_job_functions(job, func_list):
38+
for func in func_list:
39+
try:
40+
func(job)
41+
except:
42+
logging.exception('Error running %s' % func)
43+
44+
def run_before_job(job):
45+
_run_job_functions(job, before_job_functions)
46+
47+
def run_after_job(job):
48+
_run_job_functions(job, after_job_functions)
49+
50+
def run_before_fork(job):
51+
_run_job_functions(job, before_fork_functions)
52+
53+
def run_after_fork(job):
54+
_run_job_functions(job, after_fork_functions)
2555

2656
def _load_extensions(package_name='pyresext'):
2757
"""Loads modules from the pyresext."""
@@ -56,5 +86,5 @@ def _load_after_fork(mod):
5686
_after_forks = map(_load_after_fork, _modules)
5787
return _before_jobs, _after_jobs, _before_forks, _after_forks
5888

59-
_before_jobs, _after_jobs, _before_forks, _after_forks = _load_extensions()
89+
#_before_jobs, _after_jobs, _before_forks, _after_forks = _load_extensions()
6090

pyres/job.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@ def perform(self):
3737
on the payload class.
3838
3939
"""
40-
payload_class_str = self._payload["class"]
41-
payload_class = safe_str_to_class(payload_class_str)
40+
payload_class = self.job_class()
4241
payload_class.resq = self.resq
4342
args = self._payload.get("args", None)
4443
try:
@@ -71,6 +70,11 @@ def retry(self, payload_class, args):
7170
return True
7271
return False
7372

73+
def job_class(self):
74+
payload_class_str = self._payload["class"]
75+
payload_class = safe_str_to_class(payload_class_str)
76+
return payload_class
77+
7478
@classmethod
7579
def reserve(cls, queue, res, worker=None):
7680
"""Reserve a job on the queue. This marks this job so that other workers

pyres/worker.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from pyres.exceptions import NoQueueError
22
from pyres.job import Job
33
from pyres import ResQ, Stat, __version__
4+
from pyres.ext import run_before_job, run_after_job, run_before_fork, run_after_fork
45
import logging
56
import signal
67
import datetime, time
@@ -132,6 +133,7 @@ def work(self, interval=5):
132133
if job:
133134
logging.info('picked up job')
134135
logging.debug('job details: %s' % job)
136+
run_before_fork(job)
135137
self.child = os.fork()
136138
if self.child:
137139
setproctitle("pyres_worker%s: Forked %s at %s" % (__version__, self.child, datetime.datetime.now()))
@@ -145,6 +147,7 @@ def work(self, interval=5):
145147
#os.wait()
146148
logging.debug('done waiting')
147149
else:
150+
run_after_fork(job)
148151
setproctitle("pyres_worker-%s: Processing %s since %s" % (__version__, job._queue, datetime.datetime.now()))
149152
logging.info('Processing %s since %s' % (job._queue, datetime.datetime.now()))
150153
self.process(job)
@@ -163,6 +166,7 @@ def process(self, job=None):
163166
job = self.reserve()
164167
try:
165168
self.working_on(job)
169+
run_before_job(job)
166170
return job.perform()
167171
except Exception, e:
168172
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
@@ -173,6 +177,7 @@ def process(self, job=None):
173177
logging.info('completed job')
174178
logging.debug('job details: %s' % job)
175179
finally:
180+
run_after_job(job)
176181
self.done_working()
177182

178183
def reserve(self):

pyresext/__init__.py

Whitespace-only changes.

pyresext/example.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
def before_job(job):
2+
print 'Before job'
3+
return True

tests/test_extensions.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from tests import PyResTests, Basic, TestProcess, ReturnAllArgsJob
2+
3+
class ExtTests(PyResTests):
4+
def test_before_job(self):
5+
6+
assert False

0 commit comments

Comments
 (0)