Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-3407 : Support KafkaHeaders.DELIVERY_ATTEMP for batch listeners. #3539

Merged

Conversation

chickenchickenlove
Copy link
Contributor

Motivation:

The developer expects to get the KafkaHeaders.DELIVERY_ATTEMPT header from the received ConsumerRecord.
It works well with the SingleRecordListener.
However, it does not work well with the BatchListener even when containerProperties.setDeliveryAttemptHeader(true) is set.
If the KafkaContainerListener can include the KafkaHeaders.DELIVERY_ATTEMPT header for each ConsumerRecord with the BatchListener, it will be more helpful to people who they expect.

Modifications

  • Add DeliveryAttemptAwareRetryListener class.
  • Add unit tests and integration tests for DeliveryAttemptAwareRetryListener.

To Reviewer

With current spring-kafka codes, it is hard to integrate it to FailedRecordTracker.
By the way, the function ErrorHandlingUtils.retryBatch() has deliveryAttempt count as their local scope and has same reference for ConsumerRecord.
Thus, the DeliveryAttemptAwareRetryListener is easier way to achieve the goal.
Because, in each fail to invocation, ErrorHandlingUtils.retryBatch() calls listeners.forEach(listener -> listener.failedDelivery(records, thrownException, attempt).

If the developer want to use this feature, they should create ContainerFactory and DeliveryAttemptAwareRetryListener. also, they set DeliveryAttemptAwareRetryListener as RetryListener.
Please refer to code below.

final FixedBackOff fixedBackOff = new FixedBackOff(1000L, MAX_ATTEMPT_COUNT1);
DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);

Result:

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see now how this makes sense.
Thank you for fixing it!

Please, consider to mention a new feature in the docs, including a short entry with a link into target doc in the whats-new.adoc.

@chickenchickenlove
Copy link
Contributor Author

@artembilan thanks for your time and comments!
I made a new commit to apply your comments.
When you have time, please take another look 🙇‍♂️

@chickenchickenlove
Copy link
Contributor Author

@artembilan thanks for your time for looking into changes, again! 🙇‍♂️
I made a new commit to apply your comments.
When you have time, please take another look!

Always thanks to read my code 🙇‍♂️

@chickenchickenlove
Copy link
Contributor Author

@artembilan , It's my bad!
I cleaned up the code that I thought was useless.
When you have time, please take another look 🙇‍♂️

@artembilan artembilan merged commit 5f26b72 into spring-projects:main Oct 10, 2024
3 checks passed
@artembilan
Copy link
Member

Thank you, @chickenchickenlove , again for contribution!

Please, take a look into this article about commit messages: https://cbea.ms/git-commit/.
Also take into account that first commit message becomes PR description automatically.
Just for a sake to save some yours and ours time:

  • first - no need to write separate PR description
  • second - no need to write extra commit message on merge
  • and last - the commit message is exactly what we always need to be good. The PR easily might be lost, but commit is there forever.

Now imaging time is passing and we don't have contacts with you to remind us whatever was done in that commit years ago.
but if you write a good commit message, it will be merged there into a history with respective honor outcome.

@chickenchickenlove
Copy link
Contributor Author

Thanks for your time 🙇‍♂️
As you mentioned, I will try to write good commit message 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

KafkaHeaders.DELIVERY_ATTEMPT is not added for batch listeners
2 participants