Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 24 additions & 17 deletions lib/server/swarm.js
Original file line number Diff line number Diff line change
@@ -1,29 +1,34 @@
module.exports = Swarm

var debug = require('debug')('bittorrent-tracker')
var LRU = require('lru')
var randomIterate = require('random-iterate')

// Regard this as the default implementation of an interface that you
// need to support when overriding Server.createSwarm() and Server.getSwarm()
function Swarm (infoHash, server) {
this.peers = {}
this.peers = new LRU({
max: server.peersCacheLength || 1000,
maxAge: server.peersCacheTtl || 900000 // 900 000ms = 15 minutes
})
this.complete = 0
this.incomplete = 0
}

Swarm.prototype.announce = function (params, cb) {
var self = this
var id = params.type === 'ws' ? params.peer_id : params.addr
var peer = self.peers[id]
// Mark the source peer as recently used in cache
var peer = self.peers.get(id)

if (params.event === 'started') {
self._onAnnounceStarted(params, peer)
self._onAnnounceStarted(params, peer, id)
} else if (params.event === 'stopped') {
self._onAnnounceStopped(params, peer)
self._onAnnounceStopped(params, peer, id)
} else if (params.event === 'completed') {
self._onAnnounceCompleted(params, peer)
self._onAnnounceCompleted(params, peer, id)
} else if (params.event === 'update') {
self._onAnnounceUpdate(params, peer)
self._onAnnounceUpdate(params, peer, id)
} else {
cb(new Error('invalid event'))
return
Expand All @@ -42,38 +47,36 @@ Swarm.prototype.scrape = function (params, cb) {
})
}

Swarm.prototype._onAnnounceStarted = function (params, peer) {
Swarm.prototype._onAnnounceStarted = function (params, peer, id) {
if (peer) {
debug('unexpected `started` event from peer that is already in swarm')
return this._onAnnounceUpdate(params, peer) // treat as an update
}

if (params.left === 0) this.complete += 1
else this.incomplete += 1
var id = params.type === 'ws' ? params.peer_id : params.addr
peer = this.peers[id] = {
peer = this.peers.set(id, {
type: params.type,
complete: params.left === 0,
peerId: params.peer_id, // as hex
ip: params.ip,
port: params.port,
socket: params.socket // only websocket
}
})
}

Swarm.prototype._onAnnounceStopped = function (params, peer) {
Swarm.prototype._onAnnounceStopped = function (params, peer, id) {
if (!peer) {
debug('unexpected `stopped` event from peer that is not in swarm')
return // do nothing
}

if (peer.complete) this.complete -= 1
else this.incomplete -= 1
var id = params.type === 'ws' ? params.peer_id : params.addr
delete this.peers[id]
this.peers.remove(id)
}

Swarm.prototype._onAnnounceCompleted = function (params, peer) {
Swarm.prototype._onAnnounceCompleted = function (params, peer, id) {
if (!peer) {
debug('unexpected `completed` event from peer that is not in swarm')
return this._onAnnounceStarted(params, peer) // treat as a start
Expand All @@ -86,9 +89,10 @@ Swarm.prototype._onAnnounceCompleted = function (params, peer) {
this.complete += 1
this.incomplete -= 1
peer.complete = true
this.peers.set(id, peer)
}

Swarm.prototype._onAnnounceUpdate = function (params, peer) {
Swarm.prototype._onAnnounceUpdate = function (params, peer, id) {
if (!peer) {
debug('unexpected `update` event from peer that is not in swarm')
return this._onAnnounceStarted(params, peer) // treat as a start
Expand All @@ -98,15 +102,18 @@ Swarm.prototype._onAnnounceUpdate = function (params, peer) {
this.complete += 1
this.incomplete -= 1
peer.complete = true
this.peers.set(id, peer)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change this to a get() and pass in id as a parameter? All 4 announce event types now require the id, so that would be cleaner.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what you mean by changing this to get(), I am pushing the other changes but what do you mean?

}
}

Swarm.prototype._getPeers = function (numwant, ownPeerId, isWebRTC) {
var peers = []
var ite = randomIterate(Object.keys(this.peers))
var ite = randomIterate(this.peers.keys)
var peerId
while ((peerId = ite()) && peers.length < numwant) {
var peer = this.peers[peerId]
// Don't mark the peer as most recently used on announce
var peer = this.peers.peek(peerId)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a problem here, on one hand, if I use LRU.get, the peer will be marked as used and the max condition can evict a more recent peer.
On the other hand, if I use LRU.peek, we never access old peers and the maxAge eviction will not happen.

I think I will clean old peers manually as they are picked in announce...

if (!peer) continue
if (isWebRTC && peer.peerId === ownPeerId) continue // don't send peer to itself
if ((isWebRTC && peer.type !== 'ws') || (!isWebRTC && peer.type === 'ws')) continue // send proper peer type
peers.push(peer)
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"hat": "0.0.3",
"inherits": "^2.0.1",
"ip": "^1.0.1",
"lru": "^3.0.0",
"minimist": "^1.1.1",
"once": "^1.3.0",
"random-iterate": "^1.0.1",
Expand Down
11 changes: 8 additions & 3 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ function Server (opts) {
self._trustProxy = !!opts.trustProxy
if (typeof opts.filter === 'function') self._filter = opts.filter

self.peersCacheLength = opts.peersCacheLength
self.peersCacheTtl = opts.peersCacheTtl

self._listenCalled = false
self.listening = false
self.destroyed = false
Expand Down Expand Up @@ -153,7 +156,7 @@ function Server (opts) {
if (req.method === 'GET' && (req.url === '/stats' || req.url === '/stats.json')) {
infoHashes.forEach(function (infoHash) {
var peers = self.torrents[infoHash].peers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be .peers.cache

var keys = Object.keys(peers)
var keys = peers.keys
if (keys.length > 0) activeTorrents++

keys.forEach(function (peerId) {
Expand All @@ -165,7 +168,8 @@ function Server (opts) {
leecher: false
}
}
var peer = peers[peerId]
// Don't mark the peer as most recently used for stats
var peer = peers.peek(peerId)
if (peer.ip.indexOf(':') >= 0) {
allPeers[peerId].ipv6 = true
} else {
Expand Down Expand Up @@ -489,7 +493,8 @@ Server.prototype._onWebSocketRequest = function (socket, opts, params) {
if (!swarm) {
return self.emit('warning', new Error('no swarm with that `info_hash`'))
}
var toPeer = swarm.peers[params.to_peer_id]
// Mark the destination peer as recently used in cache
var toPeer = swarm.peers.get(params.to_peer_id)
if (!toPeer) {
return self.emit('warning', new Error('no peer with that `to_peer_id`'))
}
Expand Down
77 changes: 52 additions & 25 deletions test/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ var test = require('tape')
var infoHash = '4cb67059ed6bd08362da625b3ae77f6f4a075705'
var peerId = Buffer.from('01234567890123456789')
var peerId2 = Buffer.from('12345678901234567890')
var peerId3 = Buffer.from('23456789012345678901')

function serverTest (t, serverType, serverFamily) {
t.plan(30)
t.plan(32)

var hostname = serverFamily === 'inet6'
? '[::1]'
Expand All @@ -23,7 +24,12 @@ function serverTest (t, serverType, serverFamily) {
? '::1'
: '127.0.0.1'

common.createServer(t, serverType, function (server) {
var opts = {
serverType: serverType,
peersCacheLength: 2
}

common.createServer(t, opts, function (server) {
var port = server[serverType].address().port
var announceUrl = serverType + '://' + hostname + ':' + port + '/announce'

Expand Down Expand Up @@ -52,22 +58,23 @@ function serverTest (t, serverType, serverFamily) {
t.equal(Object.keys(server.torrents).length, 1)
t.equal(swarm.complete, 0)
t.equal(swarm.incomplete, 1)
t.equal(Object.keys(swarm.peers).length, 1)
t.equal(swarm.peers.length, 1)

var id = serverType === 'ws'
? peerId.toString('hex')
: hostname + ':6881'

t.equal(swarm.peers[id].type, serverType)
t.equal(swarm.peers[id].ip, clientIp)
t.equal(swarm.peers[id].peerId, peerId.toString('hex'))
t.equal(swarm.peers[id].complete, false)
var peer = swarm.peers.peek(id)
t.equal(peer.type, serverType)
t.equal(peer.ip, clientIp)
t.equal(peer.peerId, peerId.toString('hex'))
t.equal(peer.complete, false)
if (serverType === 'ws') {
t.equal(typeof swarm.peers[id].port, 'number')
t.ok(swarm.peers[id].socket)
t.equal(typeof peer.port, 'number')
t.ok(peer.socket)
} else {
t.equal(swarm.peers[id].port, 6881)
t.notOk(swarm.peers[id].socket)
t.equal(peer.port, 6881)
t.notOk(peer.socket)
}

client1.complete()
Expand Down Expand Up @@ -102,22 +109,42 @@ function serverTest (t, serverType, serverFamily) {
client2.once('peer', function (addr) {
t.ok(addr === hostname + ':6881' || addr === hostname + ':6882' || addr.id === peerId.toString('hex'))

client2.stop()
client2.once('update', function (data) {
t.equal(data.announce, announceUrl)
t.equal(data.complete, 1)
t.equal(data.incomplete, 0)
client2.destroy()
swarm.peers.once('evict', function (evicted) {
t.equals(evicted.value.peerId, peerId.toString('hex'))
})
var client3 = new Client({
infoHash: infoHash,
announce: [ announceUrl ],
peerId: peerId3,
port: 6880
// wrtc: wrtc
})
client3.start()

server.once('start', function () {
t.pass('got start message from client3')
})

client1.stop()
client1.once('update', function (data) {
client3.once('update', function () {
client2.stop()
client2.once('update', function (data) {
t.equal(data.announce, announceUrl)
t.equal(data.complete, 0)
t.equal(data.incomplete, 0)

client1.destroy(function () {
server.close()
// if (serverType === 'ws') wrtc.close()
t.equal(data.complete, 1)
t.equal(data.incomplete, 1)
client2.destroy()

client3.stop()
client3.once('update', function (data) {
t.equal(data.announce, announceUrl)
t.equal(data.complete, 1)
t.equal(data.incomplete, 0)

client3.destroy(function () {
client1.destroy(function () {
server.close()
})
// if (serverType === 'ws') wrtc.close()
})
})
})
})
Expand Down