-
Notifications
You must be signed in to change notification settings - Fork 141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extract Committer and RebalanceCoordinator classes from Runloop + unit tests #1375
Conversation
…cala Co-authored-by: Erik van Oosten <e.vanoosten@grons.nl>
…cala Co-authored-by: Erik van Oosten <e.vanoosten@grons.nl>
…cala Co-authored-by: Erik van Oosten <e.vanoosten@grons.nl>
zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopRebalanceListener.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea of extracting the Committer
is really nice. Without this, extracting the rebalance listener would be very hard.
I don't see any thing obviously wrong but since so much code moved around this is a huge PR. So I will have to take a deeper look once this PR exists draft.
zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala
Outdated
Show resolved
Hide resolved
With rebalanceSafeCommits, we expect all pulled records to be processed and committed.
Helps understanding the logic here
_ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic)) | ||
topic <- randomTopic | ||
clientId <- randomClient | ||
_ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic)).flatMap(ZIO.fromTry(_)).retryN(3).catchSome { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the retrying needed here, and not in other tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I couldn't figure that out either..
zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was a bit disappointed that cleaner code gives more code (this PR effectively adds 593 lines of code). Otherwise I am happy with the split.
It is no longer directly obvious which code executes on the same-thread-runtime (with its restrictions in ZIO operators). Therefore, I propose that methods that are used from the rebalancer will be clearly marked. I proposed a few changes for that, but perhaps there are more.
Approving, but please look at the suggestions.
The code is mostly just moved to a different place, the logic was left mostly intact. The 'interface' between the components has been decoupled more, i.e. the rebalance listener no longer access the full Runloop's State and the pending commits are stored internally in the Committer.
Care has been taken to make the Committer usable during rebalancing as well, with the proper access to the Consumer for example. The part that waits the end of the streams and their commits has been changed to use the Committer.
Includes unit tests for the two new components.