From d271864053efc7fab39ce3cecce5c81545b8513d Mon Sep 17 00:00:00 2001 From: Souvik Bose Date: Fri, 13 Dec 2024 11:03:48 -0800 Subject: [PATCH] Modify the code as per comments Signed-off-by: Souvik Bose --- .../converter/KinesisRecordConverter.java | 2 +- .../processor/KinesisInputOutputRecord.java | 3 +- .../processor/KinesisRecordProcessor.java | 4 +- .../source/KinesisInputOutputRecordTest.java | 7 +- .../converter/KinesisRecordConverterTest.java | 2 - .../processor/KinesisRecordProcessorTest.java | 75 +++++++++---------- 6 files changed, 43 insertions(+), 50 deletions(-) diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java index fdad523b63..3514670097 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java @@ -50,7 +50,7 @@ public List convert(final DecompressionEngine decompre event.getEventHandle().setExternalOriginationTime(externalOriginationTime); event.getMetadata().setExternalOriginationTime(externalOriginationTime); records.add(KinesisInputOutputRecord.builder() - .withKinesisClientRecord(kinesisClientRecord) + .withIncomingRecordSizeBytes(kinesisClientRecord.data().position()) .withDataPrepperRecord(record).build()); }); } diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisInputOutputRecord.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisInputOutputRecord.java index 9663dc54fe..782e74ba21 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisInputOutputRecord.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisInputOutputRecord.java @@ -15,12 +15,11 @@ import lombok.Getter; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; -import software.amazon.kinesis.retrieval.KinesisClientRecord; @Builder(setterPrefix = "with") @Getter @AllArgsConstructor public class KinesisInputOutputRecord { private Record dataPrepperRecord; - private KinesisClientRecord kinesisClientRecord; + private long incomingRecordSizeBytes; } diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java index b6b9c0d0fd..2b087c5139 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java @@ -39,9 +39,7 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import java.nio.ByteBuffer; import java.time.Duration; -import java.util.ArrayList; import java.util.List; import java.util.ListIterator; import java.util.Optional; @@ -178,7 +176,7 @@ public void processRecords(ProcessRecordsInput processRecordsInput) { int eventCount = 0; for (KinesisInputOutputRecord kinesisInputOutputRecord: kinesisOutputRecords) { Record dataPrepperRecord = kinesisInputOutputRecord.getDataPrepperRecord(); - int incomingRecordSizeBytes = kinesisInputOutputRecord.getKinesisClientRecord().data().position(); + long incomingRecordSizeBytes = kinesisInputOutputRecord.getIncomingRecordSizeBytes(); bytesReceivedSummary.record(incomingRecordSizeBytes); Event event = dataPrepperRecord.getData(); acknowledgementSetOpt.ifPresent(acknowledgementSet -> acknowledgementSet.add(event)); diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisInputOutputRecordTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisInputOutputRecordTest.java index 808124995a..643a57c0fe 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisInputOutputRecordTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisInputOutputRecordTest.java @@ -32,7 +32,7 @@ void builder_defaultCreatesObjectCorrectly() { KinesisInputOutputRecord kinesisInputOutputRecord = KinesisInputOutputRecord.builder().build(); - assertNull(kinesisInputOutputRecord.getKinesisClientRecord()); + assertEquals(0L, kinesisInputOutputRecord.getIncomingRecordSizeBytes()); assertNull(kinesisInputOutputRecord.getDataPrepperRecord()); } @@ -46,13 +46,12 @@ void builder_createsObjectCorrectly() { .sequenceNumber(Integer.toString(1000)).subSequenceNumber(1).build(); KinesisInputOutputRecord kinesisInputOutputRecord = KinesisInputOutputRecord.builder() - .withKinesisClientRecord(kinesisClientRecord) + .withIncomingRecordSizeBytes(100L) .withDataPrepperRecord(record) .build(); - assertNotNull(kinesisInputOutputRecord.getKinesisClientRecord()); assertNotNull(kinesisInputOutputRecord.getDataPrepperRecord()); assertEquals(kinesisInputOutputRecord.getDataPrepperRecord(), record); - assertEquals(kinesisInputOutputRecord.getKinesisClientRecord(), kinesisClientRecord); + assertEquals(100L, kinesisInputOutputRecord.getIncomingRecordSizeBytes()); } } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java index 5d1dbbf363..0f9081455e 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java @@ -15,8 +15,6 @@ import org.opensearch.dataprepper.event.TestEventFactory; import org.opensearch.dataprepper.model.codec.DecompressionEngine; import org.opensearch.dataprepper.model.codec.InputCodec; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputCodec; import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputConfig; diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java index 97a3a167fc..b5037f2ad3 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java @@ -224,15 +224,16 @@ void testProcessRecordsWithoutAcknowledgementsWithCheckpointApplied() .sequenceNumber(Integer.toString(1000)).subSequenceNumber(1).build(); List kinesisClientRecords = new ArrayList<>(); kinesisClientRecords.add(kinesisClientRecord); + final long recordsSize = kinesisClientRecords.stream() + .map(kclRecord -> kclRecord.data().position()) + .mapToLong(Integer::longValue).sum(); + records.add(KinesisInputOutputRecord.builder() .withDataPrepperRecord(record) - .withKinesisClientRecord(kinesisClientRecord).build() + .withIncomingRecordSizeBytes(recordsSize).build() ); when(processRecordsInput.records()).thenReturn(kinesisClientRecords); - final Long recordsSize = kinesisClientRecords.stream() - .map(kclRecord -> kclRecord.data().position()) - .mapToLong(Integer::longValue).sum(); InputStream inputStream = mock(InputStream.class); when(decompressionEngine.createInputStream(inputStream)).thenReturn(inputStream); @@ -267,8 +268,8 @@ void testProcessRecordsWithoutAcknowledgementsWithCheckpointApplied() verify(acknowledgementSetManager, times(0)).create(any(), any(Duration.class)); verify(recordProcessed, times(1)).increment(anyDouble()); - verify(bytesReceivedSummary, times(1)).record(eq(recordsSize.doubleValue())); - verify(bytesProcessedSummary, times(1)).record(eq(recordsSize.doubleValue())); + verify(bytesReceivedSummary, times(1)).record(eq((double) recordsSize)); + verify(bytesProcessedSummary, times(1)).record(eq((double) recordsSize)); } @Test @@ -287,15 +288,14 @@ public void testProcessRecordsWithoutAcknowledgementsEnabled() .sequenceNumber(Integer.toString(1000)).subSequenceNumber(1).build(); List kinesisClientRecords = new ArrayList<>(); kinesisClientRecords.add(kinesisClientRecord); + final long recordsSize = kinesisClientRecords.stream() + .map(kclRecord -> kclRecord.data().position()) + .mapToLong(Integer::longValue).sum(); records.add(KinesisInputOutputRecord.builder() .withDataPrepperRecord(record) - .withKinesisClientRecord(kinesisClientRecord).build() + .withIncomingRecordSizeBytes(recordsSize).build() ); - final Long recordsSize = kinesisClientRecords.stream() - .map(kclRecord -> kclRecord.data().position()) - .mapToLong(Integer::longValue).sum(); - when(processRecordsInput.records()).thenReturn(kinesisClientRecords); when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records); @@ -322,8 +322,8 @@ public void testProcessRecordsWithoutAcknowledgementsEnabled() verify(acknowledgementSetManager, times(0)).create(any(), any(Duration.class)); verify(recordProcessed, times(1)).increment(anyDouble()); - verify(bytesReceivedSummary, times(1)).record(eq(recordsSize.doubleValue())); - verify(bytesProcessedSummary, times(1)).record(eq(recordsSize.doubleValue())); + verify(bytesReceivedSummary, times(1)).record(eq((double) recordsSize)); + verify(bytesProcessedSummary, times(1)).record(eq((double) recordsSize)); } @Test @@ -353,13 +353,13 @@ void testProcessRecordsWithAcknowledgementsEnabled() List kinesisClientRecords = new ArrayList<>(); when(processRecordsInput.records()).thenReturn(kinesisClientRecords); kinesisClientRecords.add(kinesisClientRecord); + final long recordsSize = kinesisClientRecords.stream() + .map(kclRecord -> kclRecord.data().position()) + .mapToLong(Integer::longValue).sum(); records.add(KinesisInputOutputRecord.builder() .withDataPrepperRecord(record) - .withKinesisClientRecord(kinesisClientRecord).build() + .withIncomingRecordSizeBytes(recordsSize).build() ); - final Long recordsSize = kinesisClientRecords.stream() - .map(kclRecord -> kclRecord.data().position()) - .mapToLong(Integer::longValue).sum(); when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records); kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, @@ -390,8 +390,8 @@ void testProcessRecordsWithAcknowledgementsEnabled() verify(acknowledgementSetSuccesses, atLeastOnce()).increment(); verify(recordProcessed, times(1)).increment(anyDouble()); verifyNoInteractions(recordProcessingErrors); - verify(bytesReceivedSummary, times(1)).record(eq(recordsSize.doubleValue())); - verify(bytesProcessedSummary, times(1)).record(eq(recordsSize.doubleValue())); + verify(bytesReceivedSummary, times(1)).record(eq((double) recordsSize)); + verify(bytesProcessedSummary, times(1)).record(eq((double) recordsSize)); } @Test @@ -420,13 +420,13 @@ void testProcessRecordsWithNDJsonInputCodec() List kinesisClientRecords = new ArrayList<>(); when(processRecordsInput.records()).thenReturn(kinesisClientRecords); kinesisClientRecords.add(kinesisClientRecord); + final long recordsSize = kinesisClientRecords.stream() + .map(kclRecord -> kclRecord.data().position()) + .mapToLong(Integer::longValue).sum(); records.add(KinesisInputOutputRecord.builder() .withDataPrepperRecord(record) - .withKinesisClientRecord(kinesisClientRecord).build() + .withIncomingRecordSizeBytes(recordsSize).build() ); - final Long recordsSize = kinesisClientRecords.stream() - .map(kclRecord -> kclRecord.data().position()) - .mapToLong(Integer::longValue).sum(); when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records); kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, @@ -456,8 +456,8 @@ void testProcessRecordsWithNDJsonInputCodec() verify(acknowledgementSetManager, times(0)).create(any(), any(Duration.class)); verify(recordProcessed, times(1)).increment(anyDouble()); - verify(bytesReceivedSummary, times(1)).record(eq(recordsSize.doubleValue())); - verify(bytesProcessedSummary, times(1)).record(eq(recordsSize.doubleValue())); + verify(bytesReceivedSummary, times(1)).record(eq((double) recordsSize)); + verify(bytesProcessedSummary, times(1)).record(eq((double) recordsSize)); } @Test @@ -475,18 +475,17 @@ void testProcessRecordsNoThrowException() .data(ByteBuffer.wrap(event.toJsonString().getBytes())) .sequenceNumber(Integer.toString(1000)).subSequenceNumber(1).build(); kinesisClientRecords.add(kinesisClientRecord); + final long recordsSize = kinesisClientRecords.stream() + .map(kclRecord -> kclRecord.data().position()) + .mapToLong(Integer::longValue).sum(); records.add(KinesisInputOutputRecord.builder() .withDataPrepperRecord(record) - .withKinesisClientRecord(kinesisClientRecord).build() + .withIncomingRecordSizeBytes(recordsSize).build() ); when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records); final Throwable exception = mock(RuntimeException.class); doThrow(exception).when(bufferAccumulator).add(any(Record.class)); - final Long recordsSize = kinesisClientRecords.stream() - .map(kclRecord -> kclRecord.data().position()) - .mapToLong(Integer::longValue).sum(); - kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier); kinesisRecordProcessor.initialize(initializationInput); @@ -494,8 +493,8 @@ void testProcessRecordsNoThrowException() assertDoesNotThrow(() -> kinesisRecordProcessor.processRecords(processRecordsInput)); verify(recordProcessingErrors, times(1)).increment(); verify(recordProcessed, times(0)).increment(anyDouble()); - verify(bytesReceivedSummary, times(1)).record(eq(recordsSize.doubleValue())); - verify(bytesProcessedSummary, times(0)).record(eq(recordsSize.doubleValue())); + verify(bytesReceivedSummary, times(1)).record(eq((double) recordsSize)); + verify(bytesProcessedSummary, times(0)).record(eq((double) recordsSize)); } @Test @@ -513,13 +512,13 @@ void testProcessRecordsBufferFlushNoThrowException() List kinesisClientRecords = new ArrayList<>(); when(processRecordsInput.records()).thenReturn(kinesisClientRecords); kinesisClientRecords.add(kinesisClientRecord); + final long recordsSize = kinesisClientRecords.stream() + .map(kclRecord -> kclRecord.data().position()) + .mapToLong(Integer::longValue).sum(); records.add(KinesisInputOutputRecord.builder() .withDataPrepperRecord(record) - .withKinesisClientRecord(kinesisClientRecord).build() + .withIncomingRecordSizeBytes(recordsSize).build() ); - final Long recordsSize = kinesisClientRecords.stream() - .map(kclRecord -> kclRecord.data().position()) - .mapToLong(Integer::longValue).sum(); when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records); final Throwable exception = mock(RuntimeException.class); @@ -532,8 +531,8 @@ void testProcessRecordsBufferFlushNoThrowException() assertDoesNotThrow(() -> kinesisRecordProcessor.processRecords(processRecordsInput)); verify(recordProcessingErrors, times(1)).increment(); verify(recordProcessed, times(0)).increment(anyDouble()); - verify(bytesReceivedSummary, times(1)).record(eq(recordsSize.doubleValue())); - verify(bytesProcessedSummary, times(1)).record(eq(recordsSize.doubleValue())); + verify(bytesReceivedSummary, times(1)).record(eq((double) recordsSize)); + verify(bytesProcessedSummary, times(1)).record(eq((double) recordsSize)); } @Test