Skip to content

Commit

Permalink
Modify the code as per comments
Browse files Browse the repository at this point in the history
Signed-off-by: Souvik Bose <souvbose@amazon.com>
  • Loading branch information
sbose2k21 committed Dec 14, 2024
1 parent 6260611 commit d271864
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public List<KinesisInputOutputRecord> convert(final DecompressionEngine decompre
event.getEventHandle().setExternalOriginationTime(externalOriginationTime);
event.getMetadata().setExternalOriginationTime(externalOriginationTime);
records.add(KinesisInputOutputRecord.builder()
.withKinesisClientRecord(kinesisClientRecord)
.withIncomingRecordSizeBytes(kinesisClientRecord.data().position())
.withDataPrepperRecord(record).build());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
import lombok.Getter;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

@Builder(setterPrefix = "with")
@Getter
@AllArgsConstructor
public class KinesisInputOutputRecord {
private Record<Event> dataPrepperRecord;
private KinesisClientRecord kinesisClientRecord;
private long incomingRecordSizeBytes;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.Optional;
Expand Down Expand Up @@ -178,7 +176,7 @@ public void processRecords(ProcessRecordsInput processRecordsInput) {
int eventCount = 0;
for (KinesisInputOutputRecord kinesisInputOutputRecord: kinesisOutputRecords) {
Record<Event> dataPrepperRecord = kinesisInputOutputRecord.getDataPrepperRecord();
int incomingRecordSizeBytes = kinesisInputOutputRecord.getKinesisClientRecord().data().position();
long incomingRecordSizeBytes = kinesisInputOutputRecord.getIncomingRecordSizeBytes();
bytesReceivedSummary.record(incomingRecordSizeBytes);
Event event = dataPrepperRecord.getData();
acknowledgementSetOpt.ifPresent(acknowledgementSet -> acknowledgementSet.add(event));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ void builder_defaultCreatesObjectCorrectly() {

KinesisInputOutputRecord kinesisInputOutputRecord = KinesisInputOutputRecord.builder().build();

assertNull(kinesisInputOutputRecord.getKinesisClientRecord());
assertEquals(0L, kinesisInputOutputRecord.getIncomingRecordSizeBytes());
assertNull(kinesisInputOutputRecord.getDataPrepperRecord());
}

Expand All @@ -46,13 +46,12 @@ void builder_createsObjectCorrectly() {
.sequenceNumber(Integer.toString(1000)).subSequenceNumber(1).build();

KinesisInputOutputRecord kinesisInputOutputRecord = KinesisInputOutputRecord.builder()
.withKinesisClientRecord(kinesisClientRecord)
.withIncomingRecordSizeBytes(100L)
.withDataPrepperRecord(record)
.build();

assertNotNull(kinesisInputOutputRecord.getKinesisClientRecord());
assertNotNull(kinesisInputOutputRecord.getDataPrepperRecord());
assertEquals(kinesisInputOutputRecord.getDataPrepperRecord(), record);
assertEquals(kinesisInputOutputRecord.getKinesisClientRecord(), kinesisClientRecord);
assertEquals(100L, kinesisInputOutputRecord.getIncomingRecordSizeBytes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import org.opensearch.dataprepper.event.TestEventFactory;
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputCodec;
import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,16 @@ void testProcessRecordsWithoutAcknowledgementsWithCheckpointApplied()
.sequenceNumber(Integer.toString(1000)).subSequenceNumber(1).build();
List<KinesisClientRecord> kinesisClientRecords = new ArrayList<>();
kinesisClientRecords.add(kinesisClientRecord);
final long recordsSize = kinesisClientRecords.stream()
.map(kclRecord -> kclRecord.data().position())
.mapToLong(Integer::longValue).sum();

records.add(KinesisInputOutputRecord.builder()
.withDataPrepperRecord(record)
.withKinesisClientRecord(kinesisClientRecord).build()
.withIncomingRecordSizeBytes(recordsSize).build()
);
when(processRecordsInput.records()).thenReturn(kinesisClientRecords);

final Long recordsSize = kinesisClientRecords.stream()
.map(kclRecord -> kclRecord.data().position())
.mapToLong(Integer::longValue).sum();

InputStream inputStream = mock(InputStream.class);
when(decompressionEngine.createInputStream(inputStream)).thenReturn(inputStream);
Expand Down Expand Up @@ -267,8 +268,8 @@ void testProcessRecordsWithoutAcknowledgementsWithCheckpointApplied()

verify(acknowledgementSetManager, times(0)).create(any(), any(Duration.class));
verify(recordProcessed, times(1)).increment(anyDouble());
verify(bytesReceivedSummary, times(1)).record(eq(recordsSize.doubleValue()));
verify(bytesProcessedSummary, times(1)).record(eq(recordsSize.doubleValue()));
verify(bytesReceivedSummary, times(1)).record(eq((double) recordsSize));
verify(bytesProcessedSummary, times(1)).record(eq((double) recordsSize));
}

@Test
Expand All @@ -287,15 +288,14 @@ public void testProcessRecordsWithoutAcknowledgementsEnabled()
.sequenceNumber(Integer.toString(1000)).subSequenceNumber(1).build();
List<KinesisClientRecord> kinesisClientRecords = new ArrayList<>();
kinesisClientRecords.add(kinesisClientRecord);
final long recordsSize = kinesisClientRecords.stream()
.map(kclRecord -> kclRecord.data().position())
.mapToLong(Integer::longValue).sum();
records.add(KinesisInputOutputRecord.builder()
.withDataPrepperRecord(record)
.withKinesisClientRecord(kinesisClientRecord).build()
.withIncomingRecordSizeBytes(recordsSize).build()
);

final Long recordsSize = kinesisClientRecords.stream()
.map(kclRecord -> kclRecord.data().position())
.mapToLong(Integer::longValue).sum();

when(processRecordsInput.records()).thenReturn(kinesisClientRecords);
when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records);

Expand All @@ -322,8 +322,8 @@ public void testProcessRecordsWithoutAcknowledgementsEnabled()

verify(acknowledgementSetManager, times(0)).create(any(), any(Duration.class));
verify(recordProcessed, times(1)).increment(anyDouble());
verify(bytesReceivedSummary, times(1)).record(eq(recordsSize.doubleValue()));
verify(bytesProcessedSummary, times(1)).record(eq(recordsSize.doubleValue()));
verify(bytesReceivedSummary, times(1)).record(eq((double) recordsSize));
verify(bytesProcessedSummary, times(1)).record(eq((double) recordsSize));
}

@Test
Expand Down Expand Up @@ -353,13 +353,13 @@ void testProcessRecordsWithAcknowledgementsEnabled()
List<KinesisClientRecord> kinesisClientRecords = new ArrayList<>();
when(processRecordsInput.records()).thenReturn(kinesisClientRecords);
kinesisClientRecords.add(kinesisClientRecord);
final long recordsSize = kinesisClientRecords.stream()
.map(kclRecord -> kclRecord.data().position())
.mapToLong(Integer::longValue).sum();
records.add(KinesisInputOutputRecord.builder()
.withDataPrepperRecord(record)
.withKinesisClientRecord(kinesisClientRecord).build()
.withIncomingRecordSizeBytes(recordsSize).build()
);
final Long recordsSize = kinesisClientRecords.stream()
.map(kclRecord -> kclRecord.data().position())
.mapToLong(Integer::longValue).sum();
when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records);

kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig,
Expand Down Expand Up @@ -390,8 +390,8 @@ void testProcessRecordsWithAcknowledgementsEnabled()
verify(acknowledgementSetSuccesses, atLeastOnce()).increment();
verify(recordProcessed, times(1)).increment(anyDouble());
verifyNoInteractions(recordProcessingErrors);
verify(bytesReceivedSummary, times(1)).record(eq(recordsSize.doubleValue()));
verify(bytesProcessedSummary, times(1)).record(eq(recordsSize.doubleValue()));
verify(bytesReceivedSummary, times(1)).record(eq((double) recordsSize));
verify(bytesProcessedSummary, times(1)).record(eq((double) recordsSize));
}

@Test
Expand Down Expand Up @@ -420,13 +420,13 @@ void testProcessRecordsWithNDJsonInputCodec()
List<KinesisClientRecord> kinesisClientRecords = new ArrayList<>();
when(processRecordsInput.records()).thenReturn(kinesisClientRecords);
kinesisClientRecords.add(kinesisClientRecord);
final long recordsSize = kinesisClientRecords.stream()
.map(kclRecord -> kclRecord.data().position())
.mapToLong(Integer::longValue).sum();
records.add(KinesisInputOutputRecord.builder()
.withDataPrepperRecord(record)
.withKinesisClientRecord(kinesisClientRecord).build()
.withIncomingRecordSizeBytes(recordsSize).build()
);
final Long recordsSize = kinesisClientRecords.stream()
.map(kclRecord -> kclRecord.data().position())
.mapToLong(Integer::longValue).sum();
when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records);

kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig,
Expand Down Expand Up @@ -456,8 +456,8 @@ void testProcessRecordsWithNDJsonInputCodec()

verify(acknowledgementSetManager, times(0)).create(any(), any(Duration.class));
verify(recordProcessed, times(1)).increment(anyDouble());
verify(bytesReceivedSummary, times(1)).record(eq(recordsSize.doubleValue()));
verify(bytesProcessedSummary, times(1)).record(eq(recordsSize.doubleValue()));
verify(bytesReceivedSummary, times(1)).record(eq((double) recordsSize));
verify(bytesProcessedSummary, times(1)).record(eq((double) recordsSize));
}

@Test
Expand All @@ -475,27 +475,26 @@ void testProcessRecordsNoThrowException()
.data(ByteBuffer.wrap(event.toJsonString().getBytes()))
.sequenceNumber(Integer.toString(1000)).subSequenceNumber(1).build();
kinesisClientRecords.add(kinesisClientRecord);
final long recordsSize = kinesisClientRecords.stream()
.map(kclRecord -> kclRecord.data().position())
.mapToLong(Integer::longValue).sum();
records.add(KinesisInputOutputRecord.builder()
.withDataPrepperRecord(record)
.withKinesisClientRecord(kinesisClientRecord).build()
.withIncomingRecordSizeBytes(recordsSize).build()
);
when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records);
final Throwable exception = mock(RuntimeException.class);
doThrow(exception).when(bufferAccumulator).add(any(Record.class));

