Skip to content

Commit 291ff84

Browse files
Fix PR Feedback & Prepare Monorepo
- **Fixes**: - Implemented `LRU` cache for manifests (prevent leaks). - Added strict input validation for Manifests. - Fixed `hex2bin` crash in error handler. - Refactored socket cleanup. - Added `test/megatorrent-validation.js`. - **Monorepo**: - Moved `ARCHITECTURE.md` to `docs/`. - Added `qbittorrent` submodule. - Added `README-monorepo.md`.
1 parent 6a9a1f1 commit 291ff84

File tree

7 files changed

+159
-18
lines changed

7 files changed

+159
-18
lines changed

.gitmodules

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[submodule "qbittorrent"]
2+
path = qbittorrent
3+
url = https://github.com/robertpelloni/qbittorrent

README-monorepo.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Megatorrent Monorepo
2+
3+
This repository contains the reference implementation for the Megatorrent protocol, a decentralized, mutable successor to BitTorrent.
4+
5+
## Structure
6+
7+
* `docs/`: Architecture and Protocol Specifications.
8+
* `tracker/`: Node.js implementation of the Tracker and Reference Client (the root of this repo, historically).
9+
* `qbittorrent/`: C++ Client Fork (Submodule).
10+
11+
## Getting Started
12+
13+
### Node.js Tracker & Reference Client
14+
Run the tracker and client tests:
15+
```bash
16+
npm install
17+
npm test
18+
```
19+
20+
### qBittorrent Client
21+
See `qbittorrent/README.md` for C++ build instructions.
File renamed without changes.

