Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions src/strands/event_loop/event_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from opentelemetry import trace as trace_api

from ..hooks import AfterModelCallEvent, BeforeModelCallEvent, MessageAddedEvent
from ..hooks import AfterModelCallEvent, BeforeModelCallEvent, BeforeStreamChunkEvent, MessageAddedEvent
from ..telemetry.metrics import Trace
from ..telemetry.tracer import Tracer, get_tracer
from ..tools._validator import validate_and_prepare_tools
Expand All @@ -39,7 +39,7 @@
MaxTokensReachedException,
StructuredOutputException,
)
from ..types.streaming import StopReason
from ..types.streaming import StopReason, StreamEvent
from ..types.tools import ToolResult, ToolUse
from ._recover_message_on_max_tokens_reached import recover_message_on_max_tokens_reached
from ._retry import ModelRetryStrategy
Expand Down Expand Up @@ -327,6 +327,18 @@ async def _handle_model_execution(
tool_specs = [tool_spec] if tool_spec else []
else:
tool_specs = agent.tool_registry.get_all_tool_specs()

# Create chunk interceptor that invokes BeforeStreamChunkEvent hook
async def chunk_interceptor(chunk: StreamEvent) -> tuple[StreamEvent, bool]:
"""Intercept chunks and invoke BeforeStreamChunkEvent hook."""
stream_chunk_event = BeforeStreamChunkEvent(
agent=agent,
chunk=chunk,
invocation_state=invocation_state,
)
await agent.hooks.invoke_callbacks_async(stream_chunk_event)
return stream_chunk_event.chunk, stream_chunk_event.skip

try:
async for event in stream_messages(
agent.model,
Expand All @@ -336,6 +348,7 @@ async def _handle_model_execution(
system_prompt_content=agent._system_prompt_content,
tool_choice=structured_output_context.tool_choice,
invocation_state=invocation_state,
chunk_interceptor=chunk_interceptor,
):
yield event

Expand Down
25 changes: 22 additions & 3 deletions src/strands/event_loop/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import time
import warnings
from collections.abc import AsyncGenerator, AsyncIterable
from collections.abc import AsyncGenerator, AsyncIterable, Awaitable, Callable
from typing import Any

from ..models.model import Model
Expand Down Expand Up @@ -41,6 +41,10 @@

logger = logging.getLogger(__name__)

# Type for chunk interceptor callback
# Takes a chunk and returns (modified_chunk, skip) where skip=True means don't process this chunk
ChunkInterceptor = Callable[[StreamEvent], Awaitable[tuple[StreamEvent, bool]]]


def _normalize_messages(messages: Messages) -> Messages:
"""Remove or replace blank text in message content.
Expand Down Expand Up @@ -368,13 +372,18 @@ def extract_usage_metrics(event: MetadataEvent, time_to_first_byte_ms: int | Non


async def process_stream(
chunks: AsyncIterable[StreamEvent], start_time: float | None = None
chunks: AsyncIterable[StreamEvent],
start_time: float | None = None,
chunk_interceptor: ChunkInterceptor | None = None,
) -> AsyncGenerator[TypedEvent, None]:
"""Processes the response stream from the API, constructing the final message and extracting usage metrics.

Args:
chunks: The chunks of the response stream from the model.
start_time: Time when the model request is initiated
chunk_interceptor: Optional callback to intercept and modify chunks before processing.
The callback receives a chunk and returns (modified_chunk, skip). If skip is True,
the chunk is not processed or yielded.

Yields:
The reason for stopping, the constructed message, and the usage metrics.
Expand All @@ -395,6 +404,12 @@ async def process_stream(
metrics: Metrics = Metrics(latencyMs=0, timeToFirstByteMs=0)

async for chunk in chunks:
# Invoke chunk interceptor BEFORE processing if provided
if chunk_interceptor is not None:
chunk, skip = await chunk_interceptor(chunk)
if skip:
continue

# Track first byte time when we get first content
if first_byte_time is None and ("contentBlockDelta" in chunk or "contentBlockStart" in chunk):
first_byte_time = time.time()
Expand Down Expand Up @@ -431,6 +446,7 @@ async def stream_messages(
tool_choice: Any | None = None,
system_prompt_content: list[SystemContentBlock] | None = None,
invocation_state: dict[str, Any] | None = None,
chunk_interceptor: ChunkInterceptor | None = None,
**kwargs: Any,
) -> AsyncGenerator[TypedEvent, None]:
"""Streams messages to the model and processes the response.
Expand All @@ -444,6 +460,9 @@ async def stream_messages(
system_prompt_content: The authoritative system prompt content blocks that always contains the
system prompt data.
invocation_state: Caller-provided state/context that was passed to the agent when it was invoked.
chunk_interceptor: Optional callback to intercept and modify chunks before processing.
The callback receives a chunk and returns (modified_chunk, skip). If skip is True,
the chunk is not processed or yielded.
**kwargs: Additional keyword arguments for future extensibility.

Yields:
Expand All @@ -463,5 +482,5 @@ async def stream_messages(
invocation_state=invocation_state,
)

async for event in process_stream(chunks, start_time):
async for event in process_stream(chunks, start_time, chunk_interceptor):
yield event
2 changes: 2 additions & 0 deletions src/strands/hooks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def log_end(self, event: AfterInvocationEvent) -> None:
BeforeModelCallEvent,
BeforeMultiAgentInvocationEvent,
BeforeNodeCallEvent,
BeforeStreamChunkEvent,
BeforeToolCallEvent,
MessageAddedEvent,
MultiAgentInitializedEvent,
Expand All @@ -50,6 +51,7 @@ def log_end(self, event: AfterInvocationEvent) -> None:
__all__ = [
"AgentInitializedEvent",
"BeforeInvocationEvent",
"BeforeStreamChunkEvent",
"BeforeToolCallEvent",
"AfterToolCallEvent",
"BeforeModelCallEvent",
Expand Down
44 changes: 43 additions & 1 deletion src/strands/hooks/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from ..types.content import Message, Messages
from ..types.interrupt import _Interruptible
from ..types.streaming import StopReason
from ..types.streaming import StopReason, StreamEvent
from ..types.tools import AgentTool, ToolResult, ToolUse
from .registry import BaseHookEvent, HookEvent

Expand Down Expand Up @@ -288,6 +288,48 @@ def should_reverse_callbacks(self) -> bool:
return True


@dataclass
class BeforeStreamChunkEvent(HookEvent):
"""Event triggered before each stream chunk is processed.

This event is fired for each chunk received from the model BEFORE the chunk
is processed for message building or yielded as stream events. Hook providers
can use this event to:

- Monitor streaming progress in real-time
- Modify chunk content before processing (affects final message and all events)
- Filter/skip chunks entirely by setting skip=True
- Implement content transformation (e.g., redaction, translation)

When skip=True:
- The chunk is not processed at all
- No events (ModelStreamChunkEvent, TextStreamEvent, etc.) are yielded
- The chunk does not contribute to the final message

When chunk is modified:
- The modified chunk is used for all downstream processing
- TextStreamEvent will contain the modified text
- The final message will contain the modified content

Performance Note:
This event fires for every stream chunk, so callbacks should execute
quickly to avoid impacting streaming latency.

Attributes:
chunk: The raw stream event from the model. Can be modified by hooks
to transform content before processing.
skip: When True, the chunk is skipped entirely (not processed or yielded).
invocation_state: State passed through agent invocation.
"""

chunk: StreamEvent
invocation_state: dict[str, Any] = field(default_factory=dict)
skip: bool = False

def _can_write(self, name: str) -> bool:
return name in ["chunk", "skip"]


# Multiagent hook events start here
@dataclass
class MultiAgentInitializedEvent(BaseHookEvent):
Expand Down
56 changes: 56 additions & 0 deletions tests/strands/agent/hooks/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
AgentInitializedEvent,
BeforeInvocationEvent,
BeforeModelCallEvent,
BeforeStreamChunkEvent,
BeforeToolCallEvent,
MessageAddedEvent,
)
Expand Down Expand Up @@ -230,3 +231,58 @@ def test_before_invocation_event_agent_not_writable(start_request_event_with_mes
"""Test that BeforeInvocationEvent.agent is not writable."""
with pytest.raises(AttributeError, match="Property agent is not writable"):
start_request_event_with_messages.agent = Mock()


@pytest.fixture
def before_stream_chunk_event(agent):
chunk = {"contentBlockDelta": {"delta": {"text": "Hello"}}}
return BeforeStreamChunkEvent(
agent=agent,
chunk=chunk,
invocation_state={"test": "state"},
)


def test_before_stream_chunk_event_should_not_reverse_callbacks(before_stream_chunk_event):
"""Test that BeforeStreamChunkEvent does not reverse callbacks."""
assert before_stream_chunk_event.should_reverse_callbacks is False


def test_before_stream_chunk_event_can_write_chunk(before_stream_chunk_event):
"""Test that BeforeStreamChunkEvent.chunk is writable."""
new_chunk = {"contentBlockDelta": {"delta": {"text": "Modified"}}}
before_stream_chunk_event.chunk = new_chunk
assert before_stream_chunk_event.chunk == new_chunk


def test_before_stream_chunk_event_can_write_skip(before_stream_chunk_event):
"""Test that BeforeStreamChunkEvent.skip is writable."""
assert before_stream_chunk_event.skip is False
before_stream_chunk_event.skip = True
assert before_stream_chunk_event.skip is True


def test_before_stream_chunk_event_cannot_write_agent(before_stream_chunk_event):
"""Test that BeforeStreamChunkEvent.agent is not writable."""
with pytest.raises(AttributeError, match="Property agent is not writable"):
before_stream_chunk_event.agent = Mock()


def test_before_stream_chunk_event_cannot_write_invocation_state(before_stream_chunk_event):
"""Test that BeforeStreamChunkEvent.invocation_state is not writable."""
with pytest.raises(AttributeError, match="Property invocation_state is not writable"):
before_stream_chunk_event.invocation_state = {}


def test_before_stream_chunk_event_skip_defaults_to_false(agent):
"""Test that BeforeStreamChunkEvent.skip defaults to False."""
chunk = {"contentBlockDelta": {"delta": {"text": "Hello"}}}
event = BeforeStreamChunkEvent(agent=agent, chunk=chunk)
assert event.skip is False


def test_before_stream_chunk_event_invocation_state_defaults_to_empty(agent):
"""Test that BeforeStreamChunkEvent.invocation_state defaults to empty dict."""
chunk = {"contentBlockDelta": {"delta": {"text": "Hello"}}}
event = BeforeStreamChunkEvent(agent=agent, chunk=chunk)
assert event.invocation_state == {}
Loading