forked from snowplow/snowplow-cpp-tracker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstorage.cpp
More file actions
350 lines (278 loc) · 9.98 KB
/
storage.cpp
File metadata and controls
350 lines (278 loc) · 9.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
/*
Copyright (c) 2022 Snowplow Analytics Ltd. All rights reserved.
This program is licensed to you under the Apache License Version 2.0,
and you may not use this file except in compliance with the Apache License Version 2.0.
You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
Unless required by applicable law or agreed to in writing,
software distributed under the Apache License Version 2.0 is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
#include "storage.hpp"
using namespace snowplow;
using std::cerr;
using std::endl;
using std::lock_guard;
using std::mutex;
using std::runtime_error;
using std::string;
const string db_table_events = "events";
const string db_column_events_id = "id";
const string db_column_events_data = "data";
const string db_table_session = "sessions";
const string db_column_session_id = "id";
const string db_column_session_data = "data";
// --- Static Singleton Access
Storage *Storage::m_instance = 0;
mutex Storage::m_db_get;
Storage *Storage::init(const string &db_name) {
lock_guard<mutex> guard(m_db_get);
if (!m_instance) {
m_instance = new Storage(db_name);
}
return m_instance;
}
Storage *Storage::instance() {
lock_guard<mutex> guard(m_db_get);
if (!m_instance) {
throw runtime_error("FATAL: Storage must be initialized first.");
}
return m_instance;
}
void Storage::close() {
lock_guard<mutex> guard(m_db_get);
if (m_instance) {
delete (m_instance);
}
m_instance = 0;
}
// --- Constructor & Destructor
Storage::Storage(const string &db_name) {
sqlite3 *db;
char *err_msg = 0;
int rc;
// Open Database connection
rc = sqlite3_open((const char *)db_name.c_str(), &db);
if (rc) {
throw runtime_error((string) "FATAL: Cannot open database: " + sqlite3_errmsg(db));
}
this->m_db_name = db_name;
this->m_db = db;
// WAL query
string wal_query = "PRAGMA journal_mode=WAL;";
rc = sqlite3_exec(this->m_db, (const char *)wal_query.c_str(), NULL, NULL, &err_msg);
if (rc != SQLITE_OK) {
string err = "FATAL: Cannot enable WAL: " + string(err_msg);
sqlite3_free(err_msg);
throw runtime_error(err);
}
// Create events table query
string create_events_query =
"CREATE TABLE IF NOT EXISTS " + db_table_events + "(" +
db_column_events_id + " INTEGER PRIMARY KEY, " +
db_column_events_data + " STRING" +
");";
// Make new events table
rc = sqlite3_exec(this->m_db, (const char *)create_events_query.c_str(), NULL, NULL, &err_msg);
if (rc != SQLITE_OK) {
cerr << "FATAL: Cannot create events table: " << err_msg << endl;
string err = "FATAL: Cannot create events table: " + string(err_msg);
sqlite3_free(err_msg);
throw runtime_error(err);
}
// Create session table query
string create_sessions_query =
"CREATE TABLE IF NOT EXISTS " + db_table_session + "(" +
db_column_session_id + " INTEGER PRIMARY KEY, " +
db_column_session_data + " STRING" +
");";
// Make new session table
rc = sqlite3_exec(this->m_db, (const char *)create_sessions_query.c_str(), NULL, NULL, &err_msg);
if (rc != SQLITE_OK) {
cerr << "FATAL: Cannot create sessions table: " << err_msg << endl;
string err = "FATAL: Cannot create sessions table: " + string(err_msg);
sqlite3_free(err_msg);
throw runtime_error(err);
}
// Insert query
string insert_query =
"INSERT INTO " + db_table_events + "(" +
db_column_events_data +
") values(?1);";
// Prepare insert statement
rc = sqlite3_prepare_v2(this->m_db, (const char *)insert_query.c_str(), -1, &this->m_add_stmt, NULL);
if (rc != SQLITE_OK) {
throw runtime_error("FATAL: Cannot prepare event insert statement: " + std::to_string(rc));
}
}
Storage::~Storage() {
sqlite3_finalize(this->m_add_stmt);
sqlite3_close(this->m_db);
}
// --- INSERT
void Storage::insert_payload(Payload payload) {
lock_guard<mutex> guard(this->m_db_access);
int rc;
string payload_str = Utils::serialize_payload(payload);
rc = sqlite3_bind_text(this->m_add_stmt, 1, payload_str.c_str(), payload_str.length(), SQLITE_STATIC);
if (rc != SQLITE_OK) {
cerr << "ERROR: Failed to bind payload to statement: " << rc << endl;
return;
}
rc = sqlite3_step(this->m_add_stmt);
if (rc != SQLITE_DONE) {
cerr << "ERROR: Failed to execute add_stmt: " << rc << endl;
return;
}
rc = sqlite3_reset(this->m_add_stmt);
if (rc != SQLITE_OK) {
cerr << "ERROR: Failed to reset add_stmt after insert: " << rc << endl;
return;
}
}
void Storage::insert_update_session(json session_data) {
lock_guard<mutex> guard(this->m_db_access);
int rc;
sqlite3_stmt *insert_stmt;
string insert_query =
"INSERT OR REPLACE INTO " + db_table_session + "(" +
db_column_session_id + "," + db_column_session_data +
") values(?1, ?2);";
rc = sqlite3_prepare_v2(this->m_db, (const char *)insert_query.c_str(), -1, &insert_stmt, NULL);
if (rc != SQLITE_OK) {
cerr << "ERROR: Cannot prepare session insert statement: " << std::to_string(rc) << endl;
return;
}
rc = sqlite3_bind_int(insert_stmt, 1, 1);
if (rc != SQLITE_OK) {
cerr << "ERROR: Failed to bind id to statement: " << rc << endl;
return;
}
string session_data_str = session_data.dump();
rc = sqlite3_bind_text(insert_stmt, 2, session_data_str.c_str(), session_data_str.length(), SQLITE_STATIC);
if (rc != SQLITE_OK) {
cerr << "ERROR: Failed to bind data to statement: " << rc << endl;
return;
}
rc = sqlite3_step(insert_stmt);
if (rc != SQLITE_DONE) {
cerr << "ERROR: Failed to execute insert_stmt: " << rc << endl;
return;
}
rc = sqlite3_reset(insert_stmt);
if (rc != SQLITE_OK) {
cerr << "ERROR: Failed to reset insert_stmt after insert: " << rc << endl;
return;
}
sqlite3_finalize(insert_stmt);
}
// --- SELECT
static int select_event_callback(void *data, int argc, char **argv, char **az_col_name) {
int i, id = 0;
list<Storage::EventRow> *data_list = (list<Storage::EventRow> *)data;
Payload event;
for (i = 0; i < argc; i++) {
if (az_col_name[i] == db_column_events_id) {
id = std::stoi(argv[i] ? argv[i] : "-1");
} else if (az_col_name[i] == db_column_events_data) {
event = Utils::deserialize_json_str(argv[i] ? argv[i] : "");
}
}
Storage::EventRow event_row;
event_row.id = id;
event_row.event = event;
data_list->push_back(event_row);
return 0;
}
void Storage::select_all_event_rows(list<Storage::EventRow> *event_list) {
lock_guard<mutex> guard(this->m_db_access);
int rc;
char *err_msg = 0;
string select_all_query =
"SELECT * FROM " + db_table_events + ";";
rc = sqlite3_exec(this->m_db, (const char *)select_all_query.c_str(), select_event_callback, (void *)event_list, &err_msg);
if (rc != SQLITE_OK) {
cerr << "ERROR: Failed to execute select_all_query: " << rc << "; " << err_msg << endl;
sqlite3_free(err_msg);
}
}
void Storage::select_event_row_range(list<Storage::EventRow> *event_list, int range) {
lock_guard<mutex> guard(this->m_db_access);
int rc;
char *err_msg = 0;
string select_range_query =
"SELECT * FROM " + db_table_events + " " +
"ORDER BY " + db_column_events_id + " ASC LIMIT " + std::to_string(range) + ";";
rc = sqlite3_exec(this->m_db, (const char *)select_range_query.c_str(), select_event_callback, (void *)event_list, &err_msg);
if (rc != SQLITE_OK) {
cerr << "ERROR: Failed to execute select_range_query: " << rc << "; " << err_msg << endl;
sqlite3_free(err_msg);
}
}
static int select_session_callback(void *data, int argc, char **argv, char **az_col_name) {
int i;
list<json> *data_list = (list<json> *)data;
json session_data;
for (i = 0; i < argc; i++) {
if (az_col_name[i] == db_column_session_data) {
session_data = json::parse(argv[i] ? argv[i] : "");
break;
}
}
data_list->push_back(session_data);
return 0;
}
void Storage::select_all_session_rows(list<json> *session_list) {
lock_guard<mutex> guard(this->m_db_access);
int rc;
char *err_msg = 0;
string select_all_query =
"SELECT * FROM " + db_table_session + ";";
rc = sqlite3_exec(this->m_db, (const char *)select_all_query.c_str(), select_session_callback, (void *)session_list, &err_msg);
if (rc != SQLITE_OK) {
cerr << "ERROR: Failed to execute select_all_query: " << rc << "; " << err_msg << endl;
sqlite3_free(err_msg);
}
}
// --- DELETE
void Storage::delete_all_event_rows() {
lock_guard<mutex> guard(this->m_db_access);
int rc;
char *err_msg = 0;
string delete_all_query =
"DELETE FROM " + db_table_events + ";";
rc = sqlite3_exec(this->m_db, (const char *)delete_all_query.c_str(), NULL, NULL, &err_msg);
if (rc != SQLITE_OK) {
cerr << "ERROR: Failed to execute delete_all_query: " << rc << "; " << err_msg << endl;
sqlite3_free(err_msg);
}
}
void Storage::delete_event_row_ids(list<int> *id_list) {
lock_guard<mutex> guard(this->m_db_access);
int rc;
char *err_msg = 0;
string delete_range_query =
"DELETE FROM " + db_table_events + " " +
"WHERE " + db_column_events_id + " in (" + Utils::int_list_to_string(id_list, ",") + ");";
rc = sqlite3_exec(this->m_db, (const char *)delete_range_query.c_str(), NULL, NULL, &err_msg);
if (rc != SQLITE_OK) {
cerr << "ERROR: Failed to execute delete_range_query: " << rc << "; " << err_msg << endl;
sqlite3_free(err_msg);
}
}
void Storage::delete_all_session_rows() {
lock_guard<mutex> guard(this->m_db_access);
int rc;
char *err_msg = 0;
string delete_all_query =
"DELETE FROM " + db_table_session + ";";
rc = sqlite3_exec(this->m_db, (const char *)delete_all_query.c_str(), NULL, NULL, &err_msg);
if (rc != SQLITE_OK) {
cerr << "ERROR: Failed to execute delete_all_query: " << rc << "; " << err_msg << endl;
sqlite3_free(err_msg);
}
}
// --- Getters
string Storage::get_db_name() {
return this->m_db_name;
}