Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions ext/node/polyfills/worker_threads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class NodeWorker extends EventEmitter {
#messagePromise = undefined;
#controlPromise = undefined;
#workerOnline = false;
#exited = false;
// "RUNNING" | "CLOSED" | "TERMINATED"
// "TERMINATED" means that any controls or messages received will be
// discarded. "CLOSED" means that we have received a control
Expand Down Expand Up @@ -229,14 +230,23 @@ class NodeWorker extends EventEmitter {
switch (type) {
case 1: { // TerminalError
this.#status = "CLOSED";
} /* falls through */
if (!this.#exited) {
this.#exited = true;
this.emit("exit", 1);
}
return;
}
case 2: { // Error
this.#handleError(data);
break;
}
case 3: { // Close
debugWT(`Host got "close" message from worker: ${this.#name}`);
this.#status = "CLOSED";
if (!this.#exited) {
this.#exited = true;
this.emit("exit", 0);
}
return;
}
default: {
Expand Down Expand Up @@ -311,11 +321,18 @@ class NodeWorker extends EventEmitter {

// https://nodejs.org/api/worker_threads.html#workerterminate
terminate() {
if (this.#status !== "TERMINATED") {
this.#status = "TERMINATED";
op_host_terminate_worker(this.#id);
if (this.#status === "TERMINATED") {
return PromiseResolve(0);
}

this.#status = "TERMINATED";
op_host_terminate_worker(this.#id);

if (!this.#exited) {
this.#exited = true;
this.emit("exit", 0);
}

return PromiseResolve(0);
}

Expand Down
84 changes: 84 additions & 0 deletions tests/unit_node/worker_threads_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -891,3 +891,87 @@ Deno.test("[node/worker_threads] Worker works with CJS require", async () => {

await recvMessage.promise;
});

Deno.test({
name: "[node/worker_threads] terminate emits 'exit' and stops receiving",
async fn() {
const recv: string[] = [];
const done = Promise.withResolvers<void>();

const worker = new (await import("node:worker_threads")).Worker(
`
import { parentPort } from "node:worker_threads";
// Periodically send messages to simulate ongoing work.
const id = setInterval(() => {
parentPort.postMessage("tick");
}, 10);

parentPort.on("message", (m) => {
if (m === "stop") {
clearInterval(id);
// Attempt to send one more message after stop;
// the main thread should not receive it after terminate().
parentPort.postMessage("last");
}
});
`,
{ eval: true },
);

const first = await once(worker, "message");
recv.push(first[0]);

const exitP = new Promise<number>((resolve) =>
worker.once("exit", (code) => resolve(code))
);

worker.postMessage("stop");

const termRet = worker.terminate();
const code = await exitP;

if (typeof termRet.then === "function") {
const v = await termRet;
assertEquals(v, 0);
}
assertEquals(code, 0);

setTimeout(() => done.resolve(), 50);
await done.promise;

assertEquals(recv[0], "tick");
assertEquals(recv.length, 1);
},
sanitizeResources: false,
});

Deno.test({
name:
"[node/worker_threads] 'online' fires before first user message (pollMessages)",
async fn() {
const wt = await import("node:worker_threads");
let gotOnline = false;

const worker = new wt.Worker(
`
import { parentPort } from "node:worker_threads";
// When the worker becomes ready, it will receive 'ping' and respond with 'pong'.
parentPort.on("message", (m) => {
if (m === "ping") parentPort.postMessage("pong");
});
`,
{ eval: true },
);

worker.on("online", () => {
gotOnline = true;
worker.postMessage("ping");
});

const msg = await once(worker, "message");
assertEquals(msg[0], "pong");
assertEquals(gotOnline, true);

await worker.terminate();
},
});
Loading