From e3a32d0499363fc976d3bcaea7a73866aa37feeb Mon Sep 17 00:00:00 2001 From: Luca Burgazzoli Date: Fri, 13 Aug 2021 11:02:35 +0200 Subject: [PATCH] feat: delay the registration of controller till the operator is started --- .../io/javaoperatorsdk/operator/Operator.java | 120 ++++++++++++++---- .../event/DefaultEventSourceManager.java | 7 +- .../operator/IntegrationTestSupport.java | 1 + .../sample/PureJavaApplicationRunner.java | 1 + .../operator/sample/Config.java | 2 +- 5 files changed, 97 insertions(+), 34 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 11254f185b..bda70ca7e6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -18,16 +18,21 @@ @SuppressWarnings("rawtypes") public class Operator implements AutoCloseable { - private static final Logger log = LoggerFactory.getLogger(Operator.class); private final KubernetesClient k8sClient; private final ConfigurationService configurationService; private final List closeables; + private final Object lock; + private final List controllers; + private volatile boolean started; public Operator(KubernetesClient k8sClient, ConfigurationService configurationService) { this.k8sClient = k8sClient; this.configurationService = configurationService; this.closeables = new ArrayList<>(); + this.lock = new Object(); + this.controllers = new ArrayList<>(); + this.started = false; Runtime.getRuntime().addShutdownHook(new Thread(this::close)); } @@ -45,43 +50,65 @@ public ConfigurationService getConfigurationService() { * where there is no obvious entrypoint to the application which can trigger the injection process * and start the cluster monitoring processes. */ + @SuppressWarnings("unchecked") public void start() { - final var version = configurationService.getVersion(); - log.info( - "Operator SDK {} (commit: {}) built on {} starting...", - version.getSdkVersion(), - version.getCommit(), - version.getBuiltTime()); - log.info("Client version: {}", Version.clientVersion()); - try { - final var k8sVersion = k8sClient.getVersion(); - if (k8sVersion != null) { - log.info("Server version: {}.{}", k8sVersion.getMajor(), k8sVersion.getMinor()); + synchronized (lock) { + if (started) { + return; + } + + final var version = configurationService.getVersion(); + log.info( + "Operator SDK {} (commit: {}) built on {} starting...", + version.getSdkVersion(), + version.getCommit(), + version.getBuiltTime()); + log.info("Client version: {}", Version.clientVersion()); + try { + final var k8sVersion = k8sClient.getVersion(); + if (k8sVersion != null) { + log.info("Server version: {}.{}", k8sVersion.getMajor(), k8sVersion.getMinor()); + } + } catch (Exception e) { + log.error("Error retrieving the server version. Exiting!", e); + throw new OperatorException("Error retrieving the server version", e); + } + + for (ControllerRef ref : controllers) { + startController(ref.controller, ref.configuration); } - } catch (Exception e) { - log.error("Error retrieving the server version. Exiting!", e); - throw new OperatorException("Error retrieving the server version", e); + + started = true; } } /** Stop the operator. */ @Override public void close() { - log.info( - "Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion()); + synchronized (lock) { + if (!started) { + return; + } - for (Closeable closeable : this.closeables) { - try { - log.debug("closing {}", closeable); - closeable.close(); - } catch (IOException e) { - log.warn("Error closing {}", closeable, e); + log.info( + "Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion()); + + for (Closeable closeable : this.closeables) { + try { + log.debug("closing {}", closeable); + closeable.close(); + } catch (IOException e) { + log.warn("Error closing {}", closeable, e); + } } + + started = false; } } /** - * Registers the specified controller with this operator. + * Add a registration requests for the specified controller with this operator. The effective + * registration of the controller is delayed till the operator is started. * * @param controller the controller to register * @param the {@code CustomResource} type associated with the controller @@ -92,6 +119,32 @@ public void register(ResourceController controller register(controller, null); } + /** + * Add a registration requests for the specified controller with this operator, overriding its + * default configuration by the specified one (usually created via {@link + * io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider#override(ControllerConfiguration)}, + * passing it the controller's original configuration. The effective registration of the + * controller is delayed till the operator is started. + * + * @param controller the controller to register + * @param configuration the configuration with which we want to register the controller, if {@code + * null}, the controller's original configuration is used + * @param the {@code CustomResource} type associated with the controller + * @throws OperatorException if a problem occurred during the registration process + */ + public void register( + ResourceController controller, ControllerConfiguration configuration) + throws OperatorException { + synchronized (lock) { + if (!started) { + this.controllers.add(new ControllerRef(controller, configuration)); + } else { + this.controllers.add(new ControllerRef(controller, configuration)); + startController(controller, configuration); + } + } + } + /** * Registers the specified controller with this operator, overriding its default configuration by * the specified one (usually created via {@link @@ -104,9 +157,10 @@ public void register(ResourceController controller * @param the {@code CustomResource} type associated with the controller * @throws OperatorException if a problem occurred during the registration process */ - public void register( + private void startController( ResourceController controller, ControllerConfiguration configuration) throws OperatorException { + final var existing = configurationService.getConfigurationFor(controller); if (existing == null) { log.warn( @@ -120,7 +174,7 @@ public void register( configuration = existing; } - Class resClass = configuration.getCustomResourceClass(); + final Class resClass = configuration.getCustomResourceClass(); final String controllerName = configuration.getName(); final var crdName = configuration.getCRDName(); final var specVersion = "v1"; @@ -137,10 +191,10 @@ public void register( CustomResourceUtils.assertCustomResource(resClass, crd); } - final var client = k8sClient.customResources(resClass); try { DefaultEventSourceManager eventSourceManager = - new DefaultEventSourceManager(controller, configuration, client); + new DefaultEventSourceManager( + controller, configuration, k8sClient.customResources(resClass)); controller.init(eventSourceManager); closeables.add(eventSourceManager); } catch (MissingCRDException e) { @@ -195,4 +249,14 @@ private static boolean failOnMissingCurrentNS( } return false; } + + private static class ControllerRef { + public final ResourceController controller; + public final ControllerConfiguration configuration; + + public ControllerRef(ResourceController controller, ControllerConfiguration configuration) { + this.controller = controller; + this.configuration = configuration; + } + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java index 08fa77bdb3..75aad9081d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java @@ -1,10 +1,8 @@ package io.javaoperatorsdk.operator.processing.event; -import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.client.CustomResource; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.MixedOperation; -import io.fabric8.kubernetes.client.dsl.Resource; import io.javaoperatorsdk.operator.MissingCRDException; import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.api.ResourceController; @@ -45,10 +43,9 @@ public class DefaultEventSourceManager implements EventSourceManager { } } + @SuppressWarnings({"rawtypes", "unchecked"}) public > DefaultEventSourceManager( - ResourceController controller, - ControllerConfiguration configuration, - MixedOperation, Resource> client) { + ResourceController controller, ControllerConfiguration configuration, MixedOperation client) { this(new DefaultEventHandler(controller, configuration, client), true); registerEventSource( CUSTOM_RESOURCE_EVENT_SOURCE_NAME, new CustomResourceEventSource<>(client, configuration)); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java index e2cd129fb2..f69317b646 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java @@ -68,6 +68,7 @@ public void initialize(KubernetesClient k8sClient, ResourceController controller overriddenConfig.withRetry(retry); } operator.register(controller, overriddenConfig.build()); + operator.start(); log.info("Operator is running with {}", controller.getClass().getCanonicalName()); } diff --git a/samples/pure-java/src/main/java/io/javaoperatorsdk/operator/sample/PureJavaApplicationRunner.java b/samples/pure-java/src/main/java/io/javaoperatorsdk/operator/sample/PureJavaApplicationRunner.java index 24d2957ef7..fae8f88c9e 100644 --- a/samples/pure-java/src/main/java/io/javaoperatorsdk/operator/sample/PureJavaApplicationRunner.java +++ b/samples/pure-java/src/main/java/io/javaoperatorsdk/operator/sample/PureJavaApplicationRunner.java @@ -11,5 +11,6 @@ public static void main(String[] args) { KubernetesClient client = new DefaultKubernetesClient(); Operator operator = new Operator(client, DefaultConfigurationService.instance()); operator.register(new CustomServiceController(client)); + operator.start(); } } diff --git a/samples/spring-boot-plain/src/main/java/io/javaoperatorsdk/operator/sample/Config.java b/samples/spring-boot-plain/src/main/java/io/javaoperatorsdk/operator/sample/Config.java index 509b83bcd9..8aa5b1d400 100644 --- a/samples/spring-boot-plain/src/main/java/io/javaoperatorsdk/operator/sample/Config.java +++ b/samples/spring-boot-plain/src/main/java/io/javaoperatorsdk/operator/sample/Config.java @@ -23,7 +23,7 @@ public CustomServiceController customServiceController(KubernetesClient client) } // Register all controller beans - @Bean + @Bean(initMethod = "start", destroyMethod = "stop") public Operator operator(KubernetesClient client, List controllers) { Operator operator = new Operator(client, DefaultConfigurationService.instance()); controllers.forEach(operator::register);