Skip to content

Commit 3b96917

Browse files
author
Matt George
committed
Merge branch 'logging_merge'
2 parents 36ccf06 + a5a0b60 commit 3b96917

File tree

4 files changed

+71
-28
lines changed

4 files changed

+71
-28
lines changed

pyres/__init__.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
from redis import Redis
44
import pyres.json_parser as json
55

6+
import logging
7+
68
def my_import(name):
79
"""Helper function for walking import calls when searching for classes by string names."""
810
mod = __import__(name)
@@ -73,10 +75,7 @@ class ResQ(object):
7375
attribute on it.
7476
7577
"""
76-
def __init__(self, server="localhost:6379", password=None,
77-
timeout=None, retry_connection=True):
78-
self.timeout = timeout
79-
#self.retry_connection = retry_connection
78+
def __init__(self, server="localhost:6379", password=None):
8079
self.redis = server
8180
if password:
8281
self.redis.auth(password)
@@ -133,14 +132,21 @@ def enqueue(self, klass, *args):
133132
134133
"""
135134
queue = getattr(klass,'queue', None)
136-
#print cls._res
137135
if queue:
138136
class_name = '%s.%s' % (klass.__module__, klass.__name__)
139-
#print class_name
140137
self.push(queue, {'class':class_name,'args':args})
141-
#Job.create(queue, klass,*args)
138+
logging.info("enqueued '%s' job" % class_name)
139+
if args:
140+
logging.debug("job arguments: %s" % args)
141+
else:
142+
logging.debug("no arguments passed in.")
143+
else:
144+
logging.warning("unable to enqueue job with class %s" % str(klass))
145+
142146
def enqueue_from_string(self, klass_as_string, queue, *args):
143147
self.push(queue, {'class':klass_as_string,'args':args})
148+
logging.info("enqueued '%s' job" % klass_as_string)
149+
logging.debug("job arguments: %s" % args)
144150

145151
def queues(self):
146152
return self.redis.smembers("resque:queues") or []
@@ -207,13 +213,11 @@ def decode(cls, item):
207213
@classmethod
208214
def _enqueue(cls, klass, *args):
209215
queue = getattr(klass,'queue', None)
210-
#print cls._res
211216
_self = cls()
212217
if queue:
213218
class_name = '%s.%s' % (klass.__module__, klass.__name__)
214-
#print class_name
215219
_self.push(queue, {'class':class_name,'args':args})
216-
#Job.create(queue, klass,*args)
220+
217221

218222
class Stat(object):
219223
"""A Stat class which shows the current status of the queue.

pyres/job.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ def __init__(self, queue, payload, resq, worker=None):
2222
self.resq = resq
2323
self._worker = worker
2424

25+
def __str__(self):
26+
return "(Job{%s} | %s | %s)" % (
27+
self._queue, self._payload['class'], repr(self._payload['args']))
28+
2529
def perform(self):
2630
"""This method converts payload into args and calls the ``perform`` method
2731
on the payload class.

pyres/worker.py

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from pyres.exceptions import NoQueueError
22
from pyres.job import Job
33
from pyres import ResQ, Stat
4+
import logging
45
import signal
56
import datetime
67
import os, sys
@@ -80,7 +81,7 @@ def schedule_shutdown(self, signum, frame):
8081

8182
def kill_child(self, signum, frame):
8283
if self.child:
83-
print "Killing child at %s" % self.child
84+
logging.info("Killing child at %s" % self.child)
8485
os.kill(self.child, signal.SIGKILL)
8586

8687
def __str__(self):
@@ -105,24 +106,25 @@ def work(self, interval=5):
105106
self.startup()
106107
while True:
107108
if self._shutdown:
108-
print 'shutdown scheduled'
109+
logging.info('shutdown scheduled')
109110
break
110111
job = self.reserve()
111112
if job:
112-
print "got: %s" % job
113+
logging.info('picked up job')
114+
logging.debug('job details: %s' % job)
113115
self.child = os.fork()
114116
if self.child:
115-
print 'Forked %s at %s' % (self.child, datetime.datetime.now())
117+
logging.info('Forked %s at %s' % (self.child, datetime.datetime.now()))
116118
try:
117119
os.waitpid(self.child, 0)
118120
except OSError, ose:
119121
import errno
120122
if ose.errno != errno.EINTR:
121123
raise ose
122124
#os.wait()
123-
print 'Done waiting'
125+
logging.debug('done waiting')
124126
else:
125-
print 'Processing %s since %s' % (job._queue, datetime.datetime.now())
127+
logging.info('Processing %s since %s' % (job._queue, datetime.datetime.now()))
126128
self.process(job)
127129
os._exit(0)
128130
self.child = None
@@ -140,36 +142,37 @@ def process(self, job=None):
140142
job.perform()
141143
except Exception, e:
142144
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
143-
print "%s failed: %s" % (job, e)
145+
logging.error("%s failed: %s" % (job, e))
144146
job.fail(exceptionTraceback)
145147
self.failed()
146148
else:
147-
print "done: %s" % job
149+
logging.info('completed job')
150+
logging.debug('job details: %s' % job)
148151
finally:
149152
self.done_working()
150153

151154
def reserve(self):
152155
for q in self.queues:
153-
print "Checking %s" % q
156+
logging.debug('checking queue %s' % q)
154157
job = Job.reserve(q, self.resq, self.__str__())
155158
if job:
156-
print "Found job on %s" % q
159+
logging.info('Found job on %s' % q)
157160
return job
158161

159162
def working_on(self, job):
160-
print 'marking as working on'
163+
logging.debug('marking as working on')
161164
data = {
162165
'queue': job._queue,
163166
'run_at': str(datetime.datetime.now()),
164167
'payload': job._payload
165168
}
166169
data = json.dumps(data)
167170
self.resq.redis["resque:worker:%s" % str(self)] = data
168-
print "worker:%s" % str(self)
169-
print self.resq.redis["resque:worker:%s" % str(self)]
171+
logging.debug("worker:%s" % str(self))
172+
logging.debug(self.resq.redis["resque:worker:%s" % str(self)])
170173

171174
def done_working(self):
172-
print 'done working'
175+
logging.info('done working')
173176
self.processed()
174177
self.resq.redis.delete("resque:worker:%s" % str(self))
175178

@@ -203,9 +206,12 @@ def state(self):
203206
return 'working' if self.resq.redis.exists('resque:worker:%s' % self) else 'idle'
204207

205208
@classmethod
206-
def run(cls, queues, server):
209+
def run(cls, queues, server, interval):
207210
worker = cls(queues=queues, server=server)
208-
worker.work()
211+
if interval is not None:
212+
worker.work(interval)
213+
else:
214+
worker.work()
209215

210216
@classmethod
211217
def all(cls, host="localhost:6379"):

scripts/pyres_worker

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,48 @@
11
#!/usr/bin/env python
2+
3+
import logging
4+
5+
from optparse import OptionParser
26
from pyres.worker import Worker
7+
8+
LOG_LEVELS = {
9+
'debug': logging.DEBUG,
10+
'info': logging.INFO,
11+
'warning': logging.WARNING,
12+
'error': logging.ERROR,
13+
'critical': logging.CRITICAL
14+
}
15+
316
def main():
4-
from optparse import OptionParser
17+
518
usage = "usage: %prog [options] arg1"
619
parser = OptionParser(usage=usage)
720
#parser.add_option("-q", dest="queue_list")
821
parser.add_option("--host", dest="host", default="localhost")
922
parser.add_option("--port",dest="port",type="int", default=6379)
23+
parser.add_option("-i", '--interval', dest='interval', default=None, help='the default time interval to sleep between runs')
24+
parser.add_option('-l', '--log-level', dest='log_level', default=LOG_LEVELS['info'], help='log level. Valid values are "debug", "info", "warning", "error", "critical", in decreasing order of verbosity. Defaults to "info" if parameter not specified.')
1025
(options,args) = parser.parse_args()
26+
1127
if len(args) != 1:
1228
parser.print_help()
1329
parser.error("Argument must be a comma seperated list of queues")
30+
31+
if options.log_level not in LOG_LEVELS.keys():
32+
parser.print_help()
33+
parser.error("invalid log level specified")
34+
else:
35+
log_level = LOG_LEVELS[options.log_level]
36+
37+
logging.basicConfig(level=log_level, format="%(asctime)s: %(levelname)s: %(message)s")
38+
39+
interval = options.interval
40+
if interval is not None:
41+
interval = float(interval)
42+
1443
queues = args[0].split(',')
1544
server = '%s:%s' % (options.host,options.port)
16-
Worker.run(queues, server)
45+
Worker.run(queues, server, interval)
1746

1847
if __name__ == "__main__":
1948
main()

0 commit comments

Comments
 (0)