diff --git a/build.gradle b/build.gradle index 15ba1289..639091a3 100644 --- a/build.gradle +++ b/build.gradle @@ -25,7 +25,7 @@ wrapper.gradleVersion = '6.5.0' group = 'com.snowplowanalytics' archivesBaseName = 'snowplow-java-tracker' -version = '0.12.0-alpha.0' +version = '0.12.0-alpha.1' sourceCompatibility = '1.8' targetCompatibility = '1.8' @@ -80,7 +80,8 @@ dependencies { // Testing libraries testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1' - testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1' + testCompileOnly 'junit:junit:4.13' + testRuntimeOnly 'org.junit.vintage:junit-vintage-engine' testImplementation 'org.hamcrest:hamcrest:2.2' testImplementation 'com.squareup.okhttp3:mockwebserver:4.9.2' diff --git a/examples/benchmarking/build.gradle b/examples/benchmarking/build.gradle index f5c3f773..e00d4ad9 100644 --- a/examples/benchmarking/build.gradle +++ b/examples/benchmarking/build.gradle @@ -34,4 +34,3 @@ repositories { dependencies { jmh 'com.snowplowanalytics:snowplow-java-tracker:0.10.1' } - diff --git a/examples/benchmarking/src/jmh/java/com/snowplowanalytics/TrackerBenchmark.java b/examples/benchmarking/src/jmh/java/com/snowplowanalytics/TrackerBenchmark.java index 0683c9af..c51b900f 100644 --- a/examples/benchmarking/src/jmh/java/com/snowplowanalytics/TrackerBenchmark.java +++ b/examples/benchmarking/src/jmh/java/com/snowplowanalytics/TrackerBenchmark.java @@ -66,9 +66,6 @@ public static Tracker getTracker(Emitter emitter) { } public static void closeThreads(Tracker tracker) { - // Use this line for versions 0.12.0 onwards -// tracker.close(); - // Use these lines for previous versions BatchEmitter emitter = (BatchEmitter) tracker.getEmitter(); emitter.close(); } diff --git a/examples/simple-console/src/main/java/com/snowplowanalytics/Main.java b/examples/simple-console/src/main/java/com/snowplowanalytics/Main.java index 4ea1f85d..490c30dd 100644 --- a/examples/simple-console/src/main/java/com/snowplowanalytics/Main.java +++ b/examples/simple-console/src/main/java/com/snowplowanalytics/Main.java @@ -18,12 +18,14 @@ import com.snowplowanalytics.snowplow.tracker.Tracker; import com.snowplowanalytics.snowplow.tracker.emitter.BatchEmitter; import com.snowplowanalytics.snowplow.tracker.events.*; +import com.snowplowanalytics.snowplow.tracker.http.HttpClientAdapter; import com.snowplowanalytics.snowplow.tracker.payload.SelfDescribingJson; import java.util.List; import static java.util.Collections.singletonList; import com.google.common.collect.ImmutableMap; +import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; public class Main { @@ -34,7 +36,7 @@ public static String getUrlFromArgs(String[] args) { return args[0]; } - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException { String collectorEndpoint = getUrlFromArgs(args); // the application id to attach to events @@ -155,6 +157,7 @@ public static void main(String[] args) { // Will close all threads and force send remaining events emitter.close(); + Thread.sleep(5000); System.out.println("Tracked 7 events"); } diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java index ae92a5eb..8079ab9b 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java @@ -16,7 +16,6 @@ import com.snowplowanalytics.snowplow.tracker.constants.Constants; import com.snowplowanalytics.snowplow.tracker.constants.Parameter; -import com.snowplowanalytics.snowplow.tracker.emitter.BatchEmitter; import com.snowplowanalytics.snowplow.tracker.emitter.Emitter; import com.snowplowanalytics.snowplow.tracker.events.*; import com.snowplowanalytics.snowplow.tracker.payload.SelfDescribingJson; @@ -26,18 +25,12 @@ import org.slf4j.LoggerFactory; import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; public class Tracker { private Emitter emitter; private Subject subject; private final TrackerParameters parameters; - protected ExecutorService executor; private static final Logger LOGGER = LoggerFactory.getLogger(Tracker.class); /** @@ -58,11 +51,6 @@ private Tracker(TrackerBuilder builder) { this.emitter = builder.emitter; this.subject = builder.subject; - if (builder.requestExecutorService != null) { - this.executor = builder.requestExecutorService; - } else { - this.executor = Executors.newScheduledThreadPool(builder.threadCount, new TrackerThreadFactory()); - } } /** @@ -76,8 +64,6 @@ public static class TrackerBuilder { private Subject subject = null; // Optional private DevicePlatform platform = DevicePlatform.ServerSideApp; // Optional private boolean base64Encoded = true; // Optional - private int threadCount = 50; // Optional - private ExecutorService requestExecutorService = null; // Optional /** * @param emitter Emitter to which events will be sent @@ -117,30 +103,6 @@ public TrackerBuilder base64(Boolean base64) { return this; } - /** - * Sets the Thread Count for the ExecutorService - * - * @param threadCount the size of the thread pool - * @return itself - */ - public TrackerBuilder threadCount(final int threadCount) { - this.threadCount = threadCount; - return this; - } - - /** - * Set a custom ExecutorService to send http request. - * - * @param executorService the ExecutorService to use - * @return itself - */ - public TrackerBuilder requestExecutorService(final ExecutorService executorService) { - this.requestExecutorService = executorService; - return this; - } - - - /** * Creates a new Tracker * @@ -230,45 +192,6 @@ public TrackerParameters getParameters() { // --- Event Tracking Functions - /** - * Sends a runnable to the executor service. - * - * @param runnable the runnable to be queued - */ - protected void execute(final Runnable runnable) { - this.executor.execute(runnable); - } - - /** - * Copied from `Executors.defaultThreadFactory()`. - * The only change is the generated name prefix. - */ - static class TrackerThreadFactory implements ThreadFactory { - private static final AtomicInteger poolNumber = new AtomicInteger(1); - private final ThreadGroup group; - private final AtomicInteger threadNumber = new AtomicInteger(1); - private final String namePrefix; - - TrackerThreadFactory() { - SecurityManager securityManager = System.getSecurityManager(); - this.group = securityManager != null ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup(); - this.namePrefix = "snowplow-tracker-pool-" + poolNumber.getAndIncrement() + "-event-thread-"; - } - - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(this.group, runnable, this.namePrefix + this.threadNumber.getAndIncrement(), 0L); - if (thread.isDaemon()) { - thread.setDaemon(false); - } - - if (thread.getPriority() != 5) { - thread.setPriority(5); - } - - return thread; - } - } - /** * Handles tracking the different types of events that * the Tracker can encounter. @@ -276,23 +199,17 @@ public Thread newThread(Runnable runnable) { * @param event the event to track */ public void track(Event event) { - execute(getProcessEventRunnable(event)); - } - - private Runnable getProcessEventRunnable(Event event) { - return () -> { - // a list because Ecommerce events become multiple Payloads - List processedEvents = eventTypeSpecificPreProcessing(event); - for (Event processedEvent : processedEvents) { - // Event ID (eid) and device_created_timestamp (dtm) are added during getPayload() - TrackerPayload payload = (TrackerPayload) processedEvent.getPayload(); - - addTrackerParameters(payload); - addContext(processedEvent, payload); - addSubject(processedEvent, payload); - this.emitter.add(payload); - } - }; + // a list because Ecommerce events become multiple Payloads + List processedEvents = eventTypeSpecificPreProcessing(event); + for (Event processedEvent : processedEvents) { + // Event ID (eid) and device_created_timestamp (dtm) are generated when the Event is initialised + TrackerPayload payload = (TrackerPayload) processedEvent.getPayload(); + + addTrackerParameters(payload); + addContext(processedEvent, payload); + addSubject(processedEvent, payload); + this.emitter.add(payload); + } } private List eventTypeSpecificPreProcessing(Event event) { @@ -381,27 +298,4 @@ private void addSubject(Event event, TrackerPayload payload) { } } - public void close() { - // Shutdown executor thread pool for the tracker - if (executor != null) { - executor.shutdown(); - try { - if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { - executor.shutdownNow(); - if (!executor.awaitTermination(1, TimeUnit.SECONDS)) - LOGGER.warn("Tracker executor did not terminate"); - } - } catch (InterruptedException ie) { - executor.shutdownNow(); - Thread.currentThread().interrupt(); - } - } - - // Shutdown executor thread pool for the emitter - if (this.emitter.getClass().equals(BatchEmitter.class)) { - BatchEmitter emitter = (BatchEmitter) this.emitter; - emitter.close(); - } - } - } diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java index ccf6b551..e56e4c14 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java @@ -13,8 +13,8 @@ package com.snowplowanalytics.snowplow.tracker.emitter; import java.util.List; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; @@ -33,24 +33,26 @@ public abstract class AbstractEmitter implements Emitter { protected HttpClientAdapter httpClientAdapter; - protected ExecutorService executor; + protected ScheduledExecutorService executor; public static abstract class Builder> { private HttpClientAdapter httpClientAdapter; // Optional private int threadCount = 50; // Optional - private ExecutorService requestExecutorService = null; // Optional + private ScheduledExecutorService requestExecutorService = null; // Optional private String collectorUrl = null; // Required if not specifying a httpClientAdapter protected abstract T self(); /** - * Set a custom ExecutorService to send http request. + * Set a custom ScheduledExecutorService to send http request. + *

+ * Implementation note: Be aware that calling `close()` on a BatchEmitter instance + * has a side-effect and will shutdown that ExecutorService. * - * /!\ Be aware that calling `close()` on a BatchEmitter instance has a side-effect and will shutdown that ExecutorService. - * @param executorService the ExecutorService to use + * @param executorService the ScheduledExecutorService to use * @return itself */ - public T requestExecutorService(final ExecutorService executorService) { + public T requestExecutorService(final ScheduledExecutorService executorService) { this.requestExecutorService = executorService; return self(); } @@ -164,15 +166,6 @@ protected AbstractEmitter(final Builder builder) { @Override public abstract List getBuffer(); - /** - * Sends a runnable to the executor service. - * - * @param runnable the runnable to be queued - */ - protected void execute(final Runnable runnable) { - this.executor.execute(runnable); - } - /** * Checks whether the response code was a success or not. * diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java index 888adf19..d155ce5a 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java @@ -17,7 +17,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Preconditions; import com.snowplowanalytics.snowplow.tracker.constants.Constants; @@ -35,21 +35,17 @@ public class BatchEmitter extends AbstractEmitter implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(BatchEmitter.class); - private static final AtomicInteger EVENTS_CHECK_THREAD_NUMBER = new AtomicInteger(1); - private static final String EVENTS_CHECK_THREAD_NAME_PREFIX = "snowplow-emitter-checkForEvents-thread-"; - - private final Thread checkForEventsToSend; private boolean isClosing = false; - + private int batchSize; private final EventStore eventStore; - - private final long closeTimeout = 5; + private final AtomicLong retryDelay; public static abstract class Builder> extends AbstractEmitter.Builder { private int batchSize = 50; // Optional - private EventStore eventStore = new InMemoryEventStore(); + private int bufferCapacity = Integer.MAX_VALUE; + private EventStore eventStore; /** * @param batchSize The count of events to buffer before sending @@ -60,11 +56,24 @@ public T batchSize(final int batchSize) { return self(); } + /** + * @param eventStore The EventStore to use + * @return itself + */ public T eventStore(final EventStore eventStore) { this.eventStore = eventStore; return self(); } + /** + * @param bufferCapacity The maximum capacity of the default InMemoryEventStore event buffer + * @return itself + */ + public T bufferCapacity(final int bufferCapacity) { + this.bufferCapacity = bufferCapacity; + return self(); + } + public BatchEmitter build() { return new BatchEmitter(this); } @@ -86,37 +95,45 @@ protected BatchEmitter(final Builder builder) { // Precondition checks Preconditions.checkArgument(builder.batchSize > 0, "batchSize must be greater than 0"); + batchSize = builder.batchSize; - this.batchSize = builder.batchSize; - this.eventStore = builder.eventStore; - - checkForEventsToSend = new Thread( - getCheckForEventsToSendRunnable(), - EVENTS_CHECK_THREAD_NAME_PREFIX + EVENTS_CHECK_THREAD_NUMBER.getAndIncrement() - ); - checkForEventsToSend.start(); + if (builder.eventStore == null) { + eventStore = new InMemoryEventStore(builder.bufferCapacity); + } else { + eventStore = builder.eventStore; + } + retryDelay = new AtomicLong(0L); } /** * Adds a TrackerPayload to the concurrent queue buffer + *

+ * Implementation note: Be aware that calling `close()` on a BatchEmitter instance + * has a side-effect and will shutdown that ExecutorService. * * @param payload a payload */ @Override public void add(final TrackerPayload payload) { - boolean result = eventStore.add(payload); + boolean result = eventStore.addEvent(payload); + + if (!isClosing) { + if (eventStore.size() >= bufferSize) { + executor.schedule(getPostRequestRunnable(bufferSize), retryDelay.get(), TimeUnit.MILLISECONDS); + } + } if (!result) { LOGGER.error("Unable to add payload to emitter, emitter buffer is full"); } } - /* - * Forces all the payloads currently in the buffer to be sent + /** + * Forces all the payloads currently in the buffer to be sent immediately */ @Override public void flushBuffer() { - drainEventsAndSend(eventStore.getSize()); + executor.schedule(getPostRequestRunnable(eventStore.size()), 0, TimeUnit.MILLISECONDS); } /** @@ -147,49 +164,52 @@ public void setBatchSize(final int batchSize) { */ @Override public int getBatchSize() { - return this.batchSize; - } - - /** - * Checks if batchSize is reached - * - * @return the new Runnable object - */ - private Runnable getCheckForEventsToSendRunnable() { - return () -> { - while (!isClosing) { - if (eventStore.getSize() >= batchSize) { - drainEventsAndSend(this.getBatchSize()); - } - } - }; + return batchSize; } - private void drainEventsAndSend(int numberOfEvents) { - List payloads = eventStore.removeEvents(numberOfEvents); - execute(getPostRequestRunnable(payloads)); + long getRetryDelay() { + return retryDelay.get(); } /** * Returns a Runnable POST Request operation * - * @param buffer the event buffer to be sent + * @param numberOfEvents the number of events to be sent in the request * @return the new Runnable object */ - private Runnable getPostRequestRunnable(final List buffer) { + private Runnable getPostRequestRunnable(int numberOfEvents) { return () -> { - if (buffer.size() == 0) { - return; - } + BatchPayload batchedEvents = null; + try { + batchedEvents = eventStore.getEventsBatch(numberOfEvents); + List eventsInRequest = batchedEvents.getPayloads(); - final SelfDescribingJson post = getFinalPost(buffer); - final int code = httpClientAdapter.post(post); + if (eventsInRequest.size() == 0) { + return; + } - // Process results - if (!isSuccessfulSend(code)) { - LOGGER.error("BatchEmitter failed to send {} events: code: {}", buffer.size(), code); - } else { - LOGGER.debug("BatchEmitter successfully sent {} events: code: {}", buffer.size(), code); + final SelfDescribingJson post = getFinalPost(eventsInRequest); + final int code = httpClientAdapter.post(post); + + // Process results + if (isSuccessfulSend(code)) { + LOGGER.debug("BatchEmitter successfully sent {} events: code: {}", eventsInRequest.size(), code); + retryDelay.set(0L); + eventStore.cleanupAfterSendingAttempt(true, batchedEvents.getBatchId()); + } else { + LOGGER.error("BatchEmitter failed to send {} events: code: {}", eventsInRequest.size(), code); + eventStore.cleanupAfterSendingAttempt(false, batchedEvents.getBatchId()); + + // exponentially increase retry backoff time after the first failure + if (!retryDelay.compareAndSet(0, 50L)) { + retryDelay.updateAndGet(currentDelay -> currentDelay * 2); + } + } + } catch (Exception e) { + LOGGER.error("BatchEmitter event sending error: {}", e.getMessage()); + if (batchedEvents != null) { + eventStore.cleanupAfterSendingAttempt(false, batchedEvents.getBatchId()); + } } }; } @@ -197,14 +217,14 @@ private Runnable getPostRequestRunnable(final List buffer) { /** * Constructs the SelfDescribingJson to be sent to the endpoint * - * @param buffer the event buffer + * @param events the event buffer * @return the constructed POST payload */ - private SelfDescribingJson getFinalPost(final List buffer) { + private SelfDescribingJson getFinalPost(final List events) { final List> toSendPayloads = new ArrayList<>(); final String sentTimestamp = Long.toString(System.currentTimeMillis()); - for (TrackerPayload payload : buffer) { + for (TrackerPayload payload : events) { payload.add(Parameter.DEVICE_SENT_TIMESTAMP, sentTimestamp); toSendPayloads.add(payload.getMap()); } @@ -213,13 +233,16 @@ private SelfDescribingJson getFinalPost(final List buffer) { } /** - * On close attempt to send all remaining events. + * On close, attempt to send all remaining events. + *

+ * Implementation note: Be aware that calling `close()` + * has a side-effect of shutting down the Emitter ScheduledExecutorService. */ @Override public void close() { + final long closeTimeout = 5; isClosing = true; - checkForEventsToSend.interrupt(); // Kill checkForEventsToSend thread flushBuffer(); // Attempt to send all remaining events //Shutdown executor threadpool diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchPayload.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchPayload.java new file mode 100644 index 00000000..f561e1aa --- /dev/null +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchPayload.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2014-2021 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.tracker.emitter; + +import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; + +import java.util.List; + + +public class BatchPayload { + + private final Long batchId; + private final List payloads; + + public BatchPayload(Long payloadId, List payloads) { + this.batchId = payloadId; + this.payloads = payloads; + } + + public Long getBatchId() { + return batchId; + } + + public List getPayloads() { + return payloads; + } +} diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/EventStore.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/EventStore.java index 4f61e1d3..9a2eaa6a 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/EventStore.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/EventStore.java @@ -6,11 +6,13 @@ public interface EventStore { - boolean add(TrackerPayload trackerPayload); + boolean addEvent(TrackerPayload trackerPayload); - List removeEvents(int numberToRemove); - - int getSize(); + BatchPayload getEventsBatch(int numberToRemove); List getAllEvents(); + + void cleanupAfterSendingAttempt(boolean successfullySent, long batchId); + + int size(); } diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStore.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStore.java index e3ab9477..d5366da5 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStore.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStore.java @@ -1,36 +1,75 @@ package com.snowplowanalytics.snowplow.tracker.emitter; import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.List; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicLong; public class InMemoryEventStore implements EventStore { - public final BlockingQueue eventBuffer = new LinkedBlockingQueue<>(); + private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryEventStore.class); + private final AtomicLong batchId = new AtomicLong(1); + + public final LinkedBlockingDeque eventBuffer; + public final ConcurrentHashMap> eventsBeingSent = new ConcurrentHashMap<>(); + + public InMemoryEventStore() { + eventBuffer = new LinkedBlockingDeque<>(); + } + + public InMemoryEventStore(int bufferCapacity) { + eventBuffer = new LinkedBlockingDeque<>(bufferCapacity); + } @Override - public boolean add(TrackerPayload trackerPayload) { + public boolean addEvent(TrackerPayload trackerPayload) { return eventBuffer.offer(trackerPayload); } @Override - public List removeEvents(int numberToRemove) { - // if numberToRemove is greater than the number of events present, - // it will return all the events (there's no error) - List eventsList = new ArrayList<>(); - eventBuffer.drainTo(eventsList, numberToRemove); - return eventsList; + public BatchPayload getEventsBatch(int numberToGet) { + List eventsToSend = new ArrayList<>(); + + eventBuffer.drainTo(eventsToSend, numberToGet); + + // The batch of events is wrapped as a BatchPayload + // They're also added to the "pending" event buffer, the eventsBeingSent HashMap + BatchPayload batchedEvents = new BatchPayload(batchId.getAndIncrement(), eventsToSend); + eventsBeingSent.put(batchedEvents.getBatchId(), batchedEvents.getPayloads()); + return batchedEvents; } @Override - public int getSize() { - return eventBuffer.size(); + public void cleanupAfterSendingAttempt(boolean successfullySent, long batchId) { + // Events that successfully sent are deleted from the pending buffer + List events = eventsBeingSent.remove(batchId); + + // Events that didn't send are inserted at the head of the eventBuffer + // for immediate resending. + if (!successfullySent) { + while (events.size() > 0) { + TrackerPayload payloadToReinsert = events.remove(0); + boolean result = eventBuffer.offerFirst(payloadToReinsert); + if (!result) { + LOGGER.error("Event buffer is full. Dropping newer payload to reinsert older payload"); + eventBuffer.removeLast(); + eventBuffer.offerFirst(payloadToReinsert); + } + } + } } @Override public List getAllEvents() { - return new ArrayList<>(eventBuffer); + TrackerPayload[] events = eventBuffer.toArray(new TrackerPayload[0]); + return Arrays.asList(events); + } + + @Override + public int size() { + return eventBuffer.size(); } } diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java index 14d49327..715da53b 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java @@ -50,9 +50,14 @@ protected SimpleEmitter(final Builder builder) { super(builder); } + /** + * Adds an event to the buffer and instantly sends it + * + * @param payload a payload + */ @Override public void add(TrackerPayload payload) { - // nothing happens + executor.execute(getGetRequestRunnable(payload)); } /** diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/events/AbstractEvent.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/events/AbstractEvent.java index 31161c0e..92bb8988 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/events/AbstractEvent.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/events/AbstractEvent.java @@ -214,7 +214,7 @@ public Subject getSubject() { /** * Adds the default parameters to a TrackerPayload object. * - * @param payload the payload to add too. + * @param payload the payload to add to. * @return the TrackerPayload with appended values. */ protected TrackerPayload putDefaultParams(TrackerPayload payload) { diff --git a/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java b/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java index 533c0385..b2840266 100644 --- a/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java +++ b/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java @@ -15,6 +15,7 @@ import java.util.*; import static java.util.Collections.singletonList; +import com.snowplowanalytics.snowplow.tracker.emitter.BatchPayload; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -524,7 +525,7 @@ public void testTrackTimingWithSubject() throws InterruptedException { @Test public void testGetTrackerVersion() { Tracker tracker = new Tracker.TrackerBuilder(mockEmitter, "namespace", "an-app-id").build(); - assertEquals("java-0.11.0", tracker.getTrackerVersion()); + assertEquals("java-0.12.0-alpha.1", tracker.getTrackerVersion()); } @Test @@ -570,31 +571,4 @@ public void testSetNamespace() { Tracker tracker = new Tracker.TrackerBuilder(mockEmitter, "namespace", "an-app-id").build(); assertEquals("namespace", tracker.getNamespace()); } - - @Test - public void threadsHaveExpectedNames() { - // A new thread should be created for each event tracked, - // up to the configurable pool size limit - tracker.track(PageView.builder() - .pageUrl("url") - .pageTitle("title") - .referrer("referer") - .build()); - - tracker.track(PageView.builder() - .pageUrl("url") - .pageTitle("title") - .referrer("referer") - .build()); - - // Create a list of all live thread names - List threadList = new ArrayList<>(Thread.getAllStackTraces().keySet()); - List threadNames = new ArrayList<>(); - for (Thread thread : threadList) { - threadNames.add(thread.getName()); - } - - Assert.assertTrue(threadNames.contains("snowplow-tracker-pool-1-event-thread-1")); - Assert.assertTrue(threadNames.contains("snowplow-tracker-pool-1-event-thread-2")); - } } diff --git a/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitterTest.java b/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitterTest.java index 05c570c6..9a8f2346 100644 --- a/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitterTest.java +++ b/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitterTest.java @@ -16,6 +16,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.regex.Pattern; import com.google.common.collect.Lists; @@ -35,16 +36,19 @@ public class BatchEmitterTest { private MockHttpClientAdapter mockHttpClientAdapter; + private FailingHttpClientAdapter failingHttpClientAdapter; private BatchEmitter emitter; public static class MockHttpClientAdapter implements HttpClientAdapter { public boolean isGetCalled = false; public boolean isPostCalled = false; + public int postCounter = 0; public SelfDescribingJson capturedPayload; @Override public int post(SelfDescribingJson payload) { isPostCalled = true; + postCounter++; capturedPayload = payload; return 200; } @@ -66,9 +70,42 @@ public Object getHttpClient() { } } + // this class fails to "send" the first 4 requests + // but returns a successful result (200) subsequently + static class FailingHttpClientAdapter implements HttpClientAdapter { + int failedPostCounter = 0; + int successfulPostCounter = 0; + @Override + public int post(SelfDescribingJson payload) { + if (failedPostCounter >= 4) { + successfulPostCounter++; + return 200; + } + + failedPostCounter++; + return 500; + } + + @Override + public int get(TrackerPayload payload) { + return 0; + } + + @Override + public String getUrl() { + return null; + } + + @Override + public Object getHttpClient() { + return null; + } + } + @Before public void setUp() { mockHttpClientAdapter = new MockHttpClientAdapter(); + failingHttpClientAdapter = new FailingHttpClientAdapter(); emitter = BatchEmitter.builder() .httpClientAdapter(mockHttpClientAdapter) .batchSize(10) @@ -102,8 +139,23 @@ public void addToBuffer_withMore10Payloads_shouldEmptyBuffer() throws Interrupte @SuppressWarnings("unchecked") List> capturedPayload = (List>) mockHttpClientAdapter.capturedPayload.getMap().get("data"); - assertPayload(payloads, capturedPayload); Assert.assertEquals(0, emitter.getBuffer().size()); + Assert.assertEquals(1, mockHttpClientAdapter.postCounter); + } + + @Test + public void addToBuffer_doesNotAddEventIfBufferFull() { + emitter = BatchEmitter.builder() + .httpClientAdapter(mockHttpClientAdapter) + .bufferCapacity(1) + .build(); + + emitter.add(createPayload()); + + TrackerPayload differentPayload = createPayload(); + emitter.add(differentPayload); + + Assert.assertFalse(emitter.getBuffer().contains(differentPayload)); } @Test @@ -164,25 +216,9 @@ public void getFinalPost_shouldAddSTMParameter() throws InterruptedException { } } - @Test - public void emitterThreadFactory_correctlyNamesThreads() { - class MyRunnable implements Runnable { - @Override - public void run() {} - } - - BatchEmitter.EmitterThreadFactory threadFactory = new BatchEmitter.EmitterThreadFactory(); - String threadName = threadFactory.newThread(new MyRunnable()).getName(); - - // It's pool-2 because pool-1 was created during emitter instantiation - Assert.assertEquals("snowplow-emitter-pool-2-request-thread-1", threadName); - } - @Test public void threadsHaveExpectedNames() { - // A checkForEventsToSend thread is created on BatchEmitter instantiation. - // Calling flushBuffer() here to require another thread - causing - // creation of a request thread within the scheduledThreadPool. + // Calling flushBuffer() here to create a request thread for event sending emitter.flushBuffer(); // Create a list of all live thread names @@ -192,8 +228,16 @@ public void threadsHaveExpectedNames() { threadNames.add(thread.getName()); } - Assert.assertTrue(threadNames.contains("snowplow-emitter-checkForEvents-thread-1")); - Assert.assertTrue(threadNames.contains("snowplow-emitter-pool-1-request-thread-1")); + // Because the threadpools are named by a static ThreadFactory, + // the pool number varies if this test is run in isolation or not + boolean matchResult = false; + for (String name : threadNames) { + if (Pattern.matches("snowplow-emitter-pool-\\d+-request-thread-1", name)) { + matchResult = true; + } + } + + Assert.assertTrue(matchResult); } @Test @@ -202,6 +246,8 @@ public void close_sendsEventsAndStopsThreads() throws InterruptedException { for (TrackerPayload payload : payloads) { emitter.add(payload); } + Thread.sleep(500); + emitter.close(); Thread.sleep(500); @@ -218,6 +264,65 @@ public void close_sendsEventsAndStopsThreads() throws InterruptedException { Assert.assertEquals(20, emitter.getBuffer().size()); } + @Test + public void eventsThatFailToSendAreReturnedToEventBuffer() throws InterruptedException { + emitter = BatchEmitter.builder() + .httpClientAdapter(new FailingHttpClientAdapter()) + .bufferSize(10) + .build(); + + List payloads = createPayloads(2); + for (TrackerPayload payload : payloads) { + emitter.add(payload); + } + emitter.flushBuffer(); + Thread.sleep(500); + + List storedEvents = emitter.getBuffer(); + + Assert.assertEquals(2, storedEvents.size()); + Assert.assertTrue(storedEvents.contains(payloads.get(0))); + Assert.assertTrue(storedEvents.contains(payloads.get(1))); + } + + @Test + public void eventSendingFailureIncreasesBackoffTime() throws InterruptedException { + emitter = BatchEmitter.builder() + .httpClientAdapter(failingHttpClientAdapter) + .bufferSize(1) + .build(); + + List payloads = createPayloads(2); + for (TrackerPayload payload : payloads) { + emitter.add(payload); + } + Thread.sleep(500); + + Assert.assertEquals(100, emitter.getRetryDelay()); + } + + @Test + public void successfulSendAfterFailureResetsBackoffTime() throws InterruptedException { + // the FailingHttpClientAdapter returns 500 for the first 4 requests + // then subsequently returns 200 + FailingHttpClientAdapter failingHttpClientAdapter = new FailingHttpClientAdapter(); + emitter = BatchEmitter.builder() + .httpClientAdapter(failingHttpClientAdapter) + .bufferSize(1) + .threadCount(1) + .build(); + + List payloads = createPayloads(6); + for (TrackerPayload payload : payloads) { + emitter.add(payload); + } + + Thread.sleep(500); + + Assert.assertEquals(2, failingHttpClientAdapter.successfulPostCounter); + Assert.assertEquals(0, emitter.getRetryDelay()); + } + private TrackerPayload createPayload() { PageView pv = PageView.builder() .pageUrl("https://www.snowplowanalytics.com/") diff --git a/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStoreTest.java b/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStoreTest.java index 7c341dfb..33a7d843 100644 --- a/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStoreTest.java +++ b/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStoreTest.java @@ -18,71 +18,101 @@ import org.junit.Before; import org.junit.Test; -import java.util.ArrayList; import java.util.List; public class InMemoryEventStoreTest { private TrackerPayload trackerPayload; private InMemoryEventStore eventStore; - private List singleEventList; - private List twoEventsList; - @Before public void setUp() { - trackerPayload = createPayload(); + trackerPayload = createTrackerPayload(); eventStore = new InMemoryEventStore(); - singleEventList = new ArrayList<>(); - twoEventsList = new ArrayList<>(); - - singleEventList.add(trackerPayload); - twoEventsList.add(trackerPayload); - twoEventsList.add(trackerPayload); } @Test public void correctlyAddAnEventToStore() { - boolean result = eventStore.add(trackerPayload); + boolean result = eventStore.addEvent(trackerPayload); Assert.assertTrue(result); } @Test public void getSize_returnsCorrectNumberOfStoredEvents() { - storeTwoPayloads(); + eventStore.addEvent(trackerPayload); + eventStore.addEvent(trackerPayload); + + Assert.assertEquals(2, eventStore.size()); + } + + @Test + public void getEventsFromStorage() { + eventStore.addEvent(trackerPayload); + eventStore.addEvent(trackerPayload); + eventStore.addEvent(trackerPayload); + eventStore.addEvent(trackerPayload); + + Assert.assertEquals(2, eventStore.getEventsBatch(2).getPayloads().size()); + Assert.assertEquals(2, eventStore.size()); + } + + @Test + public void getAllEventsIfAskedForMoreEventsThanAreStored() { + eventStore.addEvent(trackerPayload); + eventStore.addEvent(trackerPayload); + + List events = eventStore.getEventsBatch(3).getPayloads(); - Assert.assertEquals(2, eventStore.getSize()); + Assert.assertEquals(2, events.size()); } @Test - public void removeAddedEvent() { - storeTwoPayloads(); + public void putEventsBackInBufferIfFailedToSend() { + eventStore.addEvent(trackerPayload); + eventStore.addEvent(trackerPayload); + eventStore.getEventsBatch(2); - List removedEventList = eventStore.removeEvents(1); - Assert.assertEquals(singleEventList, removedEventList); - Assert.assertEquals(1, eventStore.getSize()); + Assert.assertEquals(0, eventStore.size()); + + eventStore.cleanupAfterSendingAttempt(false, 1L); + + Assert.assertEquals(2, eventStore.size()); } @Test - public void removeAllEventsIfAskedForMoreEventsThanAreStored() { - storeTwoPayloads(); + public void doNotPutEventsBackInBufferIfSent() { + eventStore.addEvent(trackerPayload); + eventStore.addEvent(trackerPayload); + eventStore.getEventsBatch(2); + + Assert.assertEquals(0, eventStore.size()); - List removedEventList = eventStore.removeEvents(100); - Assert.assertEquals(twoEventsList, removedEventList); - Assert.assertEquals(0, eventStore.getSize()); + eventStore.cleanupAfterSendingAttempt(true, 1L); + + Assert.assertEquals(0, eventStore.size()); } @Test - public void getAllEvents_doesNotRemoveEventsFromStore() { - storeTwoPayloads(); + public void dropNewerEventsOnFailureWhenBufferFull() { + eventStore = new InMemoryEventStore(3); + + TrackerPayload differentPayload = createTrackerPayload(); + + eventStore.addEvent(differentPayload); + eventStore.getEventsBatch(1); + + eventStore.addEvent(trackerPayload); + eventStore.addEvent(trackerPayload); + eventStore.addEvent(trackerPayload); + + eventStore.cleanupAfterSendingAttempt(false, 1L); + Assert.assertEquals(3, eventStore.size()); + Assert.assertTrue(eventStore.getAllEvents().contains(differentPayload)); - List retrievedEventsList = eventStore.getAllEvents(); - Assert.assertEquals(twoEventsList, retrievedEventsList); - Assert.assertEquals(2, eventStore.getSize()); } - private TrackerPayload createPayload() { + private TrackerPayload createTrackerPayload() { PageView pv = PageView.builder() .pageUrl("https://www.snowplowanalytics.com/") .pageTitle("Snowplow") @@ -91,10 +121,4 @@ private TrackerPayload createPayload() { return pv.getPayload(); } - - private void storeTwoPayloads() { - for (TrackerPayload payload : twoEventsList) { - eventStore.add(payload); - } - } }