diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index a3729e14672cb7..00a69acb4c2291 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -99,6 +99,7 @@ class NodeWorker extends EventEmitter { #messagePromise = undefined; #controlPromise = undefined; #workerOnline = false; + #exited = false; // "RUNNING" | "CLOSED" | "TERMINATED" // "TERMINATED" means that any controls or messages received will be // discarded. "CLOSED" means that we have received a control @@ -229,7 +230,12 @@ class NodeWorker extends EventEmitter { switch (type) { case 1: { // TerminalError this.#status = "CLOSED"; - } /* falls through */ + if (!this.#exited) { + this.#exited = true; + this.emit("exit", 1); + } + return; + } case 2: { // Error this.#handleError(data); break; @@ -237,6 +243,10 @@ class NodeWorker extends EventEmitter { case 3: { // Close debugWT(`Host got "close" message from worker: ${this.#name}`); this.#status = "CLOSED"; + if (!this.#exited) { + this.#exited = true; + this.emit("exit", 0); + } return; } default: { @@ -311,11 +321,18 @@ class NodeWorker extends EventEmitter { // https://nodejs.org/api/worker_threads.html#workerterminate terminate() { - if (this.#status !== "TERMINATED") { - this.#status = "TERMINATED"; - op_host_terminate_worker(this.#id); + if (this.#status === "TERMINATED") { + return PromiseResolve(0); + } + + this.#status = "TERMINATED"; + op_host_terminate_worker(this.#id); + + if (!this.#exited) { + this.#exited = true; this.emit("exit", 0); } + return PromiseResolve(0); } diff --git a/tests/unit_node/worker_threads_test.ts b/tests/unit_node/worker_threads_test.ts index 6e80f4663f1091..1fb72c91ef680f 100644 --- a/tests/unit_node/worker_threads_test.ts +++ b/tests/unit_node/worker_threads_test.ts @@ -891,3 +891,87 @@ Deno.test("[node/worker_threads] Worker works with CJS require", async () => { await recvMessage.promise; }); + +Deno.test({ + name: "[node/worker_threads] terminate emits 'exit' and stops receiving", + async fn() { + const recv: string[] = []; + const done = Promise.withResolvers(); + + const worker = new (await import("node:worker_threads")).Worker( + ` + import { parentPort } from "node:worker_threads"; + // Periodically send messages to simulate ongoing work. + const id = setInterval(() => { + parentPort.postMessage("tick"); + }, 10); + + parentPort.on("message", (m) => { + if (m === "stop") { + clearInterval(id); + // Attempt to send one more message after stop; + // the main thread should not receive it after terminate(). + parentPort.postMessage("last"); + } + }); + `, + { eval: true }, + ); + + const first = await once(worker, "message"); + recv.push(first[0]); + + const exitP = new Promise((resolve) => + worker.once("exit", (code) => resolve(code)) + ); + + worker.postMessage("stop"); + + const termRet = worker.terminate(); + const code = await exitP; + + if (typeof termRet.then === "function") { + const v = await termRet; + assertEquals(v, 0); + } + assertEquals(code, 0); + + setTimeout(() => done.resolve(), 50); + await done.promise; + + assertEquals(recv[0], "tick"); + assertEquals(recv.length, 1); + }, + sanitizeResources: false, +}); + +Deno.test({ + name: + "[node/worker_threads] 'online' fires before first user message (pollMessages)", + async fn() { + const wt = await import("node:worker_threads"); + let gotOnline = false; + + const worker = new wt.Worker( + ` + import { parentPort } from "node:worker_threads"; + // When the worker becomes ready, it will receive 'ping' and respond with 'pong'. + parentPort.on("message", (m) => { + if (m === "ping") parentPort.postMessage("pong"); + }); + `, + { eval: true }, + ); + + worker.on("online", () => { + gotOnline = true; + worker.postMessage("ping"); + }); + + const msg = await once(worker, "message"); + assertEquals(msg[0], "pong"); + assertEquals(gotOnline, true); + + await worker.terminate(); + }, +});