Skip to content

Commit

Permalink
WIP chore(core): migrate StorageInterface to service-loader mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Apr 26, 2024
1 parent 93f9f70 commit c0fd070
Show file tree
Hide file tree
Showing 12 changed files with 304 additions and 112 deletions.
14 changes: 4 additions & 10 deletions cli/src/main/java/io/kestra/cli/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.kestra.cli.commands.servers.ServerCommand;
import io.kestra.cli.commands.sys.SysCommand;
import io.kestra.cli.commands.templates.TemplateCommand;
import io.kestra.core.contexts.KestraApplicationContext;
import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.core.plugins.PluginRegistry;
import io.micronaut.configuration.picocli.MicronautFactory;
Expand Down Expand Up @@ -65,7 +64,7 @@ protected static void execute(Class<?> cls, String... args) {
SLF4JBridgeHandler.install();

// Init ApplicationContext
ApplicationContext applicationContext = App.applicationContext(cls, args, DefaultPluginRegistry.getOrCreate());
ApplicationContext applicationContext = App.applicationContext(cls, args);

// Call Picocli command
int exitCode = new CommandLine(cls, new MicronautFactory(applicationContext)).execute(args);
Expand All @@ -84,10 +83,10 @@ protected static void execute(Class<?> cls, String... args) {
* @return the application context created
*/
protected static ApplicationContext applicationContext(Class<?> mainClass,
String[] args,
PluginRegistry pluginRegistry) {
String[] args) {

ApplicationContextBuilder builder = KestraApplicationContext.builder(pluginRegistry)
ApplicationContextBuilder builder = ApplicationContext
.builder()
.mainClass(mainClass)
.environments(Environment.CLI);

Expand Down Expand Up @@ -123,12 +122,7 @@ protected static ApplicationContext applicationContext(Class<?> mainClass,
});

builder.properties(properties);

// Load external plugins before starting ApplicationContext
Path pluginPath = ((AbstractCommand)commandLine.getCommandSpec().userObject()).pluginsPath;
pluginRegistry.registerIfAbsent(pluginPath);
}

return builder.build();
}

Expand Down

This file was deleted.

44 changes: 44 additions & 0 deletions core/src/main/java/io/kestra/core/contexts/KestraBeansFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,60 @@

import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.StorageInterfaceFactory;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
import io.micronaut.core.naming.conventions.StringConvention;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.Validator;

import java.io.IOException;
import java.util.Map;

