Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,16 @@ public ApiResult<CoordinatorKey, ConsumerGroupDescription> handleResponse(
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse
) {
DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
Map<CoordinatorKey, ConsumerGroupDescription> completed = new HashMap<>();
Map<CoordinatorKey, Throwable> failed = new HashMap<>();
List<CoordinatorKey> unmapped = new ArrayList<>();
final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
final Map<CoordinatorKey, ConsumerGroupDescription> completed = new HashMap<>();
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();

for (DescribedGroup describedGroup : response.data().groups()) {
CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(describedGroup.groupId());
Errors error = Errors.forCode(describedGroup.errorCode());
if (error != Errors.NONE) {
handleError(groupIdKey, error, failed, unmapped);
handleError(groupIdKey, error, failed, groupsToUnmap);
continue;
}
final String protocolType = describedGroup.protocolType();
Expand Down Expand Up @@ -151,38 +151,41 @@ public ApiResult<CoordinatorKey, ConsumerGroupDescription> handleResponse(
completed.put(groupIdKey, consumerGroupDescription);
} else {
failed.put(groupIdKey, new IllegalArgumentException(
String.format("GroupId %s is not a consumer group (%s).",
groupIdKey.idValue, protocolType)));
String.format("GroupId %s is not a consumer group (%s).",
groupIdKey.idValue, protocolType)));
}
}
return new ApiResult<>(completed, failed, unmapped);

return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap));
}

private void handleError(
CoordinatorKey groupId,
Errors error,
Map<CoordinatorKey, Throwable> failed,
List<CoordinatorKey> unmapped
Set<CoordinatorKey> groupsToUnmap
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
log.error("Received authorization failure for group {} in `DescribeGroups` response", groupId,
error.exception());
log.debug("`DescribeGroups` request for group id {} failed due to error {}", groupId.idValue, error);
failed.put(groupId, error.exception());
break;
case COORDINATOR_LOAD_IN_PROGRESS:
case COORDINATOR_NOT_AVAILABLE:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`DescribeGroups` request for group id {} failed because the coordinator " +
"is still in the process of loading state. Will retry", groupId.idValue);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
log.debug("DescribeGroups request for group {} returned error {}. Will retry",
groupId, error);
unmapped.add(groupId);
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`DescribeGroups` request for group id {} returned error {}. " +
"Will attempt to find the coordinator again and retry", groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
default:
log.error("Received unexpected error for group {} in `DescribeGroups` response",
groupId, error.exception());
failed.put(groupId, error.exception(
"Received unexpected error for group " + groupId + " in `DescribeGroups` response"));
log.error("`DescribeGroups` request for group id {} failed due to unexpected error {}", groupId.idValue, error);
failed.put(groupId, error.exception());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2688,7 +2688,7 @@ public void testDescribeConsumerGroups() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

//Retriable FindCoordinatorResponse errors should be retried
// Retriable FindCoordinatorResponse errors should be retried
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));

Expand All @@ -2707,21 +2707,12 @@ public void testDescribeConsumerGroups() throws Exception {
Collections.emptySet()));
env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));

data = new DescribeGroupsResponseData();
data.groups().add(DescribeGroupsResponse.groupMetadata(
GROUP_ID,
Errors.COORDINATOR_NOT_AVAILABLE,
"",
"",
"",
Collections.emptyList(),
Collections.emptySet()));
env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));

/*
* We need to return two responses here, one with NOT_COORDINATOR error when calling describe consumer group
* api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
* FindCoordinatorResponse.
*
* And the same reason for COORDINATOR_NOT_AVAILABLE error response
*/
data = new DescribeGroupsResponseData();
data.groups().add(DescribeGroupsResponse.groupMetadata(
Expand All @@ -2735,6 +2726,18 @@ public void testDescribeConsumerGroups() throws Exception {
env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

data = new DescribeGroupsResponseData();
data.groups().add(DescribeGroupsResponse.groupMetadata(
GROUP_ID,
Errors.COORDINATOR_NOT_AVAILABLE,
"",
"",
"",
Collections.emptyList(),
Collections.emptySet()));
env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

data = new DescribeGroupsResponseData();
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ public void testSuccessfulHandleResponse() {

@Test
public void testUnmappedHandleResponse() {
assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE, ""));
assertUnmapped(handleWithError(Errors.NOT_COORDINATOR, ""));
}

@Test
public void testRetriableHandleResponse() {
assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS, ""));
assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE, ""));
}

@Test
Expand Down