From 79e6887ca723f1d926c200127b65612e9e214c40 Mon Sep 17 00:00:00 2001 From: Silviu Tantos Date: Sun, 5 Aug 2018 14:16:48 +0200 Subject: [PATCH 1/6] Do not pin dependencies to specific versions Because the packages in `setup.py` are really old and they use exact version, when installing the package into a virtualenv it will end-up downgrading already existing packages. In the Python documentation it is also mentioned that it is not a best practice to use `install_requires` to pin dependencies to specific versions. See also: - https://github.com/snowplow/snowplow-python-tracker/issues/195 - https://github.com/snowplow/snowplow-python-tracker/issues/198 - https://packaging.python.org/discussions/install-requires-vs-requirements/#install-requires --- setup.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/setup.py b/setup.py index 88392f47..cafd4581 100644 --- a/setup.py +++ b/setup.py @@ -75,12 +75,12 @@ ], install_requires=[ - "greenlet==0.4.10", - "requests==2.2.1", + "greenlet>=0.4.10", + "requests>=2.2.1", "pycontracts==1.7.6", - "celery==3.1.11", - "gevent==1.0.2", - "redis==2.9.1", - "six==1.9.0" + "celery>=3.1.11,<3.1.25", + "gevent>=1.0.2", + "redis>=2.9.1", + "six>=1.9.0" ], ) From 8e522bf86daed315e3f7656ccab04f08e1a4d7c2 Mon Sep 17 00:00:00 2001 From: Silviu Tantos Date: Sun, 5 Aug 2018 14:36:13 +0200 Subject: [PATCH 2/6] Default version of setuptools on Travis is old --- .travis.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index 7424bc4e..4f073aab 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,6 +7,8 @@ python: - '3.4' - '3.5' install: +# ensure we have recent pip/setuptools +- pip install --upgrade pip setuptools - pip install -r requirements-test.txt - pip install release-manager - pip install -e . From 2746c060f64a2188bd19511f41cc2c01aafd4252 Mon Sep 17 00:00:00 2001 From: Silviu Tantos Date: Sun, 22 Nov 2020 19:01:39 +0100 Subject: [PATCH 3/6] Allow custom config for celery. --- snowplow_tracker/emitters.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/snowplow_tracker/emitters.py b/snowplow_tracker/emitters.py index 2eccb0fb..60755c9a 100644 --- a/snowplow_tracker/emitters.py +++ b/snowplow_tracker/emitters.py @@ -393,16 +393,32 @@ class CeleryEmitter(Emitter): Works like the base Emitter class, but on_success and on_failure callbacks cannot be set. """ + celery_app = None + def __init__(self, endpoint, protocol="http", port=None, method="get", buffer_size=None, byte_limit=None): super(CeleryEmitter, self).__init__(endpoint, protocol, port, method, buffer_size, None, None, byte_limit) + try: + # Check whether a custom Celery configuration module named "snowplow_celery_config" exists + import snowplow_celery_config + self.celery_app = Celery() + self.celery_app.config_from_object(snowplow_celery_config) + except ImportError: + # Otherwise configure Celery with default settings + self.celery_app = Celery("Snowplow", broker="redis://guest@localhost//") + + self.async_flush = self.celery_app.task(self.async_flush) + def flush(self): """ Schedules a flush task """ - super(CeleryEmitter, self).flush.delay() + self.async_flush.delay() logger.info("Scheduled a Celery task to flush the event queue") + def async_flush(self): + super(CeleryEmitter, self).flush() + class RedisEmitter(object): """ From e250b95ebfd76a33716b3f7b77bbde82af69b0b5 Mon Sep 17 00:00:00 2001 From: Silviu Date: Sat, 3 Jul 2021 08:58:42 +0200 Subject: [PATCH 4/6] Don't override task name --- snowplow_tracker/emitters.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/snowplow_tracker/emitters.py b/snowplow_tracker/emitters.py index 60755c9a..e550bc1b 100644 --- a/snowplow_tracker/emitters.py +++ b/snowplow_tracker/emitters.py @@ -407,13 +407,13 @@ def __init__(self, endpoint, protocol="http", port=None, method="get", buffer_si # Otherwise configure Celery with default settings self.celery_app = Celery("Snowplow", broker="redis://guest@localhost//") - self.async_flush = self.celery_app.task(self.async_flush) + self.async_flush_task = self.celery_app.task(self.async_flush) def flush(self): """ Schedules a flush task """ - self.async_flush.delay() + self.async_flush_task.delay() logger.info("Scheduled a Celery task to flush the event queue") def async_flush(self): From 475ba772e0462b96fad3dc1e1611fdfa67cb317d Mon Sep 17 00:00:00 2001 From: Andrew Komar Date: Wed, 1 Sep 2021 16:05:37 +0200 Subject: [PATCH 5/6] Make the code compatible with celery 4+ celery.contrib.methods has been deprecated in Celery 4+ --- snowplow_tracker/emitters.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/snowplow_tracker/emitters.py b/snowplow_tracker/emitters.py index e550bc1b..450c0875 100644 --- a/snowplow_tracker/emitters.py +++ b/snowplow_tracker/emitters.py @@ -31,7 +31,6 @@ from queue import Queue from celery import Celery -from celery.contrib.methods import task import redis import requests from contracts import contract, new_contract @@ -172,7 +171,7 @@ def reached_limit(self): else: return self.bytes_queued >= self.byte_limit or len(self.buffer) >= self.buffer_size - @task(name="Flush") + @app.task(name="Flush") def flush(self): """ Sends all events in the buffer to the collector. From 5906a90d98886f0100aa79f38f123a8f1707a413 Mon Sep 17 00:00:00 2001 From: Andrew Komar Date: Tue, 5 Oct 2021 15:47:14 +0200 Subject: [PATCH 6/6] Async variable is reserved in python3.8 --- snowplow_tracker/tracker.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/snowplow_tracker/tracker.py b/snowplow_tracker/tracker.py index a6830a68..058fa2b6 100644 --- a/snowplow_tracker/tracker.py +++ b/snowplow_tracker/tracker.py @@ -626,16 +626,16 @@ def track_unstruct_event(self, event_json, context=None, tstamp=None): track_self_describing_event = track_unstruct_event @contract - def flush(self, async=False): + def flush(self, asynchronously=False): """ Flush the emitter - :param async: Whether the flush is done asynchronously. Default is False - :type async: bool + :param asynchronously: Whether the flush is done asynchronously. Default is False + :type asynchronously: bool :rtype: tracker """ for emitter in self.emitters: - if async: + if asynchronously: emitter.flush() else: emitter.sync_flush()