From 92926232dda2c72510318a34b3f3d89ab4fb0c1a Mon Sep 17 00:00:00 2001 From: Paul Laturaze Date: Mon, 29 Jun 2020 21:42:27 +0200 Subject: [PATCH] Set Emitter's threads name for easier debugging Add a ThreadFactory to the Emitter executor service to set a custom name for threads that are created when calling execute(). Set a name for the buffer consumer thread in the BatchEmitter. Naming these threads can be useful when investigating a thread dump, or for monitoring purpose. --- .../tracker/emitter/AbstractEmitter.java | 35 +++++++++++++++++-- .../tracker/emitter/BatchEmitter.java | 8 ++++- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java index e200c140..0e14e732 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/AbstractEmitter.java @@ -15,7 +15,8 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Preconditions; @@ -119,7 +120,7 @@ protected AbstractEmitter(final Builder builder) { } this.requestCallback = builder.requestCallback; - this.executor = Executors.newScheduledThreadPool(builder.threadCount); + this.executor = Executors.newScheduledThreadPool(builder.threadCount, new EmitterThreadFactory()); } /** @@ -179,4 +180,34 @@ protected void execute(final Runnable runnable) { protected boolean isSuccessfulSend(final int code) { return code >= 200 && code < 300; } + + /** + * Copied from `Executors.defaultThreadFactory()`. + * The only change is the generated name prefix. + */ + static class EmitterThreadFactory implements ThreadFactory { + private static final AtomicInteger poolNumber = new AtomicInteger(1); + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + EmitterThreadFactory() { + SecurityManager securityManager = System.getSecurityManager(); + this.group = securityManager != null ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup(); + this.namePrefix = "snowplow-emitter-pool-" + poolNumber.getAndIncrement() + "-request-thread-"; + } + + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(this.group, runnable, this.namePrefix + this.threadNumber.getAndIncrement(), 0L); + if (thread.isDaemon()) { + thread.setDaemon(false); + } + + if (thread.getPriority() != 5) { + thread.setPriority(5); + } + + return thread; + } + } } diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java index f7e9baa2..ca3de117 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java @@ -20,6 +20,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Preconditions; import com.snowplowanalytics.snowplow.tracker.constants.Constants; @@ -38,6 +39,8 @@ public class BatchEmitter extends AbstractEmitter implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(BatchEmitter.class); + private static final AtomicInteger BUFFER_CONSUMER_THREAD_NUMBER = new AtomicInteger(1); + private static final String BUFFER_CONSUMER_THREAD_NAME_PREFIX = "snowplow-emitter-BufferConsumer-thread-"; private final Thread bufferConsumer; private boolean isClosing = false; @@ -89,7 +92,10 @@ protected BatchEmitter(final Builder builder) { this.bufferSize = builder.bufferSize; - bufferConsumer = new Thread(getBufferConsumerRunnable()); + bufferConsumer = new Thread( + getBufferConsumerRunnable(), + BUFFER_CONSUMER_THREAD_NAME_PREFIX + BUFFER_CONSUMER_THREAD_NUMBER.getAndIncrement() + ); bufferConsumer.start(); }