Skip to content

Commit

Permalink
Fixed test cases
Browse files Browse the repository at this point in the history
Signed-off-by: Kondaka <krishkdk@amazon.com>
  • Loading branch information
kkondaka committed Nov 19, 2024
1 parent 482f65c commit 6c05bcf
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static org.mockito.Mockito.lenient;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;

import java.util.ArrayList;
Expand Down Expand Up @@ -88,11 +89,21 @@ public class LambdaProcessorSinkIT {
@Mock
private ExpressionEvaluator expressionEvaluator;
@Mock
private Counter testCounter;
private Counter numberOfRecordsSuccessCounter;
@Mock
private Counter numberOfRecordsFailedCounter;
@Mock
private Counter numberOfRequestsSuccessCounter;
@Mock
private Counter numberOfRequestsFailedCounter;
@Mock
private Counter sinkSuccessCounter;
@Mock
private Timer testTimer;
private Timer lambdaLatencyMetric;
@Mock
private DistributionSummary requestPayloadMetric;
@Mock
private DistributionSummary responsePayloadMetric;
@Mock
InvocationType invocationType;

Expand Down Expand Up @@ -124,6 +135,13 @@ public void setup() {
role = System.getProperty("tests.lambda.processor.sts_role_arn");
successCount = new AtomicLong();
numEventHandlesReleased = new AtomicLong();
numberOfRecordsSuccessCounter = mock(Counter.class);
numberOfRecordsFailedCounter = mock(Counter.class);
numberOfRequestsSuccessCounter = mock(Counter.class);
numberOfRequestsFailedCounter = mock(Counter.class);
lambdaLatencyMetric = mock(Timer.class);
requestPayloadMetric = mock(DistributionSummary.class);
responsePayloadMetric = mock(DistributionSummary.class);

acknowledgementSet = mock(AcknowledgementSet.class);
try {
Expand All @@ -138,7 +156,6 @@ public void setup() {
}).when(acknowledgementSet).release(any(EventHandle.class), any(Boolean.class));
} catch (Exception e){ }
pluginMetrics = mock(PluginMetrics.class);
when(pluginMetrics.gauge(any(), any(AtomicLong.class))).thenReturn(new AtomicLong());
sinkSuccessCounter = mock(Counter.class);
try {
lenient().doAnswer(args -> {
Expand All @@ -147,26 +164,22 @@ public void setup() {
return null;
}).when(sinkSuccessCounter).increment(any(Double.class));
} catch (Exception e){ }
testCounter = mock(Counter.class);
try {
lenient().doAnswer(args -> {
return null;
}).when(testCounter).increment(any(Double.class));
}).when(numberOfRecordsSuccessCounter).increment(any(Double.class));
} catch (Exception e){}
try {
lenient().doAnswer(args -> {
return null;
}).when(testCounter).increment();
}).when(numberOfRecordsFailedCounter).increment();
} catch (Exception e){}
try {
lenient().doAnswer(args -> {
return null;
}).when(testTimer).record(any(Long.class), any(TimeUnit.class));
}).when(lambdaLatencyMetric).record(any(Long.class), any(TimeUnit.class));
} catch (Exception e){}
when(pluginMetrics.counter(any())).thenReturn(testCounter);

testTimer = mock(Timer.class);
when(pluginMetrics.timer(any())).thenReturn(testTimer);
lambdaProcessorConfig = mock(LambdaProcessorConfig.class);
expressionEvaluator = mock(ExpressionEvaluator.class);
awsCredentialsProvider = DefaultCredentialsProvider.create();
Expand Down Expand Up @@ -218,23 +231,31 @@ public void setup() {

}

private void setPrivateFields(final LambdaProcessor lambdaProcessor) throws Exception {
setPrivateField(lambdaProcessor, "numberOfRecordsSuccessCounter", numberOfRecordsSuccessCounter);
setPrivateField(lambdaProcessor, "numberOfRecordsFailedCounter", numberOfRecordsFailedCounter);
setPrivateField(lambdaProcessor, "numberOfRequestsSuccessCounter", numberOfRequestsSuccessCounter);
setPrivateField(lambdaProcessor, "numberOfRequestsFailedCounter", numberOfRequestsFailedCounter);
setPrivateField(lambdaProcessor, "lambdaLatencyMetric", lambdaLatencyMetric);
setPrivateField(lambdaProcessor, "requestPayloadMetric", requestPayloadMetric);
setPrivateField(lambdaProcessor, "responsePayloadMetric", responsePayloadMetric);
}

