Skip to content

Conversation

@showuon
Copy link
Member

@showuon showuon commented Jul 12, 2021

refactor DescribeConsumerGroupsHandler and tests. Also, put COORDINATOR_NOT_AVAILABLE as unmap retry.

the old handleResponse for DescribeConsumerGroups request:

void handleResponse(AbstractResponse abstractResponse) {
      final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;

      List<DescribedGroup> describedGroups = response.data().groups();
      if (describedGroups.isEmpty()) {
          context.future().completeExceptionally(
                  new InvalidGroupIdException("No consumer group found for GroupId: " + context.groupId()));
          return;
      }

      if (describedGroups.size() > 1 ||
              !describedGroups.get(0).groupId().equals(context.groupId())) {
          String ids = Arrays.toString(describedGroups.stream().map(DescribedGroup::groupId).toArray());
          context.future().completeExceptionally(new InvalidGroupIdException(
                  "DescribeConsumerGroup request for GroupId: " + context.groupId() + " returned " + ids));
          return;
      }

      final DescribedGroup describedGroup = describedGroups.get(0);

      // If coordinator changed since we fetched it, retry
      if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
          Call call = getDescribeConsumerGroupsCall(context);
          rescheduleFindCoordinatorTask(context, () -> call, this);
          return;
      }

      final Errors groupError = Errors.forCode(describedGroup.errorCode());
      if (handleGroupRequestError(groupError, context.future()))
          return;

      final String protocolType = describedGroup.protocolType();
      if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
          final List<DescribedGroupMember> members = describedGroup.members();
          final List<MemberDescription> memberDescriptions = new ArrayList<>(members.size());
          final Set<AclOperation> authorizedOperations = validAclOperations(describedGroup.authorizedOperations());
          for (DescribedGroupMember groupMember : members) {
              Set<TopicPartition> partitions = Collections.emptySet();
              if (groupMember.memberAssignment().length > 0) {
                  final Assignment assignment = ConsumerProtocol.
                      deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
                  partitions = new HashSet<>(assignment.partitions());
              }
              final MemberDescription memberDescription = new MemberDescription(
                      groupMember.memberId(),
                      Optional.ofNullable(groupMember.groupInstanceId()),
                      groupMember.clientId(),
                      groupMember.clientHost(),
                      new MemberAssignment(partitions));
              memberDescriptions.add(memberDescription);
          }
          final ConsumerGroupDescription consumerGroupDescription =
              new ConsumerGroupDescription(context.groupId(), protocolType.isEmpty(),
                  memberDescriptions,
                  describedGroup.protocolData(),
                  ConsumerGroupState.parse(describedGroup.groupState()),
                  context.node().get(),
                  authorizedOperations);
          context.future().complete(consumerGroupDescription);
      } else {
          context.future().completeExceptionally(new IllegalArgumentException(
              String.format("GroupId %s is not a consumer group (%s).",
                  context.groupId(), protocolType)));
      }
  }

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Comment on lines 107 to 111
private void validateGroupsNotEmpty(List<DescribedGroup> describedGroups) {
if (describedGroups.isEmpty()) {
throw new InvalidGroupIdException("No consumer group found");
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of this check? I am not sure where this InvalidGroupIdException thrown here will get to.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before KIP-699, there's the check:

if (describedGroups.isEmpty()) {
          context.future().completeExceptionally(
                  new InvalidGroupIdException("No consumer group found for GroupId: " + context.groupId()));
          return;
      }

But, you're right, the exception thrown will not return back to requester. Also, because there's no group found, we cannot have a failed key return because we need a group in key.

Removed it since it should never happen.

}
return new ApiResult<>(completed, failed, unmapped);

if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems incorrect here as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right! We accept multiple groups. Updated. Thanks.

Copy link
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@showuon Thank for the update. I left a few more comments.

Errors error = Errors.forCode(describedGroup.errorCode());
if (error != Errors.NONE) {
handleError(groupIdKey, error, failed, unmapped);
handleError(groupIdKey, error, failed, groupsToUnmap, groupsToRetry);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

groupsToRetry to retry is not really necessary in this case. We don't even use it later. Could we remove it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Updated.

}
return new ApiResult<>(completed, failed, unmapped);

return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: There is an extra space before new.

final String unexpectedErrorMsg =
String.format("`DescribeGroups` request for group id %s failed due to error %s", groupId.idValue, error);
log.error(unexpectedErrorMsg);
failed.put(groupId, error.exception(unexpectedErrorMsg));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't provide the error message in any other case. Should we remove this one for the time being? I think that it is a good idea but only if we do it across the board.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. I just follow the previous behavior. Remove the error message. Thanks.

}

}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could we revert this back?

@showuon showuon changed the title KAFKA-13063: refactor DescribeConsumerGroupsHandler and tests KAFKA-13063: Make DescribeConsumerGroupsHandler unmap for COORDINATOR_NOT_AVAILABLE error Jul 15, 2021
@showuon
Copy link
Member Author

showuon commented Jul 15, 2021

@dajac , I addressed your comments in this PR and all other 4 PRs. And also update the PR title and description accordingly. Please help take a look when available. Thank you.

Copy link
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@dajac
Copy link
Member

dajac commented Jul 15, 2021

Failures are not related:

Build / JDK 8 and Scala 2.12 / testUnauthorizedDeleteTopicsWithDescribe() – kafka.api.AuthorizerIntegrationTest
1m 7s
Build / JDK 8 and Scala 2.12 / testCloseDuringRebalance() – kafka.api.ConsumerBounceTest
18s
Build / JDK 8 and Scala 2.12 / testListReassignmentsDoesNotShowNonReassigningPartitions() – kafka.api.PlaintextAdminIntegrationTest
5s
Build / JDK 8 and Scala 2.12 / testSendOffsetsToTransactionTimeout() – kafka.api.TransactionsTest

@dajac dajac merged commit df398aa into apache:trunk Jul 15, 2021
dajac pushed a commit that referenced this pull request Jul 15, 2021
…_NOT_AVAILABLE error (#11022)

This patch improve the error handling in `DescribeConsumerGroupsHandler` and ensure that `COORDINATOR_NOT_AVAILABLE` is unmapped in order to look up the coordinator again.

Reviewers: David Jacot <djacot@confluent.io>
@dajac
Copy link
Member

dajac commented Jul 15, 2021

Merged to trunk and to 3.0. cc @kkonstantine

xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…_NOT_AVAILABLE error (apache#11022)

This patch improve the error handling in `DescribeConsumerGroupsHandler` and ensure that `COORDINATOR_NOT_AVAILABLE` is unmapped in order to look up the coordinator again.

Reviewers: David Jacot <djacot@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants