Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
5865ac8
current work on version routing config
Shivs11 Oct 17, 2025
c82d8f2
Merge branch 'main' into ss/version_config_counter
Shivs11 Oct 17, 2025
4d10da8
Proto changes for async deployment info propagation
ShahabT Oct 22, 2025
f4d45bf
comments
Shivs11 Oct 23, 2025
37abe4a
added revision number in MS and in task coming from history->matching
Shivs11 Oct 23, 2025
41484f1
Merge branch 'shahab/async' into ss/version_config_counter
Shivs11 Oct 23, 2025
1d93f38
current progress
Shivs11 Oct 27, 2025
5e88fc2
do not use revision number if deployment are different
Shivs11 Oct 27, 2025
b020b6a
test added for bouncing of unpinned workflows
Shivs11 Oct 28, 2025
b609e49
regenerated mocks
Shivs11 Oct 28, 2025
8cdeb6e
draft v1
Shivs11 Oct 29, 2025
9bf9c6c
draft v2
Shivs11 Oct 29, 2025
c4bf287
some todo's
Shivs11 Oct 29, 2025
c700376
address comments
Shivs11 Oct 29, 2025
e8470cb
fix buggy tests
Shivs11 Oct 29, 2025
e3fd534
remove all prints
Shivs11 Oct 30, 2025
3d8c07a
findTargetDeployment function works with old and new deploymentData
Shivs11 Oct 30, 2025
5704c93
Async routing config propagation in deployment workflow
ShahabT Oct 30, 2025
b82758b
remove plan files
ShahabT Oct 30, 2025
175835c
support new version in replay testing
ShahabT Oct 30, 2025
32b009c
v4: equating rev number in MS
Shivs11 Oct 30, 2025
90b6c01
Mutable state now increments revision number based on task dispatch n…
Shivs11 Oct 30, 2025
af66e45
new deployment check for blackholed queries
Shivs11 Oct 30, 2025
9bb373f
Merge branch 'main' into ss/version_config_counter
Shivs11 Oct 30, 2025
5b60385
some more tweaks after merge conflicts were resolved
Shivs11 Oct 30, 2025
1e8f565
fix breaking unit tests
Shivs11 Oct 30, 2025
3a4610e
gate revision number behind a flag in both history and matching
Shivs11 Oct 30, 2025
eb84e97
clean up PR
Shivs11 Oct 30, 2025
fcc4420
DescribeTQ also uses the latest deploymentData
Shivs11 Oct 30, 2025
a4177a0
fix lint
ShahabT Oct 31, 2025
189f95a
Merge remote-tracking branch 'origin/main' into shahab/sync-apis
ShahabT Oct 31, 2025
c98e8d8
addressed all the changes
Shivs11 Oct 31, 2025
85a832d
more changes addressed and comments removed
Shivs11 Oct 31, 2025
3348870
Merge branch 'main' into ss/version_config_counter
Shivs11 Oct 31, 2025
a3b09a8
remove test from stats
Shivs11 Oct 31, 2025
a840b20
restore a deleted stats test
Shivs11 Oct 31, 2025
10d11c6
remove pre-release test in versioning_3_test
Shivs11 Oct 31, 2025
29d5681
pass in the nil string
Shivs11 Oct 31, 2025
29df5a5
fix workflow determinism
ShahabT Oct 31, 2025
1f2e9c0
fix unit tests
ShahabT Oct 31, 2025
2a55e2f
lint
Shivs11 Oct 31, 2025
925ebb8
fix bug in calculateTaskQueueVersioningInfo by now correctly checking…
Shivs11 Nov 4, 2025
38ded5a
address comments and make batch processing parallel
ShahabT Nov 6, 2025
b062248
fix tests
ShahabT Nov 6, 2025
de66930
fix tests on old wf version
ShahabT Nov 7, 2025
9bb4f87
remove old deployments when RC gets updated + unversioned current/ram…
Shivs11 Nov 7, 2025
5829007
ensureRegisteredInDeploymentVersion now does a HasDeploymentVersion c…
Shivs11 Nov 7, 2025
908826d
add unit tests for both matching and worker-versioning
Shivs11 Nov 7, 2025
4b4be18
lint
Shivs11 Nov 7, 2025
7dafee3
recurring lint
Shivs11 Nov 7, 2025
dfb1c51
address comments
Shivs11 Nov 7, 2025
98468ce
clear current from RC only when required
Shivs11 Nov 7, 2025
c04944b
lint fixes and helpers
Shivs11 Nov 7, 2025
7508721
lint typo
Shivs11 Nov 7, 2025
31a39da
some final cleanup
Shivs11 Nov 7, 2025
f5a5f6e
lint keeps on failing
Shivs11 Nov 7, 2025
e84f484
some more cleanup
Shivs11 Nov 10, 2025
0980b42
Merge branch 'main' into ss/version_config_counter
Shivs11 Nov 10, 2025
b5b031f
make delete version async
ShahabT Nov 10, 2025
86ae2c9
Merge remote-tracking branch 'origin/ss/version_config_counter' into …
ShahabT Nov 10, 2025
58fac14
Merge remote-tracking branch 'origin/main' into shahab/sync-apis
ShahabT Nov 12, 2025
26c0f1b
Fix more tests
ShahabT Nov 12, 2025
3bff9e1
Add versioning tests
ShahabT Nov 12, 2025
8e00d06
Merge remote-tracking branch 'origin/main' into shahab/sync-apis
ShahabT Nov 12, 2025
bbed3c7
Merge remote-tracking branch 'refs/remotes/origin/main' into shahab/s…
ShahabT Nov 13, 2025
b89ea20
more test fixes
ShahabT Nov 13, 2025
f2e3f48
more test fixes
ShahabT Nov 13, 2025
f1126bb
fix unit test
ShahabT Nov 13, 2025
af46d74
fix lint
ShahabT Nov 13, 2025
32efd27
name suite runs
ShahabT Nov 13, 2025
9eeeeb3
improve versioning error messages, return NotFound gRPC error for all…
carlydf Nov 14, 2025
6364d2a
remove unused code
carlydf Nov 14, 2025
d572f69
revert
carlydf Nov 14, 2025
9ce284e
fix tests
carlydf Nov 14, 2025
8b882bd
try to merge
carlydf Nov 20, 2025
cac5554
remove unintentional changes from merge
carlydf Nov 20, 2025
a3a7f4f
recognize templated errors correctly
carlydf Nov 20, 2025
5bb0b22
fix nil pointer error
carlydf Nov 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions service/worker/workerdeployment/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sort"
"strings"
"time"

