From f131ee3ab4d732611e0a27323060451976197e6c Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Wed, 15 Oct 2025 11:16:49 -0400 Subject: [PATCH] showing code to possibly remove the previous annotation Signed-off-by: Steve Hawkins --- .../KubernetesDependentResource.java | 2 +- .../processing/event/EventProcessor.java | 8 +-- .../processing/event/ResourceState.java | 36 +++++++++- .../event/source/informer/DependentEvent.java | 65 +++++++++++++++++++ .../source/informer/InformerEventSource.java | 33 +++++----- 5 files changed, 123 insertions(+), 21 deletions(-) create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DependentEvent.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java index 69d145866d..4b8b6a3601 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java @@ -41,7 +41,7 @@ public abstract class KubernetesDependentResource kubernetesDependentResourceConfig; private volatile Boolean useSSA; - private volatile Boolean usePreviousAnnotationForEventFiltering; + private volatile Boolean usePreviousAnnotationForEventFiltering = false; public KubernetesDependentResource() {} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index e029e287a0..903a2413c6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -202,10 +202,10 @@ private void handleEventMarking(Event event, ResourceState state) { // removed, but also the informers websocket is disconnected and later reconnected. So // meanwhile the resource could be deleted and recreated. In this case we just mark a new // event as below. - markEventReceived(state); + markEventReceived(state, event); } } else if (!state.deleteEventPresent() && !state.processedMarkForDeletionPresent()) { - markEventReceived(state); + markEventReceived(state, event); } else if (log.isDebugEnabled()) { log.debug( "Skipped marking event as received. Delete event present: {}, processed mark for" @@ -215,9 +215,9 @@ private void handleEventMarking(Event event, ResourceState state) { } } - private void markEventReceived(ResourceState state) { + private void markEventReceived(ResourceState state, Event event) { log.debug("Marking event received for: {}", state.getId()); - state.markEventReceived(); + state.markEventReceived(event); } private boolean isResourceMarkedForDeletion(ResourceEvent resourceEvent) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java index 5d4e74d681..95769e2f12 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java @@ -1,6 +1,10 @@ package io.javaoperatorsdk.operator.processing.event; +import java.util.HashMap; +import java.util.Map; + import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState; +import io.javaoperatorsdk.operator.processing.event.source.informer.DependentEvent; import io.javaoperatorsdk.operator.processing.retry.RetryExecution; class ResourceState { @@ -29,6 +33,12 @@ private enum EventingState { private RetryExecution retry; private EventingState eventing; private RateLimitState rateLimit; + + record DependentKey(String kind, ResourceID id) { + + } + + private Map dependentEvents = new HashMap<>(); public ResourceState(ResourceID id) { this.id = id; @@ -60,6 +70,12 @@ public boolean isUnderProcessing() { } public void setUnderProcessing(boolean underProcessing) { + if (!underProcessing && !dependentEvents.isEmpty()) { + dependentEvents.clear(); + if (eventing == EventingState.NO_EVENT_PRESENT) { + eventing = EventingState.EVENT_PRESENT; + } + } this.underProcessing = underProcessing; } @@ -76,10 +92,27 @@ public boolean processedMarkForDeletionPresent() { } public void markEventReceived() { + markEventReceived(null); + } + + public void markEventReceived(Event event) { if (deleteEventPresent()) { throw new IllegalStateException("Cannot receive event after a delete event received"); } - eventing = EventingState.EVENT_PRESENT; + if (!underProcessing || !(event instanceof DependentEvent)) { + eventing = EventingState.EVENT_PRESENT; + dependentEvents.clear(); + } else { + DependentEvent de = (DependentEvent) event; + DependentKey key = new DependentKey(de.getKind(), de.getDependent()); + if (de.isFromOperator()) { + // if the event is from the operator, we don't care about anything older + dependentEvents.computeIfPresent( + key, (ignored, rv) -> rv <= de.getDependentResourceVersion() ? null : rv); + } else { + dependentEvents.put(key, de.getDependentResourceVersion()); + } + } } public void markProcessedMarkForDeletion() { @@ -107,6 +140,7 @@ public void unMarkEventReceived() { // do nothing break; } + dependentEvents.clear(); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DependentEvent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DependentEvent.java new file mode 100644 index 0000000000..60c7f4e0e0 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DependentEvent.java @@ -0,0 +1,65 @@ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import java.util.Objects; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.event.Event; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +public class DependentEvent extends Event { + + private final ResourceID dependent; + private final String dependentResourceVersion; + private final String kind; + private final boolean fromOperator; + + public DependentEvent( + ResourceID targetCustomResource, + HasMetadata dependent, + boolean fromOperator) { + super(targetCustomResource); + this.dependent = ResourceID.fromResource(dependent); + this.dependentResourceVersion = dependent.getMetadata().getResourceVersion(); + this.kind = dependent.getKind(); + this.fromOperator = fromOperator; + } + + public ResourceID getDependent() { + return dependent; + } + + public String getDependentResourceVersion() { + return dependentResourceVersion; + } + + public String getKind() { + return kind; + } + + public boolean isFromOperator() { + return fromOperator; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + DependentEvent that = (DependentEvent) o; + return Objects.equals(dependent, that.dependent) + && Objects.equals(dependentResourceVersion, that.dependentResourceVersion) + && Objects.equals(kind, that.kind) + && fromOperator == that.fromOperator; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), dependent, dependentResourceVersion, kind, fromOperator); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index c029a54170..71aa7f1b38 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -153,7 +153,7 @@ public void onDelete(R resource, boolean b) { primaryToSecondaryIndex.onDelete(resource); super.onDelete(resource, b); if (acceptedByDeleteFilters(resource, b)) { - propagateEvent(resource); + propagateEvent(resource, false); } } @@ -169,25 +169,23 @@ private synchronized void onAddOrUpdate( Operation operation, R newObject, R oldObject, Runnable superOnOp) { var resourceID = ResourceID.fromResource(newObject); + superOnOp.run(); + if (canSkipEvent(newObject, oldObject, resourceID)) { log.debug( "Skipping event propagation for {}, since was a result of a reconcile action. Resource" + " ID: {}", operation, ResourceID.fromResource(newObject)); - superOnOp.run(); + } else if (eventAcceptedByFilter(operation, newObject, oldObject)) { + log.debug( + "Propagating event for {}, resource with same version not result of a reconciliation." + + " Resource ID: {}", + operation, + resourceID); + propagateEvent(newObject, false); } else { - superOnOp.run(); - if (eventAcceptedByFilter(operation, newObject, oldObject)) { - log.debug( - "Propagating event for {}, resource with same version not result of a reconciliation." - + " Resource ID: {}", - operation, - resourceID); - propagateEvent(newObject); - } else { - log.debug("Event filtered out for operation: {}, resourceID: {}", operation, resourceID); - } + log.debug("Event filtered out for operation: {}, resourceID: {}", operation, resourceID); } } @@ -229,7 +227,7 @@ private boolean isEventKnownFromAnnotation(R newObject, R oldObject) { return known; } - private void propagateEvent(R object) { + private void propagateEvent(R object, boolean fromOperator) { var primaryResourceIdSet = configuration().getSecondaryToPrimaryMapper().toPrimaryResourceIDs(object); if (primaryResourceIdSet.isEmpty()) { @@ -237,7 +235,11 @@ private void propagateEvent(R object) { } primaryResourceIdSet.forEach( resourceId -> { - Event event = new Event(resourceId); + Event event = + new DependentEvent( + resourceId, + object, + fromOperator); /* * In fabric8 client for certain cases informers can be created on in a way that they are * automatically started, what would cause a NullPointerException here, since an event @@ -293,6 +295,7 @@ private void handleRecentCreateOrUpdate(Operation operation, R newResource, R ol Optional.ofNullable(oldResource) .map(r -> r.getMetadata().getResourceVersion()) .orElse(null)); + propagateEvent(newResource, true); } private boolean useSecondaryToPrimaryIndex() {