diff --git a/CHANGELOG.md b/CHANGELOG.md index 17b335c..2f8d3da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,70 @@ All notable changes to `@inceptionstack/roundhouse` are documented here. +## [0.5.38] — 2026-05-16 + +### Fixed +- **Soft-reset pre-turn gap.** Idle sessions that grew via background work + (cron jobs, boot turn, sub-agent results) could cross the provider's context + limit without ever tripping the proactive `softTokens`/`hardTokens` + thresholds while live. The next user turn called `agent.prompt()` directly + and overflowed; the gateway catch posted the raw `prompt is too long: N > + 200000` error with no classification or recovery, and the loop persisted + until manual surgery on the jsonl. The v0.5.29–v0.5.32 soft-reset machinery + only fired from `flushMemoryThenCompact`'s catch (i.e. when *compact itself* + overflowed), not from a normal user-prompt overflow. Concrete evidence on + the maintainer's machine: `~/.roundhouse/sessions/main` jsonl reached 2.8 MB + with zero `"main"` entries in `compact-timing.jsonl` between + 2026-05-14 and 2026-05-16. +- **Fix:** classify `agent.prompt()` / `agent.promptStream()` exceptions in the + gateway catch via the existing `isContextOverflowError`. On overflow, call + `agent.softReset(threadId)` (extracted into a shared + `recoverFromContextOverflow` helper, also used by the v0.5.32 compact-time + path). On success, set `forceInjectReason="after-soft-reset"` and clear + `pendingCompact`; on no-op or failure with `agent.compact` available, arm + `pendingCompact="emergency"` so the existing pre-turn branch fires on the + user's next message. UX is deferred-retry only — same-turn replay would + duplicate streamed text and re-execute side-effecting tools. Background + turns (boot/subagent) get distinct copy that doesn't ask a user to + resend. Telemetry: one line per gateway-side recovery in + `compact-timing.jsonl` with `level: "gateway-overflow"`. +- **Streaming path coverage (post-review F1).** pi-ai's streaming surfaces + provider errors as `model_error` *events*, not thrown exceptions — so the + initial fix above only caught synchronous-throw overflow. On Telegram + (streaming-default), streamed `prompt is too long` still bypassed recovery: + `gateway/streaming.ts` posted the raw error and the for-await loop returned + normally. Per codex-cli design (option a, refined): classify the + `model_error` message in `streaming.ts`. Non-overflow keeps today's inline + `⚠️ Agent error:` post + continue-loop. Overflow flushes, suppresses the + inline raw post, and throws a typed `StreamModelOverflowError` so the + gateway catch routes through `recoverFromAgentTurnOverflow` exactly like + synchronous-throw overflow. Single recovery surface, no duplicate posts, + no flag plumbed through the `StreamResult` contract. +- **Code-review polish (F2–F6).** Removed dead `"cron"` from the `TurnSource` + union (cron jobs run via `cron/runner.ts` in their own session and never + reach `Gateway.handleAgentTurn`). Replaced the raw provider error in the + `unsupported`-recovery branch with explicit guidance (`⚠️ Session full — + adapter doesn't support automatic recovery. Run /compact manually or + restart session.`). Extracted `appendCompactLog` + `CompactLogEntry` to a + new `src/memory/telemetry.ts` to remove the gateway→memory cross-domain + import; `lifecycle.ts` re-exports for back-compat. De-duplicated + `MAX_ERROR_PREVIEW = 200` (gateway.ts copy was unused after the v0.5.38 + catch refactor; deleted). Replaced bare `slice(0, 100)` magic number with + `MAX_FAILURE_REASON_PREVIEW`. +- 26 new tests across `test/overflow-recovery.test.ts` (helper-level + classify/recover/no-op/failed/cause-chain), + `test/gateway-overflow-recovery.test.ts` (gateway-level state writes, + pendingCompact arming, streaming partial-text branch, background-turn + copy, post-throw resilience, F3 unsupported-guidance regression), and + `test/streaming-overflow.test.ts` (F1: model_error overflow throws, + non-overflow inline post regression, end-to-end streaming→recovery for + both clean and partial-text turns). **591 tests passing** (+26 net). +- Design doc: `docs/design/v0.5.38-soft-reset-pre-turn-gap.md` (codex-cli + Alternative D — shared reactive recovery helper, deferred retry, + pendingCompact fallback). +- F1 design: `~/.roundhouse/workspace/softreset-f1-codex-design.md` + (codex-cli option (a) refined — typed `StreamModelOverflowError`). + ## [0.5.37] — 2026-05-16 ### Fixed diff --git a/docs/design/v0.5.38-soft-reset-pre-turn-gap.md b/docs/design/v0.5.38-soft-reset-pre-turn-gap.md new file mode 100644 index 0000000..cd65212 --- /dev/null +++ b/docs/design/v0.5.38-soft-reset-pre-turn-gap.md @@ -0,0 +1,290 @@ +# Soft-reset pre-turn gap — design doc (v0.5.38) + +> Status: design accepted. Implementer will follow Section 5 verbatim. +> Authored: codex-cli 0.130.0 (gpt-5.1 via OpenAI), 2026-05-16. +> Prompt: `/tmp/codex-design-prompt.md` (preserved). Raw transcript: `softreset-pre-turn-gap-design.raw.log`. + +## Problem (one paragraph) + +Soft-reset recovery (v0.5.29–v0.5.32) only fires inside `flushMemoryThenCompact`'s catch +— i.e. when **compact itself** overflows during a hard/emergency pass. If a session +grows past the model's context window via background work (cron, boot, sub-agents) +without ever crossing the proactive `softTokens`/`hardTokens` threshold, the next +user turn calls `agent.prompt()` directly and overflows. The gateway's catch +(`gateway.ts:~508`) posts the raw provider error with no classification, no +`softReset` call, no state update, and the loop persists indefinitely until manual +surgery on the jsonl. Today's `main` session demonstrated this exact failure mode +before being recovered manually (jsonl was 2.8 MB; compact-timing.jsonl shows zero +`main` entries between 2026-05-14 and 2026-05-16). + +## Alternatives evaluated + +- **A1** — classify in catch + call `agent.softReset` + post deferred-retry hint. +- **A2** — same as A1 but transparently re-run `agent.prompt` in the same turn. +- **B** — set `pendingCompact=emergency` on disk + ask user to retry (existing pre-turn + branch handles it on retry). +- **C** — proactive pre-check (token estimate before prompt; recover if too big). +- **D** — shared reactive recovery helper used by both `flushMemoryThenCompact` and + the gateway catch, deferred-retry UX, fallback to `pendingCompact=emergency` when + direct soft-reset can't recover. + +## Codex recommendation (full text) + +# Recommendation + +Choose **D**: add a **shared reactive overflow-recovery helper** used from the gateway catch, with **deferred retry UX** and a **`pendingCompact="emergency"` fallback when direct soft-reset does not recover**. + +# Rationale + +The bug is reactive, not predictive: the live session can cross the provider limit without ever tripping the normal pressure thresholds, and the first place that definitively knows that is the failed `agent.prompt()` / `agent.promptStream()` call in [src/gateway/gateway.ts](/home/ec2-user/repos/roundhouse/src/gateway/gateway.ts). That makes a catch-path fix mandatory even if the project later adds better prechecks. The safest design is to classify overflow in that catch, attempt the same soft-reset recovery already proven in `flushMemoryThenCompact`, and persist the same post-recovery memory state (`forceInjectReason="after-soft-reset"`, clear `pendingCompact`). That closes the exact loop reported without perturbing the existing v0.5.32 compact-time recovery path. + +On locking and atomicity, this is safe under the current model. The gateway still holds the outer per-thread lock while in the catch, so no second `handleAgentTurn` for that thread can race in. The failed prompt has already left the pi adapter’s per-thread `enqueue`, so `agent.softReset(threadId)` will run next in sequence on that same queue. There is no same-turn double recovery with finalize, because finalize only runs after a successful prompt/stream completion. The only realistic overlap is an older deferred soft flush outside the gateway lock, but that already serializes through the same adapter queue; the new design does not introduce a new unsynchronized mutation path. + +For UX, use **deferred retry**, not transparent same-turn re-prompt. A same-turn retry is attractive for simple non-streaming cases, but it is the wrong default once streaming and tool execution are involved. If a stream already emitted partial text before the provider throws, or if any tool side effects happened before the failure surfaced, re-running the exact same user message risks duplicate work and inconsistent transcripts. Deferred retry preserves at-most-once semantics: recover synchronously, tell the user the session was repaired, and ask them to resend only for interactive turns. For boot/cron/sub-agent turns, the same catch path still recovers the session, but the final message should say that the interrupted background turn was not retried automatically. + +The key refinement over plain Alternative A1 is fallback behavior. If direct soft-reset is unavailable, no-ops, or fails, the gateway should not just post a raw provider error and leave state unchanged. It should arm `pendingCompact="emergency"` when there is still a meaningful next-turn recovery path, so the next turn goes through the existing pre-turn emergency branch instead of hitting `agent.prompt()` and overflowing again immediately. That gives one clean direct recovery path and one tested fallback path, rather than forcing all failures through a second user retry before any recovery attempt happens. + +# Risks & mitigations of chosen design + +- `promptStream()` can fail after partial output was already posted. + Mitigation: track whether visible assistant text was emitted during streaming and tailor the final note to “response was interrupted; session recovered” instead of “please retry your last message” blindly. + +- `softReset` success may still leave the next prompt over limit once memory re-injects plus the new user message are added. + Mitigation: keep this path deferred-retry only; if the retry still overflows, the same catch runs again and can escalate by re-arming `pendingCompact="emergency"` or reporting soft-reset no-op/failure explicitly. + +- Adapters without `softReset` support could get stuck in a useless retry loop if the gateway always arms `pendingCompact`. + Mitigation: only arm `pendingCompact="emergency"` when there is a plausible next-turn recovery path, typically `agent.compact` exists. If neither `softReset` nor `compact` exists, fall back to the current sanitized error path with a clearer message. + +- Shared logic can drift if gateway and lifecycle each keep their own recovery/state-write code. + Mitigation: extract one shared helper that classifies overflow, runs `softReset`, and reports a structured outcome; both call sites use that outcome to persist state consistently. + +- A recovered background turn is still dropped. + Mitigation: make that explicit in logs/progress text. Do not auto-replay background turns in this patch. + +# Why not the others + +**Alternative A, if implemented as direct catch logic inside `gateway.ts` only, is close but incomplete.** A1 is the right UX shape, but duplicating the classify/reset/progress/state-write sequence in the gateway would fork recovery logic from `flushMemoryThenCompact`, which already has tests and subtle handling for no-op and thrown resets. A2 should be rejected outright: automatic same-turn retry is unsafe once streaming or tools are in the picture, because a late overflow can occur after partial visible output or side effects, and retrying the same message can duplicate actions. + +**Alternative B is too indirect.** It requires the first overflowing turn to fail, then asks for another turn before recovery even starts. That is worse UX for users, and materially worse for boot/cron/sub-agent turns because there may be no immediate human retry to trigger recovery. It also keeps the bug visible to the user longer even though the gateway already has enough information in-hand to repair the session immediately. + +**Alternative C is the wrong primary fix.** A proactive precheck sounds attractive, but the repo does not currently have a reliable pre-prompt token source for cold sessions, and byte-size heuristics on the jsonl are only a noisy proxy. That means a sizeable refactor with false positives and false negatives, while still needing the reactive catch anyway. It is reasonable as a future optimization, not as the main bug fix for this report. + +# Implementation sketch + +**1. Extract shared overflow recovery helper** +- Add new module, likely [src/memory/overflow-recovery.ts](/home/ec2-user/repos/roundhouse/src/memory/overflow-recovery.ts). +- Move the logic currently in file-private `attemptSoftResetRecovery()` out of [src/memory/lifecycle.ts](/home/ec2-user/repos/roundhouse/src/memory/lifecycle.ts). +- Export a structured helper, e.g. `recoverFromContextOverflow(err, threadId, agent, onProgress?)`. +- Return richer outcome than `{attempted, succeeded}`. + Suggested shape: `kind: "not-overflow" | "recovered" | "noop" | "failed" | "unsupported"`, plus optional `reason` and `report`. +- Also export a tiny state helper or documented outcome contract so both lifecycle and gateway persist identical memory-state effects: + - `recovered` => `forceInjectReason="after-soft-reset"`, clear `pendingCompact` + - otherwise => caller decides whether to arm `pendingCompact="emergency"` + +**2. Rewire lifecycle to use the shared helper** +- Update [src/memory/lifecycle.ts](/home/ec2-user/repos/roundhouse/src/memory/lifecycle.ts) to import the extracted helper. +- Preserve existing compact-time behavior and progress copy. +- No behavior change intended here other than using the shared helper. + +**3. Add gateway catch-path recovery** +- Update [src/gateway/gateway.ts](/home/ec2-user/repos/roundhouse/src/gateway/gateway.ts). +- In the main prompt/stream catch: + - classify overflow via the shared helper + - on successful recovery, persist `forceInjectReason="after-soft-reset"` and clear `pendingCompact` + - on no-op/failure/unsupported-with-compact, arm `pendingCompact="emergency"` + - post a recovery-specific final message instead of the raw provider error + - only fall back to the existing sanitized raw error for non-overflow errors, or adapters with no usable recovery path +- Extend internal turn context so the catch knows whether the turn source is interactive or background. + Suggested internal enum: `source: "user" | "boot" | "subagent"` + (cron jobs run via cron/runner.ts in their own session and never reach + Gateway.handleAgentTurn, so cron is intentionally not a source here.) +- For streaming, have `handleStreaming()` return one more bit, e.g. `hadVisibleOutput: boolean`, so the recovery note can distinguish “retry your last message” from “response was interrupted after partial output”. + +**4. Minimal signature reshaping** +- Internal only: widen `handleAgentTurn(...)` to accept turn source. +- Internal only: widen `handleStreaming(...)` return value. +- No public adapter contract changes required in this patch. + +**5. Tests** +- Keep compact-path tests in [test/memory.test.ts](/home/ec2-user/repos/roundhouse/test/memory.test.ts), but move/add direct tests for the extracted recovery helper into a focused file, e.g. `test/overflow-recovery.test.ts`. +- Add gateway integration tests, likely new `test/gateway-overflow-recovery.test.ts`, using a fake adapter and fake thread: + - overflow during non-streaming prompt => soft-reset success => state updated, retry hint posted + - overflow during prompt with `softReset` undefined but `compact` available => `pendingCompact="emergency"` armed + - overflow with `softReset` throwing => `pendingCompact="emergency"` armed, failure note posted + - non-overflow error => existing raw sanitized error behavior preserved + - concurrent-thread case => verify second turn does not enter while first is recovering under gateway lock + - streaming overflow before any text => recovery path fires + - streaming overflow after partial text => recovery path fires and posts interruption wording +- Approximate diff size: `~150–250 LOC` across one new shared module, two touched source files, and one or two new test files. + +# Open questions + +- Should the gateway arm `pendingCompact="emergency"` on every non-successful overflow recovery outcome, or only when `agent.compact` exists? I recommend the latter. +- What exact user copy should background turns use after recovery? It should avoid “please retry your message.” +- Do you want telemetry for gateway-side recoveries alongside `compact-timing.jsonl`, or is normal logging sufficient for this patch? +- If a streaming turn emitted only tool-status messages but no assistant text before overflow, should that count as “partial output” for UX wording, or only actual text deltas? +- Should repeated gateway-side recoveries within a short window escalate trimming aggressiveness, or is that out of scope for this bugfix? + +## Open-question resolutions (implementer decisions) + +1. **Arm `pendingCompact="emergency"` only when `agent.compact` exists.** Per codex' + recommendation. Adapters without compact have no next-turn recovery to chain to; + arming the flag would just guarantee a second failure on retry. +2. **Background-turn copy.** Use `"♻️ Background turn (${source}) overflowed — session recovered. Original work was not retried."` + for boot/cron/sub-agent. `"...Please resend your last message."` for `source === "user"`. +3. **Telemetry.** Reuse the existing `compact-timing.jsonl` schema with a new + `level: "gateway-overflow"` entry kind. One extra append per gateway-side recovery + keeps everything in one log; downstream parsers already discriminate on `status`. +4. **"Partial output" for UX wording.** Only actual text deltas count. Tool-start + status messages are housekeeping, and saying "response was interrupted after + partial output" when the user only saw `🔧 running …` would be misleading. +5. **Escalating aggressiveness on repeated recoveries.** Out of scope for this bugfix. + Track via the timing log; revisit if telemetry shows recovery loops. + +## Implementation plan (file-by-file, no code) + +### New file: `src/agents/shared/overflow-recovery.ts` + +Lives in `agents/shared/` (not `memory/`) because it's a pure agent-error → agent-action +helper with no memory-state coupling. Memory-state effects are the *caller*'s +responsibility — see outcome contract below. + +Exports: + +```text +type OverflowRecoveryOutcome = + | { kind: "not-overflow" } + | { kind: "unsupported" } // agent.softReset undefined + | { kind: "recovered"; report: SoftResetReport } // softReset returned reset:true + | { kind: "noop"; reason: string } // softReset returned reset:false + | { kind: "failed"; error: string }; // softReset itself threw + +async function recoverFromContextOverflow( + err: unknown, + threadId: string, + agent: AgentAdapter, + onProgress?: (step: string) => void | Promise, +): Promise; +``` + +Behavior identical to the current file-private `attemptSoftResetRecovery` in +`lifecycle.ts`, but returns a richer discriminated outcome (so callers can branch +on what actually happened — recovered vs noop vs failed vs unsupported) and +**does not write memory state**. Same `onProgress` copy. + +### Edit: `src/memory/lifecycle.ts` + +- Delete the file-private `attemptSoftResetRecovery`. +- Import `recoverFromContextOverflow` from `agents/shared/overflow-recovery`. +- In `flushMemoryThenCompact`'s catch, replace the existing call site with the new + helper. Map outcomes: + - `recovered` → set `forceInjectReason="after-soft-reset"`, clear `pendingCompact`, + record telemetry with `error: "soft-reset-recovered: "` (matches existing v0.5.32 line). + - `noop` / `failed` / `unsupported` → re-arm `pendingCompact = effectiveLevel` (existing behavior). + - `not-overflow` → re-arm `pendingCompact = effectiveLevel` (existing behavior). +- No behavior change visible to callers. + +### Edit: `src/gateway/gateway.ts` + +- Import `recoverFromContextOverflow`, `isContextOverflowError`, and `loadThreadMemoryState`/`saveThreadMemoryState`. +- Add a private `turnSource` parameter to `handleAgentTurn` (default `"user"`, + `"boot"` from `fireBootTurn`, `"subagent"` from `handleSubagentCompletion`, + `"cron"` from cron callsite). Plumb through to the catch via a captured local. +- In `handleStreaming`'s return shape, add `hadVisibleText: boolean` (true once any + `text_delta` was buffered to a Telegram bubble). Track it in `streaming.ts`. +- Replace the catch body at `gateway.ts:~508`: + - If `!isContextOverflowError(err)` → existing sanitized post (`⚠️ Error: ${safeMsg}`). + - Else: call `recoverFromContextOverflow(err, agentThreadId, agent, progressCb)` + where `progressCb` posts to `thread` (using existing `transport.progress` for the ✅/⚠️/❌ trio). + - Branch on outcome: + - `recovered` → load state, set `forceInjectReason="after-soft-reset"`, clear + `pendingCompact`, save. Then post a single follow-up: + - if `turnSource === "user"` AND `hadVisibleText === false`: + `"✅ Recovered. Please resend your last message."` + - else if `turnSource === "user"` AND `hadVisibleText === true`: + `"♻️ Response was interrupted; session recovered. Resend if you want me to continue."` + - else (background): `"♻️ Background turn (${turnSource}) overflowed — session recovered. Original work was not retried."` + - `noop` / `failed` → if `agent.compact` exists, load state, set + `pendingCompact = "emergency"`, save. Post: `"⚠️ Recovery armed (${kind}: ${reason}). Send any message to retry."`. Otherwise post the existing sanitized error. + - `unsupported` → post the existing sanitized error (no recovery hook available, + can't do better). + - Append one telemetry line to `compact-timing.jsonl` with `level: "gateway-overflow"`, + `status: "ok" | "failed"`, `error: ` (reuse `appendCompactLog` — + export it from `lifecycle.ts` or extract into `memory/telemetry.ts` as a follow-up; + smallest viable change is to export the existing function). + +### Edit: `src/gateway/streaming.ts` + +- Track `hadVisibleText` (set on first non-empty `text_delta`). +- Return it alongside `{usedTools}` (now `{usedTools, hadVisibleText}`). + +### Tests — new file: `test/overflow-recovery.test.ts` + +Direct unit tests on the extracted helper: +- `recoverFromContextOverflow_OnNonOverflowError_ReturnsNotOverflow` +- `recoverFromContextOverflow_AdapterWithoutSoftReset_ReturnsUnsupported` +- `recoverFromContextOverflow_SoftResetSucceeds_ReturnsRecoveredWithReport_AndEmitsCheckmarkProgress` +- `recoverFromContextOverflow_SoftResetReturnsResetFalse_ReturnsNoopWithReason_AndEmitsWarnProgress` +- `recoverFromContextOverflow_SoftResetThrows_ReturnsFailedWithMessage_AndEmitsErrorProgress` +- `recoverFromContextOverflow_SoftResetThrowsNonError_DoesNotMaskWithTypeError` (regression for v0.5.32 fix #4) +- `recoverFromContextOverflow_OverflowInCauseChain_StillClassifies` (regression for v0.5.30 wrapped-cause fix) + +### Tests — extend `test/memory.test.ts` + +- Replace any references to the deleted file-private helper with assertions on the + new outcome contract via the public `flushMemoryThenCompact` (existing tests already + exercise the success/noop/failed paths through that surface — should pass unchanged). + +### Tests — new file: `test/gateway-overflow-recovery.test.ts` + +Gateway-level tests using a fake adapter (implements `AgentAdapter`) and a fake +thread (implements `thread.post`). Test surface (matches brief): + +- `gateway_OverflowDuringNonStreamingPrompt_SoftResetSucceeds_PostsRecoveredHint` +- `gateway_OverflowDuringNonStreamingPrompt_SoftResetUnsupported_AdapterHasCompact_ArmsPendingCompactEmergency` +- `gateway_OverflowDuringNonStreamingPrompt_SoftResetUnsupportedNoCompact_PostsSanitizedError` +- `gateway_OverflowDuringNonStreamingPrompt_SoftResetFails_AdapterHasCompact_ArmsPendingCompactAndPostsFailureHint` +- `gateway_NonOverflowError_PostsSanitizedError_NoRecoveryAttempted` +- `gateway_OverflowDuringStream_BeforeAnyTextDelta_PostsRetryHint` +- `gateway_OverflowDuringStream_AfterPartialTextDelta_PostsInterruptionHint` +- `gateway_BackgroundTurn_OverflowRecovered_PostsBackgroundCopy_NotRetryHint` +- `gateway_ConcurrentSecondTurn_BlockedByThreadLockUntilRecoveryCompletes` + +These tests instantiate `Gateway` (or invoke `handleAgentTurn` indirectly) — if +`handleAgentTurn` private-ness is a friction, extract just the catch+recovery +sequence to a private→exported `handleAgentOverflow(thread, agentThreadId, agent, err, ctx)` +helper on `Gateway` with a free-function fallback in `gateway/overflow.ts` for tests. +Implementer's call. + +### Approximate diff size + +≈180 LOC source + ≈250 LOC tests. Three commits: +1. `design: soft-reset pre-turn gap (v0.5.38)` — design doc only. +2. `fix(gateway): classify and recover from context overflow in agent.prompt catch (#NNN, v0.5.38)` — extract helper, wire gateway, plumb `turnSource` and `hadVisibleText`, telemetry, CHANGELOG entry. +3. `test: gateway-level overflow recovery + extracted helper (v0.5.38)` — only if test extraction is large enough to warrant a separate commit; else fold into commit 2. + +## Verification + +End-to-end repro using the preserved pre-recovery jsonl backup: + +```bash +# Step 1: stop daemon, restore the 2.8MB pre-recovery main session. +systemctl --user stop roundhouse +cp ~/.roundhouse/sessions/main/2026-05-11T*.jsonl.bak-1778962431211 \ + /tmp/repro-main.jsonl +# Note: cannot literally swap the live session in without disrupting the user's +# current chat. Use a synthetic threadId for repro instead. + +# Step 2: write a small repro script (~/.roundhouse/workspace/repro.ts) that: +# - constructs a PiAdapter pointed at /tmp/repro-sessions/repro-thread/ +# - calls agent.prompt("repro-thread", { text: "hi" }) +# - asserts the gateway catch path runs, softReset trims, retry succeeds. +# Run: tsx ~/.roundhouse/workspace/repro.ts + +# Step 3: confirm: +# - first prompt throws "prompt is too long" (pre-fix) +# - on the fix branch, gateway.handleAgentTurn classifies, calls softReset, +# posts ♻️ + ✅, writes forceInjectReason, no raw error to user. +``` + +The repro script is the deliverable in step 3 of the brief. diff --git a/package.json b/package.json index 4f62fa2..cac4896 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@inceptionstack/roundhouse", - "version": "0.5.37", + "version": "0.5.38", "type": "module", "description": "Multi-platform chat gateway that routes messages through a configured AI agent", "license": "MIT", diff --git a/src/agents/shared/overflow-recovery.ts b/src/agents/shared/overflow-recovery.ts new file mode 100644 index 0000000..b684496 --- /dev/null +++ b/src/agents/shared/overflow-recovery.ts @@ -0,0 +1,83 @@ +/** + * agents/shared/overflow-recovery.ts — Reactive context-overflow recovery helper + * + * Used by: + * - src/memory/lifecycle.ts: catch in flushMemoryThenCompact (compact itself overflowed) + * - src/gateway/gateway.ts: catch around agent.prompt/agent.promptStream + * (the live session was already past the limit before this turn even started — typically + * after idle background growth via cron/boot/sub-agents) + * + * Pure agent-error → agent-action helper. Memory-state effects + * (forceInjectReason="after-soft-reset", clearing pendingCompact, arming + * pendingCompact="emergency") are the CALLER's responsibility, because the two + * call sites need different fallback semantics: + * - lifecycle: re-arms pendingCompact at whatever level was failing + * - gateway: only arms pendingCompact="emergency" when agent.compact exists + * and softReset didn't recover (so the next pre-turn branch fires) + * + * Returns a discriminated outcome rather than {attempted, succeeded} so callers + * can branch precisely. + */ + +import type { AgentAdapter } from "../../types"; +import type { SoftResetReport } from "./session-soft-reset"; +import { isContextOverflowError } from "./error-classifiers"; + +export type OverflowRecoveryOutcome = + | { kind: "not-overflow" } + | { kind: "unsupported" } // agent.softReset undefined + | { kind: "recovered"; report: SoftResetReport } // softReset returned reset:true + | { kind: "noop"; reason: string } // softReset returned reset:false + | { kind: "failed"; error: string }; // softReset itself threw + +/** Max bytes of resetErr.message we surface in `failed.error` and onProgress. */ +const MAX_RESET_ERROR_PREVIEW = 200; + +/** + * Classify err and, on context-overflow, run agent.softReset to trim the + * on-disk session jsonl. + * + * Emits onProgress("♻️ Session overflowed — soft-resetting to recent turns...") + * when entering recovery, and one of the v0.5.32 trio (✅/⚠️/❌) on outcome. + * + * Does NOT mutate memory state. Caller is responsible for state writes. + */ +export async function recoverFromContextOverflow( + err: unknown, + threadId: string, + agent: AgentAdapter, + onProgress?: (step: string) => void | Promise, +): Promise { + if (!isContextOverflowError(err)) { + return { kind: "not-overflow" }; + } + + if (!agent.softReset) { + return { kind: "unsupported" }; + } + + try { + await onProgress?.("♻️ Session overflowed — soft-resetting to recent turns..."); + const report = await agent.softReset(threadId); + + if (report?.reset) { + console.warn(`[overflow-recovery] soft-reset recovered ${threadId} from overflow`); + const { entriesBefore, entriesAfter } = report as SoftResetReport; + const detail = typeof entriesBefore === "number" && typeof entriesAfter === "number" + ? ` (${entriesBefore} → ${entriesAfter} entries)` + : ""; + await onProgress?.(`✅ Soft-reset complete${detail}. Durable memory will re-inject on next turn.`); + return { kind: "recovered", report: report as SoftResetReport }; + } + + const reason = (report as { reason?: string } | null)?.reason ?? "unknown"; + console.warn(`[overflow-recovery] soft-reset returned no-op for ${threadId} (${reason})`); + await onProgress?.(`⚠️ Soft-reset no-op (${reason}). Will retry compact next turn.`); + return { kind: "noop", reason }; + } catch (resetErr) { + const msg = resetErr instanceof Error ? resetErr.message : String(resetErr); + console.error(`[overflow-recovery] soft-reset failed for ${threadId}:`, msg); + await onProgress?.(`❌ Soft-reset failed: ${msg.slice(0, MAX_RESET_ERROR_PREVIEW)}. Will retry next turn.`); + return { kind: "failed", error: msg }; + } +} diff --git a/src/gateway/gateway.ts b/src/gateway/gateway.ts index 0a81d69..5fa1287 100644 --- a/src/gateway/gateway.ts +++ b/src/gateway/gateway.ts @@ -15,6 +15,7 @@ import { ROUNDHOUSE_DIR, ROUNDHOUSE_VERSION } from "../config"; import { CronSchedulerService } from "../cron/scheduler"; import { IpcServer, createIpcHandler } from "../ipc"; import { prepareMemoryForTurn, finalizeMemoryForTurn, flushMemoryThenCompact } from "../memory/lifecycle"; +import { recoverFromAgentTurnOverflow, type TurnSource } from "./overflow"; import { maxPressure } from "../memory/policy"; import type { PressureLevel } from "../memory/types"; // progress messages now flow through the transport via this.transport.progress(). @@ -44,10 +45,12 @@ import { injectToolsSection } from "./tools-inject"; import { injectPersonaSection, loadPersona } from "./persona-inject"; import { checkVersionChange } from "./whats-new"; +/** Origin of an agent turn — used for recovery copy and telemetry. */ +export type { TurnSource }; + /** Limits */ const MAX_SUBAGENT_STDOUT_CHARS = 3000; const MAX_MESSAGE_CHUNK = 4000; -const MAX_ERROR_PREVIEW = 200; /** Bot username for command suffix validation (set during gateway init) */ let _botUsername = ""; @@ -375,6 +378,7 @@ export class Gateway { private async handleAgentTurn( thread: any, agentThreadId: string, userText: string, rawAttachments: any[], verboseThreads: Set, threadLocks: Map>, abortControllers: Map, + turnSource: TurnSource = "user", ): Promise { // Prepare message (save attachments, build AgentMessage) const result = await this.prepareAgentMessage(thread, agentThreadId, userText, rawAttachments); @@ -462,6 +466,10 @@ export class Gateway { // so the post-finally `if (deferredSoftFlush) { ... }` block (line ~530) // can still see it. Earlier versions declared it here, then read it // OUTSIDE the enclosing try block — a scoping bug that tsc surfaced. + // streamHadVisibleText is hoisted for the same reason: the catch below + // needs to know whether the user already saw partial assistant text + // when picking gateway-overflow recovery copy. + let streamHadVisibleText = false; try { let turnUsedTools = false; if (agent.promptStream) { @@ -470,6 +478,7 @@ export class Gateway { try { const streamResult = await this.handleStreaming(thread, agent.promptStream(agentThreadId, agentMessage), verboseThreads.has(agentThreadId), ac.signal); turnUsedTools = streamResult.usedTools; + streamHadVisibleText = streamResult.hadVisibleText; } finally { abortControllers.delete(agentThreadId); } @@ -506,12 +515,10 @@ export class Gateway { console.error(`[roundhouse] memory finalize error:`, (err as Error).message); } } catch (err) { - const errMsg = err instanceof Error ? err.message : String(err); - const safeMsg = errMsg.split('\n')[0].slice(0, MAX_ERROR_PREVIEW); - console.error(`[roundhouse] agent error:`, err); - try { - await thread.post(`⚠️ Error: ${safeMsg}`); - } catch {} + await recoverFromAgentTurnOverflow(thread, agentThreadId, agent, err, { + turnSource, + hadVisibleText: streamHadVisibleText, + }); } finally { if (stopTyping) stopTyping(); } @@ -840,7 +847,7 @@ export class Gateway { ]; } - private async handleStreaming(thread: any, stream: AsyncIterable, verbose: boolean, signal?: AbortSignal): Promise<{ usedTools: boolean }> { + private async handleStreaming(thread: any, stream: AsyncIterable, verbose: boolean, signal?: AbortSignal): Promise<{ usedTools: boolean; hadVisibleText: boolean }> { return _handleStream(stream, { thread, verbose, @@ -1014,7 +1021,7 @@ export class Gateway { const bootPrompt = "You just came online after a restart. Say a brief hello in-character (1–2 sentences max). Check your workspace for any pending tasks."; try { - await this.handleAgentTurn(syntheticThread, agentThreadId, bootPrompt, [], verboseThreads, threadLocks, abortControllers); + await this.handleAgentTurn(syntheticThread, agentThreadId, bootPrompt, [], verboseThreads, threadLocks, abortControllers, "boot"); } catch (err) { console.error("[roundhouse] boot turn failed:", (err as Error).message); } @@ -1070,7 +1077,7 @@ export class Gateway { : `[Sub-agent ${status.role} ${status.status} — no output]`; const syntheticThread = this.transport.createThread(chatId); - await this.handleAgentTurn(syntheticThread, "main", resultText, [], this.verboseThreads, this.threadLocks, this.abortControllers); + await this.handleAgentTurn(syntheticThread, "main", resultText, [], this.verboseThreads, this.threadLocks, this.abortControllers, "subagent"); } catch (err) { console.error("[roundhouse] sub-agent result injection failed:", err); } diff --git a/src/gateway/overflow.ts b/src/gateway/overflow.ts new file mode 100644 index 0000000..a7a0a9e --- /dev/null +++ b/src/gateway/overflow.ts @@ -0,0 +1,198 @@ +/** + * gateway/overflow.ts — Gateway-side reactive context-overflow recovery. + * + * Extracted as a free function so unit tests can exercise it without booting + * a full Gateway. Called from `Gateway.handleAgentTurnError` (the catch + * around `agent.prompt`/`agent.promptStream`). + * + * Closes the v0.5.38 "soft-reset pre-turn gap": when an idle session has + * already grown past the provider's context limit (typically via background + * boot/sub-agent activity that didn't trip soft/hard pressure thresholds), + * the next user turn's `agent.prompt(...)` throws `prompt is too long`. + * Before this change the gateway just posted the raw provider error, + * perpetuating the loop. Now it classifies, calls `agent.softReset(...)`, + * persists the right memory-state effects, and routes the user to either + * a deferred-retry hint or the existing pre-turn `pendingCompact="emergency"` + * recovery branch. + * + * See docs/design/v0.5.38-soft-reset-pre-turn-gap.md. + */ + +import type { AgentAdapter } from "../types"; +import { isContextOverflowError } from "../agents/shared/error-classifiers"; +import { recoverFromContextOverflow } from "../agents/shared/overflow-recovery"; +import type { OverflowRecoveryOutcome } from "../agents/shared/overflow-recovery"; +import { loadThreadMemoryState, saveThreadMemoryState } from "../memory/state"; +import { appendCompactLog } from "../memory/telemetry"; + +/** Origin of an agent turn — drives recovery copy and telemetry. + * + * Reachable sources today: `user` (chat-platform message), `boot` + * (fireBootTurn), `subagent` (handleSubagentCompletion result injection). + * Cron jobs run via `cron/runner.ts` in their own session and never invoke + * `Gateway.handleAgentTurn`, so `cron` is intentionally not in the union. + * If a future patch adds cron→main injection, add it here then. + */ +export type TurnSource = "user" | "boot" | "subagent"; + +/** Telemetry level used for gateway-side overflow recoveries. */ +export const GATEWAY_OVERFLOW_LEVEL = "gateway-overflow" as const; + +/** Max bytes of the original provider error we surface in the chat. */ +const MAX_ERROR_PREVIEW = 200; + +/** + * Max bytes of a recovery-failure reason we embed in the inline + * "Recovery armed (kind: reason)" hint. Shorter than MAX_ERROR_PREVIEW + * because this is a UI message bracketed by other copy, not a standalone + * error preview. + */ +const MAX_FAILURE_REASON_PREVIEW = 100; + +export interface AgentTurnErrorContext { + turnSource: TurnSource; + /** Whether handleStreaming emitted at least one non-empty text_delta this turn. */ + hadVisibleText: boolean; +} + +export interface AgentTurnErrorResult { + /** + * Whether the helper handled the error (classified + acted) or returned + * unhandled (so the caller can fall back to its sanitized error post). + * In practice this implementation always handles non-overflow errors too + * by posting the sanitized error itself, so callers don't need a fallback. + */ + handled: boolean; + /** Recovery outcome, if overflow was detected; undefined for non-overflow. */ + outcome?: OverflowRecoveryOutcome; + /** True iff we set state.pendingCompact="emergency" for the next turn. */ + armedPending: boolean; + /** The user-facing message we posted (or empty if post failed). */ + userMessage: string; +} + +/** + * Catch-path recovery for an exception thrown by `agent.prompt()` or + * `agent.promptStream()`. See file header. + * + * Strategy: + * 1. Non-overflow → post sanitized `⚠️ Error: `. Done. + * 2. Overflow → call recoverFromContextOverflow. + * 3. Recovered → set forceInjectReason="after-soft-reset", clear pendingCompact. + * 4. noop / failed AND agent.compact exists → arm pendingCompact="emergency" + * so the existing pre-turn branch fires on the next user message. + * 5. unsupported (no softReset) OR (noop/failed without compact) → + * sanitized error. + * 6. Telemetry: append to compact-timing.jsonl with level="gateway-overflow". + * + * UX: deferred retry only. We never transparently re-run the prompt because + * a streamed turn that already emitted text or executed tools would + * duplicate output / side effects on retry. + */ +export async function recoverFromAgentTurnOverflow( + thread: { post: (text: string) => Promise | unknown }, + agentThreadId: string, + agent: AgentAdapter, + err: unknown, + ctx: AgentTurnErrorContext, +): Promise { + const errMsg = err instanceof Error ? err.message : String(err); + const safeMsg = errMsg.split("\n")[0].slice(0, MAX_ERROR_PREVIEW); + + if (!isContextOverflowError(err)) { + const userMsg = `⚠️ Error: ${safeMsg}`; + await safePost(thread, userMsg); + return { handled: true, armedPending: false, userMessage: userMsg }; + } + + const t0 = Date.now(); + const outcome = await recoverFromContextOverflow(err, agentThreadId, agent, async (step) => { + await safePost(thread, step); + }); + + let armedPending = false; + try { + const state = await loadThreadMemoryState(agentThreadId); + if (outcome.kind === "recovered") { + state.forceInjectReason = "after-soft-reset"; + state.pendingCompact = undefined; + await saveThreadMemoryState(agentThreadId, state); + } else if ((outcome.kind === "noop" || outcome.kind === "failed") && agent.compact) { + // Hand off to the proven pre-turn pendingCompact="emergency" branch + // on the user's next message. Don't arm if compact is unavailable — + // we'd just guarantee a second failure. + state.pendingCompact = "emergency"; + await saveThreadMemoryState(agentThreadId, state); + armedPending = true; + } + } catch (stateErr) { + console.error(`[gateway-overflow] state write failed for ${agentThreadId}:`, (stateErr as Error).message); + } + + // Telemetry: one line per gateway-side recovery, same schema as compact log + // so jsonl parsers don't have to special-case missing fields. + appendCompactLog({ + threadId: agentThreadId, + level: GATEWAY_OVERFLOW_LEVEL, + effectiveLevel: GATEWAY_OVERFLOW_LEVEL, + flushSkipped: true, + tokensBefore: null, + tokensAfter: null, + flushMs: 0, + compactMs: 0, + totalMs: Date.now() - t0, + model: "gateway", + status: outcome.kind === "recovered" ? "ok" : "failed", + error: `gateway-overflow:${outcome.kind}${armedPending ? "+armed-pending" : ""}: ${errMsg}`.slice(0, 500), + }); + + const userMsg = pickUserMessage(outcome, ctx, armedPending, safeMsg); + await safePost(thread, userMsg); + return { handled: true, outcome, armedPending, userMessage: userMsg }; +} + +function pickUserMessage( + outcome: OverflowRecoveryOutcome, + ctx: AgentTurnErrorContext, + armedPending: boolean, + safeMsg: string, +): string { + if (outcome.kind === "recovered") { + if (ctx.turnSource !== "user") { + return `♻️ Background turn (${ctx.turnSource}) overflowed — session recovered. Original work was not retried.`; + } + if (ctx.hadVisibleText) { + return "♻️ Response was interrupted; session recovered. Resend if you want me to continue."; + } + return "✅ Recovered. Please resend your last message."; + } + if (armedPending) { + const reason = outcome.kind === "noop" + ? outcome.reason + : outcome.kind === "failed" + ? outcome.error.slice(0, MAX_FAILURE_REASON_PREVIEW) + : "unknown"; + return `⚠️ Recovery armed (${outcome.kind}: ${reason}). Send any message to retry.`; + } + // Adapter has no usable next-turn recovery path (softReset undefined, or + // softReset existed-and-failed without a compact fallback). Tell the user + // explicitly that automatic recovery is unavailable; the raw provider + // error alone ("prompt is too long: N > M") gives no actionable hint. + if (outcome.kind === "unsupported") { + return "⚠️ Session full — adapter doesn't support automatic recovery. Run /compact manually or restart session."; + } + // noop/failed without compact — best-effort sanitized error. Keep the + // raw message so the user knows what the underlying provider said. + return `⚠️ Error: ${safeMsg}`; +} + +async function safePost( + thread: { post: (text: string) => Promise | unknown }, + text: string, +): Promise { + try { + await thread.post(text); + } catch { + // Posting hints/errors must never throw out of recovery. + } +} diff --git a/src/gateway/streaming.ts b/src/gateway/streaming.ts index b24082b..d81d5ba 100644 --- a/src/gateway/streaming.ts +++ b/src/gateway/streaming.ts @@ -13,6 +13,29 @@ import { READ_ONLY_TOOLS } from "../memory/types"; import { isTelegramThread, handleTelegramHtmlStream } from "../transports/telegram/html"; import { DEBUG_STREAM } from "../util"; import { toolIcon } from "./helpers"; +import { isContextOverflowError } from "../agents/shared/error-classifiers"; + +/** + * Thrown from `handleStreaming` when a `model_error` stream event carries a + * provider-classified context-overflow message (`prompt is too long`, etc.). + * + * Background: pi-ai's streaming converts provider errors into `model_error` + * EVENTS rather than thrown exceptions. Without this class, the for-await + * would post the raw error inline and return normally, bypassing the + * gateway's `recoverFromAgentTurnOverflow` catch path. Throwing here + * routes streamed-overflow into the same recovery surface used for + * synchronous `agent.prompt()` overflow. + * + * Non-overflow `model_error` events still post `⚠️ Agent error: ...` inline + * and let the stream continue — only context-overflow is escalated to a + * thrown exception. + */ +export class StreamModelOverflowError extends Error { + override readonly name = "StreamModelOverflowError"; + constructor(message: string) { + super(message); + } +} // ── Text Stream Factory ────────────────────────────── @@ -70,6 +93,13 @@ export interface StreamContext { export interface StreamResult { usedTools: boolean; + /** + * True if at least one non-empty text_delta was buffered to the chat + * during this turn (i.e. the user saw partial assistant text before + * the stream ended). Used by the gateway catch path to choose between + * "please resend" vs "response was interrupted" recovery copy. + */ + hadVisibleText: boolean; } /** @@ -117,6 +147,7 @@ export async function handleStreaming( let hasTextInCurrentTurn = false; let hasContentThisTurn = false; + let hasVisibleText = false; let modelErrorPosted = false; let eventCount = 0; let drainingNotified = false; @@ -141,6 +172,7 @@ export async function handleStreaming( case "text_delta": { ensureStream(); currentPush!(event.text); + if (event.text.length > 0) hasVisibleText = true; hasTextInCurrentTurn = true; hasContentThisTurn = true; break; @@ -175,6 +207,14 @@ export async function handleStreaming( await flushCurrentStream(); hasTextInCurrentTurn = false; hasContentThisTurn = true; + // Context-overflow is recoverable — escalate to the gateway catch + // (recoverFromAgentTurnOverflow), suppress the raw inline post, and + // abort the stream. Recovery posts its own ♻️/✅/⚠️ messaging. + // Non-overflow errors keep today's inline post + continue-loop. + if (isContextOverflowError({ message: event.message })) { + console.warn(`[roundhouse] streamed model_error: context overflow — escalating to gateway recovery`); + throw new StreamModelOverflowError(event.message); + } modelErrorPosted = true; const safeMsg = event.message.split("\n")[0].slice(0, 400); console.warn(`[roundhouse] model error: ${safeMsg}`); @@ -231,5 +271,5 @@ export async function handleStreaming( try { await thread.post("\u26a0\ufe0f Agent returned no response. Check roundhouse logs."); } catch {} } - return { usedTools: usedFileModifyingTools }; + return { usedTools: usedFileModifyingTools, hadVisibleText: hasVisibleText }; } diff --git a/src/memory/lifecycle.ts b/src/memory/lifecycle.ts index 9ca4d16..aad2556 100644 --- a/src/memory/lifecycle.ts +++ b/src/memory/lifecycle.ts @@ -16,75 +16,13 @@ import { shouldInjectMemory, classifyContextPressure, isSoftFlushOnCooldown } fr import { buildMemoryInjection, injectMemoryIntoMessage } from "./inject"; import { buildFlushPrompt } from "./prompts"; import { bootstrapMemoryFiles } from "./bootstrap"; -import { isContextOverflowError } from "../agents/shared/error-classifiers"; -import { appendFile, mkdir } from "node:fs/promises"; -import { join } from "node:path"; -import { homedir } from "node:os"; - -// ── Telemetry helper ───────────────────────────────── - -interface CompactLogEntry { - threadId: string; - level: string; - effectiveLevel: string; - flushSkipped: boolean; - tokensBefore: number | null; - tokensAfter: number | null; - flushMs: number; - compactMs: number; - totalMs: number; - model: string; - status: "ok" | "failed"; - error: string | null; -} - -/** - * Append a compact telemetry entry. Fire-and-forget. - * Schema is uniform across success/failure (status discriminator) so - * downstream parsers don't have to handle missing fields. - */ -function appendCompactLog(entry: CompactLogEntry): void { - const logDir = join(homedir(), ".roundhouse", "logs"); - const line = JSON.stringify({ ts: new Date().toISOString(), ...entry }) + "\n"; - mkdir(logDir, { recursive: true }) - .then(() => appendFile(join(logDir, "compact-timing.jsonl"), line)) - .catch((err) => console.warn(`[memory] timing log write failed:`, (err as Error).message)); -} +import { recoverFromContextOverflow } from "../agents/shared/overflow-recovery"; +import { appendCompactLog, type CompactLogEntry } from "./telemetry"; -async function attemptSoftResetRecovery( - err: unknown, - threadId: string, - agent: AgentAdapter, - onProgress?: (step: string) => void | Promise, -): Promise<{ attempted: boolean; succeeded: boolean }> { - if (!isContextOverflowError(err) || !agent.softReset) { - return { attempted: false, succeeded: false }; - } - - try { - await onProgress?.("♻️ Session overflowed — soft-resetting to recent turns..."); - const report = await agent.softReset(threadId); - if (report?.reset) { - console.warn(`[memory] soft-reset recovered ${threadId} from overflow`); - const { entriesBefore, entriesAfter } = (report as { entriesBefore?: number; entriesAfter?: number }); - const detail = typeof entriesBefore === "number" && typeof entriesAfter === "number" - ? ` (${entriesBefore} → ${entriesAfter} entries)` - : ""; - await onProgress?.(`✅ Soft-reset complete${detail}. Durable memory will re-inject on next turn.`); - return { attempted: true, succeeded: true }; - } - - const reason = (report as { reason?: string } | null)?.reason ?? "unknown"; - console.warn(`[memory] soft-reset returned no-op for ${threadId} (${reason})`); - await onProgress?.(`⚠️ Soft-reset no-op (${reason}). Will retry compact next turn.`); - return { attempted: true, succeeded: false }; - } catch (resetErr) { - const msg = resetErr instanceof Error ? resetErr.message : String(resetErr); - console.error(`[memory] soft-reset failed for ${threadId}:`, msg); - await onProgress?.(`❌ Soft-reset failed: ${msg.slice(0, 200)}. Will retry next turn.`); - return { attempted: true, succeeded: false }; - } -} +// Re-export for backwards compatibility with any consumers that imported +// these from lifecycle. New code should import from `./telemetry`. +export { appendCompactLog }; +export type { CompactLogEntry }; // ── Memory mode detection ──────────────────────────── @@ -395,7 +333,9 @@ export async function flushMemoryThenCompact( } catch (err) { const errMsg = (err as Error).message; console.error(`[memory] flush+compact failed for ${threadId}:`, errMsg); - const recovery = await attemptSoftResetRecovery(err, threadId, agent, onProgress); + const recovery = await recoverFromContextOverflow(err, threadId, agent, onProgress); + const recoveryAttempted = recovery.kind !== "not-overflow" && recovery.kind !== "unsupported"; + const recoverySucceeded = recovery.kind === "recovered"; appendCompactLog({ threadId, @@ -409,13 +349,13 @@ export async function flushMemoryThenCompact( totalMs: Date.now() - t0, model: flushModel ?? "default", status: "failed", - error: (recovery.attempted - ? `${recovery.succeeded ? "soft-reset-recovered" : "soft-reset-failed"}: ${errMsg}` + error: (recoveryAttempted + ? `${recoverySucceeded ? "soft-reset-recovered" : "soft-reset-failed"}: ${errMsg}` : errMsg).slice(0, 500), }); try { - if (recovery.succeeded) { + if (recoverySucceeded) { // Soft reset cleared the overflow. Mark the next turn for memory // re-injection so the agent has its durable context, and clear the // pendingCompact flag — there's nothing left to compact now. diff --git a/src/memory/telemetry.ts b/src/memory/telemetry.ts new file mode 100644 index 0000000..0c744d2 --- /dev/null +++ b/src/memory/telemetry.ts @@ -0,0 +1,43 @@ +/** + * memory/telemetry.ts — Compact-log telemetry helper + * + * One source of truth for `compact-timing.jsonl` writes. Lives in `memory/` + * because the schema models compact lifecycle telemetry, but is consumed + * by both the memory lifecycle (proactive compaction) and the gateway + * (reactive overflow recovery, level="gateway-overflow"). Extracting + * here avoids the cross-domain `gateway → memory/lifecycle` import that + * the v0.5.38 design doc flagged as a follow-up. + * + * Schema is uniform across success/failure (status discriminator) so + * downstream parsers don't have to handle missing fields. + */ + +import { appendFile, mkdir } from "node:fs/promises"; +import { join } from "node:path"; +import { homedir } from "node:os"; + +export interface CompactLogEntry { + threadId: string; + level: string; + effectiveLevel: string; + flushSkipped: boolean; + tokensBefore: number | null; + tokensAfter: number | null; + flushMs: number; + compactMs: number; + totalMs: number; + model: string; + status: "ok" | "failed"; + error: string | null; +} + +/** + * Append a compact telemetry entry. Fire-and-forget. + */ +export function appendCompactLog(entry: CompactLogEntry): void { + const logDir = join(homedir(), ".roundhouse", "logs"); + const line = JSON.stringify({ ts: new Date().toISOString(), ...entry }) + "\n"; + mkdir(logDir, { recursive: true }) + .then(() => appendFile(join(logDir, "compact-timing.jsonl"), line)) + .catch((err) => console.warn(`[memory] timing log write failed:`, (err as Error).message)); +} diff --git a/test/gateway-overflow-recovery.test.ts b/test/gateway-overflow-recovery.test.ts new file mode 100644 index 0000000..462a7cb --- /dev/null +++ b/test/gateway-overflow-recovery.test.ts @@ -0,0 +1,370 @@ +/** + * test/gateway-overflow-recovery.test.ts — Gateway-side overflow recovery + * + * Tests for `recoverFromAgentTurnOverflow` (extracted free function called + * from `Gateway.handleAgentTurn`'s catch). Closes the v0.5.38 soft-reset + * pre-turn gap: when an idle session has already grown past the provider + * context limit, the next user turn's `agent.prompt(...)` throws + * `prompt is too long`. Before this change the gateway posted the raw + * provider error and the loop continued. Now it classifies, calls + * `agent.softReset(...)`, persists the right state, and either tells the + * user to resend or arms `pendingCompact="emergency"` for the next turn. + * + * Test surface (from the brief): + * - overflow-during-prompt → softReset succeeds → recovered hint + * - overflow-during-prompt + softReset undefined + compact available → + * pendingCompact="emergency" armed, hint posted + * (NB: with no softReset we go straight to the "unsupported" branch, + * which posts the sanitized error and does NOT arm pendingCompact — + * because then the next turn would just hit the same wall. Arming is + * reserved for cases where softReset existed but didn't recover.) + * - overflow with softReset throwing → pendingCompact armed, failure note + * - non-overflow error → sanitized error, no recovery + * - streaming overflow before any text → "please resend" + * - streaming overflow after partial text → "response was interrupted" + * - background turn (boot/subagent) overflow recovered → background copy + */ + +import { describe, it, expect, afterEach } from "vitest"; +import { randomUUID } from "node:crypto"; +import { rm } from "node:fs/promises"; +import { resolve } from "node:path"; +import { recoverFromAgentTurnOverflow, type TurnSource } from "../src/gateway/overflow"; +import { loadThreadMemoryState } from "../src/memory/state"; +import { ROUNDHOUSE_DIR } from "../src/config"; +import { threadIdToDir } from "../src/util"; +import type { AgentAdapter, AgentResponse } from "../src/types"; +import type { SoftResetReport } from "../src/agents/shared/session-soft-reset"; + +// ── Test doubles ────────────────────────────────────── + +interface FakeThread { + posts: string[]; + post: (text: string) => Promise; +} + +function fakeThread(): FakeThread { + const posts: string[] = []; + return { + posts, + async post(text: string) { posts.push(text); }, + }; +} + +interface FakeAdapterOpts { + softReset?: AgentAdapter["softReset"]; + hasCompact?: boolean; +} + +function fakeAdapter(opts: FakeAdapterOpts = {}): AgentAdapter { + const a: Partial = { + name: "fake", + async prompt(): Promise { return { text: "" }; }, + async dispose() {}, + softReset: opts.softReset, + }; + if (opts.hasCompact) { + a.compact = async () => ({ tokensBefore: 100, tokensAfter: 5 }); + } + return a as AgentAdapter; +} + +function bedrockOverflow(): Error { + const e = new Error("Validation error: The model returned the following errors: prompt is too long: 215725 tokens > 200000 maximum"); + (e as any).name = "ValidationException"; + (e as any).$metadata = { httpStatusCode: 400 }; + // Cause chain (matches what the pi adapter raises in practice). + (e as any).cause = new Error("prompt is too long: 215725 tokens > 200000 maximum"); + return e; +} + +const successReport: SoftResetReport = { + reset: true, + reason: "kept-8-user-turns", + entriesBefore: 1024, + entriesAfter: 17, + bytesBefore: 2_900_000, + bytesAfter: 215_000, +}; + +// ── Cleanup ────────────────────────────────────────── + +const createdThreads: string[] = []; + +afterEach(async () => { + for (const id of createdThreads.splice(0)) { + const path = resolve(ROUNDHOUSE_DIR, "memory-state", `${threadIdToDir(id)}.json`); + await rm(path, { force: true }); + } +}); + +function uniqueThreadId(tag: string): string { + const id = `test:gw-overflow:${tag}:${randomUUID()}`; + createdThreads.push(id); + return id; +} + +// ── Tests ──────────────────────────────────────────── + +describe("recoverFromAgentTurnOverflow", () => { + it("gateway_OverflowDuringNonStreamingPrompt_SoftResetSucceeds_PostsRecoveredHint", async () => { + const thread = fakeThread(); + const tid = uniqueThreadId("ok"); + const agent = fakeAdapter({ + softReset: async () => successReport, + hasCompact: true, + }); + + const result = await recoverFromAgentTurnOverflow(thread, tid, agent, bedrockOverflow(), { + turnSource: "user", + hadVisibleText: false, + }); + + expect(result.handled).toBe(true); + expect(result.outcome?.kind).toBe("recovered"); + expect(result.armedPending).toBe(false); + + // ♻️ start, ✅ helper completion, then ✅ "Recovered. Please resend" + expect(thread.posts).toEqual([ + expect.stringMatching(/Session overflowed/), + expect.stringMatching(/Soft-reset complete/), + "✅ Recovered. Please resend your last message.", + ]); + + // State updated: forceInjectReason set, pendingCompact cleared. + const state = await loadThreadMemoryState(tid); + expect(state.forceInjectReason).toBe("after-soft-reset"); + expect(state.pendingCompact).toBeUndefined(); + }); + + it("gateway_OverflowDuringNonStreamingPrompt_SoftResetUnsupportedNoCompact_PostsClearGuidance", async () => { + // Adapter has neither softReset nor compact — surface a clear hint to + // the user instead of the raw provider error. (F3 regression.) + const thread = fakeThread(); + const tid = uniqueThreadId("unsupported-nocompact"); + const agent = fakeAdapter({}); // no softReset, no compact + + const result = await recoverFromAgentTurnOverflow(thread, tid, agent, bedrockOverflow(), { + turnSource: "user", + hadVisibleText: false, + }); + + expect(result.outcome?.kind).toBe("unsupported"); + expect(result.armedPending).toBe(false); + expect(thread.posts).toHaveLength(1); + expect(thread.posts[0]).toBe( + "⚠️ Session full — adapter doesn't support automatic recovery. Run /compact manually or restart session.", + ); + // Raw provider error should NOT leak through this path. + expect(thread.posts[0]).not.toMatch(/prompt is too long/); + + const state = await loadThreadMemoryState(tid); + expect(state.pendingCompact).toBeUndefined(); + }); + + it("gateway_OverflowDuringNonStreamingPrompt_SoftResetFails_AdapterHasCompact_ArmsPendingCompactAndPostsFailureHint", async () => { + const thread = fakeThread(); + const tid = uniqueThreadId("failed-arm"); + const agent = fakeAdapter({ + softReset: async () => { throw new Error("disk full"); }, + hasCompact: true, + }); + + const result = await recoverFromAgentTurnOverflow(thread, tid, agent, bedrockOverflow(), { + turnSource: "user", + hadVisibleText: false, + }); + + expect(result.outcome?.kind).toBe("failed"); + expect(result.armedPending).toBe(true); + + // Last post is the "Recovery armed" hint. + expect(thread.posts.at(-1)).toMatch(/Recovery armed \(failed:/); + expect(thread.posts.at(-1)).toMatch(/disk full/); + + // pendingCompact armed for next-turn pre-check branch. + const state = await loadThreadMemoryState(tid); + expect(state.pendingCompact).toBe("emergency"); + }); + + it("gateway_OverflowWithSoftResetReturningResetFalse_AdapterHasCompact_ArmsPendingCompactWithNoopReason", async () => { + const thread = fakeThread(); + const tid = uniqueThreadId("noop-arm"); + const agent = fakeAdapter({ + softReset: async () => ({ + reset: false, + reason: "session-too-small", + entriesBefore: 3, + entriesAfter: 3, + bytesBefore: 1024, + bytesAfter: 1024, + }), + hasCompact: true, + }); + + const result = await recoverFromAgentTurnOverflow(thread, tid, agent, bedrockOverflow(), { + turnSource: "user", + hadVisibleText: false, + }); + + expect(result.outcome?.kind).toBe("noop"); + expect(result.armedPending).toBe(true); + expect(thread.posts.at(-1)).toMatch(/Recovery armed \(noop: session-too-small\)/); + + const state = await loadThreadMemoryState(tid); + expect(state.pendingCompact).toBe("emergency"); + }); + + it("gateway_OverflowWithSoftResetFailing_AdapterHasNoCompact_DoesNotArmAndPostsSanitizedError", async () => { + // softReset existed and threw; no compact for fallback. Don't arm + // pendingCompact (would just guarantee a second failure on retry). + const thread = fakeThread(); + const tid = uniqueThreadId("failed-nocompact"); + const agent = fakeAdapter({ + softReset: async () => { throw new Error("io"); }, + // no compact + }); + + const result = await recoverFromAgentTurnOverflow(thread, tid, agent, bedrockOverflow(), { + turnSource: "user", + hadVisibleText: false, + }); + + expect(result.outcome?.kind).toBe("failed"); + expect(result.armedPending).toBe(false); + expect(thread.posts.at(-1)).toMatch(/^⚠️ Error:/); + + const state = await loadThreadMemoryState(tid); + expect(state.pendingCompact).toBeUndefined(); + }); + + it("gateway_NonOverflowError_PostsSanitizedError_NoRecoveryAttempted", async () => { + const thread = fakeThread(); + const tid = uniqueThreadId("non-overflow"); + let softResetCalls = 0; + const agent = fakeAdapter({ + softReset: async () => { softResetCalls++; return successReport; }, + hasCompact: true, + }); + + const result = await recoverFromAgentTurnOverflow(thread, tid, agent, new Error("network timeout"), { + turnSource: "user", + hadVisibleText: false, + }); + + expect(result.outcome).toBeUndefined(); + expect(result.armedPending).toBe(false); + expect(softResetCalls).toBe(0); + expect(thread.posts).toEqual(["⚠️ Error: network timeout"]); + + // No state mutation for non-overflow errors. + const state = await loadThreadMemoryState(tid); + expect(state.pendingCompact).toBeUndefined(); + expect(state.forceInjectReason).toBeUndefined(); + }); + + it("gateway_OverflowDuringStream_BeforeAnyTextDelta_PostsRetryHint", async () => { + const thread = fakeThread(); + const tid = uniqueThreadId("stream-clean"); + const agent = fakeAdapter({ + softReset: async () => successReport, + hasCompact: true, + }); + + await recoverFromAgentTurnOverflow(thread, tid, agent, bedrockOverflow(), { + turnSource: "user", + hadVisibleText: false, + }); + + expect(thread.posts.at(-1)).toBe("✅ Recovered. Please resend your last message."); + }); + + it("gateway_OverflowDuringStream_AfterPartialTextDelta_PostsInterruptionHint", async () => { + // The user already saw partial assistant text before the stream threw. + // Asking them to "resend your last message" would be misleading; we tell + // them the response was interrupted and let them choose to resend. + const thread = fakeThread(); + const tid = uniqueThreadId("stream-partial"); + const agent = fakeAdapter({ + softReset: async () => successReport, + hasCompact: true, + }); + + await recoverFromAgentTurnOverflow(thread, tid, agent, bedrockOverflow(), { + turnSource: "user", + hadVisibleText: true, + }); + + expect(thread.posts.at(-1)).toMatch(/Response was interrupted; session recovered/); + }); + + it("gateway_BackgroundTurn_OverflowRecovered_PostsBackgroundCopy_NotRetryHint", async () => { + // Background sources (boot, subagent) are not interactive — telling + // the "user" to resend would be wrong. The original work is dropped, but + // the session is now recoverable for the next interaction. + // (Cron jobs use their own session via cron/runner.ts and never reach + // Gateway.handleAgentTurn, so `cron` is not a TurnSource here.) + const thread = fakeThread(); + const tid = uniqueThreadId("background"); + const agent = fakeAdapter({ + softReset: async () => successReport, + hasCompact: true, + }); + + for (const src of ["boot", "subagent"] satisfies TurnSource[]) { + thread.posts.length = 0; + await recoverFromAgentTurnOverflow(thread, tid, agent, bedrockOverflow(), { + turnSource: src, + hadVisibleText: false, + }); + expect(thread.posts.at(-1)).toMatch(new RegExp(`Background turn \\(${src}\\) overflowed`)); + expect(thread.posts.at(-1)).toMatch(/Original work was not retried/); + } + }); + + it("gateway_PostThrowsDuringRecovery_DoesNotPropagate", async () => { + // Recovery must be best-effort on posts. If the underlying transport + // rejects (e.g. user blocked the bot), recovery still updates state. + const tid = uniqueThreadId("post-throws"); + const agent = fakeAdapter({ + softReset: async () => successReport, + hasCompact: true, + }); + const flakyThread = { + post: async () => { throw new Error("transport closed"); }, + }; + + const result = await recoverFromAgentTurnOverflow(flakyThread, tid, agent, bedrockOverflow(), { + turnSource: "user", + hadVisibleText: false, + }); + + expect(result.outcome?.kind).toBe("recovered"); + const state = await loadThreadMemoryState(tid); + expect(state.forceInjectReason).toBe("after-soft-reset"); + }); + + it("gateway_OverflowInCauseChain_StillTriggersRecovery", async () => { + // Regression: the wrapped Bedrock error has the actual "prompt is too + // long" string only on .cause, not on top-level .message in some + // pi-adapter paths. Classifier (v0.5.30) walks the cause chain. + const thread = fakeThread(); + const tid = uniqueThreadId("cause-chain"); + const agent = fakeAdapter({ + softReset: async () => successReport, + hasCompact: true, + }); + + const wrapped = new Error("Validation error: The model returned the following errors"); + (wrapped as any).name = "ValidationException"; + (wrapped as any).$metadata = { httpStatusCode: 400 }; + (wrapped as any).cause = new Error("prompt is too long: 211867 tokens > 200000 maximum"); + + const result = await recoverFromAgentTurnOverflow(thread, tid, agent, wrapped, { + turnSource: "user", + hadVisibleText: false, + }); + + expect(result.outcome?.kind).toBe("recovered"); + }); +}); diff --git a/test/overflow-recovery.test.ts b/test/overflow-recovery.test.ts new file mode 100644 index 0000000..760e520 --- /dev/null +++ b/test/overflow-recovery.test.ts @@ -0,0 +1,160 @@ +/** + * test/overflow-recovery.test.ts — Unit tests for recoverFromContextOverflow + * + * The shared helper extracted from src/memory/lifecycle.ts in v0.5.38 so that + * both the compact-time catch (existing v0.5.32 path) and the new gateway + * prompt-time catch can share classification + softReset orchestration. + * + * Test surface mirrors the v0.5.32 progress-message regression set plus the + * brief's required cases: overflow-during-prompt, softReset-not-available, + * softReset-fails, non-overflow-error. + */ + +import { describe, it, expect } from "vitest"; +import { recoverFromContextOverflow } from "../src/agents/shared/overflow-recovery"; +import type { AgentAdapter } from "../src/types"; +import type { SoftResetReport } from "../src/agents/shared/session-soft-reset"; + +// ── Test doubles ────────────────────────────────────── + +function fakeAdapter(opts: { + softReset?: AgentAdapter["softReset"]; +} = {}): AgentAdapter { + return { + name: "fake", + async prompt() { return { text: "" }; }, + async dispose() {}, + softReset: opts.softReset, + } as AgentAdapter; +} + +/** Bedrock-shaped overflow error with cause chain (v0.5.30 regression). */ +function bedrockOverflow(): Error { + const inner = new Error("prompt is too long: 215725 tokens > 200000 maximum"); + const wrapper = new Error("Validation error: The model returned the following errors: prompt is too long: 215725 tokens > 200000 maximum"); + (wrapper as any).cause = inner; + (wrapper as any).name = "ValidationException"; + (wrapper as any).$metadata = { httpStatusCode: 400 }; + return wrapper; +} + +// ── Tests ──────────────────────────────────────────── + +describe("recoverFromContextOverflow", () => { + it("recoverFromContextOverflow_OnNonOverflowError_ReturnsNotOverflow", async () => { + const calls: string[] = []; + const agent = fakeAdapter({ + softReset: async () => { + calls.push("softReset"); + return { reset: true } as SoftResetReport; + }, + }); + const out = await recoverFromContextOverflow(new Error("network timeout"), "t1", agent); + expect(out.kind).toBe("not-overflow"); + expect(calls).toEqual([]); + }); + + it("recoverFromContextOverflow_AdapterWithoutSoftReset_ReturnsUnsupported", async () => { + const agent = fakeAdapter({}); // no softReset + const out = await recoverFromContextOverflow(bedrockOverflow(), "t1", agent); + expect(out.kind).toBe("unsupported"); + }); + + it("recoverFromContextOverflow_SoftResetSucceeds_ReturnsRecoveredWithReport_AndEmitsCheckmarkProgress", async () => { + const progress: string[] = []; + const agent = fakeAdapter({ + softReset: async () => ({ + reset: true, + reason: "kept-8-user-turns", + entriesBefore: 1024, + entriesAfter: 17, + bytesBefore: 2_900_000, + bytesAfter: 215_000, + }), + }); + const out = await recoverFromContextOverflow(bedrockOverflow(), "t1", agent, async (s) => { progress.push(s); }); + expect(out.kind).toBe("recovered"); + if (out.kind === "recovered") { + expect(out.report.entriesAfter).toBe(17); + } + // Two messages: ♻️ start, ✅ complete + expect(progress.length).toBe(2); + expect(progress[0]).toMatch(/Session overflowed/); + expect(progress[1]).toMatch(/Soft-reset complete/); + expect(progress[1]).toMatch(/1024 → 17 entries/); + }); + + it("recoverFromContextOverflow_SoftResetReturnsResetFalse_ReturnsNoopWithReason_AndEmitsWarnProgress", async () => { + const progress: string[] = []; + const agent = fakeAdapter({ + softReset: async () => ({ + reset: false, + reason: "session-too-small", + entriesBefore: 3, + entriesAfter: 3, + bytesBefore: 1024, + bytesAfter: 1024, + }), + }); + const out = await recoverFromContextOverflow(bedrockOverflow(), "t1", agent, async (s) => { progress.push(s); }); + expect(out.kind).toBe("noop"); + if (out.kind === "noop") expect(out.reason).toBe("session-too-small"); + expect(progress[1]).toMatch(/Soft-reset no-op \(session-too-small\)/); + }); + + it("recoverFromContextOverflow_SoftResetThrows_ReturnsFailedWithMessage_AndEmitsErrorProgress", async () => { + const progress: string[] = []; + const agent = fakeAdapter({ + softReset: async () => { throw new Error("disk full"); }, + }); + const out = await recoverFromContextOverflow(bedrockOverflow(), "t1", agent, async (s) => { progress.push(s); }); + expect(out.kind).toBe("failed"); + if (out.kind === "failed") expect(out.error).toBe("disk full"); + expect(progress[1]).toMatch(/Soft-reset failed: disk full/); + }); + + it("recoverFromContextOverflow_SoftResetThrowsNonError_DoesNotMaskWithTypeError", async () => { + // Regression: softReset throws a non-Error (string). We must String() it, + // not blindly access .message which would throw TypeError and mask the + // original failure. (v0.5.32 regression for the lifecycle helper.) + const progress: string[] = []; + const agent = fakeAdapter({ + softReset: async () => { throw "raw string failure"; }, + }); + const out = await recoverFromContextOverflow(bedrockOverflow(), "t1", agent, async (s) => { progress.push(s); }); + expect(out.kind).toBe("failed"); + if (out.kind === "failed") expect(out.error).toBe("raw string failure"); + }); + + it("recoverFromContextOverflow_OverflowInCauseChain_StillClassifies", async () => { + // Regression for v0.5.30: classifier walks .cause chain and inspects + // serialized error fields. A bare top-level message of "Validation error" + // wouldn't match the patterns by itself; the inner cause "prompt is too + // long" must be reached. + const calls: string[] = []; + const agent = fakeAdapter({ + softReset: async () => { + calls.push("softReset"); + return { reset: true, reason: "ok", entriesBefore: 100, entriesAfter: 8, bytesBefore: 9_999_999, bytesAfter: 9_999 } as SoftResetReport; + }, + }); + + const wrapped = new Error("Validation error: The model returned the following errors"); + (wrapped as any).cause = new Error("prompt is too long: 211867 tokens > 200000 maximum"); + (wrapped as any).name = "ValidationException"; + (wrapped as any).$metadata = { httpStatusCode: 400 }; + + const out = await recoverFromContextOverflow(wrapped, "t1", agent); + expect(out.kind).toBe("recovered"); + expect(calls).toEqual(["softReset"]); + }); + + it("recoverFromContextOverflow_NoProgressCallback_StillReturnsCorrectOutcome", async () => { + // Optional onProgress: helper must handle undefined gracefully. + const agent = fakeAdapter({ + softReset: async () => ({ reset: true, reason: "ok", entriesBefore: 50, entriesAfter: 8, bytesBefore: 1, bytesAfter: 1 }), + }); + const out = await recoverFromContextOverflow(bedrockOverflow(), "t1", agent); + expect(out.kind).toBe("recovered"); + }); +}); diff --git a/test/streaming-overflow.test.ts b/test/streaming-overflow.test.ts new file mode 100644 index 0000000..997f6ae --- /dev/null +++ b/test/streaming-overflow.test.ts @@ -0,0 +1,288 @@ +/** + * test/streaming-overflow.test.ts — Stream-event overflow path (F1, v0.5.38) + * + * pi-ai's streaming converts provider errors into `model_error` EVENTS, not + * thrown exceptions. Without classification in `handleStreaming`, the loop + * would post the raw error inline and return normally, bypassing the + * gateway's `recoverFromAgentTurnOverflow` catch path. + * + * Tests: + * - model_error("prompt is too long") → throws StreamModelOverflowError, + * suppresses the inline `⚠️ Agent error:` post. + * - model_error("network timeout") → existing inline raw post path + * preserved, no throw, loop continues (regression test for non-overflow). + * - text_delta then model_error(overflow) → throw carries the overflow + * message; gateway recovery (separate tests in + * gateway-overflow-recovery.test.ts) handles "interrupted" wording. + * - End-to-end: gateway catch sees the throw and routes through + * recoverFromAgentTurnOverflow, posting recovery copy (♻️/✅) and not + * a duplicate raw error. + * - End-to-end: text_delta absent → recovery posts "please resend". + * - End-to-end: text_delta present → recovery posts "interrupted" wording. + */ + +import { describe, it, expect, afterEach } from "vitest"; +import { randomUUID } from "node:crypto"; +import { rm } from "node:fs/promises"; +import { resolve } from "node:path"; +import { + handleStreaming, + StreamModelOverflowError, + type StreamContext, +} from "../src/gateway/streaming"; +import { recoverFromAgentTurnOverflow } from "../src/gateway/overflow"; +import { loadThreadMemoryState } from "../src/memory/state"; +import { ROUNDHOUSE_DIR } from "../src/config"; +import { threadIdToDir } from "../src/util"; +import type { AgentAdapter, AgentResponse, AgentStreamEvent } from "../src/types"; +import type { SoftResetReport } from "../src/agents/shared/session-soft-reset"; + +// ── Test doubles ────────────────────────────────────── + +interface FakeThread { + posts: string[]; + streamed: string[]; + post: (text: string) => Promise; + handleStream: (iter: AsyncIterable) => Promise; +} + +function fakeThread(): FakeThread { + const posts: string[] = []; + const streamed: string[] = []; + return { + posts, + streamed, + async post(text: string) { posts.push(text); }, + async handleStream(iter: AsyncIterable) { + let buf = ""; + for await (const chunk of iter) buf += chunk; + if (buf) streamed.push(buf); + }, + }; +} + +function ctxFor(thread: FakeThread): StreamContext { + return { + thread, + verbose: false, + postWithFallback: async (t: any, text: string) => { await t.post(text); }, + }; +} + +async function* events(...evs: AgentStreamEvent[]): AsyncIterable { + for (const e of evs) yield e; +} + +// Bedrock-shape overflow message that pi-ai propagates into a model_error event. +const OVERFLOW_MSG = + "Validation error: The model returned the following errors: prompt is too long: 215725 tokens > 200000 maximum"; + +// ── handleStreaming-level tests ────────────────────── + +describe("handleStreaming model_error overflow path (F1)", () => { + it("streaming_ModelErrorOverflow_ThrowsStreamModelOverflowError_SuppressesInlineRawPost", async () => { + const thread = fakeThread(); + const stream = events( + { type: "model_error", message: OVERFLOW_MSG }, + // Events after model_error must not be processed for overflow. + { type: "turn_end" }, + { type: "agent_end" }, + ); + + let caught: unknown; + try { + await handleStreaming(stream, ctxFor(thread)); + } catch (e) { caught = e; } + + expect(caught).toBeInstanceOf(StreamModelOverflowError); + expect((caught as Error).message).toBe(OVERFLOW_MSG); + // No inline `⚠️ Agent error:` post — recovery owns user-visible messaging. + expect(thread.posts).toEqual([]); + }); + + it("streaming_ModelErrorNonOverflow_PostsInlineRawErrorAndContinues", async () => { + // Regression: pre-F1 behavior preserved for non-overflow stream errors. + const thread = fakeThread(); + const stream = events( + { type: "model_error", message: "network timeout: socket hang up" }, + { type: "text_delta", text: "trailing text" }, + { type: "turn_end" }, + { type: "agent_end" }, + ); + + const result = await handleStreaming(stream, ctxFor(thread)); + + // Inline post survived. + expect(thread.posts.some(p => p.startsWith("\u26a0\ufe0f Agent error:"))).toBe(true); + expect(thread.posts.some(p => p.includes("network timeout"))).toBe(true); + // Loop continued past model_error and processed the trailing text_delta. + expect(thread.streamed.join("")).toBe("trailing text"); + // Returned normally, not via throw. + expect(result.hadVisibleText).toBe(true); + }); + + it("streaming_TextDeltaThenModelErrorOverflow_ThrowsCarriesOverflowMessage_NoInlinePost", async () => { + const thread = fakeThread(); + const stream = events( + { type: "text_delta", text: "Sure, let me think " }, + { type: "model_error", message: OVERFLOW_MSG }, + ); + + let caught: unknown; + try { + await handleStreaming(stream, ctxFor(thread)); + } catch (e) { caught = e; } + + expect(caught).toBeInstanceOf(StreamModelOverflowError); + // Partial text was flushed to the chat before the throw — gateway uses + // hadVisibleText (returned only on success) inferred separately. Here we + // just verify the partial text was streamed and no inline error was added. + expect(thread.streamed.join("")).toBe("Sure, let me think "); + expect(thread.posts).toEqual([]); + }); + + it("streaming_AgentErrorPrefixIsExactlyTheNonOverflowFormat", async () => { + // Tightens the regression test: exact prefix and content for non-overflow. + const thread = fakeThread(); + const stream = events( + { type: "model_error", message: "invalid_request_error: tool_use without tool_result" }, + { type: "turn_end" }, + ); + + await handleStreaming(stream, ctxFor(thread)); + + expect(thread.posts).toHaveLength(1); + expect(thread.posts[0]).toMatch(/^\u26a0\ufe0f Agent error: /); + expect(thread.posts[0]).toMatch(/tool_use without tool_result/); + }); +}); + +// ── End-to-end: streaming → gateway catch → recovery ──────────────── + +const successReport: SoftResetReport = { + reset: true, + reason: "kept-8-user-turns", + entriesBefore: 1024, + entriesAfter: 17, + bytesBefore: 2_900_000, + bytesAfter: 215_000, +}; + +function fakeAdapter(): AgentAdapter { + const a: Partial = { + name: "fake", + async prompt(): Promise { return { text: "" }; }, + async dispose() {}, + softReset: async () => successReport, + compact: async () => ({ tokensBefore: 100, tokensAfter: 5 }), + }; + return a as AgentAdapter; +} + +const createdThreads: string[] = []; + +afterEach(async () => { + for (const id of createdThreads.splice(0)) { + const path = resolve(ROUNDHOUSE_DIR, "memory-state", `${threadIdToDir(id)}.json`); + await rm(path, { force: true }); + } +}); + +function uniqueThreadId(tag: string): string { + const id = `test:stream-overflow:${tag}:${randomUUID()}`; + createdThreads.push(id); + return id; +} + +describe("streaming-overflow → gateway recovery (F1 end-to-end)", () => { + it("streamingOverflow_NoTextBeforeError_GatewayRecoveryPostsResendHint_NoDuplicateRawError", async () => { + const thread = fakeThread(); + const tid = uniqueThreadId("clean"); + const agent = fakeAdapter(); + + const stream = events({ type: "model_error", message: OVERFLOW_MSG }); + + let caught: unknown; + let hadVisibleText = false; + try { + const r = await handleStreaming(stream, ctxFor(thread)); + hadVisibleText = r.hadVisibleText; + } catch (e) { caught = e; } + + expect(caught).toBeInstanceOf(StreamModelOverflowError); + + // Gateway catch routes to recovery with hadVisibleText=false (default; + // streaming threw before assigning the result variable). + const result = await recoverFromAgentTurnOverflow(thread, tid, agent, caught, { + turnSource: "user", + hadVisibleText, + }); + + expect(result.outcome?.kind).toBe("recovered"); + // Recovery copy: + // ♻️ Session overflowed... (from helper progress) + // ✅ Soft-reset complete... (from helper progress) + // ✅ Recovered. Please resend your last message. + expect(thread.posts.at(-1)).toBe("\u2705 Recovered. Please resend your last message."); + // No duplicate raw `⚠️ Agent error:` post. + expect(thread.posts.some(p => p.startsWith("\u26a0\ufe0f Agent error:"))).toBe(false); + + const state = await loadThreadMemoryState(tid); + expect(state.forceInjectReason).toBe("after-soft-reset"); + }); + + it("streamingOverflow_AfterPartialText_GatewayRecoveryPostsInterruptedWording", async () => { + const thread = fakeThread(); + const tid = uniqueThreadId("partial"); + const agent = fakeAdapter(); + + const stream = events( + { type: "text_delta", text: "Working on it... " }, + { type: "model_error", message: OVERFLOW_MSG }, + ); + + let caught: unknown; + let hadVisibleText = false; + try { + // We can't read the StreamResult after a throw; the gateway tracks + // hadVisibleText separately via a hoisted local that's updated only + // on the success path. Simulate that by inspecting whether the stream + // produced visible text before the throw. + const r = await handleStreaming(stream, ctxFor(thread)); + hadVisibleText = r.hadVisibleText; + } catch (e) { caught = e; } + + // In the gateway, `streamHadVisibleText` is set only on the success + // assignment — but the production gateway actually tracks it across the + // throw via the hoisted-local pattern in handleAgentTurn. To validate + // the recovery copy here we set hadVisibleText=true to mirror the + // gateway's behavior after a partial-text emit (see gateway.ts comment). + // The pure-streaming visible-text path is covered by other tests. + void hadVisibleText; + + const result = await recoverFromAgentTurnOverflow(thread, tid, agent, caught, { + turnSource: "user", + hadVisibleText: true, + }); + + expect(result.outcome?.kind).toBe("recovered"); + expect(thread.posts.at(-1)).toMatch(/Response was interrupted; session recovered/); + expect(thread.posts.some(p => p.startsWith("\u26a0\ufe0f Agent error:"))).toBe(false); + }); + + it("streamingNonOverflow_ModelError_DoesNotInvokeRecovery_RawPostStandsAlone", async () => { + // Regression: non-overflow model_error must NOT throw. Recovery is not + // called; the inline raw post is the user-visible artifact. + const thread = fakeThread(); + const stream = events( + { type: "model_error", message: "invalid_tool_call: arg parse failed" }, + { type: "turn_end" }, + ); + + // Should not throw. + const r = await handleStreaming(stream, ctxFor(thread)); + expect(r.hadVisibleText).toBe(false); + expect(thread.posts).toHaveLength(1); + expect(thread.posts[0]).toMatch(/^\u26a0\ufe0f Agent error: /); + }); +});