diff --git a/src/main/java/io/kestra/storage/s3/S3Storage.java b/src/main/java/io/kestra/storage/s3/S3Storage.java index a870316..249d06d 100644 --- a/src/main/java/io/kestra/storage/s3/S3Storage.java +++ b/src/main/java/io/kestra/storage/s3/S3Storage.java @@ -55,11 +55,11 @@ public String createBucket(String bucketName) throws IOException { } @Override - public InputStream get(URI uri) throws IOException { + public InputStream get(String tenantId, URI uri) throws IOException { try { GetObjectRequest request = GetObjectRequest.builder() .bucket(s3Config.getBucket()) - .key(uri.getPath()) + .key(getPath(tenantId, uri)) .build(); return s3Client.getObject(request); } catch (NoSuchKeyException exception) { @@ -70,11 +70,11 @@ public InputStream get(URI uri) throws IOException { } @Override - public Long size(URI uri) throws IOException { + public Long size(String tenantId, URI uri) throws IOException { try { HeadObjectRequest headObjectRequest = HeadObjectRequest.builder() .bucket(s3Config.getBucket()) - .key(uri.getPath()) + .key(getPath(tenantId, uri)) .build(); return s3Client.headObject(headObjectRequest).contentLength(); } catch (NoSuchKeyException exception) { @@ -85,11 +85,11 @@ public Long size(URI uri) throws IOException { } @Override - public Long lastModifiedTime(URI uri) throws IOException { + public Long lastModifiedTime(String tenantId, URI uri) throws IOException { try { HeadObjectRequest headObjectRequest = HeadObjectRequest.builder() .bucket(s3Config.getBucket()) - .key(uri.getPath()) + .key(getPath(tenantId, uri)) .build(); return s3Client.headObject(headObjectRequest).lastModified().getEpochSecond(); } catch (NoSuchKeyException exception) { @@ -100,13 +100,13 @@ public Long lastModifiedTime(URI uri) throws IOException { } @Override - public URI put(URI uri, InputStream data) throws IOException { + public URI put(String tenantId, URI uri, InputStream data) throws IOException { try { int length = data.available(); PutObjectRequest request = PutObjectRequest.builder() .bucket(s3Config.getBucket()) - .key(uri.getPath()) + .key(getPath(tenantId, uri)) .build(); Optional upload; @@ -132,17 +132,17 @@ public URI put(URI uri, InputStream data) throws IOException { } @Override - public boolean delete(URI uri) throws IOException { + public boolean delete(String tenantId, URI uri) throws IOException { try { try { - lastModifiedTime(uri); + lastModifiedTime(tenantId, uri); } catch (FileNotFoundException exception) { return false; } DeleteObjectRequest request = DeleteObjectRequest.builder() .bucket(s3Config.getBucket()) - .key(uri.getPath()) + .key(getPath(tenantId, uri)) .build(); return s3Client.deleteObject(request).sdkHttpResponse().isSuccessful(); @@ -154,10 +154,10 @@ public boolean delete(URI uri) throws IOException { } @Override - public List deleteByPrefix(URI storagePrefix) throws IOException { + public List deleteByPrefix(String tenantId, URI storagePrefix) throws IOException { ListObjectsRequest listRequest = ListObjectsRequest.builder() .bucket(s3Config.getBucket()) - .prefix(storagePrefix.getPath()) + .prefix(getPath(tenantId, storagePrefix)) .build(); ListObjectsResponse objectListing = s3Client.listObjects(listRequest); @@ -183,13 +183,16 @@ public List deleteByPrefix(URI storagePrefix) throws IOException { return result.deleted().stream() .map(DeletedObject::key) - .map(S3Storage::createUri) + .map(key -> createUri(key.replace(tenantId + "/", ""))) .toList(); } catch (AwsServiceException exception) { throw new IOException(exception); } } + private String getPath(String tenantId, URI uri) { + return "/" + tenantId + uri.getPath(); + } private static URI createUri(String key) { return URI.create("kestra://%s".formatted(key)); } diff --git a/src/test/java/io/kestra/storage/s3/S3StorageTest.java b/src/test/java/io/kestra/storage/s3/S3StorageTest.java index 16f8f69..0bf558d 100644 --- a/src/test/java/io/kestra/storage/s3/S3StorageTest.java +++ b/src/test/java/io/kestra/storage/s3/S3StorageTest.java @@ -70,8 +70,10 @@ static void stopContainers() { } } - private URI putFile(URL resource, String path) throws Exception { + private URI putFile(String tenantId, URL resource, String path) throws Exception { + return storageInterface.put( + tenantId, new URI(path), new FileInputStream(Objects.requireNonNull(resource).getFile()) ); @@ -80,39 +82,42 @@ private URI putFile(URL resource, String path) throws Exception { @Test void get() throws Exception { String prefix = IdUtils.create(); + String tenantId = IdUtils.create(); URL resource = S3StorageTest.class.getClassLoader().getResource("application.yml"); String content = CharStreams.toString(new InputStreamReader(new FileInputStream(Objects.requireNonNull(resource).getFile()))); - this.putFile(resource, "/" + prefix + "/storage/get.yml"); + this.putFile(tenantId, resource, "/" + prefix + "/storage/get.yml"); URI item = new URI("/" + prefix + "/storage/get.yml"); - InputStream get = storageInterface.get(item); + InputStream get = storageInterface.get(tenantId, item); assertThat(CharStreams.toString(new InputStreamReader(get)), is(content)); - assertTrue(storageInterface.exists(item)); - assertThat(storageInterface.size(item), is((long) content.length())); - assertThat(storageInterface.lastModifiedTime(item), notNullValue()); + assertTrue(storageInterface.exists(tenantId, item)); + assertThat(storageInterface.size(tenantId, item), is((long) content.length())); + assertThat(storageInterface.lastModifiedTime(tenantId, item), notNullValue()); - InputStream getScheme = storageInterface.get(new URI("kestra:///" + prefix + "/storage/get.yml")); + InputStream getScheme = storageInterface.get(tenantId, new URI("kestra:///" + prefix + "/storage/get.yml")); assertThat(CharStreams.toString(new InputStreamReader(getScheme)), is(content)); } @Test void missing() { String prefix = IdUtils.create(); + String tenantId = IdUtils.create(); assertThrows(FileNotFoundException.class, () -> { - storageInterface.get(new URI("/" + prefix + "/storage/missing.yml")); + storageInterface.get(tenantId, new URI("/" + prefix + "/storage/missing.yml")); }); } @Test void put() throws Exception { String prefix = IdUtils.create(); + String tenantId = IdUtils.create(); URL resource = S3StorageTest.class.getClassLoader().getResource("application.yml"); - URI put = this.putFile(resource, "/" + prefix + "/storage/put.yml"); - InputStream get = storageInterface.get(new URI("/" + prefix + "/storage/put.yml")); + URI put = this.putFile(tenantId, resource, "/" + prefix + "/storage/put.yml"); + InputStream get = storageInterface.get(tenantId, new URI("/" + prefix + "/storage/put.yml")); assertThat(put.toString(), is(new URI("kestra:///" + prefix + "/storage/put.yml").toString())); assertThat( @@ -120,26 +125,27 @@ void put() throws Exception { is(CharStreams.toString(new InputStreamReader(new FileInputStream(Objects.requireNonNull(resource).getFile())))) ); - assertThat(storageInterface.size(new URI("/" + prefix + "/storage/put.yml")), is(74L)); + assertThat(storageInterface.size(tenantId, new URI("/" + prefix + "/storage/put.yml")), is(74L)); assertThrows(FileNotFoundException.class, () -> { - assertThat(storageInterface.size(new URI("/" + prefix + "/storage/muissing.yml")), is(74L)); + assertThat(storageInterface.size(tenantId, new URI("/" + prefix + "/storage/muissing.yml")), is(74L)); }); - boolean delete = storageInterface.delete(put); + boolean delete = storageInterface.delete(tenantId, put); assertThat(delete, is(true)); - delete = storageInterface.delete(put); + delete = storageInterface.delete(tenantId, put); assertThat(delete, is(false)); assertThrows(FileNotFoundException.class, () -> { - storageInterface.get(new URI("/" + prefix + "/storage/put.yml")); + storageInterface.get(tenantId, new URI("/" + prefix + "/storage/put.yml")); }); } @Test void deleteByPrefix() throws Exception { String prefix = IdUtils.create(); + String tenantId = IdUtils.create(); URL resource = S3StorageTest.class.getClassLoader().getResource("application.yml"); @@ -149,20 +155,20 @@ void deleteByPrefix() throws Exception { "/" + prefix + "/storage/level1/level2/1.yml" ); - path.forEach(throwConsumer(s -> this.putFile(resource, s))); + path.forEach(throwConsumer(s -> this.putFile(tenantId, resource, s))); - List deleted = storageInterface.deleteByPrefix(new URI("/" + prefix + "/storage/")); + List deleted = storageInterface.deleteByPrefix(tenantId, new URI("/" + prefix + "/storage/")); assertThat(deleted, containsInAnyOrder(path.stream().map(s -> URI.create("kestra://" + s)).toArray())); assertThrows(FileNotFoundException.class, () -> { - storageInterface.get(new URI("/" + prefix + "/storage/")); + storageInterface.get(tenantId, new URI("/" + prefix + "/storage/")); }); path .forEach(s -> { assertThrows(FileNotFoundException.class, () -> { - storageInterface.get(new URI(s)); + storageInterface.get(tenantId, new URI(s)); }); }); } @@ -170,8 +176,9 @@ void deleteByPrefix() throws Exception { @Test void deleteByPrefixNoResult() throws Exception { String prefix = IdUtils.create(); + String tenantId = IdUtils.create(); - List deleted = storageInterface.deleteByPrefix(new URI("/" + prefix + "/storage/")); + List deleted = storageInterface.deleteByPrefix(tenantId, new URI("/" + prefix + "/storage/")); assertThat(deleted.size(), is(0)); } }