Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 35 additions & 9 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,6 @@

package org.apache.kafka.clients.admin;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.KafkaFuture;
Expand All @@ -42,6 +34,14 @@
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.requests.LeaveGroupResponse;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;

/**
* The administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs.
* <p>
Expand Down Expand Up @@ -303,7 +303,33 @@ default DescribeTopicsResult describeTopics(Collection<String> topicNames) {
* @param options The options to use when describing the topic.
* @return The DescribeTopicsResult.
*/
DescribeTopicsResult describeTopics(Collection<String> topicNames, DescribeTopicsOptions options);
default DescribeTopicsResult describeTopics(Collection<String> topicNames, DescribeTopicsOptions options) {
return describeTopics(TopicCollection.ofTopicNames(topicNames), options);
}

/**
* This is a convenience method for {@link #describeTopics(TopicCollection, DescribeTopicsOptions)}
* with default options. See the overload for more details.
* <p>
* When using topic IDs, this operation is supported by brokers with version 3.1.0 or higher.
*
* @param topics The topics to describe.
* @return The DescribeTopicsResult.
*/
default DescribeTopicsResult describeTopics(TopicCollection topics) {
return describeTopics(topics, new DescribeTopicsOptions());
}

/**
* Describe some topics in the cluster.
*
* When using topic IDs, this operation is supported by brokers with version 3.1.0 or higher.
*
* @param topics The topics to describe.
* @param options The options to use when describing the topics.
* @return The DescribeTopicsResult.
*/
DescribeTopicsResult describeTopics(TopicCollection topics, DescribeTopicsOptions options);

/**
* Get information about the nodes in the cluster, using the default options.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ public Map<String, KafkaFuture<Void>> topicNameValues() {
return nameFutures;
}

@Deprecated
/**
* @return a map from topic names to futures which can be used to check the status of
* individual deletions if the deleteTopics request used topic names. Otherwise return null.
* @deprecated Since 3.0 use {@link #topicNameValues} instead
*/
@Deprecated
public Map<String, KafkaFuture<Void>> values() {
return nameFutures;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicCollection;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collection;
Expand All @@ -32,28 +34,105 @@
*/
@InterfaceStability.Evolving
public class DescribeTopicsResult {
private final Map<String, KafkaFuture<TopicDescription>> futures;
private final Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures;
private final Map<String, KafkaFuture<TopicDescription>> nameFutures;

@Deprecated
protected DescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>> futures) {
this.futures = futures;
this(null, futures);
}

// VisibleForTesting
protected DescribeTopicsResult(Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures, Map<String, KafkaFuture<TopicDescription>> nameFutures) {
if (topicIdFutures != null && nameFutures != null)
throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be specified.");
if (topicIdFutures == null && nameFutures == null)
throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be null.");
this.topicIdFutures = topicIdFutures;
this.nameFutures = nameFutures;
}

static DescribeTopicsResult ofTopicIds(Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures) {
return new DescribeTopicsResult(topicIdFutures, null);
}

static DescribeTopicsResult ofTopicNames(Map<String, KafkaFuture<TopicDescription>> nameFutures) {
return new DescribeTopicsResult(null, nameFutures);
}

/**
* Use when {@link Admin#describeTopics(TopicCollection, DescribeTopicsOptions)} used a TopicIdCollection
*
* @return a map from topic IDs to futures which can be used to check the status of
* individual topics if the request used topic IDs, otherwise return null.
*/
public Map<Uuid, KafkaFuture<TopicDescription>> topicIdValues() {
return topicIdFutures;
}

/**
* Use when {@link Admin#describeTopics(TopicCollection, DescribeTopicsOptions)} used a TopicNameCollection
*
* @return a map from topic names to futures which can be used to check the status of
* individual topics if the request used topic names, otherwise return null.
*/
public Map<String, KafkaFuture<TopicDescription>> topicNameValues() {
return nameFutures;
}

/**
* Return a map from topic names to futures which can be used to check the status of
* individual topics.
* @return a map from topic names to futures which can be used to check the status of
* individual topics if the request used topic names, otherwise return null.
*
* @deprecated Since 3.1.0 use {@link #topicNameValues} instead
*/
@Deprecated
public Map<String, KafkaFuture<TopicDescription>> values() {
return futures;
return nameFutures;
}

/**
* Return a future which succeeds only if all the topic descriptions succeed.
* @return A future map from topic names to descriptions which can be used to check
* the status of individual description if the describe topic request used
* topic names, otherwise return null, this request succeeds only if all the
* topic descriptions succeed
*
* @deprecated Since 3.1.0 use {@link #allTopicNames()} instead
*/
@Deprecated
public KafkaFuture<Map<String, TopicDescription>> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
return all(nameFutures);
}