@Factory
public class KestraBeansFactory {

public static final String KESTRA_STORAGE_CONFIG = "kestra.storage";
public static final String KESTRA_STORAGE_TYPE_CONFIG = "kestra.storage.type";

@Inject
Environment environment;

@Inject
Validator validator;

@Requires(missingBeans = PluginRegistry.class)
@Singleton
public PluginRegistry pluginRegistry() {
return DefaultPluginRegistry.getOrCreate();
}

@Requires(missingBeans = StorageInterface.class)
@Singleton
@Bean(preDestroy = "close")
public StorageInterface storageInterface(final PluginRegistry pluginRegistry) throws IOException {
return StorageInterfaceFactory.make(pluginRegistry, getStorageType(), getStorageConfig(), validator);
}

/**
* Returns the type of the configured storage.
*
* @return the string type.
*/
private String getStorageType() {
return environment.getRequiredProperty(KESTRA_STORAGE_TYPE_CONFIG, String.class);
}

/**
* Returns the configuration for the configured storage.
*
* @return the configuration.
*/
private Map<String, Object> getStorageConfig() {
return environment.getProperties(KESTRA_STORAGE_CONFIG + "." + getStorageType(), StringConvention.CAMEL_CASE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.kestra.core.exceptions;

/**
* The top-level {@link KestraRuntimeException} for non-recoverable errors.
*/
public class KestraRuntimeException extends RuntimeException {

public KestraRuntimeException() {
}

public KestraRuntimeException(String message) {
super(message);
}

public KestraRuntimeException(String message, Throwable cause) {
super(message, cause);
}

public KestraRuntimeException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
})
@Getter
@NoArgsConstructor
@Introspected
@SuperBuilder
public abstract class AbstractRetry {
abstract public String getType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import com.fasterxml.jackson.databind.module.SimpleModule;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.triggers.AbstractTrigger;
Expand Down
19 changes: 18 additions & 1 deletion core/src/main/java/io/kestra/core/storages/StorageInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,24 @@
import java.util.List;

@Introspected
public interface StorageInterface extends Plugin {
public interface StorageInterface extends AutoCloseable, Plugin {

/**
* Opens any resources or perform any pre-checks for initializing this storage.
*
* @throws IOException if an error happens while opening the storage.
*/
default void init() throws IOException {
// no-op
}

/**
* Closes any resources used by this storage.
*/
@Override
default void close() {
// no-op
}

@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream get(String tenantId, URI uri) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package io.kestra.core.storages;

import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.models.Plugin;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.plugins.RegisteredPlugin;
import io.kestra.core.serializers.JacksonMapper;
import jakarta.annotation.Nullable;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Validator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Factor class for constructing {@link StorageInterface} objects.
*/
public final class StorageInterfaceFactory {

private static final Logger log = LoggerFactory.getLogger(StorageInterfaceFactory.class);

/**
* Factory method for constructing and validating new {@link StorageInterface} of the given type with the given configuration.
*
* @param pluginRegistry The {@link PluginRegistry}. cannot be {@code null}.
* @param storageType The type of the storage. cannot be {@code null}.
* @param storageConfiguration The configuration of the storage. cannot be {@code null}.
* @param validator The {@link Validator}.
* @return a new {@link StorageInterface}.
* @throws KestraRuntimeException if no storage can be found.
*/
public static StorageInterface make(final PluginRegistry pluginRegistry,
final String storageType,
final Map<String, Object> storageConfiguration,
@Nullable final Validator validator) {
List<Class<? extends StorageInterface>> classes = pluginRegistry.plugins()
.stream()
.map(RegisteredPlugin::getStorages)
.flatMap(List::stream)
.filter(clazz -> Plugin.getId(clazz).map(id -> id.equalsIgnoreCase(storageType)).orElse(false))
.toList();

if (classes.isEmpty()) {
String storageIds = allStorageIds(pluginRegistry);
throw new KestraRuntimeException(String.format(
"No storage interface can be found for 'kestra.storage.type=%s'. Supported types are: %s", storageType, storageIds
));
}

if (classes.size() > 1) {
String storageIds = allIdsFor(classes.stream().map(Function.identity()));
log.warn("Multiple StorageInterface candidates was found for 'kestra.storage.type={}' ({}). First one is used.", storageType, storageIds);
}

Class<? extends StorageInterface> storageClass = classes.get(0);

// Storage are handle as any serializable/deserialize plugins.
StorageInterface storage;
try {
storage = JacksonMapper.toMap(storageConfiguration, storageClass);
} catch (Exception e) {
throw new KestraRuntimeException(String.format(
"Failed to create storage '%s'. Error: %s", storageType, e.getMessage())
);
}

// Validate configuration.
if (validator != null) {
Set<ConstraintViolation<StorageInterface>> violations;
try {
violations = validator.validate(storage);
} catch (ConstraintViolationException e) {
throw new KestraRuntimeException(String.format(
"Failed to validate configuration for storage '%s'. Error: %s", storageType, e.getMessage())
);
}
if (!violations.isEmpty()) {
ConstraintViolationException e = new ConstraintViolationException(violations);
throw new KestraRuntimeException(String.format(
"Invalid configuration for storage '%s'. Error: '%s'", storageType, e.getMessage()), e
);
}
}

try {
storage.init();
} catch (IOException e) {
throw new KestraRuntimeException(String.format(
"Failed to initialize storage '%s'. Error: %s", storageType, e.getMessage()), e
);
}
return storage;
}

private static String allStorageIds(final PluginRegistry pluginRegistry) {
return allIdsFor(allStorageClasses(pluginRegistry).map(Function.identity()));
}

private static Stream<Class<? extends StorageInterface>> allStorageClasses(final PluginRegistry pluginRegistry) {
return pluginRegistry.plugins()
.stream()
.map(RegisteredPlugin::getStorages)
.flatMap(List::stream);
}

private static String allIdsFor(final Stream<Class<?>> classes) {
return classes
.map(Plugin::getId)
.flatMap(Optional::stream)
.collect(Collectors.joining(",", "[", "]"));
}
}
Loading

0 comments on commit c0fd070

Please sign in to comment.