forked from snowplow/snowplow-java-tracker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAbstractEmitter.java
More file actions
173 lines (151 loc) · 5.17 KB
/
AbstractEmitter.java
File metadata and controls
173 lines (151 loc) · 5.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
/*
* Copyright (c) 2015 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.tracker.emitter;
// Java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// Google
import com.google.common.base.Preconditions;
// This library
import com.snowplowanalytics.snowplow.tracker.http.HttpClientAdapter;
import com.snowplowanalytics.snowplow.tracker.payload.TrackerPayload;
/**
 * Base class holding the state and helpers shared by the emitters:
 * the HTTP client adapter, an optional request callback, the executor
 * used to run send tasks, and the payload buffer with its size limit.
 *
 * Configuration is collected through the nested {@link Builder} and
 * applied in the protected constructor. Subclasses supply the actual
 * {@link #emit(TrackerPayload)} and {@link #flushBuffer()} behaviour.
 */
public abstract class AbstractEmitter implements Emitter {

    /** Adapter used to send requests; required, never null after construction. */
    protected HttpClientAdapter httpClientAdapter;

    /** Optional callback supplied through the builder; may be null. */
    protected RequestCallback requestCallback;

    /** Executor that runs the tasks passed to {@link #execute(Runnable)}. */
    protected ExecutorService executor;

    /**
     * Payloads waiting to be sent. This is a plain ArrayList and this class
     * performs no synchronization around it.
     */
    protected List<TrackerPayload> buffer = new ArrayList<>();

    /** Number of events to collect before sending; defaults to 1. */
    protected int bufferSize = 1;

    /**
     * Self-typed builder collecting the emitter settings. Every setter
     * returns {@link #self()} so that subclasses of the builder keep their
     * own type while chaining calls.
     *
     * @param <T> the concrete builder type
     */
    public static abstract class Builder<T extends Builder<T>> {

        private HttpClientAdapter httpClientAdapter; // Required
        private RequestCallback requestCallback = null; // Optional
        private int threadCount = 50; // Optional

        /**
         * Returns this builder typed as the concrete builder class.
         *
         * @return itself
         */
        protected abstract T self();

        /**
         * Adds the HttpClientAdapter to the AbstractEmitter
         *
         * @param httpClientAdapter the adapter to use
         * @return itself
         */
        public T httpClientAdapter(HttpClientAdapter httpClientAdapter) {
            this.httpClientAdapter = httpClientAdapter;
            return self();
        }

        /**
         * An optional Request Callback for adding the ability to
         * handle failure cases for sending.
         *
         * @param requestCallback the emitter request callback
         * @return itself
         */
        public T requestCallback(RequestCallback requestCallback) {
            this.requestCallback = requestCallback;
            return self();
        }

        /**
         * Sets the Thread Count for the ExecutorService. The value is
         * validated (must be greater than 0) when the emitter is constructed,
         * not when this setter is called.
         *
         * @param threadCount the size of the thread pool
         * @return itself
         */
        public T threadCount(int threadCount) {
            this.threadCount = threadCount;
            return self();
        }
    }

    /**
     * Minimal concrete builder whose only job is to close the
     * self-type recursion of {@link Builder}.
     */
    private static class Builder2 extends Builder<Builder2> {
        @Override
        protected Builder2 self() {
            return this;
        }
    }

    /**
     * Creates a new builder with the default settings
     * (no adapter, no callback, thread count of 50).
     *
     * @return a fresh builder
     */
    public static Builder<?> builder() {
        return new Builder2();
    }

    /**
     * Validates the builder settings, copies them into this emitter and
     * creates the executor sized by the builder's thread count.
     *
     * @param builder the builder holding the emitter settings
     * @throws NullPointerException if no HttpClientAdapter was supplied
     * @throws IllegalArgumentException if the thread count is not greater than 0
     */
    protected AbstractEmitter(Builder<?> builder) {

        // Precondition checks; each carries a message so that a failed
        // check names the offending setting.
        Preconditions.checkNotNull(builder.httpClientAdapter, "httpClientAdapter must not be null");
        Preconditions.checkArgument(builder.threadCount > 0, "threadCount must be greater than 0");

        this.httpClientAdapter = builder.httpClientAdapter;
        this.requestCallback = builder.requestCallback;
        this.executor = Executors.newScheduledThreadPool(builder.threadCount);
    }

    /**
     * Adds a payload to the buffer and checks whether
     * we have reached the buffer limit yet.
     *
     * @param payload an event payload
     */
    @Override
    public abstract void emit(TrackerPayload payload);

    /**
     * Customize the emitter buffer size to any valid integer
     * greater than zero.
     * - Will only affect the BatchEmitter
     *
     * @param bufferSize number of events to collect before
     *        sending
     * @throws IllegalArgumentException if bufferSize is not greater than 0
     */
    @Override
    public void setBufferSize(int bufferSize) {
        Preconditions.checkArgument(bufferSize > 0, "bufferSize must be greater than 0");
        this.bufferSize = bufferSize;
    }

    /**
     * When the buffer limit is reached sending of the buffer is
     * initiated.
     */
    @Override
    public abstract void flushBuffer();

    /**
     * Gets the Emitter Buffer Size
     * - Will always be 1 for SimpleEmitter
     *
     * @return the buffer size
     */
    @Override
    public int getBufferSize() {
        return this.bufferSize;
    }

    /**
     * Returns the List of Payloads that are in the buffer.
     * The returned list is the emitter's own buffer, not a copy,
     * so changes made to it are visible to the emitter.
     *
     * @return the buffer payloads
     */
    @Override
    public List<TrackerPayload> getBuffer() {
        return this.buffer;
    }

    /**
     * Sends a runnable to the executor service.
     *
     * @param runnable the runnable to be queued
     */
    protected void execute(Runnable runnable) {
        this.executor.execute(runnable);
    }

    /**
     * Checks whether the response code was a success or not.
     *
     * @param code the response code
     * @return true when the code is in the range 200 (inclusive)
     *         to 300 (exclusive)
     */
    protected boolean isSuccessfulSend(int code) {
        return code >= 200 && code < 300;
    }
}