From ca522c0c4b985672591ea8bd77e834cca445dbc4 Mon Sep 17 00:00:00 2001 From: Yoann Ciabaud Date: Wed, 8 Jun 2016 23:57:01 +0200 Subject: [PATCH 1/7] Prune old peers in server with lru based cache. Issue #4 --- lib/server/swarm.js | 20 +++++++++++++------- package.json | 1 + server.js | 11 ++++++++--- test/server.js | 19 ++++++++++--------- 4 files changed, 32 insertions(+), 19 deletions(-) diff --git a/lib/server/swarm.js b/lib/server/swarm.js index 1dad0583..f44f3cfe 100644 --- a/lib/server/swarm.js +++ b/lib/server/swarm.js @@ -1,12 +1,16 @@ module.exports = Swarm var debug = require('debug')('bittorrent-tracker') +var LRU = require('lru') var randomIterate = require('random-iterate') // Regard this as the default implementation of an interface that you // need to support when overriding Server.createSwarm() and Server.getSwarm() function Swarm (infoHash, server) { - this.peers = {} + this.peers = new LRU({ + max: server.peersCacheLength || 10000, + maxAge: server.peersCacheTtl || 900 // 900s = 15 minutes + }) this.complete = 0 this.incomplete = 0 } @@ -14,7 +18,8 @@ function Swarm (infoHash, server) { Swarm.prototype.announce = function (params, cb) { var self = this var id = params.type === 'ws' ? params.peer_id : params.addr - var peer = self.peers[id] + // Mark the source peer as recently used in cache + var peer = self.peers.get(id) if (params.event === 'started') { self._onAnnounceStarted(params, peer) @@ -51,14 +56,14 @@ Swarm.prototype._onAnnounceStarted = function (params, peer) { if (params.left === 0) this.complete += 1 else this.incomplete += 1 var id = params.type === 'ws' ? params.peer_id : params.addr - peer = this.peers[id] = { + peer = this.peers.set(id, { type: params.type, complete: params.left === 0, peerId: params.peer_id, // as hex ip: params.ip, port: params.port, socket: params.socket // only websocket - } + }) } Swarm.prototype._onAnnounceStopped = function (params, peer) { @@ -70,7 +75,7 @@ Swarm.prototype._onAnnounceStopped = function (params, peer) { if (peer.complete) this.complete -= 1 else this.incomplete -= 1 var id = params.type === 'ws' ? params.peer_id : params.addr - delete this.peers[id] + this.peers.remove(id) } Swarm.prototype._onAnnounceCompleted = function (params, peer) { @@ -103,10 +108,11 @@ Swarm.prototype._onAnnounceUpdate = function (params, peer) { Swarm.prototype._getPeers = function (numwant, ownPeerId, isWebRTC) { var peers = [] - var ite = randomIterate(Object.keys(this.peers)) + var ite = randomIterate(Object.keys(this.peers.cache)) var peerId while ((peerId = ite()) && peers.length < numwant) { - var peer = this.peers[peerId] + // Don't mark the peer as most recently used on announce + var peer = this.peers.peek(peerId) if (isWebRTC && peer.peerId === ownPeerId) continue // don't send peer to itself if ((isWebRTC && peer.type !== 'ws') || (!isWebRTC && peer.type === 'ws')) continue // send proper peer type peers.push(peer) diff --git a/package.json b/package.json index 0e2da31e..13349220 100644 --- a/package.json +++ b/package.json @@ -27,6 +27,7 @@ "hat": "0.0.3", "inherits": "^2.0.1", "ip": "^1.0.1", + "lru": "^2.0.1", "minimist": "^1.1.1", "once": "^1.3.0", "random-iterate": "^1.0.1", diff --git a/server.js b/server.js index 62ca7bef..4fc9e1e4 100644 --- a/server.js +++ b/server.js @@ -50,6 +50,9 @@ function Server (opts) { self._trustProxy = !!opts.trustProxy if (typeof opts.filter === 'function') self._filter = opts.filter + self.peersCacheLength = opts.peersCacheLength + self.peersCacheTtl = opts.peersCacheTtl + self._listenCalled = false self.listening = false self.destroyed = false @@ -153,7 +156,7 @@ function Server (opts) { if (req.method === 'GET' && (req.url === '/stats' || req.url === '/stats.json')) { infoHashes.forEach(function (infoHash) { var peers = self.torrents[infoHash].peers - var keys = Object.keys(peers) + var keys = Object.keys(peers.cache) if (keys.length > 0) activeTorrents++ keys.forEach(function (peerId) { @@ -165,7 +168,8 @@ function Server (opts) { leecher: false } } - var peer = peers[peerId] + // Don't mark the peer as most recently used for stats + var peer = peers.peek(peerId) if (peer.ip.indexOf(':') >= 0) { allPeers[peerId].ipv6 = true } else { @@ -489,7 +493,8 @@ Server.prototype._onWebSocketRequest = function (socket, opts, params) { if (!swarm) { return self.emit('warning', new Error('no swarm with that `info_hash`')) } - var toPeer = swarm.peers[params.to_peer_id] + // Mark the destination peer as recently used in cache + var toPeer = swarm.peers.get(params.to_peer_id) if (!toPeer) { return self.emit('warning', new Error('no peer with that `to_peer_id`')) } diff --git a/test/server.js b/test/server.js index 526cab3a..d8e2f4d0 100644 --- a/test/server.js +++ b/test/server.js @@ -52,22 +52,23 @@ function serverTest (t, serverType, serverFamily) { t.equal(Object.keys(server.torrents).length, 1) t.equal(swarm.complete, 0) t.equal(swarm.incomplete, 1) - t.equal(Object.keys(swarm.peers).length, 1) + t.equal(Object.keys(swarm.peers.cache).length, 1) var id = serverType === 'ws' ? peerId.toString('hex') : hostname + ':6881' - t.equal(swarm.peers[id].type, serverType) - t.equal(swarm.peers[id].ip, clientIp) - t.equal(swarm.peers[id].peerId, peerId.toString('hex')) - t.equal(swarm.peers[id].complete, false) + var peer = swarm.peers.peek(id) + t.equal(peer.type, serverType) + t.equal(peer.ip, clientIp) + t.equal(peer.peerId, peerId.toString('hex')) + t.equal(peer.complete, false) if (serverType === 'ws') { - t.equal(typeof swarm.peers[id].port, 'number') - t.ok(swarm.peers[id].socket) + t.equal(typeof peer.port, 'number') + t.ok(peer.socket) } else { - t.equal(swarm.peers[id].port, 6881) - t.notOk(swarm.peers[id].socket) + t.equal(peer.port, 6881) + t.notOk(peer.socket) } client1.complete() From 1dbc95cbdc2e059c5e3c6713bfe1b2d7823ac0c1 Mon Sep 17 00:00:00 2001 From: Yoann Ciabaud Date: Thu, 9 Jun 2016 01:28:49 +0200 Subject: [PATCH 2/7] Add pruning to server tests. Issue #4 --- test/server.js | 58 ++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 16 deletions(-) diff --git a/test/server.js b/test/server.js index d8e2f4d0..706cb04c 100644 --- a/test/server.js +++ b/test/server.js @@ -12,9 +12,10 @@ var test = require('tape') var infoHash = '4cb67059ed6bd08362da625b3ae77f6f4a075705' var peerId = Buffer.from('01234567890123456789') var peerId2 = Buffer.from('12345678901234567890') +var peerId3 = Buffer.from('23456789012345678901') function serverTest (t, serverType, serverFamily) { - t.plan(30) + t.plan(32) var hostname = serverFamily === 'inet6' ? '[::1]' @@ -23,7 +24,12 @@ function serverTest (t, serverType, serverFamily) { ? '::1' : '127.0.0.1' - common.createServer(t, serverType, function (server) { + var opts = { + serverType: serverType, + peersCacheLength: 2 + } + + common.createServer(t, opts, function (server) { var port = server[serverType].address().port var announceUrl = serverType + '://' + hostname + ':' + port + '/announce' @@ -103,22 +109,42 @@ function serverTest (t, serverType, serverFamily) { client2.once('peer', function (addr) { t.ok(addr === hostname + ':6881' || addr === hostname + ':6882' || addr.id === peerId.toString('hex')) - client2.stop() - client2.once('update', function (data) { - t.equal(data.announce, announceUrl) - t.equal(data.complete, 1) - t.equal(data.incomplete, 0) - client2.destroy() + swarm.peers.once('evict', function (evicted) { + t.equals(evicted.value.peerId, peerId.toString('hex')) + }) + var client3 = new Client({ + infoHash: infoHash, + announce: [ announceUrl ], + peerId: peerId3, + port: 6880 + // wrtc: wrtc + }) + client3.start() + + server.once('start', function () { + t.pass('got start message from client3') + }) - client1.stop() - client1.once('update', function (data) { + client3.once('update', function () { + client2.stop() + client2.once('update', function (data) { t.equal(data.announce, announceUrl) - t.equal(data.complete, 0) - t.equal(data.incomplete, 0) - - client1.destroy(function () { - server.close() - // if (serverType === 'ws') wrtc.close() + t.equal(data.complete, 1) + t.equal(data.incomplete, 1) + client2.destroy() + + client3.stop() + client3.once('update', function (data) { + t.equal(data.announce, announceUrl) + t.equal(data.complete, 1) + t.equal(data.incomplete, 0) + + client3.destroy(function () { + client1.destroy(function () { + server.close() + }) + // if (serverType === 'ws') wrtc.close() + }) }) }) }) From 5dcc93224753a2545d9be48dfe490a42d3fef947 Mon Sep 17 00:00:00 2001 From: Yoann Ciabaud Date: Thu, 9 Jun 2016 02:10:07 +0200 Subject: [PATCH 3/7] Refresh cache on changes and handle access to evicted peer --- lib/server/swarm.js | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/lib/server/swarm.js b/lib/server/swarm.js index f44f3cfe..d07f0617 100644 --- a/lib/server/swarm.js +++ b/lib/server/swarm.js @@ -21,15 +21,18 @@ Swarm.prototype.announce = function (params, cb) { // Mark the source peer as recently used in cache var peer = self.peers.get(id) - if (params.event === 'started') { + // Get the peer back in swarm if missing + if (params.event === 'started' || !peer) { self._onAnnounceStarted(params, peer) - } else if (params.event === 'stopped') { + } + + if (params.event === 'stopped') { self._onAnnounceStopped(params, peer) } else if (params.event === 'completed') { self._onAnnounceCompleted(params, peer) } else if (params.event === 'update') { self._onAnnounceUpdate(params, peer) - } else { + } else if (params.event !== 'started') { cb(new Error('invalid event')) return } @@ -91,6 +94,8 @@ Swarm.prototype._onAnnounceCompleted = function (params, peer) { this.complete += 1 this.incomplete -= 1 peer.complete = true + var id = params.type === 'ws' ? params.peer_id : params.addr + this.peers.set(id, peer) } Swarm.prototype._onAnnounceUpdate = function (params, peer) { @@ -103,6 +108,8 @@ Swarm.prototype._onAnnounceUpdate = function (params, peer) { this.complete += 1 this.incomplete -= 1 peer.complete = true + var id = params.type === 'ws' ? params.peer_id : params.addr + this.peers.set(id, peer) } } From 7f98203d5ec63f395e2794700a10046c54dcf8f1 Mon Sep 17 00:00:00 2001 From: Yoann Ciabaud Date: Thu, 9 Jun 2016 16:28:31 +0200 Subject: [PATCH 4/7] Manually clean peers on announce --- lib/server/swarm.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/server/swarm.js b/lib/server/swarm.js index d07f0617..21940f86 100644 --- a/lib/server/swarm.js +++ b/lib/server/swarm.js @@ -118,6 +118,11 @@ Swarm.prototype._getPeers = function (numwant, ownPeerId, isWebRTC) { var ite = randomIterate(Object.keys(this.peers.cache)) var peerId while ((peerId = ite()) && peers.length < numwant) { + // Check manually if the peer is active + if (peers.maxAge && (Date.now() - peers.cache[peerId].modified) > peers.maxAge) { + peers.remove(peerId) + continue + } // Don't mark the peer as most recently used on announce var peer = this.peers.peek(peerId) if (isWebRTC && peer.peerId === ownPeerId) continue // don't send peer to itself From 5843b7d2f64a865d93330eb96727fe9986e0f31a Mon Sep 17 00:00:00 2001 From: Yoann Ciabaud Date: Fri, 10 Jun 2016 00:29:56 +0200 Subject: [PATCH 5/7] Bugfix in manual peer eviction. --- lib/server/swarm.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/server/swarm.js b/lib/server/swarm.js index 21940f86..4bb98203 100644 --- a/lib/server/swarm.js +++ b/lib/server/swarm.js @@ -119,7 +119,7 @@ Swarm.prototype._getPeers = function (numwant, ownPeerId, isWebRTC) { var peerId while ((peerId = ite()) && peers.length < numwant) { // Check manually if the peer is active - if (peers.maxAge && (Date.now() - peers.cache[peerId].modified) > peers.maxAge) { + if (this.peers.maxAge && (Date.now() - this.peers.cache[peerId].modified) > this.peers.maxAge) { peers.remove(peerId) continue } From d51a77b0287067c923973964e799d215e9fb986f Mon Sep 17 00:00:00 2001 From: Yoann Ciabaud Date: Fri, 10 Jun 2016 10:00:19 +0200 Subject: [PATCH 6/7] Update lru package to 3.0.0 and set peersCacheLength to 1K as default --- lib/server/swarm.js | 10 +++------- package.json | 2 +- server.js | 2 +- test/server.js | 2 +- 4 files changed, 6 insertions(+), 10 deletions(-) diff --git a/lib/server/swarm.js b/lib/server/swarm.js index 4bb98203..df129952 100644 --- a/lib/server/swarm.js +++ b/lib/server/swarm.js @@ -8,7 +8,7 @@ var randomIterate = require('random-iterate') // need to support when overriding Server.createSwarm() and Server.getSwarm() function Swarm (infoHash, server) { this.peers = new LRU({ - max: server.peersCacheLength || 10000, + max: server.peersCacheLength || 1000, maxAge: server.peersCacheTtl || 900 // 900s = 15 minutes }) this.complete = 0 @@ -115,16 +115,12 @@ Swarm.prototype._onAnnounceUpdate = function (params, peer) { Swarm.prototype._getPeers = function (numwant, ownPeerId, isWebRTC) { var peers = [] - var ite = randomIterate(Object.keys(this.peers.cache)) + var ite = randomIterate(this.peers.keys) var peerId while ((peerId = ite()) && peers.length < numwant) { - // Check manually if the peer is active - if (this.peers.maxAge && (Date.now() - this.peers.cache[peerId].modified) > this.peers.maxAge) { - peers.remove(peerId) - continue - } // Don't mark the peer as most recently used on announce var peer = this.peers.peek(peerId) + if (!peer) continue if (isWebRTC && peer.peerId === ownPeerId) continue // don't send peer to itself if ((isWebRTC && peer.type !== 'ws') || (!isWebRTC && peer.type === 'ws')) continue // send proper peer type peers.push(peer) diff --git a/package.json b/package.json index 13349220..c4bb5f72 100644 --- a/package.json +++ b/package.json @@ -27,7 +27,7 @@ "hat": "0.0.3", "inherits": "^2.0.1", "ip": "^1.0.1", - "lru": "^2.0.1", + "lru": "^3.0.0", "minimist": "^1.1.1", "once": "^1.3.0", "random-iterate": "^1.0.1", diff --git a/server.js b/server.js index 4fc9e1e4..6b56a9e4 100644 --- a/server.js +++ b/server.js @@ -156,7 +156,7 @@ function Server (opts) { if (req.method === 'GET' && (req.url === '/stats' || req.url === '/stats.json')) { infoHashes.forEach(function (infoHash) { var peers = self.torrents[infoHash].peers - var keys = Object.keys(peers.cache) + var keys = peers.keys if (keys.length > 0) activeTorrents++ keys.forEach(function (peerId) { diff --git a/test/server.js b/test/server.js index 706cb04c..b3a7b163 100644 --- a/test/server.js +++ b/test/server.js @@ -58,7 +58,7 @@ function serverTest (t, serverType, serverFamily) { t.equal(Object.keys(server.torrents).length, 1) t.equal(swarm.complete, 0) t.equal(swarm.incomplete, 1) - t.equal(Object.keys(swarm.peers.cache).length, 1) + t.equal(swarm.peers.length, 1) var id = serverType === 'ws' ? peerId.toString('hex') From 2c7ea4e30746a3a67855730dddd9cff3438fece5 Mon Sep 17 00:00:00 2001 From: Yoann Ciabaud Date: Tue, 14 Jun 2016 07:15:57 +0200 Subject: [PATCH 7/7] Refactor id usage in announce and fix default peersCacheTtl --- lib/server/swarm.js | 31 ++++++++++++------------------- 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/lib/server/swarm.js b/lib/server/swarm.js index df129952..d0b3277d 100644 --- a/lib/server/swarm.js +++ b/lib/server/swarm.js @@ -9,7 +9,7 @@ var randomIterate = require('random-iterate') function Swarm (infoHash, server) { this.peers = new LRU({ max: server.peersCacheLength || 1000, - maxAge: server.peersCacheTtl || 900 // 900s = 15 minutes + maxAge: server.peersCacheTtl || 900000 // 900 000ms = 15 minutes }) this.complete = 0 this.incomplete = 0 @@ -21,18 +21,15 @@ Swarm.prototype.announce = function (params, cb) { // Mark the source peer as recently used in cache var peer = self.peers.get(id) - // Get the peer back in swarm if missing - if (params.event === 'started' || !peer) { - self._onAnnounceStarted(params, peer) - } - - if (params.event === 'stopped') { - self._onAnnounceStopped(params, peer) + if (params.event === 'started') { + self._onAnnounceStarted(params, peer, id) + } else if (params.event === 'stopped') { + self._onAnnounceStopped(params, peer, id) } else if (params.event === 'completed') { - self._onAnnounceCompleted(params, peer) + self._onAnnounceCompleted(params, peer, id) } else if (params.event === 'update') { - self._onAnnounceUpdate(params, peer) - } else if (params.event !== 'started') { + self._onAnnounceUpdate(params, peer, id) + } else { cb(new Error('invalid event')) return } @@ -50,7 +47,7 @@ Swarm.prototype.scrape = function (params, cb) { }) } -Swarm.prototype._onAnnounceStarted = function (params, peer) { +Swarm.prototype._onAnnounceStarted = function (params, peer, id) { if (peer) { debug('unexpected `started` event from peer that is already in swarm') return this._onAnnounceUpdate(params, peer) // treat as an update @@ -58,7 +55,6 @@ Swarm.prototype._onAnnounceStarted = function (params, peer) { if (params.left === 0) this.complete += 1 else this.incomplete += 1 - var id = params.type === 'ws' ? params.peer_id : params.addr peer = this.peers.set(id, { type: params.type, complete: params.left === 0, @@ -69,7 +65,7 @@ Swarm.prototype._onAnnounceStarted = function (params, peer) { }) } -Swarm.prototype._onAnnounceStopped = function (params, peer) { +Swarm.prototype._onAnnounceStopped = function (params, peer, id) { if (!peer) { debug('unexpected `stopped` event from peer that is not in swarm') return // do nothing @@ -77,11 +73,10 @@ Swarm.prototype._onAnnounceStopped = function (params, peer) { if (peer.complete) this.complete -= 1 else this.incomplete -= 1 - var id = params.type === 'ws' ? params.peer_id : params.addr this.peers.remove(id) } -Swarm.prototype._onAnnounceCompleted = function (params, peer) { +Swarm.prototype._onAnnounceCompleted = function (params, peer, id) { if (!peer) { debug('unexpected `completed` event from peer that is not in swarm') return this._onAnnounceStarted(params, peer) // treat as a start @@ -94,11 +89,10 @@ Swarm.prototype._onAnnounceCompleted = function (params, peer) { this.complete += 1 this.incomplete -= 1 peer.complete = true - var id = params.type === 'ws' ? params.peer_id : params.addr this.peers.set(id, peer) } -Swarm.prototype._onAnnounceUpdate = function (params, peer) { +Swarm.prototype._onAnnounceUpdate = function (params, peer, id) { if (!peer) { debug('unexpected `update` event from peer that is not in swarm') return this._onAnnounceStarted(params, peer) // treat as a start @@ -108,7 +102,6 @@ Swarm.prototype._onAnnounceUpdate = function (params, peer) { this.complete += 1 this.incomplete -= 1 peer.complete = true - var id = params.type === 'ws' ? params.peer_id : params.addr this.peers.set(id, peer) } }