Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,3 @@ repos:
args: [ --exit-non-zero-on-fix ]
exclude: ^src/acp/(meta|schema)\.py$
- id: ruff-format
exclude: ^src/acp/(meta|schema)\.py$
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ install: ## Install the virtual environment and install the pre-commit hooks
gen-all: ## Generate all code from schema
@echo "🚀 Generating all code"
@uv run scripts/gen_all.py
@uv run ruff check --fix
@uv run ruff format .

.PHONY: check
check: ## Run code quality tools.
Expand Down
109 changes: 109 additions & 0 deletions docs/migration-guide-0.7.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Migrating to ACP Python SDK 0.7

ACP 0.7 reshapes the public surface so that Python-facing names, runtime helpers, and schema models line up with the evolving Agent Client Protocol schema. This guide covers the major changes in 0.7.0 and calls out the mechanical steps you need to apply in downstream agents, clients, and transports.

## 1. `acp.schema` models now expose `snake_case` fields

- Every generated model in `acp.schema` (see `src/acp/schema.py`) now uses Pythonic attribute names such as `session_id`, `stop_reason`, and `field_meta`. The JSON aliases (e.g., `alias="sessionId"`) stay intact so over-the-wire payloads remain camelCase.
- Instantiating a model or accessing response values must now use the `snake_case` form:

```python
# Before (0.6 and earlier)
PromptResponse(stopReason="end_turn")
params.sessionId

# After (0.7 and later)
PromptResponse(stop_reason="end_turn")
params.session_id
```

- If you relied on `model_dump()` to emit camelCase keys automatically, switch to `model_dump(by_alias=True)` (or use helpers such as `text_block`, `start_tool_call`, etc.) so responses continue to match the protocol.
- `field_meta` stays available for extension data. Any extra keys that were nested under `_meta` should now be provided via keyword arguments when constructing the schema models (see section 3).

## 2. `acp.run_agent` and `acp.connect_to_agent` replace manual connection wiring

`AgentSideConnection` and `ClientSideConnection` still exist internally, but the top-level entry points now prefer the helper functions implemented in `src/acp/core.py`.

### Updating agents

- Old pattern:

```python
conn = AgentSideConnection(lambda conn: Agent(), writer, reader)
await asyncio.Event().wait() # keep running
```

- New pattern:

```python
await run_agent(MyAgent(), input_stream=writer, output_stream=reader)
```

- When your agent just runs over stdio, call `await run_agent(MyAgent())` and the helper will acquire asyncio streams via `stdio_streams()` for you.

### Updating clients and tests

- Old pattern:

```python
conn = ClientSideConnection(lambda conn: MyClient(), proc.stdin, proc.stdout)
```

- New pattern:

```python
conn = connect_to_agent(MyClient(), proc.stdin, proc.stdout)
```

- `spawn_agent_process` / `spawn_client_process` now accept concrete `Agent`/`Client` instances instead of factories that received the connection. Instantiate your implementation first and pass it in.
- Importing the legacy connection classes via `acp.AgentSideConnection` / `acp.ClientSideConnection` issues a `DeprecationWarning` (see `src/acp/__init__.py:82-96`). Update your imports to `run_agent` and `connect_to_agent` to silence the warning.

## 3. `Agent` and `Client` interface methods take explicit parameters

Both interfaces in `src/acp/interfaces.py` now look like idiomatic Python protocols: methods use `snake_case` names and receive the individual schema fields rather than a single request model.

### What changed

- Method names follow `snake_case` (`request_permission`, `session_update`, `new_session`, `set_session_model`, etc.).
- Parameters represent the schema fields, so there is no need to unpack `params` manually.
- Each method is decorated with `@param_model(...)`. Combined with the `compatible_class` helper (see `src/acp/utils.py`), this keeps the camelCase wrappers alive for callers that still pass a full Pydantic request object—but those wrappers now emit `DeprecationWarning`s to encourage migration.

### How to update your implementations

1. Rename your method overrides to their `snake_case` equivalents.
2. Replace `params: Model` arguments with the concrete fields plus `**kwargs` to collect future `_meta` keys.
3. Access schema data directly via those parameters.

Example migration for an agent:

```python
# Before
class EchoAgent:
async def prompt(self, params: PromptRequest) -> PromptResponse:
text = params.prompt[0].text
return PromptResponse(stopReason="end_turn")

# After
class EchoAgent:
async def prompt(self, prompt, session_id, **kwargs) -> PromptResponse:
text = prompt[0].text
return PromptResponse(stop_reason="end_turn")
```

Similarly, a client method such as `requestPermission` becomes:

```python
class RecordingClient(Client):
async def request_permission(self, options, session_id, tool_call, **kwargs):
...
```

### Additional notes

- The connection layers automatically assemble the right request/response models using the `param_model` metadata, so callers do not need to build Pydantic objects manually anymore.
- For extension points (`field_meta`), pass keyword arguments from the connection into your handler signature: they arrive inside `**kwargs`.

### Backward compatibility

