Skip to content

Commit

Permalink
WIP chore(core): migrate StorageInterface to service-loader mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Apr 26, 2024
1 parent f2e7a3a commit 229e95b
Show file tree
Hide file tree
Showing 13 changed files with 335 additions and 112 deletions.
14 changes: 4 additions & 10 deletions cli/src/main/java/io/kestra/cli/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.kestra.cli.commands.servers.ServerCommand;
import io.kestra.cli.commands.sys.SysCommand;
import io.kestra.cli.commands.templates.TemplateCommand;
import io.kestra.core.contexts.KestraApplicationContext;
import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.core.plugins.PluginRegistry;
import io.micronaut.configuration.picocli.MicronautFactory;
Expand Down Expand Up @@ -65,7 +64,7 @@ protected static void execute(Class<?> cls, String... args) {
SLF4JBridgeHandler.install();

// Init ApplicationContext
ApplicationContext applicationContext = App.applicationContext(cls, args, DefaultPluginRegistry.getOrCreate());
ApplicationContext applicationContext = App.applicationContext(cls, args);

// Call Picocli command
int exitCode = new CommandLine(cls, new MicronautFactory(applicationContext)).execute(args);
Expand All @@ -84,10 +83,10 @@ protected static void execute(Class<?> cls, String... args) {
* @return the application context created
*/
protected static ApplicationContext applicationContext(Class<?> mainClass,
String[] args,
PluginRegistry pluginRegistry) {
String[] args) {

ApplicationContextBuilder builder = KestraApplicationContext.builder(pluginRegistry)
ApplicationContextBuilder builder = ApplicationContext
.builder()
.mainClass(mainClass)
.environments(Environment.CLI);

Expand Down Expand Up @@ -123,12 +122,7 @@ protected static ApplicationContext applicationContext(Class<?> mainClass,
});

builder.properties(properties);

// Load external plugins before starting ApplicationContext
Path pluginPath = ((AbstractCommand)commandLine.getCommandSpec().userObject()).pluginsPath;
pluginRegistry.registerIfAbsent(pluginPath);
}

return builder.build();
}

Expand Down

This file was deleted.

50 changes: 50 additions & 0 deletions core/src/main/java/io/kestra/core/contexts/KestraBeansFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,66 @@

import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.StorageInterfaceFactory;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.convert.format.MapFormat;
import io.micronaut.core.naming.conventions.StringConvention;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.Validator;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;

