Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-3276: Support async retry with @RetryableTopic #3523

Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
eee3515
Test
chickenchickenlove Sep 29, 2024
fdc59f0
DRAFT
chickenchickenlove Sep 30, 2024
87ac522
GH-3276: Draft and test codes.
chickenchickenlove Oct 1, 2024
723cc18
Add test cases.
chickenchickenlove Oct 6, 2024
ac110b4
Add method
chickenchickenlove Oct 6, 2024
e30319d
Fixes lint errors.
chickenchickenlove Oct 7, 2024
6399b81
Minimize FailedRecordTuple dependency.
chickenchickenlove Oct 9, 2024
4a4a1ee
Refactor test codes and real codes.
chickenchickenlove Oct 10, 2024
08e7721
Modify method name.
chickenchickenlove Oct 10, 2024
c0fea39
Fixes compile error.
chickenchickenlove Oct 10, 2024
e74072c
Remove async retry callback from MessageListener contract.
chickenchickenlove Oct 11, 2024
645d7ce
Revert Copyright period.
chickenchickenlove Oct 11, 2024
0c89c92
Revert KafkaBackkOffAwareMessageListenerAdapter.
chickenchickenlove Oct 11, 2024
3f6b447
Depends on general type on KafkaMessageListenerContainer.
chickenchickenlove Oct 11, 2024
01de80e
Remove sleep(1) from async retry test.
chickenchickenlove Oct 11, 2024
97f05be
Remove waitAWhile().
chickenchickenlove Oct 12, 2024
d219037
Add java docs.
chickenchickenlove Oct 12, 2024
e8378da
Add author
chickenchickenlove Oct 12, 2024
967a7c4
Fixes flaky test.
chickenchickenlove Oct 12, 2024
f2c67c7
Modify contract of callback for async failure.
chickenchickenlove Oct 14, 2024
56e6fd3
Fixes java docs for setCallbackFroAsyncFailure.
chickenchickenlove Oct 14, 2024
5e70c76
Fix the tests for async failure retry.
chickenchickenlove Oct 15, 2024
1d9f7b9
Remove weed from async retry test and fix lint error.
chickenchickenlove Oct 15, 2024
e418b10
Add Tags for conditional test.
chickenchickenlove Oct 15, 2024
002aaf4
Remove useless tag for async failure retry tests.
chickenchickenlove Oct 15, 2024
3b79d59
Remove unused import
chickenchickenlove Oct 15, 2024
82ecac7
Make a method static and Add @SuppressWarning.
chickenchickenlove Oct 15, 2024
0afae4e
Remove useless latch.
chickenchickenlove Oct 15, 2024
796397f
Use specific ListenerAdapter type for async retry feature.
chickenchickenlove Oct 15, 2024
4e89d53
Fixes lint error.
chickenchickenlove Oct 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -73,6 +74,7 @@
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.ApplicationContext;
Expand Down Expand Up @@ -840,6 +842,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private volatile long lastPoll = System.currentTimeMillis();

private final ConcurrentLinkedDeque<FailedRecordTuple<K, V>> failedRecords = new ConcurrentLinkedDeque();