- The change should be 100% backward compatible as long as you update your method names and signatures. The `compatible_class` wrapper ensures that existing callers passing full request models continue to work. The old style API will remain functional before the next major release(1.0).
- Because camelCase wrappers remain for now, you can migrate file-by-file while still running against ACP 0.7. Just watch for the new deprecation warnings in your logs/tests.
45 changes: 22 additions & 23 deletions docs/quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ Spin up a working ACP agent/client loop in minutes. Keep this page beside the te

## Quick checklist

| Goal | Command / Link |
| --- | --- |
| Install the SDK | `pip install agent-client-protocol` or `uv add agent-client-protocol` |
| Run the echo agent | `python examples/echo_agent.py` |
| Point Zed (or another client) at it | Update `settings.json` as shown below |
| Programmatically drive an agent | Copy the `spawn_agent_process` example |
| Run tests before hacking further | `make check && make test` |
| Goal | Command / Link |
| ----------------------------------- | --------------------------------------------------------------------- |
| Install the SDK | `pip install agent-client-protocol` or `uv add agent-client-protocol` |
| Run the echo agent | `python examples/echo_agent.py` |
| Point Zed (or another client) at it | Update `settings.json` as shown below |
| Programmatically drive an agent | Copy the `spawn_agent_process` example |
| Run tests before hacking further | `make check && make test` |

## Before you begin

Expand Down Expand Up @@ -76,27 +76,26 @@ from pathlib import Path

from acp import spawn_agent_process, text_block
from acp.interfaces import Client
from acp.schema import InitializeRequest, NewSessionRequest, PromptRequest, SessionNotification


class SimpleClient(Client):
async def requestPermission(self, params): # pragma: no cover - minimal stub
async def request_permission(
self, options, session_id, tool_call, **kwargs: Any
)
return {"outcome": {"outcome": "cancelled"}}

async def sessionUpdate(self, params: SessionNotification) -> None:
print("update:", params.sessionId, params.update)
async def session_update(self, session_id, update, **kwargs):
print("update:", session_id, update)


async def main() -> None:
script = Path("examples/echo_agent.py")
async with spawn_agent_process(lambda _agent: SimpleClient(), sys.executable, str(script)) as (conn, _proc):
await conn.initialize(InitializeRequest(protocolVersion=1))
session = await conn.newSession(NewSessionRequest(cwd=str(script.parent), mcpServers=[]))
async with spawn_agent_process(SimpleClient(), sys.executable, str(script)) as (conn, _proc):
await conn.initialize(protocol_version=1)
session = await conn.new_session(cwd=str(script.parent), mcp_servers=[])
await conn.prompt(
PromptRequest(
sessionId=session.sessionId,
prompt=[text_block("Hello from spawn!")],
)
session_id=session.session_id,
prompt=[text_block("Hello from spawn!")],
)

asyncio.run(main())
Expand All @@ -111,16 +110,16 @@ _Swap the echo demo for your own `Agent` subclass._
Create your own agent by subclassing `acp.Agent`. The pattern mirrors the echo example:

```python
from acp import Agent, PromptRequest, PromptResponse
from acp import Agent, PromptResponse


class MyAgent(Agent):
async def prompt(self, params: PromptRequest) -> PromptResponse:
# inspect params.prompt, stream updates, then finish the turn
return PromptResponse(stopReason="end_turn")
async def prompt(self, prompt, session_id, **kwargs) -> PromptResponse:
# inspect prompt, stream updates, then finish the turn
return PromptResponse(stop_reason="end_turn")
```

Hook it up with `AgentSideConnection` inside an async entrypoint and wire it to your client. Refer to:
Run it with `run_agent()` inside an async entrypoint and wire it to your client. Refer to:

