-
Notifications
You must be signed in to change notification settings - Fork 41
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix/rebalancing #30
fix/rebalancing #30
Conversation
we're facing the same issue, waiting this PR to be merged |
+1 here |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it works for sessionless consumers.
pkg/kafka/subscriber.go
Outdated
@@ -578,6 +581,9 @@ ResendLoop: | |||
case <-ctx.Done(): | |||
h.logger.Trace("Closing, ctx cancelled before sent to consumer", receivedMsgLogFields) | |||
return nil | |||
case <-sessionContext.Done(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
won't this fail since sess
is nil?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Session can't be nil. Saram won't start consuming in such case. By the way sarama does the same check before consuming starts, but if rebalancing started after, during processing messages, it hangs on trying commit offset with staled session.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. I've overlooked it. Fixed this.
Thank you for the contribution 🙏 |
Checking that the session context has been canceled. If this is not done, after rebalancing consumer will continue to work, but offset will not be able to commit because of error: