diff --git a/.travis.yml b/.travis.yml index a038d04b..4f46a018 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,6 +5,7 @@ python: - '2.7' - '3.4' - '3.5' + - '3.6' matrix: include: - python: '3.7' diff --git a/snowplow_tracker/_version.py b/snowplow_tracker/_version.py index 59f9008f..63c85c93 100644 --- a/snowplow_tracker/_version.py +++ b/snowplow_tracker/_version.py @@ -20,6 +20,6 @@ """ -__version_info__ = (0, 8, 2) +__version_info__ = (0, 8, 4) __version__ = ".".join(str(x) for x in __version_info__) __build_version__ = __version__ + '' diff --git a/snowplow_tracker/emitters.py b/snowplow_tracker/emitters.py index ff2380cc..e2dfab28 100644 --- a/snowplow_tracker/emitters.py +++ b/snowplow_tracker/emitters.py @@ -30,6 +30,7 @@ # Python 3 from queue import Queue +import celery from celery import Celery import redis import requests @@ -51,16 +52,6 @@ new_contract("redis", lambda x: isinstance(x, (redis.Redis, redis.StrictRedis))) -try: - # Check whether a custom Celery configuration module named "snowplow_celery_config" exists - import snowplow_celery_config - app = Celery() - app.config_from_object(snowplow_celery_config) -except ImportError: - # Otherwise configure Celery with default settings - snowplow_celery_config = None - app = Celery("Snowplow", broker="redis://guest@localhost//") - class Emitter(object): """ @@ -385,14 +376,6 @@ def consume(self): self.queue.task_done() -@app.task(bind=True, name='tasks.flush') # the self passed with bind can be used for on_fail/retrying -def flush_emitter(self, emitter): - try: - emitter.flush() - finally: - logger.info("Flush called on emitter") - - class CeleryEmitter(Emitter): """ Uses a Celery worker to send HTTP requests asynchronously. @@ -400,13 +383,24 @@ class CeleryEmitter(Emitter): but on_success and on_failure callbacks cannot be set. """ def __init__(self, endpoint, protocol="http", port=None, method="get", buffer_size=None, byte_limit=None): + if not celery.current_app or not celery.current_app.configured: + try: + # Check whether a custom Celery configuration module named "snowplow_celery_config" exists + import snowplow_celery_config + app = Celery() + app.config_from_object(snowplow_celery_config) + + except ImportError: + # Otherwise configure Celery with default settings + snowplow_celery_config = None + app = Celery("Snowplow", broker="redis://guest@localhost//") super(CeleryEmitter, self).__init__(endpoint, protocol, port, method, buffer_size, None, None, byte_limit) def flush(self): """ Schedules a flush task """ - flush_emitter.delay(self) # passes emitter (self - CeleryEmitter) to task + super(CeleryEmitter, self).flush.delay() logger.info("Scheduled a Celery task to flush the event queue") diff --git a/snowplow_tracker/payload.py b/snowplow_tracker/payload.py index c7c788b5..894505c7 100644 --- a/snowplow_tracker/payload.py +++ b/snowplow_tracker/payload.py @@ -82,7 +82,7 @@ def add_json(self, dict_, encode_base64, type_when_encoded, type_when_not_encode json_dict = json.dumps(dict_, ensure_ascii=False) if encode_base64: - encoded_dict = base64.urlsafe_b64encode(json_dict.encode("ascii")) + encoded_dict = base64.urlsafe_b64encode(json_dict.encode("utf-8")) if not isinstance(encoded_dict, str): encoded_dict = encoded_dict.decode("utf-8") self.add(type_when_encoded, encoded_dict)