diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index 82b3a225296..440d37774a9 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "strings" - "sync" "sync/atomic" "testing" "time" @@ -143,10 +142,10 @@ func (s *Versioning3Suite) TestPinnedTask_NoProperPoller() { tv := testvars.New(s) tv2 := tv.WithBuildIDNumber(2) - go s.idlePollWorkflow(tv2, true, ver3MinPollTime, "second deployment should not receive pinned task") + go s.idlePollWorkflow(context.Background(), tv2, true, ver3MinPollTime, "second deployment should not receive pinned task") s.startWorkflow(tv, tv.VersioningOverridePinned(s.useV32)) - s.idlePollWorkflow(tv, false, ver3MinPollTime, "unversioned worker should not receive pinned task") + s.idlePollWorkflow(context.Background(), tv, false, ver3MinPollTime, "unversioned worker should not receive pinned task") // Sleeping to let the pollers arrive to server before ending the test. time.Sleep(200 * time.Millisecond) //nolint:forbidigo @@ -157,7 +156,7 @@ func (s *Versioning3Suite) TestUnpinnedTask_NonCurrentDeployment() { s.RunTestWithMatchingBehavior( func() { tv := testvars.New(s) - go s.idlePollWorkflow(tv, true, ver3MinPollTime, "non-current versioned poller should not receive unpinned task") + go s.idlePollWorkflow(context.Background(), tv, true, ver3MinPollTime, "non-current versioned poller should not receive unpinned task") s.startWorkflow(tv, nil) @@ -195,6 +194,7 @@ func (s *Versioning3Suite) TestUnpinnedTask_OldDeployment() { s.startWorkflow(tv, nil) s.idlePollWorkflow( + context.Background(), tvOldDeployment, true, ver3MinPollTime, @@ -218,6 +218,7 @@ func (s *Versioning3Suite) TestUnpinnedTask_OldDeployment() { s.startWorkflow(tv, nil) s.idlePollWorkflow( + context.Background(), tvOldDeployment, true, ver3MinPollTime, @@ -346,7 +347,7 @@ func (s *Versioning3Suite) testPinnedQuery_DrainedVersion(pollersPresent bool, r // create version v1 and make it current idlePollerDone := make(chan struct{}) go func() { - s.idlePollWorkflow(tv, true, ver3MinPollTime, "should not have gotten any tasks since there are none") + s.idlePollWorkflow(context.Background(), tv, true, ver3MinPollTime, "should not have gotten any tasks since there are none") close(idlePollerDone) }() s.setCurrentDeployment(tv) @@ -367,7 +368,7 @@ func (s *Versioning3Suite) testPinnedQuery_DrainedVersion(pollersPresent bool, r idlePollerDone = make(chan struct{}) tv2 := tv.WithBuildIDNumber(2) go func() { - s.idlePollWorkflow(tv2, true, ver3MinPollTime, "should not have gotten any tasks since there are none") + s.idlePollWorkflow(context.Background(), tv2, true, ver3MinPollTime, "should not have gotten any tasks since there are none") close(idlePollerDone) }() s.setCurrentDeployment(tv2) @@ -499,7 +500,7 @@ func (s *Versioning3Suite) testUnpinnedQuery(sticky bool) { pollerDone := make(chan struct{}) go func() { - s.idlePollWorkflow(tv2, true, 5*time.Second, "new deployment should not receive query") + s.idlePollWorkflow(context.Background(), tv2, true, 5*time.Second, "new deployment should not receive query") close(pollerDone) }() s.pollAndQueryWorkflow(tv, sticky) @@ -508,7 +509,7 @@ func (s *Versioning3Suite) testUnpinnedQuery(sticky bool) { s.setCurrentDeployment(tv2) s.waitForDeploymentDataPropagation(tv2, versionStatusCurrent, false, tqTypeWf) - go s.idlePollWorkflow(tv, true, ver3MinPollTime, "old deployment should not receive query") + go s.idlePollWorkflow(context.Background(), tv, true, ver3MinPollTime, "old deployment should not receive query") // Since the current deployment has changed, task will move to the normal queue (thus, sticky=false) s.pollAndQueryWorkflow(tv2, false) } @@ -680,7 +681,7 @@ func (s *Versioning3Suite) TestUnpinnedWorkflow_SuccessfulUpdate_TransitionsToNe // Register the new version and set it to current tv2 := tv1.WithBuildIDNumber(2) - s.idlePollWorkflow(tv2, true, ver3MinPollTime, "should not have gotten any tasks since there are none") + s.idlePollWorkflow(context.Background(), tv2, true, ver3MinPollTime, "should not have gotten any tasks since there are none") s.setCurrentDeployment(tv2) // Send update @@ -765,7 +766,7 @@ func (s *Versioning3Suite) TestUnpinnedWorkflow_FailedUpdate_DoesNotTransitionTo // Register the new version and set it to current tv2 := tv1.WithBuildIDNumber(2) - s.idlePollWorkflow(tv2, true, ver3MinPollTime, "should not have gotten any tasks since there are none") + s.idlePollWorkflow(context.Background(), tv2, true, ver3MinPollTime, "should not have gotten any tasks since there are none") s.setCurrentDeployment(tv2) @@ -3355,6 +3356,7 @@ func (s *Versioning3Suite) doPollActivityAndHandle( } func (s *Versioning3Suite) idlePollWorkflow( + ctx context.Context, tv *testvars.TestVars, versioned bool, timeout time.Duration, @@ -3375,6 +3377,7 @@ func (s *Versioning3Suite) idlePollWorkflow( return nil, nil }, taskpoller.WithTimeout(timeout), + taskpoller.WithContext(ctx), ) } @@ -3652,19 +3655,9 @@ func (s *Versioning3Suite) verifyVersioningSAs( } func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions() { - s.T().Skip("Skipping this test for now as there seems to be a flake. Shall fix it in the upcoming PR.") - /* - - Test plan: - - Use only one read and write partition. - - Update the userData by setting current version to v0. - - Start 10 workflows on v0. MS reads v0 for these workflows. - - Update the userData by setting the current version to v1. - - Complete workflow task on v1. MS reads v1 for these workflows now. - - *Rollback* userData to v0 on that single partition (call lower level API) - - See if any workflow task goes back to original v0 poller (should not) - - */ + if !s.useRevisionNumbers { + s.T().Skip("This test is only supported on revision number mechanics") + } s.OverrideDynamicConfig(dynamicconfig.MatchingNumTaskqueueReadPartitions, 1) s.OverrideDynamicConfig(dynamicconfig.MatchingNumTaskqueueWritePartitions, 1) @@ -3672,182 +3665,54 @@ func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions() tv0 := testvars.New(s).WithBuildIDNumber(0) tv1 := tv0.WithBuildIDNumber(1) - // Update the userData by setting the current version to v0 - s.updateTaskQueueDeploymentDataWithRoutingConfig(tv0, &deploymentpb.RoutingConfig{ - CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv0.DeploymentVersionString()), - CurrentVersionChangedTime: timestamp.TimePtr(time.Now()), - RevisionNumber: 1, - }, map[string]*deploymentspb.WorkerDeploymentVersionData{tv0.DeploymentVersion().GetBuildId(): &deploymentspb.WorkerDeploymentVersionData{ - Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT, - }}, []string{}, tqTypeWf, tqTypeAct) - - // Wait until all task queue partitions know that v0 is current. - s.waitForDeploymentDataPropagation(tv0, versionStatusCurrent, false, tqTypeWf, tqTypeAct) - - // Make numWorkflows different workflow ID's so that we can verify that all workflows are running on v0. - numWorkflows := 10 - wfVarsV0 := make([]*testvars.TestVars, numWorkflows) - wfVarsV1 := make([]*testvars.TestVars, numWorkflows) - - for i := 0; i < numWorkflows; i++ { - wfVarsV0[i] = tv0.WithWorkflowIDNumber(i) - wfVarsV1[i] = tv1.WithWorkflowIDNumber(i) - } - - // Start all different workflows on version v0. - for i := 0; i < numWorkflows; i++ { - s.startWorkflow(wfVarsV0[i], nil) - } - - // Poll for workflows on v0. - channels := make([]chan struct{}, numWorkflows) - for i := 0; i < numWorkflows; i++ { - channels[i] = make(chan struct{}) - s.pollWftAndHandle(wfVarsV0[i], false, channels[i], - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - s.NotNil(task) - resp := respondEmptyWft(wfVarsV0[i], false, vbUnpinned) - resp.ForceCreateNewWorkflowTask = true - return resp, nil - }) - } - // Wait for channels to be closed - for i := 0; i < numWorkflows; i++ { - <-channels[i] + wf := func(ctx workflow.Context) (string, error) { + workflow.GetSignalChannel(ctx, "afterRollback").Receive(ctx, nil) + return "v1", nil } - // Verify that all workflows are running on v0. - for i := 0; i < numWorkflows; i++ { - s.EventuallyWithT(func(t *assert.CollectT) { - s.verifyWorkflowVersioning(wfVarsV0[i], vbUnpinned, tv0.Deployment(), nil, nil) - }, 10*time.Second, 100*time.Millisecond) - } + // v1 workers + w1 := worker.New(s.SdkClient(), tv1.TaskQueue().GetName(), worker.Options{ + DeploymentOptions: worker.DeploymentOptions{Version: tv1.SDKDeploymentVersion(), UseVersioning: true, DefaultVersioningBehavior: workflow.VersioningBehaviorAutoUpgrade}, + }) + w1.RegisterWorkflowWithOptions(wf, workflow.RegisterOptions{Name: "wf", VersioningBehavior: workflow.VersioningBehaviorAutoUpgrade}) + s.NoError(w1.Start()) + defer w1.Stop() - // Update the userData by setting the current version to v1. - s.updateTaskQueueDeploymentDataWithRoutingConfig(tv1, &deploymentpb.RoutingConfig{ - CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv1.DeploymentVersionString()), - CurrentVersionChangedTime: timestamp.TimePtr(time.Now()), - RevisionNumber: 2, - }, map[string]*deploymentspb.WorkerDeploymentVersionData{tv1.DeploymentVersion().GetBuildId(): &deploymentspb.WorkerDeploymentVersionData{ - Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT, - }}, []string{}, tqTypeWf, tqTypeAct) + // Set v1 to be the current version + s.setCurrentDeployment(tv1) - // Wait until all task queue partitions know that v1 is current. - // s.waitForDeploymentDataPropagation(tv1, versionStatusCurrent, false, tqTypeWf, tqTypeAct) + // Start a workflow on v1 + run, err := s.SdkClient().ExecuteWorkflow(context.Background(), sdkclient.StartWorkflowOptions{ + ID: tv1.WorkflowID(), + TaskQueue: tv1.TaskQueue().GetName(), + }, "wf") + s.NoError(err) - // Poll for workflows but this time the workflow task should be acted upon by a v1 worker. - channels = make([]chan struct{}, numWorkflows) - for i := 0; i < numWorkflows; i++ { - channels[i] = make(chan struct{}) - s.pollWftAndHandle(wfVarsV1[i], false, channels[i], - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - s.NotNil(task) - resp := respondEmptyWft(wfVarsV1[i], false, vbUnpinned) - resp.ForceCreateNewWorkflowTask = true - return resp, nil - }) - } - // Wait for channels to be closed - for i := 0; i < numWorkflows; i++ { - <-channels[i] - } - // Verify that all workflows are running on v1. - for i := 0; i < numWorkflows; i++ { - s.EventuallyWithT(func(t *assert.CollectT) { - s.verifyWorkflowVersioning(wfVarsV1[i], vbUnpinned, tv1.Deployment(), nil, nil) - }, 10*time.Second, 100*time.Millisecond) - } + // Verify that the workflow is running on v1 + s.EventuallyWithT(func(t *assert.CollectT) { + s.verifyWorkflowVersioning(tv1, vbUnpinned, tv1.Deployment(), nil, nil) + }, 10*time.Second, 100*time.Millisecond) - // Rollback the userData to v0. Using the lower API here. + // Start v0 workers to ensure they never receive a task + idlePollerCtx, idlePollerCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer idlePollerCancel() + go s.idlePollWorkflow(idlePollerCtx, tv0, true, ver3MinPollTime, "workflows should not go to the old deployment") - currentData, err := s.GetTestCluster().MatchingClient().GetTaskQueueUserData(context.Background(), &matchingservice.GetTaskQueueUserDataRequest{ - NamespaceId: s.NamespaceID().String(), - TaskQueue: tv0.TaskQueue().GetName(), - TaskQueueType: tqTypeWf, - }) - s.NoError(err) + // Rollback the userData to v0 to simulate routing config lag within a single partition. + s.rollbackTaskQueueToVersion(tv0) - _, err = s.GetTestCluster().MatchingClient().UpdateTaskQueueUserData(context.Background(), &matchingservice.UpdateTaskQueueUserDataRequest{ - NamespaceId: s.NamespaceID().String(), - TaskQueue: tv0.TaskQueue().GetName(), - UserData: &persistencespb.VersionedTaskQueueUserData{ - Data: &persistencespb.TaskQueueUserData{ - PerType: map[int32]*persistencespb.TaskQueueTypeUserData{ - int32(tqTypeWf): { - DeploymentData: &persistencespb.DeploymentData{ - DeploymentsData: map[string]*persistencespb.WorkerDeploymentData{ - tv0.DeploymentVersion().GetDeploymentName(): { - RoutingConfig: &deploymentpb.RoutingConfig{ - CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv0.DeploymentVersionString()), - CurrentVersionChangedTime: timestamp.TimePtr(time.Now().Add(-time.Second)), - RevisionNumber: 0, - }, - Versions: map[string]*deploymentspb.WorkerDeploymentVersionData{ - tv0.DeploymentVersion().GetBuildId(): { - Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT, - }, - }, - }, - }, - }, - }, - int32(tqTypeAct): { - DeploymentData: &persistencespb.DeploymentData{ - DeploymentsData: map[string]*persistencespb.WorkerDeploymentData{ - tv0.DeploymentVersion().GetDeploymentName(): { - RoutingConfig: &deploymentpb.RoutingConfig{ - CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv0.DeploymentVersionString()), - CurrentVersionChangedTime: timestamp.TimePtr(time.Now().Add(-time.Second)), - RevisionNumber: 0, - }, - Versions: map[string]*deploymentspb.WorkerDeploymentVersionData{ - tv0.DeploymentVersion().GetBuildId(): { - Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT, - }, - }, - }, - }, - }, - }, - }, - }, - Version: currentData.GetUserData().GetVersion(), - }, - }) - s.NoError(err) - // Even though the userData is rolled back to v0, the workflows should only continue to run on v1. - // Start idle pollers on v0 and ensure they never receive a task. - for i := 0; i < numWorkflows; i++ { - //nolint:testifylint - go s.idlePollWorkflow(wfVarsV0[i], true, ver3MinPollTime, "workflows should not go to the old deployment") - } - - // Complete all workflows on v1. - channels = make([]chan struct{}, numWorkflows) - for i := 0; i < numWorkflows; i++ { - channels[i] = make(chan struct{}) - s.pollWftAndHandle(wfVarsV1[i], false, channels[i], - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - s.NotNil(task) - return respondCompleteWorkflow(wfVarsV1[i], vbUnpinned), nil - }) - } - // Wait for channels to be closed - for i := 0; i < numWorkflows; i++ { - <-channels[i] - } + // Unblock the workflow to let it continue + s.NoError(s.SdkClient().SignalWorkflow(context.Background(), tv1.WorkflowID(), "", "afterRollback", nil)) - // Verify that all workflows are completed on v1. - for i := 0; i < numWorkflows; i++ { - s.EventuallyWithT(func(t *assert.CollectT) { - s.verifyWorkflowVersioning(wfVarsV1[i], vbUnpinned, tv1.Deployment(), nil, nil) - }, 60*time.Second, 100*time.Millisecond) - } + // Verify that the workflow completed successfully on v1 + var result string + s.NoError(run.Get(context.Background(), &result)) + s.Equal("v1", result) } func (s *Versioning3Suite) TestWorkflowTQLags_DependentActivityStartsTransition() { - if !s.useNewDeploymentData { - s.T().Skip("This test is only supported on new deployment data") + if !s.useRevisionNumbers { + s.T().Skip("This test is only supported on revision number mechanics") } /* The aim of this test is to show the following does not occur when using revisionNumber mechanics: @@ -3938,8 +3803,8 @@ func (s *Versioning3Suite) TestWorkflowTQLags_DependentActivityStartsTransition( } func (s *Versioning3Suite) TestActivityTQLags_DependentActivityCompletesOnTheNewVersion() { - if !s.useNewDeploymentData { - s.T().Skip("This test is only supported on new deployment data") + if !s.useRevisionNumbers { + s.T().Skip("This test is only supported on revision number mechanics") } /* The aim of this test is to show the following does not occur when using revisionNumber mechanics: @@ -4197,10 +4062,9 @@ func (s *Versioning3Suite) TestChildStartsWithParentRevision_SameTQ_TQLags() { s.rollbackTaskQueueToVersion(tv0Child) //nolint:testifylint - var wg sync.WaitGroup - wg.Go(func() { - s.idlePollWorkflow(tv0Child, true, 10*time.Second, "workflow should not go to the old deployment") - }) + idlePollerCtx, idlePollerCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer idlePollerCancel() + go s.idlePollWorkflow(idlePollerCtx, tv0Child, true, 10*time.Second, "workflow should not go to the old deployment") // Unblock parent to start the child s.NoError(s.SdkClient().SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "startChild", nil)) @@ -4229,7 +4093,6 @@ func (s *Versioning3Suite) TestChildStartsWithParentRevision_SameTQ_TQLags() { s.NoError(run.Get(ctx, &result)) s.Equal("v1", result) - wg.Wait() } // TestChildStartsWithNoInheritedAutoUpgradeInfo_CrossTQ demonstrates that a child workflow of an AutoUpgrade parent, not sharing @@ -4375,10 +4238,9 @@ func (s *Versioning3Suite) TestContinueAsNewOfAutoUpgradeWorkflow_RevisionNumber tv0 := tv1.WithBuildIDNumber(0) s.rollbackTaskQueueToVersion(tv0) - var wg sync.WaitGroup - wg.Go(func() { - s.idlePollWorkflow(tv0, true, 10*time.Second, "workflow should not go to the old deployment") - }) + idlePollerCtx, idlePollerCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer idlePollerCancel() + go s.idlePollWorkflow(idlePollerCtx, tv0, true, 10*time.Second, "workflow should not go to the old deployment") // Signal the workflow to trigger CAN s.NoError(s.SdkClient().SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "triggerCAN", nil)) @@ -4413,7 +4275,6 @@ func (s *Versioning3Suite) TestContinueAsNewOfAutoUpgradeWorkflow_RevisionNumber s.NoError(run.Get(ctx, &result)) s.Equal("v1", result) - wg.Wait() } // Verifies that a retry run starts on the same version the first run executed on, @@ -4611,10 +4472,9 @@ func (s *Versioning3Suite) testRetryNoBounceBack(testContinueAsNew bool, testChi s.rollbackTaskQueueToVersion(tv0) // Start v0 pollers and ensure they don't receive a task - var wg sync.WaitGroup - wg.Go(func() { - s.idlePollWorkflow(tv0, true, 10*time.Second, "v0 poller should not receive a task") - }) + idlePollerCtx, idlePollerCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer idlePollerCancel() + go s.idlePollWorkflow(idlePollerCtx, tv0, true, 10*time.Second, "v0 poller should not receive a task") // Verify that the rollback propagated to all partitions s.Eventually(func() bool { @@ -4657,7 +4517,6 @@ func (s *Versioning3Suite) testRetryNoBounceBack(testContinueAsNew bool, testChi return false }, 10*time.Second, 100*time.Millisecond) - wg.Wait() } func (s *Versioning3Suite) TestWorkflowRetry_AutoUpgrade_NoBounceBack() {