Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/strands/models/bedrock.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,12 +743,10 @@ async def stream(
"""

def callback(event: StreamEvent | None = None) -> None:
loop.call_soon_threadsafe(queue.put_nowait, event)
if event is None:
return
asyncio.run_coroutine_threadsafe(queue.put(event), loop).result()

loop = asyncio.get_event_loop()
queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue()
queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue(maxsize=1)

# Handle backward compatibility: if system_prompt is provided but system_prompt_content is None
if system_prompt and system_prompt_content is None:
Expand Down
33 changes: 33 additions & 0 deletions tests/strands/models/test_bedrock.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import copy
import logging
import os
Expand Down Expand Up @@ -660,6 +661,38 @@ async def test_stream(bedrock_client, model, messages, tool_spec, model_id, addi
bedrock_client.converse_stream.assert_called_once_with(**request)


@pytest.mark.asyncio
async def test_stream_delivers_chunks_individually(bedrock_client, model, messages):
"""Test that stream delivers chunks one at a time, not in batches.

Regression test for https://github.com/strands-agents/sdk-python/issues/1523.
When the Bedrock stream produces chunks in a burst (no delay between them),
the consumer should still receive them individually rather than all at once.
"""
chunks = [f"chunk_{i}" for i in range(10)]
bedrock_client.converse_stream.return_value = {"stream": chunks}

queue_depths = []
original_get = asyncio.Queue.get

async def tracking_get(self):
result = await original_get(self)
queue_depths.append(self.qsize())
return result

with unittest.mock.patch.object(asyncio.Queue, "get", tracking_get):
received = []
async for event in model.stream(messages):
received.append(event)

assert received == chunks
# After each get(), the queue should be empty (depth 0) — chunks delivered one at a time
assert all(d == 0 for d in queue_depths), (
f"Chunks were batched! Queue depths after each get(): {queue_depths}. "
f"Expected all zeros for individual delivery."
)


@pytest.mark.asyncio
async def test_stream_with_system_prompt_content(bedrock_client, model, messages, alist):
"""Test stream method with system_prompt_content parameter."""
Expand Down