diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
index 7e8b549b323c2..f766a8726a71d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
@@ -19,7 +19,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -72,8 +72,19 @@ public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> newFuture(
         return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
     }
 
+    private void validateKeys(
+        Set<CoordinatorKey> groupIds
+    ) {
+        if (!groupIds.equals(Collections.singleton(groupId))) {
+            throw new IllegalArgumentException("Received unexpected group ids " + groupIds +
+                " (expected only " + Collections.singleton(groupId) + ")");
+        }
+    }
+
     @Override
-    public OffsetDeleteRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> keys) {
+    public OffsetDeleteRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> groupIds) {
+        validateKeys(groupIds);
+
         final OffsetDeleteRequestTopicCollection topics = new OffsetDeleteRequestTopicCollection();
         partitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic, topicPartitions) -> topics.add(
             new OffsetDeleteRequestTopic()
@@ -97,54 +108,67 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleResponse(
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
-        final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        validateKeys(groupIds);
 
+        final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
         final Errors error = Errors.forCode(response.data().errorCode());
+
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+            final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+
+            handleGroupError(groupId, error, failed, groupsToUnmap);
+
+            return new ApiResult<>(Collections.emptyMap(), failed, new ArrayList<>(groupsToUnmap));
         } else {
-            final Map<TopicPartition, Errors> partitions = new HashMap<>();
-            response.data().topics().forEach(topic ->
+            final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
+            response.data().topics().forEach(topic ->
                 topic.partitions().forEach(partition -> {
                     Errors partitionError = Errors.forCode(partition.errorCode());
-                    if (!handleError(groupId, partitionError, failed, unmapped)) {
-                        partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
-                    }
+
+                    partitionResults.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
                 })
             );
-            if (!partitions.isEmpty())
-                completed.put(groupId, partitions);
+
+            return new ApiResult<>(
+                Collections.singletonMap(groupId, partitionResults),
+                Collections.emptyMap(),
+                Collections.emptyList()
+            );
         }
-        return new ApiResult<>(completed, failed, unmapped);
     }
 
-    private boolean handleError(
+    private void handleGroupError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Set<CoordinatorKey> groupsToUnmap
    ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
             case GROUP_ID_NOT_FOUND:
             case INVALID_GROUP_ID:
-                log.error("Received non retriable error for group {} in `DeleteConsumerGroupOffsets` response", groupId,
-                    error.exception());
+            case NON_EMPTY_GROUP:
+                log.debug("`OffsetDelete` request for group id {} failed due to error {}.",
+                    groupId.idValue, error);
                 failed.put(groupId, error.exception());
-                return true;
+                break;
             case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`OffsetDelete` request for group id {} failed because the coordinator" +
+                    " is still in the process of loading state. Will retry.", groupId.idValue);
+                break;
             case COORDINATOR_NOT_AVAILABLE:
-                return true;
             case NOT_COORDINATOR:
-                log.debug("DeleteConsumerGroupOffsets request for group {} returned error {}. Will retry",
-                    groupId, error);
-                unmapped.add(groupId);
-                return true;
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`OffsetDelete` request for group id {} returned error {}. " +
+                    "Will attempt to find the coordinator again and retry.", groupId.idValue, error);
+                groupsToUnmap.add(groupId);
+                break;
             default:
-                return false;
+                log.error("`OffsetDelete` request for group id {} failed due to unexpected error {}.", groupId.idValue, error);
+                failed.put(groupId, error.exception());
+                break;
         }
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 53e326ad8955f..e79890ff8b87e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -3290,11 +3290,11 @@ public void testDeleteConsumerGroupOffsetsNumRetries() throws Exception {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
-            env.kafkaClient().prepareResponse(prepareOffsetDeleteResponse("foo", 0, Errors.NOT_COORDINATOR));
+            env.kafkaClient().prepareResponse(prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR));
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             final DeleteConsumerGroupOffsetsResult result = env.adminClient()
-                .deleteConsumerGroupOffsets("groupId", Stream.of(tp1).collect(Collectors.toSet()));
+                .deleteConsumerGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet()));
 
             TestUtils.assertFutureError(result.all(), TimeoutException.class);
         }
@@ -3322,7 +3322,8 @@ public void testDeleteConsumerGroupOffsetsRetryBackoff() throws Exception {
             mockClient.prepareResponse(body -> {
                 firstAttemptTime.set(time.milliseconds());
                 return true;
-            }, prepareOffsetDeleteResponse("foo", 0, Errors.NOT_COORDINATOR));
+            }, prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR));
+
             mockClient.prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
@@ -3401,9 +3402,6 @@ public void testDeleteConsumerGroupOffsetsRetriableErrors() throws Exception {
             env.kafkaClient().prepareResponse(
                 prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
-            env.kafkaClient().prepareResponse(
-                prepareOffsetDeleteResponse(Errors.COORDINATOR_NOT_AVAILABLE));
-
             env.kafkaClient().prepareResponse(
                 prepareOffsetDeleteResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
 
@@ -3411,6 +3409,8 @@ public void testDeleteConsumerGroupOffsetsRetriableErrors() throws Exception {
             * We need to return two responses here, one for NOT_COORDINATOR call when calling delete a consumer group
             * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
             * FindCoordinatorResponse.
+             *
+             * The same reasoning applies to the COORDINATOR_NOT_AVAILABLE error response that follows.
             */
             env.kafkaClient().prepareResponse(
                 prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR));
@@ -3418,6 +3418,12 @@ public void testDeleteConsumerGroupOffsetsRetriableErrors() throws Exception {
             env.kafkaClient().prepareResponse(
                 prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
+            env.kafkaClient().prepareResponse(
+                prepareOffsetDeleteResponse(Errors.COORDINATOR_NOT_AVAILABLE));
+
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
             env.kafkaClient().prepareResponse(
                 prepareOffsetDeleteResponse("foo", 0, Errors.NONE));
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java
index 439b37733d98c..b4aea93c3f3f6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java
@@ -24,6 +24,7 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
@@ -33,6 +34,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.message.OffsetDeleteResponseData;
 import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartition;
@@ -67,48 +69,88 @@ public void testBuildRequest() {
 
     @Test
     public void testSuccessfulHandleResponse() {
         Map<TopicPartition, Errors> responseData = Collections.singletonMap(t0p0, Errors.NONE);
-        assertCompleted(handleWithError(Errors.NONE), responseData);
+        assertCompleted(handleWithGroupError(Errors.NONE), responseData);
     }
 
     @Test
     public void testUnmappedHandleResponse() {
-        assertUnmapped(handleWithError(Errors.NOT_COORDINATOR));
+        assertUnmapped(handleWithGroupError(Errors.NOT_COORDINATOR));
+        assertUnmapped(handleWithGroupError(Errors.COORDINATOR_NOT_AVAILABLE));
     }
 
     @Test
     public void testRetriableHandleResponse() {
-        assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
-        assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
+        assertRetriable(handleWithGroupError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
     }
 
     @Test
-    public void testFailedHandleResponse() {
-        assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED));
-        assertFailed(GroupIdNotFoundException.class, handleWithError(Errors.GROUP_ID_NOT_FOUND));
-        assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID));
+    public void testFailedHandleResponseWithGroupError() {
+        assertGroupFailed(GroupAuthorizationException.class, handleWithGroupError(Errors.GROUP_AUTHORIZATION_FAILED));
+        assertGroupFailed(GroupIdNotFoundException.class, handleWithGroupError(Errors.GROUP_ID_NOT_FOUND));
+        assertGroupFailed(InvalidGroupIdException.class, handleWithGroupError(Errors.INVALID_GROUP_ID));
+        assertGroupFailed(GroupNotEmptyException.class,
+            handleWithGroupError(Errors.NON_EMPTY_GROUP));
     }
 
-    private OffsetDeleteResponse buildResponse(Errors error) {
+    @Test
+    public void testFailedHandleResponseWithPartitionError() {
+        assertPartitionFailed(Collections.singletonMap(t0p0, Errors.GROUP_SUBSCRIBED_TO_TOPIC),
+            handleWithPartitionError(Errors.GROUP_SUBSCRIBED_TO_TOPIC));
+        assertPartitionFailed(Collections.singletonMap(t0p0, Errors.TOPIC_AUTHORIZATION_FAILED),
+            handleWithPartitionError(Errors.TOPIC_AUTHORIZATION_FAILED));
+        assertPartitionFailed(Collections.singletonMap(t0p0, Errors.UNKNOWN_TOPIC_OR_PARTITION),
+            handleWithPartitionError(Errors.UNKNOWN_TOPIC_OR_PARTITION));
+    }
+
+    private OffsetDeleteResponse buildGroupErrorResponse(Errors error) {
+        OffsetDeleteResponse response = new OffsetDeleteResponse(
+            new OffsetDeleteResponseData()
+                .setErrorCode(error.code()));
+        if (error == Errors.NONE) {
+            response.data()
+                .setThrottleTimeMs(0)
+                .setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
+                    new OffsetDeleteResponseTopic()
+                        .setName(t0p0.topic())
+                        .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
+                            new OffsetDeleteResponsePartition()
+                                .setPartitionIndex(t0p0.partition())
+                                .setErrorCode(error.code())
+                        ).iterator()))
+                ).iterator()));
+        }
+        return response;
+    }
+
+    private OffsetDeleteResponse buildPartitionErrorResponse(Errors error) {
         OffsetDeleteResponse response = new OffsetDeleteResponse(
-            new OffsetDeleteResponseData()
-                .setThrottleTimeMs(0)
-                .setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
-                    new OffsetDeleteResponseTopic()
-                        .setName("t0")
-                        .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
-                            new OffsetDeleteResponsePartition()
-                                .setPartitionIndex(0)
-                                .setErrorCode(error.code())
-                        ).iterator()))
-                ).iterator())));
+            new OffsetDeleteResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
+                    new OffsetDeleteResponseTopic()
+                        .setName(t0p0.topic())
+                        .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
+                            new OffsetDeleteResponsePartition()
+                                .setPartitionIndex(t0p0.partition())
+                                .setErrorCode(error.code())
+                        ).iterator()))
+                ).iterator()))
+        );
         return response;
     }
 
-    private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleWithError(
+    private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleWithGroupError(
         Errors error
     ) {
         DeleteConsumerGroupOffsetsHandler handler = new DeleteConsumerGroupOffsetsHandler(groupId, tps, logContext);
-        OffsetDeleteResponse response = buildResponse(error);
+        OffsetDeleteResponse response = buildGroupErrorResponse(error);
+        return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
+    }
+
+    private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleWithPartitionError(
+        Errors error
+    ) {
+        DeleteConsumerGroupOffsetsHandler handler = new DeleteConsumerGroupOffsetsHandler(groupId, tps, logContext);
+        OffsetDeleteResponse response = buildPartitionErrorResponse(error);
         return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
     }
@@ -139,7 +181,7 @@ private void assertCompleted(
         assertEquals(expected, result.completedKeys.get(key));
     }
 
-    private void assertFailed(
+    private void assertGroupFailed(
         Class<? extends Throwable> expectedExceptionType,
         AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> result
     ) {
@@ -149,4 +191,20 @@ private void assertFailed(
         assertEquals(singleton(key), result.failedKeys.keySet());
         assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key)));
     }
+
+    private void assertPartitionFailed(
+        Map<TopicPartition, Errors> expectedResult,
+        AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> result
+    ) {
+        CoordinatorKey key = CoordinatorKey.byGroupId(groupId);
+        assertEquals(singleton(key), result.completedKeys.keySet());
+
+        // verify the completed value is the expected result
+        Collection<Map<TopicPartition, Errors>> completeCollection = result.completedKeys.values();
+        assertEquals(1, completeCollection.size());
+        assertEquals(expectedResult, result.completedKeys.get(key));
+
+        assertEquals(emptyList(), result.unmappedKeys);
+        assertEquals(emptySet(), result.failedKeys.keySet());
+    }
 }
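For reference, a minimal sketch of how a caller observes the error mapping this patch settles on: group-level errors (e.g. GROUP_ID_NOT_FOUND, NON_EMPTY_GROUP) fail the whole request, while partition-level errors (e.g. GROUP_SUBSCRIBED_TO_TOPIC) are reported per partition through the public Admin API. The broker address, topic, and group names below are illustrative assumptions, not taken from the patch.

```java
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult;
import org.apache.kafka.common.TopicPartition;

public class OffsetDeleteExample {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // Hypothetical broker address; adjust for your environment.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Hypothetical group and partition names.
            TopicPartition tp = new TopicPartition("my-topic", 0);
            DeleteConsumerGroupOffsetsResult result =
                admin.deleteConsumerGroupOffsets("my-group", Collections.singleton(tp));
            try {
                // A group-level error from the handler fails every future here,
                // while a partition-level error fails only the affected partition's future.
                result.partitionResult(tp).get();
                System.out.println("Deleted committed offset for " + tp);
            } catch (ExecutionException e) {
                // e.getCause() carries the mapped exception, e.g. GroupNotEmptyException
                // for NON_EMPTY_GROUP, or GroupSubscribedToTopicException for
                // GROUP_SUBSCRIBED_TO_TOPIC on this partition.
                System.err.println("Offset delete failed for " + tp + ": " + e.getCause());
            }
        }
    }
}
```

This mirrors what the reworked tests assert: partition errors land in the completed per-partition map, and only coordinator-level failures unmap the key or fail the group as a whole.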