From 6436088af56cfe8435e3daa85baf1d006581e687 Mon Sep 17 00:00:00 2001 From: rshoemaker Date: Fri, 20 Mar 2026 15:25:37 -0400 Subject: [PATCH] fix: preserve service resources in intermediate node add plan step When adding a node to a database that already has a running service, the intermediate plan step now clones the last cumulative state (which carries existing service resources) before merging in the node-only end state. Previously, diffing from the cumulative state to a bare node-only state scheduled all service resources for deletion, causing downtime and credential rotation. --- server/internal/database/operations/end.go | 19 +- ...ode_to_two-node_database_with_service.json | 261 ++++++++++++++++++ .../database/operations/update_database.go | 22 +- .../operations/update_database_test.go | 40 +++ 4 files changed, 338 insertions(+), 4 deletions(-) create mode 100644 server/internal/database/operations/golden_test/TestUpdateDatabase/add_third_node_to_two-node_database_with_service.json diff --git a/server/internal/database/operations/end.go b/server/internal/database/operations/end.go index 0f60ab35..360f0e99 100644 --- a/server/internal/database/operations/end.go +++ b/server/internal/database/operations/end.go @@ -8,9 +8,11 @@ import ( "github.com/pgEdge/control-plane/server/internal/resource" ) -// EndState computes the end state for the database, containing only the given -// nodes. -func EndState(nodes []*NodeResources, services []*ServiceResources) (*resource.State, error) { +// nodeEndState computes the end state containing only database node resources +// (instances, subscriptions, replication slots, etc.) without service resources. +// Used as an intermediate plan step to ensure all nodes are healthy before +// service resources (such as service user roles) are created. +func nodeEndState(nodes []*NodeResources) (*resource.State, error) { end := resource.NewState() for _, node := range nodes { var resources []resource.Resource @@ -65,6 +67,17 @@ func EndState(nodes []*NodeResources, services []*ServiceResources) (*resource.S } } + return end, nil +} + +// EndState computes the end state for the database, containing only the given +// nodes. +func EndState(nodes []*NodeResources, services []*ServiceResources) (*resource.State, error) { + end, err := nodeEndState(nodes) + if err != nil { + return nil, err + } + for _, svc := range services { state, err := svc.State() if err != nil { diff --git a/server/internal/database/operations/golden_test/TestUpdateDatabase/add_third_node_to_two-node_database_with_service.json b/server/internal/database/operations/golden_test/TestUpdateDatabase/add_third_node_to_two-node_database_with_service.json new file mode 100644 index 00000000..46e918bb --- /dev/null +++ b/server/internal/database/operations/golden_test/TestUpdateDatabase/add_third_node_to_two-node_database_with_service.json @@ -0,0 +1,261 @@ +[ + [ + [ + { + "type": "create", + "resource_id": "orchestrator.resource::n3-instance-1-dep-1-id", + "reason": "does_not_exist", + "diff": null + } + ], + [ + { + "type": "create", + "resource_id": "database.instance::n3-instance-1-id", + "reason": "does_not_exist", + "diff": null + } + ], + [ + { + "type": "create", + "resource_id": "database.node::n3", + "reason": "does_not_exist", + "diff": null + }, + { + "type": "create", + "resource_id": "monitor.instance::n3-instance-1-id", + "reason": "does_not_exist", + "diff": null + } + ] + ], + [ + [ + { + "type": "create", + "resource_id": "database.replication_slot::n1n3", + "reason": "does_not_exist", + "diff": null + }, + { + "type": "create", + "resource_id": "database.replication_slot::n2n3", + "reason": "does_not_exist", + "diff": null + }, + { + "type": "create", + "resource_id": "database.replication_slot_create::spk_test_n2_sub_n2_n3", + "reason": "does_not_exist", + "diff": null + } + ], + [ + { + "type": "create", + "resource_id": "database.subscription::n2n3", + "reason": "does_not_exist", + "diff": null + } + ], + [ + { + "type": "create", + "resource_id": "database.sync_event::n2n1", + "reason": "does_not_exist", + "diff": null + } + ], + [ + { + "type": "create", + "resource_id": "database.wait_for_sync_event::n2n1", + "reason": "does_not_exist", + "diff": null + } + ], + [ + { + "type": "create", + "resource_id": "database.subscription::n1n3", + "reason": "does_not_exist", + "diff": null + } + ], + [ + { + "type": "create", + "resource_id": "database.sync_event::n1n3", + "reason": "does_not_exist", + "diff": null + } + ], + [ + { + "type": "create", + "resource_id": "database.wait_for_sync_event::n1n3", + "reason": "does_not_exist", + "diff": null + } + ], + [ + { + "type": "create", + "resource_id": "database.lag_tracker_commit_ts::n2n3", + "reason": "does_not_exist", + "diff": null + } + ], + [ + { + "type": "create", + "resource_id": "database.replication_slot_advance_from_cts::n2n3", + "reason": "does_not_exist", + "diff": null + } + ] + ], + [ + [ + { + "type": "create", + "resource_id": "database.replication_slot::n3n1", + "reason": "does_not_exist", + "diff": null + }, + { + "type": "create", + "resource_id": "database.replication_slot::n3n2", + "reason": "does_not_exist", + "diff": null + } + ], + [ + { + "type": "update", + "resource_id": "database.subscription::n1n3", + "reason": "has_diff", + "diff": [ + { + "value": null, + "op": "replace", + "path": "/dependent_subscriptions" + }, + { + "value": false, + "op": "replace", + "path": "/sync_data" + }, + { + "value": false, + "op": "replace", + "path": "/sync_structure" + } + ] + }, + { + "type": "update", + "resource_id": "database.subscription::n2n3", + "reason": "has_diff", + "diff": [ + { + "value": false, + "op": "replace", + "path": "/disabled" + } + ] + }, + { + "type": "create", + "resource_id": "database.subscription::n3n1", + "reason": "does_not_exist", + "diff": null + }, + { + "type": "create", + "resource_id": "database.subscription::n3n2", + "reason": "does_not_exist", + "diff": null + } + ], + [ + { + "type": "update", + "resource_id": "database.sync_event::n1n3", + "reason": "dependency_updated", + "diff": null + } + ], + [ + { + "type": "update", + "resource_id": "database.wait_for_sync_event::n1n3", + "reason": "dependency_updated", + "diff": null + } + ], + [ + { + "type": "update", + "resource_id": "database.lag_tracker_commit_ts::n2n3", + "reason": "dependency_updated", + "diff": null + } + ], + [ + { + "type": "update", + "resource_id": "database.replication_slot_advance_from_cts::n2n3", + "reason": "dependency_updated", + "diff": null + } + ] + ], + [ + [ + { + "type": "delete", + "resource_id": "database.replication_slot_advance_from_cts::n2n3", + "diff": null + }, + { + "type": "delete", + "resource_id": "database.wait_for_sync_event::n2n1", + "diff": null + } + ], + [ + { + "type": "delete", + "resource_id": "database.lag_tracker_commit_ts::n2n3", + "diff": null + }, + { + "type": "delete", + "resource_id": "database.sync_event::n2n1", + "diff": null + } + ], + [ + { + "type": "delete", + "resource_id": "database.replication_slot_create::spk_test_n2_sub_n2_n3", + "diff": null + }, + { + "type": "delete", + "resource_id": "database.wait_for_sync_event::n1n3", + "diff": null + } + ], + [ + { + "type": "delete", + "resource_id": "database.sync_event::n1n3", + "diff": null + } + ] + ] +] \ No newline at end of file diff --git a/server/internal/database/operations/update_database.go b/server/internal/database/operations/update_database.go index d6596e57..c89dd95a 100644 --- a/server/internal/database/operations/update_database.go +++ b/server/internal/database/operations/update_database.go @@ -133,7 +133,27 @@ func UpdateDatabase( states[i] = curr prev = curr } - states = append(states, end) + if len(adds) > 0 && len(services) > 0 { + // Emit a node-only intermediate state before the full end state. This + // ensures all database nodes are fully provisioned before new service + // resources (e.g. per-node service user roles) are created. + // + // We build the intermediate state by cloning the last cumulative state + // (prev) — which carries any already-provisioned service resources — + // and then merging in the node-only end state. This overwrites node + // resources with their final desired values while leaving existing + // service resources untouched, preventing spurious delete/recreate + // cycles for services that were already running. + nodeEnd, err := nodeEndState(nodes) + if err != nil { + return nil, err + } + nodeEndWithServices := prev.Clone() + nodeEndWithServices.Merge(nodeEnd) + states = append(states, nodeEndWithServices, end) + } else { + states = append(states, end) + } plans, err := start.PlanAll(options.PlanOptions, states...) if err != nil { diff --git a/server/internal/database/operations/update_database_test.go b/server/internal/database/operations/update_database_test.go index 9651f271..b6a5066f 100644 --- a/server/internal/database/operations/update_database_test.go +++ b/server/internal/database/operations/update_database_test.go @@ -172,6 +172,31 @@ func TestUpdateDatabase(t *testing.T) { ), ) + twoNodeWithServiceState := makeState(t, + []resource.Resource{ + n1Instance1.Instance, + makeMonitorResource(n1Instance1), + &database.NodeResource{ + Name: "n1", + PrimaryInstanceID: n1Instance1.InstanceID(), + InstanceIDs: []string{n1Instance1.InstanceID()}, + }, + n2Instance1.Instance, + makeMonitorResource(n2Instance1), + &database.NodeResource{ + Name: "n2", + PrimaryInstanceID: n2Instance1.InstanceID(), + InstanceIDs: []string{n2Instance1.InstanceID()}, + }, + &database.ReplicationSlotResource{ProviderNode: "n2", SubscriberNode: "n1"}, + &database.SubscriptionResource{SubscriberNode: "n1", ProviderNode: "n2"}, + &database.ReplicationSlotResource{ProviderNode: "n1", SubscriberNode: "n2"}, + &database.SubscriptionResource{SubscriberNode: "n2", ProviderNode: "n1"}, + svcRes.MonitorResource, + }, + slices.Concat(n1Instance1.Resources, n2Instance1.Resources, svcRes.Resources), + ) + for _, tc := range []struct { name string options operations.UpdateDatabaseOptions @@ -528,6 +553,21 @@ func TestUpdateDatabase(t *testing.T) { }, services: []*operations.ServiceResources{svcRes}, }, + { + // Regression test: adding a 3rd node to a 2-node database that already + // has a running service must not delete/recreate the service instance. + // The intermediate NodeEndState step must preserve existing service + // resources so they are not cycled through delete+recreate. + name: "add third node to two-node database with service", + options: operations.UpdateDatabaseOptions{}, + start: twoNodeWithServiceState, + nodes: []*operations.NodeResources{ + {NodeName: "n1", InstanceResources: []*database.InstanceResources{n1Instance1}}, + {NodeName: "n2", InstanceResources: []*database.InstanceResources{n2Instance1}}, + {NodeName: "n3", SourceNode: "n1", InstanceResources: []*database.InstanceResources{n3Instance1}}, + }, + services: []*operations.ServiceResources{svcRes}, + }, } { t.Run(tc.name, func(t *testing.T) { plans, err := operations.UpdateDatabase(