Skip to content

Commit

Permalink
feat: avoid recreating existing dir
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Oct 10, 2024
1 parent b1b999e commit 7c084a6
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 34 deletions.
60 changes: 35 additions & 25 deletions src/main/java/io/kestra/storage/s3/S3Storage.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.storage.s3;

import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageInterface;
Expand All @@ -20,24 +21,7 @@
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.DeletedObject;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.*;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.Download;
import software.amazon.awssdk.transfer.s3.model.DownloadRequest;
Expand Down Expand Up @@ -104,16 +88,35 @@ public void init() {
this.s3AsyncClient = S3ClientFactory.getAsyncS3Client(this);
}

@Override
public boolean exists(String tenantId, URI uri) {
String path = getPath(tenantId, uri);
return exists(path);
}

private boolean exists(String path) {
try {
HeadObjectRequest headObjectRequest = HeadObjectRequest.builder()
.bucket(this.getBucket())
.key(path)
.build();
s3Client.headObject(headObjectRequest);
return true;
} catch (NoSuchKeyException e) {
return false;
}
}

@Override
public InputStream get(String tenantId, URI uri) throws IOException {
return this.getWithMetadata(tenantId, uri).inputStream();
}

public String createBucket() throws IOException {
@VisibleForTesting
void createBucket() throws IOException {
try {
CreateBucketRequest request = CreateBucketRequest.builder().bucket(this.getBucket()).build();
s3Client.createBucket(request);
return this.getBucket();
} catch (AwsServiceException exception) {
throw new IOException(exception);
}
Expand Down Expand Up @@ -242,7 +245,6 @@ public URI put(String tenantId, URI uri, StorageObject storageObject) throws IOE
InputStream data = storageObject.inputStream();
S3TransferManager transferManager = S3TransferManager.builder().s3Client(s3AsyncClient).build()
) {

String path = getPath(tenantId, uri);
mkdirs(path);
PutObjectRequest request = PutObjectRequest.builder()
Expand Down Expand Up @@ -314,13 +316,21 @@ public URI createDirectory(String tenantId, URI uri) throws IOException {
}

private void mkdirs(String path) throws IOException {
path = path.replaceAll("^/*", "");
if (!path.endsWith("/")) {
path = path.substring(0, path.lastIndexOf("/") + 1);
}

// check if it exists before creating it
if (exists(path)) {
return;
}

String[] directories = path.split("/");
StringBuilder aggregatedPath = new StringBuilder("/");
StringBuilder aggregatedPath = new StringBuilder();
try {
// perform 1 put request per parent directory in the path
for (int i = 0; i <= directories.length - (path.endsWith("/") ? 1 : 2); i++) {
aggregatedPath.append(directories[i]).append("/");
for (String directory : directories) {
aggregatedPath.append(directory).append("/");
PutObjectRequest putRequest = PutObjectRequest.builder()
.bucket(this.getBucket())
.key(aggregatedPath.toString())
Expand Down
22 changes: 14 additions & 8 deletions src/test/java/io/kestra/storage/s3/S3StorageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.*;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.core.ResponseInputStream;
Expand All @@ -18,22 +15,31 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class S3StorageTest extends StorageTestSuite {
@Inject
StorageInterface storageInterface;

private static LocalStackContainer localstack;

@BeforeAll
static void startLocalstack() {
localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:1.4.0"));
void startLocalstack() throws IOException {
localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:3.8.1"));
// some tests use a real flow with hardcoded configuration, so we have to fix the binding port
localstack.setPortBindings(java.util.List.of("4566:4566"));
localstack.start();

storageInterface = S3Storage.builder()
.accessKey(localstack.getAccessKey())
.secretKey(localstack.getSecretKey())
.bucket("kestra-unit-test")
.region(localstack.getRegion())
.endpoint(localstack.getEndpoint().toString())
.build();
storageInterface.init();
}

@AfterAll
static void stopLocalstack() {
void stopLocalstack() {
if (localstack != null) {
localstack.stop();
}
Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/application-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ kestra:
bucket: "kestra-unit-test"
accessKey: "accesskey"
secretKey: "secretkey"
region: "eu-west-3"
region: "us-east-1"
endpoint: "http://127.0.0.1:4566"

0 comments on commit 7c084a6

Please sign in to comment.