Skip to content

Commit

Permalink
WIP chore(core): migrate StorageInterface to service-loader mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Apr 25, 2024
1 parent 93f9f70 commit 97ccc1f
Show file tree
Hide file tree
Showing 14 changed files with 414 additions and 112 deletions.
14 changes: 4 additions & 10 deletions cli/src/main/java/io/kestra/cli/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.kestra.cli.commands.servers.ServerCommand;
import io.kestra.cli.commands.sys.SysCommand;
import io.kestra.cli.commands.templates.TemplateCommand;
import io.kestra.core.contexts.KestraApplicationContext;
import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.core.plugins.PluginRegistry;
import io.micronaut.configuration.picocli.MicronautFactory;
Expand Down Expand Up @@ -65,7 +64,7 @@ protected static void execute(Class<?> cls, String... args) {
SLF4JBridgeHandler.install();

// Init ApplicationContext
ApplicationContext applicationContext = App.applicationContext(cls, args, DefaultPluginRegistry.getOrCreate());
ApplicationContext applicationContext = App.applicationContext(cls, args);

// Call Picocli command
int exitCode = new CommandLine(cls, new MicronautFactory(applicationContext)).execute(args);
Expand All @@ -84,10 +83,10 @@ protected static void execute(Class<?> cls, String... args) {
* @return the application context created
*/
protected static ApplicationContext applicationContext(Class<?> mainClass,
String[] args,
PluginRegistry pluginRegistry) {
String[] args) {

ApplicationContextBuilder builder = KestraApplicationContext.builder(pluginRegistry)
ApplicationContextBuilder builder = ApplicationContext
.builder()
.mainClass(mainClass)
.environments(Environment.CLI);

Expand Down Expand Up @@ -123,12 +122,7 @@ protected static ApplicationContext applicationContext(Class<?> mainClass,
});

builder.properties(properties);

// Load external plugins before starting ApplicationContext
Path pluginPath = ((AbstractCommand)commandLine.getCommandSpec().userObject()).pluginsPath;
pluginRegistry.registerIfAbsent(pluginPath);
}

return builder.build();
}

Expand Down

This file was deleted.

40 changes: 40 additions & 0 deletions core/src/main/java/io/kestra/core/contexts/KestraBeansFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,56 @@

import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.StorageInterfaceFactory;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
import io.micronaut.core.naming.conventions.StringConvention;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.io.IOException;
import java.util.Map;

