-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Add support for updating start_delay via UpdateActivityOptions #10745
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/activity-operator-cmds
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -184,6 +184,7 @@ func NewStandaloneActivity( | |
| HeartbeatTimeout: request.GetHeartbeatTimeout(), | ||
| RetryPolicy: request.GetRetryPolicy(), | ||
| Priority: request.GetPriority(), | ||
| StartDelay: request.GetStartDelay(), | ||
| }, | ||
| }, | ||
| LastAttempt: chasm.NewDataField(ctx, &activitypb.ActivityAttemptState{}), | ||
|
|
@@ -624,6 +625,29 @@ func (a *Activity) UpdateActivityExecutionOptions( | |
|
|
||
| frontendReq := req.GetFrontendRequest() | ||
|
|
||
| // start_delay updates are only valid while the activity is still in its delay window. | ||
| var hasStartDelayInMask bool | ||
| if mask := frontendReq.GetUpdateMask(); mask != nil { | ||
| _, hasStartDelayInMask = util.ParseFieldMask(mask)["startDelay"] | ||
| } | ||
| if !frontendReq.GetRestoreOriginal() && hasStartDelayInMask { | ||
| newDelay := frontendReq.GetActivityOptions().GetStartDelay() | ||
| if err := validateStartDelay(newDelay); err != nil { | ||
| return nil, err | ||
| } | ||
| if newDelay.AsDuration() > 0 { | ||
| actCtx := activityContextFromChasm(ctx) | ||
| if !actCtx.config.StartDelayEnabled(frontendReq.GetNamespace()) { | ||
| return nil, serviceerror.NewInvalidArgument("start_delay is not enabled for this namespace") | ||
| } | ||
| } | ||
| if a.GetStatus() != activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED || | ||
| !a.firstDispatchTime().After(ctx.Now(a)) { | ||
| return nil, serviceerror.NewInvalidArgument( | ||
| "cannot update start_delay: activity is no longer in its delay window") | ||
| } | ||
| } | ||
|
|
||
| if frontendReq.GetRestoreOriginal() { | ||
| ogOptions := a.GetOriginalOptions() | ||
| a.TaskQueue = common.CloneProto(ogOptions.GetTaskQueue()) | ||
|
|
@@ -633,6 +657,11 @@ func (a *Activity) UpdateActivityExecutionOptions( | |
| a.HeartbeatTimeout = common.CloneProto(ogOptions.GetHeartbeatTimeout()) | ||
| a.RetryPolicy = common.CloneProto(ogOptions.GetRetryPolicy()) | ||
| a.Priority = common.CloneProto(ogOptions.GetPriority()) | ||
| // start_delay only governs the first dispatch. Once the first attempt has started, restoring | ||
| // the original value would shift ScheduleToClose without affecting dispatch timing. | ||
| if a.GetFirstAttemptStartedTime() == nil { | ||
| a.StartDelay = common.CloneProto(ogOptions.GetStartDelay()) | ||
| } | ||
| } else { | ||
| if err := a.mergeActivityOptions(frontendReq); err != nil { | ||
| return nil, err | ||
|
|
@@ -650,9 +679,8 @@ func (a *Activity) UpdateActivityExecutionOptions( | |
|
|
||
| // Add a new ScheduleToCloseTimeoutTask at the (possibly updated) deadline. | ||
| // Increment the stamp so the previous task is invalidated by the Validate check. | ||
| if timeout := a.GetScheduleToCloseTimeout().AsDuration(); timeout > 0 { | ||
| if deadline := a.scheduleToCloseDeadline(); !deadline.IsZero() { | ||
| a.ScheduleToCloseStamp++ | ||
| deadline := a.GetScheduleTime().AsTime().Add(timeout) | ||
| ctx.AddTask( | ||
| a, | ||
| chasm.TaskAttributes{ScheduledTime: deadline}, | ||
|
|
@@ -662,68 +690,9 @@ func (a *Activity) UpdateActivityExecutionOptions( | |
|
|
||
| attempt.Stamp++ | ||
|
|
||
| if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED || | ||
| a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED || | ||
| a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED || | ||
| a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_RESET_REQUESTED { | ||
| // Re-create the start-to-close timeout task with the new stamp and (possibly updated) timeout. | ||
| // The old task was invalidated by the stamp increment above. | ||
| if timeout := a.GetStartToCloseTimeout().AsDuration(); timeout > 0 { | ||
| deadline := attempt.GetStartedTime().AsTime().Add(timeout) | ||
| ctx.AddTask( | ||
| a, | ||
| chasm.TaskAttributes{ScheduledTime: deadline}, | ||
| &activitypb.StartToCloseTimeoutTask{Stamp: attempt.GetStamp()}, | ||
| ) | ||
| } | ||
|
|
||
| if hbTimeout := a.GetHeartbeatTimeout().AsDuration(); hbTimeout > 0 { | ||
| // The next heartbeat time is the max of (the last heartbeats recorded time and | ||
| // the current attempts started time) plus the heartbeat timeout | ||
| lastHb, _ := a.LastHeartbeat.TryGet(ctx) | ||
| lastHbTime := util.MaxTime( | ||
| lastHb.GetRecordedTime().AsTime(), | ||
| attempt.GetStartedTime().AsTime(), | ||
| ).Add(hbTimeout) | ||
| ctx.AddTask( | ||
| a, | ||
| chasm.TaskAttributes{ | ||
| ScheduledTime: lastHbTime, | ||
| }, | ||
| &activitypb.HeartbeatTimeoutTask{ | ||
| Stamp: attempt.GetStamp(), | ||
| }, | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| // TODO(saa-ga): need to handle the StartDelay timer | ||
|
|
||
| a.reissueRunningAttemptTimers(ctx, attempt) | ||
| if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED { | ||
| // Re dispatch this activity | ||
| retryTime := attemptScheduleTimeForRetry(attempt) | ||
| var dispatchAttrs chasm.TaskAttributes | ||
| if retryTime != nil { | ||
| // in backoff, future retry time | ||
| dispatchAttrs.ScheduledTime = retryTime.AsTime() | ||
| } | ||
| ctx.AddTask( | ||
| a, | ||
| dispatchAttrs, | ||
| &activitypb.ActivityDispatchTask{Stamp: attempt.GetStamp()}, | ||
| ) | ||
|
|
||
| if timeout := a.GetScheduleToStartTimeout().AsDuration(); timeout > 0 { | ||
| schedToStart := ctx.Now(a).Add(timeout) | ||
| if retryTime != nil { | ||
| schedToStart = retryTime.AsTime().Add(timeout) | ||
| } | ||
| ctx.AddTask( | ||
| a, | ||
| chasm.TaskAttributes{ScheduledTime: schedToStart}, | ||
| &activitypb.ScheduleToStartTimeoutTask{Stamp: attempt.GetStamp()}, | ||
| ) | ||
| } | ||
| a.reissueScheduledDispatch(ctx, attempt) | ||
| } | ||
|
|
||
| metricsHandler, err := a.enrichMetricsHandler(ctx, metrics.ActivityUpdateOptionsScope) | ||
|
|
@@ -742,6 +711,7 @@ func (a *Activity) UpdateActivityExecutionOptions( | |
| HeartbeatTimeout: a.GetHeartbeatTimeout(), | ||
| RetryPolicy: a.GetRetryPolicy(), | ||
| Priority: a.GetPriority(), | ||
| StartDelay: a.GetStartDelay(), | ||
| }, | ||
| }, | ||
| }, nil | ||
|
|
@@ -763,6 +733,7 @@ func (a *Activity) mergeActivityOptions( | |
| HeartbeatTimeout: a.HeartbeatTimeout, | ||
| Priority: a.Priority, | ||
| RetryPolicy: a.RetryPolicy, | ||
| StartDelay: a.StartDelay, | ||
| } | ||
|
|
||
| if err := activityoptions.MergeActivityOptions(ao, req.GetActivityOptions(), updateFields); err != nil { | ||
|
|
@@ -784,6 +755,7 @@ func (a *Activity) mergeActivityOptions( | |
| a.HeartbeatTimeout = ao.HeartbeatTimeout | ||
| a.Priority = ao.Priority | ||
| a.RetryPolicy = ao.RetryPolicy | ||
| a.StartDelay = ao.StartDelay | ||
|
|
||
| return nil | ||
| } | ||
|
|
@@ -1226,6 +1198,78 @@ func (a *Activity) firstDispatchTime() time.Time { | |
| return a.ScheduleTime.AsTime().Add(a.GetStartDelay().AsDuration()) | ||
| } | ||
|
|
||
| // reissueScheduledDispatch re-emits the ActivityDispatchTask and ScheduleToStart timeout task for | ||
| // a SCHEDULED activity. Retries fire at the retry time; first attempts dispatch now, lifted to | ||
| // honor any pending start_delay. | ||
| func (a *Activity) reissueScheduledDispatch(ctx chasm.MutableContext, attempt *activitypb.ActivityAttemptState) { | ||
| var scheduleTime time.Time | ||
| if retryTime := attemptScheduleTimeForRetry(attempt); retryTime != nil { | ||
| scheduleTime = retryTime.AsTime() | ||
| } else { | ||
| scheduleTime = a.respectStartDelay(ctx.Now(a)) | ||
| } | ||
| ctx.AddTask( | ||
| a, | ||
| chasm.TaskAttributes{ScheduledTime: scheduleTime}, | ||
| &activitypb.ActivityDispatchTask{Stamp: attempt.GetStamp()}, | ||
| ) | ||
| if timeout := a.GetScheduleToStartTimeout().AsDuration(); timeout > 0 { | ||
| ctx.AddTask( | ||
| a, | ||
| chasm.TaskAttributes{ScheduledTime: scheduleTime.Add(timeout)}, | ||
| &activitypb.ScheduleToStartTimeoutTask{Stamp: attempt.GetStamp()}, | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| // reissueRunningAttemptTimers re-emits the StartToClose and Heartbeat timeout tasks for the | ||
| // currently-running attempt, anchored to the attempt's StartedTime. Called from options-update | ||
| // paths after stamp bump so the old tasks are invalidated and replaced with the (possibly | ||
| // updated) timeouts. No-op unless the activity is in a status where a worker holds the task token | ||
| // (STARTED / CANCEL_REQUESTED / PAUSE_REQUESTED / RESET_REQUESTED). | ||
| func (a *Activity) reissueRunningAttemptTimers(ctx chasm.MutableContext, attempt *activitypb.ActivityAttemptState) { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Refactoring due to lint complaining code complexity |
||
| if a.GetStatus() != activitypb.ACTIVITY_EXECUTION_STATUS_STARTED && | ||
| a.GetStatus() != activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED && | ||
| a.GetStatus() != activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED && | ||
| a.GetStatus() != activitypb.ACTIVITY_EXECUTION_STATUS_RESET_REQUESTED { | ||
| return | ||
| } | ||
| if timeout := a.GetStartToCloseTimeout().AsDuration(); timeout > 0 { | ||
| deadline := attempt.GetStartedTime().AsTime().Add(timeout) | ||
| ctx.AddTask( | ||
| a, | ||
| chasm.TaskAttributes{ScheduledTime: deadline}, | ||
| &activitypb.StartToCloseTimeoutTask{Stamp: attempt.GetStamp()}, | ||
| ) | ||
| } | ||
| if hbTimeout := a.GetHeartbeatTimeout().AsDuration(); hbTimeout > 0 { | ||
| // Next heartbeat fires at max(last recorded heartbeat, current attempt start) + heartbeat timeout. | ||
| lastHb, _ := a.LastHeartbeat.TryGet(ctx) | ||
| lastHbTime := util.MaxTime( | ||
| lastHb.GetRecordedTime().AsTime(), | ||
| attempt.GetStartedTime().AsTime(), | ||
| ).Add(hbTimeout) | ||
| ctx.AddTask( | ||
| a, | ||
| chasm.TaskAttributes{ScheduledTime: lastHbTime}, | ||
| &activitypb.HeartbeatTimeoutTask{Stamp: attempt.GetStamp()}, | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| // respectStartDelay lifts a candidate dispatch time up to scheduleTime + start_delay when the | ||
| // activity has not yet been picked up by a worker, so pre-dispatch re-scheduling (unpause, Reset+ | ||
| // RestoreOriginalOptions, options update) honors start_delay. No-op once dispatched. | ||
| func (a *Activity) respectStartDelay(scheduleTime time.Time) time.Time { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. important helper reused in downstream PRs |
||
| if a.GetFirstAttemptStartedTime() != nil { | ||
| return scheduleTime | ||
| } | ||
| if firstDispatch := a.firstDispatchTime(); firstDispatch.After(scheduleTime) { | ||
| return firstDispatch | ||
| } | ||
| return scheduleTime | ||
| } | ||
|
|
||
| // scheduleToCloseDeadline returns the absolute time at which the ScheduleToClose timeout expires, | ||
| // accounting for start delay. Returns zero time if no ScheduleToClose timeout is set. | ||
| func (a *Activity) scheduleToCloseDeadline() time.Time { | ||
|
|
||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactoring due to lint complaining code complexity