diff --git a/snowplow_tracker/emitters.py b/snowplow_tracker/emitters.py index 66eeba4b..b7932469 100644 --- a/snowplow_tracker/emitters.py +++ b/snowplow_tracker/emitters.py @@ -32,7 +32,6 @@ import celery from celery import Celery -from celery.contrib.methods import task import redis import requests from contracts import contract, new_contract @@ -53,6 +52,20 @@ new_contract("redis", lambda x: isinstance(x, (redis.Redis, redis.StrictRedis))) +if not celery.current_app or not celery.current_app.configured: + try: + # Check whether a custom Celery configuration module named "snowplow_celery_config" exists + import snowplow_celery_config + + app = Celery() + app.config_from_object(snowplow_celery_config) + + except ImportError: + # Otherwise configure Celery with default settings + app = Celery("Snowplow", broker="redis://guest@localhost//") +else: + app = celery.current_app + class Emitter(object): """ @@ -163,7 +176,7 @@ def reached_limit(self): else: return self.bytes_queued >= self.byte_limit or len(self.buffer) >= self.buffer_size - @task(name="Flush") + @app.task(name="Flush") def flush(self): """ Sends all events in the buffer to the collector. @@ -385,16 +398,6 @@ class CeleryEmitter(Emitter): but on_success and on_failure callbacks cannot be set. """ def __init__(self, endpoint, protocol="http", port=None, method="get", buffer_size=None, byte_limit=None): - if not celery.current_app or not celery.current_app.configured: - try: - # Check whether a custom Celery configuration module named "snowplow_celery_config" exists - import snowplow_celery_config - app = Celery() - app.config_from_object(snowplow_celery_config) - - except ImportError: - # Otherwise configure Celery with default settings - app = Celery("Snowplow", broker="redis://guest@localhost//") super(CeleryEmitter, self).__init__(endpoint, protocol, port, method, buffer_size, None, None, byte_limit) def flush(self):