diff --git a/client.js b/client.js index cea05be3..f29e962d 100644 --- a/client.js +++ b/client.js @@ -69,6 +69,11 @@ function Client (opts) { // See: https://github.com/feross/webtorrent-hybrid/issues/46 self._wrtc = typeof opts.wrtc === 'function' ? opts.wrtc() : opts.wrtc + // Use a socket pool, so WebSocket tracker clients share WebSocket objects for + // the same server. In practice, WebSockets are pretty slow to establish, so this + // gives a nice performance boost, and saves browser resources. + self._socketPool = {} + var announce = typeof opts.announce === 'string' ? [ opts.announce ] : opts.announce == null ? [] : opts.announce diff --git a/lib/client/websocket-tracker.js b/lib/client/websocket-tracker.js index 894408e7..aa7e0430 100644 --- a/lib/client/websocket-tracker.js +++ b/lib/client/websocket-tracker.js @@ -10,11 +10,6 @@ var Socket = require('simple-websocket') var common = require('../common') var Tracker = require('./tracker') -// Use a socket pool, so tracker clients share WebSocket objects for the same server. -// In practice, WebSockets are pretty slow to establish, so this gives a nice performance -// boost, and saves browser resources. -var socketPool = {} - var RECONNECT_MINIMUM = 15 * 1000 var RECONNECT_MAXIMUM = 30 * 60 * 1000 var RECONNECT_VARIANCE = 30 * 1000 @@ -129,15 +124,15 @@ WebSocketTracker.prototype.destroy = function (cb) { self._onSocketDataBound = null self._onSocketCloseBound = null - if (socketPool[self.announceUrl]) { - socketPool[self.announceUrl].consumers -= 1 + if (self.client._socketPool[self.announceUrl]) { + self.client._socketPool[self.announceUrl].consumers -= 1 } // Other instances are using the socket, so there's nothing left to do here - if (socketPool[self.announceUrl].consumers > 0) return cb() + if (self.client._socketPool[self.announceUrl].consumers > 0) return cb() - var socket = socketPool[self.announceUrl] - delete socketPool[self.announceUrl] + var socket = self.client._socketPool[self.announceUrl] + delete self.client._socketPool[self.announceUrl] socket.on('error', noop) // ignore all future errors socket.once('close', cb) @@ -182,11 +177,11 @@ WebSocketTracker.prototype._openSocket = function () { self._onSocketClose() } - self.socket = socketPool[self.announceUrl] + self.socket = self.client._socketPool[self.announceUrl] if (self.socket) { - socketPool[self.announceUrl].consumers += 1 + self.client._socketPool[self.announceUrl].consumers += 1 } else { - self.socket = socketPool[self.announceUrl] = new Socket(self.announceUrl) + self.socket = self.client._socketPool[self.announceUrl] = new Socket(self.announceUrl) self.socket.consumers = 1 self.socket.once('connect', self._onSocketConnectBound) } diff --git a/lib/server/parse-http.js b/lib/server/parse-http.js index 24c3c69c..bf6cb2c4 100644 --- a/lib/server/parse-http.js +++ b/lib/server/parse-http.js @@ -25,7 +25,7 @@ function parseHttpRequest (req, opts) { if (!params.port) throw new Error('invalid port') params.left = Number(params.left) - if (isNaN(params.left)) params.left = Infinity + if (Number.isNaN(params.left)) params.left = Infinity params.compact = Number(params.compact) || 0 params.numwant = Math.min( diff --git a/lib/server/parse-websocket.js b/lib/server/parse-websocket.js index 0db50b55..b6fecb3d 100644 --- a/lib/server/parse-websocket.js +++ b/lib/server/parse-websocket.js @@ -28,7 +28,9 @@ function parseWebSocketRequest (socket, opts, params) { params.to_peer_id = common.binaryToHex(params.to_peer_id) } - params.left = Number(params.left) || Infinity + params.left = Number(params.left) + if (Number.isNaN(params.left)) params.left = Infinity + params.numwant = Math.min( Number(params.offers && params.offers.length) || 0, // no default - explicit only common.MAX_ANNOUNCE_PEERS diff --git a/lib/server/swarm.js b/lib/server/swarm.js index f8d8a446..5203318b 100644 --- a/lib/server/swarm.js +++ b/lib/server/swarm.js @@ -7,12 +7,37 @@ var randomIterate = require('random-iterate') // Regard this as the default implementation of an interface that you // need to support when overriding Server.createSwarm() and Server.getSwarm() function Swarm (infoHash, server) { - this.peers = new LRU({ + var self = this + self.infoHash = infoHash + self.complete = 0 + self.incomplete = 0 + + self.peers = new LRU({ max: server.peersCacheLength || 1000, - maxAge: server.peersCacheTtl || 900000 // 900 000ms = 15 minutes + maxAge: server.peersCacheTtl || 20 * 60 * 1000 // 20 minutes + }) + + // When a peer is evicted from the LRU store, send a synthetic 'stopped' event + // so the stats get updated correctly. + self.peers.on('evict', function (data) { + var peer = data.value + var params = { + type: peer.type, + event: 'stopped', + numwant: 0, + peer_id: peer.peerId + } + self._onAnnounceStopped(params, peer, peer.peerId) + + // When a websocket peer is evicted, and it's not in any other swarms, close + // the websocket to conserve server resources. + if (peer.socket && peer.socket.infoHashes.length === 0) { + try { + peer.socket.close() + peer.socket = null + } catch (err) {} + } }) - this.complete = 0 - this.incomplete = 0 } Swarm.prototype.announce = function (params, cb) { @@ -73,6 +98,14 @@ Swarm.prototype._onAnnounceStopped = function (params, peer, id) { if (peer.complete) this.complete -= 1 else this.incomplete -= 1 + + // If it's a websocket, remove this swarm's infohash from the list of active + // swarms that this peer is participating in. + if (peer.socket) { + var index = peer.socket.infoHashes.indexOf(this.infoHash) + peer.socket.infoHashes.splice(index, 1) + } + this.peers.remove(id) } @@ -82,8 +115,8 @@ Swarm.prototype._onAnnounceCompleted = function (params, peer, id) { return this._onAnnounceStarted(params, peer, id) // treat as a start } if (peer.complete) { - debug('unexpected `completed` event from peer that is already marked as completed') - return // do nothing + debug('unexpected `completed` event from peer that is already completed') + return this._onAnnounceUpdate(params, peer, id) // treat as an update } this.complete += 1 @@ -102,8 +135,8 @@ Swarm.prototype._onAnnounceUpdate = function (params, peer, id) { this.complete += 1 this.incomplete -= 1 peer.complete = true - this.peers.set(id, peer) } + this.peers.set(id, peer) } Swarm.prototype._getPeers = function (numwant, ownPeerId, isWebRTC) { diff --git a/server.js b/server.js index 6ba51a70..d28b6925 100644 --- a/server.js +++ b/server.js @@ -575,6 +575,59 @@ Server.prototype._onWebSocketRequest = function (socket, opts, params) { }) } +Server.prototype._onWebSocketSend = function (socket, err) { + var self = this + if (err) self._onWebSocketError(socket, err) +} + +Server.prototype._onWebSocketClose = function (socket) { + var self = this + debug('websocket close %s', socket.peerId) + + if (socket.peerId) { + socket.infoHashes.slice(0).forEach(function (infoHash) { + var swarm = self.torrents[infoHash] + if (swarm) { + swarm.announce({ + type: 'ws', + event: 'stopped', + numwant: 0, + peer_id: socket.peerId + }, noop) + } + }) + } + + // ignore all future errors + socket.onSend = noop + socket.on('error', noop) + + socket.peerId = null + socket.infoHashes = null + + if (typeof socket.onMessageBound === 'function') { + socket.removeListener('message', socket.onMessageBound) + } + socket.onMessageBound = null + + if (typeof socket.onErrorBound === 'function') { + socket.removeListener('error', socket.onErrorBound) + } + socket.onErrorBound = null + + if (typeof socket.onCloseBound === 'function') { + socket.removeListener('close', socket.onCloseBound) + } + socket.onCloseBound = null +} + +Server.prototype._onWebSocketError = function (socket, err) { + var self = this + debug('websocket error %s', err.message || err) + self.emit('warning', err) + self._onWebSocketClose(socket) +} + Server.prototype._onRequest = function (params, cb) { var self = this if (params && params.action === common.ACTIONS.CONNECT) { @@ -757,59 +810,6 @@ function makeUdpPacket (params) { return packet } -Server.prototype._onWebSocketSend = function (socket, err) { - var self = this - if (err) self._onWebSocketError(socket, err) -} - -Server.prototype._onWebSocketClose = function (socket) { - var self = this - debug('websocket close %s', socket.peerId) - - if (socket.peerId) { - socket.infoHashes.forEach(function (infoHash) { - var swarm = self.torrents[infoHash] - if (swarm) { - swarm.announce({ - type: 'ws', - event: 'stopped', - numwant: 0, - peer_id: socket.peerId - }, noop) - } - }) - } - - // ignore all future errors - socket.onSend = noop - socket.on('error', noop) - - socket.peerId = null - socket.infoHashes = null - - if (typeof socket.onMessageBound === 'function') { - socket.removeListener('message', socket.onMessageBound) - } - socket.onMessageBound = null - - if (typeof socket.onErrorBound === 'function') { - socket.removeListener('error', socket.onErrorBound) - } - socket.onErrorBound = null - - if (typeof socket.onCloseBound === 'function') { - socket.removeListener('close', socket.onCloseBound) - } - socket.onCloseBound = null -} - -Server.prototype._onWebSocketError = function (socket, err) { - var self = this - debug('websocket error %s', err.message || err) - self.emit('warning', err) - self._onWebSocketClose(socket) -} - function toNumber (x) { x = Number(x) return x >= 0 ? x : false diff --git a/test/evict.js b/test/evict.js new file mode 100644 index 00000000..1ff60bbb --- /dev/null +++ b/test/evict.js @@ -0,0 +1,126 @@ +var Buffer = require('safe-buffer').Buffer +var Client = require('../') +var common = require('./common') +var test = require('tape') +var electronWebrtc = require('electron-webrtc') + +var wrtc + +var infoHash = '4cb67059ed6bd08362da625b3ae77f6f4a075705' +var peerId = Buffer.from('01234567890123456789') +var peerId2 = Buffer.from('12345678901234567890') +var peerId3 = Buffer.from('23456789012345678901') + +function serverTest (t, serverType, serverFamily) { + t.plan(10) + + var hostname = serverFamily === 'inet6' + ? '[::1]' + : '127.0.0.1' + + var opts = { + serverType: serverType, + peersCacheLength: 2 // LRU cache can only contain a max of 2 peers + } + + common.createServer(t, opts, function (server) { + // Not using announceUrl param from `common.createServer()` since we + // want to control IPv4 vs IPv6. + var port = server[serverType].address().port + var announceUrl = serverType + '://' + hostname + ':' + port + '/announce' + + var client1 = new Client({ + infoHash: infoHash, + announce: [ announceUrl ], + peerId: peerId, + port: 6881, + wrtc: wrtc + }) + if (serverType === 'ws') common.mockWebsocketTracker(client1) + + client1.start() + + client1.once('update', function (data) { + var client2 = new Client({ + infoHash: infoHash, + announce: [ announceUrl ], + peerId: peerId2, + port: 6882, + wrtc: wrtc + }) + if (serverType === 'ws') common.mockWebsocketTracker(client2) + + client2.start() + + client2.once('update', function (data) { + server.getSwarm(infoHash, function (err, swarm) { + t.error(err) + + t.equal(swarm.complete + swarm.incomplete, 2) + + // Ensure that first peer is evicted when a third one is added + var evicted = false + swarm.peers.once('evict', function (evictedPeer) { + t.equal(evictedPeer.value.peerId, peerId.toString('hex')) + t.equal(swarm.complete + swarm.incomplete, 2) + evicted = true + }) + + var client3 = new Client({ + infoHash: infoHash, + announce: [ announceUrl ], + peerId: peerId3, + port: 6880, + wrtc: wrtc + }) + if (serverType === 'ws') common.mockWebsocketTracker(client3) + + client3.start() + + client3.once('update', function (data) { + t.ok(evicted, 'client1 was evicted from server before client3 gets response') + t.equal(swarm.complete + swarm.incomplete, 2) + + client1.destroy(function () { + t.pass('client1 destroyed') + }) + + client2.destroy(function () { + t.pass('client3 destroyed') + }) + + client3.destroy(function () { + t.pass('client3 destroyed') + }) + + server.close(function () { + t.pass('server destroyed') + }) + }) + }) + }) + }) + }) +} + +test('evict: ipv4 server', function (t) { + serverTest(t, 'http', 'inet') +}) + +test('evict: http ipv6 server', function (t) { + serverTest(t, 'http', 'inet6') +}) + +test('evict: udp server', function (t) { + serverTest(t, 'udp', 'inet') +}) + +test('evict: ws server', function (t) { + wrtc = electronWebrtc() + wrtc.electronDaemon.once('ready', function () { + serverTest(t, 'ws', 'inet') + }) + t.once('end', function () { + wrtc.close() + }) +}) diff --git a/test/server.js b/test/server.js index cc679eb1..73e9fe66 100644 --- a/test/server.js +++ b/test/server.js @@ -2,12 +2,9 @@ var Buffer = require('safe-buffer').Buffer var Client = require('../') var common = require('./common') var test = require('tape') -var wrtc = require('electron-webrtc')() +var electronWebrtc = require('electron-webrtc') -var wrtcReady = false -wrtc.electronDaemon.once('ready', function () { - wrtcReady = true -}) +var wrtc var infoHash = '4cb67059ed6bd08362da625b3ae77f6f4a075705' var peerId = Buffer.from('01234567890123456789') @@ -15,7 +12,7 @@ var peerId2 = Buffer.from('12345678901234567890') var peerId3 = Buffer.from('23456789012345678901') function serverTest (t, serverType, serverFamily) { - t.plan(36) + t.plan(40) var hostname = serverFamily === 'inet6' ? '[::1]' @@ -25,8 +22,7 @@ function serverTest (t, serverType, serverFamily) { : '127.0.0.1' var opts = { - serverType: serverType, - peersCacheLength: 2 + serverType: serverType } common.createServer(t, opts, function (server) { @@ -142,16 +138,23 @@ function serverTest (t, serverType, serverFamily) { t.equal(data.incomplete, 1) client2.destroy(function () { + t.pass('client2 destroyed') client3.stop() client3.once('update', function (data) { t.equal(data.announce, announceUrl) t.equal(data.complete, 1) t.equal(data.incomplete, 0) + client1.destroy(function () { + t.pass('client1 destroyed') + }) + client3.destroy(function () { - client1.destroy(function () { - server.close() - }) + t.pass('client3 destroyed') + }) + + server.close(function () { + t.pass('server destroyed') }) }) }) @@ -178,15 +181,11 @@ test('udp server', function (t) { }) test('ws server', function (t) { - if (wrtcReady) { - runTest() - } else { - wrtc.electronDaemon.once('ready', runTest) - } - function runTest () { - t.once('end', function () { - wrtc.close() - }) + wrtc = electronWebrtc() + wrtc.electronDaemon.once('ready', function () { serverTest(t, 'ws', 'inet') - } + }) + t.once('end', function () { + wrtc.close() + }) })