-
Notifications
You must be signed in to change notification settings - Fork 507
iterative stream messages #213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
src/strands/agent/conversation_manager/sliding_window_conversation_manager.py
Outdated
Show resolved
Hide resolved
| callback_handler(**inputs) | ||
| else: | ||
| stop_reason, message, usage, metrics = event["stop"] | ||
| kwargs.setdefault("request_state", {}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will progressively move the callback_handler invocations upward in the call stack as we convert the event loop to an event generator.
| for event in stream_messages(model, system_prompt, messages, tool_config): | ||
| if "callback" in event: | ||
| inputs = {**event["callback"], **(kwargs if "delta" in event else {})} | ||
| callback_handler(**inputs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes this is a bit messy but I am trying to maintain backwards compatibility with these changes. Ultimately, we should formalize and strongly type callback payloads.
Note, this callback invocation was copied out of stream_messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the strongly typed callback payloads already tracked? If not, can you add it to the async iterator stuff (we can always break it out later)
| event: Delta event. | ||
| state: The current state of message processing. | ||
| callback_handler: Callback for processing events as they happen. | ||
| **kwargs: Additional keyword arguments to pass to the callback handler. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we are now yielding events from stream_messages, we no longer need to pass around callback_handler and kwargs. We can invoke the callback_handler in the event loop where we iterate over stream_messages.
|
|
||
| state["current_tool_use"]["input"] += delta_content["toolUse"]["input"] | ||
| callback_handler(delta=delta_content, current_tool_use=state["current_tool_use"], **kwargs) | ||
| callback_event["callback"] = {"delta": delta_content, "current_tool_use": state["current_tool_use"]} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a temporary format to maintain backwards compatibility. We are planning on formalizing and strongly typing the construction of our agent events for consistency and ease of use.
| elif "contentBlockDelta" in chunk: | ||
| state = handle_content_block_delta(chunk["contentBlockDelta"], state, callback_handler, **kwargs) | ||
| state, callback_event = handle_content_block_delta(chunk["contentBlockDelta"], state) | ||
| yield callback_event |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, keeping for backwards compatibility, but really we should consider getting rid of this extra yield. It is redundant since we have the yield chunk at the start of each loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you clarify and/or elaborate on what the two events are? Are they truly duplciates?
| condition=os.environ.get("GITHUB_ACTIONS") == 'true', | ||
| reason="streamable transport is failing in GitHub actions, debugging if linux compatibility issue" | ||
| condition=os.environ.get("GITHUB_ACTIONS") == "true", | ||
| reason="streamable transport is failing in GitHub actions, debugging if linux compatibility issue", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Auto linted.
| for event in stream_messages(model, system_prompt, messages, tool_config): | ||
| if "callback" in event: | ||
| inputs = {**event["callback"], **(kwargs if "delta" in event else {})} | ||
| callback_handler(**inputs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the strongly typed callback payloads already tracked? If not, can you add it to the async iterator stuff (we can always break it out later)
src/strands/event_loop/event_loop.py
Outdated
| ) | ||
| for event in stream_messages(model, system_prompt, messages, tool_config): | ||
| if "callback" in event: | ||
| inputs = {**event["callback"], **(kwargs if "delta" in event else {})} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a comment indicating what's going on/what the change is.
I believe it's effectively:
In the migration to async-iterator, we converted all events that were previously passed to
callbackinto yielded events that have thecallbackkey, with the properties that we emitted which we now have to unwrap and combine withkwargsfor backwards compatibility
Also, this is clever, so props for that
| inputs = {**event["callback"], **(kwargs if "delta" in event else {})} | ||
| callback_handler(**inputs) | ||
| else: | ||
| stop_reason, message, usage, metrics = event["stop"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So stop right now is not an event that callback gets passed. Given that we don't need backwards compability here, what would you think about turning this into a typed event?
E.g:
class StopStreamEvent(dict):
could also be a follow-up PR
| elif "contentBlockDelta" in chunk: | ||
| state = handle_content_block_delta(chunk["contentBlockDelta"], state, callback_handler, **kwargs) | ||
| state, callback_event = handle_content_block_delta(chunk["contentBlockDelta"], state) | ||
| yield callback_event |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you clarify and/or elaborate on what the two events are? Are they truly duplciates?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I presume that test_event_loop has enough UT coverage verifying the callbacks of events are as expected? At which point we have a lot of confidence that we maintained backwards compatibility.
* build(a2a): add a2a deps and mitigate otel conflict
Description
We are currently working on support for an iterative async stream method on the agent class (#83). As part of this work, we need to yield underlying events of the model stream. This PR thus converts the
stream_messagesfunction to a generator.Related Issues
#83
Type of Change
Testing
hatch fmt --linterhatch fmt --formatterhatch test --allhatch run test-lintChecklist
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.