Skip to content

Commit 313199c

Browse files
authored
Merge pull request #12877 from Nordix/lentzi90/ssa-cache-after-apply
🌱 Add items to cache immediately after apply
2 parents 87d9768 + 5d3943e commit 313199c

File tree

4 files changed

+104
-33
lines changed

4 files changed

+104
-33
lines changed

internal/controllers/topology/cluster/structuredmerge/dryrun.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func dryRunSSAPatch(ctx context.Context, dryRunCtx *dryRunSSAPatchInput) (bool,
5454
// The identifier consists of: gvk, namespace, name and resourceVersion of originalUnstructured
5555
// and a hash of modifiedUnstructured.
5656
// This ensures that we re-run the request as soon as either original or modified changes.
57-
requestIdentifier, err := ssa.ComputeRequestIdentifier(dryRunCtx.client.Scheme(), dryRunCtx.originalUnstructured, dryRunCtx.modifiedUnstructured)
57+
requestIdentifier, err := ssa.ComputeRequestIdentifier(dryRunCtx.client.Scheme(), dryRunCtx.originalUnstructured.GetResourceVersion(), dryRunCtx.modifiedUnstructured)
5858
if err != nil {
5959
return false, false, nil, err
6060
}

internal/util/ssa/cache.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -104,18 +104,18 @@ func (r *ssaCache) Has(key, kind string) bool {
104104
// ComputeRequestIdentifier computes a request identifier for the cache.
105105
// The identifier is unique for a specific request to ensure we don't have to re-run the request
106106
// once we found out that it would not produce a diff.
107-
// The identifier consists of: gvk, namespace, name and resourceVersion of the original object and a hash of the modified
108-
// object. This ensures that we re-run the request as soon as either original or modified changes.
109-
func ComputeRequestIdentifier(scheme *runtime.Scheme, original, modified client.Object) (string, error) {
110-
modifiedObjectHash, err := hash.Compute(modified)
107+
// The identifier consists of: gvk, namespace, name and resourceVersion of the object and a hash of the modified
108+
// object. This ensures that we re-run the request as soon as anything changes.
109+
func ComputeRequestIdentifier(scheme *runtime.Scheme, resourceVersion string, obj client.Object) (string, error) {
110+
objHash, err := hash.Compute(obj)
111111
if err != nil {
112-
return "", errors.Wrapf(err, "failed to calculate request identifier: failed to compute hash for modified object")
112+
return "", errors.Wrapf(err, "failed to calculate request identifier: failed to compute hash for object")
113113
}
114114

115-
gvk, err := apiutil.GVKForObject(original, scheme)
115+
gvk, err := apiutil.GVKForObject(obj, scheme)
116116
if err != nil {
117-
return "", errors.Wrapf(err, "failed to calculate request identifier: failed to get GroupVersionKind of original object %s", klog.KObj(original))
117+
return "", errors.Wrapf(err, "failed to calculate request identifier: failed to get GroupVersionKind of object %s", klog.KObj(obj))
118118
}
119119

120-
return fmt.Sprintf("%s.%s.%s.%d", gvk.String(), klog.KObj(original), original.GetResourceVersion(), modifiedObjectHash), nil
120+
return fmt.Sprintf("%s.%s.%s.%d", gvk.String(), klog.KObj(obj), resourceVersion, objHash), nil
121121
}

internal/util/ssa/patch.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ func Patch(ctx context.Context, c client.Client, fieldManager string, modified c
8484
if err != nil {
8585
return err
8686
}
87+
modifiedUnstructuredBeforeApply := modifiedUnstructured.DeepCopy()
8788

8889
gvk, err := apiutil.GVKForObject(modifiedUnstructured, c.Scheme())
8990
if err != nil {
@@ -93,7 +94,7 @@ func Patch(ctx context.Context, c client.Client, fieldManager string, modified c
9394
var requestIdentifier string
9495
if options.WithCachingProxy {
9596
// Check if the request is cached.
96-
requestIdentifier, err = ComputeRequestIdentifier(c.Scheme(), options.Original, modifiedUnstructured)
97+
requestIdentifier, err = ComputeRequestIdentifier(c.Scheme(), options.Original.GetResourceVersion(), modifiedUnstructured)
9798
if err != nil {
9899
return errors.Wrapf(err, "failed to apply object")
99100
}
@@ -132,10 +133,16 @@ func Patch(ctx context.Context, c client.Client, fieldManager string, modified c
132133

133134
// Add the request to the cache only if dry-run was not used.
134135
if options.WithCachingProxy && !options.WithDryRun {
135-
// If the SSA call did not update the object, add the request to the cache.
136-
if options.Original.GetResourceVersion() == modifiedUnstructured.GetResourceVersion() {
137-
options.Cache.Add(requestIdentifier)
136+
// If the object changed, we need to recompute the request identifier before caching.
137+
if options.Original.GetResourceVersion() != modifiedUnstructured.GetResourceVersion() {
138+
// NOTE: This uses the resourceVersion from modifiedUnstructured (after apply), and the hash from
139+
// modifiedUnstructuredBeforeApply (what we wanted to apply), which is what we want.
140+
requestIdentifier, err = ComputeRequestIdentifier(c.Scheme(), modifiedUnstructured.GetResourceVersion(), modifiedUnstructuredBeforeApply)
141+
if err != nil {
142+
return errors.Wrapf(err, "failed to compute request identifier after apply")
143+
}
138144
}
145+
options.Cache.Add(requestIdentifier)
139146
}
140147

141148
return nil

internal/util/ssa/patch_test.go

Lines changed: 84 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,33 @@ limitations under the License.
1717
package ssa
1818

1919
import (
20+
"context"
2021
"testing"
2122
"time"
2223

2324
. "github.com/onsi/gomega"
2425
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2526
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
27+
"k8s.io/apimachinery/pkg/runtime"
28+
"k8s.io/apimachinery/pkg/watch"
2629
"k8s.io/utils/ptr"
2730
"sigs.k8s.io/controller-runtime/pkg/client"
31+
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
2832

2933
clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
3034
"sigs.k8s.io/cluster-api/util/test/builder"
3135
)
3236

37+
// clientWithWatch wraps a client.Client and adds a Watch method to satisfy client.WithWatch interface.
38+
type clientWithWatch struct {
39+
client.Client
40+
}
41+
42+
func (c *clientWithWatch) Watch(_ context.Context, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) {
43+
// This is not used in the tests, but required to satisfy the client.WithWatch interface
44+
panic("Watch not implemented")
45+
}
46+
3347
func TestPatch(t *testing.T) {
3448
g := NewWithT(t)
3549

@@ -48,26 +62,49 @@ func TestPatch(t *testing.T) {
4862
fieldManager := "test-manager"
4963
ssaCache := NewCache("test-controller")
5064

65+
// Wrap the client with an interceptor to count API calls
66+
var applyCallCount int
67+
countingClient := interceptor.NewClient(&clientWithWatch{Client: env.GetClient()}, interceptor.Funcs{
68+
Apply: func(ctx context.Context, c client.WithWatch, obj runtime.ApplyConfiguration, opts ...client.ApplyOption) error {
69+
applyCallCount++
70+
return c.Apply(ctx, obj, opts...)
71+
},
72+
})
73+
5174
// 1. Create the object
5275
createObject := initialObject.DeepCopy()
53-
g.Expect(Patch(ctx, env.GetClient(), fieldManager, createObject)).To(Succeed())
76+
g.Expect(Patch(ctx, countingClient, fieldManager, createObject)).To(Succeed())
77+
g.Expect(applyCallCount).To(Equal(1), "Expected 1 API call for create")
5478

55-
// 2. Update the object and verify that the request was not cached as the object was changed.
79+
// 2. Update the object and verify that the request was not cached with the old identifier,
80+
// but is cached with a new identifier (after apply).
5681
// Get the original object.
5782
originalObject := initialObject.DeepCopy()
5883
g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(originalObject), originalObject)).To(Succeed())
5984
// Modify the object
6085
modifiedObject := initialObject.DeepCopy()
6186
g.Expect(unstructured.SetNestedField(modifiedObject.Object, "baz", "spec", "foo")).To(Succeed())
62-
// Compute request identifier, so we can later verify that the update call was not cached.
87+
// Compute request identifier before the update, so we can later verify that the update call was not cached with this identifier.
6388
modifiedUnstructured, err := prepareModified(env.Scheme(), modifiedObject)
6489
g.Expect(err).ToNot(HaveOccurred())
65-
requestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
90+
oldRequestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), originalObject.GetResourceVersion(), modifiedUnstructured)
6691
g.Expect(err).ToNot(HaveOccurred())
92+
// Save a copy of modifiedUnstructured before apply to compute the new identifier later
93+
modifiedUnstructuredBeforeApply := modifiedUnstructured.DeepCopy()
6794
// Update the object
68-
g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
69-
// Verify that request was not cached (as it changed the object)
70-
g.Expect(ssaCache.Has(requestIdentifier, initialObject.GetKind())).To(BeFalse())
95+
applyCallCount = 0
96+
g.Expect(Patch(ctx, countingClient, fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
97+
g.Expect(applyCallCount).To(Equal(1), "Expected 1 API call for first update (object changed)")
98+
// Verify that request was not cached with the old identifier (as it changed the object)
99+
g.Expect(ssaCache.Has(oldRequestIdentifier, initialObject.GetKind())).To(BeFalse())
100+
// Get the actual object from server after apply to compute the new request identifier
101+
objectAfterApply := initialObject.DeepCopy()
102+
g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(objectAfterApply), objectAfterApply)).To(Succeed())
103+
// Compute the new request identifier (after apply)
104+
newRequestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), objectAfterApply.GetResourceVersion(), modifiedUnstructuredBeforeApply)
105+
g.Expect(err).ToNot(HaveOccurred())
106+
// Verify that request was cached with the new identifier (after apply)
107+
g.Expect(ssaCache.Has(newRequestIdentifier, initialObject.GetKind())).To(BeTrue())
71108

72109
// 3. Repeat the same update and verify that the request was cached as the object was not changed.
73110
// Get the original object.
@@ -79,12 +116,14 @@ func TestPatch(t *testing.T) {
79116
// Compute request identifier, so we can later verify that the update call was cached.
80117
modifiedUnstructured, err = prepareModified(env.Scheme(), modifiedObject)
81118
g.Expect(err).ToNot(HaveOccurred())
82-
requestIdentifier, err = ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
119+
requestIdentifierNoOp, err := ComputeRequestIdentifier(env.GetScheme(), originalObject.GetResourceVersion(), modifiedUnstructured)
83120
g.Expect(err).ToNot(HaveOccurred())
84121
// Update the object
85-
g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
122+
applyCallCount = 0
123+
g.Expect(Patch(ctx, countingClient, fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
124+
g.Expect(applyCallCount).To(Equal(0), "Expected 0 API calls for repeat update (should hit cache)")
86125
// Verify that request was cached (as it did not change the object)
87-
g.Expect(ssaCache.Has(requestIdentifier, initialObject.GetKind())).To(BeTrue())
126+
g.Expect(ssaCache.Has(requestIdentifierNoOp, initialObject.GetKind())).To(BeTrue())
88127
})
89128

90129
t.Run("Test patch with Machine", func(*testing.T) {
@@ -123,30 +162,53 @@ func TestPatch(t *testing.T) {
123162
fieldManager := "test-manager"
124163
ssaCache := NewCache("test-controller")
125164

165+
// Wrap the client with an interceptor to count API calls
166+
var applyCallCount int
167+
countingClient := interceptor.NewClient(&clientWithWatch{Client: env.GetClient()}, interceptor.Funcs{
168+
Apply: func(ctx context.Context, c client.WithWatch, obj runtime.ApplyConfiguration, opts ...client.ApplyOption) error {
169+
applyCallCount++
170+
return c.Apply(ctx, obj, opts...)
171+
},
172+
})
173+
126174
// 1. Create the object
127175
createObject := initialObject.DeepCopy()
128-
g.Expect(Patch(ctx, env.GetClient(), fieldManager, createObject)).To(Succeed())
176+
g.Expect(Patch(ctx, countingClient, fieldManager, createObject)).To(Succeed())
177+
g.Expect(applyCallCount).To(Equal(1), "Expected 1 API call for create")
129178
// Verify that gvk is still set
130179
g.Expect(createObject.GroupVersionKind()).To(Equal(initialObject.GroupVersionKind()))
131180

132-
// 2. Update the object and verify that the request was not cached as the object was changed.
181+
// 2. Update the object and verify that the request was not cached with the old identifier,
182+
// but is cached with a new identifier (after apply).
133183
// Get the original object.
134184
originalObject := initialObject.DeepCopy()
135185
g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(originalObject), originalObject)).To(Succeed())
136186
// Modify the object
137187
modifiedObject := initialObject.DeepCopy()
138188
modifiedObject.Spec.Deletion.NodeDrainTimeoutSeconds = ptr.To(int32(5))
139-
// Compute request identifier, so we can later verify that the update call was not cached.
189+
// Compute request identifier before the update, so we can later verify that the update call was not cached with this identifier.
140190
modifiedUnstructured, err := prepareModified(env.Scheme(), modifiedObject)
141191
g.Expect(err).ToNot(HaveOccurred())
142-
requestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
192+
oldRequestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), originalObject.GetResourceVersion(), modifiedUnstructured)
143193
g.Expect(err).ToNot(HaveOccurred())
194+
// Save a copy of modifiedUnstructured before apply to compute the new identifier later
195+
modifiedUnstructuredBeforeApply := modifiedUnstructured.DeepCopy()
144196
// Update the object
145-
g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
197+
applyCallCount = 0
198+
g.Expect(Patch(ctx, countingClient, fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
199+
g.Expect(applyCallCount).To(Equal(1), "Expected 1 API call for first update (object changed)")
146200
// Verify that gvk is still set
147201
g.Expect(modifiedObject.GroupVersionKind()).To(Equal(initialObject.GroupVersionKind()))
148-
// Verify that request was not cached (as it changed the object)
149-
g.Expect(ssaCache.Has(requestIdentifier, initialObject.GetObjectKind().GroupVersionKind().Kind)).To(BeFalse())
202+
// Verify that request was not cached with the old identifier (as it changed the object)
203+
g.Expect(ssaCache.Has(oldRequestIdentifier, initialObject.GetObjectKind().GroupVersionKind().Kind)).To(BeFalse())
204+
// Get the actual object from server after apply to compute the new request identifier
205+
objectAfterApply := initialObject.DeepCopy()
206+
g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(objectAfterApply), objectAfterApply)).To(Succeed())
207+
// Compute the new request identifier (after apply)
208+
newRequestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), objectAfterApply.GetResourceVersion(), modifiedUnstructuredBeforeApply)
209+
g.Expect(err).ToNot(HaveOccurred())
210+
// Verify that request was cached with the new identifier (after apply)
211+
g.Expect(ssaCache.Has(newRequestIdentifier, initialObject.GetObjectKind().GroupVersionKind().Kind)).To(BeTrue())
150212

151213
// Wait for 1 second. We are also trying to verify in this test that the resourceVersion of the Machine
152214
// is not increased. Under some circumstances this would only happen if the timestamp in managedFields would
@@ -166,12 +228,14 @@ func TestPatch(t *testing.T) {
166228
// Compute request identifier, so we can later verify that the update call was cached.
167229
modifiedUnstructured, err = prepareModified(env.Scheme(), modifiedObject)
168230
g.Expect(err).ToNot(HaveOccurred())
169-
requestIdentifier, err = ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
231+
requestIdentifierNoOp, err := ComputeRequestIdentifier(env.GetScheme(), originalObject.GetResourceVersion(), modifiedUnstructured)
170232
g.Expect(err).ToNot(HaveOccurred())
171233
// Update the object
172-
g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
234+
applyCallCount = 0
235+
g.Expect(Patch(ctx, countingClient, fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
236+
g.Expect(applyCallCount).To(Equal(0), "Expected 0 API calls for repeat update (should hit cache)")
173237
// Verify that request was cached (as it did not change the object)
174-
g.Expect(ssaCache.Has(requestIdentifier, initialObject.GetObjectKind().GroupVersionKind().Kind)).To(BeTrue())
238+
g.Expect(ssaCache.Has(requestIdentifierNoOp, initialObject.GetObjectKind().GroupVersionKind().Kind)).To(BeTrue())
175239
// Verify that gvk is still set
176240
g.Expect(modifiedObject.GroupVersionKind()).To(Equal(initialObject.GroupVersionKind()))
177241
})

0 commit comments

Comments
 (0)