Skip to content

Commit

Permalink
Improved Invoke error handling (show full error as is to the user); A…
Browse files Browse the repository at this point in the history
…dded integration tests within LocalStack/Lambda
  • Loading branch information
pnedonosko committed Sep 12, 2023
1 parent aee71f6 commit 09e6492
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 2 deletions.
3 changes: 1 addition & 2 deletions src/main/java/io/kestra/plugin/aws/lambda/Invoke.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,9 @@ void handleError(String functionArn, ContentType contentType, SdkBytes payload)
}
if (errorPayload != null
&& ContentType.APPLICATION_JSON.getMimeType().equals(contentType.getMimeType())) {
var errorMessage = readError(errorPayload);
throw new LambdaInvokeException(
"Lambda Invoke task responded with error for function: " + functionArn
+ (errorMessage.isPresent() ? ". Error: " + errorMessage.get() : ""));
+ ". Error: " + errorPayload);
} else {
throw new LambdaInvokeException(
"Lambda Invoke task responded with error for function: " + functionArn);
Expand Down
91 changes: 91 additions & 0 deletions src/test/java/io/kestra/plugin/aws/lambda/AbstractInvokeTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package io.kestra.plugin.aws.lambda;

import java.io.InputStream;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StorageInterface;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.model.CreateFunctionRequest;
import software.amazon.awssdk.services.lambda.model.CreateFunctionResponse;
import software.amazon.awssdk.services.lambda.model.FunctionCode;
import software.amazon.awssdk.services.lambda.model.GetFunctionRequest;
import software.amazon.awssdk.services.lambda.model.GetFunctionResponse;
import software.amazon.awssdk.services.lambda.model.Runtime;
import software.amazon.awssdk.services.lambda.waiters.LambdaWaiter;

@MicronautTest
@Testcontainers
public class AbstractInvokeTest {

static final String FUNCTION_NAME = "Test-Lambda";

static final String FUNCTION_ROLE_ARN = "arn:aws:iam::000000000000:role/lambda-role";

static final String FUNCTION_CODE = "lambda/test.py.zip";

protected static LocalStackContainer localstack;

@Inject
protected RunContextFactory runContextFactory;

@Inject
protected StorageInterface storageInterface;

protected String functionArn;

@BeforeAll
static void startLocalstack() {
localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:1.3.1"))
.withServices(LocalStackContainer.Service.LAMBDA);
localstack.start();
}

@AfterAll
static void stopLocalstack() {
if (localstack != null) {
localstack.stop();
}
}

void createFunction(LambdaClient client) {
if (client.listFunctions().functions().stream()
.filter(config -> config.functionName().equals(FUNCTION_NAME))
.collect(Collectors.toList()).size() == 0) {
LambdaWaiter waiter = client.waiter();

InputStream is = getClass().getClassLoader().getResourceAsStream(FUNCTION_CODE);

SdkBytes codeToUpload = SdkBytes.fromInputStream(is);

FunctionCode code = FunctionCode.builder().zipFile(codeToUpload).build();

CreateFunctionRequest functionRequest = CreateFunctionRequest.builder()
.functionName(FUNCTION_NAME).description("Created by the Lambda Java API")
.code(code).handler("test.handler")
.role(FUNCTION_ROLE_ARN)
.runtime(Runtime.PYTHON3_9)
.build();

// Create a Lambda function using a waiter.
CreateFunctionResponse functionResponse = client.createFunction(functionRequest);
GetFunctionRequest getFunctionRequest =
GetFunctionRequest.builder().functionName(FUNCTION_NAME).build();
WaiterResponse<GetFunctionResponse> waiterResponse =
waiter.waitUntilFunctionExists(getFunctionRequest);
waiterResponse.matched().response().ifPresent(System.out::println);
// FYI ARN can be found as follows
//functionArn = functionResponse.functionArn();
}
}


}
113 changes: 113 additions & 0 deletions src/test/java/io/kestra/plugin/aws/lambda/InvokeTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package io.kestra.plugin.aws.lambda;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.HashMap;
import java.util.Map;
import org.apache.http.entity.ContentType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.localstack.LocalStackContainer;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;

public class InvokeTest extends AbstractInvokeTest {

private RunContext context;

@BeforeEach
public void setUp() throws IllegalVariableEvaluationException {
this.context = runContextFactory.of();
}

@Test
public void givenExistingLambda_whenInvoked_thenOutputOkMetricsOk() throws Exception {
// Given
var invoke = Invoke.builder()
.endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA)
.toString())
.functionArn(FUNCTION_NAME)
.id(InvokeTest.class.getSimpleName())
.type(InvokeTest.class.getName())
.region(localstack.getRegion())
.accessKeyId(localstack.getAccessKey())
.secretKeyId(localstack.getSecretKey())
.build();

var client = invoke.client(context);
createFunction(client);

// When
var output = invoke.run(context);

// Then
assertNotNull(output.getUri(), "File URI should be present");
assertEquals(ContentType.APPLICATION_JSON.getMimeType(), output.getContentType(),
"Output content type should be present");
assertTrue(output.getContentLength() > 10, "Output content length should have a value");
assertTrue(
context.metrics().stream().filter(m -> m.getName().equals("file.size"))
.map(m -> m.getValue()).findFirst().isPresent(),
"Metric file.size should be present");
assertTrue(
context.metrics().stream().filter(m -> m.getName().equals("duration"))
.map(m -> m.getValue()).findFirst().isPresent(),
"Metric duration should be present");
}

@Test
public void givenNotFoundLambda_whenInvoked_thenErrorNoMetrics() throws Exception {
// Given
var invoke = Invoke.builder()
.endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA)
.toString())
.functionArn("Fake_ARN")
.id(InvokeTest.class.getSimpleName())
.type(InvokeTest.class.getName())
.region(localstack.getRegion())
.accessKeyId(localstack.getAccessKey())
.secretKeyId(localstack.getSecretKey())
.build();

var client = invoke.client(context);
createFunction(client);

// When
assertThrows(LambdaInvokeException.class, () -> invoke.run(context),
"Invokation should thrown an exception");

// Then
assertTrue(context.metrics().size() == 0, "Metrics should not be present");
}

@Test
public void givenFailingLambda_whenInvoked_thenFailureNoMetrics() throws Exception {
// Given
Map<String, Object> params = new HashMap<>();
// ask for an error in the Lambda by function param (see test resource lambda/test.py)
params.put("action", "error");
var invoke = Invoke.builder()
.endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA)
.toString())
.functionArn(FUNCTION_NAME).functionPayload(params)
.id(InvokeTest.class.getSimpleName())
.type(InvokeTest.class.getName())
.region(localstack.getRegion())
.accessKeyId(localstack.getAccessKey())
.secretKeyId(localstack.getSecretKey())
.build();

var client = invoke.client(context);
createFunction(client);

// When
assertThrows(LambdaInvokeException.class, () -> invoke.run(context),
"Invokation should fail");

// Then
assertTrue(context.metrics().size() == 0, "Metrics should not be present");
}

}
20 changes: 20 additions & 0 deletions src/test/resources/lambda/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def handler(event, context):
print("Lambda function ARN:", context.invoked_function_arn)
action = event.get("action")
if action is not None:
if action == "error":
logger.info("Will throw an Exception. Action: " + action)
raise Exception("Error for client tests")
logger.info("Normal work - Unknow action: " + action)
else:
logger.info("Normal work - All OK!")

return {
"message": "All OK!",
"action": action
}
Binary file added src/test/resources/lambda/test.py.zip
Binary file not shown.

0 comments on commit 09e6492

Please sign in to comment.