Skip to content

Commit

Permalink
Refactor MessagingMessageListenerAdapter
Browse files Browse the repository at this point in the history
* move `BatchMessagingMessageListenerAdapter#invoke` and `RecordMessagingMessageListenerAdapter#invoke` to `MessagingMessageListenerAdapter`
* move `KafkaListenerErrorHandler` to `MessagingMessageListenerAdapter`
* add `@Nullable` to `KafkaListenerErrorHandler`
  • Loading branch information
Zhiyang.Wang1 authored and Wzy19930507 committed Jan 30, 2024
1 parent 8a684b3 commit 9852ebc
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 73 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -72,6 +72,7 @@ default Object handleError(Message<?> message, ListenerExecutionFailedException
* @return the return value is ignored unless the annotated method has a
* {@code @SendTo} annotation.
*/
@Nullable
default Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@

import org.springframework.kafka.listener.BatchAcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;

Expand All @@ -56,15 +54,14 @@
* @author Gary Russell
* @author Artem Bilan
* @author Venil Noronha
* @author Wang ZhiYang
* @since 1.1
*/
public class BatchMessagingMessageListenerAdapter<K, V> extends MessagingMessageListenerAdapter<K, V>
implements BatchAcknowledgingConsumerAwareMessageListener<K, V> {

private BatchMessageConverter batchMessageConverter = new BatchMessagingMessageConverter();

private final KafkaListenerErrorHandler errorHandler;

private BatchToRecordAdapter<K, V> batchToRecordAdapter;

/**
Expand All @@ -85,8 +82,7 @@ public BatchMessagingMessageListenerAdapter(Object bean, Method method) {
public BatchMessagingMessageListenerAdapter(Object bean, Method method,
@Nullable KafkaListenerErrorHandler errorHandler) {

super(bean, method);
this.errorHandler = errorHandler;
super(bean, method, errorHandler);
}

/**
Expand Down Expand Up @@ -172,39 +168,6 @@ public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgme
invoke(records, acknowledgment, consumer, message);
}

protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
final Message<?> messageArg) {

Message<?> message = messageArg;
try {
Object result = invokeHandler(records, acknowledgment, message, consumer);
if (result != null) {
handleResult(result, records, message);
}
}
catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control
if (this.errorHandler != null) {
try {
if (message.equals(NULL_MESSAGE)) {
message = new GenericMessage<>(records);
}
Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment);
if (result != null) {
handleResult(result, records, message);
}
}
catch (Exception ex) {
throw new ListenerExecutionFailedException(createMessagingErrorMessage(// NOSONAR stack trace loss
"Listener error handler threw an exception for the incoming message",
message.getPayload()), ex);
}
}
else {
throw e;
}
}
}

@SuppressWarnings({ "unchecked", "rawtypes" })
protected Message<?> toMessagingMessage(List records, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.springframework.expression.spel.support.StandardTypeConverter;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
Expand Down Expand Up @@ -102,6 +103,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS

private final StandardEvaluationContext evaluationContext = new StandardEvaluationContext();

private final KafkaListenerErrorHandler errorHandler;

private HandlerAdapter handlerMethod;

private boolean isConsumerRecordList;
Expand Down Expand Up @@ -143,8 +146,19 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
* @param method the method.
*/
protected MessagingMessageListenerAdapter(Object bean, Method method) {
this(bean, method, null);
}

/**
* Create an instance with the provided bean, method and kafka listener error handler.
* @param bean the bean.
* @param method the method.
* @param errorHandler the kafka listener error handler.
*/
protected MessagingMessageListenerAdapter(Object bean, Method method, @Nullable KafkaListenerErrorHandler errorHandler) {
this.bean = bean;
this.inferredType = determineInferredType(method); // NOSONAR = intentionally not final
this.errorHandler = errorHandler;
}

/**
Expand Down Expand Up @@ -348,6 +362,20 @@ protected Message<?> toMessagingMessage(ConsumerRecord<K, V> cRecord, @Nullable
return getMessageConverter().toMessage(cRecord, acknowledgment, consumer, getType());
}

protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
final Message<?> message) {

try {
Object result = invokeHandler(records, acknowledgment, message, consumer);
if (result != null) {
handleResult(result, records, message);
}
}
catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control
handleException(records, acknowledgment, consumer, message, e);
}
}

/**
* Invoke the handler, wrapping any exception to a {@link ListenerExecutionFailedException}
* with a dedicated error message.
Expand Down Expand Up @@ -558,6 +586,30 @@ private void sendReplyForMessageSource(Object result, String topic, Message<?> s
this.replyTemplate.send(builder.build());
}

protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
Message<?> message, ListenerExecutionFailedException e) {

if (this.errorHandler != null) {
try {
if (NULL_MESSAGE.equals(message)) {
message = new GenericMessage<>(records);
}
Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment);
if (result != null) {
handleResult(result, records, message);
}
}
catch (Exception ex) {
throw new ListenerExecutionFailedException(createMessagingErrorMessage(// NOSONAR stack trace loss
"Listener error handler threw an exception for the incoming message",
message.getPayload()), ex);
}
}
else {
throw e;
}
}

private void setCorrelation(MessageBuilder<?> builder, Message<?> source) {
byte[] correlationBytes = getCorrelation(source);
if (correlationBytes != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@

import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.ProjectingMessageConverter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;


/**
Expand All @@ -53,17 +51,14 @@
public class RecordMessagingMessageListenerAdapter<K, V> extends MessagingMessageListenerAdapter<K, V>
implements AcknowledgingConsumerAwareMessageListener<K, V> {

private final KafkaListenerErrorHandler errorHandler;

public RecordMessagingMessageListenerAdapter(Object bean, Method method) {
this(bean, method, null);
}

public RecordMessagingMessageListenerAdapter(Object bean, Method method,
@Nullable KafkaListenerErrorHandler errorHandler) {

super(bean, method);
this.errorHandler = errorHandler;
super(bean, method, errorHandler);
}

/**
Expand All @@ -88,33 +83,7 @@ public void onMessage(ConsumerRecord<K, V> record, @Nullable Acknowledgment ackn
if (logger.isDebugEnabled() && !(getMessageConverter() instanceof ProjectingMessageConverter)) {
this.logger.debug("Processing [" + message + "]");
}
try {
Object result = invokeHandler(record, acknowledgment, message, consumer);
if (result != null) {
handleResult(result, record, message);
}
}
catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control
if (this.errorHandler != null) {
try {
if (message.equals(NULL_MESSAGE)) {
message = new GenericMessage<>(record);
}
Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment);
if (result != null) {
handleResult(result, record, message);
}
}
catch (Exception ex) {
throw new ListenerExecutionFailedException(createMessagingErrorMessage(// NOSONAR stack trace loss
"Listener error handler threw an exception for the incoming message",
message.getPayload()), ex);
}
}
else {
throw e;
}
}
invoke(record, acknowledgment, consumer, message);
}

}

0 comments on commit 9852ebc

Please sign in to comment.