-
Notifications
You must be signed in to change notification settings - Fork 278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pass BaseConsumer to ConsumerContext::rebalance #636
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is a lot simpler than I imagined
This fixes #627 btw |
@loewenheim @Swatinem thanks a lot for working on this. We're currently investigating an issue on the latest release (as reported on #638). Once we manage to identify/solve it, we'll tackle this. Sorry for the delay. |
When this lands, I will be the happiest person on earth that uses this library. |
The PR looks good to me 👍 . I tried to find a way to pass in the configured consumer instead of the base one but I couldn't find an easy path. cc @duarten @davidblewett in case you wanna give it another view before merging. A common ask in the future could be: why are we receiving the base consumer instead of my configured one in the rebalance cb? @loewenheim can you update the CustomContext for the simple_consumer.rs example? |
@scanterog Done, thanks for the heads-up. |
@loewenheim thanks for the contribution. I would like to get #644 merged with a point release cut, then I will merge this and do a version bump. |
@davidblewett do you know when you will be able to release a new version with this change? thank you! |
We were waiting for fede1024/rust-rdkafka#636 to be merged, but then didn't follow up on this. Now we can finally release rust-arroyo on crates.io
Description of the change
This changes the interface of
ConsumerContext
in the following ways:&NativeClient
parameter inrebalance
is replaced with a&BaseConsumer<Self>
parameter.pre_rebalance
andpost_rebalance
also gain&BaseConsumer<Self>
parameters.This also requires
BaseConsumer::native_client
method;Sized
bound toConsumerContext
.Reason for the change
We're building an abstraction around Kafka at Sentry. It's set up in such a way that offsets are committed in batches, not individually, for performance reasons. This means, however, that when a rebalance happens, there may still be outstanding offsets that need to be committed. We want to do this by means of the
pre_rebalance
hook onConsumerContext
, but the problem is that this callback has no access to theBaseConsumer
(or any client that could perform the commit). We previously tried holding anArc
to theBaseConsumer
in theConsumerContext
, but that leads to a mire of reference cycles and deadlocks.Passing the
BaseConsumer
to the callbacks would neatly enable this use case for us.