Skip to content

将 ChatRuntime / ChannelLlmReplyInboxRuntime 收敛为 actor-owned Agent Continuation #596

@eanzhao

Description

@eanzhao

背景

这个 issue 承接 Discussion #568 的 Agent Continuation 方向,用来讨论并落地当前 Lark bot / channel reply / agent chat 链路里的 runtime 边界问题。

更新:已采纳 issue comment 中的替代 phasing。最终方向调整为:先把错误的 hosted-service 容器换成 run-scoped actor,再补观察;ES 只记录业务事实,不把每个 LLM round / tool call 都持久化;tool 按能力边界分流,不一刀切 actor 化。

相关讨论:#568

当前问题

简化后的当前链路:

%%{init: {"maxTextSize": 100000, "flowchart": {"useMaxWidth": false, "nodeSpacing": 10, "rankSpacing": 50}, "themeVariables": {"fontSize": "10px"}}}%%
flowchart LR
  C["ConversationGAgent"] --> I["ChannelLlmReplyInboxRuntime\nIHostedService singleton"]
  I --> G["NyxIdConversationReplyGenerator"]
  G --> R["ChatRuntime"]
  R --> L["LLMProvider"]
  R --> T["ToolCallLoop / StreamingToolExecutor"]
  T --> Tools["Tools / Skills"]
  R --> G
  G --> I
  I --> C
Loading

诊断

  1. ChannelLlmReplyInboxRuntimeIHostedService 单例,却承担 stale gate、metadata enrichment、fallback timeout、drop classification、streaming sink build、reply-ready 回送等 run-scoped 决策。这是当前最不符合 actor-owned continuation 的容器。

  2. ChatRuntime / ToolCallLoop 通过本地 for loop 承载 LLM/tool 多轮推进,本质也是 in-stack continuation。但它不是第一阶段最该动的点。先把外层容器换正确,再逐步拆内层 loop。

  3. ConversationGAgent 是 conversation-scoped 权威事实源,不能把真实 LLM/tool IO 塞回它的 actor turn,否则群聊/频道会变成热点串行执行容器。

  4. 不应把每个 LLM round / tool call 都做成 ES 持久化事件。它们更多是 run trace / usage ledger / observability,不是恢复业务正确性所需的权威事实。

  5. tool 不要一刀切 actor 化。跨 actor / 外部异步能力需要 continuation;纯函数、短耗时、无副作用工具可以继续 inline。

目标架构

第一目标不是立刻删除 ChatRuntime,而是先让一次 reply/run 有正确的 actor 容器:

%%{init: {"maxTextSize": 100000, "flowchart": {"useMaxWidth": false, "nodeSpacing": 10, "rankSpacing": 50}, "themeVariables": {"fontSize": "10px"}}}%%
flowchart LR
  Ingress["Lark / NyxID Relay"] --> C["ConversationGAgent\nadmission / dedup / pending / delivery commit"]
  C --> D["IChannelLlmReplyRunDispatcher\nthin port, no state"]
  D --> R["AgentRunGAgent[runId]\nrun-scoped continuation owner"]

  R --> CR["ChatRuntime\ntransitional local loop"]
  CR --> L["LLMProvider\nsingle sampling IO"]
  CR --> T["Tools / Skills\ninline or actor-backed"]

  R -->|"LlmReplyReadyEvent / Drop"| C
  R --> O["Observation\ntransient run steps + terminal facts"]
  C --> Outbound["Channel outbound reply/edit"]
Loading

命名说明:实现底层可以仍然是 Orleans grain,但仓库代码命名优先使用 GAgent,所以先用 AgentRunGAgent 而不是 AgentRunGrain

职责边界

ConversationGAgent

  • conversation-scoped 唯一权威状态。
  • 负责入站准入、去重、pending reply、最终 delivery commit。
  • 不执行长耗时 LLM/tool IO。
  • 从“enqueue inbox runtime”改为“dispatch run actor”。

IChannelLlmReplyRunDispatcher

  • 位于 Channel.Runtime 抽象层。
  • 只做投递端口,不持有 run state。
  • ConversationGAgent 不直接依赖 NyxidChat 的具体 run actor 实现。

AgentRunGAgent

  • runId = correlationId 寻址。
  • 拥有一次 Lark bot reply / agent run 的 continuation。
  • 第一阶段 1:1 承接 ChannelLlmReplyInboxRuntime.ProcessAsync 的逻辑。
  • 初期内部仍可调用现有 IConversationReplyGenerator / ChatRuntime,降低迁移风险。
  • ES 只记录 run started / result produced / failed / dropped 等最小业务事实。

ChatRuntime

  • 第一阶段保留为 transitional local loop。
  • 不再作为长期核心抽象继续扩展 run 语义。
  • 后续拆成 prompt/message builder、single LLM sampling adapter、stream normalizer、tool-call parser 等小组件。

Tools / Skills

  • actor-backed / external async tool:走 continuation。
  • pure/local tool:继续 inline。
  • skills lifecycle 与进程级 registry 问题单独开议题,不绑死在第一阶段。

实施步骤

Phase A:杀掉 hosted-service,落 run-scoped actor

