diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index aa48359501..ba31b9b4dd 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -15,17 +15,22 @@ import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource; import io.javaoperatorsdk.operator.processing.retry.GenericRetry; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @SuppressWarnings("rawtypes") -public class Operator { +public class Operator implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(Operator.class); private final KubernetesClient k8sClient; private final ConfigurationService configurationService; private final ObjectMapper objectMapper; + private final List closeables; public Operator(KubernetesClient k8sClient, ConfigurationService configurationService) { this(k8sClient, configurationService, new ObjectMapper()); @@ -38,6 +43,7 @@ public Operator( this.k8sClient = k8sClient; this.configurationService = configurationService; this.objectMapper = objectMapper; + this.closeables = new ArrayList<>(); } /** @@ -64,6 +70,21 @@ public void start() { } } + /** Stop the operator. */ + @Override + public void close() { + log.info("Operator {} is shutting down...", configurationService.getVersion().getSdkVersion()); + + for (Closeable closeable : this.closeables) { + try { + log.debug("closing {}", closeable); + closeable.close(); + } catch (IOException e) { + log.warn("Error closing {}", closeable, e); + } + } + } + /** * Registers the specified controller with this operator. * @@ -160,10 +181,15 @@ public void register( customResourceCache, watchAllNamespaces, targetNamespaces, - defaultEventHandler, configuration.isGenerationAware(), - finalizer); - eventSourceManager.registerCustomResourceEventSource(customResourceEventSource); + finalizer, + resClass); + + closeables.add(customResourceEventSource); + closeables.add(eventSourceManager); + + customResourceEventSource.setEventHandler(defaultEventHandler); + customResourceEventSource.start(); log.info( "Registered Controller: '{}' for CRD: '{}' for namespace(s): {}", @@ -178,18 +204,14 @@ private CustomResourceEventSource createCustomResourceEventSource( CustomResourceCache customResourceCache, boolean watchAllNamespaces, String[] targetNamespaces, - DefaultEventHandler defaultEventHandler, boolean generationAware, - String finalizer) { - CustomResourceEventSource customResourceEventSource = - watchAllNamespaces - ? CustomResourceEventSource.customResourceEventSourceForAllNamespaces( - customResourceCache, client, generationAware, finalizer) - : CustomResourceEventSource.customResourceEventSourceForTargetNamespaces( - customResourceCache, client, targetNamespaces, generationAware, finalizer); - - customResourceEventSource.setEventHandler(defaultEventHandler); - - return customResourceEventSource; + String finalizer, + Class resClass) { + + return watchAllNamespaces + ? CustomResourceEventSource.customResourceEventSourceForAllNamespaces( + customResourceCache, client, generationAware, finalizer, resClass) + : CustomResourceEventSource.customResourceEventSourceForTargetNamespaces( + customResourceCache, client, targetNamespaces, generationAware, finalizer, resClass); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index a638816969..43389707b7 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -37,6 +37,7 @@ public class DefaultEventHandler implements EventHandler { private final EventDispatcher eventDispatcher; private final Retry retry; private final Map retryState = new HashMap<>(); + private final String controllerName; private DefaultEventSourceManager eventSourceManager; private final ReentrantLock lock = new ReentrantLock(); @@ -50,6 +51,7 @@ public DefaultEventHandler( this.customResourceCache = customResourceCache; this.eventDispatcher = eventDispatcher; this.retry = retry; + this.controllerName = relatedControllerName; eventBuffer = new EventBuffer(); executor = new ScheduledThreadPoolExecutor( @@ -70,6 +72,16 @@ public DefaultEventHandler( ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER); } + @Override + public void close() { + if (eventSourceManager != null) { + log.debug("Closing EventSourceManager {} -> {}", controllerName, eventSourceManager); + eventSourceManager.close(); + } + + executor.shutdownNow(); + } + public void setEventSourceManager(DefaultEventSourceManager eventSourceManager) { this.eventSourceManager = eventSourceManager; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java index 24e0fe2d4f..4a8a40517e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java @@ -1,10 +1,10 @@ package io.javaoperatorsdk.operator.processing.event; import io.javaoperatorsdk.operator.processing.DefaultEventHandler; -import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource; import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; import java.util.Collections; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; @@ -17,9 +17,8 @@ public class DefaultEventSourceManager implements EventSourceManager { private static final Logger log = LoggerFactory.getLogger(DefaultEventSourceManager.class); private final ReentrantLock lock = new ReentrantLock(); - private Map eventSources = new ConcurrentHashMap<>(); - private CustomResourceEventSource customResourceEventSource; - private DefaultEventHandler defaultEventHandler; + private final Map eventSources = new ConcurrentHashMap<>(); + private final DefaultEventHandler defaultEventHandler; private TimerEventSource retryTimerEventSource; public DefaultEventSourceManager(DefaultEventHandler defaultEventHandler, boolean supportRetry) { @@ -30,23 +29,53 @@ public DefaultEventSourceManager(DefaultEventHandler defaultEventHandler, boolea } } - public void registerCustomResourceEventSource( - CustomResourceEventSource customResourceEventSource) { - this.customResourceEventSource = customResourceEventSource; - this.customResourceEventSource.addedToEventManager(); + @Override + public void close() { + try { + lock.lock(); + for (var entry : eventSources.entrySet()) { + try { + log.debug("Closing {} -> {}", entry.getKey(), entry.getValue()); + entry.getValue().close(); + } catch (Exception e) { + log.warn("Error closing {} -> {}", entry.getKey(), entry.getValue(), e); + } + } + + eventSources.clear(); + } finally { + lock.unlock(); + } } @Override - public void registerEventSource(String name, T eventSource) { + public final void registerEventSource(String name, EventSource eventSource) { + Objects.requireNonNull(eventSource, "EventSource must not be null"); + try { lock.lock(); - EventSource currentEventSource = eventSources.get(name); - if (currentEventSource != null) { + if (eventSources.containsKey(name)) { throw new IllegalStateException( "Event source with name already registered. Event source name: " + name); } eventSources.put(name, eventSource); eventSource.setEventHandler(defaultEventHandler); + eventSource.start(); + } finally { + lock.unlock(); + } + } + + @Override + public Optional deRegisterEventSource(String name) { + try { + lock.lock(); + EventSource currentEventSource = eventSources.remove(name); + if (currentEventSource != null) { + currentEventSource.close(); + } + + return Optional.ofNullable(currentEventSource); } finally { lock.unlock(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventHandler.java index 064b566220..d09a1c6d31 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventHandler.java @@ -1,6 +1,11 @@ package io.javaoperatorsdk.operator.processing.event; -public interface EventHandler { +import java.io.Closeable; + +public interface EventHandler extends Closeable { void handleEvent(Event event); + + @Override + default void close() {} } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java index bd992b71de..8076ff422f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java @@ -1,6 +1,21 @@ package io.javaoperatorsdk.operator.processing.event; -public interface EventSource { +import java.io.Closeable; + +public interface EventSource extends Closeable { + + /** + * This method is invoked when this {@link EventSource} instance is properly registered to a + * {@link EventSourceManager}. + */ + default void start() {} + + /** + * This method is invoked when this {@link EventSource} instance is de-registered from a {@link + * EventSourceManager}. + */ + @Override + default void close() {} void setEventHandler(EventHandler eventHandler); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index cc8e5660a8..650011ab1a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -1,14 +1,36 @@ package io.javaoperatorsdk.operator.processing.event; +import java.io.Closeable; import java.util.Map; import java.util.Optional; -public interface EventSourceManager { +public interface EventSourceManager extends Closeable { - void registerEventSource(String name, T eventSource); + /** + * Add the {@link EventSource} identified by the given name to the event manager. + * + * @param name the name of the {@link EventSource} to add + * @param eventSource the {@link EventSource} to register + * @thorw IllegalStateException if an {@link EventSource} with the same name is already + * registered. + */ + void registerEventSource(String name, EventSource eventSource); + + /** + * Remove the {@link EventSource} identified by the given name from the event + * manager. + * + * @param name the name of the {@link EventSource} to remove + * @return an optional {@link EventSource} which would be empty if no {@link EventSource} have + * been registered with the given name. + */ + Optional deRegisterEventSource(String name); Optional deRegisterCustomResourceFromEventSource( String name, String customResourceUid); Map getRegisteredEventSources(); + + @Override + default void close() {} } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java index f93d68684b..a1dcd329e3 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java @@ -4,6 +4,7 @@ import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.MixedOperation; @@ -11,6 +12,8 @@ import io.javaoperatorsdk.operator.processing.CustomResourceCache; import io.javaoperatorsdk.operator.processing.KubernetesResourceUtils; import io.javaoperatorsdk.operator.processing.event.AbstractEventSource; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; @@ -23,19 +26,22 @@ public class CustomResourceEventSource extends AbstractEventSource private static final Logger log = LoggerFactory.getLogger(CustomResourceEventSource.class); private final CustomResourceCache resourceCache; - private MixedOperation client; + private final MixedOperation client; private final String[] targetNamespaces; private final boolean generationAware; private final String resourceFinalizer; private final Map lastGenerationProcessedSuccessfully = new ConcurrentHashMap<>(); + private final List watches; + private final String resClass; public static CustomResourceEventSource customResourceEventSourceForAllNamespaces( CustomResourceCache customResourceCache, MixedOperation client, boolean generationAware, - String resourceFinalizer) { + String resourceFinalizer, + Class resClass) { return new CustomResourceEventSource( - customResourceCache, client, null, generationAware, resourceFinalizer); + customResourceCache, client, null, generationAware, resourceFinalizer, resClass); } public static CustomResourceEventSource customResourceEventSourceForTargetNamespaces( @@ -43,9 +49,10 @@ public static CustomResourceEventSource customResourceEventSourceForTargetNamesp MixedOperation client, String[] namespaces, boolean generationAware, - String resourceFinalizer) { + String resourceFinalizer, + Class resClass) { return new CustomResourceEventSource( - customResourceCache, client, namespaces, generationAware, resourceFinalizer); + customResourceCache, client, namespaces, generationAware, resourceFinalizer, resClass); } private CustomResourceEventSource( @@ -53,32 +60,50 @@ private CustomResourceEventSource( MixedOperation client, String[] targetNamespaces, boolean generationAware, - String resourceFinalizer) { + String resourceFinalizer, + Class resClass) { this.resourceCache = customResourceCache; this.client = client; this.targetNamespaces = targetNamespaces; this.generationAware = generationAware; this.resourceFinalizer = resourceFinalizer; + this.watches = new ArrayList<>(); + this.resClass = resClass.getName(); } private boolean isWatchAllNamespaces() { return targetNamespaces == null; } - public void addedToEventManager() { - registerWatch(); - } - - private void registerWatch() { + @Override + public void start() { CustomResourceOperationsImpl crClient = (CustomResourceOperationsImpl) client; if (isWatchAllNamespaces()) { - crClient.inAnyNamespace().watch(this); + var w = crClient.inAnyNamespace().watch(this); + watches.add(w); + log.debug("Registered controller {} -> {} for any namespace", resClass, w); } else if (targetNamespaces.length == 0) { - client.watch(this); + var w = client.watch(this); + watches.add(w); + log.debug( + "Registered controller {} -> {} for namespace {}", resClass, w, crClient.getNamespace()); } else { for (String targetNamespace : targetNamespaces) { - crClient.inNamespace(targetNamespace).watch(this); - log.debug("Registered controller for namespace: {}", targetNamespace); + var w = crClient.inNamespace(targetNamespace).watch(this); + watches.add(w); + log.debug("Registered controller {} -> {} for namespace: {}", resClass, w, targetNamespace); + } + } + } + + @Override + public void close() { + for (Watch watch : this.watches) { + try { + log.debug("Closing watch {} -> {}", resClass, watch); + watch.close(); + } catch (Exception e) { + log.warn("Error closing watcher {} -> {}", resClass, watch, e); } } } @@ -155,7 +180,8 @@ public void onClose(WatcherException e) { if (e.isHttpGone()) { log.warn("Received error for watch, will try to reconnect.", e); try { - registerWatch(); + close(); + start(); } catch (Throwable ex) { log.error("Unexpected error happened with watch reconnect. Will exit.", e); System.exit(1); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java index 9e286626ba..9bfd8e91e1 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java @@ -25,7 +25,7 @@ class CustomResourceEventSourceTest { private CustomResourceEventSource customResourceEventSource = CustomResourceEventSource.customResourceEventSourceForAllNamespaces( - customResourceCache, mixedOperation, true, FINALIZER); + customResourceCache, mixedOperation, true, FINALIZER, TestCustomResource.class); @BeforeEach public void setup() { @@ -73,7 +73,7 @@ public void normalExecutionIfGenerationChanges() { public void handlesAllEventIfNotGenerationAware() { customResourceEventSource = CustomResourceEventSource.customResourceEventSourceForAllNamespaces( - customResourceCache, mixedOperation, false, FINALIZER); + customResourceCache, mixedOperation, false, FINALIZER, TestCustomResource.class); setup(); TestCustomResource customResource1 = TestUtils.testCustomResource();