-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Fix task queue stats versioning test flake #10773
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: stephanos/migrate-worker_deployment_test
Are you sure you want to change the base?
Changes from all commits
88d58bb
83cde40
866727c
a4d0047
a53b6eb
7110e57
658cbad
5c20a94
4ca925c
b7fe93e
cb703c7
e0730fe
b2df3b3
ae6a5b1
70fb01c
920d5e3
f7d8552
ebf0db4
966b51d
3c8a808
584b0e3
0e39b46
ec2eace
f619d24
00d7090
cc63086
bf8f022
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -49,8 +49,7 @@ type workflowTasksAndActivitiesPollerParams struct { | |
|
|
||
| // taskQueueStatsContext holds the per-test environment and configuration for task queue stats tests. | ||
| type taskQueueStatsContext struct { | ||
| testcore.Env | ||
| *require.Assertions | ||
| *VersioningTestEnv | ||
| tb testing.TB | ||
| ctx context.Context | ||
| usePriMatcher bool | ||
|
|
@@ -75,18 +74,17 @@ func newTaskQueueStatsContext( | |
| } | ||
| opts = append(opts, behavior.Options()...) | ||
| opts = append(opts, extraOpts...) | ||
| env := testcore.NewEnv(t, opts...) | ||
| env := newVersioningTestEnv(t, opts...) | ||
| behavior.InjectHooks(env) | ||
| return &taskQueueStatsContext{ | ||
| Env: env, | ||
| Assertions: require.New(t), | ||
| tb: t, | ||
| ctx: ctx, | ||
| usePriMatcher: usePriMatcher, | ||
| minPriority: 1, | ||
| maxPriority: 5, | ||
| defaultPriority: 3, | ||
| partitionCount: 2, // kept low to reduce test time on CI | ||
| VersioningTestEnv: env, | ||
| tb: t, | ||
| ctx: ctx, | ||
| usePriMatcher: usePriMatcher, | ||
| minPriority: 1, | ||
| maxPriority: 5, | ||
| defaultPriority: 3, | ||
| partitionCount: 2, // kept low to reduce test time on CI | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -309,25 +307,37 @@ func (s *TaskQueueStatsVersionSuite) TestRampingAndCurrentAbsorbUnversionedBackl | |
| env.OverrideDynamicConfig(dynamicconfig.MatchingLongPollExpirationInterval, 10*time.Second) | ||
| env.OverrideDynamicConfig(dynamicconfig.TaskQueueInfoByBuildIdTTL, 1*time.Millisecond) // zero means no TTL | ||
|
|
||
| ctx, cancel := context.WithTimeout(s.Context(), 120*time.Second) | ||
| defer cancel() | ||
|
|
||
| tqName := "tq-" + common.GenerateRandomString(5) | ||
| deploymentName := testcore.RandomizeStr("deployment") | ||
| currentBuildID := "v1" | ||
| rampingBuildID := "v2" | ||
|
|
||
| pollCtx, cancelPoll := context.WithCancel(ctx) | ||
| s.createVersionsInTaskQueue(pollCtx, env, tqName, deploymentName, currentBuildID) | ||
| s.createVersionsInTaskQueue(pollCtx, env, tqName, deploymentName, rampingBuildID) | ||
| cancelPoll() // cancel the pollers so that we can verify the backlog expectations | ||
| pollerCtx, cancelPoller := context.WithCancel(s.Context()) | ||
| s.createVersionsInTaskQueue(pollerCtx, env, tqName, deploymentName, currentBuildID) | ||
| s.createVersionsInTaskQueue(pollerCtx, env, tqName, deploymentName, rampingBuildID) | ||
| // Stopping the pollers so that we verify the backlog expectations | ||
| cancelPoller() | ||
|
|
||
| // Set ramping version to 30% | ||
| rampPercentage := 30 | ||
| s.setRampingVersion(env, deploymentName, rampingBuildID, rampPercentage) | ||
|
|
||
| // Set current version | ||
| s.setCurrentVersion(env, deploymentName, currentBuildID) | ||
| env.waitForTaskQueueVersioningInfo( | ||
| s.Context(), | ||
| s.T(), | ||
| &taskqueuepb.TaskQueue{Name: tqName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, | ||
| worker_versioning.ExternalWorkerDeploymentVersionToStringV31(&deploymentpb.WorkerDeploymentVersion{ | ||
| DeploymentName: deploymentName, | ||
| BuildId: currentBuildID, | ||
| }), | ||
| worker_versioning.ExternalWorkerDeploymentVersionToStringV31(&deploymentpb.WorkerDeploymentVersion{ | ||
| DeploymentName: deploymentName, | ||
| BuildId: rampingBuildID, | ||
| }), | ||
| float32(rampPercentage), | ||
| ) | ||
|
|
||
| // Enqueue unversioned backlog. | ||
| unversionedWorkflowCount := 10 * env.partitionCount | ||
|
|
@@ -460,16 +470,14 @@ func (s *TaskQueueStatsVersionSuite) TestCurrentAbsorbsUnversionedBacklog_WhenRa | |
| env.OverrideDynamicConfig(dynamicconfig.MatchingLongPollExpirationInterval, 10*time.Second) | ||
| env.OverrideDynamicConfig(dynamicconfig.TaskQueueInfoByBuildIdTTL, 1*time.Millisecond) // zero means no TTL | ||
|
|
||
| ctx, cancel := context.WithTimeout(s.Context(), 60*time.Second) | ||
| defer cancel() | ||
|
|
||
| deploymentName := testcore.RandomizeStr("deployment") | ||
| tqName := "tq-" + common.GenerateRandomString(5) | ||
| currentBuildID := "v1" | ||
|
|
||
| pollCtx, cancelPoll := context.WithCancel(ctx) | ||
| s.createVersionsInTaskQueue(pollCtx, env, tqName, deploymentName, currentBuildID) | ||
| cancelPoll() // cancel the pollers so that we can verify the backlog expectations | ||
| pollerCtx, cancelPoller := context.WithCancel(s.Context()) | ||
| s.createVersionsInTaskQueue(pollerCtx, env, tqName, deploymentName, currentBuildID) | ||
| // Stopping the pollers so that we verify the backlog expectations | ||
| cancelPoller() | ||
|
|
||
| // Set current version. | ||
| s.setCurrentVersion(env, deploymentName, currentBuildID) | ||
|
|
@@ -522,16 +530,14 @@ func (s *TaskQueueStatsVersionSuite) TestRampingAbsorbsUnversionedBacklog_WhenCu | |
| env.OverrideDynamicConfig(dynamicconfig.MatchingLongPollExpirationInterval, 10*time.Second) | ||
| env.OverrideDynamicConfig(dynamicconfig.TaskQueueInfoByBuildIdTTL, 1*time.Millisecond) // zero means no TTL | ||
|
|
||
| ctx, cancel := context.WithTimeout(s.Context(), 60*time.Second) | ||
| defer cancel() | ||
|
|
||
| deploymentName := testcore.RandomizeStr("deployment") | ||
| tqName := "tq-" + common.GenerateRandomString(5) | ||
| rampingBuildID := "v2" | ||
|
|
||
| pollCtx, cancelPoll := context.WithCancel(ctx) | ||
| s.createVersionsInTaskQueue(pollCtx, env, tqName, deploymentName, rampingBuildID) | ||
| cancelPoll() // cancel the pollers so that we can verify the backlog expectations | ||
| pollerCtx, cancelPoller := context.WithCancel(s.Context()) | ||
| s.createVersionsInTaskQueue(pollerCtx, env, tqName, deploymentName, rampingBuildID) | ||
| // Stopping the pollers so that we verify the backlog expectations | ||
| cancelPoller() | ||
|
|
||
| // Set current to unversioned (nil current version). | ||
| s.setCurrentVersion(env, deploymentName, "") | ||
|
|
@@ -590,16 +596,15 @@ func (s *TaskQueueStatsVersionSuite) TestInactiveVersionDoesNotAbsorbUnversioned | |
| currentBuildID := "v1" | ||
| inactiveBuildID := "v2" | ||
|
|
||
| pollCtx, cancelPoll := context.WithCancel(s.Context()) | ||
|
|
||
| s.createVersionsInTaskQueue(pollCtx, env, tqName, deploymentName, currentBuildID) | ||
| s.createVersionsInTaskQueue(pollCtx, env, tqName, deploymentName, inactiveBuildID) | ||
| pollerCtx, cancelPoller := context.WithCancel(s.Context()) | ||
| s.createVersionsInTaskQueue(pollerCtx, env, tqName, deploymentName, currentBuildID) | ||
| s.createVersionsInTaskQueue(pollerCtx, env, tqName, deploymentName, inactiveBuildID) | ||
|
|
||
| // Set current version | ||
| s.setCurrentVersion(env, deploymentName, currentBuildID) | ||
|
|
||
| // Stopping the pollers so that we verify the backlog expectations | ||
| cancelPoll() | ||
| cancelPoller() | ||
|
|
||
| // Enqueue unversioned backlog. | ||
| unversionedWorkflows := 10 * env.partitionCount | ||
|
|
@@ -759,7 +764,7 @@ func (s *TaskQueueStatsVersionSuite) requireWDVTaskQueueStatsRelaxed( | |
| // Use the existing validateTaskQueueStats with MaxExtraTasks set to numPartitions | ||
| // to account for ceiling operations across partitions | ||
| expectation.MaxExtraTasks = env.partitionCount | ||
| validateTaskQueueStats(s.T(), label, stats, expectation) | ||
| validateTaskQueueStats(s.Assertions, label, stats, expectation) | ||
| } | ||
|
|
||
| // requireLegacyTaskQueueStatsRelaxed asserts task queue statistics by allowing for over-counting in multi-partition scenarios. | ||
|
|
@@ -780,7 +785,7 @@ func (s *TaskQueueStatsVersionSuite) requireLegacyTaskQueueStatsRelaxed( | |
| // Use the existing validateTaskQueueStats with MaxExtraTasks set to numPartitions | ||
| // to account for ceiling operations across partitions | ||
| expectation.MaxExtraTasks = env.partitionCount | ||
| validateTaskQueueStats(s.T(), label, stats, expectation) | ||
| validateTaskQueueStats(s.Assertions, label, stats, expectation) | ||
| } | ||
|
|
||
| // Publishes versioned and unversioned entities; with one entity per priority (plus default priority). Multiplied by `sets`. | ||
|
|
@@ -1092,10 +1097,8 @@ func (s *taskQueueStatsContext) describeWDVTaskQueueStats( | |
| if err != nil { | ||
| return nil, false, err | ||
| } | ||
| for _, tq := range resp.GetVersionTaskQueues() { | ||
| if tq.GetName() == tqName && tq.GetType() == tqType { | ||
| return tq.GetStats(), true, nil | ||
| } | ||
| if tq := s.findVersionTaskQueue(resp.GetVersionTaskQueues(), tqName, tqType); tq != nil { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Moved this into a helper. |
||
| return tq.GetStats(), true, nil | ||
| } | ||
| return nil, false, nil | ||
| } | ||
|
|
@@ -1198,6 +1201,8 @@ func (s *TaskQueueStatsVersionSuite) createVersionsInTaskQueue(ctx context.Conte | |
| }) | ||
| s.NoError(err) | ||
| s.NotNil(resp) | ||
| s.NotNil(env.findVersionTaskQueue(resp.GetVersionTaskQueues(), tqName, enumspb.TASK_QUEUE_TYPE_WORKFLOW)) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This seems to help make the test less flaky. |
||
| s.NotNil(env.findVersionTaskQueue(resp.GetVersionTaskQueues(), tqName, enumspb.TASK_QUEUE_TYPE_ACTIVITY)) | ||
| }, 10*time.Second, 200*time.Millisecond) | ||
| } | ||
|
|
||
|
|
@@ -1410,7 +1415,7 @@ func (s *taskQueueStatsContext) validateDescribeTaskQueueWithDefaultMode( | |
| require.EqualValuesf(t, expected, actual, "%s: backlog hint should be %d, got %d", label, expected, actual) | ||
| } | ||
|
|
||
| validateTaskQueueStats(t, label, resp.Stats, expectation) | ||
| validateTaskQueueStats(require.New(t), label, resp.Stats, expectation) | ||
| if s.usePriMatcher && expectation.BacklogCount > 0 { | ||
| // Per priority stats are only available with the priority matcher and when they've been actively used. | ||
| s.validateTaskQueueStatsByPriority(t, label, resp.StatsByPriorityKey, expectation) | ||
|
|
@@ -1471,7 +1476,7 @@ func (s *taskQueueStatsContext) validateDescribeTaskQueueWithEnhancedMode( | |
| return | ||
| } | ||
|
|
||
| validateTaskQueueStats(t, "DescribeTaskQueue_EnhancedMode["+tqType.String()+"]", info.Stats, expectation) | ||
| validateTaskQueueStats(require.New(t), "DescribeTaskQueue_EnhancedMode["+tqType.String()+"]", info.Stats, expectation) | ||
| } | ||
| }, 5*time.Second, 100*time.Millisecond) | ||
| } | ||
|
|
@@ -1507,7 +1512,7 @@ func (s *taskQueueStatsContext) validateDescribeWorkerDeploymentVersion( | |
| for _, info := range resp.VersionTaskQueues { | ||
| if info.Name == tqName || info.Type == tqType { | ||
| label := "DescribeWorkerDeploymentVersion[" + tqType.String() + "]" | ||
| validateTaskQueueStats(t, label, info.Stats, expectation) | ||
| validateTaskQueueStats(require.New(t), label, info.Stats, expectation) | ||
| if s.usePriMatcher && expectation.BacklogCount > 0 { | ||
| // Per priority stats are only available with the priority matcher and when they've been actively used. | ||
| s.validateTaskQueueStatsByPriority(t, label, info.StatsByPriorityKey, expectation) | ||
|
|
@@ -1551,7 +1556,7 @@ func (s *taskQueueStatsContext) validateTaskQueueStatsByPriority( | |
| } | ||
|
|
||
| require.Containsf(t, stats, i, "%s: stats should contain priority %d", label, i) | ||
| validateTaskQueueStats(t, fmt.Sprintf("%s_Pri[%d]", label, i), stats[i], priExpectation) | ||
| validateTaskQueueStats(require.New(t), fmt.Sprintf("%s_Pri[%d]", label, i), stats[i], priExpectation) | ||
| accBacklogCount += int(stats[i].ApproximateBacklogCount) | ||
| } | ||
| require.GreaterOrEqualf(t, taskQueueExpectation.BacklogCount, accBacklogCount, | ||
|
|
@@ -1584,23 +1589,23 @@ func validateTaskQueueStatsStrict( | |
| } | ||
|
|
||
| func validateTaskQueueStats( | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We can't make this a method without duplicating it, as it's used from both suites. |
||
| t require.TestingT, | ||
| a *require.Assertions, | ||
| label string, | ||
| stats *taskqueuepb.TaskQueueStats, | ||
| expectation taskQueueExpectations, | ||
| ) { | ||
| // Actual counter can be greater than the expected due to history retries. We make sure the counter is in | ||
| // range [expected, expected+maxBacklogExtraTasks] | ||
| require.GreaterOrEqual(t, stats.ApproximateBacklogCount, int64(expectation.BacklogCount), | ||
| a.GreaterOrEqual(stats.ApproximateBacklogCount, int64(expectation.BacklogCount), | ||
| "%s: ApproximateBacklogCount should be at least %d, got %d", | ||
| label, expectation.BacklogCount, stats.ApproximateBacklogCount) | ||
|
|
||
| maxApproximateBacklogCount := int64(expectation.BacklogCount + expectation.MaxExtraTasks) | ||
| require.LessOrEqual(t, stats.ApproximateBacklogCount, maxApproximateBacklogCount, | ||
| a.LessOrEqual(stats.ApproximateBacklogCount, maxApproximateBacklogCount, | ||
| "%s: ApproximateBacklogCount should be at most %d, got %d", | ||
| label, maxApproximateBacklogCount, stats.ApproximateBacklogCount) | ||
|
|
||
| require.Equal(t, stats.ApproximateBacklogCount == 0, stats.ApproximateBacklogAge.AsDuration() == time.Duration(0), | ||
| a.Equal(stats.ApproximateBacklogCount == 0, stats.ApproximateBacklogAge.AsDuration() == time.Duration(0), | ||
| "%s: ApproximateBacklogAge should be 0 when ApproximateBacklogCount is 0, got %s", | ||
| label, stats.ApproximateBacklogAge.AsDuration()) | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Passing `s.Assertions` keeps this check bound to the `s.Await` attempt.