diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java index e8502fa8..900027b1 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java @@ -15,6 +15,8 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Preconditions; @@ -131,10 +133,11 @@ protected AbstractEmitter(final Builder builder) { } this.requestCallback = builder.requestCallback; + if (builder.requestExecutorService != null) { this.executor = builder.requestExecutorService; } else { - this.executor = Executors.newScheduledThreadPool(builder.threadCount); + this.executor = Executors.newScheduledThreadPool(builder.threadCount, new EmitterThreadFactory()); } } @@ -195,4 +198,34 @@ protected void execute(final Runnable runnable) { protected boolean isSuccessfulSend(final int code) { return code >= 200 && code < 300; } + + /** + * Copied from `Executors.defaultThreadFactory()`. + * The only change is the generated name prefix. + */ + static class EmitterThreadFactory implements ThreadFactory { + private static final AtomicInteger poolNumber = new AtomicInteger(1); + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + EmitterThreadFactory() { + SecurityManager securityManager = System.getSecurityManager(); + this.group = securityManager != null ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup(); + this.namePrefix = "snowplow-emitter-pool-" + poolNumber.getAndIncrement() + "-request-thread-"; + } + + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(this.group, runnable, this.namePrefix + this.threadNumber.getAndIncrement(), 0L); + if (thread.isDaemon()) { + thread.setDaemon(false); + } + + if (thread.getPriority() != 5) { + thread.setPriority(5); + } + + return thread; + } + } } diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java index f7e9baa2..ca3de117 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java @@ -20,6 +20,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Preconditions; import com.snowplowanalytics.snowplow.tracker.constants.Constants; @@ -38,6 +39,8 @@ public class BatchEmitter extends AbstractEmitter implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(BatchEmitter.class); + private static final AtomicInteger BUFFER_CONSUMER_THREAD_NUMBER = new AtomicInteger(1); + private static final String BUFFER_CONSUMER_THREAD_NAME_PREFIX = "snowplow-emitter-BufferConsumer-thread-"; private final Thread bufferConsumer; private boolean isClosing = false; @@ -89,7 +92,10 @@ protected BatchEmitter(final Builder builder) { this.bufferSize = builder.bufferSize; - bufferConsumer = new Thread(getBufferConsumerRunnable()); + bufferConsumer = new Thread( + getBufferConsumerRunnable(), + BUFFER_CONSUMER_THREAD_NAME_PREFIX + BUFFER_CONSUMER_THREAD_NUMBER.getAndIncrement() + ); bufferConsumer.start(); }