diff --git a/src/main/java/io/kestra/storage/s3/S3Storage.java b/src/main/java/io/kestra/storage/s3/S3Storage.java index e33a51a..19fa864 100644 --- a/src/main/java/io/kestra/storage/s3/S3Storage.java +++ b/src/main/java/io/kestra/storage/s3/S3Storage.java @@ -1,5 +1,6 @@ package io.kestra.storage.s3; +import com.google.common.annotations.VisibleForTesting; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.storages.FileAttributes; import io.kestra.core.storages.StorageInterface; @@ -20,24 +21,7 @@ import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.CopyObjectRequest; -import software.amazon.awssdk.services.s3.model.CreateBucketRequest; -import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; -import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; -import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; -import software.amazon.awssdk.services.s3.model.DeletedObject; -import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import software.amazon.awssdk.services.s3.model.GetObjectResponse; -import software.amazon.awssdk.services.s3.model.HeadObjectRequest; -import software.amazon.awssdk.services.s3.model.ListObjectsRequest; -import software.amazon.awssdk.services.s3.model.ListObjectsResponse; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; -import software.amazon.awssdk.services.s3.model.NoSuchKeyException; -import software.amazon.awssdk.services.s3.model.ObjectIdentifier; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; -import software.amazon.awssdk.services.s3.model.S3Exception; -import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.services.s3.model.*; import software.amazon.awssdk.transfer.s3.S3TransferManager; import software.amazon.awssdk.transfer.s3.model.Download; import software.amazon.awssdk.transfer.s3.model.DownloadRequest; @@ -104,16 +88,35 @@ public void init() { this.s3AsyncClient = S3ClientFactory.getAsyncS3Client(this); } + @Override + public boolean exists(String tenantId, URI uri) { + String path = getPath(tenantId, uri); + return exists(path); + } + + private boolean exists(String path) { + try { + HeadObjectRequest headObjectRequest = HeadObjectRequest.builder() + .bucket(this.getBucket()) + .key(path) + .build(); + s3Client.headObject(headObjectRequest); + return true; + } catch (NoSuchKeyException e) { + return false; + } + } + @Override public InputStream get(String tenantId, URI uri) throws IOException { return this.getWithMetadata(tenantId, uri).inputStream(); } - public String createBucket() throws IOException { + @VisibleForTesting + void createBucket() throws IOException { try { CreateBucketRequest request = CreateBucketRequest.builder().bucket(this.getBucket()).build(); s3Client.createBucket(request); - return this.getBucket(); } catch (AwsServiceException exception) { throw new IOException(exception); } @@ -242,7 +245,6 @@ public URI put(String tenantId, URI uri, StorageObject storageObject) throws IOE InputStream data = storageObject.inputStream(); S3TransferManager transferManager = S3TransferManager.builder().s3Client(s3AsyncClient).build() ) { - String path = getPath(tenantId, uri); mkdirs(path); PutObjectRequest request = PutObjectRequest.builder() @@ -314,13 +316,21 @@ public URI createDirectory(String tenantId, URI uri) throws IOException { } private void mkdirs(String path) throws IOException { - path = path.replaceAll("^/*", ""); + if (!path.endsWith("/")) { + path = path.substring(0, path.lastIndexOf("/") + 1); + } + + // check if it exists before creating it + if (exists(path)) { + return; + } + String[] directories = path.split("/"); - StringBuilder aggregatedPath = new StringBuilder("/"); + StringBuilder aggregatedPath = new StringBuilder(); try { // perform 1 put request per parent directory in the path - for (int i = 0; i <= directories.length - (path.endsWith("/") ? 1 : 2); i++) { - aggregatedPath.append(directories[i]).append("/"); + for (String directory : directories) { + aggregatedPath.append(directory).append("/"); PutObjectRequest putRequest = PutObjectRequest.builder() .bucket(this.getBucket()) .key(aggregatedPath.toString()) diff --git a/src/test/java/io/kestra/storage/s3/S3StorageTest.java b/src/test/java/io/kestra/storage/s3/S3StorageTest.java index 49806a0..a51800c 100644 --- a/src/test/java/io/kestra/storage/s3/S3StorageTest.java +++ b/src/test/java/io/kestra/storage/s3/S3StorageTest.java @@ -4,10 +4,7 @@ import io.kestra.core.storages.StorageInterface; import io.kestra.core.utils.IdUtils; import jakarta.inject.Inject; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; import org.testcontainers.containers.localstack.LocalStackContainer; import org.testcontainers.utility.DockerImageName; import software.amazon.awssdk.core.ResponseInputStream; @@ -18,22 +15,31 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; +@TestInstance(TestInstance.Lifecycle.PER_CLASS) class S3StorageTest extends StorageTestSuite { - @Inject StorageInterface storageInterface; private static LocalStackContainer localstack; @BeforeAll - static void startLocalstack() { - localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:1.4.0")); + void startLocalstack() throws IOException { + localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:3.8.1")); // some tests use a real flow with hardcoded configuration, so we have to fix the binding port localstack.setPortBindings(java.util.List.of("4566:4566")); localstack.start(); + + storageInterface = S3Storage.builder() + .accessKey(localstack.getAccessKey()) + .secretKey(localstack.getSecretKey()) + .bucket("kestra-unit-test") + .region(localstack.getRegion()) + .endpoint(localstack.getEndpoint().toString()) + .build(); + storageInterface.init(); } @AfterAll - static void stopLocalstack() { + void stopLocalstack() { if (localstack != null) { localstack.stop(); } diff --git a/src/test/resources/application-test.yml b/src/test/resources/application-test.yml index b2ec6cf..860416d 100644 --- a/src/test/resources/application-test.yml +++ b/src/test/resources/application-test.yml @@ -5,5 +5,5 @@ kestra: bucket: "kestra-unit-test" accessKey: "accesskey" secretKey: "secretkey" - region: "eu-west-3" + region: "us-east-1" endpoint: "http://127.0.0.1:4566"