diff --git a/build.gradle b/build.gradle index b961f62..45a5753 100644 --- a/build.gradle +++ b/build.gradle @@ -46,19 +46,13 @@ dependencies { annotationProcessor "org.projectlombok:lombok:$lombokVersion" compileOnly "org.projectlombok:lombok:$lombokVersion" - // micronaut - annotationProcessor platform("io.micronaut.platform:micronaut-platform:$micronautVersion") - annotationProcessor "io.micronaut:micronaut-inject-java" - annotationProcessor "io.micronaut.validation:micronaut-validation-processor" - - compileOnly platform("io.micronaut.platform:micronaut-platform:$micronautVersion") - compileOnly "io.micronaut:micronaut-inject" - compileOnly "io.micronaut.validation:micronaut-validation" - // kestra compileOnly group: "io.kestra", name: "core", version: kestraVersion annotationProcessor group: "io.kestra", name: "processor", version: kestraVersion + // Logs + compileOnly'org.slf4j:slf4j-api:2.0.13' + // libs api platform('com.google.cloud:libraries-bom:26.37.0') api 'com.google.cloud:google-cloud-storage' diff --git a/src/main/java/io/kestra/storage/gcs/GcsClientFactory.java b/src/main/java/io/kestra/storage/gcs/GcsClientFactory.java index b4ce251..3610d78 100644 --- a/src/main/java/io/kestra/storage/gcs/GcsClientFactory.java +++ b/src/main/java/io/kestra/storage/gcs/GcsClientFactory.java @@ -4,17 +4,13 @@ import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; -import io.micronaut.context.annotation.Factory; import lombok.SneakyThrows; import java.io.ByteArrayInputStream; -import jakarta.inject.Singleton; -@Factory -@GcsStorageEnabled public class GcsClientFactory { @SneakyThrows - protected GoogleCredentials credentials(GcsConfig config) { + protected static GoogleCredentials credentials(final GcsConfig config) { GoogleCredentials credentials; if (config.getServiceAccount() != null) { @@ -24,15 +20,13 @@ protected GoogleCredentials credentials(GcsConfig config) { } else { credentials = GoogleCredentials.getApplicationDefault(); } - return credentials; } - @Singleton - public Storage of(GcsConfig config) { + public static Storage of(final GcsConfig config) { return StorageOptions .newBuilder() - .setCredentials(this.credentials(config)) + .setCredentials(credentials(config)) .setProjectId(config.getProjectId()) .build() .getService(); diff --git a/src/main/java/io/kestra/storage/gcs/GcsConfig.java b/src/main/java/io/kestra/storage/gcs/GcsConfig.java index 27a9874..12110f9 100644 --- a/src/main/java/io/kestra/storage/gcs/GcsConfig.java +++ b/src/main/java/io/kestra/storage/gcs/GcsConfig.java @@ -1,17 +1,19 @@ package io.kestra.storage.gcs; -import io.micronaut.context.annotation.ConfigurationProperties; -import lombok.Getter; +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; -import jakarta.inject.Singleton; +public interface GcsConfig { -@Singleton -@Getter -@ConfigurationProperties("kestra.storage.gcs") -public class GcsConfig { - String bucket; + @JsonProperty + @NotNull + @NotBlank + String getBucket(); - String serviceAccount; + @JsonProperty + String getServiceAccount(); - String projectId; + @JsonProperty + String getProjectId(); } diff --git a/src/main/java/io/kestra/storage/gcs/GcsStorage.java b/src/main/java/io/kestra/storage/gcs/GcsStorage.java index 6a6f2eb..2c1f517 100644 --- a/src/main/java/io/kestra/storage/gcs/GcsStorage.java +++ b/src/main/java/io/kestra/storage/gcs/GcsStorage.java @@ -5,7 +5,6 @@ import com.google.cloud.storage.*; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.storages.FileAttributes; -import io.micronaut.core.annotation.Introspected; import io.kestra.core.storages.StorageInterface; import java.io.File; @@ -21,23 +20,52 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import jakarta.inject.Inject; -import jakarta.inject.Singleton; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; import lombok.NoArgsConstructor; +import lombok.extern.jackson.Jacksonized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static io.kestra.core.utils.Rethrow.throwFunction; -@Singleton -@GcsStorageEnabled -@Introspected -@Plugin +@AllArgsConstructor @NoArgsConstructor -public class GcsStorage implements StorageInterface { - @Inject - Storage storage; +@Builder +@Jacksonized +@Getter +@Plugin +@Plugin.Id("gcs") +public class GcsStorage implements StorageInterface, GcsConfig { + + private static final Logger log = LoggerFactory.getLogger(GcsStorage.class); + + private String bucket; + + private String serviceAccount; + + private String projectId; + + private Storage storage; - @Inject - GcsConfig config; + /** {@inheritDoc} **/ + @Override + public void init() { + this.storage = GcsClientFactory.of(this); + } + + /** {@inheritDoc} **/ + @Override + public void close() { + if (this.storage != null) { + try { + this.storage.close(); + } catch (Exception e) { + log.warn("Failed to close GcsStorage", e); + } + } + } private BlobId blob(String tenantId, URI uri) { String path = getPath(tenantId, uri); @@ -45,7 +73,7 @@ private BlobId blob(String tenantId, URI uri) { } private BlobId blob(String path) { - return BlobId.of(this.config.getBucket(), path); + return BlobId.of(bucket, path); } private String getPath(String tenantId, URI uri) { @@ -117,7 +145,7 @@ private Stream blobsForPrefix(String prefix, boolean recursive, boolean in Stream.of(Storage.BlobListOption.prefix(prefix)), recursive ? Stream.empty() : Stream.of(Storage.BlobListOption.currentDirectory()) ).toArray(Storage.BlobListOption[]::new); - Page blobs = this.storage.list(config.bucket, blobListOptions); + Page blobs = this.storage.list(bucket, blobListOptions); return blobs.streamAll() .filter(blob -> { String key = blob.getName().substring(prefix.length()); @@ -196,7 +224,7 @@ private void mkdirs(String path) { } // check if it exists before creating it - Page pathBlob = this.storage.list(this.config.getBucket(), Storage.BlobListOption.prefix(path), Storage.BlobListOption.pageSize(1)); + Page pathBlob = this.storage.list(bucket, Storage.BlobListOption.prefix(path), Storage.BlobListOption.pageSize(1)); if(pathBlob != null && pathBlob.hasNextPage()) { return; } @@ -207,7 +235,7 @@ private void mkdirs(String path) { for (int i = 1; i < directories.length; i++) { aggregatedPath.append(directories[i]).append("/"); // check if it exists before creating it - Page currentDir = this.storage.list(this.config.getBucket(), Storage.BlobListOption.prefix(aggregatedPath.toString()), Storage.BlobListOption.pageSize(1)); + Page currentDir = this.storage.list(bucket, Storage.BlobListOption.prefix(aggregatedPath.toString()), Storage.BlobListOption.pageSize(1)); if(currentDir != null && currentDir.hasNextPage()) { continue; } @@ -262,7 +290,7 @@ public URI move(String tenantId, URI from, URI to) throws IOException { // move directories String prefix = (!path.endsWith("/")) ? path + "/" : path; - Page list = this.storage.list(config.bucket, Storage.BlobListOption.prefix(prefix)); + Page list = this.storage.list(bucket, Storage.BlobListOption.prefix(prefix)); list.streamAll().forEach(blob -> { BlobId target = blob(getPath(tenantId, to) + "/" + blob.getName().substring(prefix.length())); moveFile(blob.getBlobId(), target, batch); @@ -286,9 +314,7 @@ public List deleteByPrefix(String tenantId, URI storagePrefix) throws IOExc String prefix = getPath(tenantId, storagePrefix); Page blobs = this.storage - .list(this.config.getBucket(), - Storage.BlobListOption.prefix(prefix) - ); + .list(bucket, Storage.BlobListOption.prefix(prefix)); for (Blob blob : blobs.iterateAll()) { results.put(URI.create("kestra://" + blob.getBlobId().getName().replace(tenantId + "/", "").replaceAll("/$", "")), batch.delete(blob.getBlobId())); diff --git a/src/main/java/io/kestra/storage/gcs/GcsStorageEnabled.java b/src/main/java/io/kestra/storage/gcs/GcsStorageEnabled.java deleted file mode 100644 index cd981af..0000000 --- a/src/main/java/io/kestra/storage/gcs/GcsStorageEnabled.java +++ /dev/null @@ -1,12 +0,0 @@ -package io.kestra.storage.gcs; - -import io.micronaut.context.annotation.Requires; - -import java.lang.annotation.*; - -@Documented -@Retention(RetentionPolicy.RUNTIME) -@Target({ElementType.PACKAGE, ElementType.TYPE}) -@Requires(property = "kestra.storage.type", value = "gcs") -public @interface GcsStorageEnabled { -}