-
Notifications
You must be signed in to change notification settings - Fork 41
Open
Description
What happened?
Context canceled error when sending multiple streaming messages with delays
Summary
When implementing an agent that sends multiple messages over time using q.Write() with delays (e.g., time.Sleep()), the context gets canceled after only a few messages, resulting in "context canceled" errors.
Environment
- Library version: v0.3.2
- Transport: gRPC
- OS: macOS
Expected Behavior
An agent's Execute() function should be able to send multiple messages over time:
func (*agentExecutor) Execute(ctx context.Context, reqCtx *a2asrv.RequestContext, q eventqueue.Queue) error {
for i := 0; i < 10; i++ {
q.Write(ctx, a2a.NewMessage(a2a.MessageRoleAgent, a2a.TextPart{Text: fmt.Sprintf("message %d", i)}))
time.Sleep(time.Second * 1)
}
return nil
}All 10 messages should be successfully delivered to the client.
Actual Behavior
The context gets canceled after sending only 1-2 messages:
2025/11/28 16:11:47 Sent message 0
2025/11/28 16:11:48 Sent message 1
2025/11/28 16:11:49 Failed to write a message: context canceled
Root Cause Analysis
The issue appears to be in internal/taskexec/execution.go:
func (e *Execution) processEvents(ctx context.Context, queue eventqueue.Queue) (a2a.SendMessageResult, error) {
// ...
queueReadCtx, cancelCtx := context.WithCancel(ctx)
defer cancelCtx() // ← This cancels the queue reading context
go readQueueToChannels(queueReadCtx, queue, eventChan, errorChan)
// ...
}Problem flow:
Execute()returns after writing messages to queueprocessEvents()exitsdefer cancelCtx()is calledreadQueueToChannels()goroutine's context is canceled- Messages still in the queue are never sent
Investigation
- All official examples (
examples/helloworld/server/*) only send a single message - There are no examples demonstrating multiple messages being sent with time delays
- The library appears to be designed for immediate message batching rather than time-delayed streaming
Questions
- Is sending multiple messages with time delays a supported use case?
- If so, what is the recommended pattern for implementing this?
- Should
processEvents()wait forExecute()to complete and then drain the queue before canceling the context? - Is there a better approach to implementing streaming functionality that I might be missing? If there's a more idiomatic or recommended pattern for this use case, I'd appreciate any guidance.
Current Workaround
The only working pattern is to write all messages immediately without delays:
func (*agentExecutor) Execute(ctx context.Context, reqCtx *a2asrv.RequestContext, q eventqueue.Queue) error {
for i := 0; i < 10; i++ {
q.Write(ctx, a2a.NewMessage(...))
}
return nil
}However, this doesn't support time-delayed streaming scenarios.
Relevant log output
2025/11/28 16:11:47 Sent message 0
2025/11/28 16:11:48 Sent message 1
2025/11/28 16:11:49 Failed to write a message: context canceledCode of Conduct
- I agree to follow this project's Code of Conduct
Metadata
Metadata
Assignees
Labels
No labels