diff --git a/.codesandbox/Dockerfile b/.codesandbox/Dockerfile new file mode 100644 index 00000000..1205b221 --- /dev/null +++ b/.codesandbox/Dockerfile @@ -0,0 +1,3 @@ +RUN apt update && apt install -y curl unzip && curl -fsSL https://deno.land/x/install/install.sh | sh +ENV DENO_INSTALL="/root/.deno" +ENV PATH="$DENO_INSTALL/bin:$PATH" \ No newline at end of file diff --git a/.codesandbox/tasks.json b/.codesandbox/tasks.json new file mode 100644 index 00000000..8fe19d5d --- /dev/null +++ b/.codesandbox/tasks.json @@ -0,0 +1,13 @@ +{ + // These tasks will run in order when initializing your CodeSandbox project. + "setupTasks": [ + { + "name": "curl -fsSL https://deno.land/install.sh | sh", + "command": "curl -fsSL https://deno.land/install.sh | sh" + } + ], + + // These tasks can be run from CodeSandbox. Running one will open a log in the app. + "tasks": { + } +} diff --git a/.dagger/.gitattributes b/.dagger/.gitattributes new file mode 100644 index 00000000..82741846 --- /dev/null +++ b/.dagger/.gitattributes @@ -0,0 +1 @@ +/sdk/** linguist-generated diff --git a/.dagger/.gitignore b/.dagger/.gitignore new file mode 100644 index 00000000..040187c6 --- /dev/null +++ b/.dagger/.gitignore @@ -0,0 +1,4 @@ +/sdk +/**/node_modules/** +/**/.pnpm-store/** +/.env diff --git a/.dagger/.yarn/install-state.gz b/.dagger/.yarn/install-state.gz new file mode 100644 index 00000000..b717a151 Binary files /dev/null and b/.dagger/.yarn/install-state.gz differ diff --git a/.dagger/package-lock.json b/.dagger/package-lock.json new file mode 100644 index 00000000..e19b6234 --- /dev/null +++ b/.dagger/package-lock.json @@ -0,0 +1,25 @@ +{ + "name": ".dagger", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "dependencies": { + "typescript": "5.9.3" + } + }, + "node_modules/typescript": { + "version": "5.9.3", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.3.tgz", + "integrity": 
"sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", + "license": "Apache-2.0", + "bin": { + "tsc": "bin/tsc", + "tsserver": "bin/tsserver" + }, + "engines": { + "node": ">=14.17" + } + } + } +} diff --git a/.dagger/package.json b/.dagger/package.json new file mode 100644 index 00000000..8cdd871f --- /dev/null +++ b/.dagger/package.json @@ -0,0 +1,7 @@ +{ + "type": "module", + "dependencies": { + "typescript": "5.9.3" + }, + "packageManager": "yarn@1.22.22+sha512.a6b2f7906b721bba3d67d4aff083df04dad64c399707841b7acf00f6b133b7ac24255f2652fa22ae3534329dc6180534e98d17432037ff6fd140556e2bb3137e" +} diff --git a/.dagger/src/index.ts b/.dagger/src/index.ts new file mode 100644 index 00000000..ad87fad2 --- /dev/null +++ b/.dagger/src/index.ts @@ -0,0 +1,78 @@ +/** + * A generated module for Effectionx functions + * + * This module has been generated via dagger init and serves as a reference to + * basic module structure as you get started with Dagger. + * + * Two functions have been pre-created. You can modify, delete, or add to them, + * as needed. They demonstrate usage of arguments and return types using simple + * echo and grep commands. The functions can be called from the dagger CLI or + * from one of the SDKs. + * + * The first line in this comment block is a short description line and the + * rest is a long description with more detail on the module's purpose or usage, + * if appropriate. All modules should have a short description. 
+ */ +import { dag, Container, Directory, File, object, func, argument } from "@dagger.io/dagger" + +@object() +export class Effectionx { + source: Directory + + constructor( + @argument({ defaultPath: ".", ignore: ["**/.dagger/**/*"] }) + source: Directory, + ) { + this.source = source + } + + @func() + ubuntu(): Container { + return dag + .container() + .from("ubuntu:latest") + .withExec(["apt-get", "update"]) + .withExec(["apt-get", "install", "-y", "software-properties-common"]) + .withExec(["add-apt-repository", "-y", "ppa:colin-king/stress-ng"]) + .withExec(["apt-get", "update"]) + .withExec(["apt-get", "install", "-y", "curl", "unzip", "nodejs", "stress-ng"]) + .withExec(["sh", "-c", "curl -fsSL https://deno.land/install.sh | sh"]) + .withMountedDirectory("/effectionx", this.source) + .withEnvVariable("PATH", "$PATH:/root/.deno/bin", { expand: true }) + .withWorkdir("/effectionx") + .withExec(["deno", "install", "--allow-scripts"]) + .withExec(["deno", "task", "generate-importmap"]) + } + + @func() + async testV3Stress( + @argument({ defaultValue: 100 }) + rounds: number + ): Promise { + const container = this.ubuntu() + .withExec(["bash", "run-test-v3-stress.sh", rounds.toString()]) + + // Get the filename + const filename = await container + .withExec(["sh", "-c", "ls /effectionx/test-summary-v3_*.log"]) + .stdout() + + return container.file(filename.trim()) + } + + @func() + async testV4Stress( + @argument({ defaultValue: 100 }) + rounds: number + ): Promise { + const container = this.ubuntu() + .withExec(["bash", "run-test-v4-stress.sh", rounds.toString()]) + + // Get the filename + const filename = await container + .withExec(["sh", "-c", "ls /effectionx/test-summary-v4_*.log"]) + .stdout() + + return container.file(filename.trim()) + } +} diff --git a/.dagger/tsconfig.json b/.dagger/tsconfig.json new file mode 100644 index 00000000..aab5941a --- /dev/null +++ b/.dagger/tsconfig.json @@ -0,0 +1,17 @@ +{ + "compilerOptions": { + "target": "ES2022", + 
"moduleResolution": "Node", + "experimentalDecorators": true, + "strict": true, + "skipLibCheck": true, + "paths": { + "@dagger.io/dagger": [ + "./sdk/index.ts" + ], + "@dagger.io/dagger/telemetry": [ + "./sdk/telemetry.ts" + ] + } + } +} \ No newline at end of file diff --git a/.dagger/yarn.lock b/.dagger/yarn.lock new file mode 100644 index 00000000..e6a8a33e --- /dev/null +++ b/.dagger/yarn.lock @@ -0,0 +1,34 @@ +# This file is generated by running "yarn install" inside your project. +# Manual changes might be lost - proceed with caution! + +__metadata: + version: 8 + cacheKey: 10c0 + +"root-workspace-0b6124@workspace:.": + version: 0.0.0-use.local + resolution: "root-workspace-0b6124@workspace:." + dependencies: + typescript: "npm:5.9.3" + languageName: unknown + linkType: soft + +"typescript@npm:5.9.3": + version: 5.9.3 + resolution: "typescript@npm:5.9.3" + bin: + tsc: bin/tsc + tsserver: bin/tsserver + checksum: 10c0/6bd7552ce39f97e711db5aa048f6f9995b53f1c52f7d8667c1abdc1700c68a76a308f579cd309ce6b53646deb4e9a1be7c813a93baaf0a28ccd536a30270e1c5 + languageName: node + linkType: hard + +"typescript@patch:typescript@npm%3A5.9.3#optional!builtin": + version: 5.9.3 + resolution: "typescript@patch:typescript@npm%3A5.9.3#optional!builtin::version=5.9.3&hash=5786d5" + bin: + tsc: bin/tsc + tsserver: bin/tsserver + checksum: 10c0/ad09fdf7a756814dce65bc60c1657b40d44451346858eea230e10f2e95a289d9183b6e32e5c11e95acc0ccc214b4f36289dcad4bf1886b0adb84d711d336a430 + languageName: node + linkType: hard diff --git a/.github/workflows/verify-posix.yaml b/.github/workflows/verify-posix.yaml index a99028f7..df553063 100644 --- a/.github/workflows/verify-posix.yaml +++ b/.github/workflows/verify-posix.yaml @@ -34,5 +34,10 @@ jobs: - run: deno fmt --check - run: deno lint + - run: deno task generate-importmap - - run: deno test --allow-net --allow-read --allow-write --allow-env --allow-run --allow-ffi + - name: V3 Tests + run: deno test --allow-net --allow-read --allow-write 
--allow-env --allow-run --allow-ffi + + - name: V4 Tests + run: deno test --import-map v4.importmap.json --allow-net --allow-read --allow-write --allow-env --allow-run --allow-ffi --trace-leaks diff --git a/.github/workflows/verify-windows.yaml b/.github/workflows/verify-windows.yaml index 16c4e0e6..3ac350bb 100644 --- a/.github/workflows/verify-windows.yaml +++ b/.github/workflows/verify-windows.yaml @@ -41,20 +41,40 @@ jobs: - run: deno lint - - name: "Test (${{matrix.shell}})" + - run: deno task generate-importmap + + - name: "Test V3 (${{matrix.shell}})" if: matrix.shell == 'powershell' || matrix.shell == 'pwsh' run: | $env:SHELL = "${{matrix.shell}}" deno test --allow-net --allow-read --allow-write --allow-env --allow-run --allow-ffi --trace-leaks - - name: "Test (${{matrix.shell}})" + - name: "Test V4 (${{matrix.shell}})" + if: matrix.shell == 'powershell' || matrix.shell == 'pwsh' + run: | + $env:SHELL = "${{matrix.shell}}" + deno test --import-map v4.importmap.json --allow-net --allow-read --allow-write --allow-env --allow-run --allow-ffi --trace-leaks + + - name: "Test V3 (${{matrix.shell}})" if: matrix.shell == 'cmd' run: | set SHELL=${{matrix.shell}} deno test --allow-net --allow-read --allow-write --allow-env --allow-run --allow-ffi --trace-leaks - - name: "Test (${{matrix.shell}})" + - name: "Test V4 (${{matrix.shell}})" + if: matrix.shell == 'cmd' + run: | + set SHELL=${{matrix.shell}} + deno test --import-map v4.importmap.json --allow-net --allow-read --allow-write --allow-env --allow-run --allow-ffi --trace-leaks + + - name: "Test V3 (${{matrix.shell}})" if: matrix.shell == 'bash' run: | export SHELL=${{matrix.shell}} deno test --allow-net --allow-read --allow-write --allow-env --allow-run --allow-ffi --trace-leaks + + - name: "Test V4 (${{matrix.shell}})" + if: matrix.shell == 'bash' + run: | + export SHELL=${{matrix.shell}} + deno test --import-map v4.importmap.json --allow-net --allow-read --allow-write --allow-env --allow-run --allow-ffi 
--trace-leaks diff --git a/.gitignore b/.gitignore index 6847cb4a..85db91de 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /deno.lock /**/build/ +/v4.importmap.json .vscode \ No newline at end of file diff --git a/dagger.json b/dagger.json new file mode 100644 index 00000000..47658cf5 --- /dev/null +++ b/dagger.json @@ -0,0 +1,8 @@ +{ + "name": "effectionx", + "engineVersion": "v0.19.4", + "sdk": { + "source": "typescript" + }, + "source": ".dagger" +} diff --git a/deno.json b/deno.json index 8f1d9f28..6ac40a14 100644 --- a/deno.json +++ b/deno.json @@ -3,7 +3,8 @@ "effection": "npm:effection@^3", "@deno/dnt": "jsr:@deno/dnt@0.42.3", "@std/path": "jsr:@std/path@^1", - "zod": "npm:zod@3.23.8" + "zod": "npm:zod@3.23.8", + "@std/fs": "jsr:@std/fs@^1" }, "compilerOptions": { "lib": [ @@ -20,7 +21,10 @@ "prefer-const", "require-yield" ] - } + }, + "exclude": [ + "tasks" + ] }, "workspace": [ "./context-api", @@ -39,5 +43,10 @@ "./stream-helpers", "./signals", "./process" - ] + ], + "tasks": { + "generate-importmap": "deno run --allow-read --allow-write tasks/generate-importmap.ts", + "test": "deno test -A --trace-leaks", + "test:v4": "deno test -A --import-map=v4.importmap.json --trace-leaks" + } } diff --git a/fx/parallel.test.ts b/fx/parallel.test.ts index 7bfcfba7..9ba7f201 100644 --- a/fx/parallel.test.ts +++ b/fx/parallel.test.ts @@ -1,6 +1,6 @@ import { describe, it } from "@effectionx/bdd"; import { expect } from "@std/expect"; -import { each, Err, Ok, sleep, spawn } from "effection"; +import { each, Err, Ok, sleep, spawn, until } from "effection"; import { parallel } from "./parallel.ts"; @@ -101,7 +101,8 @@ describe("parallel()", () => { yield* sleep(15); two.resolve(2); }); - const results = yield* parallel([one, () => two.promise]); + + const results = yield* parallel([one, () => until(two.promise)]); expect(yield* results).toEqual([Ok(1), Ok(2)]); }); @@ -112,7 +113,10 @@ describe("parallel()", () => { function* genFn() { try { - const results = 
yield* parallel([() => one.promise, () => two.promise]); + const results = yield* parallel([ + () => until(one.promise), + () => until(two.promise), + ]); actual = yield* results; } catch (_) { actual = [Err(new Error("should not get hit"))]; diff --git a/fx/parallel.ts b/fx/parallel.ts index c7e2ee5a..e10007b1 100644 --- a/fx/parallel.ts +++ b/fx/parallel.ts @@ -1,10 +1,9 @@ -import type { Callable, Channel, Operation, Result } from "effection"; +import type { Channel, Operation, Result, Task } from "effection"; import { createChannel, resource, spawn } from "effection"; import { safe } from "./safe.ts"; -import type { Computation } from "./type.ts"; -export interface ParallelRet extends Computation[]> { +export interface ParallelRet extends Operation[]> { sequence: Channel, void>; immediate: Channel, void>; } @@ -61,8 +60,8 @@ export interface ParallelRet extends Computation[]> { * } * ``` */ -export function parallel( - operations: Callable[], +export function parallel( + operations: ((...args: TArgs) => Operation)[], ): Operation> { const sequence = createChannel>(); const immediate = createChannel>(); @@ -70,7 +69,7 @@ export function parallel( return resource>(function* (provide) { const task = yield* spawn(function* () { - const tasks = []; + const tasks = [] as Task>[]; for (const op of operations) { tasks.push( yield* spawn(function* () { @@ -95,7 +94,6 @@ export function parallel( yield* task; return results; } - yield* provide({ sequence, immediate, diff --git a/fx/race.test.ts b/fx/race.test.ts index ee1fc7d3..c4d96c49 100644 --- a/fx/race.test.ts +++ b/fx/race.test.ts @@ -10,12 +10,12 @@ describe("raceMap()", () => { function* () { let winner; const results = yield* raceMap({ - first: function* () { + *first() { yield* sleep(10); winner = "first"; return "first"; }, - second: function* () { + *second() { yield* sleep(20); winner = "second"; return "second"; diff --git a/fx/race.ts b/fx/race.ts index de23dd08..05620028 100644 --- a/fx/race.ts +++ 
b/fx/race.ts @@ -1,9 +1,9 @@ // deno-lint-ignore-file no-explicit-any -import type { Callable, Operation, Task } from "effection"; -import { action, call, resource, spawn } from "effection"; +import type { Operation, Task } from "effection"; +import { resource, spawn, withResolvers } from "effection"; -interface OpMap { - [key: string]: Callable; +interface OpMap { + [key: string]: (...args: TArgs) => Operation; } export function raceMap(opMap: OpMap): Operation< @@ -16,21 +16,26 @@ export function raceMap(opMap: OpMap): Operation< return resource(function* Race(provide) { const keys = Object.keys(opMap); const taskMap: { [key: string]: Task } = {}; - const resultMap: { [key: keyof OpMap]: OpMap[keyof OpMap] } = {}; + const resultMap: { [key: keyof OpMap]: ReturnType } = + {}; + + function* start() { + const resolvers = withResolvers(); - const winner = yield* action>(function* (resolve) { for (let i = 0; i < keys.length; i += 1) { const key = keys[i]; yield* spawn(function* () { - const task = yield* spawn(function* () { - yield* call(opMap[key] as any); - }); + const task = yield* spawn(opMap[key]); taskMap[key] = task; - (resultMap as any)[key] = yield* task; - resolve(task); + (resultMap[key] as any) = yield* task; + resolvers.resolve(task); }); } - }); + + return yield* resolvers.operation; + } + + const winner = yield* start(); for (let i = 0; i < keys.length; i += 1) { const key = keys[i]; diff --git a/fx/request.test.ts b/fx/request.test.ts index bc33a001..27af4ee3 100644 --- a/fx/request.test.ts +++ b/fx/request.test.ts @@ -1,9 +1,9 @@ import { beforeEach, describe, it } from "@effectionx/bdd"; import { expect } from "@std/expect"; -import { json, request } from "./request.ts"; import { call, ensure, withResolvers } from "effection"; import { createServer } from "node:http"; +import { json, request } from "./request.ts"; // Ensure to run tests with --allow-net permission describe("request() and json()", () => { diff --git a/fx/safe.ts b/fx/safe.ts index 
015b1a19..e6ab9023 100644 --- a/fx/safe.ts +++ b/fx/safe.ts @@ -1,4 +1,4 @@ -import type { Callable, Operation, Result } from "effection"; +import type { Operation, Result } from "effection"; import { call, Err, Ok } from "effection"; /** @@ -23,10 +23,11 @@ function isError(error: unknown): error is Error { return error instanceof Error; } -export function* safe(operator: Callable): Operation> { +export function* safe( + operator: (...args: TArgs) => Operation, +): Operation> { try { - // deno-lint-ignore no-explicit-any - const value = yield* call(operator as any); + const value = yield* call(operator); return Ok(value); } catch (error) { return Err(isError(error) ? error : new Error(String(error))); diff --git a/process/src/exec/posix.ts b/process/src/exec/posix.ts index 08336b45..81c6e97b 100644 --- a/process/src/exec/posix.ts +++ b/process/src/exec/posix.ts @@ -4,6 +4,7 @@ import { createSignal, Err, Ok, + resource, type Result, spawn, withResolvers, @@ -16,78 +17,107 @@ import { ExecError } from "./error.ts"; type ProcessResultValue = [number?, string?]; -export const createPosixProcess: CreateOSProcess = function* createPosixProcess( +export const createPosixProcess: CreateOSProcess = ( command, options, -) { - let processResult = withResolvers>(); +) => { + return resource(function* (provide) { + let processResult = withResolvers>(); - // Killing all child processes started by this command is surprisingly - // tricky. If a process spawns another processes and we kill the parent, - // then the child process is NOT automatically killed. Instead we're using - // the `detached` option to force the child into its own process group, - // which all of its children in turn will inherit. By sending the signal to - // `-pid` rather than `pid`, we are sending it to the entire process group - // instead. This will send the signal to all processes started by the child - // process. 
- // - // More information here: https://unix.stackexchange.com/questions/14815/process-descendants - let childProcess = spawnProcess(command, options.arguments || [], { - detached: true, - shell: options.shell, - env: options.env, - cwd: options.cwd, - stdio: "pipe", - }); + // Killing all child processes started by this command is surprisingly + // tricky. If a process spawns another processes and we kill the parent, + // then the child process is NOT automatically killed. Instead we're using + // the `detached` option to force the child into its own process group, + // which all of its children in turn will inherit. By sending the signal to + // `-pid` rather than `pid`, we are sending it to the entire process group + // instead. This will send the signal to all processes started by the child + // process. + // + // More information here: https://unix.stackexchange.com/questions/14815/process-descendants + let childProcess = spawnProcess(command, options.arguments || [], { + detached: true, + shell: options.shell, + env: options.env, + cwd: options.cwd, + stdio: "pipe", + }); - let { pid } = childProcess; + let { pid } = childProcess; - let io = { - stdout: yield* useReadable(childProcess.stdout), - stderr: yield* useReadable(childProcess.stderr), - stdoutDone: withResolvers(), - stderrDone: withResolvers(), - }; + let io = { + stdout: yield* useReadable(childProcess.stdout), + stderr: yield* useReadable(childProcess.stderr), + stdoutDone: withResolvers(), + stderrDone: withResolvers(), + }; - let stdout = createSignal(); - let stderr = createSignal(); + let stdout = createSignal(); + let stderr = createSignal(); - yield* spawn(function* () { - let next = yield* io.stdout.next(); - while (!next.done) { - stdout.send(next.value); - next = yield* io.stdout.next(); - } - stdout.close(); - io.stdoutDone.resolve(); - }); + yield* spawn(function* () { + let next = yield* io.stdout.next(); + while (!next.done) { + stdout.send(next.value); + next = yield* 
io.stdout.next(); + } + stdout.close(); + io.stdoutDone.resolve(); + }); - yield* spawn(function* () { - let next = yield* io.stderr.next(); - while (!next.done) { - stderr.send(next.value); - next = yield* io.stderr.next(); - } - stderr.close(); - io.stderrDone.resolve(); - }); + yield* spawn(function* () { + let next = yield* io.stderr.next(); + while (!next.done) { + stderr.send(next.value); + next = yield* io.stderr.next(); + } + stderr.close(); + io.stderrDone.resolve(); + }); - yield* spawn(function* trapError() { - let [error] = yield* once<[Error]>(childProcess, "error"); - processResult.resolve(Err(error)); - }); + let stdin: Writable = { + send(data: string) { + childProcess.stdin.write(data); + }, + }; - let stdin: Writable = { - send(data: string) { - childProcess.stdin.write(data); - }, - }; + yield* spawn(function* trapError() { + let [error] = yield* once<[Error]>(childProcess, "error"); + processResult.resolve(Err(error)); + }); - yield* spawn(function* () { - try { + yield* spawn(function* () { let value = yield* once(childProcess, "close"); - yield* all([io.stdoutDone.operation, io.stderrDone.operation]); processResult.resolve(Ok(value)); + }); + + function* join() { + let result = yield* processResult.operation; + if (result.ok) { + let [code, signal] = result.value; + return { command, options, code, signal } as ExitStatus; + } else { + throw result.error; + } + } + + function* expect() { + let status: ExitStatus = yield* join(); + if (status.code != 0) { + throw new ExecError(status, command, options); + } else { + return status; + } + } + + try { + yield* provide({ + pid: pid as number, + stdin, + stdout, + stderr, + join, + expect, + }); } finally { try { if (typeof childProcess.pid === "undefined") { @@ -101,33 +131,4 @@ export const createPosixProcess: CreateOSProcess = function* createPosixProcess( } } }); - - function* join() { - let result = yield* processResult.operation; - if (result.ok) { - let [code, signal] = result.value; - return 
{ command, options, code, signal } as ExitStatus; - } else { - throw result.error; - } - } - - function* expect() { - let status: ExitStatus = yield* join(); - if (status.code != 0) { - throw new ExecError(status, command, options); - } else { - return status; - } - } - - // FYI: this function starts a process and returns without blocking - return { - pid: pid as number, - stdin, - stdout, - stderr, - join, - expect, - }; }; diff --git a/process/src/exec/win32.ts b/process/src/exec/win32.ts index e733b3be..659d15f5 100644 --- a/process/src/exec/win32.ts +++ b/process/src/exec/win32.ts @@ -4,9 +4,8 @@ import { createSignal, Err, Ok, - race, + resource, type Result, - sleep, spawn, withResolvers, } from "effection"; @@ -33,81 +32,111 @@ function* killTree(pid: number) { } } -export const createWin32Process: CreateOSProcess = function* createWin32Process( +export const createWin32Process: CreateOSProcess = ( command, options, -) { - let processResult = withResolvers>(); - - // Windows-specific process spawning with different options than POSIX - let childProcess = spawnProcess(command, options.arguments || [], { - // We lose exit information and events if this is detached in windows - // and it opens a window in windows+powershell. - detached: false, - // The `shell` option is passed to `cross-spawn` to control whether a shell is used. - // On Windows, `shell: true` is necessary to run command strings, as it uses - // `cmd.exe` to parse the command and find executables in the PATH. - // Using a boolean `true` was previously disabled, causing ENOENT errors for - // commands that were not a direct path to an executable. - shell: options.shell || false, - // With stdio as pipe, windows gets stuck where neither the child nor the - // parent wants to close the stream, so we call it ourselves in the exit event. 
- stdio: "pipe", - // Hide the child window so that killing it will not block the parent - // with a Terminate Batch Process (Y/n) - windowsHide: true, - env: options.env, - cwd: options.cwd, - }); - - let { pid } = childProcess; - - let io = { - stdout: yield* useReadable(childProcess.stdout), - stderr: yield* useReadable(childProcess.stderr), - stdoutDone: withResolvers(), - stderrDone: withResolvers(), - }; - - const stdout = createSignal(); - const stderr = createSignal(); - - yield* spawn(function* () { - let next = yield* io.stdout.next(); - while (!next.done) { - stdout.send(next.value); - next = yield* io.stdout.next(); +) => { + return resource(function* (provide) { + let processResult = withResolvers>(); + + // Windows-specific process spawning with different options than POSIX + let childProcess = spawnProcess(command, options.arguments || [], { + // We lose exit information and events if this is detached in windows + // and it opens a window in windows+powershell. + detached: false, + // The `shell` option is passed to `cross-spawn` to control whether a shell is used. + // On Windows, `shell: true` is necessary to run command strings, as it uses + // `cmd.exe` to parse the command and find executables in the PATH. + // Using a boolean `true` was previously disabled, causing ENOENT errors for + // commands that were not a direct path to an executable. + shell: options.shell || false, + // With stdio as pipe, windows gets stuck where neither the child nor the + // parent wants to close the stream, so we call it ourselves in the exit event. 
+ stdio: "pipe", + // Hide the child window so that killing it will not block the parent + // with a Terminate Batch Process (Y/n) + windowsHide: true, + env: options.env, + cwd: options.cwd, + }); + + let { pid } = childProcess; + + let io = { + stdout: yield* useReadable(childProcess.stdout), + stderr: yield* useReadable(childProcess.stderr), + stdoutDone: withResolvers(), + stderrDone: withResolvers(), + }; + + const stdout = createSignal(); + const stderr = createSignal(); + + yield* spawn(function* () { + let next = yield* io.stdout.next(); + while (!next.done) { + stdout.send(next.value); + next = yield* io.stdout.next(); + } + stdout.close(); + io.stdoutDone.resolve(); + }); + + yield* spawn(function* () { + let next = yield* io.stderr.next(); + while (!next.done) { + stderr.send(next.value); + next = yield* io.stderr.next(); + } + stderr.close(); + io.stderrDone.resolve(); + }); + + let stdin: Writable = { + send(data: string) { + childProcess.stdin.write(data); + }, + }; + + yield* spawn(function* trapError() { + const [error] = yield* once(childProcess, "error"); + processResult.resolve(Err(error)); + }); + + yield* spawn(function* () { + let value = yield* once(childProcess, "close"); + yield* all([io.stdoutDone.operation, io.stderrDone.operation]); + processResult.resolve(Ok(value)); + }); + + function* join() { + let result = yield* processResult.operation; + if (result.ok) { + let [code, signal] = result.value; + return { command, options, code, signal } as ExitStatus; + } else { + throw result.error; + } } - stdout.close(); - io.stdoutDone.resolve(); - }); - yield* spawn(function* () { - let next = yield* io.stderr.next(); - while (!next.done) { - stderr.send(next.value); - next = yield* io.stderr.next(); + function* expect() { + let status = yield* join(); + if (status.code != 0) { + throw new ExecError(status, command, options); + } else { + return status; + } } - stderr.close(); - io.stderrDone.resolve(); - }); - yield* spawn(function* trapError() 
{ - const [error] = yield* once(childProcess, "error"); - processResult.resolve(Err(error)); - }); - - let stdin: Writable = { - send(data: string) { - childProcess.stdin.write(data); - }, - }; - - yield* spawn(function* () { try { - let value = yield* once(childProcess, "close"); - yield* all([io.stdoutDone.operation, io.stderrDone.operation]); - processResult.resolve(Ok(value)); + yield* provide({ + pid: pid as number, + stdin, + stdout, + stderr, + join, + expect, + }); } finally { try { // Only try to kill the process if it hasn't exited yet @@ -144,9 +173,6 @@ export const createWin32Process: CreateOSProcess = function* createWin32Process( // stdin might already be closed } - // Wait for graceful exit with a timeout - yield* race([processResult.operation, sleep(300)]); - // If process still hasn't exited, escalate if ( childProcess.exitCode === null && @@ -178,35 +204,6 @@ export const createWin32Process: CreateOSProcess = function* createWin32Process( } } }); - - function* join() { - let result = yield* processResult.operation; - if (result.ok) { - let [code, signal] = result.value; - return { command, options, code, signal } as ExitStatus; - } else { - throw result.error; - } - } - - function* expect() { - let status = yield* join(); - if (status.code != 0) { - throw new ExecError(status, command, options); - } else { - return status; - } - } - - // FYI: this function starts a process and returns without blocking - return { - pid: pid as number, - stdin, - stdout, - stderr, - join, - expect, - }; }; export const isWin32 = (): boolean => platform() === "win32"; diff --git a/process/test/eventemitter.test.ts b/process/test/eventemitter.test.ts index 985d525f..35ebdf19 100644 --- a/process/test/eventemitter.test.ts +++ b/process/test/eventemitter.test.ts @@ -1,5 +1,5 @@ import { expect } from "@std/expect"; -import { spawn } from "effection"; +import { spawn, withResolvers } from "effection"; import { describe, it } from "@effectionx/bdd"; import { EventEmitter 
} from "node:events"; @@ -9,58 +9,74 @@ describe("once", () => { it("resolves with single argument as array", function* () { expect.assertions(1); const emitter = new EventEmitter(); + const { resolve, operation } = withResolvers<[string]>(); - let result; yield* spawn(function* () { - result = yield* once<[string]>(emitter, "test"); + resolve(yield* once<[string]>(emitter, "test")); }); - emitter.emit("test", "hello"); + yield* spawn(function* () { + emitter.emit("test", "hello"); + }); - expect(result).toEqual(["hello"]); + expect(yield* operation).toEqual(["hello"]); }); it("resolves with multiple arguments as array", function* () { expect.assertions(1); const emitter = new EventEmitter(); - let result; + let { resolve, operation } = withResolvers<[number, string]>(); yield* spawn(function* () { - result = yield* once<[number, string]>(emitter, "exit"); + resolve(yield* once<[number, string]>(emitter, "exit")); }); - emitter.emit("exit", 42, "SIGTERM"); + yield* spawn(function* () { + emitter.emit("exit", 42, "SIGTERM"); + }); - expect(result).toEqual([42, "SIGTERM"]); + expect(yield* operation).toEqual([42, "SIGTERM"]); }); it("only resolves once even with multiple emissions", function* () { const emitter = new EventEmitter(); - let first; + const { resolve, operation } = withResolvers(); + let results: string[][] = []; + + yield* spawn(function* () { + results.push(yield* once<[string]>(emitter, "data")); + resolve(); + }); + yield* spawn(function* () { - first = yield* once<[string]>(emitter, "data"); + emitter.emit("data", "first"); + emitter.emit("data", "second"); }); - emitter.emit("data", "first"); - emitter.emit("data", "second"); + yield* operation; - expect(first).toEqual(["first"]); + expect(results).toEqual([["first"]]); }); it("removes listener after resolving", function* () { expect.assertions(2); const emitter = new EventEmitter(); + const { resolve, operation } = withResolvers(); + yield* spawn(function* () { yield* once<[string]>(emitter, 
"test"); + resolve(); }); - expect(emitter.listenerCount("test")).toBe(1); - - emitter.emit("test", "hello"); + yield* spawn(function* () { + expect(emitter.listenerCount("test")).toBe(1); + emitter.emit("test", "hello"); + }); + yield* operation; expect(emitter.listenerCount("test")).toBe(0); }); }); diff --git a/raf/raf.test.ts b/raf/raf.test.ts index 19f137f7..27cd0f28 100644 --- a/raf/raf.test.ts +++ b/raf/raf.test.ts @@ -23,7 +23,7 @@ describe("raf", () => { yield* each.next(); } }); - yield* sleep(100); - expect(count > 5).toBe(true); + yield* sleep(150); + expect(count).toBeGreaterThanOrEqual(5); }); }); diff --git a/run-test-v3-stress.sh b/run-test-v3-stress.sh new file mode 100755 index 00000000..c40343af --- /dev/null +++ b/run-test-v3-stress.sh @@ -0,0 +1,18 @@ +#!/bin/bash +set -e + +# Accept rounds parameter, default to 100 +ROUNDS=${1:-100} + +# Start stress-ng in background +stress-ng --all 2 >/dev/null 2>&1 & +STRESS_PID=$! + +# Give it time to start +sleep 2 + +# Run tests with rounds parameter +bash test-v3.sh "$ROUNDS" + +# Kill stress-ng when done +kill $STRESS_PID 2>/dev/null || true diff --git a/run-test-v4-stress.sh b/run-test-v4-stress.sh new file mode 100755 index 00000000..ae5663f9 --- /dev/null +++ b/run-test-v4-stress.sh @@ -0,0 +1,18 @@ +#!/bin/bash +set -e + +# Accept rounds parameter, default to 100 +ROUNDS=${1:-100} + +# Start stress-ng in background +stress-ng --all 2 >/dev/null 2>&1 & +STRESS_PID=$! 
+ +# Give it time to start +sleep 2 + +# Run tests with rounds parameter +bash test-v4.sh "$ROUNDS" + +# Kill stress-ng when done +kill $STRESS_PID 2>/dev/null || true diff --git a/signals/array.test.ts b/signals/array.test.ts index 06b580f5..d04cf349 100644 --- a/signals/array.test.ts +++ b/signals/array.test.ts @@ -1,6 +1,6 @@ import { describe, it } from "@effectionx/bdd"; import { expect } from "@std/expect"; -import { each, sleep, spawn } from "effection"; +import { race, sleep, spawn, withResolvers } from "effection"; import { createArraySignal } from "./array.ts"; @@ -21,24 +21,35 @@ describe("array signal", () => { }); it("does not send a value to the stream when the set value is the same as the current value", function* () { + expect.assertions(2); + const array = yield* createArraySignal([]); - const updates: number[][] = []; + const { resolve, operation } = withResolvers(); + const subscription = yield* array; yield* spawn(function* () { - for (const update of yield* each(array)) { - updates.push(update); - yield* each.next(); - } - }); + array.set([1, 2, 3]); + + const firstUpdate = yield* subscription.next(); - array.set([1, 2, 3]); + expect(firstUpdate.value).toEqual([1, 2, 3]); - expect(updates).toEqual([[1, 2, 3]]); + array.set([1, 2, 3]); - array.set([1, 2, 3]); + const result = yield* race([ + subscription.next(), + (function* () { + yield* sleep(1); + return "sleep won; update not received"; + })(), + ]); + + expect(result).toEqual("sleep won; update not received"); + resolve(); + }); - expect(updates).toEqual([[1, 2, 3]]); + yield* operation; }); }); @@ -73,7 +84,7 @@ describe("array signal", () => { }); array.push(1); - yield* sleep(1); + yield* sleep(0); expect(ops).toEqual(["before shift", "got 1", "after shift"]); expect(array.valueOf()).toEqual([]); diff --git a/signals/boolean.test.ts b/signals/boolean.test.ts index 0008d494..d7f5dc83 100644 --- a/signals/boolean.test.ts +++ b/signals/boolean.test.ts @@ -1,4 +1,11 @@ -import { each, spawn 
} from "effection"; +import { + createChannel, + each, + race, + sleep, + spawn, + withResolvers, +} from "effection"; import { describe, it } from "@effectionx/bdd"; import { expect } from "@std/expect"; import { createBooleanSignal } from "./boolean.ts"; @@ -18,24 +25,43 @@ describe("boolean", () => { expect(boolean.valueOf()).toEqual(false); }); it("does not send a value to the stream when the set value is the same as the current value", function* () { + expect.assertions(2); const boolean = yield* createBooleanSignal(true); - const updates: boolean[] = []; + const { resolve, operation } = withResolvers(); + + const updates = createChannel(); + const subscription = yield* updates; yield* spawn(function* () { for (const update of yield* each(boolean)) { - updates.push(update); + yield* updates.send(update); yield* each.next(); } }); - boolean.set(true); + yield* spawn(function* () { + boolean.set(true); + + let next = yield* race([ + subscription.next(), + function* () { + yield* sleep(1); + return `sleep won; update not received`; + }(), + ]); - expect(updates).toEqual([]); + expect(next).toEqual(`sleep won; update not received`); - boolean.set(false); + boolean.set(false); + + next = yield* subscription.next(); + + expect(next.value).toEqual(false); + resolve(); + }); - expect(updates).toEqual([false]); + yield* operation; }); }); describe("update", () => { diff --git a/signals/helpers.test.ts b/signals/helpers.test.ts index 8fb7076d..098bac68 100644 --- a/signals/helpers.test.ts +++ b/signals/helpers.test.ts @@ -1,6 +1,6 @@ import { describe, it } from "@effectionx/bdd"; import { expect } from "@std/expect"; -import { spawn } from "effection"; +import { sleep, spawn, withResolvers } from "effection"; import { createBooleanSignal } from "./boolean.ts"; import { is } from "./helpers.ts"; @@ -10,12 +10,20 @@ describe("is", () => { const open = yield* createBooleanSignal(false); const update: string[] = []; + const { resolve, operation } = withResolvers(); + 
yield* spawn(function* () { yield* is(open, (open) => open === true); update.push("floodgates are open!"); + resolve(); + }); + + yield* spawn(function* () { + yield* sleep(1); + open.set(true); }); - open.set(true); + yield* operation; expect(update).toEqual(["floodgates are open!"]); }); diff --git a/stream-helpers/batch.test.ts b/stream-helpers/batch.test.ts index fbf35949..7067ef4e 100644 --- a/stream-helpers/batch.test.ts +++ b/stream-helpers/batch.test.ts @@ -1,7 +1,7 @@ import { describe, it } from "@effectionx/bdd"; import { createArraySignal, is } from "@effectionx/signals"; import { expect } from "@std/expect"; -import { sleep, spawn } from "effection"; +import { createChannel, sleep, spawn } from "effection"; import { batch } from "./batch.ts"; import { forEach } from "./for-each.ts"; @@ -9,25 +9,18 @@ import { useFaucet } from "./test-helpers/faucet.ts"; describe("batch", () => { it("creates a batch when maxTime expires", function* () { - const faucet = yield* useFaucet({ open: true }); - const stream = batch({ maxTime: 5 })(faucet); + const source = createChannel(); + const stream = batch({ maxTime: 100 })(source); const subscription = yield* stream; - yield* faucet.pour(function* (send) { - yield* sleep(1); - yield* send(1); - yield* sleep(1); - yield* send(2); - yield* sleep(1); - yield* send(3); - }); + let next = yield* spawn(() => subscription.next()); - yield* sleep(10); + yield* source.send(1); + yield* source.send(2); + yield* source.send(3); - let next = yield* subscription.next(); - - expect(next.value).toEqual([1, 2, 3]); + expect((yield* next).value).toEqual([1, 2, 3]); }); it("creates a batch by maxSize when maxTime is not set", function* () { @@ -50,13 +43,22 @@ describe("batch", () => { const stream = batch({ maxSize: 8, maxTime: 50 })(faucet); const batches = yield* createArraySignal([]); + const windows: number[] = []; + + let last = performance.now(); yield* spawn(() => forEach(function* (batch) { + const now = performance.now(); + 
windows.push(now - last); + last = now; + batches.push(batch); }, stream) ); + yield* sleep(1); + yield* faucet.pour(function* (send) { for (let i = 1; i <= 10; i++) { yield* send(i); @@ -66,7 +68,12 @@ describe("batch", () => { yield* is(batches, (list) => list.flat().length >= 10); - expect(batches.valueOf()).toHaveLength(4); + expect(windows.length).toBeGreaterThanOrEqual(3); + + const avg = average(windows); + const percentDiff = Math.abs((avg - 50) / 50) * 100; + expect(percentDiff).toBeLessThanOrEqual(50); + expect(batches.valueOf().flat()).toHaveLength(10); }); @@ -82,10 +89,24 @@ describe("batch", () => { }, stream) ); + yield* sleep(1); + yield* faucet.pour([1, 2, 3, 4, 5, 6]); - yield* is(batches, (list) => list.length === 2); + yield* is(batches, (batches) => batches.flat().length >= 6); - expect(batches.valueOf()).toEqual([[1, 2, 3, 4, 5], [6]]); + expect(batches.length).toBeGreaterThan(1); + expect(batches.valueOf().every((batch) => batch.length <= 5)).toBe(true); }); }); + +function average(arr: number[]) { + if (arr.length === 0) { + return 0; + } + const sum = arr.reduce( + (accumulator, currentValue) => accumulator + currentValue, + 0, + ); + return sum / arr.length; +} diff --git a/stream-helpers/batch.ts b/stream-helpers/batch.ts index 44ee8d6d..84f1aebd 100644 --- a/stream-helpers/batch.ts +++ b/stream-helpers/batch.ts @@ -1,8 +1,11 @@ -import { each, race, sleep, spawn, type Stream } from "effection"; -import { createArraySignal, is } from "@effectionx/signals"; +import { spawn, type Stream, type Task } from "effection"; +import { timebox } from "@effectionx/timebox"; type RequireAtLeastOne = - & Pick> + & Pick< + T, + Exclude + > & { [K in Keys]-?: Required> & Partial>>; }[Keys]; @@ -28,41 +31,75 @@ export function batch( return function (stream: Stream): Stream, never> { return { *[Symbol.iterator]() { - let batch = yield* createArraySignal([]); + const subscription = yield* stream; + let lastPull: Task> | undefined; - yield* spawn(function* 
() { - for (let item of yield* each(stream)) { - batch.push(item); - if (options.maxSize && batch.length >= options.maxSize) { - // wait until it's drained - yield* is(batch, (batch) => batch.length === 0); + return { + *next() { + let start: DOMHighResTimeStamp = performance.now(); + const batch: T[] = []; + let next: IteratorResult = { + done: true as const, + value: undefined as never, + }; + if (lastPull && options.maxTime) { + const timeout = yield* timebox(options.maxTime, () => lastPull!); + if (timeout.timeout) { + yield* lastPull.halt(); + lastPull = undefined; + } else { + next = timeout.value; + lastPull = undefined; + } + } else { + next = yield* subscription.next(); } - yield* each.next(); - } - }); + // push the next value into the batch + while (!next.done) { + batch.push(next.value); + const now = performance.now(); + if (options.maxSize && batch.length >= options.maxSize) { + return { + done: false as const, + value: batch, + }; + } else if (options.maxTime && start + options.maxTime <= now) { + return { + done: false as const, + value: batch, + }; + } else if (options.maxTime) { + const task = yield* spawn(() => subscription.next()); - function drain() { - let value = batch.valueOf(); - batch.set([]); - return value; - } + const timeout = yield* timebox( + start + options.maxTime - performance.now(), + () => task!, + ); - return { - *next() { - yield* is(batch, (batch) => batch.length >= 1); + if (timeout.timeout) { + // produce the batch that we have, save task for next batch + lastPull = task; + return { + done: false as const, + value: batch, + }; + } else { + next = timeout.value; + } + } else { + next = yield* subscription.next(); + } + } - if (options.maxTime && options.maxSize) { - yield* race([ - is(batch, (batch) => batch.length === options.maxSize), - sleep(options.maxTime), - ]); - } else if (options.maxTime) { - yield* sleep(options.maxTime); - } else if (options.maxSize) { - yield* is(batch, (batch) => batch.length === 
options.maxSize); + // Stream is done, return any remaining batch + if (batch.length > 0) { + return { + done: false as const, + value: batch, + }; } - return { done: false, value: drain() }; + return next; }, }; }, diff --git a/stream-helpers/deno.json b/stream-helpers/deno.json index cf2cfa9f..937a6297 100644 --- a/stream-helpers/deno.json +++ b/stream-helpers/deno.json @@ -7,8 +7,9 @@ "@effectionx/signals": "npm:@effectionx/signals@0.3.0", "@std/expect": "jsr:@std/expect@^1", "@std/testing": "jsr:@std/testing@^1", + "@effectionx/timebox": "npm:@effectionx/timebox@0.1.0", "@effectionx/bdd": "jsr:@effectionx/bdd@0.2.2", - "remeda": "npm:remeda@2.21.3" + "remeda": "npm:remeda@^2" }, "license": "MIT", "exports": { diff --git a/stream-helpers/for-each.test.ts b/stream-helpers/for-each.test.ts index da3e8ad7..c6706114 100644 --- a/stream-helpers/for-each.test.ts +++ b/stream-helpers/for-each.test.ts @@ -1,4 +1,4 @@ -import { createSignal, spawn } from "effection"; +import { createChannel, sleep, spawn } from "effection"; import { describe, it } from "@effectionx/bdd"; import { expect } from "@std/expect"; @@ -6,36 +6,37 @@ import { forEach } from "./for-each.ts"; describe("forEach", () => { it("should invoke function for each item in the stream", function* () { - const stream = createSignal(); + expect.assertions(1); + + const stream = createChannel(); const processedItems: number[] = []; - yield* spawn(() => - forEach(function* (item: number) { - processedItems.push(item); - }, stream) - ); + yield* spawn(function* () { + yield* sleep(0); + yield* stream.send(1); + yield* stream.send(2); + yield* stream.send(3); + yield* stream.close(); + }); - stream.send(1); - stream.send(2); - stream.send(3); + yield* forEach(function* (item: number) { + processedItems.push(item); + }, stream); expect(processedItems).toEqual([1, 2, 3]); }); it("should return the close value of the stream", function* () { - const stream = createSignal(); - - const result = yield* spawn(() => - 
forEach(function* () { - // Just process the item - }, stream) - ); + const stream = createChannel(); - stream.send("hello"); - stream.send("world"); - stream.close(42); // Close with value 42 + yield* spawn(function* () { + yield* sleep(0); + yield* stream.send("hello"); + yield* stream.send("world"); + yield* stream.close(42); // Close with value 42 + }); - const closeValue = yield* result; + const closeValue = yield* forEach(function* () {}, stream); expect(closeValue).toBe(42); }); }); diff --git a/stream-helpers/map.ts b/stream-helpers/map.ts index 92ffee9b..b25cdb7d 100644 --- a/stream-helpers/map.ts +++ b/stream-helpers/map.ts @@ -21,9 +21,11 @@ export function map( return next; } + const value = yield* fn(next.value); + return { done: false, - value: yield* fn(next.value), + value, }; }, }; diff --git a/stream-helpers/test-helpers/faucet.test.ts b/stream-helpers/test-helpers/faucet.test.ts index 879ca54d..cc022f93 100644 --- a/stream-helpers/test-helpers/faucet.test.ts +++ b/stream-helpers/test-helpers/faucet.test.ts @@ -18,8 +18,11 @@ describe("useFaucet", () => { } }); - // Pour an array of items - yield* faucet.pour([1, 2, 3]); + yield* spawn(function* () { + yield* sleep(1); + // Pour an array of items + yield* faucet.pour([1, 2, 3]); + }); // Wait for processing yield* is(results, (results) => results.length === 3); @@ -28,6 +31,7 @@ describe("useFaucet", () => { }); it("respects the open state", function* () { + expect.assertions(2); const faucet = yield* useFaucet({ open: false }); const results = yield* createArraySignal([]); @@ -48,6 +52,8 @@ describe("useFaucet", () => { yield* faucet.pour([4, 5, 6]); + yield* is(results, (results) => results.length === 3); + expect(results.valueOf()).toEqual([4, 5, 6]); }); @@ -65,6 +71,7 @@ describe("useFaucet", () => { // Pour using a generator function yield* spawn(function* () { + yield* sleep(1); yield* faucet.pour(function* (send) { yield* send(1); yield* sleep(10); @@ -93,6 +100,7 @@ describe("useFaucet", 
() => { // Start pouring with a generator yield* spawn(function* () { + yield* sleep(1); yield* faucet.pour(function* (send) { yield* send(1); yield* sleep(10); @@ -105,8 +113,7 @@ describe("useFaucet", () => { }); }); - // Wait for the first item - yield* sleep(50); + yield* is(results, (results) => results.length === 2); expect(results.valueOf()).toEqual([1, 2]); }); diff --git a/stream-helpers/test-helpers/faucet.ts b/stream-helpers/test-helpers/faucet.ts index 808df8ef..2454f824 100644 --- a/stream-helpers/test-helpers/faucet.ts +++ b/stream-helpers/test-helpers/faucet.ts @@ -1,4 +1,4 @@ -import { createSignal, type Operation, type Stream } from "effection"; +import { createChannel, type Operation, type Stream } from "effection"; import { createBooleanSignal, is } from "@effectionx/signals"; /** @@ -81,7 +81,7 @@ export interface FaucetOptions { * @returns stream of items coming from the faucet */ export function* useFaucet(options: FaucetOptions): Operation> { - let signal = createSignal(); + let signal = createChannel(); let open = yield* createBooleanSignal(options.open); return { @@ -90,12 +90,12 @@ export function* useFaucet(options: FaucetOptions): Operation> { if (Array.isArray(items)) { for (let i of items) { yield* is(open, (open) => open); - signal.send(i); + yield* signal.send(i); } } else { yield* items(function* (item) { yield* is(open, (open) => open); - signal.send(item); + yield* signal.send(item); }); } }, diff --git a/stream-helpers/tracker.test.ts b/stream-helpers/tracker.test.ts index 105f5755..3a70df61 100644 --- a/stream-helpers/tracker.test.ts +++ b/stream-helpers/tracker.test.ts @@ -1,18 +1,19 @@ import { describe, it } from "@effectionx/bdd"; import { expect } from "@std/expect"; -import { each, sleep, spawn, withResolvers } from "effection"; +import { each, sleep, spawn } from "effection"; import { pipe } from "remeda"; import { batch } from "./batch.ts"; import { map } from "./map.ts"; import { useFaucet } from 
"./test-helpers/faucet.ts"; import { createTracker } from "./tracker.ts"; +import { createArraySignal, is } from "@effectionx/signals"; describe("tracker", () => { it("waits for all items to be processed", function* () { - const { operation, resolve } = withResolvers(); const tracker = yield* createTracker(); const faucet = yield* useFaucet({ open: true }); + const received = yield* createArraySignal([]); const stream = pipe( faucet, @@ -23,22 +24,17 @@ describe("tracker", () => { }), ); - const received: number[] = []; - yield* spawn(function* () { - let count = 0; for (const item of yield* each(stream)) { yield* sleep(10); received.push(item); tracker.markOne(item); - count++; - if (count >= 3) { - resolve(); - } yield* each.next(); } }); + yield* sleep(1); + yield* faucet.pour(function* (send) { yield* send(1); yield* sleep(1); @@ -47,16 +43,18 @@ describe("tracker", () => { yield* send(3); }); - yield* operation; yield* tracker; - expect(received).toEqual([1, 2, 3]); + yield* is(received, (received) => received.length === 3); + + expect(received.valueOf()).toEqual([1, 2, 3]); }); - it("tracks batched items", function* () { - const { operation, resolve } = withResolvers(); + it("tracks batched items", function* () { const tracker = yield* createTracker(); const faucet = yield* useFaucet({ open: true }); + const received = yield* createArraySignal([]); + const stream = pipe( faucet, tracker.passthrough(), @@ -69,21 +67,16 @@ describe("tracker", () => { }), ); - const received: Readonly[] = []; - yield* spawn(function* () { - let count = 0; for (const items of yield* each(stream)) { received.push(items); tracker.markMany(items); - count = count + items.length; - if (count >= 9) { - resolve(); - } yield* each.next(); } }); + yield* sleep(1); + yield* faucet.pour(function* (send) { yield* send(1); yield* sleep(10); @@ -104,9 +97,9 @@ describe("tracker", () => { yield* send(9); }); - yield* operation; + yield* is(received, (received) => received.flat().length >= 9); 
yield* tracker; - expect(received).toEqual([[1, 2, 3], [4, 5, 6], [7, 8, 9]]); + expect(received.valueOf()).toEqual([[1, 2, 3], [4, 5, 6], [7, 8, 9]]); }); }); diff --git a/stream-helpers/valve.test.ts b/stream-helpers/valve.test.ts index c6bb1e73..4ee60bf4 100644 --- a/stream-helpers/valve.test.ts +++ b/stream-helpers/valve.test.ts @@ -36,6 +36,8 @@ describe("valve", () => { } }); + yield* sleep(1); + yield* faucet.pour([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); yield* is(values, (values) => values.length === 10); diff --git a/task-buffer/task-buffer.test.ts b/task-buffer/task-buffer.test.ts index df8e480a..99b34969 100644 --- a/task-buffer/task-buffer.test.ts +++ b/task-buffer/task-buffer.test.ts @@ -1,5 +1,5 @@ import { FakeTime } from "@std/testing/time"; -import { sleep, spawn, type Task } from "effection"; +import { sleep, spawn, type Task, until } from "effection"; import { describe, it } from "@effectionx/bdd"; import { expect } from "@std/expect"; import { useTaskBuffer } from "./task-buffer.ts"; @@ -19,12 +19,12 @@ describe("TaskBuffer", () => { third = yield* yield* buffer.spawn(() => sleep(10)); }); - time.tick(5); + yield* until(time.tickAsync(5)); // right now the third spawn is queued up, but not spawned. expect(third).toBeUndefined(); - time.tick(10); + yield* until(time.tickAsync(10)); // the other tasks finished and so the third task is active. 
expect(third).toBeDefined(); @@ -48,7 +48,7 @@ describe("TaskBuffer", () => { expect(finished).toEqual(0); - time.tick(10); + yield* until(time.tickAsync(10)); yield* buffer; expect(finished).toEqual(3); diff --git a/tasks/build-npm.ts b/tasks/build-npm.ts index ba1f87ea..295617f3 100644 --- a/tasks/build-npm.ts +++ b/tasks/build-npm.ts @@ -52,6 +52,9 @@ await build({ node: ">= 16", }, sideEffects: false, + dependencies: { + "effection": "^3 || ^4", + }, }, }); diff --git a/tasks/check-version-mismatches.ts b/tasks/check-version-mismatches.ts new file mode 100644 index 00000000..7890b694 --- /dev/null +++ b/tasks/check-version-mismatches.ts @@ -0,0 +1,77 @@ +import { expandGlob } from "jsr:@std/fs@^1"; + +const rootDir = new URL("..", import.meta.url).pathname; + +interface DenoConfig { + name?: string; + version?: string; + imports?: Record; +} + +// First, collect all package versions +const packageVersions = new Map(); + +for await ( + const file of expandGlob("**/deno.json", { + root: rootDir, + exclude: ["node_modules", "build"], + }) +) { + const content = await Deno.readTextFile(file.path); + const config: DenoConfig = JSON.parse(content); + + if (config.name && config.version) { + packageVersions.set(config.name, config.version); + } +} + +console.log("Package versions found:"); +for (const [name, version] of packageVersions) { + console.log(` ${name}: ${version}`); +} +console.log(""); + +// Now check for mismatches +let foundMismatches = false; + +for await ( + const file of expandGlob("**/deno.json", { + root: rootDir, + exclude: ["node_modules", "build"], + }) +) { + const content = await Deno.readTextFile(file.path); + const config: DenoConfig = JSON.parse(content); + const relativePath = file.path.replace(rootDir, ""); + + if (config.imports) { + for (const [depName, depSpec] of Object.entries(config.imports)) { + if (depName.startsWith("@effectionx/")) { + const actualVersion = packageVersions.get(depName); + if (actualVersion) { + // Extract version 
from spec (handles jsr:@effectionx/foo@version or npm:@effectionx/foo@version) + const match = depSpec.match(/@effectionx\/[^@]+@(.+)$/); + if (match) { + const declaredVersion = match[1]; + // Check if it's a caret version or exact version + const isCaretVersion = declaredVersion.startsWith("^"); + const versionToCheck = isCaretVersion + ? declaredVersion.substring(1) + : declaredVersion; + + if (versionToCheck !== actualVersion) { + foundMismatches = true; + console.log( + `❌ ${relativePath}: ${depName} declared as ${declaredVersion} but actual version is ${actualVersion}`, + ); + } + } + } + } + } + } +} + +if (!foundMismatches) { + console.log("✓ All internal dependency versions match!"); +} diff --git a/tasks/generate-importmap.ts b/tasks/generate-importmap.ts new file mode 100644 index 00000000..3072ece6 --- /dev/null +++ b/tasks/generate-importmap.ts @@ -0,0 +1,107 @@ +#!/usr/bin/env -S deno run --allow-read --allow-write + +import { expandGlob } from "@std/fs"; +import { fromFileUrl, join } from "@std/path"; + +interface DenoConfig { + name?: string; + exports?: string | Record; + imports?: Record; +} + +const rootDir = fromFileUrl(new URL("..", import.meta.url)); +const imports = new Map(); +const workspacePackages = new Map(); + +// First pass: collect all workspace package names and their exports +for await ( + const file of expandGlob("**/deno.json", { + root: rootDir, + exclude: ["node_modules", "build"], + }) +) { + const content = await Deno.readTextFile(file.path); + const config: DenoConfig = JSON.parse(content); + + if (config.name?.startsWith("@effectionx/")) { + // Get the default export path + let exportPath = "./mod.ts"; // fallback + if (typeof config.exports === "string") { + exportPath = config.exports; + } else if (config.exports && typeof config.exports === "object") { + exportPath = config.exports["."] || config.exports["default"] || + "./mod.ts"; + } + + // Get package directory relative to root + const packageDir = 
file.path.replace(rootDir, "").replace( + /[\/\\]deno\.json$/, + "", + ); + const fullExportPath = `./${packageDir}/${exportPath.replace("./", "")}` + .replace(/\\/g, "/"); + + workspacePackages.set(config.name, fullExportPath); + } +} + +// Second pass: collect external dependencies and convert workspace packages to local paths +for await ( + const file of expandGlob("**/deno.json", { + root: rootDir, + exclude: ["node_modules", "build"], + }) +) { + const content = await Deno.readTextFile(file.path); + const config: DenoConfig = JSON.parse(content); + + if (config.imports) { + for (const [key, value] of Object.entries(config.imports)) { + // Convert workspace packages to local paths using their actual export + if (workspacePackages.has(key)) { + imports.set(key, workspacePackages.get(key)!); + continue; + } + + // Skip other internal @effectionx packages + if (key.startsWith("@effectionx/")) { + continue; + } + + // Only include external dependencies (jsr: or npm:) + if (value.startsWith("jsr:") || value.startsWith("npm:")) { + imports.set(key, value); + } + } + } +} + +// Add effection v4 +imports.set("effection", "npm:effection@^4.0.0-0"); + +// Add @std/testing sub-packages if @std/testing is present +if (imports.has("@std/testing")) { + const testingSpec = imports.get("@std/testing")!; + // Extract version from "jsr:@std/testing@^1" -> "^1" + const version = testingSpec.split("@std/testing@")[1]; + imports.set("@std/testing/bdd", `jsr:@std/testing@${version}/bdd`); + imports.set("@std/testing/mock", `jsr:@std/testing@${version}/mock`); + imports.set("@std/testing/time", `jsr:@std/testing@${version}/time`); +} + +// Sort by key +const sortedImports = Object.fromEntries( + Array.from(imports.entries()).sort(([a], [b]) => a.localeCompare(b)), +); + +const output = { + imports: sortedImports, +}; + +const outputPath = join(rootDir, "v4.importmap.json"); +await Deno.writeTextFile( + outputPath, + JSON.stringify(output, null, 2) + "\n", +); + 
+console.log(`Generated ${outputPath} with ${imports.size} dependencies`); diff --git a/tasks/update-effection-version.ts b/tasks/update-effection-version.ts new file mode 100644 index 00000000..a3fe058b --- /dev/null +++ b/tasks/update-effection-version.ts @@ -0,0 +1,33 @@ +import { expandGlob } from "jsr:@std/fs@^1"; +import { join } from "jsr:@std/path@^1"; + +const rootDir = new URL("..", import.meta.url).pathname; + +interface DenoConfig { + imports?: Record; +} + +for await ( + const file of expandGlob("**/deno.json", { + root: rootDir, + exclude: ["node_modules", "build"], + }) +) { + const content = await Deno.readTextFile(file.path); + const config: DenoConfig = JSON.parse(content); + + if (config.imports && config.imports["effection"]) { + const currentVersion = config.imports["effection"]; + if (currentVersion === "npm:effection@^3") { + config.imports["effection"] = "npm:effection@^4.0.0-0"; + + const updatedContent = JSON.stringify(config, null, 2) + "\n"; + await Deno.writeTextFile(file.path, updatedContent); + + const relativePath = file.path.replace(rootDir, ""); + console.log(`Updated ${relativePath}`); + } + } +} + +console.log("✓ Effection version update complete"); diff --git a/test-v3.sh b/test-v3.sh new file mode 100755 index 00000000..a418e588 --- /dev/null +++ b/test-v3.sh @@ -0,0 +1,51 @@ +#!/bin/bash + +failures=0 +total=${1:-100} +timestamp=$(date +%Y%m%d_%H%M%S) +summary_file="test-summary-v3_${timestamp}.log" + +echo "V3 Test Run - Started at $(date)" > "$summary_file" +echo "==========================================" >> "$summary_file" +echo "" >> "$summary_file" + +for i in $(seq 1 $total); do + echo "=====================================" + echo "Run $i/$total..." + echo "=====================================" + + start_time=$(date +%s) + + set -o pipefail + NO_COLOR=1 timeout 120 deno task test 2>&1 | tee /tmp/test-output-$i.log + exit_code=$? 
+ set +o pipefail + + end_time=$(date +%s) + duration=$((end_time - start_time)) + + echo "DEBUG: Exit code = $exit_code, Duration = ${duration}s" + + if [ $exit_code -eq 0 ]; then + echo "✓ PASSED (${duration}s)" + echo "Run $i: PASSED (${duration}s)" >> "$summary_file" + else + ((failures++)) + echo "✗ FAILED (${duration}s)" + echo "Run $i: FAILED (${duration}s)" >> "$summary_file" + cat /tmp/test-output-$i.log | sed -n '/ ERRORS/,/^FAILED/p' >> "$summary_file" + echo "" >> "$summary_file" + fi + rm -f /tmp/test-output-$i.log +done + +echo "==========================================" >> "$summary_file" +echo "V3 Tests - Total failures: $failures/$total" >> "$summary_file" +echo "Completed at $(date)" >> "$summary_file" + +cat "$summary_file" +echo "" +echo "Summary saved to: $summary_file" + +# Exit 0 so Dagger exports the results even when tests fail +exit 0 diff --git a/test-v4.sh b/test-v4.sh new file mode 100755 index 00000000..d08c0783 --- /dev/null +++ b/test-v4.sh @@ -0,0 +1,51 @@ +#!/bin/bash + +failures=0 +total=${1:-100} +timestamp=$(date +%Y%m%d_%H%M%S) +summary_file="test-summary-v4_${timestamp}.log" + +echo "V4 Test Run - Started at $(date)" > "$summary_file" +echo "==========================================" >> "$summary_file" +echo "" >> "$summary_file" + +for i in $(seq 1 $total); do + echo "=====================================" + echo "Run $i/$total..." + echo "=====================================" + + start_time=$(date +%s) + + set -o pipefail + NO_COLOR=1 timeout 120 deno task test:v4 2>&1 | tee /tmp/test-output-$i.log + exit_code=$? 
+ set +o pipefail + + end_time=$(date +%s) + duration=$((end_time - start_time)) + + echo "DEBUG: Exit code = $exit_code, Duration = ${duration}s" + + if [ $exit_code -eq 0 ]; then + echo "✓ PASSED (${duration}s)" + echo "Run $i: PASSED (${duration}s)" >> "$summary_file" + else + ((failures++)) + echo "✗ FAILED (${duration}s)" + echo "Run $i: FAILED (${duration}s)" >> "$summary_file" + cat /tmp/test-output-$i.log | sed -n '/ ERRORS/,/^FAILED/p' >> "$summary_file" + echo "" >> "$summary_file" + fi + rm -f /tmp/test-output-$i.log +done + +echo "==========================================" >> "$summary_file" +echo "V4 Tests - Total failures: $failures/$total" >> "$summary_file" +echo "Completed at $(date)" >> "$summary_file" + +cat "$summary_file" +echo "" +echo "Summary saved to: $summary_file" + +# Exit 0 so Dagger exports the results even when tests fail +exit 0 diff --git a/v3-test-summary.md b/v3-test-summary.md new file mode 100644 index 00000000..8847ab40 --- /dev/null +++ b/v3-test-summary.md @@ -0,0 +1,31 @@ +# V3 Test Results (with stress-ng --all 2) + +## Summary +- **27/100 runs failed** (73% success rate) +- **1 resource leak detected** (Run 71) +- **26 timing failures** (expected under CPU stress) + +## Failures + +| Test | Count | Type | Issue | +|------|-------|------|-------| +| watch | 17 | Timing | Process startup timeout | +| parallel() | 4 | Timing | Race condition in async order | +| batch | 2 | Timing | Exceeded 50% threshold (78.9ms vs 50ms) | +| **watch** | **1** | **LEAK** | **Chokidar resources not cleaned up** | + +## Resource Leak (Run 71) ⚠️ + +**Location**: `watch/watch.ts:156` - `yield* until(watcher.close())` + +**Leaked resources**: +1. Timer from `FSWatcher._throttle` (chokidar) +2. Async directory read from `ReaddirpStream._exploreDir` (readdirp) + +**Root cause**: Chokidar cleanup doesn't complete before test exits under CPU stress + +## Next Steps + +1. Fix chokidar cleanup race condition in `watch/watch.ts` +2. 
Explicitly clear timers before `watcher.close()` +3. Run watch tests in isolation under stress to reproduce consistently diff --git a/watch/deno.json b/watch/deno.json index bc216443..06b652cf 100644 --- a/watch/deno.json +++ b/watch/deno.json @@ -11,7 +11,7 @@ "ignore": "npm:ignore@^7.0.3", "@std/expect": "jsr:@std/expect@^1", "@std/assert": "jsr:@std/assert@^1", - "@gordonb/pipe": "jsr:@gordonb/pipe@0.1.0", + "remeda": "npm:remeda@^2", "@std/fs": "jsr:@std/fs@^1", "@std/path": "jsr:@std/path@^1", "chokidar": "npm:chokidar@^4.0.3", diff --git a/watch/test/helpers.ts b/watch/test/helpers.ts index 067a16ce..f7b28016 100644 --- a/watch/test/helpers.ts +++ b/watch/test/helpers.ts @@ -3,7 +3,7 @@ import { assert } from "@std/assert"; import { emptyDir, ensureDir } from "@std/fs"; import { dirname, fromFileUrl, join } from "@std/path"; import type { Operation, Result, Stream } from "effection"; -import { each, Ok, sleep, spawn, until } from "effection"; +import { each, Ok, resource, sleep, spawn, until } from "effection"; import { cp, readFile, writeFile } from "node:fs/promises"; import type { Start } from "../watch.ts"; @@ -67,72 +67,79 @@ type SuccessfulStart = { type ProcessStart = Result; -export function* inspector(stream: Stream) { - let starts: ProcessStart[] = []; +interface Inspector { + starts: ProcessStart[]; + expectNext(): Operation; + expectNoRestart(): Operation; +} - let expected = 0; +export function inspector(stream: Stream): Operation { + return resource(function* (provide) { + let starts: ProcessStart[] = []; - yield* spawn(function* () { - for (let { result } of yield* each(stream)) { - if (result.ok) { - let process = result.value; - let start = { - stdout: "", - stderr: "", - process: result.value, - }; - starts.push(Ok(start)); - yield* spawn(function* () { - for (let chunk of yield* each(process.stdout)) { - start.stdout += String(chunk); - yield* each.next(); - } - }); - yield* spawn(function* () { - for (let chunk of yield* 
each(process.stderr)) { - start.stderr += String(chunk); - yield* each.next(); - } - }); - } else { - starts.push(result); - } + let expected = 0; - yield* each.next(); - } - }); + yield* spawn(function* () { + for (let { result } of yield* each(stream)) { + if (result.ok) { + let process = result.value; + let start = { + stdout: "", + stderr: "", + process: result.value, + }; + starts.push(Ok(start)); + yield* spawn(function* () { + for (let chunk of yield* each(process.stdout)) { + start.stdout += String(chunk); + yield* each.next(); + } + }); + yield* spawn(function* () { + for (let chunk of yield* each(process.stderr)) { + start.stderr += String(chunk); + yield* each.next(); + } + }); + } else { + starts.push(result); + } - let inspector = { - starts, - *expectNext(): Operation { - let initial = expected; - for (let i = 0; i < 500; i++) { - if (initial < starts.length) { - yield* sleep(10); - expected = starts.length; - let result = inspector.starts[inspector.starts.length - 1]; - if (result.ok) { - return result.value; + yield* each.next(); + } + }); + + yield* provide({ + starts, + *expectNext() { + let initial = expected; + for (let i = 0; i < 500; i++) { + if (initial < starts.length) { + yield* sleep(10); + expected = starts.length; + let result = starts[starts.length - 1]; + if (result.ok) { + return result.value; + } else { + throw new Error( + `expected successful start, but failed: ${result.error}`, + ); + } } else { - throw new Error( - `expected successful start, but failed: ${result.error}`, - ); + yield* sleep(10); } - } else { - yield* sleep(10); } - } - throw new Error(`expecting a sucessful start but it never appeared.`); - }, - *expectNoRestart() { - let prexisting = inspector.starts.length; - yield* sleep(200); - let restarts = inspector.starts.length - prexisting; - assert( - restarts === 0, - `expected no process restarts to have happened, but instead there were: ${restarts}`, - ); - }, - }; - return inspector; + throw new Error(`expecting a 
successful start but it never appeared.`);
+      },
+      *expectNoRestart() {
+        let preexisting = starts.length;
+        yield* sleep(200);
+        let restarts = starts.length - preexisting;
+        assert(
+          restarts === 0,
+          `expected no process restarts to have happened, but instead there were: ${restarts}`,
+        );
+      },
+    });
+  });
 }
diff --git a/watch/test/watch.test.ts b/watch/test/watch.test.ts
index f029d1ca..1111ee3c 100644
--- a/watch/test/watch.test.ts
+++ b/watch/test/watch.test.ts
@@ -48,6 +48,7 @@ describe("watch", () => {
   });
 
   it("ignores files in .gitignore", function* () {
+    expect.assertions(1);
     let fixture = yield* useFixture();
 
     let processes = yield* inspector(
@@ -64,6 +65,8 @@
     yield* fixture.write("dist/artifact.txt", "this file was built again");
 
     yield* processes.expectNoRestart();
+
+    expect(processes.starts).toHaveLength(1);
   });
 
   it.skip("ignores files in a .gitignore that is in a parent directory", function* () {
diff --git a/watch/watch.ts b/watch/watch.ts
index 9dd7b279..3baa5fee 100644
--- a/watch/watch.ts
+++ b/watch/watch.ts
@@ -16,7 +16,7 @@ import {
   withResolvers,
 } from "effection";
 import { default as createIgnore } from "ignore";
-import { pipe } from "@gordonb/pipe";
+import { pipe } from "remeda";
 import { readFile } from "node:fs/promises";
 
 import { exec, type ExecOptions, type Process } from "@effectionx/process";
diff --git a/worker/deno.json b/worker/deno.json
index 5d684b28..38bb17c4 100644
--- a/worker/deno.json
+++ b/worker/deno.json
@@ -11,6 +11,6 @@
     "@std/path": "jsr:@std/path@^1",
     "@std/fs": "jsr:@std/fs@^1",
     "@effectionx/bdd": "jsr:@effectionx/bdd@0.2.2",
-    "@effection/signals": "npm:@effectionx/signals@0.3.0"
+    "@effectionx/signals": "npm:@effectionx/signals@0.3.0"
   }
 }