Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
88d58bb
Migrate worker_deployment_test.go
stephanos Jun 5, 2026
83cde40
Inline ctx and tv locals in test methods
stephanos Jun 6, 2026
866727c
Remove shortNamer; use full test name in tv()
stephanos Jun 6, 2026
a4d0047
Reuse worker service cluster in worker deployment suite
stephanos Jun 18, 2026
a53b6eb
Fix worker deployment lint
stephanos Jun 18, 2026
7110e57
Simplify worker deployment poller helpers
stephanos Jun 18, 2026
658cbad
rename
stephanos Jun 18, 2026
5c20a94
Update worker_deployment_test.go
stephanos Jun 18, 2026
4ca925c
Update worker_deployment_test.go
stephanos Jun 18, 2026
b7fe93e
Update worker_deployment_test.go
stephanos Jun 18, 2026
cb703c7
Update worker_deployment_test.go
stephanos Jun 18, 2026
e0730fe
Update worker_deployment_test.go
stephanos Jun 18, 2026
b2df3b3
Update worker_deployment_test.go
stephanos Jun 18, 2026
ae6a5b1
Update worker_deployment_test.go
stephanos Jun 18, 2026
70fb01c
Update worker_deployment_test.go
stephanos Jun 18, 2026
920d5e3
remove dc
stephanos Jun 19, 2026
f7d8552
Update worker_deployment_test.go
stephanos Jun 19, 2026
ebf0db4
Update worker_deployment_test.go
stephanos Jun 19, 2026
966b51d
Update worker_deployment_test.go
stephanos Jun 19, 2026
3c8a808
Update worker_deployment_test.go
stephanos Jun 19, 2026
584b0e3
Allow suite-scoped test hooks
stephanos Jun 19, 2026
0e39b46
Fix worker deployment list expectation
stephanos Jun 19, 2026
ec2eace
Fix worker deployment sorted version expectation
stephanos Jun 19, 2026
f619d24
Fix worker deployment conflict token version
stephanos Jun 19, 2026
00d7090
Fix task queue stats versioning test flake
stephanos Jun 18, 2026
cc63086
ctx first
stephanos Jun 18, 2026
bf8f022
lint
stephanos Jun 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 55 additions & 50 deletions tests/task_queue_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ type workflowTasksAndActivitiesPollerParams struct {

// taskQueueStatsContext holds the per-test environment and configuration for task queue stats tests.
type taskQueueStatsContext struct {
testcore.Env
*require.Assertions
*VersioningTestEnv
tb testing.TB
ctx context.Context
usePriMatcher bool
Expand All @@ -75,18 +74,17 @@ func newTaskQueueStatsContext(
}
opts = append(opts, behavior.Options()...)
opts = append(opts, extraOpts...)
env := testcore.NewEnv(t, opts...)
env := newVersioningTestEnv(t, opts...)
behavior.InjectHooks(env)
return &taskQueueStatsContext{
Env: env,
Assertions: require.New(t),
tb: t,
ctx: ctx,
usePriMatcher: usePriMatcher,
minPriority: 1,
maxPriority: 5,
defaultPriority: 3,
partitionCount: 2, // kept low to reduce test time on CI
VersioningTestEnv: env,
tb: t,
ctx: ctx,
usePriMatcher: usePriMatcher,
minPriority: 1,
maxPriority: 5,
defaultPriority: 3,
partitionCount: 2, // kept low to reduce test time on CI
}
}

Expand Down Expand Up @@ -309,25 +307,37 @@ func (s *TaskQueueStatsVersionSuite) TestRampingAndCurrentAbsorbUnversionedBackl
env.OverrideDynamicConfig(dynamicconfig.MatchingLongPollExpirationInterval, 10*time.Second)
env.OverrideDynamicConfig(dynamicconfig.TaskQueueInfoByBuildIdTTL, 1*time.Millisecond) // zero means no TTL

ctx, cancel := context.WithTimeout(s.Context(), 120*time.Second)
defer cancel()

tqName := "tq-" + common.GenerateRandomString(5)
deploymentName := testcore.RandomizeStr("deployment")
currentBuildID := "v1"
rampingBuildID := "v2"

pollCtx, cancelPoll := context.WithCancel(ctx)
s.createVersionsInTaskQueue(pollCtx, env, tqName, deploymentName, currentBuildID)
s.createVersionsInTaskQueue(pollCtx, env, tqName, deploymentName, rampingBuildID)
cancelPoll() // cancel the pollers so that we can verify the backlog expectations
pollerCtx, cancelPoller := context.WithCancel(s.Context())
s.createVersionsInTaskQueue(pollerCtx, env, tqName, deploymentName, currentBuildID)
s.createVersionsInTaskQueue(pollerCtx, env, tqName, deploymentName, rampingBuildID)
// Stop the pollers so that we can verify the backlog expectations
cancelPoller()

// Set ramping version to 30%
rampPercentage := 30
s.setRampingVersion(env, deploymentName, rampingBuildID, rampPercentage)

// Set current version
s.setCurrentVersion(env, deploymentName, currentBuildID)
env.waitForTaskQueueVersioningInfo(
s.Context(),
s.T(),
&taskqueuepb.TaskQueue{Name: tqName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
worker_versioning.ExternalWorkerDeploymentVersionToStringV31(&deploymentpb.WorkerDeploymentVersion{
DeploymentName: deploymentName,
BuildId: currentBuildID,
}),
worker_versioning.ExternalWorkerDeploymentVersionToStringV31(&deploymentpb.WorkerDeploymentVersion{
DeploymentName: deploymentName,
BuildId: rampingBuildID,
}),
float32(rampPercentage),
)

// Enqueue unversioned backlog.
unversionedWorkflowCount := 10 * env.partitionCount
Expand Down Expand Up @@ -460,16 +470,14 @@ func (s *TaskQueueStatsVersionSuite) TestCurrentAbsorbsUnversionedBacklog_WhenRa
env.OverrideDynamicConfig(dynamicconfig.MatchingLongPollExpirationInterval, 10*time.Second)
env.OverrideDynamicConfig(dynamicconfig.TaskQueueInfoByBuildIdTTL, 1*time.Millisecond) // zero means no TTL

ctx, cancel := context.WithTimeout(s.Context(), 60*time.Second)
defer cancel()

deploymentName := testcore.RandomizeStr("deployment")
tqName := "tq-" + common.GenerateRandomString(5)
currentBuildID := "v1"

pollCtx, cancelPoll := context.WithCancel(ctx)
s.createVersionsInTaskQueue(pollCtx, env, tqName, deploymentName, currentBuildID)
cancelPoll() // cancel the pollers so that we can verify the backlog expectations
pollerCtx, cancelPoller := context.WithCancel(s.Context())
s.createVersionsInTaskQueue(pollerCtx, env, tqName, deploymentName, currentBuildID)
// Stop the pollers so that we can verify the backlog expectations
cancelPoller()

// Set current version.
s.setCurrentVersion(env, deploymentName, currentBuildID)
Expand Down Expand Up @@ -522,16 +530,14 @@ func (s *TaskQueueStatsVersionSuite) TestRampingAbsorbsUnversionedBacklog_WhenCu
env.OverrideDynamicConfig(dynamicconfig.MatchingLongPollExpirationInterval, 10*time.Second)
env.OverrideDynamicConfig(dynamicconfig.TaskQueueInfoByBuildIdTTL, 1*time.Millisecond) // zero means no TTL

ctx, cancel := context.WithTimeout(s.Context(), 60*time.Second)
defer cancel()

deploymentName := testcore.RandomizeStr("deployment")
tqName := "tq-" + common.GenerateRandomString(5)
rampingBuildID := "v2"

pollCtx, cancelPoll := context.WithCancel(ctx)
s.createVersionsInTaskQueue(pollCtx, env, tqName, deploymentName, rampingBuildID)
cancelPoll() // cancel the pollers so that we can verify the backlog expectations
pollerCtx, cancelPoller := context.WithCancel(s.Context())
s.createVersionsInTaskQueue(pollerCtx, env, tqName, deploymentName, rampingBuildID)
// Stop the pollers so that we can verify the backlog expectations
cancelPoller()

// Set current to unversioned (nil current version).
s.setCurrentVersion(env, deploymentName, "")
Expand Down Expand Up @@ -590,16 +596,15 @@ func (s *TaskQueueStatsVersionSuite) TestInactiveVersionDoesNotAbsorbUnversioned
currentBuildID := "v1"
inactiveBuildID := "v2"

pollCtx, cancelPoll := context.WithCancel(s.Context())

s.createVersionsInTaskQueue(pollCtx, env, tqName, deploymentName, currentBuildID)
s.createVersionsInTaskQueue(pollCtx, env, tqName, deploymentName, inactiveBuildID)
pollerCtx, cancelPoller := context.WithCancel(s.Context())
s.createVersionsInTaskQueue(pollerCtx, env, tqName, deploymentName, currentBuildID)
s.createVersionsInTaskQueue(pollerCtx, env, tqName, deploymentName, inactiveBuildID)

// Set current version
s.setCurrentVersion(env, deploymentName, currentBuildID)

// Stop the pollers so that we can verify the backlog expectations
cancelPoll()
cancelPoller()

// Enqueue unversioned backlog.
unversionedWorkflows := 10 * env.partitionCount
Expand Down Expand Up @@ -759,7 +764,7 @@ func (s *TaskQueueStatsVersionSuite) requireWDVTaskQueueStatsRelaxed(
// Use the existing validateTaskQueueStats with MaxExtraTasks set to numPartitions
// to account for ceiling operations across partitions
expectation.MaxExtraTasks = env.partitionCount
validateTaskQueueStats(s.T(), label, stats, expectation)
validateTaskQueueStats(s.Assertions, label, stats, expectation)

@stephanos stephanos Jun 18, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing s.Assertions keeps this check bound to the s.Await attempt

}

// requireLegacyTaskQueueStatsRelaxed asserts task queue statistics by allowing for over-counting in multi-partition scenarios.
Expand All @@ -780,7 +785,7 @@ func (s *TaskQueueStatsVersionSuite) requireLegacyTaskQueueStatsRelaxed(
// Use the existing validateTaskQueueStats with MaxExtraTasks set to numPartitions
// to account for ceiling operations across partitions
expectation.MaxExtraTasks = env.partitionCount
validateTaskQueueStats(s.T(), label, stats, expectation)
validateTaskQueueStats(s.Assertions, label, stats, expectation)
}

// Publishes versioned and unversioned entities, with one entity per priority (plus the default priority), multiplied by `sets`.
Expand Down Expand Up @@ -1092,10 +1097,8 @@ func (s *taskQueueStatsContext) describeWDVTaskQueueStats(
if err != nil {
return nil, false, err
}
for _, tq := range resp.GetVersionTaskQueues() {
if tq.GetName() == tqName && tq.GetType() == tqType {
return tq.GetStats(), true, nil
}
if tq := s.findVersionTaskQueue(resp.GetVersionTaskQueues(), tqName, tqType); tq != nil {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved this into a helper

return tq.GetStats(), true, nil
}
return nil, false, nil
}
Expand Down Expand Up @@ -1198,6 +1201,8 @@ func (s *TaskQueueStatsVersionSuite) createVersionsInTaskQueue(ctx context.Conte
})
s.NoError(err)
s.NotNil(resp)
s.NotNil(env.findVersionTaskQueue(resp.GetVersionTaskQueues(), tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW))

@stephanos stephanos Jun 18, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to help make the test less flaky

s.NotNil(env.findVersionTaskQueue(resp.GetVersionTaskQueues(), tqName, enumspb.TASK_QUEUE_TYPE_ACTIVITY))
}, 10*time.Second, 200*time.Millisecond)
}

