From 4019868e1b7faa965ffd424fb1d7f996cd4fb417 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 13 Jun 2023 16:45:01 -0400 Subject: [PATCH 1/2] GH-2702: RetryableTopic with asyncAcks Resolves https://github.com/spring-projects/spring-kafka/issues/2702 When using `asyncAcks` with manual ack modes, the `DefaultErrorHandler` must have `seekAfterError` set to `false`; this required user configuration. The framework now unconditionally sets the property when it configures a container using a manual ack mode. In addition, the default DLT handler was not compatible with any manual ack mode, regardless of the `asyncAcks` setting. Add `Acknowledgment` to the `LoggingDltListenerHandlerMethod`. Also tested with reporter's reproducer. **cherry-pick to 2.9.x (will require instanceof polishing for Java 8)** --- .../src/main/asciidoc/retrytopic.adoc | 16 ++++ ...fkaBackoffAwareMessageListenerAdapter.java | 18 ++++- .../ListenerContainerFactoryConfigurer.java | 19 +++-- .../retrytopic/RetryTopicConfigurer.java | 7 +- ...ckoffAwareMessageListenerAdapterTests.java | 11 +-- .../retrytopic/RetryTopicConfigurerTests.java | 6 +- .../RetryTopicIntegrationTests.java | 75 ++++++++++++++++++- 7 files changed, 136 insertions(+), 16 deletions(-) diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc index 3b0641e3af..895720eb39 100644 --- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc +++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc @@ -22,6 +22,22 @@ IMPORTANT: You can set the `AckMode` mode you prefer, but `RECORD` is suggested. IMPORTANT: At this time this functionality doesn't support class level `@KafkaListener` annotations +When using a manual `AckMode` with `asyncAcks` set to true, the `DefaultErrorHandler` must be configured with `seekAfterError` set to `false`. +Starting with versions 2.9.10, 3.0.8, this will be set to true unconditionally for such configurations. +With earlier versions, it was necessary to override the `RetryConfigurationSupport.configureCustomizers()` method to set the property to `true`. + +==== +[source, java] +---- +@Override +protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) { + customizersConfigurer.customizeErrorHandler(eh -> eh.setSeekAfterError(false)); +} +---- +==== + +In addition, before those versions, using the default (logging) DLT handler was not compatible with any kind of manual `AckMode`, regardless of the `asyncAcks` property. + ==== Back Off Delay Precision ===== Overview and Guarantees diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java index d4eb63e903..995ebc93b2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java @@ -50,6 +50,8 @@ public class KafkaBackoffAwareMessageListenerAdapter extends AbstractDelegatingMessageListenerAdapter> implements AcknowledgingConsumerAwareMessageListener { + private static final Acknowledgment NO_OP_ACK = new NoOpAck(); + private final String listenerId; private final String backoffTimestampHeader; @@ -94,7 +96,12 @@ public void onMessage(ConsumerRecord consumerRecord, @Nullable Acknowledgm .ifPresent(nextExecutionTimestamp -> this.kafkaConsumerBackoffManager .backOffIfNecessary(createContext(consumerRecord, nextExecutionTimestamp, consumer))); try { - invokeDelegateOnMessage(consumerRecord, acknowledgment, consumer); + Acknowledgment ack = acknowledgment; + if (ack == null) { + // The default DLT handler now requires an Acknowledgment. + ack = NO_OP_ACK; + } + invokeDelegateOnMessage(consumerRecord, ack, consumer); } catch (Exception ex) { throw new TimestampedException(ex, Instant.now(this.clock)); @@ -143,4 +150,13 @@ public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) public void onMessage(ConsumerRecord data, Consumer consumer) { onMessage(data, null, consumer); } + + static class NoOpAck implements Acknowledgment { + + @Override + public void acknowledge() { + } + + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java index e6f49c27df..913b697f3e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.listener.KafkaConsumerBackoffManager; @@ -233,11 +234,19 @@ private class RetryTopicListenerContainerFactoryDecorator if (mainListenerId == null) { mainListenerId = listenerContainer.getListenerId(); } + CommonErrorHandler errorHandler = createErrorHandler( + ListenerContainerFactoryConfigurer.this.deadLetterPublishingRecovererFactory + .create(mainListenerId), + this.configuration); + if (listenerContainer.getContainerProperties().isAsyncAcks()) { + AckMode ackMode = listenerContainer.getContainerProperties().getAckMode(); + if ((AckMode.MANUAL.equals(ackMode) || AckMode.MANUAL_IMMEDIATE.equals(ackMode)) + && errorHandler instanceof DefaultErrorHandler deh) { + deh.setSeekAfterError(false); + } + } listenerContainer - .setCommonErrorHandler(createErrorHandler( - ListenerContainerFactoryConfigurer.this.deadLetterPublishingRecovererFactory - .create(mainListenerId), - this.configuration)); + .setCommonErrorHandler(errorHandler); setupBackoffAwareMessageListenerAdapter(listenerContainer, this.configuration, this.isSetContainerProperties); return listenerContainer; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java index 3e1dd60b1a..6ffd1f7d3e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java @@ -34,6 +34,7 @@ import org.springframework.kafka.config.KafkaListenerEndpointRegistrar; import org.springframework.kafka.config.MethodKafkaListenerEndpoint; import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.EndpointHandlerMethod; import org.springframework.kafka.support.KafkaUtils; import org.springframework.kafka.support.TopicForRetryable; @@ -468,7 +469,7 @@ static class LoggingDltListenerHandlerMethod { public static final String DEFAULT_DLT_METHOD_NAME = "logMessage"; - public void logMessage(Object message) { + public void logMessage(Object message, @Nullable Acknowledgment ack) { if (message instanceof ConsumerRecord) { LOGGER.info(() -> "Received message in dlt listener: " + KafkaUtils.format((ConsumerRecord) message)); @@ -476,7 +477,11 @@ public void logMessage(Object message) { else { LOGGER.info(() -> "Received message in dlt listener."); } + if (ack != null) { + ack.acknowledge(); + } } + } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapterTests.java index f7bf119684..d411dababa 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.BDDMockito.given; @@ -147,7 +148,7 @@ void shouldCallBackoffManagerIfBackoffHeaderIsPresentAndFirstMethodIsCalled() { then(kafkaConsumerBackoffManager).should(times(1)) .backOffIfNecessary(context); - then(delegate).should(times(1)).onMessage(data, null, null); + then(delegate).should(times(1)).onMessage(eq(data), any(), isNull()); } @Test @@ -159,7 +160,7 @@ void shouldWrapExceptionInTimestampedException() { given(kafkaConsumerBackoffManager.createContext(originalTimestamp, listenerId, topicPartition, null)) .willReturn(context); RuntimeException thrownException = new RuntimeException(); - willThrow(thrownException).given(delegate).onMessage(data, null, null); + willThrow(thrownException).given(delegate).onMessage(eq(data), any(), isNull()); KafkaBackoffAwareMessageListenerAdapter backoffAwareMessageListenerAdapter = new KafkaBackoffAwareMessageListenerAdapter<>(delegate, kafkaConsumerBackoffManager, listenerId, clock); @@ -175,7 +176,7 @@ void shouldWrapExceptionInTimestampedException() { then(kafkaConsumerBackoffManager).should(times(1)) .backOffIfNecessary(context); - then(delegate).should(times(1)).onMessage(data, null, null); + then(delegate).should(times(1)).onMessage(eq(data), any(), isNull()); } @Test @@ -224,7 +225,7 @@ void shouldCallBackoffManagerIfBackoffHeaderIsPresentAndThirdMethodIsCalled() { then(kafkaConsumerBackoffManager).should(times(1)) .backOffIfNecessary(context); - then(delegate).should(times(1)).onMessage(data, null, consumer); + then(delegate).should(times(1)).onMessage(eq(data), any(), eq(consumer)); } @Test diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java index d9fcb9071b..6b45679da4 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -365,7 +365,7 @@ void shouldInstantiateIfNotInContainer() { void shouldLogConsumerRecordMessage() { RetryTopicConfigurer.LoggingDltListenerHandlerMethod method = new RetryTopicConfigurer.LoggingDltListenerHandlerMethod(); - method.logMessage(consumerRecordMessage); + method.logMessage(consumerRecordMessage, null); then(consumerRecordMessage).should().topic(); } @@ -373,7 +373,7 @@ void shouldLogConsumerRecordMessage() { void shouldNotLogObjectMessage() { RetryTopicConfigurer.LoggingDltListenerHandlerMethod method = new RetryTopicConfigurer.LoggingDltListenerHandlerMethod(); - method.logMessage(objectMessage); + method.logMessage(objectMessage, null); then(objectMessage).shouldHaveNoInteractions(); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java index 3823c37b6c..44dcb09123 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java @@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; +import static org.awaitility.Awaitility.await; import java.lang.reflect.Method; import java.util.ArrayList; @@ -27,6 +28,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -35,11 +37,14 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +70,9 @@ import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; @@ -94,7 +101,8 @@ RetryTopicIntegrationTests.SECOND_TOPIC, RetryTopicIntegrationTests.THIRD_TOPIC, RetryTopicIntegrationTests.FOURTH_TOPIC, - RetryTopicIntegrationTests.TWO_LISTENERS_TOPIC }) + RetryTopicIntegrationTests.TWO_LISTENERS_TOPIC, + RetryTopicIntegrationTests.MANUAL_TOPIC }) @TestPropertySource(properties = { "five.attempts=5", "kafka.template=customKafkaTemplate"}) public class RetryTopicIntegrationTests { @@ -110,6 +118,8 @@ public class RetryTopicIntegrationTests { public final static String TWO_LISTENERS_TOPIC = "myRetryTopic5"; + public final static String MANUAL_TOPIC = "myRetryTopic6"; + public final static String NOT_RETRYABLE_EXCEPTION_TOPIC = "noRetryTopic"; private final static String MAIN_TOPIC_CONTAINER_FACTORY = "kafkaListenerContainerFactory"; @@ -225,6 +235,38 @@ void shouldRetryFifthTopicWithTwoListenersAndManualAssignment(@Autowired FifthTo TWO_LISTENERS_TOPIC + "-listener2-dlt"); } + @Test + void shouldRetryManualTopicWithDefaultDlt(@Autowired KafkaListenerEndpointRegistry registry, + @Autowired ConsumerFactory cf) { + + logger.debug("Sending message to topic " + MANUAL_TOPIC); + kafkaTemplate.send(MANUAL_TOPIC, "Testing topic 6"); + assertThat(awaitLatch(latchContainer.countDownLatch6)).isTrue(); + registry.getListenerContainerIds().stream() + .filter(id -> id.startsWith("manual")) + .forEach(id -> { + ConcurrentMessageListenerContainer container = + (ConcurrentMessageListenerContainer) registry.getListenerContainer(id); + assertThat(container).extracting("commonErrorHandler") + .extracting("seekAfterError", InstanceOfAssertFactories.BOOLEAN) + .isFalse(); + }); + Consumer consumer = cf.createConsumer("manual-dlt", ""); + Set tp = + Set.of(new org.apache.kafka.common.TopicPartition(MANUAL_TOPIC + "-dlt", 0)); + consumer.assign(tp); + try { + await().untilAsserted(() -> { + OffsetAndMetadata offsetAndMetadata = consumer.committed(tp).get(tp.iterator().next()); + assertThat(offsetAndMetadata).isNotNull(); + assertThat(offsetAndMetadata.offset()).isEqualTo(1L); + }); + } + finally { + consumer.close(); + } + } + @Test public void shouldGoStraightToDlt() { logger.debug("Sending message to topic " + NOT_RETRYABLE_EXCEPTION_TOPIC); @@ -260,6 +302,7 @@ public void listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String r container.countDownLatch1.countDown(); throw new RuntimeException("Woooops... in topic " + receivedTopic); } + } @Component @@ -389,6 +432,24 @@ public void annotatedDltMethod(ConsumerRecord record) { } + @Component + static class SixthTopicDefaultDLTListener { + + @Autowired + CountDownLatchContainer container; + + @RetryableTopic(attempts = "4", backoff = @Backoff(50)) + @KafkaListener(id = "manual", topics = MANUAL_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) + public void listenNoDlt(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, + @SuppressWarnings("unused") Acknowledgment ack) { + + logger.debug("Message {} received in topic {} ", message, receivedTopic); + container.countDownIfNotKnown(receivedTopic, container.countDownLatch6); + throw new IllegalStateException("Another woooops... " + receivedTopic); + } + + } + @Component static class NoRetryTopicListener { @@ -421,6 +482,7 @@ static class CountDownLatchContainer { CountDownLatch countDownLatch4 = new CountDownLatch(4); CountDownLatch countDownLatch51 = new CountDownLatch(4); CountDownLatch countDownLatch52 = new CountDownLatch(4); + CountDownLatch countDownLatch6 = new CountDownLatch(4); CountDownLatch countDownLatchNoRetry = new CountDownLatch(1); CountDownLatch countDownLatchDltOne = new CountDownLatch(1); CountDownLatch countDownLatchDltTwo = new CountDownLatch(1); @@ -556,6 +618,11 @@ public FifthTopicListener2 fifthTopicListener2() { return new FifthTopicListener2(); } + @Bean + SixthTopicDefaultDLTListener manualTopicListener() { + return new SixthTopicDefaultDLTListener(); + } + @Bean public NoRetryTopicListener noRetryTopicListener() { return new NoRetryTopicListener(); @@ -668,6 +735,12 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerCont ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setConcurrency(1); + factory.setContainerCustomizer(container -> { + if (container.getListenerId().startsWith("manual")) { + container.getContainerProperties().setAckMode(AckMode.MANUAL); + container.getContainerProperties().setAsyncAcks(true); + } + }); return factory; } From 0b7991e0079be66fd29a2ae17f2f41a01468a83f Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 14 Jun 2023 11:12:19 -0400 Subject: [PATCH 2/2] Only supply `NoOpAck` with explicit `@NonNull` param annotation. --- ...fkaBackoffAwareMessageListenerAdapter.java | 18 +---------- .../MessagingMessageListenerAdapter.java | 30 +++++++++++++++---- .../retrytopic/RetryTopicConfigurer.java | 7 ++--- .../retrytopic/RetryTopicConfigurerTests.java | 6 ++-- 4 files changed, 33 insertions(+), 28 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java index 995ebc93b2..d4eb63e903 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java @@ -50,8 +50,6 @@ public class KafkaBackoffAwareMessageListenerAdapter extends AbstractDelegatingMessageListenerAdapter> implements AcknowledgingConsumerAwareMessageListener { - private static final Acknowledgment NO_OP_ACK = new NoOpAck(); - private final String listenerId; private final String backoffTimestampHeader; @@ -96,12 +94,7 @@ public void onMessage(ConsumerRecord consumerRecord, @Nullable Acknowledgm .ifPresent(nextExecutionTimestamp -> this.kafkaConsumerBackoffManager .backOffIfNecessary(createContext(consumerRecord, nextExecutionTimestamp, consumer))); try { - Acknowledgment ack = acknowledgment; - if (ack == null) { - // The default DLT handler now requires an Acknowledgment. - ack = NO_OP_ACK; - } - invokeDelegateOnMessage(consumerRecord, ack, consumer); + invokeDelegateOnMessage(consumerRecord, acknowledgment, consumer); } catch (Exception ex) { throw new TimestampedException(ex, Instant.now(this.clock)); @@ -150,13 +143,4 @@ public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) public void onMessage(ConsumerRecord data, Consumer consumer) { onMessage(data, null, consumer); } - - static class NoOpAck implements Acknowledgment { - - @Override - public void acknowledge() { - } - - } - } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 0f25ba2877..f1cc77b3c6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -52,6 +52,7 @@ import org.springframework.kafka.support.KafkaUtils; import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.lang.NonNull; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; @@ -84,6 +85,8 @@ public abstract class MessagingMessageListenerAdapter implements ConsumerS private static final SpelExpressionParser PARSER = new SpelExpressionParser(); + private static final Acknowledgment NO_OP_ACK = new NoOpAck(); + /** * Message used when no conversion is needed. */ @@ -120,6 +123,8 @@ public abstract class MessagingMessageListenerAdapter implements ConsumerS private boolean hasAckParameter; + private boolean noOpAck; + private boolean hasMetadataParameter; private boolean messageReturnType; @@ -353,25 +358,29 @@ protected Message toMessagingMessage(ConsumerRecord cRecord, @Nullable protected final Object invokeHandler(Object data, @Nullable Acknowledgment acknowledgment, Message message, Consumer consumer) { + Acknowledgment ack = acknowledgment; + if (ack == null && this.noOpAck) { + ack = NO_OP_ACK; + } try { if (data instanceof List && !this.isConsumerRecordList) { - return this.handlerMethod.invoke(message, acknowledgment, consumer); + return this.handlerMethod.invoke(message, ack, consumer); } else { if (this.hasMetadataParameter) { - return this.handlerMethod.invoke(message, data, acknowledgment, consumer, + return this.handlerMethod.invoke(message, data, ack, consumer, AdapterUtils.buildConsumerRecordMetadata(data)); } else { - return this.handlerMethod.invoke(message, data, acknowledgment, consumer); + return this.handlerMethod.invoke(message, data, ack, consumer); } } } catch (org.springframework.messaging.converter.MessageConversionException ex) { - throw checkAckArg(acknowledgment, message, new MessageConversionException("Cannot handle message", ex)); + throw checkAckArg(ack, message, new MessageConversionException("Cannot handle message", ex)); } catch (MethodArgumentNotValidException ex) { - throw checkAckArg(acknowledgment, message, ex); + throw checkAckArg(ack, message, ex); } catch (MessagingException ex) { throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " + @@ -607,6 +616,9 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity boolean isNotConvertible = parameterIsType(parameterType, ConsumerRecord.class); boolean isAck = parameterIsType(parameterType, Acknowledgment.class); this.hasAckParameter |= isAck; + if (isAck) { + this.noOpAck |= methodParameter.getParameterAnnotation(NonNull.class) != null; + } isNotConvertible |= isAck; boolean isConsumer = parameterIsType(parameterType, Consumer.class); isNotConvertible |= isConsumer; @@ -759,4 +771,12 @@ private boolean parameterIsType(Type parameterType, Type type) { public record ReplyExpressionRoot(Object request, Object source, Object result) { } + static class NoOpAck implements Acknowledgment { + + @Override + public void acknowledge() { + } + + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java index 6ffd1f7d3e..ccc057e775 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java @@ -38,6 +38,7 @@ import org.springframework.kafka.support.EndpointHandlerMethod; import org.springframework.kafka.support.KafkaUtils; import org.springframework.kafka.support.TopicForRetryable; +import org.springframework.lang.NonNull; import org.springframework.lang.Nullable; @@ -469,7 +470,7 @@ static class LoggingDltListenerHandlerMethod { public static final String DEFAULT_DLT_METHOD_NAME = "logMessage"; - public void logMessage(Object message, @Nullable Acknowledgment ack) { + public void logMessage(Object message, @NonNull Acknowledgment ack) { if (message instanceof ConsumerRecord) { LOGGER.info(() -> "Received message in dlt listener: " + KafkaUtils.format((ConsumerRecord) message)); @@ -477,9 +478,7 @@ public void logMessage(Object message, @Nullable Acknowledgment ack) { else { LOGGER.info(() -> "Received message in dlt listener."); } - if (ack != null) { - ack.acknowledge(); - } + ack.acknowledge(); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java index 6b45679da4..0ed0addd9e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java @@ -24,6 +24,7 @@ import static org.mockito.BDDMockito.then; import static org.mockito.BDDMockito.willReturn; import static org.mockito.BDDMockito.willThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import java.lang.reflect.Method; @@ -51,6 +52,7 @@ import org.springframework.kafka.config.KafkaListenerEndpointRegistrar; import org.springframework.kafka.config.MethodKafkaListenerEndpoint; import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.EndpointHandlerMethod; import org.springframework.kafka.test.condition.LogLevels; import org.springframework.test.util.ReflectionTestUtils; @@ -365,7 +367,7 @@ void shouldInstantiateIfNotInContainer() { void shouldLogConsumerRecordMessage() { RetryTopicConfigurer.LoggingDltListenerHandlerMethod method = new RetryTopicConfigurer.LoggingDltListenerHandlerMethod(); - method.logMessage(consumerRecordMessage, null); + method.logMessage(consumerRecordMessage, mock(Acknowledgment.class)); then(consumerRecordMessage).should().topic(); } @@ -373,7 +375,7 @@ void shouldLogConsumerRecordMessage() { void shouldNotLogObjectMessage() { RetryTopicConfigurer.LoggingDltListenerHandlerMethod method = new RetryTopicConfigurer.LoggingDltListenerHandlerMethod(); - method.logMessage(objectMessage, null); + method.logMessage(objectMessage, mock(Acknowledgment.class)); then(objectMessage).shouldHaveNoInteractions(); }