Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public abstract class KubernetesDependentResource<R extends HasMetadata, P exten
private final boolean garbageCollected = this instanceof GarbageCollected;
private KubernetesDependentResourceConfig<R> kubernetesDependentResourceConfig;
private volatile Boolean useSSA;
private volatile Boolean usePreviousAnnotationForEventFiltering;
private volatile Boolean usePreviousAnnotationForEventFiltering = false;

public KubernetesDependentResource() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,10 @@ private void handleEventMarking(Event event, ResourceState state) {
// removed, but also the informers websocket is disconnected and later reconnected. So
// meanwhile the resource could be deleted and recreated. In this case we just mark a new
// event as below.
markEventReceived(state);
markEventReceived(state, event);
}
} else if (!state.deleteEventPresent() && !state.processedMarkForDeletionPresent()) {
markEventReceived(state);
markEventReceived(state, event);
} else if (log.isDebugEnabled()) {
log.debug(
"Skipped marking event as received. Delete event present: {}, processed mark for"
Expand All @@ -215,9 +215,9 @@ private void handleEventMarking(Event event, ResourceState state) {
}
}

private void markEventReceived(ResourceState state) {
private void markEventReceived(ResourceState state, Event event) {
log.debug("Marking event received for: {}", state.getId());
state.markEventReceived();
state.markEventReceived(event);
}

private boolean isResourceMarkedForDeletion(ResourceEvent resourceEvent) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package io.javaoperatorsdk.operator.processing.event;

import java.util.HashMap;
import java.util.Map;

import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState;
import io.javaoperatorsdk.operator.processing.event.source.informer.DependentEvent;
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;

class ResourceState {
Expand Down Expand Up @@ -29,6 +33,12 @@ private enum EventingState {
private RetryExecution retry;
private EventingState eventing;
private RateLimitState rateLimit;

record DependentKey(String kind, ResourceID id) {

}

private Map<DependentKey, String> dependentEvents = new HashMap<>();

public ResourceState(ResourceID id) {
this.id = id;
Expand Down Expand Up @@ -60,6 +70,12 @@ public boolean isUnderProcessing() {
}

public void setUnderProcessing(boolean underProcessing) {
if (!underProcessing && !dependentEvents.isEmpty()) {
dependentEvents.clear();
if (eventing == EventingState.NO_EVENT_PRESENT) {
eventing = EventingState.EVENT_PRESENT;
}
}
this.underProcessing = underProcessing;
}

Expand All @@ -76,10 +92,27 @@ public boolean processedMarkForDeletionPresent() {
}

public void markEventReceived() {
markEventReceived(null);
}

public void markEventReceived(Event event) {
if (deleteEventPresent()) {
throw new IllegalStateException("Cannot receive event after a delete event received");
}
eventing = EventingState.EVENT_PRESENT;
if (!underProcessing || !(event instanceof DependentEvent)) {
eventing = EventingState.EVENT_PRESENT;
dependentEvents.clear();
} else {
DependentEvent de = (DependentEvent) event;
DependentKey key = new DependentKey(de.getKind(), de.getDependent());
if (de.isFromOperator()) {
// if the event is from the operator, we don't care about anything older
dependentEvents.computeIfPresent(
key, (ignored, rv) -> rv <= de.getDependentResourceVersion() ? null : rv);
} else {
dependentEvents.put(key, de.getDependentResourceVersion());
}
}
}

public void markProcessedMarkForDeletion() {
Expand Down Expand Up @@ -107,6 +140,7 @@ public void unMarkEventReceived() {
// do nothing
break;
}
dependentEvents.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.Objects;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public class DependentEvent extends Event {

private final ResourceID dependent;
private final String dependentResourceVersion;
private final String kind;
private final boolean fromOperator;

public DependentEvent(
ResourceID targetCustomResource,
HasMetadata dependent,
boolean fromOperator) {
super(targetCustomResource);
this.dependent = ResourceID.fromResource(dependent);
this.dependentResourceVersion = dependent.getMetadata().getResourceVersion();
this.kind = dependent.getKind();
this.fromOperator = fromOperator;
}

public ResourceID getDependent() {
return dependent;
}

public String getDependentResourceVersion() {
return dependentResourceVersion;
}

public String getKind() {
return kind;
}

public boolean isFromOperator() {
return fromOperator;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
DependentEvent that = (DependentEvent) o;
return Objects.equals(dependent, that.dependent)
&& Objects.equals(dependentResourceVersion, that.dependentResourceVersion)
&& Objects.equals(kind, that.kind)
&& fromOperator == that.fromOperator;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), dependent, dependentResourceVersion, kind, fromOperator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void onDelete(R resource, boolean b) {
primaryToSecondaryIndex.onDelete(resource);
super.onDelete(resource, b);
if (acceptedByDeleteFilters(resource, b)) {
propagateEvent(resource);
propagateEvent(resource, false);
}
}

Expand All @@ -169,25 +169,23 @@ private synchronized void onAddOrUpdate(
Operation operation, R newObject, R oldObject, Runnable superOnOp) {
var resourceID = ResourceID.fromResource(newObject);

superOnOp.run();

if (canSkipEvent(newObject, oldObject, resourceID)) {
log.debug(
"Skipping event propagation for {}, since was a result of a reconcile action. Resource"
+ " ID: {}",
operation,
ResourceID.fromResource(newObject));
superOnOp.run();
} else if (eventAcceptedByFilter(operation, newObject, oldObject)) {
log.debug(
"Propagating event for {}, resource with same version not result of a reconciliation."
+ " Resource ID: {}",
operation,
resourceID);
propagateEvent(newObject, false);
} else {
superOnOp.run();
if (eventAcceptedByFilter(operation, newObject, oldObject)) {
log.debug(
"Propagating event for {}, resource with same version not result of a reconciliation."
+ " Resource ID: {}",
operation,
resourceID);
propagateEvent(newObject);
} else {
log.debug("Event filtered out for operation: {}, resourceID: {}", operation, resourceID);
}
log.debug("Event filtered out for operation: {}, resourceID: {}", operation, resourceID);
}
}

Expand Down Expand Up @@ -229,15 +227,19 @@ private boolean isEventKnownFromAnnotation(R newObject, R oldObject) {
return known;
}

private void propagateEvent(R object) {
private void propagateEvent(R object, boolean fromOperator) {
var primaryResourceIdSet =
configuration().getSecondaryToPrimaryMapper().toPrimaryResourceIDs(object);
if (primaryResourceIdSet.isEmpty()) {
return;
}
primaryResourceIdSet.forEach(
resourceId -> {
Event event = new Event(resourceId);
Event event =
new DependentEvent(
resourceId,
object,
fromOperator);
/*
* In fabric8 client for certain cases informers can be created on in a way that they are
* automatically started, what would cause a NullPointerException here, since an event
Expand Down Expand Up @@ -293,6 +295,7 @@ private void handleRecentCreateOrUpdate(Operation operation, R newResource, R ol
Optional.ofNullable(oldResource)
.map(r -> r.getMetadata().getResourceVersion())
.orElse(null));
propagateEvent(newResource, true);
}

private boolean useSecondaryToPrimaryIndex() {
Expand Down
Loading