-
-
Notifications
You must be signed in to change notification settings - Fork 95
Expand file tree
/
Copy pathworker.ts
More file actions
125 lines (110 loc) · 3.34 KB
/
worker.ts
File metadata and controls
125 lines (110 loc) · 3.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import type { Settings } from "../settings.ts";
import type { Tracker } from "../tracker.ts";
import { sendMessage, UWebSocketsTracker } from "../uws-tracker.ts";
import type { UwsConnectionContext } from "../uws-tracker.ts";
import { readFileSync } from "fs";
import { MessagePort } from "worker_threads";
import {
isMainThread,
workerData,
parentPort,
threadId,
} from "node:worker_threads";
import { MultiWorkerTracker } from "../multi-worker-tracker/index.ts";
import { buildUwsTracker } from "../build-uws-tracker.ts";
import type {
AppDescriptorMessage,
AppsStatsResponse,
ServerWorkerInMessage,
ServerWorkerOutMessage,
WorkerDataType,
} from "./types.ts";
// TODO:
// - test what host and port workers require
// - handle socket bind errors here
// - configure worker ports
if (!isMainThread && parentPort) {
const { settings, trackerPorts } = workerData as WorkerDataType;
const tracker = new MultiWorkerTracker(trackerPorts, sendMessage);
await runSocketApp(tracker, settings, parentPort);
}
async function runSocketApp(
tracker: Tracker<UwsConnectionContext>,
settings: Settings,
parentPort: MessagePort,
): Promise<void> {
let indexHtml: Buffer | undefined = undefined;
try {
indexHtml = readFileSync("index.html");
} catch (e) {
if ((e as { code?: string }).code !== "ENOENT") {
throw e;
}
}
const servers: UWebSocketsTracker[] = [];
const getAppsStats = async () => {
return new Promise<AppsStatsResponse["stats"]>((resolve) => {
const id = Math.random();
const listener = (message: ServerWorkerInMessage) => {
if (message.type === "appsStats" && message.id === id) {
resolve(message.stats);
parentPort.removeListener("message", listener);
}
};
parentPort.addListener("message", listener);
parentPort.postMessage({
id,
type: "getAppsStats",
} satisfies ServerWorkerOutMessage);
});
};
const serverPromises = settings.servers.map(
async (serverSettings, appIndex) => {
const server = buildUwsTracker({
tracker,
serverSettings: {
server: {
...serverSettings.server,
port: (serverSettings.server?.port ?? 8000) + 10000,
},
...serverSettings,
},
websocketsAccess: settings.websocketsAccess,
indexHtml,
getServersStats: getAppsStats,
});
servers.push(server);
await server.run();
// The worker sends back its descriptor to the main acceptor
parentPort.postMessage({
type: "appDescriptor",
appIndex,
workerAppDescriptor: (
server.app as unknown as { getDescriptor: () => unknown }
).getDescriptor(),
} satisfies AppDescriptorMessage);
},
);
parentPort.on("message", (message: ServerWorkerInMessage) => {
if (message.type === "getAppStats") {
parentPort.postMessage({
type: "appStats",
id: message.id,
stats: servers.reduce(
(acc, server) => {
return {
threadId,
webSocketsCount:
acc.webSocketsCount + server.stats.webSocketsCount,
};
},
{
threadId,
webSocketsCount: 0,
},
),
} satisfies ServerWorkerOutMessage);
}
});
await Promise.all(serverPromises);
}