From 04f9aa9dc60922e8f5a752dabbf54c03ac6ee0f5 Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Sun, 15 Feb 2026 00:43:37 +0500 Subject: [PATCH] Add retry logic for transient commit failures in KafkaCommitOffset Resolves #37606. Adds exponential backoff retry for retriable Kafka exceptions during offset commits, while preserving warn-and-skip behavior for non-retriable errors. --- .../beam/sdk/io/kafka/KafkaCommitOffset.java | 42 ++++++++++--- .../sdk/io/kafka/KafkaCommitOffsetTest.java | 61 +++++++++++++++++++ 2 files changed, 95 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java index ac6650c354d4..82f14ee16eb3 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java @@ -41,6 +41,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.errors.RetriableException; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.joda.time.Duration; import org.joda.time.Instant; @@ -62,6 +63,8 @@ public class KafkaCommitOffset<K, V> static class CommitOffsetDoFn extends DoFn<KV<KafkaSourceDescriptor, Long>, Void> { private static final Logger LOG = LoggerFactory.getLogger(CommitOffsetDoFn.class); + private static final int MAX_RETRIES = 3; + private static final long INITIAL_BACKOFF_MS = 500; private final Map<String, Object> consumerConfig; private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn; @@ -79,14 +82,37 @@ public void processElement(@Element KV<KafkaSourceDescriptor, Long> element) { Map<String, Object> updatedConsumerConfig = overrideBootstrapServersConfig(consumerConfig, element.getKey()); try (Consumer<byte[], byte[]> consumer = consumerFactoryFn.apply(updatedConsumerConfig)) { - try { - 
consumer.commitSync( - Collections.singletonMap( - element.getKey().getTopicPartition(), - new OffsetAndMetadata(element.getValue() + 1))); - } catch (Exception e) { - // TODO: consider retrying. - LOG.warn("Getting exception when committing offset: {}", e.getMessage()); + for (int attempt = 0; attempt <= MAX_RETRIES; attempt++) { + try { + consumer.commitSync( + Collections.singletonMap( + element.getKey().getTopicPartition(), + new OffsetAndMetadata(element.getValue() + 1))); + return; + } catch (RetriableException e) { + if (attempt < MAX_RETRIES) { + long backoffMs = INITIAL_BACKOFF_MS * (1L << attempt); + LOG.warn( + "Retriable exception committing offset (attempt {}/{}), retrying in {} ms: {}", + attempt + 1, + MAX_RETRIES + 1, + backoffMs, + e.getMessage()); + try { + Thread.sleep(backoffMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while retrying offset commit: {}", ie.getMessage()); + return; + } + } else { + LOG.warn( + "Failed to commit offset after {} attempts: {}", MAX_RETRIES + 1, e.getMessage()); + } + } catch (Exception e) { + LOG.warn("Getting exception when committing offset: {}", e.getMessage()); + return; + } } } } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffsetTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffsetTest.java index c16e25510ab8..16fec6af2e5f 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffsetTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffsetTest.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -41,6 +42,7 @@ import 
org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.RetriableCommitFailedException; import org.apache.kafka.common.TopicPartition; import org.junit.Assert; import org.junit.Rule; @@ -59,6 +61,8 @@ public class KafkaCommitOffsetTest { new KafkaCommitOffsetMockConsumer(null, false); private final KafkaCommitOffsetMockConsumer errorConsumer = new KafkaCommitOffsetMockConsumer(null, true); + private final KafkaRetriableMockConsumer retriableConsumer = + new KafkaRetriableMockConsumer(null, 2); private static final KafkaCommitOffsetMockConsumer COMPOSITE_CONSUMER = new KafkaCommitOffsetMockConsumer(null, false); @@ -188,6 +192,32 @@ private void testKafkaOffsetHelper(boolean use259Implementation) Assert.assertEquals(expectedOffsets, COMPOSITE_CONSUMER_BOOTSTRAP.commitOffsets); } + @Test + public void testCommitOffsetRetriableErrorSucceedsAfterRetry() { + Map<String, Object> configMap = new HashMap<>(); + configMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1"); + + ReadSourceDescriptors<Object, Object> descriptors = + ReadSourceDescriptors.read() + .withBootstrapServers("bootstrap_server") + .withConsumerConfigUpdates(configMap) + .withConsumerFactoryFn( + (SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>) + input -> { + Assert.assertEquals("group1", input.get(ConsumerConfig.GROUP_ID_CONFIG)); + return retriableConsumer; + }); + CommitOffsetDoFn doFn = new CommitOffsetDoFn(descriptors); + + final TopicPartition partition = new TopicPartition("topic", 0); + doFn.processElement( + KV.of(KafkaSourceDescriptor.of(partition, null, null, null, null, null), 5L)); + + expectedLogs.verifyWarn("Retriable exception committing offset (attempt 1/4)"); + expectedLogs.verifyWarn("Retriable exception committing offset (attempt 2/4)"); + Assert.assertEquals(6L, (long) retriableConsumer.commitOffsets.get(partition)); + } + @Test public void testCommitOffsetError() { Map<String, Object> configMap 
= new HashMap<>(); @@ -240,4 +270,35 @@ public synchronized void close(long timeout, TimeUnit unit) { // Ignore closing since we're using a single consumer. } } + + /** + * A mock consumer that throws {@link RetriableCommitFailedException} for the first N attempts, + * then succeeds. + */ + private static class KafkaRetriableMockConsumer extends MockConsumer<byte[], byte[]> { + + public final HashMap<TopicPartition, Long> commitOffsets = new HashMap<>(); + private final int failuresBeforeSuccess; + private final AtomicInteger attemptCount = new AtomicInteger(0); + + public KafkaRetriableMockConsumer( + OffsetResetStrategy offsetResetStrategy, int failuresBeforeSuccess) { + super(offsetResetStrategy); + this.failuresBeforeSuccess = failuresBeforeSuccess; + } + + @Override + public synchronized void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) { + if (attemptCount.getAndIncrement() < failuresBeforeSuccess) { + throw new RetriableCommitFailedException("Transient failure"); + } + commitAsync(offsets, null); + offsets.forEach((topic, offsetMetadata) -> commitOffsets.put(topic, offsetMetadata.offset())); + } + + @Override + public synchronized void close(long timeout, TimeUnit unit) { + // Ignore closing since we're using a single consumer. + } + } }