Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions server/internal/database/operations/end.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"github.com/pgEdge/control-plane/server/internal/resource"
)

// EndState computes the end state for the database, containing only the given
// nodes.
func EndState(nodes []*NodeResources, services []*ServiceResources) (*resource.State, error) {
// nodeEndState computes the end state containing only database node resources
// (instances, subscriptions, replication slots, etc.) without service resources.
// Used as an intermediate plan step to ensure all nodes are healthy before
// service resources (such as service user roles) are created.
func nodeEndState(nodes []*NodeResources) (*resource.State, error) {
end := resource.NewState()
for _, node := range nodes {
var resources []resource.Resource
Expand Down Expand Up @@ -65,6 +67,17 @@ func EndState(nodes []*NodeResources, services []*ServiceResources) (*resource.S
}
}

return end, nil
}

// EndState computes the end state for the database, containing only the given
// nodes.
func EndState(nodes []*NodeResources, services []*ServiceResources) (*resource.State, error) {
end, err := nodeEndState(nodes)
if err != nil {
return nil, err
}

for _, svc := range services {
state, err := svc.State()
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
[
[
[
{
"type": "create",
"resource_id": "orchestrator.resource::n3-instance-1-dep-1-id",
"reason": "does_not_exist",
"diff": null
}
],
[
{
"type": "create",
"resource_id": "database.instance::n3-instance-1-id",
"reason": "does_not_exist",
"diff": null
}
],
[
{
"type": "create",
"resource_id": "database.node::n3",
"reason": "does_not_exist",
"diff": null
},
{
"type": "create",
"resource_id": "monitor.instance::n3-instance-1-id",
"reason": "does_not_exist",
"diff": null
}
]
],
[
[
{
"type": "create",
"resource_id": "database.replication_slot::n1n3",
"reason": "does_not_exist",
"diff": null
},
{
"type": "create",
"resource_id": "database.replication_slot::n2n3",
"reason": "does_not_exist",
"diff": null
},
{
"type": "create",
"resource_id": "database.replication_slot_create::spk_test_n2_sub_n2_n3",
"reason": "does_not_exist",
"diff": null
}
],
[
{
"type": "create",
"resource_id": "database.subscription::n2n3",
"reason": "does_not_exist",
"diff": null
}
],
[
{
"type": "create",
"resource_id": "database.sync_event::n2n1",
"reason": "does_not_exist",
"diff": null
}
],
[
{
"type": "create",
"resource_id": "database.wait_for_sync_event::n2n1",
"reason": "does_not_exist",
"diff": null
}
],
[
{
"type": "create",
"resource_id": "database.subscription::n1n3",
"reason": "does_not_exist",
"diff": null
}
],
[
{
"type": "create",
"resource_id": "database.sync_event::n1n3",
"reason": "does_not_exist",
"diff": null
}
],
[
{
"type": "create",
"resource_id": "database.wait_for_sync_event::n1n3",
"reason": "does_not_exist",
"diff": null
}
],
[
{
"type": "create",
"resource_id": "database.lag_tracker_commit_ts::n2n3",
"reason": "does_not_exist",
"diff": null
}
],
[
{
"type": "create",
"resource_id": "database.replication_slot_advance_from_cts::n2n3",
"reason": "does_not_exist",
"diff": null
}
]
],
[
[
{
"type": "create",
"resource_id": "database.replication_slot::n3n1",
"reason": "does_not_exist",
"diff": null
},
{
"type": "create",
"resource_id": "database.replication_slot::n3n2",
"reason": "does_not_exist",
"diff": null
}
],
[
{
"type": "update",
"resource_id": "database.subscription::n1n3",
"reason": "has_diff",
"diff": [
{
"value": null,
"op": "replace",
"path": "/dependent_subscriptions"
},
{
"value": false,
"op": "replace",
"path": "/sync_data"
},
{
"value": false,
"op": "replace",
"path": "/sync_structure"
}
]
},
{
"type": "update",
"resource_id": "database.subscription::n2n3",
"reason": "has_diff",
"diff": [
{
"value": false,
"op": "replace",
"path": "/disabled"
}
]
},
{
"type": "create",
"resource_id": "database.subscription::n3n1",
"reason": "does_not_exist",
"diff": null
},
{
"type": "create",
"resource_id": "database.subscription::n3n2",
"reason": "does_not_exist",
"diff": null
}
],
[
{
"type": "update",
"resource_id": "database.sync_event::n1n3",
"reason": "dependency_updated",
"diff": null
}
],
[
{
"type": "update",
"resource_id": "database.wait_for_sync_event::n1n3",
"reason": "dependency_updated",
"diff": null
}
],
[
{
"type": "update",
"resource_id": "database.lag_tracker_commit_ts::n2n3",
"reason": "dependency_updated",
"diff": null
}
],
[
{
"type": "update",
"resource_id": "database.replication_slot_advance_from_cts::n2n3",
"reason": "dependency_updated",
"diff": null
}
]
],
[
[
{
"type": "delete",
"resource_id": "database.replication_slot_advance_from_cts::n2n3",
"diff": null
},
{
"type": "delete",
"resource_id": "database.wait_for_sync_event::n2n1",
"diff": null
}
],
[
{
"type": "delete",
"resource_id": "database.lag_tracker_commit_ts::n2n3",
"diff": null
},
{
"type": "delete",
"resource_id": "database.sync_event::n2n1",
"diff": null
}
],
[
{
"type": "delete",
"resource_id": "database.replication_slot_create::spk_test_n2_sub_n2_n3",
"diff": null
},
{
"type": "delete",
"resource_id": "database.wait_for_sync_event::n1n3",
"diff": null
}
],
[
{
"type": "delete",
"resource_id": "database.sync_event::n1n3",
"diff": null
}
]
]
]
22 changes: 21 additions & 1 deletion server/internal/database/operations/update_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,27 @@ func UpdateDatabase(
states[i] = curr
prev = curr
}
states = append(states, end)
if len(adds) > 0 && len(services) > 0 {
// Emit a node-only intermediate state before the full end state. This
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds like it could be fixed by just listing the node resource in the dependencies for the service user role. Does that not work for some reason?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is about preventing a service deletion that shouldn't happen. I think the deletions were being caused because the plan was transitioning from a state that had a service resource to an intermediate state that didn't and so it was scheduled for deletion, which I think is a consequence of requiring all db nodes to be up and healthy before any service resources are created. The old code was creating a nodes-only state with no services and then adding services in later and it triggered a deletion in the plan. It looks like there's no dependency logic involved in state.planDeletes - so it feels like simply adding a dep on the service user role wouldn't fix the issue. What I'm trying to do here is modify the intermediate state by cloning the services in so they aren't scheduled for deletion.

// ensures all database nodes are fully provisioned before new service
// resources (e.g. per-node service user roles) are created.
//
// We build the intermediate state by cloning the last cumulative state
// (prev) — which carries any already-provisioned service resources —
// and then merging in the node-only end state. This overwrites node
// resources with their final desired values while leaving existing
// service resources untouched, preventing spurious delete/recreate
// cycles for services that were already running.
nodeEnd, err := nodeEndState(nodes)
if err != nil {
return nil, err
}
nodeEndWithServices := prev.Clone()
nodeEndWithServices.Merge(nodeEnd)
states = append(states, nodeEndWithServices, end)
} else {
states = append(states, end)
}

plans, err := start.PlanAll(options.PlanOptions, states...)
if err != nil {
Expand Down
40 changes: 40 additions & 0 deletions server/internal/database/operations/update_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,31 @@ func TestUpdateDatabase(t *testing.T) {
),
)

twoNodeWithServiceState := makeState(t,
[]resource.Resource{
n1Instance1.Instance,
makeMonitorResource(n1Instance1),
&database.NodeResource{
Name: "n1",
PrimaryInstanceID: n1Instance1.InstanceID(),
InstanceIDs: []string{n1Instance1.InstanceID()},
},
n2Instance1.Instance,
makeMonitorResource(n2Instance1),
&database.NodeResource{
Name: "n2",
PrimaryInstanceID: n2Instance1.InstanceID(),
InstanceIDs: []string{n2Instance1.InstanceID()},
},
&database.ReplicationSlotResource{ProviderNode: "n2", SubscriberNode: "n1"},
&database.SubscriptionResource{SubscriberNode: "n1", ProviderNode: "n2"},
&database.ReplicationSlotResource{ProviderNode: "n1", SubscriberNode: "n2"},
&database.SubscriptionResource{SubscriberNode: "n2", ProviderNode: "n1"},
svcRes.MonitorResource,
},
slices.Concat(n1Instance1.Resources, n2Instance1.Resources, svcRes.Resources),
)

for _, tc := range []struct {
name string
options operations.UpdateDatabaseOptions
Expand Down Expand Up @@ -528,6 +553,21 @@ func TestUpdateDatabase(t *testing.T) {
},
services: []*operations.ServiceResources{svcRes},
},
{
// Regression test: adding a 3rd node to a 2-node database that already
// has a running service must not delete/recreate the service instance.
// The intermediate NodeEndState step must preserve existing service
// resources so they are not cycled through delete+recreate.
name: "add third node to two-node database with service",
options: operations.UpdateDatabaseOptions{},
start: twoNodeWithServiceState,
nodes: []*operations.NodeResources{
{NodeName: "n1", InstanceResources: []*database.InstanceResources{n1Instance1}},
{NodeName: "n2", InstanceResources: []*database.InstanceResources{n2Instance1}},
{NodeName: "n3", SourceNode: "n1", InstanceResources: []*database.InstanceResources{n3Instance1}},
},
services: []*operations.ServiceResources{svcRes},
},
} {
t.Run(tc.name, func(t *testing.T) {
plans, err := operations.UpdateDatabase(
Expand Down