diff --git a/agents/Aevatar.GAgents.StudioMember/IStudioMemberPlatformBindingCommandPort.cs b/agents/Aevatar.GAgents.StudioMember/IStudioMemberPlatformBindingCommandPort.cs new file mode 100644 index 000000000..c458f6a9d --- /dev/null +++ b/agents/Aevatar.GAgents.StudioMember/IStudioMemberPlatformBindingCommandPort.cs @@ -0,0 +1,21 @@ +namespace Aevatar.GAgents.StudioMember; + +/// +/// Admits and executes platform-side binding work for an admitted StudioMember binding run. +/// returns an accepted receipt only. +/// starts the platform work and reports completion or failure back to the run actor +/// through a later continuation event. Callers must only invoke it from a durable +/// actor state that can re-drive the same command after activation. +/// +public interface IStudioMemberPlatformBindingCommandPort +{ + Task StartAsync( + string replyActorId, + StudioMemberPlatformBindingStartRequested request, + CancellationToken ct = default); + + Task ExecuteAsync( + string replyActorId, + string platformBindingCommandId, + StudioMemberPlatformBindingStartRequested request); +} diff --git a/agents/Aevatar.GAgents.StudioMember/StudioMemberBindingRunGAgent.cs b/agents/Aevatar.GAgents.StudioMember/StudioMemberBindingRunGAgent.cs new file mode 100644 index 000000000..dc4bcd7da --- /dev/null +++ b/agents/Aevatar.GAgents.StudioMember/StudioMemberBindingRunGAgent.cs @@ -0,0 +1,687 @@ +using Aevatar.Foundation.Abstractions; +using Aevatar.Foundation.Abstractions.Attributes; +using Aevatar.Foundation.Core; +using Aevatar.Foundation.Core.EventSourcing; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; + +namespace Aevatar.GAgents.StudioMember; + +/// +/// Short-lived actor for one StudioMember binding attempt. +/// +public sealed class StudioMemberBindingRunGAgent : GAgentBase, IProjectedActor +{ + private static readonly TimeSpan PlatformBindingExecuteInitialDelay = TimeSpan.FromMilliseconds(100); + private static readonly TimeSpan PlatformBindingWatchdogDelay = TimeSpan.FromSeconds(30); + private static readonly TimeSpan PlatformBindingExecutionStaleAfter = TimeSpan.FromMinutes(2); + private readonly IStudioMemberPlatformBindingCommandPort? _platformBindingPort; + + public static string ProjectionKind => "studio-member-binding-run"; + + public StudioMemberBindingRunGAgent(IStudioMemberPlatformBindingCommandPort? platformBindingPort = null) + { + _platformBindingPort = platformBindingPort; + } + + protected override async Task OnActivateAsync(CancellationToken ct) + { + await base.OnActivateAsync(ct); + + if (!CanRecoverRun()) + return; + + switch (State.Status) + { + case StudioMemberBindingRunStatus.AdmissionPending: + await SendAdmissionRequestAsync(ct); + break; + case StudioMemberBindingRunStatus.Admitted: + await SendPlatformBindingStartRequestedAsync(State.UpdatedAtUtc, ct); + break; + case StudioMemberBindingRunStatus.PlatformBindingPending: + await RecoverPlatformBindingPendingAsync( + State.UpdatedAtUtc ?? Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + ct); + break; + case StudioMemberBindingRunStatus.MemberNotificationPending: + await SendMemberTerminalNotificationAsync(ct); + break; + } + } + + [EventHandler(EndpointName = "requestBindingRun")] + public async Task HandleRequested(StudioMemberBindingRunRequested evt) + { + if (!string.IsNullOrEmpty(State.BindingRunId)) + { + if (!string.Equals(State.BindingRunId, evt.Request.BindingRunId, StringComparison.Ordinal)) + { + throw new InvalidOperationException( + $"binding run already initialized with id '{State.BindingRunId}'."); + } + + if (!IsSameRequest(State.Request, evt.Request, State.RequestHash)) + { + throw new InvalidOperationException( + $"binding run '{State.BindingRunId}' already exists with a different request payload."); + } + + return; + } + + await PersistDomainEventAsync(evt); + await SendAdmissionRequestAsync(); + } + + [EventHandler(EndpointName = "admitBindingRun")] + public async Task HandleAdmitted(StudioMemberBindingAdmittedEvent evt) + { + if (!CanAcceptAdmission(evt.BindingRunId)) + return; + + await PersistDomainEventAsync(evt); + await SendPlatformBindingStartRequestedAsync(evt.AdmittedAtUtc); + } + + [EventHandler(EndpointName = "rejectBindingRun")] + public async Task HandleRejected(StudioMemberBindingRejectedEvent evt) + { + if (!CanAcceptRunEvent(evt.BindingRunId)) + return; + + await PersistDomainEventAsync(evt); + } + + [EventHandler(EndpointName = "startPlatformBinding", AllowSelfHandling = true)] + public async Task HandlePlatformBindingStartRequested(StudioMemberPlatformBindingStartRequested evt) + { + if (!CanAcceptPlatformBindingStart(evt.BindingRunId)) + return; + + await PersistDomainEventAsync(evt); + + if (_platformBindingPort == null) + { + await SendToAsync(Id, new StudioMemberPlatformBindingFailed + { + BindingRunId = evt.BindingRunId, + PlatformBindingCommandId = evt.PlatformBindingCommandId, + Failure = new StudioMemberBindingFailure + { + Code = "STUDIO_MEMBER_PLATFORM_BINDING_PORT_UNAVAILABLE", + Message = "studio member platform binding command port is not registered.", + FailedAtUtc = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }, + }); + return; + } + + var accepted = await _platformBindingPort.StartAsync(Id, evt); + await SendToAsync(Id, accepted); + } + + [EventHandler(EndpointName = "acceptPlatformBinding", AllowSelfHandling = true)] + public async Task HandlePlatformBindingAccepted(StudioMemberPlatformBindingAccepted evt) + { + if (!CanAcceptPlatformBindingAccepted(evt.BindingRunId, evt.PlatformBindingCommandId)) + return; + + await PersistDomainEventAsync(evt); + await SendPlatformBindingPendingAndExecuteAsync(evt.AcceptedAtUtc); + } + + [EventHandler(EndpointName = "executePlatformBinding", AllowSelfHandling = true)] + public async Task HandlePlatformBindingExecuteRequested(StudioMemberPlatformBindingExecuteRequested evt) + { + if (!CanAcceptPlatformBindingCommand(evt.BindingRunId, evt.PlatformBindingCommandId)) + return; + + if (State.PlatformExecutionInFlight && !evt.RecoveryExecution) + return; + + await PersistDomainEventAsync(new StudioMemberPlatformBindingExecutionStarted + { + BindingRunId = evt.BindingRunId, + PlatformBindingCommandId = evt.PlatformBindingCommandId, + StartedAtUtc = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }); + + if (_platformBindingPort == null) + { + await SendToAsync(Id, new StudioMemberPlatformBindingFailed + { + BindingRunId = evt.BindingRunId, + PlatformBindingCommandId = evt.PlatformBindingCommandId, + Failure = new StudioMemberBindingFailure + { + Code = "STUDIO_MEMBER_PLATFORM_BINDING_PORT_UNAVAILABLE", + Message = "studio member platform binding command port is not registered.", + FailedAtUtc = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }, + }); + return; + } + + await _platformBindingPort.ExecuteAsync( + Id, + evt.PlatformBindingCommandId, + new StudioMemberPlatformBindingStartRequested + { + BindingRunId = State.BindingRunId, + PlatformBindingCommandId = State.PlatformBindingCommandId, + Request = State.Request.Clone(), + Admitted = State.Admitted.Clone(), + RequestedAtUtc = State.UpdatedAtUtc ?? Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }); + + await SchedulePlatformBindingWatchdogAsync(); + } + + [EventHandler(EndpointName = "platformBindingWatchdog", AllowSelfHandling = true)] + public async Task HandlePlatformBindingWatchdogFired(StudioMemberPlatformBindingWatchdogFired evt) + { + if (!CanAcceptPlatformBindingCommand(evt.BindingRunId, evt.PlatformBindingCommandId)) + return; + + if (!State.PlatformExecutionInFlight) + { + await SendToAsync(Id, new StudioMemberPlatformBindingExecuteRequested + { + BindingRunId = evt.BindingRunId, + PlatformBindingCommandId = evt.PlatformBindingCommandId, + }); + return; + } + + if (IsPlatformExecutionStale()) + { + await SendToAsync(Id, new StudioMemberPlatformBindingExecuteRequested + { + BindingRunId = evt.BindingRunId, + PlatformBindingCommandId = evt.PlatformBindingCommandId, + RecoveryExecution = true, + }); + return; + } + + await SchedulePlatformBindingWatchdogAsync(); + } + + [EventHandler(EndpointName = "completePlatformBinding")] + public async Task HandlePlatformBindingSucceeded(StudioMemberPlatformBindingSucceeded evt) + { + if (!CanAcceptPlatformBindingResult(evt.BindingRunId, evt.PlatformBindingCommandId)) + return; + + await PersistDomainEventAsync(evt); + await SendMemberTerminalNotificationAsync(); + } + + [EventHandler(EndpointName = "failPlatformBinding", AllowSelfHandling = true)] + public async Task HandlePlatformBindingFailed(StudioMemberPlatformBindingFailed evt) + { + if (!CanAcceptPlatformBindingResult(evt.BindingRunId, evt.PlatformBindingCommandId)) + return; + + await PersistDomainEventAsync(evt); + await SendMemberTerminalNotificationAsync(); + } + + [EventHandler(EndpointName = "acknowledgeMemberBindingTerminal", AllowSelfHandling = true)] + public async Task HandleMemberBindingTerminalAcknowledged(StudioMemberBindingTerminalAcknowledged evt) + { + if (!CanAcceptMemberTerminalAcknowledgement(evt.BindingRunId, evt.Status)) + return; + + await PersistDomainEventAsync(evt); + } + + protected override StudioMemberBindingRunState TransitionState( + StudioMemberBindingRunState current, + IMessage evt) + { + return StateTransitionMatcher + .Match(current, evt) + .On(ApplyRequested) + .On(ApplyAdmitted) + .On(ApplyRejected) + .On(ApplyPlatformBindingStartRequested) + .On(ApplyPlatformBindingAccepted) + .On(ApplyPlatformBindingExecutionStarted) + .On(ApplyPlatformBindingSucceeded) + .On(ApplyPlatformBindingFailed) + .On(ApplyMemberBindingTerminalAcknowledged) + .OrCurrent(); + } + + private static StudioMemberBindingRunState ApplyRequested( + StudioMemberBindingRunState state, + StudioMemberBindingRunRequested evt) + { + if (!string.IsNullOrEmpty(state.BindingRunId)) + return state; + + return new StudioMemberBindingRunState + { + BindingRunId = evt.Request.BindingRunId, + ScopeId = evt.Request.ScopeId, + MemberId = evt.Request.MemberId, + RequestHash = evt.Request.RequestHash, + Request = evt.Request.Clone(), + Status = StudioMemberBindingRunStatus.AdmissionPending, + AcceptedAtUtc = evt.RequestedAtUtc, + UpdatedAtUtc = evt.RequestedAtUtc, + AttemptCount = 0, + }; + } + + private static StudioMemberBindingRunState ApplyAdmitted( + StudioMemberBindingRunState state, + StudioMemberBindingAdmittedEvent evt) + { + if (IsStale(state, evt.BindingRunId) + || state.Status != StudioMemberBindingRunStatus.AdmissionPending) + { + return state; + } + + var next = state.Clone(); + next.Status = StudioMemberBindingRunStatus.Admitted; + next.Admitted = new StudioMemberBindingAdmittedSnapshot + { + MemberId = evt.MemberId, + ScopeId = evt.ScopeId, + PublishedServiceId = evt.PublishedServiceId, + ImplementationKind = evt.ImplementationKind, + DisplayName = evt.DisplayName, + }; + next.UpdatedAtUtc = evt.AdmittedAtUtc; + return next; + } + + private static StudioMemberBindingRunState ApplyRejected( + StudioMemberBindingRunState state, + StudioMemberBindingRejectedEvent evt) + { + if (IsStale(state, evt.BindingRunId) || IsTerminal(state.Status)) + return state; + + var next = state.Clone(); + next.Status = StudioMemberBindingRunStatus.Rejected; + next.Failure = evt.Failure?.Clone(); + if (evt.Failure?.FailedAtUtc != null) + next.UpdatedAtUtc = evt.Failure.FailedAtUtc; + return next; + } + + private static StudioMemberBindingRunState ApplyPlatformBindingStartRequested( + StudioMemberBindingRunState state, + StudioMemberPlatformBindingStartRequested evt) + { + if (IsStale(state, evt.BindingRunId) + || state.Status != StudioMemberBindingRunStatus.Admitted) + { + return state; + } + + var next = state.Clone(); + next.Status = StudioMemberBindingRunStatus.PlatformBindingPending; + next.PlatformBindingCommandId = evt.PlatformBindingCommandId; + next.PlatformExecutionInFlight = false; + next.PlatformExecutionStartedAtUtc = null; + next.AttemptCount++; + next.UpdatedAtUtc = evt.RequestedAtUtc; + return next; + } + + private static StudioMemberBindingRunState ApplyPlatformBindingAccepted( + StudioMemberBindingRunState state, + StudioMemberPlatformBindingAccepted evt) + { + if (IsStale(state, evt.BindingRunId) + || state.Status != StudioMemberBindingRunStatus.PlatformBindingPending + || state.PlatformExecutionInFlight + || !string.Equals(state.PlatformBindingCommandId, evt.PlatformBindingCommandId, StringComparison.Ordinal)) + { + return state; + } + + var next = state.Clone(); + next.Status = StudioMemberBindingRunStatus.PlatformBindingPending; + next.PlatformBindingCommandId = evt.PlatformBindingCommandId; + next.PlatformExecutionInFlight = false; + next.PlatformExecutionStartedAtUtc = null; + next.UpdatedAtUtc = evt.AcceptedAtUtc; + return next; + } + + private static StudioMemberBindingRunState ApplyPlatformBindingExecutionStarted( + StudioMemberBindingRunState state, + StudioMemberPlatformBindingExecutionStarted evt) + { + if (IsStale(state, evt.BindingRunId) + || state.Status != StudioMemberBindingRunStatus.PlatformBindingPending + || !string.Equals(state.PlatformBindingCommandId, evt.PlatformBindingCommandId, StringComparison.Ordinal)) + { + return state; + } + + var next = state.Clone(); + next.PlatformExecutionInFlight = true; + next.PlatformExecutionStartedAtUtc = evt.StartedAtUtc; + next.UpdatedAtUtc = evt.StartedAtUtc; + return next; + } + + private static StudioMemberBindingRunState ApplyPlatformBindingSucceeded( + StudioMemberBindingRunState state, + StudioMemberPlatformBindingSucceeded evt) + { + if (!CanApplyPlatformBindingResult(state, evt.BindingRunId, evt.PlatformBindingCommandId)) + return state; + + var next = state.Clone(); + next.Status = StudioMemberBindingRunStatus.MemberNotificationPending; + next.PlatformResult = evt.Result?.Clone(); + next.PlatformExecutionInFlight = false; + next.PlatformExecutionStartedAtUtc = null; + next.UpdatedAtUtc = evt.CompletedAtUtc; + return next; + } + + private static StudioMemberBindingRunState ApplyPlatformBindingFailed( + StudioMemberBindingRunState state, + StudioMemberPlatformBindingFailed evt) + { + if (!CanApplyPlatformBindingResult(state, evt.BindingRunId, evt.PlatformBindingCommandId)) + return state; + + var next = state.Clone(); + next.Status = StudioMemberBindingRunStatus.MemberNotificationPending; + next.Failure = evt.Failure?.Clone(); + next.PlatformExecutionInFlight = false; + next.PlatformExecutionStartedAtUtc = null; + if (evt.Failure?.FailedAtUtc != null) + next.UpdatedAtUtc = evt.Failure.FailedAtUtc; + return next; + } + + private static StudioMemberBindingRunState ApplyMemberBindingTerminalAcknowledged( + StudioMemberBindingRunState state, + StudioMemberBindingTerminalAcknowledged evt) + { + if (IsStale(state, evt.BindingRunId) + || state.Status != StudioMemberBindingRunStatus.MemberNotificationPending) + { + return state; + } + + var next = state.Clone(); + next.Status = evt.Status switch + { + StudioMemberBindingRunStatus.Succeeded => StudioMemberBindingRunStatus.Succeeded, + StudioMemberBindingRunStatus.Failed => StudioMemberBindingRunStatus.Failed, + _ => next.Status, + }; + next.UpdatedAtUtc = evt.AcknowledgedAtUtc; + return next; + } + + private static bool IsStale(StudioMemberBindingRunState state, string bindingRunId) => + !string.IsNullOrEmpty(state.BindingRunId) + && !string.Equals(state.BindingRunId, bindingRunId, StringComparison.Ordinal); + + private static bool IsTerminal(StudioMemberBindingRunStatus status) => + status is StudioMemberBindingRunStatus.Succeeded + or StudioMemberBindingRunStatus.Failed + or StudioMemberBindingRunStatus.Rejected; + + private bool CanAcceptRunEvent(string bindingRunId) => + !string.IsNullOrEmpty(State.BindingRunId) + && string.Equals(State.BindingRunId, bindingRunId, StringComparison.Ordinal) + && !IsTerminal(State.Status); + + private bool CanAcceptAdmission(string bindingRunId) => + !string.IsNullOrEmpty(State.BindingRunId) + && string.Equals(State.BindingRunId, bindingRunId, StringComparison.Ordinal) + && State.Status == StudioMemberBindingRunStatus.AdmissionPending; + + private bool CanAcceptPlatformBindingStart(string bindingRunId) => + !string.IsNullOrEmpty(State.BindingRunId) + && string.Equals(State.BindingRunId, bindingRunId, StringComparison.Ordinal) + && State.Status == StudioMemberBindingRunStatus.Admitted; + + private bool CanAcceptPlatformBindingAccepted(string bindingRunId, string platformBindingCommandId) => + !string.IsNullOrEmpty(State.BindingRunId) + && string.Equals(State.BindingRunId, bindingRunId, StringComparison.Ordinal) + && State.Status == StudioMemberBindingRunStatus.PlatformBindingPending + && !State.PlatformExecutionInFlight + && string.Equals(State.PlatformBindingCommandId, platformBindingCommandId, StringComparison.Ordinal); + + private bool CanAcceptPlatformBindingResult(string bindingRunId, string platformBindingCommandId) => + CanAcceptPlatformBindingCommand(bindingRunId, platformBindingCommandId); + + private bool CanAcceptPlatformBindingCommand(string bindingRunId, string platformBindingCommandId) => + !string.IsNullOrEmpty(State.BindingRunId) + && string.Equals(State.BindingRunId, bindingRunId, StringComparison.Ordinal) + && State.Status == StudioMemberBindingRunStatus.PlatformBindingPending + && string.Equals(State.PlatformBindingCommandId, platformBindingCommandId, StringComparison.Ordinal); + + private bool CanAcceptMemberTerminalAcknowledgement(string bindingRunId, StudioMemberBindingRunStatus status) => + !string.IsNullOrEmpty(State.BindingRunId) + && string.Equals(State.BindingRunId, bindingRunId, StringComparison.Ordinal) + && State.Status == StudioMemberBindingRunStatus.MemberNotificationPending + && (status == StudioMemberBindingRunStatus.Succeeded || status == StudioMemberBindingRunStatus.Failed) + && ((status == StudioMemberBindingRunStatus.Succeeded && State.PlatformResult != null) + || (status == StudioMemberBindingRunStatus.Failed && State.Failure != null)); + + private bool IsPlatformExecutionStale() + { + if (!State.PlatformExecutionInFlight || State.PlatformExecutionStartedAtUtc == null) + return false; + + var startedAt = State.PlatformExecutionStartedAtUtc.ToDateTimeOffset(); + return DateTimeOffset.UtcNow - startedAt >= PlatformBindingExecutionStaleAfter; + } + + private static bool CanApplyPlatformBindingResult( + StudioMemberBindingRunState state, + string bindingRunId, + string platformBindingCommandId) => + !string.IsNullOrEmpty(state.BindingRunId) + && string.Equals(state.BindingRunId, bindingRunId, StringComparison.Ordinal) + && state.Status == StudioMemberBindingRunStatus.PlatformBindingPending + && string.Equals(state.PlatformBindingCommandId, platformBindingCommandId, StringComparison.Ordinal); + + private bool CanRecoverRun() => + !string.IsNullOrEmpty(State.BindingRunId) + && State.Request != null + && State.Status switch + { + StudioMemberBindingRunStatus.AdmissionPending => true, + StudioMemberBindingRunStatus.Admitted => State.Admitted != null, + StudioMemberBindingRunStatus.PlatformBindingPending => + State.Admitted != null && !string.IsNullOrEmpty(State.PlatformBindingCommandId), + StudioMemberBindingRunStatus.MemberNotificationPending => + State.Admitted != null + && !string.IsNullOrEmpty(State.PlatformBindingCommandId) + && (State.PlatformResult != null || State.Failure != null), + _ => false, + }; + + private Task SendAdmissionRequestAsync(CancellationToken ct = default) => + SendToAsync( + StudioMemberConventions.BuildActorId(State.ScopeId, State.MemberId), + new StudioMemberBindAdmissionRequested + { + BindingRunId = State.BindingRunId, + ScopeId = State.ScopeId, + MemberId = State.MemberId, + RequestHash = State.RequestHash, + Request = State.Request.Clone(), + RequestedAtUtc = State.UpdatedAtUtc ?? State.AcceptedAtUtc ?? Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }, + ct); + + private Task SendPlatformBindingStartRequestedAsync(Timestamp? requestedAtUtc = null, CancellationToken ct = default) + { + if (State.Admitted == null) + return Task.CompletedTask; + + return SendToAsync( + Id, + new StudioMemberPlatformBindingStartRequested + { + BindingRunId = State.BindingRunId, + PlatformBindingCommandId = StudioMemberConventions.BuildPlatformBindingCommandId( + State.BindingRunId, + State.AttemptCount + 1), + Request = State.Request.Clone(), + Admitted = State.Admitted.Clone(), + RequestedAtUtc = requestedAtUtc ?? State.UpdatedAtUtc ?? Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }, + ct); + } + + private async Task SendPlatformBindingPendingAndExecuteAsync( + Timestamp pendingAtUtc, + bool recoveryExecution = false, + CancellationToken ct = default) + { + if (string.IsNullOrEmpty(State.PlatformBindingCommandId) || State.Admitted == null) + return; + + await SchedulePlatformBindingExecuteRequestedAsync( + PlatformBindingExecuteInitialDelay, + recoveryExecution, + ct); + + await SendToAsync( + StudioMemberConventions.BuildActorId(State.ScopeId, State.MemberId), + new StudioMemberBindingPlatformPendingEvent + { + BindingRunId = State.BindingRunId, + PlatformBindingCommandId = State.PlatformBindingCommandId, + PendingAtUtc = pendingAtUtc, + }, + ct); + } + + private async Task RecoverPlatformBindingPendingAsync( + Timestamp pendingAtUtc, + CancellationToken ct) + { + if (State.PlatformExecutionInFlight && !IsPlatformExecutionStale()) + { + await SchedulePlatformBindingWatchdogAsync(ct); + return; + } + + await SendPlatformBindingPendingAndExecuteAsync( + pendingAtUtc, + recoveryExecution: true, + ct); + } + + private Task SchedulePlatformBindingExecuteRequestedAsync( + TimeSpan dueTime, + bool recoveryExecution = false, + CancellationToken ct = default) + { + if (string.IsNullOrEmpty(State.PlatformBindingCommandId) || State.Admitted == null) + return Task.CompletedTask; + + return ScheduleSelfDurableTimeoutAsync( + BuildPlatformBindingExecuteCallbackId(State.BindingRunId, State.PlatformBindingCommandId), + dueTime, + new StudioMemberPlatformBindingExecuteRequested + { + BindingRunId = State.BindingRunId, + PlatformBindingCommandId = State.PlatformBindingCommandId, + RecoveryExecution = recoveryExecution, + }, + ct: ct); + } + + private Task SendMemberTerminalNotificationAsync(CancellationToken ct = default) + { + if (State.PlatformResult != null) + { + return SendToAsync( + StudioMemberConventions.BuildActorId(State.ScopeId, State.MemberId), + new StudioMemberBindingCompletedEvent + { + BindingRunId = State.BindingRunId, + PublishedServiceId = State.PlatformResult.PublishedServiceId, + RevisionId = State.PlatformResult.RevisionId, + ImplementationKind = State.PlatformResult.ImplementationKind, + ImplementationRef = State.PlatformResult.ImplementationRef?.Clone(), + CompletedAtUtc = State.UpdatedAtUtc ?? Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }, + ct); + } + + if (State.Failure != null) + { + return SendToAsync( + StudioMemberConventions.BuildActorId(State.ScopeId, State.MemberId), + new StudioMemberBindingFailedEvent + { + BindingRunId = State.BindingRunId, + Failure = State.Failure.Clone(), + }, + ct); + } + + return Task.CompletedTask; + } + + private Task SchedulePlatformBindingWatchdogAsync(CancellationToken ct = default) + { + if (string.IsNullOrEmpty(State.PlatformBindingCommandId) || State.Admitted == null) + return Task.CompletedTask; + + return ScheduleSelfDurableTimeoutAsync( + BuildPlatformBindingWatchdogCallbackId(State.BindingRunId, State.PlatformBindingCommandId), + PlatformBindingWatchdogDelay, + new StudioMemberPlatformBindingWatchdogFired + { + BindingRunId = State.BindingRunId, + PlatformBindingCommandId = State.PlatformBindingCommandId, + }, + ct: ct); + } + + private static bool IsSameRequest( + StudioMemberBindingRequest? current, + StudioMemberBindingRequest incoming, + string currentHash) + { + if (current == null) + return false; + + if (!string.IsNullOrWhiteSpace(currentHash) + && !string.IsNullOrWhiteSpace(incoming.RequestHash) + && !string.Equals(currentHash, incoming.RequestHash, StringComparison.Ordinal)) + { + return false; + } + + var normalizedCurrent = current.Clone(); + normalizedCurrent.RequestHash = string.Empty; + var normalizedIncoming = incoming.Clone(); + normalizedIncoming.RequestHash = string.Empty; + return normalizedCurrent.Equals(normalizedIncoming); + } + + private static string BuildPlatformBindingExecuteCallbackId( + string bindingRunId, + string platformBindingCommandId) => + $"studio-member-binding-execute:{bindingRunId}:{platformBindingCommandId}"; + + private static string BuildPlatformBindingWatchdogCallbackId( + string bindingRunId, + string platformBindingCommandId) => + $"studio-member-binding-watchdog:{bindingRunId}:{platformBindingCommandId}"; +} diff --git a/agents/Aevatar.GAgents.StudioMember/StudioMemberConventions.cs b/agents/Aevatar.GAgents.StudioMember/StudioMemberConventions.cs index 810d8134d..443ed09e6 100644 --- a/agents/Aevatar.GAgents.StudioMember/StudioMemberConventions.cs +++ b/agents/Aevatar.GAgents.StudioMember/StudioMemberConventions.cs @@ -12,6 +12,7 @@ namespace Aevatar.GAgents.StudioMember; public static class StudioMemberConventions { public const string ActorIdPrefix = "studio-member"; + public const string BindingRunActorIdPrefix = "studio-member-binding-run"; public const string PublishedServiceIdPrefix = "member"; /// @@ -36,6 +37,20 @@ public static string BuildPublishedServiceId(string memberId) return $"{PublishedServiceIdPrefix}-{normalizedMemberId}"; } + public static string BuildBindingRunActorId(string bindingRunId) + { + var normalizedBindingRunId = NormalizeBindingRunId(bindingRunId); + return $"{BindingRunActorIdPrefix}:{normalizedBindingRunId}"; + } + + public static string BuildPlatformBindingCommandId(string bindingRunId, int attempt) + { + var normalizedBindingRunId = NormalizeBindingRunId(bindingRunId); + if (attempt <= 0) + throw new ArgumentOutOfRangeException(nameof(attempt), attempt, "attempt must be positive."); + return $"platform-{normalizedBindingRunId}-{attempt}"; + } + public static string NormalizeScopeId(string? scopeId) { var trimmed = scopeId?.Trim(); @@ -58,5 +73,16 @@ public static string NormalizeMemberId(string? memberId) return trimmed; } + public static string NormalizeBindingRunId(string? bindingRunId) + { + var trimmed = bindingRunId?.Trim(); + if (string.IsNullOrEmpty(trimmed)) + throw new ArgumentException("bindingRunId is required.", nameof(bindingRunId)); + if (ContainsActorIdSeparator(trimmed)) + throw new ArgumentException( + "bindingRunId must not contain ':' (it is the actor-id separator).", nameof(bindingRunId)); + return trimmed; + } + private static bool ContainsActorIdSeparator(string value) => value.Contains(':'); } diff --git a/agents/Aevatar.GAgents.StudioMember/StudioMemberGAgent.cs b/agents/Aevatar.GAgents.StudioMember/StudioMemberGAgent.cs index 7c95bcbeb..f777f48bf 100644 --- a/agents/Aevatar.GAgents.StudioMember/StudioMemberGAgent.cs +++ b/agents/Aevatar.GAgents.StudioMember/StudioMemberGAgent.cs @@ -90,15 +90,146 @@ public async Task HandleImplementationUpdated(StudioMemberImplementationUpdatedE await PersistDomainEventAsync(evt); } - [EventHandler(EndpointName = "recordBinding")] - public async Task HandleBound(StudioMemberBoundEvent evt) + [EventHandler(EndpointName = "requestBindingAdmission")] + public async Task HandleBindingAdmissionRequested(StudioMemberBindAdmissionRequested evt) + { + var runActorId = StudioMemberConventions.BuildBindingRunActorId(evt.BindingRunId); + var failedAt = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow); + + if (string.IsNullOrEmpty(State.MemberId)) + { + await SendToAsync(runActorId, BuildRejected(evt, "STUDIO_MEMBER_NOT_FOUND", "member not yet created.", failedAt)); + return; + } + + if (!string.Equals(State.ScopeId, evt.ScopeId, StringComparison.Ordinal) + || !string.Equals(State.MemberId, evt.MemberId, StringComparison.Ordinal)) + { + await SendToAsync(runActorId, BuildRejected(evt, "STUDIO_MEMBER_TARGET_MISMATCH", "binding admission target does not match member authority state.", failedAt)); + return; + } + + if (TryBuildTerminalBindingRunReplayResponse(State, evt, failedAt, out var terminalReplayResponse)) + { + await SendToAsync(runActorId, terminalReplayResponse); + return; + } + + if (IsTerminalBindingRunReplay(State, evt.BindingRunId)) + { + return; + } + + if (HasActiveBindingRun(State, evt.BindingRunId)) + { + await SendToAsync(runActorId, BuildRejected( + evt, + "STUDIO_MEMBER_BINDING_RUN_ALREADY_ACTIVE", + "member already has an active binding run.", + failedAt)); + return; + } + + if (IsSupersededBindingRun(State, evt.BindingRunId, evt.RequestedAtUtc)) + { + await SendToAsync(runActorId, BuildRejected(evt, "STUDIO_MEMBER_BINDING_RUN_SUPERSEDED", "binding run was superseded by a newer member binding run.", failedAt)); + return; + } + + var requestedKind = GetRequestImplementationKind(evt.Request); + if (requestedKind != State.ImplementationKind) + { + var rejected = BuildRejected( + evt, + "STUDIO_MEMBER_IMPLEMENTATION_KIND_MISMATCH", + $"binding request kind '{requestedKind}' does not match member kind '{State.ImplementationKind}'.", + failedAt); + await PersistDomainEventsAsync([evt, rejected]); + await SendToAsync(runActorId, rejected); + return; + } + + var admitted = new StudioMemberBindingAdmittedEvent + { + BindingRunId = evt.BindingRunId, + ScopeId = State.ScopeId, + MemberId = State.MemberId, + PublishedServiceId = State.PublishedServiceId, + ImplementationKind = State.ImplementationKind, + DisplayName = State.DisplayName, + AdmittedAtUtc = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }; + + await PersistDomainEventsAsync([evt, admitted]); + await SendToAsync(runActorId, admitted); + } + + [EventHandler(EndpointName = "markBindingPlatformPending")] + public async Task HandleBindingPlatformPending(StudioMemberBindingPlatformPendingEvent evt) + { + if (string.IsNullOrEmpty(State.MemberId)) + { + throw new InvalidOperationException("member not yet created."); + } + + if (!CanAcceptBindingRunProgress(State, evt.BindingRunId)) + { + return; + } + + if (State.Binding?.CurrentStatus == StudioMemberBindingRunStatus.PlatformBindingPending + && string.Equals(State.Binding.CurrentBindingRunId, evt.BindingRunId, StringComparison.Ordinal)) + { + return; + } + + await PersistDomainEventAsync(evt); + } + + [EventHandler(EndpointName = "completeBinding")] + public async Task HandleBindingCompleted(StudioMemberBindingCompletedEvent evt) + { + if (string.IsNullOrEmpty(State.MemberId)) + { + throw new InvalidOperationException("member not yet created."); + } + + if (IsTerminalBindingRunReplay(State, evt.BindingRunId, StudioMemberBindingRunStatus.Succeeded)) + { + await SendTerminalAcknowledgementAsync(evt.BindingRunId, StudioMemberBindingRunStatus.Succeeded); + return; + } + + if (!CanAcceptBindingRunProgress(State, evt.BindingRunId)) + { + return; + } + + await PersistDomainEventAsync(evt); + await SendTerminalAcknowledgementAsync(evt.BindingRunId, StudioMemberBindingRunStatus.Succeeded); + } + + [EventHandler(EndpointName = "failBinding")] + public async Task HandleBindingFailed(StudioMemberBindingFailedEvent evt) { if (string.IsNullOrEmpty(State.MemberId)) { throw new InvalidOperationException("member not yet created."); } + if (IsTerminalBindingRunReplay(State, evt.BindingRunId, StudioMemberBindingRunStatus.Failed)) + { + await SendTerminalAcknowledgementAsync(evt.BindingRunId, StudioMemberBindingRunStatus.Failed); + return; + } + + if (!CanAcceptBindingRunProgress(State, evt.BindingRunId)) + { + return; + } + await PersistDomainEventAsync(evt); + await SendTerminalAcknowledgementAsync(evt.BindingRunId, StudioMemberBindingRunStatus.Failed); } /// @@ -186,7 +317,12 @@ protected override StudioMemberState TransitionState( .On(ApplyCreated) .On(ApplyRenamed) .On(ApplyImplementationUpdated) - .On(ApplyBound) + .On(ApplyBindingAdmissionRequested) + .On(ApplyBindingAdmitted) + .On(ApplyBindingRejected) + .On(ApplyBindingPlatformPending) + .On(ApplyBindingCompleted) + .On(ApplyBindingFailed) .On(ApplyReassigned) .OrCurrent(); } @@ -227,6 +363,71 @@ private static StudioMemberState ApplyRenamed( return next; } + private static StudioMemberState ApplyBindingAdmissionRequested( + StudioMemberState state, + StudioMemberBindAdmissionRequested evt) + { + if (ShouldIgnoreBindingRunStart(state, evt.BindingRunId, evt.RequestedAtUtc)) + return state; + + var next = state.Clone(); + next.Binding = new StudioMemberBindingAuthorityState + { + CurrentBindingRunId = evt.BindingRunId, + CurrentStatus = StudioMemberBindingRunStatus.AdmissionPending, + LastTerminalBindingRunId = next.Binding?.LastTerminalBindingRunId ?? string.Empty, + LastFailure = next.Binding?.LastFailure?.Clone(), + UpdatedAtUtc = evt.RequestedAtUtc, + }; + next.UpdatedAtUtc = evt.RequestedAtUtc; + return next; + } + + private static StudioMemberState ApplyBindingAdmitted( + StudioMemberState state, + StudioMemberBindingAdmittedEvent evt) + { + if (!CanAcceptBindingRunProgress(state, evt.BindingRunId)) + return state; + + var currentStatus = state.Binding?.CurrentStatus ?? StudioMemberBindingRunStatus.Unspecified; + if (currentStatus == StudioMemberBindingRunStatus.PlatformBindingPending) + return state; + + var next = state.Clone(); + next.Binding = new StudioMemberBindingAuthorityState + { + CurrentBindingRunId = evt.BindingRunId, + CurrentStatus = StudioMemberBindingRunStatus.Admitted, + LastTerminalBindingRunId = next.Binding?.LastTerminalBindingRunId ?? string.Empty, + LastFailure = next.Binding?.LastFailure?.Clone(), + UpdatedAtUtc = evt.AdmittedAtUtc, + }; + next.UpdatedAtUtc = evt.AdmittedAtUtc; + return next; + } + + private static StudioMemberState ApplyBindingRejected( + StudioMemberState state, + StudioMemberBindingRejectedEvent evt) + { + if (!CanAcceptBindingRunProgress(state, evt.BindingRunId)) + return state; + + var failedAt = evt.Failure?.FailedAtUtc ?? Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow); + var next = state.Clone(); + next.Binding = new StudioMemberBindingAuthorityState + { + CurrentBindingRunId = evt.BindingRunId, + CurrentStatus = StudioMemberBindingRunStatus.Rejected, + LastTerminalBindingRunId = evt.BindingRunId, + LastFailure = evt.Failure?.Clone(), + UpdatedAtUtc = failedAt, + }; + next.UpdatedAtUtc = failedAt; + return next; + } + private static StudioMemberState ApplyImplementationUpdated( StudioMemberState state, StudioMemberImplementationUpdatedEvent evt) { @@ -262,22 +463,214 @@ private static StudioMemberState ApplyImplementationUpdated( return next; } - private static StudioMemberState ApplyBound( - StudioMemberState state, StudioMemberBoundEvent evt) + private static StudioMemberState ApplyBindingPlatformPending( + StudioMemberState state, + StudioMemberBindingPlatformPendingEvent evt) { + if (!CanAcceptBindingRunProgress(state, evt.BindingRunId)) + return state; + + var next = state.Clone(); + next.Binding = new StudioMemberBindingAuthorityState + { + CurrentBindingRunId = evt.BindingRunId, + CurrentStatus = StudioMemberBindingRunStatus.PlatformBindingPending, + LastTerminalBindingRunId = next.Binding?.LastTerminalBindingRunId ?? string.Empty, + LastFailure = next.Binding?.LastFailure?.Clone(), + UpdatedAtUtc = evt.PendingAtUtc, + }; + next.UpdatedAtUtc = evt.PendingAtUtc; + return next; + } + + private static StudioMemberState ApplyBindingCompleted( + StudioMemberState state, StudioMemberBindingCompletedEvent evt) + { + if (!CanAcceptBindingRunProgress(state, evt.BindingRunId)) + return state; + var next = state.Clone(); next.LastBinding = new StudioMemberBindingContract { PublishedServiceId = evt.PublishedServiceId, RevisionId = evt.RevisionId, ImplementationKind = evt.ImplementationKind, - BoundAtUtc = evt.BoundAtUtc, + BoundAtUtc = evt.CompletedAtUtc, + }; + if (HasResolvedImplementationRef(evt.ImplementationRef)) + { + next.ImplementationRef = evt.ImplementationRef.Clone(); + } + next.Binding = new StudioMemberBindingAuthorityState + { + CurrentBindingRunId = evt.BindingRunId, + CurrentStatus = StudioMemberBindingRunStatus.Succeeded, + LastTerminalBindingRunId = evt.BindingRunId, + LastFailure = null, + UpdatedAtUtc = evt.CompletedAtUtc, }; next.LifecycleStage = StudioMemberLifecycleStage.BindReady; - next.UpdatedAtUtc = evt.BoundAtUtc; + next.UpdatedAtUtc = evt.CompletedAtUtc; + return next; + } + + private static StudioMemberState ApplyBindingFailed( + StudioMemberState state, + StudioMemberBindingFailedEvent evt) + { + if (!CanAcceptBindingRunProgress(state, evt.BindingRunId)) + return state; + + var failedAt = evt.Failure?.FailedAtUtc ?? Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow); + var next = state.Clone(); + next.Binding = new StudioMemberBindingAuthorityState + { + CurrentBindingRunId = evt.BindingRunId, + CurrentStatus = StudioMemberBindingRunStatus.Failed, + LastTerminalBindingRunId = evt.BindingRunId, + LastFailure = evt.Failure?.Clone(), + UpdatedAtUtc = failedAt, + }; + next.UpdatedAtUtc = failedAt; return next; } + private static bool CanAcceptBindingRunProgress(StudioMemberState state, string bindingRunId) + { + var currentRun = state.Binding?.CurrentBindingRunId; + return !string.IsNullOrEmpty(currentRun) + && string.Equals(currentRun, bindingRunId, StringComparison.Ordinal) + && !IsCurrentBindingTerminal(state); + } + + private static bool HasActiveBindingRun(StudioMemberState state, string incomingBindingRunId) + { + var currentBinding = state.Binding; + return currentBinding != null + && !string.IsNullOrEmpty(currentBinding.CurrentBindingRunId) + && !string.Equals(currentBinding.CurrentBindingRunId, incomingBindingRunId, StringComparison.Ordinal) + && !IsTerminalBindingStatus(currentBinding.CurrentStatus); + } + + private static bool ShouldIgnoreBindingRunStart( + StudioMemberState state, + string bindingRunId, + Timestamp? requestedAtUtc) + { + var currentBinding = state.Binding; + if (currentBinding == null || string.IsNullOrEmpty(currentBinding.CurrentBindingRunId)) + return false; + + if (string.Equals(currentBinding.CurrentBindingRunId, bindingRunId, StringComparison.Ordinal)) + return true; + + if (!IsTerminalBindingStatus(currentBinding.CurrentStatus)) + return true; + + if (currentBinding.UpdatedAtUtc == null) + return false; + + return CompareTimestamp(requestedAtUtc, currentBinding.UpdatedAtUtc) <= 0; + } + + private static bool IsSupersededBindingRun( + StudioMemberState state, + string bindingRunId, + Timestamp? requestedAtUtc) + { + var currentBinding = state.Binding; + if (currentBinding == null || string.IsNullOrEmpty(currentBinding.CurrentBindingRunId)) + return false; + + if (string.Equals(currentBinding.CurrentBindingRunId, bindingRunId, StringComparison.Ordinal)) + return false; + + if (currentBinding.UpdatedAtUtc == null) + return false; + + return CompareTimestamp(requestedAtUtc, currentBinding.UpdatedAtUtc) <= 0; + } + + private static bool IsTerminalBindingRunReplay(StudioMemberState state, string bindingRunId) + { + var currentBinding = state.Binding; + return currentBinding != null + && string.Equals(currentBinding.CurrentBindingRunId, bindingRunId, StringComparison.Ordinal) + && IsTerminalBindingStatus(currentBinding.CurrentStatus); + } + + private static bool IsTerminalBindingRunReplay( + StudioMemberState state, + string bindingRunId, + StudioMemberBindingRunStatus expectedStatus) + { + var currentBinding = state.Binding; + return currentBinding != null + && string.Equals(currentBinding.CurrentBindingRunId, bindingRunId, StringComparison.Ordinal) + && currentBinding.CurrentStatus == expectedStatus; + } + + private static bool TryBuildTerminalBindingRunReplayResponse( + StudioMemberState state, + StudioMemberBindAdmissionRequested request, + Timestamp failedAt, + out StudioMemberBindingRejectedEvent response) + { + response = new StudioMemberBindingRejectedEvent(); + + var currentBinding = state.Binding; + if (currentBinding == null + || !string.Equals(currentBinding.CurrentBindingRunId, request.BindingRunId, StringComparison.Ordinal) + || currentBinding.CurrentStatus != StudioMemberBindingRunStatus.Rejected) + { + return false; + } + + response = new StudioMemberBindingRejectedEvent + { + BindingRunId = request.BindingRunId, + ScopeId = state.ScopeId, + MemberId = state.MemberId, + Failure = currentBinding.LastFailure?.Clone() ?? new StudioMemberBindingFailure + { + Code = "STUDIO_MEMBER_BINDING_RUN_REJECTED", + Message = "binding run was already rejected.", + FailedAtUtc = failedAt, + }, + }; + return true; + } + + private static bool IsCurrentBindingTerminal(StudioMemberState state) => + IsTerminalBindingStatus(state.Binding?.CurrentStatus ?? StudioMemberBindingRunStatus.Unspecified); + + private static bool IsTerminalBindingStatus(StudioMemberBindingRunStatus status) => + status is StudioMemberBindingRunStatus.Succeeded + or StudioMemberBindingRunStatus.Failed + or StudioMemberBindingRunStatus.Rejected; + + private Task SendTerminalAcknowledgementAsync(string bindingRunId, StudioMemberBindingRunStatus status) => + SendToAsync( + StudioMemberConventions.BuildBindingRunActorId(bindingRunId), + new StudioMemberBindingTerminalAcknowledged + { + BindingRunId = bindingRunId, + Status = status, + AcknowledgedAtUtc = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }); + + private static int CompareTimestamp(Timestamp? left, Timestamp? right) + { + if (left == null && right == null) + return 0; + if (left == null) + return -1; + if (right == null) + return 1; + + return left.ToDateTimeOffset().CompareTo(right.ToDateTimeOffset()); + } + private static StudioMemberState ApplyReassigned( StudioMemberState state, StudioMemberReassignedEvent evt) { @@ -308,4 +701,31 @@ private static bool HasResolvedImplementationRef(StudioMemberImplementationRef? return false; } + + private static StudioMemberImplementationKind GetRequestImplementationKind(StudioMemberBindingRequest request) => + request.ImplementationCase switch + { + StudioMemberBindingRequest.ImplementationOneofCase.Workflow => StudioMemberImplementationKind.Workflow, + StudioMemberBindingRequest.ImplementationOneofCase.Script => StudioMemberImplementationKind.Script, + StudioMemberBindingRequest.ImplementationOneofCase.Gagent => StudioMemberImplementationKind.Gagent, + _ => StudioMemberImplementationKind.Unspecified, + }; + + private static StudioMemberBindingRejectedEvent BuildRejected( + StudioMemberBindAdmissionRequested evt, + string code, + string message, + Timestamp failedAt) => + new() + { + BindingRunId = evt.BindingRunId, + ScopeId = evt.ScopeId, + MemberId = evt.MemberId, + Failure = new StudioMemberBindingFailure + { + Code = code, + Message = message, + FailedAtUtc = failedAt, + }, + }; } diff --git a/agents/Aevatar.GAgents.StudioMember/studio_member_messages.proto b/agents/Aevatar.GAgents.StudioMember/studio_member_messages.proto index b30799751..b0c7c161f 100644 --- a/agents/Aevatar.GAgents.StudioMember/studio_member_messages.proto +++ b/agents/Aevatar.GAgents.StudioMember/studio_member_messages.proto @@ -27,6 +27,24 @@ enum StudioMemberLifecycleStage { STUDIO_MEMBER_LIFECYCLE_STAGE_BIND_READY = 3; } +enum StudioMemberBindingRunStatus { + STUDIO_MEMBER_BINDING_RUN_STATUS_UNSPECIFIED = 0; + STUDIO_MEMBER_BINDING_RUN_STATUS_ACCEPTED = 1; + STUDIO_MEMBER_BINDING_RUN_STATUS_ADMISSION_PENDING = 2; + STUDIO_MEMBER_BINDING_RUN_STATUS_ADMITTED = 3; + STUDIO_MEMBER_BINDING_RUN_STATUS_PLATFORM_BINDING_PENDING = 4; + STUDIO_MEMBER_BINDING_RUN_STATUS_SUCCEEDED = 5; + STUDIO_MEMBER_BINDING_RUN_STATUS_FAILED = 6; + STUDIO_MEMBER_BINDING_RUN_STATUS_REJECTED = 7; + STUDIO_MEMBER_BINDING_RUN_STATUS_MEMBER_NOTIFICATION_PENDING = 8; +} + +enum StudioMemberGAgentEndpointKind { + STUDIO_MEMBER_GAGENT_ENDPOINT_KIND_UNSPECIFIED = 0; + STUDIO_MEMBER_GAGENT_ENDPOINT_KIND_COMMAND = 1; + STUDIO_MEMBER_GAGENT_ENDPOINT_KIND_CHAT = 2; +} + // ─── Implementation refs ─── // // Each implementation kind carries a typed sub-message rather than a generic @@ -66,6 +84,90 @@ message StudioMemberBindingContract { google.protobuf.Timestamp bound_at_utc = 4; } +message StudioMemberBindingFailure { + string code = 1; + string message = 2; + google.protobuf.Timestamp failed_at_utc = 3; +} + +message StudioMemberBindingAuthorityState { + string current_binding_run_id = 1; + StudioMemberBindingRunStatus current_status = 2; + string last_terminal_binding_run_id = 3; + StudioMemberBindingFailure last_failure = 4; + google.protobuf.Timestamp updated_at_utc = 5; +} + +message StudioMemberWorkflowBindingRequest { + repeated string workflow_yamls = 1; +} + +message StudioMemberScriptBindingRequest { + string script_id = 1; + optional string script_revision = 2; +} + +message StudioMemberGAgentEndpointBindingRequest { + string endpoint_id = 1; + string display_name = 2; + StudioMemberGAgentEndpointKind kind = 3; + string request_type_url = 4; + string response_type_url = 5; + string description = 6; +} + +message StudioMemberGAgentBindingRequest { + string actor_type_name = 1; + repeated StudioMemberGAgentEndpointBindingRequest endpoints = 2; +} + +message StudioMemberBindingRequest { + string binding_run_id = 1; + string scope_id = 2; + string member_id = 3; + string request_hash = 4; + optional string revision_id = 5; + oneof implementation { + StudioMemberWorkflowBindingRequest workflow = 10; + StudioMemberScriptBindingRequest script = 11; + StudioMemberGAgentBindingRequest gagent = 12; + } +} + +message StudioMemberBindingAdmittedSnapshot { + string member_id = 1; + string scope_id = 2; + string published_service_id = 3; + StudioMemberImplementationKind implementation_kind = 4; + string display_name = 5; +} + +message StudioMemberPlatformBindingResult { + string published_service_id = 1; + string revision_id = 2; + StudioMemberImplementationKind implementation_kind = 3; + string expected_actor_id = 4; + StudioMemberImplementationRef implementation_ref = 5; +} + +message StudioMemberBindingRunState { + string binding_run_id = 1; + string scope_id = 2; + string member_id = 3; + string request_hash = 4; + StudioMemberBindingRunStatus status = 5; + StudioMemberBindingRequest request = 6; + StudioMemberBindingAdmittedSnapshot admitted = 7; + StudioMemberPlatformBindingResult platform_result = 8; + StudioMemberBindingFailure failure = 9; + google.protobuf.Timestamp accepted_at_utc = 10; + google.protobuf.Timestamp updated_at_utc = 11; + int32 attempt_count = 12; + string platform_binding_command_id = 13; + bool platform_execution_in_flight = 14; + google.protobuf.Timestamp platform_execution_started_at_utc = 15; +} + // ─── State ─── // // State for a single StudioMember actor. Actor ID format: @@ -87,6 +189,10 @@ message StudioMemberState { google.protobuf.Timestamp created_at_utc = 9; google.protobuf.Timestamp updated_at_utc = 10; StudioMemberBindingContract last_binding = 11; + // Async binding protocol state. `last_binding` remains the single source of + // truth for the last successful binding contract; this sub-state owns active + // run status, stale-run rejection, and failure details. + StudioMemberBindingAuthorityState binding = 12; // Optional team membership (ADR-0017). Absent means "unassigned"; empty // string is rejected at the application layer. Mutated only via committed // StudioMemberReassignedEvent. @@ -120,11 +226,105 @@ message StudioMemberImplementationUpdatedEvent { google.protobuf.Timestamp updated_at_utc = 3; } -message StudioMemberBoundEvent { - string published_service_id = 1; - string revision_id = 2; - StudioMemberImplementationKind implementation_kind = 3; - google.protobuf.Timestamp bound_at_utc = 4; +message StudioMemberBindingPlatformPendingEvent { + string binding_run_id = 1; + string platform_binding_command_id = 2; + google.protobuf.Timestamp pending_at_utc = 3; +} + +message StudioMemberBindingCompletedEvent { + string binding_run_id = 1; + string published_service_id = 2; + string revision_id = 3; + StudioMemberImplementationKind implementation_kind = 4; + StudioMemberImplementationRef implementation_ref = 5; + google.protobuf.Timestamp completed_at_utc = 6; +} + +message StudioMemberBindingFailedEvent { + string binding_run_id = 1; + StudioMemberBindingFailure failure = 2; +} + +message StudioMemberBindingRunRequested { + StudioMemberBindingRequest request = 1; + google.protobuf.Timestamp requested_at_utc = 2; +} + +message StudioMemberBindAdmissionRequested { + string binding_run_id = 1; + string scope_id = 2; + string member_id = 3; + string request_hash = 4; + StudioMemberBindingRequest request = 5; + google.protobuf.Timestamp requested_at_utc = 6; +} + +message StudioMemberBindingAdmittedEvent { + string binding_run_id = 1; + string member_id = 2; + string scope_id = 3; + string published_service_id = 4; + StudioMemberImplementationKind implementation_kind = 5; + string display_name = 6; + google.protobuf.Timestamp admitted_at_utc = 7; +} + +message StudioMemberBindingRejectedEvent { + string binding_run_id = 1; + string scope_id = 2; + string member_id = 3; + StudioMemberBindingFailure failure = 4; +} + +message StudioMemberPlatformBindingStartRequested { + string binding_run_id = 1; + string platform_binding_command_id = 2; + StudioMemberBindingRequest request = 3; + StudioMemberBindingAdmittedSnapshot admitted = 4; + google.protobuf.Timestamp requested_at_utc = 5; +} + +message StudioMemberPlatformBindingAccepted { + string binding_run_id = 1; + string platform_binding_command_id = 2; + google.protobuf.Timestamp accepted_at_utc = 3; +} + +message StudioMemberPlatformBindingExecuteRequested { + string binding_run_id = 1; + string platform_binding_command_id = 2; + bool recovery_execution = 3; +} + +message StudioMemberPlatformBindingWatchdogFired { + string binding_run_id = 1; + string platform_binding_command_id = 2; +} + +message StudioMemberPlatformBindingExecutionStarted { + string binding_run_id = 1; + string platform_binding_command_id = 2; + google.protobuf.Timestamp started_at_utc = 3; +} + +message StudioMemberPlatformBindingSucceeded { + string binding_run_id = 1; + string platform_binding_command_id = 2; + StudioMemberPlatformBindingResult result = 3; + google.protobuf.Timestamp completed_at_utc = 4; +} + +message StudioMemberPlatformBindingFailed { + string binding_run_id = 1; + string platform_binding_command_id = 2; + StudioMemberBindingFailure failure = 3; +} + +message StudioMemberBindingTerminalAcknowledged { + string binding_run_id = 1; + StudioMemberBindingRunStatus status = 2; + google.protobuf.Timestamp acknowledged_at_utc = 3; } // Single reassignment event covers assign / unassign / move (ADR-0017). diff --git a/apps/aevatar-console-web/src/pages/studio/components/bind/StudioMemberBindPanel.test.tsx b/apps/aevatar-console-web/src/pages/studio/components/bind/StudioMemberBindPanel.test.tsx index b7f99ae03..e4b53cf40 100644 --- a/apps/aevatar-console-web/src/pages/studio/components/bind/StudioMemberBindPanel.test.tsx +++ b/apps/aevatar-console-web/src/pages/studio/components/bind/StudioMemberBindPanel.test.tsx @@ -670,16 +670,24 @@ describe('StudioMemberBindPanel', () => { fireEvent.click(screen.getByRole('button', { name: 'Bind current revision' })); }); - expect(await screen.findByText('draft1 is now bound. Review the invoke contract below.')).toBeTruthy(); + expect( + await screen.findByText( + 'draft1 binding request was accepted. Studio will show the published contract after the run completes.', + ), + ).toBeTruthy(); fireEvent.click(screen.getByRole('button', { name: 'Switch candidate' })); expect(await screen.findByText('No published contract exists for joker yet.')).toBeTruthy(); expect( - screen.queryByText('draft1 is now bound. Review the invoke contract below.'), + screen.queryByText( + 'draft1 binding request was accepted. Studio will show the published contract after the run completes.', + ), ).toBeNull(); expect( - screen.queryByText('joker is now bound. Review the invoke contract below.'), + screen.queryByText( + 'joker binding request was accepted. Studio will show the published contract after the run completes.', + ), ).toBeNull(); }); }); diff --git a/apps/aevatar-console-web/src/pages/studio/components/bind/StudioMemberBindPanel.tsx b/apps/aevatar-console-web/src/pages/studio/components/bind/StudioMemberBindPanel.tsx index a3ea11b08..bf4aa46f0 100644 --- a/apps/aevatar-console-web/src/pages/studio/components/bind/StudioMemberBindPanel.tsx +++ b/apps/aevatar-console-web/src/pages/studio/components/bind/StudioMemberBindPanel.tsx @@ -29,8 +29,10 @@ import { describeStudioMemberBindingRevisionContext, describeStudioMemberBindingRevisionTarget, formatStudioMemberBindingImplementationKind, - getStudioMemberBindingCurrentRevision, type StudioAuthSession, + type StudioMemberBindingContract, + type StudioMemberBindingRevision, + type StudioMemberBindingRunStatusResponse, } from '@/shared/studio/models'; import { studioApi } from '@/shared/studio/api'; import { AevatarPanel, AevatarStatusTag } from '@/shared/ui/aevatarPageShells'; @@ -52,7 +54,7 @@ type StudioMemberBindPanelProps = { readonly memberId?: string; readonly initialServiceId?: string; readonly onContinueToInvoke?: (serviceId: string, endpointId: string) => void; - readonly onBindPendingCandidate?: (() => Promise) | null; + readonly onBindPendingCandidate?: (() => Promise) | null; readonly onSelectionChange?: (selection: { serviceId: string; endpointId: string; @@ -70,6 +72,11 @@ type StudioMemberBindPanelProps = { readonly services: readonly ServiceCatalogSnapshot[]; }; +type PendingBindNotice = { + readonly message: string; + readonly type: 'success' | 'info' | 'warning' | 'error'; +}; + type SnippetTab = 'curl' | 'fetch' | 'sdk'; type SmokeTestResult = { @@ -81,6 +88,93 @@ type SmokeTestResult = { readonly status: 'idle' | 'running' | 'success' | 'error'; }; +function isStudioMemberBindingRunTerminal( + run: StudioMemberBindingRunStatusResponse | null | undefined, +): boolean { + return Boolean( + run && ['succeeded', 'failed', 'rejected'].includes(run.status), + ); +} + +function describeStudioMemberBindingRunStatus( + run: StudioMemberBindingRunStatusResponse, +): PendingBindNotice { + if (run.status === 'succeeded') { + return { + message: 'Binding completed. Studio is refreshing the published contract.', + type: 'success', + }; + } + + if (run.status === 'failed' || run.status === 'rejected') { + return { + message: + run.failure?.message || + (run.status === 'rejected' + ? 'Binding request was rejected by the member authority.' + : 'Binding failed while publishing the member contract.'), + type: 'error', + }; + } + + if (run.status === 'platform_binding_pending') { + return { + message: + 'Binding request accepted. Platform publication is still running; Invoke is not ready until the run completes.', + type: 'info', + }; + } + + if (run.status === 'admitted') { + return { + message: + 'Binding request admitted. Studio is starting platform publication; the member is not callable yet.', + type: 'info', + }; + } + + return { + message: + 'Binding request accepted. Studio is waiting for the member authority; this does not mean the member is bound yet.', + type: 'info', + }; +} + +function buildRevisionFromMemberBinding( + binding: StudioMemberBindingContract | null | undefined, +): StudioMemberBindingRevision | null { + if (!binding) { + return null; + } + + return { + allocationWeight: 100, + artifactHash: '', + createdAt: binding.boundAt, + deploymentId: '', + failureReason: '', + implementationKind: binding.implementationKind, + inlineWorkflowCount: 0, + isActiveServing: true, + isDefaultServing: true, + isServingTarget: true, + preparedAt: binding.boundAt, + primaryActorId: '', + publishedAt: binding.boundAt, + retiredAt: null, + revisionId: binding.revisionId, + scriptDefinitionActorId: '', + scriptId: '', + scriptRevision: '', + scriptSourceHash: '', + servingState: 'active', + staticActorTypeName: '', + status: 'active', + workflowDefinitionActorId: '', + workflowName: '', + }; +} + const monoFontFamily = "ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, 'Liberation Mono', monospace"; @@ -352,10 +446,8 @@ const StudioMemberBindPanel: React.FC = ({ createIdleSmokeTestResult(), ); const [pendingBindBusy, setPendingBindBusy] = useState(false); - const [pendingBindNotice, setPendingBindNotice] = useState<{ - readonly message: string; - readonly type: 'success' | 'error'; - } | null>(null); + const [pendingBindNotice, setPendingBindNotice] = + useState(null); const runsCurrentWorkflowDraft = Boolean(buildWorkflowYamls); const normalizedMemberId = trimOptional(memberId); @@ -456,20 +548,30 @@ const StudioMemberBindPanel: React.FC = ({ scopeRuntimeApi.getServiceRevisions(scopeId, selectedService?.serviceId || ''), }); const memberBindingStatusQuery = useQuery({ - enabled: Boolean(scopeId && normalizedMemberId && selectedService?.serviceId), + enabled: Boolean(scopeId && normalizedMemberId), queryKey: ['studio-bind', 'member-binding', scopeId, normalizedMemberId], queryFn: () => studioApi.getMemberBinding(scopeId, normalizedMemberId), + refetchInterval: (query) => { + const data = query.state.data as + | Awaited> + | undefined; + return data?.currentBindingRun && + !isStudioMemberBindingRunTerminal(data.currentBindingRun) + ? 1_500 + : false; + }, }); - const revisionCatalogQuery = normalizedMemberId - ? memberBindingStatusQuery - : revisionsQuery; + const currentBindingRun = memberBindingStatusQuery.data?.currentBindingRun ?? null; + const revisionCatalogQuery = revisionsQuery; const currentPublishedRevision = useMemo( () => - normalizedMemberId - ? getStudioMemberBindingCurrentRevision(memberBindingStatusQuery.data) - : getScopeServiceCurrentRevision(revisionsQuery.data), - [memberBindingStatusQuery.data, normalizedMemberId, revisionsQuery.data], + buildRevisionFromMemberBinding(memberBindingStatusQuery.data?.lastBinding) ?? + getScopeServiceCurrentRevision(revisionsQuery.data), + [memberBindingStatusQuery.data?.lastBinding, revisionsQuery.data], ); + const currentBindingRunNotice = currentBindingRun + ? describeStudioMemberBindingRunStatus(currentBindingRun) + : null; const bindContract = useMemo( () => @@ -709,13 +811,15 @@ const StudioMemberBindPanel: React.FC = ({ setPendingBindBusy(true); setPendingBindNotice(null); try { - await onBindPendingCandidate(); + const resultNotice = await onBindPendingCandidate(); if (bindSurfaceIdentityRef.current !== requestBindIdentity) { return; } setPendingBindNotice({ - message: `${pendingBindingCandidate.displayName} is now bound. Review the invoke contract below.`, - type: 'success', + message: + resultNotice?.message || + `${pendingBindingCandidate.displayName} binding request was accepted. Studio will show the published contract after the run completes.`, + type: resultNotice?.type || 'info', }); } catch (error) { if (bindSurfaceIdentityRef.current !== requestBindIdentity) { @@ -804,6 +908,14 @@ const StudioMemberBindPanel: React.FC = ({ type={pendingBindNotice.type} /> ) : null} + {currentBindingRunNotice ? ( + + ) : null}