Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions src/agents/kiro/acp/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type { ChildProcessWithoutNullStreams } from "node:child_process";
export interface PendingRequest {
resolve: (value: unknown) => void;
reject: (error: unknown) => void;
timer: ReturnType<typeof setTimeout>;
timer: ReturnType<typeof setTimeout> | null;
}

export class AcpClient extends EventEmitter {
Expand All @@ -31,18 +31,19 @@ export class AcpClient extends EventEmitter {
}

/** Send a JSON-RPC request and await its response. */
async call<T = unknown>(method: string, params?: unknown): Promise<T> {
async call<T = unknown>(method: string, params?: unknown, timeoutMs?: number): Promise<T> {
if (this.closed) throw new Error("ACP client is closed");

const id = this.nextId++;
const payload = JSON.stringify({ jsonrpc: "2.0", id, method, params: params ?? {} });
this.proc.stdin.write(payload + "\n");

const timeout = timeoutMs ?? this.requestTimeoutMs;
return new Promise<T>((resolve, reject) => {
const timer = setTimeout(() => {
const timer = timeout > 0 ? setTimeout(() => {
this.pending.delete(id);
reject(new Error(`ACP call "${method}" timed out after ${this.requestTimeoutMs}ms`));
}, this.requestTimeoutMs);
reject(new Error(`ACP call "${method}" timed out after ${timeout}ms`));
}, timeout) : null;

this.pending.set(id, { resolve: resolve as (v: unknown) => void, reject, timer });
});
Expand Down Expand Up @@ -90,7 +91,7 @@ export class AcpClient extends EventEmitter {
// Response to a pending request
if ("id" in msg && typeof msg.id === "number" && this.pending.has(msg.id)) {
const { resolve, reject, timer } = this.pending.get(msg.id)!;
clearTimeout(timer);
if (timer) clearTimeout(timer);
this.pending.delete(msg.id);
if (msg.error) reject(msg.error);
else resolve(msg.result);
Expand All @@ -109,7 +110,7 @@ export class AcpClient extends EventEmitter {

private rejectAll(error: Error): void {
for (const [id, { reject, timer }] of this.pending) {
clearTimeout(timer);
if (timer) clearTimeout(timer);
reject(error);
}
this.pending.clear();
Expand Down
1 change: 1 addition & 0 deletions src/agents/kiro/acp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ export { AcpClient } from "./client.js";
export { spawnKiroCli, shutdownProcess, getKiroCliVersion } from "./process.js";
export type { AcpProcess, SpawnOptions } from "./process.js";
export type * from "./types.js";
export { AcpMethod, AcpEvent, SessionUpdateKind } from "./methods.js";
35 changes: 35 additions & 0 deletions src/agents/kiro/acp/methods.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* acp/methods.ts — JSON-RPC method names and event discriminators used by
* kiro-cli's ACP protocol. Centralized here so a protocol rev only touches
* this file.
*/

/** Methods the client calls on the agent. */
export const AcpMethod = {
Initialize: "initialize",
SessionNew: "session/new",
SessionLoad: "session/load",
SessionPrompt: "session/prompt",
SessionCancel: "session/cancel",
/** Internal kiro extension — e.g. running `/compact`. */
KiroCommandsExecute: "_kiro.dev/commands/execute",
} as const;

/** Notifications the agent emits to the client. */
export const AcpEvent = {
SessionUpdate: "session/update",
/**
* Internal kiro variant. Multiplexes a different set of update kinds
* (e.g. `tool_call_chunk`) than `session/update`, NOT a mirror — subscribing
* to both would route the same handler over disjoint payloads.
*/
KiroSessionUpdate: "_kiro.dev/session/update",
KiroMetadata: "_kiro.dev/metadata",
} as const;

/** Discriminator values inside a `session/update` notification's `update.sessionUpdate`. */
export const SessionUpdateKind = {
AgentMessageChunk: "agent_message_chunk",
ToolCall: "tool_call",
ToolCallUpdate: "tool_call_update",
} as const;
2 changes: 1 addition & 1 deletion src/agents/kiro/acp/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export interface AcpProcess {
export function spawnKiroCli(opts: SpawnOptions): AcpProcess {
const { agentName, cwd, env, maxStderrBytes = 1_048_576 } = opts;

const proc = spawn("kiro-cli", ["chat", "--agent", agentName, "--acp"], {
const proc = spawn("kiro-cli", ["acp", "--agent", agentName, "--trust-all-tools"], {
cwd,
env: { ...process.env, ...env },
stdio: ["pipe", "pipe", "pipe"],
Expand Down
190 changes: 103 additions & 87 deletions src/agents/kiro/kiro-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,19 @@ import { resolve } from "node:path";
import type { AgentAdapterFactory, AgentMessage, AgentResponse, AgentStreamEvent, AdapterInfo, MessageContext } from "../../types.js";
import { ROUNDHOUSE_VERSION } from "../../config.js";
import { BaseAdapter } from "../base-adapter.js";
import { spawnKiroCli, shutdownProcess, getKiroCliVersion, type AcpProcess, type InitializeResult, type SessionNewResult } from "./acp/index.js";
import { spawnKiroCli, shutdownProcess, getKiroCliVersion, AcpMethod, AcpEvent, SessionUpdateKind, type AcpProcess, type InitializeResult, type SessionNewResult } from "./acp/index.js";
import { SessionStore, type SessionEntry } from "./session.js";
import { normalizeToolName } from "./tool-names.js";

// kiro emits context usage as a percentage and does not advertise the
// active model's window size in the ACP protocol. We approximate the window
// at 200k so SessionEntry.contextTokens/contextWindow stay populated and the
// memory pressure classifier (which short-circuits when either is null) can
// still trigger compaction. This is correct for 200k-window models and a
// best-effort approximation for others — when kiro adds a window field,
// thread it through here instead.
const KIRO_DEFAULT_CONTEXT_WINDOW = 200_000;

// ── Types ────────────────────────────────────────────

interface KiroAdapterConfig {
Expand Down Expand Up @@ -141,7 +150,7 @@ class KiroAdapter extends BaseAdapter {
async abort(threadId: string): Promise<void> {
const session = this.store.get(threadId);
if (!session || !this.mainProcess) return;
await this.mainProcess.client.call("session/cancel", { sessionId: session.sessionId }).catch(() => {});
await this.mainProcess.client.call(AcpMethod.SessionCancel, { sessionId: session.sessionId }).catch(() => {});
}

async restart(threadId: string): Promise<void> {
Expand All @@ -154,7 +163,7 @@ class KiroAdapter extends BaseAdapter {
if (!session || !this.mainProcess) return null;

const before = session.contextTokens ?? 0;
await this.mainProcess.client.call("_kiro.dev/commands/execute", {
await this.mainProcess.client.call(AcpMethod.KiroCommandsExecute, {
sessionId: session.sessionId,
command: "/compact",
});
Expand Down Expand Up @@ -189,10 +198,20 @@ class KiroAdapter extends BaseAdapter {
private async ensureProcess(): Promise<AcpProcess> {
if (this.mainProcess && !this.mainProcess.client.isClosed) return this.mainProcess;

this.mainProcess = spawnKiroCli({ agentName: this.config.agentName, cwd: this.config.cwd });
const acpProc = spawnKiroCli({ agentName: this.config.agentName, cwd: this.config.cwd });
this.mainProcess = acpProc;

// Capture acpProc in the closure so the listener always reports stderr
// for *this* process even if `this.mainProcess` is reassigned later.
acpProc.proc.on("exit", (code, signal) => {
if (acpProc.stderr.length) {
console.error(`[kiro] process exited code=${code} signal=${signal}; stderr:\n${acpProc.stderr.join("")}`);
}
});

await this.mainProcess.client.call<InitializeResult>("initialize", {
protocolVersion: "1.0",
await acpProc.client.call<InitializeResult>(AcpMethod.Initialize, {
protocolVersion: 1,
clientCapabilities: { terminal: true },
Comment thread
royosherove marked this conversation as resolved.
clientInfo: { name: "roundhouse", version: ROUNDHOUSE_VERSION },
});

Expand All @@ -207,12 +226,12 @@ class KiroAdapter extends BaseAdapter {
const existing = this.store.get(threadId);
if (existing) return existing;

const proc = await this.ensureProcess();
let proc = await this.ensureProcess();

const persistedId = this.store.loadPersistedSessionId(threadId);
if (persistedId) {
try {
await proc.client.call("session/load", { sessionId: persistedId });
await proc.client.call(AcpMethod.SessionLoad, { sessionId: persistedId, cwd: this.config.cwd, mcpServers: [] });
const entry: SessionEntry = {
sessionId: persistedId,
threadId,
Expand All @@ -226,11 +245,19 @@ class KiroAdapter extends BaseAdapter {
this.store.set(threadId, entry);
return entry;
} catch {
// Session no longer valid — create new
// Session no longer valid. kiro-cli sometimes responds to an unknown
// sessionId by exiting (rather than returning a JSON-RPC error), which
// closes the AcpClient. Clear the stale persisted id and respawn before
// falling through to session/new.
this.store.clearPersistedSessionId(threadId);
if (this.mainProcess?.client.isClosed) {
this.mainProcess = null;
proc = await this.ensureProcess();
}
}
}

const result = await proc.client.call<SessionNewResult>("session/new", {});
const result = await proc.client.call<SessionNewResult>(AcpMethod.SessionNew, { cwd: this.config.cwd, mcpServers: [] });
const entry: SessionEntry = {
sessionId: result.sessionId,
threadId,
Expand All @@ -255,44 +282,40 @@ class KiroAdapter extends BaseAdapter {
const proc = this.mainProcess!;
const text = this.formatMessage(message);

const responsePromise = proc.client.call<void>("session/prompt", {
sessionId: session.sessionId,
text,
});

let fullText = "";

const onTextChunk = (params: any) => {
if (params?.sessionId === session.sessionId) {
fullText += params.text ?? "";
}
};

const onPermission = (params: any) => {
if (params?.sessionId === session.sessionId) {
proc.client.notify("permission/response", {
tool_call_id: params.tool_call_id,
decision: "approved",
});
const onSessionUpdate = (params: any) => {
if (params?.sessionId !== session.sessionId) return;
const update = params.update;
if (!update) return;
if (update.sessionUpdate === SessionUpdateKind.AgentMessageChunk && update.content?.text) {
fullText += update.content.text;
}
};

const onSessionUpdate = (params: any) => {
if (params?.sessionId === session.sessionId) {
this.store.updateContext(threadId, params.context_tokens ?? null, params.context_window ?? null, params.model);
const onMetadata = (params: any) => {
if (params?.sessionId !== session.sessionId) return;
const pct = params.contextUsagePercentage;
if (pct != null) {
const tokens = Math.round((pct / 100) * KIRO_DEFAULT_CONTEXT_WINDOW);
this.store.updateContext(threadId, tokens, KIRO_DEFAULT_CONTEXT_WINDOW, params.model ?? null);
}
};

proc.client.on("text_chunk", onTextChunk);
proc.client.on("permission_request", onPermission);
proc.client.on("session/update", onSessionUpdate);
// session/update is the canonical channel; _kiro.dev/session/update
// multiplexes a different set of update kinds (e.g. tool_call_chunk)
// we don't currently consume — don't subscribe twice.
proc.client.on(AcpEvent.SessionUpdate, onSessionUpdate);
proc.client.on(AcpEvent.KiroMetadata, onMetadata);

try {
await responsePromise;
await proc.client.call<any>(AcpMethod.SessionPrompt, {
sessionId: session.sessionId,
prompt: [{ type: "text", text }],
}, 0);
} finally {
proc.client.off("text_chunk", onTextChunk);
proc.client.off("permission_request", onPermission);
proc.client.off("session/update", onSessionUpdate);
proc.client.off(AcpEvent.SessionUpdate, onSessionUpdate);
proc.client.off(AcpEvent.KiroMetadata, onMetadata);
}

return { text: fullText };
Expand All @@ -319,59 +342,56 @@ class KiroAdapter extends BaseAdapter {
resolveWait?.();
}

const onTextChunk = (params: any) => {
if (params?.sessionId === session.sessionId) {
push({ type: "text_delta", text: params.text ?? "" });
}
};

const onToolCall = (params: any) => {
if (params?.sessionId === session.sessionId) {
push({ type: "tool_start", toolName: normalizeToolName(params.title ?? params.tool_name ?? ""), toolCallId: params.tool_call_id });
}
};

const onToolResult = (params: any) => {
if (params?.sessionId === session.sessionId) {
push({ type: "tool_end", toolName: normalizeToolName(params.tool_name ?? ""), toolCallId: params.tool_call_id, isError: (params.exit_code ?? 0) !== 0 });
}
};

const onPermission = (params: any) => {
if (params?.sessionId === session.sessionId) {
proc.client.notify("permission/response", {
tool_call_id: params.tool_call_id,
decision: "approved",
});
}
};

const onComplete = (params: any) => {
if (params?.sessionId === session.sessionId) {
if (params.stop_reason === "end_turn") {
push({ type: "turn_end" });
const onSessionUpdate = (params: any) => {
if (params?.sessionId !== session.sessionId) return;
const update = params.update;
if (!update) return;
switch (update.sessionUpdate) {
case SessionUpdateKind.AgentMessageChunk:
if (update.content?.text) {
push({ type: "text_delta", text: update.content.text });
}
break;
case SessionUpdateKind.ToolCall:
push({ type: "tool_start", toolName: normalizeToolName(update.title ?? ""), toolCallId: update.toolCallId ?? "" });
break;
case SessionUpdateKind.ToolCallUpdate: {
// tool_call_update arrives multiple times per call — for in-progress
// status changes and for the terminal completed/failed transition.
// Only the terminal one closes the tool lifecycle in our gateway.
const status = update.status;
if (status === "completed" || status === "failed") {
push({
type: "tool_end",
toolName: normalizeToolName(update.title ?? ""),
toolCallId: update.toolCallId ?? "",
isError: status === "failed",
});
}
break;
}
push({ type: "agent_end" });
done = true;
resolveWait?.();
}
};

const onSessionUpdate = (params: any) => {
if (params?.sessionId === session.sessionId) {
this.store.updateContext(threadId, params.context_tokens ?? null, params.context_window ?? null, params.model);
const onMetadata = (params: any) => {
if (params?.sessionId !== session.sessionId) return;
const pct = params.contextUsagePercentage;
if (pct != null) {
const tokens = Math.round((pct / 100) * KIRO_DEFAULT_CONTEXT_WINDOW);
this.store.updateContext(threadId, tokens, KIRO_DEFAULT_CONTEXT_WINDOW, params.model ?? null);
}
};

proc.client.on("text_chunk", onTextChunk);
proc.client.on("tool_call", onToolCall);
proc.client.on("tool_result", onToolResult);
proc.client.on("permission_request", onPermission);
proc.client.on("complete", onComplete);
proc.client.on("session/update", onSessionUpdate);
proc.client.on(AcpEvent.SessionUpdate, onSessionUpdate);
proc.client.on(AcpEvent.KiroMetadata, onMetadata);

// Fire the prompt (don't await — events stream in)
proc.client.call("session/prompt", { sessionId: session.sessionId, text }).catch((err) => {
proc.client.call<any>(AcpMethod.SessionPrompt, { sessionId: session.sessionId, prompt: [{ type: "text", text }] }, 0).then(() => {
push({ type: "turn_end" });
push({ type: "agent_end" });
done = true;
resolveWait?.();
}).catch((err) => {
push({ type: "agent_end" });
done = true;
promptError = err;
Expand All @@ -389,12 +409,8 @@ class KiroAdapter extends BaseAdapter {
}
if (promptError) throw promptError;
} finally {
proc.client.off("text_chunk", onTextChunk);
proc.client.off("tool_call", onToolCall);
proc.client.off("tool_result", onToolResult);
proc.client.off("permission_request", onPermission);
proc.client.off("complete", onComplete);
proc.client.off("session/update", onSessionUpdate);
proc.client.off(AcpEvent.SessionUpdate, onSessionUpdate);
proc.client.off(AcpEvent.KiroMetadata, onMetadata);
}
} finally {
this.store.markInFlight(threadId, false);
Expand Down
Loading
Loading