Skip to content

Conversation

@showuon
Copy link
Member

@showuon showuon commented Jul 13, 2021

refactor RemoveMembersFromConsumerGroupHandler and tests. Also, put COORDINATOR_NOT_AVAILABLE as unmap retry.

This is the old handle response logic. FYR:

void handleResponse(AbstractResponse abstractResponse) {
      final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse;

      // If coordinator changed since we fetched it, retry
      // note here: we'll collect all errors in group error and member errors, to check if if we need to retry
      if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
          Call call = getRemoveMembersFromGroupCall(context, members);
          rescheduleFindCoordinatorTask(context, () -> call, this);
          return;
      }

      if (handleGroupRequestError(response.topLevelError(), context.future()))
          return;

      final Map<MemberIdentity, Errors> memberErrors = new HashMap<>();
      for (MemberResponse memberResponse : response.memberResponses()) {
          memberErrors.put(new MemberIdentity()
                               .setMemberId(memberResponse.memberId())
                               .setGroupInstanceId(memberResponse.groupInstanceId()),
                           Errors.forCode(memberResponse.errorCode()));
      }
      context.future().complete(memberErrors);
  }

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@showuon
Copy link
Member Author

showuon commented Jul 13, 2021

@dajac , please take a look. Thanks.

String memberId = memberResponse.memberId();

if (memberError != Errors.NONE) {
handleMemberError(groupId, memberId, memberError, groupsToUnmap, groupsToRetry);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really necessary? It seems that group level errors as always retuned as top level error in the response. Do you confirm?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the question. I checked broker code, and confirmed that all member response only contain member level errors, not group level or coordinator level. I remove it. Thanks.

Copy link
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@showuon Thanks. I left a few more comments.

}
return new ApiResult<>(completed, failed, unmapped);

if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not seem necessary as we always expect the top level error. Would it make sense to handle it like we did here: https://github.com/apache/kafka/pull/11019/files#diff-e7eafbafe0b75099d0c8b4083c03c653d57245ef7b0fcfae7b9ccd258a9024e3R117?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree! Updated!


memberErrors.put(new MemberIdentity()
.setMemberId(memberResponse.memberId())
.setMemberId(memberId)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We could revert this change as it does not bring much.

.setMemberId(memberId)
.setGroupInstanceId(memberResponse.groupInstanceId()),
Errors.forCode(memberResponse.errorCode()));
memberError);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We could revert this change as it does not bring much and re-align like it was before.

}

}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could we add an empty line back?

final String unexpectedErrorMsg =
String.format("`LeaveGroup` request for group id %s failed due to unexpected error %s", groupId.idValue, error);
log.error(unexpectedErrorMsg);
failed.put(groupId, error.exception(unexpectedErrorMsg));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: As said in the other PR, this is a good idea but I would only do it if we do it for all exceptions.

Comment on lines 3699 to 3701
MemberResponse memberResponse = new MemberResponse()
.setGroupInstanceId(groupId)
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that we could remove this test as it is not possible to have COORDINATOR_LOAD_IN_PROGRESS as an error for a member, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right! Removed. Thanks.

@showuon showuon changed the title KAFKA-13072: refactor RemoveMembersFromConsumerGroupHandler and tests KAFKA-13072: Make RemoveMembersFromConsumerGroupHandler unmap for COORDINATOR_NOT_AVAILABLE error Jul 15, 2021
Copy link
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks.

@dajac
Copy link
Member

dajac commented Jul 15, 2021

I've re-triggered Jenkins.

@showuon
Copy link
Member Author

showuon commented Jul 15, 2021

Thank you. it have been running for whole day. :)

@dajac
Copy link
Member

dajac commented Jul 15, 2021

Failures are not related:

Build / JDK 16 and Scala 2.13 / testCloseDuringRebalance() – kafka.api.ConsumerBounceTest
4s
Build / JDK 16 and Scala 2.13 / testCloseDuringRebalance() – kafka.api.ConsumerBounceTest
10s
Build / JDK 11 and Scala 2.13 / testSendOffsetsToTransactionTimeout() – kafka.api.TransactionsTest
9s
Build / JDK 11 and Scala 2.13 / testSendOffsetsToTransactionTimeout() – kafka.api.TransactionsTest
13s

@dajac dajac merged commit 921a342 into apache:trunk Jul 15, 2021
dajac pushed a commit that referenced this pull request Jul 15, 2021
…RDINATOR_NOT_AVAILABLE error (#11035)

This patch improve the error handling in `RemoveMembersFromConsumerGroupHandler` and ensures that `COORDINATOR_NOT_AVAILABLE` is unmapped in order to look up the coordinator again.

Reviewers: David Jacot <djacot@confluent.io>
@dajac
Copy link
Member

dajac commented Jul 15, 2021

Merged to trunk and 3.0. cc @kkonstantine

xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…RDINATOR_NOT_AVAILABLE error (apache#11035)

This patch improve the error handling in `RemoveMembersFromConsumerGroupHandler` and ensures that `COORDINATOR_NOT_AVAILABLE` is unmapped in order to look up the coordinator again.

Reviewers: David Jacot <djacot@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants