Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ wrapper.gradleVersion = '6.5.0'

group = 'com.snowplowanalytics'
archivesBaseName = 'snowplow-java-tracker'
version = '0.12.0-alpha.0'
version = '0.12.0-alpha.1'
sourceCompatibility = '1.8'
targetCompatibility = '1.8'

Expand Down Expand Up @@ -80,7 +80,8 @@ dependencies {

// Testing libraries
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
testCompileOnly 'junit:junit:4.13'
testRuntimeOnly 'org.junit.vintage:junit-vintage-engine'

testImplementation 'org.hamcrest:hamcrest:2.2'
testImplementation 'com.squareup.okhttp3:mockwebserver:4.9.2'
Expand Down
1 change: 0 additions & 1 deletion examples/benchmarking/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,3 @@ repositories {
dependencies {
jmh 'com.snowplowanalytics:snowplow-java-tracker:0.10.1'
}

Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ public static Tracker getTracker(Emitter emitter) {
}

public static void closeThreads(Tracker tracker) {
// Use this line for versions 0.12.0 onwards
// tracker.close();
// Use these lines for previous versions
BatchEmitter emitter = (BatchEmitter) tracker.getEmitter();
emitter.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import com.snowplowanalytics.snowplow.tracker.Tracker;
import com.snowplowanalytics.snowplow.tracker.emitter.BatchEmitter;
import com.snowplowanalytics.snowplow.tracker.events.*;
import com.snowplowanalytics.snowplow.tracker.http.HttpClientAdapter;
import com.snowplowanalytics.snowplow.tracker.payload.SelfDescribingJson;

import java.util.List;
import static java.util.Collections.singletonList;

import com.google.common.collect.ImmutableMap;
import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload;

public class Main {

Expand All @@ -34,7 +36,7 @@ public static String getUrlFromArgs(String[] args) {
return args[0];
}

public static void main(String[] args) {
public static void main(String[] args) throws InterruptedException {
String collectorEndpoint = getUrlFromArgs(args);

// the application id to attach to events
Expand Down Expand Up @@ -155,6 +157,7 @@ public static void main(String[] args) {

// Will close all threads and force send remaining events
emitter.close();
Thread.sleep(5000);

System.out.println("Tracked 7 events");
}
Expand Down
128 changes: 11 additions & 117 deletions src/main/java/com/snowplowanalytics/snowplow/tracker/Tracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import com.snowplowanalytics.snowplow.tracker.constants.Constants;
import com.snowplowanalytics.snowplow.tracker.constants.Parameter;
import com.snowplowanalytics.snowplow.tracker.emitter.BatchEmitter;
import com.snowplowanalytics.snowplow.tracker.emitter.Emitter;
import com.snowplowanalytics.snowplow.tracker.events.*;
import com.snowplowanalytics.snowplow.tracker.payload.SelfDescribingJson;
Expand All @@ -26,18 +25,12 @@
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Tracker {

private Emitter emitter;
private Subject subject;
private final TrackerParameters parameters;
protected ExecutorService executor;
private static final Logger LOGGER = LoggerFactory.getLogger(Tracker.class);

/**
Expand All @@ -58,11 +51,6 @@ private Tracker(TrackerBuilder builder) {
this.emitter = builder.emitter;
this.subject = builder.subject;

if (builder.requestExecutorService != null) {
this.executor = builder.requestExecutorService;
} else {
this.executor = Executors.newScheduledThreadPool(builder.threadCount, new TrackerThreadFactory());
}
}

/**
Expand All @@ -76,8 +64,6 @@ public static class TrackerBuilder {
private Subject subject = null; // Optional
private DevicePlatform platform = DevicePlatform.ServerSideApp; // Optional
private boolean base64Encoded = true; // Optional
private int threadCount = 50; // Optional
private ExecutorService requestExecutorService = null; // Optional

/**
* @param emitter Emitter to which events will be sent
Expand Down Expand Up @@ -117,30 +103,6 @@ public TrackerBuilder base64(Boolean base64) {
return this;
}

/**
* Sets the Thread Count for the ExecutorService
*
* @param threadCount the size of the thread pool
* @return itself
*/
public TrackerBuilder threadCount(final int threadCount) {
this.threadCount = threadCount;
return this;
}

/**
* Set a custom ExecutorService to send http request.
*
* @param executorService the ExecutorService to use
* @return itself
*/
public TrackerBuilder requestExecutorService(final ExecutorService executorService) {
this.requestExecutorService = executorService;
return this;
}



/**
* Creates a new Tracker
*
Expand Down Expand Up @@ -230,69 +192,24 @@ public TrackerParameters getParameters() {

// --- Event Tracking Functions

/**
* Sends a runnable to the executor service.
*
* @param runnable the runnable to be queued
*/
protected void execute(final Runnable runnable) {
this.executor.execute(runnable);
}

/**
* Copied from `Executors.defaultThreadFactory()`.
* The only change is the generated name prefix.
*/
static class TrackerThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

TrackerThreadFactory() {
SecurityManager securityManager = System.getSecurityManager();
this.group = securityManager != null ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.namePrefix = "snowplow-tracker-pool-" + poolNumber.getAndIncrement() + "-event-thread-";
}

public Thread newThread(Runnable runnable) {
Thread thread = new Thread(this.group, runnable, this.namePrefix + this.threadNumber.getAndIncrement(), 0L);
if (thread.isDaemon()) {
thread.setDaemon(false);
}

if (thread.getPriority() != 5) {
thread.setPriority(5);
}

return thread;
}
}

/**
* Handles tracking the different types of events that
* the Tracker can encounter.
*
* @param event the event to track
*/
public void track(Event event) {
execute(getProcessEventRunnable(event));
}

private Runnable getProcessEventRunnable(Event event) {
return () -> {
// a list because Ecommerce events become multiple Payloads
List<Event> processedEvents = eventTypeSpecificPreProcessing(event);
for (Event processedEvent : processedEvents) {
// Event ID (eid) and device_created_timestamp (dtm) are added during getPayload()
TrackerPayload payload = (TrackerPayload) processedEvent.getPayload();

addTrackerParameters(payload);
addContext(processedEvent, payload);
addSubject(processedEvent, payload);
this.emitter.add(payload);
}
};
// a list because Ecommerce events become multiple Payloads
List<Event> processedEvents = eventTypeSpecificPreProcessing(event);
for (Event processedEvent : processedEvents) {
// Event ID (eid) and device_created_timestamp (dtm) are generated when the Event is initialised
TrackerPayload payload = (TrackerPayload) processedEvent.getPayload();

addTrackerParameters(payload);
addContext(processedEvent, payload);
addSubject(processedEvent, payload);
this.emitter.add(payload);
}
}

private List<Event> eventTypeSpecificPreProcessing(Event event) {
Expand Down Expand Up @@ -381,27 +298,4 @@ private void addSubject(Event event, TrackerPayload payload) {
}
}

public void close() {
// Shutdown executor thread pool for the tracker
if (executor != null) {
executor.shutdown();
try {
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
executor.shutdownNow();
if (!executor.awaitTermination(1, TimeUnit.SECONDS))
LOGGER.warn("Tracker executor did not terminate");
}
} catch (InterruptedException ie) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}

// Shutdown executor thread pool for the emitter
if (this.emitter.getClass().equals(BatchEmitter.class)) {
BatchEmitter emitter = (BatchEmitter) this.emitter;
emitter.close();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
package com.snowplowanalytics.snowplow.tracker.emitter;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -33,24 +33,26 @@
public abstract class AbstractEmitter implements Emitter {

protected HttpClientAdapter httpClientAdapter;
protected ExecutorService executor;
protected ScheduledExecutorService executor;

public static abstract class Builder<T extends Builder<T>> {

private HttpClientAdapter httpClientAdapter; // Optional
private int threadCount = 50; // Optional
private ExecutorService requestExecutorService = null; // Optional
private ScheduledExecutorService requestExecutorService = null; // Optional
private String collectorUrl = null; // Required if not specifying a httpClientAdapter
protected abstract T self();

/**
* Set a custom ExecutorService to send http request.
* Set a custom ScheduledExecutorService to send http request.
* <p>
* <b>Implementation note: </b><em>Be aware that calling `close()` on a BatchEmitter instance
* has a side-effect and will shutdown that ExecutorService.</em>
*
* /!\ Be aware that calling `close()` on a BatchEmitter instance has a side-effect and will shutdown that ExecutorService.
* @param executorService the ExecutorService to use
* @param executorService the ScheduledExecutorService to use
* @return itself
*/
public T requestExecutorService(final ExecutorService executorService) {
public T requestExecutorService(final ScheduledExecutorService executorService) {
this.requestExecutorService = executorService;
return self();
}
Expand Down Expand Up @@ -164,15 +166,6 @@ protected AbstractEmitter(final Builder<?> builder) {
@Override
public abstract List<TrackerPayload> getBuffer();

/**
* Sends a runnable to the executor service.
*
* @param runnable the runnable to be queued
*/
protected void execute(final Runnable runnable) {
this.executor.execute(runnable);
}

/**
* Checks whether the response code was a success or not.
*
Expand Down
Loading