Skip to content

Commit

Permalink
Remove async retry callback from MessageListener contract.
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenchickenlove committed Oct 11, 2024
1 parent c0fea39 commit e74072c
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
import org.springframework.kafka.listener.adapter.AsyncRepliesAware;
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaUtils;
Expand Down Expand Up @@ -842,7 +844,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private volatile long lastPoll = System.currentTimeMillis();

private final ConcurrentLinkedDeque<FailedRecordTuple<K, V>> failedRecords = new ConcurrentLinkedDeque();
private final ConcurrentLinkedDeque<FailedRecordTuple<K, V>> failedRecords = new ConcurrentLinkedDeque<>();

@SuppressWarnings(UNCHECKED)
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType,
Expand Down Expand Up @@ -900,13 +902,20 @@ else if (listener instanceof MessageListener) {
this.pollThreadStateProcessor = setUpPollProcessor(false);
this.observationEnabled = this.containerProperties.isObservationEnabled();

if (!AopUtils.isAopProxy(listener)) {
BiConsumer<ConsumerRecord<K, V>, RuntimeException> callbackForAsyncFailureQueue =
(cRecord, ex) -> {
FailedRecordTuple<K, V> failedRecord = new FailedRecordTuple<>(cRecord, ex);
this.failedRecords.addLast(failedRecord);
};
this.listener.setCallbackForAsyncFailure(callbackForAsyncFailureQueue);
if (!AopUtils.isAopProxy(this.genericListener) &&
this.genericListener instanceof KafkaBackoffAwareMessageListenerAdapter<?, ?>) {
KafkaBackoffAwareMessageListenerAdapter<K, V> genListener =
(KafkaBackoffAwareMessageListenerAdapter<K, V>) this.genericListener;
if (genListener.getDelegate() instanceof RecordMessagingMessageListenerAdapter<K, V>) {

RecordMessagingMessageListenerAdapter<K, V> recordAdapterListener =
(RecordMessagingMessageListenerAdapter<K, V>) genListener.getDelegate();

BiConsumer<ConsumerRecord<K, V>, RuntimeException> callbackForAsyncFailure =
(cRecord, ex) -> this.failedRecords.addLast(new FailedRecordTuple<>(cRecord, ex));
recordAdapterListener.setCallbackForAsyncFailure(callbackForAsyncFailure);
}

}
}
else {
Expand Down Expand Up @@ -1458,17 +1467,25 @@ protected void pollAndInvoke() {
}

protected void handleAsyncFailure() {
List<FailedRecordTuple> copyFailedRecords = new ArrayList<>();
List<FailedRecordTuple<K, V>> copyFailedRecords = new ArrayList<>();
while (!this.failedRecords.isEmpty()) {
FailedRecordTuple failedRecordTuple = this.failedRecords.pollFirst();
FailedRecordTuple<K, V> failedRecordTuple = this.failedRecords.pollFirst();
copyFailedRecords.add(failedRecordTuple);
}

// If any copied and failed record fails to complete due to an unexpected error,
// We will give up on retrying with the remaining copied and failed Records.
if (!copyFailedRecords.isEmpty()) {
copyFailedRecords.forEach(failedRecordTuple ->
this.invokeErrorHandlerBySingleRecord(failedRecordTuple.record(), failedRecordTuple.ex()));
for (FailedRecordTuple<K, V> copyFailedRecord : copyFailedRecords) {
try {
invokeErrorHandlerBySingleRecord(copyFailedRecord);
}
catch (Exception e) {
this.logger.warn(() ->
"Async failed record failed to complete, thus skip it. record :"
+ copyFailedRecord.toString()
+ ", Exception : "
+ e.getMessage());
}
}
}

Expand Down Expand Up @@ -2864,7 +2881,9 @@ private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
}
}

private void invokeErrorHandlerBySingleRecord(final ConsumerRecord<K, V> cRecord, RuntimeException rte) {
private void invokeErrorHandlerBySingleRecord(final FailedRecordTuple<K, V> failedRecordTuple) {
final ConsumerRecord<K, V> cRecord = failedRecordTuple.record;
RuntimeException rte = failedRecordTuple.ex;
if (this.commonErrorHandler.seeksAfterHandling() || rte instanceof CommitFailedException) {
try {
if (this.producer == null) {
Expand Down Expand Up @@ -3986,6 +4005,6 @@ private static class StopAfterFenceException extends KafkaException {

}

record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex) { };
private record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex) { };

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package org.springframework.kafka.listener;

import java.util.function.BiConsumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
Expand All @@ -28,13 +26,8 @@
*
* @author Marius Bogoevici
* @author Gary Russell
* @author Sanghyeok An
*/
@FunctionalInterface
public interface MessageListener<K, V> extends GenericMessageListener<ConsumerRecord<K, V>> {

default void setCallbackForAsyncFailure(
BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback) {
//
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.time.Clock;
import java.time.Instant;
import java.util.Optional;
import java.util.function.BiConsumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -44,7 +43,6 @@
* @param <K> the record key type.
* @param <V> the record value type.
* @author Tomaz Fernandes
* @author Sanghyeok An
* @since 2.7
*
*/
Expand Down Expand Up @@ -146,8 +144,4 @@ public void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer) {
onMessage(data, null, consumer);
}

@Override
public void setCallbackForAsyncFailure(BiConsumer<ConsumerRecord<K, V>, RuntimeException> callbackForAsyncFailureQueue) {
this.delegate.setCallbackForAsyncFailure(callbackForAsyncFailureQueue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS

private String correlationHeaderName = KafkaHeaders.CORRELATION_ID;

private BiConsumer<ConsumerRecord<K, V>, RuntimeException> callbackForAsyncFailureQueue;
@Nullable
private BiConsumer<ConsumerRecord<K, V>, RuntimeException> callbackForAsyncFailure;

/**
* Create an instance with the provided bean and method.
Expand Down Expand Up @@ -678,9 +679,10 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm
catch (Throwable ex) {
this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
acknowledge(acknowledgment);
if (canAsyncRetry(request, ex)) {
if (canAsyncRetry(request, ex) &&
Objects.nonNull(this.callbackForAsyncFailure)) {
ConsumerRecord<K, V> record = (ConsumerRecord<K, V>) request;
this.callbackForAsyncFailureQueue.accept(record, (RuntimeException) ex);
this.callbackForAsyncFailure.accept(record, (RuntimeException) ex);
}
}
}
Expand Down Expand Up @@ -884,8 +886,8 @@ private boolean rawByParameterIsType(Type parameterType, Type type) {
return parameterType instanceof ParameterizedType pType && pType.getRawType().equals(type);
}

protected void setAsyncFailureCallback(BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback) {
this.callbackForAsyncFailureQueue = asyncRetryCallback;
public void setCallbackForAsyncFailure(BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback) {
this.callbackForAsyncFailure = asyncRetryCallback;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.springframework.kafka.listener.adapter;

import java.lang.reflect.Method;
import java.util.function.BiConsumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -86,9 +85,4 @@ public void onMessage(ConsumerRecord<K, V> record, @Nullable Acknowledgment ackn
invoke(record, acknowledgment, consumer, message);
}

@Override
public void setCallbackForAsyncFailure(
BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback) {
setAsyncFailureCallback(asyncRetryCallback);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2024 the original author or authors.
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2024 the original author or authors.
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,6 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
Expand All @@ -30,7 +29,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
Expand All @@ -45,7 +43,6 @@
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
Expand Down Expand Up @@ -1210,13 +1207,6 @@ static class KafkaConsumerConfig {
@Autowired
EmbeddedKafkaBroker broker;

@Bean
KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker.getBrokersAsString());
return new KafkaAdmin(configs);
}

@Bean
ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = KafkaTestUtils.consumerProps(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2024 the original author or authors.
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2024 the original author or authors.
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,15 +22,13 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
Expand All @@ -45,7 +43,6 @@
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
Expand Down Expand Up @@ -1178,13 +1175,6 @@ static class KafkaConsumerConfig {
@Autowired
EmbeddedKafkaBroker broker;

@Bean
KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker.getBrokersAsString());
return new KafkaAdmin(configs);
}

@Bean
ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = KafkaTestUtils.consumerProps(
Expand Down

0 comments on commit e74072c

Please sign in to comment.