Skip to content

Commit

Permalink
[KAFKA-16720] AdminClient Support for ListShareGroupOffsets (1/n) (ap…
Browse files Browse the repository at this point in the history
…ache#18571)

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
  • Loading branch information
sjhajharia authored Jan 20, 2025
1 parent 9649902 commit bcbc72e
Show file tree
Hide file tree
Showing 27 changed files with 2,422 additions and 46 deletions.
4 changes: 3 additions & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,11 @@
<suppress checks="NPathComplexity"
files="CoordinatorRuntime.java"/>

<!-- share coordinator -->
<!-- share coordinator and persister-->
<suppress checks="NPathComplexity"
files="ShareCoordinatorShard.java"/>
<suppress checks="ClassDataAbstractionCouplingCheck"
files="(ShareCoordinatorServiceTest|DefaultStatePersisterTest|PersisterStateManagerTest).java"/>
<suppress checks="CyclomaticComplexity"
files="ShareCoordinatorShard.java"/>

Expand Down
22 changes: 22 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -1793,6 +1793,28 @@ default DescribeShareGroupsResult describeShareGroups(Collection<String> groupId
return describeShareGroups(groupIds, new DescribeShareGroupsOptions());
}

/**
* List the share group offsets available in the cluster for the specified share groups.
*
* @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for.
* @param options The options to use when listing the share group offsets.
* @return The ListShareGroupOffsetsResult
*/
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options);

/**
* List the share group offsets available in the cluster for the specified share groups with the default options.
*
* <p>This is a convenience method for {@link #listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}
* to list offsets of all partitions for the specified share groups with default options.
*
* @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for.
* @return The ListShareGroupOffsetsResult
*/
default ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs) {
return listShareGroupOffsets(groupSpecs, new ListShareGroupOffsetsOptions());
}

/**
* Describe some classic groups in the cluster.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ public DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds
return delegate.describeShareGroups(groupIds, options);
}

@Override
public ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options) {
return delegate.listShareGroupOffsets(groupSpecs, options);
}

@Override
public ListGroupsResult listGroups(ListGroupsOptions options) {
return delegate.listGroups(options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3796,6 +3796,14 @@ public DescribeShareGroupsResult describeShareGroups(final Collection<String> gr
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
}

// To do in a follow-up PR
@Override
public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, ListShareGroupOffsetsSpec> groupSpecs,
final ListShareGroupOffsetsOptions options) {
// To-do
throw new InvalidRequestException("The method is not yet implemented");
}

@Override
public DescribeClassicGroupsResult describeClassicGroups(final Collection<String> groupIds,
final DescribeClassicGroupsOptions options) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Map;

/**
* Options for {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListShareGroupOffsetsOptions extends AbstractOptions<ListShareGroupOffsetsOptions> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

/**
* The result of the {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListShareGroupOffsetsResult {

private final Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures;

ListShareGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, Long>>> futures) {
this.futures = futures.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().idValue, Map.Entry::getValue));
}

/**
* Return the future when the requests for all groups succeed.
*
* @return - Future which yields all Map<String, Map<TopicPartition, Long> objects, if requests for all the groups succeed.
*/
public KafkaFuture<Map<String, Map<TopicPartition, Long>>> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(
nil -> {
Map<String, Map<TopicPartition, Long>> offsets = new HashMap<>(futures.size());
futures.forEach((groupId, future) -> {
try {
offsets.put(groupId, future.get());
} catch (InterruptedException | ExecutionException e) {
// This should be unreachable, since the KafkaFuture#allOf already ensured
// that all the futures completed successfully.
throw new RuntimeException(e);
}
});
return offsets;
});
}