@Factory
public class KestraBeansFactory {

@Inject
Validator validator;

@Inject
StorageConfig storageConfig;

@Value("${kestra.storage.type")
Optional<String> storageType;

@Requires(missingBeans = PluginRegistry.class)
@Singleton
public PluginRegistry pluginRegistry() {
return DefaultPluginRegistry.getOrCreate();
}

@Requires(missingBeans = StorageInterface.class)
@Requires(property = "kestra.storage.type")
@Singleton
@Bean(preDestroy = "close")
public StorageInterface storageInterface(final PluginRegistry pluginRegistry) throws IOException {
String pluginId = storageType.get();
return StorageInterfaceFactory.make(pluginRegistry, pluginId, storageConfig.getStorageConfig(pluginId), validator);
}

@ConfigurationProperties("kestra")
public record StorageConfig(
@Nullable
@MapFormat(keyFormat = StringConvention.CAMEL_CASE, transformation = MapFormat.MapTransformation.NESTED)
Map<String, Object> storage
) {

/**
* Returns the configuration for the configured storage.
*
* @return the configuration.
*/
@SuppressWarnings("unchecked")
private Map<String, Object> getStorageConfig(String type) {
return (Map<String, Object>) storage.get(type);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.kestra.core.exceptions;

/**
* The top-level {@link KestraRuntimeException} for non-recoverable errors.
*/
public class KestraRuntimeException extends RuntimeException {

public KestraRuntimeException() {
}

public KestraRuntimeException(String message) {
super(message);
}

public KestraRuntimeException(String message, Throwable cause) {
super(message, cause);
}

public KestraRuntimeException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
})
@Getter
@NoArgsConstructor
@Introspected
@SuperBuilder
public abstract class AbstractRetry {
abstract public String getType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import com.fasterxml.jackson.databind.module.SimpleModule;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.triggers.AbstractTrigger;
Expand Down
19 changes: 18 additions & 1 deletion core/src/main/java/io/kestra/core/storages/StorageInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,24 @@
import java.util.List;

@Introspected
public interface StorageInterface extends Plugin {
public interface StorageInterface extends AutoCloseable, Plugin {

/**
* Opens any resources or perform any pre-checks for initializing this storage.
*
* @throws IOException if an error happens during initialization.
*/
default void init() throws IOException {
// no-op
}

/**
* Closes any resources used by this class.
*/
@Override
default void close() {
// no-op
}

@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream get(String tenantId, URI uri) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package io.kestra.core.storages;

import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.models.Plugin;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.plugins.RegisteredPlugin;
import io.kestra.core.serializers.JacksonMapper;
import jakarta.annotation.Nullable;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Validator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Factor class for constructing {@link StorageInterface} objects.
*/
public final class StorageInterfaceFactory {

private static final Logger log = LoggerFactory.getLogger(StorageInterfaceFactory.class);

/**
* Factory method for constructing and validating new {@link StorageInterface} of the given type with the given configuration.
*
* @param pluginRegistry The {@link PluginRegistry}. cannot be {@code null}.
* @param pluginId The ID of the storage. cannot be {@code null}.
* @param pluginConfiguration The configuration of the storage. cannot be {@code null}.
* @param validator The {@link Validator}.
* @return a new {@link StorageInterface}.
* @throws KestraRuntimeException if no storage can be found.
*/
public static StorageInterface make(final PluginRegistry pluginRegistry,
final String pluginId,
final Map<String, Object> pluginConfiguration,
@Nullable final Validator validator) {
List<Class<? extends StorageInterface>> classes = allStorageClasses(pluginRegistry)
.filter(clazz -> Plugin.getId(clazz).map(id -> id.equalsIgnoreCase(pluginId)).orElse(false))
.toList();

if (classes.isEmpty()) {
String storageIds = allIdsFor(allStorageClasses(pluginRegistry));
throw new KestraRuntimeException(String.format(
"No storage interface can be found for 'kestra.storage.type=%s'. Supported types are: %s", pluginId, storageIds
));
}

if (classes.size() > 1) {
String storageIds = allIdsFor(classes.stream());
log.warn("Multiple storage interface candidates was found for 'kestra.storage.type={}' ({}). First one is used.", pluginId, storageIds);
}

Class<? extends StorageInterface> storageClass = classes.get(0);

// Storage are handle as any serializable/deserialize plugins.
StorageInterface plugin;
try {
plugin = JacksonMapper.toMap(pluginConfiguration, storageClass);
} catch (Exception e) {
throw new KestraRuntimeException(String.format(
"Failed to create storage '%s'. Error: %s", pluginId, e.getMessage())
);
}

// Validate configuration.
if (validator != null) {
Set<ConstraintViolation<StorageInterface>> violations;
try {
violations = validator.validate(plugin);
} catch (ConstraintViolationException e) {
throw new KestraRuntimeException(String.format(
"Failed to validate configuration for storage '%s'. Error: %s", pluginId, e.getMessage())
);
}
if (!violations.isEmpty()) {
ConstraintViolationException e = new ConstraintViolationException(violations);
throw new KestraRuntimeException(String.format(
"Invalid configuration for storage '%s'. Error: '%s'", pluginId, e.getMessage()), e
);
}
}

try {
plugin.init();
} catch (IOException e) {
throw new KestraRuntimeException(String.format(
"Failed to initialize storage '%s'. Error: %s", pluginId, e.getMessage()), e
);
}
return plugin;
}

/**
* @return all plugin classes for the {@link StorageInterface}s.
*/
private static Stream<Class<? extends StorageInterface>> allStorageClasses(final PluginRegistry pluginRegistry) {
return pluginRegistry.plugins()
.stream()
.map(RegisteredPlugin::getStorages)
.flatMap(List::stream);
}

/**
* @return all plugin identifier for the {@link StorageInterface}s.
*/
private static String allIdsFor(final Stream<Class<? extends StorageInterface>> classes) {
return classes
.map(Plugin::getId)
.flatMap(Optional::stream)
.collect(Collectors.joining(",", "[", "]"));
}
}
Loading

0 comments on commit 229e95b

Please sign in to comment.