diff --git a/src/agents/kiro/acp/client.ts b/src/agents/kiro/acp/client.ts index ea3f0a9..a5d3ba7 100644 --- a/src/agents/kiro/acp/client.ts +++ b/src/agents/kiro/acp/client.ts @@ -11,7 +11,7 @@ import type { ChildProcessWithoutNullStreams } from "node:child_process"; export interface PendingRequest { resolve: (value: unknown) => void; reject: (error: unknown) => void; - timer: ReturnType; + timer: ReturnType | null; } export class AcpClient extends EventEmitter { @@ -31,18 +31,19 @@ export class AcpClient extends EventEmitter { } /** Send a JSON-RPC request and await its response. */ - async call(method: string, params?: unknown): Promise { + async call(method: string, params?: unknown, timeoutMs?: number): Promise { if (this.closed) throw new Error("ACP client is closed"); const id = this.nextId++; const payload = JSON.stringify({ jsonrpc: "2.0", id, method, params: params ?? {} }); this.proc.stdin.write(payload + "\n"); + const timeout = timeoutMs ?? this.requestTimeoutMs; return new Promise((resolve, reject) => { - const timer = setTimeout(() => { + const timer = timeout > 0 ? setTimeout(() => { this.pending.delete(id); - reject(new Error(`ACP call "${method}" timed out after ${this.requestTimeoutMs}ms`)); - }, this.requestTimeoutMs); + reject(new Error(`ACP call "${method}" timed out after ${timeout}ms`)); + }, timeout) : null; this.pending.set(id, { resolve: resolve as (v: unknown) => void, reject, timer }); }); @@ -90,7 +91,7 @@ export class AcpClient extends EventEmitter { // Response to a pending request if ("id" in msg && typeof msg.id === "number" && this.pending.has(msg.id)) { const { resolve, reject, timer } = this.pending.get(msg.id)!; - clearTimeout(timer); + if (timer) clearTimeout(timer); this.pending.delete(msg.id); if (msg.error) reject(msg.error); else resolve(msg.result); @@ -109,7 +110,7 @@ export class AcpClient extends EventEmitter { private rejectAll(error: Error): void { for (const [id, { reject, timer }] of this.pending) { - clearTimeout(timer); + if (timer) clearTimeout(timer); reject(error); } this.pending.clear(); diff --git a/src/agents/kiro/acp/index.ts b/src/agents/kiro/acp/index.ts index c2b6339..f7b02b1 100644 --- a/src/agents/kiro/acp/index.ts +++ b/src/agents/kiro/acp/index.ts @@ -6,3 +6,4 @@ export { AcpClient } from "./client.js"; export { spawnKiroCli, shutdownProcess, getKiroCliVersion } from "./process.js"; export type { AcpProcess, SpawnOptions } from "./process.js"; export type * from "./types.js"; +export { AcpMethod, AcpEvent, SessionUpdateKind } from "./methods.js"; diff --git a/src/agents/kiro/acp/methods.ts b/src/agents/kiro/acp/methods.ts new file mode 100644 index 0000000..257e9f1 --- /dev/null +++ b/src/agents/kiro/acp/methods.ts @@ -0,0 +1,35 @@ +/** + * acp/methods.ts — JSON-RPC method names and event discriminators used by + * kiro-cli's ACP protocol. Centralized here so a protocol rev only touches + * this file. + */ + +/** Methods the client calls on the agent. */ +export const AcpMethod = { + Initialize: "initialize", + SessionNew: "session/new", + SessionLoad: "session/load", + SessionPrompt: "session/prompt", + SessionCancel: "session/cancel", + /** Internal kiro extension — e.g. running `/compact`. */ + KiroCommandsExecute: "_kiro.dev/commands/execute", +} as const; + +/** Notifications the agent emits to the client. */ +export const AcpEvent = { + SessionUpdate: "session/update", + /** + * Internal kiro variant. Multiplexes a different set of update kinds + * (e.g. `tool_call_chunk`) than `session/update`, NOT a mirror — subscribing + * to both would route the same handler over disjoint payloads. + */ + KiroSessionUpdate: "_kiro.dev/session/update", + KiroMetadata: "_kiro.dev/metadata", +} as const; + +/** Discriminator values inside a `session/update` notification's `update.sessionUpdate`. */ +export const SessionUpdateKind = { + AgentMessageChunk: "agent_message_chunk", + ToolCall: "tool_call", + ToolCallUpdate: "tool_call_update", +} as const; diff --git a/src/agents/kiro/acp/process.ts b/src/agents/kiro/acp/process.ts index 0515923..c7ea1d1 100644 --- a/src/agents/kiro/acp/process.ts +++ b/src/agents/kiro/acp/process.ts @@ -31,7 +31,7 @@ export interface AcpProcess { export function spawnKiroCli(opts: SpawnOptions): AcpProcess { const { agentName, cwd, env, maxStderrBytes = 1_048_576 } = opts; - const proc = spawn("kiro-cli", ["chat", "--agent", agentName, "--acp"], { + const proc = spawn("kiro-cli", ["acp", "--agent", agentName, "--trust-all-tools"], { cwd, env: { ...process.env, ...env }, stdio: ["pipe", "pipe", "pipe"], diff --git a/src/agents/kiro/kiro-adapter.ts b/src/agents/kiro/kiro-adapter.ts index 478c556..6298169 100644 --- a/src/agents/kiro/kiro-adapter.ts +++ b/src/agents/kiro/kiro-adapter.ts @@ -15,10 +15,19 @@ import { resolve } from "node:path"; import type { AgentAdapterFactory, AgentMessage, AgentResponse, AgentStreamEvent, AdapterInfo, MessageContext } from "../../types.js"; import { ROUNDHOUSE_VERSION } from "../../config.js"; import { BaseAdapter } from "../base-adapter.js"; -import { spawnKiroCli, shutdownProcess, getKiroCliVersion, type AcpProcess, type InitializeResult, type SessionNewResult } from "./acp/index.js"; +import { spawnKiroCli, shutdownProcess, getKiroCliVersion, AcpMethod, AcpEvent, SessionUpdateKind, type AcpProcess, type InitializeResult, type SessionNewResult } from "./acp/index.js"; import { SessionStore, type SessionEntry } from "./session.js"; import { normalizeToolName } from "./tool-names.js"; +// kiro emits context usage as a percentage and does not advertise the +// active model's window size in the ACP protocol. We approximate the window +// at 200k so SessionEntry.contextTokens/contextWindow stay populated and the +// memory pressure classifier (which short-circuits when either is null) can +// still trigger compaction. This is correct for 200k-window models and a +// best-effort approximation for others — when kiro adds a window field, +// thread it through here instead. +const KIRO_DEFAULT_CONTEXT_WINDOW = 200_000; + // ── Types ──────────────────────────────────────────── interface KiroAdapterConfig { @@ -141,7 +150,7 @@ class KiroAdapter extends BaseAdapter { async abort(threadId: string): Promise { const session = this.store.get(threadId); if (!session || !this.mainProcess) return; - await this.mainProcess.client.call("session/cancel", { sessionId: session.sessionId }).catch(() => {}); + await this.mainProcess.client.call(AcpMethod.SessionCancel, { sessionId: session.sessionId }).catch(() => {}); } async restart(threadId: string): Promise { @@ -154,7 +163,7 @@ class KiroAdapter extends BaseAdapter { if (!session || !this.mainProcess) return null; const before = session.contextTokens ?? 0; - await this.mainProcess.client.call("_kiro.dev/commands/execute", { + await this.mainProcess.client.call(AcpMethod.KiroCommandsExecute, { sessionId: session.sessionId, command: "/compact", }); @@ -189,10 +198,20 @@ class KiroAdapter extends BaseAdapter { private async ensureProcess(): Promise { if (this.mainProcess && !this.mainProcess.client.isClosed) return this.mainProcess; - this.mainProcess = spawnKiroCli({ agentName: this.config.agentName, cwd: this.config.cwd }); + const acpProc = spawnKiroCli({ agentName: this.config.agentName, cwd: this.config.cwd }); + this.mainProcess = acpProc; + + // Capture acpProc in the closure so the listener always reports stderr + // for *this* process even if `this.mainProcess` is reassigned later. + acpProc.proc.on("exit", (code, signal) => { + if (acpProc.stderr.length) { + console.error(`[kiro] process exited code=${code} signal=${signal}; stderr:\n${acpProc.stderr.join("")}`); + } + }); - await this.mainProcess.client.call("initialize", { - protocolVersion: "1.0", + await acpProc.client.call(AcpMethod.Initialize, { + protocolVersion: 1, + clientCapabilities: { terminal: true }, clientInfo: { name: "roundhouse", version: ROUNDHOUSE_VERSION }, }); @@ -207,12 +226,12 @@ class KiroAdapter extends BaseAdapter { const existing = this.store.get(threadId); if (existing) return existing; - const proc = await this.ensureProcess(); + let proc = await this.ensureProcess(); const persistedId = this.store.loadPersistedSessionId(threadId); if (persistedId) { try { - await proc.client.call("session/load", { sessionId: persistedId }); + await proc.client.call(AcpMethod.SessionLoad, { sessionId: persistedId, cwd: this.config.cwd, mcpServers: [] }); const entry: SessionEntry = { sessionId: persistedId, threadId, @@ -226,11 +245,19 @@ class KiroAdapter extends BaseAdapter { this.store.set(threadId, entry); return entry; } catch { - // Session no longer valid — create new + // Session no longer valid. kiro-cli sometimes responds to an unknown + // sessionId by exiting (rather than returning a JSON-RPC error), which + // closes the AcpClient. Clear the stale persisted id and respawn before + // falling through to session/new. + this.store.clearPersistedSessionId(threadId); + if (this.mainProcess?.client.isClosed) { + this.mainProcess = null; + proc = await this.ensureProcess(); + } } } - const result = await proc.client.call("session/new", {}); + const result = await proc.client.call(AcpMethod.SessionNew, { cwd: this.config.cwd, mcpServers: [] }); const entry: SessionEntry = { sessionId: result.sessionId, threadId, @@ -255,44 +282,40 @@ class KiroAdapter extends BaseAdapter { const proc = this.mainProcess!; const text = this.formatMessage(message); - const responsePromise = proc.client.call("session/prompt", { - sessionId: session.sessionId, - text, - }); - let fullText = ""; - const onTextChunk = (params: any) => { - if (params?.sessionId === session.sessionId) { - fullText += params.text ?? ""; - } - }; - - const onPermission = (params: any) => { - if (params?.sessionId === session.sessionId) { - proc.client.notify("permission/response", { - tool_call_id: params.tool_call_id, - decision: "approved", - }); + const onSessionUpdate = (params: any) => { + if (params?.sessionId !== session.sessionId) return; + const update = params.update; + if (!update) return; + if (update.sessionUpdate === SessionUpdateKind.AgentMessageChunk && update.content?.text) { + fullText += update.content.text; } }; - const onSessionUpdate = (params: any) => { - if (params?.sessionId === session.sessionId) { - this.store.updateContext(threadId, params.context_tokens ?? null, params.context_window ?? null, params.model); + const onMetadata = (params: any) => { + if (params?.sessionId !== session.sessionId) return; + const pct = params.contextUsagePercentage; + if (pct != null) { + const tokens = Math.round((pct / 100) * KIRO_DEFAULT_CONTEXT_WINDOW); + this.store.updateContext(threadId, tokens, KIRO_DEFAULT_CONTEXT_WINDOW, params.model ?? null); } }; - proc.client.on("text_chunk", onTextChunk); - proc.client.on("permission_request", onPermission); - proc.client.on("session/update", onSessionUpdate); + // session/update is the canonical channel; _kiro.dev/session/update + // multiplexes a different set of update kinds (e.g. tool_call_chunk) + // we don't currently consume — don't subscribe twice. + proc.client.on(AcpEvent.SessionUpdate, onSessionUpdate); + proc.client.on(AcpEvent.KiroMetadata, onMetadata); try { - await responsePromise; + await proc.client.call(AcpMethod.SessionPrompt, { + sessionId: session.sessionId, + prompt: [{ type: "text", text }], + }, 0); } finally { - proc.client.off("text_chunk", onTextChunk); - proc.client.off("permission_request", onPermission); - proc.client.off("session/update", onSessionUpdate); + proc.client.off(AcpEvent.SessionUpdate, onSessionUpdate); + proc.client.off(AcpEvent.KiroMetadata, onMetadata); } return { text: fullText }; @@ -319,59 +342,56 @@ class KiroAdapter extends BaseAdapter { resolveWait?.(); } - const onTextChunk = (params: any) => { - if (params?.sessionId === session.sessionId) { - push({ type: "text_delta", text: params.text ?? "" }); - } - }; - - const onToolCall = (params: any) => { - if (params?.sessionId === session.sessionId) { - push({ type: "tool_start", toolName: normalizeToolName(params.title ?? params.tool_name ?? ""), toolCallId: params.tool_call_id }); - } - }; - - const onToolResult = (params: any) => { - if (params?.sessionId === session.sessionId) { - push({ type: "tool_end", toolName: normalizeToolName(params.tool_name ?? ""), toolCallId: params.tool_call_id, isError: (params.exit_code ?? 0) !== 0 }); - } - }; - - const onPermission = (params: any) => { - if (params?.sessionId === session.sessionId) { - proc.client.notify("permission/response", { - tool_call_id: params.tool_call_id, - decision: "approved", - }); - } - }; - - const onComplete = (params: any) => { - if (params?.sessionId === session.sessionId) { - if (params.stop_reason === "end_turn") { - push({ type: "turn_end" }); + const onSessionUpdate = (params: any) => { + if (params?.sessionId !== session.sessionId) return; + const update = params.update; + if (!update) return; + switch (update.sessionUpdate) { + case SessionUpdateKind.AgentMessageChunk: + if (update.content?.text) { + push({ type: "text_delta", text: update.content.text }); + } + break; + case SessionUpdateKind.ToolCall: + push({ type: "tool_start", toolName: normalizeToolName(update.title ?? ""), toolCallId: update.toolCallId ?? "" }); + break; + case SessionUpdateKind.ToolCallUpdate: { + // tool_call_update arrives multiple times per call — for in-progress + // status changes and for the terminal completed/failed transition. + // Only the terminal one closes the tool lifecycle in our gateway. + const status = update.status; + if (status === "completed" || status === "failed") { + push({ + type: "tool_end", + toolName: normalizeToolName(update.title ?? ""), + toolCallId: update.toolCallId ?? "", + isError: status === "failed", + }); + } + break; } - push({ type: "agent_end" }); - done = true; - resolveWait?.(); } }; - const onSessionUpdate = (params: any) => { - if (params?.sessionId === session.sessionId) { - this.store.updateContext(threadId, params.context_tokens ?? null, params.context_window ?? null, params.model); + const onMetadata = (params: any) => { + if (params?.sessionId !== session.sessionId) return; + const pct = params.contextUsagePercentage; + if (pct != null) { + const tokens = Math.round((pct / 100) * KIRO_DEFAULT_CONTEXT_WINDOW); + this.store.updateContext(threadId, tokens, KIRO_DEFAULT_CONTEXT_WINDOW, params.model ?? null); } }; - proc.client.on("text_chunk", onTextChunk); - proc.client.on("tool_call", onToolCall); - proc.client.on("tool_result", onToolResult); - proc.client.on("permission_request", onPermission); - proc.client.on("complete", onComplete); - proc.client.on("session/update", onSessionUpdate); + proc.client.on(AcpEvent.SessionUpdate, onSessionUpdate); + proc.client.on(AcpEvent.KiroMetadata, onMetadata); // Fire the prompt (don't await — events stream in) - proc.client.call("session/prompt", { sessionId: session.sessionId, text }).catch((err) => { + proc.client.call(AcpMethod.SessionPrompt, { sessionId: session.sessionId, prompt: [{ type: "text", text }] }, 0).then(() => { + push({ type: "turn_end" }); + push({ type: "agent_end" }); + done = true; + resolveWait?.(); + }).catch((err) => { push({ type: "agent_end" }); done = true; promptError = err; @@ -389,12 +409,8 @@ class KiroAdapter extends BaseAdapter { } if (promptError) throw promptError; } finally { - proc.client.off("text_chunk", onTextChunk); - proc.client.off("tool_call", onToolCall); - proc.client.off("tool_result", onToolResult); - proc.client.off("permission_request", onPermission); - proc.client.off("complete", onComplete); - proc.client.off("session/update", onSessionUpdate); + proc.client.off(AcpEvent.SessionUpdate, onSessionUpdate); + proc.client.off(AcpEvent.KiroMetadata, onMetadata); } } finally { this.store.markInFlight(threadId, false); diff --git a/src/agents/kiro/session.ts b/src/agents/kiro/session.ts index 464b7eb..6513457 100644 --- a/src/agents/kiro/session.ts +++ b/src/agents/kiro/session.ts @@ -6,7 +6,7 @@ */ import { resolve } from "node:path"; -import { readFileSync, writeFileSync, mkdirSync, existsSync, renameSync } from "node:fs"; +import { readFileSync, writeFileSync, mkdirSync, existsSync, renameSync, unlinkSync } from "node:fs"; import { randomBytes } from "node:crypto"; export interface SessionEntry { @@ -97,6 +97,12 @@ export class SessionStore { } } + /** Remove the persisted session id file so the next ensureSession starts fresh. */ + clearPersistedSessionId(threadId: string): void { + const filePath = this.sessionFilePath(threadId); + try { unlinkSync(filePath); } catch {} + } + // ── Private ────────────────────────────────────────── private persistSession(threadId: string, entry: SessionEntry): void {