这是第一批 PR 的目标范围。

  1. 新增 AgentRunGAgent 及最小 state / event contract。

    • 建议先放在 Aevatar.GAgents.NyxidChat,因为当前依赖 IConversationReplyGenerator、NyxID relay options、UserConfig 等 NyxidChat 侧服务。
    • runId 使用 correlationId
  2. 新增 IChannelLlmReplyRunDispatcher

    • 放在 Aevatar.GAgents.Channel.Runtime
    • 方法语义类似 DispatchAsync(NeedsLlmReplyEvent request, CancellationToken ct)
    • 不保留 service-level dictionary / queue / run state。
  3. 在 NyxidChat 中实现 dispatcher。

    • 创建或获取 AgentRunGAgent[runId]
    • 向 run actor 投递 typed command,例如 AgentRunStartRequested
  4. 迁移 ChannelLlmReplyInboxRuntime.ProcessAsync 逻辑到 AgentRunGAgent

    • malformed / stale / missing relay token gate。
    • metadata enrichment。
    • fallback timeout。
    • streaming sink build。
    • IConversationReplyGenerator.GenerateReplyAsync
    • LlmReplyReadyEvent / DeferredLlmReplyDroppedEvent 回送 ConversationGAgent
  5. 修改 ConversationGAgent.DispatchPendingLlmReplyAsync

    • IChannelLlmReplyInbox.EnqueueAsync 改为 IChannelLlmReplyRunDispatcher.DispatchAsync
    • 保留现有 durable retry / rehydration 语义。
  6. 下线旧 inbox runtime。

    • 移除 ChannelLlmReplyInboxRuntime / ChannelLlmReplyInboxHostedService 注册。
    • 移除或废弃 IChannelLlmReplyInbox
    • 移除 channel-runtime:llm-reply:inbox stream。
  7. 测试迁移。

    • ChannelLlmReplyInboxRuntimeTests 迁到 AgentRunGAgentTests
    • 保持现有 ConversationGAgent dedup / reply-token / streaming 行为不回退。

建议验证:

dotnet test test/Aevatar.GAgents.ChannelRuntime.Tests/Aevatar.GAgents.ChannelRuntime.Tests.csproj --nologo
dotnet test test/Aevatar.GAgents.Channel.Protocol.Tests/Aevatar.GAgents.Channel.Protocol.Tests.csproj --nologo
bash tools/ci/test_stability_guards.sh

Phase B:观察走 transient stream + readmodel,不做 ES 写放大

  1. AgentRunGAgent 终态事实进入 ES:started、result produced、failed、dropped。
  2. 每轮 LLM/tool 完成时发布 transient observation,例如:
RunStepObserved(runId, stepKind, model, latency, tokenIn, tokenOut, toolName, success)
  1. 物化为 trace/telemetry 型 readmodel,例如 active_runsrun_tracetool_usage_stats
  2. 不把 LLMSamplingRequested/CompletedToolInvocationRequested/Completed 默认写入 ES。

Phase C:actor-backed tool 走 continuation

  1. 给 tool/capability 增加执行边界描述:Inline / ActorBacked / ExternalAsync
  2. Inline tool 继续在当前调用栈执行。
  3. ActorBacked / ExternalAsync tool:
    • run actor 记录 pending invocation。
    • 发 command 或事件。
    • 当前 turn 结束。
    • 回执事件唤醒 run actor 继续。
  4. 不把纯函数工具 actor 化。

Phase D:metadata enrichment 前移到 admit / routing policy

  1. bot owner LLM config / sender preference 是入站 routing 决策,逐步前移到 ChannelConversationTurnRunner / admit policy。
  2. AgentRunGAgent 只消费已经固化的 effective metadata。
  3. secret token 仍只走 transient command,不进 ES、不进 readmodel。

Phase E:拆掉 ChatRuntime 的 loop 职责

  1. ChatRuntime 拆成小组件:

    • prompt/message builder
    • single LLM sampling adapter
    • stream normalizer
    • tool-call parser
    • length recovery helper
  2. 多轮推进由 AgentRunGAgent 的 actor event choreography 决定。

  3. ToolCallLoop / StreamingToolExecutor 逐步退化为局部 helper 或删除。

非目标

  • 不要求 NyxID / Ornn / chrono-* 外部仓库新增 endpoint 或 schema。
  • 不做插件市场。
  • 不把 LLM/tool 长耗时 IO 塞回 ConversationGAgent 的单个 actor turn。
  • 第一阶段不解决 skills lifecycle / registry 的完整治理问题。
  • 第一阶段不要求每个 LLM round / tool call 都成为 ES 持久化事件。

第一批 PR 的验收标准

  • ChannelLlmReplyInboxRuntime 不再作为 hosted service 参与生产链路。
  • deferred LLM reply 由 ConversationGAgent -> dispatcher -> AgentRunGAgent -> ConversationGAgent 完成。
  • reply token 不进入持久化 state / event store / readmodel。
  • stale / malformed / missing token drop 行为保持。
  • streaming reply 行为保持,包括 final chunk 与 ready event 的顺序保护。
  • 现有 channel runtime / protocol 测试通过。

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions