diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index 3a12a2e2b1..badfba6dbf 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -5279,6 +5279,7 @@ public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) { ==== The error handler can be configured with one or more `RetryListener` s, receiving notifications of retry and recovery progress. +Starting with version 2.8.10, methods for batch listeners were added. ==== [source, java] @@ -5294,6 +5295,15 @@ public interface RetryListener { default void recoveryFailed(ConsumerRecord record, Exception original, Exception failure) { } + default void failedDelivery(ConsumerRecords records, Exception ex, int deliveryAttempt) { + } + + default void recovered(ConsumerRecords records, Exception ex) { + } + + default void recoveryFailed(ConsumerRecords records, Exception original, Exception failure) { + } + } ---- ==== diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java index b4014013b9..574fdf56e0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java @@ -17,6 +17,7 @@ package org.springframework.kafka.listener; import java.time.Duration; +import java.util.List; import java.util.Set; import java.util.function.BiConsumer; @@ -27,6 +28,7 @@ import org.springframework.core.log.LogAccessor; import org.springframework.kafka.KafkaException; import org.springframework.kafka.support.KafkaUtils; +import org.springframework.lang.Nullable; import org.springframework.util.backoff.BackOff; import org.springframework.util.backoff.BackOffExecution; @@ -39,9 +41,28 @@ */ public final class ErrorHandlingUtils { + private static final ThreadLocal> retryListeners = new ThreadLocal<>(); + private ErrorHandlingUtils() { } + /** + * Set the retry listeners. + * @param listeners the listeners. + * @since 2.8.10 + */ + public static void setRetryListeners(List listeners) { + retryListeners.set(listeners); + } + + /** + * Clear the retry listeners. + * @since 2.8.10 + */ + public static void clearRetryListeners() { + retryListeners.remove(); + } + /** * Retry a complete batch by pausing the consumer and then, in a loop, poll the * consumer, wait for the next back off, then call the listener. When retries are @@ -67,6 +88,9 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r String failed = null; Set assignment = consumer.assignment(); consumer.pause(assignment); + List listeners = retryListeners.get(); + int attempt = 1; + listen(listeners, records, thrownException, attempt++); if (container instanceof KafkaMessageListenerContainer) { ((KafkaMessageListenerContainer) container).publishConsumerPausedEvent(assignment, "For batch retry"); } @@ -88,20 +112,27 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r invokeListener.run(); return; } - catch (Exception e) { + catch (Exception ex) { + listen(listeners, records, ex, attempt++); if (failed == null) { failed = recordsToString(records); } String toLog = failed; - logger.debug(e, () -> "Retry failed for: " + toLog); + logger.debug(ex, () -> "Retry failed for: " + toLog); } nextBackOff = execution.nextBackOff(); } try { recoverer.accept(records, thrownException); + if (listeners != null) { + listeners.forEach(listener -> listener.recovered(records, thrownException)); + } } - catch (Exception e) { - logger.error(e, () -> "Recoverer threw an exception; re-seeking batch"); + catch (Exception ex) { + logger.error(ex, () -> "Recoverer threw an exception; re-seeking batch"); + if (listeners != null) { + listeners.forEach(listener -> listener.recoveryFailed(records, thrownException, ex)); + } seeker.handleBatch(thrownException, records, consumer, container, () -> { }); } } @@ -114,6 +145,14 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r } } + private static void listen(@Nullable List listeners, ConsumerRecords records, + Exception thrownException, int attempt) { + + if (listeners != null) { + listeners.forEach(listener -> listener.failedDelivery(records, thrownException, attempt)); + } + } + /** * Represent the records as a comma-delimited String of {@code topic-part@offset}. * @param records the records. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java index 328396fe3b..9c17baa97a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java @@ -69,16 +69,6 @@ public FailedBatchProcessor(@Nullable BiConsumer, Exception this(recoverer, backOff, null, fallbackHandler); } - /** - * Return the fallback batch error handler. - * @return the handler. - * @since 2.8.8 - */ - protected CommonErrorHandler getFallbackBatchHandler() { - return this.fallbackBatchHandler; - } - - /** * Construct an instance with the provided properties. * @param recoverer the recoverer. @@ -94,6 +84,15 @@ public FailedBatchProcessor(@Nullable BiConsumer, Exception this.fallbackBatchHandler = fallbackHandler; } + /** + * Return the fallback batch error handler. + * @return the handler. + * @since 2.8.8 + */ + protected CommonErrorHandler getFallbackBatchHandler() { + return this.fallbackBatchHandler; + } + protected void doHandle(Exception thrownException, ConsumerRecords data, Consumer consumer, MessageListenerContainer container, Runnable invokeListener) { @@ -105,17 +104,18 @@ protected ConsumerRecords handle(Exception thrownException, Consume BatchListenerFailedException batchListenerFailedException = getBatchListenerFailedException(thrownException); if (batchListenerFailedException == null) { - this.logger.debug(thrownException, "Expected a BatchListenerFailedException; re-seeking batch"); - this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener); + this.logger.debug(thrownException, "Expected a BatchListenerFailedException; re-delivering full batch"); + fallback(thrownException, data, consumer, container, invokeListener); } else { + getRetryListeners().forEach(listener -> listener.failedDelivery(data, thrownException, 1)); ConsumerRecord record = batchListenerFailedException.getRecord(); int index = record != null ? findIndex(data, record) : batchListenerFailedException.getIndex(); if (index < 0 || index >= data.count()) { this.logger.warn(batchListenerFailedException, () -> String.format("Record not found in batch: %s-%d@%d; re-seeking batch", record.topic(), record.partition(), record.offset())); - this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener); + fallback(thrownException, data, consumer, container, invokeListener); } else { return seekOrRecover(thrownException, data, consumer, container, index); @@ -124,6 +124,18 @@ protected ConsumerRecords handle(Exception thrownException, Consume return ConsumerRecords.empty(); } + private void fallback(Exception thrownException, ConsumerRecords data, Consumer consumer, + MessageListenerContainer container, Runnable invokeListener) { + + ErrorHandlingUtils.setRetryListeners(getRetryListeners()); + try { + this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener); + } + finally { + ErrorHandlingUtils.clearRetryListeners(); + } + } + private int findIndex(ConsumerRecords data, ConsumerRecord record) { if (record == null) { return -1; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java index f47437e722..5cdc652219 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java @@ -16,6 +16,8 @@ package org.springframework.kafka.listener; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -54,6 +56,8 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen private final FailedRecordTracker failureTracker; + private final List retryListeners = new ArrayList<>(); + private boolean commitRecovered; private BiFunction, Exception, BackOff> userBackOffFunction = (rec, ex) -> null; @@ -130,7 +134,14 @@ public void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange) * @since 2.7 */ public void setRetryListeners(RetryListener... listeners) { + Assert.noNullElements(listeners, "'listeners' cannot have null elements"); this.failureTracker.setRetryListeners(listeners); + this.retryListeners.clear(); + this.retryListeners.addAll(Arrays.asList(listeners)); + } + + protected List getRetryListeners() { + return this.retryListeners; } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/RetryListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/RetryListener.java index e3f7df2bb3..9c8bc80c06 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/RetryListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/RetryListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.kafka.listener; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; /** * A listener for retry activity. @@ -53,4 +54,31 @@ default void recovered(ConsumerRecord record, Exception ex) { default void recoveryFailed(ConsumerRecord record, Exception original, Exception failure) { } + /** + * Called after a delivery failed for batch records. + * @param records the records. + * @param ex the exception. + * @param deliveryAttempt the delivery attempt, if available. + * @since 2.8.10 + */ + default void failedDelivery(ConsumerRecords records, Exception ex, int deliveryAttempt) { + } + + /** + * Called after a failing record was successfully recovered. + * @param records the record. + * @param ex the exception. + */ + default void recovered(ConsumerRecords records, Exception ex) { + } + + /** + * Called after a recovery attempt failed. + * @param records the record. + * @param original the original exception causing the recovery attempt. + * @param failure the exception thrown by the recoverer. + */ + default void recoveryFailed(ConsumerRecords records, Exception original, Exception failure) { + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerBatchTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerBatchTests.java index 5c53964e60..96c9eb2c67 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerBatchTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerBatchTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 the original author or authors. + * Copyright 2020-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,10 +19,12 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.willAnswer; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.time.Duration; @@ -205,6 +207,33 @@ records, mockConsumer, mock(MessageListenerContainer.class), () -> { })) verify(mockConsumer).seek(tp, 0L); } + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + void fallbackListener() { + Consumer mockConsumer = mock(Consumer.class); + ConsumerRecordRecoverer recoverer = mock(ConsumerRecordRecoverer.class); + DefaultErrorHandler beh = new DefaultErrorHandler(recoverer, new FixedBackOff(0, 2)); + RetryListener retryListener = mock(RetryListener.class); + beh.setRetryListeners(retryListener); + TopicPartition tp = new TopicPartition("foo", 0); + ConsumerRecords records = new ConsumerRecords(Collections.singletonMap(tp, + List.of(new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo", + new RecordHeaders(), Optional.empty()), + new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo", + new RecordHeaders(), Optional.empty())))); + MessageListenerContainer container = mock(MessageListenerContainer.class); + given(container.isRunning()).willReturn(true); + beh.handleBatch(new ListenerExecutionFailedException("test"), + records, mockConsumer, container, () -> { + throw new ListenerExecutionFailedException("test"); + }); + verify(retryListener).failedDelivery(any(ConsumerRecords.class), any(), eq(1)); + verify(retryListener).failedDelivery(any(ConsumerRecords.class), any(), eq(2)); + verify(retryListener).failedDelivery(any(ConsumerRecords.class), any(), eq(3)); + verify(recoverer, times(2)).accept(any(), any()); // each record in batch + verify(retryListener).recovered(any(ConsumerRecords.class), any()); + } + @Configuration @EnableKafka public static class Config {