Skip to content

Commit

Permalink
chore: migrate to new StorageInterface
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Apr 25, 2024
1 parent 3a6f71e commit 3f8e40d
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 48 deletions.
10 changes: 3 additions & 7 deletions src/main/java/io/kestra/storage/gcs/GcsClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@
import java.io.ByteArrayInputStream;
import jakarta.inject.Singleton;

@Factory
@GcsStorageEnabled
public class GcsClientFactory {
@SneakyThrows
protected GoogleCredentials credentials(GcsConfig config) {
protected static GoogleCredentials credentials(final GcsConfig config) {
GoogleCredentials credentials;

if (config.getServiceAccount() != null) {
Expand All @@ -24,15 +22,13 @@ protected GoogleCredentials credentials(GcsConfig config) {
} else {
credentials = GoogleCredentials.getApplicationDefault();
}

return credentials;
}

@Singleton
public Storage of(GcsConfig config) {
public static Storage of(final GcsConfig config) {
return StorageOptions
.newBuilder()
.setCredentials(this.credentials(config))
.setCredentials(credentials(config))
.setProjectId(config.getProjectId())
.build()
.getService();
Expand Down
22 changes: 12 additions & 10 deletions src/main/java/io/kestra/storage/gcs/GcsConfig.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package io.kestra.storage.gcs;

import io.micronaut.context.annotation.ConfigurationProperties;
import lombok.Getter;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;

import jakarta.inject.Singleton;
public interface GcsConfig {

@Singleton
@Getter
@ConfigurationProperties("kestra.storage.gcs")
public class GcsConfig {
String bucket;
@JsonProperty
@NotNull
@NotBlank
String getBucket();

String serviceAccount;
@JsonProperty
String getServiceAccount();

String projectId;
@JsonProperty
String getProjectId();
}
65 changes: 46 additions & 19 deletions src/main/java/io/kestra/storage/gcs/GcsStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.google.cloud.storage.*;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.storages.FileAttributes;
import io.micronaut.core.annotation.Introspected;
import io.kestra.core.storages.StorageInterface;

import java.io.File;
Expand All @@ -22,30 +21,60 @@
import java.util.stream.Stream;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.jackson.Jacksonized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.kestra.core.utils.Rethrow.throwFunction;

@Singleton
@GcsStorageEnabled
@Introspected
@Plugin
@AllArgsConstructor
@NoArgsConstructor
public class GcsStorage implements StorageInterface {
@Inject
Storage storage;
@Builder
@Jacksonized
@Getter
@Plugin
@Plugin.Id("gcs")
public class GcsStorage implements StorageInterface, GcsConfig {

private static final Logger log = LoggerFactory.getLogger(GcsStorage.class);

private String bucket;

private String serviceAccount;

private String projectId;

private Storage storage;

@Inject
GcsConfig config;
/** {@inheritDoc} **/
@Override
public void init() {
this.storage = GcsClientFactory.of(this);
}

/** {@inheritDoc} **/
@Override
public void close() {
if (this.storage != null) {
try {
this.storage.close();
} catch (Exception e) {
log.warn("Failed to close GcsStorage", e);
}
}
}

private BlobId blob(String tenantId, URI uri) {
String path = getPath(tenantId, uri);
return blob(path);
}

private BlobId blob(String path) {
return BlobId.of(this.config.getBucket(), path);
return BlobId.of(bucket, path);
}

private String getPath(String tenantId, URI uri) {
Expand Down Expand Up @@ -117,7 +146,7 @@ private Stream<Blob> blobsForPrefix(String prefix, boolean recursive, boolean in
Stream.of(Storage.BlobListOption.prefix(prefix)),
recursive ? Stream.empty() : Stream.of(Storage.BlobListOption.currentDirectory())
).toArray(Storage.BlobListOption[]::new);
Page<Blob> blobs = this.storage.list(config.bucket, blobListOptions);
Page<Blob> blobs = this.storage.list(bucket, blobListOptions);
return blobs.streamAll()
.filter(blob -> {
String key = blob.getName().substring(prefix.length());
Expand Down Expand Up @@ -196,7 +225,7 @@ private void mkdirs(String path) {
}

// check if it exists before creating it
Page<Blob> pathBlob = this.storage.list(this.config.getBucket(), Storage.BlobListOption.prefix(path), Storage.BlobListOption.pageSize(1));
Page<Blob> pathBlob = this.storage.list(bucket, Storage.BlobListOption.prefix(path), Storage.BlobListOption.pageSize(1));
if(pathBlob != null && pathBlob.hasNextPage()) {
return;
}
Expand All @@ -207,7 +236,7 @@ private void mkdirs(String path) {
for (int i = 1; i < directories.length; i++) {
aggregatedPath.append(directories[i]).append("/");
// check if it exists before creating it
Page<Blob> currentDir = this.storage.list(this.config.getBucket(), Storage.BlobListOption.prefix(aggregatedPath.toString()), Storage.BlobListOption.pageSize(1));
Page<Blob> currentDir = this.storage.list(bucket, Storage.BlobListOption.prefix(aggregatedPath.toString()), Storage.BlobListOption.pageSize(1));
if(currentDir != null && currentDir.hasNextPage()) {
continue;
}
Expand Down Expand Up @@ -262,7 +291,7 @@ public URI move(String tenantId, URI from, URI to) throws IOException {
// move directories
String prefix = (!path.endsWith("/")) ? path + "/" : path;

Page<Blob> list = this.storage.list(config.bucket, Storage.BlobListOption.prefix(prefix));
Page<Blob> list = this.storage.list(bucket, Storage.BlobListOption.prefix(prefix));
list.streamAll().forEach(blob -> {
BlobId target = blob(getPath(tenantId, to) + "/" + blob.getName().substring(prefix.length()));
moveFile(blob.getBlobId(), target, batch);
Expand All @@ -286,9 +315,7 @@ public List<URI> deleteByPrefix(String tenantId, URI storagePrefix) throws IOExc
String prefix = getPath(tenantId, storagePrefix);

Page<Blob> blobs = this.storage
.list(this.config.getBucket(),
Storage.BlobListOption.prefix(prefix)
);
.list(bucket, Storage.BlobListOption.prefix(prefix));

for (Blob blob : blobs.iterateAll()) {
results.put(URI.create("kestra://" + blob.getBlobId().getName().replace(tenantId + "/", "").replaceAll("/$", "")), batch.delete(blob.getBlobId()));
Expand Down
12 changes: 0 additions & 12 deletions src/main/java/io/kestra/storage/gcs/GcsStorageEnabled.java

This file was deleted.

0 comments on commit 3f8e40d

Please sign in to comment.