Skip to content

Commit 0aadcc1

Browse files
committed
Wait up to 1s for pending requests before destroy()
If the user calls: client.stop() client.destroy() We should ensure that the final 'stopped' message reaches the tracker server, even though the client will not get the response (because they destroyed the client and no more events will be emitted). If there are pending requests when destroy() is called, then a 1s timer is set after which point all requests are forcibly cleaned up. If the requests complete before the 1s timer fires, then cleanup happens right away (so we're not stuck waiting for the 1s timer). So, destroy() can happen one of three ways: - immediately, if no pending requests exist - after exactly 1s, if pending requests exist and they don't complete within 1s - less than 1s, if pending requests exist and they all complete before the 1s timer fires
1 parent 9cf2dff commit 0aadcc1

File tree

4 files changed

+175
-65
lines changed

4 files changed

+175
-65
lines changed

lib/client/http-tracker.js

Lines changed: 52 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,16 @@ function HTTPTracker (client, announceUrl, opts) {
2828

2929
// Determine scrape url (if http tracker supports it)
3030
self.scrapeUrl = null
31-
var m
32-
if ((m = self.announceUrl.match(HTTP_SCRAPE_SUPPORT))) {
33-
self.scrapeUrl = self.announceUrl.slice(0, m.index) + '/scrape' +
34-
self.announceUrl.slice(m.index + 9)
31+
32+
var match = self.announceUrl.match(HTTP_SCRAPE_SUPPORT)
33+
if (match) {
34+
var pre = self.announceUrl.slice(0, match.index)
35+
var post = self.announceUrl.slice(match.index + 9)
36+
self.scrapeUrl = pre + '/scrape' + post
3537
}
38+
39+
self.cleanupFns = []
40+
self.maybeDestroyCleanup = null
3641
}
3742

3843
HTTPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes
@@ -82,22 +87,61 @@ HTTPTracker.prototype.destroy = function (cb) {
8287
self.destroyed = true
8388
clearInterval(self.interval)
8489

85-
cb(null)
90+
// If there are no pending requests, destroy immediately.
91+
if (self.cleanupFns.length === 0) return destroyCleanup()
92+
93+
// Otherwise, wait a short time for pending requests to complete, then force
94+
// destroy them.
95+
var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT)
96+
97+
// But, if all pending requests complete before the timeout fires, do cleanup
98+
// right away.
99+
self.maybeDestroyCleanup = function () {
100+
if (self.cleanupFns.length === 0) destroyCleanup()
101+
}
102+
103+
function destroyCleanup () {
104+
if (timeout) {
105+
clearTimeout(timeout)
106+
timeout = null
107+
}
108+
self.maybeDestroyCleanup = null
109+
self.cleanupFns.slice(0).forEach(function (cleanup) {
110+
cleanup()
111+
})
112+
self.cleanupFns = []
113+
cb(null)
114+
}
86115
}
87116

88117
HTTPTracker.prototype._request = function (requestUrl, params, cb) {
89118
var self = this
90119
var u = requestUrl + (requestUrl.indexOf('?') === -1 ? '?' : '&') +
91120
common.querystringStringify(params)
92-
var opts = {
121+
122+
self.cleanupFns.push(cleanup)
123+
124+
var request = get.concat({
93125
url: u,
126+
timeout: common.REQUEST_TIMEOUT,
94127
headers: {
95128
'user-agent': self.client._userAgent || ''
96129
}
130+
}, onResponse)
131+
132+
function cleanup () {
133+
if (request) {
134+
self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1)
135+
request.abort()
136+
request = null
137+
}
138+
if (self.maybeDestroyCleanup) self.maybeDestroyCleanup()
97139
}
98140

99-
get.concat(opts, function (err, res, data) {
141+
function onResponse (err, res, data) {
142+
cleanup()
100143
if (self.destroyed) return
144+
101145
if (err) return self.client.emit('warning', err)
102146
if (res.statusCode !== 200) {
103147
return self.client.emit('warning', new Error('Non-200 response code ' +
@@ -128,7 +172,7 @@ HTTPTracker.prototype._request = function (requestUrl, params, cb) {
128172
debug('response from ' + requestUrl)
129173

130174
cb(data)
131-
})
175+
}
132176
}
133177

134178
HTTPTracker.prototype._onAnnounceResponse = function (data) {

lib/client/udp-tracker.js

Lines changed: 70 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ var url = require('url')
1212
var common = require('../common')
1313
var Tracker = require('./tracker')
1414

15-
var TIMEOUT = 15000
16-
1715
inherits(UDPTracker, Tracker)
1816

1917
/**
@@ -29,6 +27,7 @@ function UDPTracker (client, announceUrl, opts) {
2927
debug('new udp tracker %s', announceUrl)
3028

3129
self.cleanupFns = []
30+
self.maybeDestroyCleanup = null
3231
}
3332

3433
UDPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes
@@ -52,11 +51,31 @@ UDPTracker.prototype.destroy = function (cb) {
5251
self.destroyed = true
5352
clearInterval(self.interval)
5453

55-
self.cleanupFns.slice(0).forEach(function (cleanup) {
56-
cleanup()
57-
})
58-
self.cleanupFns = []
59-
cb(null)
54+
// If there are no pending requests, destroy immediately.
55+
if (self.cleanupFns.length === 0) return destroyCleanup()
56+
57+
// Otherwise, wait a short time for pending requests to complete, then force
58+
// destroy them.
59+
var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT)
60+
61+
// But, if all pending requests complete before the timeout fires, do cleanup
62+
// right away.
63+
self.maybeDestroyCleanup = function () {
64+
if (self.cleanupFns.length === 0) destroyCleanup()
65+
}
66+
67+
function destroyCleanup () {
68+
if (timeout) {
69+
clearTimeout(timeout)
70+
timeout = null
71+
}
72+
self.maybeDestroyCleanup = null
73+
self.cleanupFns.slice(0).forEach(function (cleanup) {
74+
cleanup()
75+
})
76+
self.cleanupFns = []
77+
cb(null)
78+
}
6079
}
6180

6281
UDPTracker.prototype._request = function (opts) {
@@ -66,41 +85,51 @@ UDPTracker.prototype._request = function (opts) {
6685
var transactionId = genTransactionId()
6786
var socket = dgram.createSocket('udp4')
6887

69-
var cleanup = function () {
70-
if (!socket) return
71-
self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1)
72-
if (timeout) {
73-
clearTimeout(timeout)
74-
timeout = null
75-
}
76-
socket.removeListener('error', onError)
77-
socket.removeListener('message', onSocketMessage)
78-
socket.on('error', noop) // ignore all future errors
79-
try { socket.close() } catch (err) {}
80-
socket = null
81-
}
82-
self.cleanupFns.push(cleanup)
83-
84-
// does not matter if `stopped` event arrives, so supress errors & cleanup after timeout
85-
var ms = opts.event === 'stopped' ? TIMEOUT / 10 : TIMEOUT
8688
var timeout = setTimeout(function () {
87-
timeout = null
89+
// does not matter if `stopped` event arrives, so supress errors
8890
if (opts.event === 'stopped') cleanup()
8991
else onError(new Error('tracker request timed out (' + opts.event + ')'))
90-
}, ms)
92+
timeout = null
93+
}, common.REQUEST_TIMEOUT)
9194
if (timeout.unref) timeout.unref()
9295

96+
self.cleanupFns.push(cleanup)
97+
9398
send(Buffer.concat([
9499
common.CONNECTION_ID,
95100
common.toUInt32(common.ACTIONS.CONNECT),
96101
transactionId
97102
]))
98103

99-
socket.on('error', onError)
104+
socket.once('error', onError)
100105
socket.on('message', onSocketMessage)
101106

102-
function onSocketMessage (msg) {
107+
function cleanup () {
108+
if (timeout) {
109+
clearTimeout(timeout)
110+
timeout = null
111+
}
112+
if (socket) {
113+
self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1)
114+
socket.removeListener('error', onError)
115+
socket.removeListener('message', onSocketMessage)
116+
socket.on('error', noop) // ignore all future errors
117+
try { socket.close() } catch (err) {}
118+
socket = null
119+
}
120+
if (self.maybeDestroyCleanup) self.maybeDestroyCleanup()
121+
}
122+
123+
function onError (err) {
124+
cleanup()
103125
if (self.destroyed) return
126+
127+
if (err.message) err.message += ' (' + self.announceUrl + ')'
128+
// errors will often happen if a tracker is offline, so don't treat it as fatal
129+
self.client.emit('warning', err)
130+
}
131+
132+
function onSocketMessage (msg) {
104133
if (msg.length < 8 || msg.readUInt32BE(4) !== transactionId.readUInt32BE(0)) {
105134
return onError(new Error('tracker sent invalid transaction id'))
106135
}
@@ -109,15 +138,20 @@ UDPTracker.prototype._request = function (opts) {
109138
debug('UDP response %s, action %s', self.announceUrl, action)
110139
switch (action) {
111140
case 0: // handshake
141+
// Note: no check for `self.destroyed` so that pending messages to the
142+
// tracker can still be sent/received even after destroy() is called
143+
112144
if (msg.length < 16) return onError(new Error('invalid udp handshake'))
113145

114146
if (opts._scrape) scrape(msg.slice(8, 16))
115147
else announce(msg.slice(8, 16), opts)
116148

117-
return
149+
break
118150

119151
case 1: // announce
120152
cleanup()
153+
if (self.destroyed) return
154+
121155
if (msg.length < 20) return onError(new Error('invalid announce message'))
122156

123157
var interval = msg.readUInt32BE(8)
@@ -138,10 +172,13 @@ UDPTracker.prototype._request = function (opts) {
138172
addrs.forEach(function (addr) {
139173
self.client.emit('peer', addr)
140174
})
175+
141176
break
142177

143178
case 2: // scrape
144179
cleanup()
180+
if (self.destroyed) return
181+
145182
if (msg.length < 20 || (msg.length - 8) % 12 !== 0) {
146183
return onError(new Error('invalid scrape message'))
147184
}
@@ -158,12 +195,16 @@ UDPTracker.prototype._request = function (opts) {
158195
incomplete: msg.readUInt32BE(16 + (i * 12))
159196
})
160197
}
198+
161199
break
162200

163201
case 3: // error
164202
cleanup()
203+
if (self.destroyed) return
204+
165205
if (msg.length < 8) return onError(new Error('invalid error message'))
166206
self.client.emit('warning', new Error(msg.slice(8).toString()))
207+
167208
break
168209

169210
default:
@@ -172,14 +213,6 @@ UDPTracker.prototype._request = function (opts) {
172213
}
173214
}
174215

175-
function onError (err) {
176-
if (self.destroyed) return
177-
cleanup()
178-
if (err.message) err.message += ' (' + self.announceUrl + ')'
179-
// errors will often happen if a tracker is offline, so don't treat it as fatal
180-
self.client.emit('warning', err)
181-
}
182-
183216
function send (message) {
184217
if (!parsedUrl.port) {
185218
parsedUrl.port = 80

lib/client/websocket-tracker.js

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ function WebSocketTracker (client, announceUrl, opts) {
3434
self.retries = 0
3535
self.reconnectTimer = null
3636

37+
// Simple boolean flag to track whether the socket has received data from
38+
// the websocket server since the last time socket.send() was called.
39+
self.expectingResponse = false
40+
3741
self._openSocket()
3842
}
3943

@@ -104,44 +108,59 @@ WebSocketTracker.prototype.destroy = function (cb) {
104108
clearInterval(self.interval)
105109
clearTimeout(self.reconnectTimer)
106110

111+
// Destroy peers
112+
for (var peerId in self.peers) {
113+
var peer = self.peers[peerId]
114+
clearTimeout(peer.trackerTimeout)
115+
peer.destroy()
116+
}
117+
self.peers = null
118+
107119
if (self.socket) {
108120
self.socket.removeListener('connect', self._onSocketConnectBound)
109121
self.socket.removeListener('data', self._onSocketDataBound)
110122
self.socket.removeListener('close', self._onSocketCloseBound)
111123
self.socket.removeListener('error', self._onSocketErrorBound)
124+
self.socket = null
112125
}
113126

114127
self._onSocketConnectBound = null
115128
self._onSocketErrorBound = null
116129
self._onSocketDataBound = null
117130
self._onSocketCloseBound = null
118131

119-
// Destroy peers
120-
for (var peerId in self.peers) {
121-
var peer = self.peers[peerId]
122-
clearTimeout(peer.trackerTimeout)
123-
peer.destroy()
124-
}
125-
self.peers = null
126-
127132
if (socketPool[self.announceUrl]) {
128133
socketPool[self.announceUrl].consumers -= 1
129134
}
130135

131-
if (socketPool[self.announceUrl].consumers === 0) {
132-
delete socketPool[self.announceUrl]
136+
// Other instances are using the socket, so there's nothing left to do here
137+
if (socketPool[self.announceUrl].consumers > 0) return cb()
138+
139+
var socket = socketPool[self.announceUrl]
140+
delete socketPool[self.announceUrl]
141+
socket.on('error', noop) // ignore all future errors
142+
socket.once('close', cb)
143+
144+
// If there is no data response expected, destroy immediately.
145+
if (!self.expectingResponse) return destroyCleanup()
146+
147+
// Otherwise, wait a short time for potential responses to come in from the
148+
// server, then force close the socket.
149+
var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT)
133150

134-
try {
135-
self.socket.on('error', noop) // ignore all future errors
136-
self.socket.destroy(cb)
137-
} catch (err) {
138-
cb(null)
151+
// But, if a response comes from the server before the timeout fires, do cleanup
152+
// right away.
153+
socket.once('data', destroyCleanup)
154+
155+
function destroyCleanup () {
156+
if (timeout) {
157+
clearTimeout(timeout)
158+
timeout = null
139159
}
140-
} else {
141-
cb(null)
160+
socket.removeListener('data', destroyCleanup)
161+
socket.destroy()
162+
socket = null
142163
}
143-
144-
self.socket = null
145164
}
146165

147166
WebSocketTracker.prototype._openSocket = function () {
@@ -192,6 +211,8 @@ WebSocketTracker.prototype._onSocketData = function (data) {
192211
var self = this
193212
if (self.destroyed) return
194213

214+
self.expectingResponse = false
215+
195216
try {
196217
data = JSON.parse(data)
197218
} catch (err) {
@@ -352,7 +373,7 @@ WebSocketTracker.prototype._startReconnectTimer = function () {
352373
WebSocketTracker.prototype._send = function (params) {
353374
var self = this
354375
if (self.destroyed) return
355-
376+
self.expectingResponse = true
356377
var message = JSON.stringify(params)
357378
debug('send %s', message)
358379
self.socket.send(message)

0 commit comments

Comments
 (0)