diff --git a/src/main/java/io/kestra/plugin/aws/lambda/Invoke.java b/src/main/java/io/kestra/plugin/aws/lambda/Invoke.java index 8884f4bf..e8bba246 100644 --- a/src/main/java/io/kestra/plugin/aws/lambda/Invoke.java +++ b/src/main/java/io/kestra/plugin/aws/lambda/Invoke.java @@ -197,10 +197,9 @@ void handleError(String functionArn, ContentType contentType, SdkBytes payload) } if (errorPayload != null && ContentType.APPLICATION_JSON.getMimeType().equals(contentType.getMimeType())) { - var errorMessage = readError(errorPayload); throw new LambdaInvokeException( "Lambda Invoke task responded with error for function: " + functionArn - + (errorMessage.isPresent() ? ". Error: " + errorMessage.get() : "")); + + ". Error: " + errorPayload); } else { throw new LambdaInvokeException( "Lambda Invoke task responded with error for function: " + functionArn); diff --git a/src/test/java/io/kestra/plugin/aws/lambda/AbstractInvokeTest.java b/src/test/java/io/kestra/plugin/aws/lambda/AbstractInvokeTest.java new file mode 100644 index 00000000..dcf582e9 --- /dev/null +++ b/src/test/java/io/kestra/plugin/aws/lambda/AbstractInvokeTest.java @@ -0,0 +1,91 @@ +package io.kestra.plugin.aws.lambda; + +import java.io.InputStream; +import java.util.stream.Collectors; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.storages.StorageInterface; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.core.waiters.WaiterResponse; +import software.amazon.awssdk.services.lambda.LambdaClient; +import software.amazon.awssdk.services.lambda.model.CreateFunctionRequest; +import software.amazon.awssdk.services.lambda.model.CreateFunctionResponse; +import software.amazon.awssdk.services.lambda.model.FunctionCode; +import software.amazon.awssdk.services.lambda.model.GetFunctionRequest; +import software.amazon.awssdk.services.lambda.model.GetFunctionResponse; +import software.amazon.awssdk.services.lambda.model.Runtime; +import software.amazon.awssdk.services.lambda.waiters.LambdaWaiter; + +@MicronautTest +@Testcontainers +public class AbstractInvokeTest { + + static final String FUNCTION_NAME = "Test-Lambda"; + + static final String FUNCTION_ROLE_ARN = "arn:aws:iam::000000000000:role/lambda-role"; + + static final String FUNCTION_CODE = "lambda/test.py.zip"; + + protected static LocalStackContainer localstack; + + @Inject + protected RunContextFactory runContextFactory; + + @Inject + protected StorageInterface storageInterface; + + protected String functionArn; + + @BeforeAll + static void startLocalstack() { + localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:1.3.1")) + .withServices(LocalStackContainer.Service.LAMBDA); + localstack.start(); + } + + @AfterAll + static void stopLocalstack() { + if (localstack != null) { + localstack.stop(); + } + } + + void createFunction(LambdaClient client) { + if (client.listFunctions().functions().stream() + .filter(config -> config.functionName().equals(FUNCTION_NAME)) + .collect(Collectors.toList()).size() == 0) { + LambdaWaiter waiter = client.waiter(); + + InputStream is = getClass().getClassLoader().getResourceAsStream(FUNCTION_CODE); + + SdkBytes codeToUpload = SdkBytes.fromInputStream(is); + + FunctionCode code = FunctionCode.builder().zipFile(codeToUpload).build(); + + CreateFunctionRequest functionRequest = CreateFunctionRequest.builder() + .functionName(FUNCTION_NAME).description("Created by the Lambda Java API") + .code(code).handler("test.handler") + .role(FUNCTION_ROLE_ARN) + .runtime(Runtime.PYTHON3_9) + .build(); + + // Create a Lambda function using a waiter. + CreateFunctionResponse functionResponse = client.createFunction(functionRequest); + GetFunctionRequest getFunctionRequest = + GetFunctionRequest.builder().functionName(FUNCTION_NAME).build(); + WaiterResponse waiterResponse = + waiter.waitUntilFunctionExists(getFunctionRequest); + waiterResponse.matched().response().ifPresent(System.out::println); + // FYI ARN can be found as follows + //functionArn = functionResponse.functionArn(); + } + } + + +} diff --git a/src/test/java/io/kestra/plugin/aws/lambda/InvokeTest.java b/src/test/java/io/kestra/plugin/aws/lambda/InvokeTest.java new file mode 100644 index 00000000..c7e003d6 --- /dev/null +++ b/src/test/java/io/kestra/plugin/aws/lambda/InvokeTest.java @@ -0,0 +1,113 @@ +package io.kestra.plugin.aws.lambda; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.HashMap; +import java.util.Map; +import org.apache.http.entity.ContentType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.localstack.LocalStackContainer; +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.runners.RunContext; + +public class InvokeTest extends AbstractInvokeTest { + + private RunContext context; + + @BeforeEach + public void setUp() throws IllegalVariableEvaluationException { + this.context = runContextFactory.of(); + } + + @Test + public void givenExistingLambda_whenInvoked_thenOutputOkMetricsOk() throws Exception { + // Given + var invoke = Invoke.builder() + .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA) + .toString()) + .functionArn(FUNCTION_NAME) + .id(InvokeTest.class.getSimpleName()) + .type(InvokeTest.class.getName()) + .region(localstack.getRegion()) + .accessKeyId(localstack.getAccessKey()) + .secretKeyId(localstack.getSecretKey()) + .build(); + + var client = invoke.client(context); + createFunction(client); + + // When + var output = invoke.run(context); + + // Then + assertNotNull(output.getUri(), "File URI should be present"); + assertEquals(ContentType.APPLICATION_JSON.getMimeType(), output.getContentType(), + "Output content type should be present"); + assertTrue(output.getContentLength() > 10, "Output content length should have a value"); + assertTrue( + context.metrics().stream().filter(m -> m.getName().equals("file.size")) + .map(m -> m.getValue()).findFirst().isPresent(), + "Metric file.size should be present"); + assertTrue( + context.metrics().stream().filter(m -> m.getName().equals("duration")) + .map(m -> m.getValue()).findFirst().isPresent(), + "Metric duration should be present"); + } + + @Test + public void givenNotFoundLambda_whenInvoked_thenErrorNoMetrics() throws Exception { + // Given + var invoke = Invoke.builder() + .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA) + .toString()) + .functionArn("Fake_ARN") + .id(InvokeTest.class.getSimpleName()) + .type(InvokeTest.class.getName()) + .region(localstack.getRegion()) + .accessKeyId(localstack.getAccessKey()) + .secretKeyId(localstack.getSecretKey()) + .build(); + + var client = invoke.client(context); + createFunction(client); + + // When + assertThrows(LambdaInvokeException.class, () -> invoke.run(context), + "Invokation should thrown an exception"); + + // Then + assertTrue(context.metrics().size() == 0, "Metrics should not be present"); + } + + @Test + public void givenFailingLambda_whenInvoked_thenFailureNoMetrics() throws Exception { + // Given + Map params = new HashMap<>(); + // ask for an error in the Lambda by function param (see test resource lambda/test.py) + params.put("action", "error"); + var invoke = Invoke.builder() + .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA) + .toString()) + .functionArn(FUNCTION_NAME).functionPayload(params) + .id(InvokeTest.class.getSimpleName()) + .type(InvokeTest.class.getName()) + .region(localstack.getRegion()) + .accessKeyId(localstack.getAccessKey()) + .secretKeyId(localstack.getSecretKey()) + .build(); + + var client = invoke.client(context); + createFunction(client); + + // When + assertThrows(LambdaInvokeException.class, () -> invoke.run(context), + "Invokation should fail"); + + // Then + assertTrue(context.metrics().size() == 0, "Metrics should not be present"); + } + +} diff --git a/src/test/resources/lambda/test.py b/src/test/resources/lambda/test.py new file mode 100644 index 00000000..9c44ab11 --- /dev/null +++ b/src/test/resources/lambda/test.py @@ -0,0 +1,20 @@ +import logging + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +def handler(event, context): + print("Lambda function ARN:", context.invoked_function_arn) + action = event.get("action") + if action is not None: + if action == "error": + logger.info("Will throw an Exception. Action: " + action) + raise Exception("Error for client tests") + logger.info("Normal work - Unknow action: " + action) + else: + logger.info("Normal work - All OK!") + + return { + "message": "All OK!", + "action": action + } \ No newline at end of file diff --git a/src/test/resources/lambda/test.py.zip b/src/test/resources/lambda/test.py.zip new file mode 100644 index 00000000..ab4a4b1c Binary files /dev/null and b/src/test/resources/lambda/test.py.zip differ