Skip to content

Commit d237f25

Browse files
committed
wip
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent cac3d20 commit d237f25

4 files changed

Lines changed: 63 additions & 14 deletions

File tree

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,6 @@ public synchronized void onDelete(R resource, boolean deletedFinalStateUnknown)
127127
if (resultEvent.isEmpty()) {
128128
return;
129129
}
130-
if (resultEvent.orElseThrow().getAction() != ResourceAction.DELETED) {
131-
log.warn("Non delete event received on onDelete handling. This should not happen.");
132-
}
133130
primaryToSecondaryIndex.onDelete(resource);
134131
if (acceptedByDeleteFilters(resource, deletedFinalStateUnknown)) {
135132
propagateEvent(resource);
@@ -140,6 +137,12 @@ public synchronized void onDelete(R resource, boolean deletedFinalStateUnknown)
140137
@Override
141138
protected void handleEvent(
142139
ResourceAction action, R resource, R oldResource, Boolean deletedFinalStateUnknown) {
140+
// this is called only from ManagedInformerEventSource#eventFilteringUpdateAndCacheResource
141+
// we want to skip delete when the delete event filtered out, but update the index if
142+
// an actual event is propagated
143+
if (action == ResourceAction.DELETED) {
144+
primaryToSecondaryIndex.onDelete(resource);
145+
}
143146
propagateEvent(resource);
144147
}
145148

@@ -154,6 +157,7 @@ public synchronized void start() {
154157
manager().list().forEach(primaryToSecondaryIndex::onAddOrUpdate);
155158
}
156159

160+
@SuppressWarnings("unchecked")
157161
private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R oldObject) {
158162
primaryToSecondaryIndex.onAddOrUpdate(newObject);
159163
var resourceID = ResourceID.fromResource(newObject);
@@ -167,11 +171,7 @@ private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R ol
167171
"Propagating event for {}, resource with same version not result of a our update.",
168172
action);
169173
var event = resultEvent.get();
170-
handleEvent(
171-
event.getAction(),
172-
(R) event.getResource().orElseThrow(),
173-
(R) event.getPreviousResource().orElse(null),
174-
event.getLastStateUnknow());
174+
propagateEvent((R) event.getResource().orElseThrow());
175175
} else {
176176
log.debug("Event filtered out for operation: {}, resourceID: {}", action, resourceID);
177177
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,9 @@ public void onList(String resourceVersion, boolean remainedEmpty) {
153153

154154
// @Override (enable when
155155
// re-list supported by fabric8 client https://github.com/fabric8io/kubernetes-client/pull/7899
156-
public void onBeforeList(String lastSyncResourceVersion) {
157-
// temporaryResourceCache.setOngoingRelist(lastSyncResourceVersion);
158-
}
156+
// public void onBeforeList(String lastSyncResourceVersion) {
157+
// temporaryResourceCache.setOngoingRelist(lastSyncResourceVersion);
158+
// }
159159

160160
@Override
161161
public void handleRecentResourceUpdate(

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.javaoperatorsdk.operator.processing.event.source.informer;
1717

18+
import java.lang.reflect.Field;
1819
import java.time.Duration;
1920
import java.util.HashMap;
2021
import java.util.List;
@@ -224,11 +225,11 @@ void eventReceivedAfterFailedUpdate_isPropagatedNormally() {
224225

225226
CountDownLatch latch = sendForExceptionThrowingUpdate();
226227
latch.countDown();
228+
var deployment = deploymentWithResourceVersion(2);
227229

228-
informerEventSource.onUpdate(
229-
deploymentWithResourceVersion(1), deploymentWithResourceVersion(2));
230+
informerEventSource.onUpdate(deploymentWithResourceVersion(1), deployment);
230231

231-
expectHandleUpdateEvent(2, 1);
232+
expectPropagateEvent(deployment);
232233
}
233234

234235
@Test
@@ -541,13 +542,59 @@ void deleteEventDuringOwnUpdateIsPropagated() {
541542
.untilAsserted(() -> verify(eventHandlerMock, atLeastOnce()).handleEvent(any()));
542543
}
543544

545+
@Test
546+
void handleEventUpdatesIndexWhenDeletePropagatesFromTempCache() throws Exception {
547+
// handleEvent is invoked from ManagedInformerEventSource#eventFilteringUpdateAndCacheResource
548+
// only after the temp cache decided to surface the event. For a DELETE that means the resource
549+
// is really gone and the secondary→primary index must drop it; otherwise stale entries linger
550+
// and getSecondaryResources keeps returning a tombstone.
551+
var indexMock = injectIndexMock();
552+
var resource = testDeployment();
553+
554+
informerEventSource.handleEvent(ResourceAction.DELETED, resource, null, false);
555+
556+
verify(indexMock, times(1)).onDelete(resource);
557+
verify(indexMock, never()).onAddOrUpdate(any());
558+
verify(eventHandlerMock, times(1)).handleEvent(any());
559+
}
560+
561+
@Test
562+
void handleEventDoesNotTouchIndexForNonDeleteAction() throws Exception {
563+
// The onAdd/onUpdate path maintains the index in onAddOrUpdate(); handleEvent must not
564+
// double-update it for non-DELETE actions, otherwise we'd index resources twice.
565+
var indexMock = injectIndexMock();
566+
567+
informerEventSource.handleEvent(
568+
ResourceAction.UPDATED, testDeployment(), testDeployment(), null);
569+
570+
verify(indexMock, never()).onDelete(any());
571+
verify(indexMock, never()).onAddOrUpdate(any());
572+
verify(eventHandlerMock, times(1)).handleEvent(any());
573+
}
574+
575+
private PrimaryToSecondaryIndex<Deployment> injectIndexMock() throws Exception {
576+
@SuppressWarnings("unchecked")
577+
PrimaryToSecondaryIndex<Deployment> indexMock = mock(PrimaryToSecondaryIndex.class);
578+
Field field = InformerEventSource.class.getDeclaredField("primaryToSecondaryIndex");
579+
field.setAccessible(true);
580+
field.set(informerEventSource, indexMock);
581+
return indexMock;
582+
}
583+
544584
private void assertNoEventProduced() {
545585
await()
546586
.pollDelay(Duration.ofMillis(70))
547587
.timeout(Duration.ofMillis(150))
548588
.untilAsserted(() -> verify(informerEventSource, never()).propagateEvent(any()));
549589
}
550590

591+
private void expectPropagateEvent(Deployment newResourceVersion) {
592+
await()
593+
.atMost(Duration.ofSeconds(1))
594+
.untilAsserted(
595+
() -> verify(informerEventSource, times(1)).propagateEvent(newResourceVersion));
596+
}
597+
551598
private void expectHandleUpdateEvent(int newResourceVersion, int oldResourceVersion) {
552599
await()
553600
.atMost(Duration.ofSeconds(1))

operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/onrelistfilter/OnRelistFilterIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.time.Duration;
1919

20+
import org.junit.jupiter.api.Disabled;
2021
import org.junit.jupiter.api.Test;
2122
import org.junit.jupiter.api.extension.RegisterExtension;
2223

@@ -36,6 +37,7 @@
3637
* <li>re-list starts WHILE the update window is open — own write is propagated
3738
* </ul>
3839
*/
40+
@Disabled("enable when fabric8 supports relist")
3941
class OnRelistFilterIT {
4042

4143
static final String RESOURCE_NAME = "test-resource";

0 commit comments

Comments
 (0)