diff --git a/cli/src/main/java/io/kestra/cli/App.java b/cli/src/main/java/io/kestra/cli/App.java index 12210448ea7..490eece75b9 100644 --- a/cli/src/main/java/io/kestra/cli/App.java +++ b/cli/src/main/java/io/kestra/cli/App.java @@ -7,7 +7,6 @@ import io.kestra.cli.commands.servers.ServerCommand; import io.kestra.cli.commands.sys.SysCommand; import io.kestra.cli.commands.templates.TemplateCommand; -import io.kestra.core.contexts.KestraApplicationContext; import io.kestra.core.plugins.DefaultPluginRegistry; import io.kestra.core.plugins.PluginRegistry; import io.micronaut.configuration.picocli.MicronautFactory; @@ -65,7 +64,7 @@ protected static void execute(Class cls, String... args) { SLF4JBridgeHandler.install(); // Init ApplicationContext - ApplicationContext applicationContext = App.applicationContext(cls, args, DefaultPluginRegistry.getOrCreate()); + ApplicationContext applicationContext = App.applicationContext(cls, args); // Call Picocli command int exitCode = new CommandLine(cls, new MicronautFactory(applicationContext)).execute(args); @@ -84,10 +83,10 @@ protected static void execute(Class cls, String... args) { * @return the application context created */ protected static ApplicationContext applicationContext(Class mainClass, - String[] args, - PluginRegistry pluginRegistry) { + String[] args) { - ApplicationContextBuilder builder = KestraApplicationContext.builder(pluginRegistry) + ApplicationContextBuilder builder = ApplicationContext + .builder() .mainClass(mainClass) .environments(Environment.CLI); @@ -123,12 +122,7 @@ protected static ApplicationContext applicationContext(Class mainClass, }); builder.properties(properties); - - // Load external plugins before starting ApplicationContext - Path pluginPath = ((AbstractCommand)commandLine.getCommandSpec().userObject()).pluginsPath; - pluginRegistry.registerIfAbsent(pluginPath); } - return builder.build(); } diff --git a/core/src/main/java/io/kestra/core/contexts/KestraApplicationContext.java b/core/src/main/java/io/kestra/core/contexts/KestraApplicationContext.java deleted file mode 100644 index ecfc3aaf68c..00000000000 --- a/core/src/main/java/io/kestra/core/contexts/KestraApplicationContext.java +++ /dev/null @@ -1,77 +0,0 @@ -package io.kestra.core.contexts; - -import io.kestra.core.plugins.DefaultPluginRegistry; -import io.kestra.core.plugins.PluginRegistry; -import io.micronaut.context.ApplicationContext; -import io.micronaut.context.ApplicationContextBuilder; -import io.micronaut.context.ApplicationContextConfiguration; -import io.micronaut.context.DefaultApplicationContext; -import io.micronaut.context.DefaultApplicationContextBuilder; -import io.micronaut.context.env.Environment; -import io.micronaut.core.annotation.NonNull; -import io.micronaut.core.annotation.Nullable; -import io.micronaut.core.io.service.SoftServiceLoader; -import io.micronaut.inject.BeanDefinitionReference; - -import java.util.List; - -/** - * Overload the {@link DefaultApplicationContext} in order to add plugins - * into the {@link io.micronaut.context.DefaultBeanContext} - */ -@SuppressWarnings("rawtypes") -public class KestraApplicationContext extends DefaultApplicationContext { - private final PluginRegistry pluginRegistry; - - private final ApplicationContext delegate; - - public static ApplicationContextBuilder builder(@Nullable PluginRegistry pluginRegistry) { - DefaultApplicationContextBuilder builder = new DefaultApplicationContextBuilder() { - @Override - public ApplicationContext build() { - return new KestraApplicationContext(super.build(), this, pluginRegistry); - } - }; - // Register PluginRegistry as singleton - return builder.singletons(pluginRegistry); - } - - public KestraApplicationContext(@NonNull ApplicationContext delegate, - @NonNull ApplicationContextConfiguration configuration, - PluginRegistry pluginRegistry) { - super(configuration); - this.delegate = delegate; - this.pluginRegistry = pluginRegistry; - } - - /** - * {@inheritDoc} - **/ - @Override - public Environment getEnvironment() { - return delegate.getEnvironment(); - } - - /** - * Resolves the {@link BeanDefinitionReference} class instances from the {@link io.kestra.core.plugins.PluginRegistry}. - * to found all external implementations of the following plugin types. - *

- * - {@link io.kestra.core.secret.SecretPluginInterface} - * - {@link io.kestra.core.storages.StorageInterface} - * - * @return The bean definition classes - */ - @Override - protected @NonNull List resolveBeanDefinitionReferences() { - List resolvedBeanReferences = super.resolveBeanDefinitionReferences(); - if (pluginRegistry != null) { - ((DefaultPluginRegistry)pluginRegistry) - .externalPlugins() - .forEach(plugin -> { - final SoftServiceLoader definitions = SoftServiceLoader.load(BeanDefinitionReference.class, plugin.getClassLoader()); - definitions.collectAll(resolvedBeanReferences, BeanDefinitionReference::isPresent); - }); - } - return resolvedBeanReferences; - } -} \ No newline at end of file diff --git a/core/src/main/java/io/kestra/core/contexts/KestraBeansFactory.java b/core/src/main/java/io/kestra/core/contexts/KestraBeansFactory.java index 53b5336f6fa..aee7f9ba6b7 100644 --- a/core/src/main/java/io/kestra/core/contexts/KestraBeansFactory.java +++ b/core/src/main/java/io/kestra/core/contexts/KestraBeansFactory.java @@ -2,16 +2,66 @@ import io.kestra.core.plugins.DefaultPluginRegistry; import io.kestra.core.plugins.PluginRegistry; +import io.kestra.core.storages.StorageInterface; +import io.kestra.core.storages.StorageInterfaceFactory; +import io.micronaut.context.annotation.Bean; +import io.micronaut.context.annotation.ConfigurationProperties; import io.micronaut.context.annotation.Factory; import io.micronaut.context.annotation.Requires; +import io.micronaut.context.annotation.Value; +import io.micronaut.core.annotation.Nullable; +import io.micronaut.core.convert.format.MapFormat; +import io.micronaut.core.naming.conventions.StringConvention; +import jakarta.inject.Inject; import jakarta.inject.Singleton; +import jakarta.validation.Validator; + +import java.io.IOException; +import java.util.Map; +import java.util.Optional; @Factory public class KestraBeansFactory { + @Inject + Validator validator; + + @Inject + StorageConfig storageConfig; + + @Value("${kestra.storage.type") + Optional storageType; + @Requires(missingBeans = PluginRegistry.class) @Singleton public PluginRegistry pluginRegistry() { return DefaultPluginRegistry.getOrCreate(); } + + @Requires(missingBeans = StorageInterface.class) + @Requires(property = "kestra.storage.type") + @Singleton + @Bean(preDestroy = "close") + public StorageInterface storageInterface(final PluginRegistry pluginRegistry) throws IOException { + String pluginId = storageType.get(); + return StorageInterfaceFactory.make(pluginRegistry, pluginId, storageConfig.getStorageConfig(pluginId), validator); + } + + @ConfigurationProperties("kestra") + public record StorageConfig( + @Nullable + @MapFormat(keyFormat = StringConvention.CAMEL_CASE, transformation = MapFormat.MapTransformation.NESTED) + Map storage + ) { + + /** + * Returns the configuration for the configured storage. + * + * @return the configuration. + */ + @SuppressWarnings("unchecked") + private Map getStorageConfig(String type) { + return (Map) storage.get(type); + } + } } diff --git a/core/src/main/java/io/kestra/core/exceptions/KestraRuntimeException.java b/core/src/main/java/io/kestra/core/exceptions/KestraRuntimeException.java new file mode 100644 index 00000000000..d2d4d99f503 --- /dev/null +++ b/core/src/main/java/io/kestra/core/exceptions/KestraRuntimeException.java @@ -0,0 +1,22 @@ +package io.kestra.core.exceptions; + +/** + * The top-level {@link KestraRuntimeException} for non-recoverable errors. + */ +public class KestraRuntimeException extends RuntimeException { + + public KestraRuntimeException() { + } + + public KestraRuntimeException(String message) { + super(message); + } + + public KestraRuntimeException(String message, Throwable cause) { + super(message, cause); + } + + public KestraRuntimeException(Throwable cause) { + super(cause); + } +} diff --git a/core/src/main/java/io/kestra/core/models/tasks/retrys/AbstractRetry.java b/core/src/main/java/io/kestra/core/models/tasks/retrys/AbstractRetry.java index 9668f010b10..ef0a1656a7d 100644 --- a/core/src/main/java/io/kestra/core/models/tasks/retrys/AbstractRetry.java +++ b/core/src/main/java/io/kestra/core/models/tasks/retrys/AbstractRetry.java @@ -22,7 +22,6 @@ }) @Getter @NoArgsConstructor -@Introspected @SuperBuilder public abstract class AbstractRetry { abstract public String getType(); diff --git a/core/src/main/java/io/kestra/core/plugins/PluginModule.java b/core/src/main/java/io/kestra/core/plugins/PluginModule.java index 874ce48f45c..6aca015b8ad 100644 --- a/core/src/main/java/io/kestra/core/plugins/PluginModule.java +++ b/core/src/main/java/io/kestra/core/plugins/PluginModule.java @@ -3,7 +3,6 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import io.kestra.core.models.conditions.Condition; -import io.kestra.core.models.listeners.Listener; import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.runners.TaskRunner; import io.kestra.core.models.triggers.AbstractTrigger; diff --git a/core/src/main/java/io/kestra/core/storages/StorageInterface.java b/core/src/main/java/io/kestra/core/storages/StorageInterface.java index 80a83d562c5..b34d5e41976 100644 --- a/core/src/main/java/io/kestra/core/storages/StorageInterface.java +++ b/core/src/main/java/io/kestra/core/storages/StorageInterface.java @@ -15,7 +15,24 @@ import java.util.List; @Introspected -public interface StorageInterface extends Plugin { +public interface StorageInterface extends AutoCloseable, Plugin { + + /** + * Opens any resources or perform any pre-checks for initializing this storage. + * + * @throws IOException if an error happens during initialization. + */ + default void init() throws IOException { + // no-op + } + + /** + * Closes any resources used by this class. + */ + @Override + default void close() { + // no-op + } @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class}) InputStream get(String tenantId, URI uri) throws IOException; diff --git a/core/src/main/java/io/kestra/core/storages/StorageInterfaceFactory.java b/core/src/main/java/io/kestra/core/storages/StorageInterfaceFactory.java new file mode 100644 index 00000000000..ac872cadd2c --- /dev/null +++ b/core/src/main/java/io/kestra/core/storages/StorageInterfaceFactory.java @@ -0,0 +1,119 @@ +package io.kestra.core.storages; + +import io.kestra.core.exceptions.KestraRuntimeException; +import io.kestra.core.models.Plugin; +import io.kestra.core.plugins.PluginRegistry; +import io.kestra.core.plugins.RegisteredPlugin; +import io.kestra.core.serializers.JacksonMapper; +import jakarta.annotation.Nullable; +import jakarta.validation.ConstraintViolation; +import jakarta.validation.ConstraintViolationException; +import jakarta.validation.Validator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Factor class for constructing {@link StorageInterface} objects. + */ +public final class StorageInterfaceFactory { + + private static final Logger log = LoggerFactory.getLogger(StorageInterfaceFactory.class); + + /** + * Factory method for constructing and validating new {@link StorageInterface} of the given type with the given configuration. + * + * @param pluginRegistry The {@link PluginRegistry}. cannot be {@code null}. + * @param pluginId The ID of the storage. cannot be {@code null}. + * @param pluginConfiguration The configuration of the storage. cannot be {@code null}. + * @param validator The {@link Validator}. + * @return a new {@link StorageInterface}. + * @throws KestraRuntimeException if no storage can be found. + */ + public static StorageInterface make(final PluginRegistry pluginRegistry, + final String pluginId, + final Map pluginConfiguration, + @Nullable final Validator validator) { + List> classes = allStorageClasses(pluginRegistry) + .filter(clazz -> Plugin.getId(clazz).map(id -> id.equalsIgnoreCase(pluginId)).orElse(false)) + .toList(); + + if (classes.isEmpty()) { + String storageIds = allIdsFor(allStorageClasses(pluginRegistry)); + throw new KestraRuntimeException(String.format( + "No storage interface can be found for 'kestra.storage.type=%s'. Supported types are: %s", pluginId, storageIds + )); + } + + if (classes.size() > 1) { + String storageIds = allIdsFor(classes.stream()); + log.warn("Multiple storage interface candidates was found for 'kestra.storage.type={}' ({}). First one is used.", pluginId, storageIds); + } + + Class storageClass = classes.get(0); + + // Storage are handle as any serializable/deserialize plugins. + StorageInterface plugin; + try { + plugin = JacksonMapper.toMap(pluginConfiguration, storageClass); + } catch (Exception e) { + throw new KestraRuntimeException(String.format( + "Failed to create storage '%s'. Error: %s", pluginId, e.getMessage()) + ); + } + + // Validate configuration. + if (validator != null) { + Set> violations; + try { + violations = validator.validate(plugin); + } catch (ConstraintViolationException e) { + throw new KestraRuntimeException(String.format( + "Failed to validate configuration for storage '%s'. Error: %s", pluginId, e.getMessage()) + ); + } + if (!violations.isEmpty()) { + ConstraintViolationException e = new ConstraintViolationException(violations); + throw new KestraRuntimeException(String.format( + "Invalid configuration for storage '%s'. Error: '%s'", pluginId, e.getMessage()), e + ); + } + } + + try { + plugin.init(); + } catch (IOException e) { + throw new KestraRuntimeException(String.format( + "Failed to initialize storage '%s'. Error: %s", pluginId, e.getMessage()), e + ); + } + return plugin; + } + + /** + * @return all plugin classes for the {@link StorageInterface}s. + */ + private static Stream> allStorageClasses(final PluginRegistry pluginRegistry) { + return pluginRegistry.plugins() + .stream() + .map(RegisteredPlugin::getStorages) + .flatMap(List::stream); + } + + /** + * @return all plugin identifier for the {@link StorageInterface}s. + */ + private static String allIdsFor(final Stream> classes) { + return classes + .map(Plugin::getId) + .flatMap(Optional::stream) + .collect(Collectors.joining(",", "[", "]")); + } +} diff --git a/core/src/test/java/io/kestra/core/models/PluginTest.java b/core/src/test/java/io/kestra/core/models/PluginTest.java new file mode 100644 index 00000000000..762c53d70d6 --- /dev/null +++ b/core/src/test/java/io/kestra/core/models/PluginTest.java @@ -0,0 +1,26 @@ +package io.kestra.core.models; + +import io.kestra.core.models.annotations.Plugin; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Optional; + +class PluginTest { + + @Test + void shouldReturnTrueForInternal() { + Assertions.assertTrue( io.kestra.core.models.Plugin.isInternal(TestPlugin.class)); + } + + @Test + void shouldReturnPluginId() { + Assertions.assertEquals(Optional.of("test"), io.kestra.core.models.Plugin.getId(TestPlugin.class)); + } + + @Plugin(internal = true) + @Plugin.Id("test") + public static class TestPlugin implements io.kestra.core.models.Plugin { + + } +} \ No newline at end of file diff --git a/core/src/test/java/io/kestra/core/storages/StorageInterfaceFactoryTest.java b/core/src/test/java/io/kestra/core/storages/StorageInterfaceFactoryTest.java new file mode 100644 index 00000000000..bee03542bdc --- /dev/null +++ b/core/src/test/java/io/kestra/core/storages/StorageInterfaceFactoryTest.java @@ -0,0 +1,45 @@ +package io.kestra.core.storages; + +import io.kestra.core.exceptions.KestraRuntimeException; +import io.kestra.core.plugins.DefaultPluginRegistry; +import io.kestra.storage.local.LocalStorage; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import jakarta.validation.ConstraintViolation; +import jakarta.validation.ConstraintViolationException; +import jakarta.validation.Validator; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +@MicronautTest +class StorageInterfaceFactoryTest { + + DefaultPluginRegistry registry = DefaultPluginRegistry.getOrCreate(); + + @Inject + Validator validator; + + @Test + void shouldReturnStorageGivenValidId() { + StorageInterface storage = StorageInterfaceFactory.make(registry, "local", Map.of("basePath", "/tmp/kestra"), validator); + Assertions.assertNotNull(storage); + Assertions.assertEquals(LocalStorage.class.getName(), storage.getType()); + } + + @Test + void shouldFailedGivenInvalidId() { + Assertions.assertThrows(KestraRuntimeException.class, + () -> StorageInterfaceFactory.make(registry, "invalid", Map.of(), validator)); + } + + @Test + void shouldFailedGivenInvalidConfig() { + KestraRuntimeException e = Assertions.assertThrows(KestraRuntimeException.class, + () -> StorageInterfaceFactory.make(registry, "local", Map.of(), validator)); + + Assertions.assertTrue(e.getCause() instanceof ConstraintViolationException); + Assertions.assertEquals("basePath: must not be null", e.getCause().getMessage()); + } +} \ No newline at end of file diff --git a/model/src/main/java/io/kestra/core/models/Plugin.java b/model/src/main/java/io/kestra/core/models/Plugin.java index 6ebc81eef0a..df96478b4b1 100644 --- a/model/src/main/java/io/kestra/core/models/Plugin.java +++ b/model/src/main/java/io/kestra/core/models/Plugin.java @@ -1,7 +1,9 @@ package io.kestra.core.models; +import io.kestra.core.models.annotations.Plugin.Id; import jakarta.validation.constraints.NotNull; +import java.util.Objects; import java.util.Optional; /** @@ -26,9 +28,22 @@ default String getType() { * @return {@code true} if the plugin is internal. */ static boolean isInternal(final Class plugin) { + Objects.requireNonNull(plugin, "Cannot check if a plugin is internal from null"); io.kestra.core.models.annotations.Plugin annotation = plugin.getAnnotation(io.kestra.core.models.annotations.Plugin.class); return Optional.ofNullable(annotation) .map(io.kestra.core.models.annotations.Plugin::internal) .orElse(false); } + + /** + * Static helper method to gt the id of a plugin. + * + * @param plugin The plugin type. + * @return an optional string id. + */ + static Optional getId(final Class plugin) { + Objects.requireNonNull(plugin, "Cannot get plugin id from null"); + Id annotation = plugin.getAnnotation(Id.class); + return Optional.ofNullable(annotation).map(Id::value).map(String::toLowerCase); + } } diff --git a/model/src/main/java/io/kestra/core/models/annotations/Plugin.java b/model/src/main/java/io/kestra/core/models/annotations/Plugin.java index 730c4ce9893..9549fd740b1 100644 --- a/model/src/main/java/io/kestra/core/models/annotations/Plugin.java +++ b/model/src/main/java/io/kestra/core/models/annotations/Plugin.java @@ -21,7 +21,7 @@ /** * Specifies whether the annotated plugin class is internal to Kestra. *

- * An internal plugin can be resolved through the {@link io.kestra.core.plugins.PluginRegistry}, but cannot + * An internal plugin can be resolved through the PluginRegistry, but cannot * be referenced directly in a YAML flow definition. * * @return {@code true} if the plugin is internal. Otherwise {@link false}. @@ -35,4 +35,16 @@ * For the moment, aliases are considered as deprecated plugins replaced by the class annotated. */ String[] aliases() default {}; + + @Documented + @Inherited + @Retention(RUNTIME) + @Target({ElementType.TYPE}) + @interface Id { + /** + * Specifies the unique ID for identifying a plugin. ID is case-insensitive. + * @return The string identifier. + */ + String value(); + } } diff --git a/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java b/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java index 31a34226e79..cfd9a279d92 100644 --- a/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java +++ b/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java @@ -1,10 +1,13 @@ package io.kestra.storage.local; import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.storages.FileAttributes; import io.kestra.core.storages.StorageInterface; -import jakarta.inject.Inject; -import jakarta.inject.Singleton; +import jakarta.validation.constraints.NotNull; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; import org.apache.commons.io.FileUtils; import java.io.*; @@ -20,28 +23,27 @@ import static io.kestra.core.utils.Rethrow.throwFunction; @Plugin -@Singleton -@LocalStorageEnabled +@Plugin.Id("local") +@Getter +@Setter +@NoArgsConstructor public class LocalStorage implements StorageInterface { - LocalConfig config; - - /** - * No-arg constructor - required by Kestra service loader. - */ - public LocalStorage() {} - - @Inject - public LocalStorage(LocalConfig config) throws IOException { - this.config = config; - - if (!Files.exists(config.getBasePath())) { - Files.createDirectories(config.getBasePath()); + + @PluginProperty + @NotNull + private Path basePath; + + /** {@inheritDoc} **/ + @Override + public void init() throws IOException { + if (!Files.exists(this.basePath)) { + Files.createDirectories(this.basePath); } } private Path getPath(String tenantId, URI uri) { - Path basePath = tenantId == null ? config.getBasePath().toAbsolutePath() - : Paths.get(config.getBasePath().toAbsolutePath().toString(), tenantId); + Path basePath = tenantId == null ? this.basePath.toAbsolutePath() + : Paths.get(this.basePath.toAbsolutePath().toString(), tenantId); if(uri == null) { return basePath; } @@ -191,8 +193,8 @@ public List deleteByPrefix(String tenantId, URI storagePrefix) throws IOExc private URI getKestraUri(String tenantId, Path path) { Path prefix = (tenantId == null) ? - config.getBasePath().toAbsolutePath() : - Path.of(config.getBasePath().toAbsolutePath().toString(), tenantId); + basePath.toAbsolutePath() : + Path.of(basePath.toAbsolutePath().toString(), tenantId); return URI.create("kestra:///" + prefix.relativize(path)); }