diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsResult.java index 38ee14a15e60a..c4cb2e998897c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsResult.java @@ -21,8 +21,6 @@ import java.util.stream.Collectors; import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.KafkaFuture.BaseFunction; -import org.apache.kafka.common.KafkaFuture.BiConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.internals.KafkaFutureImpl; @@ -48,23 +46,19 @@ public class AlterConsumerGroupOffsetsResult { public KafkaFuture partitionResult(final TopicPartition partition) { final KafkaFutureImpl result = new KafkaFutureImpl<>(); - this.future.whenComplete(new BiConsumer, Throwable>() { - @Override - public void accept(final Map topicPartitions, final Throwable throwable) { - if (throwable != null) { - result.completeExceptionally(throwable); - } else if (!topicPartitions.containsKey(partition)) { - result.completeExceptionally(new IllegalArgumentException( - "Alter offset for partition \"" + partition + "\" was not attempted")); + this.future.whenComplete((topicPartitions, throwable) -> { + if (throwable != null) { + result.completeExceptionally(throwable); + } else if (!topicPartitions.containsKey(partition)) { + result.completeExceptionally(new IllegalArgumentException( + "Alter offset for partition \"" + partition + "\" was not attempted")); + } else { + final Errors error = topicPartitions.get(partition); + if (error == Errors.NONE) { + result.complete(null); } else { - final Errors error = topicPartitions.get(partition); - if (error == Errors.NONE) { - result.complete(null); - } else { - result.completeExceptionally(error.exception()); - } + result.completeExceptionally(error.exception()); } - } }); @@ -75,22 +69,19 @@ public void accept(final Map topicPartitions, final Thro * Return a future which succeeds if all the alter offsets succeed. */ public KafkaFuture all() { - return this.future.thenApply(new BaseFunction, Void>() { - @Override - public Void apply(final Map topicPartitionErrorsMap) { - List partitionsFailed = topicPartitionErrorsMap.entrySet() - .stream() - .filter(e -> e.getValue() != Errors.NONE) - .map(Map.Entry::getKey) - .collect(Collectors.toList()); - for (Errors error : topicPartitionErrorsMap.values()) { - if (error != Errors.NONE) { - throw error.exception( - "Failed altering consumer group offsets for the following partitions: " + partitionsFailed); - } + return this.future.thenApply(topicPartitionErrorsMap -> { + List partitionsFailed = topicPartitionErrorsMap.entrySet() + .stream() + .filter(e -> e.getValue() != Errors.NONE) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + for (Errors error : topicPartitionErrorsMap.values()) { + if (error != Errors.NONE) { + throw error.exception( + "Failed altering consumer group offsets for the following partitions: " + partitionsFailed); } - return null; } + return null; }); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 10371351016e8..981598f8cf1d7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -3642,9 +3642,11 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(Strin } @Override - public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, - Map offsets, - AlterConsumerGroupOffsetsOptions options) { + public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets( + String groupId, + Map offsets, + AlterConsumerGroupOffsetsOptions options + ) { SimpleAdminApiFuture> future = AlterConsumerGroupOffsetsHandler.newFuture(groupId); AlterConsumerGroupOffsetsHandler handler = new AlterConsumerGroupOffsetsHandler(groupId, offsets, logContext); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java index cd99b54c72a80..7ac90b63fb96c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java @@ -16,10 +16,12 @@ */ package org.apache.kafka.clients.admin.internals; +import static java.util.Collections.singleton; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.List; +import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -73,29 +75,38 @@ public static AdminApiFuture.SimpleAdminApiFuture keys) { - List topics = new ArrayList<>(); - Map> offsetData = new HashMap<>(); - for (Map.Entry entry : offsets.entrySet()) { - String topic = entry.getKey().topic(); - OffsetAndMetadata oam = entry.getValue(); - OffsetCommitRequestPartition partition = new OffsetCommitRequestPartition() - .setCommittedOffset(oam.offset()) - .setCommittedLeaderEpoch(oam.leaderEpoch().orElse(-1)) - .setCommittedMetadata(oam.metadata()) - .setPartitionIndex(entry.getKey().partition()); - offsetData.computeIfAbsent(topic, key -> new ArrayList<>()).add(partition); - } - for (Map.Entry> entry : offsetData.entrySet()) { - OffsetCommitRequestTopic topic = new OffsetCommitRequestTopic() - .setName(entry.getKey()) - .setPartitions(entry.getValue()); - topics.add(topic); + private void validateKeys(Set groupIds) { + if (!groupIds.equals(singleton(groupId))) { + throw new IllegalArgumentException("Received unexpected group ids " + groupIds + + " (expected only " + singleton(groupId) + ")"); } + } + + @Override + public OffsetCommitRequest.Builder buildRequest( + int coordinatorId, + Set groupIds + ) { + validateKeys(groupIds); + + Map offsetData = new HashMap<>(); + offsets.forEach((topicPartition, offsetAndMetadata) -> { + OffsetCommitRequestTopic topic = offsetData.computeIfAbsent( + topicPartition.topic(), + key -> new OffsetCommitRequestTopic().setName(topicPartition.topic()) + ); + + topic.partitions().add(new OffsetCommitRequestPartition() + .setCommittedOffset(offsetAndMetadata.offset()) + .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(-1)) + .setCommittedMetadata(offsetAndMetadata.metadata()) + .setPartitionIndex(topicPartition.partition())); + }); + OffsetCommitRequestData data = new OffsetCommitRequestData() .setGroupId(groupId.idValue) - .setTopics(topics); + .setTopics(new ArrayList<>(offsetData.values())); + return new OffsetCommitRequest.Builder(data); } @@ -105,53 +116,88 @@ public ApiResult> handleResponse( Set groupIds, AbstractResponse abstractResponse ) { + validateKeys(groupIds); + final OffsetCommitResponse response = (OffsetCommitResponse) abstractResponse; - Map> completed = new HashMap<>(); - Map failed = new HashMap<>(); - List unmapped = new ArrayList<>(); + final Set groupsToUnmap = new HashSet<>(); + final Set groupsToRetry = new HashSet<>(); + final Map partitionResults = new HashMap<>(); - Map partitions = new HashMap<>(); for (OffsetCommitResponseTopic topic : response.data().topics()) { for (OffsetCommitResponsePartition partition : topic.partitions()) { - TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex()); Errors error = Errors.forCode(partition.errorCode()); + if (error != Errors.NONE) { - handleError(groupId, error, failed, unmapped); + handleError( + groupId, + topicPartition, + error, + partitionResults, + groupsToUnmap, + groupsToRetry + ); } else { - partitions.put(tp, error); + partitionResults.put(topicPartition, error); } } } - if (failed.isEmpty() && unmapped.isEmpty()) - completed.put(groupId, partitions); - return new ApiResult<>(completed, failed, unmapped); + if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { + return ApiResult.completed(groupId, partitionResults); + } else { + return ApiResult.unmapped(new ArrayList<>(groupsToUnmap)); + } } private void handleError( CoordinatorKey groupId, + TopicPartition topicPartition, Errors error, - Map failed, - List unmapped + Map partitionResults, + Set groupsToUnmap, + Set groupsToRetry ) { switch (error) { - case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `OffsetCommit` response", groupId, - error.exception()); - failed.put(groupId, error.exception()); - break; + // If the coordinator is in the middle of loading, then we just need to retry. case COORDINATOR_LOAD_IN_PROGRESS: + log.debug("OffsetCommit request for group id {} failed because the coordinator" + + " is still in the process of loading state. Will retry.", groupId.idValue); + groupsToRetry.add(groupId); + break; + + // If the coordinator is not available, then we unmap and retry. case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("OffsetCommit request for group {} returned error {}. Will retry", groupId, error); - unmapped.add(groupId); + log.debug("OffsetCommit request for group id {} returned error {}. Will retry.", + groupId.idValue, error); + groupsToUnmap.add(groupId); + break; + + // Group level errors. + case INVALID_GROUP_ID: + case REBALANCE_IN_PROGRESS: + case INVALID_COMMIT_OFFSET_SIZE: + case GROUP_AUTHORIZATION_FAILED: + log.debug("OffsetCommit request for group id {} failed due to error {}.", + groupId.idValue, error); + partitionResults.put(topicPartition, error); + break; + + // TopicPartition level errors. + case UNKNOWN_TOPIC_OR_PARTITION: + case OFFSET_METADATA_TOO_LARGE: + case TOPIC_AUTHORIZATION_FAILED: + log.debug("OffsetCommit request for group id {} and partition {} failed due" + + " to error {}.", groupId.idValue, topicPartition, error); + partitionResults.put(topicPartition, error); break; + + // Unexpected errors. default: - log.error("Received unexpected error for group {} in `OffsetCommit` response", - groupId, error.exception()); - failed.put(groupId, error.exception( - "Received unexpected error for group " + groupId + " in `OffsetCommit` response")); + log.error("OffsetCommit request for group id {} and partition {} failed due" + + " to unexpected error {}.", groupId.idValue, topicPartition, error); + partitionResults.put(topicPartition, error); } } - } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 53e326ad8955f..98d2e53f4087c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -2543,6 +2543,39 @@ public void testOffsetCommitNumRetries() throws Exception { } } + @Test + public void testOffsetCommitWithMultipleErrors() throws Exception { + final Cluster cluster = mockCluster(3, 0); + final Time time = new MockTime(); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, + AdminClientConfig.RETRIES_CONFIG, "0")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + final TopicPartition foo0 = new TopicPartition("foo", 0); + final TopicPartition foo1 = new TopicPartition("foo", 1); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + Map responseData = new HashMap<>(); + responseData.put(foo0, Errors.NONE); + responseData.put(foo1, Errors.UNKNOWN_TOPIC_OR_PARTITION); + env.kafkaClient().prepareResponse(new OffsetCommitResponse(0, responseData)); + + Map offsets = new HashMap<>(); + offsets.put(foo0, new OffsetAndMetadata(123L)); + offsets.put(foo1, new OffsetAndMetadata(456L)); + final AlterConsumerGroupOffsetsResult result = env.adminClient() + .alterConsumerGroupOffsets(GROUP_ID, offsets); + + assertNull(result.partitionResult(foo0).get()); + TestUtils.assertFutureError(result.partitionResult(foo1), UnknownTopicOrPartitionException.class); + + TestUtils.assertFutureError(result.all(), UnknownTopicOrPartitionException.class); + } + } + @Test public void testOffsetCommitRetryBackoff() throws Exception { MockTime time = new MockTime(); @@ -4109,9 +4142,6 @@ public void testAlterConsumerGroupOffsetsRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareOffsetCommitResponse(tp1, Errors.COORDINATOR_LOAD_IN_PROGRESS)); - env.kafkaClient().prepareResponse( - prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - env.kafkaClient().prepareResponse( prepareOffsetCommitResponse(tp1, Errors.NOT_COORDINATOR)); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java index c20107f67aa74..8988c0f6c82d8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java @@ -16,8 +16,8 @@ */ package org.apache.kafka.clients.admin.internals; +import static java.util.Collections.emptyMap; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static java.util.Collections.emptyList; @@ -31,8 +31,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.GroupAuthorizationException; -import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.OffsetCommitResponse; @@ -72,7 +70,7 @@ public void testBuildRequest() { } @Test - public void testSuccessfulHandleResponse() { + public void testHandleSuccessfulResponse() { AlterConsumerGroupOffsetsHandler handler = new AlterConsumerGroupOffsetsHandler(groupId, partitions, logContext); Map responseData = Collections.singletonMap(t0p0, Errors.NONE); OffsetCommitResponse response = new OffsetCommitResponse(0, responseData); @@ -81,30 +79,98 @@ public void testSuccessfulHandleResponse() { } @Test - public void testRetriableHandleResponse() { - assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); - assertUnmapped(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); - assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); + public void testHandleRetriableResponse() { + assertUnmappedKey(partitionErrors(Errors.NOT_COORDINATOR)); + assertUnmappedKey(partitionErrors(Errors.COORDINATOR_NOT_AVAILABLE)); + assertRetriableError(partitionErrors(Errors.COORDINATOR_LOAD_IN_PROGRESS)); } @Test - public void testFailedHandleResponse() { - assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED)); - assertFailed(UnknownServerException.class, handleWithError(Errors.UNKNOWN_SERVER_ERROR)); + public void testHandleErrorResponse() { + assertFatalError(partitionErrors(Errors.TOPIC_AUTHORIZATION_FAILED)); + assertFatalError(partitionErrors(Errors.GROUP_AUTHORIZATION_FAILED)); + assertFatalError(partitionErrors(Errors.INVALID_GROUP_ID)); + assertFatalError(partitionErrors(Errors.UNKNOWN_TOPIC_OR_PARTITION)); + assertFatalError(partitionErrors(Errors.OFFSET_METADATA_TOO_LARGE)); + assertFatalError(partitionErrors(Errors.ILLEGAL_GENERATION)); + assertFatalError(partitionErrors(Errors.UNKNOWN_MEMBER_ID)); + assertFatalError(partitionErrors(Errors.REBALANCE_IN_PROGRESS)); + assertFatalError(partitionErrors(Errors.INVALID_COMMIT_OFFSET_SIZE)); + assertFatalError(partitionErrors(Errors.UNKNOWN_SERVER_ERROR)); } - private AdminApiHandler.ApiResult> handleWithError( + @Test + public void testHandleMultipleErrorsResponse() { + Map partitionErrors = new HashMap<>(); + partitionErrors.put(t0p0, Errors.UNKNOWN_TOPIC_OR_PARTITION); + partitionErrors.put(t0p1, Errors.INVALID_COMMIT_OFFSET_SIZE); + partitionErrors.put(t1p0, Errors.TOPIC_AUTHORIZATION_FAILED); + partitionErrors.put(t1p1, Errors.OFFSET_METADATA_TOO_LARGE); + assertFatalError(partitionErrors); + } + + private AdminApiHandler.ApiResult> handleResponse( + CoordinatorKey groupKey, + Map partitions, + Map partitionResults + ) { + AlterConsumerGroupOffsetsHandler handler = + new AlterConsumerGroupOffsetsHandler(groupKey.idValue, partitions, logContext); + OffsetCommitResponse response = new OffsetCommitResponse(0, partitionResults); + return handler.handleResponse(node, singleton(groupKey), response); + } + + private Map partitionErrors( Errors error ) { - AlterConsumerGroupOffsetsHandler handler = new AlterConsumerGroupOffsetsHandler(groupId, partitions, logContext); - Map responseData = Collections.singletonMap(t0p0, error); - OffsetCommitResponse response = new OffsetCommitResponse(0, responseData); - return handler.handleResponse(node, singleton(CoordinatorKey.byGroupId(groupId)), response); + Map partitionErrors = new HashMap<>(); + partitions.keySet().forEach(partition -> + partitionErrors.put(partition, error) + ); + return partitionErrors; + } + + private void assertFatalError( + Map partitionResults + ) { + CoordinatorKey groupKey = CoordinatorKey.byGroupId(groupId); + AdminApiHandler.ApiResult> result = handleResponse( + groupKey, + partitions, + partitionResults + ); + + assertEquals(singleton(groupKey), result.completedKeys.keySet()); + assertEquals(partitionResults, result.completedKeys.get(groupKey)); + assertEquals(emptyList(), result.unmappedKeys); + assertEquals(emptyMap(), result.failedKeys); } - private void assertUnmapped( - AdminApiHandler.ApiResult> result + private void assertRetriableError( + Map partitionResults ) { + CoordinatorKey groupKey = CoordinatorKey.byGroupId(groupId); + AdminApiHandler.ApiResult> result = handleResponse( + groupKey, + partitions, + partitionResults + ); + + assertEquals(emptySet(), result.completedKeys.keySet()); + assertEquals(emptyList(), result.unmappedKeys); + assertEquals(emptyMap(), result.failedKeys); + } + + private void assertUnmappedKey( + Map partitionResults + ) { + CoordinatorKey groupKey = CoordinatorKey.byGroupId(groupId); + AdminApiHandler.ApiResult> result = handleResponse( + groupKey, + partitions, + partitionResults + ); + assertEquals(emptySet(), result.completedKeys.keySet()); assertEquals(emptySet(), result.failedKeys.keySet()); assertEquals(singletonList(CoordinatorKey.byGroupId(groupId)), result.unmappedKeys); @@ -120,15 +186,4 @@ private void assertCompleted( assertEquals(singleton(key), result.completedKeys.keySet()); assertEquals(expected, result.completedKeys.get(key)); } - - private void assertFailed( - Class expectedExceptionType, - AdminApiHandler.ApiResult> result - ) { - CoordinatorKey key = CoordinatorKey.byGroupId(groupId); - assertEquals(emptySet(), result.completedKeys.keySet()); - assertEquals(emptyList(), result.unmappedKeys); - assertEquals(singleton(key), result.failedKeys.keySet()); - assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key))); - } }