GH-3276: Support async retry with @RetryableTopic #3523

Closed
Changes from 26 commits

Commits (30 total)
eee3515
Test
chickenchickenlove Sep 29, 2024
fdc59f0
DRAFT
chickenchickenlove Sep 30, 2024
87ac522
GH-3276: Draft and test codes.
chickenchickenlove Oct 1, 2024
723cc18
Add test cases.
chickenchickenlove Oct 6, 2024
ac110b4
Add method
chickenchickenlove Oct 6, 2024
e30319d
Fixes lint errors.
chickenchickenlove Oct 7, 2024
6399b81
Minimize FailedRecordTuple dependency.
chickenchickenlove Oct 9, 2024
4a4a1ee
Refactor test codes and real codes.
chickenchickenlove Oct 10, 2024
08e7721
Modify method name.
chickenchickenlove Oct 10, 2024
c0fea39
Fixes compile error.
chickenchickenlove Oct 10, 2024
e74072c
Remove async retry callback from MessageListener contract.
chickenchickenlove Oct 11, 2024
645d7ce
Revert Copyright period.
chickenchickenlove Oct 11, 2024
0c89c92
Revert KafkaBackkOffAwareMessageListenerAdapter.
chickenchickenlove Oct 11, 2024
3f6b447
Depends on general type on KafkaMessageListenerContainer.
chickenchickenlove Oct 11, 2024
01de80e
Remove sleep(1) from async retry test.
chickenchickenlove Oct 11, 2024
97f05be
Remove waitAWhile().
chickenchickenlove Oct 12, 2024
d219037
Add java docs.
chickenchickenlove Oct 12, 2024
e8378da
Add author
chickenchickenlove Oct 12, 2024
967a7c4
Fixes flaky test.
chickenchickenlove Oct 12, 2024
f2c67c7
Modify contract of callback for async failure.
chickenchickenlove Oct 14, 2024
56e6fd3
Fixes java docs for setCallbackFroAsyncFailure.
chickenchickenlove Oct 14, 2024
5e70c76
Fix the tests for async failure retry.
chickenchickenlove Oct 15, 2024
1d9f7b9
Remove weed from async retry test and fix lint error.
chickenchickenlove Oct 15, 2024
e418b10
Add Tags for conditional test.
chickenchickenlove Oct 15, 2024
002aaf4
Remove useless tag for async failure retry tests.
chickenchickenlove Oct 15, 2024
3b79d59
Remove unused import
chickenchickenlove Oct 15, 2024
82ecac7
Make a method static and Add @SuppressWarning.
chickenchickenlove Oct 15, 2024
0afae4e
Remove useless latch.
chickenchickenlove Oct 15, 2024
796397f
Use specific ListenerAdapter type for async retry feature.
chickenchickenlove Oct 15, 2024
4e89d53
Fixes lint error.
chickenchickenlove Oct 15, 2024
KafkaMessageListenerContainer.java
@@ -38,6 +38,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
@@ -73,6 +74,7 @@
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.ApplicationContext;
@@ -105,7 +107,9 @@
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.AsyncRepliesAware;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaUtils;
@@ -167,6 +171,7 @@
* @author Mikael Carlstedt
* @author Borahm Lee
* @author Lokesh Alamuri
* @author Sanghyeok An
*/
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -840,6 +845,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private volatile long lastPoll = System.currentTimeMillis();

private final ConcurrentLinkedDeque<FailedRecordTuple<K, V>> failedRecords = new ConcurrentLinkedDeque<>();

@SuppressWarnings(UNCHECKED)
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType,
ObservationRegistry observationRegistry) {
@@ -895,6 +902,15 @@ else if (listener instanceof MessageListener) {
this.wantsFullRecords = false;
this.pollThreadStateProcessor = setUpPollProcessor(false);
this.observationEnabled = this.containerProperties.isObservationEnabled();

if (!AopUtils.isAopProxy(this.genericListener) &&
this.genericListener instanceof AbstractDelegatingMessageListenerAdapter<?>) {
Member:
Why do you check only for an AbstractDelegatingMessageListenerAdapter?
Why can the regular this.genericListener not be a RecordMessagingMessageListenerAdapter?

Contributor (author):
At runtime, KafkaBackOffAwareMessageListenerAdapter is set as this.genericListener and this.listener.
The KafkaBackOffAwareMessageListenerAdapter holds a reference to a RecordMessagingMessageListenerAdapter as its delegate.
Please refer to the image below 🙇‍♂️
[image]

Member:
OK. What I mean is that it might be better to have a generic solution. @sobychacko and I are working on async observation and have a similar condition at almost the same line of code, so I anticipate combining them in the end. This clarification is therefore useful. Thanks.

Member:
So, I think your previous solution was correct.
We have to set the callback only in the case of a KafkaBackoffAwareMessageListenerAdapter, which is indeed an indicator that we are in a retry-topic scenario.
This way we won't break all the other use cases where such error handling would be unexpected.
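A minimal sketch of the direction suggested in the last comment, keying the wiring off KafkaBackoffAwareMessageListenerAdapter rather than any AbstractDelegatingMessageListenerAdapter. This is not code from the PR: the adapter import, the pattern variables, and the unchecked cast are assumptions, and the PR's own check (started just above) continues in the diff right after this sketch.

// Assumes: import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
if (!AopUtils.isAopProxy(this.genericListener)
        && this.genericListener instanceof KafkaBackoffAwareMessageListenerAdapter<?, ?> backOffAwareListener
        && backOffAwareListener.getDelegate() instanceof RecordMessagingMessageListenerAdapter<?, ?> recordAdapter) {
    // The adapter was created for this container's key/value types, so the cast is safe here.
    @SuppressWarnings("unchecked")
    RecordMessagingMessageListenerAdapter<K, V> typedAdapter = (RecordMessagingMessageListenerAdapter<K, V>) recordAdapter;
    typedAdapter.setCallbackForAsyncFailure(this::callbackForAsyncFailure);
}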

AbstractDelegatingMessageListenerAdapter<MessageListener<K, V>> genListener =
(AbstractDelegatingMessageListenerAdapter<MessageListener<K, V>>) this.genericListener;
if (genListener.getDelegate() instanceof RecordMessagingMessageListenerAdapter<K, V> adapterListener) {
adapterListener.setCallbackForAsyncFailure(this::callbackForAsyncFailure);
}
}
}
else {
throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
@@ -1294,6 +1310,15 @@ public void run() {
boolean failedAuthRetry = false;
this.lastReceive = System.currentTimeMillis();
while (isRunning()) {

try {
handleAsyncFailure();
}
catch (Exception e) {
ListenerConsumer.this.logger.error(e,
"Failed to process async retry messages; skipping this time and trying again on the next poll loop.");
}

try {
pollAndInvoke();
if (failedAuthRetry) {
@@ -1435,6 +1460,29 @@ protected void pollAndInvoke() {
}
}

protected void handleAsyncFailure() {
List<FailedRecordTuple<K, V>> copyFailedRecords = new ArrayList<>();
while (!this.failedRecords.isEmpty()) {
FailedRecordTuple<K, V> failedRecordTuple = this.failedRecords.pollFirst();
copyFailedRecords.add(failedRecordTuple);
}

// If the error handler throws unexpectedly for one of the drained failed records,
// log a warning and skip that record rather than aborting the remaining ones.
for (FailedRecordTuple<K, V> copyFailedRecord : copyFailedRecords) {
try {
invokeErrorHandlerBySingleRecord(copyFailedRecord);
}
catch (Exception e) {
this.logger.warn(() ->
"The error handler threw an exception while handling an async failure; skipping the record: "
+ copyFailedRecord
+ ", exception: "
+ e.getMessage());
}
}
}

private void doProcessCommits() {
if (!this.autoCommit && !this.isRecordAck) {
try {
@@ -2827,6 +2875,44 @@ private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
}
}

private void invokeErrorHandlerBySingleRecord(final FailedRecordTuple<K, V> failedRecordTuple) {
final ConsumerRecord<K, V> cRecord = failedRecordTuple.record;
RuntimeException rte = failedRecordTuple.ex;
if (this.commonErrorHandler.seeksAfterHandling() || rte instanceof CommitFailedException) {
try {
if (this.producer == null) {
processCommits();
}
}
catch (Exception ex) { // NO SONAR
this.logger.error(ex, "Failed to commit before handling error");
}
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
records.add(cRecord);
this.commonErrorHandler.handleRemaining(rte, records, this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer);
}
else {
boolean handled = false;
try {
handled = this.commonErrorHandler.handleOne(rte, cRecord, this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer);
}
catch (Exception ex) {
this.logger.error(ex, "ErrorHandler threw unexpected exception");
}
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
if (!handled) {
records.computeIfAbsent(new TopicPartition(cRecord.topic(), cRecord.partition()),
tp -> new ArrayList<>()).add(cRecord);
}
if (!records.isEmpty()) {
this.remainingRecords = new ConsumerRecords<>(records);
this.pauseForPending = true;
}
}
}

private void invokeErrorHandler(final ConsumerRecord<K, V> cRecord,
Iterator<ConsumerRecord<K, V>> iterator, RuntimeException rte) {

@@ -3299,6 +3385,10 @@ private Collection<ConsumerRecord<K, V>> getHighestOffsetRecords(ConsumerRecords
.values();
}

private void callbackForAsyncFailure(ConsumerRecord<K, V> cRecord, RuntimeException ex) {
this.failedRecords.addLast(new FailedRecordTuple<>(cRecord, ex));
}

@Override
public void seek(String topic, int partition, long offset) {
this.seeks.add(new TopicPartitionOffset(topic, partition, offset));
@@ -3913,4 +4003,6 @@ private static class StopAfterFenceException extends KafkaException {

}

private record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex) { }

}
MessagingMessageListenerAdapter.java
@@ -28,6 +28,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import org.apache.commons.logging.LogFactory;
@@ -91,6 +93,7 @@
* @author Wang ZhiYang
* @author Huijin Hong
* @author Soby Chacko
* @author Sanghyeok An
*/
public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerSeekAware, AsyncRepliesAware {

@@ -152,6 +155,9 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS

private String correlationHeaderName = KafkaHeaders.CORRELATION_ID;

@Nullable
private BiConsumer<ConsumerRecord<K, V>, RuntimeException> callbackForAsyncFailure;

/**
* Create an instance with the provided bean and method.
* @param bean the bean.
@@ -665,16 +671,27 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm
Throwable t, Message<?> source) {

try {
Throwable cause = t instanceof CompletionException ? t.getCause() : t;
handleException(request, acknowledgment, consumer, source,
new ListenerExecutionFailedException(createMessagingErrorMessage(
"Async Fail", source.getPayload()), t));
new ListenerExecutionFailedException(createMessagingErrorMessage(
"Async Fail", source.getPayload()), cause));
}
catch (Throwable ex) {
this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
acknowledge(acknowledgment);
if (canAsyncRetry(request, ex) &&
Objects.nonNull(this.callbackForAsyncFailure)) {
ConsumerRecord<K, V> record = (ConsumerRecord<K, V>) request;
this.callbackForAsyncFailure.accept(record, (RuntimeException) ex);
}
}
}

private boolean canAsyncRetry(Object request, Throwable exception) {
// Async retry with @RetryableTopic is only supported for single-record listeners.
return request instanceof ConsumerRecord && exception instanceof RuntimeException;
}
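As a side note, the cause unwrapping added in asyncFailure above reflects how CompletableFuture reports failures: a task that throws surfaces the error wrapped in a CompletionException, with the original exception as its cause. A small self-contained illustration, not part of this PR:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class CompletionExceptionDemo {

    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("listener processing failed");
        });
        try {
            future.join();
        }
        catch (CompletionException ex) {
            // The adapter unwraps this cause before building the ListenerExecutionFailedException.
            System.out.println(ex.getCause()); // java.lang.IllegalStateException: listener processing failed
        }
    }
}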

protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
Message<?> message, ListenerExecutionFailedException e) {

@@ -869,6 +886,18 @@ private boolean rawByParameterIsType(Type parameterType, Type type) {
return parameterType instanceof ParameterizedType pType && pType.getRawType().equals(type);
}

/**
* Set the retry callback for failures of both {@link CompletableFuture} and {@link Mono}.
* {@link MessagingMessageListenerAdapter#asyncFailure(Object, Acknowledgment, Consumer, Throwable, Message)}
* will invoke {@link MessagingMessageListenerAdapter#callbackForAsyncFailure} when
* {@link CompletableFuture} or {@link Mono} fails to complete.
* @param asyncRetryCallback the callback for async retry.
* @since 3.3
*/
public void setCallbackForAsyncFailure(BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback) {
this.callbackForAsyncFailure = asyncRetryCallback;
}
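For orientation, a hedged sketch of the user-facing scenario this callback serves; the topic, group, and class names are illustrative and not taken from the PR. A single-record @RetryableTopic listener returning a CompletableFuture should now have an exceptionally completed future routed into the retry-topic error handling instead of only being logged:

import java.util.concurrent.CompletableFuture;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.stereotype.Component;

@Component
class AsyncOrderListener {

    @RetryableTopic
    @KafkaListener(topics = "orders", groupId = "order-group")
    CompletableFuture<Void> listen(String order) {
        // If this future completes exceptionally, the failure is reported back to the
        // container via the async-failure callback and handled like a synchronous failure.
        return CompletableFuture.runAsync(() -> process(order));
    }

    private void process(String order) {
        // application logic that may throw a RuntimeException
    }
}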

/**
* Root object for reply expression evaluation.
* @param request the request.