From a8fce5b9a8b776abbaf6587071a2c6c62f43566d Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Thu, 25 Apr 2024 18:00:13 +0200 Subject: [PATCH] chore(s3): migrate to new StorageInterface --- build.gradle | 9 -- .../io/kestra/storage/s3/S3ClientFactory.java | 39 +++--- .../java/io/kestra/storage/s3/S3Config.java | 105 +++++++++------ .../java/io/kestra/storage/s3/S3Storage.java | 121 ++++++++++++------ .../kestra/storage/s3/S3StorageEnabled.java | 12 -- .../io/kestra/storage/s3/S3ConfigTest.java | 18 --- 6 files changed, 167 insertions(+), 137 deletions(-) delete mode 100644 src/main/java/io/kestra/storage/s3/S3StorageEnabled.java delete mode 100644 src/test/java/io/kestra/storage/s3/S3ConfigTest.java diff --git a/build.gradle b/build.gradle index 2b5f659..3ac8169 100644 --- a/build.gradle +++ b/build.gradle @@ -37,15 +37,6 @@ dependencies { annotationProcessor "org.projectlombok:lombok:$lombokVersion" compileOnly "org.projectlombok:lombok:$lombokVersion" - // micronaut - annotationProcessor platform("io.micronaut.platform:micronaut-platform:$micronautVersion") - annotationProcessor "io.micronaut:micronaut-inject-java" - annotationProcessor "io.micronaut.validation:micronaut-validation-processor" - - compileOnly platform("io.micronaut.platform:micronaut-platform:$micronautVersion") - compileOnly "io.micronaut:micronaut-inject" - compileOnly "io.micronaut.validation:micronaut-validation" - // kestra annotationProcessor group: "io.kestra", name: "processor", version: kestraVersion compileOnly group: "io.kestra", name: "core", version: kestraVersion diff --git a/src/main/java/io/kestra/storage/s3/S3ClientFactory.java b/src/main/java/io/kestra/storage/s3/S3ClientFactory.java index 7adb867..ce7961e 100644 --- a/src/main/java/io/kestra/storage/s3/S3ClientFactory.java +++ b/src/main/java/io/kestra/storage/s3/S3ClientFactory.java @@ -6,7 +6,6 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -28,12 +27,12 @@ public static S3Client getS3Client(final S3Config s3Config) { // Use the httpClientBuilder to delegate the lifecycle management of the HTTP client to the AWS SDK .httpClientBuilder(serviceDefaults -> ApacheHttpClient.builder().build()); - if (s3Config.endpoint() != null) { - clientBuilder.endpointOverride(URI.create(s3Config.endpoint())); + if (s3Config.getEndpoint() != null) { + clientBuilder.endpointOverride(URI.create(s3Config.getEndpoint())); } - if (s3Config.region() != null) { - clientBuilder.region(Region.of(s3Config.region())); + if (s3Config.getRegion() != null) { + clientBuilder.region(Region.of(s3Config.getRegion())); } return clientBuilder @@ -44,12 +43,12 @@ public static S3Client getS3Client(final S3Config s3Config) { public static S3AsyncClient getAsyncS3Client(final S3Config s3Config) { S3CrtAsyncClientBuilder clientBuilder = S3AsyncClient.crtBuilder(); - if (s3Config.endpoint() != null) { - clientBuilder.endpointOverride(URI.create(s3Config.endpoint())); + if (s3Config.getEndpoint() != null) { + clientBuilder.endpointOverride(URI.create(s3Config.getEndpoint())); } - if (s3Config.region() != null) { - clientBuilder.region(Region.of(s3Config.region())); + if (s3Config.getRegion() != null) { + clientBuilder.region(Region.of(s3Config.getRegion())); } return clientBuilder @@ -66,13 +65,13 @@ public static S3AsyncClient getAsyncS3Client(final S3Config s3Config) { */ private static AwsCredentialsProvider getCredentials(final S3Config config) { // StsAssumeRoleCredentialsProvider - if (StringUtils.isNotEmpty(config.stsRoleArn())) { + if (StringUtils.isNotEmpty(config.getStsRoleArn())) { return stsAssumeRoleCredentialsProvider(config); } // StaticCredentialsProvider - if (StringUtils.isNotEmpty(config.accessKey()) && - StringUtils.isNotEmpty(config.secretKey())) { + if (StringUtils.isNotEmpty(config.getAccessKey()) && + StringUtils.isNotEmpty(config.getSecretKey())) { return staticCredentialsProvider(config); } @@ -88,8 +87,8 @@ private static AwsCredentialsProvider getCredentials(final S3Config config) { */ private static StaticCredentialsProvider staticCredentialsProvider(final S3Config config) { final AwsCredentials credentials = AwsBasicCredentials.create( - config.accessKey(), - config.secretKey() + config.getAccessKey(), + config.getSecretKey() ); return StaticCredentialsProvider.create(credentials); } @@ -101,14 +100,14 @@ private static StaticCredentialsProvider staticCredentialsProvider(final S3Confi * @return a new {@link StsAssumeRoleCredentialsProvider}. */ private static StsAssumeRoleCredentialsProvider stsAssumeRoleCredentialsProvider(final S3Config config) { - String roleSessionName = config.stsRoleSessionName(); + String roleSessionName = config.getStsRoleSessionName(); roleSessionName = roleSessionName != null ? roleSessionName : "kestra-storage-s3-" + System.currentTimeMillis(); final AssumeRoleRequest assumeRoleRequest = AssumeRoleRequest.builder() - .roleArn(config.stsRoleArn()) + .roleArn(config.getStsRoleArn()) .roleSessionName(roleSessionName) - .durationSeconds((int) config.stsRoleSessionDuration().toSeconds()) - .externalId(config.stsRoleExternalId()) + .durationSeconds((int) config.getStsRoleSessionDuration().toSeconds()) + .externalId(config.getStsRoleExternalId()) .build(); return StsAssumeRoleCredentialsProvider.builder() @@ -126,14 +125,14 @@ private static StsAssumeRoleCredentialsProvider stsAssumeRoleCredentialsProvider private static StsClient stsClient(final S3Config config) { StsClientBuilder builder = StsClient.builder(); - final String stsEndpointOverride = config.stsEndpointOverride(); + final String stsEndpointOverride = config.getStsEndpointOverride(); if (stsEndpointOverride != null) { builder.applyMutation(stsClientBuilder -> stsClientBuilder.endpointOverride(URI.create(stsEndpointOverride)) ); } - final String regionString = config.region(); + final String regionString = config.getRegion(); if (regionString != null) { builder.applyMutation(stsClientBuilder -> stsClientBuilder.region(Region.of(regionString)) diff --git a/src/main/java/io/kestra/storage/s3/S3Config.java b/src/main/java/io/kestra/storage/s3/S3Config.java index b225e2d..1a4423f 100644 --- a/src/main/java/io/kestra/storage/s3/S3Config.java +++ b/src/main/java/io/kestra/storage/s3/S3Config.java @@ -1,46 +1,73 @@ package io.kestra.storage.s3; -import io.micronaut.context.annotation.ConfigurationProperties; -import io.micronaut.core.bind.annotation.Bindable; -import jakarta.annotation.Nullable; -import jakarta.inject.Singleton; +import io.kestra.core.models.annotations.PluginProperty; +import io.swagger.v3.oas.annotations.media.Schema; import java.time.Duration; -/** - * The AWS S3 configuration. - * - * @param bucket The S3 bucket name. - * @param region The S3 region. - * @param endpoint The S3 endpoint. - * @param accessKey The AWS Access Key ID - * @param secretKey The AWS Secret Key. - * @param stsRoleArn The AWS STS Role. - * @param stsRoleExternalId The AWS STS External ID. - * @param stsRoleSessionName The AWS STS Session name. - * @param stsRoleSessionDuration The AWS STS Session duration. - * @param stsEndpointOverride The AWS STS Endpoint. - */ -@Singleton -@S3StorageEnabled -@ConfigurationProperties("kestra.storage.s3") -public record S3Config( - String bucket, - - String region, - - @Nullable String endpoint, - - @Nullable String accessKey, - - @Nullable String secretKey, - @Nullable String stsRoleArn, - @Nullable String stsRoleExternalId, - @Nullable String stsRoleSessionName, - @Nullable String stsEndpointOverride, - - @Bindable(defaultValue = "15m") - Duration stsRoleSessionDuration -) { +public interface S3Config { + Duration AWS_MIN_STS_ROLE_SESSION_DURATION = Duration.ofSeconds(900); + + @Schema( + title = "The S3 bucket where to store internal objects." + ) + @PluginProperty + String getBucket(); + + @Schema( + title = "AWS region with which the SDK should communicate." + ) + @PluginProperty + String getRegion(); + + @PluginProperty + String getEndpoint(); + + @Schema( + title = "Access Key Id in order to connect to AWS.", + description = "If no connection is defined, we will use the `DefaultCredentialsProvider` to fetch the value." + ) + @PluginProperty + String getAccessKey(); + + @Schema( + title = "Secret Key Id in order to connect to AWS.", + description = "If no connection is defined, we will use the `DefaultCredentialsProvider` to fetch the value." + ) + @PluginProperty + String getSecretKey(); + + @Schema( + title = "AWS STS Role.", + description = "The Amazon Resource Name (ARN) of the role to assume. If set the task will use the `StsAssumeRoleCredentialsProvider`. Otherwise, the `StaticCredentialsProvider` will be used with the provided Access Key Id and Secret Key." + ) + @PluginProperty + String getStsRoleArn(); + + @Schema( + title = "AWS STS External Id.", + description = " A unique identifier that might be required when you assume a role in another account. This property is only used when an `stsRoleArn` is defined." + ) + @PluginProperty + String getStsRoleExternalId(); + + @Schema( + title = "AWS STS Session name. This property is only used when an `stsRoleArn` is defined." + ) + @PluginProperty + String getStsRoleSessionName(); + + @Schema( + title = "The AWS STS endpoint with which the SDKClient should communicate." + ) + @PluginProperty + String getStsEndpointOverride(); + + @Schema( + title = "AWS STS Session duration.", + description = "The duration of the role session (default: 15 minutes, i.e., PT15M). This property is only used when an `stsRoleArn` is defined." + ) + @PluginProperty + java.time.Duration getStsRoleSessionDuration(); } diff --git a/src/main/java/io/kestra/storage/s3/S3Storage.java b/src/main/java/io/kestra/storage/s3/S3Storage.java index 9ee504a..8e6384a 100644 --- a/src/main/java/io/kestra/storage/s3/S3Storage.java +++ b/src/main/java/io/kestra/storage/s3/S3Storage.java @@ -3,9 +3,13 @@ import io.kestra.core.models.annotations.Plugin; import io.kestra.core.storages.FileAttributes; import io.kestra.core.storages.StorageInterface; -import io.micronaut.core.annotation.Introspected; -import jakarta.inject.Inject; -import jakarta.inject.Singleton; +import jakarta.validation.constraints.NotEmpty; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.extern.jackson.Jacksonized; import org.apache.commons.lang3.StringUtils; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.ResponseInputStream; @@ -14,7 +18,24 @@ import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.*; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.DeletedObject; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Object; import software.amazon.awssdk.transfer.s3.S3TransferManager; import software.amazon.awssdk.transfer.s3.model.Upload; import software.amazon.awssdk.transfer.s3.model.UploadRequest; @@ -25,6 +46,7 @@ import java.io.InputStream; import java.net.URI; import java.nio.file.Path; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -35,38 +57,59 @@ import static io.kestra.core.utils.Rethrow.throwFunction; -@Singleton -@Introspected -@S3StorageEnabled +@AllArgsConstructor +@NoArgsConstructor +@Builder +@Jacksonized +@Getter @Plugin -public class S3Storage implements StorageInterface { - private final S3Client s3Client; +@Plugin.Id("s3") +public class S3Storage implements S3Config, StorageInterface { - private final S3AsyncClient s3AsyncClient; + @NotEmpty + private String bucket; + private String region; + private String endpoint; + // Configuration for StaticCredentialsProvider + private String accessKey; + private String secretKey; - private final S3Config s3Config; + // Configuration for AWS STS AssumeRole + private String stsRoleArn; + private String stsRoleExternalId; + private String stsRoleSessionName; + private String stsEndpointOverride; - /** - * No-arg constructor - required for Kestra plugin. - */ - public S3Storage() { - this.s3Config = null; - this.s3AsyncClient = null; - this.s3Client = null; - } + @Builder.Default + private Duration stsRoleSessionDuration = AWS_MIN_STS_ROLE_SESSION_DURATION; + + @Getter(AccessLevel.PRIVATE) + private S3Client s3Client; + + @Getter(AccessLevel.PRIVATE) + private S3AsyncClient s3AsyncClient; - @Inject - public S3Storage(final S3Config s3Config) { - this.s3Config = s3Config; - this.s3Client = S3ClientFactory.getS3Client(s3Config); - this.s3AsyncClient = S3ClientFactory.getAsyncS3Client(s3Config); + /** + * {@inheritDoc} + **/ + @Override + public void init() { + try { + System.out.println("ThreadContextClassLoader: " + Thread.currentThread().getContextClassLoader()); + Class aClass = Class.forName("software.amazon.awssdk.crt.s3.S3MetaRequest"); + System.out.println("ClassLoader for loaded class: " + aClass.getClassLoader()); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + this.s3Client = S3ClientFactory.getS3Client(this); + this.s3AsyncClient = S3ClientFactory.getAsyncS3Client(this); } public String createBucket() throws IOException { try { - CreateBucketRequest request = CreateBucketRequest.builder().bucket(s3Config.bucket()).build(); + CreateBucketRequest request = CreateBucketRequest.builder().bucket(this.getBucket()).build(); s3Client.createBucket(request); - return s3Config.bucket(); + return this.getBucket(); } catch (AwsServiceException exception) { throw new IOException(exception); } @@ -89,12 +132,12 @@ public List allByPrefix(String tenantId, URI prefix, boolean includeDirecto private InputStream get(String path) throws IOException { try { GetObjectRequest request = GetObjectRequest.builder() - .bucket(s3Config.bucket()) + .bucket(this.getBucket()) .key(path) .build(); ResponseInputStream inputStream = s3Client.getObject(request); boolean isEmpty = inputStream.response().contentLength() == 0; - if(isEmpty) { + if (isEmpty) { inputStream.close(); return InputStream.nullInputStream(); @@ -130,7 +173,7 @@ public List list(String tenantId, URI uri) throws IOException { private Stream keysForPrefix(String prefix, boolean recursive, boolean includeDirectories) { ListObjectsV2Request request = ListObjectsV2Request.builder() - .bucket(s3Config.bucket()) + .bucket(this.getBucket()) .prefix(prefix) .build(); List contents = s3Client.listObjectsV2(request).contents(); @@ -163,7 +206,7 @@ public FileAttributes getAttributes(String tenantId, URI uri) throws IOException private FileAttributes getFileAttributes(String path) throws IOException { try { HeadObjectRequest headObjectRequest = HeadObjectRequest.builder() - .bucket(s3Config.bucket()) + .bucket(this.getBucket()) .key(path) .build(); S3FileAttributes.S3FileAttributesBuilder builder = S3FileAttributes.builder() @@ -190,7 +233,7 @@ public URI put(String tenantId, URI uri, InputStream data) throws IOException { String path = getPath(tenantId, uri); mkdirs(path); PutObjectRequest request = PutObjectRequest.builder() - .bucket(s3Config.bucket()) + .bucket(this.getBucket()) .key(path) .build(); @@ -233,7 +276,7 @@ public boolean delete(String tenantId, URI uri) throws IOException { private boolean deleteSingleObject(String path) { DeleteObjectRequest deleteRequest = DeleteObjectRequest.builder() - .bucket(s3Config.bucket()) + .bucket(this.getBucket()) .key(path) .build(); @@ -248,7 +291,7 @@ public URI createDirectory(String tenantId, URI uri) throws IOException { } mkdirs(path); PutObjectRequest putRequest = PutObjectRequest.builder() - .bucket(s3Config.bucket()) + .bucket(this.getBucket()) .key(path) .build(); s3Client.putObject(putRequest, RequestBody.empty()); @@ -264,7 +307,7 @@ private void mkdirs(String path) throws IOException { for (int i = 0; i <= directories.length - (path.endsWith("/") ? 1 : 2); i++) { aggregatedPath.append(directories[i]).append("/"); PutObjectRequest putRequest = PutObjectRequest.builder() - .bucket(s3Config.bucket()) + .bucket(this.getBucket()) .key(aggregatedPath.toString()) .build(); s3Client.putObject(putRequest, RequestBody.empty()); @@ -282,7 +325,7 @@ public URI move(String tenantId, URI from, URI to) throws IOException { FileAttributes attributes = getAttributes(tenantId, from); if (attributes.getType() == FileAttributes.FileType.Directory) { ListObjectsV2Request listRequest = ListObjectsV2Request.builder() - .bucket(s3Config.bucket()) + .bucket(this.getBucket()) .prefix(source) .build(); @@ -308,9 +351,9 @@ public URI move(String tenantId, URI from, URI to) throws IOException { private void move(String oldKey, String newKey) { CopyObjectRequest copyRequest = CopyObjectRequest.builder() - .sourceBucket(s3Config.bucket()) + .sourceBucket(this.getBucket()) .sourceKey(oldKey) - .destinationBucket(s3Config.bucket()) + .destinationBucket(this.getBucket()) .destinationKey(newKey) .build(); s3Client.copyObject(copyRequest); @@ -321,7 +364,7 @@ private void move(String oldKey, String newKey) { @Override public List deleteByPrefix(String tenantId, URI storagePrefix) throws IOException { ListObjectsRequest listRequest = ListObjectsRequest.builder() - .bucket(s3Config.bucket()) + .bucket(this.getBucket()) .prefix(getPath(tenantId, storagePrefix)) .build(); ListObjectsResponse objectListing = s3Client.listObjects(listRequest); @@ -336,7 +379,7 @@ public List deleteByPrefix(String tenantId, URI storagePrefix) throws IOExc .toList(); DeleteObjectsRequest deleteRequest = DeleteObjectsRequest.builder() - .bucket(s3Config.bucket()) + .bucket(this.getBucket()) .delete(builder -> builder.objects(keys)) .build(); diff --git a/src/main/java/io/kestra/storage/s3/S3StorageEnabled.java b/src/main/java/io/kestra/storage/s3/S3StorageEnabled.java deleted file mode 100644 index 2fcab0d..0000000 --- a/src/main/java/io/kestra/storage/s3/S3StorageEnabled.java +++ /dev/null @@ -1,12 +0,0 @@ -package io.kestra.storage.s3; - -import io.micronaut.context.annotation.Requires; - -import java.lang.annotation.*; - -@Documented -@Retention(RetentionPolicy.RUNTIME) -@Target({ElementType.PACKAGE, ElementType.TYPE}) -@Requires(property = "kestra.storage.type", value = "s3") -public @interface S3StorageEnabled { -} diff --git a/src/test/java/io/kestra/storage/s3/S3ConfigTest.java b/src/test/java/io/kestra/storage/s3/S3ConfigTest.java deleted file mode 100644 index fe88f45..0000000 --- a/src/test/java/io/kestra/storage/s3/S3ConfigTest.java +++ /dev/null @@ -1,18 +0,0 @@ -package io.kestra.storage.s3; - -import io.micronaut.context.annotation.Replaces; -import jakarta.inject.Singleton; - -@Singleton -@Replaces(S3Config.class) -public class S3ConfigTest { - String bucket; - - String region; - - String endpoint; - - String accessKey; - - String secretKey; -}