Skip to content

Commit

Permalink
chore(core): migrate StorageInterface to service-loader mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Apr 29, 2024
1 parent 0ab3f7d commit b381ee5
Show file tree
Hide file tree
Showing 13 changed files with 340 additions and 112 deletions.
14 changes: 4 additions & 10 deletions cli/src/main/java/io/kestra/cli/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.kestra.cli.commands.servers.ServerCommand;
import io.kestra.cli.commands.sys.SysCommand;
import io.kestra.cli.commands.templates.TemplateCommand;
import io.kestra.core.contexts.KestraApplicationContext;
import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.core.plugins.PluginRegistry;
import io.micronaut.configuration.picocli.MicronautFactory;
Expand Down Expand Up @@ -65,7 +64,7 @@ protected static void execute(Class<?> cls, String... args) {
SLF4JBridgeHandler.install();

// Init ApplicationContext
ApplicationContext applicationContext = App.applicationContext(cls, args, DefaultPluginRegistry.getOrCreate());
ApplicationContext applicationContext = App.applicationContext(cls, args);

// Call Picocli command
int exitCode = new CommandLine(cls, new MicronautFactory(applicationContext)).execute(args);
Expand All @@ -84,10 +83,10 @@ protected static void execute(Class<?> cls, String... args) {
* @return the application context created
*/
protected static ApplicationContext applicationContext(Class<?> mainClass,
String[] args,
PluginRegistry pluginRegistry) {
String[] args) {

ApplicationContextBuilder builder = KestraApplicationContext.builder(pluginRegistry)
ApplicationContextBuilder builder = ApplicationContext
.builder()
.mainClass(mainClass)
.environments(Environment.CLI);

Expand Down Expand Up @@ -123,12 +122,7 @@ protected static ApplicationContext applicationContext(Class<?> mainClass,
});

builder.properties(properties);

// Load external plugins before starting ApplicationContext
Path pluginPath = ((AbstractCommand)commandLine.getCommandSpec().userObject()).pluginsPath;
pluginRegistry.registerIfAbsent(pluginPath);
}

return builder.build();
}

Expand Down

This file was deleted.

56 changes: 56 additions & 0 deletions core/src/main/java/io/kestra/core/contexts/KestraBeansFactory.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,73 @@
package io.kestra.core.contexts;

import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.plugins.DefaultPluginRegistry;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.StorageInterfaceFactory;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.convert.format.MapFormat;
import io.micronaut.core.naming.conventions.StringConvention;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.Validator;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;

import static io.kestra.core.storages.StorageInterfaceFactory.KESTRA_STORAGE_TYPE_CONFIG;

