Skip to content

Commit 1537aad

Browse files
committed
upgraded swarm to support database tracking
1 parent 7f07ec8 commit 1537aad

File tree

3 files changed

+125
-32
lines changed

3 files changed

+125
-32
lines changed

.vscode/settings.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"javascript.format.insertSpaceBeforeFunctionParenthesis": true
3+
}

lib/server/swarm.js

Lines changed: 120 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,151 @@
1+
import { utilitas } from 'utilitas'
12
import arrayRemove from 'unordered-array-remove'
23
import Debug from 'debug'
34
import LRU from 'lru'
45
import randomIterate from 'random-iterate'
56

67
const debug = Debug('bittorrent-tracker:swarm')
78

9+
const getLru = (options) => {
10+
if (!globalThis?.swarmHijack) { return new LRU(options); }
11+
const [EXCLUDED, SKIP_ECHO] = [['httpReq', 'httpRes'], { skipEcho: true }];
12+
return new class DbLru {
13+
constructor(options) {
14+
this.max = options?.max;
15+
this.maxAge = options?.maxAge;
16+
this.infoHash = options?.infoHash;
17+
this.dbio = globalThis.swarmHijack.dbio;
18+
this.table = globalThis.swarmHijack.table;
19+
this.lastCheck = 0;
20+
this.events = [];
21+
};
22+
23+
on (event, callback) {
24+
return this.events.push({ event, callback });
25+
};
26+
27+
async set (_id, peer, options) {
28+
return await this.dbio.upsert(this.table, {
29+
...peer, _id, deleted: false, infoHash: this.infoHash,
30+
peer: utilitas.exclude(options._.peer, EXCLUDED),
31+
socket: peer.socket || null, updatedAt: new Date(),
32+
}, SKIP_ECHO);
33+
};
34+
35+
async touch (_id) {
36+
return await this.dbio.execute(
37+
'UPDATE `' + this.table + '` SET `deleted` = ?, `updatedAt` = ? WHERE '
38+
+ '`infoHash` = ? AND `_id` = ?', [false, new Date(), this.infoHash, _id]
39+
);
40+
};
41+
42+
async peek (_id) {
43+
return await this.dbio.queryOne(
44+
'SELECT * FROM `' + this.table + '` WHERE `infoHash` = ? '
45+
+ 'AND `_id` = ? AND `deleted` = ?', [this.infoHash, _id, false]
46+
);
47+
};
48+
49+
async get (_id) {
50+
await this.touch(_id);
51+
return await this.peek(_id);
52+
};
53+
54+
async trigger (event, value) {
55+
for (let item of this.events.filter(x => x.event === event)) {
56+
await item.callback({ key: value._id, value });
57+
}
58+
};
59+
60+
async keys () {
61+
const [now, old, res] = [new Date(), [], []];
62+
const [lastValid, checkEvict] = [
63+
now - this.maxAge,
64+
now - this.lastCheck > 1000 * 60 && (this.lastCheck = now)
65+
]; // check evict peers every 1 minute
66+
const peers = (await this.dbio.query(
67+
'SELECT `id`, `_id`, `infoHash`, `peerId`, `type`, `ip`, `socket`, '
68+
+ '`port`, `complete`, `updatedAt` FROM `' + this.table + '` WHERE '
69+
+ '`infoHash` = ? AND `deleted` = ?', [this.infoHash, false]
70+
)).sort((x, y) => y.updatedAt - x.updatedAt);
71+
for (let peer of peers) {
72+
if (peer.updatedAt > lastValid && res.length < this.max) {
73+
res.push(peer._id);
74+
} else if (checkEvict) {
75+
old.push(peer.id);
76+
await this.trigger('evict', peer);
77+
}
78+
}
79+
old.length && await this.dbio.updateByKeyValue(
80+
this.table, 'id', old, { deleted: true, updatedAt: now }, SKIP_ECHO
81+
);
82+
return res;
83+
};
84+
85+
async remove (_id) {
86+
return this.dbio.execute(
87+
'UPDATED `' + this.table + '` SET `deleted` = ?, `updatedAt` = ? WHERE '
88+
+ '`infoHash` = ? AND `_id` = ?', [true, new Date(), this.infoHash, _id]
89+
);
90+
}
91+
}(options);
92+
};
93+
894
// Regard this as the default implementation of an interface that you
995
// need to support when overriding Server.createSwarm() and Server.getSwarm()
1096
class Swarm {
11-
constructor (infoHash, server) {
97+
constructor(infoHash, server) {
1298
const self = this
1399
self.infoHash = infoHash
14100
self.complete = 0
15101
self.incomplete = 0
16102

17-
self.peers = new LRU({
103+
self.peers = getLru({
18104
max: server.peersCacheLength || 1000,
19-
maxAge: server.peersCacheTtl || 20 * 60 * 1000 // 20 minutes
105+
maxAge: server.peersCacheTtl || 20 * 60 * 1000, // 20 minutes
106+
infoHash,
20107
})
21108

22109
// When a peer is evicted from the LRU store, send a synthetic 'stopped' event
23110
// so the stats get updated correctly.
24-
self.peers.on('evict', data => {
111+
self.peers.on('evict', async data => {
25112
const peer = data.value
26113
const params = {
27114
type: peer.type,
28115
event: 'stopped',
29116
numwant: 0,
30117
peer_id: peer.peerId
31118
}
32-
self._onAnnounceStopped(params, peer, peer.peerId)
119+
await self._onAnnounceStopped(params, peer, peer.peerId)
33120
peer.socket = null
34121
})
35122
}
36123

37-
announce (params, cb) {
124+
async announce (params, cb) {
38125
const self = this
39126
const id = params.type === 'ws' ? params.peer_id : params.addr
40127
// Mark the source peer as recently used in cache
41-
const peer = self.peers.get(id)
128+
const peer = await self.peers.get(id)
42129

43130
if (params.event === 'started') {
44-
self._onAnnounceStarted(params, peer, id)
131+
await self._onAnnounceStarted(params, peer, id)
45132
} else if (params.event === 'stopped') {
46-
self._onAnnounceStopped(params, peer, id)
133+
await self._onAnnounceStopped(params, peer, id)
47134
if (!cb) return // when websocket is closed
48135
} else if (params.event === 'completed') {
49-
self._onAnnounceCompleted(params, peer, id)
136+
await self._onAnnounceCompleted(params, peer, id)
50137
} else if (params.event === 'update') {
51-
self._onAnnounceUpdate(params, peer, id)
138+
await self._onAnnounceUpdate(params, peer, id)
52139
} else if (params.event === 'paused') {
53-
self._onAnnouncePaused(params, peer, id)
140+
await self._onAnnouncePaused(params, peer, id)
54141
} else {
55142
cb(new Error('invalid event'))
56143
return
57144
}
58145
cb(null, {
59146
complete: self.complete,
60147
incomplete: self.incomplete,
61-
peers: self._getPeers(params.numwant, params.peer_id, !!params.socket)
148+
peers: await self._getPeers(params.numwant, params.peer_id, !!params.socket)
62149
})
63150
}
64151

@@ -69,25 +156,25 @@ class Swarm {
69156
})
70157
}
71158

72-
_onAnnounceStarted (params, peer, id) {
159+
async _onAnnounceStarted (params, peer, id) {
73160
if (peer) {
74161
debug('unexpected `started` event from peer that is already in swarm')
75-
return this._onAnnounceUpdate(params, peer, id) // treat as an update
162+
return await this._onAnnounceUpdate(params, peer, id) // treat as an update
76163
}
77164

78165
if (params.left === 0) this.complete += 1
79166
else this.incomplete += 1
80-
this.peers.set(id, {
167+
await this.peers.set(id, {
81168
type: params.type,
82169
complete: params.left === 0,
83170
peerId: params.peer_id, // as hex
84171
ip: params.ip,
85172
port: params.port,
86173
socket: params.socket // only websocket
87-
})
174+
}, { _: { peer: params } })
88175
}
89176

90-
_onAnnounceStopped (params, peer, id) {
177+
async _onAnnounceStopped (params, peer, id) {
91178
if (!peer) {
92179
debug('unexpected `stopped` event from peer that is not in swarm')
93180
return // do nothing
@@ -103,55 +190,56 @@ class Swarm {
103190
arrayRemove(peer.socket.infoHashes, index)
104191
}
105192

106-
this.peers.remove(id)
193+
peer._id || await this.peers.remove(id)
107194
}
108195

109-
_onAnnounceCompleted (params, peer, id) {
196+
async _onAnnounceCompleted (params, peer, id) {
110197
if (!peer) {
111198
debug('unexpected `completed` event from peer that is not in swarm')
112-
return this._onAnnounceStarted(params, peer, id) // treat as a start
199+
return await this._onAnnounceStarted(params, peer, id) // treat as a start
113200
}
114201
if (peer.complete) {
115202
debug('unexpected `completed` event from peer that is already completed')
116-
return this._onAnnounceUpdate(params, peer, id) // treat as an update
203+
return await this._onAnnounceUpdate(params, peer, id) // treat as an update
117204
}
118205

119206
this.complete += 1
120207
this.incomplete -= 1
121208
peer.complete = true
122-
this.peers.set(id, peer)
209+
await this.peers.set(id, peer, { _: { peer: params } })
123210
}
124211

125-
_onAnnounceUpdate (params, peer, id) {
212+
async _onAnnounceUpdate (params, peer, id) {
126213
if (!peer) {
127214
debug('unexpected `update` event from peer that is not in swarm')
128-
return this._onAnnounceStarted(params, peer, id) // treat as a start
215+
return await this._onAnnounceStarted(params, peer, id) // treat as a start
129216
}
130217

131218
if (!peer.complete && params.left === 0) {
132219
this.complete += 1
133220
this.incomplete -= 1
134221
peer.complete = true
135222
}
136-
this.peers.set(id, peer)
223+
await this.peers.set(id, peer, { _: { peer: params } })
137224
}
138225

139-
_onAnnouncePaused (params, peer, id) {
226+
async _onAnnouncePaused (params, peer, id) {
140227
if (!peer) {
141228
debug('unexpected `paused` event from peer that is not in swarm')
142-
return this._onAnnounceStarted(params, peer, id) // treat as a start
229+
return await this._onAnnounceStarted(params, peer, id) // treat as a start
143230
}
144231

145-
this._onAnnounceUpdate(params, peer, id)
232+
await this._onAnnounceUpdate(params, peer, id)
146233
}
147234

148-
_getPeers (numwant, ownPeerId, isWebRTC) {
235+
async _getPeers (numwant, ownPeerId, isWebRTC) {
149236
const peers = []
150-
const ite = randomIterate(this.peers.keys)
237+
const ite = randomIterate(Function.isFunction(this.peers.keys)
238+
? await this.peers.keys() : this.peers.keys)
151239
let peerId
152240
while ((peerId = ite()) && peers.length < numwant) {
153241
// Don't mark the peer as most recently used on announce
154-
const peer = this.peers.peek(peerId)
242+
const peer = await this.peers.peek(peerId)
155243
if (!peer) continue
156244
if (isWebRTC && peer.peerId === ownPeerId) continue // don't send peer to itself
157245
if ((isWebRTC && peer.type !== 'ws') || (!isWebRTC && peer.type === 'ws')) continue // send proper peer type

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,11 @@
5555
"@mapbox/node-pre-gyp": "1.0.10",
5656
"@webtorrent/semantic-release-config": "1.0.8",
5757
"magnet-uri": "7.0.2",
58+
"mysql2": "^3.2.0",
5859
"semantic-release": "20.1.1",
5960
"standard": "*",
6061
"tape": "5.6.3",
62+
"utilitas": "^1992.4.5",
6163
"webtorrent-fixtures": "2.0.2",
6264
"wrtc": "0.4.7"
6365
},

0 commit comments

Comments
 (0)