From 4e66eecc9ee3382c0d8f78bfad037b5d38f2d87d Mon Sep 17 00:00:00 2001 From: Miranda Wilson Date: Mon, 10 Jan 2022 14:36:50 +0000 Subject: [PATCH 1/7] Start changing Tracker to create TrackerPayloads not TrackerEvents --- .../snowplow/tracker/Tracker.java | 67 ++++++++++++++++++- .../tracker/emitter/AbstractEmitter.java | 6 +- .../tracker/emitter/BatchEmitter.java | 8 ++- .../snowplow/tracker/emitter/Emitter.java | 5 +- .../tracker/emitter/SimpleEmitter.java | 7 +- .../tracker/payload/TrackerEvent.java | 33 ++++----- .../snowplow/tracker/TrackerTest.java | 9 ++- .../tracker/emitter/BatchEmitterTest.java | 14 ++-- 8 files changed, 113 insertions(+), 36 deletions(-) diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java index a992c48c..264aff36 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java @@ -14,10 +14,19 @@ import com.google.common.base.Preconditions; +import com.snowplowanalytics.snowplow.tracker.constants.Constants; +import com.snowplowanalytics.snowplow.tracker.constants.Parameter; import com.snowplowanalytics.snowplow.tracker.emitter.Emitter; import com.snowplowanalytics.snowplow.tracker.events.*; +import com.snowplowanalytics.snowplow.tracker.payload.SelfDescribingJson; import com.snowplowanalytics.snowplow.tracker.payload.TrackerEvent; import com.snowplowanalytics.snowplow.tracker.payload.TrackerParameters; +import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; public class Tracker { @@ -190,7 +199,61 @@ public TrackerParameters getParameters() { * @param event the event to track */ public void track(Event event) { - // Emit the event - this.emitter.emit(new TrackerEvent(event, this.parameters, this.subject)); + final Class eventClass = event.getClass(); + + if (eventClass.equals(PageView.class)) { + System.out.println("got a pageView!"); + + TrackerPayload payload = (TrackerPayload) event.getPayload(); + Subject eventSubject = event.getSubject(); + final List entities = event.getContext(); + + addTrackerParameters(payload); + // Build the final context and add it to the payload + if (entities != null && entities.size() > 0) { + SelfDescribingJson envelope = getFinalContext(entities); + payload.addMap(envelope.getMap(), this.parameters.getBase64Encoded(), Parameter.CONTEXT_ENCODED, Parameter.CONTEXT); + } + + // Add subject if available + if (eventSubject != null) { + payload.addMap(new HashMap<>(eventSubject.getSubject())); + } else if (this.subject != null) { + payload.addMap(new HashMap<>(this.subject.getSubject())); + } + this.emitter.add(payload); + + } + + TrackerEvent trackerEvent = new TrackerEvent(event, this.parameters, this.subject); + // convert trackerEvent into payload (hashmap-based) + // it's a list for now because of eCommerce event nesting + + + + // Send the event to the Emitter + // change this to send the payload to Emitter instead + this.emitter.add(trackerEvent); + } + + /** + * Builds the final event context. + * + * @param entities the base event context + * @return the final event context json with many entities inside + */ + private SelfDescribingJson getFinalContext(List entities) { + List> entityMaps = new LinkedList<>(); + for (SelfDescribingJson selfDescribingJson : entities) { + entityMaps.add(selfDescribingJson.getMap()); + } + return new SelfDescribingJson(Constants.SCHEMA_CONTEXTS, entityMaps); + } + + private void addTrackerParameters(TrackerPayload payload) { + payload.add(Parameter.PLATFORM, this.parameters.getPlatform().toString()); + payload.add(Parameter.APP_ID, this.parameters.getAppId()); + payload.add(Parameter.NAMESPACE, this.parameters.getNamespace()); + payload.add(Parameter.TRACKER_VERSION, this.parameters.getTrackerVersion()); } } diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java index b1108a4d..ee185919 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java @@ -24,6 +24,7 @@ import com.snowplowanalytics.snowplow.tracker.http.OkHttpClientAdapter; import com.snowplowanalytics.snowplow.tracker.payload.TrackerEvent; +import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; import okhttp3.OkHttpClient; /** @@ -147,7 +148,10 @@ protected AbstractEmitter(final Builder builder) { * @param event an event */ @Override - public abstract void emit(TrackerEvent event); + public abstract void add(TrackerEvent event); + + @Override + public abstract void add(TrackerPayload payload); /** * Customize the emitter buffer size to any valid integer greater than zero. diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java index 377c1e8d..6cf5c614 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java @@ -105,7 +105,7 @@ protected BatchEmitter(final Builder builder) { * @param event an event */ @Override - public void emit(final TrackerEvent event) { + public void add(final TrackerEvent event) { boolean result = eventStore.add(event); if (!result) { @@ -113,6 +113,12 @@ public void emit(final TrackerEvent event) { } } + @Override + public void add(TrackerPayload payload) { + System.out.println("payload received"); + } + + /* * Forces all the events currently in the buffer to be sent */ diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/Emitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/Emitter.java index c51a456d..95ea7515 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/Emitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/Emitter.java @@ -15,6 +15,7 @@ import java.util.List; import com.snowplowanalytics.snowplow.tracker.payload.TrackerEvent; +import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; /** * Emitter interface. @@ -27,7 +28,9 @@ public interface Emitter { * * @param event an event to be emitted */ - void emit(TrackerEvent event); + void add(TrackerEvent event); + + void add(TrackerPayload payload); /** * Customize the emitter buffer size to any valid integer diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java index af7b50fc..3132ba79 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java @@ -58,10 +58,15 @@ protected SimpleEmitter(final Builder builder) { * @param event an event */ @Override - public void emit(final TrackerEvent event) { + public void add(final TrackerEvent event) { execute(getGetRequestRunnable(event)); } + @Override + public void add(TrackerPayload payload) { + // nothing happens + } + /** * Sends buffered events, but SimpleEmitter does not buffer events * So has no effect diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/payload/TrackerEvent.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/payload/TrackerEvent.java index ced65783..7f38204b 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/payload/TrackerEvent.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/payload/TrackerEvent.java @@ -68,10 +68,7 @@ public List getTrackerPayloads() { // Need to set the Base64 rule for Unstructured events final Unstructured unstructured = (Unstructured) event; unstructured.setBase64Encode(this.parameters.getBase64Encoded()); - TrackerPayload payload = unstructured.getPayload(); - addTrackerParameters(payload); - addContextsAndSubject(contexts, subject, payload); - payloads.add(payload); + completePayloadCreation(payloads, unstructured.getPayload(), subject, contexts); } else if (eventClass.equals(Timing.class) || eventClass.equals(ScreenView.class)) { // These are wrapper classes for Unstructured events; need to create @@ -86,39 +83,33 @@ public List getTrackerPayloads() { .build(); unstructured.setBase64Encode(this.parameters.getBase64Encoded()); - TrackerPayload payload = unstructured.getPayload(); - addTrackerParameters(payload); - addContextsAndSubject(contexts, subject, payload); - payloads.add(payload); + completePayloadCreation(payloads, unstructured.getPayload(), subject, contexts); } else if (eventClass.equals(EcommerceTransaction.class)) { final EcommerceTransaction ecommerceTransaction = (EcommerceTransaction) event; - TrackerPayload payload = ecommerceTransaction.getPayload(); - addTrackerParameters(payload); - addContextsAndSubject(contexts, subject, payload); - payloads.add(payload); + completePayloadCreation(payloads, ecommerceTransaction.getPayload(), subject, contexts); // Track each item individually for (final EcommerceTransactionItem item : ecommerceTransaction.getItems()) { item.setDeviceCreatedTimestamp(ecommerceTransaction.getDeviceCreatedTimestamp()); - TrackerPayload itemPayload = item.getPayload(); - addTrackerParameters(itemPayload); - addContextsAndSubject(item.getContext(), item.getSubject(), itemPayload); - payloads.add(itemPayload); - } + completePayloadCreation(payloads, item.getPayload(), item.getSubject(), item.getContext()); + } } else { // For all other events, simply get the payload TrackerPayload payload = (TrackerPayload) event.getPayload(); - addTrackerParameters(payload); - addContextsAndSubject(contexts, subject, payload); - payloads.add(payload); + completePayloadCreation(payloads, payload, subject, contexts); } - return payloads; } + private void completePayloadCreation(List payloads, TrackerPayload payload, Subject subject, List contexts) { + addTrackerParameters(payload); + addContextsAndSubject(contexts, subject, payload); + payloads.add(payload); + } + /** * Adds the context and subject to the event payload * diff --git a/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java b/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java index 1743c9e3..9ab77f4a 100644 --- a/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java +++ b/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java @@ -36,10 +36,15 @@ public static class MockEmitter implements Emitter { public ArrayList eventList = new ArrayList<>(); @Override - public void emit(TrackerEvent event) { + public void add(TrackerEvent event) { eventList.add(event); } + @Override + public void add(TrackerPayload payload) { + System.out.println("MockEmitter got a payload"); + } + @Override public void setBufferSize(int bufferSize) {} @@ -500,7 +505,7 @@ public void testTrackTimingWithSubject() { @Test public void testGetTrackerVersion() { Tracker tracker = new Tracker.TrackerBuilder(mockEmitter, "namespace", "an-app-id").build(); - assertEquals("java-0.10.1", tracker.getTrackerVersion()); + assertEquals("java-0.11.0", tracker.getTrackerVersion()); } @Test diff --git a/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitterTest.java b/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitterTest.java index a3c63ce8..d0c99948 100644 --- a/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitterTest.java +++ b/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitterTest.java @@ -82,7 +82,7 @@ public void setUp() { public void addToBuffer_withLess10Payloads_shouldNotEmptyBuffer() throws InterruptedException { List events = createEvents(2); for (TrackerEvent event : events) { - emitter.emit(event); + emitter.add(event); } Thread.sleep(500); @@ -96,7 +96,7 @@ public void addToBuffer_withLess10Payloads_shouldNotEmptyBuffer() throws Interru public void addToBuffer_withMore10Payloads_shouldEmptyBuffer() throws InterruptedException { List events = createEvents(10); for (TrackerEvent event : events) { - emitter.emit(event); + emitter.add(event); } Thread.sleep(500); @@ -113,7 +113,7 @@ public void addToBuffer_withMore10Payloads_shouldEmptyBuffer() throws Interrupte public void flushBuffer_shouldEmptyBuffer() throws InterruptedException { List events = createEvents(2); for (TrackerEvent event : events) { - emitter.emit(event); + emitter.add(event); } emitter.flushBuffer(); @@ -140,7 +140,7 @@ public void setAndGetBufferSizeWorksAsExpected() throws InterruptedException { List events = createEvents(2); for (TrackerEvent event : events) { - emitter.emit(event); + emitter.add(event); } Thread.sleep(500); @@ -153,7 +153,7 @@ public void setAndGetBufferSizeWorksAsExpected() throws InterruptedException { public void getFinalPost_shouldAddSTMParameter() throws InterruptedException { List events = createEvents(10); for (TrackerEvent event : events) { - emitter.emit(event); + emitter.add(event); } Thread.sleep(500); @@ -203,7 +203,7 @@ public void threadsHaveExpectedNames() { public void close_sendsEventsAndStopsThreads() throws InterruptedException { List events = createEvents(2); for (TrackerEvent event : events) { - emitter.emit(event); + emitter.add(event); } emitter.close(); @@ -216,7 +216,7 @@ public void close_sendsEventsAndStopsThreads() throws InterruptedException { // these events can be added to storage but should not be sent List moreEvents = createEvents(20); for (TrackerEvent event : moreEvents) { - emitter.emit(event); + emitter.add(event); } Assert.assertEquals(20, emitter.getBuffer().size()); } From 9fec27c6bbe1b21e67584202590ed1ac7ee9c4f0 Mon Sep 17 00:00:00 2001 From: Miranda Wilson Date: Fri, 14 Jan 2022 17:07:05 +0000 Subject: [PATCH 2/7] Copy payload creation methods into Tracker --- .../snowplow/tracker/Tracker.java | 109 ++++++++++++------ .../tracker/payload/TrackerEvent.java | 2 + .../snowplow/tracker/TrackerTest.java | 1 + 3 files changed, 78 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java index 264aff36..8dce0b64 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java @@ -23,10 +23,7 @@ import com.snowplowanalytics.snowplow.tracker.payload.TrackerParameters; import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.*; public class Tracker { @@ -199,43 +196,83 @@ public TrackerParameters getParameters() { * @param event the event to track */ public void track(Event event) { - final Class eventClass = event.getClass(); - - if (eventClass.equals(PageView.class)) { - System.out.println("got a pageView!"); - - TrackerPayload payload = (TrackerPayload) event.getPayload(); - Subject eventSubject = event.getSubject(); - final List entities = event.getContext(); + // a list because Ecommerce events become multiple payloads + List processedEvents = eventTypeSpecificPreProcessing(event); + for (Event processedEvent : processedEvents) { + TrackerPayload payload = (TrackerPayload) processedEvent.getPayload(); addTrackerParameters(payload); - // Build the final context and add it to the payload - if (entities != null && entities.size() > 0) { - SelfDescribingJson envelope = getFinalContext(entities); - payload.addMap(envelope.getMap(), this.parameters.getBase64Encoded(), Parameter.CONTEXT_ENCODED, Parameter.CONTEXT); - } - - // Add subject if available - if (eventSubject != null) { - payload.addMap(new HashMap<>(eventSubject.getSubject())); - } else if (this.subject != null) { - payload.addMap(new HashMap<>(this.subject.getSubject())); - } + addContext(processedEvent, payload); + addSubject(processedEvent, payload); this.emitter.add(payload); - } TrackerEvent trackerEvent = new TrackerEvent(event, this.parameters, this.subject); - // convert trackerEvent into payload (hashmap-based) - // it's a list for now because of eCommerce event nesting - - // Send the event to the Emitter // change this to send the payload to Emitter instead this.emitter.add(trackerEvent); } + private List eventTypeSpecificPreProcessing(Event event) { + // a list because Ecommerce events become multiple payloads + List eventList = new ArrayList<>(); + + final Class eventClass = event.getClass(); + if (eventClass.equals(Unstructured.class)) { + // Need to set the Base64 rule for Unstructured events + final Unstructured unstructured = (Unstructured) event; + unstructured.setBase64Encode(this.parameters.getBase64Encoded()); + eventList.add(unstructured); + + } else if (eventClass.equals(EcommerceTransaction.class)) { + + final EcommerceTransaction ecommerceTransaction = (EcommerceTransaction) event; + eventList.add(ecommerceTransaction); + + // Track each item individually + for (final EcommerceTransactionItem item : ecommerceTransaction.getItems()) { + + item.setDeviceCreatedTimestamp(ecommerceTransaction.getDeviceCreatedTimestamp()); + eventList.add(item); + } + } else if (eventClass.equals(Timing.class) || eventClass.equals(ScreenView.class)) { + // These are wrapper classes for Unstructured events; need to create + // Unstructured events from them and resend. + final Unstructured unstructured = Unstructured.builder() + .eventData((SelfDescribingJson) event.getPayload()) + .customContext(event.getContext()) + .deviceCreatedTimestamp(event.getDeviceCreatedTimestamp()) + .trueTimestamp(event.getTrueTimestamp()) + .eventId(event.getEventId()) + .subject(event.getSubject()) + .build(); + + unstructured.setBase64Encode(this.parameters.getBase64Encoded()); + eventList.add(unstructured); + } else { + eventList.add(event); + } + return eventList; + } + + private void addTrackerParameters(TrackerPayload payload) { + payload.add(Parameter.PLATFORM, this.parameters.getPlatform().toString()); + payload.add(Parameter.APP_ID, this.parameters.getAppId()); + payload.add(Parameter.NAMESPACE, this.parameters.getNamespace()); + payload.add(Parameter.TRACKER_VERSION, this.parameters.getTrackerVersion()); + } + + private void addContext(Event event, TrackerPayload payload) { + List entities = event.getContext(); + + // Build the final context and add it to the payload + if (entities != null && entities.size() > 0) { + SelfDescribingJson envelope = getFinalContext(entities); + payload.addMap(envelope.getMap(), this.parameters.getBase64Encoded(), Parameter.CONTEXT_ENCODED, Parameter.CONTEXT); + } + } + /** * Builds the final event context. * @@ -250,10 +287,14 @@ private SelfDescribingJson getFinalContext(List entities) { return new SelfDescribingJson(Constants.SCHEMA_CONTEXTS, entityMaps); } - private void addTrackerParameters(TrackerPayload payload) { - payload.add(Parameter.PLATFORM, this.parameters.getPlatform().toString()); - payload.add(Parameter.APP_ID, this.parameters.getAppId()); - payload.add(Parameter.NAMESPACE, this.parameters.getNamespace()); - payload.add(Parameter.TRACKER_VERSION, this.parameters.getTrackerVersion()); + private void addSubject(Event event, TrackerPayload payload) { + Subject eventSubject = event.getSubject(); + + // Add subject if available + if (eventSubject != null) { + payload.addMap(new HashMap<>(eventSubject.getSubject())); + } else if (this.subject != null) { + payload.addMap(new HashMap<>(this.subject.getSubject())); + } } } diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/payload/TrackerEvent.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/payload/TrackerEvent.java index 7f38204b..8b0216be 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/payload/TrackerEvent.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/payload/TrackerEvent.java @@ -69,6 +69,7 @@ public List getTrackerPayloads() { final Unstructured unstructured = (Unstructured) event; unstructured.setBase64Encode(this.parameters.getBase64Encoded()); completePayloadCreation(payloads, unstructured.getPayload(), subject, contexts); + } else if (eventClass.equals(Timing.class) || eventClass.equals(ScreenView.class)) { // These are wrapper classes for Unstructured events; need to create @@ -84,6 +85,7 @@ public List getTrackerPayloads() { unstructured.setBase64Encode(this.parameters.getBase64Encoded()); completePayloadCreation(payloads, unstructured.getPayload(), subject, contexts); + } else if (eventClass.equals(EcommerceTransaction.class)) { final EcommerceTransaction ecommerceTransaction = (EcommerceTransaction) event; diff --git a/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java b/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java index 9ab77f4a..53f5ba7d 100644 --- a/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java +++ b/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java @@ -43,6 +43,7 @@ public void add(TrackerEvent event) { @Override public void add(TrackerPayload payload) { System.out.println("MockEmitter got a payload"); + System.out.println(payload.toString()); } @Override From 355fb918e597ed3e51a5cdc76f6dc486bfa64323 Mon Sep 17 00:00:00 2001 From: Miranda Wilson Date: Mon, 17 Jan 2022 10:39:24 +0000 Subject: [PATCH 3/7] Refactor Emitter to use TrackerPayload --- .../snowplow/tracker/Tracker.java | 13 ++- .../tracker/emitter/AbstractEmitter.java | 7 +- .../tracker/emitter/BatchEmitter.java | 49 ++++------ .../snowplow/tracker/emitter/Emitter.java | 6 +- .../snowplow/tracker/emitter/EventStore.java | 7 +- .../tracker/emitter/InMemoryEventStore.java | 13 +-- .../tracker/emitter/SimpleEmitter.java | 22 ++--- .../snowplow/tracker/TrackerTest.java | 36 ++++--- .../tracker/emitter/BatchEmitterTest.java | 93 +++++++++---------- .../emitter/InMemoryEventStoreTest.java | 42 +++++---- 10 files changed, 135 insertions(+), 153 deletions(-) diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java index 8dce0b64..6b299e39 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java @@ -196,7 +196,7 @@ public TrackerParameters getParameters() { * @param event the event to track */ public void track(Event event) { - // a list because Ecommerce events become multiple payloads + // a list because Ecommerce events become multiple Payloads List processedEvents = eventTypeSpecificPreProcessing(event); for (Event processedEvent : processedEvents) { TrackerPayload payload = (TrackerPayload) processedEvent.getPayload(); @@ -207,11 +207,11 @@ public void track(Event event) { this.emitter.add(payload); } - TrackerEvent trackerEvent = new TrackerEvent(event, this.parameters, this.subject); - - // Send the event to the Emitter - // change this to send the payload to Emitter instead - this.emitter.add(trackerEvent); +// TrackerEvent trackerEvent = new TrackerEvent(event, this.parameters, this.subject); +// +// // Send the event to the Emitter +// // change this to send the payload to Emitter instead +// this.emitter.add(trackerEvent); } private List eventTypeSpecificPreProcessing(Event event) { @@ -232,7 +232,6 @@ private List eventTypeSpecificPreProcessing(Event event) { // Track each item individually for (final EcommerceTransactionItem item : ecommerceTransaction.getItems()) { - item.setDeviceCreatedTimestamp(ecommerceTransaction.getDeviceCreatedTimestamp()); eventList.add(item); } diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java index ee185919..bf43102b 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java @@ -145,11 +145,8 @@ protected AbstractEmitter(final Builder builder) { /** * Adds an event to the buffer * - * @param event an event + * @param payload an event */ - @Override - public abstract void add(TrackerEvent event); - @Override public abstract void add(TrackerPayload payload); @@ -182,7 +179,7 @@ protected AbstractEmitter(final Builder builder) { * @return the buffered events */ @Override - public abstract List getBuffer(); + public abstract List getBuffer(); /** * Sends a runnable to the executor service. diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java index 6cf5c614..66d15bfb 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java @@ -102,23 +102,17 @@ protected BatchEmitter(final Builder builder) { /** * Adds a TrackerEvent to the concurrent queue buffer * - * @param event an event + * @param payload an event */ @Override - public void add(final TrackerEvent event) { - boolean result = eventStore.add(event); + public void add(final TrackerPayload payload) { + boolean result = eventStore.add(payload); if (!result) { LOGGER.error("Unable to add event to emitter, emitter buffer is full"); } } - @Override - public void add(TrackerPayload payload) { - System.out.println("payload received"); - } - - /* * Forces all the events currently in the buffer to be sent */ @@ -133,7 +127,7 @@ public void flushBuffer() { * @return the buffered events */ @Override - public List getBuffer() { + public List getBuffer() { return eventStore.getAllEvents(); } @@ -174,8 +168,8 @@ private Runnable getCheckForEventsToSendRunnable() { } private void drainEventsAndSend(int numberOfEvents) { - List events = eventStore.removeEvents(numberOfEvents); - execute(getPostRequestRunnable(events)); + List payloads = eventStore.removeEvents(numberOfEvents); + execute(getPostRequestRunnable(payloads)); } /** @@ -184,7 +178,7 @@ private void drainEventsAndSend(int numberOfEvents) { * @param buffer the event buffer to be sent * @return the new Runnable object */ - private Runnable getPostRequestRunnable(final List buffer) { + private Runnable getPostRequestRunnable(final List buffer) { return () -> { if (buffer.size() == 0) { return; @@ -204,15 +198,15 @@ private Runnable getPostRequestRunnable(final List buffer) { success += buffer.size(); } - // Send the callback if available - if (requestCallback != null) { - if (failure != 0) { - requestCallback.onFailure(success, - buffer.stream().map(TrackerEvent::getEvent).collect(Collectors.toList())); - } else { - requestCallback.onSuccess(success); - } - } +// // Send the callback if available +// if (requestCallback != null) { +// if (failure != 0) { +// requestCallback.onFailure(success, +// buffer.stream().map(TrackerEvent::getEvent).collect(Collectors.toList())); +// } else { +// requestCallback.onSuccess(success); +// } +// } }; } @@ -222,16 +216,13 @@ private Runnable getPostRequestRunnable(final List buffer) { * @param buffer the event buffer * @return the constructed POST payload */ - private SelfDescribingJson getFinalPost(final List buffer) { + private SelfDescribingJson getFinalPost(final List buffer) { final List> toSendPayloads = new ArrayList<>(); final String sentTimestamp = Long.toString(System.currentTimeMillis()); - for (TrackerEvent event : buffer) { - List payloads = event.getTrackerPayloads(); - for (TrackerPayload payload : payloads) { - payload.add(Parameter.DEVICE_SENT_TIMESTAMP, sentTimestamp); - toSendPayloads.add(payload.getMap()); - } + for (TrackerPayload payload : buffer) { + payload.add(Parameter.DEVICE_SENT_TIMESTAMP, sentTimestamp); + toSendPayloads.add(payload.getMap()); } return new SelfDescribingJson(Constants.SCHEMA_PAYLOAD_DATA, toSendPayloads); diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/Emitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/Emitter.java index 95ea7515..8e323c13 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/Emitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/Emitter.java @@ -26,10 +26,8 @@ public interface Emitter { * Adds an event to the buffer and checks whether * we have reached the buffer limit yet. * - * @param event an event to be emitted + * @param payload an event to be emitted */ - void add(TrackerEvent event); - void add(TrackerPayload payload); /** @@ -63,5 +61,5 @@ public interface Emitter { * * @return the buffer events */ - List getBuffer(); + List getBuffer(); } diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/EventStore.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/EventStore.java index 07a4d4a4..521478ae 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/EventStore.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/EventStore.java @@ -4,14 +4,15 @@ import java.util.List; import com.snowplowanalytics.snowplow.tracker.payload.TrackerEvent; +import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; public interface EventStore { - boolean add(TrackerEvent trackerEvent); + boolean add(TrackerPayload trackerPayload); - List removeEvents(int numberToRemove); + List removeEvents(int numberToRemove); int getSize(); - List getAllEvents(); + List getAllEvents(); } diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStore.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStore.java index 6fb3362e..be2f67aa 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStore.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStore.java @@ -1,6 +1,7 @@ package com.snowplowanalytics.snowplow.tracker.emitter; import com.snowplowanalytics.snowplow.tracker.payload.TrackerEvent; +import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; import java.util.ArrayList; import java.util.concurrent.BlockingQueue; @@ -8,18 +9,18 @@ import java.util.List; public class InMemoryEventStore implements EventStore { - public final BlockingQueue eventBuffer = new LinkedBlockingQueue<>(); + public final BlockingQueue eventBuffer = new LinkedBlockingQueue<>(); @Override - public boolean add(TrackerEvent trackerEvent) { - return eventBuffer.offer(trackerEvent); + public boolean add(TrackerPayload trackerPayload) { + return eventBuffer.offer(trackerPayload); } @Override - public List removeEvents(int numberToRemove) { + public List removeEvents(int numberToRemove) { // if numberToRemove is greater than the number of events present, // it will return all the events (there's no error) - List eventsList = new ArrayList<>(); + List eventsList = new ArrayList<>(); eventBuffer.drainTo(eventsList, numberToRemove); return eventsList; } @@ -30,7 +31,7 @@ public int getSize() { } @Override - public List getAllEvents() { + public List getAllEvents() { return new ArrayList<>(eventBuffer); } } diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java index 3132ba79..6b6a63e8 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java @@ -52,16 +52,16 @@ protected SimpleEmitter(final Builder builder) { super(builder); } - /** - * Adds an event to the buffer and instantly sends it - * - * @param event an event - */ - @Override - public void add(final TrackerEvent event) { - execute(getGetRequestRunnable(event)); - } - +// /** +// * Adds an event to the buffer and instantly sends it +// * +// * @param event an event +// */ +// @Override +// public void add(final TrackerEvent event) { +// execute(getGetRequestRunnable(event)); +// } +// @Override public void add(TrackerPayload payload) { // nothing happens @@ -126,7 +126,7 @@ public void run() { * @return the empty buffer */ @Override - public List getBuffer() { + public List getBuffer() { return new ArrayList<>(); } diff --git a/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java b/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java index 53f5ba7d..068d40fa 100644 --- a/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java +++ b/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java @@ -33,17 +33,13 @@ public class TrackerTest { public static final String EXPECTED_EVENT_ID = "15e9b149-6029-4f6e-8447-5b9797c9e6be"; public static class MockEmitter implements Emitter { - public ArrayList eventList = new ArrayList<>(); - - @Override - public void add(TrackerEvent event) { - eventList.add(event); - } + public ArrayList eventList = new ArrayList<>(); @Override public void add(TrackerPayload payload) { System.out.println("MockEmitter got a payload"); System.out.println(payload.toString()); + eventList.add(payload); } @Override @@ -58,7 +54,7 @@ public int getBufferSize() { } @Override - public List getBuffer() { + public List getBuffer() { return null; } } @@ -116,7 +112,7 @@ public void testEcommerceEvent() { .build()); // Then - List results = mockEmitter.eventList.get(0).getTrackerPayloads(); + List results = mockEmitter.eventList; assertEquals(2, results.size()); Map result1 = results.get(0).getMap(); @@ -179,7 +175,7 @@ public void testUnstructuredEventWithContext() { .build()); // Then - Map result = mockEmitter.eventList.get(0).getTrackerPayloads().get(0).getMap(); + Map result = mockEmitter.eventList.get(0).getMap(); assertEquals(ImmutableMap.builder() .put("p", "srv") .put("tv", Version.TRACKER) @@ -209,7 +205,7 @@ public void testUnstructuredEventWithoutContext() { .build()); // Then - Map result = mockEmitter.eventList.get(0).getTrackerPayloads().get(0).getMap(); + Map result = mockEmitter.eventList.get(0).getMap(); assertEquals(ImmutableMap.builder() .put("p", "srv") .put("tv", Version.TRACKER) @@ -237,7 +233,7 @@ public void testUnstructuredEventWithoutTrueTimestamp() { .build()); // Then - Map result = mockEmitter.eventList.get(0).getTrackerPayloads().get(0).getMap(); + Map result = mockEmitter.eventList.get(0).getMap(); assertEquals(ImmutableMap.builder() .put("p", "srv") .put("tv", Version.TRACKER) @@ -271,7 +267,7 @@ public void testTrackPageView() { .build()); // Then - Map result = mockEmitter.eventList.get(0).getTrackerPayloads().get(0).getMap(); + Map result = mockEmitter.eventList.get(0).getMap(); assertEquals(ImmutableMap.builder() .put("dtm", "123456") .put("ttm", "456789") @@ -311,10 +307,10 @@ public void testTrackTwoEvents() { .build()); // Then - List results = mockEmitter.eventList; + List results = mockEmitter.eventList; assertEquals(2, results.size()); - Map result1 = results.get(0).getTrackerPayloads().get(0).getMap(); + Map result1 = results.get(0).getMap(); assertEquals(ImmutableMap.builder() .put("dtm", "123456") .put("ttm", "456789") @@ -330,7 +326,7 @@ public void testTrackTwoEvents() { .put("url", "url") .build(), result1); - Map result2 = results.get(1).getTrackerPayloads().get(0).getMap(); + Map result2 = results.get(1).getMap(); assertEquals(ImmutableMap.builder() .put("dtm", "123456") .put("ttm", "456789") @@ -360,7 +356,7 @@ public void testTrackScreenView() { .build()); // Then - Map result = mockEmitter.eventList.get(0).getTrackerPayloads().get(0).getMap(); + Map result = mockEmitter.eventList.get(0).getMap(); assertEquals(ImmutableMap.builder() .put("dtm", "123456") .put("ttm", "456789") @@ -388,7 +384,7 @@ public void testTrackScreenViewWithTimestamp() { .build()); // Then - Map result = mockEmitter.eventList.get(0).getTrackerPayloads().get(0).getMap(); + Map result = mockEmitter.eventList.get(0).getMap(); assertEquals(ImmutableMap.builder() .put("dtm", "123456") .put("ttm", "456789") @@ -416,7 +412,7 @@ public void testTrackScreenViewWithDefaultContextAndTimestamp() { .build()); // Then - Map result = mockEmitter.eventList.get(0).getTrackerPayloads().get(0).getMap(); + Map result = mockEmitter.eventList.get(0).getMap(); assertEquals(ImmutableMap.builder() .put("p", "srv") .put("tv", Version.TRACKER) @@ -447,7 +443,7 @@ public void testTrackTiming() { .build()); // Then - Map result = mockEmitter.eventList.get(0).getTrackerPayloads().get(0).getMap(); + Map result = mockEmitter.eventList.get(0).getMap(); assertEquals(ImmutableMap.builder() .put("p", "srv") .put("tv", Version.TRACKER) @@ -484,7 +480,7 @@ public void testTrackTimingWithSubject() { .build()); // Then - Map result = mockEmitter.eventList.get(0).getTrackerPayloads().get(0).getMap(); + Map result = mockEmitter.eventList.get(0).getMap(); assertEquals(ImmutableMap.builder() .put("p", "srv") .put("ue_pr", "{\"schema\":\"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0\",\"data\":{\"schema\":\"iglu:com.snowplowanalytics.snowplow/timing/jsonschema/1-0-0\",\"data\":{\"category\":\"category\",\"label\":\"label\",\"timing\":10,\"variable\":\"variable\"}}}") diff --git a/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitterTest.java b/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitterTest.java index d0c99948..47d69843 100644 --- a/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitterTest.java +++ b/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitterTest.java @@ -19,19 +19,16 @@ import com.google.common.collect.Lists; +import com.snowplowanalytics.snowplow.tracker.constants.Parameter; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; -import com.snowplowanalytics.snowplow.tracker.DevicePlatform; import com.snowplowanalytics.snowplow.tracker.payload.SelfDescribingJson; -import com.snowplowanalytics.snowplow.tracker.payload.TrackerEvent; -import com.snowplowanalytics.snowplow.tracker.payload.TrackerParameters; import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; -import com.snowplowanalytics.snowplow.tracker.constants.Parameter; import com.snowplowanalytics.snowplow.tracker.events.PageView; import com.snowplowanalytics.snowplow.tracker.http.HttpClientAdapter; @@ -80,23 +77,23 @@ public void setUp() { @Test public void addToBuffer_withLess10Payloads_shouldNotEmptyBuffer() throws InterruptedException { - List events = createEvents(2); - for (TrackerEvent event : events) { - emitter.add(event); + List payloads = createPayloads(2); + for (TrackerPayload payload : payloads) { + emitter.add(payload); } Thread.sleep(500); Assert.assertFalse(mockHttpClientAdapter.isPostCalled); Assert.assertEquals(2, emitter.getBuffer().size()); - Assert.assertEquals(events, emitter.getBuffer()); + Assert.assertEquals(payloads, emitter.getBuffer()); } @Test public void addToBuffer_withMore10Payloads_shouldEmptyBuffer() throws InterruptedException { - List events = createEvents(10); - for (TrackerEvent event : events) { - emitter.add(event); + List payloads = createPayloads(10); + for (TrackerPayload payload : payloads) { + emitter.add(payload); } Thread.sleep(500); @@ -105,15 +102,15 @@ public void addToBuffer_withMore10Payloads_shouldEmptyBuffer() throws Interrupte @SuppressWarnings("unchecked") List> capturedPayload = (List>) mockHttpClientAdapter.capturedPayload.getMap().get("data"); - assertPayload(events, capturedPayload); + assertPayload(payloads, capturedPayload); Assert.assertEquals(0, emitter.getBuffer().size()); } @Test public void flushBuffer_shouldEmptyBuffer() throws InterruptedException { - List events = createEvents(2); - for (TrackerEvent event : events) { - emitter.add(event); + List payloads = createPayloads(2); + for (TrackerPayload payload : payloads) { + emitter.add(payload); } emitter.flushBuffer(); @@ -123,7 +120,7 @@ public void flushBuffer_shouldEmptyBuffer() throws InterruptedException { @SuppressWarnings("unchecked") List> capturedPayload = (List>) mockHttpClientAdapter.capturedPayload.getMap().get("data"); - assertPayload(events, capturedPayload); + assertPayload(payloads, capturedPayload); Assert.assertEquals(0, emitter.getBuffer().size()); } @@ -138,9 +135,9 @@ public void setAndGetBufferSizeWorksAsExpected() throws InterruptedException { emitter.setBufferSize(2); Assert.assertEquals(2, emitter.getBufferSize()); - List events = createEvents(2); - for (TrackerEvent event : events) { - emitter.add(event); + List payloads = createPayloads(2); + for (TrackerPayload payload : payloads) { + emitter.add(payload); } Thread.sleep(500); @@ -151,9 +148,9 @@ public void setAndGetBufferSizeWorksAsExpected() throws InterruptedException { @Test public void getFinalPost_shouldAddSTMParameter() throws InterruptedException { - List events = createEvents(10); - for (TrackerEvent event : events) { - emitter.add(event); + List payloads = createPayloads(10); + for (TrackerPayload payload : payloads) { + emitter.add(payload); } Thread.sleep(500); @@ -161,7 +158,7 @@ public void getFinalPost_shouldAddSTMParameter() throws InterruptedException { Assert.assertTrue(mockHttpClientAdapter.isPostCalled); @SuppressWarnings("unchecked") List> capturedPayload = (List>) mockHttpClientAdapter.capturedPayload.getMap().get("data"); - + for (Map payloadMap : capturedPayload) { Assert.assertTrue(payloadMap.containsKey(Parameter.DEVICE_SENT_TIMESTAMP)); } @@ -201,9 +198,9 @@ public void threadsHaveExpectedNames() { @Test public void close_sendsEventsAndStopsThreads() throws InterruptedException { - List events = createEvents(2); - for (TrackerEvent event : events) { - emitter.add(event); + List payloads = createPayloads(2); + for (TrackerPayload payload : payloads) { + emitter.add(payload); } emitter.close(); @@ -214,36 +211,35 @@ public void close_sendsEventsAndStopsThreads() throws InterruptedException { Assert.assertEquals(0, emitter.getBuffer().size()); // these events can be added to storage but should not be sent - List moreEvents = createEvents(20); - for (TrackerEvent event : moreEvents) { - emitter.add(event); + List morePayloads = createPayloads(20); + for (TrackerPayload payload : morePayloads) { + emitter.add(payload); } Assert.assertEquals(20, emitter.getBuffer().size()); } - private List createEvents(int numEvents) { - final List payloads = Lists.newArrayList(); - for (int i = 0; i < numEvents; i++) { - payloads.add(createEvent()); - } - return payloads; - } - - private TrackerEvent createEvent() { + private TrackerPayload createPayload() { PageView pv = PageView.builder() - .pageUrl("https://www.snowplowanalytics.com/") - .pageTitle("Snowplow") - .referrer("https://www.google.com/") - .build(); + .pageUrl("https://www.snowplowanalytics.com/") + .pageTitle("Snowplow") + .referrer("https://www.google.com/") + .build(); + + return pv.getPayload(); + } - return new TrackerEvent(pv, new TrackerParameters("appId", DevicePlatform.ServerSideApp, "namespace", "0.0.0", false), null); + private List createPayloads(int numPayloads) { + final List payloads = Lists.newArrayList(); + for (int i = 0; i < numPayloads; i++) { + payloads.add(createPayload()); + } + return payloads; } - private void assertPayload(List events, List> capturedPayload) { + private void assertPayload(List payloads, List> capturedPayload) { List> eventPayloads = new ArrayList<>(); - for (TrackerEvent event : events) { - //All PageView events so we can get(0) from payloads - eventPayloads.add(event.getTrackerPayloads().get(0).getMap()); + for (TrackerPayload payload : payloads) { + eventPayloads.add(payload.getMap()); } //Iterate through all captured payloads @@ -264,3 +260,4 @@ private void assertPayload(List events, List> } } } + diff --git a/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStoreTest.java b/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStoreTest.java index de8162b7..a85fb362 100644 --- a/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStoreTest.java +++ b/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStoreTest.java @@ -12,10 +12,12 @@ */ package com.snowplowanalytics.snowplow.tracker.emitter; +import com.google.common.collect.Lists; import com.snowplowanalytics.snowplow.tracker.DevicePlatform; import com.snowplowanalytics.snowplow.tracker.events.PageView; import com.snowplowanalytics.snowplow.tracker.payload.TrackerEvent; import com.snowplowanalytics.snowplow.tracker.payload.TrackerParameters; +import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -25,78 +27,78 @@ public class InMemoryEventStoreTest { - private TrackerEvent trackerEvent; + private TrackerPayload trackerPayload; private InMemoryEventStore eventStore; - private List singleEventList; - private List twoEventsList; + private List singleEventList; + private List twoEventsList; @Before public void setUp() { - trackerEvent = createEvent(); + trackerPayload = createPayload(); eventStore = new InMemoryEventStore(); singleEventList = new ArrayList<>(); twoEventsList = new ArrayList<>(); - singleEventList.add(trackerEvent); - twoEventsList.add(trackerEvent); - twoEventsList.add(trackerEvent); + singleEventList.add(trackerPayload); + twoEventsList.add(trackerPayload); + twoEventsList.add(trackerPayload); } @Test public void correctlyAddAnEventToStore() { - boolean result = eventStore.add(trackerEvent); + boolean result = eventStore.add(trackerPayload); Assert.assertTrue(result); } @Test public void getSize_returnsCorrectNumberOfStoredEvents() { - storeTwoEvents(); + storeTwoPayloads(); Assert.assertEquals(2, eventStore.getSize()); } @Test public void removeAddedEvent() { - storeTwoEvents(); + storeTwoPayloads(); - List removedEventList = eventStore.removeEvents(1); + List removedEventList = eventStore.removeEvents(1); Assert.assertEquals(singleEventList, removedEventList); Assert.assertEquals(1, eventStore.getSize()); } @Test public void removeAllEventsIfAskedForMoreEventsThanAreStored() { - storeTwoEvents(); + storeTwoPayloads(); - List removedEventList = eventStore.removeEvents(100); + List removedEventList = eventStore.removeEvents(100); Assert.assertEquals(twoEventsList, removedEventList); Assert.assertEquals(0, eventStore.getSize()); } @Test public void getAllEvents_doesNotRemoveEventsFromStore() { - storeTwoEvents(); + storeTwoPayloads(); - List retrievedEventsList = eventStore.getAllEvents(); + List retrievedEventsList = eventStore.getAllEvents(); Assert.assertEquals(twoEventsList, retrievedEventsList); Assert.assertEquals(2, eventStore.getSize()); } - private TrackerEvent createEvent() { + private TrackerPayload createPayload() { PageView pv = PageView.builder() .pageUrl("https://www.snowplowanalytics.com/") .pageTitle("Snowplow") .referrer("https://www.google.com/") .build(); - return new TrackerEvent(pv, new TrackerParameters("appId", DevicePlatform.ServerSideApp, "namespace", "0.0.0", false), null); + return pv.getPayload(); } - private void storeTwoEvents() { - for (TrackerEvent event : twoEventsList) { - eventStore.add(event); + private void storeTwoPayloads() { + for (TrackerPayload payload : twoEventsList) { + eventStore.add(payload); } } } From 8d332b5fb722ead67750960b99df65e7fd143bea Mon Sep 17 00:00:00 2001 From: Miranda Wilson Date: Mon, 17 Jan 2022 12:09:01 +0000 Subject: [PATCH 4/7] Remove TrackerEvent --- .../snowplow/tracker/Tracker.java | 7 - .../tracker/emitter/AbstractEmitter.java | 11 +- .../tracker/emitter/BatchEmitter.java | 21 +-- .../snowplow/tracker/emitter/Emitter.java | 9 +- .../snowplow/tracker/emitter/EventStore.java | 2 - .../tracker/emitter/InMemoryEventStore.java | 1 - .../tracker/emitter/SimpleEmitter.java | 50 ++---- .../tracker/payload/TrackerEvent.java | 157 ------------------ .../tracker/payload/TrackerParameters.java | 5 - .../snowplow/tracker/TrackerTest.java | 1 - .../emitter/InMemoryEventStoreTest.java | 4 - 11 files changed, 28 insertions(+), 240 deletions(-) delete mode 100644 src/main/java/com/snowplowanalytics/snowplow/tracker/payload/TrackerEvent.java diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java index 6b299e39..be47d431 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java @@ -19,7 +19,6 @@ import com.snowplowanalytics.snowplow.tracker.emitter.Emitter; import com.snowplowanalytics.snowplow.tracker.events.*; import com.snowplowanalytics.snowplow.tracker.payload.SelfDescribingJson; -import com.snowplowanalytics.snowplow.tracker.payload.TrackerEvent; import com.snowplowanalytics.snowplow.tracker.payload.TrackerParameters; import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; @@ -206,12 +205,6 @@ public void track(Event event) { addSubject(processedEvent, payload); this.emitter.add(payload); } - -// TrackerEvent trackerEvent = new TrackerEvent(event, this.parameters, this.subject); -// -// // Send the event to the Emitter -// // change this to send the payload to Emitter instead -// this.emitter.add(trackerEvent); } private List eventTypeSpecificPreProcessing(Event event) { diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java index bf43102b..b1923b76 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java @@ -22,7 +22,6 @@ import com.snowplowanalytics.snowplow.tracker.http.HttpClientAdapter; import com.snowplowanalytics.snowplow.tracker.http.OkHttpClientAdapter; -import com.snowplowanalytics.snowplow.tracker.payload.TrackerEvent; import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; import okhttp3.OkHttpClient; @@ -134,7 +133,7 @@ protected AbstractEmitter(final Builder builder) { } this.requestCallback = builder.requestCallback; - + if (builder.requestExecutorService != null) { this.executor = builder.requestExecutorService; } else { @@ -143,9 +142,9 @@ protected AbstractEmitter(final Builder builder) { } /** - * Adds an event to the buffer + * Adds a payload to the buffer * - * @param payload an event + * @param payload an payload */ @Override public abstract void add(TrackerPayload payload); @@ -160,7 +159,7 @@ protected AbstractEmitter(final Builder builder) { public abstract void setBufferSize(final int bufferSize); /** - * Removes all events from the buffer and sends them + * Removes all payloads from the buffer and sends them */ @Override public abstract void flushBuffer(); @@ -174,7 +173,7 @@ protected AbstractEmitter(final Builder builder) { public abstract int getBufferSize(); /** - * Returns List of Events that are in the buffer. + * Returns List of Payloads that are in the buffer. * * @return the buffered events */ diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java index 66d15bfb..3d167da4 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java @@ -24,7 +24,6 @@ import com.snowplowanalytics.snowplow.tracker.constants.Constants; import com.snowplowanalytics.snowplow.tracker.constants.Parameter; import com.snowplowanalytics.snowplow.tracker.payload.SelfDescribingJson; -import com.snowplowanalytics.snowplow.tracker.payload.TrackerEvent; import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; import org.slf4j.Logger; @@ -32,7 +31,7 @@ /** * An emitter that emit a batch of events in a single call - * It uses the post method of under-laying http adapter + * It uses the post method of underlying http adapter */ public class BatchEmitter extends AbstractEmitter implements Closeable { @@ -100,21 +99,21 @@ protected BatchEmitter(final Builder builder) { } /** - * Adds a TrackerEvent to the concurrent queue buffer + * Adds a TrackerPayload to the concurrent queue buffer * - * @param payload an event + * @param payload a payload */ @Override public void add(final TrackerPayload payload) { boolean result = eventStore.add(payload); if (!result) { - LOGGER.error("Unable to add event to emitter, emitter buffer is full"); + LOGGER.error("Unable to add payload to emitter, emitter buffer is full"); } } /* - * Forces all the events currently in the buffer to be sent + * Forces all the payloads currently in the buffer to be sent */ @Override public void flushBuffer() { @@ -122,7 +121,7 @@ public void flushBuffer() { } /** - * Returns List of Events that are in the buffer. + * Returns List of Payloads that are in the buffer. * * @return the buffered events */ @@ -200,13 +199,7 @@ private Runnable getPostRequestRunnable(final List buffer) { // // Send the callback if available // if (requestCallback != null) { -// if (failure != 0) { -// requestCallback.onFailure(success, -// buffer.stream().map(TrackerEvent::getEvent).collect(Collectors.toList())); -// } else { -// requestCallback.onSuccess(success); -// } -// } + }; } diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/Emitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/Emitter.java index 8e323c13..fddc0c56 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/Emitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/Emitter.java @@ -14,7 +14,6 @@ import java.util.List; -import com.snowplowanalytics.snowplow.tracker.payload.TrackerEvent; import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; /** @@ -23,17 +22,17 @@ public interface Emitter { /** - * Adds an event to the buffer and checks whether + * Adds a payload to the buffer and checks whether * we have reached the buffer limit yet. * - * @param payload an event to be emitted + * @param payload a payload to be emitted */ void add(TrackerPayload payload); /** * Customize the emitter buffer size to any valid integer * greater than zero. - * - Will only effect the BatchEmitter + * - Will only affect the BatchEmitter * * @param bufferSize number of events to collect before * sending @@ -57,7 +56,7 @@ public interface Emitter { int getBufferSize(); /** - * Returns the List of Events that are in the buffer. + * Returns the List of Payloads that are in the buffer. * * @return the buffer events */ diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/EventStore.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/EventStore.java index 521478ae..4f61e1d3 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/EventStore.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/EventStore.java @@ -1,9 +1,7 @@ package com.snowplowanalytics.snowplow.tracker.emitter; -import java.util.Collection; import java.util.List; -import com.snowplowanalytics.snowplow.tracker.payload.TrackerEvent; import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; public interface EventStore { diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStore.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStore.java index be2f67aa..e3ab9477 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStore.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStore.java @@ -1,6 +1,5 @@ package com.snowplowanalytics.snowplow.tracker.emitter; -import com.snowplowanalytics.snowplow.tracker.payload.TrackerEvent; import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; import java.util.ArrayList; diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java index 6b6a63e8..d165710e 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java @@ -18,10 +18,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.snowplowanalytics.snowplow.tracker.payload.TrackerEvent; import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; import com.snowplowanalytics.snowplow.tracker.constants.Parameter; -import com.snowplowanalytics.snowplow.tracker.events.Event; /** * An emitter which sends events as soon as they are received via @@ -52,16 +50,6 @@ protected SimpleEmitter(final Builder builder) { super(builder); } -// /** -// * Adds an event to the buffer and instantly sends it -// * -// * @param event an event -// */ -// @Override -// public void add(final TrackerEvent event) { -// execute(getGetRequestRunnable(event)); -// } -// @Override public void add(TrackerPayload payload) { // nothing happens @@ -79,42 +67,28 @@ public void flushBuffer() { /** * Returns a Runnable GET Request operation * - * @param event the event to be sent + * @param payload the event to be sent * @return the new Callable object */ - private Runnable getGetRequestRunnable(final TrackerEvent event) { + private Runnable getGetRequestRunnable(final TrackerPayload payload) { return new Runnable() { @Override public void run() { int success = 0; int failure = 0; - List payloads = event.getTrackerPayloads(); - - for (TrackerPayload payload : payloads) { - payload.add(Parameter.DEVICE_SENT_TIMESTAMP, Long.toString(System.currentTimeMillis())); - final int code = httpClientAdapter.get(payload); - - // Process results - if (!isSuccessfulSend(code)) { - LOGGER.error("SimpleEmitter failed to send {} events: code: {}", 1, code); - failure += 1; - } else { - LOGGER.debug("SimpleEmitter successfully sent {} events: code: {}", 1, code); - success += 1; - } + payload.add(Parameter.DEVICE_SENT_TIMESTAMP, Long.toString(System.currentTimeMillis())); + final int code = httpClientAdapter.get(payload); + + // Process results + if (!isSuccessfulSend(code)) { + LOGGER.error("SimpleEmitter failed to send {} events: code: {}", 1, code); + failure += 1; + } else { + LOGGER.debug("SimpleEmitter successfully sent {} events: code: {}", 1, code); + success += 1; } - // Send the callback if available - if (requestCallback != null) { - if (failure != 0) { - final List buffer = new ArrayList<>(); - buffer.add(event.getEvent()); - requestCallback.onFailure(success, buffer); - } else { - requestCallback.onSuccess(success); - } - } } }; } diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/payload/TrackerEvent.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/payload/TrackerEvent.java deleted file mode 100644 index 8b0216be..00000000 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/payload/TrackerEvent.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright (c) 2014-2021 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ -package com.snowplowanalytics.snowplow.tracker.payload; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import com.snowplowanalytics.snowplow.tracker.Subject; -import com.snowplowanalytics.snowplow.tracker.constants.Constants; -import com.snowplowanalytics.snowplow.tracker.constants.Parameter; -import com.snowplowanalytics.snowplow.tracker.events.*; - -/** - * A TrackerEvent which allows the TrackerPayload to be filled later. The payload will be - * filled by the Emitter in the Emitter thread, using the getTrackerPayload() method. - */ -public class TrackerEvent { - - private final Event event; - private final TrackerParameters parameters; - private final Subject subject; - - public TrackerEvent(final Event event, final TrackerParameters parameters, final Subject subject) { - this.event = event; - this.parameters = parameters; - this.subject = subject; - } - - /** - * Returns the {@link Event} - * - * @return The {@link Event} - */ - public Event getEvent() { - return this.event; - } - - /** - * Converts a {@link Event} to a list of {@link TrackerPayload} and caches the values. - * Returns a list as some Events contain nested payloads (e.g. {@link EcommerceTransaction}) - * Adds fields to the {@link TrackerPayload} based on the type of the {@link Event}. - * - * @return The populated TrackerPayloads - */ - public List getTrackerPayloads() { - final List payloads = new ArrayList<>(); - final List contexts = event.getContext(); - final Subject subject = event.getSubject(); - - // Figure out what type of event it is - final Class eventClass = event.getClass(); - - if (eventClass.equals(Unstructured.class)) { - - // Need to set the Base64 rule for Unstructured events - final Unstructured unstructured = (Unstructured) event; - unstructured.setBase64Encode(this.parameters.getBase64Encoded()); - completePayloadCreation(payloads, unstructured.getPayload(), subject, contexts); - - } else if (eventClass.equals(Timing.class) || eventClass.equals(ScreenView.class)) { - - // These are wrapper classes for Unstructured events; need to create - // Unstructured events from them and resend. - final Unstructured unstructured = Unstructured.builder() - .eventData((SelfDescribingJson) event.getPayload()) - .customContext(contexts) - .deviceCreatedTimestamp(event.getDeviceCreatedTimestamp()) - .trueTimestamp(event.getTrueTimestamp()) - .eventId(event.getEventId()) - .subject(subject) - .build(); - - unstructured.setBase64Encode(this.parameters.getBase64Encoded()); - completePayloadCreation(payloads, unstructured.getPayload(), subject, contexts); - - } else if (eventClass.equals(EcommerceTransaction.class)) { - - final EcommerceTransaction ecommerceTransaction = (EcommerceTransaction) event; - completePayloadCreation(payloads, ecommerceTransaction.getPayload(), subject, contexts); - - // Track each item individually - for (final EcommerceTransactionItem item : ecommerceTransaction.getItems()) { - - item.setDeviceCreatedTimestamp(ecommerceTransaction.getDeviceCreatedTimestamp()); - completePayloadCreation(payloads, item.getPayload(), item.getSubject(), item.getContext()); - } - } else { - - // For all other events, simply get the payload - TrackerPayload payload = (TrackerPayload) event.getPayload(); - completePayloadCreation(payloads, payload, subject, contexts); - } - return payloads; - } - - private void completePayloadCreation(List payloads, TrackerPayload payload, Subject subject, List contexts) { - addTrackerParameters(payload); - addContextsAndSubject(contexts, subject, payload); - payloads.add(payload); - } - - /** - * Adds the context and subject to the event payload - * - * @param contexts the base event context - can be null or empty - * @param subject the event subject - can be null - * @param payload the payload to add the contexts and subjects to - */ - private void addContextsAndSubject(final List contexts, final Subject subject, TrackerPayload payload) { - // Build the final context and add it to the payload - if (contexts != null && contexts.size() > 0) { - SelfDescribingJson envelope = getFinalContext(contexts); - payload.addMap(envelope.getMap(), this.parameters.getBase64Encoded(), Parameter.CONTEXT_ENCODED, Parameter.CONTEXT); - } - - // Add subject if available - if (subject != null) { - payload.addMap(new HashMap<>(subject.getSubject())); - } else if (this.subject != null) { - payload.addMap(new HashMap<>(this.subject.getSubject())); - } - } - - /** - * Builds the final event context. - * - * @param contexts the base event context - * @return the final event context json with many contexts inside - */ - private SelfDescribingJson getFinalContext(List contexts) { - List> contextMaps = new LinkedList<>(); - for (SelfDescribingJson selfDescribingJson : contexts) { - contextMaps.add(selfDescribingJson.getMap()); - } - return new SelfDescribingJson(Constants.SCHEMA_CONTEXTS, contextMaps); - } - - private void addTrackerParameters(TrackerPayload payload) { - payload.add(Parameter.PLATFORM, this.parameters.getPlatform().toString()); - payload.add(Parameter.APP_ID, this.parameters.getAppId()); - payload.add(Parameter.NAMESPACE, this.parameters.getNamespace()); - payload.add(Parameter.TRACKER_VERSION, this.parameters.getTrackerVersion()); - } -} diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/payload/TrackerParameters.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/payload/TrackerParameters.java index 90bac316..66d5be3e 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/payload/TrackerParameters.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/payload/TrackerParameters.java @@ -14,11 +14,6 @@ import com.snowplowanalytics.snowplow.tracker.DevicePlatform; -/** - * A TrackerEvent which allows the TrackerPayload to be filled later. The - * payload will be filled by the Emitter in the Emitter thread, using the - * getTrackerPayload() method. - */ public class TrackerParameters { private final String trackerVersion; diff --git a/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java b/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java index 068d40fa..dea1ad6a 100644 --- a/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java +++ b/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java @@ -25,7 +25,6 @@ import com.snowplowanalytics.snowplow.tracker.emitter.Emitter; import com.snowplowanalytics.snowplow.tracker.events.*; import com.snowplowanalytics.snowplow.tracker.payload.SelfDescribingJson; -import com.snowplowanalytics.snowplow.tracker.payload.TrackerEvent; public class TrackerTest { diff --git a/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStoreTest.java b/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStoreTest.java index a85fb362..7c341dfb 100644 --- a/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStoreTest.java +++ b/src/test/java/com/snowplowanalytics/snowplow/tracker/emitter/InMemoryEventStoreTest.java @@ -12,11 +12,7 @@ */ package com.snowplowanalytics.snowplow.tracker.emitter; -import com.google.common.collect.Lists; -import com.snowplowanalytics.snowplow.tracker.DevicePlatform; import com.snowplowanalytics.snowplow.tracker.events.PageView; -import com.snowplowanalytics.snowplow.tracker.payload.TrackerEvent; -import com.snowplowanalytics.snowplow.tracker.payload.TrackerParameters; import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; import org.junit.Assert; import org.junit.Before; From 5db44053d4c6b496553a9c82c0fdee66493edbce Mon Sep 17 00:00:00 2001 From: Miranda Wilson Date: Mon, 17 Jan 2022 17:45:01 +0000 Subject: [PATCH 5/7] Remove request callbacks --- .../main/java/com/snowplowanalytics/Main.java | 14 ------- .../snowplow/tracker/Tracker.java | 13 +++--- .../tracker/emitter/AbstractEmitter.java | 16 -------- .../tracker/emitter/BatchEmitter.java | 9 ---- .../tracker/emitter/RequestCallback.java | 41 ------------------- .../tracker/emitter/SimpleEmitter.java | 5 --- 6 files changed, 8 insertions(+), 90 deletions(-) delete mode 100644 src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/RequestCallback.java diff --git a/examples/simple-console/src/main/java/com/snowplowanalytics/Main.java b/examples/simple-console/src/main/java/com/snowplowanalytics/Main.java index 6b9f2ab3..037bb9d5 100644 --- a/examples/simple-console/src/main/java/com/snowplowanalytics/Main.java +++ b/examples/simple-console/src/main/java/com/snowplowanalytics/Main.java @@ -17,7 +17,6 @@ import com.snowplowanalytics.snowplow.tracker.Tracker; import com.snowplowanalytics.snowplow.tracker.emitter.BatchEmitter; import com.snowplowanalytics.snowplow.tracker.emitter.Emitter; -import com.snowplowanalytics.snowplow.tracker.emitter.RequestCallback; import com.snowplowanalytics.snowplow.tracker.events.*; import com.snowplowanalytics.snowplow.tracker.payload.SelfDescribingJson; import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; @@ -53,19 +52,6 @@ public static void main(String[] args) { // build an emitter, this is used by the tracker to batch and schedule transmission of events BatchEmitter emitter = BatchEmitter.builder() .url(collectorEndpoint) - .requestCallback(new RequestCallback() { - // let us know on successes (may be called multiple times) - @Override - public synchronized void onSuccess(int successCount) { - System.out.println("Successfully sent " + successCount + " events"); - } - - // let us know if something has gone wrong (may be called multiple times) - @Override - public synchronized void onFailure(int successCount, List failedEvents) { - System.err.println("Successfully sent " + successCount + " events; failed to send " + failedEvents.size() + " events"); - } - }) .bufferSize(4) // send an event every time one is given (no batching). In production this number should be higher, depending on the size/event volume .build(); diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java index be47d431..1e5c787e 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java @@ -208,10 +208,13 @@ public void track(Event event) { } private List eventTypeSpecificPreProcessing(Event event) { - // a list because Ecommerce events become multiple payloads + // Different event types must be processed in slightly different ways. + // EcommerceTransaction events are an outlier, as they are processed into + // multiple payloads (a "tr" event plus one "ti" event per item). + // Because of this, this method returns a list of Events. List eventList = new ArrayList<>(); - final Class eventClass = event.getClass(); + if (eventClass.equals(Unstructured.class)) { // Need to set the Base64 rule for Unstructured events final Unstructured unstructured = (Unstructured) event; @@ -219,7 +222,6 @@ private List eventTypeSpecificPreProcessing(Event event) { eventList.add(unstructured); } else if (eventClass.equals(EcommerceTransaction.class)) { - final EcommerceTransaction ecommerceTransaction = (EcommerceTransaction) event; eventList.add(ecommerceTransaction); @@ -229,8 +231,8 @@ private List eventTypeSpecificPreProcessing(Event event) { eventList.add(item); } } else if (eventClass.equals(Timing.class) || eventClass.equals(ScreenView.class)) { - // These are wrapper classes for Unstructured events; need to create - // Unstructured events from them and resend. + // Timing and ScreenView events are wrapper classes for Unstructured events + // Need to create Unstructured events from them to send. final Unstructured unstructured = Unstructured.builder() .eventData((SelfDescribingJson) event.getPayload()) .customContext(event.getContext()) @@ -242,6 +244,7 @@ private List eventTypeSpecificPreProcessing(Event event) { unstructured.setBase64Encode(this.parameters.getBase64Encoded()); eventList.add(unstructured); + } else { eventList.add(event); } diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java index b1923b76..6b3075a1 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java @@ -33,13 +33,11 @@ public abstract class AbstractEmitter implements Emitter { protected HttpClientAdapter httpClientAdapter; - protected RequestCallback requestCallback; protected ExecutorService executor; public static abstract class Builder> { private HttpClientAdapter httpClientAdapter; // Optional - private RequestCallback requestCallback = null; // Optional private int threadCount = 50; // Optional private ExecutorService requestExecutorService = null; // Optional private String collectorUrl = null; // Required if not specifying a httpClientAdapter @@ -68,18 +66,6 @@ public T httpClientAdapter(final HttpClientAdapter httpClientAdapter) { return self(); } - /** - * An optional Request Callback for adding the ability to handle failure cases - * for sending. - * - * @param requestCallback the emitter request callback - * @return itself - */ - public T requestCallback(final RequestCallback requestCallback) { - this.requestCallback = requestCallback; - return self(); - } - /** * Sets the Thread Count for the ExecutorService * @@ -132,8 +118,6 @@ protected AbstractEmitter(final Builder builder) { .build(); } - this.requestCallback = builder.requestCallback; - if (builder.requestExecutorService != null) { this.executor = builder.requestExecutorService; } else { diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java index 3d167da4..a4b0d11d 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java @@ -17,7 +17,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Preconditions; @@ -187,19 +186,11 @@ private Runnable getPostRequestRunnable(final List buffer) { final int code = httpClientAdapter.post(post); // Process results - int success = 0; - int failure = 0; if (!isSuccessfulSend(code)) { LOGGER.error("BatchEmitter failed to send {} events: code: {}", buffer.size(), code); - failure += buffer.size(); } else { LOGGER.debug("BatchEmitter successfully sent {} events: code: {}", buffer.size(), code); - success += buffer.size(); } - -// // Send the callback if available -// if (requestCallback != null) { - }; } diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/RequestCallback.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/RequestCallback.java deleted file mode 100644 index 4df7c8bb..00000000 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/RequestCallback.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2014-2021 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ -package com.snowplowanalytics.snowplow.tracker.emitter; - -import java.util.List; - -import com.snowplowanalytics.snowplow.tracker.events.Event; - -/** - * Provides a callback interface for reporting counts of successfully sent - * events and returning any failed events to be handled by the developer. - */ -public interface RequestCallback { - - /** - * If all events are sent successfully then the count - * of sent events are returned. - * - * @param successCount the successful count - */ - void onSuccess(int successCount); - - /** - * If all/some events failed then the count of successful - * events is returned along with all the failed Events. - * - * @param successCount the successful count - * @param failedEvents the list of failed events - */ - void onFailure(int successCount, List failedEvents); -} diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java index d165710e..e3849efb 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/SimpleEmitter.java @@ -74,19 +74,14 @@ private Runnable getGetRequestRunnable(final TrackerPayload payload) { return new Runnable() { @Override public void run() { - int success = 0; - int failure = 0; - payload.add(Parameter.DEVICE_SENT_TIMESTAMP, Long.toString(System.currentTimeMillis())); final int code = httpClientAdapter.get(payload); // Process results if (!isSuccessfulSend(code)) { LOGGER.error("SimpleEmitter failed to send {} events: code: {}", 1, code); - failure += 1; } else { LOGGER.debug("SimpleEmitter successfully sent {} events: code: {}", 1, code); - success += 1; } } From 60ae054ff3ef9909723be37cc953da97fb72155d Mon Sep 17 00:00:00 2001 From: Miranda Wilson Date: Mon, 17 Jan 2022 18:37:32 +0000 Subject: [PATCH 6/7] Use threadpool inside Tracker --- .../snowplow/tracker/Tracker.java | 102 ++++++++++++++++-- .../snowplow/tracker/TrackerTest.java | 76 ++++++++++--- 2 files changed, 155 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java index 1e5c787e..f52f38cc 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java @@ -23,12 +23,17 @@ import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; public class Tracker { private Emitter emitter; private Subject subject; private final TrackerParameters parameters; + protected ExecutorService executor; /** * Creates a new Snowplow Tracker. @@ -47,6 +52,12 @@ private Tracker(TrackerBuilder builder) { this.parameters = new TrackerParameters(builder.appId, builder.platform, builder.namespace, Version.TRACKER, builder.base64Encoded); this.emitter = builder.emitter; this.subject = builder.subject; + + if (builder.requestExecutorService != null) { + this.executor = builder.requestExecutorService; + } else { + this.executor = Executors.newScheduledThreadPool(builder.threadCount, new TrackerThreadFactory()); + } } /** @@ -60,6 +71,8 @@ public static class TrackerBuilder { private Subject subject = null; // Optional private DevicePlatform platform = DevicePlatform.ServerSideApp; // Optional private boolean base64Encoded = true; // Optional + private int threadCount = 50; // Optional + private ExecutorService requestExecutorService = null; // Optional /** * @param emitter Emitter to which events will be sent @@ -99,6 +112,30 @@ public TrackerBuilder base64(Boolean base64) { return this; } + /** + * Sets the Thread Count for the ExecutorService + * + * @param threadCount the size of the thread pool + * @return itself + */ + public TrackerBuilder threadCount(final int threadCount) { + this.threadCount = threadCount; + return this; + } + + /** + * Set a custom ExecutorService to send http request. + * + * @param executorService the ExecutorService to use + * @return itself + */ + public TrackerBuilder requestExecutorService(final ExecutorService executorService) { + this.requestExecutorService = executorService; + return this; + } + + + /** * Creates a new Tracker * @@ -188,6 +225,45 @@ public TrackerParameters getParameters() { // --- Event Tracking Functions + /** + * Sends a runnable to the executor service. + * + * @param runnable the runnable to be queued + */ + protected void execute(final Runnable runnable) { + this.executor.execute(runnable); + } + + /** + * Copied from `Executors.defaultThreadFactory()`. + * The only change is the generated name prefix. + */ + static class TrackerThreadFactory implements ThreadFactory { + private static final AtomicInteger poolNumber = new AtomicInteger(1); + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + TrackerThreadFactory() { + SecurityManager securityManager = System.getSecurityManager(); + this.group = securityManager != null ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup(); + this.namePrefix = "snowplow-tracker-pool-" + poolNumber.getAndIncrement() + "-event-thread-"; + } + + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(this.group, runnable, this.namePrefix + this.threadNumber.getAndIncrement(), 0L); + if (thread.isDaemon()) { + thread.setDaemon(false); + } + + if (thread.getPriority() != 5) { + thread.setPriority(5); + } + + return thread; + } + } + /** * Handles tracking the different types of events that * the Tracker can encounter. @@ -195,16 +271,22 @@ public TrackerParameters getParameters() { * @param event the event to track */ public void track(Event event) { - // a list because Ecommerce events become multiple Payloads - List processedEvents = eventTypeSpecificPreProcessing(event); - for (Event processedEvent : processedEvents) { - TrackerPayload payload = (TrackerPayload) processedEvent.getPayload(); - - addTrackerParameters(payload); - addContext(processedEvent, payload); - addSubject(processedEvent, payload); - this.emitter.add(payload); - } + execute(getProcessEventRunnable(event)); + } + + private Runnable getProcessEventRunnable(Event event) { + return () -> { + // a list because Ecommerce events become multiple Payloads + List processedEvents = eventTypeSpecificPreProcessing(event); + for (Event processedEvent : processedEvents) { + TrackerPayload payload = (TrackerPayload) processedEvent.getPayload(); + + addTrackerParameters(payload); + addContext(processedEvent, payload); + addSubject(processedEvent, payload); + this.emitter.add(payload); + } + }; } private List eventTypeSpecificPreProcessing(Event event) { diff --git a/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java b/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java index dea1ad6a..7e1ce57b 100644 --- a/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java +++ b/src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java @@ -15,6 +15,7 @@ import java.util.*; import static java.util.Collections.singletonList; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import static org.junit.Assert.*; @@ -36,8 +37,6 @@ public static class MockEmitter implements Emitter { @Override public void add(TrackerPayload payload) { - System.out.println("MockEmitter got a payload"); - System.out.println(payload.toString()); eventList.add(payload); } @@ -76,7 +75,7 @@ public void setUp() { // --- Event Tests @Test - public void testEcommerceEvent() { + public void testEcommerceEvent() throws InterruptedException { // Given EcommerceTransactionItem item = EcommerceTransactionItem.builder() .itemId("order_id") @@ -111,6 +110,8 @@ public void testEcommerceEvent() { .build()); // Then + Thread.sleep(500); + List results = mockEmitter.eventList; assertEquals(2, results.size()); @@ -160,7 +161,7 @@ public void testEcommerceEvent() { } @Test - public void testUnstructuredEventWithContext() { + public void testUnstructuredEventWithContext() throws InterruptedException { // When tracker.track(Unstructured.builder() .eventData(new SelfDescribingJson( @@ -174,6 +175,8 @@ public void testUnstructuredEventWithContext() { .build()); // Then + Thread.sleep(500); + Map result = mockEmitter.eventList.get(0).getMap(); assertEquals(ImmutableMap.builder() .put("p", "srv") @@ -191,7 +194,7 @@ public void testUnstructuredEventWithContext() { } @Test - public void testUnstructuredEventWithoutContext() { + public void testUnstructuredEventWithoutContext() throws InterruptedException { // When tracker.track(Unstructured.builder() .eventData(new SelfDescribingJson( @@ -204,6 +207,8 @@ public void testUnstructuredEventWithoutContext() { .build()); // Then + Thread.sleep(500); + Map result = mockEmitter.eventList.get(0).getMap(); assertEquals(ImmutableMap.builder() .put("p", "srv") @@ -220,7 +225,7 @@ public void testUnstructuredEventWithoutContext() { } @Test - public void testUnstructuredEventWithoutTrueTimestamp() { + public void testUnstructuredEventWithoutTrueTimestamp() throws InterruptedException { // When tracker.track(Unstructured.builder() .eventData(new SelfDescribingJson( @@ -232,6 +237,8 @@ public void testUnstructuredEventWithoutTrueTimestamp() { .build()); // Then + Thread.sleep(500); + Map result = mockEmitter.eventList.get(0).getMap(); assertEquals(ImmutableMap.builder() .put("p", "srv") @@ -247,7 +254,7 @@ public void testUnstructuredEventWithoutTrueTimestamp() { } @Test - public void testTrackPageView() { + public void testTrackPageView() throws InterruptedException { tracker = new Tracker.TrackerBuilder(this.mockEmitter, "AF003", "cloudfront") .subject(new Subject.SubjectBuilder().build()) .base64(false) @@ -266,6 +273,8 @@ public void testTrackPageView() { .build()); // Then + Thread.sleep(500); + Map result = mockEmitter.eventList.get(0).getMap(); assertEquals(ImmutableMap.builder() .put("dtm", "123456") @@ -285,7 +294,7 @@ public void testTrackPageView() { } @Test - public void testTrackTwoEvents() { + public void testTrackTwoEvents() throws InterruptedException { // When tracker.track(PageView.builder() .pageUrl("url") @@ -296,6 +305,8 @@ public void testTrackTwoEvents() { .eventId("9783090a-dace-4c85-a75c-933b4596a6c5") .build()); + Thread.sleep(500); + tracker.track(PageView.builder() .pageUrl("url") .pageTitle("title") @@ -306,6 +317,8 @@ public void testTrackTwoEvents() { .build()); // Then + Thread.sleep(500); + List results = mockEmitter.eventList; assertEquals(2, results.size()); @@ -343,7 +356,7 @@ public void testTrackTwoEvents() { } @Test - public void testTrackScreenView() { + public void testTrackScreenView() throws InterruptedException { // When tracker.track(ScreenView.builder() .name("name") @@ -355,6 +368,8 @@ public void testTrackScreenView() { .build()); // Then + Thread.sleep(500); + Map result = mockEmitter.eventList.get(0).getMap(); assertEquals(ImmutableMap.builder() .put("dtm", "123456") @@ -372,7 +387,7 @@ public void testTrackScreenView() { } @Test - public void testTrackScreenViewWithTimestamp() { + public void testTrackScreenViewWithTimestamp() throws InterruptedException { // When tracker.track(ScreenView.builder() .name("name") @@ -383,6 +398,8 @@ public void testTrackScreenViewWithTimestamp() { .build()); // Then + Thread.sleep(500); + Map result = mockEmitter.eventList.get(0).getMap(); assertEquals(ImmutableMap.builder() .put("dtm", "123456") @@ -399,7 +416,7 @@ public void testTrackScreenViewWithTimestamp() { } @Test - public void testTrackScreenViewWithDefaultContextAndTimestamp() { + public void testTrackScreenViewWithDefaultContextAndTimestamp() throws InterruptedException { // When tracker.track(ScreenView.builder() .name("name") @@ -411,6 +428,8 @@ public void testTrackScreenViewWithDefaultContextAndTimestamp() { .build()); // Then + Thread.sleep(500); + Map result = mockEmitter.eventList.get(0).getMap(); assertEquals(ImmutableMap.builder() .put("p", "srv") @@ -428,7 +447,7 @@ public void testTrackScreenViewWithDefaultContextAndTimestamp() { } @Test - public void testTrackTiming() { + public void testTrackTiming() throws InterruptedException { // When tracker.track(Timing.builder() .category("category") @@ -442,6 +461,8 @@ public void testTrackTiming() { .build()); // Then + Thread.sleep(500); + Map result = mockEmitter.eventList.get(0).getMap(); assertEquals(ImmutableMap.builder() .put("p", "srv") @@ -459,7 +480,7 @@ public void testTrackTiming() { } @Test - public void testTrackTimingWithSubject() { + public void testTrackTimingWithSubject() throws InterruptedException { // Make Subject Subject s1 = new Subject.SubjectBuilder().build(); s1.setIpAddress("127.0.0.1"); @@ -479,6 +500,8 @@ public void testTrackTimingWithSubject() { .build()); // Then + Thread.sleep(500); + Map result = mockEmitter.eventList.get(0).getMap(); assertEquals(ImmutableMap.builder() .put("p", "srv") @@ -547,4 +570,31 @@ public void testSetNamespace() { Tracker tracker = new Tracker.TrackerBuilder(mockEmitter, "namespace", "an-app-id").build(); assertEquals("namespace", tracker.getNamespace()); } + + @Test + public void threadsHaveExpectedNames() { + // A new thread should be created for each event tracked, + // up to the configurable pool size limit + tracker.track(PageView.builder() + .pageUrl("url") + .pageTitle("title") + .referrer("referer") + .build()); + + tracker.track(PageView.builder() + .pageUrl("url") + .pageTitle("title") + .referrer("referer") + .build()); + + // Create a list of all live thread names + List threadList = new ArrayList<>(Thread.getAllStackTraces().keySet()); + List threadNames = new ArrayList<>(); + for (Thread thread : threadList) { + threadNames.add(thread.getName()); + } + + Assert.assertTrue(threadNames.contains("snowplow-tracker-pool-1-event-thread-1")); + Assert.assertTrue(threadNames.contains("snowplow-tracker-pool-1-event-thread-2")); + } } From 9fc40f3170d7f8030a6c90d8a4074eea1021828b Mon Sep 17 00:00:00 2001 From: Miranda Wilson Date: Tue, 18 Jan 2022 10:18:24 +0000 Subject: [PATCH 7/7] Add comment about event ID and dtm timestamp --- .../java/com/snowplowanalytics/snowplow/tracker/Tracker.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java index f52f38cc..67e91d28 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java @@ -279,6 +279,7 @@ private Runnable getProcessEventRunnable(Event event) { // a list because Ecommerce events become multiple Payloads List processedEvents = eventTypeSpecificPreProcessing(event); for (Event processedEvent : processedEvents) { + // Event ID (eid) and device_created_timestamp (dtm) are added during getPayload() TrackerPayload payload = (TrackerPayload) processedEvent.getPayload(); addTrackerParameters(payload);