From cc820b7a86c59e363dbd24c93eb3aceb8e23ea60 Mon Sep 17 00:00:00 2001 From: GabrielXia Date: Thu, 6 Jul 2017 23:59:51 +0800 Subject: [PATCH 1/5] Emitter close issue --- .gitignore | 2 ++ .../snowplow/tracker/emitter/BatchEmitter.java | 14 ++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/.gitignore b/.gitignore index 8be46618..b631074f 100644 --- a/.gitignore +++ b/.gitignore @@ -57,3 +57,5 @@ Version.java # Vagrant .vagrant +# OS file +.DS_Store \ No newline at end of file diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java index 3a069be6..6c3e4626 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; // Google import com.google.common.base.Preconditions; @@ -38,6 +39,8 @@ public class BatchEmitter extends AbstractEmitter implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(BatchEmitter.class); + private long closeTimeout = 5; + public static abstract class Builder> extends AbstractEmitter.Builder { private int bufferSize = 50; // Optional @@ -163,5 +166,16 @@ private SelfDescribingJson getFinalPost(List buffer) { @Override public void close() { flushBuffer(); + if (executor != null) { + executor.shutdown(); + try { + boolean e = executor.awaitTermination(closeTimeout, TimeUnit.SECONDS); + if (e == false) { + LOGGER.debug("Executor service shut down after time out"); + } + } catch (InterruptedException e) { + } + executor = null; + } } } From af37c3e503f364f4807d8bc8239002b4e5a93e3c Mon Sep 17 00:00:00 2001 From: GabrielXia Date: Fri, 7 Jul 2017 00:01:03 +0800 Subject: [PATCH 2/5] new line --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index b631074f..4cb98656 100644 --- a/.gitignore +++ b/.gitignore @@ -58,4 +58,4 @@ Version.java .vagrant # OS file -.DS_Store \ No newline at end of file +.DS_Store From ede62182012e7c947fc9acdf3a88ee1ec25a1c79 Mon Sep 17 00:00:00 2001 From: GabrielXia Date: Fri, 7 Jul 2017 12:08:52 +0800 Subject: [PATCH 3/5] wait for shut down --- .gitignore | 3 --- .../snowplow/tracker/emitter/BatchEmitter.java | 7 ++++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index 4cb98656..aca547ca 100644 --- a/.gitignore +++ b/.gitignore @@ -56,6 +56,3 @@ Version.java # Vagrant .vagrant - -# OS file -.DS_Store diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java index 6c3e4626..fba25e36 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java @@ -169,11 +169,12 @@ public void close() { if (executor != null) { executor.shutdown(); try { - boolean e = executor.awaitTermination(closeTimeout, TimeUnit.SECONDS); - if (e == false) { - LOGGER.debug("Executor service shut down after time out"); + while (!executor.awaitTermination(closeTimeout, TimeUnit.SECONDS)) { + LOGGER.debug("Executor doesn't shut down yet"); } + LOGGER.debug("Executor service shut down"); } catch (InterruptedException e) { + LOGGER.error("Encounter an interrupted exception", e); } executor = null; } From 10a3b4d4bf9a2d2c3bc8c0cf95a9f1a78ae7f346 Mon Sep 17 00:00:00 2001 From: GabrielXia Date: Fri, 7 Jul 2017 12:15:22 +0800 Subject: [PATCH 4/5] new line --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index aca547ca..8be46618 100644 --- a/.gitignore +++ b/.gitignore @@ -56,3 +56,4 @@ Version.java # Vagrant .vagrant + From 28a352939c6f5a8facc43ec556a66a9aabcaba32 Mon Sep 17 00:00:00 2001 From: GabrielXia Date: Fri, 7 Jul 2017 17:14:47 +0800 Subject: [PATCH 5/5] end thread in case of failure --- .../snowplow/tracker/emitter/BatchEmitter.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java index fba25e36..be738b59 100644 --- a/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java +++ b/src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BatchEmitter.java @@ -169,14 +169,15 @@ public void close() { if (executor != null) { executor.shutdown(); try { - while (!executor.awaitTermination(closeTimeout, TimeUnit.SECONDS)) { - LOGGER.debug("Executor doesn't shut down yet"); + if (!executor.awaitTermination(closeTimeout, TimeUnit.SECONDS)) { + executor.shutdownNow(); + if (!executor.awaitTermination(closeTimeout, TimeUnit.SECONDS)) + LOGGER.warn("Executor did not terminate"); } - LOGGER.debug("Executor service shut down"); - } catch (InterruptedException e) { - LOGGER.error("Encounter an interrupted exception", e); + } catch (InterruptedException ie) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); } - executor = null; } } }