Skip to content

Commit 1edd1fb

Browse files
fix(ext/node): ensure 'exit' event is fired only once for worker_threads (#31231)
Closes #30013 The `'exit'` event in `node:worker_threads` was not consistently fired, or in some cases emitted multiple times, diverging from Node.js behavior.
1 parent 5ed0646 commit 1edd1fb

File tree

2 files changed

+105
-4
lines changed

2 files changed

+105
-4
lines changed

ext/node/polyfills/worker_threads.ts

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ class NodeWorker extends EventEmitter {
9999
#messagePromise = undefined;
100100
#controlPromise = undefined;
101101
#workerOnline = false;
102+
#exited = false;
102103
// "RUNNING" | "CLOSED" | "TERMINATED"
103104
// "TERMINATED" means that any controls or messages received will be
104105
// discarded. "CLOSED" means that we have received a control
@@ -229,14 +230,23 @@ class NodeWorker extends EventEmitter {
229230
switch (type) {
230231
case 1: { // TerminalError
231232
this.#status = "CLOSED";
232-
} /* falls through */
233+
if (!this.#exited) {
234+
this.#exited = true;
235+
this.emit("exit", 1);
236+
}
237+
return;
238+
}
233239
case 2: { // Error
234240
this.#handleError(data);
235241
break;
236242
}
237243
case 3: { // Close
238244
debugWT(`Host got "close" message from worker: ${this.#name}`);
239245
this.#status = "CLOSED";
246+
if (!this.#exited) {
247+
this.#exited = true;
248+
this.emit("exit", 0);
249+
}
240250
return;
241251
}
242252
default: {
@@ -311,11 +321,18 @@ class NodeWorker extends EventEmitter {
311321

312322
// https://nodejs.org/api/worker_threads.html#workerterminate
313323
terminate() {
314-
if (this.#status !== "TERMINATED") {
315-
this.#status = "TERMINATED";
316-
op_host_terminate_worker(this.#id);
324+
if (this.#status === "TERMINATED") {
325+
return PromiseResolve(0);
326+
}
327+
328+
this.#status = "TERMINATED";
329+
op_host_terminate_worker(this.#id);
330+
331+
if (!this.#exited) {
332+
this.#exited = true;
317333
this.emit("exit", 0);
318334
}
335+
319336
return PromiseResolve(0);
320337
}
321338

tests/unit_node/worker_threads_test.ts

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -891,3 +891,87 @@ Deno.test("[node/worker_threads] Worker works with CJS require", async () => {
891891

892892
await recvMessage.promise;
893893
});
894+
895+
Deno.test({
896+
name: "[node/worker_threads] terminate emits 'exit' and stops receiving",
897+
async fn() {
898+
const recv: string[] = [];
899+
const done = Promise.withResolvers<void>();
900+
901+
const worker = new (await import("node:worker_threads")).Worker(
902+
`
903+
import { parentPort } from "node:worker_threads";
904+
// Periodically send messages to simulate ongoing work.
905+
const id = setInterval(() => {
906+
parentPort.postMessage("tick");
907+
}, 10);
908+
909+
parentPort.on("message", (m) => {
910+
if (m === "stop") {
911+
clearInterval(id);
912+
// Attempt to send one more message after stop;
913+
// the main thread should not receive it after terminate().
914+
parentPort.postMessage("last");
915+
}
916+
});
917+
`,
918+
{ eval: true },
919+
);
920+
921+
const first = await once(worker, "message");
922+
recv.push(first[0]);
923+
924+
const exitP = new Promise<number>((resolve) =>
925+
worker.once("exit", (code) => resolve(code))
926+
);
927+
928+
worker.postMessage("stop");
929+
930+
const termRet = worker.terminate();
931+
const code = await exitP;
932+
933+
if (typeof termRet.then === "function") {
934+
const v = await termRet;
935+
assertEquals(v, 0);
936+
}
937+
assertEquals(code, 0);
938+
939+
setTimeout(() => done.resolve(), 50);
940+
await done.promise;
941+
942+
assertEquals(recv[0], "tick");
943+
assertEquals(recv.length, 1);
944+
},
945+
sanitizeResources: false,
946+
});
947+
948+
Deno.test({
949+
name:
950+
"[node/worker_threads] 'online' fires before first user message (pollMessages)",
951+
async fn() {
952+
const wt = await import("node:worker_threads");
953+
let gotOnline = false;
954+
955+
const worker = new wt.Worker(
956+
`
957+
import { parentPort } from "node:worker_threads";
958+
// When the worker becomes ready, it will receive 'ping' and respond with 'pong'.
959+
parentPort.on("message", (m) => {
960+
if (m === "ping") parentPort.postMessage("pong");
961+
});
962+
`,
963+
{ eval: true },
964+
);
965+
966+
worker.on("online", () => {
967+
gotOnline = true;
968+
worker.postMessage("ping");
969+
});
970+
971+
const msg = await once(worker, "message");
972+
assertEquals(msg[0], "pong");
973+
assertEquals(gotOnline, true);
974+
975+
await worker.terminate();
976+
},
977+
});

0 commit comments

Comments
 (0)