diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java index 6d10e0636ce..72892f710ed 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java @@ -60,6 +60,7 @@ public class VenicePubsubInputPartitionReader implements PartitionReader>> consumerBuffer = new HashMap<>(); @@ -69,8 +70,7 @@ public VenicePubsubInputPartitionReader(VeniceProperties jobConfig, VenicePubsub this( jobConfig, inputPartition, - new PubSubClientsFactory(jobConfig).getConsumerAdapterFactory() // need to review the - // properties bag ... + new PubSubClientsFactory(jobConfig).getConsumerAdapterFactory() .create( jobConfig, false, @@ -125,6 +125,7 @@ private void initialize() { // if it returns a row, it's going to be key and value and offset in the row in that order @Override public InternalRow get() { + recordsDeliveredByGet++; // should return the same row if called multiple times return currentRow; } @@ -149,11 +150,8 @@ public boolean next() { @Override public void close() { pubSubConsumer.close(); - LOGGER.info( - "Consuming ended for Topic: {} , consumed {} records, skipped {} records", - targetPubSubTopicName, - recordsServed, - recordsSkipped); + LOGGER.info("Consuming ended for Topic: {} , consumed {} records,", targetPubSubTopicName, recordsServed); + LOGGER.info(" skipped {} records, gets invoked : {} .", recordsSkipped, recordsDeliveredByGet); } // borrowing Gaojie's code for dealing with empty polls. @@ -188,7 +186,6 @@ private void loadRecords() { throw new RuntimeException("Empty poll after " + retry + " retries"); } messageBuffer.addAll(partitionMessagesBuffer); - } private InternalRow processPubSubMessageToRow(