Skip to content

Commit 8788d75

Browse files
committed
udp/ws clients: add destroy functions
Fixes webtorrent#75
1 parent f285c9d commit 8788d75

File tree

3 files changed

+87
-65
lines changed

3 files changed

+87
-65
lines changed

client.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ Client.prototype.destroy = function () {
227227
debug('destroy')
228228

229229
self._trackers.forEach(function (tracker) {
230-
if (tracker.destroy) tracker.destroy()
230+
tracker.destroy()
231231
tracker.setInterval(0) // stop announcing on intervals
232232
})
233233
self._trackers = []

lib/udp-tracker.js

Lines changed: 67 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ var url = require('url')
1111

1212
var common = require('./common')
1313

14+
var TIMEOUT = 15000
15+
1416
inherits(UDPTracker, EventEmitter)
1517

1618
/**
@@ -31,6 +33,7 @@ function UDPTracker (client, announceUrl, opts) {
3133
self._announceUrl = announceUrl
3234
self._intervalMs = self.client._intervalMs // use client interval initially
3335
self._interval = null
36+
self._cleanupFns = []
3437
}
3538

3639
UDPTracker.prototype.announce = function (opts) {
@@ -44,60 +47,90 @@ UDPTracker.prototype.scrape = function (opts) {
4447
self._request(opts) // udp scrape uses same announce url
4548
}
4649

50+
// TODO: Improve this interface
51+
UDPTracker.prototype.setInterval = function (intervalMs) {
52+
var self = this
53+
clearInterval(self._interval)
54+
55+
self._intervalMs = intervalMs
56+
if (intervalMs) {
57+
// HACK
58+
var update = self.announce.bind(self, self.client._defaultAnnounceOpts())
59+
self._interval = setInterval(update, self._intervalMs)
60+
}
61+
}
62+
63+
UDPTracker.prototype.destroy = function () {
64+
var self = this
65+
if (self.destroyed) return
66+
self.destroyed = true
67+
68+
self._cleanupFns.slice(0).forEach(function (cleanup) {
69+
cleanup()
70+
})
71+
self._cleanupFns = []
72+
}
73+
4774
UDPTracker.prototype._request = function (opts) {
4875
var self = this
4976
if (!opts) opts = {}
5077
var parsedUrl = url.parse(self._announceUrl)
51-
var socket = dgram.createSocket('udp4')
5278
var transactionId = genTransactionId()
79+
var socket = dgram.createSocket('udp4')
80+
81+
var cleanup = function () {
82+
self._cleanupFns.splice(self._cleanupFns.indexOf(cleanup), 1)
83+
if (timeout) {
84+
clearTimeout(timeout)
85+
timeout = null
86+
}
87+
socket.removeListener('error', onError)
88+
socket.removeListener('message', onSocketMessage)
89+
try { socket.close() } catch (err) {}
90+
socket = null
91+
}
92+
self._cleanupFns.push(cleanup)
5393

5494
// does not matter if `stopped` event arrives, so supress errors & cleanup after timeout
95+
var ms = opts.event === 'stopped' ? TIMEOUT / 10 : TIMEOUT
5596
var timeout = setTimeout(function () {
5697
timeout = null
5798
cleanup()
5899
if (opts.event !== 'stopped') {
59-
error('tracker request timed out')
100+
onError(new Error('tracker request timed out'))
60101
}
61-
}, opts.event === 'stopped' ? 1500 : 15000)
62-
63-
if (timeout && timeout.unref) {
64-
timeout.unref()
65-
}
102+
}, ms)
103+
if (timeout.unref) timeout.unref()
66104

67105
send(Buffer.concat([
68106
common.CONNECTION_ID,
69107
common.toUInt32(common.ACTIONS.CONNECT),
70108
transactionId
71109
]))
72110

73-
socket.on('error', error)
111+
socket.on('error', onError)
112+
socket.on('message', onSocketMessage)
74113

75-
socket.on('message', function (msg) {
114+
function onSocketMessage (msg) {
115+
if (self.destroyed) return
76116
if (msg.length < 8 || msg.readUInt32BE(4) !== transactionId.readUInt32BE(0)) {
77-
return error('tracker sent invalid transaction id')
117+
return onError(new Error('tracker sent invalid transaction id'))
78118
}
79119

80120
var action = msg.readUInt32BE(0)
81-
debug(self._announceUrl + ' UDP response, action ' + action)
121+
debug('UDP response %s, action %s', self._announceUrl, action)
82122
switch (action) {
83123
case 0: // handshake
84-
if (msg.length < 16) {
85-
return error('invalid udp handshake')
86-
}
124+
if (msg.length < 16) return onError(new Error('invalid udp handshake'))
87125

88-
if (opts._scrape) {
89-
scrape(msg.slice(8, 16))
90-
} else {
91-
announce(msg.slice(8, 16), opts)
92-
}
126+
if (opts._scrape) scrape(msg.slice(8, 16))
127+
else announce(msg.slice(8, 16), opts)
93128

94129
return
95130

96131
case 1: // announce
97132
cleanup()
98-
if (msg.length < 20) {
99-
return error('invalid announce message')
100-
}
133+
if (msg.length < 20) return onError(new Error('invalid announce message'))
101134

102135
var interval = msg.readUInt32BE(8)
103136
if (interval && !self._opts.interval && self._intervalMs !== 0) {
@@ -126,7 +159,7 @@ UDPTracker.prototype._request = function (opts) {
126159
case 2: // scrape
127160
cleanup()
128161
if (msg.length < 20 || (msg.length - 8) % 12 !== 0) {
129-
return error('invalid scrape message')
162+
return onError(new Error('invalid scrape message'))
130163
}
131164
var infoHashes = (Array.isArray(opts.infoHash) && opts.infoHash.length > 0)
132165
? opts.infoHash.map(function (infoHash) { return infoHash.toString('hex') })
@@ -145,37 +178,29 @@ UDPTracker.prototype._request = function (opts) {
145178

146179
case 3: // error
147180
cleanup()
148-
if (msg.length < 8) {
149-
return error('invalid error message')
150-
}
181+
if (msg.length < 8) return onError(new Error('invalid error message'))
151182
self.client.emit('warning', new Error(msg.slice(8).toString()))
152183
break
153184

154185
default:
155-
error('tracker sent invalid action')
186+
onError(new Error('tracker sent invalid action'))
156187
break
157188
}
158-
})
159-
160-
function send (message) {
161-
if (!parsedUrl.port) {
162-
parsedUrl.port = 80
163-
}
164-
socket.send(message, 0, message.length, parsedUrl.port, parsedUrl.hostname)
165189
}
166190

167-
function error (message) {
168-
// errors will often happen if a tracker is offline, so don't treat it as fatal
169-
self.client.emit('warning', new Error(message + ' (' + self._announceUrl + ')'))
191+
function onError (err) {
192+
if (self.destroyed) return
170193
cleanup()
194+
if (err.message) err.message += ' (' + self._announceUrl + ')'
195+
// errors will often happen if a tracker is offline, so don't treat it as fatal
196+
self.client.emit('warning', err)
171197
}
172198

173-
function cleanup () {
174-
if (timeout) {
175-
clearTimeout(timeout)
176-
timeout = null
199+
function send (message) {
200+
if (!parsedUrl.port) {
201+
parsedUrl.port = 80
177202
}
178-
try { socket.close() } catch (err) {}
203+
socket.send(message, 0, message.length, parsedUrl.port, parsedUrl.hostname)
179204
}
180205

181206
function announce (connectionId, opts) {
@@ -215,19 +240,6 @@ UDPTracker.prototype._request = function (opts) {
215240
}
216241
}
217242

218-
// TODO: Improve this interface
219-
UDPTracker.prototype.setInterval = function (intervalMs) {
220-
var self = this
221-
clearInterval(self._interval)
222-
223-
self._intervalMs = intervalMs
224-
if (intervalMs) {
225-
// HACK
226-
var update = self.announce.bind(self, self.client._defaultAnnounceOpts())
227-
self._interval = setInterval(update, self._intervalMs)
228-
}
229-
}
230-
231243
function genTransactionId () {
232244
return new Buffer(hat(32), 'hex')
233245
}

lib/websocket-tracker.js

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// TODO: destroy the websocket
1+
// TODO: cleanup unused Peers when tracker doesn't respond with matches
22

33
module.exports = WebSocketTracker
44

@@ -11,10 +11,6 @@ var Socket = require('simple-websocket')
1111

1212
var common = require('./common')
1313

14-
// It turns out that you can't open multiple websockets to the same server within one
15-
// browser tab, so let's reuse them.
16-
var socketPool = {}
17-
1814
inherits(WebSocketTracker, EventEmitter)
1915

2016
function WebSocketTracker (client, announceUrl, opts) {
@@ -31,11 +27,12 @@ function WebSocketTracker (client, announceUrl, opts) {
3127
self._intervalMs = self.client._intervalMs // use client interval initially
3228
self._interval = null
3329

34-
if (socketPool[announceUrl]) self._socket = socketPool[announceUrl]
35-
else self._socket = socketPool[announceUrl] = new Socket(announceUrl)
30+
self._onSocketErrorBound = self._onSocketError.bind(self)
31+
self._onSocketDataBound = self._onSocketData.bind(self)
3632

37-
self._socket.on('error', self._onSocketError.bind(self))
38-
self._socket.on('data', self._onSocketData.bind(self))
33+
self._socket = new Socket(announceUrl + '?' + hat(40))
34+
self._socket.on('error', self._onSocketErrorBound)
35+
self._socket.on('data', self._onSocketDataBound)
3936
}
4037

4138
WebSocketTracker.prototype.announce = function (opts) {
@@ -79,13 +76,24 @@ WebSocketTracker.prototype.setInterval = function (intervalMs) {
7976
}
8077
}
8178

79+
WebSocketTracker.prototype.destroy = function () {
80+
var self = this
81+
if (self.destroyed) return
82+
self.destroyed = true
83+
self._socket.removeListener('error', self._onSocketErrorBound)
84+
self._socket.removeListener('data', self._onSocketDataBound)
85+
self._socket.close()
86+
}
87+
8288
WebSocketTracker.prototype._onSocketError = function (err) {
8389
var self = this
90+
if (self.destroyed) return
8491
self.client.emit('error', err)
8592
}
8693

8794
WebSocketTracker.prototype._onSocketData = function (data) {
8895
var self = this
96+
if (self.destroyed) return
8997

9098
if (!(typeof data === 'object' && data !== null)) {
9199
return self.client.emit('warning', new Error('Invalid tracker response'))
@@ -161,6 +169,8 @@ WebSocketTracker.prototype._onSocketData = function (data) {
161169

162170
WebSocketTracker.prototype._send = function (params) {
163171
var self = this
172+
if (self.destroyed) return
173+
164174
var message = JSON.stringify(params)
165175
debug('send %s', message)
166176
self._socket.send(message)

0 commit comments

Comments
 (0)