diff --git a/server.js b/server.js index 4f905699..9d29befb 100644 --- a/server.js +++ b/server.js @@ -1,24 +1,19 @@ -module.exports = Server - -var Buffer = require('safe-buffer').Buffer -var bencode = require('bencode') -var debug = require('debug')('bittorrent-tracker:server') -var dgram = require('dgram') -var EventEmitter = require('events').EventEmitter -var http = require('http') -var inherits = require('inherits') -var peerid = require('bittorrent-peerid') -var series = require('run-series') -var string2compact = require('string2compact') -var WebSocketServer = require('ws').Server - -var common = require('./lib/common') -var Swarm = require('./lib/server/swarm') -var parseHttpRequest = require('./lib/server/parse-http') -var parseUdpRequest = require('./lib/server/parse-udp') -var parseWebSocketRequest = require('./lib/server/parse-websocket') - -inherits(Server, EventEmitter) +const { Buffer } = require('safe-buffer') +const bencode = require('bencode') +const debug = require('debug')('bittorrent-tracker:server') +const dgram = require('dgram') +const EventEmitter = require('events') +const http = require('http') +const peerid = require('bittorrent-peerid') +const series = require('run-series') +const string2compact = require('string2compact') +const WebSocketServer = require('ws').Server + +const common = require('./lib/common') +const Swarm = require('./lib/server/swarm') +const parseHttpRequest = require('./lib/server/parse-http') +const parseUdpRequest = require('./lib/server/parse-udp') +const parseWebSocketRequest = require('./lib/server/parse-websocket') /** * BitTorrent tracker server. @@ -36,745 +31,724 @@ inherits(Server, EventEmitter) * @param {boolean} opts.stats enable web-based statistics? (default: true) * @param {function} opts.filter black/whitelist fn for disallowing/allowing torrents */ -function Server (opts) { - var self = this - if (!(self instanceof Server)) return new Server(opts) - EventEmitter.call(self) - if (!opts) opts = {} - - debug('new server %s', JSON.stringify(opts)) - - self.intervalMs = opts.interval - ? opts.interval - : 10 * 60 * 1000 // 10 min - - self._trustProxy = !!opts.trustProxy - if (typeof opts.filter === 'function') self._filter = opts.filter - - self.peersCacheLength = opts.peersCacheLength - self.peersCacheTtl = opts.peersCacheTtl - - self._listenCalled = false - self.listening = false - self.destroyed = false - self.torrents = {} - - self.http = null - self.udp4 = null - self.udp6 = null - self.ws = null - - // start an http tracker unless the user explictly says no - if (opts.http !== false) { - self.http = http.createServer() - self.http.on('error', function (err) { self._onError(err) }) - self.http.on('listening', onListening) - - // Add default http request handler on next tick to give user the chance to add - // their own handler first. Handle requests untouched by user's handler. - process.nextTick(function () { - self.http.on('request', function (req, res) { - if (res.headersSent) return - self.onHttpRequest(req, res) - }) - }) - } +class Server extends EventEmitter { + constructor (opts = {}) { + super() + debug('new server %s', JSON.stringify(opts)) - // start a udp tracker unless the user explicitly says no - if (opts.udp !== false) { - var isNode10 = /^v0.10./.test(process.version) - - self.udp4 = self.udp = dgram.createSocket( - isNode10 ? 'udp4' : { type: 'udp4', reuseAddr: true } - ) - self.udp4.on('message', function (msg, rinfo) { self.onUdpRequest(msg, rinfo) }) - self.udp4.on('error', function (err) { self._onError(err) }) - self.udp4.on('listening', onListening) - - self.udp6 = dgram.createSocket( - isNode10 ? 'udp6' : { type: 'udp6', reuseAddr: true } - ) - self.udp6.on('message', function (msg, rinfo) { self.onUdpRequest(msg, rinfo) }) - self.udp6.on('error', function (err) { self._onError(err) }) - self.udp6.on('listening', onListening) - } + this.intervalMs = opts.interval + ? opts.interval + : 10 * 60 * 1000 // 10 min - // start a websocket tracker (for WebTorrent) unless the user explicitly says no - if (opts.ws !== false) { - if (!self.http) { - self.http = http.createServer() - self.http.on('error', function (err) { self._onError(err) }) - self.http.on('listening', onListening) + this._trustProxy = !!opts.trustProxy + if (typeof opts.filter === 'function') this._filter = opts.filter + + this.peersCacheLength = opts.peersCacheLength + this.peersCacheTtl = opts.peersCacheTtl + + this._listenCalled = false + this.listening = false + this.destroyed = false + this.torrents = {} + + this.http = null + this.udp4 = null + this.udp6 = null + this.ws = null + + // start an http tracker unless the user explictly says no + if (opts.http !== false) { + this.http = http.createServer() + this.http.on('error', err => { this._onError(err) }) + this.http.on('listening', onListening) // Add default http request handler on next tick to give user the chance to add // their own handler first. Handle requests untouched by user's handler. - process.nextTick(function () { - self.http.on('request', function (req, res) { + process.nextTick(() => { + this.http.on('request', (req, res) => { if (res.headersSent) return - // For websocket trackers, we only need to handle the UPGRADE http method. - // Return 404 for all other request types. - res.statusCode = 404 - res.end('404 Not Found') + this.onHttpRequest(req, res) }) }) } - self.ws = new WebSocketServer({ - server: self.http, - perMessageDeflate: false, - clientTracking: false - }) - self.ws.address = function () { - return self.http.address() + + // start a udp tracker unless the user explicitly says no + if (opts.udp !== false) { + const isNode10 = /^v0.10./.test(process.version) + + this.udp4 = this.udp = dgram.createSocket( + isNode10 ? 'udp4' : { type: 'udp4', reuseAddr: true } + ) + this.udp4.on('message', (msg, rinfo) => { this.onUdpRequest(msg, rinfo) }) + this.udp4.on('error', err => { this._onError(err) }) + this.udp4.on('listening', onListening) + + this.udp6 = dgram.createSocket( + isNode10 ? 'udp6' : { type: 'udp6', reuseAddr: true } + ) + this.udp6.on('message', (msg, rinfo) => { this.onUdpRequest(msg, rinfo) }) + this.udp6.on('error', err => { this._onError(err) }) + this.udp6.on('listening', onListening) } - self.ws.on('error', function (err) { self._onError(err) }) - self.ws.on('connection', function (socket, req) { - // Note: socket.upgradeReq was removed in ws@3.0.0, so re-add it. - // https://github.com/websockets/ws/pull/1099 - socket.upgradeReq = req - self.onWebSocketConnection(socket) - }) - } - if (opts.stats !== false) { - if (!self.http) { - self.http = http.createServer() - self.http.on('error', function (err) { self._onError(err) }) - self.http.on('listening', onListening) + // start a websocket tracker (for WebTorrent) unless the user explicitly says no + if (opts.ws !== false) { + if (!this.http) { + this.http = http.createServer() + this.http.on('error', err => { this._onError(err) }) + this.http.on('listening', onListening) + + // Add default http request handler on next tick to give user the chance to add + // their own handler first. Handle requests untouched by user's handler. + process.nextTick(() => { + this.http.on('request', (req, res) => { + if (res.headersSent) return + // For websocket trackers, we only need to handle the UPGRADE http method. + // Return 404 for all other request types. + res.statusCode = 404 + res.end('404 Not Found') + }) + }) + } + this.ws = new WebSocketServer({ + server: this.http, + perMessageDeflate: false, + clientTracking: false + }) + this.ws.address = () => { + return this.http.address() + } + this.ws.on('error', err => { this._onError(err) }) + this.ws.on('connection', (socket, req) => { + // Note: socket.upgradeReq was removed in ws@3.0.0, so re-add it. + // https://github.com/websockets/ws/pull/1099 + socket.upgradeReq = req + this.onWebSocketConnection(socket) + }) } - // Http handler for '/stats' route - self.http.on('request', function (req, res) { - if (res.headersSent) return + if (opts.stats !== false) { + if (!this.http) { + this.http = http.createServer() + this.http.on('error', err => { this._onError(err) }) + this.http.on('listening', onListening) + } + + // Http handler for '/stats' route + this.http.on('request', (req, res) => { + if (res.headersSent) return - var infoHashes = Object.keys(self.torrents) - var activeTorrents = 0 - var allPeers = {} + const infoHashes = Object.keys(this.torrents) + let activeTorrents = 0 + const allPeers = {} - function countPeers (filterFunction) { - var count = 0 - var key + function countPeers (filterFunction) { + let count = 0 + let key - for (key in allPeers) { - if (allPeers.hasOwnProperty(key) && filterFunction(allPeers[key])) { - count++ + for (key in allPeers) { + if (allPeers.hasOwnProperty(key) && filterFunction(allPeers[key])) { + count++ + } } - } - return count - } + return count + } - function groupByClient () { - var clients = {} - for (var key in allPeers) { - if (allPeers.hasOwnProperty(key)) { - var peer = allPeers[key] + function groupByClient () { + const clients = {} + for (const key in allPeers) { + if (allPeers.hasOwnProperty(key)) { + const peer = allPeers[key] - if (!clients[peer.client.client]) { - clients[peer.client.client] = {} - } - var client = clients[peer.client.client] - // If the client is not known show 8 chars from peerId as version - var version = peer.client.version || Buffer.from(peer.peerId, 'hex').toString().substring(0, 8) - if (!client[version]) { - client[version] = 0 + if (!clients[peer.client.client]) { + clients[peer.client.client] = {} + } + const client = clients[peer.client.client] + // If the client is not known show 8 chars from peerId as version + const version = peer.client.version || Buffer.from(peer.peerId, 'hex').toString().substring(0, 8) + if (!client[version]) { + client[version] = 0 + } + client[version]++ } - client[version]++ } + return clients } - return clients - } - function printClients (clients) { - var html = '' - return html - } - if (req.method === 'GET' && (req.url === '/stats' || req.url === '/stats.json')) { - infoHashes.forEach(function (infoHash) { - var peers = self.torrents[infoHash].peers - var keys = peers.keys - if (keys.length > 0) activeTorrents++ - - keys.forEach(function (peerId) { - // Don't mark the peer as most recently used for stats - var peer = peers.peek(peerId) - if (peer == null) return // peers.peek() can evict the peer - - if (!allPeers.hasOwnProperty(peerId)) { - allPeers[peerId] = { - ipv4: false, - ipv6: false, - seeder: false, - leecher: false + if (req.method === 'GET' && (req.url === '/stats' || req.url === '/stats.json')) { + infoHashes.forEach(infoHash => { + const peers = this.torrents[infoHash].peers + const keys = peers.keys + if (keys.length > 0) activeTorrents++ + + keys.forEach(peerId => { + // Don't mark the peer as most recently used for stats + const peer = peers.peek(peerId) + if (peer == null) return // peers.peek() can evict the peer + + if (!allPeers.hasOwnProperty(peerId)) { + allPeers[peerId] = { + ipv4: false, + ipv6: false, + seeder: false, + leecher: false + } } - } - if (peer.ip.indexOf(':') >= 0) { - allPeers[peerId].ipv6 = true - } else { - allPeers[peerId].ipv4 = true - } + if (peer.ip.includes(':')) { + allPeers[peerId].ipv6 = true + } else { + allPeers[peerId].ipv4 = true + } - if (peer.complete) { - allPeers[peerId].seeder = true - } else { - allPeers[peerId].leecher = true - } + if (peer.complete) { + allPeers[peerId].seeder = true + } else { + allPeers[peerId].leecher = true + } - allPeers[peerId].peerId = peer.peerId - allPeers[peerId].client = peerid(peer.peerId) + allPeers[peerId].peerId = peer.peerId + allPeers[peerId].client = peerid(peer.peerId) + }) }) - }) - var isSeederOnly = function (peer) { return peer.seeder && peer.leecher === false } - var isLeecherOnly = function (peer) { return peer.leecher && peer.seeder === false } - var isSeederAndLeecher = function (peer) { return peer.seeder && peer.leecher } - var isIPv4 = function (peer) { return peer.ipv4 } - var isIPv6 = function (peer) { return peer.ipv6 } - - var stats = { - torrents: infoHashes.length, - activeTorrents: activeTorrents, - peersAll: Object.keys(allPeers).length, - peersSeederOnly: countPeers(isSeederOnly), - peersLeecherOnly: countPeers(isLeecherOnly), - peersSeederAndLeecher: countPeers(isSeederAndLeecher), - peersIPv4: countPeers(isIPv4), - peersIPv6: countPeers(isIPv6), - clients: groupByClient() - } + const isSeederOnly = peer => { return peer.seeder && peer.leecher === false } + const isLeecherOnly = peer => { return peer.leecher && peer.seeder === false } + const isSeederAndLeecher = peer => { return peer.seeder && peer.leecher } + const isIPv4 = peer => { return peer.ipv4 } + const isIPv6 = peer => { return peer.ipv6 } + + const stats = { + torrents: infoHashes.length, + activeTorrents, + peersAll: Object.keys(allPeers).length, + peersSeederOnly: countPeers(isSeederOnly), + peersLeecherOnly: countPeers(isLeecherOnly), + peersSeederAndLeecher: countPeers(isSeederAndLeecher), + peersIPv4: countPeers(isIPv4), + peersIPv6: countPeers(isIPv6), + clients: groupByClient() + } - if (req.url === '/stats.json' || req.headers['accept'] === 'application/json') { - res.write(JSON.stringify(stats)) - res.end() - } else if (req.url === '/stats') { - res.end('

' + stats.torrents + ' torrents (' + stats.activeTorrents + ' active)

\n' + - '

Connected Peers: ' + stats.peersAll + '

\n' + - '

Peers Seeding Only: ' + stats.peersSeederOnly + '

\n' + - '

Peers Leeching Only: ' + stats.peersLeecherOnly + '

\n' + - '

Peers Seeding & Leeching: ' + stats.peersSeederAndLeecher + '

\n' + - '

IPv4 Peers: ' + stats.peersIPv4 + '

\n' + - '

IPv6 Peers: ' + stats.peersIPv6 + '

\n' + - '

Clients:

\n' + - printClients(stats.clients) - ) + if (req.url === '/stats.json' || req.headers['accept'] === 'application/json') { + res.write(JSON.stringify(stats)) + res.end() + } else if (req.url === '/stats') { + res.end(` +

${stats.torrents} torrents (${stats.activeTorrents} active)

+

Connected Peers: ${stats.peersAll}

+

Peers Seeding Only: ${stats.peersSeederOnly}

+

Peers Leeching Only: ${stats.peersLeecherOnly}

+

Peers Seeding & Leeching: ${stats.peersSeederAndLeecher}

+

IPv4 Peers: ${stats.peersIPv4}

+

IPv6 Peers: ${stats.peersIPv6}

+

Clients:

+ ${printClients(stats.clients)} + `.replace(/^\s+/gm, '')) // trim left + } } + }) + } + + let num = !!this.http + !!this.udp4 + !!this.udp6 + const self = this + function onListening () { + num -= 1 + if (num === 0) { + self.listening = true + debug('listening') + self.emit('listening') } - }) + } } - var num = !!self.http + !!self.udp4 + !!self.udp6 - function onListening () { - num -= 1 - if (num === 0) { - self.listening = true - debug('listening') - self.emit('listening') - } + _onError (err) { + this.emit('error', err) } -} -Server.Swarm = Swarm + listen (...args) /* port, hostname, onlistening */{ + if (this._listenCalled || this.listening) throw new Error('server already listening') + this._listenCalled = true -Server.prototype._onError = function (err) { - var self = this - self.emit('error', err) -} + const lastArg = args[args.length - 1] + if (typeof lastArg === 'function') this.once('listening', lastArg) -Server.prototype.listen = function (/* port, hostname, onlistening */) { - var self = this + const port = toNumber(args[0]) || args[0] || 0 + const hostname = typeof args[1] !== 'function' ? args[1] : undefined - if (self._listenCalled || self.listening) throw new Error('server already listening') - self._listenCalled = true + debug('listen (port: %o hostname: %o)', port, hostname) - var lastArg = arguments[arguments.length - 1] - if (typeof lastArg === 'function') self.once('listening', lastArg) + function isObject (obj) { + return typeof obj === 'object' && obj !== null + } - var port = toNumber(arguments[0]) || arguments[0] || 0 - var hostname = typeof arguments[1] !== 'function' ? arguments[1] : undefined + const httpPort = isObject(port) ? (port.http || 0) : port + const udpPort = isObject(port) ? (port.udp || 0) : port - debug('listen (port: %o hostname: %o)', port, hostname) + // binding to :: only receives IPv4 connections if the bindv6only sysctl is set 0, + // which is the default on many operating systems + const httpHostname = isObject(hostname) ? hostname.http : hostname + const udp4Hostname = isObject(hostname) ? hostname.udp : hostname + const udp6Hostname = isObject(hostname) ? hostname.udp6 : hostname - function isObject (obj) { - return typeof obj === 'object' && obj !== null + if (this.http) this.http.listen(httpPort, httpHostname) + if (this.udp4) this.udp4.bind(udpPort, udp4Hostname) + if (this.udp6) this.udp6.bind(udpPort, udp6Hostname) } - var httpPort = isObject(port) ? (port.http || 0) : port - var udpPort = isObject(port) ? (port.udp || 0) : port + close (cb = noop) { + debug('close') - // binding to :: only receives IPv4 connections if the bindv6only sysctl is set 0, - // which is the default on many operating systems - var httpHostname = isObject(hostname) ? hostname.http : hostname - var udp4Hostname = isObject(hostname) ? hostname.udp : hostname - var udp6Hostname = isObject(hostname) ? hostname.udp6 : hostname + this.listening = false + this.destroyed = true - if (self.http) self.http.listen(httpPort, httpHostname) - if (self.udp4) self.udp4.bind(udpPort, udp4Hostname) - if (self.udp6) self.udp6.bind(udpPort, udp6Hostname) -} + if (this.udp4) { + try { + this.udp4.close() + } catch (err) {} + } -Server.prototype.close = function (cb) { - var self = this - if (!cb) cb = noop - debug('close') + if (this.udp6) { + try { + this.udp6.close() + } catch (err) {} + } - self.listening = false - self.destroyed = true + if (this.ws) { + try { + this.ws.close() + } catch (err) {} + } - if (self.udp4) { - try { - self.udp4.close() - } catch (err) {} + if (this.http) this.http.close(cb) + else cb(null) } - if (self.udp6) { - try { - self.udp6.close() - } catch (err) {} - } + createSwarm (infoHash, cb) { + if (Buffer.isBuffer(infoHash)) infoHash = infoHash.toString('hex') - if (self.ws) { - try { - self.ws.close() - } catch (err) {} + process.nextTick(() => { + const swarm = this.torrents[infoHash] = new Server.Swarm(infoHash, this) + cb(null, swarm) + }) } - if (self.http) self.http.close(cb) - else cb(null) -} - -Server.prototype.createSwarm = function (infoHash, cb) { - var self = this - if (Buffer.isBuffer(infoHash)) infoHash = infoHash.toString('hex') - - process.nextTick(function () { - var swarm = self.torrents[infoHash] = new Server.Swarm(infoHash, self) - cb(null, swarm) - }) -} - -Server.prototype.getSwarm = function (infoHash, cb) { - var self = this - if (Buffer.isBuffer(infoHash)) infoHash = infoHash.toString('hex') - - process.nextTick(function () { - cb(null, self.torrents[infoHash]) - }) -} + getSwarm (infoHash, cb) { + if (Buffer.isBuffer(infoHash)) infoHash = infoHash.toString('hex') -Server.prototype.onHttpRequest = function (req, res, opts) { - var self = this - if (!opts) opts = {} - opts.trustProxy = opts.trustProxy || self._trustProxy - - var params - try { - params = parseHttpRequest(req, opts) - params.httpReq = req - params.httpRes = res - } catch (err) { - res.end(bencode.encode({ - 'failure reason': err.message - })) - - // even though it's an error for the client, it's just a warning for the server. - // don't crash the server because a client sent bad data :) - self.emit('warning', err) - return + process.nextTick(() => { + cb(null, this.torrents[infoHash]) + }) } - self._onRequest(params, function (err, response) { - if (err) { - self.emit('warning', err) - response = { - 'failure reason': err.message - } - } - if (self.destroyed) return res.end() + onHttpRequest (req, res, opts = {}) { + opts.trustProxy = opts.trustProxy || this._trustProxy - delete response.action // only needed for UDP encoding - res.end(bencode.encode(response)) + let params + try { + params = parseHttpRequest(req, opts) + params.httpReq = req + params.httpRes = res + } catch (err) { + res.end(bencode.encode({ + 'failure reason': err.message + })) - if (params.action === common.ACTIONS.ANNOUNCE) { - self.emit(common.EVENT_NAMES[params.event], params.addr, params) + // even though it's an error for the client, it's just a warning for the server. + // don't crash the server because a client sent bad data :) + this.emit('warning', err) + return } - }) -} - -Server.prototype.onUdpRequest = function (msg, rinfo) { - var self = this - var params - try { - params = parseUdpRequest(msg, rinfo) - } catch (err) { - self.emit('warning', err) - // Do not reply for parsing errors - return - } - - self._onRequest(params, function (err, response) { - if (err) { - self.emit('warning', err) - response = { - action: common.ACTIONS.ERROR, - 'failure reason': err.message + this._onRequest(params, (err, response) => { + if (err) { + this.emit('warning', err) + response = { + 'failure reason': err.message + } } - } - if (self.destroyed) return + if (this.destroyed) return res.end() - response.transactionId = params.transactionId - response.connectionId = params.connectionId + delete response.action // only needed for UDP encoding + res.end(bencode.encode(response)) - var buf = makeUdpPacket(response) + if (params.action === common.ACTIONS.ANNOUNCE) { + this.emit(common.EVENT_NAMES[params.event], params.addr, params) + } + }) + } + onUdpRequest (msg, rinfo) { + let params try { - var udp = (rinfo.family === 'IPv4') ? self.udp4 : self.udp6 - udp.send(buf, 0, buf.length, rinfo.port, rinfo.address) + params = parseUdpRequest(msg, rinfo) } catch (err) { - self.emit('warning', err) + this.emit('warning', err) + // Do not reply for parsing errors + return } - if (params.action === common.ACTIONS.ANNOUNCE) { - self.emit(common.EVENT_NAMES[params.event], params.addr, params) - } - }) -} + this._onRequest(params, (err, response) => { + if (err) { + this.emit('warning', err) + response = { + action: common.ACTIONS.ERROR, + 'failure reason': err.message + } + } + if (this.destroyed) return -Server.prototype.onWebSocketConnection = function (socket, opts) { - var self = this - if (!opts) opts = {} - opts.trustProxy = opts.trustProxy || self._trustProxy + response.transactionId = params.transactionId + response.connectionId = params.connectionId - socket.peerId = null // as hex - socket.infoHashes = [] // swarms that this socket is participating in - socket.onSend = function (err) { - self._onWebSocketSend(socket, err) - } + const buf = makeUdpPacket(response) - socket.onMessageBound = function (params) { - self._onWebSocketRequest(socket, opts, params) - } - socket.on('message', socket.onMessageBound) + try { + const udp = (rinfo.family === 'IPv4') ? this.udp4 : this.udp6 + udp.send(buf, 0, buf.length, rinfo.port, rinfo.address) + } catch (err) { + this.emit('warning', err) + } - socket.onErrorBound = function (err) { - self._onWebSocketError(socket, err) + if (params.action === common.ACTIONS.ANNOUNCE) { + this.emit(common.EVENT_NAMES[params.event], params.addr, params) + } + }) } - socket.on('error', socket.onErrorBound) - socket.onCloseBound = function () { - self._onWebSocketClose(socket) - } - socket.on('close', socket.onCloseBound) -} + onWebSocketConnection (socket, opts = {}) { + opts.trustProxy = opts.trustProxy || this._trustProxy -Server.prototype._onWebSocketRequest = function (socket, opts, params) { - var self = this + socket.peerId = null // as hex + socket.infoHashes = [] // swarms that this socket is participating in + socket.onSend = err => { + this._onWebSocketSend(socket, err) + } - try { - params = parseWebSocketRequest(socket, opts, params) - } catch (err) { - socket.send(JSON.stringify({ - 'failure reason': err.message - }), socket.onSend) + socket.onMessageBound = params => { + this._onWebSocketRequest(socket, opts, params) + } + socket.on('message', socket.onMessageBound) - // even though it's an error for the client, it's just a warning for the server. - // don't crash the server because a client sent bad data :) - self.emit('warning', err) - return - } + socket.onErrorBound = err => { + this._onWebSocketError(socket, err) + } + socket.on('error', socket.onErrorBound) - if (!socket.peerId) socket.peerId = params.peer_id // as hex + socket.onCloseBound = () => { + this._onWebSocketClose(socket) + } + socket.on('close', socket.onCloseBound) + } - self._onRequest(params, function (err, response) { - if (self.destroyed || socket.destroyed) return - if (err) { + _onWebSocketRequest (socket, opts, params) { + try { + params = parseWebSocketRequest(socket, opts, params) + } catch (err) { socket.send(JSON.stringify({ - action: params.action === common.ACTIONS.ANNOUNCE ? 'announce' : 'scrape', - 'failure reason': err.message, - info_hash: common.hexToBinary(params.info_hash) + 'failure reason': err.message }), socket.onSend) - self.emit('warning', err) + // even though it's an error for the client, it's just a warning for the server. + // don't crash the server because a client sent bad data :) + this.emit('warning', err) return } - response.action = params.action === common.ACTIONS.ANNOUNCE ? 'announce' : 'scrape' + if (!socket.peerId) socket.peerId = params.peer_id // as hex - var peers - if (response.action === 'announce') { - peers = response.peers - delete response.peers + this._onRequest(params, (err, response) => { + if (this.destroyed || socket.destroyed) return + if (err) { + socket.send(JSON.stringify({ + action: params.action === common.ACTIONS.ANNOUNCE ? 'announce' : 'scrape', + 'failure reason': err.message, + info_hash: common.hexToBinary(params.info_hash) + }), socket.onSend) - if (socket.infoHashes.indexOf(params.info_hash) === -1) { - socket.infoHashes.push(params.info_hash) + this.emit('warning', err) + return } - response.info_hash = common.hexToBinary(params.info_hash) + response.action = params.action === common.ACTIONS.ANNOUNCE ? 'announce' : 'scrape' - // WebSocket tracker should have a shorter interval – default: 2 minutes - response.interval = Math.ceil(self.intervalMs / 1000 / 5) - } + let peers + if (response.action === 'announce') { + peers = response.peers + delete response.peers - // Skip sending update back for 'answer' announce messages – not needed - if (!params.answer) { - socket.send(JSON.stringify(response), socket.onSend) - debug('sent response %s to %s', JSON.stringify(response), params.peer_id) - } - - if (Array.isArray(params.offers)) { - debug('got %s offers from %s', params.offers.length, params.peer_id) - debug('got %s peers from swarm %s', peers.length, params.info_hash) - peers.forEach(function (peer, i) { - peer.socket.send(JSON.stringify({ - action: 'announce', - offer: params.offers[i].offer, - offer_id: params.offers[i].offer_id, - peer_id: common.hexToBinary(params.peer_id), - info_hash: common.hexToBinary(params.info_hash) - }), peer.socket.onSend) - debug('sent offer to %s from %s', peer.peerId, params.peer_id) - }) - } + if (!socket.infoHashes.includes(params.info_hash)) { + socket.infoHashes.push(params.info_hash) + } - if (params.answer) { - debug('got answer %s from %s', JSON.stringify(params.answer), params.peer_id) + response.info_hash = common.hexToBinary(params.info_hash) - self.getSwarm(params.info_hash, function (err, swarm) { - if (self.destroyed) return - if (err) return self.emit('warning', err) - if (!swarm) { - return self.emit('warning', new Error('no swarm with that `info_hash`')) - } - // Mark the destination peer as recently used in cache - var toPeer = swarm.peers.get(params.to_peer_id) - if (!toPeer) { - return self.emit('warning', new Error('no peer with that `to_peer_id`')) - } + // WebSocket tracker should have a shorter interval – default: 2 minutes + response.interval = Math.ceil(this.intervalMs / 1000 / 5) + } - toPeer.socket.send(JSON.stringify({ - action: 'announce', - answer: params.answer, - offer_id: params.offer_id, - peer_id: common.hexToBinary(params.peer_id), - info_hash: common.hexToBinary(params.info_hash) - }), toPeer.socket.onSend) - debug('sent answer to %s from %s', toPeer.peerId, params.peer_id) + // Skip sending update back for 'answer' announce messages – not needed + if (!params.answer) { + socket.send(JSON.stringify(response), socket.onSend) + debug('sent response %s to %s', JSON.stringify(response), params.peer_id) + } - done() - }) - } else { - done() - } + if (Array.isArray(params.offers)) { + debug('got %s offers from %s', params.offers.length, params.peer_id) + debug('got %s peers from swarm %s', peers.length, params.info_hash) + peers.forEach((peer, i) => { + peer.socket.send(JSON.stringify({ + action: 'announce', + offer: params.offers[i].offer, + offer_id: params.offers[i].offer_id, + peer_id: common.hexToBinary(params.peer_id), + info_hash: common.hexToBinary(params.info_hash) + }), peer.socket.onSend) + debug('sent offer to %s from %s', peer.peerId, params.peer_id) + }) + } - function done () { - // emit event once the announce is fully "processed" - if (params.action === common.ACTIONS.ANNOUNCE) { - self.emit(common.EVENT_NAMES[params.event], params.peer_id, params) + const done = () => { + // emit event once the announce is fully "processed" + if (params.action === common.ACTIONS.ANNOUNCE) { + this.emit(common.EVENT_NAMES[params.event], params.peer_id, params) + } } - } - }) -} -Server.prototype._onWebSocketSend = function (socket, err) { - var self = this - if (err) self._onWebSocketError(socket, err) -} + if (params.answer) { + debug('got answer %s from %s', JSON.stringify(params.answer), params.peer_id) + + this.getSwarm(params.info_hash, (err, swarm) => { + if (this.destroyed) return + if (err) return this.emit('warning', err) + if (!swarm) { + return this.emit('warning', new Error('no swarm with that `info_hash`')) + } + // Mark the destination peer as recently used in cache + const toPeer = swarm.peers.get(params.to_peer_id) + if (!toPeer) { + return this.emit('warning', new Error('no peer with that `to_peer_id`')) + } + + toPeer.socket.send(JSON.stringify({ + action: 'announce', + answer: params.answer, + offer_id: params.offer_id, + peer_id: common.hexToBinary(params.peer_id), + info_hash: common.hexToBinary(params.info_hash) + }), toPeer.socket.onSend) + debug('sent answer to %s from %s', toPeer.peerId, params.peer_id) -Server.prototype._onWebSocketClose = function (socket) { - var self = this - debug('websocket close %s', socket.peerId) - socket.destroyed = true - - if (socket.peerId) { - socket.infoHashes.slice(0).forEach(function (infoHash) { - var swarm = self.torrents[infoHash] - if (swarm) { - swarm.announce({ - type: 'ws', - event: 'stopped', - numwant: 0, - peer_id: socket.peerId - }, noop) + done() + }) + } else { + done() } }) } - // ignore all future errors - socket.onSend = noop - socket.on('error', noop) + _onWebSocketSend (socket, err) { + if (err) this._onWebSocketError(socket, err) + } - socket.peerId = null - socket.infoHashes = null + _onWebSocketClose (socket) { + debug('websocket close %s', socket.peerId) + socket.destroyed = true - if (typeof socket.onMessageBound === 'function') { - socket.removeListener('message', socket.onMessageBound) - } - socket.onMessageBound = null + if (socket.peerId) { + socket.infoHashes.slice(0).forEach(infoHash => { + const swarm = this.torrents[infoHash] + if (swarm) { + swarm.announce({ + type: 'ws', + event: 'stopped', + numwant: 0, + peer_id: socket.peerId + }, noop) + } + }) + } - if (typeof socket.onErrorBound === 'function') { - socket.removeListener('error', socket.onErrorBound) - } - socket.onErrorBound = null + // ignore all future errors + socket.onSend = noop + socket.on('error', noop) + + socket.peerId = null + socket.infoHashes = null - if (typeof socket.onCloseBound === 'function') { - socket.removeListener('close', socket.onCloseBound) + if (typeof socket.onMessageBound === 'function') { + socket.removeListener('message', socket.onMessageBound) + } + socket.onMessageBound = null + + if (typeof socket.onErrorBound === 'function') { + socket.removeListener('error', socket.onErrorBound) + } + socket.onErrorBound = null + + if (typeof socket.onCloseBound === 'function') { + socket.removeListener('close', socket.onCloseBound) + } + socket.onCloseBound = null } - socket.onCloseBound = null -} -Server.prototype._onWebSocketError = function (socket, err) { - var self = this - debug('websocket error %s', err.message || err) - self.emit('warning', err) - self._onWebSocketClose(socket) -} + _onWebSocketError (socket, err) { + debug('websocket error %s', err.message || err) + this.emit('warning', err) + this._onWebSocketClose(socket) + } -Server.prototype._onRequest = function (params, cb) { - var self = this - if (params && params.action === common.ACTIONS.CONNECT) { - cb(null, { action: common.ACTIONS.CONNECT }) - } else if (params && params.action === common.ACTIONS.ANNOUNCE) { - self._onAnnounce(params, cb) - } else if (params && params.action === common.ACTIONS.SCRAPE) { - self._onScrape(params, cb) - } else { - cb(new Error('Invalid action')) + _onRequest (params, cb) { + if (params && params.action === common.ACTIONS.CONNECT) { + cb(null, { action: common.ACTIONS.CONNECT }) + } else if (params && params.action === common.ACTIONS.ANNOUNCE) { + this._onAnnounce(params, cb) + } else if (params && params.action === common.ACTIONS.SCRAPE) { + this._onScrape(params, cb) + } else { + cb(new Error('Invalid action')) + } } -} -Server.prototype._onAnnounce = function (params, cb) { - var self = this + _onAnnounce (params, cb) { + const self = this - if (self._filter) { - self._filter(params.info_hash, params, function (err) { - // Presence of `err` means that this announce request is disallowed - if (err) return cb(err) + if (this._filter) { + this._filter(params.info_hash, params, err => { + // Presence of `err` means that this announce request is disallowed + if (err) return cb(err) - getOrCreateSwarm(function (err, swarm) { + getOrCreateSwarm((err, swarm) => { + if (err) return cb(err) + announce(swarm) + }) + }) + } else { + getOrCreateSwarm((err, swarm) => { if (err) return cb(err) announce(swarm) }) - }) - } else { - getOrCreateSwarm(function (err, swarm) { - if (err) return cb(err) - announce(swarm) - }) - } + } - // Get existing swarm, or create one if one does not exist - function getOrCreateSwarm (cb) { - self.getSwarm(params.info_hash, function (err, swarm) { - if (err) return cb(err) - if (swarm) return cb(null, swarm) - self.createSwarm(params.info_hash, function (err, swarm) { + // Get existing swarm, or create one if one does not exist + function getOrCreateSwarm (cb) { + self.getSwarm(params.info_hash, (err, swarm) => { if (err) return cb(err) - cb(null, swarm) + if (swarm) return cb(null, swarm) + self.createSwarm(params.info_hash, (err, swarm) => { + if (err) return cb(err) + cb(null, swarm) + }) }) - }) - } + } - function announce (swarm) { - if (!params.event || params.event === 'empty') params.event = 'update' - swarm.announce(params, function (err, response) { - if (err) return cb(err) + function announce (swarm) { + if (!params.event || params.event === 'empty') params.event = 'update' + swarm.announce(params, (err, response) => { + if (err) return cb(err) - if (!response.action) response.action = common.ACTIONS.ANNOUNCE - if (!response.interval) response.interval = Math.ceil(self.intervalMs / 1000) - - if (params.compact === 1) { - var peers = response.peers - - // Find IPv4 peers - response.peers = string2compact(peers.filter(function (peer) { - return common.IPV4_RE.test(peer.ip) - }).map(function (peer) { - return peer.ip + ':' + peer.port - })) - // Find IPv6 peers - response.peers6 = string2compact(peers.filter(function (peer) { - return common.IPV6_RE.test(peer.ip) - }).map(function (peer) { - return '[' + peer.ip + ']:' + peer.port - })) - } else if (params.compact === 0) { - // IPv6 peers are not separate for non-compact responses - response.peers = response.peers.map(function (peer) { - return { - 'peer id': common.hexToBinary(peer.peerId), - ip: peer.ip, - port: peer.port - } - }) - } // else, return full peer objects (used for websocket responses) + if (!response.action) response.action = common.ACTIONS.ANNOUNCE + if (!response.interval) response.interval = Math.ceil(self.intervalMs / 1000) + + if (params.compact === 1) { + const peers = response.peers + + // Find IPv4 peers + response.peers = string2compact(peers.filter(peer => { + return common.IPV4_RE.test(peer.ip) + }).map(peer => { + return `${peer.ip}:${peer.port}` + })) + // Find IPv6 peers + response.peers6 = string2compact(peers.filter(peer => { + return common.IPV6_RE.test(peer.ip) + }).map(peer => { + return `[${peer.ip}]:${peer.port}` + })) + } else if (params.compact === 0) { + // IPv6 peers are not separate for non-compact responses + response.peers = response.peers.map(peer => { + return { + 'peer id': common.hexToBinary(peer.peerId), + ip: peer.ip, + port: peer.port + } + }) + } // else, return full peer objects (used for websocket responses) - cb(null, response) - }) + cb(null, response) + }) + } } -} -Server.prototype._onScrape = function (params, cb) { - var self = this - - if (params.info_hash == null) { - // if info_hash param is omitted, stats for all torrents are returned - // TODO: make this configurable! - params.info_hash = Object.keys(self.torrents) - } + _onScrape (params, cb) { + if (params.info_hash == null) { + // if info_hash param is omitted, stats for all torrents are returned + // TODO: make this configurable! + params.info_hash = Object.keys(this.torrents) + } - series(params.info_hash.map(function (infoHash) { - return function (cb) { - self.getSwarm(infoHash, function (err, swarm) { - if (err) return cb(err) - if (swarm) { - swarm.scrape(params, function (err, scrapeInfo) { - if (err) return cb(err) - cb(null, { - infoHash: infoHash, - complete: (scrapeInfo && scrapeInfo.complete) || 0, - incomplete: (scrapeInfo && scrapeInfo.incomplete) || 0 + series(params.info_hash.map(infoHash => { + return cb => { + this.getSwarm(infoHash, (err, swarm) => { + if (err) return cb(err) + if (swarm) { + swarm.scrape(params, (err, scrapeInfo) => { + if (err) return cb(err) + cb(null, { + infoHash, + complete: (scrapeInfo && scrapeInfo.complete) || 0, + incomplete: (scrapeInfo && scrapeInfo.incomplete) || 0 + }) }) - }) - } else { - cb(null, { infoHash: infoHash, complete: 0, incomplete: 0 }) + } else { + cb(null, { infoHash, complete: 0, incomplete: 0 }) + } + }) + } + }), (err, results) => { + if (err) return cb(err) + + const response = { + action: common.ACTIONS.SCRAPE, + files: {}, + flags: { min_request_interval: Math.ceil(this.intervalMs / 1000) } + } + + results.forEach(result => { + response.files[common.hexToBinary(result.infoHash)] = { + complete: result.complete || 0, + incomplete: result.incomplete || 0, + downloaded: result.complete || 0 // TODO: this only provides a lower-bound } }) - } - }), function (err, results) { - if (err) return cb(err) - var response = { - action: common.ACTIONS.SCRAPE, - files: {}, - flags: { min_request_interval: Math.ceil(self.intervalMs / 1000) } - } - - results.forEach(function (result) { - response.files[common.hexToBinary(result.infoHash)] = { - complete: result.complete || 0, - incomplete: result.incomplete || 0, - downloaded: result.complete || 0 // TODO: this only provides a lower-bound - } + cb(null, response) }) - - cb(null, response) - }) + } } +Server.Swarm = Swarm + function makeUdpPacket (params) { - var packet + let packet switch (params.action) { case common.ACTIONS.CONNECT: packet = Buffer.concat([ @@ -794,12 +768,12 @@ function makeUdpPacket (params) { ]) break case common.ACTIONS.SCRAPE: - var scrapeResponse = [ + const scrapeResponse = [ common.toUInt32(common.ACTIONS.SCRAPE), common.toUInt32(params.transactionId) ] - for (var infoHash in params.files) { - var file = params.files[infoHash] + for (const infoHash in params.files) { + const file = params.files[infoHash] scrapeResponse.push( common.toUInt32(file.complete), common.toUInt32(file.downloaded), // TODO: this only provides a lower-bound @@ -816,7 +790,7 @@ function makeUdpPacket (params) { ]) break default: - throw new Error('Action not implemented: ' + params.action) + throw new Error(`Action not implemented: ${params.action}`) } return packet } @@ -827,3 +801,5 @@ function toNumber (x) { } function noop () {} + +module.exports = Server