From 97ccc1f577675e39a4663f7f63653249d9166817 Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Thu, 25 Apr 2024 14:18:39 +0200 Subject: [PATCH] WIP chore(core): migrate StorageInterface to service-loader mechanism part-of: kestra-io/storage-minio#80 part-of: kestra-io/storage-s3#80 part-of: kestra-io/storage-gcs#119 --- cli/src/main/java/io/kestra/cli/App.java | 14 +-- .../contexts/KestraApplicationContext.java | 77 ------------- .../core/contexts/KestraBeansFactory.java | 40 +++++++ .../exceptions/KestraRuntimeException.java | 22 ++++ .../models/tasks/retrys/AbstractRetry.java | 1 - .../io/kestra/core/plugins/PluginModule.java | 1 - .../storages/ClassLoaderAwareStorage.java | 107 ++++++++++++++++++ .../core/storages/StorageInterface.java | 30 ++++- .../storages/StorageInterfaceFactory.java | 100 ++++++++++++++++ .../java/io/kestra/core/utils/Classes.java | 36 ++++++ .../storages/StorageInterfaceFactoryTest.java | 26 +++++ .../java/io/kestra/core/models/Plugin.java | 15 +++ .../core/models/annotations/Plugin.java | 13 ++- .../io/kestra/storage/local/LocalStorage.java | 44 +++---- 14 files changed, 414 insertions(+), 112 deletions(-) delete mode 100644 core/src/main/java/io/kestra/core/contexts/KestraApplicationContext.java create mode 100644 core/src/main/java/io/kestra/core/exceptions/KestraRuntimeException.java create mode 100644 core/src/main/java/io/kestra/core/storages/ClassLoaderAwareStorage.java create mode 100644 core/src/main/java/io/kestra/core/storages/StorageInterfaceFactory.java create mode 100644 core/src/main/java/io/kestra/core/utils/Classes.java create mode 100644 core/src/test/java/io/kestra/core/storages/StorageInterfaceFactoryTest.java diff --git a/cli/src/main/java/io/kestra/cli/App.java b/cli/src/main/java/io/kestra/cli/App.java index 12210448ea7..490eece75b9 100644 --- a/cli/src/main/java/io/kestra/cli/App.java +++ b/cli/src/main/java/io/kestra/cli/App.java @@ -7,7 +7,6 @@ import io.kestra.cli.commands.servers.ServerCommand; import io.kestra.cli.commands.sys.SysCommand; import io.kestra.cli.commands.templates.TemplateCommand; -import io.kestra.core.contexts.KestraApplicationContext; import io.kestra.core.plugins.DefaultPluginRegistry; import io.kestra.core.plugins.PluginRegistry; import io.micronaut.configuration.picocli.MicronautFactory; @@ -65,7 +64,7 @@ protected static void execute(Class cls, String... args) { SLF4JBridgeHandler.install(); // Init ApplicationContext - ApplicationContext applicationContext = App.applicationContext(cls, args, DefaultPluginRegistry.getOrCreate()); + ApplicationContext applicationContext = App.applicationContext(cls, args); // Call Picocli command int exitCode = new CommandLine(cls, new MicronautFactory(applicationContext)).execute(args); @@ -84,10 +83,10 @@ protected static void execute(Class cls, String... args) { * @return the application context created */ protected static ApplicationContext applicationContext(Class mainClass, - String[] args, - PluginRegistry pluginRegistry) { + String[] args) { - ApplicationContextBuilder builder = KestraApplicationContext.builder(pluginRegistry) + ApplicationContextBuilder builder = ApplicationContext + .builder() .mainClass(mainClass) .environments(Environment.CLI); @@ -123,12 +122,7 @@ protected static ApplicationContext applicationContext(Class mainClass, }); builder.properties(properties); - - // Load external plugins before starting ApplicationContext - Path pluginPath = ((AbstractCommand)commandLine.getCommandSpec().userObject()).pluginsPath; - pluginRegistry.registerIfAbsent(pluginPath); } - return builder.build(); } diff --git a/core/src/main/java/io/kestra/core/contexts/KestraApplicationContext.java b/core/src/main/java/io/kestra/core/contexts/KestraApplicationContext.java deleted file mode 100644 index ecfc3aaf68c..00000000000 --- a/core/src/main/java/io/kestra/core/contexts/KestraApplicationContext.java +++ /dev/null @@ -1,77 +0,0 @@ -package io.kestra.core.contexts; - -import io.kestra.core.plugins.DefaultPluginRegistry; -import io.kestra.core.plugins.PluginRegistry; -import io.micronaut.context.ApplicationContext; -import io.micronaut.context.ApplicationContextBuilder; -import io.micronaut.context.ApplicationContextConfiguration; -import io.micronaut.context.DefaultApplicationContext; -import io.micronaut.context.DefaultApplicationContextBuilder; -import io.micronaut.context.env.Environment; -import io.micronaut.core.annotation.NonNull; -import io.micronaut.core.annotation.Nullable; -import io.micronaut.core.io.service.SoftServiceLoader; -import io.micronaut.inject.BeanDefinitionReference; - -import java.util.List; - -/** - * Overload the {@link DefaultApplicationContext} in order to add plugins - * into the {@link io.micronaut.context.DefaultBeanContext} - */ -@SuppressWarnings("rawtypes") -public class KestraApplicationContext extends DefaultApplicationContext { - private final PluginRegistry pluginRegistry; - - private final ApplicationContext delegate; - - public static ApplicationContextBuilder builder(@Nullable PluginRegistry pluginRegistry) { - DefaultApplicationContextBuilder builder = new DefaultApplicationContextBuilder() { - @Override - public ApplicationContext build() { - return new KestraApplicationContext(super.build(), this, pluginRegistry); - } - }; - // Register PluginRegistry as singleton - return builder.singletons(pluginRegistry); - } - - public KestraApplicationContext(@NonNull ApplicationContext delegate, - @NonNull ApplicationContextConfiguration configuration, - PluginRegistry pluginRegistry) { - super(configuration); - this.delegate = delegate; - this.pluginRegistry = pluginRegistry; - } - - /** - * {@inheritDoc} - **/ - @Override - public Environment getEnvironment() { - return delegate.getEnvironment(); - } - - /** - * Resolves the {@link BeanDefinitionReference} class instances from the {@link io.kestra.core.plugins.PluginRegistry}. - * to found all external implementations of the following plugin types. - *

