diff --git a/build.gradle b/build.gradle index c7d54932..10f2e2fa 100644 --- a/build.gradle +++ b/build.gradle @@ -52,8 +52,14 @@ dependencies { annotationProcessor "org.projectlombok:lombok" compileOnly "org.projectlombok:lombok" - // micronaut + // Micronaut + compileOnly "io.micronaut:micronaut-http-client" compileOnly "io.micronaut.reactor:micronaut-reactor" + compileOnly "io.micronaut:micronaut-jackson-databind" + + compileOnly group: 'com.fasterxml.jackson.module', name: 'jackson-module-parameter-names' + compileOnly group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-guava' + compileOnly group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310' // kestra annotationProcessor group: "io.kestra", name: "processor", version: kestraVersion diff --git a/src/main/java/io/kestra/plugin/gcp/function/HttpFunction.java b/src/main/java/io/kestra/plugin/gcp/function/HttpFunction.java new file mode 100644 index 00000000..1201f414 --- /dev/null +++ b/src/main/java/io/kestra/plugin/gcp/function/HttpFunction.java @@ -0,0 +1,150 @@ +package io.kestra.plugin.gcp.function; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.auth.oauth2.IdTokenCredentials; +import com.google.auth.oauth2.ServiceAccountCredentials; +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.runners.DefaultRunContext; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.gcp.AbstractTask; +import io.micronaut.core.type.Argument; +import io.micronaut.http.HttpHeaders; +import io.micronaut.http.HttpMethod; +import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.client.DefaultHttpClientConfiguration; +import io.micronaut.http.client.HttpClient; +import io.micronaut.http.client.exceptions.HttpClientResponseException; +import io.micronaut.http.client.netty.DefaultHttpClient; +import io.micronaut.http.client.netty.NettyHttpClientFactory; +import io.micronaut.http.codec.MediaTypeCodecRegistry; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; +import lombok.*; +import lombok.experimental.SuperBuilder; +import reactor.core.publisher.Mono; + +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.Collections; +import java.util.Map; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Trigger Google Cloud Run Function.", + description = "Use this task to trigger an Cloud Run Function and collect the result if any" +) +@Plugin(examples = { + @Example( + full = true, + code = """ + id: test_gcp_function + namespace: com.company.test.gcp + + tasks: + - id: get_hello_json + type: io.kestra.plugin.gcp.function.HttpFunction + httpMethod: GET + url: https://my-function.europe-west9.run.app + """ + ) +}) +public class HttpFunction extends AbstractTask implements RunnableTask { + private static final Duration HTTP_READ_TIMEOUT = Duration.ofSeconds(60); + private static final NettyHttpClientFactory FACTORY = new NettyHttpClientFactory(); + + @Schema(title = "HTTP method") + @NotNull + protected Property httpMethod; + + @Schema(title = "GCP Function URL") + @NotNull + protected Property url; + + @Schema( + title = "HTTP body", + description = "JSON body of the Azure function" + ) + @Builder.Default + protected Property> httpBody = Property.of(Collections.emptyMap()); + + @Schema( + title = "Max duration", + description = "The maximum duration the task should wait until the Azure Function completion." + ) + @Builder.Default + @PluginProperty(dynamic = true) + protected Duration maxDuration = Duration.ofMinutes(60); + + @Override + public Output run(RunContext runContext) throws Exception { + IdTokenCredentials idTokenCredentials = IdTokenCredentials.newBuilder() + .setIdTokenProvider((ServiceAccountCredentials) this.credentials(runContext).createScoped(runContext.render(this.scopes))) + .setTargetAudience(this.url.as(runContext, String.class)) + .build(); + + String token = idTokenCredentials.refreshAccessToken().getTokenValue(); + + try (HttpClient client = this.client(runContext)) { + Mono mono = Mono.from(client.exchange(HttpRequest + .create( + HttpMethod.valueOf(httpMethod.as(runContext, String.class)), + url.as(runContext, String.class) + ).body(httpBody.asMap(runContext, String.class, Object.class)) + .headers(Map.of(HttpHeaders.AUTHORIZATION, "Bearer " + token)), + Argument.of(String.class)) + ); + HttpResponse result = maxDuration != null ? mono.block(maxDuration) : mono.block(); + String body = result != null && result.getBody().isPresent() ? (String) result.getBody().get() : ""; + try { + ObjectMapper mapper = new ObjectMapper(); + return Output.builder() + .responseBody(mapper.readTree(body)) + .build(); + } catch (Exception e) { + return Output.builder() + .responseBody(body) + .build(); + } + } catch (HttpClientResponseException e) { + throw new HttpClientResponseException( + "Request failed '" + e.getStatus().getCode() + "' and body '" + e.getResponse().getBody(String.class).orElse("null") + "'", + e, + e.getResponse() + ); + } catch (IllegalVariableEvaluationException | MalformedURLException | URISyntaxException e) { + throw new RuntimeException(e); + } + } + + @Getter + @Builder + static class Output implements io.kestra.core.models.tasks.Output { + @Schema(title = "GCP Function response body") + private Object responseBody; + } + + protected HttpClient client(RunContext runContext) throws IllegalVariableEvaluationException, MalformedURLException, URISyntaxException { + MediaTypeCodecRegistry mediaTypeCodecRegistry = ((DefaultRunContext)runContext).getApplicationContext().getBean(MediaTypeCodecRegistry.class); + + var httpConfig = new DefaultHttpClientConfiguration(); + httpConfig.setMaxContentLength(Integer.MAX_VALUE); + httpConfig.setReadTimeout(HTTP_READ_TIMEOUT); + + DefaultHttpClient client = (DefaultHttpClient) FACTORY.createClient(URI.create(url.as(runContext, String.class)).toURL(), httpConfig); + client.setMediaTypeCodecRegistry(mediaTypeCodecRegistry); + + return client; + } +} diff --git a/src/main/java/io/kestra/plugin/gcp/function/package-info.java b/src/main/java/io/kestra/plugin/gcp/function/package-info.java new file mode 100644 index 00000000..5490c1dd --- /dev/null +++ b/src/main/java/io/kestra/plugin/gcp/function/package-info.java @@ -0,0 +1,9 @@ +@PluginSubGroup( + title = "Google Cloud Function", + description = "This sub-group of plugins contains tasks for triggering Google Cloud Function.\n" + + "Cloud Run functions automatically manages and scales underlying infrastructure with the size of workload. Deploy your code and let Google run and scale it for you. ", + categories = { PluginSubGroup.PluginCategory.TOOL, PluginSubGroup.PluginCategory.CLOUD } +) +package io.kestra.plugin.gcp.function; + +import io.kestra.core.models.annotations.PluginSubGroup; \ No newline at end of file diff --git a/src/main/resources/META-INF/services/com.fasterxml.jackson.databind.Module b/src/main/resources/META-INF/services/com.fasterxml.jackson.databind.Module new file mode 100644 index 00000000..62e3d566 --- /dev/null +++ b/src/main/resources/META-INF/services/com.fasterxml.jackson.databind.Module @@ -0,0 +1 @@ +com.fasterxml.jackson.datatype.jsr310.JavaTimeModule diff --git a/src/main/resources/META-INF/services/io.micronaut.http.client.HttpClientFactory b/src/main/resources/META-INF/services/io.micronaut.http.client.HttpClientFactory new file mode 100644 index 00000000..c75f8ca1 --- /dev/null +++ b/src/main/resources/META-INF/services/io.micronaut.http.client.HttpClientFactory @@ -0,0 +1 @@ +io.micronaut.http.client.netty.NettyHttpClientFactory \ No newline at end of file diff --git a/src/main/resources/META-INF/services/io.micronaut.http.client.StreamingHttpClientFactory b/src/main/resources/META-INF/services/io.micronaut.http.client.StreamingHttpClientFactory new file mode 100644 index 00000000..c75f8ca1 --- /dev/null +++ b/src/main/resources/META-INF/services/io.micronaut.http.client.StreamingHttpClientFactory @@ -0,0 +1 @@ +io.micronaut.http.client.netty.NettyHttpClientFactory \ No newline at end of file diff --git a/src/main/resources/META-INF/services/io.micronaut.json.JsonMapperSupplier b/src/main/resources/META-INF/services/io.micronaut.json.JsonMapperSupplier new file mode 100644 index 00000000..e99fd966 --- /dev/null +++ b/src/main/resources/META-INF/services/io.micronaut.json.JsonMapperSupplier @@ -0,0 +1 @@ +io.micronaut.jackson.databind.JacksonDatabindMapperSupplier \ No newline at end of file diff --git a/src/main/resources/icons/io.kestra.plugin.gcp.function.svg b/src/main/resources/icons/io.kestra.plugin.gcp.function.svg new file mode 100644 index 00000000..9313796b --- /dev/null +++ b/src/main/resources/icons/io.kestra.plugin.gcp.function.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/gcp/function/HttpFunctionTest.java b/src/test/java/io/kestra/plugin/gcp/function/HttpFunctionTest.java new file mode 100644 index 00000000..9b8c195a --- /dev/null +++ b/src/test/java/io/kestra/plugin/gcp/function/HttpFunctionTest.java @@ -0,0 +1,39 @@ +package io.kestra.plugin.gcp.function; + +import com.fasterxml.jackson.databind.JsonNode; +import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.models.property.Property; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.RunContextFactory; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +@KestraTest +class HttpFunctionTest { + private static final String GCP_FUNCTION_TEST_URL = ""; + @Inject + private RunContextFactory runContextFactory; + + @Disabled("Disabled with CI/CD, to run the test provide a GCP Function URL") + @Test + void testAzureFunctionWithStringOutput() throws Exception { + HttpFunction httpTrigger = HttpFunction.builder() + .url(Property.of(GCP_FUNCTION_TEST_URL + "?firstName=Bryan&name=Smith")) + .httpMethod(Property.of("GET")) + .build(); + + RunContext runContext = runContextFactory.of(Collections.emptyMap()); + + HttpFunction.Output functionOutput = httpTrigger.run(runContext); + + JsonNode objectResult = (JsonNode) functionOutput.getResponseBody(); + assertThat(objectResult.get("firstName").asText(), is("Bryan")); + assertThat(objectResult.get("name").asText(), is("Smith")); + } +} \ No newline at end of file