Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,10 +657,22 @@ func GetOverridePinnedVersion(override *workflowpb.VersioningOverride) *deployme
}
return nil
}

func GetOverrideOneTimeTargetVersion(override *workflowpb.VersioningOverride) *deploymentpb.WorkerDeploymentVersion {
return override.GetOneTime().GetTargetDeploymentVersion()
}

func GetOverrideTargetDeploymentVersion(override *workflowpb.VersioningOverride) *deploymentpb.WorkerDeploymentVersion {
if OverrideIsPinned(override) {
return GetOverridePinnedVersion(override)
}
return GetOverrideOneTimeTargetVersion(override)
}

func ExtractVersioningBehaviorFromOverride(override *workflowpb.VersioningOverride) enumspb.VersioningBehavior {
if override.GetAutoUpgrade() {
return enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE
} else if override.GetPinned() != nil {
} else if override.GetPinned() != nil || override.GetOneTime() != nil {
return enumspb.VERSIONING_BEHAVIOR_PINNED
}

Expand Down Expand Up @@ -737,6 +749,11 @@ func ValidateVersioningOverrideAndGetReactivationEligibility(ctx context.Context
return false, 0, serviceerror.NewInvalidArgument("must specify pinned override behavior if override is pinned.")
}
return validateVersionAndGetReactivationEligibility(ctx, p.GetVersion(), matchingClient, versionCache, tq, tqType, namespaceID)
} else if oneTime := override.GetOneTime(); oneTime != nil {
if oneTime.GetTargetDeploymentVersion() == nil {
return false, 0, serviceerror.NewInvalidArgument("must provide target deployment version if override is one-time.")
}
return validateVersionAndGetReactivationEligibility(ctx, oneTime.GetTargetDeploymentVersion(), matchingClient, versionCache, tq, tqType, namespaceID)
}

