Consumer stalled after commit failure during rebalance #2933

Closed
gridaphobe opened this issue Jun 12, 2020 · 13 comments

@gridaphobe
Contributor


Description

If an OffsetCommit request overlaps with a rebalance, the partition fetcher threads are not restarted until the OffsetCommit response is received. If the commit fails, they are not restarted at all, and the consumer appears to hang.

How to reproduce

See https://gist.github.com/gridaphobe/d1c544631c9569af810b405e572144cd. We force a commit failure by unassigning ourselves before committing, which causes the commit to be processed by the broker after the generation id has been incremented. (The actual consumer code where we discovered this issue does an asynchronous commit on a regular interval, which only sometimes overlaps with rebalances in this way.) Start one instance of the consumer, and after it has received a few messages start another one. After the rebalance, only the new consumer will receive messages even though the partitions have been evenly distributed. The original consumer never recovers, even if you shut down the other one.
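For illustration, here is a rough C sketch of the same idea against the librdkafka C API (broker address, topic name, and group id are placeholders; the linked gist has the actual reproduction):

/* Reproduction sketch (illustrative only; see the linked gist for the real
 * code).  Build with: cc repro.c -lrdkafka */
#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        /* Placeholder broker/group/topic names. */
        rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "group.id", "test", errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "enable.auto.commit", "false",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "debug", "cgrp,topic,fetch",
                          errstr, sizeof(errstr));

        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                                      errstr, sizeof(errstr));
        rd_kafka_poll_set_consumer(rk);

        rd_kafka_topic_partition_list_t *topics =
                rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(topics, "test_topic",
                                          RD_KAFKA_PARTITION_UA);
        rd_kafka_subscribe(rk, topics);
        rd_kafka_topic_partition_list_destroy(topics);

        while (1) {
                rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 1000);
                if (!msg)
                        continue;
                if (!msg->err) {
                        /* Force the failure mode: drop the assignment, then
                         * commit the message's offset.  The broker ends up
                         * processing the commit after the generation id has
                         * been bumped, so the commit fails with
                         * REBALANCE_IN_PROGRESS / ILLEGAL_GENERATION. */
                        rd_kafka_topic_partition_list_t *offs =
                                rd_kafka_topic_partition_list_new(1);
                        rd_kafka_topic_partition_list_add(
                                offs, rd_kafka_topic_name(msg->rkt),
                                msg->partition)->offset = msg->offset + 1;
                        rd_kafka_assign(rk, NULL);
                        fprintf(stderr, "commit: %s\n",
                                rd_kafka_err2str(
                                        rd_kafka_commit(rk, offs, 0)));
                        rd_kafka_topic_partition_list_destroy(offs);
                }
                rd_kafka_message_destroy(msg);
        }
        /* not reached */
}

Run one instance, let it consume a few messages, then start a second instance to trigger the rebalance.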


@gridaphobe
Contributor Author

I forgot to mention: I also tested this against librdkafka 1.4.0, and the consumer does not stall there. Here's the relevant portion of the debug logs.

%7|1592004943.002|JOIN|rdkafka#consumer-1| [thrd:main]: localhost:62625/2: Joining group "test" with 1 subscribed topic(s)
%7|1592004943.002|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "test" changed join state init -> wait-join (v6, state up)
%7|1592004943.002|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "test" received op OFFSET_COMMIT (v0) in state up (join state wait-join, v6 vs 0)
%7|1592004943.008|COMMIT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/2: Committing offsets for 4 partition(s): manual
%7|1592004943.008|JOINGROUP|rdkafka#consumer-1| [thrd:main]: JoinGroup response: GenerationId 11, Protocol range, LeaderId rdkafka-d98c643b-b00d-4c7c-b7d3-c70e75dbaa43 (me), my MemberId rdkafka-d98c643b-b00d-4c7c-b7d3-c70e75dbaa43, 2 members in group: (no error)
%7|1592004943.008|JOINGROUP|rdkafka#consumer-1| [thrd:main]: Elected leader for group "test" with 2 member(s)
%7|1592004943.009|GRPLEADER|rdkafka#consumer-1| [thrd:main]: Group "test": resetting group leader info: JoinGroup response clean-up
%7|1592004943.009|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "test" changed join state wait-join -> wait-metadata (v6, state up)
%7|1592004943.009|COMMIT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/2: OffsetCommit for 4 partition(s): manual: returned: Broker: Group rebalance in progress
%4|1592004943.009|COMMITFAIL|rdkafka#consumer-1| [thrd:main]: Offset commit (manual) failed for 4/4 partition(s): Broker: Group rebalance in progress: test_topic[0]@72(Broker: Group rebalance in progress), test_topic[1]@63(Broker: Group rebalance in progress), test_topic[2]@55(Broker: Group rebalance in progress), test_topic[3]@67(Broker: Group rebalance in progress)
%7|1592004943.009|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "test" running range assignment for 2 member(s):
%7|1592004943.009|ASSIGN|rdkafka#consumer-1| [thrd:main]:  Member "rdkafka-d98c643b-b00d-4c7c-b7d3-c70e75dbaa43" (me) with 1 subscription(s):
%7|1592004943.009|ASSIGN|rdkafka#consumer-1| [thrd:main]:   test_topic [-1]
%7|1592004943.009|ASSIGN|rdkafka#consumer-1| [thrd:main]:  Member "rdkafka-998173de-95f6-48ce-bba1-0b564bbea3c0" with 1 subscription(s):
%7|1592004943.009|ASSIGN|rdkafka#consumer-1| [thrd:main]:   test_topic [-1]
%7|1592004943.009|ASSIGN|rdkafka#consumer-1| [thrd:main]: range: Topic test_topic with 4 partition(s) and 2 subscribing member(s)
%7|1592004943.009|ASSIGN|rdkafka#consumer-1| [thrd:main]: range: Member "rdkafka-998173de-95f6-48ce-bba1-0b564bbea3c0": assigned topic test_topic partitions 0..1
%7|1592004943.009|ASSIGN|rdkafka#consumer-1| [thrd:main]: range: Member "rdkafka-d98c643b-b00d-4c7c-b7d3-c70e75dbaa43": assigned topic test_topic partitions 2..3
%7|1592004943.010|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "test" range assignment for 2 member(s) finished in 0.051ms:
%7|1592004943.010|ASSIGN|rdkafka#consumer-1| [thrd:main]:  Member "rdkafka-d98c643b-b00d-4c7c-b7d3-c70e75dbaa43" (me) assigned 2 partition(s):
%7|1592004943.010|ASSIGN|rdkafka#consumer-1| [thrd:main]:   test_topic [2]
%7|1592004943.010|ASSIGN|rdkafka#consumer-1| [thrd:main]:   test_topic [3]
%7|1592004943.010|ASSIGN|rdkafka#consumer-1| [thrd:main]:  Member "rdkafka-998173de-95f6-48ce-bba1-0b564bbea3c0" assigned 2 partition(s):
%7|1592004943.010|ASSIGN|rdkafka#consumer-1| [thrd:main]:   test_topic [0]
%7|1592004943.010|ASSIGN|rdkafka#consumer-1| [thrd:main]:   test_topic [1]
%7|1592004943.010|ASSIGNOR|rdkafka#consumer-1| [thrd:main]: Group "test": "range" assignor run for 2 member(s)
%7|1592004943.010|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "test" changed join state wait-metadata -> wait-sync (v6, state up)
%7|1592004943.011|SYNCGROUP|rdkafka#consumer-1| [thrd:main]: SyncGroup response: Success (34 bytes of MemberState data)
%7|1592004943.011|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "test": delegating assign of 2 partition(s) to application rebalance callback on queue rd_kafka_cgrp_new: new assignment
%7|1592004943.011|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "test" changed join state wait-sync -> wait-assign-rebalance_cb (v6, state up)
%7|1592004943.011|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "test" received op ASSIGN (v0) in state up (join state wait-assign-rebalance_cb, v6 vs 0)
%7|1592004943.011|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "test": new assignment of 2 partition(s) in join state wait-assign-rebalance_cb
%7|1592004943.011|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "test": rd_kafka_cgrp_assign:2566: new version barrier v7
%7|1592004943.011|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "test": assigning 2 partition(s) in join state wait-assign-rebalance_cb
%7|1592004943.011|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "test" changed join state wait-assign-rebalance_cb -> assigned (v7, state up)
%7|1592004943.011|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "test": rd_kafka_cgrp_partitions_fetch_start0:1848: new version barrier v8
%7|1592004943.011|FETCHSTART|rdkafka#consumer-1| [thrd:main]: Group "test": starting fetchers for 2 assigned partition(s) in join-state assigned (usable_offsets=no, v8, line 2619)

It looks like the OffsetCommit response is received before the new partitions are assigned, so the assign() is not deferred pending the commit. That seems like a race condition, but the ordering of events is very consistent in my testing: in 1.4.0 the OffsetCommit fails before we assign(), and in 1.4.2 it fails afterwards.

@gridaphobe
Contributor Author

I just bisected the issue to commit 757b376.

@gridaphobe
Contributor Author

@edenhill I think I see what's going on here. The above commit changed the semantics of OffsetCommit failures; in particular, it now retries the OffsetCommit on RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS errors, which keeps the commit outstanding long enough that the partition fetchers are not restarted when the new assignment is received. Eventually the OffsetCommit fails permanently with RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION, but the partition fetchers are still not restarted.

I can fix the issue by changing the error action for RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS to RD_KAFKA_ERR_ACTION_PERMANENT, which seems pretty sensible to me. If a rebalance is in progress, we know the commit will eventually fail due to an old generation id, so why bother retrying? (I think you could argue similarly that retrying RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION is pointless.)
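Roughly, the change amounts to classifying that error as permanent in the OffsetCommit response handling. A simplified, hypothetical sketch of the policy (the names below are illustrative, not the actual librdkafka source):

#include <librdkafka/rdkafka.h>

/* Hypothetical policy sketch; librdkafka drives this through its own
 * error-action handling rather than a helper like this. */
typedef enum { ERR_ACTION_RETRY, ERR_ACTION_PERMANENT } err_action_t;

static err_action_t offset_commit_err_action(rd_kafka_resp_err_t err) {
        switch (err) {
        case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS:
                /* Today: retried, which keeps the commit outstanding across
                 * the rebalance and blocks the fetcher restart.
                 * Proposed: fail fast -- the retried commit would only die
                 * later with ILLEGAL_GENERATION anyway. */
                return ERR_ACTION_PERMANENT;
        case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
                return ERR_ACTION_PERMANENT;
        default:
                /* Other errors are out of scope for this sketch. */
                return ERR_ACTION_RETRY;
        }
}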

But this fix doesn't sit well with me. Why does an outstanding commit request prevent restarting the partition fetchers in the first place? If there's a good reason, would it make sense to unconditionally restart them when the request succeeds or fails?

@edenhill
Contributor

Wow, very impressed by your root cause analysis! 💯

@edenhill
Contributor

This will not make v1.4.4; we'll address it in v1.5.0.

@edenhill
Contributor

edenhill commented Jul 7, 2020

But this fix doesn't sit well with me. Why does an outstanding commit request prevent restarting the partition fetchers in the first place? If there's a good reason, would it make sense to unconditionally restart them when the request succeeds or fails?

The reason is that the fetcher might need to resume from the committed offset (which is the default behaviour), so we want to make sure any outstanding commits are done before we try to read the committed offsets back from the broker.
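For context, this is the kind of read-back the fetcher's starting point depends on. A small application-level sketch (assuming rk is the consumer handle) that fetches the committed offsets for the current assignment from the group coordinator:

#include <stdio.h>
#include <inttypes.h>
#include <librdkafka/rdkafka.h>

/* Illustration: read the committed offsets back from the broker for the
 * current assignment.  An in-flight OffsetCommit could still change what
 * this returns, which is why the fetcher waits for it to complete. */
static void log_committed_offsets(rd_kafka_t *rk) {
        rd_kafka_topic_partition_list_t *parts = NULL;

        if (rd_kafka_assignment(rk, &parts) != RD_KAFKA_RESP_ERR_NO_ERROR)
                return;

        if (rd_kafka_committed(rk, parts, 5000 /* ms */) ==
            RD_KAFKA_RESP_ERR_NO_ERROR) {
                for (int i = 0; i < parts->cnt; i++)
                        printf("%s [%"PRId32"] committed offset %"PRId64"\n",
                               parts->elems[i].topic,
                               parts->elems[i].partition,
                               parts->elems[i].offset);
        }

        rd_kafka_topic_partition_list_destroy(parts);
}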

@edenhill edenhill added this to the v1.6.0 milestone Jul 7, 2020
@gridaphobe
Contributor Author

Ok, so we want to delay starting the fetcher until the OffsetCommit request returns to ensure it has an accurate starting point, which could either be (1) the offset we're trying to commit (if the OffsetCommit request succeeds) or (2) the previously-committed offset on the cluster (if the OffsetCommit request fails). In that case, we'd want to unconditionally restart the fetchers when outstanding OffsetCommits return, regardless of success/failure, right?
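As a strawman, the proposed behaviour could look something like this (purely illustrative types and names, not librdkafka's internals):

#include <librdkafka/rdkafka.h>

/* Hypothetical state for illustration only; the real cgrp state machine is
 * more involved. */
typedef struct {
        int outstanding_commits;
        int has_pending_assignment;
} cgrp_state_t;

static void start_partition_fetchers(cgrp_state_t *cgrp) {
        /* Placeholder: in librdkafka this would start the per-partition
         * fetchers for the current assignment. */
        (void)cgrp;
}

/* Proposed behaviour: when an OffsetCommit completes -- success or failure
 * alike -- and it was the last one outstanding, (re)start the fetchers. */
static void on_offset_commit_done(cgrp_state_t *cgrp, rd_kafka_resp_err_t err) {
        (void)err;  /* outcome deliberately ignored */
        if (--cgrp->outstanding_commits == 0 && cgrp->has_pending_assignment)
                start_partition_fetchers(cgrp);
}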

edenhill added a commit that referenced this issue Sep 16, 2020
…2933)

This also refactors the cgrp's OffsetCommit handling.
@guttulasunil

Thanks for sharing the issue, and nice RCA. Is there any workaround for this, other than using v1.4.0?

@edenhill
Contributor

I think it might be possible to call assign() again on the current assignment.
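Untested, but in C terms the workaround would look roughly like this (rk being the stalled consumer's handle):

#include <librdkafka/rdkafka.h>

/* Workaround sketch (untested): re-apply the current assignment so the
 * partition fetchers get (re)started. */
static void reassign_current_assignment(rd_kafka_t *rk) {
        rd_kafka_topic_partition_list_t *cur = NULL;

        if (rd_kafka_assignment(rk, &cur) != RD_KAFKA_RESP_ERR_NO_ERROR)
                return;
        rd_kafka_assign(rk, cur);
        rd_kafka_topic_partition_list_destroy(cur);
}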

@guttulasunil

Ok, let me check that.

edenhill added a commit that referenced this issue Sep 22, 2020
…2933)

This also refactors the cgrp's OffsetCommit handling.
@mkevac

mkevac commented Oct 7, 2020

Hi! Any plans to fix this? We are seeing this constantly in production and it's a huge problem for us. Thanks.

@edenhill
Contributor

There's a fix for #2933 in librdkafka v1.5.2 that you'll definitely want to have.

andremissaglia added a commit to arquivei/goduck that referenced this issue Nov 12, 2020
Besides keeping the worker updated, this release solves a bug where the consumer would randomly get stuck during rebalances.

Links:
 - confluentinc/librdkafka#2933
@edenhill
Contributor

Can't reproduce on master

mfelsche pushed a commit to tremor-rs/tremor-runtime that referenced this issue Sep 28, 2021
This mitigates confluentinc/librdkafka#2933 which is fixed in 1.5.2.
This issue was leading to hanging consumers when they tried to commit during a rebalance operation.

Signed-off-by: Matthias Wahl <mwahl@wayfair.com>
Licenser pushed a commit to tremor-rs/tremor-runtime that referenced this issue Oct 29, 2021
This mitigates confluentinc/librdkafka#2933 which is fixed in 1.5.2.
This issue was leading to hanging consumers when they tried to commit during a rebalance operation.

Signed-off-by: Matthias Wahl <mwahl@wayfair.com>
Signed-off-by: Heinz N. Gies <heinz@licenser.net>