Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-3276: Support async retry with @RetryableTopic #3523

Conversation

chickenchickenlove
Copy link
Contributor

@chickenchickenlove chickenchickenlove commented Sep 30, 2024

Motivation:

From now on, when the listener end point method returns CompletableFuture<?> and Mono<?> successfully and both CompletableFuture<?> and Mono<?> fail to be completed due to unexpected error, these failure record never retried even if a developer set @Retryable topic.

If we supports async retry with @RetryableTopic, we expect that spring-kafka's will be improved.

Changes

  • Add new method for sending ConsumerRecord when both CompletableFuture<?> and Mono<?> was failed.
  • Add test cases and scenario test cases.

Idea

The Kafka's Consumer has soft lock, so multiple threads can not access the consumer simultaneously.
It means that the thread in ThreadExecutor which handle CompletableFuture or Mono cannot access the consumer.

my idea is very simple.
the KafkaMessageListenerContainer has ConcurrentLinkedDeque as its field.
Then, when MessagingMessageListenerContainer add asyncSuccess() and asyncFailure() to CompletableFuture<?> and Mono<?> as hooks.

If these future object failed to be completed, future object will call MessagingMessageListenerContainer.asyncFailure().
When MessagingMessageListenerContainer.asyncFailure() is called, FailedRecordTuple will be put to ConcurrentLinkedDeque.

Then, next loop, KafkaMessageListenerContainer will call handleAsyncFailure().
then, KafkaMessageListenerContainers follows FailedRecordTracker's logic.

Result

========== Previous Context(below) ========

Background

I investigate how to support @RetryableTopic for async.
I think it is difficult task for me to solve it. 😓
Anyway, challenges are a good thing, so I dug a little deeper. 😅
First of all, i want to discuss with the reviewers about which direction i should take for the implementation.

I implemented an ErrorHandler that sends messages to Retry and DLT topics when an error occurs, passing it as a callback to MessagingMessageListenerAdapter and inserting it into MessagingMessageListeneerAdapter#asyncFailure() to send messages to Retry and DLT topics. Although this method has some issues, it can achieve the intended purpose.

You can check successful test cases below.
shouldRetryFirstFutureTopic
shouldRetryFirstMonoTopic
shouldRetrySecondFutureTopic
shouldRetrySecondMonoTopic
shouldRetryThirdFutureTopicWithTimeout
shouldRetryThirdMonoTopicWithTimeout
shouldRetryFourthFutureTopicWithNoDlt
shouldRetryFourthMonoTopicWithNoDlt
shouldGoStraightToDltInFuture
shouldGoStraightToDltInMono

However, i realized that it is hard for spring-kafka to support back-off to async kafka listener endpoint.
image

The Backoff time is determined when KafkaMessageListenerContainer calls invokeErroHandler().
At that time, FailedRecord is computed by Thread.

// FailedRecordTracker.java
...
// NOTE: currentThread.
Thread currentThread = Thread.currentThread();
Map<TopicPartition, FailedRecord> map = this.failures.computeIfAbsent(currentThread, t -> new HashMap<>());
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
FailedRecord failedRecord = getFailedRecordInstance(record, exception, map, topicPartition);
this.retryListeners.forEach(rl ->
		rl.failedDelivery(record, exception, failedRecord.getDeliveryAttempts().get()));
long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();
...

However, it is hard to imagine that CompletableFuture and Mono always will be executed in same thread.
it always will affects nextBackOff.
Thus, spring-kafka cannot support BackOff Retry in async mode.

IMHO, I think that we have 3 options.

1. @RetryableTopic does not support backoff features in async mode.

in this case, new kafka header is needed to distinguish between sync and async.
the other one is that always use minimum backoff time \to avoid KafkaBackoffException

2. Introduce micrometer/context-propagation to support backoff features in async mode.

For example, https://spring.io/blog/2023/03/28/context-propagation-with-project-reactor-1-the-basics.
I believe that we can infer proper nextBackOffTime by using context-propagation to store and restore state in each thread.

3. Big refactoring.

But i have no idea. maybe SyncKafkaMessageListenerContainer and AsyncKafkaMessageListenerContainer?

Thanks for reading this description.
What do you think? Please let me know your opinion.
Thanks in advance! 🙇‍♂️

@artembilan
Copy link
Member

Thank you, @chickenchickenlove , for such a great investigation and the crack on the possible solution!
As for that FailedRecordTracker, I think the Thread.currentThread() is a bad choice in here since it is indeed cannot be performed on a different thread.
Let's see if we can make it based on the provided Consumer instead!
Technically that one can only be performed in a single thread, however I don't think it would be any harm if we propagate the current Consumer down to the MessagingMessageListeneerAdapter#asyncFailure() where our FailedRecordTracker could pick it up for its logic.
WDYT?

