Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] chore: update to use new secretinterface with tenant #14

Merged
merged 1 commit into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions src/main/java/io/kestra/storage/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ public String createBucket(String bucketName) throws IOException {
}

@Override
public InputStream get(URI uri) throws IOException {
public InputStream get(String tenantId, URI uri) throws IOException {
try {
GetObjectRequest request = GetObjectRequest.builder()
.bucket(s3Config.getBucket())
.key(uri.getPath())
.key(getPath(tenantId, uri))
.build();
return s3Client.getObject(request);
} catch (NoSuchKeyException exception) {
Expand All @@ -70,11 +70,11 @@ public InputStream get(URI uri) throws IOException {
}

@Override
public Long size(URI uri) throws IOException {
public Long size(String tenantId, URI uri) throws IOException {
try {
HeadObjectRequest headObjectRequest = HeadObjectRequest.builder()
.bucket(s3Config.getBucket())
.key(uri.getPath())
.key(getPath(tenantId, uri))
.build();
return s3Client.headObject(headObjectRequest).contentLength();
} catch (NoSuchKeyException exception) {
Expand All @@ -85,11 +85,11 @@ public Long size(URI uri) throws IOException {
}

@Override
public Long lastModifiedTime(URI uri) throws IOException {
public Long lastModifiedTime(String tenantId, URI uri) throws IOException {
try {
HeadObjectRequest headObjectRequest = HeadObjectRequest.builder()
.bucket(s3Config.getBucket())
.key(uri.getPath())
.key(getPath(tenantId, uri))
.build();
return s3Client.headObject(headObjectRequest).lastModified().getEpochSecond();
} catch (NoSuchKeyException exception) {
Expand All @@ -100,13 +100,13 @@ public Long lastModifiedTime(URI uri) throws IOException {
}

@Override
public URI put(URI uri, InputStream data) throws IOException {
public URI put(String tenantId, URI uri, InputStream data) throws IOException {
try {
int length = data.available();

PutObjectRequest request = PutObjectRequest.builder()
.bucket(s3Config.getBucket())
.key(uri.getPath())
.key(getPath(tenantId, uri))
.build();

Optional<Upload> upload;
Expand All @@ -132,17 +132,17 @@ public URI put(URI uri, InputStream data) throws IOException {
}

@Override
public boolean delete(URI uri) throws IOException {
public boolean delete(String tenantId, URI uri) throws IOException {
try {
try {
lastModifiedTime(uri);
lastModifiedTime(tenantId, uri);
} catch (FileNotFoundException exception) {
return false;
}

DeleteObjectRequest request = DeleteObjectRequest.builder()
.bucket(s3Config.getBucket())
.key(uri.getPath())
.key(getPath(tenantId, uri))
.build();

return s3Client.deleteObject(request).sdkHttpResponse().isSuccessful();
Expand All @@ -154,10 +154,10 @@ public boolean delete(URI uri) throws IOException {
}

@Override
public List<URI> deleteByPrefix(URI storagePrefix) throws IOException {
public List<URI> deleteByPrefix(String tenantId, URI storagePrefix) throws IOException {
ListObjectsRequest listRequest = ListObjectsRequest.builder()
.bucket(s3Config.getBucket())
.prefix(storagePrefix.getPath())
.prefix(getPath(tenantId, storagePrefix))
.build();
ListObjectsResponse objectListing = s3Client.listObjects(listRequest);

Expand All @@ -183,13 +183,16 @@ public List<URI> deleteByPrefix(URI storagePrefix) throws IOException {

return result.deleted().stream()
.map(DeletedObject::key)
.map(S3Storage::createUri)
.map(key -> createUri(key.replace(tenantId + "/", "")))
.toList();
} catch (AwsServiceException exception) {
throw new IOException(exception);
}
}

private String getPath(String tenantId, URI uri) {
return "/" + tenantId + uri.getPath();
}
private static URI createUri(String key) {
return URI.create("kestra://%s".formatted(key));
}
Expand Down
47 changes: 27 additions & 20 deletions src/test/java/io/kestra/storage/s3/S3StorageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ static void stopContainers() {
}
}

private URI putFile(URL resource, String path) throws Exception {
private URI putFile(String tenantId, URL resource, String path) throws Exception {

return storageInterface.put(
tenantId,
new URI(path),
new FileInputStream(Objects.requireNonNull(resource).getFile())
);
Expand All @@ -80,66 +82,70 @@ private URI putFile(URL resource, String path) throws Exception {
@Test
void get() throws Exception {
String prefix = IdUtils.create();
String tenantId = IdUtils.create();

URL resource = S3StorageTest.class.getClassLoader().getResource("application.yml");
String content = CharStreams.toString(new InputStreamReader(new FileInputStream(Objects.requireNonNull(resource).getFile())));

this.putFile(resource, "/" + prefix + "/storage/get.yml");
this.putFile(tenantId, resource, "/" + prefix + "/storage/get.yml");

URI item = new URI("/" + prefix + "/storage/get.yml");
InputStream get = storageInterface.get(item);
InputStream get = storageInterface.get(tenantId, item);
assertThat(CharStreams.toString(new InputStreamReader(get)), is(content));
assertTrue(storageInterface.exists(item));
assertThat(storageInterface.size(item), is((long) content.length()));
assertThat(storageInterface.lastModifiedTime(item), notNullValue());
assertTrue(storageInterface.exists(tenantId, item));
assertThat(storageInterface.size(tenantId, item), is((long) content.length()));
assertThat(storageInterface.lastModifiedTime(tenantId, item), notNullValue());

InputStream getScheme = storageInterface.get(new URI("kestra:///" + prefix + "/storage/get.yml"));
InputStream getScheme = storageInterface.get(tenantId, new URI("kestra:///" + prefix + "/storage/get.yml"));
assertThat(CharStreams.toString(new InputStreamReader(getScheme)), is(content));
}

@Test
void missing() {
String prefix = IdUtils.create();
String tenantId = IdUtils.create();

assertThrows(FileNotFoundException.class, () -> {
storageInterface.get(new URI("/" + prefix + "/storage/missing.yml"));
storageInterface.get(tenantId, new URI("/" + prefix + "/storage/missing.yml"));
});
}

@Test
void put() throws Exception {
String prefix = IdUtils.create();
String tenantId = IdUtils.create();

URL resource = S3StorageTest.class.getClassLoader().getResource("application.yml");
URI put = this.putFile(resource, "/" + prefix + "/storage/put.yml");
InputStream get = storageInterface.get(new URI("/" + prefix + "/storage/put.yml"));
URI put = this.putFile(tenantId, resource, "/" + prefix + "/storage/put.yml");
InputStream get = storageInterface.get(tenantId, new URI("/" + prefix + "/storage/put.yml"));

assertThat(put.toString(), is(new URI("kestra:///" + prefix + "/storage/put.yml").toString()));
assertThat(
CharStreams.toString(new InputStreamReader(get)),
is(CharStreams.toString(new InputStreamReader(new FileInputStream(Objects.requireNonNull(resource).getFile()))))
);

assertThat(storageInterface.size(new URI("/" + prefix + "/storage/put.yml")), is(74L));
assertThat(storageInterface.size(tenantId, new URI("/" + prefix + "/storage/put.yml")), is(74L));

assertThrows(FileNotFoundException.class, () -> {
assertThat(storageInterface.size(new URI("/" + prefix + "/storage/muissing.yml")), is(74L));
assertThat(storageInterface.size(tenantId, new URI("/" + prefix + "/storage/muissing.yml")), is(74L));
});

boolean delete = storageInterface.delete(put);
boolean delete = storageInterface.delete(tenantId, put);
assertThat(delete, is(true));

delete = storageInterface.delete(put);
delete = storageInterface.delete(tenantId, put);
assertThat(delete, is(false));

assertThrows(FileNotFoundException.class, () -> {
storageInterface.get(new URI("/" + prefix + "/storage/put.yml"));
storageInterface.get(tenantId, new URI("/" + prefix + "/storage/put.yml"));
});
}

@Test
void deleteByPrefix() throws Exception {
String prefix = IdUtils.create();
String tenantId = IdUtils.create();

URL resource = S3StorageTest.class.getClassLoader().getResource("application.yml");

Expand All @@ -149,29 +155,30 @@ void deleteByPrefix() throws Exception {
"/" + prefix + "/storage/level1/level2/1.yml"
);

path.forEach(throwConsumer(s -> this.putFile(resource, s)));
path.forEach(throwConsumer(s -> this.putFile(tenantId, resource, s)));

List<URI> deleted = storageInterface.deleteByPrefix(new URI("/" + prefix + "/storage/"));
List<URI> deleted = storageInterface.deleteByPrefix(tenantId, new URI("/" + prefix + "/storage/"));

assertThat(deleted, containsInAnyOrder(path.stream().map(s -> URI.create("kestra://" + s)).toArray()));

assertThrows(FileNotFoundException.class, () -> {
storageInterface.get(new URI("/" + prefix + "/storage/"));
storageInterface.get(tenantId, new URI("/" + prefix + "/storage/"));
});

path
.forEach(s -> {
assertThrows(FileNotFoundException.class, () -> {
storageInterface.get(new URI(s));
storageInterface.get(tenantId, new URI(s));
});
});
}

@Test
void deleteByPrefixNoResult() throws Exception {
String prefix = IdUtils.create();
String tenantId = IdUtils.create();

List<URI> deleted = storageInterface.deleteByPrefix(new URI("/" + prefix + "/storage/"));
List<URI> deleted = storageInterface.deleteByPrefix(tenantId, new URI("/" + prefix + "/storage/"));
assertThat(deleted.size(), is(0));
}
}