diff --git a/apps/server/src/provider/Layers/ProviderRegistry.test.ts b/apps/server/src/provider/Layers/ProviderRegistry.test.ts index fb6eb3b443d..f4136e3ef73 100644 --- a/apps/server/src/provider/Layers/ProviderRegistry.test.ts +++ b/apps/server/src/provider/Layers/ProviderRegistry.test.ts @@ -1,9 +1,11 @@ import * as NodeServices from "@effect/platform-node/NodeServices"; import { describe, it, assert } from "@effect/vitest"; +import * as Deferred from "effect/Deferred"; import * as Effect from "effect/Effect"; import * as Exit from "effect/Exit"; import * as Fiber from "effect/Fiber"; import * as Layer from "effect/Layer"; +import * as Option from "effect/Option"; import * as PubSub from "effect/PubSub"; import * as Ref from "effect/Ref"; import * as Schema from "effect/Schema"; @@ -49,6 +51,7 @@ import type { ProviderInstance } from "../ProviderDriver.ts"; import { ProviderInstanceRegistry } from "../Services/ProviderInstanceRegistry.ts"; import { ProviderRegistry } from "../Services/ProviderRegistry.ts"; import { makeManualOnlyProviderMaintenanceCapabilities } from "../providerMaintenance.ts"; +import { makeManagedServerProvider } from "../makeManagedServerProvider.ts"; const decodeServerSettings = Schema.decodeSync(ServerSettings); const encodeServerSettings = Schema.encodeSync(ServerSettings); const encodedDefaultServerSettings = encodeServerSettings(DEFAULT_SERVER_SETTINGS); @@ -750,6 +753,7 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T assert.deepStrictEqual((yield* registry.getProviders)[0]?.models, [ ...initialProvider.models, ]); + yield* Effect.yieldNow; yield* PubSub.publish(changes, refreshedProvider); let cachedProvider = yield* readProviderStatusCache(filePath); @@ -771,6 +775,111 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T }), ); + it.effect("does not block layer build on the initial provider refresh", () => + Effect.gen(function* () { + const codexDriver = ProviderDriverKind.make("codex"); + const codexInstanceId = ProviderInstanceId.make("codex"); + const pendingProvider = { + instanceId: codexInstanceId, + driver: codexDriver, + status: "disabled", + enabled: false, + installed: false, + auth: { status: "unknown" }, + checkedAt: "2026-05-01T00:00:00.000Z", + version: null, + models: [], + slashCommands: [], + skills: [], + } as const satisfies ServerProvider; + const refreshedProvider = { + ...pendingProvider, + status: "ready", + enabled: true, + installed: true, + checkedAt: "2026-05-01T00:01:00.000Z", + version: "1.0.0", + } as const satisfies ServerProvider; + const refreshStarted = yield* Deferred.make(); + const releaseRefresh = yield* Deferred.make(); + const instance = { + instanceId: codexInstanceId, + driverKind: codexDriver, + continuationIdentity: { + driverKind: codexDriver, + continuationKey: "codex:instance:codex", + }, + displayName: undefined, + enabled: true, + snapshot: { + maintenanceCapabilities: makeManualOnlyProviderMaintenanceCapabilities({ + provider: codexDriver, + packageName: null, + }), + getSnapshot: Effect.succeed(pendingProvider), + refresh: Deferred.succeed(refreshStarted, undefined).pipe( + Effect.ignore, + Effect.andThen(Deferred.await(releaseRefresh)), + Effect.as(refreshedProvider), + ), + streamChanges: Stream.empty, + }, + adapter: {} as ProviderInstance["adapter"], + textGeneration: {} as ProviderInstance["textGeneration"], + } satisfies ProviderInstance; + const instanceRegistryLayer = Layer.succeed(ProviderInstanceRegistry, { + getInstance: (instanceId) => + Effect.succeed(instanceId === codexInstanceId ? instance : undefined), + listInstances: Effect.succeed([instance]), + listUnavailable: Effect.succeed([]), + streamChanges: Stream.empty, + subscribeChanges: Effect.flatMap(PubSub.unbounded(), (pubsub) => + PubSub.subscribe(pubsub), + ), + }); + const scope = yield* Scope.make(); + yield* Effect.addFinalizer(() => Scope.close(scope, Exit.void)); + const runtimeServices = yield* Layer.build( + ProviderRegistryLive.pipe( + Layer.provideMerge(instanceRegistryLayer), + Layer.provideMerge( + ServerConfig.layerTest(process.cwd(), { + prefix: "t3-provider-registry-nonblocking-refresh-", + }), + ), + Layer.provideMerge(NodeServices.layer), + ), + ).pipe(Scope.provide(scope), Effect.timeout("500 millis")); + + yield* Effect.gen(function* () { + const registry = yield* ProviderRegistry; + assert.deepStrictEqual(yield* registry.getProviders, [pendingProvider]); + const prematureRefresh = yield* Deferred.poll(refreshStarted); + assert.strictEqual(Option.isNone(prematureRefresh), true); + yield* registry.startBackgroundRefreshes; + let started = yield* Deferred.poll(refreshStarted); + for (let attempt = 0; attempt < 50 && Option.isNone(started); attempt += 1) { + yield* Effect.yieldNow; + started = yield* Deferred.poll(refreshStarted); + } + assert.strictEqual(Option.isSome(started), true); + + const updatesFiber = yield* registry.streamChanges.pipe( + Stream.take(1), + Stream.runCollect, + Effect.forkChild, + ); + yield* Deferred.succeed(releaseRefresh, undefined); + + const updates = Array.from( + yield* Fiber.join(updatesFiber).pipe(Effect.timeout("1 second")), + ); + assert.deepStrictEqual(updates, [[refreshedProvider]]); + assert.deepStrictEqual(yield* registry.getProviders, [refreshedProvider]); + }).pipe(Effect.provide(runtimeServices)); + }), + ); + it.effect("returns the cached provider list when a manual refresh fails", () => Effect.gen(function* () { const codexDriver = ProviderDriverKind.make("codex"); @@ -845,6 +954,129 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T }), ); + it.effect("is the single startup refresh owner for managed provider instances", () => + Effect.scoped( + Effect.gen(function* () { + const codexDriver = ProviderDriverKind.make("codex"); + const codexInstanceId = ProviderInstanceId.make("codex"); + const initialProvider = { + instanceId: codexInstanceId, + driver: codexDriver, + status: "warning", + enabled: true, + installed: false, + auth: { status: "unknown" }, + checkedAt: "2026-04-29T10:00:00.000Z", + version: null, + message: "Checking provider availability...", + models: [], + slashCommands: [], + skills: [], + } as const satisfies ServerProvider; + const { message: _message, ...initialProviderWithoutMessage } = initialProvider; + const refreshedProvider = { + ...initialProviderWithoutMessage, + status: "ready", + installed: true, + checkedAt: "2026-04-29T10:00:01.000Z", + } as const satisfies ServerProvider; + const checkCalls = yield* Ref.make(0); + const managedProvider = yield* makeManagedServerProvider<{ readonly enabled: boolean }>( + { + maintenanceCapabilities: makeManualOnlyProviderMaintenanceCapabilities({ + provider: codexDriver, + packageName: null, + }), + getSettings: Effect.succeed({ enabled: true }), + streamSettings: Stream.empty, + haveSettingsChanged: (previous, next) => previous.enabled !== next.enabled, + initialSnapshot: () => Effect.succeed(initialProvider), + checkProvider: Ref.update(checkCalls, (count) => count + 1).pipe( + Effect.as(refreshedProvider), + ), + refreshInterval: "1 hour", + }, + ); + + yield* Effect.yieldNow; + assert.strictEqual( + yield* Ref.get(checkCalls), + 0, + "managed providers must not self-start their own boot probe", + ); + + const instance = { + instanceId: codexInstanceId, + driverKind: codexDriver, + continuationIdentity: { + driverKind: codexDriver, + continuationKey: "codex:instance:codex", + }, + displayName: undefined, + enabled: true, + snapshot: managedProvider, + adapter: {} as ProviderInstance["adapter"], + textGeneration: {} as ProviderInstance["textGeneration"], + } satisfies ProviderInstance; + const instanceRegistryLayer = Layer.succeed(ProviderInstanceRegistry, { + getInstance: (instanceId) => + Effect.succeed(instanceId === codexInstanceId ? instance : undefined), + listInstances: Effect.succeed([instance]), + listUnavailable: Effect.succeed([]), + streamChanges: Stream.empty, + subscribeChanges: Effect.flatMap(PubSub.unbounded(), (pubsub) => + PubSub.subscribe(pubsub), + ), + }); + const scope = yield* Scope.make(); + yield* Effect.addFinalizer(() => Scope.close(scope, Exit.void)); + const runtimeServices = yield* Layer.build( + ProviderRegistryLive.pipe( + Layer.provideMerge(instanceRegistryLayer), + Layer.provideMerge( + ServerConfig.layerTest(process.cwd(), { + prefix: "t3-provider-registry-single-startup-refresh-", + }), + ), + Layer.provideMerge(NodeServices.layer), + ), + ).pipe(Scope.provide(scope)); + + yield* Effect.gen(function* () { + const registry = yield* ProviderRegistry; + + assert.deepStrictEqual(yield* registry.getProviders, [initialProvider]); + assert.strictEqual( + yield* Ref.get(checkCalls), + 0, + "ProviderRegistryLive should wait for the startup refresh lifecycle hook", + ); + + yield* registry.startBackgroundRefreshes; + + let providers = yield* registry.getProviders; + let calls = yield* Ref.get(checkCalls); + for ( + let attempt = 0; + attempt < 50 && (calls !== 1 || providers[0]?.status !== refreshedProvider.status); + attempt += 1 + ) { + yield* Effect.yieldNow; + providers = yield* registry.getProviders; + calls = yield* Ref.get(checkCalls); + } + + assert.deepStrictEqual(providers, [refreshedProvider]); + assert.strictEqual( + calls, + 1, + "ProviderRegistryLive should perform exactly one startup refresh", + ); + }).pipe(Effect.provide(runtimeServices)); + }), + ), + ); + it.effect("keeps consuming registry changes after one sync fails", () => Effect.gen(function* () { const codexDriver = ProviderDriverKind.make("codex"); @@ -940,6 +1172,7 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T yield* Effect.gen(function* () { const registry = yield* ProviderRegistry; assert.deepStrictEqual(yield* registry.getProviders, [codexProvider]); + yield* registry.startBackgroundRefreshes; yield* Ref.set(failNextList, true); yield* PubSub.publish(changes, undefined); @@ -1041,7 +1274,21 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T yield* Effect.gen(function* () { const registry = yield* ProviderRegistry; - const providers = yield* registry.getProviders; + yield* registry.startBackgroundRefreshes; + const providers = yield* Effect.gen(function* () { + for (let attempts = 0; attempts < 60; attempts += 1) { + const providers = yield* registry.getProviders; + const codexPersonal = providers.find( + (provider) => provider.instanceId === "codex_personal", + ); + if (codexPersonal?.status === "error") { + return providers; + } + yield* TestClock.adjust("50 millis"); + yield* Effect.yieldNow; + } + return yield* registry.getProviders; + }); const codexPersonal = providers.find( (provider) => provider.instanceId === "codex_personal", ); @@ -1116,29 +1363,49 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T yield* Effect.gen(function* () { const registry = yield* ProviderRegistry; + yield* registry.startBackgroundRefreshes; // Boot-time probe: the default codex instance is enabled with // `firstMissing`, so the real spawner yields ENOENT and the // snapshot should be `status: "error"`. What *distinguishes* // the two probe runs is `checkedAt` — each probe stamps a // fresh DateTime, so we capture it and assert it advances // after the settings mutation. - const initialProviders = yield* registry.getProviders; + const initialProviders = yield* Effect.gen(function* () { + for (let attempts = 0; attempts < 60; attempts += 1) { + const providers = yield* registry.getProviders; + const codex = providers.find((provider) => provider.instanceId === "codex"); + if (codex?.status === "error") { + return providers; + } + yield* TestClock.adjust("50 millis"); + yield* Effect.yieldNow; + } + return yield* registry.getProviders; + }); const initialCodex = initialProviders.find( (provider) => provider.instanceId === "codex", ); assert.strictEqual(initialCodex?.status, "error"); assert.strictEqual(initialCodex?.installed, false); const initialCheckedAt = initialCodex?.checkedAt; - assert.notStrictEqual(initialCheckedAt, undefined); + if (initialCheckedAt === undefined) { + assert.fail("Expected initial codex probe to set checkedAt"); + } + + yield* TestClock.adjust("50 millis"); + + // Advance the virtual clock before driving the settings change so + // the fresh probe's `checkedAt` can distinguish it from the + // boot-time probe. + yield* TestClock.adjust("50 millis"); // Drive a settings change. The Hydration layer's // `SettingsWatcherLive` consumes this via `streamChanges`, // calls `reconcile`, which rebuilds the codex instance (the // envelope changed because `binaryPath` differs → `entryEqual` - // is false). The registry's `Stream.runForEach( - // instanceRegistry.streamChanges, () => syncLiveSources)` - // fires `syncLiveSources`, which subscribes + awaits a fresh - // refresh on the rebuilt instance. + // is false). The registry change consumer fires `syncLiveSources`, + // which subscribes and starts a fresh refresh on the rebuilt + // instance. yield* serverSettings.updateSettings({ providers: { codex: { enabled: true, binaryPath: secondMissing }, @@ -1151,7 +1418,7 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T for (let attempts = 0; attempts < 60; attempts += 1) { const providers = yield* registry.getProviders; const codex = providers.find((provider) => provider.instanceId === "codex"); - if (codex !== undefined && codex.checkedAt !== initialCheckedAt) { + if (codex !== undefined && codex.checkedAt > initialCheckedAt) { return providers; } yield* TestClock.adjust("50 millis"); diff --git a/apps/server/src/provider/Layers/ProviderRegistry.ts b/apps/server/src/provider/Layers/ProviderRegistry.ts index 22a120f0a52..4e845b61feb 100644 --- a/apps/server/src/provider/Layers/ProviderRegistry.ts +++ b/apps/server/src/provider/Layers/ProviderRegistry.ts @@ -39,6 +39,7 @@ import * as PubSub from "effect/PubSub"; import * as Ref from "effect/Ref"; import * as Stream from "effect/Stream"; import * as Semaphore from "effect/Semaphore"; +import * as Scope from "effect/Scope"; import { ServerConfig } from "../../config.ts"; import { ProviderInstanceRegistry } from "../Services/ProviderInstanceRegistry.ts"; @@ -189,6 +190,7 @@ export const ProviderRegistryLive = Layer.effect( Effect.gen(function* () { const instanceRegistry = yield* ProviderInstanceRegistry; const config = yield* ServerConfig; + const providerRegistryScope = yield* Scope.Scope; const fileSystem = yield* FileSystem.FileSystem; const path = yield* Path.Path; @@ -278,6 +280,7 @@ export const ProviderRegistryLive = Layer.effect( const liveSubsRef = yield* Ref.make>( new Map(), ); + const isServerReadyRef = yield* Ref.make(false); // Serialize `syncLiveSources` so a rapid burst of reconciles doesn't // interleave two passes clobbering each other's fiber bookkeeping. const syncSemaphore = yield* Semaphore.make(1); @@ -438,6 +441,35 @@ export const ProviderRegistryLive = Layer.effect( ), ); }); + const refreshOneLiveSource = Effect.fn("refreshOneLiveSource")(function* ( + providerSource: ProviderSnapshotSource, + instance: ProviderInstance, + ) { + const nextProvider = yield* providerSource.refresh; + const liveSubs = yield* Ref.get(liveSubsRef); + if (liveSubs.get(providerSource.instanceId) !== instance) { + return yield* Ref.get(providersRef); + } + return yield* correlateSnapshotWithSource(providerSource, nextProvider).pipe( + Effect.flatMap(syncProvider), + ); + }); + const forkRefreshForLiveInstance = (instance: ProviderInstance) => + refreshOneLiveSource(buildSnapshotSource(instance), instance).pipe( + Effect.ignoreCause({ log: true }), + Effect.forkIn(providerRegistryScope), + ); + const refreshCurrentLiveSources = Effect.fn("refreshCurrentLiveSources")(function* () { + const liveSubs = yield* Ref.get(liveSubsRef); + yield* Effect.forEach( + Array.from(liveSubs.values()), + (instance) => forkRefreshForLiveInstance(instance), + { + concurrency: "unbounded", + discard: true, + }, + ); + }); const refreshAll = Effect.fn("refreshAll")(function* () { const sources = yield* getLiveSources; @@ -492,16 +524,16 @@ export const ProviderRegistryLive = Layer.effect( * - subscribe to each newly-added or rebuilt instance's * `streamChanges` (so periodic + enrichment refreshes land in * `providersRef`); - * - force-refresh each newly-added/rebuilt instance and feed the - * result directly into `providersRef`, bypassing the PubSub - * attachment race that otherwise drops the initial probe; + * - after the server is ready, start a force-refresh for each + * newly-added/rebuilt instance and feed the result into + * `providersRef` when it completes; * - prune `providersRef` of instances that no longer exist. * - * Initial refreshes are awaited in parallel rather than forked, so - * callers (layer build; `streamChanges` watcher) see fully-probed - * state on return. This matters for layer build in particular: - * consumers reading `getProviders` immediately after layer build - * expect the probe to have already landed. + * Boot refreshes wait until startup explicitly enables background + * refreshes, and every refresh is forked instead of awaited, so server + * readiness is not blocked on provider CLIs. Consumers first see cached + * or pending fallback snapshots, then receive provider updates through + * `streamChanges` as checks complete. * * Per-instance subscription fibers are not tracked explicitly. When * a rebuilt instance's old child scope closes, its PubSub shuts @@ -543,10 +575,8 @@ export const ProviderRegistryLive = Layer.effect( } // Fork long-lived subscriptions to each new/rebuilt instance's - // change stream BEFORE kicking off refreshes — if the driver's - // own initial probe (line 140 in `makeManagedServerProvider`) - // wins the refreshSemaphore race, its PubSub publish must land - // in an active subscriber or the result is dropped. + // change stream BEFORE kicking off refreshes so every published + // provider update has an active subscriber. for (const [, instance] of newlyAdded) { const source = buildSnapshotSource(instance); yield* Stream.runForEach(source.streamChanges, (provider) => @@ -554,18 +584,6 @@ export const ProviderRegistryLive = Layer.effect( ).pipe(Effect.forkScoped); } - // Force-refresh every new/rebuilt instance in parallel and wait - // for them all to complete. The refresh's result is piped - // directly into `syncProvider`, so `providersRef` is populated - // deterministically by the time this block returns — regardless - // of PubSub subscription timing. Failures are logged and - // swallowed so one bad driver can't wedge the whole registry. - yield* Effect.forEach( - newlyAdded, - ([, instance]) => - refreshOneSource(buildSnapshotSource(instance)).pipe(Effect.ignoreCause({ log: true })), - { concurrency: "unbounded", discard: true }, - ); yield* upsertProviders(unavailableProviders, { persist: false, replace: true, @@ -602,6 +620,12 @@ export const ProviderRegistryLive = Layer.effect( } return next; }); + + if (yield* Ref.get(isServerReadyRef)) { + for (const [, instance] of newlyAdded) { + yield* forkRefreshForLiveInstance(instance); + } + } }), ); const syncLiveSourcesAndContinue = syncLiveSources.pipe( @@ -655,8 +679,8 @@ export const ProviderRegistryLive = Layer.effect( // instance never propagate to the aggregator's `providersRef`.) const instanceChanges = yield* instanceRegistry.subscribeChanges; // Initial sync: subscribe + kick off refreshes for every instance - // present at boot. Run synchronously so consumers pulling immediately - // after the layer build see the correct aggregator state. + // present at boot. Provider checks run in the background so layer + // build can complete once fallback/cached snapshots are available. yield* syncLiveSources; // React to registry mutations — instance added / removed / rebuilt. // `Stream.fromSubscription` builds a stream over the pre-acquired @@ -687,6 +711,9 @@ export const ProviderRegistryLive = Layer.effect( refreshInstance(instanceId).pipe(Effect.catchCause(recoverRefreshFailure)), getProviderMaintenanceCapabilitiesForInstance, setProviderMaintenanceActionState, + startBackgroundRefreshes: Ref.set(isServerReadyRef, true).pipe( + Effect.andThen(refreshCurrentLiveSources), + ), get streamChanges() { return Stream.fromPubSub(changesPubSub); }, diff --git a/apps/server/src/provider/Services/ProviderRegistry.ts b/apps/server/src/provider/Services/ProviderRegistry.ts index b7426b30338..df991d9e6b7 100644 --- a/apps/server/src/provider/Services/ProviderRegistry.ts +++ b/apps/server/src/provider/Services/ProviderRegistry.ts @@ -69,6 +69,16 @@ export interface ProviderRegistryShape { readonly state: ServerProviderUpdateState | null; }) => Effect.Effect>; + /** + * Enable and start lifecycle-owned background refreshes. + * + * Server startup calls this after command readiness and the ready + * lifecycle event, so provider CLI probes cannot block the server from + * accepting commands. Manual `refresh` and `refreshInstance` remain + * available before this point. + */ + readonly startBackgroundRefreshes: Effect.Effect; + /** * Stream of provider snapshot updates — one emission per aggregated * change. The array contains the full current state. diff --git a/apps/server/src/provider/makeManagedServerProvider.test.ts b/apps/server/src/provider/makeManagedServerProvider.test.ts index 1f3ebeab089..502f87ecfa4 100644 --- a/apps/server/src/provider/makeManagedServerProvider.test.ts +++ b/apps/server/src/provider/makeManagedServerProvider.test.ts @@ -7,6 +7,7 @@ import * as Fiber from "effect/Fiber"; import * as PubSub from "effect/PubSub"; import * as Ref from "effect/Ref"; import * as Stream from "effect/Stream"; +import * as TestClock from "effect/testing/TestClock"; import { makeManagedServerProvider } from "./makeManagedServerProvider.ts"; @@ -101,45 +102,71 @@ const enrichedSnapshotSecond: ServerProvider = { }; describe("makeManagedServerProvider", () => { - it.effect( - "runs the initial provider check in the background and streams the refreshed snapshot", - () => - Effect.scoped( - Effect.gen(function* () { - const checkCalls = yield* Ref.make(0); - const releaseCheck = yield* Deferred.make(); - const provider = yield* makeManagedServerProvider({ - maintenanceCapabilities, - getSettings: Effect.succeed({ enabled: true }), - streamSettings: Stream.empty, - haveSettingsChanged: (previous, next) => previous.enabled !== next.enabled, - initialSnapshot: () => Effect.succeed(initialSnapshot), - checkProvider: Ref.update(checkCalls, (count) => count + 1).pipe( - Effect.flatMap(() => Deferred.await(releaseCheck)), - Effect.as(refreshedSnapshot), - ), - refreshInterval: "1 hour", - }); + it.effect("does not probe during construction or unchanged snapshot reads", () => + Effect.scoped( + Effect.gen(function* () { + const checkCalls = yield* Ref.make(0); + const provider = yield* makeManagedServerProvider({ + maintenanceCapabilities, + getSettings: Effect.succeed({ enabled: true }), + streamSettings: Stream.empty, + haveSettingsChanged: (previous, next) => previous.enabled !== next.enabled, + initialSnapshot: () => Effect.succeed(initialSnapshot), + checkProvider: Ref.update(checkCalls, (count) => count + 1).pipe( + Effect.as(refreshedSnapshot), + ), + refreshInterval: "1 hour", + }); - const initial = yield* provider.getSnapshot; - assert.deepStrictEqual(initial, initialSnapshot); + yield* Effect.yieldNow; - const updatesFiber = yield* Stream.take(provider.streamChanges, 1).pipe( - Stream.runCollect, - Effect.forkChild, - ); - yield* Effect.yieldNow; + assert.deepStrictEqual(yield* provider.getSnapshot, initialSnapshot); + assert.deepStrictEqual(yield* provider.getSnapshot, initialSnapshot); + assert.strictEqual(yield* Ref.get(checkCalls), 0); + }), + ), + ); - yield* Deferred.succeed(releaseCheck, undefined); + it.effect("streams an explicit provider refresh", () => + Effect.scoped( + Effect.gen(function* () { + const checkCalls = yield* Ref.make(0); + const releaseCheck = yield* Deferred.make(); + const provider = yield* makeManagedServerProvider({ + maintenanceCapabilities, + getSettings: Effect.succeed({ enabled: true }), + streamSettings: Stream.empty, + haveSettingsChanged: (previous, next) => previous.enabled !== next.enabled, + initialSnapshot: () => Effect.succeed(initialSnapshot), + checkProvider: Ref.update(checkCalls, (count) => count + 1).pipe( + Effect.flatMap(() => Deferred.await(releaseCheck)), + Effect.as(refreshedSnapshot), + ), + refreshInterval: "1 hour", + }); + + const initial = yield* provider.getSnapshot; + assert.deepStrictEqual(initial, initialSnapshot); + + const updatesFiber = yield* Stream.take(provider.streamChanges, 1).pipe( + Stream.runCollect, + Effect.forkChild, + ); + const refreshFiber = yield* provider.refresh.pipe(Effect.forkChild); + yield* Effect.yieldNow; + + yield* Deferred.succeed(releaseCheck, undefined); - const updates = Array.from(yield* Fiber.join(updatesFiber)); - const latest = yield* provider.getSnapshot; + const refreshed = yield* Fiber.join(refreshFiber); + const updates = Array.from(yield* Fiber.join(updatesFiber)); + const latest = yield* provider.getSnapshot; - assert.deepStrictEqual(updates, [refreshedSnapshot]); - assert.deepStrictEqual(latest, refreshedSnapshot); - assert.strictEqual(yield* Ref.get(checkCalls), 1); - }), - ), + assert.deepStrictEqual(refreshed, refreshedSnapshot); + assert.deepStrictEqual(updates, [refreshedSnapshot]); + assert.deepStrictEqual(latest, refreshedSnapshot); + assert.strictEqual(yield* Ref.get(checkCalls), 1); + }), + ), ); it.effect("reruns the provider check when streamed settings change", () => @@ -148,7 +175,6 @@ describe("makeManagedServerProvider", () => { const settingsRef = yield* Ref.make({ enabled: true }); const settingsChanges = yield* PubSub.unbounded(); const checkCalls = yield* Ref.make(0); - const releaseInitialCheck = yield* Deferred.make(); const releaseSettingsCheck = yield* Deferred.make(); const provider = yield* makeManagedServerProvider({ maintenanceCapabilities, @@ -157,22 +183,19 @@ describe("makeManagedServerProvider", () => { haveSettingsChanged: (previous, next) => previous.enabled !== next.enabled, initialSnapshot: () => Effect.succeed(initialSnapshot), checkProvider: Ref.updateAndGet(checkCalls, (count) => count + 1).pipe( - Effect.flatMap((count) => - count === 1 - ? Deferred.await(releaseInitialCheck).pipe(Effect.as(refreshedSnapshot)) - : Deferred.await(releaseSettingsCheck).pipe(Effect.as(refreshedSnapshotSecond)), + Effect.flatMap(() => + Deferred.await(releaseSettingsCheck).pipe(Effect.as(refreshedSnapshotSecond)), ), ), refreshInterval: "1 hour", }); - const updatesFiber = yield* Stream.take(provider.streamChanges, 2).pipe( + const updatesFiber = yield* Stream.take(provider.streamChanges, 1).pipe( Stream.runCollect, Effect.forkChild, ); yield* Effect.yieldNow; - yield* Deferred.succeed(releaseInitialCheck, undefined); yield* Ref.set(settingsRef, { enabled: false }); yield* PubSub.publish(settingsChanges, { enabled: false }); yield* Deferred.succeed(releaseSettingsCheck, undefined); @@ -180,13 +203,75 @@ describe("makeManagedServerProvider", () => { const updates = Array.from(yield* Fiber.join(updatesFiber)); const latest = yield* provider.getSnapshot; - assert.deepStrictEqual(updates, [refreshedSnapshot, refreshedSnapshotSecond]); + assert.deepStrictEqual(updates, [refreshedSnapshotSecond]); assert.deepStrictEqual(latest, refreshedSnapshotSecond); - assert.strictEqual(yield* Ref.get(checkCalls), 2); + assert.strictEqual(yield* Ref.get(checkCalls), 1); }), ), ); + it.effect("ignores streamed settings updates that do not change provider settings", () => + Effect.scoped( + Effect.gen(function* () { + const settingsRef = yield* Ref.make({ enabled: true }); + const settingsChanges = yield* PubSub.unbounded(); + const checkCalls = yield* Ref.make(0); + const provider = yield* makeManagedServerProvider({ + maintenanceCapabilities, + getSettings: Ref.get(settingsRef), + streamSettings: Stream.fromPubSub(settingsChanges), + haveSettingsChanged: (previous, next) => previous.enabled !== next.enabled, + initialSnapshot: () => Effect.succeed(initialSnapshot), + checkProvider: Ref.update(checkCalls, (count) => count + 1).pipe( + Effect.as(refreshedSnapshot), + ), + refreshInterval: "1 hour", + }); + + yield* PubSub.publish(settingsChanges, { enabled: true }); + yield* Effect.yieldNow; + + assert.deepStrictEqual(yield* provider.getSnapshot, initialSnapshot); + assert.strictEqual(yield* Ref.get(checkCalls), 0); + }), + ), + ); + + it.effect("still refreshes on the configured periodic interval", () => + Effect.scoped( + Effect.gen(function* () { + const checkCalls = yield* Ref.make(0); + const provider = yield* makeManagedServerProvider({ + maintenanceCapabilities, + getSettings: Effect.succeed({ enabled: true }), + streamSettings: Stream.empty, + haveSettingsChanged: (previous, next) => previous.enabled !== next.enabled, + initialSnapshot: () => Effect.succeed(initialSnapshot), + checkProvider: Ref.updateAndGet(checkCalls, (count) => count + 1).pipe( + Effect.map((count) => ({ + ...refreshedSnapshot, + checkedAt: `2026-04-10T00:00:0${count}.000Z`, + })), + ), + refreshInterval: "1 minute", + }); + + const updatesFiber = yield* Stream.take(provider.streamChanges, 1).pipe( + Stream.runCollect, + Effect.forkChild, + ); + + yield* TestClock.adjust("1 minute"); + yield* Effect.yieldNow; + + const updates = Array.from(yield* Fiber.join(updatesFiber)); + assert.deepStrictEqual(updates, [refreshedSnapshot]); + assert.deepStrictEqual(yield* provider.getSnapshot, refreshedSnapshot); + assert.strictEqual(yield* Ref.get(checkCalls), 1); + }), + ).pipe(Effect.provide(TestClock.layer())), + ); + it.effect("streams supplemental snapshot updates after the base provider check completes", () => Effect.scoped( Effect.gen(function* () { @@ -210,9 +295,11 @@ describe("makeManagedServerProvider", () => { Stream.runCollect, Effect.forkChild, ); + const refreshFiber = yield* provider.refresh.pipe(Effect.forkChild); yield* Effect.yieldNow; yield* Deferred.succeed(releaseCheck, undefined); + yield* Fiber.join(refreshFiber); yield* Deferred.succeed(releaseEnrichment, undefined); @@ -262,10 +349,12 @@ describe("makeManagedServerProvider", () => { Stream.runCollect, Effect.forkChild, ); + const firstRefreshFiber = yield* provider.refresh.pipe(Effect.forkChild); yield* Effect.yieldNow; yield* Deferred.succeed(allowFirstRefresh, undefined); yield* Deferred.await(firstCallbackReady); + yield* Fiber.join(firstRefreshFiber); yield* provider.refresh; yield* Deferred.await(secondCallbackReady); diff --git a/apps/server/src/provider/makeManagedServerProvider.ts b/apps/server/src/provider/makeManagedServerProvider.ts index 2f07c5d508c..61fb2376512 100644 --- a/apps/server/src/provider/makeManagedServerProvider.ts +++ b/apps/server/src/provider/makeManagedServerProvider.ts @@ -145,11 +145,6 @@ export const makeManagedServerProvider = Effect.fn("makeManagedServerProvider")( ), ).pipe(Effect.forkScoped); - yield* applySnapshot(initialSettings, { forceRefresh: true }).pipe( - Effect.ignoreCause({ log: true }), - Effect.forkScoped, - ); - return { maintenanceCapabilities: input.maintenanceCapabilities, getSnapshot: input.getSettings.pipe( diff --git a/apps/server/src/provider/providerMaintenanceRunner.test.ts b/apps/server/src/provider/providerMaintenanceRunner.test.ts index 5f5f975a4e3..45f201f5948 100644 --- a/apps/server/src/provider/providerMaintenanceRunner.test.ts +++ b/apps/server/src/provider/providerMaintenanceRunner.test.ts @@ -188,6 +188,7 @@ function makeRegistry( getProviderMaintenanceCapabilitiesForInstance: (_instanceId, provider) => Effect.succeed(lifecycleFor(provider)), setProviderMaintenanceActionState, + startBackgroundRefreshes: Effect.void, streamChanges: Stream.empty, }; diff --git a/apps/server/src/serverRuntimeStartup.ts b/apps/server/src/serverRuntimeStartup.ts index 9ec536105c8..a62f4f33656 100644 --- a/apps/server/src/serverRuntimeStartup.ts +++ b/apps/server/src/serverRuntimeStartup.ts @@ -32,6 +32,7 @@ import { ServerSettingsService } from "./serverSettings.ts"; import { ServerEnvironment } from "./environment/Services/ServerEnvironment.ts"; import { AnalyticsService } from "./telemetry/Services/AnalyticsService.ts"; import { ServerAuth } from "./auth/Services/ServerAuth.ts"; +import { ProviderRegistry } from "./provider/Services/ProviderRegistry.ts"; import { ProviderSessionReaper } from "./provider/Services/ProviderSessionReaper.ts"; import { formatHeadlessServeOutput, @@ -283,6 +284,7 @@ export const makeServerRuntimeStartup = Effect.gen(function* () { const keybindings = yield* Keybindings; const orchestrationReactor = yield* OrchestrationReactor; const providerSessionReaper = yield* ProviderSessionReaper; + const providerRegistry = yield* ProviderRegistry; const lifecycleEvents = yield* ServerLifecycleEvents; const serverSettings = yield* ServerSettingsService; const serverEnvironment = yield* ServerEnvironment; @@ -427,7 +429,6 @@ export const makeServerRuntimeStartup = Effect.gen(function* () { }, }), ); - yield* Effect.logDebug("startup phase: recording startup heartbeat"); yield* launchStartupHeartbeat; if (serverConfig.startupPresentation === "headless") { @@ -447,6 +448,16 @@ export const makeServerRuntimeStartup = Effect.gen(function* () { } yield* runStartupPhase("browser.open", maybeOpenBrowser(startupBrowserTarget)); } + yield* Effect.logDebug("startup phase: starting provider refreshes"); + yield* runStartupPhase( + "providers.refresh.start", + providerRegistry.startBackgroundRefreshes.pipe( + Effect.catchCause((cause) => + Effect.logWarning("provider startup refresh failed", { cause }), + ), + Effect.forkScoped, + ), + ); yield* Effect.logDebug("startup phase: complete"); }), ); diff --git a/package.json b/package.json index a1aa5d0b1cd..ca9ab4af438 100644 --- a/package.json +++ b/package.json @@ -44,6 +44,7 @@ "typecheck": "turbo run typecheck", "lint": "oxlint --report-unused-disable-directives", "test": "turbo run test", + "bench:startup": "bun scripts/bench-startup.js", "test:desktop-smoke": "turbo run smoke-test --filter=@t3tools/desktop", "fmt": "oxfmt", "fmt:check": "oxfmt --check", diff --git a/scripts/bench-startup.js b/scripts/bench-startup.js new file mode 100644 index 00000000000..9c597d4253e --- /dev/null +++ b/scripts/bench-startup.js @@ -0,0 +1,572 @@ +import { spawn, spawnSync } from "node:child_process"; +import { createReadStream, existsSync, mkdirSync, rmSync } from "node:fs"; +import { dirname, join, resolve } from "node:path"; +import { createInterface } from "node:readline"; +import { fileURLToPath, pathToFileURL } from "node:url"; +import { performance } from "node:perf_hooks"; + +const RESULT_PREFIX = "T3_STARTUP_BENCH_RESULT "; +const NOOP = () => undefined; + +function repoRoot() { + return resolve(dirname(fileURLToPath(import.meta.url)), ".."); +} + +function parseArgs(argv) { + const config = { + ref: "HEAD", + projectCount: 100, + threadsPerProject: 100, + activitiesPerThread: 0, + seedMode: "projected", + serverReadyOnly: false, + keep: false, + }; + const mutable = { ...config }; + for (let index = 0; index < argv.length; index += 1) { + const arg = argv[index]; + const next = argv[index + 1]; + switch (arg) { + case "--ref": + mutable.ref = requireValue(arg, next); + index += 1; + break; + case "--project-count": + mutable.projectCount = parsePositiveInt(arg, requireValue(arg, next)); + index += 1; + break; + case "--threads-per-project": + mutable.threadsPerProject = parsePositiveInt(arg, requireValue(arg, next)); + index += 1; + break; + case "--activities-per-thread": + mutable.activitiesPerThread = parseNonNegativeInt(arg, requireValue(arg, next)); + index += 1; + break; + case "--seed-mode": + mutable.seedMode = requireValue(arg, next); + if (mutable.seedMode !== "events-only" && mutable.seedMode !== "projected") { + throw new Error("--seed-mode must be events-only or projected."); + } + index += 1; + break; + case "--server-ready-only": + mutable.serverReadyOnly = true; + break; + case "--keep": + mutable.keep = true; + break; + case "--help": + case "-h": + printUsage(); + process.exit(0); + default: + throw new Error(`Unknown argument: ${arg}`); + } + } + return mutable; +} + +function printUsage() { + console.log(`Usage: bun scripts/bench-startup.js [options] + +Options: + --ref