diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/metrics/PluginMetrics.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/metrics/PluginMetrics.java index e04eef230f..57eca27ea3 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/metrics/PluginMetrics.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/metrics/PluginMetrics.java @@ -23,11 +23,15 @@ public class PluginMetrics { private final String metricsPrefix; - public static PluginMetrics fromPluginSetting(final PluginSetting pluginSetting) { + public static PluginMetrics fromPluginSetting(final PluginSetting pluginSetting, final String name) { if(pluginSetting.getPipelineName() == null) { throw new IllegalArgumentException("PluginSetting.pipelineName must not be null"); } - return PluginMetrics.fromNames(pluginSetting.getName(), pluginSetting.getPipelineName()); + return PluginMetrics.fromNames(name, pluginSetting.getPipelineName()); + } + + public static PluginMetrics fromPluginSetting(final PluginSetting pluginSetting) { + return fromPluginSetting(pluginSetting, pluginSetting.getName()); } /** diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/metrics/PluginMetricsTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/metrics/PluginMetricsTest.java index 873be3bc71..bb3a444444 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/metrics/PluginMetricsTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/metrics/PluginMetricsTest.java @@ -54,6 +54,19 @@ public void testCounterWithMetricsPrefix() { counter.getId().getName()); } + @Test + public void testCounterWithMetricsPrefixWithCustomMetricsName() { + final String customName = PLUGIN_NAME + "_custom"; + objectUnderTest = PluginMetrics.fromPluginSetting(pluginSetting, customName); + + final Counter counter = objectUnderTest.counter("counter"); + assertEquals( + pluginSetting.getPipelineName() + MetricNames.DELIMITER + + customName + MetricNames.DELIMITER + + "counter", + counter.getId().getName()); + } + @Test public void testCounter() { final Counter counter = objectUnderTest.counter("counter"); diff --git a/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/LambdaProcessorSinkIT.java b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/LambdaProcessorSinkIT.java index 1745605add..ee3ddeaa1e 100644 --- a/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/LambdaProcessorSinkIT.java +++ b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/LambdaProcessorSinkIT.java @@ -54,6 +54,7 @@ import static org.mockito.Mockito.lenient; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Timer; import java.util.ArrayList; @@ -88,11 +89,21 @@ public class LambdaProcessorSinkIT { @Mock private ExpressionEvaluator expressionEvaluator; @Mock - private Counter testCounter; + private Counter numberOfRecordsSuccessCounter; + @Mock + private Counter numberOfRecordsFailedCounter; + @Mock + private Counter numberOfRequestsSuccessCounter; + @Mock + private Counter numberOfRequestsFailedCounter; @Mock private Counter sinkSuccessCounter; @Mock - private Timer testTimer; + private Timer lambdaLatencyMetric; + @Mock + private DistributionSummary requestPayloadMetric; + @Mock + private DistributionSummary responsePayloadMetric; @Mock InvocationType invocationType; @@ -103,7 +114,7 @@ public class LambdaProcessorSinkIT { private AcknowledgementSet acknowledgementSet; private LambdaProcessor createLambdaProcessor(LambdaProcessorConfig processorConfig) { - return new LambdaProcessor(pluginFactory, pluginMetrics, processorConfig, awsCredentialsSupplier, expressionEvaluator); + return new LambdaProcessor(pluginFactory, pluginSetting, processorConfig, awsCredentialsSupplier, expressionEvaluator); } private LambdaSink createLambdaSink(LambdaSinkConfig lambdaSinkConfig) { @@ -111,6 +122,12 @@ private LambdaSink createLambdaSink(LambdaSinkConfig lambdaSinkConfig) { } + private void setPrivateField(Object targetObject, String fieldName, Object value) throws Exception { + Field field = targetObject.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(targetObject, value); + } + @BeforeEach public void setup() { lambdaRegion = System.getProperty("tests.lambda.processor.region"); @@ -118,6 +135,13 @@ public void setup() { role = System.getProperty("tests.lambda.processor.sts_role_arn"); successCount = new AtomicLong(); numEventHandlesReleased = new AtomicLong(); + numberOfRecordsSuccessCounter = mock(Counter.class); + numberOfRecordsFailedCounter = mock(Counter.class); + numberOfRequestsSuccessCounter = mock(Counter.class); + numberOfRequestsFailedCounter = mock(Counter.class); + lambdaLatencyMetric = mock(Timer.class); + requestPayloadMetric = mock(DistributionSummary.class); + responsePayloadMetric = mock(DistributionSummary.class); acknowledgementSet = mock(AcknowledgementSet.class); try { @@ -132,7 +156,6 @@ public void setup() { }).when(acknowledgementSet).release(any(EventHandle.class), any(Boolean.class)); } catch (Exception e){ } pluginMetrics = mock(PluginMetrics.class); - when(pluginMetrics.gauge(any(), any(AtomicLong.class))).thenReturn(new AtomicLong()); sinkSuccessCounter = mock(Counter.class); try { lenient().doAnswer(args -> { @@ -141,26 +164,22 @@ public void setup() { return null; }).when(sinkSuccessCounter).increment(any(Double.class)); } catch (Exception e){ } - testCounter = mock(Counter.class); try { lenient().doAnswer(args -> { return null; - }).when(testCounter).increment(any(Double.class)); + }).when(numberOfRecordsSuccessCounter).increment(any(Double.class)); } catch (Exception e){} try { lenient().doAnswer(args -> { return null; - }).when(testCounter).increment(); + }).when(numberOfRecordsFailedCounter).increment(); } catch (Exception e){} try { lenient().doAnswer(args -> { return null; - }).when(testTimer).record(any(Long.class), any(TimeUnit.class)); + }).when(lambdaLatencyMetric).record(any(Long.class), any(TimeUnit.class)); } catch (Exception e){} - when(pluginMetrics.counter(any())).thenReturn(testCounter); - testTimer = mock(Timer.class); - when(pluginMetrics.timer(any())).thenReturn(testTimer); lambdaProcessorConfig = mock(LambdaProcessorConfig.class); expressionEvaluator = mock(ExpressionEvaluator.class); awsCredentialsProvider = DefaultCredentialsProvider.create(); @@ -212,19 +231,23 @@ public void setup() { } - private void setPrivateField(Object targetObject, String fieldName, Object value) - throws Exception { - Field field = targetObject.getClass().getDeclaredField(fieldName); - field.setAccessible(true); - field.set(targetObject, value); + private void setPrivateFields(final LambdaProcessor lambdaProcessor) throws Exception { + setPrivateField(lambdaProcessor, "numberOfRecordsSuccessCounter", numberOfRecordsSuccessCounter); + setPrivateField(lambdaProcessor, "numberOfRecordsFailedCounter", numberOfRecordsFailedCounter); + setPrivateField(lambdaProcessor, "numberOfRequestsSuccessCounter", numberOfRequestsSuccessCounter); + setPrivateField(lambdaProcessor, "numberOfRequestsFailedCounter", numberOfRequestsFailedCounter); + setPrivateField(lambdaProcessor, "lambdaLatencyMetric", lambdaLatencyMetric); + setPrivateField(lambdaProcessor, "requestPayloadMetric", requestPayloadMetric); + setPrivateField(lambdaProcessor, "responsePayloadMetric", responsePayloadMetric); } @ParameterizedTest @ValueSource(ints = {11}) - public void testLambdaProcessorAndLambdaSink(int numRecords) { + public void testLambdaProcessorAndLambdaSink(int numRecords) throws Exception { when(invocationType.getAwsLambdaValue()).thenReturn(InvocationType.REQUEST_RESPONSE.getAwsLambdaValue()); when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true); lambdaProcessor = createLambdaProcessor(lambdaProcessorConfig); + setPrivateFields(lambdaProcessor); List> records = createRecords(numRecords); Collection> results = lambdaProcessor.doExecute(records); @@ -232,9 +255,7 @@ public void testLambdaProcessorAndLambdaSink(int numRecords) { assertThat(results.size(), equalTo(numRecords)); validateStrictModeResults(records, results); LambdaSink lambdaSink = createLambdaSink(lambdaSinkConfig); - try { - setPrivateField(lambdaSink, "numberOfRecordsSuccessCounter", sinkSuccessCounter); - } catch (Exception e){} + setPrivateField(lambdaSink, "numberOfRecordsSuccessCounter", sinkSuccessCounter); lambdaSink.output(results); assertThat(successCount.get(), equalTo((long)numRecords)); assertThat(numEventHandlesReleased.get(), equalTo((long)numRecords)); diff --git a/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorIT.java b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorIT.java index ae9efb9377..f05ab16b2e 100644 --- a/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorIT.java +++ b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorIT.java @@ -7,7 +7,6 @@ import static org.mockito.Mockito.spy; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.event.EventMetadata; @@ -43,9 +42,12 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.lenient; + import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Timer; import software.amazon.awssdk.services.lambda.model.InvokeResponse; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -71,17 +73,45 @@ public class LambdaProcessorIT { @Mock private PluginFactory pluginFactory; @Mock - private PluginMetrics pluginMetrics; + private PluginSetting pluginSetting; @Mock private ExpressionEvaluator expressionEvaluator; @Mock - private Counter testCounter; + private Counter numberOfRecordsSuccessCounter; + @Mock + private Counter numberOfRecordsFailedCounter; + @Mock + private Counter numberOfRequestsSuccessCounter; + @Mock + private Counter numberOfRequestsFailedCounter; @Mock - private Timer testTimer; + private Counter sinkSuccessCounter; + @Mock + private Timer lambdaLatencyMetric; + @Mock + private DistributionSummary requestPayloadMetric; + @Mock + private DistributionSummary responsePayloadMetric; @Mock InvocationType invocationType; private LambdaProcessor createObjectUnderTest(LambdaProcessorConfig processorConfig) { - return new LambdaProcessor(pluginFactory, pluginMetrics, processorConfig, awsCredentialsSupplier, expressionEvaluator); + return new LambdaProcessor(pluginFactory, pluginSetting, processorConfig, awsCredentialsSupplier, expressionEvaluator); + } + + private void setPrivateField(Object targetObject, String fieldName, Object value) throws Exception { + Field field = targetObject.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(targetObject, value); + } + + private void setPrivateFields(final LambdaProcessor lambdaProcessor) throws Exception { + setPrivateField(lambdaProcessor, "numberOfRecordsSuccessCounter", numberOfRecordsSuccessCounter); + setPrivateField(lambdaProcessor, "numberOfRecordsFailedCounter", numberOfRecordsFailedCounter); + setPrivateField(lambdaProcessor, "numberOfRequestsSuccessCounter", numberOfRequestsSuccessCounter); + setPrivateField(lambdaProcessor, "numberOfRequestsFailedCounter", numberOfRequestsFailedCounter); + setPrivateField(lambdaProcessor, "lambdaLatencyMetric", lambdaLatencyMetric); + setPrivateField(lambdaProcessor, "requestPayloadMetric", requestPayloadMetric); + setPrivateField(lambdaProcessor, "responsePayloadMetric", responsePayloadMetric); } @BeforeEach @@ -89,22 +119,32 @@ public void setup() { lambdaRegion = System.getProperty("tests.lambda.processor.region"); functionName = System.getProperty("tests.lambda.processor.functionName"); role = System.getProperty("tests.lambda.processor.sts_role_arn"); - pluginMetrics = mock(PluginMetrics.class); - //when(pluginMetrics.gauge(any(), any(AtomicLong.class))).thenReturn(new AtomicLong()); - //testCounter = mock(Counter.class); + pluginSetting = mock(PluginSetting.class); + when(pluginSetting.getPipelineName()).thenReturn("pipeline"); + when(pluginSetting.getName()).thenReturn("name"); + numberOfRecordsSuccessCounter = mock(Counter.class); + numberOfRecordsFailedCounter = mock(Counter.class); + numberOfRequestsSuccessCounter = mock(Counter.class); + numberOfRequestsFailedCounter = mock(Counter.class); + lambdaLatencyMetric = mock(Timer.class); + requestPayloadMetric = mock(DistributionSummary.class); + responsePayloadMetric = mock(DistributionSummary.class); + try { + lenient().doAnswer(args -> { + return null; + }).when(numberOfRecordsSuccessCounter).increment(any(Double.class)); + } catch (Exception e){} try { lenient().doAnswer(args -> { return null; - }).when(testCounter).increment(any(Double.class)); + }).when(numberOfRecordsFailedCounter).increment(); } catch (Exception e){} try { lenient().doAnswer(args -> { return null; - }).when(testTimer).record(any(Long.class), any(TimeUnit.class)); + }).when(lambdaLatencyMetric).record(any(Long.class), any(TimeUnit.class)); } catch (Exception e){} - when(pluginMetrics.counter(any())).thenReturn(testCounter); - testTimer = mock(Timer.class); - when(pluginMetrics.timer(any())).thenReturn(testTimer); + lambdaProcessorConfig = mock(LambdaProcessorConfig.class); expressionEvaluator = mock(ExpressionEvaluator.class); awsCredentialsProvider = DefaultCredentialsProvider.create(); @@ -166,7 +206,7 @@ public void testRequestResponseWithMatchingEventsAggregateMode(int numRecords) { @ParameterizedTest @ValueSource(ints = {1000}) - public void testRequestResponse_WithMatchingEvents_StrictMode_WithMultipleThreads(int numRecords) throws InterruptedException { + public void testRequestResponse_WithMatchingEvents_StrictMode_WithMultipleThreads(int numRecords) throws Exception { when(invocationType.getAwsLambdaValue()).thenReturn(InvocationType.REQUEST_RESPONSE.getAwsLambdaValue()); when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true); lambdaProcessor = createObjectUnderTest(lambdaProcessorConfig); @@ -191,7 +231,7 @@ public void testRequestResponse_WithMatchingEvents_StrictMode_WithMultipleThread @ParameterizedTest @ValueSource(strings = {"RequestResponse", "Event"}) - public void testDifferentInvocationTypes(String invocationType) { + public void testDifferentInvocationTypes(String invocationType) throws Exception { when(this.invocationType.getAwsLambdaValue()).thenReturn(invocationType); when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true); lambdaProcessor = createObjectUnderTest(lambdaProcessorConfig); @@ -207,7 +247,7 @@ public void testDifferentInvocationTypes(String invocationType) { } @Test - public void testWithFailureTags() { + public void testWithFailureTags() throws Exception { when(invocationType.getAwsLambdaValue()).thenReturn(InvocationType.REQUEST_RESPONSE.getAwsLambdaValue()); when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(false); when(lambdaProcessorConfig.getTagsOnFailure()).thenReturn(Collections.singletonList("lambda_failure")); diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java index d456d644cb..85e33e192c 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java @@ -71,7 +71,7 @@ public class LambdaProcessor extends AbstractProcessor, Record tagsOnMatchFailure; + private final List tagsOnFailure; private final LambdaAsyncClient lambdaAsyncClient; private final DistributionSummary requestPayloadMetric; private final DistributionSummary responsePayloadMetric; @@ -79,11 +79,11 @@ public class LambdaProcessor extends AbstractProcessor, Record> doExecute(Collection> records) { for (Record record : records) { final Event event = record.getData(); // If the condition is false, add the event to resultRecords as-is - if (whenCondition != null && !expressionEvaluator.evaluateConditional(whenCondition, - event)) { + if (whenCondition != null && !expressionEvaluator.evaluateConditional(whenCondition, event)) { resultRecords.add(record); continue; } @@ -155,7 +154,6 @@ public Collection> doExecute(Collection> records) { Map> bufferToFutureMap = LambdaCommonHandler.sendRecords( recordsToLambda, lambdaProcessorConfig, lambdaAsyncClient, new OutputCodecContext()); - for (Map.Entry> entry : bufferToFutureMap.entrySet()) { CompletableFuture future = entry.getValue(); Buffer inputBuffer = entry.getKey(); @@ -163,20 +161,26 @@ public Collection> doExecute(Collection> records) { InvokeResponse response = future.join(); Duration latency = inputBuffer.stopLatencyWatch(); lambdaLatencyMetric.record(latency.toMillis(), TimeUnit.MILLISECONDS); + requestPayloadMetric.record(inputBuffer.getPayloadRequestSize()); if (isSuccess(response)) { + resultRecords.addAll(convertLambdaResponseToEvent(inputBuffer, response)); numberOfRecordsSuccessCounter.increment(inputBuffer.getEventCount()); numberOfRequestsSuccessCounter.increment(); - resultRecords.addAll(convertLambdaResponseToEvent(inputBuffer, response)); + if (response.payload() != null) { + responsePayloadMetric.record(response.payload().asByteArray().length); + } + continue; } else { LOG.error("Lambda invoke failed with error {} ", response.statusCode()); - resultRecords.addAll(addFailureTags(inputBuffer.getRecords())); + /* fall through */ } } catch (Exception e) { LOG.error("Exception from Lambda invocation ", e); - numberOfRecordsFailedCounter.increment(inputBuffer.getEventCount()); - numberOfRequestsFailedCounter.increment(); - resultRecords.addAll(addFailureTags(inputBuffer.getRecords())); + /* fall through */ } + numberOfRecordsFailedCounter.increment(inputBuffer.getEventCount()); + numberOfRequestsFailedCounter.increment(); + resultRecords.addAll(addFailureTags(inputBuffer.getRecords())); } return resultRecords; } @@ -190,40 +194,36 @@ List> convertLambdaResponseToEvent(Buffer flushedBuffer, final InvokeResponse lambdaResponse) { InputCodec responseCodec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSetting); List> originalRecords = flushedBuffer.getRecords(); - try { - List parsedEvents = new ArrayList<>(); - List> resultRecords = new ArrayList<>(); - SdkBytes payload = lambdaResponse.payload(); - // Handle null or empty payload - if (payload == null || payload.asByteArray() == null - || payload.asByteArray().length == 0) { - LOG.warn(NOISY, - "Lambda response payload is null or empty, dropping the original events"); - } else { - InputStream inputStream = new ByteArrayInputStream(payload.asByteArray()); - //Convert to response codec - try { - responseCodec.parse(inputStream, record -> { + List parsedEvents = new ArrayList<>(); + + List> resultRecords = new ArrayList<>(); + SdkBytes payload = lambdaResponse.payload(); + // Handle null or empty payload + if (payload == null || payload.asByteArray() == null || payload.asByteArray().length == 0) { + LOG.warn(NOISY, "Lambda response payload is null or empty, dropping the original events"); + } else { + InputStream inputStream = new ByteArrayInputStream(payload.asByteArray()); + //Convert to response codec + try { + responseCodec.parse(inputStream, record -> { Event event = record.getData(); parsedEvents.add(event); - }); - } catch (IOException ex) { - throw new RuntimeException(ex); - } + }); + } catch (IOException ex) { + LOG.error("Error while trying to parse response from Lambda", ex); + throw new RuntimeException(ex); + } + if (parsedEvents.size() == 0) { + throw new RuntimeException("Lambda Response could not be parsed, returning original events"); + } - LOG.debug("Parsed Event Size:{}, FlushedBuffer eventCount:{}, " + + LOG.debug("Parsed Event Size:{}, FlushedBuffer eventCount:{}, " + "FlushedBuffer size:{}", parsedEvents.size(), flushedBuffer.getEventCount(), flushedBuffer.getSize()); - responseStrategy.handleEvents(parsedEvents, originalRecords, resultRecords, - flushedBuffer); - } - return resultRecords; - } catch (Exception e) { - LOG.error(NOISY, "Error converting Lambda response to Event"); - addFailureTags(flushedBuffer.getRecords()); - return originalRecords; + responseStrategy.handleEvents(parsedEvents, originalRecords, resultRecords, flushedBuffer); } + return resultRecords; } /* @@ -231,12 +231,15 @@ List> convertLambdaResponseToEvent(Buffer flushedBuffer, * Batch fails and tag each event in that Batch. */ private List> addFailureTags(List> records) { + if (tagsOnFailure == null || tagsOnFailure.isEmpty()) { + return records; + } // Add failure tags to each event in the batch for (Record record : records) { Event event = record.getData(); EventMetadata metadata = event.getMetadata(); if (metadata != null) { - metadata.addTags(tagsOnMatchFailure); + metadata.addTags(tagsOnFailure); } else { LOG.warn("Event metadata is null, cannot add failure tags."); } diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java index 25c4bf2e1c..f4e59e967d 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java @@ -164,10 +164,15 @@ public void doOutput(final Collection> records) { InvokeResponse response = future.join(); Duration latency = inputBuffer.stopLatencyWatch(); lambdaLatencyMetric.record(latency.toMillis(), TimeUnit.MILLISECONDS); + requestPayloadMetric.record(inputBuffer.getPayloadRequestSize()); if (isSuccess(response)) { + releaseEventHandlesPerBatch(true, inputBuffer); numberOfRecordsSuccessCounter.increment(inputBuffer.getEventCount()); numberOfRequestsSuccessCounter.increment(); - releaseEventHandlesPerBatch(true, inputBuffer); + if (response.payload() != null) { + responsePayloadMetric.record(response.payload().asByteArray().length); + } + continue; } else { LOG.error("Lambda invoke failed with error {} ", response.statusCode()); handleFailure(new RuntimeException("failed"), inputBuffer); diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java index 70c0135179..baa8989d22 100644 --- a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java @@ -4,11 +4,10 @@ */ package org.opensearch.dataprepper.plugins.lambda.processor; - import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyDouble; @@ -21,14 +20,11 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.NUMBER_OF_FAILED_REQUESTS_TO_LAMBDA; -import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED; -import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS; -import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.NUMBER_OF_SUCCESSFUL_REQUESTS_TO_LAMBDA; import static org.opensearch.dataprepper.plugins.lambda.sink.LambdaSinkTest.getSampleRecord; import static org.opensearch.dataprepper.plugins.lambda.utils.LambdaTestSetupUtil.createLambdaConfigurationFromYaml; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Timer; import java.io.InputStream; import java.lang.reflect.Field; @@ -38,8 +34,9 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; + +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -50,10 +47,9 @@ import org.mockito.quality.Strictness; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.expression.ExpressionEvaluator; -import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.codec.InputCodec; -import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventMetadata; @@ -87,7 +83,7 @@ public class LambdaProcessorTest { private PluginFactory pluginFactory; @Mock - private PluginMetrics pluginMetrics; + private PluginSetting pluginSetting; @Mock private LambdaProcessorConfig lambdaProcessorConfig; @@ -113,15 +109,25 @@ public class LambdaProcessorTest { @Mock private Counter numberOfRecordsFailedCounter; + @Mock private Counter numberOfRequestsFailedCounter; + @Mock + private DistributionSummary requestPayloadMetric; + + @Mock + private DistributionSummary responsePayloadMetric; + @Mock private InvokeResponse invokeResponse; @Mock private Timer lambdaLatencyMetric; + @Mock + private LambdaAsyncClient lambdaAsyncClient; + // The class under test private LambdaProcessor lambdaProcessor; @@ -129,6 +135,9 @@ public class LambdaProcessorTest { public void setUp() throws Exception { MockitoAnnotations.openMocks(this); + when(pluginSetting.getName()).thenReturn("testProcessor"); + when(pluginSetting.getPipelineName()).thenReturn("testPipeline"); +/* // Mock PluginMetrics counters and timers when(pluginMetrics.counter(eq(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS))).thenReturn( numberOfRecordsSuccessCounter); @@ -139,8 +148,7 @@ public void setUp() throws Exception { when(pluginMetrics.counter(eq(NUMBER_OF_FAILED_REQUESTS_TO_LAMBDA))).thenReturn( numberOfRecordsFailedCounter); when(pluginMetrics.timer(anyString())).thenReturn(lambdaLatencyMetric); - when(pluginMetrics.gauge(anyString(), any(AtomicLong.class))).thenAnswer( - invocation -> invocation.getArgument(1)); +*/ ClientOptions clientOptions = new ClientOptions(); when(lambdaProcessorConfig.getClientOptions()).thenReturn(clientOptions); @@ -170,21 +178,19 @@ public void setUp() throws Exception { responseCodec); // Instantiate the LambdaProcessor manually - lambdaProcessor = new LambdaProcessor(pluginFactory, pluginMetrics, lambdaProcessorConfig, + lambdaProcessor = new LambdaProcessor(pluginFactory, pluginSetting, lambdaProcessorConfig, awsCredentialsSupplier, expressionEvaluator); + populatePrivateFields(); + //setPrivateField(lambdaProcessor, "pluginMetrics", pluginMetrics); // Mock InvokeResponse when(invokeResponse.payload()).thenReturn(SdkBytes.fromUtf8String("[{\"key\":\"value\"}]")); when(invokeResponse.statusCode()).thenReturn(200); // Success status code - // Mock LambdaAsyncClient inside LambdaProcessor - LambdaAsyncClient lambdaAsyncClientMock = mock(LambdaAsyncClient.class); - setPrivateField(lambdaProcessor, "lambdaAsyncClient", lambdaAsyncClientMock); - // Mock the invoke method to return a completed future CompletableFuture invokeFuture = CompletableFuture.completedFuture( invokeResponse); - when(lambdaAsyncClientMock.invoke(any(InvokeRequest.class))).thenReturn(invokeFuture); + when(lambdaAsyncClient.invoke(any(InvokeRequest.class))).thenReturn(invokeFuture); // Mock Response Codec parse method doNothing().when(responseCodec).parse(any(InputStream.class), any(Consumer.class)); @@ -192,7 +198,6 @@ public void setUp() throws Exception { } private void populatePrivateFields() throws Exception { - List tagsOnMatchFailure = Collections.singletonList("failure_tag"); // Use reflection to set the private fields setPrivateField(lambdaProcessor, "numberOfRecordsSuccessCounter", numberOfRecordsSuccessCounter); @@ -202,8 +207,10 @@ private void populatePrivateFields() throws Exception { numberOfRecordsFailedCounter); setPrivateField(lambdaProcessor, "numberOfRequestsFailedCounter", numberOfRequestsFailedCounter); - setPrivateField(lambdaProcessor, "tagsOnMatchFailure", tagsOnMatchFailure); - setPrivateField(lambdaProcessor, "lambdaCommonHandler", lambdaCommonHandler); + setPrivateField(lambdaProcessor, "lambdaLatencyMetric", lambdaLatencyMetric); + setPrivateField(lambdaProcessor, "responsePayloadMetric", responsePayloadMetric); + setPrivateField(lambdaProcessor, "requestPayloadMetric", requestPayloadMetric); + setPrivateField(lambdaProcessor, "lambdaAsyncClient", lambdaAsyncClient); } // Helper method to set private fields via reflection @@ -247,20 +254,41 @@ public void testProcessorDefaults() { @ParameterizedTest @ValueSource(strings = {"lambda-processor-success-config.yaml"}) - public void testDoExecute_WithExceptionDuringProcessing(String configFileName) { + public void testDoExecute_WithExceptionInSendRecords(String configFileName) throws Exception { // Arrange List> records = Collections.singletonList(getSampleRecord()); LambdaProcessorConfig lambdaProcessorConfig = createLambdaConfigurationFromYaml( configFileName); - LambdaProcessor lambdaProcessor = new LambdaProcessor(pluginFactory, pluginMetrics, + lambdaProcessor = new LambdaProcessor(pluginFactory, pluginSetting, lambdaProcessorConfig, awsCredentialsSupplier, expressionEvaluator); + populatePrivateFields(); + + + when(lambdaAsyncClient.invoke(any(InvokeRequest.class))).thenThrow(new RuntimeException("test exception")); + Assertions.assertThrows( RuntimeException.class, () -> lambdaProcessor.doExecute(records)); + + } + + @ParameterizedTest + @ValueSource(strings = {"lambda-processor-success-config.yaml"}) + public void testDoExecute_WithExceptionDuringProcessing(String configFileName) throws Exception { + // Arrange + List> records = Collections.singletonList(getSampleRecord()); + LambdaProcessorConfig lambdaProcessorConfig = createLambdaConfigurationFromYaml( + configFileName); + lambdaProcessor = new LambdaProcessor(pluginFactory, pluginSetting, + lambdaProcessorConfig, + awsCredentialsSupplier, expressionEvaluator); + populatePrivateFields(); - // make batch options null to generate exception - when(lambdaProcessorConfig.getBatchOptions()).thenReturn(null); - // Act - Collection> result = lambdaProcessor.doExecute(records); + CompletableFuture invokeFuture = CompletableFuture.completedFuture( + invokeResponse); + when(lambdaAsyncClient.invoke(any(InvokeRequest.class))).thenReturn(invokeFuture); + when(invokeResponse.payload()).thenThrow(new RuntimeException("Test Exception")); + + Collection> result = lambdaProcessor.doExecute(records); // Assert assertEquals(1, result.size()); verify(numberOfRecordsFailedCounter, times(1)).increment(1.0); @@ -332,7 +360,7 @@ public void testDoExecute_WhenConditionFalse() { when(lambdaProcessorConfig.getWhenCondition()).thenReturn("some_condition"); // Instantiate the LambdaProcessor manually - lambdaProcessor = new LambdaProcessor(pluginFactory, pluginMetrics, lambdaProcessorConfig, + lambdaProcessor = new LambdaProcessor(pluginFactory, pluginSetting, lambdaProcessorConfig, awsCredentialsSupplier, expressionEvaluator); // Act @@ -356,14 +384,10 @@ public void testDoExecute_SuccessfulProcessing() throws Exception { Record record = new Record<>(event); Collection> records = Collections.singletonList(record); - // Mock LambdaAsyncClient inside LambdaProcessor - LambdaAsyncClient lambdaAsyncClientMock = mock(LambdaAsyncClient.class); - setPrivateField(lambdaProcessor, "lambdaAsyncClient", lambdaAsyncClientMock); - // Mock the invoke method to return a completed future CompletableFuture invokeFuture = CompletableFuture.completedFuture( invokeResponse); - when(lambdaAsyncClientMock.invoke(any(InvokeRequest.class))).thenReturn(invokeFuture); + when(lambdaAsyncClient.invoke(any(InvokeRequest.class))).thenReturn(invokeFuture); // Mock Buffer behavior when(bufferMock.getEventCount()).thenReturn(0).thenReturn(1).thenReturn(0);