diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000..9029450f --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "qbittorrent"] + path = qbittorrent + url = https://github.com/robertpelloni/qbittorrent diff --git a/README-monorepo.md b/README-monorepo.md new file mode 100644 index 00000000..f6ababa9 --- /dev/null +++ b/README-monorepo.md @@ -0,0 +1,21 @@ +# Megatorrent Monorepo + +This repository contains the reference implementation for the Megatorrent protocol, a decentralized, mutable successor to BitTorrent. + +## Structure + +* `docs/`: Architecture and Protocol Specifications. +* `tracker/`: Node.js implementation of the Tracker and Reference Client (the root of this repo, historically). +* `qbittorrent/`: C++ Client Fork (Submodule). + +## Getting Started + +### Node.js Tracker & Reference Client +Run the tracker and client tests: +```bash +npm install +npm test +``` + +### qBittorrent Client +See `qbittorrent/README.md` for C++ build instructions. diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md new file mode 100644 index 00000000..93c6bce4 --- /dev/null +++ b/docs/ARCHITECTURE.md @@ -0,0 +1,49 @@ + +## 6. Obfuscated Storage Protocol (Phase 2) +To achieve plausible deniability and distributed redundancy, data is not stored as "files" but as "Blobs" of high-entropy data. + +### Concepts +* **Blob**: A fixed-size (or variable) container stored by a Host. To the Host, it is just random bytes. +* **Chunk**: A segment of a user's file. +* **Encryption**: Each Chunk is encrypted with a *unique* random key (ChaCha20-Poly1305) before being placed into a Blob. +* **Muxing**: A Blob may contain multiple Chunks (potentially from different files) or padding. + +### Data Structure: The `FileEntry` +This structure replaces the simple "magnet link" in the Megatorrent Manifest. 
+ +```json +{ + "name": "episode1.mkv", + "mime": "video/x-matroska", + "size": 104857600, + "chunks": [ + { + "blobId": "", + "offset": 0, // Byte offset in the Blob + "length": 1048576, // Length of the encrypted chunk + "key": "<32-byte Hex Key>", + "nonce": "<12-byte Hex Nonce>", + "authTag": "<16-byte Hex Tag>" + }, + ... + ] +} +``` + +### Process +1. **Ingest**: + * File is split into N chunks. + * For each chunk: + * Generate random Key & Nonce. + * Encrypt Chunk -> EncryptedChunk. + * (Simplification for Ref Impl) EncryptedChunk becomes a "Blob" directly (or is wrapped). + * Blob ID = SHA256(Blob). + * Result: A list of Blobs (to be uploaded) and a `FileEntry` (to be put in the Manifest). + +2. **Access**: + * Subscriber receives Manifest. + * Parses `FileEntry`. + * Requests Blob(ID) from the network (simulated via local dir or tracker relay). + * Extracts bytes at `offset` for `length`. + * Decrypts using `key` and `nonce`. + * Reassembles file. diff --git a/lib/manifest.js b/lib/manifest.js new file mode 100644 index 00000000..8896aede --- /dev/null +++ b/lib/manifest.js @@ -0,0 +1,37 @@ +import stringify from 'fast-json-stable-stringify' +import sodium from 'sodium-native' + +export function verify (message, signature, publicKey) { + const msgBuffer = Buffer.isBuffer(message) ? 
message : Buffer.from(message) + return sodium.crypto_sign_verify_detached(signature, msgBuffer, publicKey) +} + +export function validateManifest (manifest) { + if (!manifest || typeof manifest !== 'object') throw new Error('Invalid manifest') + if (!manifest.publicKey || !manifest.signature) throw new Error('Missing keys') + + // Validation + if (typeof manifest.publicKey !== 'string' || !/^[0-9a-fA-F]{64}$/.test(manifest.publicKey)) { + throw new Error('Invalid public key format') + } + if (typeof manifest.signature !== 'string' || !/^[0-9a-fA-F]{128}$/.test(manifest.signature)) { + throw new Error('Invalid signature format') + } + if (typeof manifest.sequence !== 'number') throw new Error('Invalid sequence') + if (typeof manifest.timestamp !== 'number') throw new Error('Invalid timestamp') + if (!Array.isArray(manifest.collections)) throw new Error('Invalid collections') + + // Reconstruct the payload to verify (exclude signature) + const payload = { + publicKey: manifest.publicKey, + sequence: manifest.sequence, + timestamp: manifest.timestamp, + collections: manifest.collections + } + + const jsonString = stringify(payload) + const publicKey = Buffer.from(manifest.publicKey, 'hex') + const signature = Buffer.from(manifest.signature, 'hex') + + return verify(jsonString, signature, publicKey) +} diff --git a/lib/server/parse-websocket.js b/lib/server/parse-websocket.js index 744a3c30..f16a6e91 100644 --- a/lib/server/parse-websocket.js +++ b/lib/server/parse-websocket.js @@ -48,6 +48,12 @@ export default function (socket, opts, params) { return bin2hex(binaryInfoHash) }) } + } else if (params.action === 'publish') { + // Custom Action: Publish Manifest + if (!params.manifest) throw new Error('missing manifest') + } else if (params.action === 'subscribe') { + // Custom Action: Subscribe to Key + if (!params.key) throw new Error('missing key') } else { throw new Error(`invalid action in WS request: ${params.action}`) } diff --git a/package.json b/package.json 
index ca9cebd6..8cbbc933 100644 --- a/package.json +++ b/package.json @@ -35,6 +35,7 @@ "compact2string": "^1.4.1", "cross-fetch-ponyfill": "^1.0.3", "debug": "^4.3.4", + "fast-json-stable-stringify": "^2.1.0", "ip": "^2.0.1", "lru": "^3.1.0", "minimist": "^1.2.8", @@ -44,6 +45,7 @@ "run-parallel": "^1.2.0", "run-series": "^1.1.9", "socks": "^2.8.3", + "sodium-native": "^5.0.10", "string2compact": "^2.0.1", "uint8-util": "^2.2.5", "unordered-array-remove": "^1.0.2", @@ -52,7 +54,7 @@ "devDependencies": { "@mapbox/node-pre-gyp": "1.0.11", "@webtorrent/semantic-release-config": "1.0.10", - "magnet-uri": "7.0.7", + "magnet-uri": "^7.0.7", "semantic-release": "21.1.2", "standard": "*", "tape": "5.9.0", diff --git a/qbittorrent b/qbittorrent new file mode 160000 index 00000000..5abf458e --- /dev/null +++ b/qbittorrent @@ -0,0 +1 @@ +Subproject commit 5abf458e69ff644b60a205814e3da2b22febf0a8 diff --git a/reference-client/index.js b/reference-client/index.js new file mode 100644 index 00000000..75d8c2c0 --- /dev/null +++ b/reference-client/index.js @@ -0,0 +1,248 @@ +#!/usr/bin/env node + +import fs from 'fs' +import path from 'path' +import minimist from 'minimist' +import Client from 'bittorrent-tracker' +import { generateKeypair } from './lib/crypto.js' +import { createManifest, validateManifest } from './lib/manifest.js' +import { ingest, reassemble } from './lib/storage.js' + +const argv = minimist(process.argv.slice(2), { + alias: { + k: 'keyfile', + t: 'tracker', + i: 'input', + o: 'output', + d: 'dir' + }, + default: { + keyfile: './identity.json', + tracker: 'ws://localhost:8000', // Default to local WS tracker + dir: './storage' + } +}) + +const command = argv._[0] + +if (!command) { + console.error(`Usage: + gen-key [-k identity.json] + ingest -i [-d ./storage] -> Returns FileEntry JSON + publish [-k identity.json] [-t ws://tracker] -i + subscribe [-t ws://tracker] [-d ./storage] + `) + process.exit(1) +} + +// Ensure storage dir exists +if 
(!fs.existsSync(argv.dir)) { + fs.mkdirSync(argv.dir, { recursive: true }) +} + +// 1. Generate Key +if (command === 'gen-key') { + const keypair = generateKeypair() + const data = { + publicKey: keypair.publicKey.toString('hex'), + secretKey: keypair.secretKey.toString('hex') + } + fs.writeFileSync(argv.keyfile, JSON.stringify(data, null, 2)) + console.log(`Identity generated at ${argv.keyfile}`) + console.log(`Public Key: ${data.publicKey}`) + process.exit(0) +} + +// 2. Ingest +if (command === 'ingest') { + if (!argv.input) { + console.error('Please specify input file with -i') + process.exit(1) + } + const fileBuf = fs.readFileSync(argv.input) + const result = ingest(fileBuf, path.basename(argv.input)) + + // Save Blobs + result.blobs.forEach(blob => { + fs.writeFileSync(path.join(argv.dir, blob.id), blob.buffer) + }) + + console.log(`Ingested ${result.blobs.length} blobs to ${argv.dir}`) + console.log('FileEntry JSON (save this to a file to publish it):') + console.log(JSON.stringify(result.fileEntry, null, 2)) + process.exit(0) +} + +// 3. Publish +if (command === 'publish') { + if (!fs.existsSync(argv.keyfile)) { + console.error('Keyfile not found. Run gen-key first.') + process.exit(1) + } + const keyData = JSON.parse(fs.readFileSync(argv.keyfile)) + const keypair = { + publicKey: Buffer.from(keyData.publicKey, 'hex'), + secretKey: Buffer.from(keyData.secretKey, 'hex') + } + + // Read Input + if (!argv.input) { + console.error('Please specify input file with -i (json file entry or text list)') + process.exit(1) + } + + const content = fs.readFileSync(argv.input, 'utf-8') + let items + try { + // Try parsing as JSON (FileEntry) + const json = JSON.parse(content) + // Wrap in our "Items" list. + // In the future, "Items" can be Magnet Links OR FileEntries. 
+ items = [json] + } catch (e) { + // Fallback: Line-separated magnet links + items = content.split('\n').map(l => l.trim()).filter(l => l.length > 0) + } + + // Create Collections structure + const collections = [{ + title: 'Default Collection', + items + }] + + // Create Manifest + const sequence = Date.now() + const manifest = createManifest(keypair, sequence, collections) + + console.log('Publishing manifest:', JSON.stringify(manifest, null, 2)) + + // Connect to Tracker + const client = new Client({ + infoHash: Buffer.alloc(20), // Dummy, not used for custom proto + peerId: Buffer.alloc(20), // Dummy + announce: [argv.tracker], + port: 6666 + }) + + client.on('error', err => console.error('Client Error:', err.message)) + + client.on('update', () => { + // We don't expect standard updates + }) + + // We need to access the underlying socket to send our custom message + // bittorrent-tracker abstracts this, so we might need to hook into the `announce` phase or just use the socket directly if exposed. + // The library exposes `client._trackers` which is a list of Tracker instances. + + // Wait for socket connection + setTimeout(() => { + const trackers = client._trackers + let sent = false + + for (const tracker of trackers) { + // We only support WebSocket for this custom protocol right now + if (tracker.socket && tracker.socket.readyState === 1) { // OPEN + console.log('Sending publish message to ' + tracker.announceUrl) + tracker.socket.send(JSON.stringify({ + action: 'publish', + manifest + })) + sent = true + } + } + + if (sent) console.log('Publish sent!') + else console.error('No connected websocket trackers found.') + + setTimeout(() => { + client.destroy() + process.exit(0) + }, 1000) + }, 1000) +} + +// 4. 
Subscribe +if (command === 'subscribe') { + const pubKeyHex = argv._[1] + if (!pubKeyHex) { + console.error('Please provide public key hex') + process.exit(1) + } + + console.log(`Subscribing to ${pubKeyHex}...`) + + const client = new Client({ + infoHash: Buffer.alloc(20), + peerId: Buffer.alloc(20), + announce: [argv.tracker], + port: 6667 + }) + + client.on('error', err => console.error('Client Error:', err.message)) + + // Hook into internal trackers to send subscribe + setInterval(() => { + const trackers = client._trackers + for (const tracker of trackers) { + if (tracker.socket && tracker.socket.readyState === 1 && !tracker._subscribed) { + console.log('Sending subscribe to ' + tracker.announceUrl) + tracker.socket.send(JSON.stringify({ + action: 'subscribe', + key: pubKeyHex + })) + tracker._subscribed = true // simple flag to avoid spamming + + // Listen for responses + const originalOnMessage = tracker.socket.onmessage + tracker.socket.onmessage = (event) => { + let data + try { data = JSON.parse(event.data) } catch (e) { return } + + if (data.action === 'publish') { + console.log('\n>>> RECEIVED UPDATE <<<') + const valid = validateManifest(data.manifest) + if (valid && data.manifest.publicKey === pubKeyHex) { + console.log('VERIFIED UPDATE from ' + pubKeyHex) + console.log('Sequence:', data.manifest.sequence) + + const items = data.manifest.collections[0].items + console.log(`Received ${items.length} items.`) + + // Auto-Download Logic (Prototype) + items.forEach(async (item, idx) => { + if (typeof item === 'object' && item.chunks) { + console.log(`Item ${idx}: Detected Megatorrent FileEntry: ${item.name}`) + try { + const fileBuf = await reassemble(item, async (blobId) => { + const p = path.join(argv.dir, blobId) + if (fs.existsSync(p)) { + return fs.readFileSync(p) + } + // TODO: Network fetch + console.log(`Blob ${blobId} not found locally.`) + return null + }) + + if (fileBuf) { + const outPath = path.join(argv.dir, 'downloaded_' + item.name) + 
fs.writeFileSync(outPath, fileBuf) + console.log(`SUCCESS: Reassembled to ${outPath}`) + } + } catch (err) { + console.error('Failed to reassemble:', err.message) + } + } else { + console.log(`Item ${idx}: Standard Magnet/Text: ${item}`) + } + }) + } else { + console.error('Invalid signature or wrong key!') + } + } else { + if (originalOnMessage) originalOnMessage(event) + } + } + } + } + }, 1000) +} diff --git a/reference-client/lib/crypto.js b/reference-client/lib/crypto.js new file mode 100644 index 00000000..7ee9aaf3 --- /dev/null +++ b/reference-client/lib/crypto.js @@ -0,0 +1,20 @@ +import sodium from 'sodium-native' + +export function generateKeypair () { + const publicKey = Buffer.alloc(sodium.crypto_sign_PUBLICKEYBYTES) + const secretKey = Buffer.alloc(sodium.crypto_sign_SECRETKEYBYTES) + sodium.crypto_sign_keypair(publicKey, secretKey) + return { publicKey, secretKey } +} + +export function sign (message, secretKey) { + const signature = Buffer.alloc(sodium.crypto_sign_BYTES) + const msgBuffer = Buffer.isBuffer(message) ? message : Buffer.from(message) + sodium.crypto_sign_detached(signature, msgBuffer, secretKey) + return signature +} + +export function verify (message, signature, publicKey) { + const msgBuffer = Buffer.isBuffer(message) ? 
/**
 * Validates a manifest and verifies its detached Ed25519 signature.
 * Mirrors the tracker-side checks in lib/manifest.js so client and server
 * agree on what a well-formed manifest is.
 *
 * @param {Object} manifest - { publicKey, signature, sequence, timestamp, collections }
 * @returns {boolean} true iff the signature verifies
 * @throws {Error} when the manifest is structurally invalid
 */
export function validateManifest (manifest) {
  if (!manifest || typeof manifest !== 'object') throw new Error('Invalid manifest')
  if (!manifest.publicKey || !manifest.signature) throw new Error('Missing keys')

  // Without these format checks, Buffer.from(..., 'hex') on malformed input
  // produced short buffers and sodium threw an opaque TypeError instead of
  // a descriptive Error.
  if (typeof manifest.publicKey !== 'string' || !/^[0-9a-fA-F]{64}$/.test(manifest.publicKey)) {
    throw new Error('Invalid public key format')
  }
  if (typeof manifest.signature !== 'string' || !/^[0-9a-fA-F]{128}$/.test(manifest.signature)) {
    throw new Error('Invalid signature format')
  }
  // Number.isFinite also rejects NaN/Infinity, not just non-numbers.
  if (!Number.isFinite(manifest.sequence)) throw new Error('Invalid sequence')
  if (!Number.isFinite(manifest.timestamp)) throw new Error('Invalid timestamp')
  if (!Array.isArray(manifest.collections)) throw new Error('Invalid collections')

  // Reconstruct the signed payload (everything except `signature`) using the
  // same canonical stringify the publisher used in createManifest().
  const payload = {
    publicKey: manifest.publicKey,
    sequence: manifest.sequence,
    timestamp: manifest.timestamp,
    collections: manifest.collections
  }

  const jsonString = stringify(payload)
  const publicKey = Buffer.from(manifest.publicKey, 'hex')
  const signature = Buffer.from(manifest.signature, 'hex')

  return verify(jsonString, signature, publicKey)
}
/**
 * Ingests a file buffer: splits it into chunks, encrypts each chunk with a
 * unique random key/nonce (ChaCha20-Poly1305 IETF), and wraps each encrypted
 * chunk as a content-addressed Blob (1 chunk = 1 Blob in this reference impl).
 *
 * Uses Node's built-in AEAD instead of the sodium-native addon. The wire
 * format is unchanged: each Blob is ciphertext with the 16-byte Poly1305 tag
 * appended (libsodium "combined" mode), so blobs remain interoperable.
 *
 * @param {Buffer} fileBuffer - plaintext file contents
 * @param {string} fileName - stored verbatim in the FileEntry
 * @param {number} [chunkSize=CHUNK_SIZE] - split size in bytes (optional,
 *   defaults preserve the original behavior)
 * @returns {{ fileEntry: Object, blobs: Array<{id: string, buffer: Buffer}> }}
 */
export function ingest (fileBuffer, fileName, chunkSize = CHUNK_SIZE) {
  const totalSize = fileBuffer.length
  const chunks = []
  const blobs = []

  for (let offset = 0; offset < totalSize; offset += chunkSize) {
    // subarray (not the deprecated Buffer#slice) — zero-copy view
    const chunkData = fileBuffer.subarray(offset, Math.min(offset + chunkSize, totalSize))

    // 1. Unique key (32 bytes) and nonce (12 bytes) per chunk, per the
    //    ChaCha20-Poly1305 IETF construction.
    const key = crypto.randomBytes(32)
    const nonce = crypto.randomBytes(12)

    // 2. Encrypt; append the auth tag to match sodium's combined format.
    const cipher = crypto.createCipheriv('chacha20-poly1305', key, nonce, { authTagLength: 16 })
    const blobBuffer = Buffer.concat([cipher.update(chunkData), cipher.final(), cipher.getAuthTag()])

    // 3. Content address: Blob ID = SHA256(Blob)
    const blobId = crypto.createHash('sha256').update(blobBuffer).digest('hex')
    blobs.push({ id: blobId, buffer: blobBuffer })

    // 4. Metadata for the manifest FileEntry (1:1 chunk/blob mapping for now)
    chunks.push({
      blobId,
      offset: 0,
      length: blobBuffer.length, // length includes the 16-byte auth tag
      key: key.toString('hex'),
      nonce: nonce.toString('hex')
    })
  }

  return {
    fileEntry: {
      name: fileName,
      size: totalSize,
      chunks
    },
    blobs
  }
}

/**
 * Reassembles a file from a FileEntry and a blob-fetching callback.
 *
 * @param {Object} fileEntry - as produced by ingest()
 * @param {Function} getBlobFn - async (blobId) -> Buffer | null
 * @returns {Promise<Buffer>} the reassembled plaintext
 * @throws {Error} when a blob is missing, truncated, or fails authentication
 */
export async function reassemble (fileEntry, getBlobFn) {
  const TAG_BYTES = 16 // Poly1305 tag appended to each ciphertext
  const parts = []

  for (const chunkMeta of fileEntry.chunks) {
    // 1. Fetch the Blob
    const blobBuffer = await getBlobFn(chunkMeta.blobId)
    if (!blobBuffer) throw new Error(`Blob ${chunkMeta.blobId} not found`)

    // 2. Bounds-check before slicing: a short/truncated blob previously
    //    surfaced as a RangeError from Buffer.alloc with a negative size,
    //    thrown OUTSIDE the try/catch below.
    if (chunkMeta.length < TAG_BYTES ||
        chunkMeta.offset + chunkMeta.length > blobBuffer.length) {
      throw new Error(`Blob ${chunkMeta.blobId} is truncated or chunk metadata is invalid`)
    }

    // 3. Extract the encrypted chunk (1:1 mapping; muxing would go here)
    const combined = blobBuffer.subarray(chunkMeta.offset, chunkMeta.offset + chunkMeta.length)
    const ciphertext = combined.subarray(0, combined.length - TAG_BYTES)
    const authTag = combined.subarray(combined.length - TAG_BYTES)

    // 4. Decrypt + authenticate (combined format: ciphertext || tag)
    const key = Buffer.from(chunkMeta.key, 'hex')
    const nonce = Buffer.from(chunkMeta.nonce, 'hex')
    try {
      const decipher = crypto.createDecipheriv('chacha20-poly1305', key, nonce, { authTagLength: 16 })
      decipher.setAuthTag(authTag)
      parts.push(Buffer.concat([decipher.update(ciphertext), decipher.final()]))
    } catch (err) {
      // decipher.final() throws on tag mismatch; keep the original message
      throw new Error(`Decryption failed for blob ${chunkMeta.blobId}`)
    }
  }

  return Buffer.concat(parts)
}
this._onSubscribe(params, cb) } else { cb(new Error('Invalid action')) } } + _onPublish (params, cb) { + try { + if (!validateManifest(params.manifest)) { + return cb(new Error('Invalid signature')) + } + } catch (e) { + return cb(new Error('Manifest validation failed: ' + e.message)) + } + + const key = params.manifest.publicKey + const current = this.manifests.get(key) + + if (current && params.manifest.sequence <= current.sequence) { + return cb(new Error('Sequence too low')) + } + + // Store manifest + this.manifests.set(key, params.manifest) + debug('Manifest updated for %s seq=%d', key, params.manifest.sequence) + + // Broadcast to subscribers + if (this.subscriptions[key]) { + this.subscriptions[key].forEach(socket => { + if (socket.readyState === 1) { // OPEN + try { + socket.send(JSON.stringify({ + action: 'publish', + manifest: params.manifest + })) + } catch (err) { + debug('Broadcast failed for peer %s: %s', socket.peerId, err.message) + } + } + }) + } + + cb(null, { action: 'publish', status: 'ok' }) + } + + _onSubscribe (params, cb) { + const key = params.key + const socket = params.socket + + if (!this.subscriptions[key]) { + this.subscriptions[key] = new Set() + } + this.subscriptions[key].add(socket) + + // Track subscriptions on the socket for efficient cleanup + if (!socket.subscribedKeys) { + socket.subscribedKeys = new Set() + } + socket.subscribedKeys.add(key) + + // Send latest if available + const cached = this.manifests.get(key) + if (cached) { + try { + socket.send(JSON.stringify({ + action: 'publish', + manifest: cached + })) + } catch (err) { + debug('Initial send failed for peer %s: %s', socket.peerId, err.message) + } + } + + // Cleanup on close + if (!socket._cleanupSetup) { + socket.on('close', () => { + if (socket.subscribedKeys) { + for (const k of socket.subscribedKeys) { + if (this.subscriptions[k]) { + this.subscriptions[k].delete(socket) + if (this.subscriptions[k].size === 0) delete this.subscriptions[k] + } + } + 
socket.subscribedKeys.clear() + } + }) + socket._cleanupSetup = true + } + + cb(null, { action: 'subscribe', status: 'ok' }) + } + _onAnnounce (params, cb) { const self = this diff --git a/test/megatorrent-validation.js b/test/megatorrent-validation.js new file mode 100644 index 00000000..ad838b9c --- /dev/null +++ b/test/megatorrent-validation.js @@ -0,0 +1,83 @@ +import test from 'tape' +import Server from '../server.js' +import WebSocket from 'ws' +import { generateKeypair, sign } from '../reference-client/lib/crypto.js' +import stringify from 'fast-json-stable-stringify' + +function createTracker (opts, cb) { + const server = new Server(opts) + server.on('listening', () => cb(server)) + server.listen(0) +} + +test('Megatorrent: Validation and Cleanup', function (t) { + t.plan(5) + + createTracker({ udp: false, http: false, ws: true, stats: false }, function (server) { + const port = server.ws.address().port + const trackerUrl = `ws://localhost:${port}` + const keypair = generateKeypair() + + const publisher = new WebSocket(trackerUrl) + + publisher.on('open', () => { + // Test 1: Invalid Signature + const invalidManifest = { + publicKey: keypair.publicKey.toString('hex'), + sequence: 1, + timestamp: Date.now(), + collections: [], + signature: Buffer.alloc(64).toString('hex') // invalid sig + } + + publisher.send(JSON.stringify({ + action: 'publish', + manifest: invalidManifest + })) + }) + + publisher.on('message', (data) => { + const msg = JSON.parse(data) + + if (msg['failure reason']) { + // We expect failures for invalid inputs + if (msg['failure reason'].includes('Invalid signature') || msg['failure reason'].includes('validation failed')) { + t.pass('Rejected invalid signature/manifest') + + // Now proceed to clean up test + publisher.close() + + // Test 2: Memory Leak / Cleanup Check + // Create a subscriber that subscribes and then disconnects + const subscriber = new WebSocket(trackerUrl) + subscriber.on('open', () => { + const k = 
keypair.publicKey.toString('hex') + subscriber.send(JSON.stringify({ action: 'subscribe', key: k })) + + // Allow server to process + setTimeout(() => { + t.ok(server.subscriptions[k], 'Subscription active') + t.equal(server.subscriptions[k].size, 1, 'One subscriber') + + subscriber.close() + + // Wait for cleanup + setTimeout(() => { + if (!server.subscriptions[k]) { + t.pass('Subscription Set removed after last subscriber left') + } else { + t.equal(server.subscriptions[k].size, 0, 'Subscriber removed from set') + } + + // Test 3: Cache Existence + t.ok(server.manifests, 'LRU Cache exists') + + server.close() + }, 100) + }, 100) + }) + } + } + }) + }) +})