From 58954d098c20fbfac732386c0434440a4c854e7e Mon Sep 17 00:00:00 2001 From: Steven Hawkins Date: Tue, 26 Mar 2024 10:53:19 -0400 Subject: [PATCH] fix: using tombstones to account for rapid deletion closes: #2314 Signed-off-by: Steven Hawkins --- .../KubernetesDependentResource.java | 22 +++++++++ .../source/informer/InformerEventSource.java | 8 +++ .../informer/TemporaryResourceCache.java | 44 ++++++++++++++--- .../informer/TemporaryResourceCacheTest.java | 17 +++++++ .../ExternalStateReconciler.java | 49 ++++++++++++------- 5 files changed, 115 insertions(+), 25 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java index b804b88a30..3ad733ff1b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java @@ -213,6 +213,28 @@ private boolean usePreviousAnnotation(Context

context) { .previousAnnotationForDependentResourcesEventFiltering(); } + @Override + protected R handleCreate(R desired, P primary, Context

context) { + var id = ResourceID.fromResource(desired); + try { + eventSource().orElseThrow().prepareForAddOrUpdate(id); + return super.handleCreate(desired, primary, context); + } finally { + eventSource().orElseThrow().finishAddOrUpdate(id); + } + } + + @Override + protected R handleUpdate(R actual, R desired, P primary, Context

context) { + var id = ResourceID.fromResource(desired); + try { + eventSource().orElseThrow().prepareForAddOrUpdate(id); + return super.handleUpdate(actual, desired, primary, context); + } finally { + eventSource().orElseThrow().finishAddOrUpdate(id); + } + } + @Override protected void handleDelete(P primary, R secondary, Context

context) { if (secondary != null) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 81d31f7407..d3f3b23694 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -270,6 +270,14 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res handleRecentCreateOrUpdate(Operation.ADD, resource, null); } + public void prepareForAddOrUpdate(ResourceID id) { + this.temporaryResourceCache.prepareForAddOrUpdate(id); + } + + public void finishAddOrUpdate(ResourceID id) { + this.temporaryResourceCache.finshedAddOrUpdate(id); + } + private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) { primaryToSecondaryIndex.onAddOrUpdate(newResource); temporaryResourceCache.putResource(newResource, Optional.ofNullable(oldResource) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index fd9a8ad565..0a6f5ad84d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -1,7 +1,9 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; +import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -40,6 +42,9 @@ public class TemporaryResourceCache { private static final int MAX_RESOURCE_VERSIONS = 256; private final Map cache = new ConcurrentHashMap<>(); + + private final Map> tombstones = + new ConcurrentHashMap>(); private final ManagedInformerEventSource managedInformerEventSource; private final boolean parseResourceVersions; private final Set knownResourceVersions; @@ -51,7 +56,7 @@ public TemporaryResourceCache(ManagedInformerEventSource managedInforme if (parseResourceVersions) { knownResourceVersions = Collections.newSetFromMap(new LinkedHashMap() { @Override - protected boolean removeEldestEntry(java.util.Map.Entry eldest) { + protected boolean removeEldestEntry(Map.Entry eldest) { return size() >= MAX_RESOURCE_VERSIONS; } }); @@ -60,6 +65,22 @@ protected boolean removeEldestEntry(java.util.Map.Entry eldest) } } + public void prepareForAddOrUpdate(ResourceID id) { + tombstones.put(id, new ArrayList<>()); + } + + public void finshedAddOrUpdate(ResourceID id) { + tombstones.remove(id); + } + + public synchronized void onDeleteEvent(T resource, boolean unknownState) { + tombstones.computeIfPresent(ResourceID.fromResource(resource), (k, v) -> { + v.add(resource.getMetadata().getUid()); + return v; + }); + onEvent(resource, unknownState); + } + public synchronized void onEvent(T resource, boolean unknownState) { cache.computeIfPresent(ResourceID.fromResource(resource), (id, cached) -> (unknownState || !isLaterResourceVersion(id, cached, resource)) ? null @@ -84,14 +105,27 @@ public synchronized void putResource(T newResource, String previousResourceVersi var cachedResource = getResourceFromCache(resourceId) .orElse(managedInformerEventSource.get(resourceId).orElse(null)); - if ((previousResourceVersion == null && cachedResource == null) + boolean moveAhead = false; + if (previousResourceVersion == null && cachedResource == null) { + if (Optional.ofNullable(tombstones.get(resourceId)) + .filter(list -> list.contains(newResource.getMetadata().getUid())).isPresent()) { + log.debug( + "Won't resurrect uid {} for resource id: {}", + newResource.getMetadata().getUid(), resourceId); + return; + } + // we can skip further checks as this is a simple add and there's no previous entry to consider + moveAhead = true; + } + + if (moveAhead || (cachedResource != null && (cachedResource.getMetadata().getResourceVersion().equals(previousResourceVersion)) || isLaterResourceVersion(resourceId, newResource, cachedResource))) { log.debug( "Temporarily moving ahead to target version {} for resource id: {}", newResource.getMetadata().getResourceVersion(), resourceId); - putToCache(newResource, resourceId); + cache.put(resourceId, newResource); } else if (cache.remove(resourceId) != null) { log.debug("Removed an obsolete resource from cache for id: {}", resourceId); } @@ -123,10 +157,6 @@ private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T c return false; } - private void putToCache(T resource, ResourceID resourceID) { - cache.put(resourceID == null ? ResourceID.fromResource(resource) : resourceID, resource); - } - public synchronized Optional getResourceFromCache(ResourceID resourceID) { return Optional.ofNullable(cache.get(resourceID)); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java index d641736739..c44957bd3b 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java @@ -110,6 +110,22 @@ void resourceVersionParsing() { .isNotPresent(); } + @Test + void rapidDeletion() { + var testResource = testResource(); + ResourceID id = ResourceID.fromResource(testResource); + temporaryResourceCache.prepareForAddOrUpdate(id); + temporaryResourceCache.onEvent(testResource, false); // create + temporaryResourceCache.onDeleteEvent(new ConfigMapBuilder(testResource).editMetadata() + .withResourceVersion("3").endMetadata().build(), false); + + // put should be rejected + temporaryResourceCache.putAddedResource(testResource); + + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) + .isEmpty(); + } + private ConfigMap propagateTestResourceToCache() { var testResource = testResource(); when(informerEventSource.get(any())).thenReturn(Optional.empty()); @@ -127,6 +143,7 @@ ConfigMap testResource() { configMap.getMetadata().setName("test"); configMap.getMetadata().setNamespace("default"); configMap.getMetadata().setResourceVersion(RESOURCE_VERSION); + configMap.getMetadata().setUid("test-uid"); return configMap; } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/externalstate/ExternalStateReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/externalstate/ExternalStateReconciler.java index 66c53c3971..1252a88acf 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/externalstate/ExternalStateReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/externalstate/ExternalStateReconciler.java @@ -10,7 +10,14 @@ import io.fabric8.kubernetes.api.model.ConfigMapBuilder; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; -import io.javaoperatorsdk.operator.api.reconciler.*; +import io.javaoperatorsdk.operator.api.reconciler.Cleaner; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority; @@ -64,24 +71,29 @@ private void updateExternalResource(ExternalStateCustomResource resource, private void createExternalResource(ExternalStateCustomResource resource, Context context) { - var createdResource = - externalService.create(new ExternalResource(resource.getSpec().getData())); - var configMap = new ConfigMapBuilder() - .withMetadata(new ObjectMetaBuilder() - .withName(resource.getMetadata().getName()) - .withNamespace(resource.getMetadata().getNamespace()) - .build()) - .withData(Map.of(ID_KEY, createdResource.getId())) - .build(); - configMap.addOwnerReference(resource); - context.getClient().configMaps().resource(configMap).create(); - var primaryID = ResourceID.fromResource(resource); - // Making sure that the created resources are in the cache for the next reconciliation. - // This is critical in this case, since on next reconciliation if it would not be in the cache - // it would be created again. - configMapEventSource.handleRecentResourceCreate(primaryID, configMap); - externalResourceEventSource.handleRecentResourceCreate(primaryID, createdResource); + try { + var createdResource = + externalService.create(new ExternalResource(resource.getSpec().getData())); + var configMap = new ConfigMapBuilder() + .withMetadata(new ObjectMetaBuilder() + .withName(resource.getMetadata().getName()) + .withNamespace(resource.getMetadata().getNamespace()) + .build()) + .withData(Map.of(ID_KEY, createdResource.getId())) + .build(); + configMap.addOwnerReference(resource); + configMapEventSource.prepareForAddOrUpdate(primaryID); + context.getClient().configMaps().resource(configMap).create(); + + // Making sure that the created resources are in the cache for the next reconciliation. + // This is critical in this case, since on next reconciliation if it would not be in the cache + // it would be created again. + configMapEventSource.handleRecentResourceCreate(primaryID, configMap); + externalResourceEventSource.handleRecentResourceCreate(primaryID, createdResource); + } finally { + configMapEventSource.finishAddOrUpdate(primaryID); + } } @Override @@ -94,6 +106,7 @@ public DeleteControl cleanup(ExternalStateCustomResource resource, return DeleteControl.defaultDelete(); } + @Override public int getNumberOfExecutions() { return numberOfExecutions.get(); }