Micronaut Kafka Listener commits messages at shutdown that should not be committed with offset strategy SYNC #315

Closed
2 tasks done
JoernSchimmelpfeng opened this issue Feb 3, 2021 · 10 comments · Fixed by #441
Labels: priority: high

Comments

@JoernSchimmelpfeng

JoernSchimmelpfeng commented Feb 3, 2021

With OffsetStrategy set to SYNC or ASYNC, the KafkaListener should not commit messages when an Exception occurs. However, during shutdown all messages get committed, regardless of whether there was an Exception or not.

Task List

  • [X] Steps to reproduce provided
  • Stacktrace (if present) provided
  • Example that reproduces the problem uploaded to Github
  • [X] Full description of the issue provided (see below)

Steps to Reproduce

  1. Have a Micronaut application with a KafkaListener in SYNC mode listening to a Kafka topic.
  2. Submit an event to this topic.
  3. Start processing the event, but have an Exception thrown during processing (such as a SQLException during a database operation).
  4. The consumer group now has a lag of one message, which you can verify on Kafka (as documented).
  5. Stop the application.
  6. The consumer group now has a lag of zero messages, which you can verify on Kafka.

Expected Behaviour

The KafkaListener should not commit messages with offset strategy SYNC (or ASYNC) at shutdown time that could not be processed due to an Exception.

Actual Behaviour

All events get committed. This step is skipped only with offset strategy DISABLED.

@JoernSchimmelpfeng
Author

Guys, this can potentially lead to data loss! Any opinions on this?

@graemerocher
Contributor

@burtbeckwith can you please prioritise this issue

@burtbeckwith
Member

@JoernSchimmelpfeng you checked "example on GitHub" - do you have a test app?

@JoernSchimmelpfeng
Author

JoernSchimmelpfeng commented Feb 26, 2021

Sorry for my stupidity. I somehow checked the box and could not uncheck it again. Perhaps a browser issue.
However I can provide you with everything that you need, but it would take one or two days. I can also provide a PR if you like since I believe I know the cause. There is a finally block committing everything when leaving the consumer loop.
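The suspected cause can be sketched in plain Java. This is only a model under stated assumptions: `FakeConsumer`, `commitPosition`, `commitUpTo` and `run` are hypothetical names made up for illustration, not the Kafka client API or the actual micronaut-kafka code. It shows how an unconditional commit in a `finally` block advances the committed offset past a record whose processing failed.

```java
import java.util.List;

public class ShutdownCommitSketch {

    /** Hypothetical stand-in for a Kafka consumer; not the real client API. */
    static final class FakeConsumer {
        long position = 0;   // offset of the next record to fetch
        long committed = 0;  // last committed offset for the consumer group

        // Models a commit of everything fetched so far.
        void commitPosition() { committed = position; }

        // Models a commit of one explicit offset after a successful record.
        void commitUpTo(long offset) { committed = offset; }
    }

    /**
     * Consumes the records once, then "shuts down".
     * commitOnExit toggles the unconditional commit in the finally block.
     * Returns the committed offset after shutdown.
     */
    static long run(List<String> records, boolean commitOnExit) {
        FakeConsumer consumer = new FakeConsumer();
        try {
            for (String record : records) {
                consumer.position++; // the record has been fetched
                try {
                    process(record);
                    consumer.commitUpTo(consumer.position); // SYNC: commit after success
                } catch (IllegalStateException e) {
                    // processing failed, so the per-record commit is skipped
                }
            }
        } finally {
            if (commitOnExit) {
                consumer.commitPosition(); // also commits the failed record
            }
        }
        return consumer.committed;
    }

    // Simulated listener method: records starting with "bad" fail.
    static void process(String record) {
        if (record.startsWith("bad")) {
            throw new IllegalStateException("simulated failure for " + record);
        }
    }

    public static void main(String[] args) {
        List<String> records = List.of("bad-1");
        long lagWithExitCommit = records.size() - run(records, true);
        long lagWithoutExitCommit = records.size() - run(records, false);
        System.out.println("lag with commit in finally:    " + lagWithExitCommit);
        System.out.println("lag without commit in finally: " + lagWithoutExitCommit);
    }
}
```

With a single failing record, the run with the exit commit ends with zero lag (step 6 of the reproduction), while the run without it keeps a lag of one (step 4).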

@graemerocher
Contributor

@JoernSchimmelpfeng PRs welcome!

graemerocher added the priority: high label Feb 28, 2021
JoernSchimmelpfeng added a commit to JoernSchimmelpfeng/micronaut-kafka that referenced this issue Mar 21, 2021

@seancarroll
Contributor

I think I've noticed similar behavior. I see JoernSchimmelpfeng added a commit. Is there anything outstanding regarding his proposed fix?

@graemerocher
Contributor

@seancarroll can you test the proposed PR and potentially provide a test? It hasn't been merged because it doesn't include a test.

@seancarroll
Contributor

@graemerocher, given the number of commits the PR is behind, where does it make sense to add a test?

@graemerocher
Contributor

You could either rebase the PR or create another PR based off that one and leave the rebase to us

@seancarroll
Contributor

seancarroll commented Sep 9, 2021

Sounds good. I think I have a test that shows the behavior for one scenario, but it requires an exception to be thrown. In another test that only triggers a wakeup, I'm still seeing commits take place. It's unclear to me right now why we would ever want to do a commit sync on a WakeupException or, more specifically, why a commit in the finally block is required. There might also be something else going on in the wakeup-only scenario that is causing commits which I don't think should occur. I'm still trying to understand the behavior. One example (test) in particular is when using batch and a WakeupException occurs in the middle of processing.
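To make the batch scenario concrete, here is a rough plain-Java model of it. Everything in it is an assumption for illustration: `SimulatedWakeup` stands in for Kafka's WakeupException, and `Offsets`, `consumeBatch` and `handle` are hypothetical names, not micronaut-kafka internals or the fix from the PR. It compares the offset a commit-everything exit would commit with the offset of the records that were actually processed before the wakeup.

```java
import java.util.List;

public class WakeupBatchSketch {

    /** Hypothetical exception standing in for Kafka's WakeupException. */
    static final class SimulatedWakeup extends RuntimeException {}

    /** Offsets observed at shutdown in the simulated run. */
    static final class Offsets {
        final long fetched;   // what a commit of everything fetched would commit
        final long processed; // what was actually handled before the wakeup

        Offsets(long fetched, long processed) {
            this.fetched = fetched;
            this.processed = processed;
        }
    }

    /**
     * Processes a batch record by record and simulates a wakeup before
     * the record at wakeupAtIndex is handled.
     */
    static Offsets consumeBatch(List<String> batch, int wakeupAtIndex) {
        long fetched = batch.size(); // the whole batch was handed over at once
        long processed = 0;
        try {
            for (int i = 0; i < batch.size(); i++) {
                if (i == wakeupAtIndex) {
                    throw new SimulatedWakeup(); // shutdown requested mid-batch
                }
                handle(batch.get(i));
                processed = i + 1; // remember the last fully handled record
            }
        } catch (SimulatedWakeup e) {
            // leave the loop; the caller decides which offset to commit
        }
        return new Offsets(fetched, processed);
    }

    // Simulated listener logic; does nothing in this sketch.
    static void handle(String record) {
    }

    public static void main(String[] args) {
        Offsets offsets = consumeBatch(List.of("a", "b", "c", "d"), 2);
        System.out.println("commit of everything fetched: offset " + offsets.fetched);
        System.out.println("commit of processed records:  offset " + offsets.processed);
    }
}
```

In this model the gap between the two offsets is exactly the set of records that would be skipped after a restart. With the real client, committing only what was processed would presumably go through the `commitSync` overload that takes an explicit map of `TopicPartition` to `OffsetAndMetadata`, but whether that matches the eventual fix is not something this thread confirms.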
