From 39762fb34f60f399344737b2c4c488b616f0a5de Mon Sep 17 00:00:00 2001 From: Dustin Farris Date: Fri, 2 Jan 2015 14:08:39 -0500 Subject: [PATCH 1/2] Use the user's celery app Celery will provide a "default" app if one is not already define. The shared_task decorator will use this default app, or the "current" app if one has already been created by the user. If the user is already creating their own app, there is no need to instantiate one specifically for snowplow. --- setup.py | 2 +- snowplow_tracker/emitters.py | 25 +++++++------------------ 2 files changed, 8 insertions(+), 19 deletions(-) diff --git a/setup.py b/setup.py index 5620ce3d..5623ecde 100644 --- a/setup.py +++ b/setup.py @@ -70,7 +70,7 @@ install_requires = [ "requests", "pycontracts", - "celery", + "celery>=3.1", "gevent", "redis" ], diff --git a/snowplow_tracker/emitters.py b/snowplow_tracker/emitters.py index 7241cba3..ecf7f9e8 100644 --- a/snowplow_tracker/emitters.py +++ b/snowplow_tracker/emitters.py @@ -22,9 +22,8 @@ import requests import json import threading -import celery -from celery import Celery -from celery.contrib.methods import task + +from celery import shared_task import redis import logging from contracts import contract, new_contract @@ -44,16 +43,6 @@ new_contract("redis", lambda x: isinstance(x, (redis.Redis, redis.StrictRedis))) -try: - # Check whether a custom Celery configuration module named "snowplow_celery_config" exists - import snowplow_celery_config - app = Celery() - app.config_from_object(snowplow_celery_config) - -except ImportError: - # Otherwise configure Celery with default settings - app = Celery("Snowplow", broker="redis://guest@localhost//") - class Emitter(object): """ @@ -82,7 +71,7 @@ def __init__(self, endpoint, protocol="http", port=None, method="get", buffer_si 1) The number of events which were successfully sent 2) If method is "post": The unsent data in string form; If method is "get": An array of dictionaries corresponding to the unsent events' payloads - :type on_failure: function | None + :type on_failure: function | None """ self.endpoint = Emitter.as_collector_uri(endpoint, protocol, port, method) @@ -110,9 +99,9 @@ def as_collector_uri(endpoint, protocol="http", port=None, method="get"): :param endpoint: The raw endpoint provided by the user :type endpoint: string :param protocol: The protocol to use - http or https - :type protocol: protocol + :type protocol: protocol :param port: The collector port to connect to - :type port: int | None + :type port: int | None :rtype: string """ if method == "get": @@ -141,7 +130,7 @@ def input(self, payload): if len(self.buffer) >= self.buffer_size: self.flush() - @task(name="Flush") + @shared_task(name="Flush") def flush(self): """ Sends all events in the buffer to the collector. @@ -201,7 +190,7 @@ def http_get(self, payload): :type payload: dict(string:*) """ logger.debug("Sending GET request...") - r = requests.get(self.endpoint, params=payload) + r = requests.get(self.endpoint, params=payload) logger.info("GET request finished with status code: " + str(r.status_code)) return r From e84187f43e64a93644fb70d117d3c0eb5c101049 Mon Sep 17 00:00:00 2001 From: Fred Blundun Date: Thu, 12 Feb 2015 10:54:21 +0000 Subject: [PATCH 2/2] Got tests passing using shared_task --- snowplow_tracker/emitters.py | 87 ++++++++++++++++++++---------------- 1 file changed, 48 insertions(+), 39 deletions(-) diff --git a/snowplow_tracker/emitters.py b/snowplow_tracker/emitters.py index 10bee10e..1be738d1 100644 --- a/snowplow_tracker/emitters.py +++ b/snowplow_tracker/emitters.py @@ -130,48 +130,14 @@ def input(self, payload): if len(self.buffer) >= self.buffer_size: self.flush() - @shared_task(name="Flush") - def flush(self): + def flush(self, celery=False): """ Sends all events in the buffer to the collector. """ - logger.info("Attempting to send %s requests" % len(self.buffer)) - if self.method == "post": - if self.buffer: - data = json.dumps({ - "schema": PAYLOAD_DATA_SCHEMA, - "data": self.buffer - }, separators=(',', ':')) - temp_buffer = self.buffer - self.buffer = [] - status_code = self.http_post(data).status_code - if status_code == 200 and self.on_success is not None: - self.on_success(len(temp_buffer)) - elif self.on_failure is not None: - self.on_failure(0, temp_buffer) - - elif self.method == "get": - success_count = 0 - unsent_requests = [] - status_code = None - - while len(self.buffer) > 0: - payload = self.buffer.pop() - status_code = self.http_get(payload).status_code - if status_code == 200: - success_count += 1 - else: - unsent_requests.append(payload) - - if len(unsent_requests) == 0: - if self.on_success is not None: - self.on_success(success_count) - - elif self.on_failure is not None: - self.on_failure(success_count, unsent_requests) - + if celery: + static_flush.delay(self) else: - logger.warn(self.method + ' is not a recognised HTTP method. Use "get" or "post".') + static_flush(self) @contract def http_post(self, data): @@ -238,7 +204,7 @@ def flush(self): """ Schedules a flush task """ - super(CeleryEmitter, self).flush.delay() + super(CeleryEmitter, self).flush(True) logger.info("Scheduled a Celery task to flush the event queue") @@ -274,3 +240,46 @@ def flush(self): def sync_flush(self): self.flush() + +@shared_task(name="Flush") +def static_flush(self): + """ + Sends all events in the buffer to the collector. + """ + logger.info("Attempting to send %s requests" % len(self.buffer)) + if self.method == "post": + if self.buffer: + data = json.dumps({ + "schema": PAYLOAD_DATA_SCHEMA, + "data": self.buffer + }, separators=(',', ':')) + temp_buffer = self.buffer + self.buffer = [] + status_code = self.http_post(data).status_code + if status_code == 200 and self.on_success is not None: + self.on_success(len(temp_buffer)) + elif self.on_failure is not None: + self.on_failure(0, temp_buffer) + + elif self.method == "get": + success_count = 0 + unsent_requests = [] + status_code = None + + while len(self.buffer) > 0: + payload = self.buffer.pop() + status_code = self.http_get(payload).status_code + if status_code == 200: + success_count += 1 + else: + unsent_requests.append(payload) + + if len(unsent_requests) == 0: + if self.on_success is not None: + self.on_success(success_count) + + elif self.on_failure is not None: + self.on_failure(success_count, unsent_requests) + + else: + logger.warn(self.method + ' is not a recognised HTTP method. Use "get" or "post".')