@chickenchickenlove
Copy link
Contributor Author

chickenchickenlove commented Sep 30, 2024

Sounds good, @artembilan 👍
I would like to check one thing for confirmation to sync our opinions. (Sorry for my pool english 😓 )

My Understanding 1

  1. Call asyncFailure() with Consumer.
  2. asyncFailure() calls asyncRetryCallback with Consumer
  3. asyncRetryCallback calls FailedRecordTracker.restore() with Consumer.
    And then, we should investigate that Consumer can be replaced with Thread.currentThread().

My Understanding 2

  1. Call asyncFailure() with Consumer.
  2. asyncFailure() calls FailedRecordTracker directly to get nextBackOff
  3. After then, asyncFailure produce failed record to retry or dlt topic.

Which understanding is closer to what you described?
When you have time, please let me know 🙇‍♂️

@artembilan
Copy link
Member

The asyncFailure() is already called with a Consumer.
I just followed a description you have provided so far:

I implemented an ErrorHandler that sends messages to Retry and DLT topics when an error occurs, passing it as a callback to MessagingMessageListenerAdapter.

So, I assume that your impl deals already with a FailedRecordTracker.
Sorry, didn't look into the code yet.

I'm not sure what is asyncRetryCallback, but my understanding is to follow a logic of the DefaultErrorHandler:

		try {
			return getFailureTracker().recovered(record, thrownException, container, consumer);
		}

And as you see, the Consumer is there already.

@chickenchickenlove
Copy link
Contributor Author

Thanks for your description. 🙇‍♂️
I'm going to go back and dig it further.
If it works well, I'll complete the test code and get back to you. If it doesn’t go well, I'll also investigate whether there is other options.

FYI, asyncRetryCallback is almost same with KafkaMessageListenerContainer#invokeErrorHandler().

From now on, function calls flow is : asyncFailure() -> asyncRetryCallback -> KafkaMessageListenerContainer.invokeErrorHandler() -> ... -> FailedRecordTracker.recovered()

@chickenchickenlove
Copy link
Contributor Author

chickenchickenlove commented Oct 1, 2024

@artembilan ,
There is an issue when attempting an async retry using a Consumer. 😢
LegacyKafkaConsumer has a soft lock. because it is not safe for multi-threaded access. (code).

For example, we have

  • Consumer A: subscribe Topic A and poll messages.
  • Main Thread: is in charge of Consumer A main loop. (fetch, poll, and Etc.)
  • SubThread in ThreadPool Executor : is in charge of executing CompleatbleFuture or Mono instance and async retry.

In that case, ConcurrentModificationException can occurs in here.

image

Initially, only the Main Thread will own Consumer A, as there is no async failure, so there is no need to call partitionsFor().

However, once async failures occurs, a ConcurrentModificationException() may be thrown due to a race condition between the thread(Sub Thread) calling partitionsFor() and the thread(Main Thread) calling consumer.poll() or consumer.pause().

Thus, it can not be proper workaround.

How about adding a ConcurrentLinkedQueue as a field in KafkaMessageListenerContainer and using it for SubThread to send a failure record to the Main Thread? Then, Main Thread will convert it to FailedRecord and send it to Retry Topic or DLT Topic.

I imagined adding a method like performAsyncFailureSendIfNeeded() either before or after pollAndInvoke(), and calling that method to handle the async failures in the ConcurrentLinkedQueue.

What are your thoughts on this approach?

@chickenchickenlove
Copy link
Contributor Author

chickenchickenlove commented Oct 1, 2024

I imagined adding a method like performAsyncFailureSendIfNeeded() either before or after pollAndInvoke(), and calling that method to handle the async failures in the ConcurrentLinkedQueue.

If so, it seems to work well.
I implemented the code and test cases.
All test cases for async retry (CompletableFuture, Mono) passed!

I made a new commit and pushed it to current branch.
When you have time, could you please take a look? 🙇‍♂️

Would you consider this to be the right direction? If so, there are a few additional points we may need to consider 😀

I think that these are needed!

  1. Handling each failed record individually will have minimal impact on producing performance. because producer use record buffer to improve Network IO.
  2. However, it is necessary to set the maximum number of failure records to process at one time. Otherwise, the consumer will spend all its time handling failure records.
  3. We need to decide how to handle errors that occur while processing a failure record. We must determine whether to discard the failure record itself or the failure record in the concurrent queue.

@chickenchickenlove chickenchickenlove changed the title GH-3276: DRAFT discussion. GH-3276: Support async retry with @RetryableTopic Oct 8, 2024
@chickenchickenlove
Copy link
Contributor Author

Hi, @artembilan !
I added new commit for code and scenario test cases!
Also, i updated description of this PR.

