forked from jordanlambrecht/tracker-tracker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathuptime.ts
More file actions
128 lines (111 loc) · 3.81 KB
/
Copy pathuptime.ts
File metadata and controls
128 lines (111 loc) · 3.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// src/lib/uptime.ts
//
// In-memory heartbeat accumulator with periodic DB flush.
// Buckets are 5 minutes wide. The accumulator lives on globalThis
// to survive HMR reloads in development.
//
// Functions: floorToFiveMin, recordHeartbeat, flushCompletedBuckets, removeDownloadClientFromAccumulator, clearUptimeAccumulator
import { sql } from "drizzle-orm"
import { db } from "@/lib/db"
import { clientUptimeBuckets } from "@/lib/db/schema"
const BUCKET_MS = 5 * 60 * 1000 // 5 minutes
interface Bucket {
ts: number // floored epoch ms
ok: number
fail: number
}
type FlushEntry = Bucket & { clientId: number }
type Accumulator = Map<number, Bucket> // keyed by clientId
const g = globalThis as typeof globalThis & {
__uptimeAccumulator?: Accumulator
__uptimeFlushQueue?: FlushEntry[]
}
function getAccumulator(): Accumulator {
if (!g.__uptimeAccumulator) {
g.__uptimeAccumulator = new Map()
}
return g.__uptimeAccumulator
}
function getFlushQueue(): FlushEntry[] {
if (!g.__uptimeFlushQueue) {
g.__uptimeFlushQueue = []
}
return g.__uptimeFlushQueue
}
/** Floor a Date to the nearest 5-minute boundary. */
export function floorToFiveMin(date: Date): Date {
const ms = date.getTime()
return new Date(ms - (ms % BUCKET_MS))
}
/** Record a single heartbeat result for a client. */
export function recordHeartbeat(clientId: number, success: boolean): void {
const acc = getAccumulator()
const ts = floorToFiveMin(new Date()).getTime()
const existing = acc.get(clientId)
if (existing && existing.ts === ts) {
// Same bucket — increment
if (success) existing.ok++
else existing.fail++
} else {
// Bucket boundary crossed (or first record for this client)
if (existing) {
// Push the completed bucket to the flush queue
getFlushQueue().push({ clientId, ...existing })
}
acc.set(clientId, { ts, ok: success ? 1 : 0, fail: success ? 0 : 1 })
}
}
/**
* Write completed buckets to the database. Returns the number of buckets flushed.
*
* Scans the accumulator for buckets older than the current 5-min window and
* drains the flush queue. This ensures buckets are flushed even if
* recordHeartbeat hasn't detected the boundary crossing yet.
* Pruning is handled separately in the deep poll cron.
*/
export async function flushCompletedBuckets(): Promise<number> {
// Move any completed buckets from the accumulator to the flush queue
const acc = getAccumulator()
const currentTs = floorToFiveMin(new Date()).getTime()
for (const [clientId, bucket] of acc) {
if (bucket.ts < currentTs) {
getFlushQueue().push({ clientId, ...bucket })
acc.delete(clientId)
}
}
const queue = getFlushQueue()
if (queue.length === 0) return 0
const count = queue.length
const rows = queue.map((entry) => ({
clientId: entry.clientId,
bucketTs: new Date(entry.ts),
ok: entry.ok,
fail: entry.fail,
}))
await db
.insert(clientUptimeBuckets)
.values(rows)
.onConflictDoUpdate({
target: [clientUptimeBuckets.clientId, clientUptimeBuckets.bucketTs],
set: {
ok: sql`${clientUptimeBuckets.ok} + excluded.ok`,
fail: sql`${clientUptimeBuckets.fail} + excluded.fail`,
},
})
// Only remove from queue after successful write
queue.splice(0, count)
return count
}
/** Remove a single download client from the accumulator and flush queue. Called when a client is deleted. */
export function removeDownloadClientFromAccumulator(clientId: number): void {
getAccumulator().delete(clientId)
const queue = getFlushQueue()
const filtered = queue.filter((e) => e.clientId !== clientId)
queue.length = 0
queue.push(...filtered)
}
/** Clear the in-memory accumulator and flush queue. Called on logout/stop. */
export function clearUptimeAccumulator(): void {
g.__uptimeAccumulator = new Map()
g.__uptimeFlushQueue = []
}