//nolint:staticcheck // SA1019: worker versioning v0.31
Expand Down
55 changes: 55 additions & 0 deletions common/worker_versioning/worker_versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,61 @@ func TestValidateVersioningOverrideAndGetReactivationEligibility(t *testing.T) {
},
expectError: false,
},
{
name: "v0.32: One-time override, with cache hit, returns cached reactivation eligibility",
override: &workflowpb.VersioningOverride{
Override: &workflowpb.VersioningOverride_OneTime{
OneTime: &workflowpb.VersioningOverride_OneTimeOverride{
TargetDeploymentVersion: testVersion,
},
},
},
setupCache: func(c *testVersionMembershipCache) {
c.Put(testNamespaceID, testTaskQueue, enumspb.TASK_QUEUE_TYPE_WORKFLOW, testVersion.DeploymentName, testVersion.BuildId, true, false, 42)
},
setupMock: func(m *matchingservicemock.MockMatchingServiceClient) {
m.EXPECT().CheckTaskQueueVersionMembership(gomock.Any(), gomock.Any()).Times(0)
},
expectError: false,
expectedShouldSkipReactivation: false,
expectedRevisionNumber: 42,
},
{
name: "v0.32: One-time override, with cache miss, RPC returns member and active",
override: &workflowpb.VersioningOverride{
Override: &workflowpb.VersioningOverride_OneTime{
OneTime: &workflowpb.VersioningOverride_OneTimeOverride{
TargetDeploymentVersion: testVersion,
},
},
},
setupCache: func(c *testVersionMembershipCache) {},
setupMock: func(m *matchingservicemock.MockMatchingServiceClient) {
m.EXPECT().CheckTaskQueueVersionMembership(
gomock.Any(),
gomock.Any(),
).Return(&matchingservice.CheckTaskQueueVersionMembershipResponse{
IsMember: true,
ShouldSkipReactivation: true,
RevisionNumber: 7,
}, nil)
},
expectError: false,
expectedShouldSkipReactivation: true,
expectedRevisionNumber: 7,
},
{
name: "v0.32: One-time override, without target deployment version, returns error",
override: &workflowpb.VersioningOverride{
Override: &workflowpb.VersioningOverride_OneTime{
OneTime: &workflowpb.VersioningOverride_OneTimeOverride{},
},
},
setupCache: func(c *testVersionMembershipCache) {},
setupMock: func(m *matchingservicemock.MockMatchingServiceClient) {},
expectError: true,
errorContains: "must provide target deployment version if override is one-time",
},
{
name: "v0.32: Pinned override, with cache hit (drained), returns isDrainedOrInactive=true and cached revision",
override: &workflowpb.VersioningOverride{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ require (
go.opentelemetry.io/otel/sdk v1.43.0
go.opentelemetry.io/otel/sdk/metric v1.43.0
go.opentelemetry.io/otel/trace v1.43.0
go.temporal.io/api v1.62.15-0.20260615235047-378792ab2240
go.temporal.io/api v1.62.15-0.20260618002053-7c062185c563
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2
go.temporal.io/sdk v1.41.1
go.uber.org/fx v1.24.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,8 @@ go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.3.0 h1:R
go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.3.0/go.mod h1:I89cynRj8y+383o7tEQVg2SVA6SRgDVIouWPUVXjx0U=
go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.3.0 h1:CQvJSldHRUN6Z8jsUeYv8J0lXRvygALXIzsmAeCcZE0=
go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.3.0/go.mod h1:xSQ+mEfJe/GjK1LXEyVOoSI1N9JV9ZI923X5kup43W4=
go.temporal.io/api v1.62.15-0.20260615235047-378792ab2240 h1:Up/CNfkScGxN1TdrGZ3ez+0k6MIIhuhlbBgdZnrPhm0=
go.temporal.io/api v1.62.15-0.20260615235047-378792ab2240/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q=
go.temporal.io/api v1.62.15-0.20260618002053-7c062185c563 h1:gPketS2mBLHDxz8l2xldQxQFEmm14h+sqmaJ5QQmKi4=
go.temporal.io/api v1.62.15-0.20260618002053-7c062185c563/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q=
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2 h1:1hKeH3GyR6YD6LKMHGCZ76t6h1Sgha0hXVQBxWi3dlQ=
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2/go.mod h1:T8dnzVPeO+gaUTj9eDgm/lT2lZH4+JXNvrGaQGyVi50=
go.temporal.io/sdk v1.41.1 h1:yOpvsHyDD1lNuwlGBv/SUodCPhjv9nDeC9lLHW/fJUA=
Expand Down
32 changes: 32 additions & 0 deletions service/history/api/updateworkflowoptions/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,38 @@ func mergeWorkflowExecutionOptions(
mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride()
}

if _, ok := updateFields["versioningOverride.pinned"]; ok {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is something which i just over thought while working on this, or something we missed while we wrote update workflow options?

question: why don't we have these mask options for all the paths for api's that were added in the v0.32 phase? cc - @carlydf

have added them here, but could be totally off hence the q

mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride()
}

if _, ok := updateFields["versioningOverride.pinned.behavior"]; ok {
mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride()
}

if _, ok := updateFields["versioningOverride.pinned.version"]; ok {
mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride()
}

if _, ok := updateFields["versioningOverride.autoUpgrade"]; ok {
mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride()
}

if _, ok := updateFields["versioningOverride.oneTime"]; ok {
mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride()
}

if _, ok := updateFields["versioningOverride.oneTime.targetDeploymentVersion"]; ok {
mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride()
}

if _, ok := updateFields["versioningOverride.oneTime.targetDeploymentVersion.deploymentName"]; ok {
mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride()
}

if _, ok := updateFields["versioningOverride.oneTime.targetDeploymentVersion.buildId"]; ok {
mergeInto.VersioningOverride = mergeFrom.GetVersioningOverride()
}

// ==== Priority

if _, ok := updateFields["priority"]; ok {
Expand Down
69 changes: 69 additions & 0 deletions service/history/api/updateworkflowoptions/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@ var (
PinnedVersion: "X.B",
},
}
oneTimeOverrideOptions = &workflowpb.WorkflowExecutionOptions{
VersioningOverride: &workflowpb.VersioningOverride{
Override: &workflowpb.VersioningOverride_OneTime{
OneTime: &workflowpb.VersioningOverride_OneTimeOverride{
TargetDeploymentVersion: &deploymentpb.WorkerDeploymentVersion{
DeploymentName: "X",
BuildId: "C",
},
},
},
},
}
)

func TestMergeOptions_VersionOverrideMask(t *testing.T) {
Expand Down Expand Up @@ -161,6 +173,63 @@ func TestMergeOptions_PartialMask(t *testing.T) {

}

func TestMergeOptions_VersionOverrideOneofNestedMask(t *testing.T) {
testCases := []struct {
name string
mask *fieldmaskpb.FieldMask
}{
{
name: "one_time field",
mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.one_time"}},
},
{
name: "one_time target version field",
mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.one_time.target_deployment_version"}},
},
{
name: "one_time target version deployment name field",
mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.one_time.target_deployment_version.deployment_name"}},
},
{
name: "one_time target version build id field",
mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.one_time.target_deployment_version.build_id"}},
},
{
name: "pinned oneof field",
mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.pinned"}},
},
{
name: "pinned nested version field",
mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.pinned.version"}},
},
{
name: "auto_upgrade oneof field",
mask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override.auto_upgrade"}},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
input := proto.Clone(pinnedOverrideOptionsB).(*workflowpb.WorkflowExecutionOptions)
requested := proto.Clone(oneTimeOverrideOptions).(*workflowpb.WorkflowExecutionOptions)
if tc.name == "auto_upgrade oneof field" {
requested = &workflowpb.WorkflowExecutionOptions{
VersioningOverride: &workflowpb.VersioningOverride{
Override: &workflowpb.VersioningOverride_AutoUpgrade{
AutoUpgrade: true,
},
},
}
}

merged, optionsToReapply, err := mergeWorkflowExecutionOptions(input, requested, tc.mask)
require.NoError(t, err)
require.True(t, proto.Equal(requested, merged))
require.False(t, optionsToReapply.hasChanges())
})
}
}

func TestMergeOptions_EmptyMask(t *testing.T) {
emptyUpdateMask := &fieldmaskpb.FieldMask{Paths: []string{}}
input := pinnedOverrideOptionsB
Expand Down
14 changes: 5 additions & 9 deletions service/history/api/worker_versioning_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ type VersionReactivationSignalerFn func(
) error

// ReactivateVersionWorkflowIfPinned sends a reactivation signal to the version workflow
// when workflows are pinned to a potentially DRAINED/INACTIVE version.
// when workflow execution options target a potentially DRAINED/INACTIVE version
// (for example, by specifying a Pinned or OneTime override via update options).
// This is a fire-and-forget operation - the signal is sent asynchronously and errors are
// logged by the signaler implementation. The signaler itself is responsible for per-pod
// dedup by revision number; cross-pod duplicates fold at the receiver via a deterministic
Expand All @@ -47,20 +48,15 @@ func ReactivateVersionWorkflowIfPinned(
return
}

// Only process if we're pinning to a specific version
if !worker_versioning.OverrideIsPinned(override) {
return
}

pinnedVersion := worker_versioning.GetOverridePinnedVersion(override)
if pinnedVersion == nil {
targetVersion := worker_versioning.GetOverrideTargetDeploymentVersion(override)
if targetVersion == nil {
return
}

// Send the signal asynchronously to avoid adding latency to the caller's request.
// Errors are logged by the signaler implementation (e.g. via convertAndRecordError). However,
// errors are not propagated to the caller as this is a fire-and-forget operation.
go func() {
signaler(context.Background(), namespaceEntry, pinnedVersion.GetDeploymentName(), pinnedVersion.GetBuildId(), revisionNumber) //nolint:errcheck
signaler(context.Background(), namespaceEntry, targetVersion.GetDeploymentName(), targetVersion.GetBuildId(), revisionNumber) //nolint:errcheck
}()
}
8 changes: 4 additions & 4 deletions service/history/interfaces/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,8 @@ type (
// GetEffectiveDeployment returns the effective deployment in the following order:
// 1. DeploymentVersionTransition.Deployment: this is returned when the wf is transitioning to a
// new deployment
// 2. VersioningOverride.Deployment: this is returned when user has set a PINNED override
// at wf start time, or later via UpdateWorkflowExecutionOptions.
// 2. VersioningOverride target: this is returned when user has set a PINNED override or
// pending one-time move, either at wf start time or later via UpdateWorkflowExecutionOptions.
// 3. Deployment: this is returned when there is no transition and no override (the most
// common case). Deployment is set based on the worker-sent deployment in the latest WFT
// completion. Exception: if Deployment is set but the workflow's effective behavior is
Expand All @@ -387,8 +387,8 @@ type (
// GetEffectiveVersioningBehavior returns the effective versioning behavior in the following
// order:
// 1. DeploymentVersionTransition: if there is a transition, then effective behavior is AUTO_UPGRADE.
// 2. VersioningOverride.Behavior: this is returned when user has set a behavior override
// at wf start time, or later via UpdateWorkflowExecutionOptions.
// 2. VersioningOverride behavior: this is returned when user has set a behavior override
// or pending one-time move at wf start time, or later via UpdateWorkflowExecutionOptions.
// 3. Behavior: this is returned when there is no override (most common case). Behavior is
// set based on the worker-sent deployment in the latest WFT completion.
GetEffectiveVersioningBehavior() enumspb.VersioningBehavior
Expand Down
18 changes: 9 additions & 9 deletions service/history/transfer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,14 +984,14 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution(
}
}

// Pinned override is inherited if Task Queue of new run is compatible with the override version.
var inheritedPinnedOverride *workflowpb.VersioningOverride
if o := mutableState.GetExecutionInfo().GetVersioningInfo().GetVersioningOverride(); worker_versioning.OverrideIsPinned(o) {
inheritedPinnedOverride = o
// Pinned and one-time overrides are inherited if Task Queue of new run is compatible with the override version.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the reason why the child workflow path is receiving this special treatment but none of CAN/retry/cron paths are is because of the following idea:

  • if an operator has a "Started" WFT and were to run a VersioningOverride (One Time Move) operation, and the said WFT returns a command to CAN/retry/cron, we reject those commands since we have a buffered history event thanks to that pending one time move command. Thus, for these three primitives, we shall always only have the case where the task finishes and the pending one time move is cleared, after which we handle commands (i have added versioning tests that are validating this)

  • on the other hand, the child path is a bit different in the sense that we don't reject this start child command even if we do have buffered events.

var inheritedVersioningOverride *workflowpb.VersioningOverride
if o := mutableState.GetExecutionInfo().GetVersioningInfo().GetVersioningOverride(); worker_versioning.GetOverrideTargetDeploymentVersion(o) != nil {
inheritedVersioningOverride = o
newTQ := attributes.GetTaskQueue().GetName()
if newTQ != mutableState.GetExecutionInfo().GetTaskQueue() && !newTQInPinnedVersion ||
attributes.GetNamespaceId() != mutableState.GetExecutionInfo().GetNamespaceId() { // don't inherit pinned version if child is in a different namespace
inheritedPinnedOverride = nil
attributes.GetNamespaceId() != mutableState.GetExecutionInfo().GetNamespaceId() { // don't inherit override if child is in a different namespace
inheritedVersioningOverride = nil
}
}

Expand Down Expand Up @@ -1091,7 +1091,7 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution(
inheritedBuildId,
initiatedEvent.GetUserMetadata(),
shouldTerminateAndStartChild,
inheritedPinnedOverride,
inheritedVersioningOverride,
inheritedPinnedVersion,
priorities.Merge(mutableState.GetExecutionInfo().Priority, attributes.Priority),
inheritedAutoUpgradeInfo,
Expand Down Expand Up @@ -1665,7 +1665,7 @@ func (t *transferQueueActiveTaskExecutor) startWorkflow(
inheritedBuildId string,
userMetadata *sdkpb.UserMetadata,
shouldTerminateAndStartChild bool,
inheritedPinnedOverride *workflowpb.VersioningOverride,
inheritedVersioningOverride *workflowpb.VersioningOverride,
inheritedPinnedVersion *deploymentpb.WorkerDeploymentVersion,
priority *commonpb.Priority,
inheritedAutoUpgradeInfo *deploymentpb.InheritedAutoUpgradeInfo,
Expand All @@ -1690,7 +1690,7 @@ func (t *transferQueueActiveTaskExecutor) startWorkflow(
Memo: attributes.Memo,
SearchAttributes: attributes.SearchAttributes,
UserMetadata: userMetadata,
VersioningOverride: inheritedPinnedOverride,
VersioningOverride: inheritedVersioningOverride,
Priority: priority,
TimeSkippingConfig: attributes.GetTimeSkippingConfig(),
}
Expand Down
Loading
Loading