@ParameterizedTest
@ValueSource(ints = {11})
public void testLambdaProcessorAndLambdaSink(int numRecords) throws Exception {
when(invocationType.getAwsLambdaValue()).thenReturn(InvocationType.REQUEST_RESPONSE.getAwsLambdaValue());
when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true);
lambdaProcessor = createLambdaProcessor(lambdaProcessorConfig);
setPrivateField(lambdaProcessor, "pluginMetrics", pluginMetrics);
setPrivateFields(lambdaProcessor);
List<Record<Event>> records = createRecords(numRecords);

Collection<Record<Event>> results = lambdaProcessor.doExecute(records);

assertThat(results.size(), equalTo(numRecords));
validateStrictModeResults(records, results);
LambdaSink lambdaSink = createLambdaSink(lambdaSinkConfig);
try {
setPrivateField(lambdaSink, "numberOfRecordsSuccessCounter", sinkSuccessCounter);
} catch (Exception e){}
setPrivateField(lambdaSink, "numberOfRecordsSuccessCounter", sinkSuccessCounter);
lambdaSink.output(results);
assertThat(successCount.get(), equalTo((long)numRecords));
assertThat(numEventHandlesReleased.get(), equalTo((long)numRecords));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import static org.mockito.Mockito.spy;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.event.EventMetadata;
Expand Down Expand Up @@ -43,7 +42,9 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.lenient;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -72,15 +73,25 @@ public class LambdaProcessorIT {
@Mock
private PluginFactory pluginFactory;
@Mock
private PluginMetrics pluginMetrics;
@Mock
private PluginSetting pluginSetting;
@Mock
private ExpressionEvaluator expressionEvaluator;
@Mock
private Counter testCounter;
private Counter numberOfRecordsSuccessCounter;
@Mock
private Counter numberOfRecordsFailedCounter;
@Mock
private Counter numberOfRequestsSuccessCounter;
@Mock
private Counter numberOfRequestsFailedCounter;
@Mock
private Counter sinkSuccessCounter;
@Mock
private Timer lambdaLatencyMetric;
@Mock
private DistributionSummary requestPayloadMetric;
@Mock
private Timer testTimer;
private DistributionSummary responsePayloadMetric;
@Mock
InvocationType invocationType;
private LambdaProcessor createObjectUnderTest(LambdaProcessorConfig processorConfig) {
Expand All @@ -93,29 +104,47 @@ private void setPrivateField(Object targetObject, String fieldName, Object value
field.set(targetObject, value);
}

private void setPrivateFields(final LambdaProcessor lambdaProcessor) throws Exception {
setPrivateField(lambdaProcessor, "numberOfRecordsSuccessCounter", numberOfRecordsSuccessCounter);
setPrivateField(lambdaProcessor, "numberOfRecordsFailedCounter", numberOfRecordsFailedCounter);
setPrivateField(lambdaProcessor, "numberOfRequestsSuccessCounter", numberOfRequestsSuccessCounter);
setPrivateField(lambdaProcessor, "numberOfRequestsFailedCounter", numberOfRequestsFailedCounter);
setPrivateField(lambdaProcessor, "lambdaLatencyMetric", lambdaLatencyMetric);
setPrivateField(lambdaProcessor, "requestPayloadMetric", requestPayloadMetric);
setPrivateField(lambdaProcessor, "responsePayloadMetric", responsePayloadMetric);
}

@BeforeEach
public void setup() {
lambdaRegion = System.getProperty("tests.lambda.processor.region");
functionName = System.getProperty("tests.lambda.processor.functionName");
role = System.getProperty("tests.lambda.processor.sts_role_arn");
pluginMetrics = mock(PluginMetrics.class);
pluginSetting = mock(PluginSetting.class);
when(pluginSetting.getPipelineName()).thenReturn("pipeline");
when(pluginSetting.getName()).thenReturn("name");
testCounter = mock(Counter.class);
numberOfRecordsSuccessCounter = mock(Counter.class);
numberOfRecordsFailedCounter = mock(Counter.class);
numberOfRequestsSuccessCounter = mock(Counter.class);
numberOfRequestsFailedCounter = mock(Counter.class);
lambdaLatencyMetric = mock(Timer.class);
requestPayloadMetric = mock(DistributionSummary.class);
responsePayloadMetric = mock(DistributionSummary.class);
try {
lenient().doAnswer(args -> {
return null;
}).when(testCounter).increment(any(Double.class));
}).when(numberOfRecordsSuccessCounter).increment(any(Double.class));
} catch (Exception e){}
try {
lenient().doAnswer(args -> {
return null;
}).when(testTimer).record(any(Long.class), any(TimeUnit.class));
}).when(numberOfRecordsFailedCounter).increment();
} catch (Exception e){}
when(pluginMetrics.counter(any())).thenReturn(testCounter);
testTimer = mock(Timer.class);
when(pluginMetrics.timer(any())).thenReturn(testTimer);
try {
lenient().doAnswer(args -> {
return null;
}).when(lambdaLatencyMetric).record(any(Long.class), any(TimeUnit.class));
} catch (Exception e){}