/**
* @return A future map from topic names to descriptions which can be used to check
* the status of individual description if the describe topic request used
* topic names, otherwise return null, this request succeeds only if all the
* topic descriptions succeed
*/
public KafkaFuture<Map<String, TopicDescription>> allTopicNames() {
return all(nameFutures);
}

/**
* @return A future map from topic ids to descriptions which can be used to check the
* status of individual description if the describe topic request used topic
* ids, otherwise return null, this request succeeds only if all the topic
* descriptions succeed
*/
public KafkaFuture<Map<Uuid, TopicDescription>> allTopicIds() {
return all(topicIdFutures);
}

/**
* Return a future which succeeds only if all the topic descriptions succeed.
*/
private static <T> KafkaFuture<Map<T, TopicDescription>> all(Map<T, KafkaFuture<TopicDescription>> futures) {
KafkaFuture<Void> future = KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
return future.
thenApply(v -> {
Map<String, TopicDescription> descriptions = new HashMap<>(futures.size());
for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
Map<T, TopicDescription> descriptions = new HashMap<>(futures.size());
for (Map.Entry<T, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
try {
descriptions.put(entry.getKey(), entry.getValue().get());
} catch (InterruptedException | ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@
import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
import static org.apache.kafka.common.requests.MetadataRequest.convertToMetadataRequestTopic;
import static org.apache.kafka.common.requests.MetadataRequest.convertTopicIdsToMetadataRequestTopic;
import static org.apache.kafka.common.utils.Utils.closeQuietly;

/**
Expand Down Expand Up @@ -1497,6 +1498,10 @@ private static boolean topicNameIsUnrepresentable(String topicName) {
return topicName == null || topicName.isEmpty();
}

private static boolean topicIdIsUnrepresentable(Uuid topicId) {
return topicId == null || topicId == Uuid.ZERO_UUID;
}

// for testing
int numPendingCalls() {
return runnable.pendingCalls.size();
Expand Down Expand Up @@ -1884,7 +1889,7 @@ void handleResponse(AbstractResponse abstractResponse) {
String topicName = topicMetadata.topic();
boolean isInternal = topicMetadata.isInternal();
if (!topicMetadata.isInternal() || options.shouldListInternal())
topicListing.put(topicName, new TopicListing(topicName, isInternal));
topicListing.put(topicName, new TopicListing(topicName, topicMetadata.topicId(), isInternal));
}
topicListingFuture.complete(topicListing);
}
Expand All @@ -1898,7 +1903,16 @@ void handleFailure(Throwable throwable) {
}

@Override
public DescribeTopicsResult describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options) {
public DescribeTopicsResult describeTopics(final TopicCollection topics, DescribeTopicsOptions options) {
if (topics instanceof TopicIdCollection)
return DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection) topics).topicIds(), options));
else if (topics instanceof TopicNameCollection)
return DescribeTopicsResult.ofTopicNames(handleDescribeTopicsByNames(((TopicNameCollection) topics).topicNames(), options));
else
throw new IllegalArgumentException("The TopicCollection: " + topics + " provided did not match any supported classes for describeTopics.");
}

private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNames(final Collection<String> topicNames, DescribeTopicsOptions options) {
final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
final ArrayList<String> topicNamesList = new ArrayList<>();
for (String topicName : topicNames) {
Expand Down Expand Up @@ -1947,28 +1961,13 @@ void handleResponse(AbstractResponse abstractResponse) {
future.completeExceptionally(new UnknownTopicOrPartitionException("Topic " + topicName + " not found."));
continue;
}
boolean isInternal = cluster.internalTopics().contains(topicName);
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topicName);
List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size());
for (PartitionInfo partitionInfo : partitionInfos) {
TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
partitionInfo.partition(), leader(partitionInfo), Arrays.asList(partitionInfo.replicas()),
Arrays.asList(partitionInfo.inSyncReplicas()));
partitions.add(topicPartitionInfo);
}
partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions,
validAclOperations(response.topicAuthorizedOperations(topicName).get()), cluster.topicId(topicName));
Uuid topicId = cluster.topicId(topicName);
Integer authorizedOperations = response.topicAuthorizedOperations(topicName).get();
TopicDescription topicDescription = getTopicDescriptionFromCluster(cluster, topicName, topicId, authorizedOperations);
future.complete(topicDescription);
}
}

private Node leader(PartitionInfo partitionInfo) {
if (partitionInfo.leader() == null || partitionInfo.leader().id() == Node.noNode().id())
return null;
return partitionInfo.leader();
}

@Override
boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
if (supportsDisablingTopicCreation) {
Expand All @@ -1986,7 +1985,93 @@ void handleFailure(Throwable throwable) {
if (!topicNamesList.isEmpty()) {
runnable.call(call, now);
}
return new DescribeTopicsResult(new HashMap<>(topicFutures));
return new HashMap<>(topicFutures);
}

private Map<Uuid, KafkaFuture<TopicDescription>> handleDescribeTopicsByIds(Collection<Uuid> topicIds, DescribeTopicsOptions options) {

final Map<Uuid, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicIds.size());
final List<Uuid> topicIdsList = new ArrayList<>();
for (Uuid topicId : topicIds) {
if (topicIdIsUnrepresentable(topicId)) {
KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
future.completeExceptionally(new InvalidTopicException("The given topic id '" +
topicId + "' cannot be represented in a request."));
topicFutures.put(topicId, future);
} else if (!topicFutures.containsKey(topicId)) {
topicFutures.put(topicId, new KafkaFutureImpl<>());
topicIdsList.add(topicId);
}
}
final long now = time.milliseconds();
Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {

@Override
MetadataRequest.Builder createRequest(int timeoutMs) {
return new MetadataRequest.Builder(new MetadataRequestData()
.setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList))
.setAllowAutoTopicCreation(false)
.setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
}

@Override
void handleResponse(AbstractResponse abstractResponse) {
MetadataResponse response = (MetadataResponse) abstractResponse;
// Handle server responses for particular topics.
Cluster cluster = response.buildCluster();
Map<Uuid, Errors> errors = response.errorsByTopicId();
for (Map.Entry<Uuid, KafkaFutureImpl<TopicDescription>> entry : topicFutures.entrySet()) {
Uuid topicId = entry.getKey();
KafkaFutureImpl<TopicDescription> future = entry.getValue();

String topicName = cluster.topicName(topicId);
if (topicName == null) {
future.completeExceptionally(new InvalidTopicException("TopicId " + topicId + " not found."));
continue;
}
Errors topicError = errors.get(topicId);
if (topicError != null) {
future.completeExceptionally(topicError.exception());
continue;
}

Integer authorizedOperations = response.topicAuthorizedOperations(topicName).get();
TopicDescription topicDescription = getTopicDescriptionFromCluster(cluster, topicName, topicId, authorizedOperations);
future.complete(topicDescription);
}
}

@Override
void handleFailure(Throwable throwable) {
completeAllExceptionally(topicFutures.values(), throwable);
}
};
if (!topicIdsList.isEmpty()) {
runnable.call(call, now);
}
return new HashMap<>(topicFutures);
}

private TopicDescription getTopicDescriptionFromCluster(Cluster cluster, String topicName, Uuid topicId,
Integer authorizedOperations) {
boolean isInternal = cluster.internalTopics().contains(topicName);
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topicName);
List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size());
for (PartitionInfo partitionInfo : partitionInfos) {
TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
partitionInfo.partition(), leader(partitionInfo), Arrays.asList(partitionInfo.replicas()),
Arrays.asList(partitionInfo.inSyncReplicas()));
partitions.add(topicPartitionInfo);
}
partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
return new TopicDescription(topicName, isInternal, partitions, validAclOperations(authorizedOperations), topicId);
}

private Node leader(PartitionInfo partitionInfo) {
if (partitionInfo.leader() == null || partitionInfo.leader().id() == Node.noNode().id())
return null;
return partitionInfo.leader();
}

@Override
Expand Down
Loading