Skip to content

Commit

Permalink
chore(s3): migrate to new StorageInterface
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Apr 29, 2024
1 parent 975fad4 commit a8fce5b
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 137 deletions.
9 changes: 0 additions & 9 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,6 @@ dependencies {
annotationProcessor "org.projectlombok:lombok:$lombokVersion"
compileOnly "org.projectlombok:lombok:$lombokVersion"

// micronaut
annotationProcessor platform("io.micronaut.platform:micronaut-platform:$micronautVersion")
annotationProcessor "io.micronaut:micronaut-inject-java"
annotationProcessor "io.micronaut.validation:micronaut-validation-processor"

compileOnly platform("io.micronaut.platform:micronaut-platform:$micronautVersion")
compileOnly "io.micronaut:micronaut-inject"
compileOnly "io.micronaut.validation:micronaut-validation"

// kestra
annotationProcessor group: "io.kestra", name: "processor", version: kestraVersion
compileOnly group: "io.kestra", name: "core", version: kestraVersion
Expand Down
39 changes: 19 additions & 20 deletions src/main/java/io/kestra/storage/s3/S3ClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
Expand All @@ -28,12 +27,12 @@ public static S3Client getS3Client(final S3Config s3Config) {
// Use the httpClientBuilder to delegate the lifecycle management of the HTTP client to the AWS SDK
.httpClientBuilder(serviceDefaults -> ApacheHttpClient.builder().build());

if (s3Config.endpoint() != null) {
clientBuilder.endpointOverride(URI.create(s3Config.endpoint()));
if (s3Config.getEndpoint() != null) {
clientBuilder.endpointOverride(URI.create(s3Config.getEndpoint()));
}

if (s3Config.region() != null) {
clientBuilder.region(Region.of(s3Config.region()));
if (s3Config.getRegion() != null) {
clientBuilder.region(Region.of(s3Config.getRegion()));
}

return clientBuilder
Expand All @@ -44,12 +43,12 @@ public static S3Client getS3Client(final S3Config s3Config) {
public static S3AsyncClient getAsyncS3Client(final S3Config s3Config) {
S3CrtAsyncClientBuilder clientBuilder = S3AsyncClient.crtBuilder();

if (s3Config.endpoint() != null) {
clientBuilder.endpointOverride(URI.create(s3Config.endpoint()));
if (s3Config.getEndpoint() != null) {
clientBuilder.endpointOverride(URI.create(s3Config.getEndpoint()));
}

if (s3Config.region() != null) {
clientBuilder.region(Region.of(s3Config.region()));
if (s3Config.getRegion() != null) {
clientBuilder.region(Region.of(s3Config.getRegion()));
}

return clientBuilder
Expand All @@ -66,13 +65,13 @@ public static S3AsyncClient getAsyncS3Client(final S3Config s3Config) {
*/
private static AwsCredentialsProvider getCredentials(final S3Config config) {
// StsAssumeRoleCredentialsProvider
if (StringUtils.isNotEmpty(config.stsRoleArn())) {
if (StringUtils.isNotEmpty(config.getStsRoleArn())) {
return stsAssumeRoleCredentialsProvider(config);
}

// StaticCredentialsProvider
if (StringUtils.isNotEmpty(config.accessKey()) &&
StringUtils.isNotEmpty(config.secretKey())) {
if (StringUtils.isNotEmpty(config.getAccessKey()) &&
StringUtils.isNotEmpty(config.getSecretKey())) {
return staticCredentialsProvider(config);
}

Expand All @@ -88,8 +87,8 @@ private static AwsCredentialsProvider getCredentials(final S3Config config) {
*/
private static StaticCredentialsProvider staticCredentialsProvider(final S3Config config) {
final AwsCredentials credentials = AwsBasicCredentials.create(
config.accessKey(),
config.secretKey()
config.getAccessKey(),
config.getSecretKey()
);
return StaticCredentialsProvider.create(credentials);
}
Expand All @@ -101,14 +100,14 @@ private static StaticCredentialsProvider staticCredentialsProvider(final S3Confi
* @return a new {@link StsAssumeRoleCredentialsProvider}.
*/
private static StsAssumeRoleCredentialsProvider stsAssumeRoleCredentialsProvider(final S3Config config) {
String roleSessionName = config.stsRoleSessionName();
String roleSessionName = config.getStsRoleSessionName();
roleSessionName = roleSessionName != null ? roleSessionName : "kestra-storage-s3-" + System.currentTimeMillis();

final AssumeRoleRequest assumeRoleRequest = AssumeRoleRequest.builder()
.roleArn(config.stsRoleArn())
.roleArn(config.getStsRoleArn())
.roleSessionName(roleSessionName)
.durationSeconds((int) config.stsRoleSessionDuration().toSeconds())
.externalId(config.stsRoleExternalId())
.durationSeconds((int) config.getStsRoleSessionDuration().toSeconds())
.externalId(config.getStsRoleExternalId())
.build();

return StsAssumeRoleCredentialsProvider.builder()
Expand All @@ -126,14 +125,14 @@ private static StsAssumeRoleCredentialsProvider stsAssumeRoleCredentialsProvider
private static StsClient stsClient(final S3Config config) {
StsClientBuilder builder = StsClient.builder();

final String stsEndpointOverride = config.stsEndpointOverride();
final String stsEndpointOverride = config.getStsEndpointOverride();
if (stsEndpointOverride != null) {
builder.applyMutation(stsClientBuilder ->
stsClientBuilder.endpointOverride(URI.create(stsEndpointOverride))
);
}

final String regionString = config.region();
final String regionString = config.getRegion();
if (regionString != null) {
builder.applyMutation(stsClientBuilder ->
stsClientBuilder.region(Region.of(regionString))
Expand Down
105 changes: 66 additions & 39 deletions src/main/java/io/kestra/storage/s3/S3Config.java
Original file line number Diff line number Diff line change
@@ -1,46 +1,73 @@
package io.kestra.storage.s3;

import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.core.bind.annotation.Bindable;
import jakarta.annotation.Nullable;
import jakarta.inject.Singleton;
import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;

import java.time.Duration;

/**
* The AWS S3 configuration.
*
* @param bucket The S3 bucket name.
* @param region The S3 region.
* @param endpoint The S3 endpoint.
* @param accessKey The AWS Access Key ID
* @param secretKey The AWS Secret Key.
* @param stsRoleArn The AWS STS Role.
* @param stsRoleExternalId The AWS STS External ID.
* @param stsRoleSessionName The AWS STS Session name.
* @param stsRoleSessionDuration The AWS STS Session duration.
* @param stsEndpointOverride The AWS STS Endpoint.
*/
@Singleton
@S3StorageEnabled
@ConfigurationProperties("kestra.storage.s3")
public record S3Config(
String bucket,

String region,

@Nullable String endpoint,

@Nullable String accessKey,

@Nullable String secretKey,
@Nullable String stsRoleArn,
@Nullable String stsRoleExternalId,
@Nullable String stsRoleSessionName,
@Nullable String stsEndpointOverride,

@Bindable(defaultValue = "15m")
Duration stsRoleSessionDuration
) {
public interface S3Config {

Duration AWS_MIN_STS_ROLE_SESSION_DURATION = Duration.ofSeconds(900);

@Schema(
title = "The S3 bucket where to store internal objects."
)
@PluginProperty
String getBucket();

@Schema(
title = "AWS region with which the SDK should communicate."
)
@PluginProperty
String getRegion();

@PluginProperty
String getEndpoint();

@Schema(
title = "Access Key Id in order to connect to AWS.",
description = "If no connection is defined, we will use the `DefaultCredentialsProvider` to fetch the value."
)
@PluginProperty
String getAccessKey();

@Schema(
title = "Secret Key Id in order to connect to AWS.",
description = "If no connection is defined, we will use the `DefaultCredentialsProvider` to fetch the value."
)
@PluginProperty
String getSecretKey();

@Schema(
title = "AWS STS Role.",
description = "The Amazon Resource Name (ARN) of the role to assume. If set the task will use the `StsAssumeRoleCredentialsProvider`. Otherwise, the `StaticCredentialsProvider` will be used with the provided Access Key Id and Secret Key."
)
@PluginProperty
String getStsRoleArn();

@Schema(
title = "AWS STS External Id.",
description = " A unique identifier that might be required when you assume a role in another account. This property is only used when an `stsRoleArn` is defined."
)
@PluginProperty
String getStsRoleExternalId();

@Schema(
title = "AWS STS Session name. This property is only used when an `stsRoleArn` is defined."
)
@PluginProperty
String getStsRoleSessionName();

@Schema(
title = "The AWS STS endpoint with which the SDKClient should communicate."
)
@PluginProperty
String getStsEndpointOverride();

@Schema(
title = "AWS STS Session duration.",
description = "The duration of the role session (default: 15 minutes, i.e., PT15M). This property is only used when an `stsRoleArn` is defined."
)
@PluginProperty
java.time.Duration getStsRoleSessionDuration();
}
Loading

0 comments on commit a8fce5b

Please sign in to comment.