spring-kafka-docs/src/main/asciidoc/kafka.adoc (10 additions, 0 deletions)
@@ -5279,6 +5279,7 @@ public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
====

The error handler can be configured with one or more `RetryListener` s, receiving notifications of retry and recovery progress.
Starting with version 2.8.10, methods for batch listeners were added.

====
[source, java]
@@ -5294,6 +5295,15 @@ public interface RetryListener {
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}

default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
}

default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
}

default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
}

}
----
====
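
For orientation, the following sketch (not part of this change set) shows how such a batch-aware listener might be registered; `recoverer`, the back-off values, and the comment text are illustrative assumptions, while `setRetryListeners` itself is exercised by the new test further down.

[source, java]
----
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
handler.setRetryListeners(new RetryListener() {

    @Override
    public void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
        // called once per failed delivery of the whole batch, starting with attempt 1
    }

    @Override
    public void recovered(ConsumerRecords<?, ?> records, Exception ex) {
        // called after the recoverer has handled the batch successfully
    }

});
----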
spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java
@@ -17,6 +17,7 @@
package org.springframework.kafka.listener;

import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;

@@ -27,6 +28,7 @@
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;

@@ -39,9 +41,28 @@
*/
public final class ErrorHandlingUtils {

private static final ThreadLocal<List<RetryListener>> retryListeners = new ThreadLocal<>();

private ErrorHandlingUtils() {
}

/**
* Set the retry listeners.
* @param listeners the listeners.
* @since 2.8.10
*/
public static void setRetryListeners(List<RetryListener> listeners) {
retryListeners.set(listeners);
}

/**
* Clear the retry listeners.
* @since 2.8.10
*/
public static void clearRetryListeners() {
retryListeners.remove();
}
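
// Usage note: callers are expected to set the listeners just before delegating to the fallback
// batch handler and clear them in a finally block (as FailedBatchProcessor.fallback() does later
// in this change); retryBatch() below reads the thread-local once and notifies the listeners on
// each failed attempt, on successful recovery, and on recovery failure.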

/**
* Retry a complete batch by pausing the consumer and then, in a loop, poll the
* consumer, wait for the next back off, then call the listener. When retries are
@@ -67,6 +88,9 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
String failed = null;
Set<TopicPartition> assignment = consumer.assignment();
consumer.pause(assignment);
List<RetryListener> listeners = retryListeners.get();
int attempt = 1;
listen(listeners, records, thrownException, attempt++);
if (container instanceof KafkaMessageListenerContainer) {
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerPausedEvent(assignment, "For batch retry");
}
@@ -88,20 +112,27 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
invokeListener.run();
return;
}
catch (Exception e) {
catch (Exception ex) {
listen(listeners, records, ex, attempt++);
if (failed == null) {
failed = recordsToString(records);
}
String toLog = failed;
logger.debug(e, () -> "Retry failed for: " + toLog);
logger.debug(ex, () -> "Retry failed for: " + toLog);
}
nextBackOff = execution.nextBackOff();
}
try {
recoverer.accept(records, thrownException);
if (listeners != null) {
listeners.forEach(listener -> listener.recovered(records, thrownException));
}
}
catch (Exception e) {
logger.error(e, () -> "Recoverer threw an exception; re-seeking batch");
catch (Exception ex) {
logger.error(ex, () -> "Recoverer threw an exception; re-seeking batch");
if (listeners != null) {
listeners.forEach(listener -> listener.recoveryFailed(records, thrownException, ex));
}
seeker.handleBatch(thrownException, records, consumer, container, () -> { });
}
}
@@ -114,6 +145,14 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
}
}

private static void listen(@Nullable List<RetryListener> listeners, ConsumerRecords<?, ?> records,
Exception thrownException, int attempt) {

if (listeners != null) {
listeners.forEach(listener -> listener.failedDelivery(records, thrownException, attempt));
}
}

/**
* Represent the records as a comma-delimited String of {@code topic-part@offset}.
* @param records the records.
spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java
@@ -69,16 +69,6 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
this(recoverer, backOff, null, fallbackHandler);
}

/**
* Return the fallback batch error handler.
* @return the handler.
* @since 2.8.8
*/
protected CommonErrorHandler getFallbackBatchHandler() {
return this.fallbackBatchHandler;
}


/**
* Construct an instance with the provided properties.
* @param recoverer the recoverer.
@@ -94,6 +84,15 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
this.fallbackBatchHandler = fallbackHandler;
}

/**
* Return the fallback batch error handler.
* @return the handler.
* @since 2.8.8
*/
protected CommonErrorHandler getFallbackBatchHandler() {
return this.fallbackBatchHandler;
}

protected void doHandle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
MessageListenerContainer container, Runnable invokeListener) {

@@ -105,17 +104,18 @@ protected <K, V> ConsumerRecords<K, V> handle(Exception thrownException, Consume

BatchListenerFailedException batchListenerFailedException = getBatchListenerFailedException(thrownException);
if (batchListenerFailedException == null) {
this.logger.debug(thrownException, "Expected a BatchListenerFailedException; re-seeking batch");
this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
this.logger.debug(thrownException, "Expected a BatchListenerFailedException; re-delivering full batch");
fallback(thrownException, data, consumer, container, invokeListener);
}
else {
getRetryListeners().forEach(listener -> listener.failedDelivery(data, thrownException, 1));
ConsumerRecord<?, ?> record = batchListenerFailedException.getRecord();
int index = record != null ? findIndex(data, record) : batchListenerFailedException.getIndex();
if (index < 0 || index >= data.count()) {
this.logger.warn(batchListenerFailedException, () ->
String.format("Record not found in batch: %s-%d@%d; re-seeking batch",
record.topic(), record.partition(), record.offset()));
this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
fallback(thrownException, data, consumer, container, invokeListener);
}
else {
return seekOrRecover(thrownException, data, consumer, container, index);
@@ -124,6 +124,18 @@ protected <K, V> ConsumerRecords<K, V> handle(Exception thrownException, Consume
return ConsumerRecords.empty();
}

private void fallback(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
MessageListenerContainer container, Runnable invokeListener) {

ErrorHandlingUtils.setRetryListeners(getRetryListeners());
try {
this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
}
finally {
ErrorHandlingUtils.clearRetryListeners();
}
}

private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {
if (record == null) {
return -1;
spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java
@@ -16,6 +16,8 @@

package org.springframework.kafka.listener;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
@@ -54,6 +56,8 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen

private final FailedRecordTracker failureTracker;

private final List<RetryListener> retryListeners = new ArrayList<>();

private boolean commitRecovered;

private BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> userBackOffFunction = (rec, ex) -> null;
@@ -130,7 +134,14 @@ public void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange)
* @since 2.7
*/
public void setRetryListeners(RetryListener... listeners) {
Assert.noNullElements(listeners, "'listeners' cannot have null elements");
this.failureTracker.setRetryListeners(listeners);
this.retryListeners.clear();
this.retryListeners.addAll(Arrays.asList(listeners));
}

protected List<RetryListener> getRetryListeners() {
return this.retryListeners;
}

/**
spring-kafka/src/main/java/org/springframework/kafka/listener/RetryListener.java
@@ -1,5 +1,5 @@
/*
* Copyright 2021 the original author or authors.
* Copyright 2021-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package org.springframework.kafka.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

/**
* A listener for retry activity.
@@ -53,4 +54,31 @@ default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}

/**
* Called after a delivery failed for batch records.
* @param records the records.
* @param ex the exception.
* @param deliveryAttempt the delivery attempt, if available.
* @since 2.8.10
*/
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
}

/**
* Called after a failing batch was successfully recovered.
* @param records the records.
* @param ex the exception.
* @since 2.8.10
*/
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
}

/**
* Called after a recovery attempt failed.
* @param records the records.
* @param original the original exception causing the recovery attempt.
* @param failure the exception thrown by the recoverer.
* @since 2.8.10
*/
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
}

}
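
As an illustration of the new hooks, a standalone listener could log batch-level progress. The class name and log messages below are hypothetical, not taken from the change set; the callbacks and the `LogAccessor` API match what the rest of this change uses.

[source, java]
----
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.listener.RetryListener;

public class LoggingBatchRetryListener implements RetryListener {

    private static final LogAccessor LOGGER =
            new LogAccessor(LogFactory.getLog(LoggingBatchRetryListener.class));

    @Override
    public void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
        // one notification per failed delivery attempt of the whole batch
        LOGGER.warn(() -> "Attempt " + deliveryAttempt + " failed for " + records.count()
                + " records from " + records.partitions());
    }

    @Override
    public void recovered(ConsumerRecords<?, ?> records, Exception ex) {
        // the recoverer completed without throwing
        LOGGER.info(() -> "Recovered " + records.count() + " records after: " + ex.getMessage());
    }

    @Override
    public void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
        // the recoverer threw; the batch will be re-delivered
        LOGGER.error(failure, () -> "Recovery failed for " + records.count() + " records");
    }

}
----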
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 the original author or authors.
* Copyright 2020-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,10 +19,12 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.time.Duration;
@@ -205,6 +207,33 @@ records, mockConsumer, mock(MessageListenerContainer.class), () -> { }))
verify(mockConsumer).seek(tp, 0L);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
void fallbackListener() {
Consumer mockConsumer = mock(Consumer.class);
ConsumerRecordRecoverer recoverer = mock(ConsumerRecordRecoverer.class);
DefaultErrorHandler beh = new DefaultErrorHandler(recoverer, new FixedBackOff(0, 2));
RetryListener retryListener = mock(RetryListener.class);
beh.setRetryListeners(retryListener);
TopicPartition tp = new TopicPartition("foo", 0);
ConsumerRecords<?, ?> records = new ConsumerRecords(Collections.singletonMap(tp,
List.of(new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
new RecordHeaders(), Optional.empty()),
new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
new RecordHeaders(), Optional.empty()))));
MessageListenerContainer container = mock(MessageListenerContainer.class);
given(container.isRunning()).willReturn(true);
beh.handleBatch(new ListenerExecutionFailedException("test"),
records, mockConsumer, container, () -> {
throw new ListenerExecutionFailedException("test");
});
verify(retryListener).failedDelivery(any(ConsumerRecords.class), any(), eq(1));
verify(retryListener).failedDelivery(any(ConsumerRecords.class), any(), eq(2));
verify(retryListener).failedDelivery(any(ConsumerRecords.class), any(), eq(3));
verify(recoverer, times(2)).accept(any(), any()); // each record in batch
verify(retryListener).recovered(any(ConsumerRecords.class), any());
}

@Configuration
@EnableKafka
public static class Config {