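RFC: Unify external long-lived connection streams as a first-class actor abstraction (SessionStreamGAgent direction): request for team discussion and peer review #560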

@loning

id: cluster-triage-stream-session-actor-contract
severity: high
requires_design: true

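Source

This issue was opened manually by a maintainer. After the maintainer re-added auto-loop-triage, the triage codex re-investigated against the current code and added evidence.

Core problem

External long-lived connection sessions do not yet have a unified, runtime-neutral, actor-owned contract. NyxIdChat SSE currently has the request handler hold the writer and map AGUIEvent directly onto the HTTP response. VoicePresence has already consolidated part of its session/lease facts into actor-owned state, but the host side still carries connection resumption through a combination of capability preflight + lease handle + volatile transport attachment. Channel inbound has a separate in-process transport buffer, and CQRS/AGUI has yet another EventChannel<T> sink.

These local fixes point in roughly the right direction. But without a unified design boundary for StreamSessionGAgent / StreamFrame, later solvers can easily lump external long-lived connections, projection observation, channel webhooks, LLM token deltas, and the voice media data plane back into a single "streaming" abstraction, violating the actor boundary, read/write separation, the projection main chain, and the requirement to model internal state in Protobuf.

This is a design issue. It does not require implementing a complete stream-processing subsystem in a single PR. The goal is to settle the authoritative owner of external long-lived connection sessions, the frame scope, the projection/AGUI/channel boundaries, and the host failover/replay semantics, and then split out a first-slice implementation.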

Evidence

1. The NyxIdChat streaming endpoint still has the HTTP request handler holding the live SSE writer

agents/Aevatar.GAgents.NyxidChat/NyxIdChatEndpoints.Streaming.cs:73

var writer = new NyxIdChatSseWriter(http.Response);
...
var result = await interactionService.ExecuteAsync(
    new NyxIdChatCommand(...),
    async (evt, _) =>
    {
        await NyxIdChatStreamingRunner.WriteAguiEventAsync(evt, messageId, writer);
    },
    null,
    ct);

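Violation: the command-interaction skeleton is already tighter than the old runner, but the lifecycle, seq, ack, reconnect, and replay/gap of the external long-lived connection session still have no actor-owned contract. The HTTP request should hold only the wire handle; connection-session facts must not be implied by the Host/request stack.

2. NyxIdChatStreamingRunner is currently only a presentation mapper from AGUI to NyxId SSE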

agents/Aevatar.GAgents.NyxidChat/NyxIdChatStreamingRunner.cs:8

internal static class NyxIdChatStreamingRunner
{
    public static async ValueTask<string?> WriteAguiEventAsync(
        AGUIEvent aguiEvent,
        string messageId,
        NyxIdChatSseWriter writer)

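Violation: the earlier TaskCompletionSource + SubscribeAsync<EventEnvelope> + Task.Delay shape is no longer evidence in the current code. The remaining problem is not "patching the runner"; it is the lack of a reusable external stream session contract that layers NyxIdChat outbound, approval continuation, AGUI wire mapping, and connection delivery semantics.

3. VoicePresence already has actor-owned lease/state, but its session is still a capability-specific shape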

src/Aevatar.Foundation.VoicePresence.Abstractions/Protos/voice_presence.proto:81

message VoicePresenceRuntimeState {
  VoicePresenceRuntimeStatus status = 1;
  ...
  string active_session_id = 15;
  google.protobuf.Timestamp lease_expires_at = 16;
  string active_transport_lease_id = 18;
  string active_lease_owner_id = 19;
  int64 lease_epoch = 21;
}

src/Aevatar.Foundation.VoicePresence/Hosting/ActorOwnedVoicePresenceSessionResolver.cs:86

var leaseRequest = new VoicePresenceSessionLeaseRequest(
    capability.ActorId,
    capability.ModuleName,
    Guid.NewGuid().ToString("N"),
    HostOwnerId,
    _timeProvider.GetUtcNow().Add(DefaultLeaseTtl),
    capability.StateVersion,
    capability.RemoteAudioSupport);

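Violation: VoicePresence is the existing implementation closest to the right shape, but it is a voice-specific session/lease contract, not a general external stream session contract. The ADR needs to decide which semantics are lifted into a shared StreamSession and which stay in the VoicePresence typed media/session domain.

4. Voice transport callbacks are already event-based, but frame/replay/drop semantics are still defined per capability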

src/Aevatar.Foundation.VoicePresence.Abstractions/Protos/voice_presence.proto:225

message VoiceTransportControlFrameReceived {
  string session_id = 1;
  string transport_lease_id = 2;
  VoiceControlFrame control_frame = 3;
  string owner_id = 4;
  google.protobuf.Timestamp lease_expires_at = 5;
  int64 lease_epoch = 6;
}

message VoiceTransportAudioFrameReceived {
  string session_id = 1;
  string transport_lease_id = 2;
  bytes pcm16 = 3;
  ...
}

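Violation: control and audio frames are both strongly typed and event-based, which is the right direction. But the general stream design must state explicitly that text/tool/control/lifecycle frames are replayable, while heartbeat/audio/video frames are by default not replayed or go through a media fast path. Capabilities must not each handle seq/ack/gap with different implicit rules.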
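As an illustration only, the per-kind rule described above could be written as one typed, exhaustive function instead of capability-specific conventions. The `FrameKind`, `ReplayPolicy`, and `ReplayRules` names are hypothetical and do not exist in the repository; treating heartbeat as "never replay" and audio/video as "media fast path" is one possible reading of the default stated above, and the ADR may choose differently.

```csharp
using System;

// Hypothetical frame kinds; the issue names these categories but defines no such enum.
public enum FrameKind { Text, Tool, Control, Lifecycle, Heartbeat, Audio, Video }

// Hypothetical replay decisions applied when a client reconnects.
public enum ReplayPolicy { ReplayWithinWindow, NeverReplay, MediaFastPath }

public static class ReplayRules
{
    // A single typed rule per frame kind, so no capability relies on implicit handling.
    public static ReplayPolicy For(FrameKind kind) => kind switch
    {
        FrameKind.Text or FrameKind.Tool or FrameKind.Control or FrameKind.Lifecycle
            => ReplayPolicy.ReplayWithinWindow,
        FrameKind.Heartbeat => ReplayPolicy.NeverReplay,
        FrameKind.Audio or FrameKind.Video => ReplayPolicy.MediaFastPath,
        _ => throw new ArgumentOutOfRangeException(nameof(kind), kind, "Unknown frame kind"),
    };
}
```

Because the switch throws on unknown kinds, adding a new frame kind forces an explicit replay decision rather than silently inheriting a default.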

5. Channel transport is explicitly an in-process inbound buffer, not a durable/session boundary

agents/Aevatar.GAgents.Channel.Abstractions/Transport/IChannelTransport.cs:14

/// InitializeAsync must succeed exactly once before StartReceivingAsync.
/// The inbound stream is a single-reader, in-process buffer and is not the durable ingress boundary.
...
ChannelReader<ChatActivity> InboundStream { get; }

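Violation: the IChannelTransport of a channel webhook / bot adapter must not be absorbed directly into StreamSessionGAgent. It handles platform binding, authentication, normalization, and the inbound buffer. Only connections that genuinely need long-lived connection resumption, seq/ack, and replay/gap enter the StreamSession abstraction.

6. CQRS EventChannel<T> is a sink/backpressure primitive, not an actor-owned session fact source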

src/Aevatar.CQRS.Core.Abstractions/Streaming/EventSink.cs:33

public sealed class EventChannel<TEvent> : IEventSink<TEvent>
{
    private readonly Channel<TEvent> _channel;
    ...
    public async IAsyncEnumerable<TEvent> ReadAllAsync(...)

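Violation: EventChannel<T> is suitable as a projection/interaction live sink, but it is an in-process channel and does not own cross-host/session facts. AGUI/CQRS observation can keep flowing through the unified Projection Pipeline; the external long-lived connection delivery contract must not degrade into a sink channel or a request-local writer.

7. NyxID is an external capability surface and should not require changes to the external repository

Per repository rules, ../NyxID was checked and exists. NyxID's current docs/code already provide external capability surfaces such as proxying, SSE passthrough, LLM gateway streaming, and channel bot relay. This issue does not require NyxID to add endpoints, change schemas, or take on aevatar's session actor contract. aevatar should adapt to the existing external surface within this repository.

Violated clauses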

AGENTS.md / CLAUDE.md:

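  • Strict layering: Domain / Application / Infrastructure / Host; the API only hosts and composes, and carries no core business orchestration.
  • Projection orchestration is actor-based: runtime state such as projection sessions, subscriptions, and associations must be held by an Actor or distributed state; holding fact state in an in-process registry/dictionary in a middle layer is forbidden.
  • Explicit read/write separation: Command -> Event, Query -> ReadModel; asynchronous completion is delivered through event notification and push, not by assembling flows ad hoc within a session.
  • Committed domain events must be observable: once the write side completes a committed domain event, that fact must be fed into the unified observation/projection main chain.
  • Delivery semantics must be runtime-neutral: publish/send uniformly means entering the target actor's inbox to await processing, and must not degrade into inline dispatch because of self-targeting or runtime differences.
  • ACK semantics must be honest: a synchronous return may promise only the stage actually reached; the default should be accepted for dispatch + stable command id.
  • Queries always go through the readmodel; actor internal state, state mirror payloads, or event replay must not be exposed as the main query path.
  • Projection consumes only committed facts: subscribing to inbound commands, self continuations, or incidental actor runtime structures to infer business completion is forbidden.
  • Actor/module runtime state may be modified only on the event-handling main thread; callbacks only send signals.
  • Middle layers must not maintain in-process mappings from entity/actor/workflow-run/session IDs to context or fact state.
  • All serialization and deserialization uniformly uses Protobuf, especially for State, domain events, commands, callback payloads, snapshots, cache payloads, and internal cross-Actor/cross-node transport objects.

New principle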

An external long-lived connection session is actor-owned delivery state, not a projection hub, a channel webhook, an AGUI event name, or a request-local SSE writer. StreamSessionGAgent owns only connection delivery facts: session lifecycle, attachment, seq, ack, heartbeat, bounded replay window, and gap/resync. Business facts remain owned by the consumer actor, which commits typed domain events. The Projection Pipeline continues to consume only committed facts and materialize readmodel/AGUI observation one-to-many.
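To make the seq / ack / bounded replay window / gap vocabulary concrete, here is a minimal in-memory sketch. `ReplayWindow` and all of its members are hypothetical; real delivery state would live in an actor's Protobuf-modeled state and change only through committed events, which this sketch does not attempt to show.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of delivery facts only: seq assignment, ack trimming, gap detection.
public sealed class ReplayWindow
{
    private readonly int _capacity;
    private readonly Queue<(long Seq, string Payload)> _frames = new();
    private long _nextSeq = 1;

    public ReplayWindow(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
    }

    // Assigns the next seq and retains the frame, evicting the oldest one when full.
    public long Append(string payload)
    {
        if (_frames.Count == _capacity) _frames.Dequeue();
        var seq = _nextSeq++;
        _frames.Enqueue((seq, payload));
        return seq;
    }

    // The client acknowledged every frame up to and including ackedSeq.
    public void Ack(long ackedSeq)
    {
        while (_frames.Count > 0 && _frames.Peek().Seq <= ackedSeq) _frames.Dequeue();
    }

    // Returns false (a gap: the client must resync) when the frame right after
    // lastSeenSeq is no longer retained; otherwise outputs the frames to replay.
    public bool TryReplayAfter(long lastSeenSeq, out List<(long Seq, string Payload)> frames)
    {
        frames = new List<(long Seq, string Payload)>();
        var firstNeeded = lastSeenSeq + 1;
        if (firstNeeded >= _nextSeq) return true; // nothing newer has been produced
        if (_frames.Count == 0 || _frames.Peek().Seq > firstNeeded) return false;
        foreach (var frame in _frames)
            if (frame.Seq > lastSeenSeq) frames.Add(frame);
        return true;
    }
}
```

With a capacity of 3 and five appended frames, a client that last saw seq 2 can be replayed frames 3 to 5, while a client that last saw seq 1 hits a gap and has to resync.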

AGUI remains the presentation wire format; StreamFrame is the internal session/delivery frame. When a projection observation needs to be pushed to the frontend, an adapter maps it to the AGUI/SSE/WebSocket wire. The Projection Pipeline is not folded into a long-lived bidirectional session hub.
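A rough sketch of the adapter idea for the SSE case follows. The `StreamFrame` record fields and the `SseWireAdapter` class are assumptions made for illustration, since the issue proposes StreamFrame without defining it; only the `id:` / `event:` / `data:` field layout comes from the Server-Sent Events format itself.

```csharp
using System;
using System.Text;

// Hypothetical internal frame shape; the real StreamFrame fields are not defined in this issue.
public sealed record StreamFrame(long Seq, string Kind, string Payload);

public static class SseWireAdapter
{
    // Maps one internal frame to a Server-Sent Events message.
    // "id" carries seq so that a reconnecting client can report it as Last-Event-ID.
    public static string ToSse(StreamFrame frame)
    {
        var sb = new StringBuilder();
        sb.Append("id: ").Append(frame.Seq).Append('\n');
        sb.Append("event: ").Append(frame.Kind).Append('\n');
        // SSE requires every payload line to be sent as its own "data:" field.
        foreach (var line in frame.Payload.Replace("\r\n", "\n").Split('\n'))
            sb.Append("data: ").Append(line).Append('\n');
        sb.Append('\n'); // a blank line terminates the message
        return sb.ToString();
    }
}
```

Keeping this mapping in an adapter means the session frame can stay wire-agnostic, and a WebSocket or AGUI mapping can sit next to it without changing delivery state.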

Channel inbound continues through IChannelTransport -> ChatActivity -> durable inbox/conversation. Webhook-only inbound is not a stream session; only a transport attachment with long-lived connection resumption, seq/ack, and replay/gap joins StreamSession.

Fix boundary

Suggested scope paths:

  • docs/adr/00xx-stream-session-actor-contract.md
  • src/Aevatar.Foundation.Abstractions/Streaming/*
  • src/Aevatar.Foundation.Runtime/Streaming/*
  • src/Aevatar.Foundation.Runtime.Implementations.Orleans.Streaming/*
  • src/Aevatar.CQRS.Core.Abstractions/Streaming/EventSink.cs
  • src/Aevatar.CQRS.Core/Streaming/*
  • src/Aevatar.Presentation.AGUI/*
  • agents/Aevatar.GAgents.NyxidChat/NyxIdChatEndpoints.Streaming.cs
  • agents/Aevatar.GAgents.NyxidChat/NyxIdChatStreamingRunner.cs
  • src/Aevatar.Foundation.VoicePresence.Abstractions/Protos/voice_presence.proto
  • src/Aevatar.Foundation.VoicePresence/Hosting/*
  • src/Aevatar.Foundation.VoicePresence/Modules/VoicePresenceModule.cs
  • agents/Aevatar.GAgents.Channel.Abstractions/Transport/IChannelTransport.cs
  • agents/Aevatar.GAgents.Channel.Runtime/TurnStreamingReplySink.cs
  • agents/Aevatar.GAgents.Scheduled/SkillRunnerStreamingReplySink.cs
  • focused tests under test/Aevatar.AI.Tests/, test/Aevatar.Foundation.VoicePresence.Tests/, test/Aevatar.Foundation.Runtime.Hosting.Tests/, and CQRS/AGUI tests as touched.

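Scope to avoid:

  • Do not change external sibling repositories such as NyxID / chrono / Ornn.
  • Do not fold long-lived bidirectional streams back into the Projection Pipeline.
  • Do not rename the IChannelTransport webhook/inbound abstraction or force it into StreamSession.
  • Do not use Metadata / string keys to decide whether a frame is persisted; business persistence rules must be expressed in the consumer actor's typed policy.
  • Do not store session_id -> context/writer/transport fact state in Host/API; the host may hold only a volatile wire handle.
  • Do not take on all audio/video/screen modalities at once in the first implementation slice; first settle the boundary between P0 text/tool/approval/control/lifecycle and the media fast path.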

human_brief

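This falls within the refactor loop because it directly touches the AGENTS/CLAUDE architecture rules on actor session state, the projection main chain, unified CQRS/AGUI observation, Host acting only as a wire adapter, and strongly typed Protobuf modeling. Team comments on this issue have already converged on several directions: the StreamSessionGAgent name, keeping AGUI as the wire format, not absorbing channel inbound, and layering text/control separately from the media data plane. The next step is Phase 9 design-solving to turn these converged opinions into an executable ADR + first-slice boundary.

The suggested first slice does not directly "unify all streams". Instead it outputs an ADR and a P0 contract: StreamSessionGAgent / StreamSessionState / StreamFrame cover text, tool, approval, control, lifecycle, heartbeat, and ack; NyxIdChat outbound is the first landing point; VoicePresence serves as a P1/PoC to validate the media fast path and audio non-replay; merging the AGUI sink/EventChannel with the reply throttler is a later, independent slice.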

Decision questions

  1. Should the actor name be settled as StreamSessionGAgent, with StreamProxy/StreamingProxy explicitly forbidden as names for a connection session?
  2. Should P0 StreamFrame include text_chunk/text_end/tool_call_start/tool_call_end/approval_request/approval_decision/lifecycle/control/heartbeat/ack, with audio/video/screen deferred to the VoicePresence PoC or a separate media frame?
  3. Should session_id be an opaque actor id for the logical delivery stream, with user_id/conversation_id/transport_kind serving only as typed state/lookup criteria and not participating in actor id derivation?
  4. Must transport attachment be registered through an actor command/event, with the Host forbidden from storing session_id -> writer/context/transport fact state?
  5. Should replay policy be decided in a strongly typed way per frame kind: text/tool/control/lifecycle replayable within a bounded window, heartbeat not replayed, and audio/video/screen defaulting to gap/resync or the media fast path?
  6. Should AGUI remain the presentation wire format, with an adapter doing the StreamFrame <-> AGUIEvent mapping, rather than absorbing AGUI into a core StreamFrame subset?
  7. Should NyxIdChat P0 replace only the outbound transport-session delivery semantics, or also delete the NyxIdChatStreamingRunner mapper? If the mapper is kept, should it be renamed to a presentation adapter so that it is no longer called a runner?
  8. Does the ADR need to update docs/canon/cqrs-projection.md in the same change, or add a new canon, to make the boundary between Projection Observation and StreamSession Delivery explicit?

original_authors

To comply with the shared rules, the GitHub body does not generate maintainer private names or @ mentions.

  • agents/Aevatar.GAgents.NyxidChat/NyxIdChatEndpoints.Streaming.cs: loning
  • agents/Aevatar.GAgents.NyxidChat/NyxIdChatStreamingRunner.cs: loning
  • src/Aevatar.Foundation.VoicePresence.Abstractions/Protos/voice_presence.proto: loning
  • src/Aevatar.Foundation.VoicePresence/Hosting/ActorOwnedVoicePresenceSessionResolver.cs: loning
  • agents/Aevatar.GAgents.Channel.Abstractions/Transport/IChannelTransport.cs: louis.li

⟦AI:AUTO-LOOP⟧
