Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 28 additions & 59 deletions patterns/agui-langgraph-agent/agent.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
"""AG-UI LangGraph agent with Gateway MCP tools, Memory, and Code Interpreter.
"""AG-UI LangGraph agent with Gateway MCP tools, Memory, and Code Interpreter."""

Uses copilotkit's LangGraphAGUIAgent to produce native AG-UI SSE events.
AgentCore proxies these unchanged when deployed with --protocol AGUI.
"""
from __future__ import annotations

import logging
import os
Expand All @@ -12,7 +10,6 @@
from copilotkit import CopilotKitMiddleware, LangGraphAGUIAgent
from langchain.agents import create_agent
from langchain_aws import ChatBedrock
from langgraph.graph import END, START, StateGraph
from langgraph_checkpoint_aws import AgentCoreMemorySaver
from tools.gateway import create_gateway_mcp_client
from utils.auth import extract_user_id_from_context
Expand All @@ -28,82 +25,54 @@
"When asked about your tools, list them and explain what they do."
)

REGION = os.environ.get("AWS_REGION", "us-east-1")
MEMORY_ID = os.environ.get("MEMORY_ID")
MODEL = ChatBedrock(
model_id="us.anthropic.claude-sonnet-4-5-20250929-v1:0",
temperature=0.1,
streaming=True,
beta_use_converse_api=True,
)
CODE_INTERPRETER = LangGraphCodeInterpreterTools(REGION).execute_python_securely

def _build_model() -> ChatBedrock:
return ChatBedrock(
model_id="us.anthropic.claude-sonnet-4-5-20250929-v1:0",
temperature=0.1,
streaming=True,
beta_use_converse_api=True,
)

def get_memory_saver() -> AgentCoreMemorySaver | None:
"""Return an AgentCore Memory checkpointer, or None when MEMORY_ID is unset."""
if not MEMORY_ID:
return None
return AgentCoreMemorySaver(memory_id=MEMORY_ID, region_name=REGION)

def _create_checkpointer() -> AgentCoreMemorySaver:
memory_id = os.environ.get("MEMORY_ID")
if not memory_id:
raise ValueError("MEMORY_ID environment variable is required")
return AgentCoreMemorySaver(
memory_id=memory_id,
region_name=os.environ.get("AWS_DEFAULT_REGION", "us-east-1"),
)


async def create_langgraph_agent(user_id: str):
"""Create a LangGraph agent with Gateway tools, Memory, and Code Interpreter."""
mcp_client = await create_gateway_mcp_client(user_id)
async def build_graph(actor_id: str):
"""Build a LangGraph compiled graph with Gateway tools and Memory."""
mcp_client = await create_gateway_mcp_client(actor_id)
tools = await mcp_client.get_tools()

region = os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
code_tools = LangGraphCodeInterpreterTools(region)
tools.append(code_tools.execute_python_securely)
tools.append(CODE_INTERPRETER)

return create_agent(
model=_build_model(),
model=MODEL,
tools=tools,
checkpointer=_create_checkpointer(),
checkpointer=get_memory_saver(),
middleware=[CopilotKitMiddleware()],
system_prompt=SYSTEM_PROMPT,
)


class ActorAwareLangGraphAgent(LangGraphAGUIAgent):
"""LangGraphAGUIAgent that creates the graph per-request with fresh tokens."""

def __init__(self, *, user_id: str, **kwargs):
self._user_id = user_id
# Create a minimal placeholder graph to satisfy validation in newer
# copilotkit/ag_ui_langgraph versions that inspect self.graph.nodes
# during __init__. The placeholder is overwritten in run().
if kwargs.get("graph") is None:
builder = StateGraph(dict)
builder.add_node("placeholder", lambda x: x)
builder.add_edge(START, "placeholder")
builder.add_edge("placeholder", END)
kwargs["graph"] = builder.compile()
super().__init__(**kwargs)

async def run(self, input: RunAgentInput):
self.graph = await create_langgraph_agent(self._user_id)
async for event in super().run(input):
yield event


@app.entrypoint
async def invocations(payload: dict, context: RequestContext):
input_data = RunAgentInput.model_validate(payload)
actor_id = extract_user_id_from_context(context)

user_id = extract_user_id_from_context(context)

agent = ActorAwareLangGraphAgent(
graph = await build_graph(actor_id)
agui_agent = LangGraphAGUIAgent(
name="agui_langgraph_agent",
description="AG-UI LangGraph agent with Gateway MCP tools and Memory",
graph=None,
config={"configurable": {"actor_id": user_id}},
user_id=user_id,
graph=graph,
config={"configurable": {"actor_id": actor_id}},
)

try:
async for event in agent.run(input_data):
async for event in agui_agent.run(input_data):
if event is not None:
yield event.model_dump(mode="json", by_alias=True, exclude_none=True)
except Exception as exc:
Expand Down
92 changes: 30 additions & 62 deletions patterns/agui-strands-agent/agent.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
"""AG-UI Strands agent with Gateway MCP tools, Memory, and Code Interpreter.
"""AG-UI Strands agent with Gateway MCP tools, Memory, and Code Interpreter."""

Uses ag-ui-strands to produce native AG-UI SSE events.
AgentCore proxies these unchanged when deployed with --protocol AGUI.
"""
from __future__ import annotations

import logging
import os
Expand Down Expand Up @@ -30,79 +28,49 @@
"When asked about your tools, list them and explain what they do."
)


def _build_model() -> BedrockModel:
return BedrockModel(
model_id="us.anthropic.claude-sonnet-4-5-20250929-v1:0", temperature=0.1
)
REGION = os.environ.get("AWS_REGION", "us-east-1")
MEMORY_ID = os.environ.get("MEMORY_ID")
MODEL = BedrockModel(
model_id="us.anthropic.claude-sonnet-4-5-20250929-v1:0",
temperature=0.1,
)
CODE_INTERPRETER = StrandsCodeInterpreterTools(REGION).execute_python_securely


def _create_session_manager(
user_id: str, session_id: str
) -> AgentCoreMemorySessionManager:
memory_id = os.environ.get("MEMORY_ID")
if not memory_id:
raise ValueError("MEMORY_ID environment variable is required")
config = AgentCoreMemoryConfig(
memory_id=memory_id, session_id=session_id, actor_id=user_id
)
def get_memory_session_manager(
actor_id: str, session_id: str
) -> AgentCoreMemorySessionManager | None:
"""Return an AgentCore Memory session manager, or None when MEMORY_ID is unset."""
if not MEMORY_ID:
return None
return AgentCoreMemorySessionManager(
agentcore_memory_config=config,
region_name=os.environ.get("AWS_DEFAULT_REGION", "us-east-1"),
)


def _create_agent(user_id: str, session_id: str) -> Agent:
"""Create a Strands Agent with Gateway MCP tools, Memory, and Code Interpreter."""
gateway_client = create_gateway_mcp_client(user_id)

region = os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
code_tools = StrandsCodeInterpreterTools(region)

return Agent(
name="strands_agent",
system_prompt=SYSTEM_PROMPT,
tools=[gateway_client, code_tools.execute_python_securely],
model=_build_model(),
session_manager=_create_session_manager(user_id, session_id),
AgentCoreMemoryConfig(
memory_id=MEMORY_ID, session_id=session_id, actor_id=actor_id
),
region_name=REGION,
)


class ActorAwareStrandsAgent(StrandsAgent):
"""StrandsAgent that creates the agent per-request with fresh MCP context."""

def __init__(self, *, user_id: str, session_id: str, name: str, description: str):
self._user_id = user_id
self._session_id = session_id
super().__init__(
agent=Agent(model=_build_model(), system_prompt=SYSTEM_PROMPT),
name=name,
description=description,
)

async def run(self, input_data: RunAgentInput):
thread_id = input_data.thread_id or "default"
self._agents_by_thread[thread_id] = _create_agent(
self._user_id, self._session_id
)
async for event in super().run(input_data):
yield event


@app.entrypoint
async def invocations(payload: dict, context: RequestContext):
input_data = RunAgentInput.model_validate(payload)
user_id = extract_user_id_from_context(context)
actor_id = extract_user_id_from_context(context)
session_id = input_data.thread_id or actor_id

agent = ActorAwareStrandsAgent(
user_id=user_id,
session_id=input_data.thread_id,
agent = Agent(
model=MODEL,
system_prompt=SYSTEM_PROMPT,
tools=[create_gateway_mcp_client(actor_id), CODE_INTERPRETER],
session_manager=get_memory_session_manager(actor_id, session_id),
)
agui_agent = StrandsAgent(
agent=agent,
name="agui_strands_agent",
description="AG-UI Strands agent with Gateway MCP tools and Code Interpreter",
)

try:
async for event in agent.run(input_data):
async for event in agui_agent.run(input_data):
if event is not None:
yield event.model_dump(mode="json", by_alias=True, exclude_none=True)
except Exception as exc:
Expand Down
Loading