Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.snowplowanalytics.snowplow.tracker.Tracker;
import com.snowplowanalytics.snowplow.tracker.emitter.BatchEmitter;
import com.snowplowanalytics.snowplow.tracker.emitter.Emitter;
import com.snowplowanalytics.snowplow.tracker.emitter.RequestCallback;
import com.snowplowanalytics.snowplow.tracker.events.*;
import com.snowplowanalytics.snowplow.tracker.payload.SelfDescribingJson;
import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload;
Expand Down Expand Up @@ -53,19 +52,6 @@ public static void main(String[] args) {
// build an emitter, this is used by the tracker to batch and schedule transmission of events
BatchEmitter emitter = BatchEmitter.builder()
.url(collectorEndpoint)
.requestCallback(new RequestCallback() {
// let us know on successes (may be called multiple times)
@Override
public synchronized void onSuccess(int successCount) {
System.out.println("Successfully sent " + successCount + " events");
}

// let us know if something has gone wrong (may be called multiple times)
@Override
public synchronized void onFailure(int successCount, List<Event> failedEvents) {
System.err.println("Successfully sent " + successCount + " events; failed to send " + failedEvents.size() + " events");
}
})
.bufferSize(4) // send an event every time one is given (no batching). In production this number should be higher, depending on the size/event volume
.build();

Expand Down
188 changes: 185 additions & 3 deletions src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,26 @@

import com.google.common.base.Preconditions;

import com.snowplowanalytics.snowplow.tracker.constants.Constants;
import com.snowplowanalytics.snowplow.tracker.constants.Parameter;
import com.snowplowanalytics.snowplow.tracker.emitter.Emitter;
import com.snowplowanalytics.snowplow.tracker.events.*;
import com.snowplowanalytics.snowplow.tracker.payload.TrackerEvent;
import com.snowplowanalytics.snowplow.tracker.payload.SelfDescribingJson;
import com.snowplowanalytics.snowplow.tracker.payload.TrackerParameters;
import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class Tracker {

private Emitter emitter;
private Subject subject;
private final TrackerParameters parameters;
protected ExecutorService executor;

/**
* Creates a new Snowplow Tracker.
Expand All @@ -42,6 +52,12 @@ private Tracker(TrackerBuilder builder) {
this.parameters = new TrackerParameters(builder.appId, builder.platform, builder.namespace, Version.TRACKER, builder.base64Encoded);
this.emitter = builder.emitter;
this.subject = builder.subject;

if (builder.requestExecutorService != null) {
this.executor = builder.requestExecutorService;
} else {
this.executor = Executors.newScheduledThreadPool(builder.threadCount, new TrackerThreadFactory());
}
}

/**
Expand All @@ -55,6 +71,8 @@ public static class TrackerBuilder {
private Subject subject = null; // Optional
private DevicePlatform platform = DevicePlatform.ServerSideApp; // Optional
private boolean base64Encoded = true; // Optional
private int threadCount = 50; // Optional
private ExecutorService requestExecutorService = null; // Optional

/**
* @param emitter Emitter to which events will be sent
Expand Down Expand Up @@ -94,6 +112,30 @@ public TrackerBuilder base64(Boolean base64) {
return this;
}

/**
* Sets the Thread Count for the ExecutorService
*
* @param threadCount the size of the thread pool
* @return itself
*/
public TrackerBuilder threadCount(final int threadCount) {
this.threadCount = threadCount;
return this;
}

/**
* Set a custom ExecutorService to send http request.
*
* @param executorService the ExecutorService to use
* @return itself
*/
public TrackerBuilder requestExecutorService(final ExecutorService executorService) {
this.requestExecutorService = executorService;
return this;
}



/**
* Creates a new Tracker
*
Expand Down Expand Up @@ -183,14 +225,154 @@ public TrackerParameters getParameters() {

// --- Event Tracking Functions

/**
* Sends a runnable to the executor service.
*
* @param runnable the runnable to be queued
*/
protected void execute(final Runnable runnable) {
this.executor.execute(runnable);
}

/**
* Copied from `Executors.defaultThreadFactory()`.
* The only change is the generated name prefix.
*/
static class TrackerThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

TrackerThreadFactory() {
SecurityManager securityManager = System.getSecurityManager();
this.group = securityManager != null ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.namePrefix = "snowplow-tracker-pool-" + poolNumber.getAndIncrement() + "-event-thread-";
}

public Thread newThread(Runnable runnable) {
Thread thread = new Thread(this.group, runnable, this.namePrefix + this.threadNumber.getAndIncrement(), 0L);
if (thread.isDaemon()) {
thread.setDaemon(false);
}

if (thread.getPriority() != 5) {
thread.setPriority(5);
}

return thread;
}
}

/**
* Handles tracking the different types of events that
* the Tracker can encounter.
*
* @param event the event to track
*/
public void track(Event event) {
// Emit the event
this.emitter.emit(new TrackerEvent(event, this.parameters, this.subject));
execute(getProcessEventRunnable(event));
}

private Runnable getProcessEventRunnable(Event event) {
return () -> {
// a list because Ecommerce events become multiple Payloads
List<Event> processedEvents = eventTypeSpecificPreProcessing(event);
for (Event processedEvent : processedEvents) {
// Event ID (eid) and device_created_timestamp (dtm) are added during getPayload()
TrackerPayload payload = (TrackerPayload) processedEvent.getPayload();

addTrackerParameters(payload);
addContext(processedEvent, payload);
addSubject(processedEvent, payload);
this.emitter.add(payload);
}
};
}

private List<Event> eventTypeSpecificPreProcessing(Event event) {
// Different event types must be processed in slightly different ways.
// EcommerceTransaction events are an outlier, as they are processed into
// multiple payloads (a "tr" event plus one "ti" event per item).
// Because of this, this method returns a list of Events.
List<Event> eventList = new ArrayList<>();
final Class<?> eventClass = event.getClass();

if (eventClass.equals(Unstructured.class)) {
// Need to set the Base64 rule for Unstructured events
final Unstructured unstructured = (Unstructured) event;
unstructured.setBase64Encode(this.parameters.getBase64Encoded());
eventList.add(unstructured);

} else if (eventClass.equals(EcommerceTransaction.class)) {
final EcommerceTransaction ecommerceTransaction = (EcommerceTransaction) event;
eventList.add(ecommerceTransaction);

// Track each item individually
for (final EcommerceTransactionItem item : ecommerceTransaction.getItems()) {
item.setDeviceCreatedTimestamp(ecommerceTransaction.getDeviceCreatedTimestamp());
eventList.add(item);
}
} else if (eventClass.equals(Timing.class) || eventClass.equals(ScreenView.class)) {
// Timing and ScreenView events are wrapper classes for Unstructured events
// Need to create Unstructured events from them to send.
final Unstructured unstructured = Unstructured.builder()
.eventData((SelfDescribingJson) event.getPayload())
.customContext(event.getContext())
.deviceCreatedTimestamp(event.getDeviceCreatedTimestamp())
.trueTimestamp(event.getTrueTimestamp())
.eventId(event.getEventId())
.subject(event.getSubject())
.build();

unstructured.setBase64Encode(this.parameters.getBase64Encoded());
eventList.add(unstructured);

} else {
eventList.add(event);
}
return eventList;
}

private void addTrackerParameters(TrackerPayload payload) {
payload.add(Parameter.PLATFORM, this.parameters.getPlatform().toString());
payload.add(Parameter.APP_ID, this.parameters.getAppId());
payload.add(Parameter.NAMESPACE, this.parameters.getNamespace());
payload.add(Parameter.TRACKER_VERSION, this.parameters.getTrackerVersion());
}

private void addContext(Event event, TrackerPayload payload) {
List<SelfDescribingJson> entities = event.getContext();

// Build the final context and add it to the payload
if (entities != null && entities.size() > 0) {
SelfDescribingJson envelope = getFinalContext(entities);
payload.addMap(envelope.getMap(), this.parameters.getBase64Encoded(), Parameter.CONTEXT_ENCODED, Parameter.CONTEXT);
}
}

/**
* Builds the final event context.
*
* @param entities the base event context
* @return the final event context json with many entities inside
*/
Comment on lines +354 to +359
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opinion sharing:
I don't see much value in overzealous javadoc on private methods (I know you just copied/pasted this code).
I feel that in private methods we can remove these when the method signature is already self-explanatory, or just leaving a simple comment like "Builds the final event context." without the other rows.
Not something to change, btw.

private SelfDescribingJson getFinalContext(List<SelfDescribingJson> entities) {
List<Map<String, Object>> entityMaps = new LinkedList<>();
for (SelfDescribingJson selfDescribingJson : entities) {
entityMaps.add(selfDescribingJson.getMap());
}
return new SelfDescribingJson(Constants.SCHEMA_CONTEXTS, entityMaps);
}

private void addSubject(Event event, TrackerPayload payload) {
Subject eventSubject = event.getSubject();

// Add subject if available
if (eventSubject != null) {
payload.addMap(new HashMap<>(eventSubject.getSubject()));
} else if (this.subject != null) {
payload.addMap(new HashMap<>(this.subject.getSubject()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

import com.snowplowanalytics.snowplow.tracker.http.HttpClientAdapter;
import com.snowplowanalytics.snowplow.tracker.http.OkHttpClientAdapter;
import com.snowplowanalytics.snowplow.tracker.payload.TrackerEvent;

import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload;
import okhttp3.OkHttpClient;

/**
Expand All @@ -33,13 +33,11 @@
public abstract class AbstractEmitter implements Emitter {

protected HttpClientAdapter httpClientAdapter;
protected RequestCallback requestCallback;
protected ExecutorService executor;

public static abstract class Builder<T extends Builder<T>> {

private HttpClientAdapter httpClientAdapter; // Optional
private RequestCallback requestCallback = null; // Optional
private int threadCount = 50; // Optional
private ExecutorService requestExecutorService = null; // Optional
private String collectorUrl = null; // Required if not specifying a httpClientAdapter
Expand Down Expand Up @@ -68,18 +66,6 @@ public T httpClientAdapter(final HttpClientAdapter httpClientAdapter) {
return self();
}

/**
* An optional Request Callback for adding the ability to handle failure cases
* for sending.
*
* @param requestCallback the emitter request callback
* @return itself
*/
public T requestCallback(final RequestCallback requestCallback) {
this.requestCallback = requestCallback;
return self();
}

/**
* Sets the Thread Count for the ExecutorService
*
Expand Down Expand Up @@ -132,8 +118,6 @@ protected AbstractEmitter(final Builder<?> builder) {
.build();
}

this.requestCallback = builder.requestCallback;

if (builder.requestExecutorService != null) {
this.executor = builder.requestExecutorService;
} else {
Expand All @@ -142,12 +126,12 @@ protected AbstractEmitter(final Builder<?> builder) {
}

/**
* Adds an event to the buffer
* Adds a payload to the buffer
*
* @param event an event
* @param payload an payload
*/
@Override
public abstract void emit(TrackerEvent event);
public abstract void add(TrackerPayload payload);

/**
* Customize the emitter buffer size to any valid integer greater than zero.
Expand All @@ -159,7 +143,7 @@ protected AbstractEmitter(final Builder<?> builder) {
public abstract void setBufferSize(final int bufferSize);

/**
* Removes all events from the buffer and sends them
* Removes all payloads from the buffer and sends them
*/
@Override
public abstract void flushBuffer();
Expand All @@ -173,12 +157,12 @@ protected AbstractEmitter(final Builder<?> builder) {
public abstract int getBufferSize();

/**
* Returns List of Events that are in the buffer.
* Returns List of Payloads that are in the buffer.
*
* @return the buffered events
*/
@Override
public abstract List<TrackerEvent> getBuffer();
public abstract List<TrackerPayload> getBuffer();

/**
* Sends a runnable to the executor service.
Expand Down
Loading