- * - {@link io.kestra.core.secret.SecretPluginInterface} - * - {@link io.kestra.core.storages.StorageInterface} - * - * @return The bean definition classes - */ - @Override - protected @NonNull List resolveBeanDefinitionReferences() { - List resolvedBeanReferences = super.resolveBeanDefinitionReferences(); - if (pluginRegistry != null) { - ((DefaultPluginRegistry)pluginRegistry) - .externalPlugins() - .forEach(plugin -> { - final SoftServiceLoader definitions = SoftServiceLoader.load(BeanDefinitionReference.class, plugin.getClassLoader()); - definitions.collectAll(resolvedBeanReferences, BeanDefinitionReference::isPresent); - }); - } - return resolvedBeanReferences; - } -} \ No newline at end of file diff --git a/core/src/main/java/io/kestra/core/contexts/KestraBeansFactory.java b/core/src/main/java/io/kestra/core/contexts/KestraBeansFactory.java index 53b5336f6fa..3ffd630a639 100644 --- a/core/src/main/java/io/kestra/core/contexts/KestraBeansFactory.java +++ b/core/src/main/java/io/kestra/core/contexts/KestraBeansFactory.java @@ -2,16 +2,56 @@ import io.kestra.core.plugins.DefaultPluginRegistry; import io.kestra.core.plugins.PluginRegistry; +import io.kestra.core.storages.StorageInterface; +import io.kestra.core.storages.StorageInterfaceFactory; +import io.micronaut.context.annotation.Bean; import io.micronaut.context.annotation.Factory; import io.micronaut.context.annotation.Requires; +import io.micronaut.context.env.Environment; +import io.micronaut.core.naming.conventions.StringConvention; +import jakarta.inject.Inject; import jakarta.inject.Singleton; +import java.io.IOException; +import java.util.Map; + @Factory public class KestraBeansFactory { + public static final String KESTRA_STORAGE_CONFIG = "kestra.storage"; + public static final String KESTRA_STORAGE_TYPE_CONFIG = "kestra.storage.type"; + + @Inject + Environment environment; + @Requires(missingBeans = PluginRegistry.class) @Singleton public PluginRegistry pluginRegistry() { return DefaultPluginRegistry.getOrCreate(); } + + @Requires(missingBeans = StorageInterface.class) + @Singleton + @Bean(preDestroy = "close") + public StorageInterface storageInterface(final PluginRegistry pluginRegistry) throws IOException { + return StorageInterfaceFactory.make(pluginRegistry, getStorageType(), getStorageConfig()); + } + + /** + * Returns the type of the configured storage. + * + * @return the string type. + */ + private String getStorageType() { + return environment.getRequiredProperty(KESTRA_STORAGE_TYPE_CONFIG, String.class); + } + + /** + * Returns the configuration for the configured storage. + * + * @return the configuration. + */ + private Map getStorageConfig() { + return environment.getProperties(KESTRA_STORAGE_CONFIG + "." + getStorageType(), StringConvention.CAMEL_CASE); + } } diff --git a/core/src/main/java/io/kestra/core/exceptions/KestraRuntimeException.java b/core/src/main/java/io/kestra/core/exceptions/KestraRuntimeException.java new file mode 100644 index 00000000000..d2d4d99f503 --- /dev/null +++ b/core/src/main/java/io/kestra/core/exceptions/KestraRuntimeException.java @@ -0,0 +1,22 @@ +package io.kestra.core.exceptions; + +/** + * The top-level {@link KestraRuntimeException} for non-recoverable errors. + */ +public class KestraRuntimeException extends RuntimeException { + + public KestraRuntimeException() { + } + + public KestraRuntimeException(String message) { + super(message); + } + + public KestraRuntimeException(String message, Throwable cause) { + super(message, cause); + } + + public KestraRuntimeException(Throwable cause) { + super(cause); + } +} diff --git a/core/src/main/java/io/kestra/core/models/tasks/retrys/AbstractRetry.java b/core/src/main/java/io/kestra/core/models/tasks/retrys/AbstractRetry.java index 9668f010b10..ef0a1656a7d 100644 --- a/core/src/main/java/io/kestra/core/models/tasks/retrys/AbstractRetry.java +++ b/core/src/main/java/io/kestra/core/models/tasks/retrys/AbstractRetry.java @@ -22,7 +22,6 @@ }) @Getter @NoArgsConstructor -@Introspected @SuperBuilder public abstract class AbstractRetry { abstract public String getType(); diff --git a/core/src/main/java/io/kestra/core/plugins/PluginModule.java b/core/src/main/java/io/kestra/core/plugins/PluginModule.java index 874ce48f45c..6aca015b8ad 100644 --- a/core/src/main/java/io/kestra/core/plugins/PluginModule.java +++ b/core/src/main/java/io/kestra/core/plugins/PluginModule.java @@ -3,7 +3,6 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import io.kestra.core.models.conditions.Condition; -import io.kestra.core.models.listeners.Listener; import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.runners.TaskRunner; import io.kestra.core.models.triggers.AbstractTrigger; diff --git a/core/src/main/java/io/kestra/core/storages/ClassLoaderAwareStorage.java b/core/src/main/java/io/kestra/core/storages/ClassLoaderAwareStorage.java new file mode 100644 index 00000000000..3440ff36807 --- /dev/null +++ b/core/src/main/java/io/kestra/core/storages/ClassLoaderAwareStorage.java @@ -0,0 +1,107 @@ +package io.kestra.core.storages; + +import io.kestra.core.utils.Classes; +import io.micronaut.core.util.functional.ThrowingSupplier; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.List; +import java.util.Objects; + +/** + * Class for decorating a {@link StorageInterface} that sets the context class loader when calling the respective + * methods. + */ +public final class ClassLoaderAwareStorage implements StorageInterface { + + private final ClassLoader classLoader; + + private final StorageInterface delegate; + + /** + * Creates a new {@link ClassLoaderAwareStorage} instance. + * + * @param classLoader The {@link ClassLoader} to be used. + * @param delegate The {@link StorageInterface} to delegate method calls. + */ + public ClassLoaderAwareStorage(final ClassLoader classLoader, + final StorageInterface delegate) { + this.classLoader = Objects.requireNonNull(classLoader, "classLoader cannot be null"); + this.delegate = Objects.requireNonNull(delegate, "delegate cannot be null"); + } + + /** {@inheritDoc} **/ + @Override + public String getType() { + return this.delegate.getType(); + } + + /** {@inheritDoc} **/ + @Override + public void init() throws IOException{ + Classes.withContextClassLoader(classLoader, (ThrowingSupplier) () -> { + delegate.init(); + return null; + }); + } + + /** {@inheritDoc} **/ + @Override + public void close() { + Classes.withContextClassLoader(classLoader, delegate::close); + } + + /** {@inheritDoc} **/ + @Override + public InputStream get(String tenantId, URI uri) throws IOException { + return Classes.withContextClassLoader(classLoader, () -> delegate.get(tenantId, uri)); + } + /** {@inheritDoc} **/ + @Override + public List allByPrefix(String tenantId, URI prefix, boolean includeDirectories) throws IOException { + return Classes.withContextClassLoader(classLoader, () -> delegate.allByPrefix(tenantId, prefix, includeDirectories)); + } + /** {@inheritDoc} **/ + @Override + public List list(String tenantId, URI uri) throws IOException { + return Classes.withContextClassLoader(classLoader, () -> delegate.list(tenantId, uri)); + } + /** {@inheritDoc} **/ + @Override + public FileAttributes getAttributes(String tenantId, URI uri) throws IOException { + return Classes.withContextClassLoader(classLoader, () -> delegate.getAttributes(tenantId, uri)); + } + /** {@inheritDoc} **/ + @Override + public URI put(String tenantId, URI uri, InputStream data) throws IOException { + return Classes.withContextClassLoader(classLoader, () -> delegate.put(tenantId, uri, data)); + } + /** {@inheritDoc} **/ + @Override + public boolean delete(String tenantId, URI uri) throws IOException { + return Classes.withContextClassLoader(classLoader, () -> delegate.delete(tenantId, uri)); + } + /** {@inheritDoc} **/ + @Override + public URI createDirectory(String tenantId, URI uri) throws IOException { + return Classes.withContextClassLoader(classLoader, () -> delegate.createDirectory(tenantId, uri)); + } + /** {@inheritDoc} **/ + @Override + public URI move(String tenantId, URI from, URI to) throws IOException { + return Classes.withContextClassLoader(classLoader, () -> delegate.move(tenantId, from, to)); + } + + /** {@inheritDoc} **/ + @Override + public List deleteByPrefix(String tenantId, URI storagePrefix) throws IOException { + return Classes.withContextClassLoader(classLoader, () -> delegate.deleteByPrefix(tenantId, storagePrefix)); + } + + /** {@inheritDoc} **/ + @Override + public StorageInterface unwrapped() { + return this.delegate.unwrapped(); + } +} diff --git a/core/src/main/java/io/kestra/core/storages/StorageInterface.java b/core/src/main/java/io/kestra/core/storages/StorageInterface.java index 80a83d562c5..cce60224e87 100644 --- a/core/src/main/java/io/kestra/core/storages/StorageInterface.java +++ b/core/src/main/java/io/kestra/core/storages/StorageInterface.java @@ -15,7 +15,35 @@ import java.util.List; @Introspected -public interface StorageInterface extends Plugin { +public interface StorageInterface extends AutoCloseable, Plugin { + + /** + * Opens any resources or perform any pre-checks for initializing this storage. + * + * @throws IOException if an error happens while opening the storage. + */ + default void init() throws IOException { + // no-op + } + + /** + * Closes any resources used by this storage. + */ + @Override + default void close() { + // no-op + } + + /** + * The storage interface can be decorated with additional capabilities. + * This method can be used to recursively unwraps a {@link StorageInterface} + * returning the original {@link StorageInterface} object. + * + * @return the StorageInterface. + */ + default StorageInterface unwrapped() { + return this; + } @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class}) InputStream get(String tenantId, URI uri) throws IOException; diff --git a/core/src/main/java/io/kestra/core/storages/StorageInterfaceFactory.java b/core/src/main/java/io/kestra/core/storages/StorageInterfaceFactory.java new file mode 100644 index 00000000000..407b561251a --- /dev/null +++ b/core/src/main/java/io/kestra/core/storages/StorageInterfaceFactory.java @@ -0,0 +1,100 @@ +package io.kestra.core.storages; + +import io.kestra.core.exceptions.KestraRuntimeException; +import io.kestra.core.models.Plugin; +import io.kestra.core.plugins.PluginRegistry; +import io.kestra.core.plugins.RegisteredPlugin; +import io.kestra.core.serializers.JacksonMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Factor class for constructing {@link StorageInterface} objects. + */ +public final class StorageInterfaceFactory { + + private static final Logger log = LoggerFactory.getLogger(StorageInterfaceFactory.class); + + /** + * Factory method for constructing a new {@link StorageInterface} of the given type with the given configuration. + * + * @param pluginRegistry The {@link PluginRegistry}. cannot be {@code null}. + * @param storageType The type of the storage. cannot be {@code null}. + * @param storageConfiguration The configuration of the storage. cannot be {@code null}. + * @return a new {@link StorageInterface}. + * @throws KestraRuntimeException if no storage can be found. + */ + public static StorageInterface make(final PluginRegistry pluginRegistry, + final String storageType, + final Map storageConfiguration) { + List> classes = pluginRegistry.plugins() + .stream() + .map(RegisteredPlugin::getStorages) + .flatMap(List::stream) + .filter(clazz -> Plugin.getId(clazz).map(id -> id.equalsIgnoreCase(storageType)).orElse(false)) + .toList(); + + if (classes.isEmpty()) { + String storageIds = allStorageIds(pluginRegistry); + throw new KestraRuntimeException(String.format( + "No storage interface can be found for 'kestra.storage.type=%s'. Supported types are: %s", storageType, storageIds + )); + } + + if (classes.size() > 1) { + String storageIds = allIdsFor(classes.stream().map(Function.identity())); + log.warn("Multiple StorageInterface candidates was found for 'kestra.storage.type={}'({}). First one is used.", storageType, storageIds); + } + + Class storageClass = classes.get(0); + + // Storage are handle as any serializable/deserialize plugins. + StorageInterface storage; + try { + storage = JacksonMapper.toMap(storageConfiguration, storageClass); + } catch (Exception e) { + throw new KestraRuntimeException(String.format( + "Failed to configure storage '%s'('%s'). Error: %s", storageClass, storageType, e.getMessage()) + ); + } + + // Storage can be used by any Kestra services (i.e., not only Workers). + // Classloader isolation must be ensured for any method calls. + storage = new ClassLoaderAwareStorage(storageClass.getClassLoader(), storage); + + try { + storage.init(); + } catch (IOException e) { + throw new KestraRuntimeException(String.format( + "Failed to initialize storage '%s'('%s').", storageClass, storageType), e + ); + } + return storage; + } + + private static String allStorageIds(final PluginRegistry pluginRegistry) { + return allIdsFor(allStorageClasses(pluginRegistry).map(Function.identity())); + } + + private static Stream> allStorageClasses(final PluginRegistry pluginRegistry) { + return pluginRegistry.plugins() + .stream() + .map(RegisteredPlugin::getStorages) + .flatMap(List::stream); + } + + private static String allIdsFor(final Stream> classes) { + return classes + .map(Plugin::getId) + .flatMap(Optional::stream) + .collect(Collectors.joining(",", "[", "]")); + } +} diff --git a/core/src/main/java/io/kestra/core/utils/Classes.java b/core/src/main/java/io/kestra/core/utils/Classes.java new file mode 100644 index 00000000000..6b2c47c279a --- /dev/null +++ b/core/src/main/java/io/kestra/core/utils/Classes.java @@ -0,0 +1,36 @@ +package io.kestra.core.utils; + +import io.micronaut.core.util.functional.ThrowingSupplier; + +public class Classes { + + public static R withContextClassLoader(final ClassLoader classLoader, + final ThrowingSupplier runnable) throws E { + final ClassLoader saved = compareAndSwapLoaders(classLoader); + try { + Thread.currentThread().setContextClassLoader(classLoader); + return runnable.get(); + } finally { + compareAndSwapLoaders(saved); + } + } + + public static void withContextClassLoader(final ClassLoader classLoader, + final Runnable runnable) { + final ClassLoader saved = compareAndSwapLoaders(classLoader); + try { + Thread.currentThread().setContextClassLoader(classLoader); + runnable.run(); + } finally { + compareAndSwapLoaders(saved); + } + } + + public static ClassLoader compareAndSwapLoaders(final ClassLoader classLoader) { + final ClassLoader current = Thread.currentThread().getContextClassLoader(); + if (!current.equals(classLoader)) { + Thread.currentThread().setContextClassLoader(classLoader); + } + return current; + } +} diff --git a/core/src/test/java/io/kestra/core/storages/StorageInterfaceFactoryTest.java b/core/src/test/java/io/kestra/core/storages/StorageInterfaceFactoryTest.java new file mode 100644 index 00000000000..44df6d15b5f --- /dev/null +++ b/core/src/test/java/io/kestra/core/storages/StorageInterfaceFactoryTest.java @@ -0,0 +1,26 @@ +package io.kestra.core.storages; + +import io.kestra.core.exceptions.KestraRuntimeException; +import io.kestra.core.plugins.DefaultPluginRegistry; +import io.kestra.storage.local.LocalStorage; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +class StorageInterfaceFactoryTest { + + DefaultPluginRegistry registry = DefaultPluginRegistry.getOrCreate(); + + @Test + void shouldReturnStorageGivenValidId() { + StorageInterface storage = StorageInterfaceFactory.make(registry, "local", Map.of("basePath", "/tmp/kestra")); + Assertions.assertNotNull(storage); + Assertions.assertEquals(LocalStorage.class.getName(), storage.getType()); + } + + @Test + void shouldFailedGivenInvalidId() { + Assertions.assertThrows(KestraRuntimeException.class, () -> StorageInterfaceFactory.make(registry, "invalid", Map.of())); + } +} \ No newline at end of file diff --git a/model/src/main/java/io/kestra/core/models/Plugin.java b/model/src/main/java/io/kestra/core/models/Plugin.java index 6ebc81eef0a..df96478b4b1 100644 --- a/model/src/main/java/io/kestra/core/models/Plugin.java +++ b/model/src/main/java/io/kestra/core/models/Plugin.java @@ -1,7 +1,9 @@ package io.kestra.core.models; +import io.kestra.core.models.annotations.Plugin.Id; import jakarta.validation.constraints.NotNull; +import java.util.Objects; import java.util.Optional; /** @@ -26,9 +28,22 @@ default String getType() { * @return {@code true} if the plugin is internal. */ static boolean isInternal(final Class plugin) { + Objects.requireNonNull(plugin, "Cannot check if a plugin is internal from null"); io.kestra.core.models.annotations.Plugin annotation = plugin.getAnnotation(io.kestra.core.models.annotations.Plugin.class); return Optional.ofNullable(annotation) .map(io.kestra.core.models.annotations.Plugin::internal) .orElse(false); } + + /** + * Static helper method to gt the id of a plugin. + * + * @param plugin The plugin type. + * @return an optional string id. + */ + static Optional getId(final Class plugin) { + Objects.requireNonNull(plugin, "Cannot get plugin id from null"); + Id annotation = plugin.getAnnotation(Id.class); + return Optional.ofNullable(annotation).map(Id::value).map(String::toLowerCase); + } } diff --git a/model/src/main/java/io/kestra/core/models/annotations/Plugin.java b/model/src/main/java/io/kestra/core/models/annotations/Plugin.java index 730c4ce9893..c4ae64867bc 100644 --- a/model/src/main/java/io/kestra/core/models/annotations/Plugin.java +++ b/model/src/main/java/io/kestra/core/models/annotations/Plugin.java @@ -21,7 +21,7 @@ /** * Specifies whether the annotated plugin class is internal to Kestra. *

- * An internal plugin can be resolved through the {@link io.kestra.core.plugins.PluginRegistry}, but cannot + * An internal plugin can be resolved through the PluginRegistry, but cannot * be referenced directly in a YAML flow definition. * * @return {@code true} if the plugin is internal. Otherwise {@link false}. @@ -35,4 +35,15 @@ * For the moment, aliases are considered as deprecated plugins replaced by the class annotated. */ String[] aliases() default {}; + + @Documented + @Retention(RUNTIME) + @Target({ElementType.TYPE}) + @interface Id { + /** + * Specificies the unique ID for identifying a plugin. Id are case insensitive. + * @return The string identifier. + */ + String value(); + } } diff --git a/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java b/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java index 31a34226e79..cfd9a279d92 100644 --- a/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java +++ b/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java @@ -1,10 +1,13 @@ package io.kestra.storage.local; import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.storages.FileAttributes; import io.kestra.core.storages.StorageInterface; -import jakarta.inject.Inject; -import jakarta.inject.Singleton; +import jakarta.validation.constraints.NotNull; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; import org.apache.commons.io.FileUtils; import java.io.*; @@ -20,28 +23,27 @@ import static io.kestra.core.utils.Rethrow.throwFunction; @Plugin -@Singleton -@LocalStorageEnabled +@Plugin.Id("local") +@Getter +@Setter +@NoArgsConstructor public class LocalStorage implements StorageInterface { - LocalConfig config; - - /** - * No-arg constructor - required by Kestra service loader. - */ - public LocalStorage() {} - - @Inject - public LocalStorage(LocalConfig config) throws IOException { - this.config = config; - - if (!Files.exists(config.getBasePath())) { - Files.createDirectories(config.getBasePath()); + + @PluginProperty + @NotNull + private Path basePath; + + /** {@inheritDoc} **/ + @Override + public void init() throws IOException { + if (!Files.exists(this.basePath)) { + Files.createDirectories(this.basePath); } } private Path getPath(String tenantId, URI uri) { - Path basePath = tenantId == null ? config.getBasePath().toAbsolutePath() - : Paths.get(config.getBasePath().toAbsolutePath().toString(), tenantId); + Path basePath = tenantId == null ? this.basePath.toAbsolutePath() + : Paths.get(this.basePath.toAbsolutePath().toString(), tenantId); if(uri == null) { return basePath; } @@ -191,8 +193,8 @@ public List deleteByPrefix(String tenantId, URI storagePrefix) throws IOExc private URI getKestraUri(String tenantId, Path path) { Path prefix = (tenantId == null) ? - config.getBasePath().toAbsolutePath() : - Path.of(config.getBasePath().toAbsolutePath().toString(), tenantId); + basePath.toAbsolutePath() : + Path.of(basePath.toAbsolutePath().toString(), tenantId); return URI.create("kestra:///" + prefix.relativize(path)); }