When you have time, could you take a look? 🙇‍♂️
Thanks in advance!!

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what is going on now.
Thank you for digging further!
Now I know what is the power of single-threaded processors and how to handle its behavior.

So, I have only one simple concern about public API and after that it would be great to mention this feature in the docs.

Re. error handling of the error handler.
See invokeErrorHandler():

				try {
					handled = this.commonErrorHandler.handleOne(rte, cRecord, this.consumer,
							KafkaMessageListenerContainer.this.thisOrParentContainer);
				}
				catch (Exception ex) {
					this.logger.error(ex, "ErrorHandler threw unexpected exception");
				}

So, that should be enough for now to close all the gaps we have so far.

@chickenchickenlove
Copy link
Contributor Author

@artembilan , sorry to bother you.

So, I have only one simple concern about public API and after that it would be great to mention this feature in the docs.
Re. error handling of the error handler.
See invokeErrorHandler():
So, that should be enough for now to close all the gaps we have so far.

I can't understand your comments 😢.
When you have time, could you give me more detail and context...?
Thanks in advance 🙇‍♂️

*/
@FunctionalInterface
public interface MessageListener<K, V> extends GenericMessageListener<ConsumerRecord<K, V>> {

default void setCallbackForAsyncFailureQueue(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have you renamed method to Queue?
What was wrong with a Callback?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixes to setCallbackForAsyncFail(...)!

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for so lengthy review, but in my feeling we are close for merge, so I'd like to clear up things around.

Thank you for incredible work you've done with this flaky use-case!

this.genericListener instanceof KafkaBackoffAwareMessageListenerAdapter<?, ?>) {
KafkaBackoffAwareMessageListenerAdapter<K, V> genListener =
(KafkaBackoffAwareMessageListenerAdapter<K, V>) this.genericListener;
if (genListener.getDelegate() instanceof RecordMessagingMessageListenerAdapter<K, V>) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed to extract pattern variable: https://www.baeldung.com/java-pattern-matching-instanceof

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2024 the original author or authors.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This apparently has to be reverted as well.
I mean you revert, but don't run check locally.

@@ -143,4 +143,5 @@ public void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment)
public void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer) {
onMessage(data, null, consumer);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here: all the changes in this class have to be reverted.

weeded.incrementAndGet();
}
});
assertThat(weeded.get()).isEqualTo(2);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How this reflection stuff is related to the logic we would like to cover with this tests?
I thought we are pursuing a goal to verify that async failures are retried.
How does KafkaAdmin interaction affects an expected behavior?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AsyncCompletableFutureRetryTopicClassLevelIntegrationTests demonstrates that the async retry feature works well in RetryTopicClassLevelIntegrationTests. It means the synchronous condition. (ConsumerRecord succeed or fail one by one)

So, I did copy and paste RetryTopicClassLevelIntegrationTests and replace KafkaListener's return type to Mono<?> or CompletableFuture<?>.

In the RetryTopicClassLevelIntegrationTests, weeded seems to demonstrate that some new topics should be created.
image

As a result of admin.newTopics(), instance weededTopics has two types (TopicForRetryable, NewTopic).
I think that the writer of RetryTopicClassLevelIntegrationTests want to verify whether expected NewTopics are created.

If there are no major issues, I would like to follow the assumptions of RetryTopicClassLevelIntegrationTests as well.
This is because I believe that AsyncRetry should also satisfy the synchronous retry condition where offsets are processed sequentially, given that AsyncRetry may or may not process offsets in sequence.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't see how this weeded are relevant to async handling.
That is really not a part of retry topic behavior, but rather its configuration.
So, having same verifications in two different places is redundant.

@chickenchickenlove chickenchickenlove force-pushed the 240929-async-retry branch 2 times, most recently from 8b2aebb to 645d7ce Compare October 11, 2024 22:41
@chickenchickenlove
Copy link
Contributor Author

chickenchickenlove commented Oct 15, 2024

@artembilan , I have a question!

Don't we need test codes for this situation? 🤔

  • Consumer Listener method which subscribe topicA returns CompletableFuture<?> or Mono<?>
  • The topicA without RetryTopicConfiguration.

I didn't wrote any test codes for this situation.
I haven't looked into it yet, but it seems there is no test code for this case.

Do you think I should write it? What are your thoughts?
Please let me know. Thanks a lot 🙇‍♂️

@artembilan
Copy link
Member

Don't we need test codes for this situation? 🤔

Consumer Listener method which subscribe topicA returns CompletableFuture or Mono
The topicA without RetryTopicConfiguration.

Sorry, I'm not totally sure in your question.
Why doesn't the AsyncListenerTests meet your expectations?

@chickenchickenlove
Copy link
Contributor Author

chickenchickenlove commented Oct 15, 2024

