From 9a82c659c061389a9a0c48f4f47bdcd1be09ac4c Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Mon, 12 Jul 2021 20:46:42 +0800 Subject: [PATCH 1/5] KAFKA-13064: refactor ListConsumerGroupOffsetsHandler and tests --- .../ListConsumerGroupOffsetsHandler.java | 94 +++++++++++++++---- .../clients/admin/KafkaAdminClientTest.java | 78 ++++++++++++++- .../ListConsumerGroupOffsetsHandlerTest.java | 36 ++++++- 3 files changed, 185 insertions(+), 23 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java index 240516d4ccdfe..71135086f5a02 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -85,11 +86,12 @@ public ApiResult> handleR final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse; Map> completed = new HashMap<>(); Map failed = new HashMap<>(); - List unmapped = new ArrayList<>(); + final Set groupsToUnmap = new HashSet<>(); + final Set groupsToRetry = new HashSet<>(); Errors responseError = response.groupLevelError(groupId.idValue); if (responseError != Errors.NONE) { - handleError(groupId, responseError, failed, unmapped); + handleGroupError(groupId, responseError, failed, groupsToUnmap, groupsToRetry); } else { final Map groupOffsetsListing = new HashMap<>(); Map partitionDataMap = @@ -110,41 +112,97 @@ public ApiResult> handleR groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata)); } } else { - log.warn("Skipping return offset for {} due to error {}.", topicPartition, error); + handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry); } } completed.put(groupId, groupOffsetsListing); } - return new ApiResult<>(completed, failed, unmapped); + + if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { + return new ApiResult<>( + completed, + failed, + Collections.emptyList() + ); + } else { + // retry the request, so don't send completed/failed results back + return new ApiResult<>( + Collections.emptyMap(), + Collections.emptyMap(), + new ArrayList<>(groupsToUnmap) + ); + } } - private void handleError( + private void handleGroupError( CoordinatorKey groupId, Errors error, - Map failed, - List unmapped + Map failed, + Set groupsToUnmap, + Set groupsToRetry ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `OffsetFetch` response", groupId, - error.exception()); + log.error("Received authorization failure for group {} in `{}` response", groupId, + apiName(), error.exception()); failed.put(groupId, error.exception()); break; + case COORDINATOR_LOAD_IN_PROGRESS: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`{}` request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", apiName(), groupId); + groupsToRetry.add(groupId); + break; case COORDINATOR_NOT_AVAILABLE: + case NOT_COORDINATOR: + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`{}` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", apiName(), groupId, error); + groupsToUnmap.add(groupId); break; + + default: + final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", + groupId, apiName()); + log.error(unexpectedErrorMsg, error.exception()); + failed.put(groupId, error.exception(unexpectedErrorMsg)); + } + } + + private void handlePartitionError( + CoordinatorKey groupId, + TopicPartition topicPartition, + Errors error, + Set groupsToUnmap, + Set groupsToRetry + ) { + switch (error) { + case COORDINATOR_LOAD_IN_PROGRESS: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`{}` request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", apiName(), groupId); + groupsToRetry.add(groupId); + break; + case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("OffsetFetch request for group {} returned error {}. Will retry", - groupId, error); - unmapped.add(groupId); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`{}` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", apiName(), groupId, error); + groupsToUnmap.add(groupId); + break; + case UNKNOWN_TOPIC_OR_PARTITION: + case TOPIC_AUTHORIZATION_FAILED: + case UNSTABLE_OFFSET_COMMIT: + log.warn("`{}` request for group {} returned error {} in partition {}. Skipping return offset for it.", + apiName(), groupId, error, topicPartition); break; default: - log.error("Received unexpected error for group {} in `OffsetFetch` response", - groupId, error.exception()); - failed.put(groupId, error.exception( - "Received unexpected error for group " + groupId + " in `OffsetFetch` response")); + log.error("`{}` request for group {} returned unexpected error {} in partition {}. Skipping return offset for it.", + apiName(), groupId, error, topicPartition); } } -} +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 53e326ad8955f..d9a948456bf80 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -2957,6 +2957,21 @@ public void testListConsumerGroupOffsetsRetriableErrors() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + env.kafkaClient().prepareResponse( + new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap())); + /* + * We need to return two responses here, one for NOT_COORDINATOR call when calling list consumer offsets + * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a + * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response + */ + env.kafkaClient().prepareResponse( + new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap())); + env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); @@ -2964,15 +2979,66 @@ public void testListConsumerGroupOffsetsRetriableErrors() throws Exception { new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); env.kafkaClient().prepareResponse( - new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap())); + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + env.kafkaClient().prepareResponse( + new OffsetFetchResponse(Errors.NONE, Collections.emptyMap())); + + final ListConsumerGroupOffsetsResult errorResult1 = env.adminClient().listConsumerGroupOffsets(GROUP_ID); + + assertEquals(Collections.emptyMap(), errorResult1.partitionsToOffsetAndMetadata().get()); + } + } + + @Test + public void testListConsumerGroupOffsetsRetriableErrorsInPartition() throws Exception { + // Retriable errors should be retried + + TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0); + TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1); + final Map responseData = new HashMap<>(); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + responseData.put(myTopicPartition0, new OffsetFetchResponse.PartitionData(10, + Optional.empty(), "", Errors.NONE)); + responseData.put(myTopicPartition1, new OffsetFetchResponse.PartitionData(0, + Optional.empty(), "", Errors.COORDINATOR_LOAD_IN_PROGRESS)); + + env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NONE, responseData)); /* * We need to return two responses here, one for NOT_COORDINATOR call when calling list consumer offsets * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response */ + + responseData.clear(); + + responseData.put(myTopicPartition0, new OffsetFetchResponse.PartitionData(10, + Optional.empty(), "", Errors.NONE)); + responseData.put(myTopicPartition1, new OffsetFetchResponse.PartitionData(0, + Optional.empty(), "", Errors.NOT_COORDINATOR)); + + env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NONE, responseData)); + env.kafkaClient().prepareResponse( - new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap())); + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + responseData.clear(); + + responseData.put(myTopicPartition0, new OffsetFetchResponse.PartitionData(10, + Optional.empty(), "", Errors.NONE)); + responseData.put(myTopicPartition1, new OffsetFetchResponse.PartitionData(0, + Optional.empty(), "", Errors.COORDINATOR_NOT_AVAILABLE)); + + env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NONE, responseData)); env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); @@ -3015,22 +3081,26 @@ public void testListConsumerGroupOffsets() throws Exception { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); // Retriable FindCoordinatorResponse errors should be retried - env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); // Retriable errors should be retried - env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap())); /* * We need to return two responses here, one for NOT_COORDINATOR error when calling list consumer group offsets * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response */ env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0); TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1); TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java index b461ea3b23d41..902b338a9a1b5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java @@ -24,9 +24,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.Node; @@ -67,15 +69,29 @@ public void testSuccessfulHandleResponse() { assertCompleted(handleWithError(Errors.NONE), expected); } + + @Test + public void testSuccessfulHandleResponseWithOnePartitionError() { + Map expectedResult = Collections.singletonMap(t0p0, new OffsetAndMetadata(10L)); + + // expected that there's only 1 partition result returned because the other partition is skipped with error + assertCompleted(handleWithPartitionError(Errors.UNKNOWN_TOPIC_OR_PARTITION), expectedResult); + assertCompleted(handleWithPartitionError(Errors.TOPIC_AUTHORIZATION_FAILED), expectedResult); + assertCompleted(handleWithPartitionError(Errors.UNSTABLE_OFFSET_COMMIT), expectedResult); + } + @Test public void testUnmappedHandleResponse() { + assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); + assertUnmapped(handleWithPartitionError(Errors.COORDINATOR_NOT_AVAILABLE)); + assertUnmapped(handleWithPartitionError(Errors.NOT_COORDINATOR)); } @Test public void testRetriableHandleResponse() { assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); + assertRetriable(handleWithPartitionError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); } @Test @@ -91,6 +107,24 @@ private OffsetFetchResponse buildResponse(Errors error) { return response; } + private OffsetFetchResponse buildResponseWithPartitionError(Errors error) { + + Map responseData = new HashMap<>(); + responseData.put(t0p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE)); + responseData.put(t0p1, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error)); + + OffsetFetchResponse response = new OffsetFetchResponse(Errors.NONE, responseData); + return response; + } + + private AdminApiHandler.ApiResult> handleWithPartitionError( + Errors error + ) { + ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, tps, logContext); + OffsetFetchResponse response = buildResponseWithPartitionError(error); + return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response); + } + private AdminApiHandler.ApiResult> handleWithError( Errors error ) { From 9b6f185c2883fa11771e2c965b0ae0bc576cacaa Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 14 Jul 2021 16:51:50 +0800 Subject: [PATCH 2/5] KAFKA-13064: refactor codes --- .../ListConsumerGroupOffsetsHandler.java | 38 ++++++++++++------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java index 71135086f5a02..43303a4f39e97 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java @@ -70,8 +70,18 @@ public AdminApiLookupStrategy lookupStrategy() { return lookupStrategy; } + private void validateKeys( + Set groupIds + ) { + if (!groupIds.equals(Collections.singleton(groupId))) { + throw new IllegalArgumentException("Received unexpected group ids " + groupIds + + " (expected only " + Collections.singleton(groupId) + ")"); + } + } + @Override - public OffsetFetchRequest.Builder buildRequest(int coordinatorId, Set keys) { + public OffsetFetchRequest.Builder buildRequest(int coordinatorId, Set groupIds) { + validateKeys(groupIds); // Set the flag to false as for admin client request, // we don't need to wait for any pending offset state to clear. return new OffsetFetchRequest.Builder(groupId.idValue, false, partitions, false); @@ -83,9 +93,11 @@ public ApiResult> handleR Set groupIds, AbstractResponse abstractResponse ) { + validateKeys(groupIds); + final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse; - Map> completed = new HashMap<>(); - Map failed = new HashMap<>(); + final Map> completed = new HashMap<>(); + final Map failed = new HashMap<>(); final Set groupsToUnmap = new HashSet<>(); final Set groupsToRetry = new HashSet<>(); @@ -112,6 +124,7 @@ public ApiResult> handleR groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata)); } } else { + // In responseData V0-V7, there's no group level error, we have to handle partition errors here handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry); } } @@ -143,30 +156,29 @@ private void handleGroupError( ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `{}` response", groupId, - apiName(), error.exception()); + log.debug("`OffsetFetch` request for group id {} failed due to error {}", groupId, error); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry - log.debug("`{}` request for group {} failed because the coordinator " + - "is still in the process of loading state. Will retry", apiName(), groupId); + log.debug("`OffsetFetch` request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", groupId); groupsToRetry.add(groupId); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: // If the coordinator is unavailable or there was a coordinator change, then we unmap // the key so that we retry the `FindCoordinator` request - log.debug("`{}` request for group {} returned error {}. " + - "Will attempt to find the coordinator again and retry", apiName(), groupId, error); + log.debug("`OffsetFetch` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", groupId, error); groupsToUnmap.add(groupId); break; default: - final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", - groupId, apiName()); - log.error(unexpectedErrorMsg, error.exception()); + final String unexpectedErrorMsg = + String.format("`OffsetFetch` request for group id %s failed due to error %s", groupId, error); + log.error(unexpectedErrorMsg); failed.put(groupId, error.exception(unexpectedErrorMsg)); } } @@ -205,4 +217,4 @@ private void handlePartitionError( } } -} \ No newline at end of file +} From 237ee0dff7ff32e9dda43b04fdab7c84b6e3b9fe Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 14 Jul 2021 20:11:01 +0800 Subject: [PATCH 3/5] KAFKA-13064: refactor --- .../internals/ListConsumerGroupOffsetsHandler.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java index 43303a4f39e97..5f3486bf37a18 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java @@ -156,28 +156,28 @@ private void handleGroupError( ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.debug("`OffsetFetch` request for group id {} failed due to error {}", groupId, error); + log.debug("`OffsetFetch` request for group id {} failed due to error {}", groupId.idValue, error); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry - log.debug("`OffsetFetch` request for group {} failed because the coordinator " + - "is still in the process of loading state. Will retry", groupId); + log.debug("`OffsetFetch` request for group id {} failed because the coordinator " + + "is still in the process of loading state. Will retry", groupId.idValue); groupsToRetry.add(groupId); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: // If the coordinator is unavailable or there was a coordinator change, then we unmap // the key so that we retry the `FindCoordinator` request - log.debug("`OffsetFetch` request for group {} returned error {}. " + - "Will attempt to find the coordinator again and retry", groupId, error); + log.debug("`OffsetFetch` request for group id {} returned error {}. " + + "Will attempt to find the coordinator again and retry", groupId.idValue, error); groupsToUnmap.add(groupId); break; default: final String unexpectedErrorMsg = - String.format("`OffsetFetch` request for group id %s failed due to error %s", groupId, error); + String.format("`OffsetFetch` request for group id %s failed due to error %s", groupId.idValue, error); log.error(unexpectedErrorMsg); failed.put(groupId, error.exception(unexpectedErrorMsg)); } From b685e6f0230f32e34c5cd83fe0cb80df06946504 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Thu, 15 Jul 2021 11:45:24 +0800 Subject: [PATCH 4/5] KAFKA-13064: update the comment to V0 and V1 --- .../admin/internals/ListConsumerGroupOffsetsHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java index 5f3486bf37a18..31853d61f65c1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java @@ -124,7 +124,7 @@ public ApiResult> handleR groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata)); } } else { - // In responseData V0-V7, there's no group level error, we have to handle partition errors here + // In responseData V0 and V1, there's no top level error, we have to handle errors here handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry); } } From b41f92da38928c47c1a0de2692ecae0c977f7b05 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Fri, 16 Jul 2021 10:02:18 +0800 Subject: [PATCH 5/5] KAFKA-13064: remove handlePartitionError since group error contains all group level errors --- .../ListConsumerGroupOffsetsHandler.java | 80 ++++--------------- .../clients/admin/KafkaAdminClientTest.java | 64 +-------------- .../ListConsumerGroupOffsetsHandlerTest.java | 3 - 3 files changed, 17 insertions(+), 130 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java index 31853d61f65c1..bfcf1ab69b63f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java @@ -96,14 +96,16 @@ public ApiResult> handleR validateKeys(groupIds); final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse; - final Map> completed = new HashMap<>(); - final Map failed = new HashMap<>(); - final Set groupsToUnmap = new HashSet<>(); - final Set groupsToRetry = new HashSet<>(); - - Errors responseError = response.groupLevelError(groupId.idValue); - if (responseError != Errors.NONE) { - handleGroupError(groupId, responseError, failed, groupsToUnmap, groupsToRetry); + + // the groupError will contain the group level error for v0-v8 OffsetFetchResponse + Errors groupError = response.groupLevelError(groupId.idValue); + if (groupError != Errors.NONE) { + final Map failed = new HashMap<>(); + final Set groupsToUnmap = new HashSet<>(); + + handleGroupError(groupId, groupError, failed, groupsToUnmap); + + return new ApiResult<>(Collections.emptyMap(), failed, new ArrayList<>(groupsToUnmap)); } else { final Map groupOffsetsListing = new HashMap<>(); Map partitionDataMap = @@ -124,25 +126,14 @@ public ApiResult> handleR groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata)); } } else { - // In responseData V0 and V1, there's no top level error, we have to handle errors here - handlePartitionError(groupId, topicPartition, error, groupsToUnmap, groupsToRetry); + log.warn("Skipping return offset for {} due to error {}.", topicPartition, error); } } - completed.put(groupId, groupOffsetsListing); - } - if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { - return new ApiResult<>( - completed, - failed, - Collections.emptyList() - ); - } else { - // retry the request, so don't send completed/failed results back return new ApiResult<>( + Collections.singletonMap(groupId, groupOffsetsListing), Collections.emptyMap(), - Collections.emptyMap(), - new ArrayList<>(groupsToUnmap) + Collections.emptyList() ); } } @@ -151,8 +142,7 @@ private void handleGroupError( CoordinatorKey groupId, Errors error, Map failed, - Set groupsToUnmap, - Set groupsToRetry + Set groupsToUnmap ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: @@ -164,7 +154,6 @@ private void handleGroupError( // If the coordinator is in the middle of loading, then we just need to retry log.debug("`OffsetFetch` request for group id {} failed because the coordinator " + "is still in the process of loading state. Will retry", groupId.idValue); - groupsToRetry.add(groupId); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: @@ -176,45 +165,8 @@ private void handleGroupError( break; default: - final String unexpectedErrorMsg = - String.format("`OffsetFetch` request for group id %s failed due to error %s", groupId.idValue, error); - log.error(unexpectedErrorMsg); - failed.put(groupId, error.exception(unexpectedErrorMsg)); - } - } - - private void handlePartitionError( - CoordinatorKey groupId, - TopicPartition topicPartition, - Errors error, - Set groupsToUnmap, - Set groupsToRetry - ) { - switch (error) { - case COORDINATOR_LOAD_IN_PROGRESS: - // If the coordinator is in the middle of loading, then we just need to retry - log.debug("`{}` request for group {} failed because the coordinator " + - "is still in the process of loading state. Will retry", apiName(), groupId); - groupsToRetry.add(groupId); - break; - case COORDINATOR_NOT_AVAILABLE: - case NOT_COORDINATOR: - // If the coordinator is unavailable or there was a coordinator change, then we unmap - // the key so that we retry the `FindCoordinator` request - log.debug("`{}` request for group {} returned error {}. " + - "Will attempt to find the coordinator again and retry", apiName(), groupId, error); - groupsToUnmap.add(groupId); - break; - case UNKNOWN_TOPIC_OR_PARTITION: - case TOPIC_AUTHORIZATION_FAILED: - case UNSTABLE_OFFSET_COMMIT: - log.warn("`{}` request for group {} returned error {} in partition {}. Skipping return offset for it.", - apiName(), groupId, error, topicPartition); - break; - default: - log.error("`{}` request for group {} returned unexpected error {} in partition {}. Skipping return offset for it.", - apiName(), groupId, error, topicPartition); + log.error("`OffsetFetch` request for group id {} failed due to unexpected error {}", groupId.idValue, error); + failed.put(groupId, error.exception()); } } - } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index d9a948456bf80..977b0374b8e51 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -2990,68 +2990,6 @@ public void testListConsumerGroupOffsetsRetriableErrors() throws Exception { } } - @Test - public void testListConsumerGroupOffsetsRetriableErrorsInPartition() throws Exception { - // Retriable errors should be retried - - TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0); - TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1); - final Map responseData = new HashMap<>(); - - try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - - env.kafkaClient().prepareResponse( - prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - - responseData.put(myTopicPartition0, new OffsetFetchResponse.PartitionData(10, - Optional.empty(), "", Errors.NONE)); - responseData.put(myTopicPartition1, new OffsetFetchResponse.PartitionData(0, - Optional.empty(), "", Errors.COORDINATOR_LOAD_IN_PROGRESS)); - - env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NONE, responseData)); - - /* - * We need to return two responses here, one for NOT_COORDINATOR call when calling list consumer offsets - * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a - * FindCoordinatorResponse. - * - * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response - */ - - responseData.clear(); - - responseData.put(myTopicPartition0, new OffsetFetchResponse.PartitionData(10, - Optional.empty(), "", Errors.NONE)); - responseData.put(myTopicPartition1, new OffsetFetchResponse.PartitionData(0, - Optional.empty(), "", Errors.NOT_COORDINATOR)); - - env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NONE, responseData)); - - env.kafkaClient().prepareResponse( - prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - - responseData.clear(); - - responseData.put(myTopicPartition0, new OffsetFetchResponse.PartitionData(10, - Optional.empty(), "", Errors.NONE)); - responseData.put(myTopicPartition1, new OffsetFetchResponse.PartitionData(0, - Optional.empty(), "", Errors.COORDINATOR_NOT_AVAILABLE)); - - env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NONE, responseData)); - - env.kafkaClient().prepareResponse( - prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - - env.kafkaClient().prepareResponse( - new OffsetFetchResponse(Errors.NONE, Collections.emptyMap())); - - final ListConsumerGroupOffsetsResult errorResult1 = env.adminClient().listConsumerGroupOffsets(GROUP_ID); - - assertEquals(Collections.emptyMap(), errorResult1.partitionsToOffsetAndMetadata().get()); - } - } - @Test public void testListConsumerGroupOffsetsNonRetriableErrors() throws Exception { // Non-retriable errors throw an exception @@ -3081,7 +3019,7 @@ public void testListConsumerGroupOffsets() throws Exception { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); // Retriable FindCoordinatorResponse errors should be retried - env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java index 902b338a9a1b5..9c9bb1e58adb5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java @@ -84,14 +84,11 @@ public void testSuccessfulHandleResponseWithOnePartitionError() { public void testUnmappedHandleResponse() { assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); - assertUnmapped(handleWithPartitionError(Errors.COORDINATOR_NOT_AVAILABLE)); - assertUnmapped(handleWithPartitionError(Errors.NOT_COORDINATOR)); } @Test public void testRetriableHandleResponse() { assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); - assertRetriable(handleWithPartitionError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); } @Test