Expand Down Expand Up @@ -1410,7 +1415,7 @@ func (s *taskQueueStatsContext) validateDescribeTaskQueueWithDefaultMode(
require.EqualValuesf(t, expected, actual, "%s: backlog hint should be %d, got %d", label, expected, actual)
}

validateTaskQueueStats(t, label, resp.Stats, expectation)
validateTaskQueueStats(require.New(t), label, resp.Stats, expectation)
if s.usePriMatcher && expectation.BacklogCount > 0 {
// Per priority stats are only available with the priority matcher and when they've been actively used.
s.validateTaskQueueStatsByPriority(t, label, resp.StatsByPriorityKey, expectation)
Expand Down Expand Up @@ -1471,7 +1476,7 @@ func (s *taskQueueStatsContext) validateDescribeTaskQueueWithEnhancedMode(
return
}

validateTaskQueueStats(t, "DescribeTaskQueue_EnhancedMode["+tqType.String()+"]", info.Stats, expectation)
validateTaskQueueStats(require.New(t), "DescribeTaskQueue_EnhancedMode["+tqType.String()+"]", info.Stats, expectation)
}
}, 5*time.Second, 100*time.Millisecond)
}
Expand Down Expand Up @@ -1507,7 +1512,7 @@ func (s *taskQueueStatsContext) validateDescribeWorkerDeploymentVersion(
for _, info := range resp.VersionTaskQueues {
if info.Name == tqName || info.Type == tqType {
label := "DescribeWorkerDeploymentVersion[" + tqType.String() + "]"
validateTaskQueueStats(t, label, info.Stats, expectation)
validateTaskQueueStats(require.New(t), label, info.Stats, expectation)
if s.usePriMatcher && expectation.BacklogCount > 0 {
// Per priority stats are only available with the priority matcher and when they've been actively used.
s.validateTaskQueueStatsByPriority(t, label, info.StatsByPriorityKey, expectation)
Expand Down Expand Up @@ -1551,7 +1556,7 @@ func (s *taskQueueStatsContext) validateTaskQueueStatsByPriority(
}

require.Containsf(t, stats, i, "%s: stats should contain priority %d", label, i)
validateTaskQueueStats(t, fmt.Sprintf("%s_Pri[%d]", label, i), stats[i], priExpectation)
validateTaskQueueStats(require.New(t), fmt.Sprintf("%s_Pri[%d]", label, i), stats[i], priExpectation)
accBacklogCount += int(stats[i].ApproximateBacklogCount)
}
require.GreaterOrEqualf(t, taskQueueExpectation.BacklogCount, accBacklogCount,
Expand Down Expand Up @@ -1584,23 +1589,23 @@ func validateTaskQueueStatsStrict(
}

func validateTaskQueueStats(

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't make this a method without duplicating it as it's used from both suites.

t require.TestingT,
a *require.Assertions,
label string,
stats *taskqueuepb.TaskQueueStats,
expectation taskQueueExpectations,
) {
// The actual counter can be greater than expected due to history retries. We make sure the counter is in
// the range [expectation.BacklogCount, expectation.BacklogCount+expectation.MaxExtraTasks].
require.GreaterOrEqual(t, stats.ApproximateBacklogCount, int64(expectation.BacklogCount),
a.GreaterOrEqual(stats.ApproximateBacklogCount, int64(expectation.BacklogCount),
"%s: ApproximateBacklogCount should be at least %d, got %d",
label, expectation.BacklogCount, stats.ApproximateBacklogCount)

maxApproximateBacklogCount := int64(expectation.BacklogCount + expectation.MaxExtraTasks)
require.LessOrEqual(t, stats.ApproximateBacklogCount, maxApproximateBacklogCount,
a.LessOrEqual(stats.ApproximateBacklogCount, maxApproximateBacklogCount,
"%s: ApproximateBacklogCount should be at most %d, got %d",
label, maxApproximateBacklogCount, stats.ApproximateBacklogCount)

require.Equal(t, stats.ApproximateBacklogCount == 0, stats.ApproximateBacklogAge.AsDuration() == time.Duration(0),
a.Equal(stats.ApproximateBacklogCount == 0, stats.ApproximateBacklogAge.AsDuration() == time.Duration(0),
"%s: ApproximateBacklogAge should be 0 when ApproximateBacklogCount is 0, got %s",
label, stats.ApproximateBacklogAge.AsDuration())
}
Expand Down
1 change: 1 addition & 0 deletions tests/testcore/dynamic_config_overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (
dynamicconfig.ReplicationTaskProcessorErrorRetryWait.Key(): time.Millisecond,
dynamicconfig.ClusterMetadataRefreshInterval.Key(): 100 * time.Millisecond,
dynamicconfig.NamespaceCacheRefreshInterval.Key(): NamespaceCacheRefreshInterval,
dynamicconfig.VisibilityPersistenceSlowQueryThreshold.Key(): 60 * time.Second,
dynamicconfig.ReplicationEnableUpdateWithNewTaskMerge.Key(): true,
dynamicconfig.FrontendMaskInternalErrorDetails.Key(): false,
dynamicconfig.HistoryScannerEnabled.Key(): false,
Expand Down
6 changes: 6 additions & 0 deletions tests/testcore/test_cluster_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ func (p *clusterRouter) getShared(t *testing.T) *FunctionalTestBase {
})
}

// hasSuiteScoped reports whether p.suiteScoped holds an entry for the
// top-level test that t belongs to. The lookup key is the portion of t.Name()
// before the first "/", i.e. the root test name without any subtest path.
func (p *clusterRouter) hasSuiteScoped(t *testing.T) bool {
	suiteName, _, _ := strings.Cut(t.Name(), "/")
	_, registered := p.suiteScoped.Load(suiteName)
	return registered
}

func (p *clusterRouter) getSuiteScoped(t *testing.T) *FunctionalTestBase {
rootName, _, _ := strings.Cut(t.Name(), "/")
if _, ok := p.suiteScoped.Load(rootName); !ok {
Expand Down
40 changes: 30 additions & 10 deletions tests/testcore/test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,15 @@ type TestEnv struct {

Logger log.Logger

cluster *TestCluster
nsName namespace.Name
nsID namespace.ID
taskPoller *taskpoller.TaskPoller
t *testing.T
tv *testvars.TestVars
ctx context.Context
dedicatedGuard *dedicatedClusterGuard
cluster *TestCluster
nsName namespace.Name
nsID namespace.ID
taskPoller *taskpoller.TaskPoller
t *testing.T
tv *testvars.TestVars
ctx context.Context
dedicatedGuard *dedicatedClusterGuard
allowGlobalHooks bool

sdkClientOnce sync.Once
sdkClient sdkclient.Client
Expand All @@ -95,6 +96,8 @@ type testOptions struct {
dedicatedReason string
disableTestloggerFailure bool
dynamicConfigSettings []dynamicConfigOverride
testHooks []testhooks.Hook
hasGlobalTestHook bool
clusterOptions []TestClusterOption
testVars func(*testvars.TestVars) *testvars.TestVars
}
Expand Down Expand Up @@ -215,6 +218,16 @@ func WithDynamicConfig(setting dynamicconfig.GenericSetting, value any) TestOpti
}
}

// WithTestHook returns a TestOption that queues hook for injection into the
// test environment. NewEnv injects every queued hook via InjectHook once the
// environment is set up.
//
// When hook.Scope() is testhooks.ScopeGlobal the option also sets
// hasGlobalTestHook, which NewEnv uses to switch to a dedicated cluster
// unless the test already has a suite-scoped cluster.
func WithTestHook(hook testhooks.Hook) TestOption {
	return func(opts *testOptions) {
		if hook.Scope() == testhooks.ScopeGlobal {
			// Remember that at least one global hook was requested.
			opts.hasGlobalTestHook = true
		}
		opts.testHooks = append(opts.testHooks, hook)
	}
}

// NewEnv creates a new test environment with access to a Temporal cluster.
func NewEnv(t *testing.T, opts ...TestOption) *TestEnv {
t.Helper()
Expand All @@ -226,6 +239,9 @@ func NewEnv(t *testing.T, opts ...TestOption) *TestEnv {
for _, opt := range opts {
opt(&options)
}
if options.hasGlobalTestHook && !testClusterRouter.hasSuiteScoped(t) {
options.dedicatedCluster = true
}
dedicatedGuard := newDedicatedClusterGuard(options.dedicatedCluster)
if options.dedicatedReason != "" {
dedicatedGuard.record(options.dedicatedReason)
Expand Down Expand Up @@ -279,6 +295,7 @@ func NewEnv(t *testing.T, opts ...TestOption) *TestEnv {
ctx: setupTestTimeoutWithContext(t),
sdkWorkerTQ: RandomizeStr("tq-" + t.Name()),
dedicatedGuard: dedicatedGuard,
allowGlobalHooks: testClusterRouter.hasSuiteScoped(t),
}
t.Cleanup(func() {
if err := env.dedicatedGuard.validate(); err != nil && !t.Failed() {
Expand All @@ -301,6 +318,9 @@ func NewEnv(t *testing.T, opts ...TestOption) *TestEnv {
env.OverrideDynamicConfig(override.setting, override.value)
}
}
for _, hook := range options.testHooks {
env.InjectHook(hook)
}

return env
}
Expand All @@ -318,14 +338,14 @@ func (e *TestEnv) NamespaceID() namespace.ID {
//
// It auto-detects the scope from the hook:
// - For namespace-scoped hooks: scopes it to the test's namespace
// - For global hooks: requires a dedicated cluster (fails early if used on shared cluster)
// - For global hooks: requires a dedicated cluster, except for suite-scoped legacy clusters.
func (e *TestEnv) InjectHook(hook testhooks.Hook) (cleanup func()) {
var scope any
switch hook.Scope() {
case testhooks.ScopeNamespace:
scope = e.nsID
case testhooks.ScopeGlobal:
if e.isShared {
if e.isShared && !e.allowGlobalHooks {
e.t.Fatal("InjectHook: global hooks require a dedicated cluster; use testcore.WithDedicatedCluster()")
}
e.dedicatedGuard.record("global hook injected")
Expand Down
Loading
Loading