lambdaProcessorConfig = mock(LambdaProcessorConfig.class);
expressionEvaluator = mock(ExpressionEvaluator.class);
awsCredentialsProvider = DefaultCredentialsProvider.create();
Expand Down Expand Up @@ -181,7 +210,6 @@ public void testRequestResponse_WithMatchingEvents_StrictMode_WithMultipleThread
when(invocationType.getAwsLambdaValue()).thenReturn(InvocationType.REQUEST_RESPONSE.getAwsLambdaValue());
when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true);
lambdaProcessor = createObjectUnderTest(lambdaProcessorConfig);
setPrivateField(lambdaProcessor, "pluginMetrics", pluginMetrics);
int numThreads = 5;
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
CountDownLatch latch = new CountDownLatch(numThreads);
Expand All @@ -207,7 +235,6 @@ public void testDifferentInvocationTypes(String invocationType) throws Exception
when(this.invocationType.getAwsLambdaValue()).thenReturn(invocationType);
when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true);
lambdaProcessor = createObjectUnderTest(lambdaProcessorConfig);
setPrivateField(lambdaProcessor, "pluginMetrics", pluginMetrics);
List<Record<Event>> records = createRecords(10);
Collection<Record<Event>> results = lambdaProcessor.doExecute(records);
if (invocationType.equals("RequestResponse")) {
Expand All @@ -225,7 +252,6 @@ public void testWithFailureTags() throws Exception {
when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(false);
when(lambdaProcessorConfig.getTagsOnFailure()).thenReturn(Collections.singletonList("lambda_failure"));
LambdaProcessor spyLambdaProcessor = spy(createObjectUnderTest(lambdaProcessorConfig));
setPrivateField(spyLambdaProcessor, "pluginMetrics", pluginMetrics);
doThrow(new RuntimeException("Simulated Lambda failure"))
.when(spyLambdaProcessor).convertLambdaResponseToEvent(any(Buffer.class), any(InvokeResponse.class));
List<Record<Event>> records = createRecords(5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;

import javax.management.RuntimeMBeanException;

@DataPrepperPlugin(name = "aws_lambda", pluginType = Processor.class, pluginConfigurationType = LambdaProcessorConfig.class)
public class LambdaProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

Expand Down Expand Up @@ -79,15 +77,13 @@ public class LambdaProcessor extends AbstractProcessor<Record<Event>, Record<Eve
private final DistributionSummary responsePayloadMetric;
private final ResponseEventHandlingStrategy responseStrategy;
private final JsonOutputCodecConfig jsonOutputCodecConfig;
private final PluginMetrics pluginMetrics;

@DataPrepperPluginConstructor
public LambdaProcessor(final PluginFactory pluginFactory, final PluginSetting pluginSetting,
final LambdaProcessorConfig lambdaProcessorConfig,
final AwsCredentialsSupplier awsCredentialsSupplier,
final ExpressionEvaluator expressionEvaluator) {
super(PluginMetrics.fromPluginSetting(pluginSetting, pluginSetting.getName()+"_processor"));
pluginMetrics = getPluginMetrics();
this.expressionEvaluator = expressionEvaluator;
this.pluginFactory = pluginFactory;
this.lambdaProcessorConfig = lambdaProcessorConfig;
Expand Down

0 comments on commit 6c05bcf

Please sign in to comment.