Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,22 @@ IMPORTANT: You can set the `AckMode` mode you prefer, but `RECORD` is suggested.

IMPORTANT: At this time this functionality doesn't support class level `@KafkaListener` annotations

When using a manual `AckMode` with `asyncAcks` set to true, the `DefaultErrorHandler` must be configured with `seekAfterError` set to `false`.
Starting with versions 2.9.10, 3.0.8, this will be set to true unconditionally for such configurations.
With earlier versions, it was necessary to override the `RetryConfigurationSupport.configureCustomizers()` method to set the property to `true`.

====
[source, java]
----
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeErrorHandler(eh -> eh.setSeekAfterError(false));
}
----
====

In addition, before those versions, using the default (logging) DLT handler was not compatible with any kind of manual `AckMode`, regardless of the `asyncAcks` property.

==== Back Off Delay Precision

===== Overview and Guarantees
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
Expand Down Expand Up @@ -84,6 +85,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS

private static final SpelExpressionParser PARSER = new SpelExpressionParser();

private static final Acknowledgment NO_OP_ACK = new NoOpAck();

/**
* Message used when no conversion is needed.
*/
Expand Down Expand Up @@ -120,6 +123,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS

private boolean hasAckParameter;

private boolean noOpAck;

private boolean hasMetadataParameter;

