
Conversation

@sarojrout
Contributor

Link to Issue or Description of Change

1. Link to an existing issue (if applicable): #3837

2. Or, if no issue exists, describe the change:

If applicable, please follow the issue templates to provide as much detail as
possible.

Problem:
Currently, streaming tools (async generator functions) only work in run_live() mode. When using run_async() or SSE endpoints, tools must complete execution before results are shown to users or agents. This causes issues for:

  1. Long-Running Operations: Tools that take time (e.g., data processing, API calls) block the entire agent response
  2. Real-Time Monitoring: Can't show progress for monitoring tools (stock prices, system metrics)
  3. User Experience: Users wait with no feedback during tool execution
  4. Interactive Tools: Tools that need to provide intermediate results can't do so

Example Problem:

@tool
async def process_large_dataset(file_path: str) -> dict:
    # Takes ~30 seconds to process; the agent and user wait with no feedback.
    # expensive_processing is a placeholder for the real work.
    result = await expensive_processing(file_path)
    return result

Solution:
Extend the streaming tools infrastructure to support progressive result delivery in non-live mode (run_async/SSE), similar to how it works in live mode but adapted for non-live execution.
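
For illustration, the blocking example above could be rewritten as a streaming tool. This is a minimal sketch, not code from this PR; process_chunk and the fixed chunk count are hypothetical placeholders:

from typing import AsyncGenerator

async def process_large_dataset(file_path: str) -> AsyncGenerator[dict, None]:
    """Process a dataset, yielding progress updates as work proceeds."""
    total_chunks = 10  # hypothetical: assume the work splits into 10 chunks
    for i in range(total_chunks):
        await process_chunk(file_path, i)  # placeholder for the real work
        yield {
            "progress": (i + 1) * 100 // total_chunks,
            "status": "processing",
        }
    yield {"status": "done"}

Each yield becomes a separate Event under the flow described in Key Changes below.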

Key Changes:

  • Added _is_streaming_tool() helper to detect async generator functions (a sketch of the detection follows this list)
  • Added handle_function_calls_async_with_streaming() to handle streaming tools separately from regular tools
  • Added _execute_streaming_tool_async() to execute streaming tools and yield Events for each intermediate result
  • Integrated streaming tool detection in _postprocess_handle_function_calls_async() to automatically route to streaming handler when needed
  • Added cancellation support via task tracking in active_streaming_tools
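
For reference, a minimal sketch of what such a detection helper can look like, using the standard-library check for async generator functions (the actual implementation in this PR may differ):

import inspect

def _is_streaming_tool(func) -> bool:
    """Return True if the tool callable is an async generator function."""
    # Unwrap decorated callables first; functools.wraps exposes __wrapped__.
    func = inspect.unwrap(func)
    return inspect.isasyncgenfunction(func)

inspect.isasyncgenfunction returns True for async def functions that contain a yield, which is exactly the signature streaming tools use.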

How It Works:

  1. When function calls are detected, the system checks if any tools are streaming tools (async generators)
  2. If streaming tools are found, they're handled separately from regular tools
  3. Each yield from the streaming tool generates a separate Event
  4. Events are yielded progressively as the tool executes
  5. Agent can react to intermediate results in the same invocation
  6. Tasks are tracked for cancellation support

Example:
import asyncio
import time
from typing import AsyncGenerator

async def monitor_stock_price(symbol: str) -> AsyncGenerator[dict, None]:
    """Monitor stock price with real-time updates."""
    while True:
        # get_current_price is a placeholder for a real quote lookup.
        price = await get_current_price(symbol)
        yield {
            "symbol": symbol,
            "price": price,
            "timestamp": time.time(),
            "status": "streaming",
        }
        await asyncio.sleep(1)

In run_async() mode, this now yields multiple Events as prices update, allowing the agent and user to see real-time updates.
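
A sketch of how a caller might observe these progressive Events, assuming a Runner already configured with the sample agent (the user and session IDs are placeholders):

from google.adk.runners import Runner
from google.genai import types

async def watch(runner: Runner, session_id: str) -> None:
    message = types.Content(
        role="user",
        parts=[types.Part(text="Monitor the stock price for AAPL")],
    )
    # Each intermediate yield from the streaming tool arrives as its own Event.
    async for event in runner.run_async(
        user_id="user", session_id=session_id, new_message=message
    ):
        if not event.content or not event.content.parts:
            continue
        for part in event.content.parts:
            if part.function_response:
                print(part.function_response.response)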

Testing Plan

Please describe the tests that you ran to verify your changes. This is required
for all PRs that are not small documentation or typo fixes.

Unit Tests:

  • I have added or updated unit tests for my change.
  • All unit tests pass locally.

Please include a summary of passed pytest results.

Created comprehensive unit tests in tests/unittests/flows/llm_flows/test_streaming_tools_non_live.py:

  1. test_is_streaming_tool_detects_async_generator - Verifies detection of async generator functions
  2. test_streaming_tool_yields_multiple_events - Ensures multiple Events are yielded for streaming tools
  3. test_streaming_tool_with_string_results - Tests streaming tools with string results
  4. test_streaming_tool_tracks_task_for_cancellation - Verifies task tracking for cancellation
  5. test_streaming_tool_handles_errors - Tests error handling in streaming tools
  6. test_handle_function_calls_async_with_streaming_separates_tools - Ensures streaming and regular tools are handled separately
  7. test_streaming_tool_with_tool_context - Verifies tool_context is correctly passed
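
As one illustration of the shape of these tests, a simplified sketch of the first one (the import path mirrors where the PR's helper would live and is an assumption):

# Assumed import path for the new helper; adjust to the actual module.
from google.adk.flows.llm_flows.functions import _is_streaming_tool

async def _streaming_tool():
    yield {"status": "streaming"}

async def _regular_tool():
    return {"status": "done"}

def test_is_streaming_tool_detects_async_generator():
    assert _is_streaming_tool(_streaming_tool)
    assert not _is_streaming_tool(_regular_tool)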
$ pytest tests/unittests/flows/llm_flows/test_streaming_tools_non_live.py -v

tests/unittests/flows/llm_flows/test_streaming_tools_non_live.py::test_is_streaming_tool_detects_async_generator PASSED [ 14%]
tests/unittests/flows/llm_flows/test_streaming_tools_non_live.py::test_streaming_tool_yields_multiple_events PASSED [ 28%]
tests/unittests/flows/llm_flows/test_streaming_tools_non_live.py::test_streaming_tool_with_string_results PASSED [ 42%]
tests/unittests/flows/llm_flows/test_streaming_tools_non_live.py::test_streaming_tool_tracks_task_for_cancellation PASSED [ 57%]
tests/unittests/flows/llm_flows/test_streaming_tools_non_live.py::test_streaming_tool_handles_errors PASSED [ 71%]
tests/unittests/flows/llm_flows/test_streaming_tools_non_live.py::test_handle_function_calls_async_with_streaming_separates_tools PASSED [ 85%]
tests/unittests/flows/llm_flows/test_streaming_tools_non_live.py::test_streaming_tool_with_tool_context PASSED [100%]

Regression Tests:
All existing related tests pass:

$ pytest tests/unittests/flows/llm_flows/test_functions_simple.py tests/unittests/flows/llm_flows/test_base_llm_flow.py -v
tests/unittests/flows/llm_flows/test_functions_simple.py::test_simple_function PASSED [  2%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_async_function PASSED [  4%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_function_tool PASSED [  7%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_update_state PASSED [  9%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_function_call_id PASSED [ 11%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_find_function_call_event_no_function_response_in_last_event PASSED [ 14%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_find_function_call_event_empty_session_events PASSED [ 16%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_find_function_call_event_function_response_but_no_matching_call PASSED [ 19%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_find_function_call_event_function_response_with_matching_call PASSED [ 21%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_find_function_call_event_multiple_function_responses PASSED [ 23%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_function_call_args_not_modified PASSED [ 26%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_function_call_args_none_handling PASSED [ 28%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_function_call_args_copy_behavior PASSED [ 30%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_function_call_args_deep_copy_behavior PASSED [ 33%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_shallow_vs_deep_copy_demonstration PASSED [ 35%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_parallel_function_execution_timing PASSED [ 38%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_parallel_state_modifications_thread_safety PASSED [ 40%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_sync_function_blocks_async_functions PASSED [ 42%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_async_function_without_yield_blocks_others PASSED [ 45%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_merge_parallel_function_response_events_preserves_invocation_id PASSED [ 47%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_merge_parallel_function_response_events_single_event PASSED [ 50%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_merge_parallel_function_response_events_preserves_other_attributes PASSED [ 52%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_yielding_async_functions_run_concurrently PASSED [ 54%]
tests/unittests/flows/llm_flows/test_functions_simple.py::test_mixed_function_types_execution_order PASSED [ 57%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_preprocess_calls_toolset_process_llm_request PASSED [ 59%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_preprocess_handles_mixed_tools_and_toolsets PASSED [ 61%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_preprocess_with_google_search_only PASSED [ 64%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_preprocess_with_google_search_workaround PASSED [ 66%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_preprocess_calls_convert_tool_union_to_tools PASSED [ 69%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_no_callbacks[no_search_no_grounding] PASSED [ 71%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_no_callbacks[with_search_with_grounding] PASSED [ 73%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_no_callbacks[no_search_with_grounding] PASSED [ 76%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_no_callbacks[with_search_no_grounding] PASSED [ 78%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_callback_override[no_search_no_grounding] PASSED [ 80%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_callback_override[with_search_with_grounding] PASSED [ 83%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_callback_override[no_search_with_grounding] PASSED [ 85%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_callback_override[with_search_no_grounding] PASSED [ 88%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_plugin_override[no_search_no_grounding] PASSED [ 90%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_plugin_override[with_search_with_grounding] PASSED [ 92%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_plugin_override[no_search_with_grounding] PASSED [ 95%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_grounding_with_plugin_override[with_search_no_grounding] PASSED [ 97%]
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_handle_after_model_callback_caches_canonical_tools PASSED [100%]

================================ warnings summary =================================
tests/unittests/flows/llm_flows/test_functions_simple.py: 11 warnings
  /Users/usharout/projects/adk-python/src/google/adk/runners.py:1381: DeprecationWarning: deprecated
    save_input_blobs_as_artifacts=run_config.save_input_blobs_as_artifacts,

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
========================= 42 passed, 11 warnings in 6.76s =========================

Manual End-to-End (E2E) Tests:

Setup:

  1. Navigate to the sample agent directory:

    cd contributing/samples
  2. Launch ADK Web UI:

    adk web .
  3. Select the streaming_tools_non_live_agent from the agent list

Test Cases:

  1. Stock Price Monitoring:

    • Input: "Monitor the stock price for AAPL"
    • Expected: Multiple Events in the Events tab showing price updates (e.g., 100, 105, 110, 108, 112, 115)
    • Result: Verified - 6 function_response Events generated, one per price update
  2. Large Dataset Processing:

    • Input: "Process a large dataset at /tmp/data.csv"
    • Expected: Multiple Events showing progress updates (0%, 10%, 20%, ..., 100%)
    • Result: Verified - 10+ Events generated showing progressive updates
  3. System Health Monitoring:

    • Input: "Monitor system health"
    • Expected: Multiple Events showing health metrics over time
    • Result: Verified - 4 Events generated with different health statuses

Verification:

  • Open the Events tab (not Trace tab) in ADK Web UI
  • For each streaming tool call, verify multiple function_response Events appear
  • Each Event should show a different intermediate result
  • The agent should be able to react to intermediate results (e.g., stop monitoring if threshold reached)

Checklist

  • I have read the CONTRIBUTING.md document.
  • I have performed a self-review of my own code.
  • I have commented my code, particularly in hard-to-understand areas.
  • I have added tests that prove my fix is effective or that my feature works.
  • New and existing unit tests pass locally with my changes.
  • I have manually tested my changes end-to-end.
  • Any dependent changes have been merged and published in downstream modules.

Additional context

This PR addresses the use case described in #3837
This PR extends the streaming tools pattern (which works in live mode) to non-live mode.

Sample Agent

A sample agent demonstrating the feature is included:

  • contributing/samples/streaming_tools_non_live_agent/
  • Includes three streaming tools: monitor_stock_price, process_large_dataset, monitor_system_health
  • Full documentation in README.md
[Six screenshots attached, dated 2025-12-05, demonstrating the feature in the ADK Web UI]

Implements streaming tools support for non-live mode (run_async/SSE),
allowing tools to yield intermediate results as Events.

Changes:
- Added _is_streaming_tool() helper to detect async generator functions
- Added handle_function_calls_async_with_streaming() to handle streaming tools
- Added _execute_streaming_tool_async() to execute and yield Events for streaming tools
- Integrated streaming tool detection in _postprocess_handle_function_calls_async()
- Added cancellation support via task tracking in active_streaming_tools
- Added unit tests

Fixes google#3837
Adds a sample agent demonstrating streaming tools in non-live mode
@gemini-code-assist
Contributor

Summary of Changes

Hello @sarojrout, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a limitation where streaming tools (asynchronous generator functions) were only functional in live execution mode. Previously, using run_async() or SSE endpoints with such tools would block the entire agent response until the tool completed, leading to poor user experience and no real-time feedback. The solution extends the streaming tools infrastructure to support progressive result delivery in these non-live modes, allowing agents to receive and react to intermediate outputs as they are generated, which significantly improves responsiveness and enables new use cases like real-time monitoring and progress updates for long-running operations.

Highlights

  • Streaming Tool Detection: Introduced a new helper function, _is_streaming_tool(), to accurately identify asynchronous generator functions, which are now recognized as streaming tools.
  • Dedicated Streaming Tool Handler: Added handle_function_calls_async_with_streaming() to manage the execution of streaming tools separately from traditional tools, enabling progressive result delivery.
  • Progressive Event Yielding: Implemented _execute_streaming_tool_async() which executes streaming tools and yields individual Event objects for each intermediate result produced by the tool.
  • Integration with Function Call Processing: Modified _postprocess_handle_function_calls_async() to automatically detect streaming tools and route their execution to the new streaming handler, ensuring seamless integration.
  • Cancellation Support: Enhanced the system with cancellation support for streaming tools by tracking active tasks within invocation_context.active_streaming_tools.

adk-bot added the core and tools component labels on Dec 6, 2025.
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a significant feature to enable streaming tools in non-live modes, which is a great enhancement for long-running operations. The implementation is well-structured, with clear separation of concerns for handling streaming versus regular tools, and it's backed by a comprehensive set of new unit tests. My review includes a few suggestions to improve maintainability by reducing code duplication and removing unused code. I've also identified a high-severity issue where the agent transfer logic is missing for the new streaming tool path, which could prevent agents from reacting to streamed results as intended. Please address this to ensure the feature is complete.

- Added agent transfer handling for streaming tools
  * Check transfer_to_agent after each streaming event
  * Transfer agent and exit streaming loop when detected
  * Enables agent reaction to intermediate results

- Refactored duplicated code into _yield_function_response_events helper
  * Both streaming and regular paths use the same helper
  * Improves maintainability

- Moved cleanup logic to finally block
  * Ensures cleanup always happens (3 places → 1 place)

- Removed unused imports and variables
  * Removed unused LlmAgent import from handle_function_calls_async_with_streaming
  * Removed unused agent variable
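
The cleanup refactor described above is the standard try/finally pattern; a hedged sketch with names assumed from the PR description (_build_function_response_event is a hypothetical helper):

import asyncio

async def _execute_streaming_tool_async(invocation_context, tool, args):
    """Consume a streaming tool, yielding one Event per intermediate result."""
    task = asyncio.current_task()
    invocation_context.active_streaming_tools[tool.name] = task
    try:
        async for result in tool.func(**args):
            yield _build_function_response_event(tool, result)
    finally:
        # Runs on normal completion, on errors, and on cancellation alike,
        # so the task entry can never leak.
        invocation_context.active_streaming_tools.pop(tool.name, None)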
@sarojrout
Contributor Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a valuable feature by enabling streaming tools in non-live modes, which significantly improves the user experience for long-running operations. The implementation is well-structured, and I appreciate the inclusion of comprehensive unit tests and a sample agent, which clearly demonstrate the new functionality. My main feedback is to improve the concurrency of tool execution to ensure that all tools, both streaming and regular, run in parallel as expected in an async environment. I've also included a suggestion to enhance one of the new tests.

@sarojrout
Contributor Author

/gemini review

@gemini-code-assist
Contributor

Warning

You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!
