Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions controllers/change_coordinators.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/FoundationDB/fdb-kubernetes-operator/v2/internal/coordinator"
"github.com/FoundationDB/fdb-kubernetes-operator/v2/internal/locality"
"github.com/FoundationDB/fdb-kubernetes-operator/v2/pkg/fdbstatus"
"github.com/go-logr/logr"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -94,6 +95,20 @@ func (c changeCoordinators) reconcile(
return nil
}

// Perform safety checks before changing coordinators. Requiring a minimum uptime avoids unnecessary coordinator
// changes when a process is only down for a short amount of time, e.g. after a cluster-wide bounce.
err = fdbstatus.CanSafelyChangeCoordinators(
logger,
cluster,
status,
r.MinimumUptimeForCoordinatorChangeWithMissingProcess,
r.MinimumUptimeForCoordinatorChangeWithUndesiredProcess,
)
if err != nil {
logger.Info("Deferring coordinator change due to safety check", "error", err.Error())
return &requeue{curError: err, delayedRequeue: true}
}

err = r.takeLock(logger, cluster, "changing coordinators")
if err != nil {
return &requeue{curError: err, delayedRequeue: true}
Expand Down
181 changes: 176 additions & 5 deletions controllers/change_coordinators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package controllers
import (
"context"
"math"
"time"

fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/v2/api/v1beta2"
"github.com/FoundationDB/fdb-kubernetes-operator/v2/internal"
Expand All @@ -49,10 +50,6 @@ var _ = Describe("Change coordinators", func() {
},
}
Expect(setupClusterForTest(cluster)).NotTo(HaveOccurred())

var err error
_, err = mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())
})

Describe("reconcile", func() {
Expand All @@ -69,7 +66,7 @@ var _ = Describe("Change coordinators", func() {
clusterReconciler,
cluster,
nil,
globalControllerLogger,
testLogger,
)
})

Expand Down Expand Up @@ -163,5 +160,179 @@ var _ = Describe("Change coordinators", func() {
},
)
})

When("safety checks are enabled", func() {
// Configure the reconciler for every nested case: a 5 minute minimum uptime when a coordinator is
// undesired and a 10 minute minimum uptime when a coordinator is missing. The nested cases assert
// on error messages that contain these values in seconds (300 and 600).
BeforeEach(func() {
clusterReconciler.MinimumUptimeForCoordinatorChangeWithUndesiredProcess = 5 * time.Minute
clusterReconciler.MinimumUptimeForCoordinatorChangeWithMissingProcess = 10 * time.Minute
// NOTE(review): presumably required so that the recovery state reported in the status is taken
// into account by the checks — confirm against the reconciler.
clusterReconciler.EnableRecoveryState = true
})

// Cases where one of the current coordinators has been excluded. The nested cases only vary the
// recovery state reported by the mock admin client and assert on the reconcile result.
// NOTE(review): requeue and originalConnectionString are populated outside this block.
When("one coordinator is undesired", func() {
BeforeEach(func() {
// NOTE(review): the nested cases fetch the admin client again and expect their changes to be
// visible to the reconciler, so this call presumably returns a shared mock for the cluster — confirm.
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())

status, err := adminClient.GetStatus()
Expect(err).NotTo(HaveOccurred())

// Build a set of the addresses of the current coordinators.
coordinators := map[string]fdbv1beta2.None{}
for _, coordinator := range status.Client.Coordinators.Coordinators {
coordinators[coordinator.Address.String()] = fdbv1beta2.None{}
}

// Exclude the first process whose address matches a coordinator; the exclusion is done by IP address.
for _, process := range status.Cluster.Processes {
if _, ok := coordinators[process.Address.String()]; !ok {
continue
}
Expect(adminClient.ExcludeProcesses([]fdbv1beta2.ProcessAddress{
{
IPAddress: process.Address.IPAddress,
},
})).To(Succeed())
// Only a single coordinator should be excluded.
break
}
})

// No override is set on the mock here, so it reports its default recovery state.
When("the cluster is up for long enough", func() {
It("should change the coordinators", func() {
Expect(requeue).To(BeNil())
Expect(
cluster.Status.ConnectionString,
).NotTo(Equal(originalConnectionString))
})
})

When("Too many active generations are present", func() {
BeforeEach(func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())
// Report 11 active generations; the expected error below states that only 10 are allowed.
adminClient.ActiveGenerations = ptr.To(11)
})

AfterEach(func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())
// Clear the override so that it does not affect other test cases.
adminClient.ActiveGenerations = nil
})

It("should defer coordinator change and requeue with delay", func() {
Expect(requeue).NotTo(BeNil())
Expect(requeue.delayedRequeue).To(BeTrue())
Expect(requeue.curError).To(HaveOccurred())
Expect(
requeue.curError.Error(),
).To(ContainSubstring("cluster has 11 active generations, but only 10 active generations are allowed to safely change coordinators"))
// The connection string must be unchanged as the coordinator change was deferred.
Expect(cluster.Status.ConnectionString).To(Equal(originalConnectionString))
})
})

