Skip to content

Commit f00eba7

Browse files
committed
simple algorithm, refined tests
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 7663c2b commit f00eba7

7 files changed

Lines changed: 276 additions & 211 deletions

File tree

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterSupport.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,10 @@
1919
import java.util.Map;
2020
import java.util.Optional;
2121

22-
import org.slf4j.Logger;
23-
import org.slf4j.LoggerFactory;
24-
2522
import io.javaoperatorsdk.operator.processing.event.ResourceID;
2623

2724
public class EventFilterSupport {
2825

29-
private static final Logger log = LoggerFactory.getLogger(EventFilterSupport.class);
30-
3126
private final Map<ResourceID, EventFilterWindow> eventFilterWindows = new HashMap<>();
3227
private Long lastKnownVersionBeforeRelist = null;
3328

@@ -45,7 +40,7 @@ public synchronized Optional<GenericResourceEvent> doneEventFilterModify(Resourc
4540
return check(ed, resourceID);
4641
}
4742

48-
public synchronized Optional<GenericResourceEvent> processRelevantEvent(
43+
public synchronized Optional<GenericResourceEvent> processEvent(
4944
ResourceID resourceId, GenericResourceEvent genericResourceEvent) {
5045
var ed = eventFilterWindows.get(resourceId);
5146
if (ed != null) {
@@ -71,10 +66,6 @@ public synchronized void addToOwnResourceVersions(ResourceID resourceId, String
7166
}
7267

7368
public synchronized void handleGhostResourceRemoval(ResourceID resourceId) {
74-
var ed = eventFilterWindows.get(resourceId);
75-
if (ed != null && !ed.canRemoved()) {
76-
return;
77-
}
7869
eventFilterWindows.remove(resourceId);
7970
}
8071

@@ -84,10 +75,12 @@ synchronized Map<ResourceID, EventFilterWindow> getEventFilterWindows() {
8475
}
8576

8677
public synchronized void setStartingReList(String lastKnownVersion) {
78+
lastKnownVersionBeforeRelist = Long.parseLong(lastKnownVersion);
8779
eventFilterWindows.values().forEach(au -> au.setReListStartedFrom(lastKnownVersion));
8880
}
8981

9082
public synchronized void setRelistFinished(String syncResourceVersions) {
83+
lastKnownVersionBeforeRelist = null;
9184
eventFilterWindows.values().forEach(EventFilterWindow::setReListFinished);
9285
}
9386

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterWindow.java

Lines changed: 94 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.slf4j.Logger;
2525
import org.slf4j.LoggerFactory;
2626

27-
import io.fabric8.kubernetes.api.model.HasMetadata;
2827
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
2928

3029
/**
@@ -37,16 +36,10 @@ class EventFilterWindow {
3736
private final SortedMap<Long, GenericResourceEvent> relatedEvents = new TreeMap<>();
3837
private final SortedSet<Long> ownResourceVersions = new TreeSet<>();
3938
private Long lastResourceVersionBeforeReList;
40-
private boolean affectedByReList;
4139
private int activeUpdates = 0;
42-
private boolean ownRvEverAdded = false;
43-
private int ownRvCount = 0;
44-
private Long lastEmittedResourceRv;
45-
private Long lastSeenRelatedRv;
4640

4741
public EventFilterWindow(Long lastResourceVersionBeforeReList) {
4842
this.lastResourceVersionBeforeReList = lastResourceVersionBeforeReList;
49-
this.affectedByReList = lastResourceVersionBeforeReList != null;
5043
}
5144

5245
// Before we run this method
@@ -69,179 +62,138 @@ public synchronized Optional<GenericResourceEvent> check() {
6962
if (relatedEvents.isEmpty()) {
7063
return Optional.empty();
7164
}
72-
73-
long maxRelatedRv = relatedEvents.lastKey();
74-
75-
// While an in-flight write hasn't recorded its own RV yet, events past
76-
// the highest known own RV may still turn out to be that write's echo —
77-
// restrict the synth window so they're held until either the RV arrives
78-
// or the write completes. ownRvCount is monotonic across cleanups so
79-
// already-recorded RVs are not re-classified as "pending" once forgotten.
80-
Long cutoff;
81-
if (activeUpdates > ownRvCount) {
82-
if (ownResourceVersions.isEmpty()) {
83-
return Optional.empty();
84-
}
85-
cutoff = ownResourceVersions.last();
86-
} else {
87-
cutoff = maxRelatedRv;
65+
if (activeUpdates == 0 && ownResourceVersions.isEmpty()) {
66+
return eventForRangeAndClear(relatedEvents, ownResourceVersions);
8867
}
89-
90-
var windowMap = relatedEvents.headMap(cutoff + 1);
91-
if (windowMap.isEmpty()) {
92-
return Optional.empty();
68+
if (ownResourceVersions.isEmpty()
69+
&& getFirstRelatedEvent().getAction().equals(ResourceAction.DELETED)) {
70+
return eventForRangeAndClear(relatedEvents, ownResourceVersions);
9371
}
9472

95-
boolean foundForeign = false;
96-
for (var entry : windowMap.entrySet()) {
97-
if (!isOwnEcho(entry.getKey(), entry.getValue())) {
98-
foundForeign = true;
73+
var lastEventVersion = getLastRelatedEvent().getResourceVersion();
74+
var numberOwnUpdatesSelected = 0;
75+
long lastOwnVersion = -1;
76+
for (long ownVersion : ownResourceVersions) {
77+
if (ownVersion <= lastEventVersion) {
78+
numberOwnUpdatesSelected++;
79+
lastOwnVersion = ownVersion;
80+
} else {
9981
break;
10082
}
10183
}
84+
if (numberOwnUpdatesSelected > 0) {
85+
if (numberOwnUpdatesSelected == ownResourceVersions.size() && activeUpdates == 0) {
86+
return eventForRangeAndClear(relatedEvents, ownResourceVersions);
87+
} else {
88+
if (numberOwnUpdatesSelected < ownResourceVersions.size()) {
89+
return eventForRangeAndClear(
90+
relatedEvents.headMap(ownResourceVersions.tailSet(lastOwnVersion + 1).first()),
91+
ownResourceVersions.headSet(lastOwnVersion + 1));
92+
} else
93+
return eventForRangeAndClear(
94+
relatedEvents.headMap(lastOwnVersion + 1),
95+
ownResourceVersions.headSet(lastOwnVersion + 1));
96+
}
97+
}
98+
return Optional.empty();
99+
}
102100

103-
Long prevSeen = lastSeenRelatedRv;
104-
Optional<GenericResourceEvent> result = Optional.empty();
105-
106-
// Emit if there is a foreign event in the window, or if a previously emitted
107-
// event already advanced the reconciler's view and a *new* event (not one we
108-
// already saw at a prior check) now moves it further. ReList also forces an
109-
// emit since it may have hidden events while it was running.
110-
boolean shouldEmit =
111-
foundForeign
112-
|| (lastEmittedResourceRv != null && (prevSeen == null || cutoff > prevSeen))
113-
|| affectedByReList;
101+
// it has responsibility to clear those ranges and emit event if needed
102+
Optional<GenericResourceEvent> eventForRangeAndClear(
103+
SortedMap<Long, GenericResourceEvent> events, SortedSet<Long> ownResourceVersions) {
104+
if (events.isEmpty()) {
105+
return Optional.empty();
106+
}
107+
var isAnyEventFromReList =
108+
events.values().stream().anyMatch(GenericResourceEvent::isPartOfReList);
114109

115-
if (shouldEmit) {
116-
// Synthesize only from events that are *new* since the last check;
117-
// carryover events (RV ≤ prevSeen) were already considered before and
118-
// should not drive the synthesized event's resource versions.
119-
var synthWindow = prevSeen == null ? windowMap : windowMap.tailMap(prevSeen + 1);
110+
var first = getFirstRelatedEvent(events);
111+
if (events.size() > 1 && first.getAction() == ResourceAction.DELETED) {
112+
events.remove(events.firstKey());
113+
first = getFirstRelatedEvent(events);
114+
}
120115

121-
// When affected by a reList, treat events at or before the reList boundary
122-
// as captured *during* relist and not informative — only events strictly
123-
// after the boundary drive the synthesized output.
124-
var effectiveWindow =
125-
affectedByReList && lastResourceVersionBeforeReList != null
126-
? synthWindow.tailMap(lastResourceVersionBeforeReList + 1)
127-
: synthWindow;
116+
if (events.keySet().equals(ownResourceVersions) && !isAnyEventFromReList) {
117+
GenericResourceEvent res = null;
118+
var lastEvent = getLastRelatedEvent(events);
119+
if (lastEvent.getAction() == ResourceAction.DELETED) {
120+
res = lastEvent;
121+
}
122+
events.clear();
123+
ownResourceVersions.clear();
124+
return Optional.ofNullable(res);
125+
}
128126

129-
if (!effectiveWindow.isEmpty()) {
130-
var firstEvent = effectiveWindow.get(effectiveWindow.firstKey());
131-
var lastEvent = effectiveWindow.get(effectiveWindow.lastKey());
127+
if (events.size() == 1) {
128+
ownResourceVersions.clear();
129+
var res = Optional.of(events.values().iterator().next());
130+
events.clear();
131+
return res;
132+
}
133+
var lastEvent = getLastRelatedEvent(events);
134+
if (lastEvent.getAction() == ResourceAction.DELETED) {
135+
events.clear();
136+
ownResourceVersions.clear();
137+
return Optional.of(lastEvent);
138+
}
132139

133-
// Identify the last DELETE in the synth window; a DELETE marks the
134-
// boundary of the "current life" of the resource — anything before it
135-
// represents a state that no longer exists.
136-
GenericResourceEvent lastDelete = null;
137-
boolean hasForeign = false;
138-
boolean allForeignAreDeletes = true;
139-
for (var entry : effectiveWindow.entrySet()) {
140-
var ev = entry.getValue();
141-
if (ev.getAction() == ResourceAction.DELETED) {
142-
lastDelete = ev;
143-
}
144-
if (!isOwnEcho(entry.getKey(), ev)) {
145-
hasForeign = true;
146-
if (ev.getAction() != ResourceAction.DELETED) {
147-
allForeignAreDeletes = false;
148-
}
149-
}
150-
}
151-
boolean lastIsOwnEcho = isOwnEcho(effectiveWindow.lastKey(), lastEvent);
152-
boolean reListBeforeFirstOwn =
153-
affectedByReList
154-
&& !ownResourceVersions.isEmpty()
155-
&& lastResourceVersionBeforeReList != null
156-
&& lastResourceVersionBeforeReList < ownResourceVersions.first();
140+
var res =
141+
Optional.of(
142+
new GenericResourceEvent(
143+
ResourceAction.UPDATED,
144+
lastEvent.getResource().orElseThrow(),
145+
first.getPreviousResource().isEmpty()
146+
? first.getResource().orElseThrow()
147+
: first.getPreviousResource().orElseThrow(),
148+
null));
149+
events.clear();
150+
ownResourceVersions.clear();
151+
return res;
152+
}
157153

158-
if (affectedByReList && (hasForeign || reListBeforeFirstOwn)) {
159-
// ReList obscured part of the timeline AND something happened that
160-
// wasn't purely our own activity — surface a DELETE with
161-
// lastStateUnknown=true so the reconciler knows the latest known
162-
// state is uncertain.
163-
HasMetadata deleted = lastEvent.getResource().orElseThrow();
164-
result =
165-
Optional.of(new GenericResourceEvent(ResourceAction.DELETED, deleted, null, true));
166-
lastEmittedResourceRv = cutoff;
167-
} else if (!affectedByReList && hasForeign && allForeignAreDeletes && lastIsOwnEcho) {
168-
// The synth window represents a delete-then-our-recreate sequence:
169-
// the only foreign activity was DELETE(s) and the resource is back
170-
// under our control. Nothing for the reconciler to know about.
171-
} else if (effectiveWindow.size() == 1) {
172-
result = Optional.of(firstEvent);
173-
lastEmittedResourceRv = cutoff;
174-
} else if (lastEvent.getAction() == ResourceAction.DELETED) {
175-
result = Optional.of(lastEvent);
176-
lastEmittedResourceRv = cutoff;
177-
} else if (lastDelete != null) {
178-
// A DELETE happened in the middle and the resource was recreated/updated
179-
// afterwards. Synth UPDATED with previous = the deleted state.
180-
HasMetadata previous = lastDelete.getResource().orElseThrow();
181-
HasMetadata latest = lastEvent.getResource().orElseThrow();
182-
result =
183-
Optional.of(new GenericResourceEvent(ResourceAction.UPDATED, latest, previous, null));
184-
lastEmittedResourceRv = cutoff;
185-
} else {
186-
HasMetadata previous =
187-
firstEvent
188-
.getPreviousResource()
189-
.orElseGet(() -> firstEvent.getResource().orElseThrow());
190-
HasMetadata latest = lastEvent.getResource().orElseThrow();
191-
result =
192-
Optional.of(new GenericResourceEvent(ResourceAction.UPDATED, latest, previous, null));
193-
lastEmittedResourceRv = cutoff;
194-
}
195-
}
154+
private GenericResourceEvent getFirstRelatedEvent() {
155+
return getFirstRelatedEvent(relatedEvents);
156+
}
196157

197-
if (affectedByReList) {
198-
affectedByReList = false;
199-
lastResourceVersionBeforeReList = null;
200-
}
201-
}
158+
private GenericResourceEvent getFirstRelatedEvent(SortedMap<Long, GenericResourceEvent> subMap) {
159+
return subMap.values().iterator().next();
160+
}
202161

203-
lastSeenRelatedRv = prevSeen == null ? maxRelatedRv : Math.max(prevSeen, maxRelatedRv);
204-
relatedEvents.headMap(cutoff + 1).clear();
205-
ownResourceVersions.headSet(cutoff + 1).clear();
206-
return result;
162+
private GenericResourceEvent getLastRelatedEvent(SortedMap<Long, GenericResourceEvent> subMap) {
163+
return subMap.get(subMap.lastKey());
207164
}
208165

209-
private boolean isOwnEcho(Long resourceVersion, GenericResourceEvent event) {
210-
return event.getAction() != ResourceAction.DELETED
211-
&& ownResourceVersions.contains(resourceVersion);
166+
private GenericResourceEvent getLastRelatedEvent() {
167+
return getLastRelatedEvent(relatedEvents);
212168
}
213169

214170
public synchronized boolean canRemoved() {
215-
if (activeUpdates == 0 && ownResourceVersions.isEmpty() && ownRvEverAdded) {
216-
if (!relatedEvents.isEmpty()) {
217-
log.warn("Related events are not empty");
218-
}
171+
if (activeUpdates == 0 && ownResourceVersions.isEmpty() && relatedEvents.isEmpty()) {
219172
return true;
220173
}
221174
return false;
222175
}
223176

224177
void addToOwnResourceVersions(String resourceVersion) {
225178
ownResourceVersions.add(Long.parseLong(resourceVersion));
226-
ownRvEverAdded = true;
227-
ownRvCount++;
228179
}
229180

230-
public void addRelatedEvent(GenericResourceEvent event) {
181+
public synchronized void addRelatedEvent(GenericResourceEvent event) {
182+
if (lastResourceVersionBeforeReList != null) {
183+
event.setPartOfReList(true);
184+
}
185+
231186
relatedEvents.put(
232187
Long.parseLong(event.getResource().orElseThrow().getMetadata().getResourceVersion()),
233188
event);
234189
}
235190

236191
public synchronized void setReListStartedFrom(String lastResourceVersionBeforeReList) {
237192
this.lastResourceVersionBeforeReList = Long.parseLong(lastResourceVersionBeforeReList);
238-
this.affectedByReList = true;
239193
}
240194

241195
public synchronized void setReListFinished() {
242-
// Marker: relist has completed and check() may now process. The relist
243-
// boundary (lastResourceVersionBeforeReList) is consumed by the next check
244-
// and reset there along with affectedByReList.
196+
lastResourceVersionBeforeReList = null;
245197
}
246198

247199
public synchronized void increaseActiveUpdates() {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/GenericResourceEvent.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public class GenericResourceEvent extends ResourceEvent {
2828

2929
private final HasMetadata previousResource;
3030
private final Boolean lastStateUnknow;
31+
private boolean partOfReList = false;
3132

3233
public GenericResourceEvent(
3334
ResourceAction action,
@@ -75,4 +76,16 @@ public boolean equals(Object o) {
7576
public int hashCode() {
7677
return Objects.hash(super.hashCode(), previousResource);
7778
}
79+
80+
public long getResourceVersion() {
81+
return Long.parseLong(getResource().orElseThrow().getMetadata().getResourceVersion());
82+
}
83+
84+
public boolean isPartOfReList() {
85+
return partOfReList;
86+
}
87+
88+
public void setPartOfReList(boolean partOfReList) {
89+
this.partOfReList = partOfReList;
90+
}
7891
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ private synchronized Optional<GenericResourceEvent> onEvent(
119119
cache.remove(resourceId);
120120
}
121121
}
122-
return eventFilteringSupport.processRelevantEvent(resourceId, actualEvent);
122+
return eventFilteringSupport.processEvent(resourceId, actualEvent);
123123
}
124124

125125
static <T extends HasMetadata> GenericResourceEvent toGenericResourceEvent(

0 commit comments

Comments
 (0)