lib/manifest.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,17 @@ export function validateManifest (manifest) {
1010
if (!manifest || typeof manifest !== 'object') throw new Error('Invalid manifest')
1111
if (!manifest.publicKey || !manifest.signature) throw new Error('Missing keys')
1212

13+
// Validation
14+
if (typeof manifest.publicKey !== 'string' || !/^[0-9a-fA-F]{64}$/.test(manifest.publicKey)) {
15+
throw new Error('Invalid public key format')
16+
}
17+
if (typeof manifest.signature !== 'string' || !/^[0-9a-fA-F]{128}$/.test(manifest.signature)) {
18+
throw new Error('Invalid signature format')
19+
}
20+
if (typeof manifest.sequence !== 'number') throw new Error('Invalid sequence')
21+
if (typeof manifest.timestamp !== 'number') throw new Error('Invalid timestamp')
22+
if (!Array.isArray(manifest.collections)) throw new Error('Invalid collections')
23+
1324
// Reconstruct the payload to verify (exclude signature)
1425
const payload = {
1526
publicKey: manifest.publicKey,

qbittorrent

Submodule qbittorrent added at 5abf458

server.js

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import parseHttpRequest from './lib/server/parse-http.js'
1515
import parseUdpRequest from './lib/server/parse-udp.js'
1616
import parseWebSocketRequest from './lib/server/parse-websocket.js'
1717
import { validateManifest } from './lib/manifest.js'
18+
import { LRUCache } from 'lru-cache'
1819

1920
const debug = Debug('bittorrent-tracker:server')
2021
const hasOwnProperty = Object.prototype.hasOwnProperty
@@ -54,7 +55,10 @@ class Server extends EventEmitter {
5455
this.listening = false
5556
this.destroyed = false
5657
this.torrents = {}
57-
this.manifests = {} // PublicKey -> Manifest
58+
59+
// LRU Cache for Manifests (max 1000 items)
60+
this.manifests = new LRUCache({ max: 1000 })
61+
5862
this.subscriptions = {} // PublicKey -> Set<Socket>
5963

6064
this.http = null
@@ -649,28 +653,32 @@ class Server extends EventEmitter {
649653
return cb(new Error('Invalid signature'))
650654
}
651655
} catch (e) {
652-
return cb(e)
656+
return cb(new Error('Manifest validation failed: ' + e.message))
653657
}
654658

655659
const key = params.manifest.publicKey
656-
const current = this.manifests[key]
660+
const current = this.manifests.get(key)
657661

658662
if (current && params.manifest.sequence <= current.sequence) {
659663
return cb(new Error('Sequence too low'))
660664
}
661665

662666
// Store manifest
663-
this.manifests[key] = params.manifest
667+
this.manifests.set(key, params.manifest)
664668
debug('Manifest updated for %s seq=%d', key, params.manifest.sequence)
665669

666670
// Broadcast to subscribers
667671
if (this.subscriptions[key]) {
668672
this.subscriptions[key].forEach(socket => {
669673
if (socket.readyState === 1) { // OPEN
670-
socket.send(JSON.stringify({
671-
action: 'publish',
672-
manifest: params.manifest
673-
}))
674+
try {
675+
socket.send(JSON.stringify({
676+
action: 'publish',
677+
manifest: params.manifest
678+
}))
679+
} catch (err) {
680+
debug('Broadcast failed for peer %s: %s', socket.peerId, err.message)
681+
}
674682
}
675683
})
676684
}
@@ -687,22 +695,36 @@ class Server extends EventEmitter {
687695
}
688696
this.subscriptions[key].add(socket)
689697

698+
// Track subscriptions on the socket for efficient cleanup
699+
if (!socket.subscribedKeys) {
700+
socket.subscribedKeys = new Set()
701+
}
702+
socket.subscribedKeys.add(key)
703+
690704
// Send latest if available
691-
if (this.manifests[key]) {
692-
socket.send(JSON.stringify({
693-
action: 'publish',
694-
manifest: this.manifests[key]
695-
}))
705+
const cached = this.manifests.get(key)
706+
if (cached) {
707+
try {
708+
socket.send(JSON.stringify({
709+
action: 'publish',
710+
manifest: cached
711+
}))
712+
} catch (err) {
713+
debug('Initial send failed for peer %s: %s', socket.peerId, err.message)
714+
}
696715
}
697716

698717
// Cleanup on close
699718
if (!socket._cleanupSetup) {
700719
socket.on('close', () => {
701-
// We have to iterate since we don't store reverse mapping efficiently here
702-
// (Optimally we should store subscribed keys on socket)
703-
for (const k in this.subscriptions) {
704-
this.subscriptions[k].delete(socket)
705-
if (this.subscriptions[k].size === 0) delete this.subscriptions[k]
720+
if (socket.subscribedKeys) {
721+
for (const k of socket.subscribedKeys) {
722+
if (this.subscriptions[k]) {
723+
this.subscriptions[k].delete(socket)
724+
if (this.subscriptions[k].size === 0) delete this.subscriptions[k]
725+
}
726+
}
727+
socket.subscribedKeys.clear()
706728
}
707729
})
708730
socket._cleanupSetup = true

test/megatorrent-validation.js

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import test from 'tape'
2+
import Server from '../server.js'
3+
import WebSocket from 'ws'
4+
import { generateKeypair, sign } from '../reference-client/lib/crypto.js'
5+
import stringify from 'fast-json-stable-stringify'
6+
7+
function createTracker (opts, cb) {
8+
const server = new Server(opts)
9+
server.on('listening', () => cb(server))
10+
server.listen(0)
11+
}
12+
13+
test('Megatorrent: Validation and Cleanup', function (t) {
14+
t.plan(5)
15+
16+
createTracker({ udp: false, http: false, ws: true, stats: false }, function (server) {
17+
const port = server.ws.address().port
18+
const trackerUrl = `ws://localhost:${port}`
19+
const keypair = generateKeypair()
20+
21+
const publisher = new WebSocket(trackerUrl)
22+
23+
publisher.on('open', () => {
24+
// Test 1: Invalid Signature
25+
const invalidManifest = {
26+
publicKey: keypair.publicKey.toString('hex'),
27+
sequence: 1,
28+
timestamp: Date.now(),
29+
collections: [],
30+
signature: Buffer.alloc(64).toString('hex') // invalid sig
31+
}
32+
33+
publisher.send(JSON.stringify({
34+
action: 'publish',
35+
manifest: invalidManifest
36+
}))
37+
})
38+
39+
publisher.on('message', (data) => {
40+
const msg = JSON.parse(data)
41+
42+
if (msg['failure reason']) {
43+
// We expect failures for invalid inputs
44+
if (msg['failure reason'].includes('Invalid signature') || msg['failure reason'].includes('validation failed')) {
45+
t.pass('Rejected invalid signature/manifest')
46+
47+
// Now proceed to clean up test
48+
publisher.close()
49+
50+
// Test 2: Memory Leak / Cleanup Check
51+
// Create a subscriber that subscribes and then disconnects
52+
const subscriber = new WebSocket(trackerUrl)
53+
subscriber.on('open', () => {
54+
const k = keypair.publicKey.toString('hex')
55+
subscriber.send(JSON.stringify({ action: 'subscribe', key: k }))
56+
57+
// Allow server to process
58+
setTimeout(() => {
59+
t.ok(server.subscriptions[k], 'Subscription active')
60+
t.equal(server.subscriptions[k].size, 1, 'One subscriber')
61+
62+
subscriber.close()
63+
64+
// Wait for cleanup
65+
setTimeout(() => {
66+
if (!server.subscriptions[k]) {
67+
t.pass('Subscription Set removed after last subscriber left')
68+
} else {
69+
t.equal(server.subscriptions[k].size, 0, 'Subscriber removed from set')
70+
}
71+
72+
// Test 3: Cache Existence
73+
t.ok(server.manifests, 'LRU Cache exists')
74+
75+
server.close()
76+
}, 100)
77+
}, 100)
78+
})
79+
}
80+
}
81+
})
82+
})
83+
})

0 commit comments

Comments
 (0)