
Commit a2ff365

Merge branch 'main' into caching
2 parents: 6d2ac4b + bed1b68

29 files changed: +3,064 -272 lines

src/strands/agent/agent.py

Lines changed: 29 additions & 3 deletions

@@ -754,9 +754,9 @@ async def _run_loop(
                 and event.chunk.get("redactContent")
                 and event.chunk["redactContent"].get("redactUserContentMessage")
             ):
-                self.messages[-1]["content"] = [
-                    {"text": str(event.chunk["redactContent"]["redactUserContentMessage"])}
-                ]
+                self.messages[-1]["content"] = self._redact_user_content(
+                    self.messages[-1]["content"], str(event.chunk["redactContent"]["redactUserContentMessage"])
+                )
                 if self._session_manager:
                     self._session_manager.redact_latest_message(self.messages[-1], self)
             yield event

@@ -995,3 +995,29 @@ def _append_message(self, message: Message) -> None:
         """Appends a message to the agent's list of messages and invokes the callbacks for the MessageCreatedEvent."""
         self.messages.append(message)
         self.hooks.invoke_callbacks(MessageAddedEvent(agent=self, message=message))
+
+    def _redact_user_content(self, content: list[ContentBlock], redact_message: str) -> list[ContentBlock]:
+        """Redact user content while preserving toolResult blocks.
+
+        Args:
+            content: Content blocks to be redacted.
+            redact_message: Redaction message that replaces the original content.
+
+        Returns:
+            Redacted content, as follows:
+            - if the message contains at least one toolResult block, all toolResult
+              blocks are kept and only their result content is redacted;
+            - otherwise, the entire content of the message is replaced with a single
+              text block containing the redaction message.
+        """
+        redacted_content = []
+        for block in content:
+            if "toolResult" in block:
+                block["toolResult"]["content"] = [{"text": redact_message}]
+                redacted_content.append(block)
+
+        if not redacted_content:
+            # Text content is added only if no toolResult blocks were found
+            redacted_content = [{"text": redact_message}]
+
+        return redacted_content
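To make the new redaction behavior concrete, here is a minimal, self-contained sketch of the same two-branch logic with one example input per branch. The helper below is a simplified standalone copy for illustration (plain dicts instead of the project's ContentBlock type), not the agent's actual API:

from typing import Any


def redact_user_content(content: list[dict[str, Any]], redact_message: str) -> list[dict[str, Any]]:
    """Keep toolResult blocks (redacting their payload); otherwise replace everything with one text block."""
    redacted = []
    for block in content:
        if "toolResult" in block:
            block["toolResult"]["content"] = [{"text": redact_message}]
            redacted.append(block)
    return redacted or [{"text": redact_message}]


# Plain user text: the whole message collapses to the redaction notice.
print(redact_user_content([{"text": "my account number is 1234"}], "[REDACTED]"))
# -> [{'text': '[REDACTED]'}]

# Tool results: the block structure (and toolUseId) survives; only the result content is redacted.
print(redact_user_content(
    [{"toolResult": {"toolUseId": "t1", "status": "success", "content": [{"text": "sensitive output"}]}}],
    "[REDACTED]",
))
# -> [{'toolResult': {'toolUseId': 't1', 'status': 'success', 'content': [{'text': '[REDACTED]'}]}}]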

src/strands/models/bedrock.py

Lines changed: 7 additions & 13 deletions

@@ -8,7 +8,7 @@
 import logging
 import os
 import warnings
-from typing import Any, AsyncGenerator, Callable, Iterable, Literal, Optional, Type, TypeVar, Union, cast
+from typing import Any, AsyncGenerator, Callable, Iterable, Literal, Optional, Type, TypeVar, Union, ValuesView, cast

 import boto3
 from botocore.config import Config as BotocoreConfig

@@ -892,18 +892,12 @@ def _find_detected_and_blocked_policy(self, input: Any) -> bool:
             if input.get("action") == "BLOCKED" and input.get("detected") and isinstance(input.get("detected"), bool):
                 return True

-            # Recursively check all values in the dictionary
-            for value in input.values():
-                if isinstance(value, dict):
-                    return self._find_detected_and_blocked_policy(value)
-                # Handle case where value is a list of dictionaries
-                elif isinstance(value, list):
-                    for item in value:
-                        return self._find_detected_and_blocked_policy(item)
-        elif isinstance(input, list):
-            # Handle case where input is a list of dictionaries
-            for item in input:
-                return self._find_detected_and_blocked_policy(item)
+            # Otherwise, recursively check all values in the dictionary
+            return self._find_detected_and_blocked_policy(input.values())
+
+        elif isinstance(input, (list, ValuesView)):
+            # Handle case where input is a list or dict_values
+            return any(self._find_detected_and_blocked_policy(item) for item in input)
         # Otherwise return False
         return False
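The refactor above replaces the early-returning loops (which only inspected the first nested value before returning) with full recursion over dict values and list items. Here is a minimal standalone sketch of the same check, fed a hypothetical nested guardrail trace (simplified for illustration, not the provider's actual trace schema):

from typing import Any, ValuesView


def find_detected_and_blocked_policy(value: Any) -> bool:
    """Return True if any nested dict has action == "BLOCKED" and a truthy boolean detected flag."""
    if isinstance(value, dict):
        if value.get("action") == "BLOCKED" and value.get("detected") and isinstance(value.get("detected"), bool):
            return True
        # Recurse into every value of the dict, not just the first one encountered.
        return find_detected_and_blocked_policy(value.values())
    elif isinstance(value, (list, ValuesView)):
        return any(find_detected_and_blocked_policy(item) for item in value)
    return False


# Hypothetical trace: the blocked filter sits several levels deep inside a list of assessments.
trace = {"guardrail": {"assessments": [{"contentPolicy": {"filters": [{"action": "BLOCKED", "detected": True}]}}]}}
print(find_detected_and_blocked_policy(trace))                          # -> True
print(find_detected_and_blocked_policy({"action": "NONE", "detected": False}))  # -> False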

src/strands/models/openai.py

Lines changed: 47 additions & 13 deletions

@@ -214,10 +214,16 @@ def format_request_messages(cls, messages: Messages, system_prompt: Optional[str
         for message in messages:
             contents = message["content"]

+            # Check for reasoningContent and warn user
+            if any("reasoningContent" in content for content in contents):
+                logger.warning(
+                    "reasoningContent is not supported in multi-turn conversations with the Chat Completions API."
+                )
+
             formatted_contents = [
                 cls.format_request_message_content(content)
                 for content in contents
-                if not any(block_type in content for block_type in ["toolResult", "toolUse"])
+                if not any(block_type in content for block_type in ["toolResult", "toolUse", "reasoningContent"])
             ]
             formatted_tool_calls = [
                 cls.format_request_message_tool_call(content["toolUse"]) for content in contents if "toolUse" in content
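As a quick illustration of the filter in the hunk above: after logging the warning, the comprehension now drops reasoningContent blocks (alongside toolUse and toolResult, which are handled elsewhere) from the plain content list. A minimal sketch with made-up content blocks (the block shapes here are illustrative, not the exact Strands types):

contents = [
    {"text": "final answer"},
    {"reasoningContent": {"reasoningText": {"text": "chain of thought"}}},
    {"toolUse": {"toolUseId": "t1", "name": "search", "input": {}}},
]

# Only plain blocks survive; reasoningContent and tool blocks are excluded.
formatted_contents = [
    content
    for content in contents
    if not any(block_type in content for block_type in ["toolResult", "toolUse", "reasoningContent"])
]
print(formatted_contents)  # -> [{'text': 'final answer'}]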
@@ -405,38 +405,46 @@ async def stream(

         logger.debug("got response from model")
         yield self.format_chunk({"chunk_type": "message_start"})
-        yield self.format_chunk({"chunk_type": "content_start", "data_type": "text"})
-
         tool_calls: dict[int, list[Any]] = {}
+        data_type = None
+        finish_reason = None  # Store finish_reason for later use
+        event = None  # Initialize for scope safety

         async for event in response:
             # Defensive: skip events with empty or missing choices
             if not getattr(event, "choices", None):
                 continue
             choice = event.choices[0]

-            if choice.delta.content:
-                yield self.format_chunk(
-                    {"chunk_type": "content_delta", "data_type": "text", "data": choice.delta.content}
-                )
-
             if hasattr(choice.delta, "reasoning_content") and choice.delta.reasoning_content:
+                chunks, data_type = self._stream_switch_content("reasoning_content", data_type)
+                for chunk in chunks:
+                    yield chunk
                 yield self.format_chunk(
                     {
                         "chunk_type": "content_delta",
-                        "data_type": "reasoning_content",
+                        "data_type": data_type,
                         "data": choice.delta.reasoning_content,
                     }
                 )

+            if choice.delta.content:
+                chunks, data_type = self._stream_switch_content("text", data_type)
+                for chunk in chunks:
+                    yield chunk
+                yield self.format_chunk(
+                    {"chunk_type": "content_delta", "data_type": data_type, "data": choice.delta.content}
+                )
+
             for tool_call in choice.delta.tool_calls or []:
                 tool_calls.setdefault(tool_call.index, []).append(tool_call)

             if choice.finish_reason:
+                finish_reason = choice.finish_reason  # Store for use outside loop
+                if data_type:
+                    yield self.format_chunk({"chunk_type": "content_stop", "data_type": data_type})
                 break

-        yield self.format_chunk({"chunk_type": "content_stop", "data_type": "text"})
-
         for tool_deltas in tool_calls.values():
             yield self.format_chunk({"chunk_type": "content_start", "data_type": "tool", "data": tool_deltas[0]})

@@ -445,17 +459,37 @@ async def stream(

             yield self.format_chunk({"chunk_type": "content_stop", "data_type": "tool"})

-        yield self.format_chunk({"chunk_type": "message_stop", "data": choice.finish_reason})
+        yield self.format_chunk({"chunk_type": "message_stop", "data": finish_reason or "end_turn"})

         # Skip remaining events as we don't have use for anything except the final usage payload
         async for event in response:
             _ = event

-        if event.usage:
+        if event and hasattr(event, "usage") and event.usage:
             yield self.format_chunk({"chunk_type": "metadata", "data": event.usage})

         logger.debug("finished streaming response from model")

+    def _stream_switch_content(self, data_type: str, prev_data_type: str | None) -> tuple[list[StreamEvent], str]:
+        """Handle switching to a new content stream.
+
+        Args:
+            data_type: The next content data type.
+            prev_data_type: The previous content data type.
+
+        Returns:
+            Tuple containing:
+            - Stop block for previous content and the start block for the next content.
+            - Next content data type.
+        """
+        chunks = []
+        if data_type != prev_data_type:
+            if prev_data_type is not None:
+                chunks.append(self.format_chunk({"chunk_type": "content_stop", "data_type": prev_data_type}))
+            chunks.append(self.format_chunk({"chunk_type": "content_start", "data_type": data_type}))
+
+        return chunks, data_type
+
     @override
     async def structured_output(
         self, output_model: Type[T], prompt: Messages, system_prompt: Optional[str] = None, **kwargs: Any
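The streaming change keys content_start/content_stop events off the current data type instead of assuming a single text stream. The sketch below imitates _stream_switch_content with plain dicts standing in for format_chunk/StreamEvent (names and event shapes simplified, not the model provider's actual types) to show the event sequence produced when reasoning deltas are followed by text deltas:

def stream_switch_content(data_type, prev_data_type):
    """Emit a stop event for the previous content type and a start event for the next one, if they differ."""
    chunks = []
    if data_type != prev_data_type:
        if prev_data_type is not None:
            chunks.append({"chunk_type": "content_stop", "data_type": prev_data_type})
        chunks.append({"chunk_type": "content_start", "data_type": data_type})
    return chunks, data_type


data_type = None
events = []
for kind, delta in [("reasoning_content", "thinking..."), ("text", "Hello"), ("text", " world")]:
    chunks, data_type = stream_switch_content(kind, data_type)
    events.extend(chunks)
    events.append({"chunk_type": "content_delta", "data_type": data_type, "data": delta})
if data_type:
    events.append({"chunk_type": "content_stop", "data_type": data_type})

# Resulting sequence:
#   content_start(reasoning_content), content_delta, content_stop(reasoning_content),
#   content_start(text), content_delta, content_delta, content_stop(text)
for e in events:
    print(e)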

src/strands/multiagent/base.py

Lines changed: 32 additions & 2 deletions

@@ -8,7 +8,7 @@
 from abc import ABC, abstractmethod
 from dataclasses import dataclass, field
 from enum import Enum
-from typing import Any, Union
+from typing import Any, AsyncIterator, Union

 from .._async import run_async
 from ..agent import AgentResult

@@ -137,7 +137,7 @@ def from_dict(cls, data: dict[str, Any]) -> "MultiAgentResult":
         metrics = _parse_metrics(data.get("accumulated_metrics", {}))

         multiagent_result = cls(
-            status=Status(data.get("status", Status.PENDING.value)),
+            status=Status(data["status"]),
             results=results,
             accumulated_usage=usage,
             accumulated_metrics=metrics,

@@ -164,8 +164,13 @@ class MultiAgentBase(ABC):

     This class integrates with existing Strands Agent instances and provides
     multi-agent orchestration capabilities.
+
+    Attributes:
+        id: Unique MultiAgent id for session management, etc.
     """

+    id: str
+
     @abstractmethod
     async def invoke_async(
         self, task: str | list[ContentBlock], invocation_state: dict[str, Any] | None = None, **kwargs: Any

@@ -180,6 +185,31 @@ async def invoke_async(
         """
         raise NotImplementedError("invoke_async not implemented")

+    async def stream_async(
+        self, task: str | list[ContentBlock], invocation_state: dict[str, Any] | None = None, **kwargs: Any
+    ) -> AsyncIterator[dict[str, Any]]:
+        """Stream events during multi-agent execution.
+
+        The default implementation executes invoke_async and yields the result as a single event.
+        Subclasses can override this method to provide true streaming capabilities.
+
+        Args:
+            task: The task to execute.
+            invocation_state: Additional state/context passed to underlying agents.
+                Defaults to None to avoid mutable default argument issues.
+            **kwargs: Additional keyword arguments passed to underlying agents.
+
+        Yields:
+            Dictionary events containing multi-agent execution information, including:
+            - multi-agent coordination events (node start/complete, handoffs);
+            - forwarded single-agent events with node context;
+            - the final result event.
+        """
+        # Default implementation for backward compatibility:
+        # execute invoke_async and yield the result as a single event.
+        result = await self.invoke_async(task, invocation_state, **kwargs)
+        yield {"result": result}
+
     def __call__(
         self, task: str | list[ContentBlock], invocation_state: dict[str, Any] | None = None, **kwargs: Any
     ) -> MultiAgentResult:
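To show what the default stream_async gives a subclass that only implements invoke_async, here is a minimal runnable sketch; EchoOrchestrator and its string result are hypothetical stand-ins for illustration, not classes from the library (the real base class yields a MultiAgentResult in the final event):

import asyncio
from typing import Any, AsyncIterator


class EchoOrchestrator:
    """Stand-in for a MultiAgentBase subclass with no native streaming support."""

    id: str = "echo"

    async def invoke_async(self, task: str, invocation_state: dict[str, Any] | None = None, **kwargs: Any) -> str:
        return f"handled: {task}"

    async def stream_async(
        self, task: str, invocation_state: dict[str, Any] | None = None, **kwargs: Any
    ) -> AsyncIterator[dict[str, Any]]:
        # Backward-compatible default: run to completion, then emit a single final-result event.
        result = await self.invoke_async(task, invocation_state, **kwargs)
        yield {"result": result}


async def main() -> None:
    async for event in EchoOrchestrator().stream_async("summarize the report"):
        print(event)  # -> {'result': 'handled: summarize the report'}


asyncio.run(main())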
