fix(a2a): handle streaming backpressure#1734
Conversation
e6d937c to
3da9d27
Compare
3da9d27 to
c6c95b6
Compare
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
AgentScopeJavaBot
left a comment
There was a problem hiding this comment.
🤖 AI Review
This PR adds backpressure handling to the A2A streaming pipeline and improves client-side handling of non-COMPLETED terminal task states. The server-side changes in JsonRpcTransportWrapper introduce a configurable onBackpressureBuffer with BufferOverflowStrategy.ERROR and proper overflow logging — a solid, defensive approach. The AgentScopeAgentExecutor adds a good defensive catch for IllegalStateException when taskUpdater.fail() is called after doOnError already transitioned the task. The client-side TaskUpdateEventHandler correctly guards against converting artifacts from failed/cancelled tasks. Tests demonstrate the backpressure problem and verify the fix. Two minor issues noted below.
| "[{}] A2aAgent task ended with non-completed state: {}.", | ||
| currentRequestId, | ||
| state); | ||
| context.getSink().success(Msg.builder().textContent(errorMsg).build()); |
There was a problem hiding this comment.
[minor] context.getSink().success(...) is used to propagate a non-COMPLETED terminal state (e.g. FAILED, CANCELED) as if it were a normal completion. This is semantically misleading — downstream consumers of the Sink cannot distinguish a genuinely successful task from a failed one without inspecting the Msg.textContent.
If the Sink API supports it, prefer sink.error(new RuntimeException(errorMsg)) or a dedicated failure path. If success() is the only option (e.g. the sink type doesn't support error emission), add a code comment explaining why a failure is propagated via success(), so future maintainers are not confused.
| "agentscope.a2a.streaming.backpressure-buffer-size"; | ||
|
|
||
| private static final int DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE = 8192; | ||
|
|
There was a problem hiding this comment.
[minor] Integer.getInteger(prop, defaultValue) returns null (and triggers an NPE during static init) if the system property is set to a non-numeric string (e.g. -Dagentscope.a2a.streaming.backpressure-buffer-size=abc). This would crash the entire class loading and prevent the server from starting.
Consider adding a defensive parse:
private static final int STREAMING_BACKPRESSURE_BUFFER_SIZE = parseBufferSize();
private static int parseBufferSize() {
String raw = System.getProperty(STREAMING_BACKPRESSURE_BUFFER_SIZE_PROPERTY);
if (raw == null) return DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE;
try {
return Integer.parseInt(raw);
} catch (NumberFormatException e) {
LoggerFactory.getLogger(JsonRpcTransportWrapper.class)
.warn("Invalid value for {}: '{}', using default {}",
STREAMING_BACKPRESSURE_BUFFER_SIZE_PROPERTY, raw,
DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE);
return DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE;
}
}
AgentScopeJavaBot
left a comment
There was a problem hiding this comment.
🤖 AI Review
This PR adds backpressure handling to the A2A streaming pipeline and improves client-side handling of non-COMPLETED terminal task states. The server-side changes in JsonRpcTransportWrapper introduce a configurable onBackpressureBuffer with BufferOverflowStrategy.ERROR and proper overflow logging — a solid, defensive approach. The AgentScopeAgentExecutor adds a good defensive catch for IllegalStateException when taskUpdater.fail() is called after doOnError already transitioned the task. The client-side TaskUpdateEventHandler correctly guards against converting artifacts from failed/cancelled tasks. Tests demonstrate the backpressure problem and verify the fix. Two minor issues noted below.
| "[{}] A2aAgent task ended with non-completed state: {}.", | ||
| currentRequestId, | ||
| state); | ||
| context.getSink().success(Msg.builder().textContent(errorMsg).build()); |
There was a problem hiding this comment.
[minor] context.getSink().success(...) is used to propagate a non-COMPLETED terminal state (e.g. FAILED, CANCELED) as if it were a normal completion. This is semantically misleading — downstream consumers of the Sink cannot distinguish a genuinely successful task from a failed one without inspecting the Msg.textContent.
If the Sink API supports it, prefer sink.error(new RuntimeException(errorMsg)) or a dedicated failure path. If success() is the only option (e.g. the sink type doesn't support error emission), add a code comment explaining why a failure is propagated via success(), so future maintainers are not confused.
| "agentscope.a2a.streaming.backpressure-buffer-size"; | ||
|
|
||
| private static final int DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE = 8192; | ||
|
|
There was a problem hiding this comment.
[minor] Integer.getInteger(prop, defaultValue) returns null (and triggers an NPE during static init) if the system property is set to a non-numeric string (e.g. -Dagentscope.a2a.streaming.backpressure-buffer-size=abc). This would crash the entire class loading and prevent the server from starting.
Consider adding a defensive parse:
private static final int STREAMING_BACKPRESSURE_BUFFER_SIZE = parseBufferSize();
private static int parseBufferSize() {
String raw = System.getProperty(STREAMING_BACKPRESSURE_BUFFER_SIZE_PROPERTY);
if (raw == null) return DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE;
try {
return Integer.parseInt(raw);
} catch (NumberFormatException e) {
LoggerFactory.getLogger(JsonRpcTransportWrapper.class)
.warn("Invalid value for {}: '{}', using default {}",
STREAMING_BACKPRESSURE_BUFFER_SIZE_PROPERTY, raw,
DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE);
return DEFAULT_STREAMING_BACKPRESSURE_BUFFER_SIZE;
}
}
AgentScope-Java Version
#1490
Description
Checklist
Please check the following items before code is ready to be reviewed.
mvn spotless:applymvn test)