Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
283 changes: 275 additions & 8 deletions apps/server/src/provider/Layers/ProviderRegistry.test.ts

Large diffs are not rendered by default.

79 changes: 53 additions & 26 deletions apps/server/src/provider/Layers/ProviderRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import * as PubSub from "effect/PubSub";
import * as Ref from "effect/Ref";
import * as Stream from "effect/Stream";
import * as Semaphore from "effect/Semaphore";
import * as Scope from "effect/Scope";

import { ServerConfig } from "../../config.ts";
import { ProviderInstanceRegistry } from "../Services/ProviderInstanceRegistry.ts";
Expand Down Expand Up @@ -189,6 +190,7 @@ export const ProviderRegistryLive = Layer.effect(
Effect.gen(function* () {
const instanceRegistry = yield* ProviderInstanceRegistry;
const config = yield* ServerConfig;
const providerRegistryScope = yield* Scope.Scope;
const fileSystem = yield* FileSystem.FileSystem;
const path = yield* Path.Path;

Expand Down Expand Up @@ -278,6 +280,7 @@ export const ProviderRegistryLive = Layer.effect(
const liveSubsRef = yield* Ref.make<ReadonlyMap<ProviderInstanceId, ProviderInstance>>(
new Map(),
);
const isServerReadyRef = yield* Ref.make(false);
// Serialize `syncLiveSources` so a rapid burst of reconciles doesn't
// interleave two passes clobbering each other's fiber bookkeeping.
const syncSemaphore = yield* Semaphore.make(1);
Expand Down Expand Up @@ -438,6 +441,35 @@ export const ProviderRegistryLive = Layer.effect(
),
);
});
const refreshOneLiveSource = Effect.fn("refreshOneLiveSource")(function* (
providerSource: ProviderSnapshotSource,
instance: ProviderInstance,
) {
const nextProvider = yield* providerSource.refresh;
const liveSubs = yield* Ref.get(liveSubsRef);
if (liveSubs.get(providerSource.instanceId) !== instance) {
return yield* Ref.get(providersRef);
}
return yield* correlateSnapshotWithSource(providerSource, nextProvider).pipe(
Effect.flatMap(syncProvider),
);
});
const forkRefreshForLiveInstance = (instance: ProviderInstance) =>
refreshOneLiveSource(buildSnapshotSource(instance), instance).pipe(
Effect.ignoreCause({ log: true }),
Effect.forkIn(providerRegistryScope),
);
const refreshCurrentLiveSources = Effect.fn("refreshCurrentLiveSources")(function* () {
const liveSubs = yield* Ref.get(liveSubsRef);
yield* Effect.forEach(
Array.from(liveSubs.values()),
(instance) => forkRefreshForLiveInstance(instance),
{
concurrency: "unbounded",
discard: true,
},
);
});

const refreshAll = Effect.fn("refreshAll")(function* () {
const sources = yield* getLiveSources;
Expand Down Expand Up @@ -492,16 +524,16 @@ export const ProviderRegistryLive = Layer.effect(
* - subscribe to each newly-added or rebuilt instance's
* `streamChanges` (so periodic + enrichment refreshes land in
* `providersRef`);
* - force-refresh each newly-added/rebuilt instance and feed the
* result directly into `providersRef`, bypassing the PubSub
* attachment race that otherwise drops the initial probe;
* - after the server is ready, start a force-refresh for each
* newly-added/rebuilt instance and feed the result into
* `providersRef` when it completes;
* - prune `providersRef` of instances that no longer exist.
*
* Initial refreshes are awaited in parallel rather than forked, so
* callers (layer build; `streamChanges` watcher) see fully-probed
* state on return. This matters for layer build in particular:
* consumers reading `getProviders` immediately after layer build
* expect the probe to have already landed.
* Boot refreshes wait until startup explicitly enables background
* refreshes, and every refresh is forked instead of awaited, so server
* readiness is not blocked on provider CLIs. Consumers first see cached
* or pending fallback snapshots, then receive provider updates through
* `streamChanges` as checks complete.
*
* Per-instance subscription fibers are not tracked explicitly. When
* a rebuilt instance's old child scope closes, its PubSub shuts
Expand Down Expand Up @@ -543,29 +575,15 @@ export const ProviderRegistryLive = Layer.effect(
}

// Fork long-lived subscriptions to each new/rebuilt instance's
// change stream BEFORE kicking off refreshes — if the driver's
// own initial probe (line 140 in `makeManagedServerProvider`)
// wins the refreshSemaphore race, its PubSub publish must land
// in an active subscriber or the result is dropped.
// change stream BEFORE kicking off refreshes so every published
// provider update has an active subscriber.
for (const [, instance] of newlyAdded) {
const source = buildSnapshotSource(instance);
yield* Stream.runForEach(source.streamChanges, (provider) =>
correlateSnapshotWithSource(source, provider).pipe(Effect.flatMap(syncProvider)),
).pipe(Effect.forkScoped);
}

// Force-refresh every new/rebuilt instance in parallel and wait
// for them all to complete. The refresh's result is piped
// directly into `syncProvider`, so `providersRef` is populated
// deterministically by the time this block returns — regardless
// of PubSub subscription timing. Failures are logged and
// swallowed so one bad driver can't wedge the whole registry.
yield* Effect.forEach(
newlyAdded,
([, instance]) =>
refreshOneSource(buildSnapshotSource(instance)).pipe(Effect.ignoreCause({ log: true })),
{ concurrency: "unbounded", discard: true },
);
yield* upsertProviders(unavailableProviders, {
persist: false,
replace: true,
Expand Down Expand Up @@ -602,6 +620,12 @@ export const ProviderRegistryLive = Layer.effect(
}
return next;
});

if (yield* Ref.get(isServerReadyRef)) {
for (const [, instance] of newlyAdded) {
yield* forkRefreshForLiveInstance(instance);
}
}
}),
);
const syncLiveSourcesAndContinue = syncLiveSources.pipe(
Expand Down Expand Up @@ -655,8 +679,8 @@ export const ProviderRegistryLive = Layer.effect(
// instance never propagate to the aggregator's `providersRef`.)
const instanceChanges = yield* instanceRegistry.subscribeChanges;
// Initial sync: subscribe + kick off refreshes for every instance
// present at boot. Run synchronously so consumers pulling immediately
// after the layer build see the correct aggregator state.
// present at boot. Provider checks run in the background so layer
// build can complete once fallback/cached snapshots are available.
yield* syncLiveSources;
// React to registry mutations — instance added / removed / rebuilt.
// `Stream.fromSubscription` builds a stream over the pre-acquired
Expand Down Expand Up @@ -687,6 +711,9 @@ export const ProviderRegistryLive = Layer.effect(
refreshInstance(instanceId).pipe(Effect.catchCause(recoverRefreshFailure)),
getProviderMaintenanceCapabilitiesForInstance,
setProviderMaintenanceActionState,
startBackgroundRefreshes: Ref.set(isServerReadyRef, true).pipe(
Effect.andThen(refreshCurrentLiveSources),
),
get streamChanges() {
return Stream.fromPubSub(changesPubSub);
},
Expand Down
10 changes: 10 additions & 0 deletions apps/server/src/provider/Services/ProviderRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ export interface ProviderRegistryShape {
readonly state: ServerProviderUpdateState | null;
}) => Effect.Effect<ReadonlyArray<ServerProvider>>;

/**
* Enable and start lifecycle-owned background refreshes.
*
* Server startup calls this after command readiness and the ready
* lifecycle event, so provider CLI probes cannot block the server from
* accepting commands. Manual `refresh` and `refreshInstance` remain
* available before this point.
*/
readonly startBackgroundRefreshes: Effect.Effect<void>;

/**
* Stream of provider snapshot updates — one emission per aggregated
* change. The array contains the full current state.
Expand Down
Loading
Loading