private boolean messageReturnType;
Expand Down Expand Up @@ -353,25 +358,29 @@ protected Message<?> toMessagingMessage(ConsumerRecord<K, V> cRecord, @Nullable
protected final Object invokeHandler(Object data, @Nullable Acknowledgment acknowledgment, Message<?> message,
Consumer<?, ?> consumer) {

Acknowledgment ack = acknowledgment;
if (ack == null && this.noOpAck) {
ack = NO_OP_ACK;
}
try {
if (data instanceof List && !this.isConsumerRecordList) {
return this.handlerMethod.invoke(message, acknowledgment, consumer);
return this.handlerMethod.invoke(message, ack, consumer);
}
else {
if (this.hasMetadataParameter) {
return this.handlerMethod.invoke(message, data, acknowledgment, consumer,
return this.handlerMethod.invoke(message, data, ack, consumer,
AdapterUtils.buildConsumerRecordMetadata(data));
}
else {
return this.handlerMethod.invoke(message, data, acknowledgment, consumer);
return this.handlerMethod.invoke(message, data, ack, consumer);
}
}
}
catch (org.springframework.messaging.converter.MessageConversionException ex) {
throw checkAckArg(acknowledgment, message, new MessageConversionException("Cannot handle message", ex));
throw checkAckArg(ack, message, new MessageConversionException("Cannot handle message", ex));
}
catch (MethodArgumentNotValidException ex) {
throw checkAckArg(acknowledgment, message, ex);
throw checkAckArg(ack, message, ex);
}
catch (MessagingException ex) {
throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
Expand Down Expand Up @@ -607,6 +616,9 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity
boolean isNotConvertible = parameterIsType(parameterType, ConsumerRecord.class);
boolean isAck = parameterIsType(parameterType, Acknowledgment.class);
this.hasAckParameter |= isAck;
if (isAck) {
this.noOpAck |= methodParameter.getParameterAnnotation(NonNull.class) != null;
}
isNotConvertible |= isAck;
boolean isConsumer = parameterIsType(parameterType, Consumer.class);
isNotConvertible |= isConsumer;
Expand Down Expand Up @@ -759,4 +771,12 @@ private boolean parameterIsType(Type parameterType, Type type) {
public record ReplyExpressionRoot(Object request, Object source, Object result) {
}

static class NoOpAck implements Acknowledgment {

@Override
public void acknowledge() {
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,7 @@
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
Expand Down Expand Up @@ -233,11 +234,19 @@ private class RetryTopicListenerContainerFactoryDecorator
if (mainListenerId == null) {
mainListenerId = listenerContainer.getListenerId();
}
CommonErrorHandler errorHandler = createErrorHandler(
ListenerContainerFactoryConfigurer.this.deadLetterPublishingRecovererFactory
.create(mainListenerId),
this.configuration);
if (listenerContainer.getContainerProperties().isAsyncAcks()) {
AckMode ackMode = listenerContainer.getContainerProperties().getAckMode();
if ((AckMode.MANUAL.equals(ackMode) || AckMode.MANUAL_IMMEDIATE.equals(ackMode))
&& errorHandler instanceof DefaultErrorHandler deh) {
deh.setSeekAfterError(false);
}
}
listenerContainer
.setCommonErrorHandler(createErrorHandler(
ListenerContainerFactoryConfigurer.this.deadLetterPublishingRecovererFactory
.create(mainListenerId),
this.configuration));
.setCommonErrorHandler(errorHandler);
setupBackoffAwareMessageListenerAdapter(listenerContainer, this.configuration,
this.isSetContainerProperties);
return listenerContainer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.EndpointHandlerMethod;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.TopicForRetryable;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;


Expand Down Expand Up @@ -468,15 +470,17 @@ static class LoggingDltListenerHandlerMethod {

public static final String DEFAULT_DLT_METHOD_NAME = "logMessage";

public void logMessage(Object message) {
public void logMessage(Object message, @NonNull Acknowledgment ack) {
if (message instanceof ConsumerRecord) {
LOGGER.info(() -> "Received message in dlt listener: "
+ KafkaUtils.format((ConsumerRecord<?, ?>) message));
}
else {
LOGGER.info(() -> "Received message in dlt listener.");
}
ack.acknowledge();
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2019-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
Expand Down Expand Up @@ -147,7 +148,7 @@ void shouldCallBackoffManagerIfBackoffHeaderIsPresentAndFirstMethodIsCalled() {
then(kafkaConsumerBackoffManager).should(times(1))
.backOffIfNecessary(context);

then(delegate).should(times(1)).onMessage(data, null, null);
then(delegate).should(times(1)).onMessage(eq(data), any(), isNull());
}

@Test
Expand All @@ -159,7 +160,7 @@ void shouldWrapExceptionInTimestampedException() {
given(kafkaConsumerBackoffManager.createContext(originalTimestamp, listenerId, topicPartition, null))
.willReturn(context);
RuntimeException thrownException = new RuntimeException();
willThrow(thrownException).given(delegate).onMessage(data, null, null);
willThrow(thrownException).given(delegate).onMessage(eq(data), any(), isNull());

KafkaBackoffAwareMessageListenerAdapter<Object, Object> backoffAwareMessageListenerAdapter =
new KafkaBackoffAwareMessageListenerAdapter<>(delegate, kafkaConsumerBackoffManager, listenerId, clock);
Expand All @@ -175,7 +176,7 @@ void shouldWrapExceptionInTimestampedException() {
then(kafkaConsumerBackoffManager).should(times(1))
.backOffIfNecessary(context);

then(delegate).should(times(1)).onMessage(data, null, null);
then(delegate).should(times(1)).onMessage(eq(data), any(), isNull());
}

@Test
Expand Down Expand Up @@ -224,7 +225,7 @@ void shouldCallBackoffManagerIfBackoffHeaderIsPresentAndThirdMethodIsCalled() {
then(kafkaConsumerBackoffManager).should(times(1))
.backOffIfNecessary(context);

then(delegate).should(times(1)).onMessage(data, null, consumer);
then(delegate).should(times(1)).onMessage(eq(data), any(), eq(consumer));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.BDDMockito.willThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import java.lang.reflect.Method;
Expand Down Expand Up @@ -51,6 +52,7 @@
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.EndpointHandlerMethod;
import org.springframework.kafka.test.condition.LogLevels;
import org.springframework.test.util.ReflectionTestUtils;
Expand Down Expand Up @@ -365,15 +367,15 @@ void shouldInstantiateIfNotInContainer() {
void shouldLogConsumerRecordMessage() {
RetryTopicConfigurer.LoggingDltListenerHandlerMethod method =
new RetryTopicConfigurer.LoggingDltListenerHandlerMethod();
method.logMessage(consumerRecordMessage);
method.logMessage(consumerRecordMessage, mock(Acknowledgment.class));
then(consumerRecordMessage).should().topic();
}

@Test
void shouldNotLogObjectMessage() {
RetryTopicConfigurer.LoggingDltListenerHandlerMethod method =
new RetryTopicConfigurer.LoggingDltListenerHandlerMethod();
method.logMessage(objectMessage);
method.logMessage(objectMessage, mock(Acknowledgment.class));
then(objectMessage).shouldHaveNoInteractions();
}

Expand Down
Loading