From 104df5b1eec4c80aeb89f474ab5a587fea8d81fe Mon Sep 17 00:00:00 2001 From: Matt George Date: Thu, 3 Jan 2013 19:11:24 -0600 Subject: [PATCH 01/46] updating copyright --- LICENSE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/LICENSE b/LICENSE index 22c6364..f9725b0 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2009 Matt George +Copyright (c) 2009-2013 Matt George Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the From 0912dcb1876254fc27e9177f1a4d1c96d49e0216 Mon Sep 17 00:00:00 2001 From: Leevar Williams Date: Sun, 20 Jan 2013 06:38:35 +0000 Subject: [PATCH 02/46] Modified worker_pids() to return the correct set of pids for pyres_worker processes and made it a class method. --- pyres/worker.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pyres/worker.py b/pyres/worker.py index 4874db8..f8bd76b 100644 --- a/pyres/worker.py +++ b/pyres/worker.py @@ -75,7 +75,7 @@ def unregister_worker(self): def prune_dead_workers(self): all_workers = Worker.all(self.resq) - known_workers = self.worker_pids() + known_workers = Worker.worker_pids() for worker in all_workers: host, pid, queues = worker.id.split(':') if host != self.hostname: @@ -329,12 +329,16 @@ def state(self): return 'working' return 'idle' - def worker_pids(self): + @classmethod + def worker_pids(cls): """Returns an array of all pids (as strings) of the workers on this machine. Used when pruning dead workers.""" - return map(lambda l: l.strip().split(' ')[0], - commands.getoutput("ps -A -o pid,command | \ - grep pyres_worker").split("\n")) + cmd = "ps -A -o pid,command | grep pyres_worker | grep -v grep" + output = commands.getoutput(cmd) + if output: + return map(lambda l: l.strip().split(' ')[0], output.split("\n")) + else: + return [] @classmethod def run(cls, queues, server="localhost:6379", interval=None, timeout=None): From 548af974926e3f0b42f73d397092ed7ba4e8f237 Mon Sep 17 00:00:00 2001 From: Leevar Williams Date: Sun, 20 Jan 2013 06:39:10 +0000 Subject: [PATCH 03/46] Added test case for worker_pids(). --- tests/test_worker.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tests/test_worker.py b/tests/test_worker.py index d390625..792ba6c 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -296,5 +296,36 @@ def test_retries_give_up_eventually(self): assert None == worker.process() assert worker.get_failed() == 1 + def test_worker_pids(self): + # spawn worker processes and get pids + pids = [] + pids.append(self.spawn_worker(['basic'])) + pids.append(self.spawn_worker(['basic'])) + time.sleep(1) + worker_pids = Worker.worker_pids() + + # send kill signal to workers and wait for them to exit + import signal + for pid in pids: + os.kill(pid, signal.SIGQUIT) + os.waitpid(pid, 0) + + # ensure worker_pids() returned the correct pids + assert len(worker_pids) == len(pids) + for pid in pids: + assert str(pid) in worker_pids + + # ensure there are no longer any workers running + worker_pids = Worker.worker_pids() + assert len(worker_pids) == 0 + + def spawn_worker(self, queues): + pid = os.fork() + if not pid: + Worker.run(queues, interval=1) + os._exit(0) + else: + return pid + def set_current_time(self, time): ResQ._current_time = staticmethod(lambda: time) From 062b548f7084e39426e8815fc40183fd9b8bb334 Mon Sep 17 00:00:00 2001 From: Leevar Williams Date: Sun, 20 Jan 2013 06:39:34 +0000 Subject: [PATCH 04/46] Fixed small bug in ResQ __str__() method. --- pyres/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyres/__init__.py b/pyres/__init__.py index 69b1462..b64425c 100644 --- a/pyres/__init__.py +++ b/pyres/__init__.py @@ -260,7 +260,7 @@ def reserve(self, queues): return Job.reserve(queues, self) def __str__(self): - return "PyRes Client connected to %s" % self.redis.server + return "PyRes Client connected to %s" % self.dsn def workers(self): from pyres.worker import Worker From e119d7ed117267674eb3ead8d65caeb9b4d9c949 Mon Sep 17 00:00:00 2001 From: Leevar Williams Date: Tue, 22 Jan 2013 15:03:31 +0000 Subject: [PATCH 05/46] Fixed worker_pids test case. --- tests/test_worker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_worker.py b/tests/test_worker.py index 792ba6c..a6783a9 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -311,13 +311,13 @@ def test_worker_pids(self): os.waitpid(pid, 0) # ensure worker_pids() returned the correct pids - assert len(worker_pids) == len(pids) for pid in pids: assert str(pid) in worker_pids - # ensure there are no longer any workers running + # ensure the workers are no longer returned by worker_pids() worker_pids = Worker.worker_pids() - assert len(worker_pids) == 0 + for pid in pids: + assert str(pid) not in worker_pids def spawn_worker(self, queues): pid = os.fork() From 97cc4c7709242dd39f1f7bc17f6d62dbcc0117ee Mon Sep 17 00:00:00 2001 From: Matt George Date: Fri, 21 Jun 2013 06:31:38 -0500 Subject: [PATCH 06/46] 1.4.2 release --- HISTORY.md | 8 ++++++++ pyres/__init__.py | 2 +- setup.py | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 53e3ad1..7236716 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,3 +1,11 @@ +##1.4.2 (2013-06-21) +* __str__ returns correctly with dsn +* worker_pids returns correct set of workers +* workers are re-registered on every job +* add exception metadata for after_perform method +* logger no longer overrides root logger +* support for redis db in dsn + ##1.4.1 (2012-07-30) * fix for non existent system signal for linux * cleanup of setup.py and requirements diff --git a/pyres/__init__.py b/pyres/__init__.py index b64425c..b6c3ff1 100644 --- a/pyres/__init__.py +++ b/pyres/__init__.py @@ -1,4 +1,4 @@ -__version__ = '1.4.1' +__version__ = '1.4.2' from redis import Redis import pyres.json_parser as json diff --git a/setup.py b/setup.py index e534abe..e404e2f 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ from setuptools import setup, find_packages -version='1.4.1' +version='1.4.2' setup( name='pyres', version=version, From b440083a51f1dcacdcd25e74384d60966cbd2071 Mon Sep 17 00:00:00 2001 From: John Anderson Date: Sat, 13 Jul 2013 09:42:27 -0700 Subject: [PATCH 07/46] change prints --- tests/__init__.py | 4 ++-- tests/test_worker.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/__init__.py b/tests/__init__.py index a75685a..d75c866 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -14,7 +14,7 @@ class BasicMulti(object): queue = 'basic' @staticmethod def perform(name, age): - print 'name: %s, age: %s' % (name, age) + print('name: %s, age: %s' % (name, age)) class ReturnAllArgsJob(object): @@ -97,7 +97,7 @@ class LongObject(object): def perform(sleep_time): import time time.sleep(sleep_time) - print 'Done Sleeping' + print('Done Sleeping') def test_str_to_class(): ret = str_to_class('tests.Basic') diff --git a/tests/test_worker.py b/tests/test_worker.py index a6783a9..184cb93 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -124,7 +124,7 @@ def test_get_job(self): assert worker.processing() == ResQ.decode(self.redis.get('resque:worker:%s' % name)) worker.done_working(job) w2 = Worker(['basic']) - print w2.job() + print(w2.job()) assert w2.job() == {} def test_working(self): From 30dd8570476091597cfa392e837703246a0f3fc1 Mon Sep 17 00:00:00 2001 From: John Anderson Date: Sat, 13 Jul 2013 09:49:59 -0700 Subject: [PATCH 08/46] started the porting by adding a compat.py --- .gitignore | 3 ++- pyres/.compat.py.swo | Bin 0 -> 4096 bytes pyres/__init__.py | 7 ++++--- pyres/compat.py | 27 +++++++++++++++++++++++++++ pyres/json_parser.py | 3 ++- pyres/worker.py | 2 +- 6 files changed, 36 insertions(+), 6 deletions(-) create mode 100644 pyres/.compat.py.swo create mode 100644 pyres/compat.py diff --git a/.gitignore b/.gitignore index ee342e9..23a2c76 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ build/ .coverage *.egg-info logs/ -dist/ \ No newline at end of file +dist/ +*.swp diff --git a/pyres/.compat.py.swo b/pyres/.compat.py.swo new file mode 100644 index 0000000000000000000000000000000000000000..3eb14800a14458fc351e9627f72e593374d5e7d9 GIT binary patch literal 4096 zcmYc?2=nw+FxN9?00IF9hVqt&fzDM*406H@48{3*C8^mU3H%@_HLNd_Dw4S~@R7!85Z5Eu=C(GVC7fzc2c4S^vY0sv%q BC2;@% literal 0 HcmV?d00001 diff --git a/pyres/__init__.py b/pyres/__init__.py index b6c3ff1..a2b353e 100644 --- a/pyres/__init__.py +++ b/pyres/__init__.py @@ -1,6 +1,7 @@ __version__ = '1.4.2' from redis import Redis +from pyres.compat import string_types import pyres.json_parser as json import os @@ -152,7 +153,7 @@ def push(self, queue, item): self.redis.rpush("resque:queue:%s" % queue, ResQ.encode(item)) def pop(self, queues, timeout=10): - if isinstance(queues, basestring): + if isinstance(queues, string_types): queues = [queues] ret = self.redis.blpop(["resque:queue:%s" % q for q in queues], timeout=timeout) @@ -186,7 +187,7 @@ def _get_redis(self): return self._redis def _set_redis(self, server): - if isinstance(server, basestring): + if isinstance(server, string_types): self.dsn = server address, _, db = server.partition('/') host, port = address.split(':') @@ -346,7 +347,7 @@ def encode(cls, item): @classmethod def decode(cls, item): - if isinstance(item, basestring): + if isinstance(item, string_types): ret = json.loads(item) return ret return None diff --git a/pyres/compat.py b/pyres/compat.py new file mode 100644 index 0000000..430f72d --- /dev/null +++ b/pyres/compat.py @@ -0,0 +1,27 @@ +import sys +import types + +try: + import cPickle as pickle +except ImportError: # pragma: no cover + import pickle + +# True if we are running on Python 3. +PY3 = sys.version_info[0] == 3 + +if PY3: # pragma: no cover + string_types = str, + integer_types = int, + class_types = type, + text_type = str + binary_type = bytes + long = int +else: + string_types = basestring, + integer_types = (int, long) + class_types = (type, types.ClassType) + text_type = unicode + binary_type = str + long = long + + diff --git a/pyres/json_parser.py b/pyres/json_parser.py index be80fb6..d425683 100644 --- a/pyres/json_parser.py +++ b/pyres/json_parser.py @@ -1,4 +1,5 @@ from datetime import datetime +from pyres.compat import string_types try: #import simplejson as json @@ -24,7 +25,7 @@ def decode(self, json_string): return self.convert(decoded) def convert(self, value): - if isinstance(value, basestring) and value.startswith(DATE_PREFIX): + if isinstance(value, string_types) and value.startswith(DATE_PREFIX): try: return datetime.strptime(value[len(DATE_PREFIX):], DATE_FORMAT) except ValueError: diff --git a/pyres/worker.py b/pyres/worker.py index f8bd76b..972a7fe 100644 --- a/pyres/worker.py +++ b/pyres/worker.py @@ -257,7 +257,7 @@ def process(self, job=None): except Exception: job_failed = True self._handle_job_exception(job) - except SystemExit, e: + except SystemExit as e: if e.code != 0: job_failed = True self._handle_job_exception(job) From b33e01adeedc2dd3c5e12c4412598682e4f945b5 Mon Sep 17 00:00:00 2001 From: John Anderson Date: Sat, 13 Jul 2013 09:58:23 -0700 Subject: [PATCH 09/46] updated json_parser --- pyres/horde.py | 2 +- pyres/json_parser.py | 2 +- pyres/worker.py | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pyres/horde.py b/pyres/horde.py index 0b47047..70990fa 100644 --- a/pyres/horde.py +++ b/pyres/horde.py @@ -85,7 +85,7 @@ def process(self, job): try: self.working_on(job) job.perform() - except Exception, e: + except Exception as e: exceptionType, exceptionValue, exceptionTraceback = sys.exc_info() self.logger.error("%s failed: %s" % (job, e)) job.fail(exceptionTraceback) diff --git a/pyres/json_parser.py b/pyres/json_parser.py index d425683..a8d18e7 100644 --- a/pyres/json_parser.py +++ b/pyres/json_parser.py @@ -31,7 +31,7 @@ def convert(self, value): except ValueError: return value elif isinstance(value, dict): - for k, v in value.iteritems(): + for k, v in value.items(): new = self.convert(v) if new != v: value[k] = new diff --git a/pyres/worker.py b/pyres/worker.py index 972a7fe..3eb4d3c 100644 --- a/pyres/worker.py +++ b/pyres/worker.py @@ -2,8 +2,8 @@ import signal import datetime, time import os, sys -import json_parser as json -import commands +from pyres import json_parser as json +import subprocess import random from pyres.exceptions import NoQueueError, JobError, TimeoutError, CrashError @@ -334,7 +334,7 @@ def worker_pids(cls): """Returns an array of all pids (as strings) of the workers on this machine. Used when pruning dead workers.""" cmd = "ps -A -o pid,command | grep pyres_worker | grep -v grep" - output = commands.getoutput(cmd) + output = subprocess.getoutput(cmd) if output: return map(lambda l: l.strip().split(' ')[0], output.split("\n")) else: From a4fba8aea50a8168f84e6497f8266d6980d672c2 Mon Sep 17 00:00:00 2001 From: John Anderson Date: Sat, 13 Jul 2013 10:22:47 -0700 Subject: [PATCH 10/46] updated coommands to by py2/py3 compat --- pyres/compat.py | 2 ++ pyres/worker.py | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pyres/compat.py b/pyres/compat.py index 430f72d..4805b21 100644 --- a/pyres/compat.py +++ b/pyres/compat.py @@ -16,6 +16,7 @@ text_type = str binary_type = bytes long = int + import subprocess as commands else: string_types = basestring, integer_types = (int, long) @@ -23,5 +24,6 @@ text_type = unicode binary_type = str long = long + import commands diff --git a/pyres/worker.py b/pyres/worker.py index 3eb4d3c..9befb1c 100644 --- a/pyres/worker.py +++ b/pyres/worker.py @@ -3,7 +3,7 @@ import datetime, time import os, sys from pyres import json_parser as json -import subprocess +from pyres.compat import commands import random from pyres.exceptions import NoQueueError, JobError, TimeoutError, CrashError @@ -334,7 +334,7 @@ def worker_pids(cls): """Returns an array of all pids (as strings) of the workers on this machine. Used when pruning dead workers.""" cmd = "ps -A -o pid,command | grep pyres_worker | grep -v grep" - output = subprocess.getoutput(cmd) + output = commands.getoutput(cmd) if output: return map(lambda l: l.strip().split(' ')[0], output.split("\n")) else: From a4a2c9d259459270106d52f637ed50d40073b40e Mon Sep 17 00:00:00 2001 From: John Anderson Date: Sat, 13 Jul 2013 10:22:59 -0700 Subject: [PATCH 11/46] added tox --- tox.ini | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 tox.ini diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..99608d5 --- /dev/null +++ b/tox.ini @@ -0,0 +1,8 @@ +[tox] +envlist = py27, py33 + +[testenv] +commands = nosetests +deps = + nose + nosexcover From 4e5de75260a60c02229e5b0d4b685fc943420edd Mon Sep 17 00:00:00 2001 From: John Anderson Date: Sat, 13 Jul 2013 10:29:03 -0700 Subject: [PATCH 12/46] updated dependencies to work well in py2 and 3, use bytestring in test --- requirements.txt | 6 +++--- tests/test_stats.py | 14 +++++++------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/requirements.txt b/requirements.txt index 05131d7..96046b5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ -simplejson>=2.0.9 -redis>=2.4.12 -setproctitle>=1.0 +simplejson>3.0 +redis>2.4.12 +setproctitle>1.0 diff --git a/tests/test_stats.py b/tests/test_stats.py index c49ed1a..b6cf291 100644 --- a/tests/test_stats.py +++ b/tests/test_stats.py @@ -4,22 +4,22 @@ class StatTests(PyResTests): def test_incr(self): stat_obj = Stat('test_stat', self.resq) stat_obj.incr() - assert self.redis.get('resque:stat:test_stat') == str(1) + assert self.redis.get('resque:stat:test_stat') == b'1' stat_obj.incr() - assert self.redis.get('resque:stat:test_stat') == str(2) + assert self.redis.get('resque:stat:test_stat') == b'2' stat_obj.incr(2) - assert self.redis.get('resque:stat:test_stat') == str(4) + assert self.redis.get('resque:stat:test_stat') == b'4' def test_decr(self): stat_obj = Stat('test_stat', self.resq) stat_obj.incr() stat_obj.incr() - assert self.redis.get('resque:stat:test_stat') == str(2) + assert self.redis.get('resque:stat:test_stat') == b'2' stat_obj.decr() - assert self.redis.get('resque:stat:test_stat') == str(1) + assert self.redis.get('resque:stat:test_stat') == b'1' stat_obj.incr() stat_obj.decr(2) - assert self.redis.get('resque:stat:test_stat') == str(0) + assert self.redis.get('resque:stat:test_stat') == b'0' def test_get(self): stat_obj = Stat('test_stat', self.resq) @@ -33,4 +33,4 @@ def test_clear(self): stat_obj.incr() assert self.redis.exists('resque:stat:test_stat') stat_obj.clear() - assert not self.redis.exists('resque:stat:test_stat') \ No newline at end of file + assert not self.redis.exists('resque:stat:test_stat') From 6d9fcd1ce3b365e1b3b44df2b2f6566d3a099477 Mon Sep 17 00:00:00 2001 From: John Anderson Date: Sat, 13 Jul 2013 10:31:02 -0700 Subject: [PATCH 13/46] impore properly --- pyres/failure/redis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyres/failure/redis.py b/pyres/failure/redis.py index 715f9de..5fe71ee 100644 --- a/pyres/failure/redis.py +++ b/pyres/failure/redis.py @@ -1,7 +1,7 @@ import datetime, time from base64 import b64encode -from base import BaseBackend +from .base import BaseBackend from pyres import ResQ class RedisBackend(BaseBackend): From b2374c430cbb7232f0b5d7677e960fb52a4240ed Mon Sep 17 00:00:00 2001 From: John Anderson Date: Sat, 13 Jul 2013 10:33:48 -0700 Subject: [PATCH 14/46] update gitignore --- .gitignore | 2 ++ pyres/.compat.py.swo | Bin 4096 -> 0 bytes 2 files changed, 2 insertions(+) delete mode 100644 pyres/.compat.py.swo diff --git a/.gitignore b/.gitignore index 23a2c76..310a9ae 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ build/ logs/ dist/ *.swp +*.swo +.tox/ diff --git a/pyres/.compat.py.swo b/pyres/.compat.py.swo deleted file mode 100644 index 3eb14800a14458fc351e9627f72e593374d5e7d9..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 4096 zcmYc?2=nw+FxN9?00IF9hVqt&fzDM*406H@48{3*C8^mU3H%@_HLNd_Dw4S~@R7!85Z5Eu=C(GVC7fzc2c4S^vY0sv%q BC2;@% From f27561bd4eb9efa42c21a1155461b3ff951000d6 Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Sat, 13 Jul 2013 17:39:44 +0000 Subject: [PATCH 15/46] Converting basestring to compat.string_types --- pyres/horde.py | 95 +++++++++++++++++++++++----------------------- pyres/job.py | 8 ++-- pyres/scheduler.py | 3 +- pyres/worker.py | 8 ++-- 4 files changed, 58 insertions(+), 56 deletions(-) diff --git a/pyres/horde.py b/pyres/horde.py index 70990fa..43bd2d6 100644 --- a/pyres/horde.py +++ b/pyres/horde.py @@ -12,6 +12,7 @@ from pyres.exceptions import NoQueueError from pyres.utils import OrderedDict from pyres.job import Job +from pyres.compat import string_types import pyres.json_parser as json try: from setproctitle import setproctitle @@ -20,7 +21,7 @@ def setproctitle(name): pass def setup_logging(procname, namespace='', log_level=logging.INFO, log_file=None): - + logger = multiprocessing.get_logger() #logger = multiprocessing.log_to_stderr() logger.setLevel(log_level) @@ -31,54 +32,54 @@ def setup_logging(procname, namespace='', log_level=logging.INFO, log_file=None) class Minion(multiprocessing.Process): def __init__(self, queues, server, password, log_level=logging.INFO, log_path=None): multiprocessing.Process.__init__(self, name='Minion') - + #format = '%(asctime)s %(levelname)s %(filename)s-%(lineno)d: %(message)s' #logHandler = logging.StreamHandler() #logHandler.setFormatter(logging.Formatter(format)) #self.logger = multiprocessing.get_logger() #self.logger.addHandler(logHandler) #self.logger.setLevel(logging.DEBUG) - + self.queues = queues self._shutdown = False self.hostname = os.uname()[1] self.server = server self.password = password - + self.log_level = log_level self.log_path = log_path self.log_file = None - + def prune_dead_workers(self): pass - + def schedule_shutdown(self, signum, frame): self._shutdown = True - + def register_signal_handlers(self): signal.signal(signal.SIGTERM, self.schedule_shutdown) signal.signal(signal.SIGINT, self.schedule_shutdown) signal.signal(signal.SIGQUIT, self.schedule_shutdown) - + def register_minion(self): self.resq.redis.sadd('resque:minions',str(self)) self.started = datetime.datetime.now() - + def startup(self): self.register_signal_handlers() self.prune_dead_workers() self.register_minion() - + def __str__(self): return '%s:%s:%s' % (self.hostname, self.pid, ','.join(self.queues)) - + def reserve(self): self.logger.debug('checking queues: %s' % self.queues) job = Job.reserve(self.queues, self.resq, self.__str__()) if job: self.logger.info('Found job on %s' % job._queue) return job - + def process(self, job): if not job: return @@ -95,7 +96,7 @@ def process(self, job): self.logger.info('completed job: %s' % job) finally: self.done_working() - + def working_on(self, job): setproctitle('pyres_minion:%s: working on job: %s' % (os.getppid(), job._payload)) self.logger.debug('marking as working on') @@ -108,25 +109,25 @@ def working_on(self, job): self.resq.redis["resque:minion:%s" % str(self)] = data self.logger.debug("minion:%s" % str(self)) #self.logger.debug(self.resq.redis["resque:minion:%s" % str(self)]) - + def failed(self): Stat("failed", self.resq).incr() - + def processed(self): total_processed = Stat("processed", self.resq) total_processed.incr() - + def done_working(self): self.logger.debug('done working') self.processed() self.resq.redis.delete("resque:minion:%s" % str(self)) - + def unregister_minion(self): self.resq.redis.srem('resque:minions',str(self)) self.started = None - + def work(self, interval=5): - + self.startup() while True: setproctitle('pyres_minion:%s: waiting for job on: %s' % (os.getppid(),self.queues)) @@ -140,11 +141,11 @@ def work(self, interval=5): else: time.sleep(interval) self.unregister_minion() - + def clear_logger(self): for handler in self.logger.handlers: self.logger.removeHandler(handler) - + def run(self): setproctitle('pyres_minion:%s: Starting' % (os.getppid(),)) if self.log_path: @@ -155,19 +156,19 @@ def run(self): namespace = 'minion:%s' % self.pid self.logger = setup_logging('minion', namespace, self.log_level, self.log_file) #self.clear_logger() - if isinstance(self.server,basestring): + if isinstance(self.server,string_types): self.resq = ResQ(server=self.server, password=self.password) elif isinstance(self.server, ResQ): self.resq = self.server else: raise Exception("Bad server argument") - - + + self.work() #while True: # job = self.q.get() # print 'pid: %s is running %s ' % (self.pid,job) - + class Khan(object): _command_map = { @@ -189,28 +190,28 @@ def __init__(self, pool_size=5, queues=[], server='localhost:6379', password=Non self.password = password self.logging_level = logging_level self.log_file = log_file - + #self._workers = list() - + def setup_resq(self): if hasattr(self,'logger'): self.logger.info('Connecting to redis server - %s' % self.server) - if isinstance(self.server,basestring): + if isinstance(self.server,string_types): self.resq = ResQ(server=self.server, password=self.password) elif isinstance(self.server, ResQ): self.resq = self.server else: raise Exception("Bad server argument") - + def validate_queues(self): "Checks if a worker is given atleast one queue to work on." if not self.queues: raise NoQueueError("Please give each worker at least one queue.") - + def startup(self): self.register_signal_handlers() - - + + def register_signal_handlers(self): signal.signal(signal.SIGTERM, self.schedule_shutdown) signal.signal(signal.SIGINT, self.schedule_shutdown) @@ -252,23 +253,23 @@ def current_state(self): def _schedule_shutdown(self): self.schedule_shutdown(None, None) - + def schedule_shutdown(self, signum, frame): self.logger.info('Khan Shutdown scheduled') self._shutdown = True - + def kill_child(self, signum, frame): self._remove_minion() - + def add_child(self, signum, frame): self.add_minion() - + def register_khan(self): if not hasattr(self, 'resq'): self.setup_resq() self.resq.redis.sadd('resque:khans',str(self)) self.started = datetime.datetime.now() - + def _check_commands(self): if not self._shutdown: self.logger.debug('Checking commands') @@ -277,7 +278,7 @@ def _check_commands(self): if command: self.process_command(command) self._check_commands() - + def process_command(self, command): self.logger.info('Processing Command') #available commands, shutdown, add 1, remove 1 @@ -286,13 +287,13 @@ def process_command(self, command): fn = getattr(self, command_item) if fn: fn() - + def add_minion(self): self._add_minion() self.resq.redis.srem('resque:khans',str(self)) self.pool_size += 1 self.resq.redis.sadd('resque:khans',str(self)) - + def _add_minion(self): if hasattr(self,'logger'): self.logger.info('Adding minion') @@ -309,7 +310,7 @@ def _add_minion(self): if hasattr(self,'logger'): self.logger.info('minion added at: %s' % m.pid) return m - + def _shutdown_minions(self): """ send the SIGNINT signal to each worker in the pool. @@ -318,7 +319,7 @@ def _shutdown_minions(self): for minion in self._workers.values(): minion.terminate() minion.join() - + def _remove_minion(self, pid=None): #if pid: # m = self._workers.pop(pid) @@ -328,20 +329,20 @@ def _remove_minion(self, pid=None): self.pool_size -= 1 self.resq.redis.sadd('resque:khans',str(self)) return m - + def unregister_khan(self): if hasattr(self,'logger'): self.logger.debug('unregistering khan') self.resq.redis.srem('resque:khans',str(self)) self.started = None - + def setup_minions(self): for i in range(self.pool_size): self._add_minion() def _setup_logging(self): self.logger = setup_logging('khan', 'khan', self.logging_level, self.log_file) - + def work(self, interval=2): setproctitle('pyres_manager: Starting') self.startup() @@ -363,11 +364,11 @@ def work(self, interval=2): else: time.sleep(interval) self.unregister_khan() - + def __str__(self): hostname = os.uname()[1] return '%s:%s:%s' % (hostname, self.pid, self.pool_size) - + @classmethod def run(cls, pool_size=5, queues=[], server='localhost:6379', logging_level=logging.INFO, log_file=None): worker = cls(pool_size=pool_size, queues=queues, server=server, logging_level=logging_level, log_file=log_file) diff --git a/pyres/job.py b/pyres/job.py index 5f88a0e..8358494 100644 --- a/pyres/job.py +++ b/pyres/job.py @@ -4,7 +4,7 @@ from pyres import ResQ, safe_str_to_class from pyres import failure from pyres.failure.redis import RedisBackend - +from pyres.compat import string_types class Job(object): """Every job on the ResQ is an instance of the *Job* class. @@ -26,7 +26,7 @@ class Job(object): """ safe_str_to_class = staticmethod(safe_str_to_class) - + def __init__(self, queue, payload, resq, worker=None): self._queue = queue self._payload = payload @@ -111,7 +111,7 @@ def retry(self, payload_class, args): """This method provides a way to retry a job after a failure. If the jobclass defined by the payload containes a ``retry_every`` attribute then pyres will attempt to retry the job until successful or until timeout defined by ``retry_timeout`` on the payload class. - + """ retry_every = getattr(payload_class, 'retry_every', None) retry_timeout = getattr(payload_class, 'retry_timeout', 0) @@ -133,7 +133,7 @@ def reserve(cls, queues, res, worker=None, timeout=10): that other workers will not pick it up. """ - if isinstance(queues, basestring): + if isinstance(queues, string_types): queues = [queues] queue, payload = res.pop(queues, timeout=timeout) if payload: diff --git a/pyres/scheduler.py b/pyres/scheduler.py index af5a4d6..11e4f58 100644 --- a/pyres/scheduler.py +++ b/pyres/scheduler.py @@ -3,6 +3,7 @@ import logging from pyres import ResQ, __version__ +from pyres.compat import string_types logger = logging.getLogger(__name__) @@ -14,7 +15,7 @@ def __init__(self, server="localhost:6379", password=None): >>> scheduler = Scheduler('localhost:6379') """ self._shutdown = False - if isinstance(server, basestring): + if isinstance(server, string_types): self.resq = ResQ(server=server, password=password) elif isinstance(server, ResQ): self.resq = server diff --git a/pyres/worker.py b/pyres/worker.py index 9befb1c..908def8 100644 --- a/pyres/worker.py +++ b/pyres/worker.py @@ -9,7 +9,7 @@ from pyres.exceptions import NoQueueError, JobError, TimeoutError, CrashError from pyres.job import Job from pyres import ResQ, Stat, __version__ - +from pyres.compat import string_types logger = logging.getLogger(__name__) @@ -34,7 +34,7 @@ def __init__(self, queues=(), server="localhost:6379", password=None, timeout=No self.hostname = os.uname()[1] self.timeout = timeout - if isinstance(server, basestring): + if isinstance(server, string_types): self.resq = ResQ(server=server, password=password) elif isinstance(server, ResQ): self.resq = server @@ -350,7 +350,7 @@ def run(cls, queues, server="localhost:6379", interval=None, timeout=None): @classmethod def all(cls, host="localhost:6379"): - if isinstance(host,basestring): + if isinstance(host,string_types): resq = ResQ(host) elif isinstance(host, ResQ): resq = host @@ -359,7 +359,7 @@ def all(cls, host="localhost:6379"): @classmethod def working(cls, host): - if isinstance(host, basestring): + if isinstance(host, string_types): resq = ResQ(host) elif isinstance(host, ResQ): resq = host From 52a53eb0c9784321deb4bb543baef5cdfb6829aa Mon Sep 17 00:00:00 2001 From: John Anderson Date: Sat, 13 Jul 2013 10:52:54 -0700 Subject: [PATCH 16/46] switch to pytest --- tox.ini | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 99608d5..1657f6a 100644 --- a/tox.ini +++ b/tox.ini @@ -2,7 +2,8 @@ envlist = py27, py33 [testenv] -commands = nosetests +commands = py.test deps = + pytest nose nosexcover From 1df8942a310c2b3bac31bd3d194c97a44baec46f Mon Sep 17 00:00:00 2001 From: John Anderson Date: Sat, 13 Jul 2013 11:17:12 -0700 Subject: [PATCH 17/46] decode pop (we want strings!!) --- pyres/__init__.py | 2 +- pyres/compat.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pyres/__init__.py b/pyres/__init__.py index a2b353e..71784c6 100644 --- a/pyres/__init__.py +++ b/pyres/__init__.py @@ -159,7 +159,7 @@ def pop(self, queues, timeout=10): timeout=timeout) if ret: key, ret = ret - return key[13:], ResQ.decode(ret) # trim "resque:queue:" + return key[13:].decode(), ResQ.decode(ret.decode()) # trim "resque:queue:" else: return None, None diff --git a/pyres/compat.py b/pyres/compat.py index 4805b21..c39fd3f 100644 --- a/pyres/compat.py +++ b/pyres/compat.py @@ -17,6 +17,7 @@ binary_type = bytes long = int import subprocess as commands + else: string_types = basestring, integer_types = (int, long) From 2961b5eb113c9e0877b8b48a606bade37800b2b5 Mon Sep 17 00:00:00 2001 From: John Anderson Date: Sat, 13 Jul 2013 11:46:39 -0700 Subject: [PATCH 18/46] don't blow up in py3 --- pyres/failure/base.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pyres/failure/base.py b/pyres/failure/base.py index c902299..330fbe4 100644 --- a/pyres/failure/base.py +++ b/pyres/failure/base.py @@ -17,10 +17,14 @@ class BaseBackend(object): """ def __init__(self, exp, queue, payload, worker=None): - excc, _, tb = sys.exc_info() + excc = sys.exc_info()[0] self._exception = excc - self._traceback = traceback.format_exc() + try: + self._traceback = traceback.format_exc() + except AttributeError: + self._traceback = None + self._worker = worker self._queue = queue self._payload = payload From 607e08da090be42757cbc4f3c83c791c9738f1c6 Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Sat, 13 Jul 2013 18:51:12 +0000 Subject: [PATCH 19/46] Fixing some more worker tests --- pyres/__init__.py | 9 ++++----- pyres/worker.py | 2 +- tests/test_worker.py | 32 ++++++++++++++++---------------- 3 files changed, 21 insertions(+), 22 deletions(-) diff --git a/pyres/__init__.py b/pyres/__init__.py index 71784c6..d04a8ee 100644 --- a/pyres/__init__.py +++ b/pyres/__init__.py @@ -232,7 +232,10 @@ def enqueue_from_string(self, klass_as_string, queue, *args, **kwargs): logger.debug("no arguments passed in.") def queues(self): - return self.redis.smembers("resque:queues") or [] + return [sm.decode() for sm in self.redis.smembers("resque:queues")] or [] + + def workers(self): + return [w.decode() for w in self.redis.smembers("resque:workers")] or [] def info(self): """Returns a dictionary of the current status of the pending jobs, @@ -263,10 +266,6 @@ def reserve(self, queues): def __str__(self): return "PyRes Client connected to %s" % self.dsn - def workers(self): - from pyres.worker import Worker - return Worker.all(self) - def working(self): from pyres.worker import Worker return Worker.working(self) diff --git a/pyres/worker.py b/pyres/worker.py index 908def8..952df6a 100644 --- a/pyres/worker.py +++ b/pyres/worker.py @@ -355,7 +355,7 @@ def all(cls, host="localhost:6379"): elif isinstance(host, ResQ): resq = host - return [Worker.find(w,resq) for w in resq.redis.smembers('resque:workers') or []] + return [Worker.find(w,resq) for w in resq.workers() or []] @classmethod def working(cls, host): diff --git a/tests/test_worker.py b/tests/test_worker.py index 184cb93..76455db 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -13,7 +13,7 @@ def test_worker_init(self): from pyres.exceptions import NoQueueError self.assertRaises(NoQueueError, Worker,[]) self.assertRaises(Exception, Worker,['test'],TestProcess()) - + def test_startup(self): worker = Worker(['basic']) worker.startup() @@ -24,13 +24,13 @@ def test_startup(self): assert signal.getsignal(signal.SIGINT) == worker.shutdown_all assert signal.getsignal(signal.SIGQUIT) == worker.schedule_shutdown assert signal.getsignal(signal.SIGUSR1) == worker.kill_child - + def test_register(self): worker = Worker(['basic']) worker.register_worker() name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic') assert self.redis.sismember('resque:workers',name) - + def test_unregister(self): worker = Worker(['basic']) worker.register_worker() @@ -46,7 +46,7 @@ def test_working_on(self): worker = Worker(['basic']) worker.working_on(job) assert self.redis.exists("resque:worker:%s" % name) - + def test_processed(self): name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic') worker = Worker(['basic']) @@ -60,7 +60,7 @@ def test_processed(self): assert self.redis.get("resque:stat:processed") == str(2) assert self.redis.get("resque:stat:processed:%s" % name) == str(2) assert worker.get_processed() == 2 - + def test_failed(self): name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic') worker = Worker(['basic']) @@ -74,7 +74,7 @@ def test_failed(self): assert self.redis.get("resque:stat:failed") == str(2) assert self.redis.get("resque:stat:failed:%s" % name) == str(2) assert worker.get_failed() == 2 - + def test_process(self): name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic') self.resq.enqueue(Basic,"test1") @@ -89,8 +89,8 @@ def test_process(self): assert not self.redis.get('resque:worker:%s' % worker) assert not self.redis.get("resque:stat:failed") assert not self.redis.get("resque:stat:failed:%s" % name) - - + + def test_signals(self): worker = Worker(['basic']) worker.startup() @@ -104,7 +104,7 @@ def test_signals(self): #worker.work() #assert worker.child assert not worker.kill_child(frame, signal.SIGUSR1) - + def test_job_failure(self): self.resq.enqueue(ErrorObject) worker = Worker(['basic']) @@ -113,7 +113,7 @@ def test_job_failure(self): assert not self.redis.get('resque:worker:%s' % worker) assert self.redis.get("resque:stat:failed") == str(1) assert self.redis.get("resque:stat:failed:%s" % name) == str(1) - + def test_get_job(self): worker = Worker(['basic']) self.resq.enqueue(Basic,"test1") @@ -126,7 +126,7 @@ def test_get_job(self): w2 = Worker(['basic']) print(w2.job()) assert w2.job() == {} - + def test_working(self): worker = Worker(['basic']) self.resq.enqueue_from_string('tests.Basic','basic','test1') @@ -138,18 +138,18 @@ def test_working(self): assert len(workers) == 1 assert str(worker) == str(workers[0]) assert worker != workers[0] - + def test_started(self): import datetime worker = Worker(['basic']) dt = datetime.datetime.now() worker.started = dt name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic') - assert self.redis.get('resque:worker:%s:started' % name) == str(int(time.mktime(dt.timetuple()))) - assert worker.started == str(int(time.mktime(dt.timetuple()))) + assert self.redis.get('resque:worker:%s:started' % name).decode() == str(int(time.mktime(dt.timetuple()))) + assert worker.started.decode() == str(int(time.mktime(dt.timetuple()))) worker.started = None assert not self.redis.exists('resque:worker:%s:started' % name) - + def test_state(self): worker = Worker(['basic']) assert worker.state() == 'idle' @@ -160,7 +160,7 @@ def test_state(self): assert worker.state() == 'working' worker.done_working(job) assert worker.state() == 'idle' - + def test_prune_dead_workers(self): worker = Worker(['basic']) # we haven't registered this worker, so the assertion below holds assert self.redis.scard('resque:workers') == 0 From b624c7971fcd38ad7240ef496395b4149e93f8df Mon Sep 17 00:00:00 2001 From: John Anderson Date: Sat, 13 Jul 2013 11:55:44 -0700 Subject: [PATCH 20/46] we need to pass strings to decode but bytes to b64 --- pyres/failure/redis.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyres/failure/redis.py b/pyres/failure/redis.py index 5fe71ee..0b44739 100644 --- a/pyres/failure/redis.py +++ b/pyres/failure/redis.py @@ -34,7 +34,8 @@ def all(cls, resq, start=0, count=1): ret_list = [] for i in items: - failure = ResQ.decode(i) + converted = i.decode() + failure = ResQ.decode(converted) failure['redis_value'] = b64encode(i) ret_list.append(failure) return ret_list From 48a52d83c3f60cf3bec3e42ae84701966043e29b Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Sat, 13 Jul 2013 19:00:36 +0000 Subject: [PATCH 21/46] More test fixing --- pyres/__init__.py | 2 +- tests/test_resq.py | 26 +++++++++++++------------- tests/test_worker.py | 20 ++++++++++---------- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/pyres/__init__.py b/pyres/__init__.py index d04a8ee..bd589f0 100644 --- a/pyres/__init__.py +++ b/pyres/__init__.py @@ -256,7 +256,7 @@ def info(self): } def keys(self): - return [key.replace('resque:','') + return [key.decode().replace('resque:','') for key in self.redis.keys('resque:*')] def reserve(self, queues): diff --git a/tests/test_resq.py b/tests/test_resq.py index 7412899..1559202 100644 --- a/tests/test_resq.py +++ b/tests/test_resq.py @@ -10,14 +10,14 @@ def test_enqueue(self): ResQ._enqueue(Basic, "test3") assert self.redis.llen("resque:queue:basic") == 3 assert self.redis.sismember('resque:queues','basic') - + def test_push(self): self.resq.push('pushq','content-newqueue') self.resq.push('pushq','content2-newqueue') assert self.redis.llen('resque:queue:pushq') == 2 - assert self.redis.lindex('resque:queue:pushq', 0) == ResQ.encode('content-newqueue') - assert self.redis.lindex('resque:queue:pushq', 1) == ResQ.encode('content2-newqueue') - + assert self.redis.lindex('resque:queue:pushq', 0).decode() == ResQ.encode('content-newqueue') + assert self.redis.lindex('resque:queue:pushq', 1).decode() == ResQ.encode('content2-newqueue') + def test_pop(self): self.resq.push('pushq','content-newqueue') self.resq.push('pushq','content2-newqueue') @@ -43,25 +43,25 @@ def test_pop_two_queues(self): assert self.redis.llen('resque:queue:pushq1') == 0 assert self.redis.llen('resque:queue:pushq2') == 0 assert self.resq.pop(['pushq1', 'pushq2'], timeout=1) == (None, None) - + def test_peek(self): self.resq.enqueue(Basic,"test1") self.resq.enqueue(Basic,"test2") assert len(self.resq.peek('basic',0,20)) == 2 - + def test_size(self): self.resq.enqueue(Basic,"test1") self.resq.enqueue(Basic,"test2") assert self.resq.size('basic') == 2 assert self.resq.size('noq') == 0 - + def test_redis_property(self): from redis import Redis rq = ResQ(server="localhost:6379") red = Redis() #rq2 = ResQ(server=red) self.assertRaises(Exception, rq.redis,[Basic]) - + def test_info(self): self.resq.enqueue(Basic,"test1") self.resq.enqueue(TestProcess) @@ -73,14 +73,14 @@ def test_info(self): worker.register_worker() info = self.resq.info() assert info['workers'] == 1 - + def test_workers(self): worker = Worker(['basic']) worker.register_worker() name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic') assert len(self.resq.workers()) == 1 #assert Worker.find(name, self.resq) in self.resq.workers() - + def test_enqueue_from_string(self): self.resq.enqueue_from_string('tests.Basic','basic','test1') name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic') @@ -91,7 +91,7 @@ def test_enqueue_from_string(self): assert not self.redis.get('resque:worker:%s' % worker) assert not self.redis.get("resque:stat:failed") assert not self.redis.get("resque:stat:failed:%s" % name) - + def test_remove_queue(self): self.resq.enqueue_from_string('tests.Basic','basic','test1') assert 'basic' in self.resq._watched_queues @@ -101,12 +101,12 @@ def test_remove_queue(self): assert 'basic' not in self.resq._watched_queues assert not self.redis.sismember('resque:queues','basic') assert not self.redis.exists('resque:queue:basic') - + def test_keys(self): self.resq.enqueue_from_string('tests.Basic','basic','test1') assert 'queue:basic' in self.resq.keys() assert 'queues' in self.resq.keys() - + def test_queues(self): assert self.resq.queues() == [] self.resq.enqueue_from_string('tests.Basic','basic','test1') diff --git a/tests/test_worker.py b/tests/test_worker.py index 76455db..164be9c 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -53,12 +53,12 @@ def test_processed(self): worker.processed() assert self.redis.exists("resque:stat:processed") assert self.redis.exists("resque:stat:processed:%s" % name) - assert self.redis.get("resque:stat:processed") == str(1) - assert self.redis.get("resque:stat:processed:%s" % name) == str(1) + assert self.redis.get("resque:stat:processed").decode() == str(1) + assert self.redis.get("resque:stat:processed:%s" % name).decode() == str(1) assert worker.get_processed() == 1 worker.processed() - assert self.redis.get("resque:stat:processed") == str(2) - assert self.redis.get("resque:stat:processed:%s" % name) == str(2) + assert self.redis.get("resque:stat:processed").decode() == str(2) + assert self.redis.get("resque:stat:processed:%s" % name).decode() == str(2) assert worker.get_processed() == 2 def test_failed(self): @@ -67,12 +67,12 @@ def test_failed(self): worker.failed() assert self.redis.exists("resque:stat:failed") assert self.redis.exists("resque:stat:failed:%s" % name) - assert self.redis.get("resque:stat:failed") == str(1) - assert self.redis.get("resque:stat:failed:%s" % name) == str(1) + assert self.redis.get("resque:stat:failed").decode() == str(1) + assert self.redis.get("resque:stat:failed:%s" % name).decode() == str(1) assert worker.get_failed() == 1 worker.failed() - assert self.redis.get("resque:stat:failed") == str(2) - assert self.redis.get("resque:stat:failed:%s" % name) == str(2) + assert self.redis.get("resque:stat:failed").decode() == str(2) + assert self.redis.get("resque:stat:failed:%s" % name).decode() == str(2) assert worker.get_failed() == 2 def test_process(self): @@ -111,8 +111,8 @@ def test_job_failure(self): worker.process() name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic') assert not self.redis.get('resque:worker:%s' % worker) - assert self.redis.get("resque:stat:failed") == str(1) - assert self.redis.get("resque:stat:failed:%s" % name) == str(1) + assert self.redis.get("resque:stat:failed").decode() == str(1) + assert self.redis.get("resque:stat:failed:%s" % name).decode() == str(1) def test_get_job(self): worker = Worker(['basic']) From 29e580351644a4a5085b18c789adaddb0428df4a Mon Sep 17 00:00:00 2001 From: John Anderson Date: Sat, 13 Jul 2013 12:17:44 -0700 Subject: [PATCH 22/46] support py3 --- pyres/__init__.py | 8 +++++--- tests/test_schedule.py | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pyres/__init__.py b/pyres/__init__.py index bd589f0..bea6051 100644 --- a/pyres/__init__.py +++ b/pyres/__init__.py @@ -312,7 +312,7 @@ def delayed_queue_schedule_size(self): size = 0 length = self.redis.zcard('resque:delayed_queue_schedule') for i in self.redis.zrange('resque:delayed_queue_schedule',0,length): - size += self.delayed_timestamp_size(i) + size += self.delayed_timestamp_size(i.decode()) return size def delayed_timestamp_size(self, timestamp): @@ -326,7 +326,9 @@ def next_delayed_timestamp(self): timestamp = None if array: timestamp = array[0] - return timestamp + + if timestamp: + return timestamp.decode() def next_item_for_timestamp(self, timestamp): #key = int(time.mktime(timestamp.timetuple())) @@ -334,7 +336,7 @@ def next_item_for_timestamp(self, timestamp): ret = self.redis.lpop(key) item = None if ret: - item = ResQ.decode(ret) + item = ResQ.decode(ret.decode()) if self.redis.llen(key) == 0: self.redis.delete(key) self.redis.zrem('resque:delayed_queue_schedule', timestamp) diff --git a/tests/test_schedule.py b/tests/test_schedule.py index 27b21fb..903c825 100644 --- a/tests/test_schedule.py +++ b/tests/test_schedule.py @@ -73,4 +73,4 @@ def test_schedule_shutdown(self): scheduler = Scheduler(self.resq) scheduler.schedule_shutdown(19,'') assert scheduler._shutdown - \ No newline at end of file + From 4ed56f475305fb66e2942c294f1e1c6023cace6f Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Sat, 13 Jul 2013 19:20:15 +0000 Subject: [PATCH 23/46] More test fixing --- pyres/failure/multiple.py | 4 +- pyres/horde.py | 2 +- pyres/utils.py | 98 --------------------------------------- 3 files changed, 3 insertions(+), 101 deletions(-) delete mode 100644 pyres/utils.py diff --git a/pyres/failure/multiple.py b/pyres/failure/multiple.py index e4d05f7..6362363 100644 --- a/pyres/failure/multiple.py +++ b/pyres/failure/multiple.py @@ -1,5 +1,5 @@ -from base import BaseBackend -from redis import RedisBackend +from pyres.failure.base import BaseBackend +from pyres.failure.redis import RedisBackend class MultipleBackend(BaseBackend): """Extends ``BaseBackend`` to provide support for delegating calls to multiple diff --git a/pyres/horde.py b/pyres/horde.py index 43bd2d6..b3462de 100644 --- a/pyres/horde.py +++ b/pyres/horde.py @@ -10,7 +10,7 @@ import logging.handlers from pyres import ResQ, Stat, get_logging_handler, special_log_file from pyres.exceptions import NoQueueError -from pyres.utils import OrderedDict +from collections import OrderedDict from pyres.job import Job from pyres.compat import string_types import pyres.json_parser as json diff --git a/pyres/utils.py b/pyres/utils.py deleted file mode 100644 index d640c11..0000000 --- a/pyres/utils.py +++ /dev/null @@ -1,98 +0,0 @@ -from UserDict import DictMixin - -class OrderedDict(dict, DictMixin): - - def __init__(self, *args, **kwds): - if len(args) > 1: - raise TypeError('expected at most 1 arguments, got %d' % len(args)) - try: - self.__end - except AttributeError: - self.clear() - self.update(*args, **kwds) - - def clear(self): - self.__end = end = [] - end += [None, end, end] # sentinel node for doubly linked list - self.__map = {} # key --> [key, prev, next] - dict.clear(self) - - def __setitem__(self, key, value): - if key not in self: - end = self.__end - curr = end[1] - curr[2] = end[1] = self.__map[key] = [key, curr, end] - dict.__setitem__(self, key, value) - - def __delitem__(self, key): - dict.__delitem__(self, key) - key, prev, next = self.__map.pop(key) - prev[2] = next - next[1] = prev - - def __iter__(self): - end = self.__end - curr = end[2] - while curr is not end: - yield curr[0] - curr = curr[2] - - def __reversed__(self): - end = self.__end - curr = end[1] - while curr is not end: - yield curr[0] - curr = curr[1] - - def popitem(self, last=True): - if not self: - raise KeyError('dictionary is empty') - key = reversed(self).next() if last else iter(self).next() - value = self.pop(key) - return key, value - - def __reduce__(self): - items = [[k, self[k]] for k in self] - tmp = self.__map, self.__end - del self.__map, self.__end - inst_dict = vars(self).copy() - self.__map, self.__end = tmp - if inst_dict: - return (self.__class__, (items,), inst_dict) - return self.__class__, (items,) - - def keys(self): - return list(self) - - setdefault = DictMixin.setdefault - update = DictMixin.update - pop = DictMixin.pop - values = DictMixin.values - items = DictMixin.items - iterkeys = DictMixin.iterkeys - itervalues = DictMixin.itervalues - iteritems = DictMixin.iteritems - - def __repr__(self): - if not self: - return '%s()' % (self.__class__.__name__,) - return '%s(%r)' % (self.__class__.__name__, self.items()) - - def copy(self): - return self.__class__(self) - - @classmethod - def fromkeys(cls, iterable, value=None): - d = cls() - for key in iterable: - d[key] = value - return d - - def __eq__(self, other): - if isinstance(other, OrderedDict): - return len(self)==len(other) and \ - all(p==q for p, q in zip(self.items(), other.items())) - return dict.__eq__(self, other) - - def __ne__(self, other): - return not self == other From 5a418104cbeecf92f69434a51d15cb81288d5859 Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Sat, 13 Jul 2013 19:49:34 +0000 Subject: [PATCH 24/46] Final fixes for py3 compat --- pyres/__init__.py | 12 ++++++------ pyres/failure/redis.py | 3 +-- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/pyres/__init__.py b/pyres/__init__.py index bea6051..362f2dd 100644 --- a/pyres/__init__.py +++ b/pyres/__init__.py @@ -159,7 +159,7 @@ def pop(self, queues, timeout=10): timeout=timeout) if ret: key, ret = ret - return key[13:].decode(), ResQ.decode(ret.decode()) # trim "resque:queue:" + return key[13:].decode(), ResQ.decode(ret) # trim "resque:queue:" else: return None, None @@ -336,7 +336,7 @@ def next_item_for_timestamp(self, timestamp): ret = self.redis.lpop(key) item = None if ret: - item = ResQ.decode(ret.decode()) + item = ResQ.decode(ret) if self.redis.llen(key) == 0: self.redis.delete(key) self.redis.zrem('resque:delayed_queue_schedule', timestamp) @@ -348,10 +348,10 @@ def encode(cls, item): @classmethod def decode(cls, item): - if isinstance(item, string_types): - ret = json.loads(item) - return ret - return None + if not isinstance(item, string_types): + item = item.decode() + ret = json.loads(item) + return ret @classmethod def _enqueue(cls, klass, *args): diff --git a/pyres/failure/redis.py b/pyres/failure/redis.py index 0b44739..5fe71ee 100644 --- a/pyres/failure/redis.py +++ b/pyres/failure/redis.py @@ -34,8 +34,7 @@ def all(cls, resq, start=0, count=1): ret_list = [] for i in items: - converted = i.decode() - failure = ResQ.decode(converted) + failure = ResQ.decode(i) failure['redis_value'] = b64encode(i) ret_list.append(failure) return ret_list From 06442cfa26ed72b753f94c87344714aaaeb16a77 Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Sat, 13 Jul 2013 20:16:34 +0000 Subject: [PATCH 25/46] Fixed py26 support --- pyres/compat.py | 1 + pyres/horde.py | 5 ++++- setup.py | 14 ++++++++++---- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/pyres/compat.py b/pyres/compat.py index c39fd3f..20732bf 100644 --- a/pyres/compat.py +++ b/pyres/compat.py @@ -8,6 +8,7 @@ # True if we are running on Python 3. PY3 = sys.version_info[0] == 3 +PY26 = sys.version_info[0:2] == (2,6) if PY3: # pragma: no cover string_types = str, diff --git a/pyres/horde.py b/pyres/horde.py index b3462de..b150c93 100644 --- a/pyres/horde.py +++ b/pyres/horde.py @@ -10,7 +10,10 @@ import logging.handlers from pyres import ResQ, Stat, get_logging_handler, special_log_file from pyres.exceptions import NoQueueError -from collections import OrderedDict +try: + from collections import OrderedDict +except ImportError: + from ordereddict import OrderedDict from pyres.job import Job from pyres.compat import string_types import pyres.json_parser as json diff --git a/setup.py b/setup.py index e404e2f..b2bc918 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,13 @@ from setuptools import setup, find_packages +from pyres.compat import PY26 + +install_requires=[ + item for item in + open("requirements.txt").read().split("\n") + if item], + +if PY26: + install_requires.append('ordereddict') version='1.4.2' setup( @@ -20,10 +29,7 @@ pyres_scheduler=pyres.scripts:pyres_scheduler pyres_worker=pyres.scripts:pyres_worker """, - install_requires=[ - item for item in - open("requirements.txt").read().split("\n") - if item], + install_requires=install_requires, classifiers = [ 'Development Status :: 4 - Beta', 'Environment :: Console', From 1a73d1e1916b2ad85d5d1b7e58bbfdd39648cb1b Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Sat, 13 Jul 2013 20:30:50 +0000 Subject: [PATCH 26/46] fixes for py26, added py33 to travis --- .travis.yml | 6 +++--- setup.py | 28 ++++++++++++++++++++++------ 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/.travis.yml b/.travis.yml index c8a616d..66b134f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,10 +2,10 @@ language: python python: - "2.6" - "2.7" + - "3.3" # - "pypy" # command to install dependencies install: - - pip install -r requirements-test.txt --use-mirrors - - pip install -r requirements.txt --use-mirrors + - python setup.py install # command to run tests -script: nosetests +script: python setup.py test diff --git a/setup.py b/setup.py index b2bc918..7732582 100644 --- a/setup.py +++ b/setup.py @@ -1,13 +1,27 @@ +import sys from setuptools import setup, find_packages +from setuptools.command.test import test as TestCommand from pyres.compat import PY26 -install_requires=[ - item for item in - open("requirements.txt").read().split("\n") - if item], +requires=[ + item for item in + open("requirements.txt").read().split("\n") + if item] if PY26: - install_requires.append('ordereddict') + requires.append('ordereddict') + +class PyTest(TestCommand): + def finalize_options(self): + TestCommand.finalize_options(self) + self.test_args = [] + self.test_suite = True + + def run_tests(self): + #import here, cause outside the eggs aren't loaded + import pytest + result = pytest.main(self.test_args) + sys.exit(result) version='1.4.2' setup( @@ -29,7 +43,9 @@ pyres_scheduler=pyres.scripts:pyres_scheduler pyres_worker=pyres.scripts:pyres_worker """, - install_requires=install_requires, + tests_require=requires + ['pytest',], + cmdclass={'test': PyTest}, + install_requires=requires, classifiers = [ 'Development Status :: 4 - Beta', 'Environment :: Console', From f7c43084f558ae6921625b8f17c200c156b6424c Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Sat, 13 Jul 2013 20:34:49 +0000 Subject: [PATCH 27/46] Fixing import issue with compat.PY26 --- pyres/compat.py | 1 - setup.py | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pyres/compat.py b/pyres/compat.py index 20732bf..c39fd3f 100644 --- a/pyres/compat.py +++ b/pyres/compat.py @@ -8,7 +8,6 @@ # True if we are running on Python 3. PY3 = sys.version_info[0] == 3 -PY26 = sys.version_info[0:2] == (2,6) if PY3: # pragma: no cover string_types = str, diff --git a/setup.py b/setup.py index 7732582..bb79183 100644 --- a/setup.py +++ b/setup.py @@ -1,14 +1,13 @@ import sys from setuptools import setup, find_packages from setuptools.command.test import test as TestCommand -from pyres.compat import PY26 requires=[ item for item in open("requirements.txt").read().split("\n") if item] -if PY26: +if sys.version_info[0:2] == (2,6): requires.append('ordereddict') class PyTest(TestCommand): From 401d963eeef7100616ce94748c9f41108b6a05f4 Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Mon, 15 Jul 2013 09:24:51 -0700 Subject: [PATCH 28/46] Update README.markdown --- README.markdown | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.markdown b/README.markdown index 0af6efb..ef8d07c 100644 --- a/README.markdown +++ b/README.markdown @@ -10,7 +10,7 @@ Because of some differences between ruby and python, there are a couple of place ## Travis CI -Currently, pyres is being tested via travis ci for python version 2.6, 2.7, and pypy: +Currently, pyres is being tested via travis ci for python version 2.6, 2.7, and 3.3: [![Build Status](https://secure.travis-ci.org/binarydud/pyres.png)](http://travis-ci.org/binarydud/pyres) ## Running Tests From c6c965313d012cda27b9fd078e5691f8d7aa3f6b Mon Sep 17 00:00:00 2001 From: Bamco Date: Tue, 16 Jul 2013 17:37:31 +0300 Subject: [PATCH 29/46] Removed unnecessary lines in ResQ docstring. --- pyres/__init__.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/pyres/__init__.py b/pyres/__init__.py index 362f2dd..3326b6e 100644 --- a/pyres/__init__.py +++ b/pyres/__init__.py @@ -121,14 +121,6 @@ class ResQ(object): ``password`` -- The password, if required, of your Redis server. Default is "None". - ``timeout`` -- The timeout keyword is in the signature, but is unused. Default is "None". - - ``retry_connection`` -- This keyword is in the signature but is deprecated. Default is "True". - - - Both ``timeout`` and ``retry_connection`` will be removed as the python-redis client - no longer uses them. - Example usage:: >>> from pyres import * From 3d0309e08c70622dc6c3cac683cb8294290b443f Mon Sep 17 00:00:00 2001 From: John Anderson Date: Thu, 18 Jul 2013 16:25:59 -0700 Subject: [PATCH 30/46] Update setup.py --- setup.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/setup.py b/setup.py index bb79183..966dad0 100644 --- a/setup.py +++ b/setup.py @@ -51,5 +51,8 @@ def run_tests(self): 'Intended Audience :: Developers', 'License :: OSI Approved :: MIT License', 'Operating System :: OS Independent', + 'Programming Language :: Python :: 2.6', + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3.3', 'Programming Language :: Python'], ) From fb8c8aa5fe53b7310e646d4f8d1373770988c3a3 Mon Sep 17 00:00:00 2001 From: kracekumar Date: Tue, 17 Sep 2013 22:03:20 +0530 Subject: [PATCH 31/46] Removed coverage.report --- .gitignore | 1 + coverage.report | 11 ----------- 2 files changed, 1 insertion(+), 11 deletions(-) delete mode 100644 coverage.report diff --git a/.gitignore b/.gitignore index 310a9ae..e3a0b9e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ *.pyc build/ .coverage +*.report *.egg-info logs/ dist/ diff --git a/coverage.report b/coverage.report deleted file mode 100644 index d9c33d7..0000000 --- a/coverage.report +++ /dev/null @@ -1,11 +0,0 @@ -Name Stmts Exec Cover Missing ------------------------------------------------- -pyres 138 131 94% 26, 39, 98, 133-134, 144-145 -pyres.exceptions 2 2 100% -pyres.failure 23 22 95% 41 -pyres.job 23 23 100% -pyres.worker 189 146 77% 66, 74, 84-112, 161, 179, 186, 230-241 ------------------------------------------------- -TOTAL 375 324 86% ----------------------------------------------------------------------- -Ran 32 tests in 0.884s \ No newline at end of file From d036a5d0b3b2e57174391b4d8aca9defff8b87f0 Mon Sep 17 00:00:00 2001 From: Paulo Sousa Date: Fri, 11 Oct 2013 16:06:04 -0300 Subject: [PATCH 32/46] Implementing password support on command line to worker, scheduler and manager --- pyres/horde.py | 4 ++-- pyres/scripts.py | 15 +++++++++------ pyres/worker.py | 4 ++-- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/pyres/horde.py b/pyres/horde.py index b150c93..6ea1eb3 100644 --- a/pyres/horde.py +++ b/pyres/horde.py @@ -373,8 +373,8 @@ def __str__(self): return '%s:%s:%s' % (hostname, self.pid, self.pool_size) @classmethod - def run(cls, pool_size=5, queues=[], server='localhost:6379', logging_level=logging.INFO, log_file=None): - worker = cls(pool_size=pool_size, queues=queues, server=server, logging_level=logging_level, log_file=log_file) + def run(cls, pool_size=5, queues=[], server='localhost:6379', password=None, logging_level=logging.INFO, log_file=None): + worker = cls(pool_size=pool_size, queues=queues, server=server, password=password, logging_level=logging_level, log_file=log_file) worker.work() #if __name__ == "__main__": diff --git a/pyres/scripts.py b/pyres/scripts.py index 55c0854..659a77f 100644 --- a/pyres/scripts.py +++ b/pyres/scripts.py @@ -13,7 +13,8 @@ def pyres_manager(): parser = OptionParser(usage=usage) #parser.add_option("-q", dest="queue_list") parser.add_option("--host", dest="host", default="localhost") - parser.add_option("--port",dest="port",type="int", default=6379) + parser.add_option("--port", dest="port",type="int", default=6379) + parser.add_option("--password", dest="password", default=None) parser.add_option("-i", '--interval', dest='interval', default=None, help='the default time interval to sleep between runs') parser.add_option('-l', '--log-level', dest='log_level', default='info', help='log level. Valid values are "debug", "info", "warning", "error", "critical", in decreasing order of verbosity. Defaults to "info" if parameter not specified.') parser.add_option("--pool", type="int", dest="pool_size", default=1, help="Number of minions to spawn under the manager.") @@ -36,7 +37,7 @@ def pyres_manager(): queues = args[0].split(',') server = '%s:%s' % (options.host,options.port) - Khan.run(pool_size=options.pool_size, queues=queues, server=server, logging_level=log_level, log_file=options.logfile) + Khan.run(pool_size=options.pool_size, queues=queues, server=server, password=password, logging_level=log_level, log_file=options.logfile) def pyres_scheduler(): @@ -44,7 +45,8 @@ def pyres_scheduler(): parser = OptionParser(usage=usage) #parser.add_option("-q", dest="queue_list") parser.add_option("--host", dest="host", default="localhost") - parser.add_option("--port",dest="port",type="int", default=6379) + parser.add_option("--port", dest="port",type="int", default=6379) + parser.add_option("--password", dest="password", default=None) parser.add_option('-l', '--log-level', dest='log_level', default='info', help='log level. Valid values are "debug", "info", "warning", "error", "critical", in decreasing order of verbosity. Defaults to "info" if parameter not specified.') parser.add_option('-f', dest='logfile', help='If present, a logfile will be used. "stderr", "stdout", and "syslog" are all special values.') parser.add_option('-p', dest='pidfile', help='If present, a pidfile will be used.') @@ -54,7 +56,7 @@ def pyres_scheduler(): setup_logging(procname="pyres_scheduler", log_level=log_level, filename=options.logfile) setup_pidfile(options.pidfile) server = '%s:%s' % (options.host, options.port) - Scheduler.run(server) + Scheduler.run(server, password) def pyres_worker(): @@ -62,7 +64,8 @@ def pyres_worker(): parser = OptionParser(usage=usage) parser.add_option("--host", dest="host", default="localhost") - parser.add_option("--port",dest="port",type="int", default=6379) + parser.add_option("--port", dest="port",type="int", default=6379) + parser.add_option("--password", dest="password", default=None) parser.add_option("-i", '--interval', dest='interval', default=None, help='the default time interval to sleep between runs') parser.add_option('-l', '--log-level', dest='log_level', default='info', help='log level. Valid values are "debug", "info", "warning", "error", "critical", in decreasing order of verbosity. Defaults to "info" if parameter not specified.') parser.add_option('-f', dest='logfile', help='If present, a logfile will be used. "stderr", "stdout", and "syslog" are all special values.') @@ -86,4 +89,4 @@ def pyres_worker(): queues = args[0].split(',') server = '%s:%s' % (options.host,options.port) - Worker.run(queues, server, interval, timeout=timeout) + Worker.run(queues, server, password, interval, timeout=timeout) diff --git a/pyres/worker.py b/pyres/worker.py index 952df6a..fc42b12 100644 --- a/pyres/worker.py +++ b/pyres/worker.py @@ -341,8 +341,8 @@ def worker_pids(cls): return [] @classmethod - def run(cls, queues, server="localhost:6379", interval=None, timeout=None): - worker = cls(queues=queues, server=server, timeout=timeout) + def run(cls, queues, server="localhost:6379", password=None, interval=None, timeout=None): + worker = cls(queues=queues, server=server, password=password, timeout=timeout) if interval is not None: worker.work(interval) else: From 4b747eb57c043f2c1aa28ef5a8d10eaba409e9dc Mon Sep 17 00:00:00 2001 From: Paulo Sousa Date: Mon, 14 Oct 2013 14:27:49 -0300 Subject: [PATCH 33/46] getting password option to password var --- pyres/scripts.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyres/scripts.py b/pyres/scripts.py index 659a77f..f59bf00 100644 --- a/pyres/scripts.py +++ b/pyres/scripts.py @@ -37,6 +37,7 @@ def pyres_manager(): queues = args[0].split(',') server = '%s:%s' % (options.host,options.port) + password = options.password Khan.run(pool_size=options.pool_size, queues=queues, server=server, password=password, logging_level=log_level, log_file=options.logfile) @@ -56,6 +57,7 @@ def pyres_scheduler(): setup_logging(procname="pyres_scheduler", log_level=log_level, filename=options.logfile) setup_pidfile(options.pidfile) server = '%s:%s' % (options.host, options.port) + password = options.password Scheduler.run(server, password) @@ -89,4 +91,5 @@ def pyres_worker(): queues = args[0].split(',') server = '%s:%s' % (options.host,options.port) + password = options.password Worker.run(queues, server, password, interval, timeout=timeout) From b3ad393f22b0f6d024cfabffedf64d600837dbb1 Mon Sep 17 00:00:00 2001 From: Paulo Sousa Date: Mon, 14 Oct 2013 16:39:24 -0300 Subject: [PATCH 34/46] Receiving command line interval properly on manager --- pyres/horde.py | 4 ++-- pyres/scripts.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pyres/horde.py b/pyres/horde.py index 6ea1eb3..e156eb7 100644 --- a/pyres/horde.py +++ b/pyres/horde.py @@ -373,8 +373,8 @@ def __str__(self): return '%s:%s:%s' % (hostname, self.pid, self.pool_size) @classmethod - def run(cls, pool_size=5, queues=[], server='localhost:6379', password=None, logging_level=logging.INFO, log_file=None): - worker = cls(pool_size=pool_size, queues=queues, server=server, password=password, logging_level=logging_level, log_file=log_file) + def run(cls, pool_size=5, queues=[], server='localhost:6379', password=None, interval=2, logging_level=logging.INFO, log_file=None): + worker = cls(pool_size=pool_size, queues=queues, server=server, password=password, interval=interval, logging_level=logging_level, log_file=log_file) worker.work() #if __name__ == "__main__": diff --git a/pyres/scripts.py b/pyres/scripts.py index f59bf00..26e07d7 100644 --- a/pyres/scripts.py +++ b/pyres/scripts.py @@ -38,7 +38,7 @@ def pyres_manager(): queues = args[0].split(',') server = '%s:%s' % (options.host,options.port) password = options.password - Khan.run(pool_size=options.pool_size, queues=queues, server=server, password=password, logging_level=log_level, log_file=options.logfile) + Khan.run(pool_size=options.pool_size, queues=queues, server=server, password=password, interval=interval, logging_level=log_level, log_file=options.logfile) def pyres_scheduler(): From d492bc3e412b74941697c59aa045dd344b63db63 Mon Sep 17 00:00:00 2001 From: Paulo Sousa Date: Mon, 14 Oct 2013 17:13:08 -0300 Subject: [PATCH 35/46] passing interval to work method, not init --- pyres/horde.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyres/horde.py b/pyres/horde.py index e156eb7..5db0dc9 100644 --- a/pyres/horde.py +++ b/pyres/horde.py @@ -374,8 +374,8 @@ def __str__(self): @classmethod def run(cls, pool_size=5, queues=[], server='localhost:6379', password=None, interval=2, logging_level=logging.INFO, log_file=None): - worker = cls(pool_size=pool_size, queues=queues, server=server, password=password, interval=interval, logging_level=logging_level, log_file=log_file) - worker.work() + worker = cls(pool_size=pool_size, queues=queues, server=server, password=password, logging_level=logging_level, log_file=log_file) + worker.work(interval=interval) #if __name__ == "__main__": # k = Khan() From e25091be367bbdcfe2d9393684b9ea84b0292ab7 Mon Sep 17 00:00:00 2001 From: Paulo Sousa Date: Tue, 15 Oct 2013 10:46:49 -0300 Subject: [PATCH 36/46] implementing minions interval on manager command line --- pyres/horde.py | 20 ++++++++++++++------ pyres/scripts.py | 16 +++++++++++----- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/pyres/horde.py b/pyres/horde.py index 5db0dc9..ce23210 100644 --- a/pyres/horde.py +++ b/pyres/horde.py @@ -33,7 +33,7 @@ def setup_logging(procname, namespace='', log_level=logging.INFO, log_file=None) return logger class Minion(multiprocessing.Process): - def __init__(self, queues, server, password, log_level=logging.INFO, log_path=None): + def __init__(self, queues, server, password, log_level=logging.INFO, log_path=None, interval=5): multiprocessing.Process.__init__(self, name='Minion') #format = '%(asctime)s %(levelname)s %(filename)s-%(lineno)d: %(message)s' @@ -48,6 +48,7 @@ def __init__(self, queues, server, password, log_level=logging.INFO, log_path=No self.hostname = os.uname()[1] self.server = server self.password = password + self.interval = interval self.log_level = log_level self.log_path = log_path @@ -142,6 +143,7 @@ def work(self, interval=5): if job: self.process(job) else: + self.logger.debug('minion sleeping for: %i secs' % interval) time.sleep(interval) self.unregister_minion() @@ -167,7 +169,7 @@ def run(self): raise Exception("Bad server argument") - self.work() + self.work(self.interval) #while True: # job = self.q.get() # print 'pid: %s is running %s ' % (self.pid,job) @@ -179,7 +181,8 @@ class Khan(object): 'REMOVE': '_remove_minion', 'SHUTDOWN': '_schedule_shutdown' } - def __init__(self, pool_size=5, queues=[], server='localhost:6379', password=None, logging_level=logging.INFO, log_file=None): + def __init__(self, pool_size=5, queues=[], server='localhost:6379', password=None, logging_level=logging.INFO, + log_file=None, minions_interval=5): #super(Khan,self).__init__(queues=queues,server=server,password=password) self._shutdown = False self.pool_size = int(pool_size) @@ -193,6 +196,7 @@ def __init__(self, pool_size=5, queues=[], server='localhost:6379', password=Non self.password = password self.logging_level = logging_level self.log_file = log_file + self.minions_interval = minions_interval #self._workers = list() @@ -307,7 +311,8 @@ def _add_minion(self): log_path = os.path.dirname(self.log_file) else: log_path = None - m = Minion(self.queues, self.server, self.password, log_level=self.logging_level, log_path=log_path) + m = Minion(self.queues, self.server, self.password, interval=self.minions_interval, + log_level=self.logging_level, log_path=log_path) m.start() self._workers[m.pid] = m if hasattr(self,'logger'): @@ -365,6 +370,7 @@ def work(self, interval=2): break #get job else: + self.logger.debug('manager sleeping for: %i secs' % interval) time.sleep(interval) self.unregister_khan() @@ -373,8 +379,10 @@ def __str__(self): return '%s:%s:%s' % (hostname, self.pid, self.pool_size) @classmethod - def run(cls, pool_size=5, queues=[], server='localhost:6379', password=None, interval=2, logging_level=logging.INFO, log_file=None): - worker = cls(pool_size=pool_size, queues=queues, server=server, password=password, logging_level=logging_level, log_file=log_file) + def run(cls, pool_size=5, queues=[], server='localhost:6379', password=None, interval=2, + logging_level=logging.INFO, log_file=None, minions_interval=5): + worker = cls(pool_size=pool_size, queues=queues, server=server, password=password, logging_level=logging_level, + log_file=log_file, minions_interval=minions_interval) worker.work(interval=interval) #if __name__ == "__main__": diff --git a/pyres/scripts.py b/pyres/scripts.py index 26e07d7..30bc4a6 100644 --- a/pyres/scripts.py +++ b/pyres/scripts.py @@ -15,7 +15,8 @@ def pyres_manager(): parser.add_option("--host", dest="host", default="localhost") parser.add_option("--port", dest="port",type="int", default=6379) parser.add_option("--password", dest="password", default=None) - parser.add_option("-i", '--interval', dest='interval', default=None, help='the default time interval to sleep between runs') + parser.add_option("-i", '--interval', dest='manager_interval', default=None, help='the default time interval to sleep between runs - manager') + parser.add_option("--minions_interval", dest='minions_interval', default=None, help='the default time interval to sleep between runs - minions') parser.add_option('-l', '--log-level', dest='log_level', default='info', help='log level. Valid values are "debug", "info", "warning", "error", "critical", in decreasing order of verbosity. Defaults to "info" if parameter not specified.') parser.add_option("--pool", type="int", dest="pool_size", default=1, help="Number of minions to spawn under the manager.") parser.add_option('-f', dest='logfile', help='If present, a logfile will be used. "stderr", "stdout", and "syslog" are all special values.') @@ -31,14 +32,19 @@ def pyres_manager(): setup_pidfile(options.pidfile) - interval = options.interval - if interval is not None: - interval = float(interval) + manager_interval = options.manager_interval + if manager_interval is not None: + manager_interval = float(manager_interval) + + minions_interval = options.minions_interval + if minions_interval is not None: + minions_interval = float(minions_interval) queues = args[0].split(',') server = '%s:%s' % (options.host,options.port) password = options.password - Khan.run(pool_size=options.pool_size, queues=queues, server=server, password=password, interval=interval, logging_level=log_level, log_file=options.logfile) + Khan.run(pool_size=options.pool_size, queues=queues, server=server, password=password, interval=interval, + logging_level=log_level, log_file=options.logfile, minions_interval=minions_interval) def pyres_scheduler(): From 004efa61a9774c98696660de4259bd14fcbbd155 Mon Sep 17 00:00:00 2001 From: Paulo Sousa Date: Tue, 15 Oct 2013 11:25:54 -0300 Subject: [PATCH 37/46] replacing interval for manager_interval --- pyres/scripts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyres/scripts.py b/pyres/scripts.py index 30bc4a6..75af6b7 100644 --- a/pyres/scripts.py +++ b/pyres/scripts.py @@ -43,7 +43,7 @@ def pyres_manager(): queues = args[0].split(',') server = '%s:%s' % (options.host,options.port) password = options.password - Khan.run(pool_size=options.pool_size, queues=queues, server=server, password=password, interval=interval, + Khan.run(pool_size=options.pool_size, queues=queues, server=server, password=password, interval=manager_interval, logging_level=log_level, log_file=options.logfile, minions_interval=minions_interval) From 51bcf4264cdb47eef5503b7f27366ee2befc9d06 Mon Sep 17 00:00:00 2001 From: Paulo Sousa Date: Tue, 15 Oct 2013 13:56:56 -0300 Subject: [PATCH 38/46] concat minions logs // implementing minion max process jobs --- pyres/horde.py | 26 +++++++++++++++++++++----- pyres/scripts.py | 9 ++++++--- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/pyres/horde.py b/pyres/horde.py index ce23210..6c0d0ae 100644 --- a/pyres/horde.py +++ b/pyres/horde.py @@ -33,7 +33,8 @@ def setup_logging(procname, namespace='', log_level=logging.INFO, log_file=None) return logger class Minion(multiprocessing.Process): - def __init__(self, queues, server, password, log_level=logging.INFO, log_path=None, interval=5): + def __init__(self, queues, server, password, log_level=logging.INFO, log_path=None, interval=5, concact_logs=False, + max_jobs=None): multiprocessing.Process.__init__(self, name='Minion') #format = '%(asctime)s %(levelname)s %(filename)s-%(lineno)d: %(message)s' @@ -53,6 +54,7 @@ def __init__(self, queues, server, password, log_level=logging.INFO, log_path=No self.log_level = log_level self.log_path = log_path self.log_file = None + self.concact_logs = concact_logs def prune_dead_workers(self): pass @@ -133,16 +135,24 @@ def unregister_minion(self): def work(self, interval=5): self.startup() + cur_job = 0 while True: setproctitle('pyres_minion:%s: waiting for job on: %s' % (os.getppid(),self.queues)) self.logger.info('waiting on job') if self._shutdown: self.logger.info('shutdown scheduled') break + if self.max_jobs and self.max_jobs > cur_job: + self.logger.debug('max_jobs reached on %s: %s' % (os.getppid(), cur_job)) + self.logger.debug('minion sleeping for: %i secs' % interval) + time.sleep(interval) + cur_job = 0 job = self.reserve() if job: self.process(job) + cur_job = cur_job + 1 else: + cur_job = 0 self.logger.debug('minion sleeping for: %i secs' % interval) time.sleep(interval) self.unregister_minion() @@ -156,6 +166,8 @@ def run(self): if self.log_path: if special_log_file(self.log_path): self.log_file = self.log_path + elif self.concat_logs: + self.log_file = os.path.join(self.log_path, 'minion.log') else: self.log_file = os.path.join(self.log_path, 'minion-%s.log' % self.pid) namespace = 'minion:%s' % self.pid @@ -182,7 +194,7 @@ class Khan(object): 'SHUTDOWN': '_schedule_shutdown' } def __init__(self, pool_size=5, queues=[], server='localhost:6379', password=None, logging_level=logging.INFO, - log_file=None, minions_interval=5): + log_file=None, minions_interval=5, minions_concact_logs=False, max_jobs=None): #super(Khan,self).__init__(queues=queues,server=server,password=password) self._shutdown = False self.pool_size = int(pool_size) @@ -197,6 +209,8 @@ def __init__(self, pool_size=5, queues=[], server='localhost:6379', password=Non self.logging_level = logging_level self.log_file = log_file self.minions_interval = minions_interval + self.minions_concact_logs = minions_concact_logs + self.max_jobs = max_jobs #self._workers = list() @@ -312,7 +326,8 @@ def _add_minion(self): else: log_path = None m = Minion(self.queues, self.server, self.password, interval=self.minions_interval, - log_level=self.logging_level, log_path=log_path) + log_level=self.logging_level, log_path=log_path, concact_logs=self.minions_concact_logs, + max_jobs=self.max_jobs) m.start() self._workers[m.pid] = m if hasattr(self,'logger'): @@ -380,9 +395,10 @@ def __str__(self): @classmethod def run(cls, pool_size=5, queues=[], server='localhost:6379', password=None, interval=2, - logging_level=logging.INFO, log_file=None, minions_interval=5): + logging_level=logging.INFO, log_file=None, minions_interval=5, minions_concact_logs=False, max_jobs=None): worker = cls(pool_size=pool_size, queues=queues, server=server, password=password, logging_level=logging_level, - log_file=log_file, minions_interval=minions_interval) + log_file=log_file, minions_interval=minions_interval, minions_concact_logs=minions_concact_logs, + max_jobs=max_jobs) worker.work(interval=interval) #if __name__ == "__main__": diff --git a/pyres/scripts.py b/pyres/scripts.py index 75af6b7..6d9a0af 100644 --- a/pyres/scripts.py +++ b/pyres/scripts.py @@ -19,8 +19,10 @@ def pyres_manager(): parser.add_option("--minions_interval", dest='minions_interval', default=None, help='the default time interval to sleep between runs - minions') parser.add_option('-l', '--log-level', dest='log_level', default='info', help='log level. Valid values are "debug", "info", "warning", "error", "critical", in decreasing order of verbosity. Defaults to "info" if parameter not specified.') parser.add_option("--pool", type="int", dest="pool_size", default=1, help="Number of minions to spawn under the manager.") + parser.add_option("-j", "--process_max_jobs", dest="max_jobs", default=None, help='how many jobs should be processed on worker run.') parser.add_option('-f', dest='logfile', help='If present, a logfile will be used. "stderr", "stdout", and "syslog" are all special values.') parser.add_option('-p', dest='pidfile', help='If present, a pidfile will be used.') + parser.add_option("--concat_minions_log", action="store_true", dest="concat_minions_log", help='Concat all minions logs on same file.') (options,args) = parser.parse_args() if len(args) != 1: @@ -29,7 +31,7 @@ def pyres_manager(): log_level = getattr(logging, options.log_level.upper(), 'INFO') #logging.basicConfig(level=log_level, format="%(asctime)s: %(levelname)s: %(message)s") - + concat_minions_log = options.concat_minions_log setup_pidfile(options.pidfile) manager_interval = options.manager_interval @@ -38,13 +40,14 @@ def pyres_manager(): minions_interval = options.minions_interval if minions_interval is not None: - minions_interval = float(minions_interval) + minions_interval = float(minions_interval) queues = args[0].split(',') server = '%s:%s' % (options.host,options.port) password = options.password Khan.run(pool_size=options.pool_size, queues=queues, server=server, password=password, interval=manager_interval, - logging_level=log_level, log_file=options.logfile, minions_interval=minions_interval) + logging_level=log_level, log_file=options.logfile, minions_interval=minions_interval, + concat_minions_log=concat_minions_log, max_jobs=options.max_jobs) def pyres_scheduler(): From 0da62c419564b3cf6b1303658c7a030af1cfa2b6 Mon Sep 17 00:00:00 2001 From: Paulo Sousa Date: Tue, 15 Oct 2013 15:52:49 -0300 Subject: [PATCH 39/46] fixing concat_minions_logs and max_jobs --- pyres/horde.py | 28 +++++++++++++++------------- pyres/scripts.py | 8 ++++---- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/pyres/horde.py b/pyres/horde.py index 6c0d0ae..b41b7a4 100644 --- a/pyres/horde.py +++ b/pyres/horde.py @@ -33,8 +33,8 @@ def setup_logging(procname, namespace='', log_level=logging.INFO, log_file=None) return logger class Minion(multiprocessing.Process): - def __init__(self, queues, server, password, log_level=logging.INFO, log_path=None, interval=5, concact_logs=False, - max_jobs=None): + def __init__(self, queues, server, password, log_level=logging.INFO, log_path=None, interval=5, concat_logs=False, + max_jobs=0): multiprocessing.Process.__init__(self, name='Minion') #format = '%(asctime)s %(levelname)s %(filename)s-%(lineno)d: %(message)s' @@ -54,7 +54,8 @@ def __init__(self, queues, server, password, log_level=logging.INFO, log_path=No self.log_level = log_level self.log_path = log_path self.log_file = None - self.concact_logs = concact_logs + self.concat_logs = concat_logs + self.max_jobs = max_jobs def prune_dead_workers(self): pass @@ -142,9 +143,10 @@ def work(self, interval=5): if self._shutdown: self.logger.info('shutdown scheduled') break - if self.max_jobs and self.max_jobs > cur_job: - self.logger.debug('max_jobs reached on %s: %s' % (os.getppid(), cur_job)) - self.logger.debug('minion sleeping for: %i secs' % interval) + self.logger.debug('max_jobs: %d cur_jobs: %d' % (self.max_jobs, cur_job)) + if (self.max_jobs > 0 and self.max_jobs < cur_job): + self.logger.debug('max_jobs reached on %s: %d' % (self.pid, cur_job)) + self.logger.debug('minion sleeping for: %d secs' % interval) time.sleep(interval) cur_job = 0 job = self.reserve() @@ -153,7 +155,7 @@ def work(self, interval=5): cur_job = cur_job + 1 else: cur_job = 0 - self.logger.debug('minion sleeping for: %i secs' % interval) + self.logger.debug('minion sleeping for: %d secs' % interval) time.sleep(interval) self.unregister_minion() @@ -194,7 +196,7 @@ class Khan(object): 'SHUTDOWN': '_schedule_shutdown' } def __init__(self, pool_size=5, queues=[], server='localhost:6379', password=None, logging_level=logging.INFO, - log_file=None, minions_interval=5, minions_concact_logs=False, max_jobs=None): + log_file=None, minions_interval=5, concat_minions_logs=False, max_jobs=0): #super(Khan,self).__init__(queues=queues,server=server,password=password) self._shutdown = False self.pool_size = int(pool_size) @@ -209,7 +211,7 @@ def __init__(self, pool_size=5, queues=[], server='localhost:6379', password=Non self.logging_level = logging_level self.log_file = log_file self.minions_interval = minions_interval - self.minions_concact_logs = minions_concact_logs + self.concat_minions_logs = concat_minions_logs self.max_jobs = max_jobs #self._workers = list() @@ -326,7 +328,7 @@ def _add_minion(self): else: log_path = None m = Minion(self.queues, self.server, self.password, interval=self.minions_interval, - log_level=self.logging_level, log_path=log_path, concact_logs=self.minions_concact_logs, + log_level=self.logging_level, log_path=log_path, concat_logs=self.concat_minions_logs, max_jobs=self.max_jobs) m.start() self._workers[m.pid] = m @@ -385,7 +387,7 @@ def work(self, interval=2): break #get job else: - self.logger.debug('manager sleeping for: %i secs' % interval) + self.logger.debug('manager sleeping for: %d secs' % interval) time.sleep(interval) self.unregister_khan() @@ -395,9 +397,9 @@ def __str__(self): @classmethod def run(cls, pool_size=5, queues=[], server='localhost:6379', password=None, interval=2, - logging_level=logging.INFO, log_file=None, minions_interval=5, minions_concact_logs=False, max_jobs=None): + logging_level=logging.INFO, log_file=None, minions_interval=5, concat_minions_logs=False, max_jobs=0): worker = cls(pool_size=pool_size, queues=queues, server=server, password=password, logging_level=logging_level, - log_file=log_file, minions_interval=minions_interval, minions_concact_logs=minions_concact_logs, + log_file=log_file, minions_interval=minions_interval, concat_minions_logs=concat_minions_logs, max_jobs=max_jobs) worker.work(interval=interval) diff --git a/pyres/scripts.py b/pyres/scripts.py index 6d9a0af..ff2d466 100644 --- a/pyres/scripts.py +++ b/pyres/scripts.py @@ -19,10 +19,10 @@ def pyres_manager(): parser.add_option("--minions_interval", dest='minions_interval', default=None, help='the default time interval to sleep between runs - minions') parser.add_option('-l', '--log-level', dest='log_level', default='info', help='log level. Valid values are "debug", "info", "warning", "error", "critical", in decreasing order of verbosity. Defaults to "info" if parameter not specified.') parser.add_option("--pool", type="int", dest="pool_size", default=1, help="Number of minions to spawn under the manager.") - parser.add_option("-j", "--process_max_jobs", dest="max_jobs", default=None, help='how many jobs should be processed on worker run.') + parser.add_option("-j", "--process_max_jobs", dest="max_jobs", type=int, default=0, help='how many jobs should be processed on worker run.') parser.add_option('-f', dest='logfile', help='If present, a logfile will be used. "stderr", "stdout", and "syslog" are all special values.') parser.add_option('-p', dest='pidfile', help='If present, a pidfile will be used.') - parser.add_option("--concat_minions_log", action="store_true", dest="concat_minions_log", help='Concat all minions logs on same file.') + parser.add_option("--concat_minions_logs", action="store_true", dest="concat_minions_logs", help='Concat all minions logs on same file.') (options,args) = parser.parse_args() if len(args) != 1: @@ -31,7 +31,7 @@ def pyres_manager(): log_level = getattr(logging, options.log_level.upper(), 'INFO') #logging.basicConfig(level=log_level, format="%(asctime)s: %(levelname)s: %(message)s") - concat_minions_log = options.concat_minions_log + concat_minions_logs = options.concat_minions_logs setup_pidfile(options.pidfile) manager_interval = options.manager_interval @@ -47,7 +47,7 @@ def pyres_manager(): password = options.password Khan.run(pool_size=options.pool_size, queues=queues, server=server, password=password, interval=manager_interval, logging_level=log_level, log_file=options.logfile, minions_interval=minions_interval, - concat_minions_log=concat_minions_log, max_jobs=options.max_jobs) + concat_minions_logs=concat_minions_logs, max_jobs=options.max_jobs) def pyres_scheduler(): From 3ec8d1751d491243d2d43a710a47ad95d584c136 Mon Sep 17 00:00:00 2001 From: Danny Cosson Date: Mon, 28 Oct 2013 14:56:43 -0400 Subject: [PATCH 40/46] when getting delayed timestamps, limit the query to one result since only the first item in that list is ever used --- pyres/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyres/__init__.py b/pyres/__init__.py index 3326b6e..cc366dc 100644 --- a/pyres/__init__.py +++ b/pyres/__init__.py @@ -314,7 +314,7 @@ def delayed_timestamp_size(self, timestamp): def next_delayed_timestamp(self): key = int(time.mktime(ResQ._current_time().timetuple())) array = self.redis.zrangebyscore('resque:delayed_queue_schedule', - '-inf', key) + '-inf', key, start=0, num=1) timestamp = None if array: timestamp = array[0] From fbc7bfe7730c8894edbd53e0500c236202dd6c11 Mon Sep 17 00:00:00 2001 From: Matt George Date: Wed, 16 Apr 2014 19:28:39 -0600 Subject: [PATCH 41/46] version bump for new release --- pyres/__init__.py | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyres/__init__.py b/pyres/__init__.py index cc366dc..53bc6ec 100644 --- a/pyres/__init__.py +++ b/pyres/__init__.py @@ -1,4 +1,4 @@ -__version__ = '1.4.2' +__version__ = '1.5' from redis import Redis from pyres.compat import string_types diff --git a/setup.py b/setup.py index 966dad0..e0597ed 100644 --- a/setup.py +++ b/setup.py @@ -22,7 +22,7 @@ def run_tests(self): result = pytest.main(self.test_args) sys.exit(result) -version='1.4.2' +version='1.5' setup( name='pyres', version=version, From 3140ba04c35734b5ae69f4b1bf2b7877e2dd1547 Mon Sep 17 00:00:00 2001 From: Matt George Date: Wed, 16 Apr 2014 19:32:37 -0600 Subject: [PATCH 42/46] adding redis support to ci --- .travis.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index 66b134f..cafbb59 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,3 +9,5 @@ install: - python setup.py install # command to run tests script: python setup.py test +services: + - redis-server From 49aebb24f8a58c6993ffeb1ea46add624c2706f7 Mon Sep 17 00:00:00 2001 From: Matt George Date: Wed, 16 Apr 2014 20:02:32 -0600 Subject: [PATCH 43/46] trying 3.4 support --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index cafbb59..3a1cd4e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,7 @@ python: - "2.6" - "2.7" - "3.3" + - "3.4" # - "pypy" # command to install dependencies install: From 943a4a2808656a532fd9ab095b9df93833b81637 Mon Sep 17 00:00:00 2001 From: John Anderson Date: Wed, 6 Aug 2014 08:40:23 -0700 Subject: [PATCH 44/46] Always run after_perform, you should check 'failed' key --- pyres/job.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pyres/job.py b/pyres/job.py index 8358494..4f4b547 100644 --- a/pyres/job.py +++ b/pyres/job.py @@ -82,7 +82,6 @@ def perform(self): payload_class.before_perform(metadata) return payload_class.perform(*args) except Exception as e: - check_after = False metadata["failed"] = True metadata["exception"] = e if not self.retry(payload_class, args): @@ -93,8 +92,10 @@ def perform(self): logging.exception("Retry scheduled after error in %s", self._payload) finally: after_perform = getattr(payload_class, "after_perform", None) - if after_perform and check_after: + + if after_perform: payload_class.after_perform(metadata) + delattr(payload_class,'resq') def fail(self, exception): From 1bc12b635d31dbfa2b166064307c2762627a1590 Mon Sep 17 00:00:00 2001 From: Marc Abramowitz Date: Sun, 5 Oct 2014 23:01:34 -0700 Subject: [PATCH 45/46] Small tweaks to example.rst wold => world Add Python code block for syntax highlighting. --- docs/source/example.rst | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/source/example.rst b/docs/source/example.rst index 4c67e23..33d8c93 100644 --- a/docs/source/example.rst +++ b/docs/source/example.rst @@ -1,7 +1,7 @@ Example ========= -Let's take a real wold example of a blog where comments need to be checked for +Let's take a real world example of a blog where comments need to be checked for spam. When the comment is saved in the database, we create a job in the queue with that comment data. Let's take a django model in this case. @@ -33,7 +33,9 @@ You can convert your existing class to be compatible with pyres. All you need to do is add a :attr:`queue` attribute and define a :meth:`perform` method on the class. -To insert a job into the queue you need to do something like this:: +To insert a job into the queue you need to do something like this: + +.. code-block:: python >>> from pyres import ResQ >>> r = ResQ() @@ -47,4 +49,3 @@ In the **scripts** folder there is an executable:: Just pass a comma separated list of queues the worker should poll. - From c100e3f4384242351bc5cedb06589f5c8195000b Mon Sep 17 00:00:00 2001 From: toby cabot Date: Wed, 13 May 2015 15:44:46 -0400 Subject: [PATCH 46/46] handle jobs queued by Ruby Resque with no module Ruby compatibility: Resque sends just a class name and not a module name so if I use Resque to queue a ruby class called "Worker" then Pyres will throw a "ValueError: Empty module name" exception. To avoid that, if there's no module name in the json then we'll use the class name as a module name. --- pyres/__init__.py | 9 +++++++++ tests/__init__.py | 12 +++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/pyres/__init__.py b/pyres/__init__.py index 53bc6ec..011cd88 100644 --- a/pyres/__init__.py +++ b/pyres/__init__.py @@ -90,6 +90,15 @@ def safe_str_to_class(s): klass = lst[-1] mod_list = lst[:-1] module = ".".join(mod_list) + + # ruby compatibility kludge: resque sends just a class name and + # not a module name so if I use resque to queue a ruby class + # called "Worker" then pyres will throw a "ValueError: Empty + # module name" exception. To avoid that, if there's no module in + # the json then we'll use the classname as a module name. + if not module: + module = klass + mod = my_import(module) if hasattr(mod, klass): return getattr(mod, klass) diff --git a/tests/__init__.py b/tests/__init__.py index d75c866..64f09eb 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -2,6 +2,14 @@ import os from pyres import ResQ, str_to_class +class tests(object): + queue = 'basic' + + @staticmethod + def perform(name): + s = "name:%s" % name + return s + class Basic(object): queue = 'basic' @@ -111,7 +119,9 @@ def test_safe_str_to_class(self): assert safe_str_to_class('tests.Basic') == Basic self.assertRaises(ImportError, safe_str_to_class, 'test.Mine') self.assertRaises(ImportError, safe_str_to_class, 'tests.World') - + # test that we'll use the class name as a module name if no + # module name is provided (for Ruby compatibility) + assert safe_str_to_class('tests') == tests class PyResTests(unittest.TestCase): def setUp(self):