
Commit 2b8b64f

improve: filter only own updates for read-after-write-consistency (#3414)
Reworks how the framework filters watch events caused by the controller's own writes. The previous "expect this single RV" approach couldn't handle concurrent writes, foreign updates between two of our writes, mid-window deletes, or relist gaps. It is replaced with a per-resource event-filter window driven by an explicit state machine.

Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 2a3dff4 commit 2b8b64f

41 files changed

Lines changed: 3710 additions & 712 deletions


docs/content/en/blog/news/read-after-write-consistency.md

Lines changed: 36 additions & 6 deletions
@@ -179,6 +179,11 @@ From this point the idea of the algorithm is very simple:
 2. When the informer propagates an event, check if its resource version is greater than or equal to
    the one in the TRC. If yes, evict the resource from the TRC.
 3. When the controller reads a resource from cache, it checks the TRC first, then falls back to the Informer's cache.
+
+The actual filtering of events for our own writes is more nuanced than a simple
+"evict on RV ≥ TRC version" rule: it is driven by a per-resource state machine
+that tracks in-flight writes and the events received around them. See
+[Filtering events for our own updates](#filtering-events-for-our-own-updates) below.
 
 
 ```mermaid
@@ -221,13 +226,38 @@ sequenceDiagram
 When we update a resource, eventually the informer will propagate an event that would trigger a reconciliation.
 However, this is mostly not desired. Since we already have the up-to-date resource at that point,
 we would like to be notified only if the resource is changed after our change.
-Therefore, in addition to caching the resource, we also filter out events that contain a resource
-version older than or equal to our cached resource version.
-
-Note that the implementation of this is relatively complex, since while performing the update we want to record all the
-events received in the meantime and decide whether to propagate them further once the update request is complete.
 
-However, this way we significantly reduce the number of reconciliations, making the whole process much more efficient.
+The framework runs a per-resource *event filter window* around each in-flight
+write: it records the resource version returned by our update, buffers any
+related events that arrive in the meantime, and at the end of the window
+decides what (if anything) to surface to the reconciler. The rules:
+
+- **Pure own echo**: if the only events in the window are watch events whose
+  resource versions match our recorded own writes (and the action is `UPDATED`),
+  they are filtered out, so the reconciler isn't bothered.
+- **Foreign change in the window**: if a resource version arrived that was *not*
+  one of our own writes (e.g. a third party modified the resource between two
+  of our updates), the framework synthesizes a single `UPDATED` event covering
+  the whole window (`previousResource` = the resource just before the window,
+  `resource` = the latest known state). The reconciler is notified once, with a
+  faithful before/after picture, instead of receiving each underlying watch
+  event individually.
+- **DELETE in the middle**: if the resource was deleted at some point during
+  the window, that DELETE participates in the synthesis. A trailing `DELETED`
+  is surfaced verbatim; a DELETE-then-recreate inside the window collapses to
+  an `UPDATED` from the deleted state to the recreated state.
+- **Held foreign events**: a foreign event that arrives *before* the matching
+  own write echo is buffered until the write completes. This avoids
+  surfacing it as foreign only to immediately overwrite it with a synthesized
+  echo.
+- **ReList**: events arriving while the informer is performing a relist are
+  tagged. Because a relist may have hidden events, the framework defaults to
+  surfacing such events to the reconciler rather than silently filtering
+  them, even when they would otherwise look like our own echoes.
+
+This way we significantly reduce the number of reconciliations, making the whole
+process much more efficient, while preserving the invariant that any
+foreign change reaches the reconciler.
 
 ### The case for instant reschedule
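Review note: the window rules described in the blog text above can be illustrated with a toy model. This is a minimal sketch under stated assumptions, not the framework's `EventFilterWindow` (that class is not visible on this page). `WindowSketch`, `Event`, `Surfaced`, and `closeWindow` are hypothetical names, events are reduced to an action plus a resource version, and the held-foreign-event and relist rules are left out.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified model of the documented rules; NOT the framework's EventFilterWindow.
final class WindowSketch {

  enum Action { ADDED, UPDATED, DELETED }

  // A buffered watch event, reduced to its action and resource version.
  record Event(Action action, String resourceVersion) {}

  // What the reconciler would be handed when the window closes.
  record Surfaced(Action action, String previousVersion, String version) {}

  static Optional<Surfaced> closeWindow(
      String versionBeforeWindow, Set<String> ownVersions, List<Event> buffered) {
    if (buffered.isEmpty()) {
      return Optional.empty();
    }
    Event last = buffered.get(buffered.size() - 1);

    // Trailing DELETED: surfaced as a delete, no synthesis.
    if (last.action() == Action.DELETED) {
      return Optional.of(new Surfaced(Action.DELETED, null, last.resourceVersion()));
    }

    // Pure own echo: every event is an UPDATED carrying a version we wrote ourselves.
    boolean pureOwnEcho =
        buffered.stream()
            .allMatch(
                e -> e.action() == Action.UPDATED && ownVersions.contains(e.resourceVersion()));
    if (pureOwnEcho) {
      return Optional.empty();
    }

    // Otherwise synthesize one UPDATED for the whole window. If a DELETE happened
    // inside the window, the "before" side is the deleted state.
    String previous = versionBeforeWindow;
    for (Event e : buffered) {
      if (e.action() == Action.DELETED) {
        previous = e.resourceVersion();
      }
    }
    return Optional.of(new Surfaced(Action.UPDATED, previous, last.resourceVersion()));
  }
}
```

In this sketch an own write at version `11` followed by a foreign update at `12` yields one `UPDATED` from `10` to `12`, while a window that only contains the echo of `11` yields nothing.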

docs/content/en/docs/documentation/reconciler.md

Lines changed: 17 additions & 0 deletions
@@ -175,6 +175,23 @@ supports stronger guarantees, both for primary and secondary resources. If this
 that resource again. This feature also makes sure that the reconciliation is not triggered from the event from our
 writes.
 
+The filter is implemented as a per-resource *event filter window* that opens
+when an update starts and closes when it completes. Inside the window:
+- Pure own echoes (watch events whose resource version matches one of our
+  recorded own writes) are dropped.
+- Foreign events received during the window are merged with the surrounding
+  own writes into a single synthesized `UPDATED` event so the reconciler
+  gets a faithful before/after picture rather than each individual watch
+  event. These events are carefully crafted so they correspond to a real-life scenario,
+  and remain fully usable by filters.
+- A `DELETED` arriving in the window is propagated; a delete-then-recreate
+  inside the window collapses into one synthesized `UPDATED` from the
+  deleted state to the recreated state.
+- During an informer relist the filter degrades to "surface what we see":
+  events received while a relist is in progress are propagated even when
+  they would otherwise look like own echoes, since the relist may have
+  hidden events.
+
 
 In order to benefit from these stronger guarantees, use [`ResourceOperations`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java)
 from the context of the reconciliation:
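Review note: the relist rule in the list above ("surface what we see") amounts to a flag that overrides the own-echo check. The following is a minimal sketch of that idea only. `RelistSketch` and its methods are hypothetical names, and the real filter tags per-resource windows rather than consulting a single boolean at decision time.

```java
import java.util.Set;

// Hypothetical sketch of the relist fallback; names are illustrative, not the framework's API.
final class RelistSketch {

  private boolean relistInProgress = false;

  synchronized void relistStarted() {
    relistInProgress = true;
  }

  synchronized void relistFinished() {
    relistInProgress = false;
  }

  // Returns true when the event should reach the reconciler.
  synchronized boolean shouldSurface(String eventResourceVersion, Set<String> ownVersions) {
    if (relistInProgress) {
      // A relist may have hidden events, so nothing is trusted to be a pure own echo.
      return true;
    }
    // Outside a relist, events carrying one of our own recorded versions are dropped.
    return !ownVersions.contains(eventResourceVersion);
  }
}
```

The design choice is to err on the side of an extra reconciliation: a spurious trigger is cheap, while a missed foreign change would break the invariant that every foreign change reaches the reconciler.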

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java

Lines changed: 19 additions & 8 deletions
@@ -31,8 +31,8 @@
 import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
 import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
 import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
+import io.javaoperatorsdk.operator.processing.event.source.informer.ExtendedResourceEvent;
 import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
-import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;
 
 import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.handleKubernetesClientException;
 import static io.javaoperatorsdk.operator.processing.event.source.controller.InternalEventFilters.*;
@@ -141,11 +141,22 @@ private void handleOnAddOrUpdate(
       ResourceAction action, T oldCustomResource, T newCustomResource) {
     var handling =
         temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource);
-    if (handling == EventHandling.NEW) {
-      handleEvent(action, newCustomResource, oldCustomResource, null);
-    } else if (log.isDebugEnabled()) {
-      log.debug("{} event propagation for action: {}", handling, action);
-    }
+    handling.ifPresentOrElse(
+        this::handleEvent,
+        () -> {
+          if (log.isDebugEnabled()) {
+            log.debug("Skipping/deferring event propagation for action: {}", action);
+          }
+        });
+  }
+
+  @SuppressWarnings("unchecked")
+  private void handleEvent(ExtendedResourceEvent r) {
+    handleEvent(
+        r.getAction(),
+        (T) r.getResource().orElseThrow(),
+        (T) r.getPreviousResource().orElse(null),
+        r.isLastStateUnknown());
   }
 
   @Override
@@ -154,10 +165,10 @@ public synchronized void onDelete(T resource, boolean deletedFinalStateUnknown)
         resource,
         ResourceAction.DELETED,
         () -> {
-          temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
+          var res = temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
           // delete event is quite special here, that requires special care, since we clean up
           // caches on delete event.
-          handleEvent(ResourceAction.DELETED, resource, null, deletedFinalStateUnknown);
+          res.ifPresent(this::handleEvent);
         });
   }
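Review note: the change above replaces an enum comparison with an `Optional` whose presence means "propagate this event" and whose absence means "skip or defer". The pattern can be shown in isolation with plain JDK types. `OptionalDispatchSketch` and its fields are hypothetical, and a `String` stands in for `ExtendedResourceEvent`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the dispatch in handleOnAddOrUpdate; Strings replace real events.
final class OptionalDispatchSketch {

  final List<String> propagated = new ArrayList<>();
  final List<String> skipped = new ArrayList<>();

  void onEvent(String action, Optional<String> handling) {
    handling.ifPresentOrElse(
        // Present: the filter produced an event (possibly synthesized) to hand on.
        propagated::add,
        // Empty: the filter swallowed or deferred the event; only record that fact.
        () -> skipped.add(action));
  }
}
```

One consequence of this shape is that the filter, not the caller, decides which event object is propagated, which is what allows a synthesized event to replace the raw watch event.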

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java

Lines changed: 0 additions & 72 deletions
This file was deleted.
Lines changed: 128 additions & 0 deletions
@@ -0,0 +1,128 @@
+/*
+ * Copyright Java Operator SDK Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.javaoperatorsdk.operator.processing.event.source.informer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+class EventFilterSupport {
+
+  private static final Logger log = LoggerFactory.getLogger(EventFilterSupport.class);
+
+  private final Map<ResourceID, EventFilterWindow> eventFilterWindows = new HashMap<>();
+  private boolean ongoingReList = false;
+
+  public synchronized void startEventFilteringModify(ResourceID resourceID) {
+    var existing = eventFilterWindows.get(resourceID);
+    var ed =
+        eventFilterWindows.computeIfAbsent(resourceID, id -> new EventFilterWindow(ongoingReList));
+    ed.increaseActiveUpdates();
+    log.debug(
+        "startEventFilteringModify: id={}, windowReused={}, ongoingReList={}",
+        resourceID,
+        existing != null,
+        ongoingReList);
+  }
+
+  public synchronized Optional<ExtendedResourceEvent> doneEventFilterModify(ResourceID resourceID) {
+    var ed = eventFilterWindows.get(resourceID);
+    if (ed == null) {
+      log.debug("doneEventFilterModify: no window for id={}", resourceID);
+      return Optional.empty();
+    }
+    ed.decreaseActiveUpdates();
+    log.debug("doneEventFilterModify: id={}", resourceID);
+    return check(ed, resourceID);
+  }
+
+  public synchronized Optional<ExtendedResourceEvent> processEvent(
+      ResourceID resourceId, ExtendedResourceEvent extendedResourceEvent) {
+    var ed = eventFilterWindows.get(resourceId);
+    if (ed != null) {
+      log.debug(
+          "processEvent: buffering event in window. id={}, action={}, rv={}",
+          resourceId,
+          extendedResourceEvent.getAction(),
+          extendedResourceEvent
+              .getResource()
+              .map(r -> r.getMetadata().getResourceVersion())
+              .orElse("?"));
+      ed.addRelatedEvent(extendedResourceEvent);
+      return check(ed, resourceId);
+    } else {
+      log.debug(
+          "processEvent: no active window, surfacing directly. id={}, action={}",
+          resourceId,
+          extendedResourceEvent.getAction());
+      return Optional.of(extendedResourceEvent);
+    }
+  }
+
+  private Optional<ExtendedResourceEvent> check(
+      EventFilterWindow eventFilterWindow, ResourceID resourceID) {
+    var res = eventFilterWindow.check();
+    if (eventFilterWindow.canBeRemoved()) {
+      log.debug("Removing empty event filter window. id={}", resourceID);
+      eventFilterWindows.remove(resourceID);
+    }
+    return res;
+  }
+
+  public synchronized void addToOwnResourceVersions(ResourceID resourceId, String resourceVersion) {
+    var window = eventFilterWindows.get(resourceId);
+    if (window != null) {
+      log.debug("Recording own resourceVersion. id={}, rv={}", resourceId, resourceVersion);
+      window.addToOwnUpdateVersions(resourceVersion);
+    } else {
+      log.debug(
+          "addToOwnResourceVersions: no active window for id={}, rv={} (skipped)",
+          resourceId,
+          resourceVersion);
+    }
+  }
+
+  public synchronized void handleGhostResourceRemoval(ResourceID resourceId) {
+    log.debug("Ghost resource removal: discarding event filter window. id={}", resourceId);
+    eventFilterWindows.remove(resourceId);
+  }
+
+  // for testing purposes
+  synchronized Map<ResourceID, EventFilterWindow> getEventFilterWindows() {
+    return eventFilterWindows;
+  }
+
+  public synchronized void setStartingReList() {
+    log.debug("ReList starting: tagging {} active window(s)", eventFilterWindows.size());
+    ongoingReList = true;
+    eventFilterWindows.values().forEach(EventFilterWindow::setReListStarted);
+  }
+
+  public synchronized void setRelistFinished() {
+    log.debug("ReList finished: clearing tag from {} active window(s)", eventFilterWindows.size());
+    ongoingReList = false;
+    eventFilterWindows.values().forEach(EventFilterWindow::setReListFinished);
+  }
+
+  public synchronized boolean isActiveUpdateFor(ResourceID resourceId) {
+    return eventFilterWindows.containsKey(resourceId);
+  }
+}
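
Review note: the window lifecycle in `EventFilterSupport` (create on first start, share across concurrent updates, remove once nothing is pending) can be modelled with a reference count per resource. The sketch below is an assumption-laden simplification: `WindowRegistrySketch` is a hypothetical name, resource IDs are plain strings, and removal depends only on the active-update counter, whereas the real `canBeRemoved()` is not shown on this page and may also consider buffered events.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the per-resource window registry; not the framework class.
final class WindowRegistrySketch {

  private static final class Window {
    int activeUpdates;
  }

  private final Map<String, Window> windows = new HashMap<>();

  // Opens a window for the resource, or joins the one already open.
  synchronized void start(String resourceId) {
    windows.computeIfAbsent(resourceId, id -> new Window()).activeUpdates++;
  }

  // Marks one update as finished; the window goes away with the last active update.
  synchronized void done(String resourceId) {
    Window window = windows.get(resourceId);
    if (window == null) {
      return; // no window, e.g. it was discarded in the meantime
    }
    window.activeUpdates--;
    if (window.activeUpdates == 0) {
      windows.remove(resourceId);
    }
  }

  synchronized boolean isActiveUpdateFor(String resourceId) {
    return windows.containsKey(resourceId);
  }
}
```

Two overlapping updates on the same resource share one window here, which is the situation the commit message calls out as unsupported by the previous single-RV approach.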
