Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
9ae44e3
improve: filter only own updates for read-after-write-conistency
csviri Jun 7, 2026
67a7799
wip
csviri Jun 8, 2026
da29019
wip
csviri Jun 8, 2026
b881622
wip
csviri Jun 8, 2026
3fa7f44
Event filtering with recording
csviri Jun 8, 2026
e0999cd
test fix
csviri Jun 8, 2026
5926b5f
Simplified EventHandling
csviri Jun 9, 2026
5755ddf
unit tests fix
csviri Jun 9, 2026
eec9ead
small fix, test repeats
csviri Jun 9, 2026
7d655ab
improvements and releated unit tests
csviri Jun 9, 2026
78d37d3
cleanup
csviri Jun 9, 2026
ab31610
improve: filter only own updates for read-after-write-conistency with…
csviri Jun 9, 2026
c104c43
improvements on edge cases
csviri Jun 9, 2026
01461a8
Potential fix for pull request finding
csviri Jun 10, 2026
0a02991
delete related improvements and unit tests
csviri Jun 10, 2026
61a2bef
delete handling improvements and test improvements
csviri Jun 10, 2026
d81c4b9
wip
csviri Jun 10, 2026
8897e99
tests
csviri Jun 10, 2026
b0d303d
test fix
csviri Jun 10, 2026
a4ccc1d
fix typo
csviri Jun 10, 2026
9b8182b
Potential fix for pull request finding
csviri Jun 10, 2026
fca8bf5
fixes
csviri Jun 10, 2026
f5ce1c3
improve: filter only own updates for read-after-write-conistency with…
csviri Jun 9, 2026
df93f1a
test fixes
csviri Jun 10, 2026
b6d6e82
logging and improvements
csviri Jun 10, 2026
d3e6992
test AI identified cases
csviri Jun 10, 2026
028863f
fix: only filter own events
csviri Jun 11, 2026
73d867c
wip
csviri Jun 11, 2026
34d776d
improvements and test fixes
csviri Jun 11, 2026
67c39f9
improvements
csviri Jun 11, 2026
24801d2
fix resource cache read
csviri Jun 11, 2026
c303353
support for re-list
csviri Jun 12, 2026
f779318
simple algorithm, refined tests
csviri Jun 12, 2026
57d3a90
naming fix
csviri Jun 12, 2026
a509cf0
small fixes
csviri Jun 12, 2026
2c103d4
cleanup
csviri Jun 12, 2026
1ac63fc
cleanup
csviri Jun 12, 2026
2330ec0
additional tests
csviri Jun 12, 2026
5d1b0a8
addiotinal tests and docs improvements
csviri Jun 13, 2026
ef10e12
wip
csviri Jun 14, 2026
ec72a28
wip
csviri Jun 14, 2026
f1079f5
prepare for re-list
csviri Jun 14, 2026
52f1de1
wip
csviri Jun 14, 2026
4ef6e6e
wip
csviri Jun 14, 2026
309fb9c
Potential fix for pull request finding
csviri Jun 14, 2026
e7ad144
wip
csviri Jun 14, 2026
55f530a
fix filtering
csviri Jun 14, 2026
662328a
logging
csviri Jun 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions docs/content/en/blog/news/read-after-write-consistency.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ From this point the idea of the algorithm is very simple:
2. When the informer propagates an event, check if its resource version is greater than or equal to
the one in the TRC. If yes, evict the resource from the TRC.
3. When the controller reads a resource from cache, it checks the TRC first, then falls back to the Informer's cache.

