
KAFKA-13419: Only reset generation ID when ILLEGAL_GENERATION error #11451

Merged
guozhangwang merged 9 commits into apache:trunk from showuon:KAFKA-13419
Dec 17, 2021
Conversation

@showuon
Member

@showuon showuon commented Oct 29, 2021

Updated: This PR only resets the generation ID on an ILLEGAL_GENERATION error, since the member ID is still valid.

=====
resetStateAndRejoin on a REBALANCE_IN_PROGRESS error in sync group, to avoid out-of-date ownedPartitions

== JIRA description ==
In KAFKA-13406, we found a user got stuck in rebalancing with the cooperative sticky assignor. The reason is that the "ownedPartitions" data was out-of-date, so it failed the cooperative assignment validation.

Investigating deeper, I found the root cause is that we didn't reset generation and state after a sync group failure. In KAFKA-12983, we fixed the issue that onJoinPrepare was not called in the resetStateAndRejoin method, which caused ownedPartitions not to be cleared. But there's another case where ownedPartitions can be out-of-date. Here's an example:

  1. consumer A joined and synced group successfully in generation 1
  2. a new rebalance started with generation 2; consumer A joined successfully, but somehow didn't send out its sync group request immediately
  3. the other consumers completed the sync group successfully in generation 2, except consumer A
  4. after consumer A sent out its sync group request, a new rebalance started with generation 3, so consumer A got a REBALANCE_IN_PROGRESS error in the sync group response
  5. on receiving REBALANCE_IN_PROGRESS, consumer A re-joined the group in generation 3, with the assignment (ownedPartitions) from generation 1
  6. so now an out-of-date ownedPartitions is sent, with unexpected results

We might want to do resetStateAndRejoin when a RebalanceInProgressException error happens in sync group. A sync group error means the join group passed, and other consumers (and the leader) might have already completed this round of rebalance, so the assignment distribution this consumer has is already out-of-date.
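The proposed handling can be sketched as follows. This is a minimal, hypothetical model with made-up names (not the actual Kafka client API): on a REBALANCE_IN_PROGRESS sync-group error, the local state is reset so stale ownedPartitions are not re-sent on rejoin.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of the proposed behavior; the real logic
// lives in AbstractCoordinator / ConsumerCoordinator, and these names are
// illustrative only.
class RebalanceState {
    static final int NO_GENERATION = -1;

    int generationId = NO_GENERATION;
    final Set<String> ownedPartitions = new HashSet<>();
    boolean rejoinNeeded = false;

    // Called when a SyncGroup response carries REBALANCE_IN_PROGRESS:
    // joinGroup passed, but other members may have completed this round, so
    // the local owned-partition snapshot is out-of-date and must not be
    // re-sent with the next join.
    void onSyncGroupRebalanceInProgress() {
        generationId = NO_GENERATION;
        ownedPartitions.clear();
        rejoinNeeded = true;
    }
}
```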

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@showuon
Member Author

showuon commented Oct 29, 2021

@ableegoldman @guozhangwang @dajac @hachikuji, do you think this change makes sense? Thanks.

Contributor

@guozhangwang guozhangwang left a comment

Hello @showuon, thanks for reporting this issue. Just for my understanding: in the join-group request protocol we do not set the generation ID; the generation ID is only set as part of the protocol user data when the sticky assignor is used. Only resetting the generation at the coordinator layer would not change what user data is serialized and sent to the brokers. Am I missing something?

@showuon
Member Author

showuon commented Nov 2, 2021

@guozhangwang, thanks for your comment. I'll answer your question below.

Only resetting the generation at the coordinator layer would not change what user data would be serialized and sent to the brokers. Am I missing something?

--> The point here is that when we reset state and generation, we mark them with default values (i.e. NO_GENERATION for generation), and also mark the consumer as needsJoinPrepare and needsRejoin. That means on the consumer's next poll it will enter onJoinPrepare, and in ConsumerCoordinator we clean up all of this consumer's assigned partitions if the generation is NO_GENERATION, and then rejoin the group:

if (generation == Generation.NO_GENERATION.generationId &&
        memberId.equals(Generation.NO_GENERATION.memberId)) {
    revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());

    if (!revokedPartitions.isEmpty()) {
        log.info("Giving away all assigned partitions as lost since generation has been reset," +
            "indicating that consumer is no longer part of the group");
        exception = invokePartitionsLost(revokedPartitions);

        subscriptions.assignFromSubscribed(Collections.emptySet());
    }
}

That is, getting REBALANCE_IN_PROGRESS in the syncGroup response may mean this consumer didn't complete the previous round of rebalance (it only passed joinGroup) and didn't get the assignment for that round, while other consumers completed it. So the ownedPartitions in this consumer are out-of-date, and we should clean up this consumer's assignment (ownedPartitions) and rejoin the group. One thing to note: since the consumer passed joinGroup, the consumer leader will distribute some assignment to this consumer too. That's why this out-of-date ownedPartitions can cause a rebalance to never complete later (e.g. KAFKA-13406).

Does that make sense?

@guozhangwang
Contributor

Hi @showuon I think we were referring to different things :) There are different places where we encode the assigned partitions as well as generation:

  1. Inside the ConsumerCoordinator#subscriptions, where we save the currently assigned partitions.
  2. Inside the Assignor#userData, where for the (cooperative) sticky assignor we also encode the generation and the prev-assigned partitions as memberAssignment (note that in the join-group request we do not encode the generation ID).

My comment above refers to 2), which is only set upon onAssignment. That means even if anything happens that causes the consumer to revoke previously owned partitions, the user data would still encode those partitions to the sticky assignor.

Maybe you are trying to fix 1) above only, in which case that's also fine, but do you feel 2) is also an issue that needs to be fixed separately?

@showuon
Member Author

showuon commented Nov 3, 2021

@guozhangwang, that's a good point. Yes, I was focusing on fix 1) only. For 2), yes, we should also fix that, but I need some time to think of a good way to fix it, and will then open a JIRA ticket for it.

However, fixing 1) alone already handles the CooperativeStickyAssignor, because the cooperative sticky assignor only encodes generation info in Assignor#userData. That means when we clean up the ownedPartitions in subscriptions at the coordinator layer, even though the generation in Assignor#userData is up-to-date, it won't affect the assignor's assignment. The consumer leader will get the empty ownedPartitions with the correct generation ID from that consumer, which is fine for us.

I'll continue to add tests for this PR.

Thank you!

@guozhangwang
Contributor

@showuon Sounds good, let's just focus on 1) here then. The proposed fix looks reasonable to me.

Contributor

@guozhangwang guozhangwang left a comment

@showuon Could you check if the failed tests are relevant? I could re-trigger it if you think they are not.

@guozhangwang guozhangwang changed the title [WIP] KAFKA-13419: resetStateAndRejoin when rebalanceInProgress in syncgroup KAFKA-13419: resetStateAndRejoin when rebalanceInProgress in syncgroup Nov 8, 2021
@showuon
Member Author

showuon commented Nov 8, 2021

@guozhangwang, no, no need to retrigger it. I found a slow-rebalance issue after this change, and have a proposal for that. I should be able to finish the KIP today (my time). I'll let you know. Thank you.

@showuon
Member Author

showuon commented Nov 9, 2021

@guozhangwang, when investigating the broken tests, I found my change makes a "normal rebalance" slower. Here's why:
Before my change, a rebalance with 2 consumers looks like this:

  1. consumer A joined group G, and joinGroup completed
  2. consumer B joined group G; group G's state changed to preparingRebalance
  3. consumer A sent syncGroup, and got a REBALANCE_IN_PROGRESS error
  4. consumer A rejoined the group, sending joinGroup to group G
  5. consumer B sent joinGroup to group G
  6. consumer A and B completed the joinGroup and syncGroup successfully

It looks great. But after my change in this PR, it becomes (the changed steps are the indented ones):

  1. consumer A joined group G, and joinGroup completed
  2. consumer B joined group G; group G's state changed to preparingRebalance
  3. consumer A sent syncGroup, and got a REBALANCE_IN_PROGRESS error
    4. consumer A reset its generation and state
    5. consumer A rejoined the group, sending joinGroup with a new member ID to group G
  6. consumer B sent joinGroup to group G
    7. waiting until the rebalance timeout to kick out the old consumer A (old member ID)
  8. consumer A and B completed the joinGroup and syncGroup successfully

That's why this change makes the rebalance slower. We could explicitly leave the group when sync group fails with REBALANCE_IN_PROGRESS, but I think we have to fix it from the root!

===
Currently, all the issues we've faced (i.e. KAFKA-12984, KAFKA-13406) are due to the ownedPartitions data in the subscription message being out-of-date, and we don't have a good way to identify that. So in KAFKA-12984 we had to put generation info into the userData of the Subscription message in CooperativeStickyAssignor. (And we have no workaround for custom cooperative assignors.) But then we found in KAFKA-13406 that the cooperative assignment validation also uses the ownedPartitions data, so we had to work around it again.

Therefore, I think we should add an additional generation field to the Subscription message, to help the CooperativeStickyAssignor and custom assignors leverage ownedPartitions + generation to do a "correct" assignment. KIP-792 is drafted. I'd like to get your feedback before I send it to the dev group for discussion. Thank you.

So, back to your original comment about 2 places to fix:

  1. Inside the ConsumerCoordinator#subscriptions, where we save the currently assigned partitions.
  2. Inside the Assignor#userData, where for (cooperative) sticky assignor where we also encode the generation and the prev-assigned partitions as memberAssignment (note that in join-group request we do not encode the generation id).

Well, KIP-792 still focuses on 1) above. For 2), I've thought about it for some days, and I think we can ignore it, because in the StickyAssignor (not the cooperative one) we put both ownedPartitions and generation info into userData, which means that even if the ownedPartitions are out-of-date, we can still identify them. Custom assignors with old bytecode can use the same approach to achieve the same goal. WDYT?
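The StickyAssignor point above can be illustrated with a small sketch (class and field names are made up for illustration, not the real assignor code): because the userData carries both the owned partitions and the generation, the leader can discard claims that come from older generations.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: each member reports its ownedPartitions tagged
// with the generation they were assigned in, so stale claims can be detected.
class MemberClaim {
    final String memberId;
    final int generation;
    final List<String> ownedPartitions;

    MemberClaim(String memberId, int generation, List<String> ownedPartitions) {
        this.memberId = memberId;
        this.generation = generation;
        this.ownedPartitions = ownedPartitions;
    }
}

class StickyClaimFilter {
    // Keep only claims from the highest generation seen; out-of-date
    // ownedPartitions must not drive the sticky assignment.
    static List<MemberClaim> upToDateClaims(List<MemberClaim> claims) {
        int maxGen = -1;
        for (MemberClaim c : claims)
            maxGen = Math.max(maxGen, c.generation);
        List<MemberClaim> result = new ArrayList<>();
        for (MemberClaim c : claims)
            if (c.generation == maxGen)
                result.add(c);
        return result;
    }
}
```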

Thank you.

@guozhangwang
Contributor

Hi @showuon, I agree that if we are going to encode both ownedPartitions and generation into the protocol in the new bytecode, then we do not need to try to "fix" 2), but only detect and fail it.

As for this specific case, I'm actually thinking we could consider a slightly different version of resetStateAndGeneration which only resets the generation ID, but not the member ID field of the Generation. More specifically, we have three callers of resetStateAndRejoin, and one of them is resetGenerationOnResponseError (the other two should always reset both generation and member IDs). And there are several callers of resetGenerationOnResponseError:

  • UNKNOWN_MEMBER_ID in JoinGroup: reset both memberId and generation.
  • UNKNOWN_MEMBER_ID in SyncGroup: reset both.
  • ILLEGAL_GENERATION in SyncGroup: reset generation only.
  • UNKNOWN_MEMBER_ID in Heartbeat: reset both.
  • ILLEGAL_GENERATION in Heartbeat: reset generation only.
  • UNKNOWN_MEMBER_ID in OffsetCommit: reset both.
  • ILLEGAL_GENERATION in OffsetCommit: reset generation only.
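The list above boils down to a small decision rule, sketched here with assumed enum names (the real method is resetGenerationOnResponseError; this is not its actual signature):

```java
// Sketch of the reset decision per (request type, error).
enum Api { JOIN_GROUP, SYNC_GROUP, HEARTBEAT, OFFSET_COMMIT }
enum GroupError { UNKNOWN_MEMBER_ID, ILLEGAL_GENERATION }

class ResetPolicy {
    // UNKNOWN_MEMBER_ID invalidates the member ID as well, for every request
    // type; ILLEGAL_GENERATION keeps the still-valid member ID and only
    // resets the generation ID.
    static boolean shouldResetMemberId(Api api, GroupError error) {
        return error == GroupError.UNKNOWN_MEMBER_ID;
    }
}
```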

When we add the generation id to the join group protocol, it means the response could also include UNKNOWN_MEMBER_ID as well:

  • UNKNOWN_MEMBER_ID in JoinGroup: reset both.

Now back to your original question:

  1. StickyAssignor in the new bytecode would get the ownedPartitions from the protocol directly, as in CooperativeStickyAssignor, and bump the metadata to v2 with empty serialized data; the assign function would depend on the encoded metadata version to decide whether to retrieve the generation / ownedPartitions from the protocol (v0, v1) or from the user data (v2). Note that for old versions where the version is not actually encoded, we'd need to rely on deserialization exceptions at higher versions to fall back to lower versions.

  2. CooperativeStickyAssignor in the new bytecode would get the generation from the protocol directly, and bump the metadata to v2; the assign function would depend on the encoded metadata version to decide whether to retrieve the generation / ownedPartitions from the protocol (v0, v1) or from the user data (v2). Same as 1) above.

  3. In AbstractPartitionAssignor, we would have a validateSubscription function which takes the ownedPartitions across all members and needs to be called by all assignors (it is the custom assignor's own responsibility to call it), to check that the ownedPartitions do not overlap.

  4. The broker-side coordinator would check the generation upon JoinGroup: if it is a sentinel value (e.g. null), assume it is a new member that has never been in the group yet, and hence always accept it for the current generation; if it is not the sentinel value and is stale, then return the error directly. Again, upon getting such an error the member should not clear its memberId (if it has one) but only reset its generation to null and clear its ownedPartitions before re-joining.
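Point 4 might look roughly like this on the broker side (a hypothetical helper sketched under the assumptions above, not actual GroupCoordinator code):

```java
// Hypothetical broker-side sketch: a null generation is the sentinel for a
// member that has never joined; a stale non-null generation is rejected, and
// the client is expected to reset only its generation (and ownedPartitions),
// keeping its member ID, before rejoining.
class JoinGenerationCheck {
    enum Result { ACCEPT, ILLEGAL_GENERATION }

    static Result validate(Integer requestedGeneration, int currentGeneration) {
        if (requestedGeneration == null)
            return Result.ACCEPT;             // never joined: always accept
        if (requestedGeneration < currentGeneration)
            return Result.ILLEGAL_GENERATION; // stale generation: reject
        return Result.ACCEPT;
    }
}
```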

@guozhangwang
Contributor

Also cc @dajac @hachikuji who're working on improving the general rebalance protocol here.

@showuon
Member Author

showuon commented Nov 10, 2021

@guozhangwang , thanks for the comments and clear explanation.

When we add the generation id to the join group protocol, it means the response could also include UNKNOWN_MEMBER_ID as well:
UNKNOWN_MEMBER_ID in JoinGroup: reset both.

I think you meant ILLEGAL_GENERATION in JoinGroup: reset generation only here, not UNKNOWN_MEMBER_ID, right? :)

I've updated the KIP to add more detailed implementation you suggested.

And for the "reset generation only, keep member ID" change, I agree — it makes sense to me. I've updated the PR and am waiting for the Jenkins build result to see if this change breaks any tests. You can check if this is what you expected.

Thank you.

Comment on lines 446 to 448
-if (!generationSnapshot.equals(Generation.NO_GENERATION) && stateSnapshot == MemberState.STABLE) {
+if ((generationSnapshot.generationId != Generation.NO_GENERATION.generationId ||
+        !generationSnapshot.memberId.equals(Generation.NO_GENERATION.memberId)) &&
+        stateSnapshot == MemberState.STABLE) {
Member Author

@showuon showuon Nov 10, 2021

Before this change, we always reset the generation object to NO_GENERATION, but now there are cases where we only reset the generation ID. This check runs after the rebalance completes, when the consumer should have a valid generation ID, so no generation ID (i.e. -1) also means the consumer needs to rejoin the group. Hence the if condition change here.

Member

Is the || in this condition correct? I thought that we would consider the rebalance successful only if we have a valid generation and a valid member id. Am I missing something?

Member Author

You are right! Updated.

     .setAssignments(Collections.emptyList())
 );
-log.debug("Sending follower SyncGroup to coordinator {} at generation {}: {}", this.coordinator, this.generation, requestBuilder);
+log.debug("Sending follower SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
Member Author

requestBuilder already logs the generation info, so remove it from the message.
Before this change, the log looked like this (with duplicated generation info):

Sending leader SyncGroup to coordinator localhost:55644 (id: 2147483647 rack: null) at generation Generation{generationId=2, memberId='consumer-test.group-15-e79d4f58-f8cc-4f98-897d-f711fb3385d8', protocol='range'}: SyncGroupRequestData(groupId='test.group', generationId=2, memberId='consumer-test.group-15-e79d4f58-f8cc-4f98-897d-f711fb3385d8', groupInstanceId=null, protocolType='consumer', protocolName='range', assignments=[SyncGroupRequestAssignment(memberId='consumer-test.group-16-ba76c722-d177-4cad-8251-2e7ece935e7d', assignment=[0, 1, 0, 0, 0, 0, -1, -1, -1, -1]), SyncGroupRequestAssignment(memberId='consumer-test.group-15-e79d4f58-f8cc-4f98-897d-f711fb3385d8', assignment=[0, 1, 0, 0, 0, 1, 0, 3, 102, 111, 111, 0, 0, 0, 1, 0, 0, 0, 0, -1, -1, -1, -1])])

Comment on lines 749 to 752
-if (!generation.equals(Generation.NO_GENERATION) && state == MemberState.COMPLETING_REBALANCE) {
-    // check protocol name only if the generation is not reset
+if (generation.protocolName != null && state == MemberState.COMPLETING_REBALANCE) {
+    // check protocol name only if the generation is not reset (protocol name is not null)
Member Author

We only care about the protocolName here, so we only check that protocolName is not null.

Member

I think the intent was to validate protocolName only when the generation was not reset. It seems we are changing this here. Why?

Member Author

You are right, updated and explained below. Thanks.

Comment on lines -708 to +714
-if (generation == Generation.NO_GENERATION.generationId &&
+if (generation == Generation.NO_GENERATION.generationId ||
     memberId.equals(Generation.NO_GENERATION.memberId)) {
     revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());

     if (!revokedPartitions.isEmpty()) {
-        log.info("Giving away all assigned partitions as lost since generation has been reset," +
-            "indicating that consumer is no longer part of the group");
+        log.info("Giving away all assigned partitions as lost since generation/memberID has been reset," +
+            "indicating that consumer is in old state or no longer part of the group");
Member Author

After this change, if either the generation ID or the member ID has been reset, we clear all the consumer's ownedPartitions since they are out-of-date.

 if (subscriptions.hasAutoAssignedPartitions() && !droppedPartitions.isEmpty()) {
     final Exception e;
-    if (generation() == Generation.NO_GENERATION || rebalanceInProgress()) {
+    if (currentGeneration.equals(Generation.NO_GENERATION) || rebalanceInProgress()) {
Member Author

We should use the currentGeneration snapshot for this check; otherwise the user might see an unexpected callback invoked when comparing the logs. Also, we should compare with equals here.

Contributor

Good catch.

Member

I am not sure about this one. Is it correct to compare to Generation.NO_GENERATION here or do we need to compare to the generationId?

Member Author

David, you're right! I was focusing on fixing the == error here. Yes, we should be consistent with onJoinPrepare here, and invoke PartitionsLost when

generation == Generation.NO_GENERATION.generationId ||
            memberId.equals(Generation.NO_GENERATION.memberId)

Otherwise, invoke PartitionsRevoked.
I'll update it later. Thank you.

 watchers.tryCompleteWatched()
-debug(s"Request key $key unblocked $numCompleted $purgatoryName operations")
+if (numCompleted > 0) {
+    debug(s"Request key $key unblocked $numCompleted $purgatoryName operations")
Member Author

Before this change, we'd see many useless logs like this:
DEBUG Request key GroupJoinKey(test.group) unblocked 0 Rebalance operations
We should only log when numCompleted > 0.

@showuon
Member Author

showuon commented Nov 12, 2021

@guozhangwang , tests added. Please take a look when available. Thank you.

@showuon showuon changed the title KAFKA-13419: resetStateAndRejoin when rebalanceInProgress in syncgroup KAFKA-13419: Only reset generation ID when ILLEGAL_GENERATION error Nov 12, 2021
Contributor

@guozhangwang guozhangwang left a comment

Thanks @showuon. I think the changes LGTM overall.

I want to note that when we do the threading refactoring where only the heartbeat thread does the network work, a lot of this logic will be much simplified. More specifically (cc @dajac again):

  • We would not need to check whether the generation has been reset concurrently, since only one thread would be doing heartbeats along with join/sync-group requests, and while that thread is doing a rebalance it would not heartbeat any more.
  • Illegal generation would then be very similar to unknown member ID in the heartbeat response: if the former is received, it means the member missed the most recent rebalance that bumped the generation, and since that member did not participate in that rebalance, its member.id should have been kicked out of the group (or someone else joined with the same member.id — either way this member's member.id would no longer be valid), so it's actually okay to always reset the member.id as well.

final Generation currentGeneration = generation();
final String memberId = currentGeneration.memberId;

log.debug("Executing onLeavePrepare with generation {} and memberId {}", currentGeneration, memberId);
Contributor

What's the rationale for removing the member ID from the logging?

Member Author

It's because the memberId info is already included in the generation info. This is the current log output:
Executing onLeavePrepare with generation Generation{generationId=1, memberId='consumer1', protocol='range'} and memberId consumer1

Sorry, I should have mentioned it to make it clear.

Contributor

Cool, thanks.

 if (subscriptions.hasAutoAssignedPartitions() && !droppedPartitions.isEmpty()) {
     final Exception e;
-    if (generation() == Generation.NO_GENERATION || rebalanceInProgress()) {
+    if (currentGeneration.equals(Generation.NO_GENERATION) || rebalanceInProgress()) {
Contributor

Good catch.

 // then retry immediately
 if (generationUnchanged())
-    resetGenerationOnResponseError(ApiKeys.JOIN_GROUP, error);
+    resetGenerationOnResponseError(ApiKeys.JOIN_GROUP, error, true);
Contributor

nit: maybe it's now better to rename this function, to resetStateOnResponseError?

@showuon
Member Author

showuon commented Nov 19, 2021

@dajac, thanks for your comments. Yes, you are right, I didn't get the if condition correct. I've updated it to use a hasGenerationReset helper method:

private boolean hasGenerationReset(Generation gen) {
    // the member ID might not be reset for ILLEGAL_GENERATION error, so only check generationId and protocol name here
    return gen.generationId == Generation.NO_GENERATION.generationId && gen.protocolName == null;
}

Before this change, we could just do if (generation.equals(NO_GENERATION)) to check whether the generation object was reset; now the check should only look at the generationId and protocolName fields, because the member ID might not be reset in some cases.
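A small illustration of that point, using a simplified stand-in for the real Generation class (the class and method names here are illustrative only): after an ILLEGAL_GENERATION-style partial reset the member ID survives, so a full equality check against NO_GENERATION no longer detects the reset.

```java
// Simplified stand-in for the client's Generation, showing why
// equals(NO_GENERATION) no longer works after this change.
class Gen {
    static final Gen NO_GENERATION = new Gen(-1, "", null);

    final int generationId;
    final String memberId;
    final String protocolName;

    Gen(int generationId, String memberId, String protocolName) {
        this.generationId = generationId;
        this.memberId = memberId;
        this.protocolName = protocolName;
    }

    // The old-style check: all fields must match NO_GENERATION.
    boolean looksFullyReset() {
        return generationId == NO_GENERATION.generationId
            && memberId.equals(NO_GENERATION.memberId);
    }

    // Mirrors the hasGenerationReset helper above: ignore memberId, which
    // may survive an ILLEGAL_GENERATION reset.
    boolean hasGenerationReset() {
        return generationId == NO_GENERATION.generationId && protocolName == null;
    }
}
```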

Thank you.

@showuon
Member Author

showuon commented Dec 2, 2021

@dajac , please have a 2nd review when available. Thank you.

@guozhangwang
Contributor

I made another pass and it LGTM. @dajac do you want to make another pass?

@dajac
Member

dajac commented Dec 15, 2021

I will take another look tomorrow. Sorry for the delay.

@showuon
Member Author

showuon commented Dec 16, 2021

I will take another look tomorrow. Sorry for the delay.

No problem, David! :)

Member

@dajac dajac left a comment

Overall, the PR LGTM. I left two clarification questions. @guozhangwang Could you double check them? Feel free to merge if my questions are irrelevant.

 if (subscriptions.hasAutoAssignedPartitions() && !droppedPartitions.isEmpty()) {
     final Exception e;
-    if (generation() == Generation.NO_GENERATION || rebalanceInProgress()) {
+    if (currentGeneration.equals(Generation.NO_GENERATION) || rebalanceInProgress()) {
Member

I am not sure about this one. Is it correct to compare to Generation.NO_GENERATION here or do we need to compare to the generationId?

 Exception exception = null;
 final Set<TopicPartition> revokedPartitions;
-if (generation == Generation.NO_GENERATION.generationId &&
+if (generation == Generation.NO_GENERATION.generationId ||
Member

Is || memberId.equals(Generation.NO_GENERATION.memberId) really necessary? My understanding is that a reset memberId implies that the generationId was also reset. I guess it does not hurt to have it.

Member Author

Yes, I agree that it doesn't hurt to have it. Thank you.

Contributor

Thanks @dajac , I agree with that as well.

@guozhangwang guozhangwang merged commit c219fba into apache:trunk Dec 17, 2021
xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
Reviewers: David Jacot <djacot@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
@aiquestion
Contributor

aiquestion commented May 9, 2022

@showuon, we are facing the same issue, and I want to ask:
is https://issues.apache.org/jira/browse/KAFKA-13419 fixed in this PR, or do we still need KIP-792 to fix it?

For the original 2 places to fix:

  1. Inside the ConsumerCoordinator#subscriptions, where we save the currently assigned partitions.
  2. Inside the Assignor#userData, where for (cooperative) sticky assignor where we also encode the generation and the prev-assigned partitions as memberAssignment (note that in join-group request we do not encode the generation id).

In this change you wanted to fix 1), but found that it causes the rebalance to wait for the session timeout, because the memberId is reset.

It seems that only resetting the generationId when a SyncGroupRequest gets a REBALANCE_IN_PROGRESS error can be a workaround for this (I did some tests and it works). But you didn't do that reset in your final code.

Thanks a lot~~

@showuon
Member Author

showuon commented May 9, 2022

It seems that only resetting the generationId when a SyncGroupRequest gets a REBALANCE_IN_PROGRESS error can be a workaround for this (I did some tests and it works). But you didn't do that reset in your final code.

@aiquestion, it seems you are right. I forgot to reset the generation ID when SyncGroupRequest got a REBALANCE_IN_PROGRESS error. Are you interested in submitting a PR for it?

But one thing to clarify: after KAFKA-12984 and KAFKA-13406 got fixed, even if the consumer joins with out-of-date ownedPartitions, it won't cause the rebalance-stuck issue. So the fix is just to let the consumer leader have a correct version of each consumer's ownedPartitions to do the partition assignment.

Thank you.

@aiquestion
Contributor

Thanks for the reply. I will try to submit a PR for it.

Yes, with KAFKA-12984 and KAFKA-13406, the rebalance will not get stuck.
What we see is that with many consumers, the group may rebalance for many rounds before becoming stable (with cooperative rebalance enabled):

  • consumer A joined group G, and joinGroup completed (with ownedPartitions P1/P2)
  • consumer B joined group G; group G's state changed to preparingRebalance
  • consumer A sent syncGroup, and got a REBALANCE_IN_PROGRESS error
  • consumer A rejoined the group, sending joinGroup to group G
  • consumer B sent joinGroup to group G
  • consumer A and B completed the joinGroup and syncGroup successfully
    --- added ---
  • consumer A was assigned partitions P3/P4 (since its ownedPartitions were ignored because of the generation)
  • consumer A revoked P1/P2 and sent joinGroup for another round of rebalance

Another round of rebalance begins, and some other consumer C may not be able to syncGroup in time, so the rebalance goes for many rounds before becoming stable, and there is duplicate consumption during the rebalance.
