Skip to content

Commit a469740

Browse files
authored
Merge pull request #198 from feross/fix-196
Fix stats and leaked websockets
2 parents da7d732 + 3f3db7d commit a469740

File tree

8 files changed

+256
-96
lines changed

8 files changed

+256
-96
lines changed

client.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ function Client (opts) {
6969
// See: https://github.com/feross/webtorrent-hybrid/issues/46
7070
self._wrtc = typeof opts.wrtc === 'function' ? opts.wrtc() : opts.wrtc
7171

72+
// Use a socket pool, so WebSocket tracker clients share WebSocket objects for
73+
// the same server. In practice, WebSockets are pretty slow to establish, so this
74+
// gives a nice performance boost, and saves browser resources.
75+
self._socketPool = {}
76+
7277
var announce = typeof opts.announce === 'string'
7378
? [ opts.announce ]
7479
: opts.announce == null ? [] : opts.announce

lib/client/websocket-tracker.js

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,6 @@ var Socket = require('simple-websocket')
1010
var common = require('../common')
1111
var Tracker = require('./tracker')
1212

13-
// Use a socket pool, so tracker clients share WebSocket objects for the same server.
14-
// In practice, WebSockets are pretty slow to establish, so this gives a nice performance
15-
// boost, and saves browser resources.
16-
var socketPool = {}
17-
1813
var RECONNECT_MINIMUM = 15 * 1000
1914
var RECONNECT_MAXIMUM = 30 * 60 * 1000
2015
var RECONNECT_VARIANCE = 30 * 1000
@@ -129,15 +124,15 @@ WebSocketTracker.prototype.destroy = function (cb) {
129124
self._onSocketDataBound = null
130125
self._onSocketCloseBound = null
131126

132-
if (socketPool[self.announceUrl]) {
133-
socketPool[self.announceUrl].consumers -= 1
127+
if (self.client._socketPool[self.announceUrl]) {
128+
self.client._socketPool[self.announceUrl].consumers -= 1
134129
}
135130

136131
// Other instances are using the socket, so there's nothing left to do here
137-
if (socketPool[self.announceUrl].consumers > 0) return cb()
132+
if (self.client._socketPool[self.announceUrl].consumers > 0) return cb()
138133

139-
var socket = socketPool[self.announceUrl]
140-
delete socketPool[self.announceUrl]
134+
var socket = self.client._socketPool[self.announceUrl]
135+
delete self.client._socketPool[self.announceUrl]
141136
socket.on('error', noop) // ignore all future errors
142137
socket.once('close', cb)
143138

@@ -182,11 +177,11 @@ WebSocketTracker.prototype._openSocket = function () {
182177
self._onSocketClose()
183178
}
184179

185-
self.socket = socketPool[self.announceUrl]
180+
self.socket = self.client._socketPool[self.announceUrl]
186181
if (self.socket) {
187-
socketPool[self.announceUrl].consumers += 1
182+
self.client._socketPool[self.announceUrl].consumers += 1
188183
} else {
189-
self.socket = socketPool[self.announceUrl] = new Socket(self.announceUrl)
184+
self.socket = self.client._socketPool[self.announceUrl] = new Socket(self.announceUrl)
190185
self.socket.consumers = 1
191186
self.socket.once('connect', self._onSocketConnectBound)
192187
}

lib/server/parse-http.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ function parseHttpRequest (req, opts) {
2525
if (!params.port) throw new Error('invalid port')
2626

2727
params.left = Number(params.left)
28-
if (isNaN(params.left)) params.left = Infinity
28+
if (Number.isNaN(params.left)) params.left = Infinity
2929

3030
params.compact = Number(params.compact) || 0
3131
params.numwant = Math.min(

lib/server/parse-websocket.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ function parseWebSocketRequest (socket, opts, params) {
2828
params.to_peer_id = common.binaryToHex(params.to_peer_id)
2929
}
3030

31-
params.left = Number(params.left) || Infinity
31+
params.left = Number(params.left)
32+
if (Number.isNaN(params.left)) params.left = Infinity
33+
3234
params.numwant = Math.min(
3335
Number(params.offers && params.offers.length) || 0, // no default - explicit only
3436
common.MAX_ANNOUNCE_PEERS

lib/server/swarm.js

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,37 @@ var randomIterate = require('random-iterate')
77
// Regard this as the default implementation of an interface that you
88
// need to support when overriding Server.createSwarm() and Server.getSwarm()
99
function Swarm (infoHash, server) {
10-
this.peers = new LRU({
10+
var self = this
11+
self.infoHash = infoHash
12+
self.complete = 0
13+
self.incomplete = 0
14+
15+
self.peers = new LRU({
1116
max: server.peersCacheLength || 1000,
12-
maxAge: server.peersCacheTtl || 900000 // 900 000ms = 15 minutes
17+
maxAge: server.peersCacheTtl || 20 * 60 * 1000 // 20 minutes
18+
})
19+
20+
// When a peer is evicted from the LRU store, send a synthetic 'stopped' event
21+
// so the stats get updated correctly.
22+
self.peers.on('evict', function (data) {
23+
var peer = data.value
24+
var params = {
25+
type: peer.type,
26+
event: 'stopped',
27+
numwant: 0,
28+
peer_id: peer.peerId
29+
}
30+
self._onAnnounceStopped(params, peer, peer.peerId)
31+
32+
// When a websocket peer is evicted, and it's not in any other swarms, close
33+
// the websocket to conserve server resources.
34+
if (peer.socket && peer.socket.infoHashes.length === 0) {
35+
try {
36+
peer.socket.close()
37+
peer.socket = null
38+
} catch (err) {}
39+
}
1340
})
14-
this.complete = 0
15-
this.incomplete = 0
1641
}
1742

1843
Swarm.prototype.announce = function (params, cb) {
@@ -73,6 +98,14 @@ Swarm.prototype._onAnnounceStopped = function (params, peer, id) {
7398

7499
if (peer.complete) this.complete -= 1
75100
else this.incomplete -= 1
101+
102+
// If it's a websocket, remove this swarm's infohash from the list of active
103+
// swarms that this peer is participating in.
104+
if (peer.socket) {
105+
var index = peer.socket.infoHashes.indexOf(this.infoHash)
106+
peer.socket.infoHashes.splice(index, 1)
107+
}
108+
76109
this.peers.remove(id)
77110
}
78111

@@ -82,8 +115,8 @@ Swarm.prototype._onAnnounceCompleted = function (params, peer, id) {
82115
return this._onAnnounceStarted(params, peer, id) // treat as a start
83116
}
84117
if (peer.complete) {
85-
debug('unexpected `completed` event from peer that is already marked as completed')
86-
return // do nothing
118+
debug('unexpected `completed` event from peer that is already completed')
119+
return this._onAnnounceUpdate(params, peer, id) // treat as an update
87120
}
88121

89122
this.complete += 1
@@ -102,8 +135,8 @@ Swarm.prototype._onAnnounceUpdate = function (params, peer, id) {
102135
this.complete += 1
103136
this.incomplete -= 1
104137
peer.complete = true
105-
this.peers.set(id, peer)
106138
}
139+
this.peers.set(id, peer)
107140
}
108141

109142
Swarm.prototype._getPeers = function (numwant, ownPeerId, isWebRTC) {

server.js

Lines changed: 53 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,59 @@ Server.prototype._onWebSocketRequest = function (socket, opts, params) {
579579
})
580580
}
581581

582+
Server.prototype._onWebSocketSend = function (socket, err) {
583+
var self = this
584+
if (err) self._onWebSocketError(socket, err)
585+
}
586+
587+
Server.prototype._onWebSocketClose = function (socket) {
588+
var self = this
589+
debug('websocket close %s', socket.peerId)
590+
591+
if (socket.peerId) {
592+
socket.infoHashes.slice(0).forEach(function (infoHash) {
593+
var swarm = self.torrents[infoHash]
594+
if (swarm) {
595+
swarm.announce({
596+
type: 'ws',
597+
event: 'stopped',
598+
numwant: 0,
599+
peer_id: socket.peerId
600+
}, noop)
601+
}
602+
})
603+
}
604+
605+
// ignore all future errors
606+
socket.onSend = noop
607+
socket.on('error', noop)
608+
609+
socket.peerId = null
610+
socket.infoHashes = null
611+
612+
if (typeof socket.onMessageBound === 'function') {
613+
socket.removeListener('message', socket.onMessageBound)
614+
}
615+
socket.onMessageBound = null
616+
617+
if (typeof socket.onErrorBound === 'function') {
618+
socket.removeListener('error', socket.onErrorBound)
619+
}
620+
socket.onErrorBound = null
621+
622+
if (typeof socket.onCloseBound === 'function') {
623+
socket.removeListener('close', socket.onCloseBound)
624+
}
625+
socket.onCloseBound = null
626+
}
627+
628+
Server.prototype._onWebSocketError = function (socket, err) {
629+
var self = this
630+
debug('websocket error %s', err.message || err)
631+
self.emit('warning', err)
632+
self._onWebSocketClose(socket)
633+
}
634+
582635
Server.prototype._onRequest = function (params, cb) {
583636
var self = this
584637
if (params && params.action === common.ACTIONS.CONNECT) {
@@ -761,59 +814,6 @@ function makeUdpPacket (params) {
761814
return packet
762815
}
763816

764-
Server.prototype._onWebSocketSend = function (socket, err) {
765-
var self = this
766-
if (err) self._onWebSocketError(socket, err)
767-
}
768-
769-
Server.prototype._onWebSocketClose = function (socket) {
770-
var self = this
771-
debug('websocket close %s', socket.peerId)
772-
773-
if (socket.peerId) {
774-
socket.infoHashes.forEach(function (infoHash) {
775-
var swarm = self.torrents[infoHash]
776-
if (swarm) {
777-
swarm.announce({
778-
type: 'ws',
779-
event: 'stopped',
780-
numwant: 0,
781-
peer_id: socket.peerId
782-
}, noop)
783-
}
784-
})
785-
}
786-
787-
// ignore all future errors
788-
socket.onSend = noop
789-
socket.on('error', noop)
790-
791-
socket.peerId = null
792-
socket.infoHashes = null
793-
794-
if (typeof socket.onMessageBound === 'function') {
795-
socket.removeListener('message', socket.onMessageBound)
796-
}
797-
socket.onMessageBound = null
798-
799-
if (typeof socket.onErrorBound === 'function') {
800-
socket.removeListener('error', socket.onErrorBound)
801-
}
802-
socket.onErrorBound = null
803-
804-
if (typeof socket.onCloseBound === 'function') {
805-
socket.removeListener('close', socket.onCloseBound)
806-
}
807-
socket.onCloseBound = null
808-
}
809-
810-
Server.prototype._onWebSocketError = function (socket, err) {
811-
var self = this
812-
debug('websocket error %s', err.message || err)
813-
self.emit('warning', err)
814-
self._onWebSocketClose(socket)
815-
}
816-
817817
function toNumber (x) {
818818
x = Number(x)
819819
return x >= 0 ? x : false

0 commit comments

Comments
 (0)