Kafka time stamp fix #5108

Merged · 1 commit · Oct 31, 2024

KafkaCustomConsumer.java

@@ -99,6 +99,7 @@ public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener
     private final LogRateLimiter errLogRateLimiter;
     private final ByteDecoder byteDecoder;
     private final long maxRetriesOnException;
+    private final Map<Integer, Long> partitionToLastReceivedTimestampMillis;
 
     public KafkaCustomConsumer(final KafkaConsumer consumer,
                                final AtomicBoolean shutdownInProgress,
@@ -122,6 +123,7 @@ public KafkaCustomConsumer(final KafkaConsumer consumer,
         this.pauseConsumePredicate = pauseConsumePredicate;
         this.topicMetrics.register(consumer);
         this.offsetsToCommit = new HashMap<>();
+        this.partitionToLastReceivedTimestampMillis = new HashMap<>();
         this.ownedPartitionsEpoch = new HashMap<>();
         this.metricsUpdatedTime = Instant.now().getEpochSecond();
         this.acknowledgedOffsets = new ArrayList<>();
@@ -142,6 +144,22 @@ KafkaTopicConsumerMetrics getTopicMetrics() {
         return topicMetrics;
     }
 
+    <T> long getRecordTimeStamp(final ConsumerRecord<String, T> consumerRecord, final long nowMs) {
+        final long timestamp = consumerRecord.timestamp();
+        int partition = consumerRecord.partition();
+        if (timestamp > nowMs) {
+            topicMetrics.getNumberOfInvalidTimeStamps().increment();
+            if (partitionToLastReceivedTimestampMillis.containsKey(partition)) {
+                return partitionToLastReceivedTimestampMillis.get(partition);
+            } else {
+                return nowMs;
+            }
+        } else {
+            partitionToLastReceivedTimestampMillis.put(partition, timestamp);
+            return timestamp;
+        }
+    }
+
     private long getCurrentTimeNanos() {
         Instant now = Instant.now();
         return now.getEpochSecond()*1000000000+now.getNano();
@@ -436,12 +454,13 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, in
             }
             eventMetadata.setAttribute("kafka_headers", headerData);
         }
-        eventMetadata.setAttribute("kafka_timestamp", consumerRecord.timestamp());
+        final long receivedTimeStamp = getRecordTimeStamp(consumerRecord, now.toEpochMilli());
+        eventMetadata.setAttribute("kafka_timestamp", receivedTimeStamp);
         eventMetadata.setAttribute("kafka_timestamp_type", consumerRecord.timestampType().toString());
         eventMetadata.setAttribute("kafka_topic", topicName);
         eventMetadata.setAttribute("kafka_partition", String.valueOf(partition));
-        eventMetadata.setExternalOriginationTime(Instant.ofEpochMilli(consumerRecord.timestamp()));
-        event.getEventHandle().setExternalOriginationTime(Instant.ofEpochMilli(consumerRecord.timestamp()));
+        eventMetadata.setExternalOriginationTime(Instant.ofEpochMilli(receivedTimeStamp));
+        event.getEventHandle().setExternalOriginationTime(Instant.ofEpochMilli(receivedTimeStamp));
 
         return new Record<Event>(event);
     }
@@ -511,7 +530,9 @@ private <T> void iterateRecordPartitions(ConsumerRecords<String, T> records, fin
             if (schema == MessageFormat.BYTES) {
                 InputStream inputStream = new ByteArrayInputStream((byte[])consumerRecord.value());
                 if(byteDecoder != null) {
-                    byteDecoder.parse(inputStream, Instant.ofEpochMilli(consumerRecord.timestamp()), (record) -> {
+                    final long receivedTimeStamp = getRecordTimeStamp(consumerRecord, Instant.now().toEpochMilli());
+
+                    byteDecoder.parse(inputStream, Instant.ofEpochMilli(receivedTimeStamp), (record) -> {
                         processRecord(acknowledgementSet, record);
                     });
                 } else {
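
The new getRecordTimeStamp method guards against producer clocks that run ahead of the consumer: a record whose timestamp is later than the consumer's current time is counted as invalid and replaced with the last valid timestamp seen on that partition, falling back to the current time when the partition has no history yet. Below is a minimal, self-contained sketch of that clamping logic in plain Java; TimestampClamp, lastValidByPartition, and invalidTimestampCount are illustrative stand-ins, not Data Prepper or Kafka APIs.

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Sketch of the per-partition timestamp clamping introduced in this PR.
public class TimestampClamp {
    private final Map<Integer, Long> lastValidByPartition = new HashMap<>();
    private long invalidTimestampCount = 0; // stands in for the numberOfInvalidTimeStamps counter

    long clamp(final int partition, final long recordTimestampMs, final long nowMs) {
        if (recordTimestampMs > nowMs) {
            // Timestamp is in the future: count it as invalid and fall back to
            // the last valid timestamp on this partition, or to "now" if none.
            invalidTimestampCount++;
            return lastValidByPartition.getOrDefault(partition, nowMs);
        }
        // Timestamp is plausible: remember it for future fallbacks on this partition.
        lastValidByPartition.put(partition, recordTimestampMs);
        return recordTimestampMs;
    }

    public static void main(String[] args) {
        TimestampClamp clamp = new TimestampClamp();
        long now = Instant.now().toEpochMilli();
        System.out.println(clamp.clamp(1, now - 5, now));  // now - 5 (valid, remembered)
        System.out.println(clamp.clamp(1, now + 5, now));  // now - 5 (future -> last valid on partition 1)
        System.out.println(clamp.clamp(2, now + 10, now)); // now     (future, partition 2 has no history)
    }
}

The main method mirrors the three cases exercised by the unit test further down in this diff.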

KafkaTopicConsumerMetrics.java

@@ -22,6 +22,7 @@ public class KafkaTopicConsumerMetrics {
     static final String NUMBER_OF_RECORDS_FAILED_TO_PARSE = "numberOfRecordsFailedToParse";
     static final String NUMBER_OF_DESERIALIZATION_ERRORS = "numberOfDeserializationErrors";
     static final String NUMBER_OF_BUFFER_SIZE_OVERFLOWS = "numberOfBufferSizeOverflows";
+    static final String NUMBER_OF_INVALID_TIMESTAMPS = "numberOfInvalidTimeStamps";
     static final String NUMBER_OF_POLL_AUTH_ERRORS = "numberOfPollAuthErrors";
     static final String NUMBER_OF_RECORDS_COMMITTED = "numberOfRecordsCommitted";
     static final String NUMBER_OF_RECORDS_CONSUMED = "numberOfRecordsConsumed";
@@ -38,6 +39,7 @@ public class KafkaTopicConsumerMetrics {
     private final Counter numberOfDeserializationErrors;
     private final Counter numberOfBufferSizeOverflows;
     private final Counter numberOfPollAuthErrors;
+    private final Counter numberOfInvalidTimeStamps;
     private final Counter numberOfRecordsCommitted;
     private final Counter numberOfRecordsConsumed;
     private final Counter numberOfBytesConsumed;
@@ -53,6 +55,7 @@ public KafkaTopicConsumerMetrics(final String topicName, final PluginMetrics plu
         this.numberOfBytesConsumed = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_BYTES_CONSUMED, topicNameInMetrics));
         this.numberOfRecordsCommitted = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_RECORDS_COMMITTED, topicNameInMetrics));
         this.numberOfRecordsFailedToParse = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_RECORDS_FAILED_TO_PARSE, topicNameInMetrics));
+        this.numberOfInvalidTimeStamps = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_INVALID_TIMESTAMPS, topicNameInMetrics));
         this.numberOfDeserializationErrors = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_DESERIALIZATION_ERRORS, topicNameInMetrics));
         this.numberOfBufferSizeOverflows = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_BUFFER_SIZE_OVERFLOWS, topicNameInMetrics));
         this.numberOfPollAuthErrors = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_POLL_AUTH_ERRORS, topicNameInMetrics));
@@ -151,6 +154,10 @@ public Counter getNumberOfNegativeAcknowledgements() {
         return numberOfNegativeAcknowledgements;
     }
 
+    public Counter getNumberOfInvalidTimeStamps() {
+        return numberOfInvalidTimeStamps;
+    }
+
     public Counter getNumberOfPositiveAcknowledgements() {
         return numberOfPositiveAcknowledgements;
     }
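
The metrics change follows the file's existing pattern: define a metric-name constant, register a Counter through PluginMetrics in the constructor, and expose it via a getter so the consumer can increment it. Data Prepper's PluginMetrics is backed by Micrometer counters, so the runtime behavior of the new counter can be sketched with Micrometer directly; the registry setup and the "my-topic." name below are illustrative, not the plugin's actual wiring.

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

// Illustrative sketch of the Micrometer Counter behind numberOfInvalidTimeStamps;
// Data Prepper wires the registry and topic-scoped name itself.
public class InvalidTimestampMetricSketch {
    public static void main(String[] args) {
        SimpleMeterRegistry registry = new SimpleMeterRegistry();
        // Topic-scoped name, mirroring getTopicMetricName(NUMBER_OF_INVALID_TIMESTAMPS, topicName).
        Counter invalidTimestamps = registry.counter("my-topic.numberOfInvalidTimeStamps");
        invalidTimestamps.increment(); // called once per future-dated record
        System.out.println(invalidTimestamps.count()); // prints 1.0
    }
}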

KafkaCustomConsumer unit tests

@@ -40,6 +40,7 @@
 import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -51,6 +52,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -138,6 +140,7 @@ public void setUp() {
         when(topicMetrics.getNumberOfBufferSizeOverflows()).thenReturn(overflowCounter);
         when(topicMetrics.getNumberOfRecordsCommitted()).thenReturn(counter);
         when(topicMetrics.getNumberOfDeserializationErrors()).thenReturn(counter);
+        when(topicMetrics.getNumberOfInvalidTimeStamps()).thenReturn(counter);
         when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));
         when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT);
         when(topicConfig.getAutoCommit()).thenReturn(false);
@@ -196,6 +199,27 @@ private BlockingBuffer<Record<Event>> getBuffer() {
         return new BlockingBuffer<>(pluginSetting);
     }
 
+    @Test
+    public void testGetRecordTimeStamp() {
+        ConsumerRecord<String, Object> consumerRecord1 = mock(ConsumerRecord.class);
+        ConsumerRecord<String, Object> consumerRecord2 = mock(ConsumerRecord.class);
+        ConsumerRecord<String, Object> consumerRecord3 = mock(ConsumerRecord.class);
+        consumer = createObjectUnderTestWithMockBuffer("plaintext");
+        long nowMs = Instant.now().toEpochMilli();
+        long timestamp1 = nowMs - 5;
+        when(consumerRecord1.timestamp()).thenReturn(timestamp1);
+        when(consumerRecord1.partition()).thenReturn(1);
+        assertThat(consumer.getRecordTimeStamp(consumerRecord1, nowMs), equalTo(timestamp1));
+        long timestamp2 = nowMs + 5;
+        when(consumerRecord2.timestamp()).thenReturn(timestamp2);
+        when(consumerRecord2.partition()).thenReturn(1);
+        assertThat(consumer.getRecordTimeStamp(consumerRecord2, nowMs), equalTo(timestamp1));
+        long timestamp3 = nowMs + 10;
+        when(consumerRecord3.timestamp()).thenReturn(timestamp3);
+        when(consumerRecord3.partition()).thenReturn(2);
+        assertThat(consumer.getRecordTimeStamp(consumerRecord3, nowMs), equalTo(nowMs));
+    }
+
     @Test
     public void testBufferOverflowPauseResume() throws InterruptedException, Exception {
         when(topicConfig.getMaxPollInterval()).thenReturn(Duration.ofMillis(4000));