Skip to content

fix(a2a): handle streaming backpressure#1734

Open
RainYuY wants to merge 1 commit into
agentscope-ai:mainfrom
RainYuY:rain/a2a-streaming-backpressure
Open

fix(a2a): handle streaming backpressure#1734
RainYuY wants to merge 1 commit into
agentscope-ai:mainfrom
RainYuY:rain/a2a-streaming-backpressure

Conversation

@RainYuY

@RainYuY RainYuY commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

AgentScope-Java Version

#1490

Description

Checklist

Please check the following items before code is ready to be reviewed.

  • Code has been formatted with mvn spotless:apply
  • All tests are passing (mvn test)
  • Javadoc comments are complete and follow project conventions
  • Related documentation has been updated (e.g. links, examples, etc.)
  • Code is ready for review

@RainYuY RainYuY requested a review from a team June 12, 2026 09:24
@RainYuY RainYuY force-pushed the rain/a2a-streaming-backpressure branch from e6d937c to 3da9d27 Compare June 12, 2026 09:31
@RainYuY RainYuY force-pushed the rain/a2a-streaming-backpressure branch from 3da9d27 to c6c95b6 Compare June 12, 2026 09:43
@codecov

codecov Bot commented Jun 12, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 64.51613% with 11 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
...ver/transport/jsonrpc/JsonRpcTransportWrapper.java 46.66% 7 Missing and 1 partial ⚠️
...e/core/a2a/agent/event/TaskUpdateEventHandler.java 77.77% 1 Missing and 1 partial ⚠️
...e/a2a/server/executor/AgentScopeAgentExecutor.java 85.71% 1 Missing ⚠️

📢 Thoughts on this report? Let us know!

@AgentScopeJavaBot AgentScopeJavaBot left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 AI Review

This PR adds backpressure handling to the A2A streaming pipeline and improves client-side handling of non-COMPLETED terminal task states. The server-side changes in JsonRpcTransportWrapper introduce a configurable onBackpressureBuffer with BufferOverflowStrategy.ERROR and proper overflow logging — a solid, defensive approach. The AgentScopeAgentExecutor adds a good defensive catch for IllegalStateException when taskUpdater.fail() is called after doOnError already transitioned the task. The client-side TaskUpdateEventHandler correctly guards against converting artifacts from failed/cancelled tasks. Tests demonstrate the backpressure problem and verify the fix. Two minor issues noted below.

"[{}] A2aAgent task ended with non-completed state: {}.",
currentRequestId,
state);
context.getSink().success(Msg.builder().textContent(errorMsg).build());

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] context.getSink().success(...) is used to propagate a non-COMPLETED terminal state (e.g. FAILED, CANCELED) as if it were a normal completion. This is semantically misleading — downstream consumers of the Sink cannot distinguish a genuinely successful task from a failed one without inspecting the Msg.textContent.

If the Sink API supports it, prefer sink.error(new RuntimeException(errorMsg)) or a dedicated failure path. If success() is the only option (e.g. the sink type doesn't support error emission), add a code comment explaining why a failure is propagated via success(), so future maintainers are not confused.

"agentscope.a2a.streaming.backpressure-buffer-size";

private static final int DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE = 8192;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] Integer.getInteger(prop, defaultValue) returns null (and triggers an NPE during static init) if the system property is set to a non-numeric string (e.g. -Dagentscope.a2a.streaming.backpressure-buffer-size=abc). This would crash the entire class loading and prevent the server from starting.

Consider adding a defensive parse:

private static final int STREAMING_BACKPRESSURE_BUFFER_SIZE = parseBufferSize();

private static int parseBufferSize() {
    String raw = System.getProperty(STREAMING_BACKPRESSURE_BUFFER_SIZE_PROPERTY);
    if (raw == null) return DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE;
    try {
        return Integer.parseInt(raw);
    } catch (NumberFormatException e) {
        LoggerFactory.getLogger(JsonRpcTransportWrapper.class)
            .warn("Invalid value for {}: '{}', using default {}",
                STREAMING_BACKPRESSURE_BUFFER_SIZE_PROPERTY, raw,
                DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE);
        return DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE;
    }
}

@AgentScopeJavaBot AgentScopeJavaBot left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 AI Review

This PR adds backpressure handling to the A2A streaming pipeline and improves client-side handling of non-COMPLETED terminal task states. The server-side changes in JsonRpcTransportWrapper introduce a configurable onBackpressureBuffer with BufferOverflowStrategy.ERROR and proper overflow logging — a solid, defensive approach. The AgentScopeAgentExecutor adds a good defensive catch for IllegalStateException when taskUpdater.fail() is called after doOnError already transitioned the task. The client-side TaskUpdateEventHandler correctly guards against converting artifacts from failed/cancelled tasks. Tests demonstrate the backpressure problem and verify the fix. Two minor issues noted below.

"[{}] A2aAgent task ended with non-completed state: {}.",
currentRequestId,
state);
context.getSink().success(Msg.builder().textContent(errorMsg).build());

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] context.getSink().success(...) is used to propagate a non-COMPLETED terminal state (e.g. FAILED, CANCELED) as if it were a normal completion. This is semantically misleading — downstream consumers of the Sink cannot distinguish a genuinely successful task from a failed one without inspecting the Msg.textContent.

If the Sink API supports it, prefer sink.error(new RuntimeException(errorMsg)) or a dedicated failure path. If success() is the only option (e.g. the sink type doesn't support error emission), add a code comment explaining why a failure is propagated via success(), so future maintainers are not confused.

"agentscope.a2a.streaming.backpressure-buffer-size";

private static final int DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE = 8192;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] Integer.getInteger(prop, defaultValue) returns null (and triggers an NPE during static init) if the system property is set to a non-numeric string (e.g. -Dagentscope.a2a.streaming.backpressure-buffer-size=abc). This would crash the entire class loading and prevent the server from starting.

Consider adding a defensive parse:

private static final int STREAMING_BACKPRESSURE_BUFFER_SIZE = parseBufferSize();

private static int parseBufferSize() {
    String raw = System.getProperty(STREAMING_BACKPRESSURE_BUFFER_SIZE_PROPERTY);
    if (raw == null) return DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE;
    try {
        return Integer.parseInt(raw);
    } catch (NumberFormatException e) {
        LoggerFactory.getLogger(JsonRpcTransportWrapper.class)
            .warn("Invalid value for {}: '{}', using default {}",
                STREAMING_BACKPRESSURE_BUFFER_SIZE_PROPERTY, raw,
                DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE);
        return DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE;
    }
}

@AgentScopeJavaBot AgentScopeJavaBot added bug Something isn't working area/ext/integration External protocols & middleware integrations labels Jun 13, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/ext/integration External protocols & middleware integrations bug Something isn't working

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants