diff --git a/src/strands/models/bedrock.py b/src/strands/models/bedrock.py
index db1878108..a24a953e3 100644
--- a/src/strands/models/bedrock.py
+++ b/src/strands/models/bedrock.py
@@ -743,12 +743,16 @@ async def stream(
         """
 
         def callback(event: StreamEvent | None = None) -> None:
-            loop.call_soon_threadsafe(queue.put_nowait, event)
-            if event is None:
-                return
+            """Enqueue ``event`` on the loop's queue, blocking the caller until it is accepted."""
+            # queue.put() waits while the bounded queue is full and .result() blocks this thread until
+            # the put completes, so a burst of events cannot pile up ahead of the consumer.
+            # NOTE(review): .result() has no timeout and would deadlock if called on the loop's own
+            # thread; confirm the caller runs this off-loop and always drains the queue.
+            asyncio.run_coroutine_threadsafe(queue.put(event), loop).result()
 
         loop = asyncio.get_event_loop()
-        queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue()
+        # A single-slot queue makes each put wait until the previous event has been taken.
+        queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue(maxsize=1)
 
         # Handle backward compatibility: if system_prompt is provided but system_prompt_content is None
         if system_prompt and system_prompt_content is None:
diff --git a/tests/strands/models/test_bedrock.py b/tests/strands/models/test_bedrock.py
index 228d6c138..93b78e156 100644
--- a/tests/strands/models/test_bedrock.py
+++ b/tests/strands/models/test_bedrock.py
@@ -1,3 +1,4 @@
+import asyncio
 import copy
 import logging
 import os
@@ -660,6 +661,40 @@ async def test_stream(bedrock_client, model, messages, tool_spec, model_id, addi
     bedrock_client.converse_stream.assert_called_once_with(**request)
 
 
+@pytest.mark.asyncio
+async def test_stream_delivers_chunks_individually(bedrock_client, model, messages):
+    """Test that stream delivers chunks one at a time, not in batches.
+
+    Regression test for https://github.com/strands-agents/sdk-python/issues/1523.
+    When the Bedrock stream produces chunks in a burst (no delay between them),
+    the consumer should still receive them individually rather than all at once.
+    """
+    chunks = [f"chunk_{i}" for i in range(10)]
+    bedrock_client.converse_stream.return_value = {"stream": chunks}
+
+    queue_depths = []
+    original_get = asyncio.Queue.get
+
+    async def tracking_get(self):
+        result = await original_get(self)
+        queue_depths.append(self.qsize())
+        return result
+
+    with unittest.mock.patch.object(asyncio.Queue, "get", tracking_get):
+        received = []
+        async for event in model.stream(messages):
+            received.append(event)
+
+    assert received == chunks
+    # Guard against a vacuous pass: all() over an empty list is True.
+    assert queue_depths, "asyncio.Queue.get was never called; queue depth was not observed."
+    # After each get(), the queue should be empty (depth 0) — chunks delivered one at a time
+    assert all(d == 0 for d in queue_depths), (
+        f"Chunks were batched! Queue depths after each get(): {queue_depths}. "
+        f"Expected all zeros for individual delivery."
+    )
+
+
 @pytest.mark.asyncio
 async def test_stream_with_system_prompt_content(bedrock_client, model, messages, alist):
     """Test stream method with system_prompt_content parameter."""