Skip to content

Commit df398aa

Browse files
authored
KAFKA-13063: Make DescribeConsumerGroupsHandler unmap for COORDINATOR_NOT_AVAILABLE error (#11022)
This patch improve the error handling in `DescribeConsumerGroupsHandler` and ensure that `COORDINATOR_NOT_AVAILABLE` is unmapped in order to look up the coordinator again. Reviewers: David Jacot <djacot@confluent.io>
1 parent 46c91f4 commit df398aa

File tree

3 files changed

+38
-32
lines changed

3 files changed

+38
-32
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -109,16 +109,16 @@ public ApiResult<CoordinatorKey, ConsumerGroupDescription> handleResponse(
109109
Set<CoordinatorKey> groupIds,
110110
AbstractResponse abstractResponse
111111
) {
112-
DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
113-
Map<CoordinatorKey, ConsumerGroupDescription> completed = new HashMap<>();
114-
Map<CoordinatorKey, Throwable> failed = new HashMap<>();
115-
List<CoordinatorKey> unmapped = new ArrayList<>();
112+
final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
113+
final Map<CoordinatorKey, ConsumerGroupDescription> completed = new HashMap<>();
114+
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
115+
final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
116116

117117
for (DescribedGroup describedGroup : response.data().groups()) {
118118
CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(describedGroup.groupId());
119119
Errors error = Errors.forCode(describedGroup.errorCode());
120120
if (error != Errors.NONE) {
121-
handleError(groupIdKey, error, failed, unmapped);
121+
handleError(groupIdKey, error, failed, groupsToUnmap);
122122
continue;
123123
}
124124
final String protocolType = describedGroup.protocolType();
@@ -151,38 +151,41 @@ public ApiResult<CoordinatorKey, ConsumerGroupDescription> handleResponse(
151151
completed.put(groupIdKey, consumerGroupDescription);
152152
} else {
153153
failed.put(groupIdKey, new IllegalArgumentException(
154-
String.format("GroupId %s is not a consumer group (%s).",
155-
groupIdKey.idValue, protocolType)));
154+
String.format("GroupId %s is not a consumer group (%s).",
155+
groupIdKey.idValue, protocolType)));
156156
}
157157
}
158-
return new ApiResult<>(completed, failed, unmapped);
158+
159+
return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap));
159160
}
160161

161162
private void handleError(
162163
CoordinatorKey groupId,
163164
Errors error,
164165
Map<CoordinatorKey, Throwable> failed,
165-
List<CoordinatorKey> unmapped
166+
Set<CoordinatorKey> groupsToUnmap
166167
) {
167168
switch (error) {
168169
case GROUP_AUTHORIZATION_FAILED:
169-
log.error("Received authorization failure for group {} in `DescribeGroups` response", groupId,
170-
error.exception());
170+
log.debug("`DescribeGroups` request for group id {} failed due to error {}", groupId.idValue, error);
171171
failed.put(groupId, error.exception());
172172
break;
173173
case COORDINATOR_LOAD_IN_PROGRESS:
174-
case COORDINATOR_NOT_AVAILABLE:
174+
// If the coordinator is in the middle of loading, then we just need to retry
175+
log.debug("`DescribeGroups` request for group id {} failed because the coordinator " +
176+
"is still in the process of loading state. Will retry", groupId.idValue);
175177
break;
178+
case COORDINATOR_NOT_AVAILABLE:
176179
case NOT_COORDINATOR:
177-
log.debug("DescribeGroups request for group {} returned error {}. Will retry",
178-
groupId, error);
179-
unmapped.add(groupId);
180+
// If the coordinator is unavailable or there was a coordinator change, then we unmap
181+
// the key so that we retry the `FindCoordinator` request
182+
log.debug("`DescribeGroups` request for group id {} returned error {}. " +
183+
"Will attempt to find the coordinator again and retry", groupId.idValue, error);
184+
groupsToUnmap.add(groupId);
180185
break;
181186
default:
182-
log.error("Received unexpected error for group {} in `DescribeGroups` response",
183-
groupId, error.exception());
184-
failed.put(groupId, error.exception(
185-
"Received unexpected error for group " + groupId + " in `DescribeGroups` response"));
187+
log.error("`DescribeGroups` request for group id {} failed due to unexpected error {}", groupId.idValue, error);
188+
failed.put(groupId, error.exception());
186189
}
187190
}
188191

clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2688,7 +2688,7 @@ public void testDescribeConsumerGroups() throws Exception {
26882688
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
26892689
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
26902690

2691-
//Retriable FindCoordinatorResponse errors should be retried
2691+
// Retriable FindCoordinatorResponse errors should be retried
26922692
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
26932693
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
26942694

@@ -2707,21 +2707,12 @@ public void testDescribeConsumerGroups() throws Exception {
27072707
Collections.emptySet()));
27082708
env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
27092709

2710-
data = new DescribeGroupsResponseData();
2711-
data.groups().add(DescribeGroupsResponse.groupMetadata(
2712-
GROUP_ID,
2713-
Errors.COORDINATOR_NOT_AVAILABLE,
2714-
"",
2715-
"",
2716-
"",
2717-
Collections.emptyList(),
2718-
Collections.emptySet()));
2719-
env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
2720-
27212710
/*
27222711
* We need to return two responses here, one with NOT_COORDINATOR error when calling describe consumer group
27232712
* api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
27242713
* FindCoordinatorResponse.
2714+
*
2715+
* And the same reason for COORDINATOR_NOT_AVAILABLE error response
27252716
*/
27262717
data = new DescribeGroupsResponseData();
27272718
data.groups().add(DescribeGroupsResponse.groupMetadata(
@@ -2735,6 +2726,18 @@ public void testDescribeConsumerGroups() throws Exception {
27352726
env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
27362727
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
27372728

2729+
data = new DescribeGroupsResponseData();
2730+
data.groups().add(DescribeGroupsResponse.groupMetadata(
2731+
GROUP_ID,
2732+
Errors.COORDINATOR_NOT_AVAILABLE,
2733+
"",
2734+
"",
2735+
"",
2736+
Collections.emptyList(),
2737+
Collections.emptySet()));
2738+
env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
2739+
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
2740+
27382741
data = new DescribeGroupsResponseData();
27392742
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
27402743
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);

clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,13 +104,13 @@ public void testSuccessfulHandleResponse() {
104104

105105
@Test
106106
public void testUnmappedHandleResponse() {
107+
assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE, ""));
107108
assertUnmapped(handleWithError(Errors.NOT_COORDINATOR, ""));
108109
}
109110

110111
@Test
111112
public void testRetriableHandleResponse() {
112113
assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS, ""));
113-
assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE, ""));
114114
}
115115

116116
@Test

0 commit comments

Comments
 (0)