When("the cluster is only up for 10 seconds", func() {
BeforeEach(func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())
// Report a recovery that happened 10 seconds ago, which is below the 5 minute minimum uptime
// configured for undesired coordinators.
adminClient.SecondsSinceLastRecovered = ptr.To(10.0)
})

AfterEach(func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())
// Clear the override so that it does not affect other test cases.
adminClient.SecondsSinceLastRecovered = nil
})

It("should defer coordinator change and requeue with delay", func() {
Expect(requeue).NotTo(BeNil())
Expect(requeue.delayedRequeue).To(BeTrue())
Expect(requeue.curError).To(HaveOccurred())
// The 300 seconds in the message correspond to the 5 minute minimum uptime for undesired coordinators.
Expect(
requeue.curError.Error(),
).To(Equal("cannot: change coordinators: cluster is not up for long enough, clusters last recovery was 10.00 seconds ago, waiting until the last recovery was 300 seconds ago"))
Expect(cluster.Status.ConnectionString).To(Equal(originalConnectionString))
})
})
})

// Cases where one of the current coordinators is mocked as missing. The nested cases only vary the
// recovery state reported by the mock admin client and assert on the reconcile result.
// NOTE(review): requeue and originalConnectionString are populated outside this block.
When("one coordinator is missing", func() {
BeforeEach(func() {
// NOTE(review): the nested cases fetch the admin client again and expect their changes to be
// visible to the reconciler, so this call presumably returns a shared mock for the cluster — confirm.
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())

status, err := adminClient.GetStatus()
Expect(err).NotTo(HaveOccurred())

// Build a set of the addresses of the current coordinators.
coordinators := map[string]fdbv1beta2.None{}
for _, coordinator := range status.Client.Coordinators.Coordinators {
coordinators[coordinator.Address.String()] = fdbv1beta2.None{}
}

// Mark the first process whose address matches a coordinator as missing. The process group is
// identified by the instance ID locality of the process.
// NOTE(review): presumably the mock no longer reports this process group in its status — confirm.
for _, process := range status.Cluster.Processes {
if _, ok := coordinators[process.Address.String()]; !ok {
continue
}
adminClient.MockMissingProcessGroup(
fdbv1beta2.ProcessGroupID(
process.Locality[fdbv1beta2.FDBLocalityInstanceIDKey],
),
true,
)
// Only a single coordinator should be missing.
break
}
})

// No override is set on the mock here, so it reports its default recovery state.
When("the cluster is up for long enough", func() {
It("should change the coordinators", func() {
Expect(requeue).To(BeNil())
Expect(
cluster.Status.ConnectionString,
).NotTo(Equal(originalConnectionString))
})
})

// Unlike the undesired coordinator case, 11 active generations are expected not to block the
// coordinator change when a coordinator is missing.
When("Multiple active generations are present", func() {
BeforeEach(func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())
adminClient.ActiveGenerations = ptr.To(11)
})

AfterEach(func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())
// Clear the override so that it does not affect other test cases.
adminClient.ActiveGenerations = nil
})

It("should change the coordinators", func() {
Expect(requeue).To(BeNil())
Expect(
cluster.Status.ConnectionString,
).NotTo(Equal(originalConnectionString))
})
})

When("the cluster is only up for 10 seconds", func() {
BeforeEach(func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())
// Report a recovery that happened 10 seconds ago, which is below the 10 minute minimum uptime
// configured for missing coordinators.
adminClient.SecondsSinceLastRecovered = ptr.To(10.0)
})

AfterEach(func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())
// Clear the override so that it does not affect other test cases.
adminClient.SecondsSinceLastRecovered = nil
})