- [`examples/echo_agent.py`](https://github.com/agentclientprotocol/python-sdk/blob/main/examples/echo_agent.py) for the smallest streaming agent
- [`examples/agent.py`](https://github.com/agentclientprotocol/python-sdk/blob/main/examples/agent.py) for an implementation that negotiates capabilities and streams richer updates
Expand Down
120 changes: 74 additions & 46 deletions examples/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,93 +5,121 @@
from acp import (
Agent,
AgentSideConnection,
AuthenticateRequest,
AuthenticateResponse,
CancelNotification,
InitializeRequest,
InitializeResponse,
LoadSessionRequest,
LoadSessionResponse,
NewSessionRequest,
NewSessionResponse,
PromptRequest,
PromptResponse,
SetSessionModeRequest,
SetSessionModeResponse,
session_notification,
stdio_streams,
run_agent,
text_block,
update_agent_message,
PROTOCOL_VERSION,
)
from acp.schema import AgentCapabilities, AgentMessageChunk, Implementation
from acp.interfaces import Client
from acp.schema import (
AgentCapabilities,
AgentMessageChunk,
AudioContentBlock,
ClientCapabilities,
EmbeddedResourceContentBlock,
HttpMcpServer,
ImageContentBlock,
Implementation,
ResourceContentBlock,
SseMcpServer,
McpServerStdio,
TextContentBlock,
)


class ExampleAgent(Agent):
def __init__(self, conn: AgentSideConnection) -> None:
self._conn = conn
_conn: Client

def __init__(self) -> None:
self._next_session_id = 0
self._sessions: set[str] = set()

def on_connect(self, conn: Client) -> None:
self._conn = conn

async def _send_agent_message(self, session_id: str, content: Any) -> None:
update = content if isinstance(content, AgentMessageChunk) else update_agent_message(content)
await self._conn.sessionUpdate(session_notification(session_id, update))

async def initialize(self, params: InitializeRequest) -> InitializeResponse: # noqa: ARG002
await self._conn.session_update(session_id, update)

async def initialize(
self,
protocol_version: int,
client_capabilities: ClientCapabilities | None = None,
client_info: Implementation | None = None,
**kwargs: Any,
) -> InitializeResponse:
logging.info("Received initialize request")
return InitializeResponse(
protocolVersion=PROTOCOL_VERSION,
agentCapabilities=AgentCapabilities(),
agentInfo=Implementation(name="example-agent", title="Example Agent", version="0.1.0"),
protocol_version=PROTOCOL_VERSION,
agent_capabilities=AgentCapabilities(),
agent_info=Implementation(name="example-agent", title="Example Agent", version="0.1.0"),
)

async def authenticate(self, params: AuthenticateRequest) -> AuthenticateResponse | None: # noqa: ARG002
logging.info("Received authenticate request %s", params.methodId)
async def authenticate(self, method_id: str, **kwargs: Any) -> AuthenticateResponse | None:
logging.info("Received authenticate request %s", method_id)
return AuthenticateResponse()

async def newSession(self, params: NewSessionRequest) -> NewSessionResponse: # noqa: ARG002
async def new_session(
self, cwd: str, mcp_servers: list[HttpMcpServer | SseMcpServer | McpServerStdio], **kwargs: Any
) -> NewSessionResponse:
logging.info("Received new session request")
session_id = str(self._next_session_id)
self._next_session_id += 1
self._sessions.add(session_id)
return NewSessionResponse(sessionId=session_id, modes=None)
return NewSessionResponse(session_id=session_id, modes=None)

async def loadSession(self, params: LoadSessionRequest) -> LoadSessionResponse | None: # noqa: ARG002
logging.info("Received load session request %s", params.sessionId)
self._sessions.add(params.sessionId)
async def load_session(
self, cwd: str, mcp_servers: list[HttpMcpServer | SseMcpServer | McpServerStdio], session_id: str, **kwargs: Any
) -> LoadSessionResponse | None:
logging.info("Received load session request %s", session_id)
self._sessions.add(session_id)
return LoadSessionResponse()

async def setSessionMode(self, params: SetSessionModeRequest) -> SetSessionModeResponse | None: # noqa: ARG002
logging.info("Received set session mode request %s -> %s", params.sessionId, params.modeId)
async def set_session_mode(self, mode_id: str, session_id: str, **kwargs: Any) -> SetSessionModeResponse | None:
logging.info("Received set session mode request %s -> %s", session_id, mode_id)
return SetSessionModeResponse()

async def prompt(self, params: PromptRequest) -> PromptResponse:
logging.info("Received prompt request for session %s", params.sessionId)
if params.sessionId not in self._sessions:
self._sessions.add(params.sessionId)

await self._send_agent_message(params.sessionId, text_block("Client sent:"))
for block in params.prompt:
await self._send_agent_message(params.sessionId, block)

return PromptResponse(stopReason="end_turn")

async def cancel(self, params: CancelNotification) -> None: # noqa: ARG002
logging.info("Received cancel notification for session %s", params.sessionId)

async def extMethod(self, method: str, params: dict) -> dict: # noqa: ARG002
async def prompt(
self,
prompt: list[
TextContentBlock
| ImageContentBlock
| AudioContentBlock
| ResourceContentBlock
| EmbeddedResourceContentBlock
],
session_id: str,
**kwargs: Any,
) -> PromptResponse:
logging.info("Received prompt request for session %s", session_id)
if session_id not in self._sessions:
self._sessions.add(session_id)

await self._send_agent_message(session_id, text_block("Client sent:"))
for block in prompt:
await self._send_agent_message(session_id, block)
return PromptResponse(stop_reason="end_turn")

async def cancel(self, session_id: str, **kwargs: Any) -> None:
logging.info("Received cancel notification for session %s", session_id)

async def ext_method(self, method: str, params: dict[str, Any]) -> dict[str, Any]:
logging.info("Received extension method call: %s", method)
return {"example": "response"}

async def extNotification(self, method: str, params: dict) -> None: # noqa: ARG002
async def ext_notification(self, method: str, params: dict[str, Any]) -> None:
logging.info("Received extension notification: %s", method)


async def main() -> None:
logging.basicConfig(level=logging.INFO)
reader, writer = await stdio_streams()
AgentSideConnection(ExampleAgent, writer, reader)
await asyncio.Event().wait()
await run_agent(ExampleAgent())


if __name__ == "__main__":
Expand Down
Loading