diff --git a/setup.py b/setup.py index 5620ce3d..5623ecde 100644 --- a/setup.py +++ b/setup.py @@ -70,7 +70,7 @@ install_requires = [ "requests", "pycontracts", - "celery", + "celery>=3.1", "gevent", "redis" ], diff --git a/snowplow_tracker/emitters.py b/snowplow_tracker/emitters.py index 51d9b84f..1be738d1 100644 --- a/snowplow_tracker/emitters.py +++ b/snowplow_tracker/emitters.py @@ -22,9 +22,8 @@ import requests import json import threading -import celery -from celery import Celery -from celery.contrib.methods import task + +from celery import shared_task import redis import logging from contracts import contract, new_contract @@ -44,16 +43,6 @@ new_contract("redis", lambda x: isinstance(x, (redis.Redis, redis.StrictRedis))) -try: - # Check whether a custom Celery configuration module named "snowplow_celery_config" exists - import snowplow_celery_config - app = Celery() - app.config_from_object(snowplow_celery_config) - -except ImportError: - # Otherwise configure Celery with default settings - app = Celery("Snowplow", broker="redis://guest@localhost//") - class Emitter(object): """ @@ -82,7 +71,7 @@ def __init__(self, endpoint, protocol="http", port=None, method="get", buffer_si 1) The number of events which were successfully sent 2) If method is "post": The unsent data in string form; If method is "get": An array of dictionaries corresponding to the unsent events' payloads - :type on_failure: function | None + :type on_failure: function | None """ self.endpoint = Emitter.as_collector_uri(endpoint, protocol, port, method) @@ -110,9 +99,9 @@ def as_collector_uri(endpoint, protocol="http", port=None, method="get"): :param endpoint: The raw endpoint provided by the user :type endpoint: string :param protocol: The protocol to use - http or https - :type protocol: protocol + :type protocol: protocol :param port: The collector port to connect to - :type port: int | None + :type port: int | None :rtype: string """ if method == "get": @@ -141,48 +130,14 @@ def input(self, payload): if len(self.buffer) >= self.buffer_size: self.flush() - @task(name="Flush") - def flush(self): + def flush(self, celery=False): """ Sends all events in the buffer to the collector. """ - logger.info("Attempting to send %s requests" % len(self.buffer)) - if self.method == "post": - if self.buffer: - data = json.dumps({ - "schema": PAYLOAD_DATA_SCHEMA, - "data": self.buffer - }, separators=(',', ':')) - temp_buffer = self.buffer - self.buffer = [] - status_code = self.http_post(data).status_code - if status_code == 200 and self.on_success is not None: - self.on_success(len(temp_buffer)) - elif self.on_failure is not None: - self.on_failure(0, temp_buffer) - - elif self.method == "get": - success_count = 0 - unsent_requests = [] - status_code = None - - while len(self.buffer) > 0: - payload = self.buffer.pop() - status_code = self.http_get(payload).status_code - if status_code == 200: - success_count += 1 - else: - unsent_requests.append(payload) - - if len(unsent_requests) == 0: - if self.on_success is not None: - self.on_success(success_count) - - elif self.on_failure is not None: - self.on_failure(success_count, unsent_requests) - + if celery: + static_flush.delay(self) else: - logger.warn(self.method + ' is not a recognised HTTP method. Use "get" or "post".') + static_flush(self) @contract def http_post(self, data): @@ -249,7 +204,7 @@ def flush(self): """ Schedules a flush task """ - super(CeleryEmitter, self).flush.delay() + super(CeleryEmitter, self).flush(True) logger.info("Scheduled a Celery task to flush the event queue") @@ -285,3 +240,46 @@ def flush(self): def sync_flush(self): self.flush() + +@shared_task(name="Flush") +def static_flush(self): + """ + Sends all events in the buffer to the collector. + """ + logger.info("Attempting to send %s requests" % len(self.buffer)) + if self.method == "post": + if self.buffer: + data = json.dumps({ + "schema": PAYLOAD_DATA_SCHEMA, + "data": self.buffer + }, separators=(',', ':')) + temp_buffer = self.buffer + self.buffer = [] + status_code = self.http_post(data).status_code + if status_code == 200 and self.on_success is not None: + self.on_success(len(temp_buffer)) + elif self.on_failure is not None: + self.on_failure(0, temp_buffer) + + elif self.method == "get": + success_count = 0 + unsent_requests = [] + status_code = None + + while len(self.buffer) > 0: + payload = self.buffer.pop() + status_code = self.http_get(payload).status_code + if status_code == 200: + success_count += 1 + else: + unsent_requests.append(payload) + + if len(unsent_requests) == 0: + if self.on_success is not None: + self.on_success(success_count) + + elif self.on_failure is not None: + self.on_failure(success_count, unsent_requests) + + else: + logger.warn(self.method + ' is not a recognised HTTP method. Use "get" or "post".')