-
Notifications
You must be signed in to change notification settings - Fork 141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Convert commitAsync callback handling to ZIO sooner #1404
Conversation
…cala Co-authored-by: Erik van Oosten <e.vanoosten@grons.nl>
…cala Co-authored-by: Erik van Oosten <e.vanoosten@grons.nl>
…cala Co-authored-by: Erik van Oosten <e.vanoosten@grons.nl>
With rebalanceSafeCommits, we expect all pulled records to be processed and committed.
Helps understanding the logic here
…-rebalance-listener-file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not done but I have to go. Will look further another time.
Please be aware that this PR is consistently 10ms slower than main
(see the benchmarks).
zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Outdated
Show resolved
Hide resolved
zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Outdated
Show resolved
Hide resolved
zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala
Outdated
Show resolved
Hide resolved
zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala
Outdated
Show resolved
Hide resolved
zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala
Outdated
Show resolved
Hide resolved
zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Outdated
Show resolved
Hide resolved
zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Outdated
Show resolved
Hide resolved
…tter.scala Co-authored-by: Erik van Oosten <e.vanoosten@grons.nl>
…cala Co-authored-by: Erik van Oosten <e.vanoosten@grons.nl>
zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me!
private def commitAsyncZIO( | ||
consumer: ByteArrayKafkaConsumer, | ||
offsets: Map[TopicPartition, OffsetAndMetadata], | ||
doOnComplete: Either[Exception, Map[TopicPartition, OffsetAndMetadata]] => UIO[Unit] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this PR we now always provide a callback to commitAsync
, even when the given list of offsets is empty (so a callback is not neccesary). During a rebalance, this method is called a lot (every 100ms) and it is likely that most of these calls do not need a callback.
WDYT of making doOnComplete
an Option
, and then when it's None
we pass null
to commitAsync
like we did before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On the other hand, doing this removes the ability to react to errors... 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yeah I totally missed this. But I don't think it would make a large performance difference if it's only once every 100 ms.
KafkaConsumer's
commitAsync
takes a callback, which we program against with complicated followup code. This PR attempts to convert everything to ZIO's earlier on, making chaining followup effects easier to reason about.As this changes some functionality around locking and same / single threads, here's a summary of what do we need to ensure:
commitAsync
. InRunloop.run
this is done usingConsumerAccess
. In the rebalance coordinator (while rebalancing) we already have the lock as we're callingpoll()
so no need for extra locking.Runloop.run
we get this for free by guaranteeing exclusive access. In the rebalance coordinator apoll()
call is in the middle of being executed and we need to callcommitAsync
on the same thread as the rebalance listener is invoked.Anything that is not calling commitAsync is free to run on any thread as executed by the default ZIO runtime.