The actual filtering of events for our own writes is more nuanced than a simple
"evict on RV ≥ TRC version" rule — it is driven by a per-resource state machine
that tracks in-flight writes and the events received around them. See
[Filtering events for our own updates](#filtering-events-for-our-own-updates) below.


```mermaid
Expand Down Expand Up @@ -221,13 +226,38 @@ sequenceDiagram
When we update a resource, eventually the informer will propagate an event that would trigger a reconciliation.
However, this is mostly not desired. Since we already have the up-to-date resource at that point,
we would like to be notified only if the resource is changed after our change.
Therefore, in addition to caching the resource, we also filter out events that contain a resource
version older than or equal to our cached resource version.

Note that the implementation of this is relatively complex, since while performing the update we want to record all the
events received in the meantime and decide whether to propagate them further once the update request is complete.

However, this way we significantly reduce the number of reconciliations, making the whole process much more efficient.
The framework runs a per-resource *event filter window* around each in-flight
write: it records the resource version returned by our update, buffers any
related events that arrive in the meantime, and at the end of the window
decides what (if anything) to surface to the reconciler. The rules:

- **Pure own echo**: if the only events in the window are watch events whose
resource versions match our recorded own writes (and the action is `UPDATED`),
they are filtered out — the reconciler isn't bothered.
- **Foreign change in the window**: if a resource version arrived that was *not*
one of our own writes — e.g. a third party modified the resource between two
of our updates — the framework synthesizes a single `UPDATED` event covering
the whole window (`previousResource` = the resource just before the window,
`resource` = the latest known state). The reconciler is notified once, with a
faithful before/after picture, instead of receiving each underlying watch
event individually.
- **DELETE in the middle**: if the resource was deleted at some point during
the window, that DELETE participates in the synthesis. A trailing `DELETED`
is surfaced verbatim; a DELETE-then-recreate inside the window collapses to
an `UPDATED` from the deleted state to the recreated state.
- **Held foreign events**: a foreign event that arrives *before* the matching
own write echo is buffered until the write completes. This avoids
surfacing it as foreign only to immediately overwrite it with a synthesized
echo.
- **ReList**: events arriving while the informer is performing a relist are
tagged. Because a relist may have hidden events, the framework defaults to
surfacing such events to the reconciler rather than silently filtering
them — even when they would otherwise look like our own echoes.

This way we significantly reduce the number of reconciliations, making the whole
process much more efficient, while preserving the invariant that any
foreign change reaches the reconciler.

### The case for instant reschedule

Expand Down
17 changes: 17 additions & 0 deletions docs/content/en/docs/documentation/reconciler.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,23 @@ supports stronger guarantees, both for primary and secondary resources. If this
that resource again. This feature also makes sure that the reconciliation is not triggered from the event from our
writes.

The filter is implemented as a per-resource *event filter window* that opens
when an update starts and closes when it completes. Inside the window:
- Pure own echoes (watch events whose resource version matches one of our
recorded own writes) are dropped.
- Foreign events received during the window are merged with the surrounding
own writes into a single synthesized `UPDATED` event so the reconciler
gets a faithful before/after picture rather than each individual watch
event. These events are carefully crafted so they correspond to a real-life scenario,
and remain fully usable by filters.
- A `DELETED` arriving in the window is propagated; a delete-then-recreate
inside the window collapses into one synthesized `UPDATED` from the
deleted state to the recreated state.
- During an informer relist the filter degrades to "surface what we see":
events received while a relist is in progress are propagated even when
they would otherwise look like own echoes, since the relist may have
hidden events.


In order to benefit from these stronger guarantees, use [`ResourceOperations`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java)
from the context of the reconciliation:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
import io.javaoperatorsdk.operator.processing.event.source.informer.GenericResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;

import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.handleKubernetesClientException;
import static io.javaoperatorsdk.operator.processing.event.source.controller.InternalEventFilters.*;
Expand Down Expand Up @@ -84,7 +84,7 @@ protected synchronized void handleEvent(
try {
if (log.isDebugEnabled()) {
log.debug("Event received with action: {}", action);
log.trace("Event Old resource: {},\n new resource: {}", oldResource, resource);
log.debug("Event Old resource: {},\n new resource: {}", oldResource, resource);
}
Comment thread
csviri marked this conversation as resolved.
Comment thread
csviri marked this conversation as resolved.
Comment thread
csviri marked this conversation as resolved.
MDCUtils.addResourceInfo(resource);
controller.getEventSourceManager().broadcastOnResourceEvent(action, resource, oldResource);
Expand Down Expand Up @@ -141,11 +141,22 @@ private void handleOnAddOrUpdate(
ResourceAction action, T oldCustomResource, T newCustomResource) {
var handling =
temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource);
if (handling == EventHandling.NEW) {
handleEvent(action, newCustomResource, oldCustomResource, null);
} else if (log.isDebugEnabled()) {
log.debug("{} event propagation for action: {}", handling, action);
}
handling.ifPresentOrElse(
this::handleEvent,
() -> {
if (log.isDebugEnabled()) {
log.debug("Skipping/deferring event propagation for action: {}", action);
}
});
}

@SuppressWarnings("unchecked")
private void handleEvent(GenericResourceEvent r) {
handleEvent(
r.getAction(),
(T) r.getResource().orElseThrow(),
(T) r.getPreviousResource().orElse(null),
r.getLastStateUnknow());
}

@Override
Expand All @@ -154,10 +165,10 @@ public synchronized void onDelete(T resource, boolean deletedFinalStateUnknown)
resource,
ResourceAction.DELETED,
() -> {
temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
var res = temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
// delete event is quite special here, that requires special care, since we clean up
// caches on delete event.
handleEvent(ResourceAction.DELETED, resource, null, deletedFinalStateUnknown);
res.ifPresent(this::handleEvent);
});
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright Java Operator SDK Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.javaoperatorsdk.operator.processing.event.ResourceID;

public class EventFilterSupport {

private static final Logger log = LoggerFactory.getLogger(EventFilterSupport.class);

private final Map<ResourceID, EventFilterWindow> eventFilterWindows = new HashMap<>();
private boolean ongoingReList = false;

public synchronized void startEventFilteringModify(ResourceID resourceID) {
var existing = eventFilterWindows.get(resourceID);
var ed =
eventFilterWindows.computeIfAbsent(resourceID, id -> new EventFilterWindow(ongoingReList));
ed.increaseActiveUpdates();
log.debug(
"startEventFilteringModify: id={}, windowReused={}, ongoingReList={}",
resourceID,
existing != null,
ongoingReList);
}

public synchronized Optional<GenericResourceEvent> doneEventFilterModify(ResourceID resourceID) {
var ed = eventFilterWindows.get(resourceID);
if (ed == null) {
log.debug("doneEventFilterModify: no window for id={}", resourceID);
return Optional.empty();
}
ed.decreaseActiveUpdates();
log.debug("doneEventFilterModify: id={}", resourceID);
return check(ed, resourceID);
}

public synchronized Optional<GenericResourceEvent> processEvent(
ResourceID resourceId, GenericResourceEvent genericResourceEvent) {
var ed = eventFilterWindows.get(resourceId);
if (ed != null) {
log.debug(
"processEvent: buffering event in window. id={}, action={}, rv={}",
resourceId,
genericResourceEvent.getAction(),
genericResourceEvent
.getResource()
.map(r -> r.getMetadata().getResourceVersion())
.orElse("?"));
ed.addRelatedEvent(genericResourceEvent);
return check(ed, resourceId);
} else {
log.debug(
"processEvent: no active window, surfacing directly. id={}, action={}",
resourceId,
genericResourceEvent.getAction());
return Optional.of(genericResourceEvent);
}
}

private Optional<GenericResourceEvent> check(
EventFilterWindow eventFilterWindow, ResourceID resourceID) {
var res = eventFilterWindow.check();
if (eventFilterWindow.canBeRemoved()) {
log.debug("Removing empty event filter window. id={}", resourceID);
eventFilterWindows.remove(resourceID);
}
return res;
}

public synchronized void addToOwnResourceVersions(ResourceID resourceId, String resourceVersion) {
var window = eventFilterWindows.get(resourceId);
if (window != null) {
log.debug("Recording own resourceVersion. id={}, rv={}", resourceId, resourceVersion);
window.addToOwnResourceVersions(resourceVersion);
} else {
log.debug(
"addToOwnResourceVersions: no active window for id={}, rv={} (skipped)",
resourceId,
resourceVersion);
}
}

public synchronized void handleGhostResourceRemoval(ResourceID resourceId) {
log.debug("Ghost resource removal: discarding event filter window. id={}", resourceId);
eventFilterWindows.remove(resourceId);
}

// for testing purposes
synchronized Map<ResourceID, EventFilterWindow> getEventFilterWindows() {
return eventFilterWindows;
}

public synchronized void setStartingReList() {
log.debug("ReList starting: tagging {} active window(s)", eventFilterWindows.size());
ongoingReList = true;
eventFilterWindows.values().forEach(EventFilterWindow::setReListStarted);
}

public synchronized void setRelistFinished() {
log.debug("ReList finished: clearing tag from {} active window(s)", eventFilterWindows.size());
ongoingReList = false;
eventFilterWindows.values().forEach(EventFilterWindow::setReListFinished);
}

public synchronized boolean isActiveUpdateFor(ResourceID resourceId) {
return eventFilterWindows.containsKey(resourceId);
}
}
Loading
Loading