Currently, token streaming from agents built with the agentstack-sdk is implemented by emitting many individual messages, each containing a text part with a single token. This has several disadvantages:
- The A2A client (UI) must be custom-built to support this way of streaming and to concatenate the tokens
- The A2A task store becomes polluted and overloaded by the large number of messages stored per conversation (this becomes a particular problem if we want to enable a SQL/persistent task store implementation)
- Managing streaming vs. conversation history in the agent and the UI is not straightforward: what is stored in the conversation history can diverge from what is streamed. This leads to confusing behavior, such as the UI showing different messages after a conversation is refreshed.
Proposal
Simplify the SDK message management with automatic storage of messages to the platform.
Current version:
    @server.agent(...)
    async def my_agent(context: RunContext):
        # Explicit history management
        await context.store(input)
        yield "first"   # yields a Message with a single text part
        yield "second"  # yields another Message with a single text part
        # Manual concatenation
        final_message = AgentMessage(parts=[TextPart("firstsecond")])
        await context.store(final_message)
Proposed version (pseudocode):
    @server.agent(...)
    async def my_agent(context: RunContext):
        async with context.message() as message:
            yield TextPart()  # entire text part (not streamed)
            yield FilePart()  # file
            async with message.text_part() as text_part:
                yield text_part.chunk("token")  # this is streamed to the UI
            # when the context manager closes, the text part is attached to the message
            yield FilePart()  # another file
        # when the context manager closes, the message is added to the conversation history
        # no need to explicitly aggregate
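To make the intended aggregation behavior concrete, here is a minimal, self-contained sketch that simulates it without the real SDK. Every class here (`Context`, `MessageBlock`, `TextPartBlock`, `Chunk`) and the `run` driver are hypothetical stand-ins written for illustration. They are not the agentstack-sdk API and only assume the semantics described in the pseudocode above.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class TextPart:
    text: str


@dataclass
class Chunk:
    """One streamed token (hypothetical type); never stored on its own."""
    text: str


@dataclass
class Message:
    parts: list = field(default_factory=list)


class TextPartBlock:
    """Buffers chunks and attaches a single TextPart to the message on exit."""

    def __init__(self, message):
        self._message = message
        self._tokens = []

    def chunk(self, token):
        self._tokens.append(token)
        return Chunk(token)

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if exc_type is None:
            self._message.parts.append(TextPart("".join(self._tokens)))


class MessageBlock:
    """Opens a message and appends it to the history on exit."""

    def __init__(self, context):
        self._context = context

    def text_part(self):
        return TextPartBlock(self._context.current)

    async def __aenter__(self):
        self._context.current = Message()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if exc_type is None:
            self._context.history.append(self._context.current)
        self._context.current = None


class Context:
    def __init__(self):
        self.history = []    # stored conversation messages
        self.current = None  # message currently being built

    def message(self):
        return MessageBlock(self)


async def my_agent(context):
    async with context.message() as message:
        yield TextPart("intro")  # whole part, not streamed
        async with message.text_part() as text_part:
            for token in ("Hel", "lo"):
                yield text_part.chunk(token)  # streamed token


async def run(agent, context):
    """Toy runtime: collects chunks for the UI, attaches whole parts to the open message."""
    streamed = []
    async for item in agent(context):
        if isinstance(item, Chunk):
            streamed.append(item.text)
        else:
            context.current.parts.append(item)
    return streamed
```

In this sketch, running `asyncio.run(run(my_agent, Context()))` streams the two tokens separately, while the history ends up with a single message whose second part is the aggregated text. That is the property the proposal is after: what is streamed and what is stored cannot drift apart.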
Streamed tokens will be sent through the metadata on TaskStatusUpdate, without creating an explicit Message object, and only if the UI subscribes to these updates (i.e., the agentstack streaming extension is activated).
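One possible shape for this, sketched under assumptions: the extension identifier, the `chunk` metadata key, and the `StatusUpdate` class below are all hypothetical placeholders, since the proposal does not define the wire format. The sketch only illustrates the opt-in rule, where tokens travel in status-update metadata and are omitted entirely for clients that did not activate the extension.

```python
from dataclasses import dataclass, field

# Hypothetical extension identifier; the real URI is not given in this proposal.
STREAMING_EXTENSION = "example:agentstack-streaming"


@dataclass
class StatusUpdate:
    """Simplified stand-in for a task status update event."""
    state: str
    metadata: dict = field(default_factory=dict)


def token_updates(tokens, active_extensions):
    """Wrap each token in a status update, but only for clients that opted in."""
    if STREAMING_EXTENSION not in active_extensions:
        return []  # nothing is streamed; the client only sees the final message
    return [
        StatusUpdate(state="working", metadata={STREAMING_EXTENSION: {"chunk": token}})
        for token in tokens
    ]


def render(updates):
    """Client side: rebuild the partial text from the streamed metadata."""
    return "".join(u.metadata[STREAMING_EXTENSION]["chunk"] for u in updates)
```

A client that activated the extension could call `render(...)` on the updates it has received so far to show partial text, while a client that did not would receive no token updates and simply wait for the final stored message.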
Similar APIs will be available for non-generator agent functions:
    async def my_agent(context: RunContext):
        await context.start_message()
        await context.yield_async(TextPart(...))
        await context.end_message()
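As a rough illustration of how these context functions might behave, the following sketch implements a toy `Context` with the three methods named above. The implementation and its error handling are assumptions made for illustration, not the SDK's actual behavior. Only the method names `start_message`, `yield_async`, and `end_message` come from the proposal.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class TextPart:
    text: str


@dataclass
class Message:
    parts: list = field(default_factory=list)


class Context:
    """Toy stand-in for RunContext (hypothetical implementation)."""

    def __init__(self):
        self.history = []  # stored conversation messages
        self._open = None  # message currently being built

    async def start_message(self):
        if self._open is not None:
            raise RuntimeError("a message is already open")
        self._open = Message()

    async def yield_async(self, part):
        if self._open is None:
            raise RuntimeError("no open message; call start_message() first")
        self._open.parts.append(part)

    async def end_message(self):
        if self._open is None:
            raise RuntimeError("no open message to end")
        self.history.append(self._open)  # stored automatically, exactly once
        self._open = None


async def my_agent(context):
    await context.start_message()
    await context.yield_async(TextPart("hello"))
    await context.end_message()
```

The explicit start/end calls mirror entering and leaving the `context.message()` block in the generator variant, so a non-generator agent gets the same automatic storage without having to yield.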
The two approaches can also be combined: context managers can be used with yields issued either natively or through context functions.
There will be certain constraints, for example:
- inside a message block, only message part yields are allowed
- inside a text part block, only chunk yields are allowed
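These constraints could be enforced with a simple type check at the point where the runtime receives a yielded item. The sketch below assumes hypothetical `TextPart`, `FilePart`, and `Chunk` types and a hypothetical `check_yield` helper. The block names and the error message are illustrative only.

```python
from dataclasses import dataclass


@dataclass
class TextPart:
    text: str


@dataclass
class FilePart:
    uri: str


@dataclass
class Chunk:
    text: str


# Which yielded types are accepted inside each kind of block (assumed mapping).
ALLOWED = {
    "message": (TextPart, FilePart),
    "text_part": (Chunk,),
}


def check_yield(block, item):
    """Return the item if it is allowed inside the given block, else raise TypeError."""
    allowed = ALLOWED[block]
    if not isinstance(item, allowed):
        names = ", ".join(t.__name__ for t in allowed)
        raise TypeError(
            f"{type(item).__name__} is not allowed inside a {block} block; "
            f"expected one of: {names}"
        )
    return item
```

For example, `check_yield("message", FilePart("file.txt"))` passes the part through unchanged, whereas `check_yield("text_part", TextPart("a"))` raises a `TypeError`, because only chunks may be yielded inside a text part block.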