From 68d99561a491c84810330b4aa442d5dd3d2a257a Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Mon, 12 Jul 2021 14:35:30 +0800 Subject: [PATCH 1/5] KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests --- .../DeleteConsumerGroupOffsetsHandler.java | 110 ++++++++++++++---- .../clients/admin/KafkaAdminClientTest.java | 83 +++++++++++-- ...DeleteConsumerGroupOffsetsHandlerTest.java | 110 ++++++++++++++---- 3 files changed, 248 insertions(+), 55 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java index 7e8b549b323c2..0efc42b800c36 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java @@ -19,7 +19,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.List; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -100,51 +100,115 @@ public ApiResult> handleResponse( final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; Map> completed = new HashMap<>(); Map failed = new HashMap<>(); - List unmapped = new ArrayList<>(); + final Set groupsToUnmap = new HashSet<>(); + final Set groupsToRetry = new HashSet<>(); final Errors error = Errors.forCode(response.data().errorCode()); if (error != Errors.NONE) { - handleError(groupId, error, failed, unmapped); + handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { - final Map partitions = new HashMap<>(); - response.data().topics().forEach(topic -> - topic.partitions().forEach(partition -> { - Errors partitionError = Errors.forCode(partition.errorCode()); - if (!handleError(groupId, partitionError, failed, unmapped)) { - partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); + final Map partitionResults = new HashMap<>(); + response.data().topics().forEach(topic -> + topic.partitions().forEach(partitionoffsetDeleteResponse -> { + Errors partitionError = Errors.forCode(partitionoffsetDeleteResponse.errorCode()); + TopicPartition topicPartition = new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex()); + if (partitionError != Errors.NONE) { + handlePartitionError(groupId, partitionError, topicPartition, groupsToUnmap, groupsToRetry); } + + partitionResults.put(new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex()), partitionError); }) ); - if (!partitions.isEmpty()) - completed.put(groupId, partitions); + + completed.put(groupId, partitionResults); + } + + if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { + return new ApiResult<>( + completed, + failed, + Collections.emptyList() + ); + } else { + // retry the request, so don't send completed/failed results back + return new ApiResult<>( + Collections.emptyMap(), + Collections.emptyMap(), + new ArrayList<>(groupsToUnmap) + ); } - return new ApiResult<>(completed, failed, unmapped); } - private boolean handleError( + private void handleGroupError( CoordinatorKey groupId, Errors error, Map failed, - List unmapped + Set groupsToUnmap, + Set groupsToRetry ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: case GROUP_ID_NOT_FOUND: case INVALID_GROUP_ID: - log.error("Received non retriable error for group {} in `DeleteConsumerGroupOffsets` response", groupId, - error.exception()); + case NON_EMPTY_GROUP: + log.error("Received non retriable error for group {} in `{}` response", groupId, + apiName(), error.exception()); failed.put(groupId, error.exception()); - return true; + break; + case COORDINATOR_LOAD_IN_PROGRESS: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`{}` request for group {} failed because the coordinator" + + " is still in the process of loading state. Will retry.", apiName(), groupId); + groupsToRetry.add(groupId); + break; + case COORDINATOR_NOT_AVAILABLE: + case NOT_COORDINATOR: + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`{}` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry.", apiName(), groupId, error); + groupsToUnmap.add(groupId); + break; + default: + final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", + groupId, apiName()); + log.error(unexpectedErrorMsg, error.exception()); + failed.put(groupId, error.exception()); + break; + } + } + + private void handlePartitionError( + CoordinatorKey groupId, + Errors error, + TopicPartition topicPartition, + Set groupsToUnmap, + Set groupsToRetry + ) { + switch (error) { case COORDINATOR_LOAD_IN_PROGRESS: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`{}` request for group {} in partition {} failed because the coordinator" + + " is still in the process of loading state. Will retry.", apiName(), groupId, topicPartition); + groupsToRetry.add(groupId); + break; case COORDINATOR_NOT_AVAILABLE: - return true; case NOT_COORDINATOR: - log.debug("DeleteConsumerGroupOffsets request for group {} returned error {}. Will retry", - groupId, error); - unmapped.add(groupId); - return true; + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`{}` request for group {} in partition {} returned error {}. " + + "Will attempt to find the coordinator again and retry.", apiName(), groupId, topicPartition, error); + groupsToUnmap.add(groupId); + break; + case GROUP_SUBSCRIBED_TO_TOPIC: + case TOPIC_AUTHORIZATION_FAILED: + case UNKNOWN_TOPIC_OR_PARTITION: + log.debug("`{}` request for group {} in partition {} returned error {}.", apiName(), groupId, topicPartition, error); + break; default: - return false; + log.error("`{}` request for group {} in partition {} returned unexpected error {}.", + apiName(), groupId, topicPartition, error); + break; } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 53e326ad8955f..973bea1f6b03a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -3345,12 +3345,9 @@ public void testDeleteConsumerGroupOffsetsRetryBackoff() throws Exception { } @Test - public void testDeleteConsumerGroupOffsets() throws Exception { - // Happy path - + public void testDeleteConsumerGroupOffsetsResponseIncludeCoordinatorErrorAndNoneError() throws Exception { final TopicPartition tp1 = new TopicPartition("foo", 0); final TopicPartition tp2 = new TopicPartition("bar", 0); - final TopicPartition tp3 = new TopicPartition("foobar", 0); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); @@ -3373,12 +3370,77 @@ public void testDeleteConsumerGroupOffsets() throws Exception { .setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList( new OffsetDeleteResponsePartition() .setPartitionIndex(0) - .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code()) + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) ).iterator())) ).collect(Collectors.toList()).iterator())) ) ); + env.kafkaClient().prepareResponse(new OffsetDeleteResponse( + new OffsetDeleteResponseData() + .setTopics(new OffsetDeleteResponseTopicCollection(Stream.of( + new OffsetDeleteResponseTopic() + .setName("foo") + .setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList( + new OffsetDeleteResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code()) + ).iterator())), + new OffsetDeleteResponseTopic() + .setName("bar") + .setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList( + new OffsetDeleteResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code()) + ).iterator())) + ).collect(Collectors.toList()).iterator())) + ) + ); + + final DeleteConsumerGroupOffsetsResult errorResult = env.adminClient().deleteConsumerGroupOffsets( + GROUP_ID, Stream.of(tp1, tp2).collect(Collectors.toSet())); + + assertNull(errorResult.partitionResult(tp1).get()); + assertNull(errorResult.partitionResult(tp2).get()); + assertNull(errorResult.all().get()); + } + } + + @Test + public void testDeleteConsumerGroupOffsets() throws Exception { + // Happy path + + final TopicPartition tp1 = new TopicPartition("foo", 0); + final TopicPartition tp2 = new TopicPartition("bar", 0); + final TopicPartition tp3 = new TopicPartition("foobar", 0); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + env.kafkaClient().prepareResponse(new OffsetDeleteResponse( + new OffsetDeleteResponseData() + .setTopics(new OffsetDeleteResponseTopicCollection(Stream.of( + new OffsetDeleteResponseTopic() + .setName("foo") + .setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList( + new OffsetDeleteResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code()) + ).iterator())), + new OffsetDeleteResponseTopic() + .setName("bar") + .setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList( + new OffsetDeleteResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code()) + ).iterator())) + ).collect(Collectors.toList()).iterator())) + ) + ); + final DeleteConsumerGroupOffsetsResult errorResult = env.adminClient().deleteConsumerGroupOffsets( GROUP_ID, Stream.of(tp1, tp2).collect(Collectors.toSet())); @@ -3401,9 +3463,6 @@ public void testDeleteConsumerGroupOffsetsRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - env.kafkaClient().prepareResponse( - prepareOffsetDeleteResponse(Errors.COORDINATOR_NOT_AVAILABLE)); - env.kafkaClient().prepareResponse( prepareOffsetDeleteResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS)); @@ -3411,6 +3470,8 @@ public void testDeleteConsumerGroupOffsetsRetriableErrors() throws Exception { * We need to return two responses here, one for NOT_COORDINATOR call when calling delete a consumer group * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response */ env.kafkaClient().prepareResponse( prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR)); @@ -3418,6 +3479,12 @@ public void testDeleteConsumerGroupOffsetsRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( + prepareOffsetDeleteResponse(Errors.COORDINATOR_NOT_AVAILABLE)); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( prepareOffsetDeleteResponse("foo", 0, Errors.NONE)); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java index 439b37733d98c..fd4b874160ffa 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java @@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Map; @@ -33,6 +34,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.message.OffsetDeleteResponseData; import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartition; @@ -67,48 +69,91 @@ public void testBuildRequest() { @Test public void testSuccessfulHandleResponse() { Map responseData = Collections.singletonMap(t0p0, Errors.NONE); - assertCompleted(handleWithError(Errors.NONE), responseData); + assertCompleted(handleWithGroupError(Errors.NONE), responseData); } @Test public void testUnmappedHandleResponse() { - assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); + assertUnmapped(handleWithGroupError(Errors.NOT_COORDINATOR)); + assertUnmapped(handleWithGroupError(Errors.COORDINATOR_NOT_AVAILABLE)); + assertUnmapped(handleWithPartitionError(Errors.NOT_COORDINATOR)); + assertUnmapped(handleWithPartitionError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test public void testRetriableHandleResponse() { - assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); + assertRetriable(handleWithGroupError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); + assertRetriable(handleWithPartitionError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); } @Test - public void testFailedHandleResponse() { - assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED)); - assertFailed(GroupIdNotFoundException.class, handleWithError(Errors.GROUP_ID_NOT_FOUND)); - assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID)); + public void testFailedHandleResponseWithGroupError() { + assertGroupFailed(GroupAuthorizationException.class, handleWithGroupError(Errors.GROUP_AUTHORIZATION_FAILED)); + assertGroupFailed(GroupIdNotFoundException.class, handleWithGroupError(Errors.GROUP_ID_NOT_FOUND)); + assertGroupFailed(InvalidGroupIdException.class, handleWithGroupError(Errors.INVALID_GROUP_ID)); + assertGroupFailed(GroupNotEmptyException.class, handleWithGroupError(Errors.NON_EMPTY_GROUP)); } - private OffsetDeleteResponse buildResponse(Errors error) { + @Test + public void testFailedHandleResponseWithPartitionError() { + assertPartitionFailed(Collections.singletonMap(t0p0, Errors.GROUP_SUBSCRIBED_TO_TOPIC), + handleWithPartitionError(Errors.GROUP_SUBSCRIBED_TO_TOPIC)); + assertPartitionFailed(Collections.singletonMap(t0p0, Errors.TOPIC_AUTHORIZATION_FAILED), + handleWithPartitionError(Errors.TOPIC_AUTHORIZATION_FAILED)); + assertPartitionFailed(Collections.singletonMap(t0p0, Errors.UNKNOWN_TOPIC_OR_PARTITION), + handleWithPartitionError(Errors.UNKNOWN_TOPIC_OR_PARTITION)); + } + + private OffsetDeleteResponse buildGroupErrorResponse(Errors error) { + OffsetDeleteResponse response = new OffsetDeleteResponse( + new OffsetDeleteResponseData() + .setErrorCode(error.code())); + if (error == Errors.NONE) { + response.data() + .setThrottleTimeMs(0) + .setTopics(new OffsetDeleteResponseTopicCollection(singletonList( + new OffsetDeleteResponseTopic() + .setName("t0") + .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList( + new OffsetDeleteResponsePartition() + .setPartitionIndex(0) + .setErrorCode(error.code()) + ).iterator())) + ).iterator())); + } + return response; + } + + private OffsetDeleteResponse buildPartitionErrorResponse(Errors error) { OffsetDeleteResponse response = new OffsetDeleteResponse( - new OffsetDeleteResponseData() - .setThrottleTimeMs(0) - .setTopics(new OffsetDeleteResponseTopicCollection(singletonList( - new OffsetDeleteResponseTopic() - .setName("t0") - .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList( - new OffsetDeleteResponsePartition() - .setPartitionIndex(0) - .setErrorCode(error.code()) - ).iterator())) - ).iterator()))); + new OffsetDeleteResponseData() + .setThrottleTimeMs(0) + .setTopics(new OffsetDeleteResponseTopicCollection(singletonList( + new OffsetDeleteResponseTopic() + .setName("t0") + .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList( + new OffsetDeleteResponsePartition() + .setPartitionIndex(0) + .setErrorCode(error.code()) + ).iterator())) + ).iterator())) + ); return response; } - private AdminApiHandler.ApiResult> handleWithError( + private AdminApiHandler.ApiResult> handleWithGroupError( Errors error ) { DeleteConsumerGroupOffsetsHandler handler = new DeleteConsumerGroupOffsetsHandler(groupId, tps, logContext); - OffsetDeleteResponse response = buildResponse(error); + OffsetDeleteResponse response = buildGroupErrorResponse(error); + return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response); + } + + private AdminApiHandler.ApiResult> handleWithPartitionError( + Errors error + ) { + DeleteConsumerGroupOffsetsHandler handler = new DeleteConsumerGroupOffsetsHandler(groupId, tps, logContext); + OffsetDeleteResponse response = buildPartitionErrorResponse(error); return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response); } @@ -139,7 +184,7 @@ private void assertCompleted( assertEquals(expected, result.completedKeys.get(key)); } - private void assertFailed( + private void assertGroupFailed( Class expectedExceptionType, AdminApiHandler.ApiResult> result ) { @@ -149,4 +194,21 @@ private void assertFailed( assertEquals(singleton(key), result.failedKeys.keySet()); assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key))); } -} + + private void assertPartitionFailed( + Map expectedResult, + AdminApiHandler.ApiResult> result + ) { + CoordinatorKey key = CoordinatorKey.byGroupId(groupId); + assertEquals(singleton(key), result.completedKeys.keySet()); + + // verify the completed value is expected result + Collection> completeCollection = result.completedKeys.values(); + assertEquals(1, completeCollection.size()); + Map completeMap = completeCollection.iterator().next(); + assertEquals(expectedResult, completeMap); + + assertEquals(emptyList(), result.unmappedKeys); + assertEquals(emptySet(), result.failedKeys.keySet()); + } +} \ No newline at end of file From cbc6743ab736fd8356111c8a5760336ac1903491 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Tue, 13 Jul 2021 21:46:31 +0800 Subject: [PATCH 2/5] KAFKA-13059: rename back to 'partition' --- .../internals/DeleteConsumerGroupOffsetsHandler.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java index 0efc42b800c36..f9ad960699c67 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java @@ -109,14 +109,14 @@ public ApiResult> handleResponse( } else { final Map partitionResults = new HashMap<>(); response.data().topics().forEach(topic -> - topic.partitions().forEach(partitionoffsetDeleteResponse -> { - Errors partitionError = Errors.forCode(partitionoffsetDeleteResponse.errorCode()); - TopicPartition topicPartition = new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex()); + topic.partitions().forEach(partition -> { + Errors partitionError = Errors.forCode(partition.errorCode()); + TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex()); if (partitionError != Errors.NONE) { handlePartitionError(groupId, partitionError, topicPartition, groupsToUnmap, groupsToRetry); } - partitionResults.put(new TopicPartition(topic.name(), partitionoffsetDeleteResponse.partitionIndex()), partitionError); + partitionResults.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); }) ); From 68a491e2ee04e5d4176cf231c766f642efa10c18 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 14 Jul 2021 10:35:25 +0800 Subject: [PATCH 3/5] KAFKA-13059: don't handle partition error and fix tests --- .../DeleteConsumerGroupOffsetsHandler.java | 68 ++++++------------ .../clients/admin/KafkaAdminClientTest.java | 69 ++----------------- ...DeleteConsumerGroupOffsetsHandlerTest.java | 3 - 3 files changed, 24 insertions(+), 116 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java index f9ad960699c67..b62f5814e1ef7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java @@ -72,8 +72,19 @@ public static AdminApiFuture.SimpleAdminApiFuture groupIds + ) { + if (!groupIds.equals(Collections.singleton(groupId))) { + throw new IllegalArgumentException("Received unexpected group ids " + groupIds + + " (expected only " + Collections.singleton(groupId) + ")"); + } + } + @Override - public OffsetDeleteRequest.Builder buildRequest(int coordinatorId, Set keys) { + public OffsetDeleteRequest.Builder buildRequest(int coordinatorId, Set groupIds) { + validateKeys(groupIds); + final OffsetDeleteRequestTopicCollection topics = new OffsetDeleteRequestTopicCollection(); partitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic, topicPartitions) -> topics.add( new OffsetDeleteRequestTopic() @@ -97,6 +108,8 @@ public ApiResult> handleResponse( Set groupIds, AbstractResponse abstractResponse ) { + validateKeys(groupIds); + final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; Map> completed = new HashMap<>(); Map failed = new HashMap<>(); @@ -111,10 +124,6 @@ public ApiResult> handleResponse( response.data().topics().forEach(topic -> topic.partitions().forEach(partition -> { Errors partitionError = Errors.forCode(partition.errorCode()); - TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex()); - if (partitionError != Errors.NONE) { - handlePartitionError(groupId, partitionError, topicPartition, groupsToUnmap, groupsToRetry); - } partitionResults.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); }) @@ -151,65 +160,28 @@ private void handleGroupError( case GROUP_ID_NOT_FOUND: case INVALID_GROUP_ID: case NON_EMPTY_GROUP: - log.error("Received non retriable error for group {} in `{}` response", groupId, - apiName(), error.exception()); + log.debug("`OffsetDelete` request for group id {} failed due to error {}.", groupId, error); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry - log.debug("`{}` request for group {} failed because the coordinator" + - " is still in the process of loading state. Will retry.", apiName(), groupId); + log.debug("`OffsetDelete` request for group {} failed because the coordinator" + + " is still in the process of loading state. Will retry.", groupId); groupsToRetry.add(groupId); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: // If the coordinator is unavailable or there was a coordinator change, then we unmap // the key so that we retry the `FindCoordinator` request - log.debug("`{}` request for group {} returned error {}. " + - "Will attempt to find the coordinator again and retry.", apiName(), groupId, error); + log.debug("`OffsetDelete` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry.", groupId, error); groupsToUnmap.add(groupId); break; default: - final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", - groupId, apiName()); - log.error(unexpectedErrorMsg, error.exception()); + log.error("`OffsetDelete` request for group id {} failed due to unexpected error {}.", groupId, error); failed.put(groupId, error.exception()); break; } } - private void handlePartitionError( - CoordinatorKey groupId, - Errors error, - TopicPartition topicPartition, - Set groupsToUnmap, - Set groupsToRetry - ) { - switch (error) { - case COORDINATOR_LOAD_IN_PROGRESS: - // If the coordinator is in the middle of loading, then we just need to retry - log.debug("`{}` request for group {} in partition {} failed because the coordinator" + - " is still in the process of loading state. Will retry.", apiName(), groupId, topicPartition); - groupsToRetry.add(groupId); - break; - case COORDINATOR_NOT_AVAILABLE: - case NOT_COORDINATOR: - // If the coordinator is unavailable or there was a coordinator change, then we unmap - // the key so that we retry the `FindCoordinator` request - log.debug("`{}` request for group {} in partition {} returned error {}. " + - "Will attempt to find the coordinator again and retry.", apiName(), groupId, topicPartition, error); - groupsToUnmap.add(groupId); - break; - case GROUP_SUBSCRIBED_TO_TOPIC: - case TOPIC_AUTHORIZATION_FAILED: - case UNKNOWN_TOPIC_OR_PARTITION: - log.debug("`{}` request for group {} in partition {} returned error {}.", apiName(), groupId, topicPartition, error); - break; - default: - log.error("`{}` request for group {} in partition {} returned unexpected error {}.", - apiName(), groupId, topicPartition, error); - break; - } - } - } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 973bea1f6b03a..b41e5b8820e7c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -3290,11 +3290,11 @@ public void testDeleteConsumerGroupOffsetsNumRetries() throws Exception { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - env.kafkaClient().prepareResponse(prepareOffsetDeleteResponse("foo", 0, Errors.NOT_COORDINATOR)); + env.kafkaClient().prepareResponse(prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR)); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); final DeleteConsumerGroupOffsetsResult result = env.adminClient() - .deleteConsumerGroupOffsets("groupId", Stream.of(tp1).collect(Collectors.toSet())); + .deleteConsumerGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet())); TestUtils.assertFutureError(result.all(), TimeoutException.class); } @@ -3322,7 +3322,8 @@ public void testDeleteConsumerGroupOffsetsRetryBackoff() throws Exception { mockClient.prepareResponse(body -> { firstAttemptTime.set(time.milliseconds()); return true; - }, prepareOffsetDeleteResponse("foo", 0, Errors.NOT_COORDINATOR)); + }, prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR)); + mockClient.prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); @@ -3344,68 +3345,6 @@ public void testDeleteConsumerGroupOffsetsRetryBackoff() throws Exception { } } - @Test - public void testDeleteConsumerGroupOffsetsResponseIncludeCoordinatorErrorAndNoneError() throws Exception { - final TopicPartition tp1 = new TopicPartition("foo", 0); - final TopicPartition tp2 = new TopicPartition("bar", 0); - - try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - - env.kafkaClient().prepareResponse( - prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - - env.kafkaClient().prepareResponse(new OffsetDeleteResponse( - new OffsetDeleteResponseData() - .setTopics(new OffsetDeleteResponseTopicCollection(Stream.of( - new OffsetDeleteResponseTopic() - .setName("foo") - .setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList( - new OffsetDeleteResponsePartition() - .setPartitionIndex(0) - .setErrorCode(Errors.NONE.code()) - ).iterator())), - new OffsetDeleteResponseTopic() - .setName("bar") - .setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList( - new OffsetDeleteResponsePartition() - .setPartitionIndex(0) - .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) - ).iterator())) - ).collect(Collectors.toList()).iterator())) - ) - ); - - env.kafkaClient().prepareResponse(new OffsetDeleteResponse( - new OffsetDeleteResponseData() - .setTopics(new OffsetDeleteResponseTopicCollection(Stream.of( - new OffsetDeleteResponseTopic() - .setName("foo") - .setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList( - new OffsetDeleteResponsePartition() - .setPartitionIndex(0) - .setErrorCode(Errors.NONE.code()) - ).iterator())), - new OffsetDeleteResponseTopic() - .setName("bar") - .setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList( - new OffsetDeleteResponsePartition() - .setPartitionIndex(0) - .setErrorCode(Errors.NONE.code()) - ).iterator())) - ).collect(Collectors.toList()).iterator())) - ) - ); - - final DeleteConsumerGroupOffsetsResult errorResult = env.adminClient().deleteConsumerGroupOffsets( - GROUP_ID, Stream.of(tp1, tp2).collect(Collectors.toSet())); - - assertNull(errorResult.partitionResult(tp1).get()); - assertNull(errorResult.partitionResult(tp2).get()); - assertNull(errorResult.all().get()); - } - } - @Test public void testDeleteConsumerGroupOffsets() throws Exception { // Happy path diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java index fd4b874160ffa..dd7adc4e2a1e0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java @@ -76,14 +76,11 @@ public void testSuccessfulHandleResponse() { public void testUnmappedHandleResponse() { assertUnmapped(handleWithGroupError(Errors.NOT_COORDINATOR)); assertUnmapped(handleWithGroupError(Errors.COORDINATOR_NOT_AVAILABLE)); - assertUnmapped(handleWithPartitionError(Errors.NOT_COORDINATOR)); - assertUnmapped(handleWithPartitionError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test public void testRetriableHandleResponse() { assertRetriable(handleWithGroupError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); - assertRetriable(handleWithPartitionError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); } @Test From 7c1cd0aab63dabf2e4f3b84f495a53546179c9e1 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 14 Jul 2021 15:19:17 +0800 Subject: [PATCH 4/5] KAFKA-13059: refactor --- .../admin/internals/DeleteConsumerGroupOffsetsHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java index b62f5814e1ef7..810f0c9043b83 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java @@ -111,8 +111,8 @@ public ApiResult> handleResponse( validateKeys(groupIds); final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; - Map> completed = new HashMap<>(); - Map failed = new HashMap<>(); + final Map> completed = new HashMap<>(); + final Map failed = new HashMap<>(); final Set groupsToUnmap = new HashSet<>(); final Set groupsToRetry = new HashSet<>(); From 4bc3d9aed3993a69e50157a1b7cc06b0c88a4f94 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 14 Jul 2021 18:00:01 +0800 Subject: [PATCH 5/5] KAFKA-13059: address comments to refactor code --- .../DeleteConsumerGroupOffsetsHandler.java | 44 +++++++------------ .../clients/admin/KafkaAdminClientTest.java | 34 +++++++------- ...DeleteConsumerGroupOffsetsHandlerTest.java | 13 +++--- 3 files changed, 39 insertions(+), 52 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java index 810f0c9043b83..f766a8726a71d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java @@ -111,14 +111,15 @@ public ApiResult> handleResponse( validateKeys(groupIds); final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse; - final Map> completed = new HashMap<>(); - final Map failed = new HashMap<>(); - final Set groupsToUnmap = new HashSet<>(); - final Set groupsToRetry = new HashSet<>(); - final Errors error = Errors.forCode(response.data().errorCode()); + if (error != Errors.NONE) { - handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); + final Map failed = new HashMap<>(); + final Set groupsToUnmap = new HashSet<>(); + + handleGroupError(groupId, error, failed, groupsToUnmap); + + return new ApiResult<>(Collections.emptyMap(), failed, new ArrayList<>(groupsToUnmap)); } else { final Map partitionResults = new HashMap<>(); response.data().topics().forEach(topic -> @@ -129,21 +130,10 @@ public ApiResult> handleResponse( }) ); - completed.put(groupId, partitionResults); - } - - if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { return new ApiResult<>( - completed, - failed, - Collections.emptyList() - ); - } else { - // retry the request, so don't send completed/failed results back - return new ApiResult<>( - Collections.emptyMap(), + Collections.singletonMap(groupId, partitionResults), Collections.emptyMap(), - new ArrayList<>(groupsToUnmap) + Collections.emptyList() ); } } @@ -152,33 +142,31 @@ private void handleGroupError( CoordinatorKey groupId, Errors error, Map failed, - Set groupsToUnmap, - Set groupsToRetry + Set groupsToUnmap ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: case GROUP_ID_NOT_FOUND: case INVALID_GROUP_ID: case NON_EMPTY_GROUP: - log.debug("`OffsetDelete` request for group id {} failed due to error {}.", groupId, error); + log.debug("`OffsetDelete` request for group id {} failed due to error {}.", groupId.idValue, error); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry - log.debug("`OffsetDelete` request for group {} failed because the coordinator" + - " is still in the process of loading state. Will retry.", groupId); - groupsToRetry.add(groupId); + log.debug("`OffsetDelete` request for group id {} failed because the coordinator" + + " is still in the process of loading state. Will retry.", groupId.idValue); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: // If the coordinator is unavailable or there was a coordinator change, then we unmap // the key so that we retry the `FindCoordinator` request - log.debug("`OffsetDelete` request for group {} returned error {}. " + - "Will attempt to find the coordinator again and retry.", groupId, error); + log.debug("`OffsetDelete` request for group id {} returned error {}. " + + "Will attempt to find the coordinator again and retry.", groupId.idValue, error); groupsToUnmap.add(groupId); break; default: - log.error("`OffsetDelete` request for group id {} failed due to unexpected error {}.", groupId, error); + log.error("`OffsetDelete` request for group id {} failed due to unexpected error {}.", groupId.idValue, error); failed.put(groupId, error.exception()); break; } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index b41e5b8820e7c..e79890ff8b87e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -3360,23 +3360,23 @@ public void testDeleteConsumerGroupOffsets() throws Exception { prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); env.kafkaClient().prepareResponse(new OffsetDeleteResponse( - new OffsetDeleteResponseData() - .setTopics(new OffsetDeleteResponseTopicCollection(Stream.of( - new OffsetDeleteResponseTopic() - .setName("foo") - .setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList( - new OffsetDeleteResponsePartition() - .setPartitionIndex(0) - .setErrorCode(Errors.NONE.code()) - ).iterator())), - new OffsetDeleteResponseTopic() - .setName("bar") - .setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList( - new OffsetDeleteResponsePartition() - .setPartitionIndex(0) - .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code()) - ).iterator())) - ).collect(Collectors.toList()).iterator())) + new OffsetDeleteResponseData() + .setTopics(new OffsetDeleteResponseTopicCollection(Stream.of( + new OffsetDeleteResponseTopic() + .setName("foo") + .setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList( + new OffsetDeleteResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code()) + ).iterator())), + new OffsetDeleteResponseTopic() + .setName("bar") + .setPartitions(new OffsetDeleteResponsePartitionCollection(Collections.singletonList( + new OffsetDeleteResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code()) + ).iterator())) + ).collect(Collectors.toList()).iterator())) ) ); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java index dd7adc4e2a1e0..b4aea93c3f3f6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java @@ -110,10 +110,10 @@ private OffsetDeleteResponse buildGroupErrorResponse(Errors error) { .setThrottleTimeMs(0) .setTopics(new OffsetDeleteResponseTopicCollection(singletonList( new OffsetDeleteResponseTopic() - .setName("t0") + .setName(t0p0.topic()) .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList( new OffsetDeleteResponsePartition() - .setPartitionIndex(0) + .setPartitionIndex(t0p0.partition()) .setErrorCode(error.code()) ).iterator())) ).iterator())); @@ -127,10 +127,10 @@ private OffsetDeleteResponse buildPartitionErrorResponse(Errors error) { .setThrottleTimeMs(0) .setTopics(new OffsetDeleteResponseTopicCollection(singletonList( new OffsetDeleteResponseTopic() - .setName("t0") + .setName(t0p0.topic()) .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList( new OffsetDeleteResponsePartition() - .setPartitionIndex(0) + .setPartitionIndex(t0p0.partition()) .setErrorCode(error.code()) ).iterator())) ).iterator())) @@ -202,10 +202,9 @@ private void assertPartitionFailed( // verify the completed value is expected result Collection> completeCollection = result.completedKeys.values(); assertEquals(1, completeCollection.size()); - Map completeMap = completeCollection.iterator().next(); - assertEquals(expectedResult, completeMap); + assertEquals(expectedResult, result.completedKeys.get(key)); assertEquals(emptyList(), result.unmappedKeys); assertEquals(emptySet(), result.failedKeys.keySet()); } -} \ No newline at end of file +}