Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions snowplow_tracker/emitters.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

import celery
from celery import Celery
from celery.contrib.methods import task
import redis
import requests
from contracts import contract, new_contract
Expand All @@ -53,6 +52,20 @@

new_contract("redis", lambda x: isinstance(x, (redis.Redis, redis.StrictRedis)))

if not celery.current_app or not celery.current_app.configured:
try:
# Check whether a custom Celery configuration module named "snowplow_celery_config" exists
import snowplow_celery_config

app = Celery()
app.config_from_object(snowplow_celery_config)

except ImportError:
# Otherwise configure Celery with default settings
app = Celery("Snowplow", broker="redis://guest@localhost//")
else:
app = celery.current_app


class Emitter(object):
"""
Expand Down Expand Up @@ -163,7 +176,7 @@ def reached_limit(self):
else:
return self.bytes_queued >= self.byte_limit or len(self.buffer) >= self.buffer_size

@task(name="Flush")
@app.task(name="Flush")
def flush(self):
"""
Sends all events in the buffer to the collector.
Expand Down Expand Up @@ -385,16 +398,6 @@ class CeleryEmitter(Emitter):
but on_success and on_failure callbacks cannot be set.
"""
def __init__(self, endpoint, protocol="http", port=None, method="get", buffer_size=None, byte_limit=None):
if not celery.current_app or not celery.current_app.configured:
try:
# Check whether a custom Celery configuration module named "snowplow_celery_config" exists
import snowplow_celery_config
app = Celery()
app.config_from_object(snowplow_celery_config)

except ImportError:
# Otherwise configure Celery with default settings
app = Celery("Snowplow", broker="redis://guest@localhost//")
super(CeleryEmitter, self).__init__(endpoint, protocol, port, method, buffer_size, None, None, byte_limit)

def flush(self):
Expand Down