final Long recordsSize = kinesisClientRecords.stream()
.map(kclRecord -> kclRecord.data().position())
.mapToLong(Integer::longValue).sum();

kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig,
acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier);
kinesisRecordProcessor.initialize(initializationInput);

assertDoesNotThrow(() -> kinesisRecordProcessor.processRecords(processRecordsInput));
verify(recordProcessingErrors, times(1)).increment();
verify(recordProcessed, times(0)).increment(anyDouble());
verify(bytesReceivedSummary, times(1)).record(eq(recordsSize.doubleValue()));
verify(bytesProcessedSummary, times(0)).record(eq(recordsSize.doubleValue()));
verify(bytesReceivedSummary, times(1)).record(eq((double) recordsSize));
verify(bytesProcessedSummary, times(0)).record(eq((double) recordsSize));
}

@Test
Expand All @@ -513,13 +512,13 @@ void testProcessRecordsBufferFlushNoThrowException()
List<KinesisClientRecord> kinesisClientRecords = new ArrayList<>();
when(processRecordsInput.records()).thenReturn(kinesisClientRecords);
kinesisClientRecords.add(kinesisClientRecord);
final long recordsSize = kinesisClientRecords.stream()
.map(kclRecord -> kclRecord.data().position())
.mapToLong(Integer::longValue).sum();
records.add(KinesisInputOutputRecord.builder()
.withDataPrepperRecord(record)
.withKinesisClientRecord(kinesisClientRecord).build()
.withIncomingRecordSizeBytes(recordsSize).build()
);
final Long recordsSize = kinesisClientRecords.stream()
.map(kclRecord -> kclRecord.data().position())
.mapToLong(Integer::longValue).sum();

when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records);
final Throwable exception = mock(RuntimeException.class);
Expand All @@ -532,8 +531,8 @@ void testProcessRecordsBufferFlushNoThrowException()
assertDoesNotThrow(() -> kinesisRecordProcessor.processRecords(processRecordsInput));
verify(recordProcessingErrors, times(1)).increment();
verify(recordProcessed, times(0)).increment(anyDouble());
verify(bytesReceivedSummary, times(1)).record(eq(recordsSize.doubleValue()));
verify(bytesProcessedSummary, times(1)).record(eq(recordsSize.doubleValue()));
verify(bytesReceivedSummary, times(1)).record(eq((double) recordsSize));
verify(bytesProcessedSummary, times(1)).record(eq((double) recordsSize));
}

@Test
Expand Down

0 comments on commit d271864

Please sign in to comment.