Sorry, I'm not totally sure in your question.
Why doesn't the AsyncListenerTests meet your expectations?

Sorry for lack of explanation 🙇‍♂️
Yes, AsyncListenerTest seems to meet my expectations. 😄

For your information,
In the AsyncListenerTest, this.genericListener in KafkaMessageListenerContainer is RecordMessagingMessageListenerAdapter. so, it doesn't have asyncFailureCallback.
So, even if Object errorResult = this.errorHandler.handleError(...); throws error, it will never call asyncFailureCallback.

However, in the AsyncMonoRetryTopicScenarioTests, this.genericListener is KafkaBackoffAwareMessageListenerAdapter. so, it has asyncFailureCallback. so it can call asyncFailureCallback after Object errorResult = this.errorHandler.handleError(...); throws error.

@artembilan
Copy link
Member

In the AsyncListenerTests, this.genericListener in KafkaMessageListenerContainer is RecordMessagingMessageListenerAdapter. so, it doesn't have asyncFailureCallback.

That's exactly what I meant about checking against this.genericListener first and then against delegate.
However this might be really good sign to not do that for regular async listener.
The error over there is handled by the MessagingMessageListenerAdapter itself.
There is nothing to do with retries, so all good so far.

@@ -895,6 +902,15 @@ else if (listener instanceof MessageListener) {
this.wantsFullRecords = false;
this.pollThreadStateProcessor = setUpPollProcessor(false);
this.observationEnabled = this.containerProperties.isObservationEnabled();

if (!AopUtils.isAopProxy(this.genericListener) &&
this.genericListener instanceof AbstractDelegatingMessageListenerAdapter<?>) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I think your previous solution was correct.
We have to set callback only in case of a KafkaBackoffAwareMessageListenerAdapter which is indeed an indicator that we are in retry topic scenario.
This way we won't break all other use-cases where such an error handling would be unexpected.

@chickenchickenlove
Copy link
Contributor Author

chickenchickenlove commented Oct 15, 2024

That's exactly what I meant about checking against this.genericListener first and then against delegate.
However this might be really good sign to not do that for regular async listener.
The error over there is handled by the MessagingMessageListenerAdapter itself.
There is nothing to do with retries, so all good so far.

As per your advice, I'm glad I debugged the AsyncListener test code.
Thanks to that, I feel more confident about writing more stable code. 🙇‍♂️

When you have time, Please take another look! 🙇‍♂️

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pulling locally for final review and possible merge.

@artembilan artembilan added this to the 3.3.0-RC1 milestone Oct 16, 2024
@artembilan artembilan removed this from the 3.3.0-RC1 milestone Oct 16, 2024
@artembilan
Copy link
Member

Merged as 9eafa61.

@chickenchickenlove ,

thank you for awesome contribution as always!

So, I've accepted those new tests as well, although they are indeed some kind of slow.
We would need to think about filtering them if they will be some maintenance burden eventually.

A couple remarks about contribution overall:

  1. It is very convenient to have a branch named after GH issue.
  2. It is better to mention in the commit message an issue your are fixing.
  3. Adding something like Fixed: #3276 would close that issue automatically on merge.
  4. The first commit of the PR becomes a description of the PR.
  5. It is great to have a good commit message to preserve a history and motivation of the change.
    The PR is fully not mentioned in the commit history, but looking into the commit I would see what is going on and why without looking for an issue and related PR.

See more info here: https://cbea.ms/git-commit/

@artembilan artembilan closed this Oct 16, 2024
@chickenchickenlove
Copy link
Contributor Author

chickenchickenlove commented Oct 17, 2024

Thanks for your time and awesome review, always! 🙇‍♂️
I will look into ways to shorten the test times by using a latch, as you suggested!.
Also, from the next PR, I will actively incorporate your comments to improve my commit messages. 🙇‍♂️
Thanks again, always!

@artembilan
Copy link
Member

So, yeah, we really got some failing tests already from this story:

AsyncCompletableFutureRetryTopicScenarioTests > oneLongSuccessMsgBetween100ShortFailMsg(TestTopicListener5, MyCustomDltProcessor) FAILED
    org.opentest4j.AssertionFailedError: 
    Expecting value to be true but was false
        at java.base@17.0.12/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at java.base@17.0.12/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
        at java.base@17.0.12/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.base@17.0.12/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
        at app//org.springframework.kafka.retrytopic.AsyncCompletableFutureRetryTopicScenarioTests.oneLongSuccessMsgBetween100ShortFailMsg(AsyncCompletableFutureRetryTopicScenarioTests.java:429)

@chickenchickenlove
Copy link
Contributor Author

I see 🥲.
Sorry for inconvenience.
I will fix it by using latch in coming weekend.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

@RetryableTopic not working for asynchronous @KafkaListener return types
3 participants