@Factory
public class KestraBeansFactory {

public static final String KESTRA_STORAGE_CONFIG = "kestra.storage";
public static final String KESTRA_STORAGE_TYPE_CONFIG = "kestra.storage.type";

@Inject
Environment environment;

@Requires(missingBeans = PluginRegistry.class)
@Singleton
public PluginRegistry pluginRegistry() {
return DefaultPluginRegistry.getOrCreate();
}

@Requires(missingBeans = StorageInterface.class)
@Singleton
@Bean(preDestroy = "close")
public StorageInterface storageInterface(final PluginRegistry pluginRegistry) throws IOException {
return StorageInterfaceFactory.make(pluginRegistry, getStorageType(), getStorageConfig());
}

/**
* Returns the type of the configured storage.
*
* @return the string type.
*/
private String getStorageType() {
return environment.getRequiredProperty(KESTRA_STORAGE_TYPE_CONFIG, String.class);
}

/**
* Returns the configuration for the configured storage.
*
* @return the configuration.
*/
private Map<String, Object> getStorageConfig() {
return environment.getProperties(KESTRA_STORAGE_CONFIG + "." + getStorageType(), StringConvention.CAMEL_CASE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.kestra.core.exceptions;

/**
* The top-level {@link KestraRuntimeException} for non-recoverable errors.
*/
public class KestraRuntimeException extends RuntimeException {

public KestraRuntimeException() {
}

public KestraRuntimeException(String message) {
super(message);
}

public KestraRuntimeException(String message, Throwable cause) {
super(message, cause);
}

public KestraRuntimeException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
})
@Getter
@NoArgsConstructor
@Introspected
@SuperBuilder
public abstract class AbstractRetry {
abstract public String getType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import com.fasterxml.jackson.databind.module.SimpleModule;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.triggers.AbstractTrigger;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package io.kestra.core.storages;

import io.kestra.core.utils.Classes;
import io.micronaut.core.util.functional.ThrowingSupplier;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.List;
import java.util.Objects;

/**
* Class for decorating a {@link StorageInterface} that sets the context class loader when calling the respective
* methods.
*/
public final class ClassLoaderAwareStorage implements StorageInterface {

private final ClassLoader classLoader;

private final StorageInterface delegate;

/**
* Creates a new {@link ClassLoaderAwareStorage} instance.
*
* @param classLoader The {@link ClassLoader} to be used.
* @param delegate The {@link StorageInterface} to delegate method calls.
*/
public ClassLoaderAwareStorage(final ClassLoader classLoader,
final StorageInterface delegate) {
this.classLoader = Objects.requireNonNull(classLoader, "classLoader cannot be null");
this.delegate = Objects.requireNonNull(delegate, "delegate cannot be null");
}

/** {@inheritDoc} **/
@Override
public String getType() {
return this.delegate.getType();
}

/** {@inheritDoc} **/
@Override
public void init() throws IOException{
Classes.withContextClassLoader(classLoader, (ThrowingSupplier<Void, IOException>) () -> {
delegate.init();
return null;
});
}

/** {@inheritDoc} **/
@Override
public void close() {
Classes.withContextClassLoader(classLoader, delegate::close);
}

/** {@inheritDoc} **/
@Override
public InputStream get(String tenantId, URI uri) throws IOException {
return Classes.withContextClassLoader(classLoader, () -> delegate.get(tenantId, uri));
}
/** {@inheritDoc} **/
@Override
public List<URI> allByPrefix(String tenantId, URI prefix, boolean includeDirectories) throws IOException {
return Classes.withContextClassLoader(classLoader, () -> delegate.allByPrefix(tenantId, prefix, includeDirectories));
}
/** {@inheritDoc} **/
@Override
public List<FileAttributes> list(String tenantId, URI uri) throws IOException {
return Classes.withContextClassLoader(classLoader, () -> delegate.list(tenantId, uri));
}
/** {@inheritDoc} **/
@Override
public FileAttributes getAttributes(String tenantId, URI uri) throws IOException {
return Classes.withContextClassLoader(classLoader, () -> delegate.getAttributes(tenantId, uri));
}
/** {@inheritDoc} **/
@Override
public URI put(String tenantId, URI uri, InputStream data) throws IOException {
return Classes.withContextClassLoader(classLoader, () -> delegate.put(tenantId, uri, data));
}
/** {@inheritDoc} **/
@Override
public boolean delete(String tenantId, URI uri) throws IOException {
return Classes.withContextClassLoader(classLoader, () -> delegate.delete(tenantId, uri));
}
/** {@inheritDoc} **/
@Override
public URI createDirectory(String tenantId, URI uri) throws IOException {
return Classes.withContextClassLoader(classLoader, () -> delegate.createDirectory(tenantId, uri));
}
/** {@inheritDoc} **/
@Override
public URI move(String tenantId, URI from, URI to) throws IOException {
return Classes.withContextClassLoader(classLoader, () -> delegate.move(tenantId, from, to));
}

/** {@inheritDoc} **/
@Override
public List<URI> deleteByPrefix(String tenantId, URI storagePrefix) throws IOException {
return Classes.withContextClassLoader(classLoader, () -> delegate.deleteByPrefix(tenantId, storagePrefix));
}

/** {@inheritDoc} **/
@Override
public StorageInterface unwrapped() {
return this.delegate.unwrapped();
}
}
30 changes: 29 additions & 1 deletion core/src/main/java/io/kestra/core/storages/StorageInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,35 @@
import java.util.List;

@Introspected
public interface StorageInterface extends Plugin {
public interface StorageInterface extends AutoCloseable, Plugin {

/**
* Opens any resources or perform any pre-checks for initializing this storage.
*
* @throws IOException if an error happens while opening the storage.
*/
default void init() throws IOException {
// no-op
}

/**
* Closes any resources used by this storage.
*/
@Override
default void close() {
// no-op
}

/**
* The storage interface can be decorated with additional capabilities.
* This method can be used to recursively unwraps a {@link StorageInterface}
* returning the original {@link StorageInterface} object.
*
* @return the StorageInterface.
*/
default StorageInterface unwrapped() {
return this;
}

@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream get(String tenantId, URI uri) throws IOException;
Expand Down
Loading

0 comments on commit 97ccc1f

Please sign in to comment.