From bcbc72e29ba824f47918f542ccfc2e47924bd2d9 Mon Sep 17 00:00:00 2001 From: Sanskar Jhajharia Date: Mon, 20 Jan 2025 13:17:14 +0530 Subject: [PATCH] [KAFKA-16720] AdminClient Support for ListShareGroupOffsets (1/n) (#18571) Reviewers: Apoorv Mittal , Andrew Schofield --- checkstyle/suppressions.xml | 4 +- .../org/apache/kafka/clients/admin/Admin.java | 22 + .../kafka/clients/admin/ForwardingAdmin.java | 5 + .../kafka/clients/admin/KafkaAdminClient.java | 8 + .../admin/ListShareGroupOffsetsOptions.java | 31 + .../admin/ListShareGroupOffsetsResult.java | 77 ++ .../admin/ListShareGroupOffsetsSpec.java | 76 ++ .../ReadShareGroupStateSummaryRequest.java | 22 +- .../ReadShareGroupStateSummaryResponse.java | 65 +- .../kafka/clients/admin/MockAdminClient.java | 5 + .../main/scala/kafka/server/KafkaApis.scala | 19 +- .../unit/kafka/server/KafkaApisTest.scala | 125 +++ .../persister/DefaultStatePersister.java | 111 ++- .../persister/NoOpShareStatePersister.java | 4 +- .../server/share/persister/PartitionData.java | 2 +- .../share/persister/PartitionFactory.java | 4 + .../persister/PartitionStateSummaryData.java | 32 + .../persister/PersisterStateManager.java | 183 ++++- .../ReadShareGroupStateSummaryResult.java | 14 +- .../persister/DefaultStatePersisterTest.java | 335 +++++++- .../persister/PersisterStateManagerTest.java | 773 ++++++++++++++++++ .../coordinator/share/ShareCoordinator.java | 10 + .../share/ShareCoordinatorService.java | 130 +++ .../share/ShareCoordinatorShard.java | 100 ++- .../share/ShareCoordinatorServiceTest.java | 243 +++++- .../share/ShareCoordinatorShardTest.java | 60 ++ ...TestingMetricsInterceptingAdminClient.java | 8 + 27 files changed, 2422 insertions(+), 46 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsOptions.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java create mode 100644 server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 51d2b3c186a54..ebff3f9bc8e8a 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -340,9 +340,11 @@ - + + diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 3cb6283f1f90b..6c743dec048a8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -1793,6 +1793,28 @@ default DescribeShareGroupsResult describeShareGroups(Collection groupId return describeShareGroups(groupIds, new DescribeShareGroupsOptions()); } + /** + * List the share group offsets available in the cluster for the specified share groups. + * + * @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for. + * @param options The options to use when listing the share group offsets. + * @return The ListShareGroupOffsetsResult + */ + ListShareGroupOffsetsResult listShareGroupOffsets(Map groupSpecs, ListShareGroupOffsetsOptions options); + + /** + * List the share group offsets available in the cluster for the specified share groups with the default options. + * + *
<p>
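+ * For example, a minimal sketch (the {@code admin} client and the group id are assumptions,
+ * and the empty spec is assumed to request all partitions of the group):
+ * <pre>
+ *   ListShareGroupOffsetsResult result =
+ *       admin.listShareGroupOffsets(Map.of("group1", new ListShareGroupOffsetsSpec()));
+ *   result.partitionsToOffset("group1").get().forEach((tp, offset) ->
+ *       System.out.println(tp + " -> " + offset));
+ * </pre>
+ * <p>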
This is a convenience method for {@link #listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} + * to list offsets of all partitions for the specified share groups with default options. + * + * @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for. + * @return The ListShareGroupOffsetsResult + */ + default ListShareGroupOffsetsResult listShareGroupOffsets(Map groupSpecs) { + return listShareGroupOffsets(groupSpecs, new ListShareGroupOffsetsOptions()); + } + /** * Describe some classic groups in the cluster. * diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java index 88a693934e1c8..45ed560dffc5f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java @@ -298,6 +298,11 @@ public DescribeShareGroupsResult describeShareGroups(Collection groupIds return delegate.describeShareGroups(groupIds, options); } + @Override + public ListShareGroupOffsetsResult listShareGroupOffsets(Map groupSpecs, ListShareGroupOffsetsOptions options) { + return delegate.listShareGroupOffsets(groupSpecs, options); + } + @Override public ListGroupsResult listGroups(ListGroupsOptions options) { return delegate.listGroups(options); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index b7e482720b00e..3f88cd5aa8070 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -3796,6 +3796,14 @@ public DescribeShareGroupsResult describeShareGroups(final Collection gr .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); } + // To do in a follow-up PR + @Override + public ListShareGroupOffsetsResult listShareGroupOffsets(final Map groupSpecs, + final ListShareGroupOffsetsOptions options) { + // To-do + throw new InvalidRequestException("The method is not yet implemented"); + } + @Override public DescribeClassicGroupsResult describeClassicGroups(final Collection groupIds, final DescribeClassicGroupsOptions options) { diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsOptions.java new file mode 100644 index 0000000000000..bd740c24670db --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsOptions.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; + +/** + * Options for {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}. + *
<p>
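+ * A minimal sketch; {@code timeoutMs} is inherited from {@link AbstractOptions} and the
+ * timeout value here is an arbitrary assumption:
+ * <pre>
+ *   ListShareGroupOffsetsOptions options = new ListShareGroupOffsetsOptions().timeoutMs(30000);
+ * </pre>
+ * <p>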
+ * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class ListShareGroupOffsetsOptions extends AbstractOptions { +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java new file mode 100644 index 0000000000000..8e28ec015370a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.clients.admin.internals.CoordinatorKey; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * The result of the {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call. + *
<p>
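+ * A minimal usage sketch; the {@code result} instance is assumed to come from a prior
+ * {@code listShareGroupOffsets} call and {@code "group1"} is an assumed group id:
+ * <pre>
+ *   Map&lt;TopicPartition, Long&gt; offsets = result.partitionsToOffset("group1").get();
+ *   result.all().get().forEach((group, groupOffsets) ->
+ *       System.out.println(group + ": " + groupOffsets));
+ * </pre>
+ * <p>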
+ * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class ListShareGroupOffsetsResult { + + private final Map>> futures; + + ListShareGroupOffsetsResult(final Map>> futures) { + this.futures = futures.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().idValue, Map.Entry::getValue)); + } + + /** + * Return the future when the requests for all groups succeed. + * + * @return - Future which yields all Map objects, if requests for all the groups succeed. + */ + public KafkaFuture>> all() { + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply( + nil -> { + Map> offsets = new HashMap<>(futures.size()); + futures.forEach((groupId, future) -> { + try { + offsets.put(groupId, future.get()); + } catch (InterruptedException | ExecutionException e) { + // This should be unreachable, since the KafkaFuture#allOf already ensured + // that all the futures completed successfully. + throw new RuntimeException(e); + } + }); + return offsets; + }); + } + + /** + * @param groupId - The groupId for which the Map is needed + * @return - Future which yields a map of topic partitions to offsets for the specified group. + */ + public KafkaFuture> partitionsToOffset(String groupId) { + if (!futures.containsKey(groupId)) { + throw new IllegalArgumentException("Group ID not found: " + groupId); + } + return futures.get(groupId); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java new file mode 100644 index 0000000000000..050781ad5569f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Specification of share group offsets to list using {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}. + *
<p>
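+ * A minimal sketch (the topic name and partition numbers are assumptions):
+ * <pre>
+ *   ListShareGroupOffsetsSpec spec = new ListShareGroupOffsetsSpec()
+ *       .topicPartitions(List.of(new TopicPartition("orders", 0), new TopicPartition("orders", 1)));
+ * </pre>
+ * <p>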
+ * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class ListShareGroupOffsetsSpec { + + private Collection topicPartitions; + + /** + * Set the topic partitions whose offsets are to be listed for a share group. + */ + public ListShareGroupOffsetsSpec topicPartitions(Collection topicPartitions) { + this.topicPartitions = topicPartitions; + return this; + } + + /** + * Returns the topic partitions whose offsets are to be listed for a share group. + */ + public Collection topicPartitions() { + return topicPartitions == null ? List.of() : topicPartitions; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ListShareGroupOffsetsSpec)) { + return false; + } + ListShareGroupOffsetsSpec that = (ListShareGroupOffsetsSpec) o; + return Objects.equals(topicPartitions, that.topicPartitions); + } + + @Override + public int hashCode() { + return Objects.hash(topicPartitions); + } + + @Override + public String toString() { + return "ListShareGroupOffsetsSpec(" + + "topicPartitions=" + (topicPartitions != null ? topicPartitions : "null") + + ')'; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryRequest.java index 27daa78967d35..dcec662e6a071 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryRequest.java @@ -64,16 +64,16 @@ public ReadShareGroupStateSummaryRequest(ReadShareGroupStateSummaryRequestData d public ReadShareGroupStateSummaryResponse getErrorResponse(int throttleTimeMs, Throwable e) { List results = new ArrayList<>(); data.topics().forEach( - topicResult -> results.add(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() - .setTopicId(topicResult.topicId()) - .setPartitions(topicResult.partitions().stream() - .map(partitionData -> new ReadShareGroupStateSummaryResponseData.PartitionResult() - .setPartition(partitionData.partition()) - .setErrorCode(Errors.forException(e).code()) - .setErrorMessage(Errors.forException(e).message())) - .collect(Collectors.toList())))); + topicResult -> results.add(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicResult.topicId()) + .setPartitions(topicResult.partitions().stream() + .map(partitionData -> new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partitionData.partition()) + .setErrorCode(Errors.forException(e).code()) + .setErrorMessage(Errors.forException(e).message())) + .collect(Collectors.toList())))); return new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData() - .setResults(results)); + .setResults(results)); } @Override @@ -83,8 +83,8 @@ public ReadShareGroupStateSummaryRequestData data() { public static ReadShareGroupStateSummaryRequest parse(ByteBuffer buffer, short version) { return new ReadShareGroupStateSummaryRequest( - new ReadShareGroupStateSummaryRequestData(new ByteBufferAccessor(buffer), version), - version + new ReadShareGroupStateSummaryRequestData(new ByteBufferAccessor(buffer), version), + version ); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java index 
77c1dac65a1b8..0374e7759433c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.requests; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; @@ -24,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.List; import java.util.Map; public class ReadShareGroupStateSummaryResponse extends AbstractResponse { @@ -43,9 +45,9 @@ public ReadShareGroupStateSummaryResponseData data() { public Map errorCounts() { Map counts = new HashMap<>(); data.results().forEach( - result -> result.partitions().forEach( - partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode())) - ) + result -> result.partitions().forEach( + partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode())) + ) ); return counts; } @@ -59,9 +61,64 @@ public int throttleTimeMs() { public void maybeSetThrottleTimeMs(int throttleTimeMs) { // No op } + public static ReadShareGroupStateSummaryResponse parse(ByteBuffer buffer, short version) { return new ReadShareGroupStateSummaryResponse( - new ReadShareGroupStateSummaryResponseData(new ByteBufferAccessor(buffer), version) + new ReadShareGroupStateSummaryResponseData(new ByteBufferAccessor(buffer), version) ); } + + public static ReadShareGroupStateSummaryResponseData toErrorResponseData( + Uuid topicId, + int partitionId, + Errors error, + String errorMessage + ) { + return new ReadShareGroupStateSummaryResponseData().setResults( + List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId) + .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partitionId) + .setErrorCode(error.code()) + .setErrorMessage(errorMessage))))); + } + + public static ReadShareGroupStateSummaryResponseData.PartitionResult toErrorResponsePartitionResult( + int partitionId, + Errors error, + String errorMessage + ) { + return new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partitionId) + .setErrorCode(error.code()) + .setErrorMessage(errorMessage); + } + + public static ReadShareGroupStateSummaryResponseData toResponseData( + Uuid topicId, + int partition, + long startOffset, + int stateEpoch + ) { + return new ReadShareGroupStateSummaryResponseData() + .setResults(List.of( + new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId) + .setPartitions(List.of( + new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partition) + .setStartOffset(startOffset) + .setStateEpoch(stateEpoch) + )) + )); + } + + public static ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult toResponseReadStateSummaryResult( + Uuid topicId, + List partitionResults + ) { + return new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId) + .setPartitions(partitionResults); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 3be5dc7b3e8d6..ed2d7c61f08f9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -1389,6 +1389,11 @@ public synchronized DescribeShareGroupsResult describeShareGroups(Collection groupSpecs, ListShareGroupOffsetsOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + @Override public synchronized DescribeClassicGroupsResult describeClassicGroups(Collection groupIds, DescribeClassicGroupsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index a606c6b22a09c..3f5d7bb8a7971 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3165,9 +3165,22 @@ class KafkaApis(val requestChannel: RequestChannel, def handleReadShareGroupStateSummaryRequest(request: RequestChannel.Request): Unit = { val readShareGroupStateSummaryRequest = request.body[ReadShareGroupStateSummaryRequest] - // TODO: Implement the ReadShareGroupStateSummaryRequest handling - requestHelper.sendMaybeThrottle(request, readShareGroupStateSummaryRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + + shareCoordinator match { + case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + readShareGroupStateSummaryRequest.getErrorResponse(requestThrottleMs, + new ApiException("Share coordinator is not enabled."))) + CompletableFuture.completedFuture[Unit](()) + case Some(coordinator) => coordinator.readStateSummary(request.context, readShareGroupStateSummaryRequest.data) + .handle[Unit] { (response, exception) => + if (exception != null) { + requestHelper.sendMaybeThrottle(request, readShareGroupStateSummaryRequest.getErrorResponse(exception)) + } else { + requestHelper.sendMaybeThrottle(request, new ReadShareGroupStateSummaryResponse(response)) + } + } + } } def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 3d23b920a54db..bfa19917a6f60 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -10307,6 +10307,102 @@ class KafkaApisTest extends Logging { }) } + @Test + def testReadShareGroupStateSummarySuccess(): Unit = { + val topicId = Uuid.randomUuid(); + val readSummaryRequestData = new ReadShareGroupStateSummaryRequestData() + .setGroupId("group1") + .setTopics(List( + new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(topicId) + .setPartitions(List( + new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartition(1) + .setLeaderEpoch(1) + ).asJava) + ).asJava) + + val readStateSummaryResultData: util.List[ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult] = List( + new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId) + .setPartitions(List( + new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(1) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(null) + .setStateEpoch(1) + .setStartOffset(10) + ).asJava) + ).asJava + + val config = Map( + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true", + ) + + val response = getReadShareGroupSummaryResponse( + readSummaryRequestData, + config ++ 
ShareCoordinatorTestConfig.testConfigMap().asScala, + verifyNoErr = true, + null, + readStateSummaryResultData + ) + + assertNotNull(response.data) + assertEquals(1, response.data.results.size) + } + + @Test + def testReadShareGroupStateSummaryAuthorizationFailed(): Unit = { + val topicId = Uuid.randomUuid(); + val readSummaryRequestData = new ReadShareGroupStateSummaryRequestData() + .setGroupId("group1") + .setTopics(List( + new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(topicId) + .setPartitions(List( + new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartition(1) + .setLeaderEpoch(1) + ).asJava) + ).asJava) + + val readStateSummaryResultData: util.List[ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult] = List( + new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId) + .setPartitions(List( + new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(1) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(null) + .setStateEpoch(1) + .setStartOffset(10) + ).asJava) + ).asJava + + val authorizer: Authorizer = mock(classOf[Authorizer]) + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.DENIED).asJava, Seq(AuthorizationResult.ALLOWED).asJava) + + val config = Map( + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true", + ) + + val response = getReadShareGroupSummaryResponse( + readSummaryRequestData, + config ++ ShareCoordinatorTestConfig.testConfigMap().asScala, + verifyNoErr = false, + authorizer, + readStateSummaryResultData + ) + + assertNotNull(response.data) + assertEquals(1, response.data.results.size) + response.data.results.forEach(readResult => { + assertEquals(1, readResult.partitions.size) + assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED.code(), readResult.partitions.get(0).errorCode()) + }) + } + @Test def testWriteShareGroupStateSuccess(): Unit = { val topicId = Uuid.randomUuid(); @@ -10476,6 +10572,35 @@ class KafkaApisTest extends Logging { response } + def getReadShareGroupSummaryResponse(requestData: ReadShareGroupStateSummaryRequestData, configOverrides: Map[String, String] = Map.empty, + verifyNoErr: Boolean = true, authorizer: Authorizer = null, + readStateSummaryResult: util.List[ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult]): ReadShareGroupStateSummaryResponse = { + val requestChannelRequest = buildRequest(new ReadShareGroupStateSummaryRequest.Builder(requestData, true).build()) + + val future = new CompletableFuture[ReadShareGroupStateSummaryResponseData]() + when(shareCoordinator.readStateSummary( + any[RequestContext], + any[ReadShareGroupStateSummaryRequestData] + )).thenReturn(future) + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + kafkaApis = createKafkaApis( + overrideProperties = configOverrides, + authorizer = Option(authorizer), + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching()) + + future.complete(new ReadShareGroupStateSummaryResponseData() + .setResults(readStateSummaryResult)) + + val response = verifyNoThrottling[ReadShareGroupStateSummaryResponse](requestChannelRequest) + if (verifyNoErr) { + val expectedReadShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData() + .setResults(readStateSummaryResult) + assertEquals(expectedReadShareGroupStateSummaryResponseData, response.data) + } + response + } + def getWriteShareGroupResponse(requestData: 
WriteShareGroupStateRequestData, configOverrides: Map[String, String] = Map.empty, verifyNoErr: Boolean = true, authorizer: Authorizer = null, writeStateResult: util.List[WriteShareGroupStateResponseData.WriteStateResult]): WriteShareGroupStateResponse = { diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java index 01004a3d79969..3b6db31b65761 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ReadShareGroupStateResponse; +import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse; import org.apache.kafka.common.requests.WriteShareGroupStateResponse; import org.slf4j.Logger; @@ -126,7 +127,8 @@ stateManager.new WriteStateHandler( /** * Takes in a list of COMPLETED futures and combines the results, * taking care of errors if any, into a single WriteShareGroupStateResult - * @param futureMap - HashMap of {topic -> {part -> future}} + * + * @param futureMap - HashMap of {topic -> {partition -> future}} * @return Object representing combined result of type WriteShareGroupStateResult */ // visible for testing @@ -221,7 +223,8 @@ stateManager.new ReadStateHandler( /** * Takes in a list of COMPLETED futures and combines the results, * taking care of errors if any, into a single ReadShareGroupStateResult - * @param futureMap - HashMap of {topic -> {part -> future}} + * + * @param futureMap - HashMap of {topic -> {partition -> future}} * @return Object representing combined result of type ReadShareGroupStateResult */ // visible for testing @@ -288,7 +291,97 @@ public CompletableFuture deleteState(DeleteShareGro * @return A completable future of ReadShareGroupStateSummaryResult */ public CompletableFuture readSummary(ReadShareGroupStateSummaryParameters request) { - throw new RuntimeException("not implemented"); + try { + validate(request); + } catch (Exception e) { + log.error("Unable to validate read state summary request", e); + return CompletableFuture.failedFuture(e); + } + + GroupTopicPartitionData gtp = request.groupTopicPartitionData(); + String groupId = gtp.groupId(); + Map>> futureMap = new HashMap<>(); + List handlers = new ArrayList<>(); + + gtp.topicsData().forEach(topicData -> { + topicData.partitions().forEach(partitionData -> { + CompletableFuture future = futureMap + .computeIfAbsent(topicData.topicId(), k -> new HashMap<>()) + .computeIfAbsent(partitionData.partition(), k -> new CompletableFuture<>()); + + handlers.add( + stateManager.new ReadStateSummaryHandler( + groupId, + topicData.topicId(), + partitionData.partition(), + partitionData.leaderEpoch(), + future, + null + ) + ); + }); + }); + + for (PersisterStateManager.PersisterStateManagerHandler handler : handlers) { + stateManager.enqueue(handler); + } + + // Combine all futures into a single CompletableFuture + CompletableFuture combinedFuture = CompletableFuture.allOf( + handlers.stream() + .map(PersisterStateManager.ReadStateSummaryHandler::result) + .toArray(CompletableFuture[]::new)); + + // Transform the combined CompletableFuture into CompletableFuture + return combinedFuture.thenApply(v -> readSummaryResponsesToResult(futureMap)); + } + + /** + * Takes in a 
list of COMPLETED futures and combines the results, + * taking care of errors if any, into a single ReadShareGroupStateSummaryResult + * + * @param futureMap - HashMap of {topic -> {partition -> future}} + * @return Object representing combined result of type ReadShareGroupStateSummaryResult + */ + // visible for testing + ReadShareGroupStateSummaryResult readSummaryResponsesToResult( + Map>> futureMap + ) { + List> topicsData = futureMap.keySet().stream() + .map(topicId -> { + List partitionStateErrorData = futureMap.get(topicId).entrySet().stream() + .map(partitionFuture -> { + int partition = partitionFuture.getKey(); + CompletableFuture future = partitionFuture.getValue(); + try { + // already completed because of allOf call in the caller + ReadShareGroupStateSummaryResponse partitionResponse = future.join(); + return partitionResponse.data().results().get(0).partitions().stream() + .map(partitionResult -> PartitionFactory.newPartitionStateSummaryData( + partitionResult.partition(), + partitionResult.stateEpoch(), + partitionResult.startOffset(), + partitionResult.errorCode(), + partitionResult.errorMessage())) + .collect(Collectors.toList()); + } catch (Exception e) { + log.error("Unexpected exception while getting data from share coordinator", e); + return Collections.singletonList(PartitionFactory.newPartitionStateSummaryData( + partition, + -1, + -1, + Errors.UNKNOWN_SERVER_ERROR.code(), // No specific public error code exists for InterruptedException / ExecutionException + "Error reading state from share coordinator: " + e.getMessage())); + } + }) + .flatMap(List::stream) + .collect(Collectors.toList()); + return new TopicData<>(topicId, partitionStateErrorData); + }) + .collect(Collectors.toList()); + return new ReadShareGroupStateSummaryResult.Builder() + .setTopicsData(topicsData) + .build(); } private static void validate(WriteShareGroupStateParameters params) { @@ -315,6 +408,18 @@ private static void validate(ReadShareGroupStateParameters params) { validateGroupTopicPartitionData(prefix, params.groupTopicPartitionData()); } + private static void validate(ReadShareGroupStateSummaryParameters params) { + String prefix = "Read share group summary parameters"; + if (params == null) { + throw new IllegalArgumentException(prefix + " cannot be null."); + } + if (params.groupTopicPartitionData() == null) { + throw new IllegalArgumentException(prefix + " data cannot be null."); + } + + validateGroupTopicPartitionData(prefix, params.groupTopicPartitionData()); + } + private static void validateGroupTopicPartitionData(String prefix, GroupTopicPartitionData data) { String groupId = data.groupId(); if (groupId == null || groupId.isEmpty()) { diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpShareStatePersister.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpShareStatePersister.java index 83d3d7d74a89b..d4b22332be184 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpShareStatePersister.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpShareStatePersister.java @@ -86,13 +86,13 @@ public CompletableFuture deleteState(DeleteShareGro @Override public CompletableFuture readSummary(ReadShareGroupStateSummaryParameters request) { GroupTopicPartitionData reqData = request.groupTopicPartitionData(); - List> resultArgs = new ArrayList<>(); + List> resultArgs = new ArrayList<>(); // we will fetch topic and partition info from the request and // return valid but 
default response (keep partition id and topic from request but initialize other // values as default). for (TopicData topicData : reqData.topicsData()) { resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream(). - map(partitionIdData -> PartitionFactory.newPartitionStateErrorData( + map(partitionIdData -> PartitionFactory.newPartitionStateSummaryData( partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.UNINITIALIZED_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE)) .collect(Collectors.toList()))); } diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionData.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionData.java index 61b1d3a621f35..25db77380da7e 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionData.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionData.java @@ -26,7 +26,7 @@ */ public class PartitionData implements PartitionIdData, PartitionStateData, PartitionErrorData, PartitionStateErrorData, - PartitionStateBatchData, PartitionIdLeaderEpochData, PartitionAllData { + PartitionStateBatchData, PartitionIdLeaderEpochData, PartitionAllData, PartitionStateSummaryData { private final int partition; private final int stateEpoch; private final long startOffset; diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java index abd44a854ee70..009eb9cccc149 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java @@ -51,6 +51,10 @@ public static PartitionStateErrorData newPartitionStateErrorData(int partition, return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null); } + public static PartitionStateSummaryData newPartitionStateSummaryData(int partition, int stateEpoch, long startOffset, short errorCode, String errorMessage) { + return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null); + } + public static PartitionStateBatchData newPartitionStateBatchData(int partition, int stateEpoch, long startOffset, int leaderEpoch, List stateBatches) { return new PartitionData(partition, stateEpoch, startOffset, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, stateBatches); } diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java new file mode 100644 index 0000000000000..dc4732a79ae22 --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.share.persister; + +/** + * This interface is implemented by classes used to contain the data for a partition with state summary and error data (if any) + * in the interface to {@link Persister}. + */ +public interface PartitionStateSummaryData extends PartitionInfoData, PartitionIdData { + int stateEpoch(); + + long startOffset(); + + short errorCode(); + + String errorMessage(); +} diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java index ea14c4f59b432..5c9028b87e8c3 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java @@ -28,6 +28,8 @@ import org.apache.kafka.common.message.FindCoordinatorResponseData; import org.apache.kafka.common.message.ReadShareGroupStateRequestData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; +import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData; +import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData; import org.apache.kafka.common.message.WriteShareGroupStateRequestData; import org.apache.kafka.common.message.WriteShareGroupStateResponseData; import org.apache.kafka.common.protocol.ApiKeys; @@ -38,6 +40,8 @@ import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.ReadShareGroupStateRequest; import org.apache.kafka.common.requests.ReadShareGroupStateResponse; +import org.apache.kafka.common.requests.ReadShareGroupStateSummaryRequest; +import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse; import org.apache.kafka.common.requests.WriteShareGroupStateRequest; import org.apache.kafka.common.requests.WriteShareGroupStateResponse; import org.apache.kafka.common.utils.ExponentialBackoff; @@ -679,7 +683,7 @@ protected void handleRequestResponse(ClientResponse response) { log.warn("Received retriable error in read state RPC for key {}: {}", partitionKey(), error.message()); if (!readStateBackoff.canAttempt()) { log.error("Exhausted max retries for read state RPC for key {} without success.", partitionKey()); - readStateErrorReponse(error, new Exception("Exhausted max retries to complete read state RPC without success.")); + readStateErrorResponse(error, new Exception("Exhausted max retries to complete read state RPC without success.")); return; } super.resetCoordinatorNode(); @@ -688,7 +692,7 @@ protected void handleRequestResponse(ClientResponse response) { default: log.error("Unable to perform read state RPC for key {}: {}", partitionKey(), error.message()); - readStateErrorReponse(error, null); + readStateErrorResponse(error, null); return; } } @@ -699,19 +703,19 @@ protected void handleRequestResponse(ClientResponse response) { IllegalStateException exception = new IllegalStateException( "Failed to read state for share partition " + partitionKey() ); - 
readStateErrorReponse(Errors.forException(exception), exception); + readStateErrorResponse(Errors.forException(exception), exception); } - protected void readStateErrorReponse(Errors error, Exception exception) { + protected void readStateErrorResponse(Errors error, Exception exception) { this.result.complete(new ReadShareGroupStateResponse( - ReadShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in find coordinator. " + + ReadShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in read state RPC. " + (exception == null ? error.message() : exception.getMessage())))); } @Override protected void findCoordinatorErrorResponse(Errors error, Exception exception) { this.result.complete(new ReadShareGroupStateResponse( - ReadShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in read state RPC. " + + ReadShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in find coordinator. " + (exception == null ? error.message() : exception.getMessage())))); } @@ -730,6 +734,147 @@ protected RPCType rpcType() { } } + public class ReadStateSummaryHandler extends PersisterStateManagerHandler { + private final int leaderEpoch; + private final CompletableFuture result; + private final BackoffManager readStateSummaryBackoff; + + public ReadStateSummaryHandler( + String groupId, + Uuid topicId, + int partition, + int leaderEpoch, + CompletableFuture result, + long backoffMs, + long backoffMaxMs, + int maxRPCRetryAttempts, + Consumer onCompleteCallback + ) { + super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts); + this.leaderEpoch = leaderEpoch; + this.result = result; + this.readStateSummaryBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs); + } + + public ReadStateSummaryHandler( + String groupId, + Uuid topicId, + int partition, + int leaderEpoch, + CompletableFuture result, + Consumer onCompleteCallback + ) { + this( + groupId, + topicId, + partition, + leaderEpoch, + result, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_FIND_COORD_ATTEMPTS, + onCompleteCallback + ); + } + + @Override + protected String name() { + return "ReadStateSummaryHandler"; + } + + @Override + protected AbstractRequest.Builder requestBuilder() { + throw new RuntimeException("Read Summary requests are batchable, hence individual requests not needed."); + } + + @Override + protected boolean isResponseForRequest(ClientResponse response) { + return response.requestHeader().apiKey() == ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY; + } + + @Override + protected void handleRequestResponse(ClientResponse response) { + log.debug("Read state summary response received - {}", response); + readStateSummaryBackoff.incrementAttempt(); + + ReadShareGroupStateSummaryResponse combinedResponse = (ReadShareGroupStateSummaryResponse) response.responseBody(); + for (ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult readStateSummaryResult : combinedResponse.data().results()) { + if (readStateSummaryResult.topicId().equals(partitionKey().topicId())) { + Optional partitionStateData = + readStateSummaryResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partitionKey().partition()) + .findFirst(); + + if (partitionStateData.isPresent()) { + Errors error = Errors.forCode(partitionStateData.get().errorCode()); + switch (error) { + 
case NONE: + readStateSummaryBackoff.resetAttempts(); + ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult result = ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult( + partitionKey().topicId(), + Collections.singletonList(partitionStateData.get()) + ); + this.result.complete(new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData() + .setResults(Collections.singletonList(result)))); + return; + + // check retriable errors + case COORDINATOR_NOT_AVAILABLE: + case COORDINATOR_LOAD_IN_PROGRESS: + case NOT_COORDINATOR: + log.warn("Received retriable error in read state summary RPC for key {}: {}", partitionKey(), error.message()); + if (!readStateSummaryBackoff.canAttempt()) { + log.error("Exhausted max retries for read state summary RPC for key {} without success.", partitionKey()); + readStateSummaryErrorResponse(error, new Exception("Exhausted max retries to complete read state summary RPC without success.")); + return; + } + super.resetCoordinatorNode(); + timer.add(new PersisterTimerTask(readStateSummaryBackoff.backOff(), this)); + return; + + default: + log.error("Unable to perform read state summary RPC for key {}: {}", partitionKey(), error.message()); + readStateSummaryErrorResponse(error, null); + return; + } + } + } + } + + // no response found specific topic partition + IllegalStateException exception = new IllegalStateException( + "Failed to read state summary for share partition " + partitionKey() + ); + readStateSummaryErrorResponse(Errors.forException(exception), exception); + } + + protected void readStateSummaryErrorResponse(Errors error, Exception exception) { + this.result.complete(new ReadShareGroupStateSummaryResponse( + ReadShareGroupStateSummaryResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in read state summary RPC. " + + (exception == null ? error.message() : exception.getMessage())))); + } + + @Override + protected void findCoordinatorErrorResponse(Errors error, Exception exception) { + this.result.complete(new ReadShareGroupStateSummaryResponse( + ReadShareGroupStateSummaryResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in find coordinator. " + + (exception == null ? 
error.message() : exception.getMessage())))); + } + + protected CompletableFuture result() { + return result; + } + + @Override + protected boolean isBatchable() { + return true; + } + + @Override + protected RPCType rpcType() { + return RPCType.SUMMARY; + } + } + private class SendThread extends InterBrokerSendThread { private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); private final Random random; @@ -912,6 +1057,8 @@ public static AbstractRequest.Builder coalesceRequest return coalesceWrites(groupId, handlers); case READ: return coalesceReads(groupId, handlers); + case SUMMARY: + return coalesceReadSummaries(groupId, handlers); default: throw new RuntimeException("Unknown rpc type: " + rpcType); } @@ -969,5 +1116,29 @@ private static AbstractRequest.Builder coalesceReads( .setPartitions(entry.getValue())) .collect(Collectors.toList()))); } + + private static AbstractRequest.Builder coalesceReadSummaries(String groupId, List handlers) { + Map> partitionData = new HashMap<>(); + handlers.forEach(persisterStateManagerHandler -> { + assert persisterStateManagerHandler instanceof ReadStateSummaryHandler; + ReadStateSummaryHandler handler = (ReadStateSummaryHandler) persisterStateManagerHandler; + partitionData.computeIfAbsent(handler.partitionKey().topicId(), topicId -> new LinkedList<>()) + .add( + new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartition(handler.partitionKey().partition()) + .setLeaderEpoch(handler.leaderEpoch) + ); + }); + + return new ReadShareGroupStateSummaryRequest.Builder(new ReadShareGroupStateSummaryRequestData() + .setGroupId(groupId) + .setTopics(partitionData.entrySet().stream() + .map(entry -> new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(entry.getKey()) + .setPartitions(entry.getValue())) + .collect(Collectors.toList())), + true + ); + } } } diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java index 7172c6acef70b..7e0bee13c3806 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java @@ -26,9 +26,9 @@ * This class contains the result from {@link Persister#readSummary(ReadShareGroupStateSummaryParameters)}. 
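 * <p>
 * A minimal sketch of consuming the result; the {@code persister} and {@code params} names are
 * assumptions:
 * <pre>
 *   persister.readSummary(params).thenAccept(result ->
 *       result.topicsData().forEach(topicData ->
 *           topicData.partitions().forEach(summary ->
 *               System.out.println(summary.partition() + " @ " + summary.startOffset()))));
 * </pre>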
*/ public class ReadShareGroupStateSummaryResult implements PersisterResult { - private final List> topicsData; + private final List> topicsData; - private ReadShareGroupStateSummaryResult(List> topicsData) { + private ReadShareGroupStateSummaryResult(List> topicsData) { this.topicsData = topicsData; } @@ -37,7 +37,7 @@ public static ReadShareGroupStateSummaryResult from(ReadShareGroupStateSummaryRe .setTopicsData(data.results().stream() .map(readStateSummaryResult -> new TopicData<>(readStateSummaryResult.topicId(), readStateSummaryResult.partitions().stream() - .map(partitionResult -> PartitionFactory.newPartitionStateErrorData( + .map(partitionResult -> PartitionFactory.newPartitionStateSummaryData( partitionResult.partition(), partitionResult.stateEpoch(), partitionResult.startOffset(), partitionResult.errorCode(), partitionResult.errorMessage())) .collect(Collectors.toList()))) .collect(Collectors.toList())) @@ -45,9 +45,9 @@ public static ReadShareGroupStateSummaryResult from(ReadShareGroupStateSummaryRe } public static class Builder { - private List> topicsData; + private List> topicsData; - public Builder setTopicsData(List> topicsData) { + public Builder setTopicsData(List> topicsData) { this.topicsData = topicsData; return this; } @@ -56,4 +56,8 @@ public ReadShareGroupStateSummaryResult build() { return new ReadShareGroupStateSummaryResult(topicsData); } } + + public List> topicsData() { + return topicsData; + } } diff --git a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java index 9e0ce91bc85cc..88c83c88914f2 100644 --- a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java @@ -25,12 +25,15 @@ import org.apache.kafka.common.message.FindCoordinatorResponseData; import org.apache.kafka.common.message.ReadShareGroupStateRequestData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; +import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData; import org.apache.kafka.common.message.WriteShareGroupStateRequestData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.ReadShareGroupStateRequest; import org.apache.kafka.common.requests.ReadShareGroupStateResponse; +import org.apache.kafka.common.requests.ReadShareGroupStateSummaryRequest; +import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse; import org.apache.kafka.common.requests.WriteShareGroupStateRequest; import org.apache.kafka.common.requests.WriteShareGroupStateResponse; import org.apache.kafka.common.utils.Time; @@ -170,8 +173,8 @@ public void testWriteStateValidate() { .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() .setGroupId(groupId) .setTopicsData(Collections.singletonList(new TopicData<>(null, - Collections.singletonList(PartitionFactory.newPartitionStateBatchData( - partition, 1, 0, 0, null))))).build()).build()); + Collections.singletonList(PartitionFactory.newPartitionStateBatchData( + partition, 1, 0, 0, null))))).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); assertFutureThrows(result, IllegalArgumentException.class); @@ -192,8 +195,8 
@@ public void testWriteStateValidate() { .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() .setGroupId(groupId) .setTopicsData(Collections.singletonList(new TopicData<>(topicId, - Collections.singletonList(PartitionFactory.newPartitionStateBatchData( - incorrectPartition, 1, 0, 0, null))))).build()).build()); + Collections.singletonList(PartitionFactory.newPartitionStateBatchData( + incorrectPartition, 1, 0, 0, null))))).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); assertFutureThrows(result, IllegalArgumentException.class); @@ -246,7 +249,7 @@ public void testReadStateValidate() { .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() .setGroupId(groupId) .setTopicsData(Collections.singletonList(new TopicData<>(null, - Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(partition, 1)))) + Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(partition, 1)))) ).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); @@ -268,7 +271,82 @@ public void testReadStateValidate() { .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() .setGroupId(groupId) .setTopicsData(Collections.singletonList(new TopicData<>(topicId, - Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(incorrectPartition, 1))))).build()).build()); + Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(incorrectPartition, 1))))).build()).build()); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + assertFutureThrows(result, IllegalArgumentException.class); + } + + @Test + public void testReadStateSummaryValidate() { + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 0; + int incorrectPartition = -1; + + // Request Parameters are null + DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); + CompletableFuture result = defaultStatePersister.readSummary(null); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + assertFutureThrows(result, IllegalArgumentException.class); + + // groupTopicPartitionData is null + defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); + result = defaultStatePersister.readSummary(new ReadShareGroupStateSummaryParameters.Builder().setGroupTopicPartitionData(null).build()); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + assertFutureThrows(result, IllegalArgumentException.class); + + // groupId is null + defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); + result = defaultStatePersister.readSummary(new ReadShareGroupStateSummaryParameters.Builder() + .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() + .setGroupId(null).build()).build()); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + assertFutureThrows(result, IllegalArgumentException.class); + + // topicsData is empty + defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); + result = defaultStatePersister.readSummary(new ReadShareGroupStateSummaryParameters.Builder() + .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() + .setGroupId(groupId) + .setTopicsData(Collections.emptyList()).build()).build()); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + assertFutureThrows(result, IllegalArgumentException.class); + + // topicId is null 
+ defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); + result = defaultStatePersister.readSummary(new ReadShareGroupStateSummaryParameters.Builder() + .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() + .setGroupId(groupId) + .setTopicsData(List.of(new TopicData<>(null, + List.of(PartitionFactory.newPartitionIdLeaderEpochData(partition, 1)))) + ).build()).build()); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + assertFutureThrows(result, IllegalArgumentException.class); + + // partitionData is empty + defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); + result = defaultStatePersister.readSummary(new ReadShareGroupStateSummaryParameters.Builder() + .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() + .setGroupId(groupId) + .setTopicsData(List.of(new TopicData<>(topicId, Collections.emptyList()))).build()).build()); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + assertFutureThrows(result, IllegalArgumentException.class); + + // partition value is incorrect + defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); + result = defaultStatePersister.readSummary(new ReadShareGroupStateSummaryParameters.Builder() + .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() + .setGroupId(groupId) + .setTopicsData(List.of(new TopicData<>(topicId, + List.of(PartitionFactory.newPartitionIdLeaderEpochData(incorrectPartition, 1))))).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); assertFutureThrows(result, IllegalArgumentException.class); @@ -576,6 +654,141 @@ public void testReadStateSuccess() { assertEquals(expectedResultMap, resultMap); } + @Test + public void testReadStateSummarySuccess() { + + MockClient client = new MockClient(MOCK_TIME); + + String groupId = "group1"; + Uuid topicId1 = Uuid.randomUuid(); + int partition1 = 10; + + Uuid topicId2 = Uuid.randomUuid(); + int partition2 = 8; + + Node suppliedNode = new Node(0, HOST, PORT); + Node coordinatorNode1 = new Node(5, HOST, PORT); + Node coordinatorNode2 = new Node(6, HOST, PORT); + + String coordinatorKey1 = SharePartitionKey.asCoordinatorKey(groupId, topicId1, partition1); + String coordinatorKey2 = SharePartitionKey.asCoordinatorKey(groupId, topicId2, partition2); + + client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest + && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() + && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey1), + new FindCoordinatorResponse( + new FindCoordinatorResponseData() + .setCoordinators(List.of( + new FindCoordinatorResponseData.Coordinator() + .setNodeId(5) + .setHost(HOST) + .setPort(PORT) + .setErrorCode(Errors.NONE.code()) + )) + ), + suppliedNode + ); + + client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest + && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() + && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey2), + new FindCoordinatorResponse( + new FindCoordinatorResponseData() + .setCoordinators(List.of( + new FindCoordinatorResponseData.Coordinator() + .setNodeId(6) + .setHost(HOST) + .setPort(PORT) + .setErrorCode(Errors.NONE.code()) + )) + ), + suppliedNode + ); + + client.prepareResponseFrom( + body -> { + ReadShareGroupStateSummaryRequest request = 
(ReadShareGroupStateSummaryRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId1 && requestPartition == partition1; + }, + new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId1, partition1, 0, 1)), + coordinatorNode1); + + client.prepareResponseFrom( + body -> { + ReadShareGroupStateSummaryRequest request = (ReadShareGroupStateSummaryRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId2 && requestPartition == partition2; + }, + new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId2, partition2, 0, 1)), + coordinatorNode2); + + ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode); + + DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder() + .withKafkaClient(client) + .withCacheHelper(cacheHelper) + .build(); + + ReadShareGroupStateSummaryParameters request = ReadShareGroupStateSummaryParameters.from( + new ReadShareGroupStateSummaryRequestData() + .setGroupId(groupId) + .setTopics(Arrays.asList( + new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(topicId1) + .setPartitions(List.of( + new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartition(partition1) + .setLeaderEpoch(1) + )), + new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(topicId2) + .setPartitions(List.of( + new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartition(partition2) + .setLeaderEpoch(1) + )) + )) + ); + + CompletableFuture resultFuture = defaultStatePersister.readSummary(request); + + ReadShareGroupStateSummaryResult result = null; + try { + // adding long delay to allow for environment/GC issues + result = resultFuture.get(10L, TimeUnit.SECONDS); + } catch (Exception e) { + fail("Unexpected exception", e); + } + + HashSet resultMap = new HashSet<>(); + result.topicsData().forEach( + topicData -> topicData.partitions().forEach( + partitionData -> resultMap.add((PartitionData) partitionData) + ) + ); + + HashSet expectedResultMap = new HashSet<>(); + expectedResultMap.add( + (PartitionData) PartitionFactory.newPartitionStateSummaryData(partition1, 1, 0, Errors.NONE.code(), + null + )); + + expectedResultMap.add( + (PartitionData) PartitionFactory.newPartitionStateSummaryData(partition2, 1, 0, Errors.NONE.code(), + null + )); + + assertEquals(2, result.topicsData().size()); + assertEquals(expectedResultMap, resultMap); + } + @Test public void testWriteStateResponseToResultPartialResults() { Map>> futureMap = new HashMap<>(); @@ -790,6 +1003,114 @@ public void testReadStateResponseToResultFailedFuture() { ); } + @Test + public void testReadStateSummaryResponseToResultPartialResults() { + Map>> futureMap = new HashMap<>(); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), 1, null); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), 1, null); + + // one entry has valid results + futureMap.computeIfAbsent(tp1.topicId(), k -> new HashMap<>()) + .put(tp1.partition(), 
CompletableFuture.completedFuture( + new ReadShareGroupStateSummaryResponse( + ReadShareGroupStateSummaryResponse.toResponseData( + tp1.topicId(), + tp1.partition(), + 1L, + 2 + ) + ) + ) + ); + + // one entry has error + futureMap.computeIfAbsent(tp2.topicId(), k -> new HashMap<>()) + .put(tp2.partition(), CompletableFuture.completedFuture( + new ReadShareGroupStateSummaryResponse( + ReadShareGroupStateSummaryResponse.toErrorResponseData( + tp2.topicId(), + tp2.partition(), + Errors.UNKNOWN_TOPIC_OR_PARTITION, + "unknown tp" + ) + ) + ) + ); + + PersisterStateManager psm = mock(PersisterStateManager.class); + DefaultStatePersister dsp = new DefaultStatePersister(psm); + + ReadShareGroupStateSummaryResult results = dsp.readSummaryResponsesToResult(futureMap); + + // results should contain partial results + assertEquals(2, results.topicsData().size()); + assertTrue( + results.topicsData().contains( + new TopicData<>( + tp1.topicId(), + List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, Errors.NONE.code(), null)) + ) + ) + ); + assertTrue( + results.topicsData().contains( + new TopicData<>( + tp2.topicId(), + List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp")) + ) + ) + ); + } + + @Test + public void testReadStateSummaryResponseToResultFailedFuture() { + Map>> futureMap = new HashMap<>(); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), 1, null); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), 1, null); + + // one entry has valid results + futureMap.computeIfAbsent(tp1.topicId(), k -> new HashMap<>()) + .put(tp1.partition(), CompletableFuture.completedFuture( + new ReadShareGroupStateSummaryResponse( + ReadShareGroupStateSummaryResponse.toResponseData( + tp1.topicId(), + tp1.partition(), + 1L, + 2 + ) + ) + ) + ); + + // one entry has failed future + futureMap.computeIfAbsent(tp2.topicId(), k -> new HashMap<>()) + .put(tp2.partition(), CompletableFuture.failedFuture(new Exception("scary stuff"))); + + PersisterStateManager psm = mock(PersisterStateManager.class); + DefaultStatePersister dsp = new DefaultStatePersister(psm); + + ReadShareGroupStateSummaryResult results = dsp.readSummaryResponsesToResult(futureMap); + + // results should contain partial results + assertEquals(2, results.topicsData().size()); + assertTrue( + results.topicsData().contains( + new TopicData<>( + tp1.topicId(), + List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, Errors.NONE.code(), null)) + ) + ) + ); + assertTrue( + results.topicsData().contains( + new TopicData<>( + tp2.topicId(), + List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), -1, -1L, Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share coordinator: java.lang.Exception: scary stuff")) + ) + ) + ); + } + @Test public void testDefaultPersisterClose() { PersisterStateManager psm = mock(PersisterStateManager.class); @@ -798,7 +1119,7 @@ public void testDefaultPersisterClose() { verify(psm, times(0)).stop(); dsp.stop(); - + verify(psm, times(1)).stop(); } catch (Exception e) { fail("Unexpected exception", e); diff --git a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java index e410c2e3588a8..afa9b07aa177f 100644 --- 
a/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.FindCoordinatorResponseData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; +import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData; import org.apache.kafka.common.message.WriteShareGroupStateResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractRequest; @@ -31,6 +32,8 @@ import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.ReadShareGroupStateRequest; import org.apache.kafka.common.requests.ReadShareGroupStateResponse; +import org.apache.kafka.common.requests.ReadShareGroupStateSummaryRequest; +import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse; import org.apache.kafka.common.requests.WriteShareGroupStateRequest; import org.apache.kafka.common.requests.WriteShareGroupStateResponse; import org.apache.kafka.common.utils.Time; @@ -2070,6 +2073,776 @@ public void testReadStateRequestFailureMaxRetriesExhausted() { } } + @Test + public void testReadStateSummaryRequestCoordinatorFoundSuccessfully() { + MockClient client = new MockClient(MOCK_TIME); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 10; + + Node suppliedNode = new Node(0, HOST, PORT); + Node coordinatorNode = new Node(1, HOST, PORT); + + String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, topicId, partition); + + client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest + && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() + && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey), + new FindCoordinatorResponse( + new FindCoordinatorResponseData() + .setCoordinators(List.of( + new FindCoordinatorResponseData.Coordinator() + .setNodeId(1) + .setHost(HOST) + .setPort(PORT) + .setErrorCode(Errors.NONE.code()) + )) + ), + suppliedNode + ); + + client.prepareResponseFrom(body -> { + ReadShareGroupStateSummaryRequest request = (ReadShareGroupStateSummaryRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition; + }, new ReadShareGroupStateSummaryResponse( + new ReadShareGroupStateSummaryResponseData() + .setResults(List.of( + new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId) + .setPartitions(List.of( + new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage("") + .setStateEpoch(1) + .setStartOffset(0) + )) + )) + ), coordinatorNode); + + ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode); + + PersisterStateManager stateManager = PersisterStateManagerBuilder.builder() + .withKafkaClient(client) + .withTimer(mockTimer) + .withCacheHelper(cacheHelper) + .build(); + + stateManager.start(); + + CompletableFuture future = new CompletableFuture<>(); + + PersisterStateManager.ReadStateSummaryHandler 
handler = spy(stateManager.new ReadStateSummaryHandler( + groupId, + topicId, + partition, + 0, + future, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_RPC_RETRY_ATTEMPTS, + null + )); + + stateManager.enqueue(handler); + + CompletableFuture resultFuture = handler.result(); + + ReadShareGroupStateSummaryResponse result = null; + try { + result = resultFuture.get(); + } catch (Exception e) { + fail("Failed to get result from future", e); + } + + ReadShareGroupStateSummaryResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0); + + verify(handler, times(1)).findShareCoordinatorBuilder(); + verify(handler, times(0)).requestBuilder(); + + // Verifying the coordinator node was populated correctly by the FIND_COORDINATOR request + assertEquals(coordinatorNode, handler.getCoordinatorNode()); + + // Verifying the result returned is correct + assertEquals(partition, partitionResult.partition()); + assertEquals(Errors.NONE.code(), partitionResult.errorCode()); + assertEquals(1, partitionResult.stateEpoch()); + assertEquals(0, partitionResult.startOffset()); + + try { + // Stopping the state manager + stateManager.stop(); + } catch (Exception e) { + fail("Failed to stop state manager", e); + } + } + + @Test + public void testReadStateSummaryRequestIllegalStateCoordinatorFoundSuccessfully() { + MockClient client = new MockClient(MOCK_TIME); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 10; + + Node suppliedNode = new Node(0, HOST, PORT); + Node coordinatorNode = new Node(1, HOST, PORT); + + String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, topicId, partition); + + client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest + && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() + && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey), + new FindCoordinatorResponse( + new FindCoordinatorResponseData() + .setCoordinators(List.of( + new FindCoordinatorResponseData.Coordinator() + .setNodeId(1) + .setHost(HOST) + .setPort(PORT) + .setErrorCode(Errors.NONE.code()) + )) + ), + suppliedNode + ); + + client.prepareResponseFrom(body -> { + ReadShareGroupStateSummaryRequest request = (ReadShareGroupStateSummaryRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition; + }, new ReadShareGroupStateSummaryResponse( + new ReadShareGroupStateSummaryResponseData() + .setResults(List.of( + new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(Uuid.randomUuid()) + .setPartitions(List.of( + new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(500) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage("") + .setStateEpoch(1) + .setStartOffset(0) + )) + )) + ), coordinatorNode); + + ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode); + + PersisterStateManager stateManager = PersisterStateManagerBuilder.builder() + .withKafkaClient(client) + .withTimer(mockTimer) + .withCacheHelper(cacheHelper) + .build(); + + stateManager.start(); + + CompletableFuture future = new CompletableFuture<>(); + + PersisterStateManager.ReadStateSummaryHandler handler = spy(stateManager.new
ReadStateSummaryHandler( + groupId, + topicId, + partition, + 0, + future, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_RPC_RETRY_ATTEMPTS, + null + )); + + stateManager.enqueue(handler); + + CompletableFuture resultFuture = handler.result(); + + ReadShareGroupStateSummaryResponse result = null; + try { + result = resultFuture.get(); + } catch (Exception e) { + fail("Failed to get result from future", e); + } + + ReadShareGroupStateSummaryResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0); + + verify(handler, times(1)).findShareCoordinatorBuilder(); + verify(handler, times(0)).requestBuilder(); + + // Verifying the coordinator node was populated correctly by the FIND_COORDINATOR request + assertEquals(coordinatorNode, handler.getCoordinatorNode()); + + // Verifying the result returned is correct + assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(), partitionResult.errorCode()); + + try { + // Stopping the state manager + stateManager.stop(); + } catch (Exception e) { + fail("Failed to stop state manager", e); + } + } + + @Test + public void testReadStateSummaryRequestRetryWithNotCoordinatorSuccessfulOnRetry() throws ExecutionException, InterruptedException { + MockClient client = new MockClient(MOCK_TIME); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 10; + + Node suppliedNode = new Node(0, HOST, PORT); + Node coordinatorNode = new Node(1, HOST, PORT); + + String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, topicId, partition); + + client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest + && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() + && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey), + new FindCoordinatorResponse( + new FindCoordinatorResponseData() + .setCoordinators(List.of( + new FindCoordinatorResponseData.Coordinator() + .setErrorCode(Errors.NOT_COORDINATOR.code()) + )) + ), + suppliedNode + ); + + client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest + && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() + && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey), + new FindCoordinatorResponse( + new FindCoordinatorResponseData() + .setCoordinators(List.of( + new FindCoordinatorResponseData.Coordinator() + .setNodeId(1) + .setHost(HOST) + .setPort(PORT) + .setErrorCode(Errors.NONE.code()) + )) + ), + suppliedNode + ); + + client.prepareResponseFrom(body -> { + ReadShareGroupStateSummaryRequest request = (ReadShareGroupStateSummaryRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition; + }, new ReadShareGroupStateSummaryResponse( + new ReadShareGroupStateSummaryResponseData() + .setResults(List.of( + new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId) + .setPartitions(List.of( + new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage("") + .setStateEpoch(1) + .setStartOffset(0) + )) + )) + ), coordinatorNode); + + ShareCoordinatorMetadataCacheHelper cacheHelper =
getDefaultCacheHelper(suppliedNode); + + PersisterStateManager stateManager = PersisterStateManagerBuilder.builder() + .withKafkaClient(client) + .withTimer(mockTimer) + .withCacheHelper(cacheHelper) + .build(); + + stateManager.start(); + + CompletableFuture future = new CompletableFuture<>(); + + PersisterStateManager.ReadStateSummaryHandler handler = spy(stateManager.new ReadStateSummaryHandler( + groupId, + topicId, + partition, + 0, + future, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_RPC_RETRY_ATTEMPTS, + null + )); + + stateManager.enqueue(handler); + + CompletableFuture resultFuture = handler.result(); + + TestUtils.waitForCondition(resultFuture::isDone, TestUtils.DEFAULT_MAX_WAIT_MS, 10L, () -> "Failed to get result from future"); + + ReadShareGroupStateSummaryResponse result = resultFuture.get(); + ReadShareGroupStateSummaryResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0); + + verify(handler, times(2)).findShareCoordinatorBuilder(); + verify(handler, times(0)).requestBuilder(); + + // Verifying the coordinator node was populated correctly by the FIND_COORDINATOR request + assertEquals(coordinatorNode, handler.getCoordinatorNode()); + + // Verifying the result returned is correct + assertEquals(partition, partitionResult.partition()); + assertEquals(Errors.NONE.code(), partitionResult.errorCode()); + + try { + // Stopping the state manager + stateManager.stop(); + } catch (Exception e) { + fail("Failed to stop state manager", e); + } + } + + @Test + public void testReadStateSummaryRequestCoordinatorFoundOnRetry() { + MockClient client = new MockClient(MOCK_TIME); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 10; + + Node suppliedNode = new Node(0, HOST, PORT); + Node coordinatorNode = new Node(1, HOST, PORT); + + String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId, topicId, partition); + + client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest + && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() + && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey), + new FindCoordinatorResponse( + new FindCoordinatorResponseData() + .setCoordinators(List.of( + new FindCoordinatorResponseData.Coordinator() + .setErrorCode(Errors.NOT_COORDINATOR.code()) + )) + ), + suppliedNode + ); + + client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest + && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() + && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey), + new FindCoordinatorResponse( + new FindCoordinatorResponseData() + .setCoordinators(List.of( + new FindCoordinatorResponseData.Coordinator() + .setNodeId(1) + .setHost(HOST) + .setPort(PORT) + .setErrorCode(Errors.NONE.code()) + )) + ), + suppliedNode + ); + + client.prepareResponseFrom(body -> { + ReadShareGroupStateSummaryRequest request = (ReadShareGroupStateSummaryRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition; + }, new ReadShareGroupStateSummaryResponse( + new ReadShareGroupStateSummaryResponseData() + .setResults(List.of( + new
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId) + .setPartitions(List.of( + new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage("") + .setStateEpoch(1) + .setStartOffset(0) + )) + )) + ), coordinatorNode); + + ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode); + + PersisterStateManager stateManager = PersisterStateManagerBuilder.builder() + .withKafkaClient(client) + .withTimer(mockTimer) + .withCacheHelper(cacheHelper) + .build(); + + stateManager.start(); + + CompletableFuture future = new CompletableFuture<>(); + + PersisterStateManager.ReadStateSummaryHandler handler = spy(stateManager.new ReadStateSummaryHandler( + groupId, + topicId, + partition, + 0, + future, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_RPC_RETRY_ATTEMPTS, + null + )); + + stateManager.enqueue(handler); + + CompletableFuture resultFuture = handler.result(); + + ReadShareGroupStateSummaryResponse result = null; + try { + result = resultFuture.get(); + } catch (Exception e) { + fail("Failed to get result from future", e); + } + + ReadShareGroupStateSummaryResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0); + + verify(handler, times(2)).findShareCoordinatorBuilder(); + verify(handler, times(0)).requestBuilder(); + + // Verifying the coordinator node was populated correctly by the FIND_COORDINATOR request + assertEquals(coordinatorNode, handler.getCoordinatorNode()); + + // Verifying the result returned is correct + assertEquals(partition, partitionResult.partition()); + assertEquals(Errors.NONE.code(), partitionResult.errorCode()); + assertEquals(1, partitionResult.stateEpoch()); + assertEquals(0, partitionResult.startOffset()); + + try { + // Stopping the state manager + stateManager.stop(); + } catch (Exception e) { + fail("Failed to stop state manager", e); + } + } + + @Test + public void testReadStateSummaryRequestWithCoordinatorNodeLookup() { + MockClient client = new MockClient(MOCK_TIME); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 10; + + Node coordinatorNode = new Node(1, HOST, PORT); + + client.prepareResponseFrom(body -> { + ReadShareGroupStateSummaryRequest request = (ReadShareGroupStateSummaryRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition; + }, new ReadShareGroupStateSummaryResponse( + new ReadShareGroupStateSummaryResponseData() + .setResults(List.of( + new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId) + .setPartitions(List.of( + new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage("") + .setStateEpoch(1) + .setStartOffset(0) + )) + )) + ), coordinatorNode); + + ShareCoordinatorMetadataCacheHelper cacheHelper = getCoordinatorCacheHelper(coordinatorNode); + + PersisterStateManager stateManager = PersisterStateManagerBuilder.builder() + .withKafkaClient(client) + .withTimer(mockTimer) + .withCacheHelper(cacheHelper) + .build(); + + stateManager.start(); + + CompletableFuture future = new CompletableFuture<>(); + +
PersisterStateManager.ReadStateSummaryHandler handler = spy(stateManager.new ReadStateSummaryHandler( + groupId, + topicId, + partition, + 0, + future, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_RPC_RETRY_ATTEMPTS, + null + )); + + stateManager.enqueue(handler); + + CompletableFuture resultFuture = handler.result(); + + ReadShareGroupStateSummaryResponse result = null; + try { + result = resultFuture.get(); + } catch (Exception e) { + fail("Failed to get result from future", e); + } + + ReadShareGroupStateSummaryResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0); + + verify(handler, times(0)).findShareCoordinatorBuilder(); + verify(handler, times(0)).requestBuilder(); + verify(handler, times(1)).onComplete(any()); + + // Verifying the coordinator node was populated correctly by the constructor + assertEquals(coordinatorNode, handler.getCoordinatorNode()); + + // Verifying the result returned is correct + assertEquals(partition, partitionResult.partition()); + assertEquals(Errors.NONE.code(), partitionResult.errorCode()); + assertEquals(1, partitionResult.stateEpoch()); + assertEquals(0, partitionResult.startOffset()); + + try { + // Stopping the state manager + stateManager.stop(); + } catch (Exception e) { + fail("Failed to stop state manager", e); + } + } + + @Test + public void testReadStateSummaryRequestRetryWithCoordinatorNodeLookup() { + MockClient client = new MockClient(MOCK_TIME); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 10; + + Node coordinatorNode = new Node(1, HOST, PORT); + + client.prepareResponseFrom(body -> { + ReadShareGroupStateSummaryRequest request = (ReadShareGroupStateSummaryRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition; + }, new ReadShareGroupStateSummaryResponse( + new ReadShareGroupStateSummaryResponseData() + .setResults(List.of( + new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId) + .setPartitions(List.of( + new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + .setErrorMessage("") + .setStateEpoch(1) + .setStartOffset(0) + )) + )) + ), coordinatorNode); + + client.prepareResponseFrom(body -> { + ReadShareGroupStateSummaryRequest request = (ReadShareGroupStateSummaryRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition; + }, new ReadShareGroupStateSummaryResponse( + new ReadShareGroupStateSummaryResponseData() + .setResults(List.of( + new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId) + .setPartitions(List.of( + new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage("") + .setStateEpoch(1) + .setStartOffset(0) + )) + )) + ), coordinatorNode); + + ShareCoordinatorMetadataCacheHelper cacheHelper = getCoordinatorCacheHelper(coordinatorNode); + + PersisterStateManager
stateManager = PersisterStateManagerBuilder.builder() + .withKafkaClient(client) + .withTimer(mockTimer) + .withCacheHelper(cacheHelper) + .build(); + + stateManager.start(); + + CompletableFuture future = new CompletableFuture<>(); + + PersisterStateManager.ReadStateSummaryHandler handler = spy(stateManager.new ReadStateSummaryHandler( + groupId, + topicId, + partition, + 0, + future, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_RPC_RETRY_ATTEMPTS, + null + )); + + stateManager.enqueue(handler); + + CompletableFuture resultFuture = handler.result(); + + ReadShareGroupStateSummaryResponse result = null; + try { + result = resultFuture.get(); + } catch (Exception e) { + fail("Failed to get result from future", e); + } + + ReadShareGroupStateSummaryResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0); + + verify(handler, times(0)).findShareCoordinatorBuilder(); + verify(handler, times(0)).requestBuilder(); + verify(handler, times(2)).onComplete(any()); + + // Verifying the coordinator node was populated correctly by the constructor + assertEquals(coordinatorNode, handler.getCoordinatorNode()); + + // Verifying the result returned is correct + assertEquals(partition, partitionResult.partition()); + assertEquals(Errors.NONE.code(), partitionResult.errorCode()); + assertEquals(1, partitionResult.stateEpoch()); + assertEquals(0, partitionResult.startOffset()); + + try { + // Stopping the state manager + stateManager.stop(); + } catch (Exception e) { + fail("Failed to stop state manager", e); + } + } + + @Test + public void testReadStateSummaryRequestFailureMaxRetriesExhausted() { + MockClient client = new MockClient(MOCK_TIME); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 10; + + Node coordinatorNode = new Node(1, HOST, PORT); + + client.prepareResponseFrom(body -> { + ReadShareGroupStateSummaryRequest request = (ReadShareGroupStateSummaryRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition; + }, new ReadShareGroupStateSummaryResponse( + new ReadShareGroupStateSummaryResponseData() + .setResults(List.of( + new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId) + .setPartitions(List.of( + new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + .setErrorMessage("") + .setStateEpoch(1) + .setStartOffset(0) + )) + )) + ), coordinatorNode); + + client.prepareResponseFrom(body -> { + ReadShareGroupStateSummaryRequest request = (ReadShareGroupStateSummaryRequest) body; + String requestGroupId = request.data().groupId(); + Uuid requestTopicId = request.data().topics().get(0).topicId(); + int requestPartition = request.data().topics().get(0).partitions().get(0).partition(); + + return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition; + }, new ReadShareGroupStateSummaryResponse( + new ReadShareGroupStateSummaryResponseData() + .setResults(List.of( + new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId) + .setPartitions(List.of( + new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partition) +
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) + .setErrorMessage("") + .setStateEpoch(1) + .setStartOffset(0) + )) + )) + ), coordinatorNode); + + ShareCoordinatorMetadataCacheHelper cacheHelper = getCoordinatorCacheHelper(coordinatorNode); + + PersisterStateManager stateManager = PersisterStateManagerBuilder.builder() + .withKafkaClient(client) + .withTimer(mockTimer) + .withCacheHelper(cacheHelper) + .build(); + + stateManager.start(); + + CompletableFuture future = new CompletableFuture<>(); + + PersisterStateManager.ReadStateSummaryHandler handler = spy(stateManager.new ReadStateSummaryHandler( + groupId, + topicId, + partition, + 0, + future, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + 2, + null + )); + + stateManager.enqueue(handler); + + CompletableFuture resultFuture = handler.result(); + + ReadShareGroupStateSummaryResponse result = null; + try { + result = resultFuture.get(); + } catch (Exception e) { + fail("Failed to get result from future", e); + } + + ReadShareGroupStateSummaryResponseData.PartitionResult partitionResult = result.data().results().get(0).partitions().get(0); + + verify(handler, times(0)).findShareCoordinatorBuilder(); + verify(handler, times(0)).requestBuilder(); + verify(handler, times(2)).onComplete(any()); + + // Verifying the coordinator node was populated correctly by the constructor + assertEquals(coordinatorNode, handler.getCoordinatorNode()); + + // Verifying the result returned is correct + assertEquals(partition, partitionResult.partition()); + assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), partitionResult.errorCode()); + + try { + // Stopping the state manager + stateManager.stop(); + } catch (Exception e) { + fail("Failed to stop state manager", e); + } + } + @Test public void testPersisterStateManagerClose() { KafkaClient client = mock(KafkaClient.class); diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java index bbbac426fe99a..72427ac870559 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java @@ -19,6 +19,8 @@ import org.apache.kafka.common.message.ReadShareGroupStateRequestData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; +import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData; +import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData; import org.apache.kafka.common.message.WriteShareGroupStateRequestData; import org.apache.kafka.common.message.WriteShareGroupStateResponseData; import org.apache.kafka.common.requests.RequestContext; @@ -76,6 +78,14 @@ public interface ShareCoordinator { */ CompletableFuture readState(RequestContext context, ReadShareGroupStateRequestData request); + /** + * Handle read share state summary call + * @param context - represents the incoming read summary request context + * @param request - actual RPC request object + * @return completable future comprising ReadShareGroupStateSummaryResponseData + */ + CompletableFuture readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData request); + /** * Called when new coordinator is elected * @param partitionIndex - The partition index (internal topic) diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java index b3f8b0c4fba9e..a006edd7e6479 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java @@ -24,10 +24,13 @@ import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.ReadShareGroupStateRequestData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; +import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData; +import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData; import org.apache.kafka.common.message.WriteShareGroupStateRequestData; import org.apache.kafka.common.message.WriteShareGroupStateResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ReadShareGroupStateResponse; +import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse; import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.requests.WriteShareGroupStateResponse; import org.apache.kafka.common.utils.LogContext; @@ -577,6 +580,116 @@ public CompletableFuture readState(RequestConte }); } + @Override + public CompletableFuture readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData request) { + // Return an error response if the coordinator is not active. + if (!isActive.get()) { + return CompletableFuture.completedFuture( + generateErrorReadStateSummaryResponse( + request, + Errors.COORDINATOR_NOT_AVAILABLE, + "Share coordinator is not available." + ) + ); + } + + String groupId = request.groupId(); + // Send an empty response if groupId is invalid. + if (isGroupIdEmpty(groupId)) { + log.error("Group id must be specified and non-empty: {}", request); + return CompletableFuture.completedFuture( + new ReadShareGroupStateSummaryResponseData() + ); + } + + // Send an empty response if topic data is empty. + if (isEmpty(request.topics())) { + log.error("Topic Data is empty: {}", request); + return CompletableFuture.completedFuture( + new ReadShareGroupStateSummaryResponseData() + ); + } + + // Send an empty response if partition data is empty for any topic. + for (ReadShareGroupStateSummaryRequestData.ReadStateSummaryData topicData : request.topics()) { + if (isEmpty(topicData.partitions())) { + log.error("Partition Data for topic {} is empty: {}", topicData.topicId(), request); + return CompletableFuture.completedFuture( + new ReadShareGroupStateSummaryResponseData() + ); + } + } + + // A map to store the futures for each topicId and partition. + Map>> futureMap = new HashMap<>(); + + // The request received here could have multiple keys of structure group:topic:partition. However, + // the readStateSummary method in ShareCoordinatorShard expects a single key in the request. Hence, we will + // be looping over the keys below and constructing new ReadShareGroupStateSummaryRequestData objects to pass + // onto the shard method.
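+ // For example, a single incoming request of the shape
+ //   {groupId: group1, topics: [{topicId: topic1, partitions: [0, 1]}, {topicId: topic2, partitions: [7]}]}
+ // fans out into three single-key requests, one per group:topic:partition key:
+ //   {groupId: group1, topics: [{topicId: topic1, partitions: [0]}]}
+ //   {groupId: group1, topics: [{topicId: topic1, partitions: [1]}]}
+ //   {groupId: group1, topics: [{topicId: topic2, partitions: [7]}]}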
+ + for (ReadShareGroupStateSummaryRequestData.ReadStateSummaryData topicData : request.topics()) { + Uuid topicId = topicData.topicId(); + for (ReadShareGroupStateSummaryRequestData.PartitionData partitionData : topicData.partitions()) { + SharePartitionKey coordinatorKey = SharePartitionKey.getInstance(request.groupId(), topicId, partitionData.partition()); + + ReadShareGroupStateSummaryRequestData requestForCurrentPartition = new ReadShareGroupStateSummaryRequestData() + .setGroupId(groupId) + .setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(topicId) + .setPartitions(List.of(partitionData)))); + + CompletableFuture readFuture = runtime.scheduleWriteOperation( + "read-share-group-state-summary", + topicPartitionFor(coordinatorKey), + Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()), + coordinator -> coordinator.readStateSummary(requestForCurrentPartition) + ).exceptionally(readException -> + handleOperationException( + "read-share-group-state-summary", + request, + readException, + (error, message) -> ReadShareGroupStateSummaryResponse.toErrorResponseData( + topicData.topicId(), + partitionData.partition(), + error, + "Unable to read share group state summary: " + readException.getMessage() + ), + log + )); + + futureMap.computeIfAbsent(topicId, k -> new HashMap<>()) + .put(partitionData.partition(), readFuture); + } + } + + // Combine all futures into a single CompletableFuture. + CompletableFuture combinedFuture = CompletableFuture.allOf(futureMap.values().stream() + .flatMap(map -> map.values().stream()).toArray(CompletableFuture[]::new)); + + // Transform the combined CompletableFuture<Void> into a CompletableFuture of ReadShareGroupStateSummaryResponseData. + return combinedFuture.thenApply(v -> { + List readStateSummaryResult = new ArrayList<>(futureMap.size()); + futureMap.forEach( + (topicId, topicEntry) -> { + List partitionResults = new ArrayList<>(topicEntry.size()); + topicEntry.forEach( + (partitionId, responseFuture) -> { + // responseFuture would already be completed by now since we have used + // CompletableFuture::allOf to create a combined future from the future map.
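+ // Every future in the map has completed by this point, and none of them
+ // exceptionally: failures were already mapped to error response data by the
+ // exceptionally() stage above, so getNow(null) below cannot throw or
+ // observe a pending future.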
+ partitionResults.add( + responseFuture.getNow(null).results().get(0).partitions().get(0) + ); + } + ); + readStateSummaryResult.add(ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult(topicId, partitionResults)); + } + ); + return new ReadShareGroupStateSummaryResponseData() + .setResults(readStateSummaryResult); + }); + } + private ReadShareGroupStateResponseData generateErrorReadStateResponse( ReadShareGroupStateRequestData request, Errors error, @@ -594,6 +707,23 @@ private ReadShareGroupStateResponseData generateErrorReadStateResponse( }).collect(Collectors.toList())); } + private ReadShareGroupStateSummaryResponseData generateErrorReadStateSummaryResponse( + ReadShareGroupStateSummaryRequestData request, + Errors error, + String errorMessage + ) { + return new ReadShareGroupStateSummaryResponseData().setResults(request.topics().stream() + .map(topicData -> { + ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult resultData = new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult(); + resultData.setTopicId(topicData.topicId()); + resultData.setPartitions(topicData.partitions().stream() + .map(partitionData -> ReadShareGroupStateSummaryResponse.toErrorResponsePartitionResult( + partitionData.partition(), error, errorMessage + )).collect(Collectors.toList())); + return resultData; + }).collect(Collectors.toList())); + } + private WriteShareGroupStateResponseData generateErrorWriteStateResponse( WriteShareGroupStateRequestData request, Errors error, diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java index 51031faa0a328..bf674c82e1c8a 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java @@ -22,11 +22,14 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.ReadShareGroupStateRequestData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; +import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData; +import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData; import org.apache.kafka.common.message.WriteShareGroupStateRequestData; import org.apache.kafka.common.message.WriteShareGroupStateResponseData; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ReadShareGroupStateResponse; +import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse; import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.requests.WriteShareGroupStateResponse; import org.apache.kafka.common.utils.LogContext; @@ -321,6 +324,7 @@ public CoordinatorResult wr * It can happen that a read state call for a share partition has a higher leaderEpoch * value than seen so far. * In case an update is not required, empty record list will be generated along with a success response. 
+ * * @param request - represents ReadShareGroupStateRequestData * @return CoordinatorResult object */ @@ -398,9 +402,70 @@ public CoordinatorResult rea return new CoordinatorResult<>(Collections.singletonList(record), responseData); } + /** + * This method finds the ShareSnapshotValue record corresponding to the requested topic partition from the + * in-memory state of the coordinator shard, the shareStateMap. + *

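+ * For a share partition key with no persisted state, the summary carries the
+ * uninitialized defaults (PartitionFactory.UNINITIALIZED_START_OFFSET and
+ * PartitionFactory.DEFAULT_STATE_EPOCH) rather than an error.
+ *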
+ * This method, as called by the ShareCoordinatorService, is provided with + * request data covering only a single key, i.e. group1:topic1:partition1. The implementation + * below was written with this in mind. + * + * @param request - ReadShareGroupStateSummaryRequestData for a single key + * @return CoordinatorResult(records, response) + */ + + public CoordinatorResult readStateSummary( + ReadShareGroupStateSummaryRequestData request + ) { + // Only one key will be there in the request by design. + Optional error = maybeGetReadStateSummaryError(request); + if (error.isPresent()) { + return new CoordinatorResult<>(List.of(), error.get()); + } + + ReadShareGroupStateSummaryRequestData.ReadStateSummaryData topicData = request.topics().get(0); + ReadShareGroupStateSummaryRequestData.PartitionData partitionData = topicData.partitions().get(0); + + Uuid topicId = topicData.topicId(); + int partitionId = partitionData.partition(); + SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicId, partitionId); + + ReadShareGroupStateSummaryResponseData responseData = null; + + if (!shareStateMap.containsKey(key)) { + responseData = ReadShareGroupStateSummaryResponse.toResponseData( + topicId, + partitionId, + PartitionFactory.UNINITIALIZED_START_OFFSET, + PartitionFactory.DEFAULT_STATE_EPOCH + ); + } else { + ShareGroupOffset offsetValue = shareStateMap.get(key); + if (offsetValue == null) { + log.error("Data not found for topic {}, partition {} for group {}, in the in-memory state of share coordinator", topicId, partitionId, request.groupId()); + responseData = ReadShareGroupStateSummaryResponse.toErrorResponseData( + topicId, + partitionId, + Errors.UNKNOWN_SERVER_ERROR, + "Data not found for the topic " + topicId + ", partition " + partitionId + " for group " + request.groupId() + ", in the in-memory state of share coordinator" + ); + } else { + responseData = ReadShareGroupStateSummaryResponse.toResponseData( + topicId, + partitionId, + offsetValue.startOffset(), + offsetValue.stateEpoch() + ); + } + } + + return new CoordinatorResult<>(Collections.emptyList(), responseData); + } + /** * Method which returns the last known redundant offset from the partition * led by this shard. + * * @return CoordinatorResult containing empty record list and an Optional representing the offset. */ public CoordinatorResult, CoordinatorRecord> lastRedundantOffset() { @@ -418,7 +483,7 @@ public CoordinatorResult, CoordinatorRecord> lastRedundantOffset( * else create a new ShareUpdate record * * @param partitionData - Represents the data which should be written into the share state record. - * @param key - The {@link SharePartitionKey} object. + * @param key - The {@link SharePartitionKey} object.
* @return {@link CoordinatorRecord} representing ShareSnapshot or ShareUpdate */ private CoordinatorRecord generateShareStateRecord( @@ -572,6 +637,39 @@ private Optional maybeGetReadStateError(ReadSha return Optional.empty(); } + private Optional maybeGetReadStateSummaryError(ReadShareGroupStateSummaryRequestData request) { + ReadShareGroupStateSummaryRequestData.ReadStateSummaryData topicData = request.topics().get(0); + ReadShareGroupStateSummaryRequestData.PartitionData partitionData = topicData.partitions().get(0); + + Uuid topicId = topicData.topicId(); + int partitionId = partitionData.partition(); + + if (topicId == null) { + log.error("Request topic id is null."); + return Optional.of(ReadShareGroupStateSummaryResponse.toErrorResponseData( + null, partitionId, Errors.INVALID_REQUEST, NULL_TOPIC_ID.getMessage())); + } + + if (partitionId < 0) { + log.error("Request partition id is negative."); + return Optional.of(ReadShareGroupStateSummaryResponse.toErrorResponseData( + topicId, partitionId, Errors.INVALID_REQUEST, NEGATIVE_PARTITION_ID.getMessage())); + } + + if (metadataImage == null) { + log.error("Metadata image is null"); + return Optional.of(ReadShareGroupStateSummaryResponse.toErrorResponseData(topicId, partitionId, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message())); + } + + if (metadataImage.topics().getTopic(topicId) == null || + metadataImage.topics().getPartition(topicId, partitionId) == null) { + log.error("Topic/TopicPartition not found in metadata image."); + return Optional.of(ReadShareGroupStateSummaryResponse.toErrorResponseData(topicId, partitionId, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message())); + } + + return Optional.empty(); + } + private CoordinatorResult getWriteErrorResponse( Errors error, Exception exception, diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java index b3689a457144c..e7ace5fd2b923 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java @@ -25,6 +25,8 @@ import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.ReadShareGroupStateRequestData; import org.apache.kafka.common.message.ReadShareGroupStateResponseData; +import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData; +import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData; import org.apache.kafka.common.message.WriteShareGroupStateRequestData; import org.apache.kafka.common.message.WriteShareGroupStateResponseData; import org.apache.kafka.common.metrics.Metrics; @@ -74,7 +76,7 @@ class ShareCoordinatorServiceTest { @SuppressWarnings("unchecked") private CoordinatorRuntime mockRuntime() { - CoordinatorRuntime runtime = mock(CoordinatorRuntime.class); + CoordinatorRuntime runtime = mock(CoordinatorRuntime.class); when(runtime.activeTopicPartitions()) .thenReturn(Collections.singletonList(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0))); return runtime; @@ -290,7 +292,7 @@ public void testReadStateSuccess() throws ExecutionException, InterruptedExcepti .setDeliveryState((byte) 0) ))) ); - + when(runtime.scheduleWriteOperation( eq("read-update-leader-epoch-state"), eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)), 
@@ -315,6 +317,90 @@ public void testReadStateSuccess() throws ExecutionException, InterruptedExcepti assertEquals(expectedResult, result); } + @Test + public void testReadStateSummarySuccess() throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime runtime = mockRuntime(); + ShareCoordinatorService service = new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()), + runtime, + new ShareCoordinatorMetrics(), + Time.SYSTEM, + mock(Timer.class), + mock(PartitionWriter.class) + ); + + service.startup(() -> 1); + + String groupId = "group1"; + Uuid topicId1 = Uuid.randomUuid(); + int partition1 = 0; + + Uuid topicId2 = Uuid.randomUuid(); + int partition2 = 1; + + ReadShareGroupStateSummaryRequestData request = new ReadShareGroupStateSummaryRequestData() + .setGroupId(groupId) + .setTopics(Arrays.asList( + new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(topicId1) + .setPartitions(Collections.singletonList( + new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartition(partition1) + .setLeaderEpoch(1) + )), + new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(topicId2) + .setPartitions(Collections.singletonList( + new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartition(partition2) + .setLeaderEpoch(1) + )) + ) + ); + + ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult topicData1 = new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId1) + .setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partition1) + .setErrorCode(Errors.NONE.code()) + .setStateEpoch(1) + .setStartOffset(0) + )); + + ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult topicData2 = new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId2) + .setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partition2) + .setErrorCode(Errors.NONE.code()) + .setStateEpoch(1) + .setStartOffset(0) + )); + + when(runtime.scheduleWriteOperation( + eq("read-share-group-state-summary"), + eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)), + any(), + any() + )) + .thenReturn(CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResponseData() + .setResults(Collections.singletonList(topicData1)))) + .thenReturn(CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResponseData() + .setResults(Collections.singletonList(topicData2)))); + + CompletableFuture future = service.readStateSummary( + requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY), + request + ); + + HashSet result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results()); + + HashSet expectedResult = new HashSet<>(Arrays.asList( + topicData1, + topicData2)); + assertEquals(expectedResult, result); + } + @Test public void testWriteStateValidationsError() throws ExecutionException, InterruptedException, TimeoutException { CoordinatorRuntime runtime = mockRuntime(); @@ -409,6 +495,53 @@ public void testReadStateValidationsError() throws ExecutionException, Interrupt ); } + @Test + public void testReadStateSummaryValidationsError() throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime runtime = mockRuntime(); + ShareCoordinatorService service = new ShareCoordinatorService( + new LogContext(), + 
ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()), + runtime, + new ShareCoordinatorMetrics(), + Time.SYSTEM, + mock(Timer.class), + mock(PartitionWriter.class) + ); + + service.startup(() -> 1); + + String groupId = "group1"; + Uuid topicId = Uuid.randomUuid(); + int partition = 0; + + // 1. Empty topicsData + assertEquals(new ReadShareGroupStateSummaryResponseData(), + service.readStateSummary( + requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY), + new ReadShareGroupStateSummaryRequestData().setGroupId(groupId) + ).get(5, TimeUnit.SECONDS) + ); + + // 2. Empty partitionsData + assertEquals(new ReadShareGroupStateSummaryResponseData(), + service.readStateSummary( + requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY), + new ReadShareGroupStateSummaryRequestData().setGroupId(groupId).setTopics(Collections.singletonList( + new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData().setTopicId(topicId))) + ).get(5, TimeUnit.SECONDS) + ); + + // 3. Invalid groupId + assertEquals(new ReadShareGroupStateSummaryResponseData(), + service.readStateSummary( + requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY), + new ReadShareGroupStateSummaryRequestData().setGroupId(null).setTopics(Collections.singletonList( + new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData().setTopicId(topicId).setPartitions(Collections.singletonList( + new ReadShareGroupStateSummaryRequestData.PartitionData().setPartition(partition))))) + ).get(5, TimeUnit.SECONDS) + ); + } + @Test public void testWriteStateWhenNotStarted() throws ExecutionException, InterruptedException, TimeoutException { CoordinatorRuntime runtime = mockRuntime(); @@ -551,6 +684,69 @@ public void testReadStateWhenNotStarted() throws ExecutionException, Interrupted assertEquals(expectedResult, result); } + @Test + public void testReadStateSummaryWhenNotStarted() throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime runtime = mockRuntime(); + ShareCoordinatorService service = new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()), + runtime, + new ShareCoordinatorMetrics(), + Time.SYSTEM, + mock(Timer.class), + mock(PartitionWriter.class) + ); + + String groupId = "group1"; + Uuid topicId1 = Uuid.randomUuid(); + int partition1 = 0; + + Uuid topicId2 = Uuid.randomUuid(); + int partition2 = 1; + + ReadShareGroupStateSummaryRequestData request = new ReadShareGroupStateSummaryRequestData() + .setGroupId(groupId) + .setTopics(Arrays.asList( + new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(topicId1) + .setPartitions(Collections.singletonList( + new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartition(partition1) + .setLeaderEpoch(1) + )), + new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(topicId2) + .setPartitions(Collections.singletonList( + new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartition(partition2) + .setLeaderEpoch(1) + )) + ) + ); + + CompletableFuture future = service.readStateSummary( + requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY), + request + ); + + HashSet result = new HashSet<>(future.get(5, TimeUnit.SECONDS).results()); + + HashSet expectedResult = new HashSet<>(Arrays.asList( + new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId2) + .setPartitions(Collections.singletonList(new 
+                .setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.PartitionResult()
+                    .setPartition(partition2)
+                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+                    .setErrorMessage("Share coordinator is not available."))),
+            new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+                .setTopicId(topicId1)
+                .setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.PartitionResult()
+                    .setPartition(partition1)
+                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+                    .setErrorMessage("Share coordinator is not available.")))));
+        assertEquals(expectedResult, result);
+    }
+
     @Test
     public void testWriteFutureReturnsError() throws ExecutionException, InterruptedException, TimeoutException {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
@@ -644,6 +840,49 @@ public void testReadFutureReturnsError() throws ExecutionException, InterruptedE
         );
     }
 
+    @Test
+    public void testReadSummaryFutureReturnsError() throws ExecutionException, InterruptedException, TimeoutException {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        ShareCoordinatorService service = new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.createConfig(ShareCoordinatorTestConfig.testConfigMap()),
+            runtime,
+            new ShareCoordinatorMetrics(),
+            Time.SYSTEM,
+            mock(Timer.class),
+            mock(PartitionWriter.class)
+        );
+
+        service.startup(() -> 1);
+
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 0;
+
+        when(runtime.scheduleWriteOperation(any(), any(), any(), any()))
+            .thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
+
+        assertEquals(new ReadShareGroupStateSummaryResponseData()
+            .setResults(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+                .setTopicId(topicId)
+                .setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.PartitionResult()
+                    .setPartition(partition)
+                    .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+                    .setErrorMessage("Unable to read share group state summary: The server experienced an unexpected error when processing the request."))))),
+            service.readStateSummary(
+                requestContext(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY),
+                new ReadShareGroupStateSummaryRequestData().setGroupId(groupId)
+                    .setTopics(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                        .setTopicId(topicId)
+                        .setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.PartitionData()
+                            .setPartition(partition)
+                            .setLeaderEpoch(1)
+                        ))
+                    ))
+            ).get(5, TimeUnit.SECONDS)
+        );
+    }
+
     @Test
     public void testTopicPartitionFor() {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index d84347700f18a..4dca0a25bc37f 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -20,11 +20,14 @@
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
 import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
+import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
+import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
 import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
 import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
+import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
 import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorMetrics;
@@ -565,6 +568,34 @@ public void testReadStateSuccess() {
         assertEquals(0, shard.getLeaderMapValue(coordinatorKey));
     }
 
+    @Test
+    public void testReadStateSummarySuccess() {
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+
+        SharePartitionKey coordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
+
+        writeAndReplayDefaultRecord(shard);
+
+        ReadShareGroupStateSummaryRequestData request = new ReadShareGroupStateSummaryRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.PartitionData()
+                    .setPartition(PARTITION)
+                    .setLeaderEpoch(1))))));
+
+        CoordinatorResult<ReadShareGroupStateSummaryResponseData, CoordinatorRecord> result = shard.readStateSummary(request);
+
+        assertEquals(ReadShareGroupStateSummaryResponse.toResponseData(
+            TOPIC_ID,
+            PARTITION,
+            0,
+            0
+        ), result.response());
+
+        assertEquals(0, shard.getLeaderMapValue(coordinatorKey));
+    }
+
     @Test
     public void testReadStateInvalidRequestData() {
         ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
@@ -594,6 +625,35 @@ public void testReadStateInvalidRequestData() {
         assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
     }
 
+    @Test
+    public void testReadStateSummaryInvalidRequestData() {
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+
+        int partition = -1;
+
+        writeAndReplayDefaultRecord(shard);
+
+        SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
+
+        ReadShareGroupStateSummaryRequestData request = new ReadShareGroupStateSummaryRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.PartitionData()
+                    .setPartition(partition)
+                    .setLeaderEpoch(5))))));
+
+        CoordinatorResult<ReadShareGroupStateSummaryResponseData, CoordinatorRecord> result = shard.readStateSummary(request);
+
+        ReadShareGroupStateSummaryResponseData expectedData = ReadShareGroupStateSummaryResponse.toErrorResponseData(
+            TOPIC_ID, partition, Errors.INVALID_REQUEST, ShareCoordinatorShard.NEGATIVE_PARTITION_ID.getMessage());
+
+        assertEquals(expectedData, result.response());
+
+        // Leader epoch should not be changed because the request failed.
+        assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
+    }
+
     @Test
     public void testReadNullMetadataImage() {
         ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
index b3e888c240bd2..a08b604cd5f58 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
@@ -107,6 +107,9 @@
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListPartitionReassignmentsOptions;
 import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult;
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.ListTopicsOptions;
 import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.apache.kafka.clients.admin.ListTransactionsOptions;
@@ -413,6 +416,11 @@ public DescribeShareGroupsResult describeShareGroups(final Collection<String> gr
         return adminDelegate.describeShareGroups(groupIds, options);
     }
 
+    @Override
+    public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, ListShareGroupOffsetsSpec> groupSpecs, final ListShareGroupOffsetsOptions options) {
+        return adminDelegate.listShareGroupOffsets(groupSpecs, options);
+    }
+
     @Override
     public DescribeClassicGroupsResult describeClassicGroups(final Collection<String> groupIds, final DescribeClassicGroupsOptions options) {
         return adminDelegate.describeClassicGroups(groupIds, options);
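
For reviewers, a minimal sketch of how a caller might exercise the new Admin API once the client-side plumbing lands. This is illustrative only and not part of the patch: the class name ListShareGroupOffsetsExample and the bootstrap address are hypothetical, and the assumption that an empty ListShareGroupOffsetsSpec requests offsets for all partitions of the group mirrors the analogous consumer-group spec rather than anything shown in this diff.

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;

import java.util.Map;
import java.util.Properties;

public class ListShareGroupOffsetsExample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Hypothetical bootstrap address, for illustration only.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // One spec per share group id; an empty spec is assumed here to
            // cover all partitions of the group.
            Map<String, ListShareGroupOffsetsSpec> groupSpecs =
                Map.of("share-group-1", new ListShareGroupOffsetsSpec());

            // Equivalent to the single-argument convenience overload, which
            // supplies default ListShareGroupOffsetsOptions.
            ListShareGroupOffsetsResult result =
                admin.listShareGroupOffsets(groupSpecs, new ListShareGroupOffsetsOptions());

            // The concrete accessors live in ListShareGroupOffsetsResult (added
            // by this patch but not shown in this section); note that with this
            // patch alone, KafkaAdminClient rejects the call until the RPC is
            // implemented in the follow-up.
            System.out.println(result);
        }
    }
}

The same call works through any Admin implementation, including a ForwardingAdmin subclass or the test-only TestingMetricsInterceptingAdminClient wired up above, since both simply delegate listShareGroupOffsets to the underlying client.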