Skip to content

[Bug]: Context canceled error when sending multiple streaming messages with delays #124

@obutora

Description

@obutora

What happened?


Context canceled error when sending multiple streaming messages with delays

Summary

When implementing an agent that sends multiple messages over time using q.Write() with delays (e.g., time.Sleep()), the context gets canceled after only a few messages, resulting in "context canceled" errors.

Environment

  • Library version: v0.3.2
  • Transport: gRPC
  • OS: macOS

Expected Behavior

An agent's Execute() function should be able to send multiple messages over time:

func (*agentExecutor) Execute(ctx context.Context, reqCtx *a2asrv.RequestContext, q eventqueue.Queue) error {
    for i := 0; i < 10; i++ {
        q.Write(ctx, a2a.NewMessage(a2a.MessageRoleAgent, a2a.TextPart{Text: fmt.Sprintf("message %d", i)}))
        time.Sleep(time.Second * 1)
    }
    return nil
}

All 10 messages should be successfully delivered to the client.

Actual Behavior

The context gets canceled after sending only 1-2 messages:

2025/11/28 16:11:47 Sent message 0
2025/11/28 16:11:48 Sent message 1
2025/11/28 16:11:49 Failed to write a message: context canceled

Root Cause Analysis

The issue appears to be in internal/taskexec/execution.go:

func (e *Execution) processEvents(ctx context.Context, queue eventqueue.Queue) (a2a.SendMessageResult, error) {
    // ...
    queueReadCtx, cancelCtx := context.WithCancel(ctx)
    defer cancelCtx()  // ← This cancels the queue reading context
    go readQueueToChannels(queueReadCtx, queue, eventChan, errorChan)
    // ...
}

Problem flow:

  1. Execute() returns after writing messages to queue
  2. processEvents() exits
  3. defer cancelCtx() is called
  4. readQueueToChannels() goroutine's context is canceled
  5. Messages still in the queue are never sent

Investigation

  1. All official examples (examples/helloworld/server/*) only send a single message
  2. There are no examples demonstrating multiple messages being sent with time delays
  3. The library appears to be designed for immediate message batching rather than time-delayed streaming

Questions

  1. Is sending multiple messages with time delays a supported use case?
  2. If so, what is the recommended pattern for implementing this?
  3. Should processEvents() wait for Execute() to complete and then drain the queue before canceling the context?
  4. Is there a better approach to implementing streaming functionality that I might be missing? If there's a more idiomatic or recommended pattern for this use case, I'd appreciate any guidance.

Current Workaround

The only working pattern is to write all messages immediately without delays:

func (*agentExecutor) Execute(ctx context.Context, reqCtx *a2asrv.RequestContext, q eventqueue.Queue) error {
    for i := 0; i < 10; i++ {
        q.Write(ctx, a2a.NewMessage(...))
    }
    return nil
}

However, this doesn't support time-delayed streaming scenarios.

Relevant log output

2025/11/28 16:11:47 Sent message 0
2025/11/28 16:11:48 Sent message 1
2025/11/28 16:11:49 Failed to write a message: context canceled

Code of Conduct

  • I agree to follow this project's Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions