Skip to content

Commit

Permalink
tracking metrics for delivered records.
Browse files Browse the repository at this point in the history
  • Loading branch information
eldernewborn committed Nov 12, 2024
1 parent a02e5df commit 12f1777
Showing 1 changed file with 5 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class VenicePubsubInputPartitionReader implements PartitionReader<Interna
private long recordsServed = 0;
private long recordsSkipped = 0;
private long lastKnownProgressPercent = 0;
private long recordsDeliveredByGet = 0;

private Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> consumerBuffer =
new HashMap<>();
Expand All @@ -69,8 +70,7 @@ public VenicePubsubInputPartitionReader(VeniceProperties jobConfig, VenicePubsub
this(
jobConfig,
inputPartition,
new PubSubClientsFactory(jobConfig).getConsumerAdapterFactory() // need to review the
// properties bag ...
new PubSubClientsFactory(jobConfig).getConsumerAdapterFactory()
.create(
jobConfig,
false,
Expand Down Expand Up @@ -125,6 +125,7 @@ private void initialize() {
// if it returns a row, it's going to be key and value and offset in the row in that order
@Override
public InternalRow get() {
recordsDeliveredByGet++;
// should return the same row if called multiple times
return currentRow;
}
Expand All @@ -149,11 +150,8 @@ public boolean next() {
@Override
public void close() {
pubSubConsumer.close();
LOGGER.info(
"Consuming ended for Topic: {} , consumed {} records, skipped {} records",
targetPubSubTopicName,
recordsServed,
recordsSkipped);
LOGGER.info("Consuming ended for Topic: {} , consumed {} records,", targetPubSubTopicName, recordsServed);
LOGGER.info(" skipped {} records, gets invoked : {} .", recordsSkipped, recordsDeliveredByGet);
}

// borrowing Gaojie's code for dealing with empty polls.
Expand Down Expand Up @@ -188,7 +186,6 @@ private void loadRecords() {
throw new RuntimeException("Empty poll after " + retry + " retries");
}
messageBuffer.addAll(partitionMessagesBuffer);

}

private InternalRow processPubSubMessageToRow(
Expand Down

0 comments on commit 12f1777

Please sign in to comment.