@Factory
public class KestraBeansFactory {

@Inject
Validator validator;

@Inject
StorageConfig storageConfig;

@Value("${kestra.storage.type}")
Optional<String> storageType;

@Requires(missingBeans = PluginRegistry.class)
@Singleton
public PluginRegistry pluginRegistry() {
return DefaultPluginRegistry.getOrCreate();
}

@Requires(missingBeans = StorageInterface.class)
@Singleton
@Bean(preDestroy = "close")
public StorageInterface storageInterface(final PluginRegistry pluginRegistry) throws IOException {
String pluginId = storageType.orElseThrow(() -> new KestraRuntimeException(String.format(
"No storage configured through the application property '%s'. Support types are: %s"
, KESTRA_STORAGE_TYPE_CONFIG,
StorageInterfaceFactory.getLoggableStorageIds(pluginRegistry)
)));
return StorageInterfaceFactory.make(pluginRegistry, pluginId, storageConfig.getStorageConfig(pluginId), validator);
}

@ConfigurationProperties("kestra")
public record StorageConfig(
@Nullable
@MapFormat(keyFormat = StringConvention.CAMEL_CASE, transformation = MapFormat.MapTransformation.NESTED)
Map<String, Object> storage
) {

/**
* Returns the configuration for the configured storage.
*
* @return the configuration.
*/
@SuppressWarnings("unchecked")
private Map<String, Object> getStorageConfig(String type) {
return (Map<String, Object>) storage.get(StringConvention.CAMEL_CASE.format(type));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.kestra.core.exceptions;

/**
* The top-level {@link KestraRuntimeException} for non-recoverable errors.
*/
public class KestraRuntimeException extends RuntimeException {

public KestraRuntimeException() {
}

public KestraRuntimeException(String message) {
super(message);
}

public KestraRuntimeException(String message, Throwable cause) {
super(message, cause);
}

public KestraRuntimeException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
})
@Getter
@NoArgsConstructor
@Introspected
@SuperBuilder
public abstract class AbstractRetry {
abstract public String getType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import com.fasterxml.jackson.databind.module.SimpleModule;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.models.triggers.AbstractTrigger;
Expand Down
19 changes: 18 additions & 1 deletion core/src/main/java/io/kestra/core/storages/StorageInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,24 @@
import java.util.List;

@Introspected
public interface StorageInterface extends Plugin {
public interface StorageInterface extends AutoCloseable, Plugin {

/**
* Opens any resources or perform any pre-checks for initializing this storage.
*
* @throws IOException if an error happens during initialization.
*/
default void init() throws IOException {
// no-op
}

/**
* Closes any resources used by this class.
*/
@Override
default void close() {
// no-op
}

@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream get(String tenantId, URI uri) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.kestra.core.storages;

import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.models.Plugin;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.plugins.RegisteredPlugin;
import io.kestra.core.serializers.JacksonMapper;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Validator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Factor class for constructing {@link StorageInterface} objects.
*/
public final class StorageInterfaceFactory {

private static final Logger log = LoggerFactory.getLogger(StorageInterfaceFactory.class);
public static final String KESTRA_STORAGE_TYPE_CONFIG = "kestra.storage.type";

/**
* Factory method for constructing and validating new {@link StorageInterface} of the given type with the given configuration.
*
* @param pluginRegistry The {@link PluginRegistry}. cannot be {@code null}.
* @param pluginId The ID of the storage. cannot be {@code null}.
* @param pluginConfiguration The configuration of the storage. cannot be {@code null}.
* @param validator The {@link Validator}.
* @return a new {@link StorageInterface}.
* @throws KestraRuntimeException if no storage can be found.
*/
public static StorageInterface make(final PluginRegistry pluginRegistry,
final String pluginId,
final Map<String, Object> pluginConfiguration,
final Validator validator) {
Optional<Class<? extends StorageInterface>> optional = allStorageClasses(pluginRegistry)
.filter(clazz -> Plugin.getId(clazz).map(id -> id.equalsIgnoreCase(pluginId)).orElse(false))
.findFirst();

if (optional.isEmpty()) {
String storageIds = getLoggableStorageIds(pluginRegistry);
throw new KestraRuntimeException(String.format(
"No storage interface can be found for '%s=%s'. Supported types are: %s", KESTRA_STORAGE_TYPE_CONFIG, pluginId, storageIds
));
}

Class<? extends StorageInterface> pluginClass = optional.get();

// Storage are handle as any serializable/deserialize plugins.
StorageInterface plugin;
try {
// Make sure config is not null, otherwise deserialization result will be null too.
Map<String, Object> nonEmptyConfig = Optional.ofNullable(pluginConfiguration).orElse(Map.of());
plugin = JacksonMapper.toMap(nonEmptyConfig, pluginClass);
} catch (Exception e) {
throw new KestraRuntimeException(String.format(
"Failed to create storage '%s'. Error: %s", pluginId, e.getMessage())
);
}

// Validate configuration.
Set<ConstraintViolation<StorageInterface>> violations;
try {
violations = validator.validate(plugin);
} catch (ConstraintViolationException e) {
throw new KestraRuntimeException(String.format(
"Failed to validate configuration for storage '%s'. Error: %s", pluginId, e.getMessage())
);
}
if (!violations.isEmpty()) {
ConstraintViolationException e = new ConstraintViolationException(violations);
throw new KestraRuntimeException(String.format(
"Invalid configuration for storage '%s'. Error: '%s'", pluginId, e.getMessage()), e
);
}

try {
plugin.init();
} catch (IOException e) {
throw new KestraRuntimeException(String.format(
"Failed to initialize storage '%s'. Error: %s", pluginId, e.getMessage()), e
);
}
return plugin;
}

public static String getLoggableStorageIds(final PluginRegistry pluginRegistry) {
return allIdsFor(allStorageClasses(pluginRegistry));
}

/**
* @return all plugin classes for the {@link StorageInterface}s.
*/
private static Stream<Class<? extends StorageInterface>> allStorageClasses(final PluginRegistry pluginRegistry) {
return pluginRegistry.plugins()
.stream()
.map(RegisteredPlugin::getStorages)
.flatMap(List::stream);
}

/**
* @return all plugin identifier for the {@link StorageInterface}s.
*/
private static String allIdsFor(final Stream<Class<? extends StorageInterface>> classes) {
return classes
.map(Plugin::getId)
.flatMap(Optional::stream)
.collect(Collectors.joining(",", "[", "]"));
}
}
Loading

0 comments on commit b381ee5

Please sign in to comment.