KafkaCommitOffset.java
@@ -41,6 +41,7 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.errors.RetriableException;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -62,6 +63,8 @@ public class KafkaCommitOffset<K, V>

static class CommitOffsetDoFn extends DoFn<KV<KafkaSourceDescriptor, Long>, Void> {
private static final Logger LOG = LoggerFactory.getLogger(CommitOffsetDoFn.class);
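// Transient commit failures are retried a bounded number of times with exponential backoff.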
private static final int MAX_RETRIES = 3;
private static final long INITIAL_BACKOFF_MS = 500;
private final Map<String, Object> consumerConfig;
private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
consumerFactoryFn;
@@ -79,14 +82,37 @@ public void processElement(@Element KV<KafkaSourceDescriptor, Long> element) {
Map<String, Object> updatedConsumerConfig =
overrideBootstrapServersConfig(consumerConfig, element.getKey());
try (Consumer<byte[], byte[]> consumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
try {
consumer.commitSync(
Collections.singletonMap(
element.getKey().getTopicPartition(),
new OffsetAndMetadata(element.getValue() + 1)));
} catch (Exception e) {
// TODO: consider retrying.
LOG.warn("Getting exception when committing offset: {}", e.getMessage());
for (int attempt = 0; attempt <= MAX_RETRIES; attempt++) {
try {
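// Kafka convention: the committed offset is the next offset to consume, hence value + 1.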
consumer.commitSync(
Collections.singletonMap(
element.getKey().getTopicPartition(),
new OffsetAndMetadata(element.getValue() + 1)));
return;
} catch (RetriableException e) {
if (attempt < MAX_RETRIES) {
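// Double the delay on each attempt: 500 ms, 1000 ms, 2000 ms across the three retries.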
long backoffMs = INITIAL_BACKOFF_MS * (1L << attempt);
LOG.warn(
"Retriable exception committing offset (attempt {}/{}), retrying in {} ms: {}",
attempt + 1,
MAX_RETRIES + 1,
backoffMs,
e.getMessage());
try {
Thread.sleep(backoffMs);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while retrying offset commit: {}", ie.getMessage());
return;
}
} else {
LOG.warn(
"Failed to commit offset after {} attempts: {}", MAX_RETRIES + 1, e.getMessage());
}
} catch (Exception e) {
LOG.warn("Getting exception when committing offset: {}", e.getMessage());
return;
}
}
}
}
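As an aside: Beam ships backoff utilities that could express this retry policy without a hand-rolled loop. The sketch below is illustrative only and not part of this change; it assumes Beam's FluentBackoff, BackOffUtils, and Sleeper utilities from org.apache.beam.sdk.util, and note that FluentBackoff applies a growth factor and jitter by default, so its delays would differ slightly from the explicit 500/1000/2000 ms schedule above.

import java.io.IOException;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.kafka.common.errors.RetriableException;
import org.joda.time.Duration;

/** Illustrative sketch only: the retry loop above, rewritten with Beam's backoff utilities. */
class CommitBackoffSketch {

  /** Hypothetical stand-in for the commitSync call above. */
  interface CommitAction {
    void commit(); // May throw RetriableException, which is unchecked (extends KafkaException).
  }

  static void commitWithRetries(CommitAction action) throws IOException, InterruptedException {
    BackOff backoff =
        FluentBackoff.DEFAULT
            .withMaxRetries(3)
            .withInitialBackoff(Duration.millis(500))
            .backoff();
    while (true) {
      try {
        action.commit();
        return;
      } catch (RetriableException e) {
        // BackOffUtils.next sleeps for the next backoff interval and
        // returns false once the retry budget is exhausted.
        if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
          return; // Give up, matching the best-effort semantics above.
        }
      }
    }
  }
}

Whether the final failure should surface an exception rather than a warning is a separate design question; the change keeps the original best-effort behavior of logging and moving on.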
KafkaCommitOffsetTest.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -41,6 +42,7 @@
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.TopicPartition;
import org.junit.Assert;
import org.junit.Rule;
@@ -59,6 +61,8 @@ public class KafkaCommitOffsetTest {
new KafkaCommitOffsetMockConsumer(null, false);
private final KafkaCommitOffsetMockConsumer errorConsumer =
new KafkaCommitOffsetMockConsumer(null, true);
private final KafkaRetriableMockConsumer retriableConsumer =
new KafkaRetriableMockConsumer(null, 2);

private static final KafkaCommitOffsetMockConsumer COMPOSITE_CONSUMER =
new KafkaCommitOffsetMockConsumer(null, false);
@@ -188,6 +192,32 @@ private void testKafkaOffsetHelper(boolean use259Implementation)
Assert.assertEquals(expectedOffsets, COMPOSITE_CONSUMER_BOOTSTRAP.commitOffsets);
}

@Test
public void testCommitOffsetRetriableErrorSucceedsAfterRetry() {
Map<String, Object> configMap = new HashMap<>();
configMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");

ReadSourceDescriptors<Object, Object> descriptors =
ReadSourceDescriptors.read()
.withBootstrapServers("bootstrap_server")
.withConsumerConfigUpdates(configMap)
.withConsumerFactoryFn(
(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>)
input -> {
Assert.assertEquals("group1", input.get(ConsumerConfig.GROUP_ID_CONFIG));
return retriableConsumer;
});
CommitOffsetDoFn doFn = new CommitOffsetDoFn(descriptors);

final TopicPartition partition = new TopicPartition("topic", 0);
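// The element value (5) is the last processed offset, so the DoFn should commit 5 + 1 = 6.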
doFn.processElement(
KV.of(KafkaSourceDescriptor.of(partition, null, null, null, null, null), 5L));

expectedLogs.verifyWarn("Retriable exception committing offset (attempt 1/4)");
expectedLogs.verifyWarn("Retriable exception committing offset (attempt 2/4)");
Assert.assertEquals(6L, (long) retriableConsumer.commitOffsets.get(partition));
}

@Test
public void testCommitOffsetError() {
Map<String, Object> configMap = new HashMap<>();
@@ -240,4 +270,35 @@ public synchronized void close(long timeout, TimeUnit unit) {
// Ignore closing since we're using a single consumer.
}
}

/**
* A mock consumer that throws {@link RetriableCommitFailedException} for the first N attempts,
* then succeeds.
*/
private static class KafkaRetriableMockConsumer extends MockConsumer<byte[], byte[]> {

public final HashMap<TopicPartition, Long> commitOffsets = new HashMap<>();
private final int failuresBeforeSuccess;
private final AtomicInteger attemptCount = new AtomicInteger(0);

public KafkaRetriableMockConsumer(
OffsetResetStrategy offsetResetStrategy, int failuresBeforeSuccess) {
super(offsetResetStrategy);
this.failuresBeforeSuccess = failuresBeforeSuccess;
}

@Override
public synchronized void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
if (attemptCount.getAndIncrement() < failuresBeforeSuccess) {
throw new RetriableCommitFailedException("Transient failure");
}
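// Delegate to MockConsumer.commitAsync so its internal committed state stays consistent,
// then record the offsets for test assertions.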
commitAsync(offsets, null);
offsets.forEach((topic, offsetMetadata) -> commitOffsets.put(topic, offsetMetadata.offset()));
}

@Override
public synchronized void close(long timeout, TimeUnit unit) {
// Ignore closing since we're using a single consumer.
}
}
}