-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Retry ErrNotCoordinatorForConsumer in ConsumerGroup.newSession #1231
Conversation
Signed the CLA. |
Something to possibly include in this would be that the heartbeatLoop can check for the It would change https://github.com/Shopify/sarama/blob/96e43a884d5ef985c98dc02e5ec6904a2b8b1d1c/consumer_group.go#L664 to look something like
|
no update ? |
This is reasonable and would improve consumer resiliency. @thomaslee Could you add a unit test to cover this use case? |
This is nice, please add a test and we can get it going. |
@sam-obeid @varun06 sure, but what's the best way to test this? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, this edge case is bit tricky to test. code looks good to me.
Group coordinators seem to be selected from among those brokers which lead
__consumer_offsets
partitions, so when leadership of__consumer_offsets
partitions move from one broker to another (e.g. due to an outage, or a controlled reassignment of partition replicas) the coordinator for a given group may move with it.Sarama usually seems to handle these events by returning nil from
ConsumerGroup.Consume()
after a heartbeat fails due to aErrNotCoordinatorForConsumer
. In these cases it seems like you can just callConsumerGroup.Consume()
again to ask the consumer to resume where it left off. If you do this, what you typically see something like this in the logs when group coordinators move:However, if you're unlucky
ConsumerGroup.Consume()
can sometimes returnErrNotCoordinatorForConsumer
& fail in such a way that the ConsumerGroup never looks up the new group coordinator:At this point the ConsumerGroup seems to be in a broken state: checking the return value of
Consume()
forErrNotCoordinatorForConsumer
& simply retrying the call toConsume()
doesn't help since repeated calls toConsumerGroup.Consume()
will continue to returnErrNotCoordinatorForConsumer
forever. Something like this:This is obviously not a great state for the consumer to be in & can obviously cause problems for consumers during basic cluster maintenance operations or broker outages/downtime.
We seem to get into this state because we're not attempting coordinator refreshes during calls to
ConsumerGroup.newSession()
, which is what I'm attempting to address in this PR. With this change applied, I can't reproduce the issue.Example log output (prior to applying this change)
See this gist for an example of the problem this is trying to fix: https://gist.githubusercontent.com/thomaslee/b70498216fb04f6b02f26c21ef93a046/raw/6ac08ba203f6f86c75c65b9851acb7366c38fc7a/sarama-bug.txt
At the end of this output, the consumer process exits. (Note that the
**
output here is from some light edits of vendored Sarama code for diagnostic purposes. Note too the whining about MaxWaitTime being low, which I don't believe is actually a prerequisite to reproducing this bug.)Possible work-around
I haven't actually tried this myself, but it seems like until this PR is merged folks should be able to simply call
Client.RefreshCoordinator(yourGroupHere)
whenConsumerGroup.Consume()
returnsErrNotCoordinatorForConsumer
(perhaps with some backoff logic). Note that this requires users to create theirConsumerGroup
s usingNewConsumerGroupFromClient
.