Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gcp): add gcp function http #447

Merged
merged 2 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,14 @@ dependencies {
annotationProcessor "org.projectlombok:lombok"
compileOnly "org.projectlombok:lombok"

// micronaut
// Micronaut
compileOnly "io.micronaut:micronaut-http-client"
compileOnly "io.micronaut.reactor:micronaut-reactor"
compileOnly "io.micronaut:micronaut-jackson-databind"

compileOnly group: 'com.fasterxml.jackson.module', name: 'jackson-module-parameter-names'
compileOnly group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-guava'
compileOnly group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310'

// kestra
annotationProcessor group: "io.kestra", name: "processor", version: kestraVersion
Expand Down
150 changes: 150 additions & 0 deletions src/main/java/io/kestra/plugin/gcp/function/HttpFunction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package io.kestra.plugin.gcp.function;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auth.oauth2.IdTokenCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.gcp.AbstractTask;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpHeaders;
import io.micronaut.http.HttpMethod;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.client.DefaultHttpClientConfiguration;
import io.micronaut.http.client.HttpClient;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import io.micronaut.http.client.netty.NettyHttpClientFactory;
import io.micronaut.http.codec.MediaTypeCodecRegistry;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
import reactor.core.publisher.Mono;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Trigger Google Cloud Run Function.",
description = "Use this task to trigger an Cloud Run Function and collect the result if any"
)
@Plugin(examples = {
@Example(
full = true,
code = """
id: test_gcp_function
namespace: com.company.test.gcp

tasks:
- id: get_hello_json
type: io.kestra.plugin.gcp.function.HttpFunction
httpMethod: GET
url: https://my-function.europe-west9.run.app
"""
)
})
public class HttpFunction extends AbstractTask implements RunnableTask<HttpFunction.Output> {
private static final Duration HTTP_READ_TIMEOUT = Duration.ofSeconds(60);
private static final NettyHttpClientFactory FACTORY = new NettyHttpClientFactory();

@Schema(title = "HTTP method")
@NotNull
protected Property<String> httpMethod;

@Schema(title = "GCP Function URL")
@NotNull
protected Property<String> url;

@Schema(
title = "HTTP body",
description = "JSON body of the Azure function"
)
@Builder.Default
protected Property<Map<String, Object>> httpBody = Property.of(Collections.emptyMap());

@Schema(
title = "Max duration",
description = "The maximum duration the task should wait until the Azure Function completion."
)
@Builder.Default
@PluginProperty(dynamic = true)
protected Duration maxDuration = Duration.ofMinutes(60);

@Override
public Output run(RunContext runContext) throws Exception {
IdTokenCredentials idTokenCredentials = IdTokenCredentials.newBuilder()
.setIdTokenProvider((ServiceAccountCredentials) this.credentials(runContext).createScoped(runContext.render(this.scopes)))
.setTargetAudience(this.url.as(runContext, String.class))
.build();

String token = idTokenCredentials.refreshAccessToken().getTokenValue();

try (HttpClient client = this.client(runContext)) {
Mono<HttpResponse> mono = Mono.from(client.exchange(HttpRequest
.create(
HttpMethod.valueOf(httpMethod.as(runContext, String.class)),
url.as(runContext, String.class)
).body(httpBody.asMap(runContext, String.class, Object.class))
.headers(Map.of(HttpHeaders.AUTHORIZATION, "Bearer " + token)),
Argument.of(String.class))
);
HttpResponse result = maxDuration != null ? mono.block(maxDuration) : mono.block();
String body = result != null && result.getBody().isPresent() ? (String) result.getBody().get() : "";
try {
ObjectMapper mapper = new ObjectMapper();
return Output.builder()
.responseBody(mapper.readTree(body))
.build();
} catch (Exception e) {
return Output.builder()
.responseBody(body)
.build();
}
} catch (HttpClientResponseException e) {
throw new HttpClientResponseException(
"Request failed '" + e.getStatus().getCode() + "' and body '" + e.getResponse().getBody(String.class).orElse("null") + "'",
e,
e.getResponse()
);
} catch (IllegalVariableEvaluationException | MalformedURLException | URISyntaxException e) {
throw new RuntimeException(e);
}
}

@Getter
@Builder
static class Output implements io.kestra.core.models.tasks.Output {
@Schema(title = "GCP Function response body")
private Object responseBody;
}

protected HttpClient client(RunContext runContext) throws IllegalVariableEvaluationException, MalformedURLException, URISyntaxException {
MediaTypeCodecRegistry mediaTypeCodecRegistry = ((DefaultRunContext)runContext).getApplicationContext().getBean(MediaTypeCodecRegistry.class);

var httpConfig = new DefaultHttpClientConfiguration();
httpConfig.setMaxContentLength(Integer.MAX_VALUE);
httpConfig.setReadTimeout(HTTP_READ_TIMEOUT);

DefaultHttpClient client = (DefaultHttpClient) FACTORY.createClient(URI.create(url.as(runContext, String.class)).toURL(), httpConfig);
client.setMediaTypeCodecRegistry(mediaTypeCodecRegistry);

return client;
}
}
9 changes: 9 additions & 0 deletions src/main/java/io/kestra/plugin/gcp/function/package-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
@PluginSubGroup(
title = "Google Cloud Function",
description = "This sub-group of plugins contains tasks for triggering Google Cloud Function.\n" +
"Cloud Run functions automatically manages and scales underlying infrastructure with the size of workload. Deploy your code and let Google run and scale it for you. ",
categories = { PluginSubGroup.PluginCategory.TOOL, PluginSubGroup.PluginCategory.CLOUD }
)
package io.kestra.plugin.gcp.function;

import io.kestra.core.models.annotations.PluginSubGroup;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.micronaut.http.client.netty.NettyHttpClientFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.micronaut.http.client.netty.NettyHttpClientFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.micronaut.jackson.databind.JacksonDatabindMapperSupplier
1 change: 1 addition & 0 deletions src/main/resources/icons/io.kestra.plugin.gcp.function.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
39 changes: 39 additions & 0 deletions src/test/java/io/kestra/plugin/gcp/function/HttpFunctionTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.kestra.plugin.gcp.function;

import com.fasterxml.jackson.databind.JsonNode;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.util.Collections;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

@KestraTest
class HttpFunctionTest {
private static final String GCP_FUNCTION_TEST_URL = "";
@Inject
private RunContextFactory runContextFactory;

@Disabled("Disabled with CI/CD, to run the test provide a GCP Function URL")
@Test
void testAzureFunctionWithStringOutput() throws Exception {
HttpFunction httpTrigger = HttpFunction.builder()
.url(Property.of(GCP_FUNCTION_TEST_URL + "?firstName=Bryan&name=Smith"))
.httpMethod(Property.of("GET"))
.build();

RunContext runContext = runContextFactory.of(Collections.emptyMap());

HttpFunction.Output functionOutput = httpTrigger.run(runContext);

JsonNode objectResult = (JsonNode) functionOutput.getResponseBody();
assertThat(objectResult.get("firstName").asText(), is("Bryan"));
assertThat(objectResult.get("name").asText(), is("Smith"));
}
}