Simplify agent message and streaming management #2065

@jezekra1

Description

Currently, agents built with the agentstack-sdk stream tokens by emitting many individual messages, each containing a text part with a single token. This has several disadvantages:

  • The a2a client (UI) must be custom-built to support this style of streaming and to concatenate the tokens.
  • The a2a task store becomes polluted and overloaded by the large number of messages stored per conversation. This becomes a particular problem if we want to enable a SQL/persistent task store implementation.
  • Managing streaming versus conversation history in the agent and the UI is not straightforward: what is stored in the conversation history can diverge from what is streamed. This leads to confusing issues, such as the UI showing different messages after a conversation is refreshed.

Proposal
Simplify the SDK message management with automatic storage of messages to the platform.

Current version:

@server.agent(...)
async def my_agent(input: Message, context: RunContext):
    # Explicit history management: store the incoming message
    await context.store(input)
    yield "first"  # yields a Message with a single text part
    yield "second"  # yields another Message with a single text part

    # Manual concatenation of the streamed tokens
    final_message = AgentMessage(parts=[TextPart(text="firstsecond")])

    await context.store(final_message)

Proposed version (pseudocode):

@server.agent(...)
async def my_agent(context: RunContext):
    async with context.message() as message:
        yield TextPart() # entire text part (not streamed)
        yield FilePart() # file
        async with message.text_part() as text_part:
            yield text_part.chunk("token") # This is streamed to UI
        # when context manager closes, text part is attached to message
        yield FilePart() # another file
     
    # when the message context manager closes, the message is added to conversation history
    # no need to explicitly aggregate
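
To make the intended aggregation behaviour concrete, here is a minimal, self-contained sketch of how the two nested context managers could work. It is a toy model, not the SDK: `ToyContext`, `MessageBuilder`, `TextPartBuilder`, the `add` method and the in-memory `stream` and `history` lists are all hypothetical names made up for illustration, and parts are added with a method call instead of `yield` to keep the example short.

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass, field


@dataclass
class TextPart:
    text: str = ""


@dataclass
class Message:
    parts: list = field(default_factory=list)


class TextPartBuilder:
    """Collects chunks and forwards each one to a stream sink (hypothetical)."""

    def __init__(self, stream):
        self._stream = stream
        self._chunks = []

    def chunk(self, token):
        self._chunks.append(token)
        self._stream.append(token)  # stand-in for "streamed to UI"
        return token

    def build(self):
        return TextPart(text="".join(self._chunks))


class MessageBuilder:
    """Collects the parts of one message (hypothetical)."""

    def __init__(self, stream):
        self._stream = stream
        self.parts = []

    def add(self, part):
        self.parts.append(part)

    @asynccontextmanager
    async def text_part(self):
        builder = TextPartBuilder(self._stream)
        yield builder
        # Reached only on a clean exit: attach the aggregated text part.
        self.parts.append(builder.build())


class ToyContext:
    """In-memory stand-in for the run context (hypothetical)."""

    def __init__(self):
        self.history = []  # stand-in for the conversation history
        self.stream = []   # stand-in for the token stream seen by the UI

    @asynccontextmanager
    async def message(self):
        builder = MessageBuilder(self.stream)
        yield builder
        # Reached only on a clean exit: store one message in the history.
        self.history.append(Message(parts=builder.parts))


async def demo():
    context = ToyContext()
    async with context.message() as message:
        message.add(TextPart(text="intro"))
        async with message.text_part() as text_part:
            for token in ["first", "second"]:
                text_part.chunk(token)
    return context


context = asyncio.run(demo())
```

In this sketch the history ends up with a single message whose second part is the concatenated text, while the stream sink has seen each token separately. This is the separation between streaming and storage that the proposal aims for.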

Streamed tokens will be sent in the metadata of TaskStatusUpdate, without creating an explicit Message object, and only if the UI subscribes to these updates (i.e. the agentstack streaming extension is activated).
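
A rough sketch of what this could mean for a client, under stated assumptions: events are modelled as plain dicts, and the `kind` values and the `"token"` metadata key are hypothetical, since the issue does not specify the wire format. The point is only that tokens feed a live preview while the history receives a single aggregated message.

```python
# Hypothetical metadata key; the issue does not name the actual one.
TOKEN_KEY = "token"


def emit_events(tokens, streaming_enabled):
    """Yield the events a server might send for one streamed text part."""
    if streaming_enabled:
        # Token updates are sent only when the client opted in to streaming.
        for token in tokens:
            yield {"kind": "status-update", "metadata": {TOKEN_KEY: token}}
    # The aggregated message is sent (and stored) in either case.
    yield {"kind": "message", "text": "".join(tokens)}


def consume(events):
    """Return (live preview text, stored history) as a client would see them."""
    live_preview = []
    history = []
    for event in events:
        if event["kind"] == "status-update":
            live_preview.append(event["metadata"][TOKEN_KEY])
        elif event["kind"] == "message":
            history.append(event["text"])
    return "".join(live_preview), history
```

A client that does not activate the streaming extension would see only the final message, and both kinds of client would end up with the same conversation history after a refresh.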

Similar APIs will be available for non-generator agent functions:

async def my_agent(context: RunContext):
    await context.start_message()
    await context.yield_async(TextPart(...))
    await context.end_message()

Both approaches can also be combined: context managers can be used with yields issued either natively or through context functions.
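
The non-generator variant can be modelled in the same spirit. The sketch below is a toy, assuming that `start_message()` opens a buffer, `yield_async()` appends to it and `end_message()` flushes it to the history. The class name `ImperativeToyContext`, the use of plain strings as parts and the error raised on misuse are assumptions for illustration only.

```python
import asyncio


class ImperativeToyContext:
    """In-memory stand-in for the run context (hypothetical)."""

    def __init__(self):
        self.history = []        # stand-in for the conversation history
        self._open_parts = None  # parts of the message currently being built

    async def start_message(self):
        if self._open_parts is not None:
            raise RuntimeError("a message is already open")
        self._open_parts = []

    async def yield_async(self, part):
        if self._open_parts is None:
            raise RuntimeError("no open message; call start_message() first")
        self._open_parts.append(part)

    async def end_message(self):
        if self._open_parts is None:
            raise RuntimeError("no open message to end")
        # One stored message per start/end pair, with no manual aggregation.
        self.history.append(list(self._open_parts))
        self._open_parts = None


async def my_agent(context):
    await context.start_message()
    await context.yield_async("hello")
    await context.yield_async("world")
    await context.end_message()


context = ImperativeToyContext()
asyncio.run(my_agent(context))
```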

There will be certain constraints, for example:

  • inside a message block, only message part yields are allowed
  • inside a text part block, only chunk yields are allowed
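
One possible way to enforce these two constraints is a small scope stack that checks every yielded value against the innermost open block. This is only a sketch: `YieldValidator`, `Part`, `Chunk` and the scope names are hypothetical and not part of the SDK.

```python
from dataclasses import dataclass


@dataclass
class Part:
    kind: str  # e.g. "text" or "file"


@dataclass
class Chunk:
    token: str


class YieldValidator:
    """Tracks open blocks and rejects yields that do not fit the innermost one."""

    def __init__(self):
        self._scopes = []  # stack of open blocks: "message" or "text_part"

    def enter(self, scope):
        self._scopes.append(scope)

    def exit(self):
        self._scopes.pop()

    def check(self, value):
        scope = self._scopes[-1] if self._scopes else None
        if scope == "message" and not isinstance(value, Part):
            raise TypeError("inside a message block only message parts may be yielded")
        if scope == "text_part" and not isinstance(value, Chunk):
            raise TypeError("inside a text part block only chunks may be yielded")
        return value
```

With this design the runtime would call `enter` and `exit` from the context managers and `check` on each yielded value, so a misplaced yield fails immediately instead of producing a malformed message.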

Metadata

Projects

Status

Dev backlog