/**
* @param groupId - The groupId for which the Map<TopicPartition, Long> is needed
* @return - Future which yields a map of topic partitions to offsets for the specified group.
*/
public KafkaFuture<Map<TopicPartition, Long>> partitionsToOffset(String groupId) {
if (!futures.containsKey(groupId)) {
throw new IllegalArgumentException("Group ID not found: " + groupId);
}
return futures.get(groupId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* Specification of share group offsets to list using {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListShareGroupOffsetsSpec {

private Collection<TopicPartition> topicPartitions;

/**
* Set the topic partitions whose offsets are to be listed for a share group.
*/
public ListShareGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions;
return this;
}

/**
* Returns the topic partitions whose offsets are to be listed for a share group.
*/
public Collection<TopicPartition> topicPartitions() {
return topicPartitions == null ? List.of() : topicPartitions;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ListShareGroupOffsetsSpec)) {
return false;
}
ListShareGroupOffsetsSpec that = (ListShareGroupOffsetsSpec) o;
return Objects.equals(topicPartitions, that.topicPartitions);
}

@Override
public int hashCode() {
return Objects.hash(topicPartitions);
}

@Override
public String toString() {
return "ListShareGroupOffsetsSpec(" +
"topicPartitions=" + (topicPartitions != null ? topicPartitions : "null") +
')';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,16 @@ public ReadShareGroupStateSummaryRequest(ReadShareGroupStateSummaryRequestData d
public ReadShareGroupStateSummaryResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> results = new ArrayList<>();
data.topics().forEach(
topicResult -> results.add(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicResult.topicId())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partitionData.partition())
.setErrorCode(Errors.forException(e).code())
.setErrorMessage(Errors.forException(e).message()))
.collect(Collectors.toList()))));
topicResult -> results.add(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicResult.topicId())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partitionData.partition())
.setErrorCode(Errors.forException(e).code())
.setErrorMessage(Errors.forException(e).message()))
.collect(Collectors.toList()))));
return new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData()
.setResults(results));
.setResults(results));
}

@Override
Expand All @@ -83,8 +83,8 @@ public ReadShareGroupStateSummaryRequestData data() {

public static ReadShareGroupStateSummaryRequest parse(ByteBuffer buffer, short version) {
return new ReadShareGroupStateSummaryRequest(
new ReadShareGroupStateSummaryRequestData(new ByteBufferAccessor(buffer), version),
version
new ReadShareGroupStateSummaryRequestData(new ByteBufferAccessor(buffer), version),
version
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

package org.apache.kafka.common.requests;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReadShareGroupStateSummaryResponse extends AbstractResponse {
Expand All @@ -43,9 +45,9 @@ public ReadShareGroupStateSummaryResponseData data() {
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new HashMap<>();
data.results().forEach(
result -> result.partitions().forEach(
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
)
result -> result.partitions().forEach(
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
)
);
return counts;
}
Expand All @@ -59,9 +61,64 @@ public int throttleTimeMs() {
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
// No op
}

public static ReadShareGroupStateSummaryResponse parse(ByteBuffer buffer, short version) {
return new ReadShareGroupStateSummaryResponse(
new ReadShareGroupStateSummaryResponseData(new ByteBufferAccessor(buffer), version)
new ReadShareGroupStateSummaryResponseData(new ByteBufferAccessor(buffer), version)
);
}

public static ReadShareGroupStateSummaryResponseData toErrorResponseData(
Uuid topicId,
int partitionId,
Errors error,
String errorMessage
) {
return new ReadShareGroupStateSummaryResponseData().setResults(
List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicId)
.setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage)))));
}

public static ReadShareGroupStateSummaryResponseData.PartitionResult toErrorResponsePartitionResult(
int partitionId,
Errors error,
String errorMessage
) {
return new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage);
}

public static ReadShareGroupStateSummaryResponseData toResponseData(
Uuid topicId,
int partition,
long startOffset,
int stateEpoch
) {
return new ReadShareGroupStateSummaryResponseData()
.setResults(List.of(
new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicId)
.setPartitions(List.of(
new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partition)
.setStartOffset(startOffset)
.setStateEpoch(stateEpoch)
))
));
}

public static ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult toResponseReadStateSummaryResult(
Uuid topicId,
List<ReadShareGroupStateSummaryResponseData.PartitionResult> partitionResults
) {
return new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicId)
.setPartitions(partitionResults);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,11 @@ public synchronized DescribeShareGroupsResult describeShareGroups(Collection<Str
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public synchronized ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public synchronized DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds, DescribeClassicGroupsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
Expand Down
Loading

0 comments on commit bcbc72e

Please sign in to comment.