diff --git a/patterns/agui-langgraph-agent/agent.py b/patterns/agui-langgraph-agent/agent.py index 4801c22d..e40bfbba 100644 --- a/patterns/agui-langgraph-agent/agent.py +++ b/patterns/agui-langgraph-agent/agent.py @@ -1,8 +1,6 @@ -"""AG-UI LangGraph agent with Gateway MCP tools, Memory, and Code Interpreter. +"""AG-UI LangGraph agent with Gateway MCP tools, Memory, and Code Interpreter.""" -Uses copilotkit's LangGraphAGUIAgent to produce native AG-UI SSE events. -AgentCore proxies these unchanged when deployed with --protocol AGUI. -""" +from __future__ import annotations import logging import os @@ -12,7 +10,6 @@ from copilotkit import CopilotKitMiddleware, LangGraphAGUIAgent from langchain.agents import create_agent from langchain_aws import ChatBedrock -from langgraph.graph import END, START, StateGraph from langgraph_checkpoint_aws import AgentCoreMemorySaver from tools.gateway import create_gateway_mcp_client from utils.auth import extract_user_id_from_context @@ -28,82 +25,54 @@ "When asked about your tools, list them and explain what they do." ) +REGION = os.environ.get("AWS_REGION", "us-east-1") +MEMORY_ID = os.environ.get("MEMORY_ID") +MODEL = ChatBedrock( + model_id="us.anthropic.claude-sonnet-4-5-20250929-v1:0", + temperature=0.1, + streaming=True, + beta_use_converse_api=True, +) +CODE_INTERPRETER = LangGraphCodeInterpreterTools(REGION).execute_python_securely -def _build_model() -> ChatBedrock: - return ChatBedrock( - model_id="us.anthropic.claude-sonnet-4-5-20250929-v1:0", - temperature=0.1, - streaming=True, - beta_use_converse_api=True, - ) +def get_memory_saver() -> AgentCoreMemorySaver | None: + """Return an AgentCore Memory checkpointer, or None when MEMORY_ID is unset.""" + if not MEMORY_ID: + return None + return AgentCoreMemorySaver(memory_id=MEMORY_ID, region_name=REGION) -def _create_checkpointer() -> AgentCoreMemorySaver: - memory_id = os.environ.get("MEMORY_ID") - if not memory_id: - raise ValueError("MEMORY_ID environment variable is required") - return AgentCoreMemorySaver( - memory_id=memory_id, - region_name=os.environ.get("AWS_DEFAULT_REGION", "us-east-1"), - ) - -async def create_langgraph_agent(user_id: str): - """Create a LangGraph agent with Gateway tools, Memory, and Code Interpreter.""" - mcp_client = await create_gateway_mcp_client(user_id) +async def build_graph(actor_id: str): + """Build a LangGraph compiled graph with Gateway tools and Memory.""" + mcp_client = await create_gateway_mcp_client(actor_id) tools = await mcp_client.get_tools() - - region = os.environ.get("AWS_DEFAULT_REGION", "us-east-1") - code_tools = LangGraphCodeInterpreterTools(region) - tools.append(code_tools.execute_python_securely) + tools.append(CODE_INTERPRETER) return create_agent( - model=_build_model(), + model=MODEL, tools=tools, - checkpointer=_create_checkpointer(), + checkpointer=get_memory_saver(), middleware=[CopilotKitMiddleware()], system_prompt=SYSTEM_PROMPT, ) -class ActorAwareLangGraphAgent(LangGraphAGUIAgent): - """LangGraphAGUIAgent that creates the graph per-request with fresh tokens.""" - - def __init__(self, *, user_id: str, **kwargs): - self._user_id = user_id - # Create a minimal placeholder graph to satisfy validation in newer - # copilotkit/ag_ui_langgraph versions that inspect self.graph.nodes - # during __init__. The placeholder is overwritten in run(). - if kwargs.get("graph") is None: - builder = StateGraph(dict) - builder.add_node("placeholder", lambda x: x) - builder.add_edge(START, "placeholder") - builder.add_edge("placeholder", END) - kwargs["graph"] = builder.compile() - super().__init__(**kwargs) - - async def run(self, input: RunAgentInput): - self.graph = await create_langgraph_agent(self._user_id) - async for event in super().run(input): - yield event - - @app.entrypoint async def invocations(payload: dict, context: RequestContext): input_data = RunAgentInput.model_validate(payload) + actor_id = extract_user_id_from_context(context) - user_id = extract_user_id_from_context(context) - - agent = ActorAwareLangGraphAgent( + graph = await build_graph(actor_id) + agui_agent = LangGraphAGUIAgent( name="agui_langgraph_agent", description="AG-UI LangGraph agent with Gateway MCP tools and Memory", - graph=None, - config={"configurable": {"actor_id": user_id}}, - user_id=user_id, + graph=graph, + config={"configurable": {"actor_id": actor_id}}, ) try: - async for event in agent.run(input_data): + async for event in agui_agent.run(input_data): if event is not None: yield event.model_dump(mode="json", by_alias=True, exclude_none=True) except Exception as exc: diff --git a/patterns/agui-strands-agent/agent.py b/patterns/agui-strands-agent/agent.py index 36e84684..843250ca 100644 --- a/patterns/agui-strands-agent/agent.py +++ b/patterns/agui-strands-agent/agent.py @@ -1,8 +1,6 @@ -"""AG-UI Strands agent with Gateway MCP tools, Memory, and Code Interpreter. +"""AG-UI Strands agent with Gateway MCP tools, Memory, and Code Interpreter.""" -Uses ag-ui-strands to produce native AG-UI SSE events. -AgentCore proxies these unchanged when deployed with --protocol AGUI. -""" +from __future__ import annotations import logging import os @@ -30,79 +28,49 @@ "When asked about your tools, list them and explain what they do." ) - -def _build_model() -> BedrockModel: - return BedrockModel( - model_id="us.anthropic.claude-sonnet-4-5-20250929-v1:0", temperature=0.1 - ) +REGION = os.environ.get("AWS_REGION", "us-east-1") +MEMORY_ID = os.environ.get("MEMORY_ID") +MODEL = BedrockModel( + model_id="us.anthropic.claude-sonnet-4-5-20250929-v1:0", + temperature=0.1, +) +CODE_INTERPRETER = StrandsCodeInterpreterTools(REGION).execute_python_securely -def _create_session_manager( - user_id: str, session_id: str -) -> AgentCoreMemorySessionManager: - memory_id = os.environ.get("MEMORY_ID") - if not memory_id: - raise ValueError("MEMORY_ID environment variable is required") - config = AgentCoreMemoryConfig( - memory_id=memory_id, session_id=session_id, actor_id=user_id - ) +def get_memory_session_manager( + actor_id: str, session_id: str +) -> AgentCoreMemorySessionManager | None: + """Return an AgentCore Memory session manager, or None when MEMORY_ID is unset.""" + if not MEMORY_ID: + return None return AgentCoreMemorySessionManager( - agentcore_memory_config=config, - region_name=os.environ.get("AWS_DEFAULT_REGION", "us-east-1"), - ) - - -def _create_agent(user_id: str, session_id: str) -> Agent: - """Create a Strands Agent with Gateway MCP tools, Memory, and Code Interpreter.""" - gateway_client = create_gateway_mcp_client(user_id) - - region = os.environ.get("AWS_DEFAULT_REGION", "us-east-1") - code_tools = StrandsCodeInterpreterTools(region) - - return Agent( - name="strands_agent", - system_prompt=SYSTEM_PROMPT, - tools=[gateway_client, code_tools.execute_python_securely], - model=_build_model(), - session_manager=_create_session_manager(user_id, session_id), + AgentCoreMemoryConfig( + memory_id=MEMORY_ID, session_id=session_id, actor_id=actor_id + ), + region_name=REGION, ) -class ActorAwareStrandsAgent(StrandsAgent): - """StrandsAgent that creates the agent per-request with fresh MCP context.""" - - def __init__(self, *, user_id: str, session_id: str, name: str, description: str): - self._user_id = user_id - self._session_id = session_id - super().__init__( - agent=Agent(model=_build_model(), system_prompt=SYSTEM_PROMPT), - name=name, - description=description, - ) - - async def run(self, input_data: RunAgentInput): - thread_id = input_data.thread_id or "default" - self._agents_by_thread[thread_id] = _create_agent( - self._user_id, self._session_id - ) - async for event in super().run(input_data): - yield event - - @app.entrypoint async def invocations(payload: dict, context: RequestContext): input_data = RunAgentInput.model_validate(payload) - user_id = extract_user_id_from_context(context) + actor_id = extract_user_id_from_context(context) + session_id = input_data.thread_id or actor_id - agent = ActorAwareStrandsAgent( - user_id=user_id, - session_id=input_data.thread_id, + agent = Agent( + model=MODEL, + system_prompt=SYSTEM_PROMPT, + tools=[create_gateway_mcp_client(actor_id), CODE_INTERPRETER], + session_manager=get_memory_session_manager(actor_id, session_id), + ) + agui_agent = StrandsAgent( + agent=agent, name="agui_strands_agent", description="AG-UI Strands agent with Gateway MCP tools and Code Interpreter", ) try: - async for event in agent.run(input_data): + async for event in agui_agent.run(input_data): if event is not None: yield event.model_dump(mode="json", by_alias=True, exclude_none=True) except Exception as exc: