Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/22954.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
acl: fixed a bug where ACL policy replication in WANfed is impacted when primaryDC is inconsistent
```
23 changes: 20 additions & 3 deletions agent/consul/acl_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ type aclTypeReplicator interface {
// correction phase).
FetchUpdated(srv *Server, updates []string) (int, error)

// ensureRemoteConsistent compares the fetched updates with the upsertable
// remote items to detect an inconsistent result, e.g. after a stale batch
// read. It returns the IDs of items missing from the remote data, the IDs
// of items whose update is not reflected in the remote data, and an error
// if the data is inconsistent.
ensureRemoteConsistent(updates []string) ([]string, []string, error)

// LenPendingUpdates should be the size of the data retrieved in
// FetchUpdated.
LenPendingUpdates() int
Expand All @@ -87,6 +92,7 @@ type aclTypeReplicator interface {
}

// Sentinel errors used by the ACL replication routines in this package.
var (
	// errContainsRedactedData is the sentinel that replicateACLType reports
	// as the replication token not granting acl:write.
	errContainsRedactedData = errors.New("replication results contain redacted data")

	// errContainsStaleData is the sentinel returned by ensureRemoteConsistent
	// when the fetched update items do not match the computed diff.
	errContainsStaleData = errors.New("replication batch read for update items contain stale data")
)

func (s *Server) fetchACLRolesBatch(roleIDs []string) (*structs.ACLRoleBatchResponse, error) {
req := structs.ACLRoleBatchGetRequest{
Expand Down Expand Up @@ -436,11 +442,22 @@ func (s *Server) replicateACLType(ctx context.Context, logger hclog.Logger, tr a

if len(res.LocalUpserts) > 0 {
lenUpdated, err := tr.FetchUpdated(s, res.LocalUpserts)
if err == errContainsRedactedData {
return 0, false, fmt.Errorf("failed to retrieve unredacted %s - replication token in use does not grant acl:write", tr.PluralNoun())
} else if err != nil {
if err != nil {
if err == errContainsRedactedData {
return 0, false, fmt.Errorf("failed to retrieve unredacted %s - replication token in use does not grant acl:write", tr.PluralNoun())
}
return 0, false, fmt.Errorf("failed to retrieve ACL %s updates: %v", tr.SingularNoun(), err)
}
// If FetchUpdated returned stale, inconsistent data we must not apply the
// updates, because that would replicate partial or stale data. Call
// ensureRemoteConsistent to validate the fetched updates against the diff
// results before proceeding.
_, _, err = tr.ensureRemoteConsistent(res.LocalUpserts)
if err != nil {
if err == errContainsStaleData {
return 0, false, fmt.Errorf("failed to ensure consistent %s replication updates - stale data", tr.PluralNoun())
}
return 0, false, fmt.Errorf("failed to ensure consistent ACL %s updates: %v", tr.SingularNoun(), err)
}
logger.Debug(
"acl replication - downloaded updates",
"amount", lenUpdated,
Expand Down
50 changes: 50 additions & 0 deletions agent/consul/acl_replication_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package consul

import (
"bytes"
"context"
"fmt"

Expand Down Expand Up @@ -86,6 +87,11 @@ func (r *aclTokenReplicator) FetchUpdated(srv *Server, updates []string) (int, e
return len(r.updated), nil
}

// ensureRemoteConsistent performs no consistency check for tokens: it ignores
// updates and always returns two empty (non-nil) ID slices and a nil error.
func (r *aclTokenReplicator) ensureRemoteConsistent(updates []string) ([]string, []string, error) {
	var (
		missing = []string{}
		stale   = []string{}
	)
	return missing, stale, nil
}

func (r *aclTokenReplicator) DeleteLocalBatch(srv *Server, batch []string) error {
req := structs.ACLTokenBatchDeleteRequest{
TokenIDs: batch,
Expand Down Expand Up @@ -186,6 +192,45 @@ func (r *aclPolicyReplicator) FetchUpdated(srv *Server, updates []string) (int,
return len(r.updated), nil
}

// ensureRemoteConsistent cross-checks, for every policy ID in updates, the
// full policies held in r.updated against the list stubs held in r.remote.
//
// Only IDs that have a stub in r.remote are examined:
//   - If the ID is also in r.updated, it is reported in the second return
//     value when the two hashes differ and the r.updated copy has a lower
//     ModifyIndex than the stub.
//   - If the ID is absent from r.updated, it is reported in the first return
//     value when the stub's ModifyIndex equals its CreateIndex.
//
// The error is errContainsStaleData when at least one ID was reported and
// nil otherwise.
func (r *aclPolicyReplicator) ensureRemoteConsistent(updates []string) ([]string, []string, error) {
	// Index both collections by policy ID so each update is a map lookup.
	fetchedByID := make(map[string]*structs.ACLPolicy)
	for _, p := range r.updated {
		fetchedByID[p.ID] = p
	}
	listedByID := make(map[string]*structs.ACLPolicyListStub)
	for _, stub := range r.remote {
		listedByID[stub.ID] = stub
	}

	var missing, outdated []string
	for _, id := range updates {
		listed, haveListed := listedByID[id]
		if !haveListed {
			// Nothing to compare against; neither check applies.
			continue
		}

		fetched, haveFetched := fetchedByID[id]
		switch {
		case haveFetched:
			// The fetched copy is older than the stub and has different content.
			if fetched.ModifyIndex < listed.ModifyIndex && !bytes.Equal(fetched.Hash, listed.Hash) {
				outdated = append(outdated, id)
			}
		case listed.ModifyIndex == listed.CreateIndex:
			// The stub shows a never-modified policy that was not fetched.
			missing = append(missing, id)
		}
	}

	if len(missing) > 0 || len(outdated) > 0 {
		return missing, outdated, errContainsStaleData
	}
	return missing, outdated, nil
}

func (r *aclPolicyReplicator) DeleteLocalBatch(srv *Server, batch []string) error {
req := structs.ACLPolicyBatchDeleteRequest{
PolicyIDs: batch,
Expand Down Expand Up @@ -307,6 +352,11 @@ func (r *aclRoleReplicator) FetchUpdated(srv *Server, updates []string) (int, er
return len(r.updated), nil
}

// ensureRemoteConsistent performs no consistency check for roles: it ignores
// updates and always returns two empty (non-nil) ID slices and a nil error.
func (r *aclRoleReplicator) ensureRemoteConsistent(updates []string) ([]string, []string, error) {
	var (
		missing = []string{}
		stale   = []string{}
	)
	return missing, stale, nil
}

func (r *aclRoleReplicator) DeleteLocalBatch(srv *Server, batch []string) error {
req := structs.ACLRoleBatchDeleteRequest{
RoleIDs: batch,
Expand Down
Loading