Skip to content
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 62 additions & 186 deletions tests/versioning_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ func (s *Versioning3Suite) TestPinnedTask_NoProperPoller() {
tv := testvars.New(s)

tv2 := tv.WithBuildIDNumber(2)
go s.idlePollWorkflow(tv2, true, ver3MinPollTime, "second deployment should not receive pinned task")
go s.idlePollWorkflow(context.Background(), tv2, true, ver3MinPollTime, "second deployment should not receive pinned task")

s.startWorkflow(tv, tv.VersioningOverridePinned(s.useV32))
s.idlePollWorkflow(tv, false, ver3MinPollTime, "unversioned worker should not receive pinned task")
s.idlePollWorkflow(context.Background(), tv, false, ver3MinPollTime, "unversioned worker should not receive pinned task")

// Sleeping to let the pollers arrive to server before ending the test.
time.Sleep(200 * time.Millisecond) //nolint:forbidigo
Expand All @@ -157,7 +157,7 @@ func (s *Versioning3Suite) TestUnpinnedTask_NonCurrentDeployment() {
s.RunTestWithMatchingBehavior(
func() {
tv := testvars.New(s)
go s.idlePollWorkflow(tv, true, ver3MinPollTime, "non-current versioned poller should not receive unpinned task")
go s.idlePollWorkflow(context.Background(), tv, true, ver3MinPollTime, "non-current versioned poller should not receive unpinned task")

s.startWorkflow(tv, nil)

Expand Down Expand Up @@ -195,6 +195,7 @@ func (s *Versioning3Suite) TestUnpinnedTask_OldDeployment() {
s.startWorkflow(tv, nil)

s.idlePollWorkflow(
context.Background(),
tvOldDeployment,
true,
ver3MinPollTime,
Expand All @@ -218,6 +219,7 @@ func (s *Versioning3Suite) TestUnpinnedTask_OldDeployment() {
s.startWorkflow(tv, nil)

s.idlePollWorkflow(
context.Background(),
tvOldDeployment,
true,
ver3MinPollTime,
Expand Down Expand Up @@ -346,7 +348,7 @@ func (s *Versioning3Suite) testPinnedQuery_DrainedVersion(pollersPresent bool, r
// create version v1 and make it current
idlePollerDone := make(chan struct{})
go func() {
s.idlePollWorkflow(tv, true, ver3MinPollTime, "should not have gotten any tasks since there are none")
s.idlePollWorkflow(context.Background(), tv, true, ver3MinPollTime, "should not have gotten any tasks since there are none")
close(idlePollerDone)
}()
s.setCurrentDeployment(tv)
Expand All @@ -367,7 +369,7 @@ func (s *Versioning3Suite) testPinnedQuery_DrainedVersion(pollersPresent bool, r
idlePollerDone = make(chan struct{})
tv2 := tv.WithBuildIDNumber(2)
go func() {
s.idlePollWorkflow(tv2, true, ver3MinPollTime, "should not have gotten any tasks since there are none")
s.idlePollWorkflow(context.Background(), tv2, true, ver3MinPollTime, "should not have gotten any tasks since there are none")
close(idlePollerDone)
}()
s.setCurrentDeployment(tv2)
Expand Down Expand Up @@ -499,7 +501,7 @@ func (s *Versioning3Suite) testUnpinnedQuery(sticky bool) {

pollerDone := make(chan struct{})
go func() {
s.idlePollWorkflow(tv2, true, 5*time.Second, "new deployment should not receive query")
s.idlePollWorkflow(context.Background(), tv2, true, 5*time.Second, "new deployment should not receive query")
close(pollerDone)
}()
s.pollAndQueryWorkflow(tv, sticky)
Expand All @@ -508,7 +510,7 @@ func (s *Versioning3Suite) testUnpinnedQuery(sticky bool) {
s.setCurrentDeployment(tv2)
s.waitForDeploymentDataPropagation(tv2, versionStatusCurrent, false, tqTypeWf)

go s.idlePollWorkflow(tv, true, ver3MinPollTime, "old deployment should not receive query")
go s.idlePollWorkflow(context.Background(), tv, true, ver3MinPollTime, "old deployment should not receive query")
// Since the current deployment has changed, task will move to the normal queue (thus, sticky=false)
s.pollAndQueryWorkflow(tv2, false)
}
Expand Down Expand Up @@ -680,7 +682,7 @@ func (s *Versioning3Suite) TestUnpinnedWorkflow_SuccessfulUpdate_TransitionsToNe

// Register the new version and set it to current
tv2 := tv1.WithBuildIDNumber(2)
s.idlePollWorkflow(tv2, true, ver3MinPollTime, "should not have gotten any tasks since there are none")
s.idlePollWorkflow(context.Background(), tv2, true, ver3MinPollTime, "should not have gotten any tasks since there are none")
s.setCurrentDeployment(tv2)

// Send update
Expand Down Expand Up @@ -765,7 +767,7 @@ func (s *Versioning3Suite) TestUnpinnedWorkflow_FailedUpdate_DoesNotTransitionTo

// Register the new version and set it to current
tv2 := tv1.WithBuildIDNumber(2)
s.idlePollWorkflow(tv2, true, ver3MinPollTime, "should not have gotten any tasks since there are none")
s.idlePollWorkflow(context.Background(), tv2, true, ver3MinPollTime, "should not have gotten any tasks since there are none")

s.setCurrentDeployment(tv2)

Expand Down Expand Up @@ -3355,6 +3357,7 @@ func (s *Versioning3Suite) doPollActivityAndHandle(
}

func (s *Versioning3Suite) idlePollWorkflow(
ctx context.Context,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function shall now take in a context. The reason this was done is as follows:

There are some tests that kickstart a v0 poller (v0 is just an example) for the sole purpose of checking if a task were to ever go this worker. These tests initialize them in a waitGroup and then towards the end of the test, just before it gets cleaned up, there is a wg.Wait() present which ensures that this poller completes running before the test can be terminated safely.

However, this does mean that the minimum wait time for the test will never be lesser than the minimum poll time of this v0 poller. This makes our tests long to run. Thus, passing this context and cancelling it is a safe and efficient way to terminate the test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

tv *testvars.TestVars,
versioned bool,
timeout time.Duration,
Expand All @@ -3375,6 +3378,7 @@ func (s *Versioning3Suite) idlePollWorkflow(
return nil, nil
},
taskpoller.WithTimeout(timeout),
taskpoller.WithContext(ctx),
)
}

Expand Down Expand Up @@ -3651,18 +3655,18 @@ func (s *Versioning3Suite) verifyVersioningSAs(
}, 5*time.Second, 50*time.Millisecond)
}

func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions() {
s.T().Skip("Skipping this test for now as there seems to be a flake. Shall fix it in the upcoming PR.")
func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions_v2() {
// if !s.useRevisionNumbers {
// s.T().Skip("This test is only supported on revision number mechanics")
// }
/*

Test plan:
- Use only one read and write partition.
- Update the userData by setting current version to v0.
- Start 10 workflows on v0. MS reads v0 for these workflows.
- Update the userData by setting the current version to v1.
- Complete workflow task on v1. MS reads v1 for these workflows now.
- *Rollback* userData to v0 on that single partition (call lower level API)
- See if any workflow task goes back to original v0 poller (should not)
- Set v1 to be the current version.
- Start a workflow on v1 and then make it block on a signal.
- While blocked, rollback the userData to v0.
- Unblock the workflow and verify that it completes on v1.

*/

Expand All @@ -3672,182 +3676,54 @@ func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions()
tv0 := testvars.New(s).WithBuildIDNumber(0)
tv1 := tv0.WithBuildIDNumber(1)

// Update the userData by setting the current version to v0
s.updateTaskQueueDeploymentDataWithRoutingConfig(tv0, &deploymentpb.RoutingConfig{
CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv0.DeploymentVersionString()),
CurrentVersionChangedTime: timestamp.TimePtr(time.Now()),
RevisionNumber: 1,
}, map[string]*deploymentspb.WorkerDeploymentVersionData{tv0.DeploymentVersion().GetBuildId(): &deploymentspb.WorkerDeploymentVersionData{
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT,
}}, []string{}, tqTypeWf, tqTypeAct)

// Wait until all task queue partitions know that v0 is current.
s.waitForDeploymentDataPropagation(tv0, versionStatusCurrent, false, tqTypeWf, tqTypeAct)

// Make numWorkflows different workflow ID's so that we can verify that all workflows are running on v0.
numWorkflows := 10
wfVarsV0 := make([]*testvars.TestVars, numWorkflows)
wfVarsV1 := make([]*testvars.TestVars, numWorkflows)

for i := 0; i < numWorkflows; i++ {
wfVarsV0[i] = tv0.WithWorkflowIDNumber(i)
wfVarsV1[i] = tv1.WithWorkflowIDNumber(i)
}

// Start all different workflows on version v0.
for i := 0; i < numWorkflows; i++ {
s.startWorkflow(wfVarsV0[i], nil)
}

// Poll for workflows on v0.
channels := make([]chan struct{}, numWorkflows)
for i := 0; i < numWorkflows; i++ {
channels[i] = make(chan struct{})
s.pollWftAndHandle(wfVarsV0[i], false, channels[i],
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
s.NotNil(task)
resp := respondEmptyWft(wfVarsV0[i], false, vbUnpinned)
resp.ForceCreateNewWorkflowTask = true
return resp, nil
})
}
// Wait for channels to be closed
for i := 0; i < numWorkflows; i++ {
<-channels[i]
wf := func(ctx workflow.Context) (string, error) {
workflow.GetSignalChannel(ctx, "afterRollback").Receive(ctx, nil)
return "v1", nil
}

// Verify that all workflows are running on v0.
for i := 0; i < numWorkflows; i++ {
s.EventuallyWithT(func(t *assert.CollectT) {
s.verifyWorkflowVersioning(wfVarsV0[i], vbUnpinned, tv0.Deployment(), nil, nil)
}, 10*time.Second, 100*time.Millisecond)
}
// Define the v1 workers
w1 := worker.New(s.SdkClient(), tv1.TaskQueue().GetName(), worker.Options{
DeploymentOptions: worker.DeploymentOptions{Version: tv1.SDKDeploymentVersion(), UseVersioning: true, DefaultVersioningBehavior: workflow.VersioningBehaviorAutoUpgrade},
})
w1.RegisterWorkflowWithOptions(wf, workflow.RegisterOptions{Name: "wf", VersioningBehavior: workflow.VersioningBehaviorAutoUpgrade})
s.NoError(w1.Start())
defer w1.Stop()

// Update the userData by setting the current version to v1.
s.updateTaskQueueDeploymentDataWithRoutingConfig(tv1, &deploymentpb.RoutingConfig{
CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv1.DeploymentVersionString()),
CurrentVersionChangedTime: timestamp.TimePtr(time.Now()),
RevisionNumber: 2,
}, map[string]*deploymentspb.WorkerDeploymentVersionData{tv1.DeploymentVersion().GetBuildId(): &deploymentspb.WorkerDeploymentVersionData{
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT,
}}, []string{}, tqTypeWf, tqTypeAct)
// Set v1 to be the current version
s.setCurrentDeployment(tv1)

// Wait until all task queue partitions know that v1 is current.
// s.waitForDeploymentDataPropagation(tv1, versionStatusCurrent, false, tqTypeWf, tqTypeAct)
// Start a workflow on v1.
run, err := s.SdkClient().ExecuteWorkflow(context.Background(), sdkclient.StartWorkflowOptions{
ID: tv1.WorkflowID(),
TaskQueue: tv1.TaskQueue().GetName(),
}, "wf")
s.NoError(err)

// Poll for workflows but this time the workflow task should be acted upon by a v1 worker.
channels = make([]chan struct{}, numWorkflows)
for i := 0; i < numWorkflows; i++ {
channels[i] = make(chan struct{})
s.pollWftAndHandle(wfVarsV1[i], false, channels[i],
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
s.NotNil(task)
resp := respondEmptyWft(wfVarsV1[i], false, vbUnpinned)
resp.ForceCreateNewWorkflowTask = true
return resp, nil
})
}
// Wait for channels to be closed
for i := 0; i < numWorkflows; i++ {
<-channels[i]
}
// Verify that all workflows are running on v1.
for i := 0; i < numWorkflows; i++ {
s.EventuallyWithT(func(t *assert.CollectT) {
s.verifyWorkflowVersioning(wfVarsV1[i], vbUnpinned, tv1.Deployment(), nil, nil)
}, 10*time.Second, 100*time.Millisecond)
}
// Verify that the workflow is running on v1
s.EventuallyWithT(func(t *assert.CollectT) {
s.verifyWorkflowVersioning(tv1, vbUnpinned, tv1.Deployment(), nil, nil)
}, 10*time.Second, 100*time.Millisecond)

// Rollback the userData to v0. Using the lower API here.
// Start v0 workers to ensure they never receive a task
idlePollerCtx, idlePollerCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer idlePollerCancel()
go s.idlePollWorkflow(idlePollerCtx, tv0, true, ver3MinPollTime, "workflows should not go to the old deployment")

currentData, err := s.GetTestCluster().MatchingClient().GetTaskQueueUserData(context.Background(), &matchingservice.GetTaskQueueUserDataRequest{
NamespaceId: s.NamespaceID().String(),
TaskQueue: tv0.TaskQueue().GetName(),
TaskQueueType: tqTypeWf,
})
s.NoError(err)
// Rollback the userData to v0 to simulate routing config lag within a single partition.
s.rollbackTaskQueueToVersion(tv0)

_, err = s.GetTestCluster().MatchingClient().UpdateTaskQueueUserData(context.Background(), &matchingservice.UpdateTaskQueueUserDataRequest{
NamespaceId: s.NamespaceID().String(),
TaskQueue: tv0.TaskQueue().GetName(),
UserData: &persistencespb.VersionedTaskQueueUserData{
Data: &persistencespb.TaskQueueUserData{
PerType: map[int32]*persistencespb.TaskQueueTypeUserData{
int32(tqTypeWf): {
DeploymentData: &persistencespb.DeploymentData{
DeploymentsData: map[string]*persistencespb.WorkerDeploymentData{
tv0.DeploymentVersion().GetDeploymentName(): {
RoutingConfig: &deploymentpb.RoutingConfig{
CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv0.DeploymentVersionString()),
CurrentVersionChangedTime: timestamp.TimePtr(time.Now().Add(-time.Second)),
RevisionNumber: 0,
},
Versions: map[string]*deploymentspb.WorkerDeploymentVersionData{
tv0.DeploymentVersion().GetBuildId(): {
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT,
},
},
},
},
},
},
int32(tqTypeAct): {
DeploymentData: &persistencespb.DeploymentData{
DeploymentsData: map[string]*persistencespb.WorkerDeploymentData{
tv0.DeploymentVersion().GetDeploymentName(): {
RoutingConfig: &deploymentpb.RoutingConfig{
CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv0.DeploymentVersionString()),
CurrentVersionChangedTime: timestamp.TimePtr(time.Now().Add(-time.Second)),
RevisionNumber: 0,
},
Versions: map[string]*deploymentspb.WorkerDeploymentVersionData{
tv0.DeploymentVersion().GetBuildId(): {
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT,
},
},
},
},
},
},
},
},
Version: currentData.GetUserData().GetVersion(),
},
})
s.NoError(err)
// Even though the userData is rolled back to v0, the workflows should only continue to run on v1.
// Start idle pollers on v0 and ensure they never receive a task.
for i := 0; i < numWorkflows; i++ {
//nolint:testifylint
go s.idlePollWorkflow(wfVarsV0[i], true, ver3MinPollTime, "workflows should not go to the old deployment")
}

// Complete all workflows on v1.
channels = make([]chan struct{}, numWorkflows)
for i := 0; i < numWorkflows; i++ {
channels[i] = make(chan struct{})
s.pollWftAndHandle(wfVarsV1[i], false, channels[i],
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
s.NotNil(task)
return respondCompleteWorkflow(wfVarsV1[i], vbUnpinned), nil
})
}
// Wait for channels to be closed
for i := 0; i < numWorkflows; i++ {
<-channels[i]
}
// Unblock the workflow to let it continue
s.NoError(s.SdkClient().SignalWorkflow(context.Background(), tv1.WorkflowID(), "", "afterRollback", nil))

// Verify that all workflows are completed on v1.
for i := 0; i < numWorkflows; i++ {
s.EventuallyWithT(func(t *assert.CollectT) {
s.verifyWorkflowVersioning(wfVarsV1[i], vbUnpinned, tv1.Deployment(), nil, nil)
}, 60*time.Second, 100*time.Millisecond)
}
// Verify that the workflow completed successfully on v1
var result string
s.NoError(run.Get(context.Background(), &result))
s.Equal("v1", result)
}

func (s *Versioning3Suite) TestWorkflowTQLags_DependentActivityStartsTransition() {
if !s.useNewDeploymentData {
s.T().Skip("This test is only supported on new deployment data")
if !s.useRevisionNumbers {
s.T().Skip("This test is only supported on revision number mechanics")
}
/*
The aim of this test is to show the following does not occur when using revisionNumber mechanics:
Expand Down Expand Up @@ -3938,8 +3814,8 @@ func (s *Versioning3Suite) TestWorkflowTQLags_DependentActivityStartsTransition(
}

func (s *Versioning3Suite) TestActivityTQLags_DependentActivityCompletesOnTheNewVersion() {
if !s.useNewDeploymentData {
s.T().Skip("This test is only supported on new deployment data")
if !s.useRevisionNumbers {
s.T().Skip("This test is only supported on revision number mechanics")
}
/*
The aim of this test is to show the following does not occur when using revisionNumber mechanics:
Expand Down Expand Up @@ -4199,7 +4075,7 @@ func (s *Versioning3Suite) TestChildStartsWithParentRevision_SameTQ_TQLags() {
//nolint:testifylint
var wg sync.WaitGroup
wg.Go(func() {
s.idlePollWorkflow(tv0Child, true, 10*time.Second, "workflow should not go to the old deployment")
s.idlePollWorkflow(context.Background(), tv0Child, true, 10*time.Second, "workflow should not go to the old deployment")
})

// Unblock parent to start the child
Expand Down Expand Up @@ -4377,7 +4253,7 @@ func (s *Versioning3Suite) TestContinueAsNewOfAutoUpgradeWorkflow_RevisionNumber

var wg sync.WaitGroup
wg.Go(func() {
s.idlePollWorkflow(tv0, true, 10*time.Second, "workflow should not go to the old deployment")
s.idlePollWorkflow(context.Background(), tv0, true, 10*time.Second, "workflow should not go to the old deployment")
})

// Signal the workflow to trigger CAN
Expand Down Expand Up @@ -4613,7 +4489,7 @@ func (s *Versioning3Suite) testRetryNoBounceBack(testContinueAsNew bool, testChi
// Start v0 pollers and ensure they don't receive a task
var wg sync.WaitGroup
wg.Go(func() {
s.idlePollWorkflow(tv0, true, 10*time.Second, "v0 poller should not receive a task")
s.idlePollWorkflow(context.Background(), tv0, true, 10*time.Second, "v0 poller should not receive a task")
})

// Verify that the rollback propagated to all partitions
Expand Down
Loading