diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index d3824038fbc0..866dfd487108 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -157,7 +157,7 @@ public boolean advance() throws IOException {
      */
     while (true) {
       if (curBatch.hasNext()) {
-        // Initalize metrics container.
+        // Initialize metrics container.
         kafkaResults = KafkaSinkMetrics.kafkaMetrics();

         PartitionState pState = curBatch.next();
@@ -374,6 +374,7 @@ public boolean offsetBasedDeduplicationSupported() {
   private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT_MIN = Duration.millis(1);
   private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT_MAX = Duration.millis(20);
   private static final Duration RECORDS_ENQUEUE_POLL_TIMEOUT = Duration.millis(100);
+  private static final Duration MIN_COMMIT_FAIL_LOG_INTERVAL = Duration.standardMinutes(10);

   // Use a separate thread to read Kafka messages. Kafka Consumer does all its work including
   // network I/O inside poll(). Polling only inside #advance(), especially with a small timeout
@@ -392,6 +393,7 @@ public boolean offsetBasedDeduplicationSupported() {
   private AtomicReference<@Nullable KafkaCheckpointMark> finalizedCheckpointMark =
       new AtomicReference<>();
   private AtomicBoolean closed = new AtomicBoolean(false);
+  private Instant nextAllowedCommitFailLogTime = Instant.ofEpochMilli(0);

   // Backlog support :
   // Kafka consumer does not have an API to fetch latest offset for topic. We need to seekToEnd()
@@ -612,6 +614,7 @@ private void commitCheckpointMark() {
     if (checkpointMark != null) {
       LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
       Consumer consumer = Preconditions.checkStateNotNull(this.consumer);
+      Instant now = Instant.now();

       try {
         consumer.commitSync(
@@ -621,11 +624,24 @@ private void commitCheckpointMark() {
                 Collectors.toMap(
                     p -> new TopicPartition(p.getTopic(), p.getPartition()),
                     p -> new OffsetAndMetadata(p.getNextOffset()))));
+        nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
       } catch (Exception e) {
         // Log but ignore the exception. Committing consumer offsets to Kafka is not critical for
         // KafkaIO because it relies on the offsets stored in KafkaCheckpointMark.
-        LOG.warn(
-            String.format("%s: Could not commit finalized checkpoint %s", this, checkpointMark), e);
+        if (now.isAfter(nextAllowedCommitFailLogTime)) {
+          LOG.warn(
+              String.format(
+                  "%s: Did not successfully commit finalized checkpoint for > %s. Current checkpoint: %s",
+                  this, MIN_COMMIT_FAIL_LOG_INTERVAL, checkpointMark),
+              e);
+          nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
+        } else {
+          LOG.info(
+              String.format(
+                  "%s: Could not commit finalized checkpoint. Commit will be retried with subsequent reads. Current checkpoint: %s",
+                  this, checkpointMark),
+              e);
+        }
       }
     }
   }
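For reference, the log-throttling pattern this patch applies can be sketched outside of Beam. The following is a minimal, self-contained illustration of the same state machine, not Beam code: it uses java.time rather than the Joda-Time Instant/Duration that KafkaUnboundedReader uses, and the class and method names (ThrottledFailureLogger, recordSuccess, recordFailure) are hypothetical. A success defers the next WARN by one full interval, so WARN only fires once failures have persisted past the interval; failures inside the interval are demoted to INFO.

import java.time.Duration;
import java.time.Instant;

/**
 * Sketch of the throttled failure logging applied in the patch above.
 * WARN fires at most once per interval while failures persist; other
 * failures log at INFO. Names here are illustrative, not Beam API.
 */
public class ThrottledFailureLogger {
  private static final Duration MIN_FAIL_LOG_INTERVAL = Duration.ofMinutes(10);

  // Epoch start mirrors Instant.ofEpochMilli(0) in the patch: the first
  // failure after startup is allowed to WARN immediately.
  private Instant nextAllowedFailLogTime = Instant.EPOCH;

  /** Call after a successful attempt: pushes the next WARN out by one interval. */
  public void recordSuccess(Instant now) {
    nextAllowedFailLogTime = now.plus(MIN_FAIL_LOG_INTERVAL);
  }

  /** Call after a failed attempt: WARN if the interval has elapsed, else INFO. */
  public void recordFailure(Instant now, Exception e) {
    if (now.isAfter(nextAllowedFailLogTime)) {
      System.err.printf(
          "WARN: failing for > %s: %s%n", MIN_FAIL_LOG_INTERVAL, e.getMessage());
      nextAllowedFailLogTime = now.plus(MIN_FAIL_LOG_INTERVAL);
    } else {
      System.out.printf("INFO: failed, will retry: %s%n", e.getMessage());
    }
  }

  public static void main(String[] args) {
    ThrottledFailureLogger logger = new ThrottledFailureLogger();
    Instant t = Instant.parse("2024-01-01T00:00:00Z");
    logger.recordFailure(t, new RuntimeException("commit failed")); // WARN (first failure)
    t = t.plus(Duration.ofMinutes(1));
    logger.recordFailure(t, new RuntimeException("commit failed")); // INFO (within interval)
    t = t.plus(Duration.ofMinutes(10));
    logger.recordFailure(t, new RuntimeException("commit failed")); // WARN (interval elapsed)
  }
}

Note the design choice the sketch preserves: the commit path also resets the timer on success, so a single successful commit after an outage means the next WARN requires another full interval of uninterrupted failures.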