@SuppressWarnings(UNCHECKED)
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType,
ObservationRegistry observationRegistry) {
Expand Down Expand Up @@ -895,6 +899,15 @@ else if (listener instanceof MessageListener) {
this.wantsFullRecords = false;
this.pollThreadStateProcessor = setUpPollProcessor(false);
this.observationEnabled = this.containerProperties.isObservationEnabled();

if (!AopUtils.isAopProxy(listener)) {
BiConsumer<ConsumerRecord<K, V>, RuntimeException> callbackForAsyncFailureQueue =
(cRecord, ex) -> {
FailedRecordTuple<K, V> failedRecord = new FailedRecordTuple<>(cRecord, ex);
this.failedRecords.addLast(failedRecord);
artembilan marked this conversation as resolved.
Show resolved Hide resolved
};
this.listener.setCallbackForAsyncFailureQueue(callbackForAsyncFailureQueue);
}
}
else {
throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
Expand Down Expand Up @@ -1294,6 +1307,16 @@ public void run() {
boolean failedAuthRetry = false;
this.lastReceive = System.currentTimeMillis();
while (isRunning()) {

try {
handleAsyncFailure();
}
catch (Exception e) {
// TODO: Need to improve error handling.
// TODO: Need to determine how to handle a failed message.
this.logger.error("Failed to process re-try messages. ");
artembilan marked this conversation as resolved.
Show resolved Hide resolved
}

try {
pollAndInvoke();
if (failedAuthRetry) {
Expand Down Expand Up @@ -1435,6 +1458,19 @@ protected void pollAndInvoke() {
}
}

protected void handleAsyncFailure() {
List<FailedRecordTuple> copyFailedRecords = new ArrayList<>();
while (!this.failedRecords.isEmpty()) {
FailedRecordTuple failedRecordTuple = this.failedRecords.pollFirst();
copyFailedRecords.add(failedRecordTuple);
}

if (!copyFailedRecords.isEmpty()) {
artembilan marked this conversation as resolved.
Show resolved Hide resolved
copyFailedRecords.forEach(failedRecordTuple ->
this.invokeErrorHandlerBySingleRecord(failedRecordTuple.record(), failedRecordTuple.ex()));
artembilan marked this conversation as resolved.
Show resolved Hide resolved
}
}

private void doProcessCommits() {
if (!this.autoCommit && !this.isRecordAck) {
try {
Expand Down Expand Up @@ -2827,6 +2863,42 @@ private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
}
}

private void invokeErrorHandlerBySingleRecord(final ConsumerRecord<K, V> cRecord, RuntimeException rte) {
if (this.commonErrorHandler.seeksAfterHandling() || rte instanceof CommitFailedException) {
try {
if (this.producer == null) {
processCommits();
}
}
catch (Exception ex) { // NO SONAR
this.logger.error(ex, "Failed to commit before handling error");
}
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
records.add(cRecord);
this.commonErrorHandler.handleRemaining(rte, records, this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer);
}
else {
boolean handled = false;
try {
handled = this.commonErrorHandler.handleOne(rte, cRecord, this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer);
}
catch (Exception ex) {
this.logger.error(ex, "ErrorHandler threw unexpected exception");
}
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
if (!handled) {
records.computeIfAbsent(new TopicPartition(cRecord.topic(), cRecord.partition()),
tp -> new ArrayList<>()).add(cRecord);
}
if (!records.isEmpty()) {
this.remainingRecords = new ConsumerRecords<>(records);
this.pauseForPending = true;
}
}
}

private void invokeErrorHandler(final ConsumerRecord<K, V> cRecord,
Iterator<ConsumerRecord<K, V>> iterator, RuntimeException rte) {

Expand Down Expand Up @@ -3913,4 +3985,6 @@ private static class StopAfterFenceException extends KafkaException {

}

record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex) { };
artembilan marked this conversation as resolved.
Show resolved Hide resolved

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2024 the original author or authors.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This apparently has to be reverted as well.
I mean you revert, but don't run check locally.

*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package org.springframework.kafka.listener;

import java.util.function.BiConsumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
Expand All @@ -26,8 +28,13 @@
*
* @author Marius Bogoevici
* @author Gary Russell
* @author Sanghyeok An
*/
@FunctionalInterface
public interface MessageListener<K, V> extends GenericMessageListener<ConsumerRecord<K, V>> {

default void setCallbackForAsyncFailureQueue(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have you renamed method to Queue?
What was wrong with a Callback?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixes to setCallbackForAsyncFail(...)!

BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback) {
//
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import java.time.Clock;
import java.time.Instant;
import java.util.Optional;
import java.util.function.BiConsumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -43,6 +44,7 @@
* @param <K> the record key type.
* @param <V> the record value type.
* @author Tomaz Fernandes
* @author Sanghyeok An
* @since 2.7
*
*/
Expand Down Expand Up @@ -143,4 +145,9 @@ public void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment)
public void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer) {
onMessage(data, null, consumer);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here: all the changes in this class have to be reverted.

@Override
public void setCallbackForAsyncFailureQueue(BiConsumer<ConsumerRecord<K, V>, RuntimeException> callbackForAsyncFailureQueue) {
this.delegate.setCallbackForAsyncFailureQueue(callbackForAsyncFailureQueue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -91,6 +93,7 @@
* @author Wang ZhiYang
* @author Huijin Hong
* @author Soby Chacko
* @author Sanghyeok An
*/
public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerSeekAware, AsyncRepliesAware {

Expand Down Expand Up @@ -152,6 +155,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS

private String correlationHeaderName = KafkaHeaders.CORRELATION_ID;

private BiConsumer<ConsumerRecord<K, V>, RuntimeException> callbackForAsyncFailureQueue;

/**
* Create an instance with the provided bean and method.
* @param bean the bean.
Expand Down Expand Up @@ -665,16 +670,34 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm
Throwable t, Message<?> source) {

try {
handleException(request, acknowledgment, consumer, source,
new ListenerExecutionFailedException(createMessagingErrorMessage(
"Async Fail", source.getPayload()), t));
if (t instanceof CompletionException) {
// For CompletableFuture Object
handleException(request, acknowledgment, consumer, source,
new ListenerExecutionFailedException(createMessagingErrorMessage(
"Async Fail", source.getPayload()), t.getCause()));
}
else {
// For Mono Object.
handleException(request, acknowledgment, consumer, source,
new ListenerExecutionFailedException(createMessagingErrorMessage(
"Async Fail", source.getPayload()), t));
artembilan marked this conversation as resolved.
Show resolved Hide resolved
}
}
catch (Throwable ex) {
this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
acknowledge(acknowledgment);
if (canAsyncRetry(request, ex)) {
ConsumerRecord<K, V> record = (ConsumerRecord<K, V>) request;
artembilan marked this conversation as resolved.
Show resolved Hide resolved
this.callbackForAsyncFailureQueue.accept(record, (RuntimeException) ex);
}
}
}

private boolean canAsyncRetry(Object request, Throwable exception) {
artembilan marked this conversation as resolved.
Show resolved Hide resolved
// The async retry with @RetryableTopic only support in case of SingleRecord Listener.
return request instanceof ConsumerRecord && exception instanceof RuntimeException;
}

protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
Message<?> message, ListenerExecutionFailedException e) {

Expand Down Expand Up @@ -869,6 +892,10 @@ private boolean rawByParameterIsType(Type parameterType, Type type) {
return parameterType instanceof ParameterizedType pType && pType.getRawType().equals(type);
}

public void putInAsyncFailureQueue(BiConsumer<ConsumerRecord<K, V>, RuntimeException> callbackForAsyncFailureQueue) {
artembilan marked this conversation as resolved.
Show resolved Hide resolved
this.callbackForAsyncFailureQueue = callbackForAsyncFailureQueue;
}

/**
* Root object for reply expression evaluation.
* @param request the request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.kafka.listener.adapter;

import java.lang.reflect.Method;
import java.util.function.BiConsumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -85,4 +86,9 @@ public void onMessage(ConsumerRecord<K, V> record, @Nullable Acknowledgment ackn
invoke(record, acknowledgment, consumer, message);
}

@Override
public void setCallbackForAsyncFailureQueue(
BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback) {
putInAsyncFailureQueue(asyncRetryCallback);
}
}
Loading