"github.com/dgryski/go-farm"
Expand Down Expand Up @@ -338,9 +339,9 @@ func newResourceExhaustedError(message string) *serviceerror.ResourceExhausted {
}
}

func (d *ClientImpl) handleUpdateVersionFailures(outcome *updatepb.Outcome) error {
func (d *ClientImpl) handleUpdateVersionFailures(outcome *updatepb.Outcome, deploymentName, buildID string) error {
if failure := outcome.GetFailure(); failure.GetApplicationFailureInfo().GetType() == errVersionNotFound {
return serviceerror.NewNotFound(errVersionNotFound)
return serviceerror.NewNotFoundf(ErrWorkerDeploymentVersionNotFound, buildID, deploymentName)
} else if failure.GetApplicationFailureInfo().GetType() == errFailedPrecondition {
return serviceerror.NewFailedPrecondition(failure.Message)
} else if failure != nil {
Expand Down Expand Up @@ -704,7 +705,7 @@ func (d *ClientImpl) SetCurrentVersion(
if err != nil {
var notFound *serviceerror.NotFound
if errors.As(err, &notFound) {
return nil, serviceerror.NewFailedPreconditionf(ErrWorkerDeploymentNotFound, deploymentName)
return nil, serviceerror.NewNotFoundf(ErrWorkerDeploymentNotFound, deploymentName)
}
return nil, err
}
Expand All @@ -719,7 +720,7 @@ func (d *ClientImpl) SetCurrentVersion(
res.ConflictToken = details[0].GetData()
}
return &res, nil
} else if updateErr := d.handleUpdateVersionFailures(outcome); updateErr != nil {
} else if updateErr := d.handleUpdateVersionFailures(outcome, deploymentName, versionObj.GetBuildId()); updateErr != nil {
return nil, updateErr
} else if registerErr := d.handleRegisterVersionFailures(outcome); registerErr != nil {
return nil, registerErr
Expand Down Expand Up @@ -818,7 +819,7 @@ func (d *ClientImpl) SetRampingVersion(
if err != nil {
var notFound *serviceerror.NotFound
if errors.As(err, &notFound) {
return nil, serviceerror.NewFailedPreconditionf(ErrWorkerDeploymentNotFound, deploymentName)
return nil, serviceerror.NewNotFoundf(ErrWorkerDeploymentNotFound, deploymentName)
}
return nil, err
}
Expand All @@ -836,7 +837,7 @@ func (d *ClientImpl) SetRampingVersion(
}

return &res, nil
} else if updateErr := d.handleUpdateVersionFailures(outcome); updateErr != nil {
} else if updateErr := d.handleUpdateVersionFailures(outcome, deploymentName, versionObj.GetBuildId()); updateErr != nil {
return nil, updateErr
} else if registerErr := d.handleRegisterVersionFailures(outcome); registerErr != nil {
return nil, registerErr
Expand Down Expand Up @@ -1153,10 +1154,10 @@ func (d *ClientImpl) DeleteVersionFromWorkerDeployment(
}

if failure := outcome.GetFailure(); failure != nil {
if failure.Message == ErrVersionIsDraining {
return temporal.NewNonRetryableApplicationError(ErrVersionIsDraining, errFailedPrecondition, nil) // non-retryable error to stop multiple activity attempts
} else if failure.Message == ErrVersionHasPollers {
return temporal.NewNonRetryableApplicationError(ErrVersionHasPollers, errFailedPrecondition, nil) // non-retryable error to stop multiple activity attempts
if strings.Contains(failure.Message, errVersionIsDrainingSuffix) {
return temporal.NewNonRetryableApplicationError(fmt.Sprintf(ErrVersionIsDraining, worker_versioning.WorkerDeploymentVersionToStringV32(versionObj)), errFailedPrecondition, nil) // non-retryable error to stop multiple activity attempts
} else if strings.Contains(failure.Message, errVersionHasPollersSuffix) {
return temporal.NewNonRetryableApplicationError(fmt.Sprintf(ErrVersionHasPollers, worker_versioning.WorkerDeploymentVersionToStringV32(versionObj)), errFailedPrecondition, nil) // non-retryable error to stop multiple activity attempts
}
return serviceerror.NewInternal(failure.Message)
}
Expand Down
18 changes: 10 additions & 8 deletions service/worker/workerdeployment/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,17 @@ const (
errConflictTokenMismatchType = "errConflictTokenMismatch"
errFailedPrecondition = "FailedPrecondition"

ErrVersionIsDraining = "Version cannot be deleted since it is draining."
ErrVersionHasPollers = "Version cannot be deleted since it has active pollers."
ErrVersionIsCurrentOrRamping = "Version cannot be deleted since it is current or ramping."

ErrRampingVersionDoesNotHaveAllTaskQueues = "proposed ramping version is missing active task queues from the current version; these would become unversioned if it is set as the ramping version"
ErrCurrentVersionDoesNotHaveAllTaskQueues = "proposed current version is missing active task queues from the current version; these would become unversioned if it is set as the current version"
errVersionIsDrainingSuffix = "cannot be deleted since it is draining"
ErrVersionIsDraining = "version '%s' " + errVersionIsDrainingSuffix
errVersionHasPollersSuffix = "cannot be deleted since it has active pollers"
ErrVersionHasPollers = "version '%s' " + errVersionHasPollersSuffix
ErrVersionIsCurrentOrRamping = "version '%s' cannot be deleted since it is current or ramping"

ErrRampingVersionDoesNotHaveAllTaskQueues = "proposed ramping version '%s' is missing active task queues from the current version; these would become unversioned if it is set as the ramping version"
ErrCurrentVersionDoesNotHaveAllTaskQueues = "proposed current version '%s' is missing active task queues from the current version; these would become unversioned if it is set as the current version"
ErrManagerIdentityMismatch = "ManagerIdentity '%s' is set and does not match user identity '%s'; to proceed, set your own identity as the ManagerIdentity, remove the ManagerIdentity, or wait for the other client to do so"
ErrWorkerDeploymentNotFound = "no Worker Deployment found with name %s; does your Worker Deployment have pollers?"
ErrWorkerDeploymentVersionNotFound = "build ID %s not fount in Worker Deployment %s"
ErrWorkerDeploymentNotFound = "no Worker Deployment found with name '%s'; does your Worker Deployment have pollers?"
ErrWorkerDeploymentVersionNotFound = "build ID '%s' not found in Worker Deployment '%s'"
)

var (
Expand Down
17 changes: 13 additions & 4 deletions service/worker/workerdeployment/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,11 @@ func (d *WorkflowRunner) setRamp(
return err
}
if isMissingTaskQueues {
return serviceerror.NewFailedPrecondition(ErrRampingVersionDoesNotHaveAllTaskQueues)
newRampingVersionObj, _ := worker_versioning.WorkerDeploymentVersionFromStringV31(newRampingVersion)
return serviceerror.NewFailedPreconditionf(
ErrRampingVersionDoesNotHaveAllTaskQueues,
worker_versioning.WorkerDeploymentVersionToStringV32(newRampingVersionObj),
)
}
}

Expand Down Expand Up @@ -814,13 +818,14 @@ func (d *WorkflowRunner) validateDeleteVersion(args *deploymentspb.DeleteVersion
// deployment workflow because that's the source of truth for routing config.
//nolint:staticcheck // SA1019: worker versioning v0.31
if d.State.RoutingConfig.CurrentVersion == args.Version || d.State.RoutingConfig.RampingVersion == args.Version {
versionObj, _ := worker_versioning.WorkerDeploymentVersionFromStringV31(args.Version)
// activity won't retry on this error since version not eligible for deletion
return serviceerror.NewFailedPrecondition(ErrVersionIsCurrentOrRamping)
return serviceerror.NewFailedPreconditionf(ErrVersionIsCurrentOrRamping, worker_versioning.WorkerDeploymentVersionToStringV32(versionObj))
}

// Ignore the manager identity check if the delete operation is initiated by the server internally
if !args.GetServerDelete() && d.State.ManagerIdentity != "" && d.State.ManagerIdentity != args.Identity {
return serviceerror.NewFailedPrecondition(fmt.Sprintf(ErrManagerIdentityMismatch, d.State.ManagerIdentity, args.Identity))
return serviceerror.NewFailedPreconditionf(ErrManagerIdentityMismatch, d.State.ManagerIdentity, args.Identity)
}
return nil
}
Expand Down Expand Up @@ -1012,7 +1017,11 @@ func (d *WorkflowRunner) handleSetCurrent(ctx workflow.Context, args *deployment
return nil, err
}
if isMissingTaskQueues {
return nil, serviceerror.NewFailedPrecondition(ErrCurrentVersionDoesNotHaveAllTaskQueues)
newCurrentVersionObj, _ := worker_versioning.WorkerDeploymentVersionFromStringV31(newCurrentVersion)
return nil, serviceerror.NewFailedPreconditionf(
ErrCurrentVersionDoesNotHaveAllTaskQueues,
worker_versioning.WorkerDeploymentVersionToStringV32(newCurrentVersionObj),
)
}
}

Expand Down
2 changes: 1 addition & 1 deletion service/worker/workerdeployment/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,7 @@ func (s *WorkerDeploymentSuite) Test_DeleteVersion_FailsWhenCurrentOrRamping() {
s.env.UpdateWorkflow(DeleteVersion, "", &testsuite.TestUpdateCallback{
OnReject: func(err error) {
// The validator should reject this update
s.Require().ErrorContains(err, ErrVersionIsCurrentOrRamping)
s.Require().ErrorContains(err, fmt.Sprintf(ErrVersionIsCurrentOrRamping, tv.DeploymentVersionStringV32()))
},
OnAccept: func() {
s.Fail("delete version should have been rejected by validator")
Expand Down
4 changes: 2 additions & 2 deletions tests/versioning_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2695,7 +2695,7 @@ func (s *Versioning3Suite) setCurrentDeployment(tv *testvars.TestVars) {
}
_, err := s.FrontendClient().SetWorkerDeploymentCurrentVersion(ctx, req)
var notFound *serviceerror.NotFound
if errors.As(err, &notFound) || (err != nil && strings.Contains(err.Error(), workerdeployment.ErrCurrentVersionDoesNotHaveAllTaskQueues)) {
if errors.As(err, &notFound) || (err != nil && strings.Contains(err.Error(), serviceerror.NewFailedPreconditionf(workerdeployment.ErrCurrentVersionDoesNotHaveAllTaskQueues, tv.DeploymentVersionStringV32()).Error())) {
return false
}
s.NoError(err)
Expand Down Expand Up @@ -2748,7 +2748,7 @@ func (s *Versioning3Suite) setRampingDeployment(
}
_, err := s.FrontendClient().SetWorkerDeploymentRampingVersion(ctx, req)
var notFound *serviceerror.NotFound
if errors.As(err, &notFound) || errors.Is(err, serviceerror.NewFailedPrecondition(workerdeployment.ErrRampingVersionDoesNotHaveAllTaskQueues)) {
if errors.As(err, &notFound) || (err != nil && strings.Contains(err.Error(), serviceerror.NewFailedPreconditionf(workerdeployment.ErrRampingVersionDoesNotHaveAllTaskQueues, tv.DeploymentVersionStringV32()).Error())) {
return false
}
s.NoError(err)
Expand Down
12 changes: 6 additions & 6 deletions tests/worker_deployment_version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func (s *DeploymentVersionSuite) TestDeleteVersion_DeleteCurrentVersion() {
s.Nil(err)

// Deleting this version should fail since the version is current
s.tryDeleteVersion(ctx, tv1, workerdeployment.ErrVersionIsCurrentOrRamping, false)
s.tryDeleteVersion(ctx, tv1, fmt.Sprintf(workerdeployment.ErrVersionIsCurrentOrRamping, tv1.DeploymentVersionStringV32()), false)

// Verifying workflow is not in a locked state after an invalid delete request such as the one above. If the workflow were in a locked
// state, the passed context would have timed out making the following operation fail.
Expand Down Expand Up @@ -545,7 +545,7 @@ func (s *DeploymentVersionSuite) TestDeleteVersion_DeleteRampedVersion() {
s.Nil(err)

// Deleting this version should fail since the version is ramping
s.tryDeleteVersion(ctx, tv1, workerdeployment.ErrVersionIsCurrentOrRamping, false)
s.tryDeleteVersion(ctx, tv1, fmt.Sprintf(workerdeployment.ErrVersionIsCurrentOrRamping, tv1.DeploymentVersionStringV32()), false)

// Verifying workflow is not in a locked state after an invalid delete request such as the one above. If the workflow were in a locked
// state, the passed context would have timed out making the following operation fail.
Expand Down Expand Up @@ -622,7 +622,7 @@ func (s *DeploymentVersionSuite) TestDeleteVersion_DrainingVersion() {
}, enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINING, false, false)

// delete should fail
s.tryDeleteVersion(ctx, tv1, workerdeployment.ErrVersionIsDraining, false)
s.tryDeleteVersion(ctx, tv1, fmt.Sprintf(workerdeployment.ErrVersionIsDraining, tv1.DeploymentVersionStringV32()), false)

}

Expand Down Expand Up @@ -650,7 +650,7 @@ func (s *DeploymentVersionSuite) TestDeleteVersion_Drained_But_Pollers_Exist() {
s.signalAndWaitForDrained(ctx, tv1)

// Version will bypass "drained" check but delete should still fail since we have active pollers.
s.tryDeleteVersion(ctx, tv1, workerdeployment.ErrVersionHasPollers, false)
s.tryDeleteVersion(ctx, tv1, fmt.Sprintf(workerdeployment.ErrVersionHasPollers, tv1.DeploymentVersionStringV32()), false)
}

func (s *DeploymentVersionSuite) signalAndWaitForDrained(ctx context.Context, tv *testvars.TestVars) {
Expand Down Expand Up @@ -936,7 +936,7 @@ func (s *DeploymentVersionSuite) TestVersionMissingTaskQueues_InvalidSetCurrentV

// SetCurrent should fail since task_queue_1 does not have a current version than the deployment's existing current version
// and it either has a backlog of tasks being present or an add rate > 0.
s.EqualError(err, workerdeployment.ErrCurrentVersionDoesNotHaveAllTaskQueues)
s.EqualError(err, fmt.Sprintf(workerdeployment.ErrCurrentVersionDoesNotHaveAllTaskQueues, tv2.DeploymentVersionStringV32()))
}

func (s *DeploymentVersionSuite) TestVersionMissingTaskQueues_ValidSetCurrentVersion() {
Expand Down Expand Up @@ -994,7 +994,7 @@ func (s *DeploymentVersionSuite) TestVersionMissingTaskQueues_InvalidSetRampingV

// SetRampingVersion should fail since task_queue_1 does not have a current version than the deployment's existing current version
// and it either has a backlog of tasks being present or an add rate > 0.
s.EqualError(err, workerdeployment.ErrRampingVersionDoesNotHaveAllTaskQueues)
s.EqualError(err, fmt.Sprintf(workerdeployment.ErrRampingVersionDoesNotHaveAllTaskQueues, tv2.DeploymentVersionStringV32()))
}

func (s *DeploymentVersionSuite) TestVersionMissingTaskQueues_ValidSetRampingVersion() {
Expand Down
Loading