diff --git a/.gitignore b/.gitignore index ee342e9..e3a0b9e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,10 @@ *.pyc build/ .coverage +*.report *.egg-info logs/ -dist/ \ No newline at end of file +dist/ +*.swp +*.swo +.tox/ diff --git a/.travis.yml b/.travis.yml index c8a616d..3a1cd4e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,10 +2,13 @@ language: python python: - "2.6" - "2.7" + - "3.3" + - "3.4" # - "pypy" # command to install dependencies install: - - pip install -r requirements-test.txt --use-mirrors - - pip install -r requirements.txt --use-mirrors + - python setup.py install # command to run tests -script: nosetests +script: python setup.py test +services: + - redis-server diff --git a/HISTORY.md b/HISTORY.md index 53e3ad1..7236716 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,3 +1,11 @@ +##1.4.2 (2013-06-21) +* __str__ returns correctly with dsn +* worker_pids returns correct set of workers +* workers are re-registered on every job +* add exception metadata for after_perform method +* logger no longer overrides root logger +* support for redis db in dsn + ##1.4.1 (2012-07-30) * fix for non existent system signal for linux * cleanup of setup.py and requirements diff --git a/LICENSE b/LICENSE index 22c6364..f9725b0 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2009 Matt George +Copyright (c) 2009-2013 Matt George Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the diff --git a/README.markdown b/README.markdown index 0af6efb..ef8d07c 100644 --- a/README.markdown +++ b/README.markdown @@ -10,7 +10,7 @@ Because of some differences between ruby and python, there are a couple of place ## Travis CI -Currently, pyres is being tested via travis ci for python version 2.6, 2.7, and pypy: +Currently, pyres is being tested via travis ci for python version 2.6, 2.7, and 3.3: [![Build Status](https://secure.travis-ci.org/binarydud/pyres.png)](http://travis-ci.org/binarydud/pyres) ## Running Tests diff --git a/coverage.report b/coverage.report deleted file mode 100644 index d9c33d7..0000000 --- a/coverage.report +++ /dev/null @@ -1,11 +0,0 @@ -Name Stmts Exec Cover Missing ------------------------------------------------- -pyres 138 131 94% 26, 39, 98, 133-134, 144-145 -pyres.exceptions 2 2 100% -pyres.failure 23 22 95% 41 -pyres.job 23 23 100% -pyres.worker 189 146 77% 66, 74, 84-112, 161, 179, 186, 230-241 ------------------------------------------------- -TOTAL 375 324 86% ----------------------------------------------------------------------- -Ran 32 tests in 0.884s \ No newline at end of file diff --git a/docs/source/example.rst b/docs/source/example.rst index 4c67e23..33d8c93 100644 --- a/docs/source/example.rst +++ b/docs/source/example.rst @@ -1,7 +1,7 @@ Example ========= -Let's take a real wold example of a blog where comments need to be checked for +Let's take a real world example of a blog where comments need to be checked for spam. When the comment is saved in the database, we create a job in the queue with that comment data. Let's take a django model in this case. @@ -33,7 +33,9 @@ You can convert your existing class to be compatible with pyres. All you need to do is add a :attr:`queue` attribute and define a :meth:`perform` method on the class. -To insert a job into the queue you need to do something like this:: +To insert a job into the queue you need to do something like this: + +.. code-block:: python >>> from pyres import ResQ >>> r = ResQ() @@ -47,4 +49,3 @@ In the **scripts** folder there is an executable:: Just pass a comma separated list of queues the worker should poll. - diff --git a/pyres/__init__.py b/pyres/__init__.py index 69b1462..011cd88 100644 --- a/pyres/__init__.py +++ b/pyres/__init__.py @@ -1,6 +1,7 @@ -__version__ = '1.4.1' +__version__ = '1.5' from redis import Redis +from pyres.compat import string_types import pyres.json_parser as json import os @@ -89,6 +90,15 @@ def safe_str_to_class(s): klass = lst[-1] mod_list = lst[:-1] module = ".".join(mod_list) + + # ruby compatibility kludge: resque sends just a class name and + # not a module name so if I use resque to queue a ruby class + # called "Worker" then pyres will throw a "ValueError: Empty + # module name" exception. To avoid that, if there's no module in + # the json then we'll use the classname as a module name. + if not module: + module = klass + mod = my_import(module) if hasattr(mod, klass): return getattr(mod, klass) @@ -120,14 +130,6 @@ class ResQ(object): ``password`` -- The password, if required, of your Redis server. Default is "None". - ``timeout`` -- The timeout keyword is in the signature, but is unused. Default is "None". - - ``retry_connection`` -- This keyword is in the signature but is deprecated. Default is "True". - - - Both ``timeout`` and ``retry_connection`` will be removed as the python-redis client - no longer uses them. - Example usage:: >>> from pyres import * @@ -152,13 +154,13 @@ def push(self, queue, item): self.redis.rpush("resque:queue:%s" % queue, ResQ.encode(item)) def pop(self, queues, timeout=10): - if isinstance(queues, basestring): + if isinstance(queues, string_types): queues = [queues] ret = self.redis.blpop(["resque:queue:%s" % q for q in queues], timeout=timeout) if ret: key, ret = ret - return key[13:], ResQ.decode(ret) # trim "resque:queue:" + return key[13:].decode(), ResQ.decode(ret) # trim "resque:queue:" else: return None, None @@ -186,7 +188,7 @@ def _get_redis(self): return self._redis def _set_redis(self, server): - if isinstance(server, basestring): + if isinstance(server, string_types): self.dsn = server address, _, db = server.partition('/') host, port = address.split(':') @@ -231,7 +233,10 @@ def enqueue_from_string(self, klass_as_string, queue, *args, **kwargs): logger.debug("no arguments passed in.") def queues(self): - return self.redis.smembers("resque:queues") or [] + return [sm.decode() for sm in self.redis.smembers("resque:queues")] or [] + + def workers(self): + return [w.decode() for w in self.redis.smembers("resque:workers")] or [] def info(self): """Returns a dictionary of the current status of the pending jobs, @@ -252,7 +257,7 @@ def info(self): } def keys(self): - return [key.replace('resque:','') + return [key.decode().replace('resque:','') for key in self.redis.keys('resque:*')] def reserve(self, queues): @@ -260,11 +265,7 @@ def reserve(self, queues): return Job.reserve(queues, self) def __str__(self): - return "PyRes Client connected to %s" % self.redis.server - - def workers(self): - from pyres.worker import Worker - return Worker.all(self) + return "PyRes Client connected to %s" % self.dsn def working(self): from pyres.worker import Worker @@ -312,7 +313,7 @@ def delayed_queue_schedule_size(self): size = 0 length = self.redis.zcard('resque:delayed_queue_schedule') for i in self.redis.zrange('resque:delayed_queue_schedule',0,length): - size += self.delayed_timestamp_size(i) + size += self.delayed_timestamp_size(i.decode()) return size def delayed_timestamp_size(self, timestamp): @@ -322,11 +323,13 @@ def delayed_timestamp_size(self, timestamp): def next_delayed_timestamp(self): key = int(time.mktime(ResQ._current_time().timetuple())) array = self.redis.zrangebyscore('resque:delayed_queue_schedule', - '-inf', key) + '-inf', key, start=0, num=1) timestamp = None if array: timestamp = array[0] - return timestamp + + if timestamp: + return timestamp.decode() def next_item_for_timestamp(self, timestamp): #key = int(time.mktime(timestamp.timetuple())) @@ -346,10 +349,10 @@ def encode(cls, item): @classmethod def decode(cls, item): - if isinstance(item, basestring): - ret = json.loads(item) - return ret - return None + if not isinstance(item, string_types): + item = item.decode() + ret = json.loads(item) + return ret @classmethod def _enqueue(cls, klass, *args): diff --git a/pyres/compat.py b/pyres/compat.py new file mode 100644 index 0000000..c39fd3f --- /dev/null +++ b/pyres/compat.py @@ -0,0 +1,30 @@ +import sys +import types + +try: + import cPickle as pickle +except ImportError: # pragma: no cover + import pickle + +# True if we are running on Python 3. +PY3 = sys.version_info[0] == 3 + +if PY3: # pragma: no cover + string_types = str, + integer_types = int, + class_types = type, + text_type = str + binary_type = bytes + long = int + import subprocess as commands + +else: + string_types = basestring, + integer_types = (int, long) + class_types = (type, types.ClassType) + text_type = unicode + binary_type = str + long = long + import commands + + diff --git a/pyres/failure/base.py b/pyres/failure/base.py index c902299..330fbe4 100644 --- a/pyres/failure/base.py +++ b/pyres/failure/base.py @@ -17,10 +17,14 @@ class BaseBackend(object): """ def __init__(self, exp, queue, payload, worker=None): - excc, _, tb = sys.exc_info() + excc = sys.exc_info()[0] self._exception = excc - self._traceback = traceback.format_exc() + try: + self._traceback = traceback.format_exc() + except AttributeError: + self._traceback = None + self._worker = worker self._queue = queue self._payload = payload diff --git a/pyres/failure/multiple.py b/pyres/failure/multiple.py index e4d05f7..6362363 100644 --- a/pyres/failure/multiple.py +++ b/pyres/failure/multiple.py @@ -1,5 +1,5 @@ -from base import BaseBackend -from redis import RedisBackend +from pyres.failure.base import BaseBackend +from pyres.failure.redis import RedisBackend class MultipleBackend(BaseBackend): """Extends ``BaseBackend`` to provide support for delegating calls to multiple diff --git a/pyres/failure/redis.py b/pyres/failure/redis.py index 715f9de..5fe71ee 100644 --- a/pyres/failure/redis.py +++ b/pyres/failure/redis.py @@ -1,7 +1,7 @@ import datetime, time from base64 import b64encode -from base import BaseBackend +from .base import BaseBackend from pyres import ResQ class RedisBackend(BaseBackend): diff --git a/pyres/horde.py b/pyres/horde.py index 0b47047..b41b7a4 100644 --- a/pyres/horde.py +++ b/pyres/horde.py @@ -10,8 +10,12 @@ import logging.handlers from pyres import ResQ, Stat, get_logging_handler, special_log_file from pyres.exceptions import NoQueueError -from pyres.utils import OrderedDict +try: + from collections import OrderedDict +except ImportError: + from ordereddict import OrderedDict from pyres.job import Job +from pyres.compat import string_types import pyres.json_parser as json try: from setproctitle import setproctitle @@ -20,7 +24,7 @@ def setproctitle(name): pass def setup_logging(procname, namespace='', log_level=logging.INFO, log_file=None): - + logger = multiprocessing.get_logger() #logger = multiprocessing.log_to_stderr() logger.setLevel(log_level) @@ -29,63 +33,67 @@ def setup_logging(procname, namespace='', log_level=logging.INFO, log_file=None) return logger class Minion(multiprocessing.Process): - def __init__(self, queues, server, password, log_level=logging.INFO, log_path=None): + def __init__(self, queues, server, password, log_level=logging.INFO, log_path=None, interval=5, concat_logs=False, + max_jobs=0): multiprocessing.Process.__init__(self, name='Minion') - + #format = '%(asctime)s %(levelname)s %(filename)s-%(lineno)d: %(message)s' #logHandler = logging.StreamHandler() #logHandler.setFormatter(logging.Formatter(format)) #self.logger = multiprocessing.get_logger() #self.logger.addHandler(logHandler) #self.logger.setLevel(logging.DEBUG) - + self.queues = queues self._shutdown = False self.hostname = os.uname()[1] self.server = server self.password = password - + self.interval = interval + self.log_level = log_level self.log_path = log_path self.log_file = None - + self.concat_logs = concat_logs + self.max_jobs = max_jobs + def prune_dead_workers(self): pass - + def schedule_shutdown(self, signum, frame): self._shutdown = True - + def register_signal_handlers(self): signal.signal(signal.SIGTERM, self.schedule_shutdown) signal.signal(signal.SIGINT, self.schedule_shutdown) signal.signal(signal.SIGQUIT, self.schedule_shutdown) - + def register_minion(self): self.resq.redis.sadd('resque:minions',str(self)) self.started = datetime.datetime.now() - + def startup(self): self.register_signal_handlers() self.prune_dead_workers() self.register_minion() - + def __str__(self): return '%s:%s:%s' % (self.hostname, self.pid, ','.join(self.queues)) - + def reserve(self): self.logger.debug('checking queues: %s' % self.queues) job = Job.reserve(self.queues, self.resq, self.__str__()) if job: self.logger.info('Found job on %s' % job._queue) return job - + def process(self, job): if not job: return try: self.working_on(job) job.perform() - except Exception, e: + except Exception as e: exceptionType, exceptionValue, exceptionTraceback = sys.exc_info() self.logger.error("%s failed: %s" % (job, e)) job.fail(exceptionTraceback) @@ -95,7 +103,7 @@ def process(self, job): self.logger.info('completed job: %s' % job) finally: self.done_working() - + def working_on(self, job): setproctitle('pyres_minion:%s: working on job: %s' % (os.getppid(), job._payload)) self.logger.debug('marking as working on') @@ -108,66 +116,78 @@ def working_on(self, job): self.resq.redis["resque:minion:%s" % str(self)] = data self.logger.debug("minion:%s" % str(self)) #self.logger.debug(self.resq.redis["resque:minion:%s" % str(self)]) - + def failed(self): Stat("failed", self.resq).incr() - + def processed(self): total_processed = Stat("processed", self.resq) total_processed.incr() - + def done_working(self): self.logger.debug('done working') self.processed() self.resq.redis.delete("resque:minion:%s" % str(self)) - + def unregister_minion(self): self.resq.redis.srem('resque:minions',str(self)) self.started = None - + def work(self, interval=5): - + self.startup() + cur_job = 0 while True: setproctitle('pyres_minion:%s: waiting for job on: %s' % (os.getppid(),self.queues)) self.logger.info('waiting on job') if self._shutdown: self.logger.info('shutdown scheduled') break + self.logger.debug('max_jobs: %d cur_jobs: %d' % (self.max_jobs, cur_job)) + if (self.max_jobs > 0 and self.max_jobs < cur_job): + self.logger.debug('max_jobs reached on %s: %d' % (self.pid, cur_job)) + self.logger.debug('minion sleeping for: %d secs' % interval) + time.sleep(interval) + cur_job = 0 job = self.reserve() if job: self.process(job) + cur_job = cur_job + 1 else: + cur_job = 0 + self.logger.debug('minion sleeping for: %d secs' % interval) time.sleep(interval) self.unregister_minion() - + def clear_logger(self): for handler in self.logger.handlers: self.logger.removeHandler(handler) - + def run(self): setproctitle('pyres_minion:%s: Starting' % (os.getppid(),)) if self.log_path: if special_log_file(self.log_path): self.log_file = self.log_path + elif self.concat_logs: + self.log_file = os.path.join(self.log_path, 'minion.log') else: self.log_file = os.path.join(self.log_path, 'minion-%s.log' % self.pid) namespace = 'minion:%s' % self.pid self.logger = setup_logging('minion', namespace, self.log_level, self.log_file) #self.clear_logger() - if isinstance(self.server,basestring): + if isinstance(self.server,string_types): self.resq = ResQ(server=self.server, password=self.password) elif isinstance(self.server, ResQ): self.resq = self.server else: raise Exception("Bad server argument") - - - self.work() + + + self.work(self.interval) #while True: # job = self.q.get() # print 'pid: %s is running %s ' % (self.pid,job) - + class Khan(object): _command_map = { @@ -175,7 +195,8 @@ class Khan(object): 'REMOVE': '_remove_minion', 'SHUTDOWN': '_schedule_shutdown' } - def __init__(self, pool_size=5, queues=[], server='localhost:6379', password=None, logging_level=logging.INFO, log_file=None): + def __init__(self, pool_size=5, queues=[], server='localhost:6379', password=None, logging_level=logging.INFO, + log_file=None, minions_interval=5, concat_minions_logs=False, max_jobs=0): #super(Khan,self).__init__(queues=queues,server=server,password=password) self._shutdown = False self.pool_size = int(pool_size) @@ -189,28 +210,31 @@ def __init__(self, pool_size=5, queues=[], server='localhost:6379', password=Non self.password = password self.logging_level = logging_level self.log_file = log_file - + self.minions_interval = minions_interval + self.concat_minions_logs = concat_minions_logs + self.max_jobs = max_jobs + #self._workers = list() - + def setup_resq(self): if hasattr(self,'logger'): self.logger.info('Connecting to redis server - %s' % self.server) - if isinstance(self.server,basestring): + if isinstance(self.server,string_types): self.resq = ResQ(server=self.server, password=self.password) elif isinstance(self.server, ResQ): self.resq = self.server else: raise Exception("Bad server argument") - + def validate_queues(self): "Checks if a worker is given atleast one queue to work on." if not self.queues: raise NoQueueError("Please give each worker at least one queue.") - + def startup(self): self.register_signal_handlers() - - + + def register_signal_handlers(self): signal.signal(signal.SIGTERM, self.schedule_shutdown) signal.signal(signal.SIGINT, self.schedule_shutdown) @@ -252,23 +276,23 @@ def current_state(self): def _schedule_shutdown(self): self.schedule_shutdown(None, None) - + def schedule_shutdown(self, signum, frame): self.logger.info('Khan Shutdown scheduled') self._shutdown = True - + def kill_child(self, signum, frame): self._remove_minion() - + def add_child(self, signum, frame): self.add_minion() - + def register_khan(self): if not hasattr(self, 'resq'): self.setup_resq() self.resq.redis.sadd('resque:khans',str(self)) self.started = datetime.datetime.now() - + def _check_commands(self): if not self._shutdown: self.logger.debug('Checking commands') @@ -277,7 +301,7 @@ def _check_commands(self): if command: self.process_command(command) self._check_commands() - + def process_command(self, command): self.logger.info('Processing Command') #available commands, shutdown, add 1, remove 1 @@ -286,13 +310,13 @@ def process_command(self, command): fn = getattr(self, command_item) if fn: fn() - + def add_minion(self): self._add_minion() self.resq.redis.srem('resque:khans',str(self)) self.pool_size += 1 self.resq.redis.sadd('resque:khans',str(self)) - + def _add_minion(self): if hasattr(self,'logger'): self.logger.info('Adding minion') @@ -303,13 +327,15 @@ def _add_minion(self): log_path = os.path.dirname(self.log_file) else: log_path = None - m = Minion(self.queues, self.server, self.password, log_level=self.logging_level, log_path=log_path) + m = Minion(self.queues, self.server, self.password, interval=self.minions_interval, + log_level=self.logging_level, log_path=log_path, concat_logs=self.concat_minions_logs, + max_jobs=self.max_jobs) m.start() self._workers[m.pid] = m if hasattr(self,'logger'): self.logger.info('minion added at: %s' % m.pid) return m - + def _shutdown_minions(self): """ send the SIGNINT signal to each worker in the pool. @@ -318,7 +344,7 @@ def _shutdown_minions(self): for minion in self._workers.values(): minion.terminate() minion.join() - + def _remove_minion(self, pid=None): #if pid: # m = self._workers.pop(pid) @@ -328,20 +354,20 @@ def _remove_minion(self, pid=None): self.pool_size -= 1 self.resq.redis.sadd('resque:khans',str(self)) return m - + def unregister_khan(self): if hasattr(self,'logger'): self.logger.debug('unregistering khan') self.resq.redis.srem('resque:khans',str(self)) self.started = None - + def setup_minions(self): for i in range(self.pool_size): self._add_minion() def _setup_logging(self): self.logger = setup_logging('khan', 'khan', self.logging_level, self.log_file) - + def work(self, interval=2): setproctitle('pyres_manager: Starting') self.startup() @@ -361,17 +387,21 @@ def work(self, interval=2): break #get job else: + self.logger.debug('manager sleeping for: %d secs' % interval) time.sleep(interval) self.unregister_khan() - + def __str__(self): hostname = os.uname()[1] return '%s:%s:%s' % (hostname, self.pid, self.pool_size) - + @classmethod - def run(cls, pool_size=5, queues=[], server='localhost:6379', logging_level=logging.INFO, log_file=None): - worker = cls(pool_size=pool_size, queues=queues, server=server, logging_level=logging_level, log_file=log_file) - worker.work() + def run(cls, pool_size=5, queues=[], server='localhost:6379', password=None, interval=2, + logging_level=logging.INFO, log_file=None, minions_interval=5, concat_minions_logs=False, max_jobs=0): + worker = cls(pool_size=pool_size, queues=queues, server=server, password=password, logging_level=logging_level, + log_file=log_file, minions_interval=minions_interval, concat_minions_logs=concat_minions_logs, + max_jobs=max_jobs) + worker.work(interval=interval) #if __name__ == "__main__": # k = Khan() diff --git a/pyres/job.py b/pyres/job.py index 5f88a0e..4f4b547 100644 --- a/pyres/job.py +++ b/pyres/job.py @@ -4,7 +4,7 @@ from pyres import ResQ, safe_str_to_class from pyres import failure from pyres.failure.redis import RedisBackend - +from pyres.compat import string_types class Job(object): """Every job on the ResQ is an instance of the *Job* class. @@ -26,7 +26,7 @@ class Job(object): """ safe_str_to_class = staticmethod(safe_str_to_class) - + def __init__(self, queue, payload, resq, worker=None): self._queue = queue self._payload = payload @@ -82,7 +82,6 @@ def perform(self): payload_class.before_perform(metadata) return payload_class.perform(*args) except Exception as e: - check_after = False metadata["failed"] = True metadata["exception"] = e if not self.retry(payload_class, args): @@ -93,8 +92,10 @@ def perform(self): logging.exception("Retry scheduled after error in %s", self._payload) finally: after_perform = getattr(payload_class, "after_perform", None) - if after_perform and check_after: + + if after_perform: payload_class.after_perform(metadata) + delattr(payload_class,'resq') def fail(self, exception): @@ -111,7 +112,7 @@ def retry(self, payload_class, args): """This method provides a way to retry a job after a failure. If the jobclass defined by the payload containes a ``retry_every`` attribute then pyres will attempt to retry the job until successful or until timeout defined by ``retry_timeout`` on the payload class. - + """ retry_every = getattr(payload_class, 'retry_every', None) retry_timeout = getattr(payload_class, 'retry_timeout', 0) @@ -133,7 +134,7 @@ def reserve(cls, queues, res, worker=None, timeout=10): that other workers will not pick it up. """ - if isinstance(queues, basestring): + if isinstance(queues, string_types): queues = [queues] queue, payload = res.pop(queues, timeout=timeout) if payload: diff --git a/pyres/json_parser.py b/pyres/json_parser.py index be80fb6..a8d18e7 100644 --- a/pyres/json_parser.py +++ b/pyres/json_parser.py @@ -1,4 +1,5 @@ from datetime import datetime +from pyres.compat import string_types try: #import simplejson as json @@ -24,13 +25,13 @@ def decode(self, json_string): return self.convert(decoded) def convert(self, value): - if isinstance(value, basestring) and value.startswith(DATE_PREFIX): + if isinstance(value, string_types) and value.startswith(DATE_PREFIX): try: return datetime.strptime(value[len(DATE_PREFIX):], DATE_FORMAT) except ValueError: return value elif isinstance(value, dict): - for k, v in value.iteritems(): + for k, v in value.items(): new = self.convert(v) if new != v: value[k] = new diff --git a/pyres/scheduler.py b/pyres/scheduler.py index af5a4d6..11e4f58 100644 --- a/pyres/scheduler.py +++ b/pyres/scheduler.py @@ -3,6 +3,7 @@ import logging from pyres import ResQ, __version__ +from pyres.compat import string_types logger = logging.getLogger(__name__) @@ -14,7 +15,7 @@ def __init__(self, server="localhost:6379", password=None): >>> scheduler = Scheduler('localhost:6379') """ self._shutdown = False - if isinstance(server, basestring): + if isinstance(server, string_types): self.resq = ResQ(server=server, password=password) elif isinstance(server, ResQ): self.resq = server diff --git a/pyres/scripts.py b/pyres/scripts.py index 55c0854..ff2d466 100644 --- a/pyres/scripts.py +++ b/pyres/scripts.py @@ -13,12 +13,16 @@ def pyres_manager(): parser = OptionParser(usage=usage) #parser.add_option("-q", dest="queue_list") parser.add_option("--host", dest="host", default="localhost") - parser.add_option("--port",dest="port",type="int", default=6379) - parser.add_option("-i", '--interval', dest='interval', default=None, help='the default time interval to sleep between runs') + parser.add_option("--port", dest="port",type="int", default=6379) + parser.add_option("--password", dest="password", default=None) + parser.add_option("-i", '--interval', dest='manager_interval', default=None, help='the default time interval to sleep between runs - manager') + parser.add_option("--minions_interval", dest='minions_interval', default=None, help='the default time interval to sleep between runs - minions') parser.add_option('-l', '--log-level', dest='log_level', default='info', help='log level. Valid values are "debug", "info", "warning", "error", "critical", in decreasing order of verbosity. Defaults to "info" if parameter not specified.') parser.add_option("--pool", type="int", dest="pool_size", default=1, help="Number of minions to spawn under the manager.") + parser.add_option("-j", "--process_max_jobs", dest="max_jobs", type=int, default=0, help='how many jobs should be processed on worker run.') parser.add_option('-f', dest='logfile', help='If present, a logfile will be used. "stderr", "stdout", and "syslog" are all special values.') parser.add_option('-p', dest='pidfile', help='If present, a pidfile will be used.') + parser.add_option("--concat_minions_logs", action="store_true", dest="concat_minions_logs", help='Concat all minions logs on same file.') (options,args) = parser.parse_args() if len(args) != 1: @@ -27,16 +31,23 @@ def pyres_manager(): log_level = getattr(logging, options.log_level.upper(), 'INFO') #logging.basicConfig(level=log_level, format="%(asctime)s: %(levelname)s: %(message)s") - + concat_minions_logs = options.concat_minions_logs setup_pidfile(options.pidfile) - interval = options.interval - if interval is not None: - interval = float(interval) + manager_interval = options.manager_interval + if manager_interval is not None: + manager_interval = float(manager_interval) + + minions_interval = options.minions_interval + if minions_interval is not None: + minions_interval = float(minions_interval) queues = args[0].split(',') server = '%s:%s' % (options.host,options.port) - Khan.run(pool_size=options.pool_size, queues=queues, server=server, logging_level=log_level, log_file=options.logfile) + password = options.password + Khan.run(pool_size=options.pool_size, queues=queues, server=server, password=password, interval=manager_interval, + logging_level=log_level, log_file=options.logfile, minions_interval=minions_interval, + concat_minions_logs=concat_minions_logs, max_jobs=options.max_jobs) def pyres_scheduler(): @@ -44,7 +55,8 @@ def pyres_scheduler(): parser = OptionParser(usage=usage) #parser.add_option("-q", dest="queue_list") parser.add_option("--host", dest="host", default="localhost") - parser.add_option("--port",dest="port",type="int", default=6379) + parser.add_option("--port", dest="port",type="int", default=6379) + parser.add_option("--password", dest="password", default=None) parser.add_option('-l', '--log-level', dest='log_level', default='info', help='log level. Valid values are "debug", "info", "warning", "error", "critical", in decreasing order of verbosity. Defaults to "info" if parameter not specified.') parser.add_option('-f', dest='logfile', help='If present, a logfile will be used. "stderr", "stdout", and "syslog" are all special values.') parser.add_option('-p', dest='pidfile', help='If present, a pidfile will be used.') @@ -54,7 +66,8 @@ def pyres_scheduler(): setup_logging(procname="pyres_scheduler", log_level=log_level, filename=options.logfile) setup_pidfile(options.pidfile) server = '%s:%s' % (options.host, options.port) - Scheduler.run(server) + password = options.password + Scheduler.run(server, password) def pyres_worker(): @@ -62,7 +75,8 @@ def pyres_worker(): parser = OptionParser(usage=usage) parser.add_option("--host", dest="host", default="localhost") - parser.add_option("--port",dest="port",type="int", default=6379) + parser.add_option("--port", dest="port",type="int", default=6379) + parser.add_option("--password", dest="password", default=None) parser.add_option("-i", '--interval', dest='interval', default=None, help='the default time interval to sleep between runs') parser.add_option('-l', '--log-level', dest='log_level', default='info', help='log level. Valid values are "debug", "info", "warning", "error", "critical", in decreasing order of verbosity. Defaults to "info" if parameter not specified.') parser.add_option('-f', dest='logfile', help='If present, a logfile will be used. "stderr", "stdout", and "syslog" are all special values.') @@ -86,4 +100,5 @@ def pyres_worker(): queues = args[0].split(',') server = '%s:%s' % (options.host,options.port) - Worker.run(queues, server, interval, timeout=timeout) + password = options.password + Worker.run(queues, server, password, interval, timeout=timeout) diff --git a/pyres/utils.py b/pyres/utils.py deleted file mode 100644 index d640c11..0000000 --- a/pyres/utils.py +++ /dev/null @@ -1,98 +0,0 @@ -from UserDict import DictMixin - -class OrderedDict(dict, DictMixin): - - def __init__(self, *args, **kwds): - if len(args) > 1: - raise TypeError('expected at most 1 arguments, got %d' % len(args)) - try: - self.__end - except AttributeError: - self.clear() - self.update(*args, **kwds) - - def clear(self): - self.__end = end = [] - end += [None, end, end] # sentinel node for doubly linked list - self.__map = {} # key --> [key, prev, next] - dict.clear(self) - - def __setitem__(self, key, value): - if key not in self: - end = self.__end - curr = end[1] - curr[2] = end[1] = self.__map[key] = [key, curr, end] - dict.__setitem__(self, key, value) - - def __delitem__(self, key): - dict.__delitem__(self, key) - key, prev, next = self.__map.pop(key) - prev[2] = next - next[1] = prev - - def __iter__(self): - end = self.__end - curr = end[2] - while curr is not end: - yield curr[0] - curr = curr[2] - - def __reversed__(self): - end = self.__end - curr = end[1] - while curr is not end: - yield curr[0] - curr = curr[1] - - def popitem(self, last=True): - if not self: - raise KeyError('dictionary is empty') - key = reversed(self).next() if last else iter(self).next() - value = self.pop(key) - return key, value - - def __reduce__(self): - items = [[k, self[k]] for k in self] - tmp = self.__map, self.__end - del self.__map, self.__end - inst_dict = vars(self).copy() - self.__map, self.__end = tmp - if inst_dict: - return (self.__class__, (items,), inst_dict) - return self.__class__, (items,) - - def keys(self): - return list(self) - - setdefault = DictMixin.setdefault - update = DictMixin.update - pop = DictMixin.pop - values = DictMixin.values - items = DictMixin.items - iterkeys = DictMixin.iterkeys - itervalues = DictMixin.itervalues - iteritems = DictMixin.iteritems - - def __repr__(self): - if not self: - return '%s()' % (self.__class__.__name__,) - return '%s(%r)' % (self.__class__.__name__, self.items()) - - def copy(self): - return self.__class__(self) - - @classmethod - def fromkeys(cls, iterable, value=None): - d = cls() - for key in iterable: - d[key] = value - return d - - def __eq__(self, other): - if isinstance(other, OrderedDict): - return len(self)==len(other) and \ - all(p==q for p, q in zip(self.items(), other.items())) - return dict.__eq__(self, other) - - def __ne__(self, other): - return not self == other diff --git a/pyres/worker.py b/pyres/worker.py index 4874db8..fc42b12 100644 --- a/pyres/worker.py +++ b/pyres/worker.py @@ -2,14 +2,14 @@ import signal import datetime, time import os, sys -import json_parser as json -import commands +from pyres import json_parser as json +from pyres.compat import commands import random from pyres.exceptions import NoQueueError, JobError, TimeoutError, CrashError from pyres.job import Job from pyres import ResQ, Stat, __version__ - +from pyres.compat import string_types logger = logging.getLogger(__name__) @@ -34,7 +34,7 @@ def __init__(self, queues=(), server="localhost:6379", password=None, timeout=No self.hostname = os.uname()[1] self.timeout = timeout - if isinstance(server, basestring): + if isinstance(server, string_types): self.resq = ResQ(server=server, password=password) elif isinstance(server, ResQ): self.resq = server @@ -75,7 +75,7 @@ def unregister_worker(self): def prune_dead_workers(self): all_workers = Worker.all(self.resq) - known_workers = self.worker_pids() + known_workers = Worker.worker_pids() for worker in all_workers: host, pid, queues = worker.id.split(':') if host != self.hostname: @@ -257,7 +257,7 @@ def process(self, job=None): except Exception: job_failed = True self._handle_job_exception(job) - except SystemExit, e: + except SystemExit as e: if e.code != 0: job_failed = True self._handle_job_exception(job) @@ -329,16 +329,20 @@ def state(self): return 'working' return 'idle' - def worker_pids(self): + @classmethod + def worker_pids(cls): """Returns an array of all pids (as strings) of the workers on this machine. Used when pruning dead workers.""" - return map(lambda l: l.strip().split(' ')[0], - commands.getoutput("ps -A -o pid,command | \ - grep pyres_worker").split("\n")) + cmd = "ps -A -o pid,command | grep pyres_worker | grep -v grep" + output = commands.getoutput(cmd) + if output: + return map(lambda l: l.strip().split(' ')[0], output.split("\n")) + else: + return [] @classmethod - def run(cls, queues, server="localhost:6379", interval=None, timeout=None): - worker = cls(queues=queues, server=server, timeout=timeout) + def run(cls, queues, server="localhost:6379", password=None, interval=None, timeout=None): + worker = cls(queues=queues, server=server, password=password, timeout=timeout) if interval is not None: worker.work(interval) else: @@ -346,16 +350,16 @@ def run(cls, queues, server="localhost:6379", interval=None, timeout=None): @classmethod def all(cls, host="localhost:6379"): - if isinstance(host,basestring): + if isinstance(host,string_types): resq = ResQ(host) elif isinstance(host, ResQ): resq = host - return [Worker.find(w,resq) for w in resq.redis.smembers('resque:workers') or []] + return [Worker.find(w,resq) for w in resq.workers() or []] @classmethod def working(cls, host): - if isinstance(host, basestring): + if isinstance(host, string_types): resq = ResQ(host) elif isinstance(host, ResQ): resq = host diff --git a/requirements.txt b/requirements.txt index 05131d7..96046b5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ -simplejson>=2.0.9 -redis>=2.4.12 -setproctitle>=1.0 +simplejson>3.0 +redis>2.4.12 +setproctitle>1.0 diff --git a/setup.py b/setup.py index e534abe..e0597ed 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,28 @@ +import sys from setuptools import setup, find_packages +from setuptools.command.test import test as TestCommand -version='1.4.1' +requires=[ + item for item in + open("requirements.txt").read().split("\n") + if item] + +if sys.version_info[0:2] == (2,6): + requires.append('ordereddict') + +class PyTest(TestCommand): + def finalize_options(self): + TestCommand.finalize_options(self) + self.test_args = [] + self.test_suite = True + + def run_tests(self): + #import here, cause outside the eggs aren't loaded + import pytest + result = pytest.main(self.test_args) + sys.exit(result) + +version='1.5' setup( name='pyres', version=version, @@ -20,15 +42,17 @@ pyres_scheduler=pyres.scripts:pyres_scheduler pyres_worker=pyres.scripts:pyres_worker """, - install_requires=[ - item for item in - open("requirements.txt").read().split("\n") - if item], + tests_require=requires + ['pytest',], + cmdclass={'test': PyTest}, + install_requires=requires, classifiers = [ 'Development Status :: 4 - Beta', 'Environment :: Console', 'Intended Audience :: Developers', 'License :: OSI Approved :: MIT License', 'Operating System :: OS Independent', + 'Programming Language :: Python :: 2.6', + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3.3', 'Programming Language :: Python'], ) diff --git a/tests/__init__.py b/tests/__init__.py index a75685a..64f09eb 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -2,6 +2,14 @@ import os from pyres import ResQ, str_to_class +class tests(object): + queue = 'basic' + + @staticmethod + def perform(name): + s = "name:%s" % name + return s + class Basic(object): queue = 'basic' @@ -14,7 +22,7 @@ class BasicMulti(object): queue = 'basic' @staticmethod def perform(name, age): - print 'name: %s, age: %s' % (name, age) + print('name: %s, age: %s' % (name, age)) class ReturnAllArgsJob(object): @@ -97,7 +105,7 @@ class LongObject(object): def perform(sleep_time): import time time.sleep(sleep_time) - print 'Done Sleeping' + print('Done Sleeping') def test_str_to_class(): ret = str_to_class('tests.Basic') @@ -111,7 +119,9 @@ def test_safe_str_to_class(self): assert safe_str_to_class('tests.Basic') == Basic self.assertRaises(ImportError, safe_str_to_class, 'test.Mine') self.assertRaises(ImportError, safe_str_to_class, 'tests.World') - + # test that we'll use the class name as a module name if no + # module name is provided (for Ruby compatibility) + assert safe_str_to_class('tests') == tests class PyResTests(unittest.TestCase): def setUp(self): diff --git a/tests/test_resq.py b/tests/test_resq.py index 7412899..1559202 100644 --- a/tests/test_resq.py +++ b/tests/test_resq.py @@ -10,14 +10,14 @@ def test_enqueue(self): ResQ._enqueue(Basic, "test3") assert self.redis.llen("resque:queue:basic") == 3 assert self.redis.sismember('resque:queues','basic') - + def test_push(self): self.resq.push('pushq','content-newqueue') self.resq.push('pushq','content2-newqueue') assert self.redis.llen('resque:queue:pushq') == 2 - assert self.redis.lindex('resque:queue:pushq', 0) == ResQ.encode('content-newqueue') - assert self.redis.lindex('resque:queue:pushq', 1) == ResQ.encode('content2-newqueue') - + assert self.redis.lindex('resque:queue:pushq', 0).decode() == ResQ.encode('content-newqueue') + assert self.redis.lindex('resque:queue:pushq', 1).decode() == ResQ.encode('content2-newqueue') + def test_pop(self): self.resq.push('pushq','content-newqueue') self.resq.push('pushq','content2-newqueue') @@ -43,25 +43,25 @@ def test_pop_two_queues(self): assert self.redis.llen('resque:queue:pushq1') == 0 assert self.redis.llen('resque:queue:pushq2') == 0 assert self.resq.pop(['pushq1', 'pushq2'], timeout=1) == (None, None) - + def test_peek(self): self.resq.enqueue(Basic,"test1") self.resq.enqueue(Basic,"test2") assert len(self.resq.peek('basic',0,20)) == 2 - + def test_size(self): self.resq.enqueue(Basic,"test1") self.resq.enqueue(Basic,"test2") assert self.resq.size('basic') == 2 assert self.resq.size('noq') == 0 - + def test_redis_property(self): from redis import Redis rq = ResQ(server="localhost:6379") red = Redis() #rq2 = ResQ(server=red) self.assertRaises(Exception, rq.redis,[Basic]) - + def test_info(self): self.resq.enqueue(Basic,"test1") self.resq.enqueue(TestProcess) @@ -73,14 +73,14 @@ def test_info(self): worker.register_worker() info = self.resq.info() assert info['workers'] == 1 - + def test_workers(self): worker = Worker(['basic']) worker.register_worker() name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic') assert len(self.resq.workers()) == 1 #assert Worker.find(name, self.resq) in self.resq.workers() - + def test_enqueue_from_string(self): self.resq.enqueue_from_string('tests.Basic','basic','test1') name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic') @@ -91,7 +91,7 @@ def test_enqueue_from_string(self): assert not self.redis.get('resque:worker:%s' % worker) assert not self.redis.get("resque:stat:failed") assert not self.redis.get("resque:stat:failed:%s" % name) - + def test_remove_queue(self): self.resq.enqueue_from_string('tests.Basic','basic','test1') assert 'basic' in self.resq._watched_queues @@ -101,12 +101,12 @@ def test_remove_queue(self): assert 'basic' not in self.resq._watched_queues assert not self.redis.sismember('resque:queues','basic') assert not self.redis.exists('resque:queue:basic') - + def test_keys(self): self.resq.enqueue_from_string('tests.Basic','basic','test1') assert 'queue:basic' in self.resq.keys() assert 'queues' in self.resq.keys() - + def test_queues(self): assert self.resq.queues() == [] self.resq.enqueue_from_string('tests.Basic','basic','test1') diff --git a/tests/test_schedule.py b/tests/test_schedule.py index 27b21fb..903c825 100644 --- a/tests/test_schedule.py +++ b/tests/test_schedule.py @@ -73,4 +73,4 @@ def test_schedule_shutdown(self): scheduler = Scheduler(self.resq) scheduler.schedule_shutdown(19,'') assert scheduler._shutdown - \ No newline at end of file + diff --git a/tests/test_stats.py b/tests/test_stats.py index c49ed1a..b6cf291 100644 --- a/tests/test_stats.py +++ b/tests/test_stats.py @@ -4,22 +4,22 @@ class StatTests(PyResTests): def test_incr(self): stat_obj = Stat('test_stat', self.resq) stat_obj.incr() - assert self.redis.get('resque:stat:test_stat') == str(1) + assert self.redis.get('resque:stat:test_stat') == b'1' stat_obj.incr() - assert self.redis.get('resque:stat:test_stat') == str(2) + assert self.redis.get('resque:stat:test_stat') == b'2' stat_obj.incr(2) - assert self.redis.get('resque:stat:test_stat') == str(4) + assert self.redis.get('resque:stat:test_stat') == b'4' def test_decr(self): stat_obj = Stat('test_stat', self.resq) stat_obj.incr() stat_obj.incr() - assert self.redis.get('resque:stat:test_stat') == str(2) + assert self.redis.get('resque:stat:test_stat') == b'2' stat_obj.decr() - assert self.redis.get('resque:stat:test_stat') == str(1) + assert self.redis.get('resque:stat:test_stat') == b'1' stat_obj.incr() stat_obj.decr(2) - assert self.redis.get('resque:stat:test_stat') == str(0) + assert self.redis.get('resque:stat:test_stat') == b'0' def test_get(self): stat_obj = Stat('test_stat', self.resq) @@ -33,4 +33,4 @@ def test_clear(self): stat_obj.incr() assert self.redis.exists('resque:stat:test_stat') stat_obj.clear() - assert not self.redis.exists('resque:stat:test_stat') \ No newline at end of file + assert not self.redis.exists('resque:stat:test_stat') diff --git a/tests/test_worker.py b/tests/test_worker.py index d390625..164be9c 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -13,7 +13,7 @@ def test_worker_init(self): from pyres.exceptions import NoQueueError self.assertRaises(NoQueueError, Worker,[]) self.assertRaises(Exception, Worker,['test'],TestProcess()) - + def test_startup(self): worker = Worker(['basic']) worker.startup() @@ -24,13 +24,13 @@ def test_startup(self): assert signal.getsignal(signal.SIGINT) == worker.shutdown_all assert signal.getsignal(signal.SIGQUIT) == worker.schedule_shutdown assert signal.getsignal(signal.SIGUSR1) == worker.kill_child - + def test_register(self): worker = Worker(['basic']) worker.register_worker() name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic') assert self.redis.sismember('resque:workers',name) - + def test_unregister(self): worker = Worker(['basic']) worker.register_worker() @@ -46,35 +46,35 @@ def test_working_on(self): worker = Worker(['basic']) worker.working_on(job) assert self.redis.exists("resque:worker:%s" % name) - + def test_processed(self): name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic') worker = Worker(['basic']) worker.processed() assert self.redis.exists("resque:stat:processed") assert self.redis.exists("resque:stat:processed:%s" % name) - assert self.redis.get("resque:stat:processed") == str(1) - assert self.redis.get("resque:stat:processed:%s" % name) == str(1) + assert self.redis.get("resque:stat:processed").decode() == str(1) + assert self.redis.get("resque:stat:processed:%s" % name).decode() == str(1) assert worker.get_processed() == 1 worker.processed() - assert self.redis.get("resque:stat:processed") == str(2) - assert self.redis.get("resque:stat:processed:%s" % name) == str(2) + assert self.redis.get("resque:stat:processed").decode() == str(2) + assert self.redis.get("resque:stat:processed:%s" % name).decode() == str(2) assert worker.get_processed() == 2 - + def test_failed(self): name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic') worker = Worker(['basic']) worker.failed() assert self.redis.exists("resque:stat:failed") assert self.redis.exists("resque:stat:failed:%s" % name) - assert self.redis.get("resque:stat:failed") == str(1) - assert self.redis.get("resque:stat:failed:%s" % name) == str(1) + assert self.redis.get("resque:stat:failed").decode() == str(1) + assert self.redis.get("resque:stat:failed:%s" % name).decode() == str(1) assert worker.get_failed() == 1 worker.failed() - assert self.redis.get("resque:stat:failed") == str(2) - assert self.redis.get("resque:stat:failed:%s" % name) == str(2) + assert self.redis.get("resque:stat:failed").decode() == str(2) + assert self.redis.get("resque:stat:failed:%s" % name).decode() == str(2) assert worker.get_failed() == 2 - + def test_process(self): name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic') self.resq.enqueue(Basic,"test1") @@ -89,8 +89,8 @@ def test_process(self): assert not self.redis.get('resque:worker:%s' % worker) assert not self.redis.get("resque:stat:failed") assert not self.redis.get("resque:stat:failed:%s" % name) - - + + def test_signals(self): worker = Worker(['basic']) worker.startup() @@ -104,16 +104,16 @@ def test_signals(self): #worker.work() #assert worker.child assert not worker.kill_child(frame, signal.SIGUSR1) - + def test_job_failure(self): self.resq.enqueue(ErrorObject) worker = Worker(['basic']) worker.process() name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic') assert not self.redis.get('resque:worker:%s' % worker) - assert self.redis.get("resque:stat:failed") == str(1) - assert self.redis.get("resque:stat:failed:%s" % name) == str(1) - + assert self.redis.get("resque:stat:failed").decode() == str(1) + assert self.redis.get("resque:stat:failed:%s" % name).decode() == str(1) + def test_get_job(self): worker = Worker(['basic']) self.resq.enqueue(Basic,"test1") @@ -124,9 +124,9 @@ def test_get_job(self): assert worker.processing() == ResQ.decode(self.redis.get('resque:worker:%s' % name)) worker.done_working(job) w2 = Worker(['basic']) - print w2.job() + print(w2.job()) assert w2.job() == {} - + def test_working(self): worker = Worker(['basic']) self.resq.enqueue_from_string('tests.Basic','basic','test1') @@ -138,18 +138,18 @@ def test_working(self): assert len(workers) == 1 assert str(worker) == str(workers[0]) assert worker != workers[0] - + def test_started(self): import datetime worker = Worker(['basic']) dt = datetime.datetime.now() worker.started = dt name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic') - assert self.redis.get('resque:worker:%s:started' % name) == str(int(time.mktime(dt.timetuple()))) - assert worker.started == str(int(time.mktime(dt.timetuple()))) + assert self.redis.get('resque:worker:%s:started' % name).decode() == str(int(time.mktime(dt.timetuple()))) + assert worker.started.decode() == str(int(time.mktime(dt.timetuple()))) worker.started = None assert not self.redis.exists('resque:worker:%s:started' % name) - + def test_state(self): worker = Worker(['basic']) assert worker.state() == 'idle' @@ -160,7 +160,7 @@ def test_state(self): assert worker.state() == 'working' worker.done_working(job) assert worker.state() == 'idle' - + def test_prune_dead_workers(self): worker = Worker(['basic']) # we haven't registered this worker, so the assertion below holds assert self.redis.scard('resque:workers') == 0 @@ -296,5 +296,36 @@ def test_retries_give_up_eventually(self): assert None == worker.process() assert worker.get_failed() == 1 + def test_worker_pids(self): + # spawn worker processes and get pids + pids = [] + pids.append(self.spawn_worker(['basic'])) + pids.append(self.spawn_worker(['basic'])) + time.sleep(1) + worker_pids = Worker.worker_pids() + + # send kill signal to workers and wait for them to exit + import signal + for pid in pids: + os.kill(pid, signal.SIGQUIT) + os.waitpid(pid, 0) + + # ensure worker_pids() returned the correct pids + for pid in pids: + assert str(pid) in worker_pids + + # ensure the workers are no longer returned by worker_pids() + worker_pids = Worker.worker_pids() + for pid in pids: + assert str(pid) not in worker_pids + + def spawn_worker(self, queues): + pid = os.fork() + if not pid: + Worker.run(queues, interval=1) + os._exit(0) + else: + return pid + def set_current_time(self, time): ResQ._current_time = staticmethod(lambda: time) diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..1657f6a --- /dev/null +++ b/tox.ini @@ -0,0 +1,9 @@ +[tox] +envlist = py27, py33 + +[testenv] +commands = py.test +deps = + pytest + nose + nosexcover