diff --git a/.changelog/22954.txt b/.changelog/22954.txt new file mode 100644 index 000000000000..33b48027951d --- /dev/null +++ b/.changelog/22954.txt @@ -0,0 +1,3 @@ +```release-note:bug +acl: fixed a bug where ACL policy replication in WANfed is impacted when primaryDC is inconsistent +``` \ No newline at end of file diff --git a/agent/consul/acl_replication.go b/agent/consul/acl_replication.go index 79e4e5d7a7d8..027cbd5de73d 100644 --- a/agent/consul/acl_replication.go +++ b/agent/consul/acl_replication.go @@ -66,6 +66,11 @@ type aclTypeReplicator interface { // correction phase). FetchUpdated(srv *Server, updates []string) (int, error) + //ensureRemoteConsistent compares the updated items with the upsertable remotes + //returns consistent update check post stale batch read or other scenarios + //[]string of items missing from remote, []string of items updated remote, error if inconsistent + ensureRemoteConsistent(updates []string) ([]string, []string, error) + // LenPendingUpdates should be the size of the data retrieved in // FetchUpdated. LenPendingUpdates() int @@ -87,6 +92,7 @@ type aclTypeReplicator interface { } var errContainsRedactedData = errors.New("replication results contain redacted data") +var errContainsStaleData = errors.New("replication batch read for update items contain stale data") func (s *Server) fetchACLRolesBatch(roleIDs []string) (*structs.ACLRoleBatchResponse, error) { req := structs.ACLRoleBatchGetRequest{ @@ -436,11 +442,22 @@ func (s *Server) replicateACLType(ctx context.Context, logger hclog.Logger, tr a if len(res.LocalUpserts) > 0 { lenUpdated, err := tr.FetchUpdated(s, res.LocalUpserts) - if err == errContainsRedactedData { - return 0, false, fmt.Errorf("failed to retrieve unredacted %s - replication token in use does not grant acl:write", tr.PluralNoun()) - } else if err != nil { + if err != nil { + if err == errContainsRedactedData { + return 0, false, fmt.Errorf("failed to retrieve unredacted %s - replication token in use does not grant acl:write", tr.PluralNoun()) + } return 0, false, fmt.Errorf("failed to retrieve ACL %s updates: %v", tr.SingularNoun(), err) } + //if fetch updated gets stale inconsistent data then we should not proceed with applying + //the updates as that would lead to partial/stale data being replicated + //hence, we call ensureRemoteConsistent to validate the fetched updates with diff results + _, _, err = tr.ensureRemoteConsistent(res.LocalUpserts) + if err != nil { + if err == errContainsStaleData { + return 0, false, fmt.Errorf("failed to ensure consistent %s replication updates - stale data", tr.PluralNoun()) + } + return 0, false, fmt.Errorf("failed to ensure consistent ACL %s updates: %v", tr.SingularNoun(), err) + } logger.Debug( "acl replication - downloaded updates", "amount", lenUpdated, diff --git a/agent/consul/acl_replication_types.go b/agent/consul/acl_replication_types.go index e313019d3853..5c7d6ea17433 100644 --- a/agent/consul/acl_replication_types.go +++ b/agent/consul/acl_replication_types.go @@ -4,6 +4,7 @@ package consul import ( + "bytes" "context" "fmt" @@ -86,6 +87,11 @@ func (r *aclTokenReplicator) FetchUpdated(srv *Server, updates []string) (int, e return len(r.updated), nil } +func (r *aclTokenReplicator) ensureRemoteConsistent(updates []string) ([]string, []string, error) { + //return true if consistent updates, + return []string{}, []string{}, nil +} + func (r *aclTokenReplicator) DeleteLocalBatch(srv *Server, batch []string) error { req := structs.ACLTokenBatchDeleteRequest{ TokenIDs: batch, @@ -186,6 +192,45 @@ func (r *aclPolicyReplicator) FetchUpdated(srv *Server, updates []string) (int, return len(r.updated), nil } +func (r *aclPolicyReplicator) ensureRemoteConsistent(updates []string) ([]string, []string, error) { + updatedMap := make(map[string]*structs.ACLPolicy) + for _, policy := range r.updated { + updatedMap[policy.ID] = policy + } + + remoteMap := make(map[string]*structs.ACLPolicyListStub) + for _, policyStub := range r.remote { + remoteMap[policyStub.ID] = policyStub + } + + //iterate over all updates array which are policy IDs check if the hash match for both + var consistent = true + var remoteNotCreated []string + var remoteNotUpdated []string + var err error = nil + + for _, policyID := range updates { + if updatedPolicy, ok := updatedMap[policyID]; ok { + if remotePolicy, ok := remoteMap[policyID]; ok { + if !bytes.Equal(updatedPolicy.Hash, remotePolicy.Hash) && updatedPolicy.ModifyIndex < remotePolicy.ModifyIndex { + // remote stale batch did not get modified policy than what local diff calculated + remoteNotUpdated = append(remoteNotUpdated, policyID) + consistent = false + } + } + } else if remotePolicy, ok := remoteMap[policyID]; ok && remotePolicy.ModifyIndex == remotePolicy.CreateIndex { + // remote stale batch did not get the created policy than what local diff calculated + remoteNotCreated = append(remoteNotCreated, policyID) + consistent = false + } + } + + if !consistent { + err = errContainsStaleData + } + return remoteNotCreated, remoteNotUpdated, err +} + func (r *aclPolicyReplicator) DeleteLocalBatch(srv *Server, batch []string) error { req := structs.ACLPolicyBatchDeleteRequest{ PolicyIDs: batch, @@ -307,6 +352,11 @@ func (r *aclRoleReplicator) FetchUpdated(srv *Server, updates []string) (int, er return len(r.updated), nil } +func (r *aclRoleReplicator) ensureRemoteConsistent(updates []string) ([]string, []string, error) { + //return true if consistent updates, + return []string{}, []string{}, nil +} + func (r *aclRoleReplicator) DeleteLocalBatch(srv *Server, batch []string) error { req := structs.ACLRoleBatchDeleteRequest{ RoleIDs: batch,