Skip to content

Commit 0635482

Browse files
committed
Added async option for sending events
- Fixes snowplow#37 - Moved enum classes to 'emitter' package
1 parent 05294f7 commit 0635482

9 files changed

Lines changed: 93 additions & 16 deletions

File tree

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ dependencies {
3434

3535
// Apache HTTP
3636
compile 'org.apache.httpcomponents:httpclient:4.3.3'
37+
compile 'org.apache.httpcomponents:httpasyncclient:4.0.1'
3738

3839
// SLF4J logging API
3940
compile 'org.slf4j:slf4j-simple:1.7.7'

src/main/java/com/snowplowanalytics/snowplow/tracker/DevicePlatform.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
/*
2+
* Copyright (c) 2014 Snowplow Analytics Ltd. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
114
package com.snowplowanalytics.snowplow.tracker;
215

316
public enum DevicePlatform {

src/main/java/com/snowplowanalytics/snowplow/tracker/Emitter.java

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,19 @@
1414
package com.snowplowanalytics.snowplow.tracker;
1515

1616
import com.fasterxml.jackson.databind.JsonNode;
17+
import com.snowplowanalytics.snowplow.tracker.emitter.BufferOption;
18+
import com.snowplowanalytics.snowplow.tracker.emitter.HttpMethod;
19+
import com.snowplowanalytics.snowplow.tracker.emitter.HttpOption;
1720

21+
import org.apache.http.HttpResponse;
1822
import org.apache.http.client.methods.HttpGet;
1923
import org.apache.http.client.methods.HttpPost;
2024
import org.apache.http.client.utils.URIBuilder;
2125
import org.apache.http.entity.StringEntity;
2226
import org.apache.http.impl.client.CloseableHttpClient;
2327
import org.apache.http.impl.client.HttpClients;
28+
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
29+
import org.apache.http.impl.nio.client.HttpAsyncClients;
2430
import org.slf4j.Logger;
2531
import org.slf4j.LoggerFactory;
2632

@@ -29,29 +35,40 @@
2935
import java.net.URISyntaxException;
3036
import java.util.ArrayList;
3137
import java.util.Iterator;
38+
import java.util.concurrent.ExecutionException;
39+
import java.util.concurrent.Future;
3240

3341
public class Emitter {
3442

3543
private URIBuilder uri;
3644
private BufferOption option = BufferOption.Default;
37-
private EmitterHttpMethod httpMethod = EmitterHttpMethod.GET;
45+
private HttpMethod httpMethod = HttpMethod.GET;
46+
private HttpOption httpOption = HttpOption.Synchronous;
47+
private CloseableHttpClient httpClient;
48+
private CloseableHttpAsyncClient httpAsyncClient;
3849
private final ArrayList<Payload> buffer = new ArrayList<Payload>();
39-
private final CloseableHttpClient httpClient = HttpClients.createDefault();
4050

4151
private final Logger logger = LoggerFactory.getLogger(Emitter.class);
4252

43-
public Emitter(String URI, EmitterHttpMethod httpMethod) {
53+
public Emitter(String URI, HttpMethod httpMethod) {
4454
uri = new URIBuilder()
4555
.setScheme("http")
4656
.setHost(URI)
4757
.setPath("/i");
4858
this.httpMethod = httpMethod;
59+
this.httpClient = HttpClients.createDefault();
4960
}
5061

5162
public void setBufferOption(BufferOption option) {
5263
this.option = option;
5364
}
5465

66+
public void setHttpOption(HttpOption option) {
67+
this.httpOption = option;
68+
this.httpAsyncClient = HttpAsyncClients.createDefault();
69+
this.httpAsyncClient.start();
70+
}
71+
5572
public boolean addToBuffer(Payload payload) {
5673
boolean ret = buffer.add(payload);
5774
if (buffer.size() == option.getCode())
@@ -60,11 +77,11 @@ public boolean addToBuffer(Payload payload) {
6077
}
6178

6279
public void flushBuffer() {
63-
if (httpMethod == EmitterHttpMethod.GET) {
80+
if (httpMethod == HttpMethod.GET) {
6481
for (Payload payload : buffer) {
6582
sendGetData(payload);
6683
}
67-
} else if (httpMethod == EmitterHttpMethod.POST) {
84+
} else if (httpMethod == HttpMethod.POST) {
6885
Payload postPayload = new TrackerPayload();
6986
postPayload.setSchema(Constants.SCHEMA_PAYLOAD_DATA);
7087
postPayload.setData(buffer);
@@ -79,14 +96,26 @@ private void sendPostData(Payload payload) {
7996

8097
try {
8198
StringEntity params = new StringEntity(payload.toString());
99+
HttpResponse httpResponse;
82100
httpPost.setEntity(params);
83-
httpClient.execute(httpPost);
101+
if (httpOption == HttpOption.Asynchronous) {
102+
Future<HttpResponse> future = httpAsyncClient.execute(httpPost, null);
103+
httpResponse = future.get();
104+
} else {
105+
httpResponse = httpClient.execute(httpPost);
106+
}
107+
logger.debug(httpResponse.getStatusLine().toString());
84108
} catch (UnsupportedEncodingException e) {
85109
logger.error("Encoding exception with the payload.");
86110
e.printStackTrace();
87111
} catch (IOException e) {
88112
logger.error("Error when sending HTTP POST.");
89113
e.printStackTrace();
114+
} catch (InterruptedException e) {
115+
logger.error("Interruption error when sending HTTP POST request.");
116+
e.printStackTrace();
117+
} catch (ExecutionException e) {
118+
e.printStackTrace();
90119
}
91120

92121
}
@@ -108,13 +137,25 @@ private void sendGetData(Payload payload) {
108137

109138
try {
110139
HttpGet httpGet = new HttpGet(requestUri.build());
111-
httpClient.execute(httpGet);
140+
HttpResponse httpResponse;
141+
if (httpOption == HttpOption.Asynchronous) {
142+
Future<HttpResponse> future = httpAsyncClient.execute(httpGet, null);
143+
httpResponse = future.get();
144+
} else {
145+
httpResponse = httpClient.execute(httpGet);
146+
}
147+
logger.debug(httpResponse.getStatusLine().toString());
112148
} catch (IOException e) {
113149
logger.error("Error when sending HTTP GET error.");
114150
e.printStackTrace();
115151
} catch (URISyntaxException e) {
116152
logger.error("Error when creating HTTP GET request. Probably parsing error..");
117153
e.printStackTrace();
154+
} catch (InterruptedException e) {
155+
logger.error("Interruption error when sending HTTP GET request.");
156+
e.printStackTrace();
157+
} catch (ExecutionException e) {
158+
e.printStackTrace();
118159
}
119160
}
120161
}

src/main/java/com/snowplowanalytics/snowplow/tracker/Util.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.slf4j.LoggerFactory;
2222

2323
import java.io.IOException;
24-
import java.io.UnsupportedEncodingException;
2524
import java.nio.charset.Charset;
2625
import java.util.Map;
2726
import java.util.Random;

src/main/java/com/snowplowanalytics/snowplow/tracker/BufferOption.java renamed to src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/BufferOption.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
1212
*/
1313

14-
package com.snowplowanalytics.snowplow.tracker;
14+
package com.snowplowanalytics.snowplow.tracker.emitter;
1515

1616
public enum BufferOption {
1717
Instant(1),

src/main/java/com/snowplowanalytics/snowplow/tracker/EmitterHttpMethod.java renamed to src/main/java/com/snowplowanalytics/snowplow/tracker/emitter/HttpMethod.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
1212
*/
1313

14-
package com.snowplowanalytics.snowplow.tracker;
14+
package com.snowplowanalytics.snowplow.tracker.emitter;
1515

16-
public enum EmitterHttpMethod {
16+
public enum HttpMethod {
1717
GET,
1818
POST
1919
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright (c) 2014 Snowplow Analytics Ltd. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
14+
package com.snowplowanalytics.snowplow.tracker.emitter;
15+
16+
public enum HttpOption {
17+
Synchronous,
18+
Asynchronous
19+
}

src/test/java/com/snowplowanalytics/snowplow/tracker/EmitterTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.snowplowanalytics.snowplow.tracker;
22

3+
import com.snowplowanalytics.snowplow.tracker.emitter.HttpMethod;
4+
35
import junit.framework.TestCase;
46

57
import org.junit.Test;
@@ -11,12 +13,12 @@ public class EmitterTest extends TestCase {
1113

1214
@Test
1315
public void testEmitterConstructor() throws Exception {
14-
Emitter emitter = new Emitter("segfault.ngrok.com", EmitterHttpMethod.POST);
16+
Emitter emitter = new Emitter("segfault.ngrok.com", HttpMethod.POST);
1517
}
1618

1719
@Test
1820
public void testFlushGet() throws Exception {
19-
Emitter emitter = new Emitter("segfault.ngrok.com", EmitterHttpMethod.GET);
21+
Emitter emitter = new Emitter("segfault.ngrok.com", HttpMethod.GET);
2022

2123
Payload payload;
2224
String res;

src/test/java/com/snowplowanalytics/snowplow/tracker/TrackerTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
package com.snowplowanalytics.snowplow.tracker;
22

3+
import com.snowplowanalytics.snowplow.tracker.emitter.HttpMethod;
4+
import com.snowplowanalytics.snowplow.tracker.emitter.HttpOption;
5+
36
import junit.framework.TestCase;
47

58
import org.junit.Test;
69

710
import java.util.HashMap;
811
import java.util.Map;
912

10-
import static org.junit.Assert.*;
11-
1213
public class TrackerTest extends TestCase {
1314

1415
@Test
@@ -33,8 +34,9 @@ public void testTrackPageView2() throws Exception {
3334

3435
@Test
3536
public void testTrackPageView3() throws Exception {
36-
Emitter emitter = new Emitter("segfault.ngrok.com", EmitterHttpMethod.GET);
37+
Emitter emitter = new Emitter("segfault.ngrok.com", HttpMethod.GET);
3738
Tracker tracker = new Tracker(emitter, "AF003", true, "cloudfront");
39+
emitter.setHttpOption(HttpOption.Asynchronous);
3840

3941
Map<String, String> context = new HashMap<String, String>();
4042
context.put("some key", "some value");

0 commit comments

Comments
 (0)