diff --git a/async/deno.json b/async/deno.json index 74c61259d3ca..18189b9f6170 100644 --- a/async/deno.json +++ b/async/deno.json @@ -15,6 +15,7 @@ "./tee": "./tee.ts", "./unstable-throttle": "./unstable_throttle.ts", "./unstable-wait-for": "./unstable_wait_for.ts", - "./unstable-semaphore": "./unstable_semaphore.ts" + "./unstable-semaphore": "./unstable_semaphore.ts", + "./unstable-circuit-breaker": "./unstable_circuit_breaker.ts" } } diff --git a/async/unstable_circuit_breaker.ts b/async/unstable_circuit_breaker.ts new file mode 100644 index 000000000000..436a8d20fdda --- /dev/null +++ b/async/unstable_circuit_breaker.ts @@ -0,0 +1,685 @@ +// Copyright 2018-2025 the Deno authors. MIT license. +// This module is browser compatible. + +/** + * Circuit breaker states following the standard pattern. + * - `"closed"`: Normal operation, requests pass through + * - `"open"`: Failing, all requests rejected immediately + * - `"half_open"`: Testing recovery, limited requests allowed + */ +export type CircuitState = "closed" | "open" | "half_open"; + +/** Options for {@linkcode CircuitBreaker}. */ +export interface CircuitBreakerOptions { + /** + * Number of failures before opening the circuit. + * + * @default {5} + */ + failureThreshold?: number; + + /** + * Duration in milliseconds the circuit stays open before entering half-open. + * + * @default {30000} + */ + cooldownMs?: number; + + /** + * Number of consecutive successes needed to close from half-open state. + * + * @default {2} + */ + successThreshold?: number; + + /** + * Maximum concurrent requests allowed in half-open state. + * + * @default {1} + */ + halfOpenMaxConcurrent?: number; + + /** + * Time window in milliseconds for counting failures. + * Failures older than this are forgotten (sliding window). + * Set to `0` to disable decay. + * + * @default {60000} + */ + failureWindowMs?: number; + + /** + * Custom predicate to determine if an error should count as a circuit failure. + * By default, any thrown error is a failure. + * + * @param error The error that was thrown. + * @returns `true` if the error should count as a circuit failure. + */ + isFailure?: (error: unknown) => boolean; + + /** + * Custom predicate to determine if a successful result should count as failure. + * Useful for HTTP responses where status codes indicate logical failures. + * + * @param result The successful result. + * @returns `true` if the result should count as a circuit failure. + */ + isResultFailure?: (result: T) => boolean; + + /** + * Callback invoked when circuit state changes. + * + * @param from Previous state. + * @param to New state. + */ + onStateChange?: (from: CircuitState, to: CircuitState) => void; + + /** + * Callback invoked when a failure is recorded. + * + * @param error The error that caused the failure. + * @param failureCount Current number of failures in the window. + */ + onFailure?: (error: unknown, failureCount: number) => void; + + /** + * Callback invoked when circuit opens. + * + * @param failureCount Number of failures that triggered the open. + */ + onOpen?: (failureCount: number) => void; + + /** + * Callback invoked when circuit closes (recovery complete). + */ + onClose?: () => void; +} + +/** Statistics returned by {@linkcode CircuitBreaker.getStats}. */ +export interface CircuitBreakerStats { + /** Current state of the circuit breaker. */ + state: CircuitState; + /** Number of failures in the current window. */ + failureCount: number; + /** Number of consecutive successes (relevant in half-open state). */ + consecutiveSuccesses: number; + /** Whether the circuit is currently allowing requests. */ + isAvailable: boolean; +} + +/** + * Error thrown when {@linkcode CircuitBreaker} is open and rejects a request. + * + * @experimental **UNSTABLE**: New API, yet to be vetted. + * + * @example Usage + * ```ts + * import { CircuitBreaker, CircuitBreakerOpenError } from "@std/async/unstable-circuit-breaker"; + * import { assert } from "@std/assert"; + * + * const breaker = new CircuitBreaker({ failureThreshold: 1 }); + * + * // Trigger a failure to open the circuit + * try { + * await breaker.execute(() => Promise.reject(new Error("fail"))); + * } catch (_) { + * // Expected to fail + * } + * + * // Now the circuit is open + * try { + * await breaker.execute(() => Promise.resolve("ok")); + * } catch (error) { + * assert(error instanceof CircuitBreakerOpenError); + * assert(error.remainingCooldownMs >= 0); + * } + * ``` + */ +export class CircuitBreakerOpenError extends Error { + /** + * Milliseconds until the circuit breaker cooldown expires. + * + * @example Usage + * ```ts + * import { CircuitBreakerOpenError } from "@std/async/unstable-circuit-breaker"; + * import { assertEquals } from "@std/assert"; + * + * const error = new CircuitBreakerOpenError(5000); + * assertEquals(error.remainingCooldownMs, 5000); + * ``` + */ + remainingCooldownMs: number; + + /** + * Constructs a new {@linkcode CircuitBreakerOpenError} instance. + * + * @param remainingCooldownMs Milliseconds until cooldown expires. + */ + constructor(remainingCooldownMs: number) { + super( + `Circuit breaker is open. Retry after ${remainingCooldownMs}ms.`, + ); + this.name = "CircuitBreakerOpenError"; + this.remainingCooldownMs = remainingCooldownMs; + } +} + +/** Internal state managed by the circuit breaker. */ +interface CircuitBreakerState { + state: CircuitState; + /** Failure timestamps in milliseconds since epoch. */ + failureTimestamps: number[]; + consecutiveSuccesses: number; + /** Timestamp when circuit opened, in milliseconds since epoch. */ + openedAt: number | null; + halfOpenInFlight: number; +} + +/** Creates initial circuit breaker state. */ +function createInitialState(): CircuitBreakerState { + return { + state: "closed", + failureTimestamps: [], + consecutiveSuccesses: 0, + openedAt: null, + halfOpenInFlight: 0, + }; +} + +/** + * Removes failure timestamps outside the decay window. + * + * @param timestamps Array of failure timestamps in ms. + * @param windowMs Duration window in milliseconds. + * @param nowMs Current time in milliseconds. + * @returns Filtered array of timestamps within the window. + */ +function pruneOldFailures( + timestamps: number[], + windowMs: number, + nowMs: number, +): number[] { + if (windowMs === 0) return timestamps; + const cutoff = nowMs - windowMs; + return timestamps.filter((ts) => ts > cutoff); +} + +/** + * A circuit breaker that wraps async operations to prevent cascading failures. + * + * The circuit breaker monitors for failures and "trips" (opens) when a threshold + * is reached, rejecting subsequent requests immediately without executing them. + * After a cooldown period, it enters a "half-open" state to test if the service + * has recovered. + * + * @experimental **UNSTABLE**: New API, yet to be vetted. + * + * @example Usage + * ```ts + * import { CircuitBreaker } from "@std/async/unstable-circuit-breaker"; + * import { assertEquals } from "@std/assert"; + * + * const breaker = new CircuitBreaker({ + * failureThreshold: 5, + * cooldownMs: 30000, + * }); + * + * assertEquals(breaker.state, "closed"); + * + * const result = await breaker.execute(() => Promise.resolve("success")); + * assertEquals(result, "success"); + * ``` + * + * @example Handling open circuit + * ```ts + * import { CircuitBreaker, CircuitBreakerOpenError } from "@std/async/unstable-circuit-breaker"; + * + * const breaker = new CircuitBreaker({ failureThreshold: 5 }); + * + * try { + * const result = await breaker.execute(() => fetch("https://api.example.com")); + * } catch (error) { + * if (error instanceof CircuitBreakerOpenError) { + * console.log(`Service unavailable, retry in ${error.remainingCooldownMs}ms`); + * } + * } + * ``` + * + * @example With custom failure detection + * ```ts no-assert + * import { CircuitBreaker } from "@std/async/unstable-circuit-breaker"; + * + * const breaker = new CircuitBreaker({ + * failureThreshold: 3, + * // Only count server errors as circuit failures + * isResultFailure: (response) => response.status >= 500, + * onStateChange: (from, to) => console.log(`Circuit: ${from} → ${to}`), + * }); + * ``` + * + * @example Composing with retry + * ```ts ignore + * import { retry } from "@std/async/retry"; + * import { CircuitBreaker } from "@std/async/unstable-circuit-breaker"; + * + * const breaker = new CircuitBreaker({ failureThreshold: 5 }); + * + * // Circuit breaker wraps retry - if service is down, fail fast + * const result = await breaker.execute(() => + * retry(() => fetch("https://api.example.com"), { maxAttempts: 3 }) + * ); + * ``` + * + * @typeParam T The type of value returned by the executed function. + */ +export class CircuitBreaker { + #failureThreshold: number; + #cooldownMs: number; + #successThreshold: number; + #halfOpenMaxConcurrent: number; + #failureWindowMs: number; + #isFailure: (error: unknown) => boolean; + #isResultFailure: (result: T) => boolean; + #onStateChange: ((from: CircuitState, to: CircuitState) => void) | undefined; + #onFailure: ((error: unknown, failureCount: number) => void) | undefined; + #onOpen: ((failureCount: number) => void) | undefined; + #onClose: (() => void) | undefined; + #state: CircuitBreakerState; + + /** + * Constructs a new {@linkcode CircuitBreaker} instance. + * + * @param options Configuration options for the circuit breaker. + */ + constructor(options: CircuitBreakerOptions = {}) { + const { + failureThreshold = 5, + cooldownMs = 30_000, + successThreshold = 2, + halfOpenMaxConcurrent = 1, + failureWindowMs = 60_000, + isFailure = () => true, + isResultFailure = () => false, + onStateChange, + onFailure, + onOpen, + onClose, + } = options; + + if (failureThreshold < 1) { + throw new TypeError( + `Cannot create circuit breaker as 'failureThreshold' must be at least 1: current value is ${failureThreshold}`, + ); + } + if (cooldownMs < 0) { + throw new TypeError( + `Cannot create circuit breaker as 'cooldownMs' must be non-negative: current value is ${cooldownMs}`, + ); + } + if (successThreshold < 1) { + throw new TypeError( + `Cannot create circuit breaker as 'successThreshold' must be at least 1: current value is ${successThreshold}`, + ); + } + if (halfOpenMaxConcurrent < 1) { + throw new TypeError( + `Cannot create circuit breaker as 'halfOpenMaxConcurrent' must be at least 1: current value is ${halfOpenMaxConcurrent}`, + ); + } + if (failureWindowMs < 0) { + throw new TypeError( + `Cannot create circuit breaker as 'failureWindowMs' must be non-negative: current value is ${failureWindowMs}`, + ); + } + + this.#failureThreshold = failureThreshold; + this.#cooldownMs = cooldownMs; + this.#successThreshold = successThreshold; + this.#halfOpenMaxConcurrent = halfOpenMaxConcurrent; + this.#failureWindowMs = failureWindowMs; + this.#isFailure = isFailure; + this.#isResultFailure = isResultFailure as (result: T) => boolean; + this.#onStateChange = onStateChange; + this.#onFailure = onFailure; + this.#onOpen = onOpen; + this.#onClose = onClose; + this.#state = createInitialState(); + } + + /** + * Current state of the circuit breaker. + * + * @example Usage + * ```ts + * import { CircuitBreaker } from "@std/async/unstable-circuit-breaker"; + * import { assertEquals } from "@std/assert"; + * + * const breaker = new CircuitBreaker(); + * assertEquals(breaker.state, "closed"); + * ``` + * + * @returns The current {@linkcode CircuitState}. + */ + get state(): CircuitState { + return this.#resolveCurrentState().state; + } + + /** + * Number of failures in the current window. + * + * @example Usage + * ```ts + * import { CircuitBreaker } from "@std/async/unstable-circuit-breaker"; + * import { assertEquals } from "@std/assert"; + * + * const breaker = new CircuitBreaker(); + * assertEquals(breaker.failureCount, 0); + * ``` + * + * @returns The number of failures recorded in the sliding window. + */ + get failureCount(): number { + return pruneOldFailures( + this.#state.failureTimestamps, + this.#failureWindowMs, + Date.now(), + ).length; + } + + /** + * Whether the circuit is currently allowing requests. + * + * @example Usage + * ```ts + * import { CircuitBreaker } from "@std/async/unstable-circuit-breaker"; + * import { assertEquals } from "@std/assert"; + * + * const breaker = new CircuitBreaker(); + * assertEquals(breaker.isAvailable, true); + * ``` + * + * @returns `true` if requests will be attempted, `false` if rejected. + */ + get isAvailable(): boolean { + const resolved = this.#resolveCurrentState(); + if (resolved.state === "closed") return true; + if (resolved.state === "open") return false; + // half_open: available if under concurrent limit + return resolved.halfOpenInFlight < this.#halfOpenMaxConcurrent; + } + + /** + * Executes an async operation through the circuit breaker. + * + * @example Usage + * ```ts + * import { CircuitBreaker } from "@std/async/unstable-circuit-breaker"; + * import { assertEquals } from "@std/assert"; + * + * const breaker = new CircuitBreaker({ failureThreshold: 5 }); + * const result = await breaker.execute(() => Promise.resolve("success")); + * assertEquals(result, "success"); + * ``` + * + * @typeParam R The return type of the function, must extend T. + * @param fn The async operation to execute. + * @returns The result of the operation. + * @throws {CircuitBreakerOpenError} If circuit is open. + */ + async execute(fn: () => Promise): Promise { + const currentTime = Date.now(); + const currentState = this.#resolveCurrentState(); + + // Check if we should reject + if (currentState.state === "open") { + const openedAt = currentState.openedAt!; + const cooldownEnd = openedAt + this.#cooldownMs; + const remainingMs = Math.max(0, cooldownEnd - currentTime); + throw new CircuitBreakerOpenError(Math.round(remainingMs)); + } + + // In half-open, check concurrent limit + if (currentState.state === "half_open") { + if (currentState.halfOpenInFlight >= this.#halfOpenMaxConcurrent) { + throw new CircuitBreakerOpenError(0); + } + this.#state = { + ...this.#state, + halfOpenInFlight: this.#state.halfOpenInFlight + 1, + }; + } + + let result: R; + try { + result = await fn(); + } catch (error) { + this.#handleFailure(error, currentState.state); + throw error; + } finally { + // Decrement half-open in-flight counter + if (currentState.state === "half_open") { + this.#state = { + ...this.#state, + halfOpenInFlight: Math.max(0, this.#state.halfOpenInFlight - 1), + }; + } + } + + // Check if successful result should count as failure + if (this.#isResultFailure(result)) { + const syntheticError = new Error("Result classified as failure"); + this.#handleFailure(syntheticError, currentState.state); + return result; // Still return the result, but record the failure + } + + // Success path + this.#handleSuccess(currentState.state); + return result; + } + + /** + * Forces the circuit breaker to open state. + * Useful for maintenance or known outages. + * + * @example Usage + * ```ts + * import { CircuitBreaker } from "@std/async/unstable-circuit-breaker"; + * import { assertEquals } from "@std/assert"; + * + * const breaker = new CircuitBreaker(); + * breaker.forceOpen(); + * assertEquals(breaker.state, "open"); + * ``` + */ + forceOpen(): void { + const previous = this.#state.state; + this.#state = { + ...this.#state, + state: "open", + openedAt: Date.now(), + consecutiveSuccesses: 0, + }; + if (previous !== "open") { + this.#onStateChange?.(previous, "open"); + this.#onOpen?.(this.failureCount); + } + } + + /** + * Forces the circuit breaker to closed state. + * Resets all failure counters. + * + * @example Usage + * ```ts + * import { CircuitBreaker } from "@std/async/unstable-circuit-breaker"; + * import { assertEquals } from "@std/assert"; + * + * const breaker = new CircuitBreaker(); + * breaker.forceOpen(); + * breaker.forceClose(); + * assertEquals(breaker.state, "closed"); + * ``` + */ + forceClose(): void { + const previous = this.#state.state; + this.#state = createInitialState(); + if (previous !== "closed") { + this.#onStateChange?.(previous, "closed"); + this.#onClose?.(); + } + } + + /** + * Resets the circuit breaker to initial state. + * + * @example Usage + * ```ts + * import { CircuitBreaker } from "@std/async/unstable-circuit-breaker"; + * import { assertEquals } from "@std/assert"; + * + * const breaker = new CircuitBreaker(); + * breaker.reset(); + * assertEquals(breaker.state, "closed"); + * assertEquals(breaker.failureCount, 0); + * ``` + */ + reset(): void { + const previous = this.#state.state; + this.#state = createInitialState(); + if (previous !== "closed") { + this.#onStateChange?.(previous, "closed"); + } + } + + /** + * Returns circuit breaker statistics for monitoring. + * + * @example Usage + * ```ts + * import { CircuitBreaker } from "@std/async/unstable-circuit-breaker"; + * import { assertEquals } from "@std/assert"; + * + * const breaker = new CircuitBreaker(); + * const stats = breaker.getStats(); + * assertEquals(stats.state, "closed"); + * assertEquals(stats.failureCount, 0); + * assertEquals(stats.isAvailable, true); + * ``` + * + * @returns Current stats including state, failure count, and availability. + */ + getStats(): CircuitBreakerStats { + return { + state: this.state, + failureCount: this.failureCount, + consecutiveSuccesses: this.#state.consecutiveSuccesses, + isAvailable: this.isAvailable, + }; + } + + /** + * Resolves the current state, handling automatic transitions. + * OPEN → HALF_OPEN after cooldown expires. + */ + #resolveCurrentState(): CircuitBreakerState { + const currentTime = Date.now(); + + // Auto-transition from OPEN to HALF_OPEN after cooldown + if ( + this.#state.state === "open" && + this.#state.openedAt !== null + ) { + const cooldownEnd = this.#state.openedAt + this.#cooldownMs; + if (currentTime >= cooldownEnd) { + const newState: CircuitBreakerState = { + ...this.#state, + state: "half_open", + consecutiveSuccesses: 0, + halfOpenInFlight: 0, + }; + this.#state = newState; + this.#onStateChange?.("open", "half_open"); + } + } + + return this.#state; + } + + /** Records a failure and potentially opens the circuit. */ + #handleFailure(error: unknown, previousState: CircuitState): void { + // Check if this error should count as a failure + if (!this.#isFailure(error)) { + return; + } + + const currentTime = Date.now(); + + // Prune old failures and add new one + const prunedFailures = pruneOldFailures( + this.#state.failureTimestamps, + this.#failureWindowMs, + currentTime, + ); + const newFailures = [...prunedFailures, currentTime]; + + this.#onFailure?.(error, newFailures.length); + + // In half-open, any failure reopens the circuit + if (previousState === "half_open") { + this.#state = { + ...this.#state, + state: "open", + failureTimestamps: newFailures, + openedAt: currentTime, + consecutiveSuccesses: 0, + }; + this.#onStateChange?.("half_open", "open"); + this.#onOpen?.(newFailures.length); + return; + } + + // In closed state, check threshold + if (newFailures.length >= this.#failureThreshold) { + this.#state = { + ...this.#state, + state: "open", + failureTimestamps: newFailures, + openedAt: currentTime, + consecutiveSuccesses: 0, + }; + this.#onStateChange?.("closed", "open"); + this.#onOpen?.(newFailures.length); + } else { + this.#state = { + ...this.#state, + failureTimestamps: newFailures, + consecutiveSuccesses: 0, + }; + } + } + + /** Records a success and potentially closes the circuit from half-open. */ + #handleSuccess(previousState: CircuitState): void { + if (previousState === "half_open") { + const newSuccessCount = this.#state.consecutiveSuccesses + 1; + + if (newSuccessCount >= this.#successThreshold) { + // Recovered! Close the circuit + this.#state = createInitialState(); + this.#onStateChange?.("half_open", "closed"); + this.#onClose?.(); + } else { + this.#state = { + ...this.#state, + consecutiveSuccesses: newSuccessCount, + }; + } + } else if (previousState === "closed") { + // Reset consecutive success counter on success in closed state + this.#state = { + ...this.#state, + consecutiveSuccesses: 0, + }; + } + } +} diff --git a/async/unstable_circuit_breaker_test.ts b/async/unstable_circuit_breaker_test.ts new file mode 100644 index 000000000000..b3e9a5c5d7fc --- /dev/null +++ b/async/unstable_circuit_breaker_test.ts @@ -0,0 +1,812 @@ +// Copyright 2018-2025 the Deno authors. MIT license. + +import { + assert, + assertEquals, + assertInstanceOf, + assertRejects, + assertThrows, +} from "@std/assert"; +import { FakeTime } from "@std/testing/time"; +import { + CircuitBreaker, + CircuitBreakerOpenError, + type CircuitBreakerStats, + type CircuitState, +} from "./unstable_circuit_breaker.ts"; + +Deno.test("CircuitBreaker constructor throws for invalid failureThreshold", () => { + assertThrows( + () => new CircuitBreaker({ failureThreshold: 0 }), + TypeError, + "'failureThreshold' must be at least 1", + ); + assertThrows( + () => new CircuitBreaker({ failureThreshold: -1 }), + TypeError, + "'failureThreshold' must be at least 1", + ); +}); + +Deno.test("CircuitBreaker constructor throws for invalid cooldownMs", () => { + assertThrows( + () => new CircuitBreaker({ cooldownMs: -1 }), + TypeError, + "'cooldownMs' must be non-negative", + ); +}); + +Deno.test("CircuitBreaker constructor throws for invalid successThreshold", () => { + assertThrows( + () => new CircuitBreaker({ successThreshold: 0 }), + TypeError, + "'successThreshold' must be at least 1", + ); +}); + +Deno.test("CircuitBreaker constructor throws for invalid halfOpenMaxConcurrent", () => { + assertThrows( + () => new CircuitBreaker({ halfOpenMaxConcurrent: 0 }), + TypeError, + "'halfOpenMaxConcurrent' must be at least 1", + ); +}); + +Deno.test("CircuitBreaker constructor throws for invalid failureWindowMs", () => { + assertThrows( + () => new CircuitBreaker({ failureWindowMs: -1 }), + TypeError, + "'failureWindowMs' must be non-negative", + ); +}); + +Deno.test("CircuitBreaker constructor defaults work correctly", () => { + const breaker = new CircuitBreaker(); + assertEquals(breaker.state, "closed"); + assertEquals(breaker.failureCount, 0); + assertEquals(breaker.isAvailable, true); +}); + +Deno.test("CircuitBreaker.execute() returns result on success", async () => { + const breaker = new CircuitBreaker(); + const result = await breaker.execute(() => Promise.resolve("success")); + assertEquals(result, "success"); + assertEquals(breaker.state, "closed"); +}); + +Deno.test("CircuitBreaker.execute() throws and records failure", async () => { + const breaker = new CircuitBreaker(); + const error = new Error("test error"); + + await assertRejects( + () => breaker.execute(() => Promise.reject(error)), + Error, + "test error", + ); + + assertEquals(breaker.failureCount, 1); +}); + +Deno.test("CircuitBreaker opens after reaching failure threshold", async () => { + const breaker = new CircuitBreaker({ failureThreshold: 3 }); + const error = new Error("fail"); + + // First two failures - still closed + for (let i = 0; i < 2; i++) { + try { + await breaker.execute(() => Promise.reject(error)); + } catch { /* expected */ } + } + assertEquals(breaker.state, "closed"); + assertEquals(breaker.failureCount, 2); + + // Third failure - opens the circuit + try { + await breaker.execute(() => Promise.reject(error)); + } catch { /* expected */ } + + assertEquals(breaker.state, "open"); + assertEquals(breaker.isAvailable, false); +}); + +Deno.test("CircuitBreaker throws CircuitBreakerOpenError when open", async () => { + const breaker = new CircuitBreaker({ + failureThreshold: 1, + cooldownMs: 30000, + }); + const error = new Error("fail"); + + // Trigger opening + try { + await breaker.execute(() => Promise.reject(error)); + } catch { /* expected */ } + + assertEquals(breaker.state, "open"); + + // Now execute should throw CircuitBreakerOpenError + const openError = await assertRejects( + () => breaker.execute(() => Promise.resolve("ignored")), + CircuitBreakerOpenError, + "Circuit breaker is open", + ); + assertInstanceOf(openError, CircuitBreakerOpenError); + assert(openError.remainingCooldownMs > 0); +}); + +Deno.test("CircuitBreaker transitions to half_open after cooldown", async () => { + using time = new FakeTime(); + + const breaker = new CircuitBreaker({ + failureThreshold: 1, + cooldownMs: 1000, + }); + + // Open the circuit + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + assertEquals(breaker.state, "open"); + + // Advance time past cooldown + time.tick(1001); + + assertEquals(breaker.state, "half_open"); + assertEquals(breaker.isAvailable, true); +}); + +Deno.test("CircuitBreaker closes from half_open after success threshold", async () => { + using time = new FakeTime(); + + const breaker = new CircuitBreaker({ + failureThreshold: 1, + cooldownMs: 1000, + successThreshold: 2, + }); + + // Open the circuit + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + + // Enter half_open + time.tick(1001); + assertEquals(breaker.state, "half_open"); + + // First success + await breaker.execute(() => Promise.resolve("ok")); + assertEquals(breaker.state, "half_open"); + + // Second success - should close + await breaker.execute(() => Promise.resolve("ok")); + assertEquals(breaker.state, "closed"); + assertEquals(breaker.failureCount, 0); +}); + +Deno.test("CircuitBreaker reopens from half_open on failure", async () => { + using time = new FakeTime(); + + const breaker = new CircuitBreaker({ + failureThreshold: 1, + cooldownMs: 1000, + }); + + // Open the circuit + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + + // Enter half_open + time.tick(1001); + assertEquals(breaker.state, "half_open"); + + // Failure in half_open should reopen + try { + await breaker.execute(() => Promise.reject(new Error("fail again"))); + } catch { /* expected */ } + + assertEquals(breaker.state, "open"); +}); + +Deno.test("CircuitBreaker.forceOpen() opens the circuit", () => { + const breaker = new CircuitBreaker(); + assertEquals(breaker.state, "closed"); + + breaker.forceOpen(); + assertEquals(breaker.state, "open"); + assertEquals(breaker.isAvailable, false); +}); + +Deno.test("CircuitBreaker.forceClose() closes the circuit and resets", async () => { + const breaker = new CircuitBreaker({ failureThreshold: 1 }); + + // Open the circuit + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + assertEquals(breaker.state, "open"); + + breaker.forceClose(); + assertEquals(breaker.state, "closed"); + assertEquals(breaker.failureCount, 0); + assertEquals(breaker.isAvailable, true); +}); + +Deno.test("CircuitBreaker.reset() resets to initial state", async () => { + const breaker = new CircuitBreaker({ failureThreshold: 3 }); + + // Accumulate some failures + for (let i = 0; i < 2; i++) { + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + } + assertEquals(breaker.failureCount, 2); + + breaker.reset(); + assertEquals(breaker.state, "closed"); + assertEquals(breaker.failureCount, 0); +}); + +Deno.test("CircuitBreaker.getStats() returns correct statistics", async () => { + const breaker = new CircuitBreaker({ failureThreshold: 3 }); + + let stats: CircuitBreakerStats = breaker.getStats(); + assertEquals(stats.state, "closed"); + assertEquals(stats.failureCount, 0); + assertEquals(stats.consecutiveSuccesses, 0); + assertEquals(stats.isAvailable, true); + + // Add a failure + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + + stats = breaker.getStats(); + assertEquals(stats.failureCount, 1); +}); + +Deno.test("CircuitBreaker isFailure predicate filters errors", async () => { + const breaker = new CircuitBreaker({ + failureThreshold: 1, + // Only count TypeError as failures + isFailure: (error) => error instanceof TypeError, + }); + + // Regular Error should not count + try { + await breaker.execute(() => Promise.reject(new Error("ignored"))); + } catch { /* expected */ } + assertEquals(breaker.failureCount, 0); + assertEquals(breaker.state, "closed"); + + // TypeError should count and open circuit + try { + await breaker.execute(() => Promise.reject(new TypeError("counts"))); + } catch { /* expected */ } + assertEquals(breaker.failureCount, 1); + assertEquals(breaker.state, "open"); +}); + +Deno.test("CircuitBreaker isResultFailure counts successful results as failures", async () => { + const breaker = new CircuitBreaker({ + failureThreshold: 1, + isResultFailure: (result) => result < 0, + }); + + // Positive result - success + const result1 = await breaker.execute(() => Promise.resolve(42)); + assertEquals(result1, 42); + assertEquals(breaker.failureCount, 0); + + // Negative result - counts as failure but still returns + const result2 = await breaker.execute(() => Promise.resolve(-1)); + assertEquals(result2, -1); + assertEquals(breaker.state, "open"); +}); + +Deno.test("CircuitBreaker failure window prunes old failures", async () => { + using time = new FakeTime(); + + const breaker = new CircuitBreaker({ + failureThreshold: 3, + failureWindowMs: 1000, + }); + + // Two failures + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + assertEquals(breaker.failureCount, 2); + + // Advance past window + time.tick(1001); + + // Old failures should be pruned + assertEquals(breaker.failureCount, 0); + + // Should now need 3 new failures to open + for (let i = 0; i < 2; i++) { + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + } + assertEquals(breaker.state, "closed"); + + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + assertEquals(breaker.state, "open"); +}); + +Deno.test("CircuitBreaker onStateChange callback is invoked", async () => { + using time = new FakeTime(); + + const transitions: Array<[CircuitState, CircuitState]> = []; + const breaker = new CircuitBreaker({ + failureThreshold: 1, + cooldownMs: 1000, + successThreshold: 1, + onStateChange: (from, to) => transitions.push([from, to]), + }); + + // Open + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + assertEquals(transitions, [["closed", "open"]]); + + // Half-open + time.tick(1001); + // Access state to trigger transition + breaker.state; + assertEquals(transitions, [["closed", "open"], ["open", "half_open"]]); + + // Close + await breaker.execute(() => Promise.resolve("ok")); + assertEquals(transitions, [ + ["closed", "open"], + ["open", "half_open"], + ["half_open", "closed"], + ]); +}); + +Deno.test("CircuitBreaker onFailure callback is invoked", async () => { + const failures: Array<{ error: unknown; count: number }> = []; + const breaker = new CircuitBreaker({ + failureThreshold: 3, + onFailure: (error, count) => failures.push({ error, count }), + }); + + const err1 = new Error("fail1"); + const err2 = new Error("fail2"); + + try { + await breaker.execute(() => Promise.reject(err1)); + } catch { /* expected */ } + try { + await breaker.execute(() => Promise.reject(err2)); + } catch { /* expected */ } + + assertEquals(failures.length, 2); + assertEquals(failures[0]?.error, err1); + assertEquals(failures[0]?.count, 1); + assertEquals(failures[1]?.error, err2); + assertEquals(failures[1]?.count, 2); +}); + +Deno.test("CircuitBreaker onOpen callback is invoked", async () => { + const openCalls: number[] = []; + const breaker = new CircuitBreaker({ + failureThreshold: 2, + onOpen: (count) => openCalls.push(count), + }); + + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + assertEquals(openCalls.length, 0); + + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + assertEquals(openCalls, [2]); +}); + +Deno.test("CircuitBreaker onClose callback is invoked", async () => { + using time = new FakeTime(); + + let closeCalled = false; + const breaker = new CircuitBreaker({ + failureThreshold: 1, + cooldownMs: 1000, + successThreshold: 1, + onClose: () => { + closeCalled = true; + }, + }); + + // Open + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + + // Half-open + time.tick(1001); + + // Close + await breaker.execute(() => Promise.resolve("ok")); + assertEquals(closeCalled, true); +}); + +Deno.test("CircuitBreaker half_open limits concurrent requests", async () => { + using time = new FakeTime(); + + const breaker = new CircuitBreaker({ + failureThreshold: 1, + cooldownMs: 1000, + halfOpenMaxConcurrent: 1, + }); + + // Open + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + + // Enter half-open + time.tick(1001); + assertEquals(breaker.state, "half_open"); + + // Start a slow request + let resolveFirst: (() => void) | undefined; + const firstPromise = breaker.execute( + () => + new Promise((resolve) => { + resolveFirst = () => resolve("first"); + }), + ); + + // Second request should be rejected + await assertRejects( + () => breaker.execute(() => Promise.resolve("second")), + CircuitBreakerOpenError, + ); + + // Complete first request + resolveFirst?.(); + await firstPromise; +}); + +Deno.test("CircuitBreaker with disabled failure window (0)", async () => { + using time = new FakeTime(); + + const breaker = new CircuitBreaker({ + failureThreshold: 3, + failureWindowMs: 0, // Disabled - failures never expire + }); + + // Add two failures + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + + // Advance time significantly + time.tick(100_000); + + // Failures should still be counted + assertEquals(breaker.failureCount, 2); + + // One more should open + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + assertEquals(breaker.state, "open"); +}); + +Deno.test("CircuitBreaker with zero cooldown transitions immediately", async () => { + const breaker = new CircuitBreaker({ + failureThreshold: 1, + cooldownMs: 0, + successThreshold: 1, + }); + + // Open + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + + // Should immediately be half_open (or allow immediate transition) + // Since cooldown is 0, checking state should transition + assertEquals(breaker.state, "half_open"); + + // Should be able to close immediately + await breaker.execute(() => Promise.resolve("ok")); + assertEquals(breaker.state, "closed"); +}); + +Deno.test("CircuitBreakerOpenError has correct properties", () => { + const error = new CircuitBreakerOpenError(5000); + + assertEquals(error.name, "CircuitBreakerOpenError"); + assertEquals(error.remainingCooldownMs, 5000); + assert(error.message.includes("5000ms")); + assertInstanceOf(error, Error); +}); + +Deno.test("CircuitBreaker type parameter constrains isResultFailure", async () => { + // This test verifies the generic type works correctly + interface ApiResponse { + status: number; + data: string; + } + + const breaker = new CircuitBreaker({ + failureThreshold: 1, + isResultFailure: (response) => response.status >= 500, + }); + + const result = await breaker.execute(() => + Promise.resolve({ status: 200, data: "ok" }) + ); + assertEquals(result.status, 200); + assertEquals(breaker.state, "closed"); + + // 500 error counts as failure + await breaker.execute(() => Promise.resolve({ status: 500, data: "error" })); + assertEquals(breaker.state, "open"); +}); + +Deno.test("CircuitBreaker multiple force operations", () => { + const transitions: Array<[CircuitState, CircuitState]> = []; + const breaker = new CircuitBreaker({ + onStateChange: (from, to) => transitions.push([from, to]), + }); + + // Repeated forceOpen should only trigger one transition + breaker.forceOpen(); + breaker.forceOpen(); + assertEquals(transitions.length, 1); + + // Repeated forceClose should only trigger one transition + breaker.forceClose(); + breaker.forceClose(); + assertEquals(transitions.length, 2); +}); + +Deno.test("CircuitBreaker.forceOpen() invokes onOpen callback", () => { + const openCalls: number[] = []; + const breaker = new CircuitBreaker({ + onOpen: (count) => openCalls.push(count), + }); + + breaker.forceOpen(); + assertEquals(openCalls, [0]); // No failures recorded, so count is 0 +}); + +Deno.test("CircuitBreaker.forceClose() invokes onClose callback", async () => { + let closeCalled = false; + const breaker = new CircuitBreaker({ + failureThreshold: 1, + onClose: () => { + closeCalled = true; + }, + }); + + // Open the circuit first + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + assertEquals(breaker.state, "open"); + + // forceClose should invoke onClose + breaker.forceClose(); + assertEquals(closeCalled, true); +}); + +Deno.test("CircuitBreaker.reset() invokes onStateChange when not closed", async () => { + const transitions: Array<[CircuitState, CircuitState]> = []; + const breaker = new CircuitBreaker({ + failureThreshold: 1, + onStateChange: (from, to) => transitions.push([from, to]), + }); + + // Open the circuit + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + assertEquals(breaker.state, "open"); + assertEquals(transitions, [["closed", "open"]]); + + // Reset should trigger state change callback + breaker.reset(); + assertEquals(transitions, [["closed", "open"], ["open", "closed"]]); +}); + +Deno.test("CircuitBreaker.reset() does not invoke onStateChange when already closed", () => { + const transitions: Array<[CircuitState, CircuitState]> = []; + const breaker = new CircuitBreaker({ + onStateChange: (from, to) => transitions.push([from, to]), + }); + + assertEquals(breaker.state, "closed"); + breaker.reset(); + assertEquals(transitions.length, 0); // No state change callback +}); + +Deno.test("CircuitBreaker half_open failure invokes onStateChange and onOpen", async () => { + using time = new FakeTime(); + + const transitions: Array<[CircuitState, CircuitState]> = []; + const openCalls: number[] = []; + const breaker = new CircuitBreaker({ + failureThreshold: 1, + cooldownMs: 1000, + onStateChange: (from, to) => transitions.push([from, to]), + onOpen: (count) => openCalls.push(count), + }); + + // Open the circuit + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + assertEquals(breaker.state, "open"); + assertEquals(transitions, [["closed", "open"]]); + assertEquals(openCalls, [1]); + + // Enter half-open + time.tick(1001); + assertEquals(breaker.state, "half_open"); + assertEquals(transitions, [["closed", "open"], ["open", "half_open"]]); + + // Failure in half-open should reopen and invoke callbacks + try { + await breaker.execute(() => Promise.reject(new Error("fail again"))); + } catch { /* expected */ } + assertEquals(breaker.state, "open"); + assertEquals(transitions, [ + ["closed", "open"], + ["open", "half_open"], + ["half_open", "open"], + ]); + assertEquals(openCalls, [1, 2]); // Second open call with 2 failures +}); + +Deno.test("CircuitBreaker consecutiveSuccesses tracked in half_open getStats", async () => { + using time = new FakeTime(); + + const breaker = new CircuitBreaker({ + failureThreshold: 1, + cooldownMs: 1000, + successThreshold: 3, + }); + + // Open the circuit + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + + // Enter half-open + time.tick(1001); + assertEquals(breaker.state, "half_open"); + + // First success + await breaker.execute(() => Promise.resolve("ok")); + let stats = breaker.getStats(); + assertEquals(stats.consecutiveSuccesses, 1); + + // Second success + await breaker.execute(() => Promise.resolve("ok")); + stats = breaker.getStats(); + assertEquals(stats.consecutiveSuccesses, 2); + + // Third success closes the circuit + await breaker.execute(() => Promise.resolve("ok")); + stats = breaker.getStats(); + assertEquals(stats.state, "closed"); + assertEquals(stats.consecutiveSuccesses, 0); // Reset after closing +}); + +Deno.test("CircuitBreaker isResultFailure in half_open reopens circuit", async () => { + using time = new FakeTime(); + + const breaker = new CircuitBreaker({ + failureThreshold: 1, + cooldownMs: 1000, + isResultFailure: (result) => result < 0, + }); + + // Open the circuit with a thrown error + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + + // Enter half-open + time.tick(1001); + assertEquals(breaker.state, "half_open"); + + // Result failure in half-open should reopen + const result = await breaker.execute(() => Promise.resolve(-1)); + assertEquals(result, -1); // Still returns the result + assertEquals(breaker.state, "open"); // But circuit is reopened +}); + +Deno.test("CircuitBreaker handles multiple half_open concurrent slots", async () => { + using time = new FakeTime(); + + const breaker = new CircuitBreaker({ + failureThreshold: 1, + cooldownMs: 1000, + halfOpenMaxConcurrent: 2, + successThreshold: 2, + }); + + // Open + try { + await breaker.execute(() => Promise.reject(new Error("fail"))); + } catch { /* expected */ } + + // Enter half-open + time.tick(1001); + assertEquals(breaker.state, "half_open"); + assertEquals(breaker.isAvailable, true); + + // Start two concurrent requests (should both be allowed) + let resolve1: (() => void) | undefined; + let resolve2: (() => void) | undefined; + const promise1 = breaker.execute( + () => + new Promise((r) => { + resolve1 = () => r("first"); + }), + ); + const promise2 = breaker.execute( + () => + new Promise((r) => { + resolve2 = () => r("second"); + }), + ); + + // Third request should be rejected (at max concurrent) + await assertRejects( + () => breaker.execute(() => Promise.resolve("third")), + CircuitBreakerOpenError, + ); + + // Complete both requests + resolve1?.(); + resolve2?.(); + await promise1; + await promise2; + + assertEquals(breaker.state, "closed"); // Both successes met threshold +}); + +Deno.test("CircuitBreaker isFailure predicate prevents failure recording", async () => { + const failures: Array<{ error: unknown; count: number }> = []; + const breaker = new CircuitBreaker({ + failureThreshold: 2, + isFailure: (error) => !(error instanceof TypeError), + onFailure: (error, count) => failures.push({ error, count }), + }); + + // TypeError should not be recorded + const typeError = new TypeError("ignored"); + try { + await breaker.execute(() => Promise.reject(typeError)); + } catch { /* expected */ } + assertEquals(failures.length, 0); + assertEquals(breaker.failureCount, 0); + + // Regular Error should be recorded + const regularError = new Error("counted"); + try { + await breaker.execute(() => Promise.reject(regularError)); + } catch { /* expected */ } + assertEquals(failures.length, 1); + assertEquals(breaker.failureCount, 1); +});