diff --git a/.travis.yml b/.travis.yml index 7424bc4e..4f073aab 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,6 +7,8 @@ python: - '3.4' - '3.5' install: +# ensure we have recent pip/setuptools +- pip install --upgrade pip setuptools - pip install -r requirements-test.txt - pip install release-manager - pip install -e . diff --git a/setup.py b/setup.py index 88392f47..cafd4581 100644 --- a/setup.py +++ b/setup.py @@ -75,12 +75,12 @@ ], install_requires=[ - "greenlet==0.4.10", - "requests==2.2.1", + "greenlet>=0.4.10", + "requests>=2.2.1", "pycontracts==1.7.6", - "celery==3.1.11", - "gevent==1.0.2", - "redis==2.9.1", - "six==1.9.0" + "celery>=3.1.11,<3.1.25", + "gevent>=1.0.2", + "redis>=2.9.1", + "six>=1.9.0" ], ) diff --git a/snowplow_tracker/emitters.py b/snowplow_tracker/emitters.py index 2eccb0fb..450c0875 100644 --- a/snowplow_tracker/emitters.py +++ b/snowplow_tracker/emitters.py @@ -31,7 +31,6 @@ from queue import Queue from celery import Celery -from celery.contrib.methods import task import redis import requests from contracts import contract, new_contract @@ -172,7 +171,7 @@ def reached_limit(self): else: return self.bytes_queued >= self.byte_limit or len(self.buffer) >= self.buffer_size - @task(name="Flush") + @app.task(name="Flush") def flush(self): """ Sends all events in the buffer to the collector. @@ -393,16 +392,32 @@ class CeleryEmitter(Emitter): Works like the base Emitter class, but on_success and on_failure callbacks cannot be set. """ + celery_app = None + def __init__(self, endpoint, protocol="http", port=None, method="get", buffer_size=None, byte_limit=None): super(CeleryEmitter, self).__init__(endpoint, protocol, port, method, buffer_size, None, None, byte_limit) + try: + # Check whether a custom Celery configuration module named "snowplow_celery_config" exists + import snowplow_celery_config + self.celery_app = Celery() + self.celery_app.config_from_object(snowplow_celery_config) + except ImportError: + # Otherwise configure Celery with default settings + self.celery_app = Celery("Snowplow", broker="redis://guest@localhost//") + + self.async_flush_task = self.celery_app.task(self.async_flush) + def flush(self): """ Schedules a flush task """ - super(CeleryEmitter, self).flush.delay() + self.async_flush_task.delay() logger.info("Scheduled a Celery task to flush the event queue") + def async_flush(self): + super(CeleryEmitter, self).flush() + class RedisEmitter(object): """ diff --git a/snowplow_tracker/tracker.py b/snowplow_tracker/tracker.py index a6830a68..058fa2b6 100644 --- a/snowplow_tracker/tracker.py +++ b/snowplow_tracker/tracker.py @@ -626,16 +626,16 @@ def track_unstruct_event(self, event_json, context=None, tstamp=None): track_self_describing_event = track_unstruct_event @contract - def flush(self, async=False): + def flush(self, asynchronously=False): """ Flush the emitter - :param async: Whether the flush is done asynchronously. Default is False - :type async: bool + :param asynchronously: Whether the flush is done asynchronously. Default is False + :type asynchronously: bool :rtype: tracker """ for emitter in self.emitters: - if async: + if asynchronously: emitter.flush() else: emitter.sync_flush()