diff --git a/controllers/change_coordinators.go b/controllers/change_coordinators.go index 403335d05..51aa50333 100644 --- a/controllers/change_coordinators.go +++ b/controllers/change_coordinators.go @@ -25,6 +25,7 @@ import ( "github.com/FoundationDB/fdb-kubernetes-operator/v2/internal/coordinator" "github.com/FoundationDB/fdb-kubernetes-operator/v2/internal/locality" + "github.com/FoundationDB/fdb-kubernetes-operator/v2/pkg/fdbstatus" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" @@ -94,6 +95,20 @@ func (c changeCoordinators) reconcile( return nil } + // Perform safety checks before changing coordinators. The minimum uptime should reduce the coordinator changes + // if a process is down for a short amount of time, e.g. after a cluster wide bounce. + err = fdbstatus.CanSafelyChangeCoordinators( + logger, + cluster, + status, + r.MinimumUptimeForCoordinatorChangeWithMissingProcess, + r.MinimumUptimeForCoordinatorChangeWithUndesiredProcess, + ) + if err != nil { + logger.Info("Deferring coordinator change due to safety check", "error", err.Error()) + return &requeue{curError: err, delayedRequeue: true} + } + err = r.takeLock(logger, cluster, "changing coordinators") if err != nil { return &requeue{curError: err, delayedRequeue: true} diff --git a/controllers/change_coordinators_test.go b/controllers/change_coordinators_test.go index 21fc54cf4..a00136c79 100644 --- a/controllers/change_coordinators_test.go +++ b/controllers/change_coordinators_test.go @@ -23,6 +23,7 @@ package controllers import ( "context" "math" + "time" fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/v2/api/v1beta2" "github.com/FoundationDB/fdb-kubernetes-operator/v2/internal" @@ -49,10 +50,6 @@ var _ = Describe("Change coordinators", func() { }, } Expect(setupClusterForTest(cluster)).NotTo(HaveOccurred()) - - var err error - _, err = mock.NewMockAdminClientUncast(cluster, k8sClient) - Expect(err).NotTo(HaveOccurred()) }) Describe("reconcile", func() { @@ -69,7 +66,7 @@ var _ = Describe("Change coordinators", func() { clusterReconciler, cluster, nil, - globalControllerLogger, + testLogger, ) }) @@ -163,5 +160,179 @@ var _ = Describe("Change coordinators", func() { }, ) }) + + When("safety checks are enabled", func() { + BeforeEach(func() { + clusterReconciler.MinimumUptimeForCoordinatorChangeWithUndesiredProcess = 5 * time.Minute + clusterReconciler.MinimumUptimeForCoordinatorChangeWithMissingProcess = 10 * time.Minute + clusterReconciler.EnableRecoveryState = true + }) + + When("one coordinator is undesired", func() { + BeforeEach(func() { + adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient) + Expect(err).NotTo(HaveOccurred()) + + status, err := adminClient.GetStatus() + Expect(err).NotTo(HaveOccurred()) + + coordinators := map[string]fdbv1beta2.None{} + for _, coordinator := range status.Client.Coordinators.Coordinators { + coordinators[coordinator.Address.String()] = fdbv1beta2.None{} + } + + for _, process := range status.Cluster.Processes { + if _, ok := coordinators[process.Address.String()]; !ok { + continue + } + Expect(adminClient.ExcludeProcesses([]fdbv1beta2.ProcessAddress{ + { + IPAddress: process.Address.IPAddress, + }, + })).To(Succeed()) + break + } + }) + + When("the cluster is up for long enough", func() { + It("should change the coordinators", func() { + Expect(requeue).To(BeNil()) + Expect( + cluster.Status.ConnectionString, + ).NotTo(Equal(originalConnectionString)) + }) + }) + + When("Too many active generations are present", func() { + BeforeEach(func() { + 
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient) + Expect(err).NotTo(HaveOccurred()) + adminClient.ActiveGenerations = ptr.To(11) + }) + + AfterEach(func() { + adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient) + Expect(err).NotTo(HaveOccurred()) + adminClient.ActiveGenerations = nil + }) + + It("should defer coordinator change and requeue with delay", func() { + Expect(requeue).NotTo(BeNil()) + Expect(requeue.delayedRequeue).To(BeTrue()) + Expect(requeue.curError).To(HaveOccurred()) + Expect( + requeue.curError.Error(), + ).To(ContainSubstring("cluster has 11 active generations, but only 10 active generations are allowed to safely change coordinators")) + Expect(cluster.Status.ConnectionString).To(Equal(originalConnectionString)) + }) + }) + + When("the cluster is only up for 10 seconds", func() { + BeforeEach(func() { + adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient) + Expect(err).NotTo(HaveOccurred()) + adminClient.SecondsSinceLastRecovered = ptr.To(10.0) + }) + + AfterEach(func() { + adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient) + Expect(err).NotTo(HaveOccurred()) + adminClient.SecondsSinceLastRecovered = nil + }) + + It("should defer coordinator change and requeue with delay", func() { + Expect(requeue).NotTo(BeNil()) + Expect(requeue.delayedRequeue).To(BeTrue()) + Expect(requeue.curError).To(HaveOccurred()) + Expect( + requeue.curError.Error(), + ).To(Equal("cannot: change coordinators: cluster is not up for long enough, clusters last recovery was 10.00 seconds ago, waiting until the last recovery was 300 seconds ago")) + Expect(cluster.Status.ConnectionString).To(Equal(originalConnectionString)) + }) + }) + }) + + When("one coordinator is missing", func() { + BeforeEach(func() { + adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient) + Expect(err).NotTo(HaveOccurred()) + + status, err := adminClient.GetStatus() + Expect(err).NotTo(HaveOccurred()) + + coordinators := map[string]fdbv1beta2.None{} + for _, coordinator := range status.Client.Coordinators.Coordinators { + coordinators[coordinator.Address.String()] = fdbv1beta2.None{} + } + + for _, process := range status.Cluster.Processes { + if _, ok := coordinators[process.Address.String()]; !ok { + continue + } + adminClient.MockMissingProcessGroup( + fdbv1beta2.ProcessGroupID( + process.Locality[fdbv1beta2.FDBLocalityInstanceIDKey], + ), + true, + ) + break + } + }) + + When("the cluster is up for long enough", func() { + It("should change the coordinators", func() { + Expect(requeue).To(BeNil()) + Expect( + cluster.Status.ConnectionString, + ).NotTo(Equal(originalConnectionString)) + }) + }) + + When("Multiple active generations are present", func() { + BeforeEach(func() { + adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient) + Expect(err).NotTo(HaveOccurred()) + adminClient.ActiveGenerations = ptr.To(11) + }) + + AfterEach(func() { + adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient) + Expect(err).NotTo(HaveOccurred()) + adminClient.ActiveGenerations = nil + }) + + It("should change the coordinators", func() { + Expect(requeue).To(BeNil()) + Expect( + cluster.Status.ConnectionString, + ).NotTo(Equal(originalConnectionString)) + }) + }) + + When("the cluster is only up for 10 seconds", func() { + BeforeEach(func() { + adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient) + Expect(err).NotTo(HaveOccurred()) + adminClient.SecondsSinceLastRecovered = ptr.To(10.0) + }) + + 
AfterEach(func() { + adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient) + Expect(err).NotTo(HaveOccurred()) + adminClient.SecondsSinceLastRecovered = nil + }) + + It("should defer coordinator change and requeue with delay", func() { + Expect(requeue).NotTo(BeNil()) + Expect(requeue.delayedRequeue).To(BeTrue()) + Expect(requeue.curError).To(HaveOccurred()) + Expect( + requeue.curError.Error(), + ).To(Equal("cannot: change coordinators: cluster has 1 missing coordinators, clusters last recovery was 10.00 seconds ago, waiting until the last recovery was 600 seconds ago")) + Expect(cluster.Status.ConnectionString).To(Equal(originalConnectionString)) + }) + }) + }) + }) }) }) diff --git a/controllers/cluster_controller.go b/controllers/cluster_controller.go index 8808e4df1..83f7023c9 100644 --- a/controllers/cluster_controller.go +++ b/controllers/cluster_controller.go @@ -126,6 +126,16 @@ type FoundationDBClusterReconciler struct { // wait time will increase the chances that all updates are part of the list but will also delay the rollout of // the change. GlobalSynchronizationWaitDuration time.Duration + // MinimumUptimeForCoordinatorChangeWithMissingProcess defines the minimum uptime of the cluster before coordinator + // changes because of a missing coordinator are allowed. + MinimumUptimeForCoordinatorChangeWithMissingProcess time.Duration + // MinimumUptimeForCoordinatorChangeWithUndesiredProcess defines the minimum uptime of the cluster before coordinator + // changes because of an undesired coordinator are allowed. + MinimumUptimeForCoordinatorChangeWithUndesiredProcess time.Duration + // MinimumUptimeForConfigurationChanges defines the minimum uptime for the cluster before configuration changes + // are allowed. + MinimumUptimeForConfigurationChanges time.Duration + // MinimumRecoveryTimeForInclusion defines the duration in seconds that a cluster must be up // before new inclusions are allowed. The operator issuing frequent inclusions in a short time window // could cause instability for the cluster as each inclusion will/can cause a recovery. 
Delaying the inclusion diff --git a/controllers/suite_test.go b/controllers/suite_test.go index 84c8b6bef..6c4b9cbbf 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -209,11 +209,12 @@ func createTestClusterReconciler() *FoundationDBClusterReconciler { SimulateZones: true, SimulateTime: true, }, - PodLifecycleManager: &podmanager.StandardPodLifecycleManager{}, - PodClientProvider: mockpodclient.NewMockFdbPodClient, - DatabaseClientProvider: mock.DatabaseClientProvider{}, - MaintenanceListStaleDuration: 4 * time.Hour, - MaintenanceListWaitDuration: 5 * time.Minute, - HighRunLoopBusyThreshold: 1.0, + PodLifecycleManager: &podmanager.StandardPodLifecycleManager{}, + PodClientProvider: mockpodclient.NewMockFdbPodClient, + DatabaseClientProvider: mock.DatabaseClientProvider{}, + MaintenanceListStaleDuration: 4 * time.Hour, + MaintenanceListWaitDuration: 5 * time.Minute, + HighRunLoopBusyThreshold: 1.0, + MinimumUptimeForConfigurationChanges: 1 * time.Minute, } } diff --git a/controllers/update_database_configuration.go b/controllers/update_database_configuration.go index 9a2a4e758..ca30d1415 100644 --- a/controllers/update_database_configuration.go +++ b/controllers/update_database_configuration.go @@ -92,8 +92,9 @@ func (u updateDatabaseConfiguration) reconcile( return &requeue{curError: err, delayedRequeue: true} } - err = fdbstatus.ConfigurationChangeAllowed( + err = fdbstatus.ConfigurationChangeAllowedWithMinimumUptime( status, + r.MinimumUptimeForConfigurationChanges, runningVersion.SupportsRecoveryState() && r.EnableRecoveryState, ) if err != nil { diff --git a/controllers/update_database_configuration_test.go b/controllers/update_database_configuration_test.go index 9c90fcae3..d3ecef1fb 100644 --- a/controllers/update_database_configuration_test.go +++ b/controllers/update_database_configuration_test.go @@ -278,7 +278,7 @@ var _ = Describe("update_database_configuration", func() { Expect(requeue).NotTo(BeNil()) Expect( requeue.message, - ).To(Equal("Configuration change is not safe: clusters last recovery was 0.10 seconds ago, wait until the last recovery was 60 seconds ago, will retry")) + ).To(Equal("Configuration change is not safe: cannot: change configuration, clusters last recovery was 0.10 seconds ago, waiting until the last recovery was 60 seconds ago, will retry")) Expect(cluster.Status.Configured).To(BeTrue()) adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient) diff --git a/e2e/fixtures/fdb_operator_client.go b/e2e/fixtures/fdb_operator_client.go index 599124ffd..bf677a6a2 100644 --- a/e2e/fixtures/fdb_operator_client.go +++ b/e2e/fixtures/fdb_operator_client.go @@ -505,6 +505,9 @@ spec: - --cluster-label-key-for-node-trigger=foundationdb.org/fdb-cluster-name - --enable-node-index - --replace-on-security-context-change + - --minimum-uptime-for-coordinator-change-with-undesired-process=20s + - --minimum-uptime-for-coordinator-change-with-missing-process=10s + - --minimum-uptime-for-configuration-changes=5s ` ) diff --git a/pkg/fdbadminclient/mock/admin_client_mock.go b/pkg/fdbadminclient/mock/admin_client_mock.go index 3fce7e38e..8bad37f8f 100644 --- a/pkg/fdbadminclient/mock/admin_client_mock.go +++ b/pkg/fdbadminclient/mock/admin_client_mock.go @@ -72,11 +72,13 @@ type AdminClient struct { localityInfo map[fdbv1beta2.ProcessGroupID]map[string]string MaxZoneFailuresWithoutLosingData *int MaxZoneFailuresWithoutLosingAvailability *int + ActiveGenerations *int MaintenanceZone fdbv1beta2.FaultDomain restoreURL string 
maintenanceZoneStartTimestamp time.Time MockAdditionTimeForGlobalCoordination time.Time uptimeSecondsForMaintenanceZone float64 + SecondsSinceLastRecovered *float64 TeamTracker []fdbv1beta2.FoundationDBStatusTeamTracker Logs []fdbv1beta2.FoundationDBStatusLogInfo mockError error @@ -616,8 +618,8 @@ func (client *AdminClient) GetStatus() (*fdbv1beta2.FoundationDBStatus, error) { status.Cluster.RecoveryState = fdbv1beta2.RecoveryState{ Name: "fully_recovered", - SecondsSinceLastRecovered: 600.0, - ActiveGenerations: 1, + SecondsSinceLastRecovered: ptr.Deref(client.SecondsSinceLastRecovered, 600.0), + ActiveGenerations: ptr.Deref(client.ActiveGenerations, 1), } return status, nil diff --git a/pkg/fdbstatus/status_checks.go b/pkg/fdbstatus/status_checks.go index cb12911f2..b6a6f1d4f 100644 --- a/pkg/fdbstatus/status_checks.go +++ b/pkg/fdbstatus/status_checks.go @@ -25,6 +25,7 @@ import ( "math" "strconv" "strings" + "time" fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/v2/api/v1beta2" "github.com/FoundationDB/fdb-kubernetes-operator/v2/pkg/fdbadminclient" @@ -313,7 +314,7 @@ func GetCoordinatorsFromStatus(status *fdbv1beta2.FoundationDBStatus) map[string return coordinators } -// GetMinimumUptimeAndAddressMap returns address map of the processes included the the foundationdb status. The minimum +// GetMinimumUptimeAndAddressMap returns the address map of the processes included in the foundationdb status. The minimum // uptime will be either secondsSinceLastRecovered if the recovery state is supported and enabled otherwise we will // take the minimum uptime of all processes. func GetMinimumUptimeAndAddressMap( @@ -630,24 +631,11 @@ func canSafelyExcludeOrIncludeProcesses( return err } - version, err := fdbv1beta2.ParseFdbVersion(cluster.GetRunningVersion()) + err = checkIfClusterUpLongEnough(cluster, status, minRecoverySeconds, action) if err != nil { return err } - if version.SupportsRecoveryState() { - // We want to make sure that the cluster is recovered for some time. This should protect the cluster from - // getting into a bad state as a result of frequent inclusions/exclusions. - if status.Cluster.RecoveryState.SecondsSinceLastRecovered < minRecoverySeconds { - return fmt.Errorf( - "cannot: %s, clusters last recovery was %0.2f seconds ago, wait until the last recovery was %0.0f seconds ago", - action, - status.Cluster.RecoveryState.SecondsSinceLastRecovered, - minRecoverySeconds, - ) - } - } - // In the case of inclusions we also want to make sure we only change the list of excluded server if the cluster is // in a good shape, otherwise the CC might crash: https://github.com/apple/foundationdb/blob/release-7.1/fdbserver/ClusterRecovery.actor.cpp#L575-L579 if inclusion && !recoveryStateAllowsInclusion(status) { @@ -689,9 +677,20 @@ func CanSafelyIncludeProcesses( // ConfigurationChangeAllowed will return an error if the configuration change is assumed to be unsafe. If no error // is returned the configuration change can be applied. +// Deprecated: Use ConfigurationChangeAllowedWithMinimumUptime instead. func ConfigurationChangeAllowed( status *fdbv1beta2.FoundationDBStatus, useRecoveryState bool, +) error { + return ConfigurationChangeAllowedWithMinimumUptime(status, 60*time.Second, useRecoveryState) +} + +// ConfigurationChangeAllowedWithMinimumUptime will return an error if the configuration change is assumed to be unsafe. If no error +// is returned the configuration change can be applied.
+func ConfigurationChangeAllowedWithMinimumUptime( + status *fdbv1beta2.FoundationDBStatus, + minimumUptime time.Duration, + useRecoveryState bool, ) error { err := DefaultSafetyChecks(status, 10, "change configuration") if err != nil { return err } @@ -711,13 +710,16 @@ } } - // We want to wait at least 60 seconds between configuration changes that trigger a recovery, otherwise we might + // We want to wait at least minimumUptime between configuration changes that trigger a recovery, otherwise we might // issue too frequent configuration changes. - if useRecoveryState && status.Cluster.RecoveryState.SecondsSinceLastRecovered < 60.0 { - return fmt.Errorf( - "clusters last recovery was %0.2f seconds ago, wait until the last recovery was 60 seconds ago", - status.Cluster.RecoveryState.SecondsSinceLastRecovered, - ) + err = checkIfClusterUpLongEnoughWithRecoveryState( + status, + minimumUptime.Seconds(), + "change configuration", + useRecoveryState, + ) + if err != nil { + return err } return CheckQosStatus(status) @@ -840,3 +842,118 @@ func ClusterIsConfigured( status.Client.DatabaseStatus.Available && status.Cluster.Layers.Error != "configurationMissing" } + +// CanSafelyChangeCoordinators returns nil when it is safe to change coordinators in the cluster or returns an error +// with more information about why it's not safe to change coordinators. This function differentiates between missing (down) +// processes and processes that are only undesired, applying different minimum uptime requirements for each case. +func CanSafelyChangeCoordinators( + _ logr.Logger, + cluster *fdbv1beta2.FoundationDBCluster, + status *fdbv1beta2.FoundationDBStatus, + minimumUptimeForMissing time.Duration, + minimumUptimeForUndesired time.Duration, +) error { + // Analyze the current coordinators using the coordinator information from the status. + // This gives us the definitive list of coordinators regardless of whether the processes are running. + missingCoordinators := 0 + + // Track which of the current coordinator addresses are present in the process list of the status. + coordinators := map[string]bool{} + for _, coordinator := range status.Client.Coordinators.Coordinators { + coordinators[coordinator.Address.String()] = false + } + + for _, process := range status.Cluster.Processes { + processAddr := process.Address.String() + if _, ok := coordinators[processAddr]; !ok { + continue + } + + coordinators[processAddr] = true + } + + for _, isPresent := range coordinators { + if !isPresent { + missingCoordinators++ + } + } + + // Apply different uptime requirements based on the type of coordinator issues. + var requiredUptime float64 + var reason string + + if missingCoordinators > 0 { + // Missing coordinators indicate processes that are down, and we should use a lower threshold to recover + // from the missing coordinator faster. + requiredUptime = minimumUptimeForMissing.Seconds() + reason = fmt.Sprintf("cluster has %d missing coordinators", missingCoordinators) + } else { + requiredUptime = minimumUptimeForUndesired.Seconds() + reason = "cluster is not up for long enough" + + // Perform the default safety checks in case of "normal" coordinator changes or if processes are excluded. If + // the cluster has missing coordinators, we should bypass those checks to ensure we recruit the new coordinators + // in a timely manner.
+ err := DefaultSafetyChecks(status, 10, "change coordinators") + if err != nil { + return err + } + } + + // Check that the cluster has been stable for the required time + return checkIfClusterUpLongEnough( + cluster, + status, + requiredUptime, + fmt.Sprintf("change coordinators: %s", reason), + ) +} + +// checkIfClusterUpLongEnough checks if the cluster is up long enough to perform the requested change. The uptime of the +// cluster is calculated based on the status.Cluster.RecoveryState.SecondsSinceLastRecovered value. For clusters before +// 7.1.22 this check will be ignored. +// Will return an error with additional details if the cluster is not up long enough. +func checkIfClusterUpLongEnough( + cluster *fdbv1beta2.FoundationDBCluster, + status *fdbv1beta2.FoundationDBStatus, + minRecoverySeconds float64, + action string, +) error { + version, err := fdbv1beta2.ParseFdbVersion(cluster.GetRunningVersion()) + if err != nil { + return err + } + + return checkIfClusterUpLongEnoughWithRecoveryState( + status, + minRecoverySeconds, + action, + version.SupportsRecoveryState(), + ) +} + +// checkIfClusterUpLongEnoughWithRecoveryState checks if the cluster is up long enough to perform the requested change. The uptime of the +// cluster is calculated based on the status.Cluster.RecoveryState.SecondsSinceLastRecovered value. When supportsRecoveryState is false +// the check will be skipped. +// Will return an error with additional details if the cluster is not up long enough. +func checkIfClusterUpLongEnoughWithRecoveryState( + status *fdbv1beta2.FoundationDBStatus, + minRecoverySeconds float64, + action string, + supportsRecoveryState bool, +) error { + if supportsRecoveryState { + // We want to make sure that the cluster is recovered for some time. This should protect the cluster from + // getting into a bad state as a result of frequent inclusions/exclusions.
+ if status.Cluster.RecoveryState.SecondsSinceLastRecovered < minRecoverySeconds { + return fmt.Errorf( + "cannot: %s, clusters last recovery was %0.2f seconds ago, waiting until the last recovery was %0.0f seconds ago", + action, + status.Cluster.RecoveryState.SecondsSinceLastRecovered, + minRecoverySeconds, + ) + } + } + + return nil +} diff --git a/pkg/fdbstatus/status_checks_test.go b/pkg/fdbstatus/status_checks_test.go index 9f4bf9a0f..7210d2b68 100644 --- a/pkg/fdbstatus/status_checks_test.go +++ b/pkg/fdbstatus/status_checks_test.go @@ -23,6 +23,7 @@ package fdbstatus import ( "fmt" "net" + "time" "k8s.io/utils/ptr" @@ -2035,7 +2036,7 @@ var _ = Describe("status_checks", func() { }, }, fmt.Errorf( - "cannot: exclude processes, clusters last recovery was 10.00 seconds ago, wait until the last recovery was 120 seconds ago", + "cannot: exclude processes, clusters last recovery was 10.00 seconds ago, waiting until the last recovery was 120 seconds ago", ), ), Entry("cluster's last recovery is 120 seconds ago", @@ -2207,7 +2208,7 @@ var _ = Describe("status_checks", func() { }, }, fmt.Errorf( - "cannot: include processes, clusters last recovery was 10.00 seconds ago, wait until the last recovery was 300 seconds ago", + "cannot: include processes, clusters last recovery was 10.00 seconds ago, waiting until the last recovery was 300 seconds ago", ), ), Entry("cluster's last recovery is 320 seconds ago", @@ -2447,7 +2448,7 @@ var _ = Describe("status_checks", func() { }, true, fmt.Errorf( - "clusters last recovery was 5.00 seconds ago, wait until the last recovery was 60 seconds ago", + "cannot: change configuration, clusters last recovery was 5.00 seconds ago, waiting until the last recovery was 60 seconds ago", ), ), Entry( @@ -2596,3 +2597,320 @@ var _ = Describe("status_checks", func() { ), ) }) + +var _ = Describe("CanSafelyChangeCoordinators", func() { + var cluster *fdbv1beta2.FoundationDBCluster + var minimumUptimeForMissing, minimumUptimeForUndesired time.Duration + + BeforeEach(func() { + cluster = &fdbv1beta2.FoundationDBCluster{ + Spec: fdbv1beta2.FoundationDBClusterSpec{ + Version: fdbv1beta2.Versions.SupportsRecoveryState.String(), + DatabaseConfiguration: fdbv1beta2.DatabaseConfiguration{ + RedundancyMode: fdbv1beta2.RedundancyModeDouble, + }, + }, + Status: fdbv1beta2.FoundationDBClusterStatus{ + Configured: true, + }, + } + minimumUptimeForMissing = 5 * time.Minute + minimumUptimeForUndesired = 10 * time.Minute + }) + + Context("with basic safety checks", func() { + DescribeTable("when cluster status doesn't pass basic safety checks", + func(status *fdbv1beta2.FoundationDBStatus, expectedError string) { + err := CanSafelyChangeCoordinators( + logr.Discard(), + cluster, + status, + minimumUptimeForMissing, + minimumUptimeForUndesired, + ) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(expectedError)) + }, + Entry("when cluster is unavailable", + &fdbv1beta2.FoundationDBStatus{ + Client: fdbv1beta2.FoundationDBStatusLocalClientInfo{ + DatabaseStatus: fdbv1beta2.FoundationDBStatusClientDBStatus{ + Available: false, + }, + }, + Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{ + RecoveryState: fdbv1beta2.RecoveryState{ + ActiveGenerations: 1, + }, + }, + }, + "cluster is unavailable", + ), + Entry("when too many active generations", + &fdbv1beta2.FoundationDBStatus{ + Client: fdbv1beta2.FoundationDBStatusLocalClientInfo{ + DatabaseStatus: fdbv1beta2.FoundationDBStatusClientDBStatus{ + Available: true, + }, + }, + Cluster: 
fdbv1beta2.FoundationDBStatusClusterInfo{ + RecoveryState: fdbv1beta2.RecoveryState{ + ActiveGenerations: 15, + }, + }, + }, + "active generations", + ), + ) + }) + + Context("with uptime requirements for different coordinator scenarios", func() { + var baseStatus *fdbv1beta2.FoundationDBStatus + + BeforeEach(func() { + baseStatus = &fdbv1beta2.FoundationDBStatus{ + Client: fdbv1beta2.FoundationDBStatusLocalClientInfo{ + DatabaseStatus: fdbv1beta2.FoundationDBStatusClientDBStatus{ + Available: true, + }, + Coordinators: fdbv1beta2.FoundationDBStatusCoordinatorInfo{ + QuorumReachable: true, + Coordinators: []fdbv1beta2.FoundationDBStatusCoordinator{ + { + Reachable: true, + Address: fdbv1beta2.ProcessAddress{ + IPAddress: net.ParseIP("127.0.0.1"), + }, + }, + }, + }, + }, + Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{ + RecoveryState: fdbv1beta2.RecoveryState{ + ActiveGenerations: 1, + SecondsSinceLastRecovered: 1000.0, // Well above both thresholds + }, + Data: fdbv1beta2.FoundationDBStatusDataStatistics{ + State: fdbv1beta2.FoundationDBStatusDataState{ + Healthy: true, + }, + TeamTrackers: []fdbv1beta2.FoundationDBStatusTeamTracker{ + { + Primary: true, + State: fdbv1beta2.FoundationDBStatusDataState{ + Healthy: true, + MinReplicasRemaining: 2, + }, + }, + }, + }, + Logs: []fdbv1beta2.FoundationDBStatusLogInfo{ + { + LogReplicationFactor: 2, + LogFaultTolerance: 1, + }, + }, + Processes: map[fdbv1beta2.ProcessGroupID]fdbv1beta2.FoundationDBStatusProcessInfo{ + "coord1": { + Address: fdbv1beta2.ProcessAddress{IPAddress: net.ParseIP("127.0.0.1")}, + Locality: map[string]string{ + fdbv1beta2.FDBLocalityInstanceIDKey: "coord1", + }, + Roles: []fdbv1beta2.FoundationDBStatusProcessRoleInfo{ + {Role: string(fdbv1beta2.ProcessRoleCoordinator)}, + }, + UptimeSeconds: 1000.0, + }, + }, + }, + } + }) + + It("should succeed when uptime requirements are met with healthy coordinators", func() { + err := CanSafelyChangeCoordinators( + logr.Discard(), + cluster, + baseStatus, + minimumUptimeForMissing, + minimumUptimeForUndesired, + ) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should fail when uptime is insufficient with excluded coordinators", func() { + // Exclude the coordinator + baseStatus.Cluster.Processes["coord1"] = fdbv1beta2.FoundationDBStatusProcessInfo{ + Address: fdbv1beta2.ProcessAddress{IPAddress: net.ParseIP("127.0.0.1")}, + Excluded: true, + Locality: map[string]string{ + fdbv1beta2.FDBLocalityInstanceIDKey: "coord1", + }, + Roles: []fdbv1beta2.FoundationDBStatusProcessRoleInfo{ + {Role: string(fdbv1beta2.ProcessRoleCoordinator)}, + }, + UptimeSeconds: 500.0, // Between minimumUptimeForMissing and minimumUptimeForUndesired + } + // Set cluster uptime to match process uptime + baseStatus.Cluster.RecoveryState.SecondsSinceLastRecovered = 500.0 + + err := CanSafelyChangeCoordinators( + logr.Discard(), + cluster, + baseStatus, + minimumUptimeForMissing, + minimumUptimeForUndesired, + ) + Expect(err).To(HaveOccurred()) + Expect( + err.Error(), + ).To(Equal("cannot: change coordinators: cluster is not up for long enough, clusters last recovery was 500.00 seconds ago, waiting until the last recovery was 600 seconds ago")) + }) + + It("should succeed when uptime meets requirements for excluded coordinators", func() { + // Exclude the coordinator + baseStatus.Cluster.Processes["coord1"] = fdbv1beta2.FoundationDBStatusProcessInfo{ + Address: fdbv1beta2.ProcessAddress{IPAddress: net.ParseIP("127.0.0.1")}, + Excluded: true, + Locality: map[string]string{ + 
fdbv1beta2.FDBLocalityInstanceIDKey: "coord1", + }, + Roles: []fdbv1beta2.FoundationDBStatusProcessRoleInfo{ + {Role: string(fdbv1beta2.ProcessRoleCoordinator)}, + }, + UptimeSeconds: 700.0, // Above minimumUptimeForUndesired + } + // Set cluster uptime to match process uptime + baseStatus.Cluster.RecoveryState.SecondsSinceLastRecovered = 700.0 + + err := CanSafelyChangeCoordinators( + logr.Discard(), + cluster, + baseStatus, + minimumUptimeForMissing, + minimumUptimeForUndesired, + ) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should succeed when uptime meets lower requirements for missing coordinators", func() { + // Remove the coordinator from processes (simulate missing coordinator) + delete(baseStatus.Cluster.Processes, "coord1") + // Set cluster uptime between the two thresholds + baseStatus.Cluster.RecoveryState.SecondsSinceLastRecovered = 400.0 + + err := CanSafelyChangeCoordinators( + logr.Discard(), + cluster, + baseStatus, + minimumUptimeForMissing, + minimumUptimeForUndesired, + ) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should fail when uptime is insufficient even for missing coordinators", func() { + // Remove the coordinator from processes (simulate missing coordinator) + delete(baseStatus.Cluster.Processes, "coord1") + // Set cluster uptime below minimumUptimeForMissing + baseStatus.Cluster.RecoveryState.SecondsSinceLastRecovered = 200.0 + + err := CanSafelyChangeCoordinators( + logr.Discard(), + cluster, + baseStatus, + minimumUptimeForMissing, + minimumUptimeForUndesired, + ) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("missing coordinators")) + Expect( + err.Error(), + ).To(ContainSubstring("waiting until the last recovery was 300 seconds ago")) + }) + }) + + Context("with multiple coordinator scenarios", func() { + It( + "should prioritize missing coordinators over excluded ones for uptime requirements", + func() { + status := &fdbv1beta2.FoundationDBStatus{ + Client: fdbv1beta2.FoundationDBStatusLocalClientInfo{ + DatabaseStatus: fdbv1beta2.FoundationDBStatusClientDBStatus{ + Available: true, + }, + Coordinators: fdbv1beta2.FoundationDBStatusCoordinatorInfo{ + QuorumReachable: true, + Coordinators: []fdbv1beta2.FoundationDBStatusCoordinator{ + { + Reachable: true, + Address: fdbv1beta2.ProcessAddress{ + IPAddress: net.ParseIP("127.0.0.1"), + }, + }, + { + Reachable: true, + Address: fdbv1beta2.ProcessAddress{ + IPAddress: net.ParseIP("127.0.0.2"), + }, + }, + }, + }, + }, + Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{ + RecoveryState: fdbv1beta2.RecoveryState{ + ActiveGenerations: 1, + SecondsSinceLastRecovered: 400.0, // Between the two thresholds + }, + Data: fdbv1beta2.FoundationDBStatusDataStatistics{ + State: fdbv1beta2.FoundationDBStatusDataState{ + Healthy: true, + }, + TeamTrackers: []fdbv1beta2.FoundationDBStatusTeamTracker{ + { + Primary: true, + State: fdbv1beta2.FoundationDBStatusDataState{ + Healthy: true, + MinReplicasRemaining: 2, + }, + }, + }, + }, + Logs: []fdbv1beta2.FoundationDBStatusLogInfo{ + { + LogReplicationFactor: 2, + LogFaultTolerance: 1, + }, + }, + Processes: map[fdbv1beta2.ProcessGroupID]fdbv1beta2.FoundationDBStatusProcessInfo{ + "coord2": { + Address: fdbv1beta2.ProcessAddress{ + IPAddress: net.ParseIP("127.0.0.2"), + }, + Excluded: true, + Locality: map[string]string{ + fdbv1beta2.FDBLocalityInstanceIDKey: "coord2", + }, + Roles: []fdbv1beta2.FoundationDBStatusProcessRoleInfo{ + {Role: string(fdbv1beta2.ProcessRoleCoordinator)}, + }, + UptimeSeconds: 400.0, + }, + // coord1 is missing 
from processes - simulates down coordinator + }, + }, + } + + // Should succeed because missing coordinators (coord1) have lower requirements + // even though there are excluded coordinators (coord2) + err := CanSafelyChangeCoordinators( + logr.Discard(), + cluster, + status, + minimumUptimeForMissing, + minimumUptimeForUndesired, + ) + Expect(err).NotTo(HaveOccurred()) + }, + ) + }) +}) diff --git a/setup/setup.go b/setup/setup.go index 475c0f741..10b24e9b8 100644 --- a/setup/setup.go +++ b/setup/setup.go @@ -105,9 +105,12 @@ type Options struct { RenewDeadline time.Duration // RetryPeriod is the duration the LeaderElector clients should wait // between tries of actions. Default is 2 seconds. - RetryPeriod time.Duration - DeprecationOptions internal.DeprecationOptions - MinimumRequiredUptimeCCBounce time.Duration + RetryPeriod time.Duration + DeprecationOptions internal.DeprecationOptions + MinimumRequiredUptimeCCBounce time.Duration + MinimumUptimeForCoordinatorChangeWithMissingProcess time.Duration + MinimumUptimeForCoordinatorChangeWithUndesiredProcess time.Duration + MinimumUptimeForConfigurationChanges time.Duration } // BindFlags will parse the given flagset for the operator option flags @@ -329,6 +332,24 @@ func (o *Options) BindFlags(fs *flag.FlagSet) { math.MaxFloat64, "Defines the threshold when a process will be considered to have a high run loop busy value. The value will be between 0.0 and 1.0. Setting it to a higher value will disable the high run loop busy check.", ) + fs.DurationVar( + &o.MinimumUptimeForCoordinatorChangeWithUndesiredProcess, + "minimum-uptime-for-coordinator-change-with-undesired-process", + 5*time.Minute, + "the minimum uptime for the cluster before coordinator changes are allowed because a coordinator is undesired.", + ) + fs.DurationVar( + &o.MinimumUptimeForCoordinatorChangeWithMissingProcess, + "minimum-uptime-for-coordinator-change-with-missing-process", + 2*time.Minute, + "the minimum uptime for the cluster before coordinator changes are allowed because a coordinator is missing.", + ) + fs.DurationVar( + &o.MinimumUptimeForConfigurationChanges, + "minimum-uptime-for-configuration-changes", + 1*time.Minute, + "the minimum uptime for the cluster before configuration changes are allowed.", + ) } // StartManager will start the FoundationDB operator manager. @@ -468,7 +489,9 @@ func StartManager( clusterReconciler.Namespace = operatorOpts.WatchNamespace clusterReconciler.GlobalSynchronizationWaitDuration = operatorOpts.GlobalSynchronizationWaitDuration clusterReconciler.HighRunLoopBusyThreshold = operatorOpts.HighRunLoopBusyThreshold - + clusterReconciler.MinimumUptimeForCoordinatorChangeWithMissingProcess = operatorOpts.MinimumUptimeForCoordinatorChangeWithMissingProcess + clusterReconciler.MinimumUptimeForCoordinatorChangeWithUndesiredProcess = operatorOpts.MinimumUptimeForCoordinatorChangeWithUndesiredProcess + clusterReconciler.MinimumUptimeForConfigurationChanges = operatorOpts.MinimumUptimeForConfigurationChanges // If the provided PodLifecycleManager supports the update method, we can set the desired update method, otherwise the // update method will be ignored. castedPodManager, ok := clusterReconciler.PodLifecycleManager.(podmanager.PodLifecycleManagerWithPodUpdateMethod)
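For reference, the three thresholds introduced by this change are exposed as operator flags with the defaults registered in setup.go (--minimum-uptime-for-coordinator-change-with-undesired-process=5m, --minimum-uptime-for-coordinator-change-with-missing-process=2m, --minimum-uptime-for-configuration-changes=1m) and are copied onto the reconciler in StartManager. The following is a minimal sketch, not part of the diff above, of setting the same fields directly on the reconciler, e.g. in a test harness; the controllers import path is assumed from the module path used elsewhere in this change.

package example

import (
	"time"

	"github.com/FoundationDB/fdb-kubernetes-operator/v2/controllers"
)

// newReconcilerWithSafetyThresholds builds a reconciler whose new uptime thresholds
// mirror the flag defaults from setup.go: a missing coordinator uses the lower
// threshold so a down coordinator can be replaced sooner, while undesired (e.g.
// excluded) coordinators and configuration changes wait for a longer uptime.
func newReconcilerWithSafetyThresholds() *controllers.FoundationDBClusterReconciler {
	return &controllers.FoundationDBClusterReconciler{
		MinimumUptimeForCoordinatorChangeWithMissingProcess:   2 * time.Minute,
		MinimumUptimeForCoordinatorChangeWithUndesiredProcess: 5 * time.Minute,
		MinimumUptimeForConfigurationChanges:                  1 * time.Minute,
	}
}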