It("should defer coordinator change and requeue with delay", func() {
Expect(requeue).NotTo(BeNil())
Expect(requeue.delayedRequeue).To(BeTrue())
Expect(requeue.curError).To(HaveOccurred())
// The 600 seconds in the message correspond to the 10 minute minimum uptime for missing coordinators.
Expect(
requeue.curError.Error(),
).To(Equal("cannot: change coordinators: cluster has 1 missing coordinators, clusters last recovery was 10.00 seconds ago, waiting until the last recovery was 600 seconds ago"))
// The connection string must be unchanged as the coordinator change was deferred.
Expect(cluster.Status.ConnectionString).To(Equal(originalConnectionString))
})
})
})
})
})
})
10 changes: 10 additions & 0 deletions controllers/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ type FoundationDBClusterReconciler struct {
// wait time will increase the chances that all updates are part of the list but will also delay the rollout of
// the change.
GlobalSynchronizationWaitDuration time.Duration
// MinimumUptimeForCoordinatorChangeWithMissingProcess defines the minimum uptime of the cluster before coordinator
// changes because of a missing coordinator are allowed.
MinimumUptimeForCoordinatorChangeWithMissingProcess time.Duration
// MinimumUptimeForCoordinatorChangeWithUndesiredProcess defines the minimum uptime of the cluster before coordinator
// changes because of an undesired coordinator are allowed.
MinimumUptimeForCoordinatorChangeWithUndesiredProcess time.Duration
// MinimumUptimeForConfigurationChanges defines the minimum uptime for the cluster before configuration changes
// are allowed.
MinimumUptimeForConfigurationChanges time.Duration

// MinimumRecoveryTimeForInclusion defines the duration in seconds that a cluster must be up
// before new inclusions are allowed. The operator issuing frequent inclusions in a short time window
// could cause instability for the cluster as each inclusion will/can cause a recovery. Delaying the inclusion
Expand Down
13 changes: 7 additions & 6 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,12 @@ func createTestClusterReconciler() *FoundationDBClusterReconciler {
SimulateZones: true,
SimulateTime: true,
},
PodLifecycleManager: &podmanager.StandardPodLifecycleManager{},
PodClientProvider: mockpodclient.NewMockFdbPodClient,
DatabaseClientProvider: mock.DatabaseClientProvider{},
MaintenanceListStaleDuration: 4 * time.Hour,
MaintenanceListWaitDuration: 5 * time.Minute,
HighRunLoopBusyThreshold: 1.0,
PodLifecycleManager: &podmanager.StandardPodLifecycleManager{},
PodClientProvider: mockpodclient.NewMockFdbPodClient,
DatabaseClientProvider: mock.DatabaseClientProvider{},
MaintenanceListStaleDuration: 4 * time.Hour,
MaintenanceListWaitDuration: 5 * time.Minute,
HighRunLoopBusyThreshold: 1.0,
MinimumUptimeForConfigurationChanges: 1 * time.Minute,
}
}
3 changes: 2 additions & 1 deletion controllers/update_database_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ func (u updateDatabaseConfiguration) reconcile(
return &requeue{curError: err, delayedRequeue: true}
}

err = fdbstatus.ConfigurationChangeAllowed(
err = fdbstatus.ConfigurationChangeAllowedWithMinimumUptime(
status,
r.MinimumUptimeForConfigurationChanges,
runningVersion.SupportsRecoveryState() && r.EnableRecoveryState,
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion controllers/update_database_configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ var _ = Describe("update_database_configuration", func() {
Expect(requeue).NotTo(BeNil())
Expect(
requeue.message,
).To(Equal("Configuration change is not safe: clusters last recovery was 0.10 seconds ago, wait until the last recovery was 60 seconds ago, will retry"))
).To(Equal("Configuration change is not safe: cannot: change configuration, clusters last recovery was 0.10 seconds ago, waiting until the last recovery was 60 seconds ago, will retry"))
Expect(cluster.Status.Configured).To(BeTrue())

adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expand Down
3 changes: 3 additions & 0 deletions e2e/fixtures/fdb_operator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,9 @@ spec:
- --cluster-label-key-for-node-trigger=foundationdb.org/fdb-cluster-name
- --enable-node-index
- --replace-on-security-context-change
- --minimum-uptime-for-coordinator-change-with-undesired-process=20s
- --minimum-uptime-for-coordinator-change-with-missing-process=10s
- --minimum-uptime-for-configuration-changes=5s
`
)

Expand Down
6 changes: 4 additions & 2 deletions pkg/fdbadminclient/mock/admin_client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,13 @@ type AdminClient struct {
localityInfo map[fdbv1beta2.ProcessGroupID]map[string]string
MaxZoneFailuresWithoutLosingData *int
MaxZoneFailuresWithoutLosingAvailability *int
ActiveGenerations *int
MaintenanceZone fdbv1beta2.FaultDomain
restoreURL string
maintenanceZoneStartTimestamp time.Time
MockAdditionTimeForGlobalCoordination time.Time
uptimeSecondsForMaintenanceZone float64
SecondsSinceLastRecovered *float64
TeamTracker []fdbv1beta2.FoundationDBStatusTeamTracker
Logs []fdbv1beta2.FoundationDBStatusLogInfo
mockError error
Expand Down Expand Up @@ -616,8 +618,8 @@ func (client *AdminClient) GetStatus() (*fdbv1beta2.FoundationDBStatus, error) {

status.Cluster.RecoveryState = fdbv1beta2.RecoveryState{
Name: "fully_recovered",
SecondsSinceLastRecovered: 600.0,
ActiveGenerations: 1,
SecondsSinceLastRecovered: ptr.Deref(client.SecondsSinceLastRecovered, 600.0),
ActiveGenerations: ptr.Deref(client.ActiveGenerations, 1),
}

return status, nil
Expand Down
Loading