Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions client.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ function Client (opts) {
// See: https://github.com/feross/webtorrent-hybrid/issues/46
self._wrtc = typeof opts.wrtc === 'function' ? opts.wrtc() : opts.wrtc

// Use a socket pool, so WebSocket tracker clients share WebSocket objects for
// the same server. In practice, WebSockets are pretty slow to establish, so this
// gives a nice performance boost, and saves browser resources.
self._socketPool = {}

var announce = typeof opts.announce === 'string'
? [ opts.announce ]
: opts.announce == null ? [] : opts.announce
Expand Down
21 changes: 8 additions & 13 deletions lib/client/websocket-tracker.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@ var Socket = require('simple-websocket')
var common = require('../common')
var Tracker = require('./tracker')

// Use a socket pool, so tracker clients share WebSocket objects for the same server.
// In practice, WebSockets are pretty slow to establish, so this gives a nice performance
// boost, and saves browser resources.
var socketPool = {}

var RECONNECT_MINIMUM = 15 * 1000
var RECONNECT_MAXIMUM = 30 * 60 * 1000
var RECONNECT_VARIANCE = 30 * 1000
Expand Down Expand Up @@ -129,15 +124,15 @@ WebSocketTracker.prototype.destroy = function (cb) {
self._onSocketDataBound = null
self._onSocketCloseBound = null

if (socketPool[self.announceUrl]) {
socketPool[self.announceUrl].consumers -= 1
if (self.client._socketPool[self.announceUrl]) {
self.client._socketPool[self.announceUrl].consumers -= 1
}

// Other instances are using the socket, so there's nothing left to do here
if (socketPool[self.announceUrl].consumers > 0) return cb()
if (self.client._socketPool[self.announceUrl].consumers > 0) return cb()

var socket = socketPool[self.announceUrl]
delete socketPool[self.announceUrl]
var socket = self.client._socketPool[self.announceUrl]
delete self.client._socketPool[self.announceUrl]
socket.on('error', noop) // ignore all future errors
socket.once('close', cb)

Expand Down Expand Up @@ -182,11 +177,11 @@ WebSocketTracker.prototype._openSocket = function () {
self._onSocketClose()
}

self.socket = socketPool[self.announceUrl]
self.socket = self.client._socketPool[self.announceUrl]
if (self.socket) {
socketPool[self.announceUrl].consumers += 1
self.client._socketPool[self.announceUrl].consumers += 1
} else {
self.socket = socketPool[self.announceUrl] = new Socket(self.announceUrl)
self.socket = self.client._socketPool[self.announceUrl] = new Socket(self.announceUrl)
self.socket.consumers = 1
self.socket.once('connect', self._onSocketConnectBound)
}
Expand Down
2 changes: 1 addition & 1 deletion lib/server/parse-http.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ function parseHttpRequest (req, opts) {
if (!params.port) throw new Error('invalid port')

params.left = Number(params.left)
if (isNaN(params.left)) params.left = Infinity
if (Number.isNaN(params.left)) params.left = Infinity

params.compact = Number(params.compact) || 0
params.numwant = Math.min(
Expand Down
4 changes: 3 additions & 1 deletion lib/server/parse-websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ function parseWebSocketRequest (socket, opts, params) {
params.to_peer_id = common.binaryToHex(params.to_peer_id)
}

params.left = Number(params.left) || Infinity
params.left = Number(params.left)
if (Number.isNaN(params.left)) params.left = Infinity

params.numwant = Math.min(
Number(params.offers && params.offers.length) || 0, // no default - explicit only
common.MAX_ANNOUNCE_PEERS
Expand Down
47 changes: 40 additions & 7 deletions lib/server/swarm.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,37 @@ var randomIterate = require('random-iterate')
// Regard this as the default implementation of an interface that you
// need to support when overriding Server.createSwarm() and Server.getSwarm()
function Swarm (infoHash, server) {
this.peers = new LRU({
var self = this
self.infoHash = infoHash
self.complete = 0
self.incomplete = 0

self.peers = new LRU({
max: server.peersCacheLength || 1000,
maxAge: server.peersCacheTtl || 900000 // 900 000ms = 15 minutes
maxAge: server.peersCacheTtl || 20 * 60 * 1000 // 20 minutes
})

// When a peer is evicted from the LRU store, send a synthetic 'stopped' event
// so the stats get updated correctly.
self.peers.on('evict', function (data) {
var peer = data.value
var params = {
type: peer.type,
event: 'stopped',
numwant: 0,
peer_id: peer.peerId
}
self._onAnnounceStopped(params, peer, peer.peerId)

// When a websocket peer is evicted, and it's not in any other swarms, close
// the websocket to conserve server resources.
if (peer.socket && peer.socket.infoHashes.length === 0) {
try {
peer.socket.close()
peer.socket = null
} catch (err) {}
}
})
this.complete = 0
this.incomplete = 0
}

Swarm.prototype.announce = function (params, cb) {
Expand Down Expand Up @@ -73,6 +98,14 @@ Swarm.prototype._onAnnounceStopped = function (params, peer, id) {

if (peer.complete) this.complete -= 1
else this.incomplete -= 1

// If it's a websocket, remove this swarm's infohash from the list of active
// swarms that this peer is participating in.
if (peer.socket) {
var index = peer.socket.infoHashes.indexOf(this.infoHash)
peer.socket.infoHashes.splice(index, 1)
}

this.peers.remove(id)
}

Expand All @@ -82,8 +115,8 @@ Swarm.prototype._onAnnounceCompleted = function (params, peer, id) {
return this._onAnnounceStarted(params, peer, id) // treat as a start
}
if (peer.complete) {
debug('unexpected `completed` event from peer that is already marked as completed')
return // do nothing
debug('unexpected `completed` event from peer that is already completed')
return this._onAnnounceUpdate(params, peer, id) // treat as an update
}

this.complete += 1
Expand All @@ -102,8 +135,8 @@ Swarm.prototype._onAnnounceUpdate = function (params, peer, id) {
this.complete += 1
this.incomplete -= 1
peer.complete = true
this.peers.set(id, peer)
}
this.peers.set(id, peer)
}

Swarm.prototype._getPeers = function (numwant, ownPeerId, isWebRTC) {
Expand Down
106 changes: 53 additions & 53 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,59 @@ Server.prototype._onWebSocketRequest = function (socket, opts, params) {
})
}

Server.prototype._onWebSocketSend = function (socket, err) {
var self = this
if (err) self._onWebSocketError(socket, err)
}

Server.prototype._onWebSocketClose = function (socket) {
var self = this
debug('websocket close %s', socket.peerId)

if (socket.peerId) {
socket.infoHashes.slice(0).forEach(function (infoHash) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.slice(0) does nothing

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to clone the array before looping over it, otherwise it behaves incorrectly because we're mutating it inside the loop here: https://github.com/feross/bittorrent-tracker/pull/198/files/ebe3c218feab4f5a6774d5092531c6b2e55fe608#diff-831f46884145ede920506b29d5681ab2R104

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var swarm = self.torrents[infoHash]
if (swarm) {
swarm.announce({
type: 'ws',
event: 'stopped',
numwant: 0,
peer_id: socket.peerId
}, noop)
}
})
}

// ignore all future errors
socket.onSend = noop
socket.on('error', noop)

socket.peerId = null
socket.infoHashes = null

if (typeof socket.onMessageBound === 'function') {
socket.removeListener('message', socket.onMessageBound)
}
socket.onMessageBound = null

if (typeof socket.onErrorBound === 'function') {
socket.removeListener('error', socket.onErrorBound)
}
socket.onErrorBound = null

if (typeof socket.onCloseBound === 'function') {
socket.removeListener('close', socket.onCloseBound)
}
socket.onCloseBound = null
}

Server.prototype._onWebSocketError = function (socket, err) {
var self = this
debug('websocket error %s', err.message || err)
self.emit('warning', err)
self._onWebSocketClose(socket)
}

Server.prototype._onRequest = function (params, cb) {
var self = this
if (params && params.action === common.ACTIONS.CONNECT) {
Expand Down Expand Up @@ -757,59 +810,6 @@ function makeUdpPacket (params) {
return packet
}

Server.prototype._onWebSocketSend = function (socket, err) {
var self = this
if (err) self._onWebSocketError(socket, err)
}

Server.prototype._onWebSocketClose = function (socket) {
var self = this
debug('websocket close %s', socket.peerId)

if (socket.peerId) {
socket.infoHashes.forEach(function (infoHash) {
var swarm = self.torrents[infoHash]
if (swarm) {
swarm.announce({
type: 'ws',
event: 'stopped',
numwant: 0,
peer_id: socket.peerId
}, noop)
}
})
}

// ignore all future errors
socket.onSend = noop
socket.on('error', noop)

socket.peerId = null
socket.infoHashes = null

if (typeof socket.onMessageBound === 'function') {
socket.removeListener('message', socket.onMessageBound)
}
socket.onMessageBound = null

if (typeof socket.onErrorBound === 'function') {
socket.removeListener('error', socket.onErrorBound)
}
socket.onErrorBound = null

if (typeof socket.onCloseBound === 'function') {
socket.removeListener('close', socket.onCloseBound)
}
socket.onCloseBound = null
}

Server.prototype._onWebSocketError = function (socket, err) {
var self = this
debug('websocket error %s', err.message || err)
self.emit('warning', err)
self._onWebSocketClose(socket)
}

function toNumber (x) {
x = Number(x)
return x >= 0 ? x : false
Expand Down
Loading