Skip to content

a2a: switch synchronous a2a_send to SSE streaming (tasks/sendSubscribe)#1219

Open
pbranchu wants to merge 2 commits into
RightNow-AI:mainfrom
pbranchu:a2a-streaming
Open

a2a: switch synchronous a2a_send to SSE streaming (tasks/sendSubscribe)#1219
pbranchu wants to merge 2 commits into
RightNow-AI:mainfrom
pbranchu:a2a-streaming

Conversation

@pbranchu

Copy link
Copy Markdown
Contributor

Summary

Switches the synchronous `a2a_send` tool from the blocking `tasks/send` JSON-RPC method to SSE-streaming `tasks/sendSubscribe`. Adds a shared `consume_sse_stream` helper used by both this PR and the forthcoming async dispatch PR.

This is the foundation PR in a three-PR series that splits the work from the closed #1066 into reviewable chunks:

  1. This PR — SSE streaming for the existing sync tool (no kernel changes)
  2. `kernel-context-threading` — refactor `KernelHandle::send_message` to plumb `ChannelCallbackContext` as a parameter (eliminates a cross-user race in any future callback machinery)
  3. `a2a-async-dispatch` — async tools (`a2a_send_async`, `a2a_check_task`, `a2a_cancel_task`) built on top of PRs 1 + 2

Changes

  • New shared helper: `process_sse_line` / `parse_sse_content` / `consume_sse_stream` in `a2a.rs` — single SSE-parsing implementation used by all streaming paths
  • `send_task_streaming` — replaces `send_task` for the `a2a_send` tool. Streams bytes from the wire incrementally (does not buffer the full response into a String first)
  • UTF-8-safe byte accumulation: bytes accumulate in a `Vec`, only complete newline-terminated prefixes are decoded with `from_utf8_lossy` (avoids panics on chunks that split multi-byte codepoints)
  • Wall-clock deadline of 300 s via `tokio::time::timeout` wrapping `consume_sse_stream` (matches the tool description; `reqwest`'s built-in `.timeout()` is per-read, not total-elapsed)
  • Tool description for `a2a_send` updated to honestly say "blocks until complete, up to 300 s"
  • Cargo.lock: bumps `lettre` to 0.11.22 (RUSTSEC-2026-0141)

Tests

7 SSE parser unit tests covering: normal completion, mid-stream disconnect, malformed JSON, missing final event, concatenated lines, error events, empty stream.

946 runtime + 503 channels tests pass on two consecutive runs (no flakes). `cargo clippy --workspace --tests --all-targets -- -D warnings` clean.

Context

This PR is what closed PR #1066 should have been on its own: a small, focused, self-contained change. The async dispatch tools and kernel refactor are split into their own PRs to keep each one reviewable.

Test plan

  • CI passes
  • Local: `cargo test -p openfang-runtime --lib -- --skip process_manager --skip subprocess_sandbox` passes
  • Confirmed against a mock SSE server in production deployment

Philippe Branchu and others added 2 commits May 31, 2026 20:34
Replace blocking tasks/send with tasks/sendSubscribe so a2a_send no longer
hits the 30-second client timeout on long-running remote agents (Claude Code
delegations, multi-step reasoning, etc.). The call still blocks until the
remote agent emits its final event but processes SSE chunks incrementally
from the wire — the response body is never buffered in memory at once, so
backpressure is preserved.

- Bump A2aClient timeout from 30s to 300s.
- Add `send_task_streaming` to `A2aClient` (incremental SSE consumption).
- Extract `process_sse_line` + `parse_sse_content` so production streaming
  and unit tests share one parser implementation.
- `tool_a2a_send` switches to `send_task_streaming`; description updated
  to accurately state the 300s blocking behaviour.
- Add 7 SSE parser unit tests: normal completion, mid-stream disconnect,
  malformed JSON skipping, no-final-event, chunk boundaries, error events,
  empty stream.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant