From 1dd45712351f94a29416b98e71dcdb83dc58e40b Mon Sep 17 00:00:00 2001 From: Fred Tzeng Date: Wed, 17 Jun 2026 09:33:50 -0700 Subject: [PATCH] Add support for updating start_delay via UpdateActivityOptions --- chasm/lib/activity/activity.go | 170 +++-- .../gen/activitypb/v1/activity_state.pb.go | 69 +- .../activity/proto/v1/activity_state.proto | 6 + chasm/lib/activity/statemachine.go | 4 + common/activityoptions/merge.go | 6 +- go.mod | 2 +- go.sum | 4 +- tests/activity_standalone_test.go | 688 ++++++++++++++++++ 8 files changed, 854 insertions(+), 95 deletions(-) diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index b4881117a46..0cc435e4edc 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -184,6 +184,7 @@ func NewStandaloneActivity( HeartbeatTimeout: request.GetHeartbeatTimeout(), RetryPolicy: request.GetRetryPolicy(), Priority: request.GetPriority(), + StartDelay: request.GetStartDelay(), }, }, LastAttempt: chasm.NewDataField(ctx, &activitypb.ActivityAttemptState{}), @@ -624,6 +625,29 @@ func (a *Activity) UpdateActivityExecutionOptions( frontendReq := req.GetFrontendRequest() + // start_delay updates are only valid while the activity is still in its delay window. 
+ var hasStartDelayInMask bool + if mask := frontendReq.GetUpdateMask(); mask != nil { + _, hasStartDelayInMask = util.ParseFieldMask(mask)["startDelay"] + } + if !frontendReq.GetRestoreOriginal() && hasStartDelayInMask { + newDelay := frontendReq.GetActivityOptions().GetStartDelay() + if err := validateStartDelay(newDelay); err != nil { + return nil, err + } + if newDelay.AsDuration() > 0 { + actCtx := activityContextFromChasm(ctx) + if !actCtx.config.StartDelayEnabled(frontendReq.GetNamespace()) { + return nil, serviceerror.NewInvalidArgument("start_delay is not enabled for this namespace") + } + } + if a.GetStatus() != activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED || + !a.firstDispatchTime().After(ctx.Now(a)) { + return nil, serviceerror.NewInvalidArgument( + "cannot update start_delay: activity is no longer in its delay window") + } + } + if frontendReq.GetRestoreOriginal() { ogOptions := a.GetOriginalOptions() a.TaskQueue = common.CloneProto(ogOptions.GetTaskQueue()) @@ -633,6 +657,11 @@ func (a *Activity) UpdateActivityExecutionOptions( a.HeartbeatTimeout = common.CloneProto(ogOptions.GetHeartbeatTimeout()) a.RetryPolicy = common.CloneProto(ogOptions.GetRetryPolicy()) a.Priority = common.CloneProto(ogOptions.GetPriority()) + // start_delay only governs the first dispatch. Once the first attempt has started, restoring + // the original value would shift ScheduleToClose without affecting dispatch timing. + if a.GetFirstAttemptStartedTime() == nil { + a.StartDelay = common.CloneProto(ogOptions.GetStartDelay()) + } } else { if err := a.mergeActivityOptions(frontendReq); err != nil { return nil, err @@ -650,9 +679,8 @@ func (a *Activity) UpdateActivityExecutionOptions( // Add a new ScheduleToCloseTimeoutTask at the (possibly updated) deadline. // Increment the stamp so the previous task is invalidated by the Validate check. 
- if timeout := a.GetScheduleToCloseTimeout().AsDuration(); timeout > 0 { + if deadline := a.scheduleToCloseDeadline(); !deadline.IsZero() { a.ScheduleToCloseStamp++ - deadline := a.GetScheduleTime().AsTime().Add(timeout) ctx.AddTask( a, chasm.TaskAttributes{ScheduledTime: deadline}, @@ -662,68 +690,9 @@ func (a *Activity) UpdateActivityExecutionOptions( attempt.Stamp++ - if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED || - a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED || - a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED || - a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_RESET_REQUESTED { - // Re-create the start-to-close timeout task with the new stamp and (possibly updated) timeout. - // The old task was invalidated by the stamp increment above. - if timeout := a.GetStartToCloseTimeout().AsDuration(); timeout > 0 { - deadline := attempt.GetStartedTime().AsTime().Add(timeout) - ctx.AddTask( - a, - chasm.TaskAttributes{ScheduledTime: deadline}, - &activitypb.StartToCloseTimeoutTask{Stamp: attempt.GetStamp()}, - ) - } - - if hbTimeout := a.GetHeartbeatTimeout().AsDuration(); hbTimeout > 0 { - // The next heartbeat time is the max of (the last heartbeats recorded time and - // the current attempts started time) plus the heartbeat timeout - lastHb, _ := a.LastHeartbeat.TryGet(ctx) - lastHbTime := util.MaxTime( - lastHb.GetRecordedTime().AsTime(), - attempt.GetStartedTime().AsTime(), - ).Add(hbTimeout) - ctx.AddTask( - a, - chasm.TaskAttributes{ - ScheduledTime: lastHbTime, - }, - &activitypb.HeartbeatTimeoutTask{ - Stamp: attempt.GetStamp(), - }, - ) - } - } - - // TODO(saa-ga): need to handle the StartDelay timer - + a.reissueRunningAttemptTimers(ctx, attempt) if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED { - // Re dispatch this activity - retryTime := attemptScheduleTimeForRetry(attempt) - var dispatchAttrs chasm.TaskAttributes - if retryTime != nil { - // in backoff, future 
retry time - dispatchAttrs.ScheduledTime = retryTime.AsTime() - } - ctx.AddTask( - a, - dispatchAttrs, - &activitypb.ActivityDispatchTask{Stamp: attempt.GetStamp()}, - ) - - if timeout := a.GetScheduleToStartTimeout().AsDuration(); timeout > 0 { - schedToStart := ctx.Now(a).Add(timeout) - if retryTime != nil { - schedToStart = retryTime.AsTime().Add(timeout) - } - ctx.AddTask( - a, - chasm.TaskAttributes{ScheduledTime: schedToStart}, - &activitypb.ScheduleToStartTimeoutTask{Stamp: attempt.GetStamp()}, - ) - } + a.reissueScheduledDispatch(ctx, attempt) } metricsHandler, err := a.enrichMetricsHandler(ctx, metrics.ActivityUpdateOptionsScope) @@ -742,6 +711,7 @@ func (a *Activity) UpdateActivityExecutionOptions( HeartbeatTimeout: a.GetHeartbeatTimeout(), RetryPolicy: a.GetRetryPolicy(), Priority: a.GetPriority(), + StartDelay: a.GetStartDelay(), }, }, }, nil @@ -763,6 +733,7 @@ func (a *Activity) mergeActivityOptions( HeartbeatTimeout: a.HeartbeatTimeout, Priority: a.Priority, RetryPolicy: a.RetryPolicy, + StartDelay: a.StartDelay, } if err := activityoptions.MergeActivityOptions(ao, req.GetActivityOptions(), updateFields); err != nil { @@ -784,6 +755,7 @@ func (a *Activity) mergeActivityOptions( a.HeartbeatTimeout = ao.HeartbeatTimeout a.Priority = ao.Priority a.RetryPolicy = ao.RetryPolicy + a.StartDelay = ao.StartDelay return nil } @@ -1226,6 +1198,78 @@ func (a *Activity) firstDispatchTime() time.Time { return a.ScheduleTime.AsTime().Add(a.GetStartDelay().AsDuration()) } +// reissueScheduledDispatch re-emits the ActivityDispatchTask and ScheduleToStart timeout task for +// a SCHEDULED activity. Retries fire at the retry time; first attempts dispatch now, lifted to +// honor any pending start_delay. 
+func (a *Activity) reissueScheduledDispatch(ctx chasm.MutableContext, attempt *activitypb.ActivityAttemptState) { + var scheduleTime time.Time + if retryTime := attemptScheduleTimeForRetry(attempt); retryTime != nil { + scheduleTime = retryTime.AsTime() + } else { + scheduleTime = a.respectStartDelay(ctx.Now(a)) + } + ctx.AddTask( + a, + chasm.TaskAttributes{ScheduledTime: scheduleTime}, + &activitypb.ActivityDispatchTask{Stamp: attempt.GetStamp()}, + ) + if timeout := a.GetScheduleToStartTimeout().AsDuration(); timeout > 0 { + ctx.AddTask( + a, + chasm.TaskAttributes{ScheduledTime: scheduleTime.Add(timeout)}, + &activitypb.ScheduleToStartTimeoutTask{Stamp: attempt.GetStamp()}, + ) + } +} + +// reissueRunningAttemptTimers re-emits the StartToClose and Heartbeat timeout tasks for the +// currently-running attempt, anchored to the attempt's StartedTime. Called from options-update +// paths after stamp bump so the old tasks are invalidated and replaced with the (possibly +// updated) timeouts. No-op unless the activity is in a status where a worker holds the task token +// (STARTED / CANCEL_REQUESTED / PAUSE_REQUESTED / RESET_REQUESTED). 
+func (a *Activity) reissueRunningAttemptTimers(ctx chasm.MutableContext, attempt *activitypb.ActivityAttemptState) { + if a.GetStatus() != activitypb.ACTIVITY_EXECUTION_STATUS_STARTED && + a.GetStatus() != activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED && + a.GetStatus() != activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED && + a.GetStatus() != activitypb.ACTIVITY_EXECUTION_STATUS_RESET_REQUESTED { + return + } + if timeout := a.GetStartToCloseTimeout().AsDuration(); timeout > 0 { + deadline := attempt.GetStartedTime().AsTime().Add(timeout) + ctx.AddTask( + a, + chasm.TaskAttributes{ScheduledTime: deadline}, + &activitypb.StartToCloseTimeoutTask{Stamp: attempt.GetStamp()}, + ) + } + if hbTimeout := a.GetHeartbeatTimeout().AsDuration(); hbTimeout > 0 { + // Next heartbeat fires at max(last recorded heartbeat, current attempt start) + heartbeat timeout. + lastHb, _ := a.LastHeartbeat.TryGet(ctx) + lastHbTime := util.MaxTime( + lastHb.GetRecordedTime().AsTime(), + attempt.GetStartedTime().AsTime(), + ).Add(hbTimeout) + ctx.AddTask( + a, + chasm.TaskAttributes{ScheduledTime: lastHbTime}, + &activitypb.HeartbeatTimeoutTask{Stamp: attempt.GetStamp()}, + ) + } +} + +// respectStartDelay lifts a candidate dispatch time up to scheduleTime + start_delay when the +// activity has not yet been picked up by a worker, so pre-dispatch re-scheduling (unpause, Reset+ +// RestoreOriginalOptions, options update) honors start_delay. No-op once dispatched. +func (a *Activity) respectStartDelay(scheduleTime time.Time) time.Time { + if a.GetFirstAttemptStartedTime() != nil { + return scheduleTime + } + if firstDispatch := a.firstDispatchTime(); firstDispatch.After(scheduleTime) { + return firstDispatch + } + return scheduleTime +} + // scheduleToCloseDeadline returns the absolute time at which the ScheduleToClose timeout expires, // accounting for start delay. Returns zero time if no ScheduleToClose timeout is set. 
func (a *Activity) scheduleToCloseDeadline() time.Time { diff --git a/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go b/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go index c943d7c3abd..11786f40229 100644 --- a/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go +++ b/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go @@ -235,8 +235,13 @@ type ActivityState struct { // that when the worker yields the activity lands back in PAUSED rather than SCHEDULED. Consumed // when the activity transitions out of RESET_REQUESTED. ResetKeepPaused bool `protobuf:"varint,18,opt,name=reset_keep_paused,json=resetKeepPaused,proto3" json:"reset_keep_paused,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // Time at which a worker first picked up the activity (the first attempt's started time). Set + // once on the first SCHEDULED->STARTED transition and never updated thereafter, so it survives + // retries and resets. Used as the discriminator for whether start_delay still applies on + // pre-dispatch rescheduling paths. 
+ FirstAttemptStartedTime *timestamppb.Timestamp `protobuf:"bytes,19,opt,name=first_attempt_started_time,json=firstAttemptStartedTime,proto3" json:"first_attempt_started_time,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *ActivityState) Reset() { @@ -395,6 +400,13 @@ func (x *ActivityState) GetResetKeepPaused() bool { return false } +func (x *ActivityState) GetFirstAttemptStartedTime() *timestamppb.Timestamp { + if x != nil { + return x.FirstAttemptStartedTime + } + return nil +} + type ActivityCancelState struct { state protoimpl.MessageState `protogen:"open.v1"` RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` @@ -1088,8 +1100,7 @@ var File_temporal_server_chasm_lib_activity_proto_v1_activity_state_proto protor const file_temporal_server_chasm_lib_activity_proto_v1_activity_state_proto_rawDesc = "" + "\n" + - "@temporal/server/chasm/lib/activity/proto/v1/activity_state.proto\x12+temporal.server.chasm.lib.activity.proto.v1\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a&temporal/api/activity/v1/message.proto\x1a$temporal/api/common/v1/message.proto\x1a(temporal/api/deployment/v1/message.proto\x1a%temporal/api/failure/v1/message.proto\x1a'temporal/api/sdk/v1/user_metadata.proto\x1a'temporal/api/taskqueue/v1/message.proto\"\xe6\n" + - "\n" + + "@temporal/server/chasm/lib/activity/proto/v1/activity_state.proto\x12+temporal.server.chasm.lib.activity.proto.v1\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a&temporal/api/activity/v1/message.proto\x1a$temporal/api/common/v1/message.proto\x1a(temporal/api/deployment/v1/message.proto\x1a%temporal/api/failure/v1/message.proto\x1a'temporal/api/sdk/v1/user_metadata.proto\x1a'temporal/api/taskqueue/v1/message.proto\"\xbf\v\n" + "\rActivityState\x12I\n" + "\ractivity_type\x18\x01 \x01(\v2$.temporal.api.common.v1.ActivityTypeR\factivityType\x12C\n" + "\n" + @@ 
-1111,7 +1122,8 @@ const file_temporal_server_chasm_lib_activity_proto_v1_activity_state_proto_rawD "\x17schedule_to_close_stamp\x18\x0f \x01(\x05R\x14scheduleToCloseStamp\x12i\n" + "\x10last_pause_state\x18\x10 \x01(\v2?.temporal.server.chasm.lib.activity.proto.v1.ActivityPauseStateR\x0elastPauseState\x12)\n" + "\x10reset_heartbeats\x18\x11 \x01(\bR\x0fresetHeartbeats\x12*\n" + - "\x11reset_keep_paused\x18\x12 \x01(\bR\x0fresetKeepPaused\"\xa7\x01\n" + + "\x11reset_keep_paused\x18\x12 \x01(\bR\x0fresetKeepPaused\x12W\n" + + "\x1afirst_attempt_started_time\x18\x13 \x01(\v2\x1a.google.protobuf.TimestampR\x17firstAttemptStartedTime\"\xa7\x01\n" + "\x13ActivityCancelState\x12\x1d\n" + "\n" + "request_id\x18\x01 \x01(\tR\trequestId\x12=\n" + @@ -1235,29 +1247,30 @@ var file_temporal_server_chasm_lib_activity_proto_v1_activity_state_proto_depIdx 14, // 12: temporal.server.chasm.lib.activity.proto.v1.ActivityState.start_delay:type_name -> google.protobuf.Duration 18, // 13: temporal.server.chasm.lib.activity.proto.v1.ActivityState.original_options:type_name -> temporal.api.activity.v1.ActivityOptions 4, // 14: temporal.server.chasm.lib.activity.proto.v1.ActivityState.last_pause_state:type_name -> temporal.server.chasm.lib.activity.proto.v1.ActivityPauseState - 16, // 15: temporal.server.chasm.lib.activity.proto.v1.ActivityCancelState.request_time:type_name -> google.protobuf.Timestamp - 16, // 16: temporal.server.chasm.lib.activity.proto.v1.ActivityPauseState.pause_time:type_name -> google.protobuf.Timestamp - 14, // 17: temporal.server.chasm.lib.activity.proto.v1.ActivityAttemptState.current_retry_interval:type_name -> google.protobuf.Duration - 16, // 18: temporal.server.chasm.lib.activity.proto.v1.ActivityAttemptState.started_time:type_name -> google.protobuf.Timestamp - 16, // 19: temporal.server.chasm.lib.activity.proto.v1.ActivityAttemptState.complete_time:type_name -> google.protobuf.Timestamp - 9, // 20: 
temporal.server.chasm.lib.activity.proto.v1.ActivityAttemptState.last_failure_details:type_name -> temporal.server.chasm.lib.activity.proto.v1.ActivityAttemptState.LastFailureDetails - 19, // 21: temporal.server.chasm.lib.activity.proto.v1.ActivityAttemptState.last_deployment_version:type_name -> temporal.api.deployment.v1.WorkerDeploymentVersion - 20, // 22: temporal.server.chasm.lib.activity.proto.v1.ActivityHeartbeatState.details:type_name -> temporal.api.common.v1.Payloads - 16, // 23: temporal.server.chasm.lib.activity.proto.v1.ActivityHeartbeatState.recorded_time:type_name -> google.protobuf.Timestamp - 20, // 24: temporal.server.chasm.lib.activity.proto.v1.ActivityRequestData.input:type_name -> temporal.api.common.v1.Payloads - 21, // 25: temporal.server.chasm.lib.activity.proto.v1.ActivityRequestData.header:type_name -> temporal.api.common.v1.Header - 22, // 26: temporal.server.chasm.lib.activity.proto.v1.ActivityRequestData.user_metadata:type_name -> temporal.api.sdk.v1.UserMetadata - 10, // 27: temporal.server.chasm.lib.activity.proto.v1.ActivityOutcome.successful:type_name -> temporal.server.chasm.lib.activity.proto.v1.ActivityOutcome.Successful - 11, // 28: temporal.server.chasm.lib.activity.proto.v1.ActivityOutcome.failed:type_name -> temporal.server.chasm.lib.activity.proto.v1.ActivityOutcome.Failed - 16, // 29: temporal.server.chasm.lib.activity.proto.v1.ActivityAttemptState.LastFailureDetails.time:type_name -> google.protobuf.Timestamp - 23, // 30: temporal.server.chasm.lib.activity.proto.v1.ActivityAttemptState.LastFailureDetails.failure:type_name -> temporal.api.failure.v1.Failure - 20, // 31: temporal.server.chasm.lib.activity.proto.v1.ActivityOutcome.Successful.output:type_name -> temporal.api.common.v1.Payloads - 23, // 32: temporal.server.chasm.lib.activity.proto.v1.ActivityOutcome.Failed.failure:type_name -> temporal.api.failure.v1.Failure - 33, // [33:33] is the sub-list for method output_type - 33, // [33:33] is the sub-list for method 
input_type - 33, // [33:33] is the sub-list for extension type_name - 33, // [33:33] is the sub-list for extension extendee - 0, // [0:33] is the sub-list for field type_name + 16, // 15: temporal.server.chasm.lib.activity.proto.v1.ActivityState.first_attempt_started_time:type_name -> google.protobuf.Timestamp + 16, // 16: temporal.server.chasm.lib.activity.proto.v1.ActivityCancelState.request_time:type_name -> google.protobuf.Timestamp + 16, // 17: temporal.server.chasm.lib.activity.proto.v1.ActivityPauseState.pause_time:type_name -> google.protobuf.Timestamp + 14, // 18: temporal.server.chasm.lib.activity.proto.v1.ActivityAttemptState.current_retry_interval:type_name -> google.protobuf.Duration + 16, // 19: temporal.server.chasm.lib.activity.proto.v1.ActivityAttemptState.started_time:type_name -> google.protobuf.Timestamp + 16, // 20: temporal.server.chasm.lib.activity.proto.v1.ActivityAttemptState.complete_time:type_name -> google.protobuf.Timestamp + 9, // 21: temporal.server.chasm.lib.activity.proto.v1.ActivityAttemptState.last_failure_details:type_name -> temporal.server.chasm.lib.activity.proto.v1.ActivityAttemptState.LastFailureDetails + 19, // 22: temporal.server.chasm.lib.activity.proto.v1.ActivityAttemptState.last_deployment_version:type_name -> temporal.api.deployment.v1.WorkerDeploymentVersion + 20, // 23: temporal.server.chasm.lib.activity.proto.v1.ActivityHeartbeatState.details:type_name -> temporal.api.common.v1.Payloads + 16, // 24: temporal.server.chasm.lib.activity.proto.v1.ActivityHeartbeatState.recorded_time:type_name -> google.protobuf.Timestamp + 20, // 25: temporal.server.chasm.lib.activity.proto.v1.ActivityRequestData.input:type_name -> temporal.api.common.v1.Payloads + 21, // 26: temporal.server.chasm.lib.activity.proto.v1.ActivityRequestData.header:type_name -> temporal.api.common.v1.Header + 22, // 27: temporal.server.chasm.lib.activity.proto.v1.ActivityRequestData.user_metadata:type_name -> temporal.api.sdk.v1.UserMetadata + 10, // 28: 
temporal.server.chasm.lib.activity.proto.v1.ActivityOutcome.successful:type_name -> temporal.server.chasm.lib.activity.proto.v1.ActivityOutcome.Successful + 11, // 29: temporal.server.chasm.lib.activity.proto.v1.ActivityOutcome.failed:type_name -> temporal.server.chasm.lib.activity.proto.v1.ActivityOutcome.Failed + 16, // 30: temporal.server.chasm.lib.activity.proto.v1.ActivityAttemptState.LastFailureDetails.time:type_name -> google.protobuf.Timestamp + 23, // 31: temporal.server.chasm.lib.activity.proto.v1.ActivityAttemptState.LastFailureDetails.failure:type_name -> temporal.api.failure.v1.Failure + 20, // 32: temporal.server.chasm.lib.activity.proto.v1.ActivityOutcome.Successful.output:type_name -> temporal.api.common.v1.Payloads + 23, // 33: temporal.server.chasm.lib.activity.proto.v1.ActivityOutcome.Failed.failure:type_name -> temporal.api.failure.v1.Failure + 34, // [34:34] is the sub-list for method output_type + 34, // [34:34] is the sub-list for method input_type + 34, // [34:34] is the sub-list for extension type_name + 34, // [34:34] is the sub-list for extension extendee + 0, // [0:34] is the sub-list for field type_name } func init() { file_temporal_server_chasm_lib_activity_proto_v1_activity_state_proto_init() } diff --git a/chasm/lib/activity/proto/v1/activity_state.proto b/chasm/lib/activity/proto/v1/activity_state.proto index 27bca0cf75a..275dfede36f 100644 --- a/chasm/lib/activity/proto/v1/activity_state.proto +++ b/chasm/lib/activity/proto/v1/activity_state.proto @@ -137,6 +137,12 @@ message ActivityState { // that when the worker yields the activity lands back in PAUSED rather than SCHEDULED. Consumed // when the activity transitions out of RESET_REQUESTED. bool reset_keep_paused = 18; + + // Time at which a worker first picked up the activity (the first attempt's started time). Set + // once on the first SCHEDULED->STARTED transition and never updated thereafter, so it survives + // retries and resets. 
Used as the discriminator for whether start_delay still applies on + // pre-dispatch rescheduling paths. + google.protobuf.Timestamp first_attempt_started_time = 19; } message ActivityCancelState { diff --git a/chasm/lib/activity/statemachine.go b/chasm/lib/activity/statemachine.go index 9f6e6af732d..ce70b1d4c64 100644 --- a/chasm/lib/activity/statemachine.go +++ b/chasm/lib/activity/statemachine.go @@ -141,6 +141,10 @@ var TransitionStarted = chasm.NewTransition( func(a *Activity, ctx chasm.MutableContext, request *historyservice.RecordActivityTaskStartedRequest) error { attempt := a.LastAttempt.Get(ctx) attempt.StartedTime = timestamppb.New(ctx.Now(a)) + // Record the first-ever worker pickup time once and never update on retries or resets. + if a.FirstAttemptStartedTime == nil { + a.FirstAttemptStartedTime = attempt.GetStartedTime() + } attempt.StartRequestId = request.GetRequestId() attempt.LastWorkerIdentity = request.GetPollRequest().GetIdentity() attempt.SdkName = ctx.RequestHeader(headers.ClientNameHeaderName) diff --git a/common/activityoptions/merge.go b/common/activityoptions/merge.go index b9713ba431c..cc0b84c6181 100644 --- a/common/activityoptions/merge.go +++ b/common/activityoptions/merge.go @@ -7,7 +7,7 @@ import ( ) // MergeActivityOptions applies the fields specified in updateFields from mergeFrom into mergeInto in-place. -// updateFields is a map of camelCase JSON field paths, as returned by util.ParseFieldMask. +// updateFields is a map of camelCase JSON field paths, as returned by util.ParseFieldMask. // Returns an error if a required parent field (TaskQueue, Priority, RetryPolicy) is nil in mergeFrom // when a sub-field of that parent is listed in updateFields. 
// @@ -118,5 +118,9 @@ func MergeActivityOptions(mergeInto, mergeFrom *activitypb.ActivityOptions, upda mergeInto.RetryPolicy.MaximumAttempts = mergeFrom.GetRetryPolicy().GetMaximumAttempts() } + if _, ok := updateFields["startDelay"]; ok { + mergeInto.StartDelay = mergeFrom.GetStartDelay() + } + return nil } diff --git a/go.mod b/go.mod index 8c3ae7e2fab..0358c9a5323 100644 --- a/go.mod +++ b/go.mod @@ -64,7 +64,7 @@ require ( go.opentelemetry.io/otel/sdk v1.43.0 go.opentelemetry.io/otel/sdk/metric v1.43.0 go.opentelemetry.io/otel/trace v1.43.0 - go.temporal.io/api v1.62.15-0.20260615235047-378792ab2240 + go.temporal.io/api v1.62.15-0.20260616185333-4e489f426ebb // TODO: get off dev branch go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2 go.temporal.io/sdk v1.41.1 go.uber.org/fx v1.24.0 diff --git a/go.sum b/go.sum index fdb33780b70..bce0ac22597 100644 --- a/go.sum +++ b/go.sum @@ -471,8 +471,8 @@ go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.3.0 h1:R go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.3.0/go.mod h1:I89cynRj8y+383o7tEQVg2SVA6SRgDVIouWPUVXjx0U= go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.3.0 h1:CQvJSldHRUN6Z8jsUeYv8J0lXRvygALXIzsmAeCcZE0= go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.3.0/go.mod h1:xSQ+mEfJe/GjK1LXEyVOoSI1N9JV9ZI923X5kup43W4= -go.temporal.io/api v1.62.15-0.20260615235047-378792ab2240 h1:Up/CNfkScGxN1TdrGZ3ez+0k6MIIhuhlbBgdZnrPhm0= -go.temporal.io/api v1.62.15-0.20260615235047-378792ab2240/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q= +go.temporal.io/api v1.62.15-0.20260616185333-4e489f426ebb h1:B6JLjw425iG75kHGYfHYzwqlbYiMJrpYooorBJA8mnA= +go.temporal.io/api v1.62.15-0.20260616185333-4e489f426ebb/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q= go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2 h1:1hKeH3GyR6YD6LKMHGCZ76t6h1Sgha0hXVQBxWi3dlQ= go.temporal.io/auto-scaled-workers 
v0.0.0-20260407181057-edd947d743d2/go.mod h1:T8dnzVPeO+gaUTj9eDgm/lT2lZH4+JXNvrGaQGyVi50= go.temporal.io/sdk v1.41.1 h1:yOpvsHyDD1lNuwlGBv/SUodCPhjv9nDeC9lLHW/fJUA= diff --git a/tests/activity_standalone_test.go b/tests/activity_standalone_test.go index 1a7c6638d48..ba7a558f66a 100644 --- a/tests/activity_standalone_test.go +++ b/tests/activity_standalone_test.go @@ -6150,6 +6150,694 @@ func (s *standaloneActivityTestSuite) TestStartDelay() { require.EqualValues(t, 2, descResp.GetInfo().GetAttempt()) protorequire.ProtoEqual(t, defaultResult, descResp.GetOutcome().GetResult()) }) + + // UpdateActivityOptions can mutate start_delay while the activity is inside its delay + // window. The new value is anchored to the original schedule_time; if the resulting + // first-dispatch time is in the past, the activity dispatches immediately. Setting + // start_delay to 0 is the "run-now" path. + s.Run("UpdateToZero_DispatchImmediately", func(s *standaloneActivityTestSuite) { + t := s.T() + env := s.newTestEnv() + + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + startDelay := 60 * time.Second + + startResp, err := env.FrontendClient().StartActivityExecution(s.Context(), &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), + StartDelay: durationpb.New(startDelay), + }) + require.NoError(t, err) + + // Update start_delay to 0 while we're still in the long delay window. 
+ updateResp, err := env.FrontendClient().UpdateActivityExecutionOptions(s.Context(), &workflowservice.UpdateActivityExecutionOptionsRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + ActivityOptions: &activitypb.ActivityOptions{StartDelay: durationpb.New(0)}, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"start_delay"}}, + }) + require.NoError(t, err) + require.EqualValues(t, 0, updateResp.GetActivityOptions().GetStartDelay().AsDuration()) + + // Poll should succeed quickly (well before the original 60s delay would have elapsed). + pollCtx, cancel := context.WithTimeout(s.Context(), 10*time.Second) + defer cancel() + pollResp, err := env.pollActivityTaskQueue(pollCtx, taskQueue) + require.NoError(t, err) + require.NotEmpty(t, pollResp.GetTaskToken(), "expected immediate dispatch after start_delay=0") + }) + + s.Run("UpdateLonger_ExtendsDispatch", func(s *standaloneActivityTestSuite) { + t := s.T() + env := s.newTestEnv() + + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + originalDelay := 1 * time.Second + newDelay := 3 * time.Second + + startResp, err := env.FrontendClient().StartActivityExecution(s.Context(), &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), + StartDelay: durationpb.New(originalDelay), + }) + require.NoError(t, err) + + _, err = env.FrontendClient().UpdateActivityExecutionOptions(s.Context(), &workflowservice.UpdateActivityExecutionOptionsRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + ActivityOptions: &activitypb.ActivityOptions{StartDelay: durationpb.New(newDelay)}, + UpdateMask: 
&fieldmaskpb.FieldMask{Paths: []string{"start_delay"}}, + }) + require.NoError(t, err) + + // Activity should stay SCHEDULED past the original 1s delay, since the new target is at T+3s. + require.Never(t, func() bool { + descResp, err := env.FrontendClient().DescribeActivityExecution(s.Context(), &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + }) + return err != nil || descResp.GetInfo().GetRunState() != enumspb.PENDING_ACTIVITY_STATE_SCHEDULED + }, originalDelay+timerSafetyMargin, 100*time.Millisecond, + "activity should remain SCHEDULED past the original delay since start_delay was extended") + + // Polling should eventually succeed once the new delay elapses. + pollResp, err := env.pollActivityTaskQueue(s.Context(), taskQueue) + require.NoError(t, err) + require.NotEmpty(t, pollResp.GetTaskToken()) + expectedDispatchTime := pollResp.GetScheduledTime().AsTime().Add(newDelay) + require.Equal(t, expectedDispatchTime, pollResp.GetCurrentAttemptScheduledTime().AsTime()) + }) + + s.Run("UpdateShorter_ReducesDispatch", func(s *standaloneActivityTestSuite) { + t := s.T() + env := s.newTestEnv() + + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + originalDelay := 30 * time.Second + newDelay := 2 * time.Second + + startResp, err := env.FrontendClient().StartActivityExecution(s.Context(), &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), + StartDelay: durationpb.New(originalDelay), + }) + require.NoError(t, err) + + _, err = env.FrontendClient().UpdateActivityExecutionOptions(s.Context(), &workflowservice.UpdateActivityExecutionOptionsRequest{ + 
Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + ActivityOptions: &activitypb.ActivityOptions{StartDelay: durationpb.New(newDelay)}, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"start_delay"}}, + }) + require.NoError(t, err) + + // Poll should succeed within the new (shorter) delay window, well before the original. + pollCtx, cancel := context.WithTimeout(s.Context(), 5*time.Second) + defer cancel() + pollResp, err := env.pollActivityTaskQueue(pollCtx, taskQueue) + require.NoError(t, err) + require.NotEmpty(t, pollResp.GetTaskToken()) + expectedDispatchTime := pollResp.GetScheduledTime().AsTime().Add(newDelay) + require.Equal(t, expectedDispatchTime, pollResp.GetCurrentAttemptScheduledTime().AsTime()) + }) + + s.Run("UpdatePastTime_DispatchesImmediately", func(s *standaloneActivityTestSuite) { + t := s.T() + env := s.newTestEnv() + + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + originalDelay := 60 * time.Second + + startResp, err := env.FrontendClient().StartActivityExecution(s.Context(), &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), + StartDelay: durationpb.New(originalDelay), + }) + require.NoError(t, err) + + // Give a small wall-clock advance so "scheduleTime + 1ms" is already in the past. 
+ time.Sleep(100 * time.Millisecond) //nolint:forbidigo + + _, err = env.FrontendClient().UpdateActivityExecutionOptions(s.Context(), &workflowservice.UpdateActivityExecutionOptionsRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + ActivityOptions: &activitypb.ActivityOptions{StartDelay: durationpb.New(1 * time.Millisecond)}, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"start_delay"}}, + }) + require.NoError(t, err) + + pollCtx, cancel := context.WithTimeout(s.Context(), 10*time.Second) + defer cancel() + pollResp, err := env.pollActivityTaskQueue(pollCtx, taskQueue) + require.NoError(t, err) + require.NotEmpty(t, pollResp.GetTaskToken(), + "expected immediate dispatch when scheduleTime+newStartDelay is in the past") + }) + + s.Run("UpdateAfterDispatch_Rejected", func(s *standaloneActivityTestSuite) { + t := s.T() + env := s.newTestEnv() + + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + + startResp, err := env.FrontendClient().StartActivityExecution(s.Context(), &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), + }) + require.NoError(t, err) + + pollResp, err := env.pollActivityTaskQueue(s.Context(), taskQueue) + require.NoError(t, err) + require.NotEmpty(t, pollResp.GetTaskToken()) + + _, err = env.FrontendClient().UpdateActivityExecutionOptions(s.Context(), &workflowservice.UpdateActivityExecutionOptionsRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + ActivityOptions: &activitypb.ActivityOptions{StartDelay: durationpb.New(10 * time.Second)}, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"start_delay"}}, + }) + 
require.Error(t, err) + var invArg *serviceerror.InvalidArgument + require.ErrorAs(t, err, &invArg) + require.Contains(t, invArg.Message, "start_delay") + }) + + s.Run("UpdateNegative_Rejected", func(s *standaloneActivityTestSuite) { + t := s.T() + env := s.newTestEnv() + + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + + startResp, err := env.FrontendClient().StartActivityExecution(s.Context(), &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), + StartDelay: durationpb.New(60 * time.Second), + }) + require.NoError(t, err) + + _, err = env.FrontendClient().UpdateActivityExecutionOptions(s.Context(), &workflowservice.UpdateActivityExecutionOptionsRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + ActivityOptions: &activitypb.ActivityOptions{StartDelay: durationpb.New(-1 * time.Second)}, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"start_delay"}}, + }) + require.Error(t, err) + var invArg *serviceerror.InvalidArgument + require.ErrorAs(t, err, &invArg) + require.Contains(t, invArg.Message, "invalid StartDelay") + }) + + // An unrelated options update during the delay window must not shrink the ScheduleToClose deadline by start_delay. + s.Run("UpdateUnrelatedField_ScheduleToCloseRespectsDelay", func(s *standaloneActivityTestSuite) { + t := s.T() + env := s.newTestEnv() + + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + startDelay := 3 * time.Second + scheduleToCloseTimeout := 1 * time.Second + // Effective deadline = scheduleTime + startDelay + scheduleToClose = T+4s. 
+ + startResp, err := env.FrontendClient().StartActivityExecution(s.Context(), &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + ScheduleToCloseTimeout: durationpb.New(scheduleToCloseTimeout), + StartDelay: durationpb.New(startDelay), + }) + require.NoError(t, err) + + // Update an unrelated field (heartbeat timeout) without touching start_delay. + _, err = env.FrontendClient().UpdateActivityExecutionOptions(s.Context(), &workflowservice.UpdateActivityExecutionOptionsRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + ActivityOptions: &activitypb.ActivityOptions{HeartbeatTimeout: durationpb.New(5 * time.Second)}, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"heartbeat_timeout"}}, + }) + require.NoError(t, err) + + // ExpirationTime must still reflect scheduleTime + startDelay + scheduleToClose. + descResp, err := env.FrontendClient().DescribeActivityExecution(s.Context(), &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + }) + require.NoError(t, err) + expectedExpiration := descResp.GetInfo().GetScheduleTime().AsTime().Add(startDelay).Add(scheduleToCloseTimeout) + require.Equal(t, expectedExpiration, descResp.GetInfo().GetExpirationTime().AsTime()) + + // Must not time out before startDelay + scheduleToClose elapses (would happen with the bug). 
+ require.Never(t, func() bool { + resp, err := env.FrontendClient().DescribeActivityExecution(s.Context(), &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + }) + return err != nil || resp.GetInfo().GetStatus() == enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT + }, startDelay+scheduleToCloseTimeout-timerSafetyMargin, 100*time.Millisecond, + "activity timed out earlier than scheduleTime + startDelay + scheduleToClose") + + // And eventually does time out at the correct deadline. + await.Require(s.Context(), t, func(c *await.T) { + resp, err := env.FrontendClient().DescribeActivityExecution(c.Context(), &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + }) + require.NoError(c, err) + require.Equal(c, enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, resp.GetInfo().GetStatus()) + }, 10*time.Second, 100*time.Millisecond) + }) + + s.Run("UpdateRestoreOriginal_RestoresStartDelay", func(s *standaloneActivityTestSuite) { + t := s.T() + env := s.newTestEnv() + + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + originalDelay := 30 * time.Second + + startResp, err := env.FrontendClient().StartActivityExecution(s.Context(), &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), + StartDelay: durationpb.New(originalDelay), + }) + require.NoError(t, err) + + // First, update start_delay to a shorter value. 
+ _, err = env.FrontendClient().UpdateActivityExecutionOptions(s.Context(), &workflowservice.UpdateActivityExecutionOptionsRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + ActivityOptions: &activitypb.ActivityOptions{StartDelay: durationpb.New(1 * time.Second)}, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"start_delay"}}, + }) + require.NoError(t, err) + descResp, err := env.FrontendClient().DescribeActivityExecution(s.Context(), &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + }) + require.NoError(t, err) + require.Equal(t, 1*time.Second, descResp.GetInfo().GetStartDelay().AsDuration()) + + // Now restore original. + _, err = env.FrontendClient().UpdateActivityExecutionOptions(s.Context(), &workflowservice.UpdateActivityExecutionOptionsRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + RestoreOriginal: true, + }) + require.NoError(t, err) + + descResp, err = env.FrontendClient().DescribeActivityExecution(s.Context(), &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + }) + require.NoError(t, err) + require.Equal(t, originalDelay, descResp.GetInfo().GetStartDelay().AsDuration()) + }) + + // RestoreOriginal on an already-dispatched activity must NOT touch start_delay. Restoring it would + // shift the close-deadline recomputation (firstDispatchTime + scheduleToClose) and rewrite the + // recorded requested_start_time without any functional effect on the running attempt. 
+ s.Run("UpdateRestoreOriginal_OnStartedActivity_LeavesStartDelayUnchanged", func(s *standaloneActivityTestSuite) { + t := s.T() + env := s.newTestEnv() + + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + originalDelay := 5 * time.Second + scheduleToCloseTimeout := 30 * time.Second + + startResp, err := env.FrontendClient().StartActivityExecution(s.Context(), &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), + ScheduleToCloseTimeout: durationpb.New(scheduleToCloseTimeout), + StartDelay: durationpb.New(originalDelay), + }) + require.NoError(t, err) + + // While still in delay window, set start_delay=0 so the activity dispatches immediately. + _, err = env.FrontendClient().UpdateActivityExecutionOptions(s.Context(), &workflowservice.UpdateActivityExecutionOptionsRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + ActivityOptions: &activitypb.ActivityOptions{StartDelay: durationpb.New(0)}, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"start_delay"}}, + }) + require.NoError(t, err) + + // Poll: activity dispatches and worker picks up, so the status moves to STARTED. 
+ pollResp, err := env.pollActivityTaskQueue(s.Context(), taskQueue) + require.NoError(t, err) + require.NotEmpty(t, pollResp.GetTaskToken()) + + descResp, err := env.FrontendClient().DescribeActivityExecution(s.Context(), &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + }) + require.NoError(t, err) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_STARTED, descResp.GetInfo().GetRunState()) + scheduleTime := descResp.GetInfo().GetScheduleTime().AsTime() + expectedDeadlineAfterUpdate := scheduleTime.Add(scheduleToCloseTimeout) + require.Equal(t, expectedDeadlineAfterUpdate, descResp.GetInfo().GetExpirationTime().AsTime(), + "close-deadline should be scheduleTime + scheduleToClose after start_delay was set to 0") + + // RestoreOriginal while STARTED. + _, err = env.FrontendClient().UpdateActivityExecutionOptions(s.Context(), &workflowservice.UpdateActivityExecutionOptionsRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + RestoreOriginal: true, + }) + require.NoError(t, err) + + descResp, err = env.FrontendClient().DescribeActivityExecution(s.Context(), &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + }) + require.NoError(t, err) + require.EqualValues(t, 0, descResp.GetInfo().GetStartDelay().AsDuration(), + "start_delay should not be restored when the activity has already dispatched") + require.Equal(t, expectedDeadlineAfterUpdate, descResp.GetInfo().GetExpirationTime().AsTime(), + "close-deadline should not shift when start_delay is left untouched") + }) + + // Same invariant as the STARTED case, but exercises the retry-backoff path: after the first + // attempt has dispatched and failed, the activity is back in SCHEDULED. RestoreOriginal must + // still not rewrite start_delay, since the first dispatch already happened. 
+ s.Run("UpdateRestoreOriginal_OnRetryBackoff_LeavesStartDelayUnchanged", func(s *standaloneActivityTestSuite) { + t := s.T() + env := s.newTestEnv() + + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + originalDelay := 5 * time.Second + scheduleToCloseTimeout := 5 * time.Minute + + startResp, err := env.FrontendClient().StartActivityExecution(s.Context(), &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), + ScheduleToCloseTimeout: durationpb.New(scheduleToCloseTimeout), + StartDelay: durationpb.New(originalDelay), + RetryPolicy: &commonpb.RetryPolicy{ + // Retry interval long enough to keep the activity in backoff through RestoreOriginal, + // short enough to fit comfortably inside ScheduleToCloseTimeout so the activity is + // not pre-emptively timed out instead of scheduled for retry. + InitialInterval: durationpb.New(1 * time.Minute), + MaximumAttempts: 3, + }, + }) + require.NoError(t, err) + + // Drop start_delay so the activity dispatches immediately. 
+ _, err = env.FrontendClient().UpdateActivityExecutionOptions(s.Context(), &workflowservice.UpdateActivityExecutionOptionsRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + ActivityOptions: &activitypb.ActivityOptions{StartDelay: durationpb.New(0)}, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"start_delay"}}, + }) + require.NoError(t, err) + + pollResp, err := env.pollActivityTaskQueue(s.Context(), taskQueue) + require.NoError(t, err) + require.NotEmpty(t, pollResp.GetTaskToken()) + + descResp, err := env.FrontendClient().DescribeActivityExecution(s.Context(), &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + }) + require.NoError(t, err) + scheduleTime := descResp.GetInfo().GetScheduleTime().AsTime() + expectedDeadlineAfterUpdate := scheduleTime.Add(scheduleToCloseTimeout) + + // Fail the attempt with a retryable failure to drive the activity back to SCHEDULED. 
+ _, err = env.FrontendClient().RespondActivityTaskFailed(s.Context(), &workflowservice.RespondActivityTaskFailedRequest{ + Namespace: env.Namespace().String(), + TaskToken: pollResp.GetTaskToken(), + Failure: &failurepb.Failure{ + Message: "retryable failure", + FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ + ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{NonRetryable: false}, + }, + }, + }) + require.NoError(t, err) + + descResp, err = env.FrontendClient().DescribeActivityExecution(s.Context(), &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + }) + require.NoError(t, err) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_SCHEDULED, descResp.GetInfo().GetRunState(), + "activity should be in retry backoff (SCHEDULED) after a retryable failure") + + _, err = env.FrontendClient().UpdateActivityExecutionOptions(s.Context(), &workflowservice.UpdateActivityExecutionOptionsRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + RestoreOriginal: true, + }) + require.NoError(t, err) + + descResp, err = env.FrontendClient().DescribeActivityExecution(s.Context(), &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + }) + require.NoError(t, err) + require.EqualValues(t, 0, descResp.GetInfo().GetStartDelay().AsDuration(), + "start_delay should not be restored when the activity has already dispatched") + require.Equal(t, expectedDeadlineAfterUpdate, descResp.GetInfo().GetExpirationTime().AsTime(), + "close-deadline should not shift when start_delay is left untouched") + }) + + // The guard accepts the field mask path in either snake_case or camelCase form. 
+ s.Run("UpdateCamelCaseFieldMask_Rejected", func(s *standaloneActivityTestSuite) { + t := s.T() + env := s.newTestEnv() + + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + + startResp, err := env.FrontendClient().StartActivityExecution(s.Context(), &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), + StartDelay: durationpb.New(60 * time.Second), + }) + require.NoError(t, err) + + // CamelCase path should still trigger the start_delay guard. + _, err = env.FrontendClient().UpdateActivityExecutionOptions(s.Context(), &workflowservice.UpdateActivityExecutionOptionsRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + ActivityOptions: &activitypb.ActivityOptions{StartDelay: durationpb.New(-1 * time.Second)}, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"startDelay"}}, + }) + require.Error(t, err) + var invArg *serviceerror.InvalidArgument + require.ErrorAs(t, err, &invArg) + require.Contains(t, invArg.Message, "StartDelay") + }) + + // Verifies the ScheduleToStart timeout is reissued anchored to the updated firstDispatchTime. Without + // correct re-anchoring, the activity would time out at scheduleTime + originalDelay + scheduleToStartTimeout + // instead of at the extended deadline. 
+ s.Run("UpdateStartDelay_ScheduleToStartReanchored", func(s *standaloneActivityTestSuite) { + t := s.T() + env := s.newTestEnv() + + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + originalDelay := 1 * time.Second + updatedDelay := 4 * time.Second + scheduleToStartTimeout := 1 * time.Second + + startResp, err := env.FrontendClient().StartActivityExecution(s.Context(), &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(30 * time.Second), + ScheduleToStartTimeout: durationpb.New(scheduleToStartTimeout), + StartDelay: durationpb.New(originalDelay), + }) + require.NoError(t, err) + + _, err = env.FrontendClient().UpdateActivityExecutionOptions(s.Context(), &workflowservice.UpdateActivityExecutionOptionsRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + ActivityOptions: &activitypb.ActivityOptions{StartDelay: durationpb.New(updatedDelay)}, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"start_delay"}}, + }) + require.NoError(t, err) + + // Must not time out before scheduleTime + updatedDelay + scheduleToStartTimeout = T+5s. + // With the bug, would time out at scheduleTime + originalDelay + scheduleToStartTimeout = T+2s. 
+ require.Never(t, func() bool { + descResp, err := env.FrontendClient().DescribeActivityExecution(s.Context(), &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + }) + return err != nil || descResp.GetInfo().GetStatus() == enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT + }, updatedDelay+scheduleToStartTimeout-timerSafetyMargin, 100*time.Millisecond, + "activity should not time out before scheduleTime + updatedDelay + scheduleToStartTimeout") + + // Eventually times out at the correct deadline. + await.Require(s.Context(), t, func(c *await.T) { + resp, err := env.FrontendClient().DescribeActivityExecution(c.Context(), &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + }) + require.NoError(c, err) + require.Equal(c, enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, resp.GetInfo().GetStatus()) + }, 10*time.Second, 100*time.Millisecond) + }) + + // During retry backoff an unrelated options update must not re-apply start_delay to the retry timing. 
+ s.Run("UpdateDuringRetryBackoff_DoesNotReapplyStartDelay", func(s *standaloneActivityTestSuite) { + t := s.T() + env := s.newTestEnv() + + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + startDelay := 2 * time.Second + retryInterval := 2 * time.Second + + startResp, err := env.FrontendClient().StartActivityExecution(s.Context(), &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(60 * time.Second), + ScheduleToCloseTimeout: durationpb.New(60 * time.Second), + RetryPolicy: &commonpb.RetryPolicy{ + InitialInterval: durationpb.New(retryInterval), + BackoffCoefficient: 1.0, + MaximumAttempts: 3, + }, + StartDelay: durationpb.New(startDelay), + }) + require.NoError(t, err) + + // Wait for and fail attempt 1. + pollResp1, err := env.pollActivityTaskQueue(s.Context(), taskQueue) + require.NoError(t, err) + require.EqualValues(t, 1, pollResp1.GetAttempt()) + + _, err = env.FrontendClient().RespondActivityTaskFailed(s.Context(), &workflowservice.RespondActivityTaskFailedRequest{ + Namespace: env.Namespace().String(), + TaskToken: pollResp1.TaskToken, + Failure: &failurepb.Failure{ + Message: "retryable failure", + FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ + ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{NonRetryable: false}, + }, + }, + }) + require.NoError(t, err) + failTime := time.Now() + + // Update an unrelated option (heartbeat) while the activity is in retry backoff. 
+ _, err = env.FrontendClient().UpdateActivityExecutionOptions(s.Context(), &workflowservice.UpdateActivityExecutionOptionsRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + ActivityOptions: &activitypb.ActivityOptions{HeartbeatTimeout: durationpb.New(10 * time.Second)}, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"heartbeat_timeout"}}, + }) + require.NoError(t, err) + + // Attempt 2 should arrive at the retry interval (~retryInterval after fail), not delayed by start_delay. + pollResp2, err := env.pollActivityTaskQueue(s.Context(), taskQueue) + require.NoError(t, err) + require.NotEmpty(t, pollResp2.GetTaskToken()) + require.EqualValues(t, 2, pollResp2.GetAttempt()) + require.Less(t, time.Since(failTime), retryInterval+startDelay, + "retry was delayed beyond the retry interval, suggesting start_delay was incorrectly re-applied") + }) } func (s *standaloneActivityTestSuite) TestUpdateActivityExecutionOptions() {