diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 54d103b22551d..4b6fe4967caa9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -17,14 +17,6 @@ package org.apache.kafka.clients.admin; -import java.time.Duration; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import java.util.Set; - import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.KafkaFuture; @@ -42,6 +34,14 @@ import org.apache.kafka.common.quota.ClientQuotaFilter; import org.apache.kafka.common.requests.LeaveGroupResponse; +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + /** * The administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs. *

@@ -303,7 +303,33 @@ default DescribeTopicsResult describeTopics(Collection topicNames) { * @param options The options to use when describing the topic. * @return The DescribeTopicsResult. */ - DescribeTopicsResult describeTopics(Collection topicNames, DescribeTopicsOptions options); + default DescribeTopicsResult describeTopics(Collection topicNames, DescribeTopicsOptions options) { + return describeTopics(TopicCollection.ofTopicNames(topicNames), options); + } + + /** + * This is a convenience method for {@link #describeTopics(TopicCollection, DescribeTopicsOptions)} + * with default options. See the overload for more details. + *

+ * When using topic IDs, this operation is supported by brokers with version 3.1.0 or higher. + * + * @param topics The topics to describe. + * @return The DescribeTopicsResult. + */ + default DescribeTopicsResult describeTopics(TopicCollection topics) { + return describeTopics(topics, new DescribeTopicsOptions()); + } + + /** + * Describe some topics in the cluster. + * + * When using topic IDs, this operation is supported by brokers with version 3.1.0 or higher. + * + * @param topics The topics to describe. + * @param options The options to use when describing the topics. + * @return The DescribeTopicsResult. + */ + DescribeTopicsResult describeTopics(TopicCollection topics, DescribeTopicsOptions options); /** * Get information about the nodes in the cluster, using the default options. diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java index b0d23d97b4ce1..725b82a78dee8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java @@ -70,12 +70,12 @@ public Map> topicNameValues() { return nameFutures; } - @Deprecated /** * @return a map from topic names to futures which can be used to check the status of * individual deletions if the deleteTopics request used topic names. Otherwise return null. * @deprecated Since 3.0 use {@link #topicNameValues} instead */ + @Deprecated public Map> values() { return nameFutures; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java index 7753984a7bda7..41593c52984f0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java @@ -18,6 +18,8 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicCollection; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.annotation.InterfaceStability; import java.util.Collection; @@ -32,28 +34,105 @@ */ @InterfaceStability.Evolving public class DescribeTopicsResult { - private final Map> futures; + private final Map> topicIdFutures; + private final Map> nameFutures; + @Deprecated protected DescribeTopicsResult(Map> futures) { - this.futures = futures; + this(null, futures); + } + + // VisibleForTesting + protected DescribeTopicsResult(Map> topicIdFutures, Map> nameFutures) { + if (topicIdFutures != null && nameFutures != null) + throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be specified."); + if (topicIdFutures == null && nameFutures == null) + throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be null."); + this.topicIdFutures = topicIdFutures; + this.nameFutures = nameFutures; + } + + static DescribeTopicsResult ofTopicIds(Map> topicIdFutures) { + return new DescribeTopicsResult(topicIdFutures, null); + } + + static DescribeTopicsResult ofTopicNames(Map> nameFutures) { + return new DescribeTopicsResult(null, nameFutures); + } + + /** + * Use when {@link Admin#describeTopics(TopicCollection, DescribeTopicsOptions)} used a TopicIdCollection + * + * @return a map from topic IDs to futures which can be used to check the status of + * individual topics if the request used topic IDs, otherwise return null. + */ + public Map> topicIdValues() { + return topicIdFutures; + } + + /** + * Use when {@link Admin#describeTopics(TopicCollection, DescribeTopicsOptions)} used a TopicNameCollection + * + * @return a map from topic names to futures which can be used to check the status of + * individual topics if the request used topic names, otherwise return null. + */ + public Map> topicNameValues() { + return nameFutures; } /** - * Return a map from topic names to futures which can be used to check the status of - * individual topics. + * @return a map from topic names to futures which can be used to check the status of + * individual topics if the request used topic names, otherwise return null. + * + * @deprecated Since 3.1.0 use {@link #topicNameValues} instead */ + @Deprecated public Map> values() { - return futures; + return nameFutures; } /** - * Return a future which succeeds only if all the topic descriptions succeed. + * @return A future map from topic names to descriptions which can be used to check + * the status of individual description if the describe topic request used + * topic names, otherwise return null, this request succeeds only if all the + * topic descriptions succeed + * + * @deprecated Since 3.1.0 use {@link #allTopicNames()} instead */ + @Deprecated public KafkaFuture> all() { - return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])). + return all(nameFutures); + } + + /** + * @return A future map from topic names to descriptions which can be used to check + * the status of individual description if the describe topic request used + * topic names, otherwise return null, this request succeeds only if all the + * topic descriptions succeed + */ + public KafkaFuture> allTopicNames() { + return all(nameFutures); + } + + /** + * @return A future map from topic ids to descriptions which can be used to check the + * status of individual description if the describe topic request used topic + * ids, otherwise return null, this request succeeds only if all the topic + * descriptions succeed + */ + public KafkaFuture> allTopicIds() { + return all(topicIdFutures); + } + + /** + * Return a future which succeeds only if all the topic descriptions succeed. + */ + private static KafkaFuture> all(Map> futures) { + KafkaFuture future = KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])); + return future. thenApply(v -> { - Map descriptions = new HashMap<>(futures.size()); - for (Map.Entry> entry : futures.entrySet()) { + Map descriptions = new HashMap<>(futures.size()); + for (Map.Entry> entry : futures.entrySet()) { try { descriptions.put(entry.getKey(), entry.getValue().get()); } catch (InterruptedException | ExecutionException e) { diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 981598f8cf1d7..4fae1cca1b0ee 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -271,6 +271,7 @@ import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment; import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment; import static org.apache.kafka.common.requests.MetadataRequest.convertToMetadataRequestTopic; +import static org.apache.kafka.common.requests.MetadataRequest.convertTopicIdsToMetadataRequestTopic; import static org.apache.kafka.common.utils.Utils.closeQuietly; /** @@ -1497,6 +1498,10 @@ private static boolean topicNameIsUnrepresentable(String topicName) { return topicName == null || topicName.isEmpty(); } + private static boolean topicIdIsUnrepresentable(Uuid topicId) { + return topicId == null || topicId == Uuid.ZERO_UUID; + } + // for testing int numPendingCalls() { return runnable.pendingCalls.size(); @@ -1884,7 +1889,7 @@ void handleResponse(AbstractResponse abstractResponse) { String topicName = topicMetadata.topic(); boolean isInternal = topicMetadata.isInternal(); if (!topicMetadata.isInternal() || options.shouldListInternal()) - topicListing.put(topicName, new TopicListing(topicName, isInternal)); + topicListing.put(topicName, new TopicListing(topicName, topicMetadata.topicId(), isInternal)); } topicListingFuture.complete(topicListing); } @@ -1898,7 +1903,16 @@ void handleFailure(Throwable throwable) { } @Override - public DescribeTopicsResult describeTopics(final Collection topicNames, DescribeTopicsOptions options) { + public DescribeTopicsResult describeTopics(final TopicCollection topics, DescribeTopicsOptions options) { + if (topics instanceof TopicIdCollection) + return DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection) topics).topicIds(), options)); + else if (topics instanceof TopicNameCollection) + return DescribeTopicsResult.ofTopicNames(handleDescribeTopicsByNames(((TopicNameCollection) topics).topicNames(), options)); + else + throw new IllegalArgumentException("The TopicCollection: " + topics + " provided did not match any supported classes for describeTopics."); + } + + private Map> handleDescribeTopicsByNames(final Collection topicNames, DescribeTopicsOptions options) { final Map> topicFutures = new HashMap<>(topicNames.size()); final ArrayList topicNamesList = new ArrayList<>(); for (String topicName : topicNames) { @@ -1947,28 +1961,13 @@ void handleResponse(AbstractResponse abstractResponse) { future.completeExceptionally(new UnknownTopicOrPartitionException("Topic " + topicName + " not found.")); continue; } - boolean isInternal = cluster.internalTopics().contains(topicName); - List partitionInfos = cluster.partitionsForTopic(topicName); - List partitions = new ArrayList<>(partitionInfos.size()); - for (PartitionInfo partitionInfo : partitionInfos) { - TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo( - partitionInfo.partition(), leader(partitionInfo), Arrays.asList(partitionInfo.replicas()), - Arrays.asList(partitionInfo.inSyncReplicas())); - partitions.add(topicPartitionInfo); - } - partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition)); - TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions, - validAclOperations(response.topicAuthorizedOperations(topicName).get()), cluster.topicId(topicName)); + Uuid topicId = cluster.topicId(topicName); + Integer authorizedOperations = response.topicAuthorizedOperations(topicName).get(); + TopicDescription topicDescription = getTopicDescriptionFromCluster(cluster, topicName, topicId, authorizedOperations); future.complete(topicDescription); } } - private Node leader(PartitionInfo partitionInfo) { - if (partitionInfo.leader() == null || partitionInfo.leader().id() == Node.noNode().id()) - return null; - return partitionInfo.leader(); - } - @Override boolean handleUnsupportedVersionException(UnsupportedVersionException exception) { if (supportsDisablingTopicCreation) { @@ -1986,7 +1985,93 @@ void handleFailure(Throwable throwable) { if (!topicNamesList.isEmpty()) { runnable.call(call, now); } - return new DescribeTopicsResult(new HashMap<>(topicFutures)); + return new HashMap<>(topicFutures); + } + + private Map> handleDescribeTopicsByIds(Collection topicIds, DescribeTopicsOptions options) { + + final Map> topicFutures = new HashMap<>(topicIds.size()); + final List topicIdsList = new ArrayList<>(); + for (Uuid topicId : topicIds) { + if (topicIdIsUnrepresentable(topicId)) { + KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.completeExceptionally(new InvalidTopicException("The given topic id '" + + topicId + "' cannot be represented in a request.")); + topicFutures.put(topicId, future); + } else if (!topicFutures.containsKey(topicId)) { + topicFutures.put(topicId, new KafkaFutureImpl<>()); + topicIdsList.add(topicId); + } + } + final long now = time.milliseconds(); + Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + + @Override + MetadataRequest.Builder createRequest(int timeoutMs) { + return new MetadataRequest.Builder(new MetadataRequestData() + .setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList)) + .setAllowAutoTopicCreation(false) + .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations())); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + MetadataResponse response = (MetadataResponse) abstractResponse; + // Handle server responses for particular topics. + Cluster cluster = response.buildCluster(); + Map errors = response.errorsByTopicId(); + for (Map.Entry> entry : topicFutures.entrySet()) { + Uuid topicId = entry.getKey(); + KafkaFutureImpl future = entry.getValue(); + + String topicName = cluster.topicName(topicId); + if (topicName == null) { + future.completeExceptionally(new InvalidTopicException("TopicId " + topicId + " not found.")); + continue; + } + Errors topicError = errors.get(topicId); + if (topicError != null) { + future.completeExceptionally(topicError.exception()); + continue; + } + + Integer authorizedOperations = response.topicAuthorizedOperations(topicName).get(); + TopicDescription topicDescription = getTopicDescriptionFromCluster(cluster, topicName, topicId, authorizedOperations); + future.complete(topicDescription); + } + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(topicFutures.values(), throwable); + } + }; + if (!topicIdsList.isEmpty()) { + runnable.call(call, now); + } + return new HashMap<>(topicFutures); + } + + private TopicDescription getTopicDescriptionFromCluster(Cluster cluster, String topicName, Uuid topicId, + Integer authorizedOperations) { + boolean isInternal = cluster.internalTopics().contains(topicName); + List partitionInfos = cluster.partitionsForTopic(topicName); + List partitions = new ArrayList<>(partitionInfos.size()); + for (PartitionInfo partitionInfo : partitionInfos) { + TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo( + partitionInfo.partition(), leader(partitionInfo), Arrays.asList(partitionInfo.replicas()), + Arrays.asList(partitionInfo.inSyncReplicas())); + partitions.add(topicPartitionInfo); + } + partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition)); + return new TopicDescription(topicName, isInternal, partitions, validAclOperations(authorizedOperations), topicId); + } + + private Node leader(PartitionInfo partitionInfo) { + if (partitionInfo.leader() == null || partitionInfo.leader().id() == Node.noNode().id()) + return null; + return partitionInfo.leader(); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java index a2d52e93d0bed..e8700d4d067d2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java @@ -72,13 +72,23 @@ public TopicDescription(String name, boolean internal, List * @param internal Whether the topic is internal to Kafka * @param partitions A list of partitions where the index represents the partition id and the element contains * leadership and replica information for that partition. - * @param authorizedOperations authorized operations for this topic, or null if this is not known. + * @param authorizedOperations authorized operations for this topic, or empty set if this is not known. */ public TopicDescription(String name, boolean internal, List partitions, Set authorizedOperations) { this(name, internal, partitions, authorizedOperations, Uuid.ZERO_UUID); } + /** + * Create an instance with the specified parameters. + * + * @param name The topic name + * @param internal Whether the topic is internal to Kafka + * @param partitions A list of partitions where the index represents the partition id and the element contains + * leadership and replica information for that partition. + * @param authorizedOperations authorized operations for this topic, or empty set if this is not known. + * @param topicId the topic id + */ public TopicDescription(String name, boolean internal, List partitions, Set authorizedOperations, Uuid topicId) { this.name = name; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java index e5124be845f2a..42ceeff20bb93 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java @@ -17,11 +17,14 @@ package org.apache.kafka.clients.admin; +import org.apache.kafka.common.Uuid; + /** * A listing of a topic in the cluster. */ public class TopicListing { private final String name; + private final Uuid topicId; private final boolean internal; /** @@ -29,10 +32,33 @@ public class TopicListing { * * @param name The topic name * @param internal Whether the topic is internal to Kafka + * @deprecated Since 3.0 use {@link #TopicListing(String, Uuid, boolean)} instead */ + @Deprecated public TopicListing(String name, boolean internal) { this.name = name; this.internal = internal; + this.topicId = Uuid.ZERO_UUID; + } + + /** + * Create an instance with the specified parameters. + * + * @param name The topic name + * @param topicId The topic id. + * @param internal Whether the topic is internal to Kafka + */ + public TopicListing(String name, Uuid topicId, boolean internal) { + this.topicId = topicId; + this.name = name; + this.internal = internal; + } + + /** + * The id of the topic. + */ + public Uuid topicId() { + return topicId; } /** @@ -52,6 +78,6 @@ public boolean isInternal() { @Override public String toString() { - return "(name=" + name + ", internal=" + internal + ")"; + return "(name=" + name + ", topicId=" + topicId + ", internal=" + internal + ")"; } } diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java index 8c9bbcb6eff95..7d3f6f08a0139 100644 --- a/clients/src/main/java/org/apache/kafka/common/Cluster.java +++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java @@ -47,6 +47,7 @@ public final class Cluster { private final Map nodesById; private final ClusterResource clusterResource; private final Map topicIds; + private final Map topicNames; /** * Create a new cluster with the given id, nodes and partitions @@ -184,6 +185,9 @@ private Cluster(String clusterId, this.availablePartitionsByTopic = Collections.unmodifiableMap(tmpAvailablePartitionsByTopic); this.partitionsByNode = Collections.unmodifiableMap(tmpPartitionsByNode); this.topicIds = Collections.unmodifiableMap(topicIds); + Map tmpTopicNames = new HashMap<>(); + topicIds.forEach((key, value) -> tmpTopicNames.put(value, key)); + this.topicNames = Collections.unmodifiableMap(tmpTopicNames); this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics); this.invalidTopics = Collections.unmodifiableSet(invalidTopics); @@ -354,6 +358,10 @@ public Uuid topicId(String topic) { return topicIds.getOrDefault(topic, Uuid.ZERO_UUID); } + public String topicName(Uuid topicId) { + return topicNames.get(topicId); + } + @Override public String toString() { return "Cluster(id = " + clusterResource.clusterId() + ", nodes = " + this.nodes + diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index d38e9ac47b80a..aab5fc6840262 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -65,6 +65,20 @@ public Builder(List topics, boolean allowAutoTopicCreation) { this(topics, allowAutoTopicCreation, ApiKeys.METADATA.oldestVersion(), ApiKeys.METADATA.latestVersion()); } + public Builder(List topicIds) { + super(ApiKeys.METADATA, ApiKeys.METADATA.oldestVersion(), ApiKeys.METADATA.latestVersion()); + MetadataRequestData data = new MetadataRequestData(); + if (topicIds == null) + data.setTopics(null); + else { + topicIds.forEach(topicId -> data.topics().add(new MetadataRequestTopic().setTopicId(topicId))); + } + + // It's impossible to create topic with topicId + data.setAllowAutoTopicCreation(false); + this.data = data; + } + public static Builder allTopics() { // This never causes auto-creation, but we set the boolean to true because that is the default value when // deserializing V2 and older. This way, the value is consistent after serialization and deserialization. @@ -95,10 +109,10 @@ public MetadataRequest build(short version) { "allowAutoTopicCreation field"); if (data.topics() != null) { data.topics().forEach(topic -> { - if (topic.name() == null) + if (topic.name() == null && version < 12) throw new UnsupportedVersionException("MetadataRequest version " + version + " does not support null topic names."); - if (topic.topicId() != Uuid.ZERO_UUID) + if (topic.topicId() != Uuid.ZERO_UUID && version < 12) throw new UnsupportedVersionException("MetadataRequest version " + version + " does not support non-zero topic IDs."); }); @@ -147,12 +161,12 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { public boolean isAllTopics() { return (data.topics() == null) || - (data.topics().isEmpty() && version() == 0); //In version 0, an empty topic list indicates + (data.topics().isEmpty() && version() == 0); // In version 0, an empty topic list indicates // "request metadata for all topics." } public List topics() { - if (isAllTopics()) //In version 0, we return null for empty topic list + if (isAllTopics()) // In version 0, we return null for empty topic list return null; else return data.topics() @@ -161,6 +175,18 @@ public List topics() { .collect(Collectors.toList()); } + public List topicIds() { + if (isAllTopics()) + return Collections.emptyList(); + else if (version() < 10) + return Collections.emptyList(); + else + return data.topics() + .stream() + .map(MetadataRequestTopic::topicId) + .collect(Collectors.toList()); + } + public boolean allowAutoTopicCreation() { return data.allowAutoTopicCreation(); } @@ -174,4 +200,10 @@ public static List convertToMetadataRequestTopic(final Col .setName(topic)) .collect(Collectors.toList()); } + + public static List convertTopicIdsToMetadataRequestTopic(final Collection topicIds) { + return topicIds.stream().map(topicId -> new MetadataRequestTopic() + .setTopicId(topicId)) + .collect(Collectors.toList()); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index c85b31d383011..d539fa871982c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -91,12 +91,31 @@ public int throttleTimeMs() { public Map errors() { Map errors = new HashMap<>(); for (MetadataResponseTopic metadata : data.topics()) { + if (metadata.name() == null) { + throw new IllegalStateException("Use errorsByTopicId() when managing topic using topic id"); + } if (metadata.errorCode() != Errors.NONE.code()) errors.put(metadata.name(), Errors.forCode(metadata.errorCode())); } return errors; } + /** + * Get a map of the topicIds which had metadata errors + * @return the map + */ + public Map errorsByTopicId() { + Map errors = new HashMap<>(); + for (MetadataResponseTopic metadata : data.topics()) { + if (metadata.topicId() == Uuid.ZERO_UUID) { + throw new IllegalStateException("Use errors() when managing topic using topic name"); + } + if (metadata.errorCode() != Errors.NONE.code()) + errors.put(metadata.topicId(), Errors.forCode(metadata.errorCode())); + } + return errors; + } + @Override public Map errorCounts() { Map errorCounts = new HashMap<>(); diff --git a/clients/src/main/resources/common/message/MetadataRequest.json b/clients/src/main/resources/common/message/MetadataRequest.json index 2d88a0db15744..5da95cfed68c8 100644 --- a/clients/src/main/resources/common/message/MetadataRequest.json +++ b/clients/src/main/resources/common/message/MetadataRequest.json @@ -18,7 +18,7 @@ "type": "request", "listeners": ["zkBroker", "broker"], "name": "MetadataRequest", - "validVersions": "0-11", + "validVersions": "0-12", "flexibleVersions": "9+", "fields": [ // In version 0, an empty array indicates "request metadata for all topics." In version 1 and @@ -38,6 +38,7 @@ // // Version 11 deprecates IncludeClusterAuthorizedOperations field. This is now exposed // by the DescribeCluster API (KIP-700). + // Version 12 supports topic Id. { "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", "nullableVersions": "1+", "about": "The topics to fetch metadata for.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, "about": "The topic id." }, diff --git a/clients/src/main/resources/common/message/MetadataResponse.json b/clients/src/main/resources/common/message/MetadataResponse.json index d0052949334e1..714b28b53d961 100644 --- a/clients/src/main/resources/common/message/MetadataResponse.json +++ b/clients/src/main/resources/common/message/MetadataResponse.json @@ -41,7 +41,8 @@ // // Version 11 deprecates ClusterAuthorizedOperations. This is now exposed // by the DescribeCluster API (KIP-700). - "validVersions": "0-11", + // Version 12 supports topicId. + "validVersions": "0-12", "flexibleVersions": "9+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true, @@ -65,7 +66,7 @@ "about": "Each topic in the response.", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The topic error, or 0 if there was no error." }, - { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "nullableVersions": "12+", "about": "The topic name." }, { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, "about": "The topic id." }, { "name": "IsInternal", "type": "bool", "versions": "1+", "default": "false", "ignorable": true, diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java index 019566a5083e8..3db5739e692d7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java @@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.internals.KafkaFutureImpl; public class AdminClientTestUtils { @@ -70,7 +71,7 @@ public static DeleteTopicsResult deleteTopicsResult(String topic, Throwable t) { */ public static ListTopicsResult listTopicsResult(String topic) { KafkaFutureImpl> future = new KafkaFutureImpl<>(); - future.complete(Collections.singletonMap(topic, new TopicListing(topic, false))); + future.complete(Collections.singletonMap(topic, new TopicListing(topic, Uuid.ZERO_UUID, false))); return new ListTopicsResult(future); } @@ -93,11 +94,11 @@ public static CreatePartitionsResult createPartitionsResult(String topic, Throwa public static DescribeTopicsResult describeTopicsResult(String topic, TopicDescription description) { KafkaFutureImpl future = new KafkaFutureImpl<>(); future.complete(description); - return new DescribeTopicsResult(Collections.singletonMap(topic, future)); + return DescribeTopicsResult.ofTopicNames(Collections.singletonMap(topic, future)); } public static DescribeTopicsResult describeTopicsResult(Map topicDescriptions) { - return new DescribeTopicsResult(topicDescriptions.entrySet().stream() + return DescribeTopicsResult.ofTopicNames(topicDescriptions.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> KafkaFuture.completedFuture(e.getValue())))); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index f1ace8571d958..46542db5d76ce 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -1203,7 +1203,7 @@ public void testInvalidTopicNames() throws Exception { assertEquals(0, env.kafkaClient().inFlightRequestCount()); Map> describeFutures = - env.adminClient().describeTopics(sillyTopicNames).values(); + env.adminClient().describeTopics(sillyTopicNames).topicNameValues(); for (String sillyTopicName : sillyTopicNames) { TestUtils.assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class); } @@ -1255,7 +1255,7 @@ Errors.NONE, new TopicPartition(topic, 0), Optional.of(leader.id()), Optional.of singletonList(partitionMetadata), MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)))); DescribeTopicsResult result = env.adminClient().describeTopics(singleton(topic)); - Map topicDescriptions = result.all().get(); + Map topicDescriptions = result.allTopicNames().get(); assertEquals(leader, topicDescriptions.get(topic).partitions().get(0).leader()); assertNull(topicDescriptions.get(topic).authorizedOperations()); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 2ef4a26426f57..473edae9cd1c8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -40,6 +40,7 @@ import org.apache.kafka.common.errors.ReplicaNotAvailableException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.errors.UnknownTopicIdException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.KafkaFutureImpl; @@ -199,10 +200,18 @@ synchronized public void controller(Node controller) { this.controller = controller; } - synchronized public void addTopic(boolean internal, + public void addTopic(boolean internal, String name, List partitions, Map configs) { + addTopic(internal, name, partitions, configs, true); + } + + synchronized public void addTopic(boolean internal, + String name, + List partitions, + Map configs, + boolean usesTopicId) { if (allTopics.containsKey(name)) { throw new IllegalArgumentException(String.format("Topic %s was already added.", name)); } @@ -223,10 +232,15 @@ synchronized public void addTopic(boolean internal, logDirs.add(brokerLogDirs.get(partition.leader().id()).get(0)); } } - allTopics.put(name, new TopicMetadata(internal, partitions, logDirs, configs)); - Uuid id = Uuid.randomUuid(); - topicIds.put(name, id); - topicNames.put(id, name); + Uuid topicId; + if (usesTopicId) { + topicId = Uuid.randomUuid(); + topicIds.put(name, topicId); + topicNames.put(topicId, name); + } else { + topicId = Uuid.ZERO_UUID; + } + allTopics.put(name, new TopicMetadata(topicId, internal, partitions, logDirs, configs)); } synchronized public void markTopicForDeletion(final String name) { @@ -317,10 +331,10 @@ synchronized public CreateTopicsResult createTopics(Collection newTopi partitions.add(new TopicPartitionInfo(i, brokers.get(0), replicas, Collections.emptyList())); logDirs.add(brokerLogDirs.get(partitions.get(i).leader().id()).get(0)); } - allTopics.put(topicName, new TopicMetadata(false, partitions, logDirs, newTopic.configs())); - Uuid id = Uuid.randomUuid(); - topicIds.put(topicName, id); - topicNames.put(id, topicName); + Uuid topicId = Uuid.randomUuid(); + topicIds.put(topicName, topicId); + topicNames.put(topicId, topicName); + allTopics.put(topicName, new TopicMetadata(topicId, false, partitions, logDirs, newTopic.configs())); future.complete(null); createTopicResult.put(topicName, future); } @@ -345,7 +359,7 @@ synchronized public ListTopicsResult listTopics(ListTopicsOptions options) { if (topicDescription.getValue().fetchesRemainingUntilVisible > 0) { topicDescription.getValue().fetchesRemainingUntilVisible--; } else { - topicListings.put(topicName, new TopicListing(topicName, topicDescription.getValue().isInternalTopic)); + topicListings.put(topicName, new TopicListing(topicName, topicDescription.getValue().topicId, topicDescription.getValue().isInternalTopic)); } } @@ -355,7 +369,16 @@ synchronized public ListTopicsResult listTopics(ListTopicsOptions options) { } @Override - synchronized public DescribeTopicsResult describeTopics(Collection topicNames, DescribeTopicsOptions options) { + synchronized public DescribeTopicsResult describeTopics(TopicCollection topics, DescribeTopicsOptions options) { + if (topics instanceof TopicIdCollection) + return DescribeTopicsResult.ofTopicIds(new HashMap<>(handleDescribeTopicsUsingIds(((TopicIdCollection) topics).topicIds(), options))); + else if (topics instanceof TopicNameCollection) + return DescribeTopicsResult.ofTopicNames(new HashMap<>(handleDescribeTopicsByNames(((TopicNameCollection) topics).topicNames(), options))); + else + throw new IllegalArgumentException("The TopicCollection provided did not match any supported classes for describeTopics."); + } + + private Map> handleDescribeTopicsByNames(Collection topicNames, DescribeTopicsOptions options) { Map> topicDescriptions = new HashMap<>(); if (timeoutNextRequests > 0) { @@ -366,20 +389,20 @@ synchronized public DescribeTopicsResult describeTopics(Collection topic } --timeoutNextRequests; - return new DescribeTopicsResult(topicDescriptions); + return topicDescriptions; } for (String requestedTopic : topicNames) { for (Map.Entry topicDescription : allTopics.entrySet()) { String topicName = topicDescription.getKey(); + Uuid topicId = topicIds.getOrDefault(topicName, Uuid.ZERO_UUID); if (topicName.equals(requestedTopic) && !topicDescription.getValue().markedForDeletion) { if (topicDescription.getValue().fetchesRemainingUntilVisible > 0) { topicDescription.getValue().fetchesRemainingUntilVisible--; } else { TopicMetadata topicMetadata = topicDescription.getValue(); KafkaFutureImpl future = new KafkaFutureImpl<>(); - future.complete(new TopicDescription(topicName, topicMetadata.isInternalTopic, topicMetadata.partitions, - Collections.emptySet())); + future.complete(new TopicDescription(topicName, topicMetadata.isInternalTopic, topicMetadata.partitions, Collections.emptySet(), topicId)); topicDescriptions.put(topicName, future); break; } @@ -392,7 +415,49 @@ synchronized public DescribeTopicsResult describeTopics(Collection topic } } - return new DescribeTopicsResult(topicDescriptions); + return topicDescriptions; + } + + synchronized public Map> handleDescribeTopicsUsingIds(Collection topicIds, DescribeTopicsOptions options) { + + Map> topicDescriptions = new HashMap<>(); + + if (timeoutNextRequests > 0) { + for (Uuid requestedTopicId : topicIds) { + KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.completeExceptionally(new TimeoutException()); + topicDescriptions.put(requestedTopicId, future); + } + + --timeoutNextRequests; + return topicDescriptions; + } + + for (Uuid requestedTopicId : topicIds) { + for (Map.Entry topicDescription : allTopics.entrySet()) { + String topicName = topicDescription.getKey(); + Uuid topicId = this.topicIds.get(topicName); + + if (topicId != null && topicId.equals(requestedTopicId) && !topicDescription.getValue().markedForDeletion) { + if (topicDescription.getValue().fetchesRemainingUntilVisible > 0) { + topicDescription.getValue().fetchesRemainingUntilVisible--; + } else { + TopicMetadata topicMetadata = topicDescription.getValue(); + KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.complete(new TopicDescription(topicName, topicMetadata.isInternalTopic, topicMetadata.partitions, Collections.emptySet(), topicId)); + topicDescriptions.put(requestedTopicId, future); + break; + } + } + } + if (!topicDescriptions.containsKey(requestedTopicId)) { + KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.completeExceptionally(new UnknownTopicIdException("Topic id" + requestedTopicId + " not found.")); + topicDescriptions.put(requestedTopicId, future); + } + } + + return topicDescriptions; } @Override @@ -948,6 +1013,7 @@ public synchronized void updateEndOffsets(final Map newOff } private final static class TopicMetadata { + final Uuid topicId; final boolean isInternalTopic; final List partitions; final List partitionLogDirs; @@ -956,10 +1022,12 @@ private final static class TopicMetadata { public boolean markedForDeletion; - TopicMetadata(boolean isInternalTopic, + TopicMetadata(Uuid topicId, + boolean isInternalTopic, List partitions, List partitionLogDirs, Map configs) { + this.topicId = topicId; this.isInternalTopic = isInternalTopic; this.partitions = partitions; this.partitionLogDirs = partitionLogDirs; diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java index a45cf52fb278b..0a0466f6ff1e6 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java @@ -116,7 +116,7 @@ public void testAdminClientWithInvalidCredentials() { Map props = new HashMap<>(saslClientConfigs); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); try (Admin client = Admin.create(props)) { - KafkaFuture> future = client.describeTopics(Collections.singleton("test")).all(); + KafkaFuture> future = client.describeTopics(Collections.singleton("test")).allTopicNames(); TestUtils.assertFutureThrows(future, SaslAuthenticationException.class); } } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java index fd6bf9f27649c..0e3ccc9f2236e 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java @@ -399,7 +399,7 @@ private Collection listTopicAclBindings() private static Collection describeTopics(AdminClient adminClient, Collection topics) throws InterruptedException, ExecutionException { - return adminClient.describeTopics(topics).all().get().values(); + return adminClient.describeTopics(topics).allTopicNames().get().values(); } static Map configToMap(Config config) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index 9661c69e63fb5..7b2f152e2eede 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -452,7 +452,7 @@ public Map describeTopics(String... topics) { String topicNameList = String.join(", ", topics); Map> newResults = - admin.describeTopics(Arrays.asList(topics), new DescribeTopicsOptions()).values(); + admin.describeTopics(Arrays.asList(topics), new DescribeTopicsOptions()).topicNameValues(); // Iterate over each future so that we can handle individual failures like when some topics don't exist Map existingTopics = new HashMap<>(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java index edd989125b43e..dc2512900f666 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java @@ -797,7 +797,7 @@ protected void assertTopic(MockAdminClient admin, String topicName, int expected protected TopicDescription topicDescription(MockAdminClient admin, String topicName) throws ExecutionException, InterruptedException { DescribeTopicsResult result = admin.describeTopics(Collections.singleton(topicName)); - Map> byName = result.values(); + Map> byName = result.topicNameValues(); return byName.get(topicName).get(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java index db6f67e56e60f..17fd1acbdf8e1 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java @@ -305,7 +305,7 @@ public Map> describeTopics(Set topicN log.info("Describing topics {}", topicNames); try (Admin admin = createAdminClient()) { DescribeTopicsResult result = admin.describeTopics(topicNames); - Map> byName = result.values(); + Map> byName = result.topicNameValues(); for (Map.Entry> entry : byName.entrySet()) { String topicName = entry.getKey(); try { diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 74b722442a7c7..47c1d173b306b 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -465,7 +465,7 @@ object ConsumerGroupCommand extends Logging { topicWithoutPartitions.asJava, withTimeoutMs(new DescribeTopicsOptions)) - val unknownPartitions = describeTopicsResult.values().asScala.flatMap { case (topic, future) => + val unknownPartitions = describeTopicsResult.topicNameValues().asScala.flatMap { case (topic, future) => Try(future.get()) match { case Success(description) => description.partitions().asScala.map { partition => new TopicPartition(topic, partition.partition()) @@ -726,7 +726,7 @@ object ConsumerGroupCommand extends Logging { val descriptionMap = adminClient.describeTopics( topics.asJava, withTimeoutMs(new DescribeTopicsOptions) - ).all().get.asScala + ).allTopicNames().get.asScala descriptionMap.flatMap { case (topic, description) => description.partitions().asScala.map { tpInfo => new TopicPartition(topic, tpInfo.partition) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 186a4313af08e..ac6304b449191 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -376,7 +376,7 @@ object ReassignPartitionsCommand extends Logging { topicNamesToLookUp.add(part.topic) } val topicDescriptions = adminClient. - describeTopics(topicNamesToLookUp.asJava).values().asScala + describeTopics(topicNamesToLookUp.asJava).topicNameValues().asScala val notFoundResults = notFoundReassignments.map { case (part, targetReplicas) => currentReassignments.get(part) match { @@ -618,7 +618,7 @@ object ReassignPartitionsCommand extends Logging { private def describeTopics(adminClient: Admin, topics: Set[String]) : Map[String, TopicDescription] = { - adminClient.describeTopics(topics.asJava).values.asScala.map { case (topicName, topicDescriptionFuture) => + adminClient.describeTopics(topics.asJava).topicNameValues().asScala.map { case (topicName, topicDescriptionFuture) => try topicName -> topicDescriptionFuture.get catch { case t: ExecutionException if t.getCause.isInstanceOf[UnknownTopicOrPartitionException] => diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 8ba2e687d888a..f7b1c870b9cdd 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -28,7 +28,7 @@ import org.apache.kafka.clients.admin.CreatePartitionsOptions import org.apache.kafka.clients.admin.CreateTopicsOptions import org.apache.kafka.clients.admin.DeleteTopicsOptions import org.apache.kafka.clients.admin.{Admin, ListTopicsOptions, NewPartitions, NewTopic, PartitionReassignment, Config => JConfig} -import org.apache.kafka.common.{TopicPartition, TopicPartitionInfo, Uuid} +import org.apache.kafka.common.{TopicCollection, TopicPartition, TopicPartitionInfo, Uuid} import org.apache.kafka.common.config.ConfigResource.Type import org.apache.kafka.common.config.{ConfigResource, TopicConfig} import org.apache.kafka.common.errors.{ClusterAuthorizationException, TopicExistsException, UnsupportedVersionException} @@ -264,7 +264,7 @@ object TopicCommand extends Logging { ensureTopicExists(topics, opts.topic, !opts.ifExists) if (topics.nonEmpty) { - val topicsInfo = adminClient.describeTopics(topics.asJavaCollection).values() + val topicsInfo = adminClient.describeTopics(topics.asJavaCollection).topicNameValues() val newPartitions = topics.map { topicName => if (topic.hasReplicaAssignment) { val startPartitionId = topicsInfo.get(topicName).get().partitions().size() @@ -297,42 +297,60 @@ object TopicCommand extends Logging { } def describeTopic(opts: TopicCommandOptions): Unit = { - val topics = getTopics(opts.topic, opts.excludeInternalTopics) - ensureTopicExists(topics, opts.topic, !opts.ifExists) + // If topicId is provided and not zero, will use topicId regardless of topic name + val inputTopicId = opts.topicId.map(Uuid.fromString).filter(uuid => uuid != Uuid.ZERO_UUID) + val useTopicId = inputTopicId.nonEmpty - if (topics.nonEmpty) { - val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values() - val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()) - val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala - val describeOptions = new DescribeOptions(opts, liveBrokers.toSet) - val topicPartitions = topicDescriptions - .flatMap(td => td.partitions.iterator().asScala.map(p => new TopicPartition(td.name(), p.partition()))) - .toSet.asJava - val reassignments = listAllReassignments(topicPartitions) - - for (td <- topicDescriptions) { - val topicName = td.name - val topicId = td.topicId() - val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get() - val sortedPartitions = td.partitions.asScala.sortBy(_.partition) - - if (describeOptions.describeConfigs) { - val hasNonDefault = config.entries().asScala.exists(!_.isDefault) - if (!opts.reportOverriddenConfigs || hasNonDefault) { - val numPartitions = td.partitions().size - val firstPartition = td.partitions.iterator.next() - val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition)) - val topicDesc = TopicDescription(topicName, topicId, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false) - topicDesc.printDescription() - } + val (topicIds, topics) = if (useTopicId) + (getTopicIds(inputTopicId, opts.excludeInternalTopics), Seq()) + else + (Seq(), getTopics(opts.topic, opts.excludeInternalTopics)) + + // Only check topic name when topicId is not provided + if (useTopicId) + ensureTopicIdExists(topicIds, inputTopicId, !opts.ifExists) + else + ensureTopicExists(topics, opts.topic, !opts.ifExists) + + val topicDescriptions = if (topicIds.nonEmpty) { + adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds.toSeq.asJavaCollection)).allTopicIds().get().values().asScala + } else if (topics.nonEmpty) { + adminClient.describeTopics(TopicCollection.ofTopicNames(topics.asJavaCollection)).allTopicNames().get().values().asScala + } else { + Seq() + } + + val topicNames = topicDescriptions.map(_.name()) + val allConfigs = adminClient.describeConfigs(topicNames.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values() + val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()) + val describeOptions = new DescribeOptions(opts, liveBrokers.toSet) + val topicPartitions = topicDescriptions + .flatMap(td => td.partitions.iterator().asScala.map(p => new TopicPartition(td.name(), p.partition()))) + .toSet.asJava + val reassignments = listAllReassignments(topicPartitions) + + for (td <- topicDescriptions) { + val topicName = td.name + val topicId = td.topicId() + val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get() + val sortedPartitions = td.partitions.asScala.sortBy(_.partition) + + if (describeOptions.describeConfigs) { + val hasNonDefault = config.entries().asScala.exists(!_.isDefault) + if (!opts.reportOverriddenConfigs || hasNonDefault) { + val numPartitions = td.partitions().size + val firstPartition = td.partitions.iterator.next() + val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition)) + val topicDesc = TopicDescription(topicName, topicId, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false) + topicDesc.printDescription() } + } - if (describeOptions.describePartitions) { - for (partition <- sortedPartitions) { - val reassignment = reassignments.get(new TopicPartition(td.name, partition.partition)) - val partitionDesc = PartitionDescription(topicName, partition, Some(config), markedForDeletion = false, reassignment) - describeOptions.maybePrintPartitionDescription(partitionDesc) - } + if (describeOptions.describePartitions) { + for (partition <- sortedPartitions) { + val reassignment = reassignments.get(new TopicPartition(td.name, partition.partition)) + val partitionDesc = PartitionDescription(topicName, partition, Some(config), markedForDeletion = false, reassignment) + describeOptions.maybePrintPartitionDescription(partitionDesc) } } } @@ -345,13 +363,23 @@ object TopicCommand extends Logging { .all().get() } - def getTopics(topicIncludelist: Option[String], excludeInternalTopics: Boolean = false): Seq[String] = { + def getTopics(topicIncludeList: Option[String], excludeInternalTopics: Boolean = false): Seq[String] = { + val allTopics = if (excludeInternalTopics) { + adminClient.listTopics() + } else { + adminClient.listTopics(new ListTopicsOptions().listInternal(true)) + } + doGetTopics(allTopics.names().get().asScala.toSeq.sorted, topicIncludeList, excludeInternalTopics) + } + + def getTopicIds(topicIdIncludeList: Option[Uuid], excludeInternalTopics: Boolean = false): Seq[Uuid] = { val allTopics = if (excludeInternalTopics) { adminClient.listTopics() } else { adminClient.listTopics(new ListTopicsOptions().listInternal(true)) } - doGetTopics(allTopics.names().get().asScala.toSeq.sorted, topicIncludelist, excludeInternalTopics) + val allTopicIds = allTopics.listings().get().asScala.map(_.topicId()).toSeq.sorted + topicIdIncludeList.filter(allTopicIds.contains).toSeq } def close(): Unit = adminClient.close() @@ -374,6 +402,23 @@ object TopicCommand extends Logging { } } + /** + * ensures topic existence and throws exception if topic doesn't exist + * + * @param foundTopicIds Topics that were found to match the requested topic id. + * @param requestedTopicId Id of the topic that was requested. + * @param requireTopicIdExists Indicates if the topic needs to exist for the operation to be successful. + * If set to true, the command will throw an exception if the topic with the + * requested id does not exist. + */ + private def ensureTopicIdExists(foundTopicIds: Seq[Uuid], requestedTopicId: Option[Uuid], requireTopicIdExists: Boolean): Unit = { + // If no topic id was mentioned, do not need to throw exception. + if (requestedTopicId.isDefined && requireTopicIdExists && foundTopicIds.isEmpty) { + // If given topicId doesn't exist then throw exception + throw new IllegalArgumentException(s"TopicId '${requestedTopicId.get}' does not exist as expected") + } + } + private def doGetTopics(allTopics: Seq[String], topicIncludeList: Option[String], excludeInternalTopics: Boolean): Seq[String] = { if (topicIncludeList.isDefined) { val topicsFilter = IncludeList(topicIncludeList.get) @@ -464,6 +509,11 @@ object TopicCommand extends Logging { .withRequiredArg .describedAs("topic") .ofType(classOf[String]) + private val topicIdOpt = parser.accepts("topic-id", "The topic-id to describe." + + "This is used only with --bootstrap-server option for describing topics.") + .withRequiredArg + .describedAs("topic-id") + .ofType(classOf[String]) private val nl = System.getProperty("line.separator") private val kafkaConfigsCanAlterTopicConfigsViaBootstrapServer = " (the kafka-configs CLI supports altering topic configs with a --bootstrap-server option)" @@ -533,6 +583,7 @@ object TopicCommand extends Logging { def bootstrapServer: Option[String] = valueAsOption(bootstrapServerOpt) def commandConfig: Properties = if (has(commandConfigOpt)) Utils.loadProps(options.valueOf(commandConfigOpt)) else new Properties() def topic: Option[String] = valueAsOption(topicOpt) + def topicId: Option[String] = valueAsOption(topicIdOpt) def partitions: Option[Integer] = valueAsOption(partitionsOpt) def replicationFactor: Option[Integer] = valueAsOption(replicationFactorOpt) def replicaAssignment: Option[Map[Int, List[Int]]] = @@ -566,8 +617,12 @@ object TopicCommand extends Logging { // check required args if (!has(bootstrapServerOpt)) throw new IllegalArgumentException("--bootstrap-server must be specified") - if (has(describeOpt) && has(ifExistsOpt)) - CommandLineUtils.checkRequiredArgs(parser, options, topicOpt) + if (has(describeOpt) && has(ifExistsOpt)) { + if (!has(topicOpt) && !has(topicIdOpt)) + CommandLineUtils.printUsageAndDie(parser, "--topic or --topic-id is required to describe a topic") + if (has(topicOpt) && has(topicIdOpt)) + println("Only topic id will be used when both --topic and --topic-id are specified and topicId is not Uuid.ZERO_UUID") + } if (!has(listOpt) && !has(describeOpt)) CommandLineUtils.checkRequiredArgs(parser, options, topicOpt) if (has(createOpt) && !has(replicaAssignmentOpt)) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 144414da784c9..c2e0c15a36991 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1133,11 +1133,13 @@ class KafkaApis(val requestChannel: RequestChannel, private def metadataResponseTopic(error: Errors, topic: String, + topicId: Uuid, isInternal: Boolean, partitionData: util.List[MetadataResponsePartition]): MetadataResponseTopic = { new MetadataResponseTopic() .setErrorCode(error.code) .setName(topic) + .setTopicId(topicId) .setIsInternal(isInternal) .setPartitions(partitionData) } @@ -1174,6 +1176,7 @@ class KafkaApis(val requestChannel: RequestChannel, metadataResponseTopic( error, topic, + metadataCache.getTopicId(topic), Topic.isInternal(topic), util.Collections.emptyList() ) @@ -1191,16 +1194,29 @@ class KafkaApis(val requestChannel: RequestChannel, // Topic IDs are not supported for versions 10 and 11. Topic names can not be null in these versions. if (!metadataRequest.isAllTopics) { metadataRequest.data.topics.forEach{ topic => - if (topic.name == null) { + if (topic.name == null && metadataRequest.version < 12) { throw new InvalidRequestException(s"Topic name can not be null for version ${metadataRequest.version}") - } else if (topic.topicId != Uuid.ZERO_UUID) { + } else if (topic.topicId != Uuid.ZERO_UUID && metadataRequest.version < 12) { throw new InvalidRequestException(s"Topic IDs are not supported in requests for version ${metadataRequest.version}") } } } + // Check if topicId is presented firstly. + val topicIds = metadataRequest.topicIds.asScala.toSet.filterNot(_ == Uuid.ZERO_UUID) + val useTopicId = topicIds.nonEmpty + + // Only get topicIds and topicNames when supporting topicId + val unknownTopicIds = topicIds.filter(metadataCache.getTopicName(_).isEmpty) + val knownTopicNames = topicIds.flatMap(metadataCache.getTopicName) + + val unknownTopicIdsTopicMetadata = unknownTopicIds.map(topicId => + metadataResponseTopic(Errors.UNKNOWN_TOPIC_ID, null, topicId, false, util.Collections.emptyList())).toSeq + val topics = if (metadataRequest.isAllTopics) metadataCache.getAllTopics() + else if (useTopicId) + knownTopicNames else metadataRequest.topics.asScala.toSet @@ -1222,16 +1238,23 @@ class KafkaApis(val requestChannel: RequestChannel, } val unauthorizedForCreateTopicMetadata = unauthorizedForCreateTopics.map(topic => - metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, isInternal(topic), util.Collections.emptyList())) + // Set topicId to zero since we will never create topic which topicId + metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Uuid.ZERO_UUID, isInternal(topic), util.Collections.emptyList())) // do not disclose the existence of topics unauthorized for Describe, so we've not even checked if they exist or not val unauthorizedForDescribeTopicMetadata = // In case of all topics, don't include topics unauthorized for Describe if ((requestVersion == 0 && (metadataRequest.topics == null || metadataRequest.topics.isEmpty)) || metadataRequest.isAllTopics) Set.empty[MetadataResponseTopic] - else + else if (useTopicId) { + // Topic IDs are not considered sensitive information, so returning TOPIC_AUTHORIZATION_FAILED is OK unauthorizedForDescribeTopics.map(topic => - metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, util.Collections.emptyList())) + metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, null, metadataCache.getTopicId(topic), false, util.Collections.emptyList())) + } else { + // We should not return topicId when on unauthorized error, so we return zero uuid. + unauthorizedForDescribeTopics.map(topic => + metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Uuid.ZERO_UUID, false, util.Collections.emptyList())) + } // In version 0, we returned an error when brokers with replicas were unavailable, // while in higher versions we simply don't include the broker in the returned broker list @@ -1267,7 +1290,8 @@ class KafkaApis(val requestChannel: RequestChannel, } } - val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata + val completeTopicMetadata = unknownTopicIdsTopicMetadata ++ + topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata val brokers = metadataCache.getAliveBrokerNodes(request.context.listenerName) diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index a6d562365fc37..67285d7e5ddf6 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -54,6 +54,10 @@ trait MetadataCache { def getAliveBrokers(): Iterable[BrokerMetadata] + def getTopicId(topicName: String): Uuid + + def getTopicName(topicId: Uuid): Option[String] + def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): Option[Node] def getAliveBrokerNodes(listenerName: ListenerName): Iterable[Node] diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala index 6ae1922ad55a1..b7fbd17b643f4 100644 --- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala @@ -193,6 +193,10 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w } } + override def getTopicId(topicName: String): Uuid = _currentImage.topics().topicsByName().asScala.get(topicName).map(_.id()).getOrElse(Uuid.ZERO_UUID) + + override def getTopicName(topicId: Uuid): Option[String] = _currentImage.topics().topicsById.asScala.get(topicId).map(_.name()) + override def hasAliveBroker(brokerId: Int): Boolean = { Option(_currentImage.cluster().broker(brokerId)).count(!_.fenced()) == 1 } diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala index 71bfdf309111f..83b5eed21ac96 100755 --- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala @@ -234,6 +234,14 @@ class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging { metadataSnapshot.aliveBrokers.values.flatMap(_.getNode(listenerName)) } + def getTopicId(topicName: String): Uuid = { + metadataSnapshot.topicIds.getOrElse(topicName, Uuid.ZERO_UUID) + } + + def getTopicName(topicId: Uuid): Option[String] = { + metadataSnapshot.topicNames.get(topicId) + } + private def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], topic: String, partitionId: Int, diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index 82bc477ff75d4..b2bece413b58a 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -221,7 +221,7 @@ object ReplicaVerificationTool extends Logging { private def listTopicsMetadata(adminClient: Admin): Seq[TopicDescription] = { val topics = adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names.get - adminClient.describeTopics(topics).all.get.values.asScala.toBuffer + adminClient.describeTopics(topics).allTopicNames.get.values.asScala.toBuffer } private def brokerDetails(adminClient: Admin): Map[Int, Node] = { diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala index b9aa2088eac66..7525cd217777e 100644 --- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala @@ -110,7 +110,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg assertFutureExceptionTypeEquals(failedCreateResult.replicationFactor("mytopic3"), classOf[TopicExistsException]) assertFutureExceptionTypeEquals(failedCreateResult.config("mytopic3"), classOf[TopicExistsException]) - val topicToDescription = client.describeTopics(topics.asJava).all.get() + val topicToDescription = client.describeTopics(topics.asJava).allTopicNames.get() assertEquals(topics.toSet, topicToDescription.keySet.asScala) val topic0 = topicToDescription.get("mytopic") @@ -225,7 +225,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg expectedNumPartitionsOpt: Option[Int] = None): TopicDescription = { var result: TopicDescription = null waitUntilTrue(() => { - val topicResult = client.describeTopics(Set(topic).asJava, describeOptions).values.get(topic) + val topicResult = client.describeTopics(Set(topic).asJava, describeOptions).topicNameValues().get(topic) try { result = topicResult.get expectedNumPartitionsOpt.map(_ == result.partitions.size).getOrElse(true) diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala index 6cc7c486e4a76..22fee477b8f04 100644 --- a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala +++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala @@ -183,13 +183,13 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS createTopic(Topic2) // test without includeAuthorizedOperations flag - var describeTopicsResult = client.describeTopics(Set(Topic1, Topic2).asJava).all.get() + var describeTopicsResult = client.describeTopics(Set(Topic1, Topic2).asJava).allTopicNames.get() assertNull(describeTopicsResult.get(Topic1).authorizedOperations) assertNull(describeTopicsResult.get(Topic2).authorizedOperations) // test with includeAuthorizedOperations flag describeTopicsResult = client.describeTopics(Set(Topic1, Topic2).asJava, - new DescribeTopicsOptions().includeAuthorizedOperations(true)).all.get() + new DescribeTopicsOptions().includeAuthorizedOperations(true)).allTopicNames.get() assertEquals(Set(AclOperation.DESCRIBE), describeTopicsResult.get(Topic1).authorizedOperations().asScala.toSet) assertEquals(Set(AclOperation.DESCRIBE), describeTopicsResult.get(Topic2).authorizedOperations().asScala.toSet) @@ -201,7 +201,7 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS val expectedOperations = AclEntry.supportedOperations(ResourceType.TOPIC).asJava describeTopicsResult = client.describeTopics(Set(Topic1, Topic2).asJava, - new DescribeTopicsOptions().includeAuthorizedOperations(true)).all.get() + new DescribeTopicsOptions().includeAuthorizedOperations(true)).allTopicNames.get() assertEquals(expectedOperations, describeTopicsResult.get(Topic1).authorizedOperations()) assertEquals(Set(AclOperation.DESCRIBE, AclOperation.DELETE), describeTopicsResult.get(Topic2).authorizedOperations().asScala.toSet) diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index f36164163e38c..df60010cf287f 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -346,7 +346,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas consumer.assign(List(tp).asJava) assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer, numRecords, topic = tp.topic)) val adminClient = createAdminClient() - val e1 = assertThrows(classOf[ExecutionException], () => adminClient.describeTopics(Set(topic).asJava).all().get()) + val e1 = assertThrows(classOf[ExecutionException], () => adminClient.describeTopics(Set(topic).asJava).allTopicNames().get()) assertTrue(e1.getCause.isInstanceOf[TopicAuthorizationException], "Unexpected exception " + e1.getCause) // Verify successful produce/consume/describe on another topic using the same producer, consumer and adminClient @@ -356,9 +356,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas sendRecords(producer, numRecords, tp2) consumer.assign(List(tp2).asJava) consumeRecords(consumer, numRecords, topic = topic2) - val describeResults = adminClient.describeTopics(Set(topic, topic2).asJava).values + val describeResults = adminClient.describeTopics(Set(topic, topic2).asJava).topicNameValues() assertEquals(1, describeResults.get(topic2).get().partitions().size()) - val e2 = assertThrows(classOf[ExecutionException], () => adminClient.describeTopics(Set(topic).asJava).all().get()) + val e2 = assertThrows(classOf[ExecutionException], () => adminClient.describeTopics(Set(topic).asJava).allTopicNames().get()) assertTrue(e2.getCause.isInstanceOf[TopicAuthorizationException], "Unexpected exception " + e2.getCause) // Verify that consumer manually assigning both authorized and unauthorized topic doesn't consume @@ -382,7 +382,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } sendRecords(producer, numRecords, tp) consumeRecordsIgnoreOneAuthorizationException(consumer, numRecords, startingOffset = 0, topic) - val describeResults2 = adminClient.describeTopics(Set(topic, topic2).asJava).values + val describeResults2 = adminClient.describeTopics(Set(topic, topic2).asJava).topicNameValues assertEquals(1, describeResults2.get(topic).get().partitions().size()) assertEquals(1, describeResults2.get(topic2).get().partitions().size()) } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 6885c79fb8e8b..36586ed2fde0d 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -40,17 +40,17 @@ import org.apache.kafka.common.errors._ import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse} import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType} import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.common.{ConsumerGroupState, ElectionType, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica} +import org.apache.kafka.common.{ConsumerGroupState, ElectionType, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test} import org.slf4j.LoggerFactory import scala.annotation.nowarn -import scala.jdk.CollectionConverters._ import scala.collection.Seq import scala.compat.java8.OptionConverters._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} +import scala.jdk.CollectionConverters._ import scala.util.Random /** @@ -145,7 +145,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get controller.shutdown() controller.awaitShutdown() - val topicDesc = client.describeTopics(topics.asJava).all.get() + val topicDesc = client.describeTopics(topics.asJava).allTopicNames.get() assertEquals(topics.toSet, topicDesc.keySet.asScala) } @@ -161,12 +161,28 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { waitForTopics(client, Seq(existingTopic), List()) val nonExistingTopic = "non-existing" - val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).values + val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).topicNameValues() assertEquals(existingTopic, results.get(existingTopic).get.name) assertThrows(classOf[ExecutionException], () => results.get(nonExistingTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException] assertEquals(None, zkClient.getTopicPartitionCount(nonExistingTopic)) } + @Test + def testDescribeTopicsWithIds(): Unit = { + client = Admin.create(createConfig) + + val existingTopic = "existing-topic" + client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1, 1.toShort)).asJava).all.get() + waitForTopics(client, Seq(existingTopic), List()) + val existingTopicId = zkClient.getTopicIdsForTopics(Set(existingTopic)).values.head + + val nonExistingTopicId = Uuid.randomUuid() + + val results = client.describeTopics(TopicCollection.ofTopicIds(Seq(existingTopicId, nonExistingTopicId).asJava)).topicIdValues() + assertEquals(existingTopicId, results.get(existingTopicId).get.topicId()) + assertThrows(classOf[ExecutionException], () => results.get(nonExistingTopicId).get).getCause.isInstanceOf[UnknownTopicIdException] + } + @Test def testDescribeCluster(): Unit = { client = Admin.create(createConfig) diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala index 2d4abf4b6724e..2a7d5c92f9307 100644 --- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala @@ -136,7 +136,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with def describeTopic(): Unit = { try { - val response = adminClient.describeTopics(Collections.singleton(topic)).all.get + val response = adminClient.describeTopics(Collections.singleton(topic)).allTopicNames.get assertEquals(1, response.size) response.forEach { (topic, description) => assertEquals(numPartitions, description.partitions.size) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 08a55aedef101..84e8099928470 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -675,7 +675,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet consumer.commitSync() def partitionInfo: TopicPartitionInfo = - adminClients.head.describeTopics(Collections.singleton(topic)).values.get(topic).get().partitions().get(0) + adminClients.head.describeTopics(Collections.singleton(topic)).topicNameValues().get(topic).get().partitions().get(0) val partitionInfo0 = partitionInfo assertEquals(partitionInfo0.replicas.get(0), partitionInfo0.leader) diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index b7ca0f774ecdb..25e79ad6e8abf 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -393,7 +393,7 @@ class KRaftClusterTest { var currentMapping: Seq[Seq[Int]] = Seq() val expectedMapping = Seq(Seq(2, 1, 0), Seq(0, 1, 2), Seq(2, 3), Seq(3, 2, 0, 1)) TestUtils.waitUntilTrue( () => { - val topicInfoMap = admin.describeTopics(Collections.singleton("foo")).all().get() + val topicInfoMap = admin.describeTopics(Collections.singleton("foo")).allTopicNames().get() if (topicInfoMap.containsKey("foo")) { currentMapping = translatePartitionInfoToSeq(topicInfoMap.get("foo").partitions()) expectedMapping.equals(currentMapping) diff --git a/core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala b/core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala new file mode 100644 index 0000000000000..387c24432641b --- /dev/null +++ b/core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.api.{ApiVersion, KAFKA_2_8_IV0} +import kafka.network.SocketServer +import kafka.utils.TestUtils +import kafka.zk.ZkVersion +import org.apache.kafka.common.Uuid +import org.apache.kafka.common.message.MetadataRequestData +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse} +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +import scala.collection.{Map, Seq} + +class MetadataRequestBetweenDifferentIbpTest extends BaseRequestTest { + + override def brokerCount: Int = 3 + override def generateConfigs: Seq[KafkaConfig] = { + Seq( + createConfig(0, KAFKA_2_8_IV0), + createConfig(1, ApiVersion.latestVersion), + createConfig(2, ApiVersion.latestVersion) + ) + } + + @Test + def testUnknownTopicId(): Unit = { + val topic = "topic" + + // Kill controller and restart until broker with latest ibp become controller + ensureControllerIn(Seq(1, 2)) + createTopic(topic, Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))) + + val resp1 = sendMetadataRequest(new MetadataRequest(requestData(topic, Uuid.ZERO_UUID), 12.toShort), controllerSocketServer) + val topicId = resp1.topicMetadata.iterator().next().topicId() + + // We could still get topic metadata by topicId + val topicMetadata = sendMetadataRequest(new MetadataRequest(requestData(null, topicId), 12.toShort), controllerSocketServer) + .topicMetadata.iterator().next() + assertEquals(topicId, topicMetadata.topicId()) + assertEquals(topic, topicMetadata.topic()) + + // Make the broker whose version=KAFKA_2_8_IV0 controller + ensureControllerIn(Seq(0)) + + // Restart the broker whose ibp is higher, and the controller will send metadata request to it + killBroker(1) + restartDeadBrokers() + + // Send request to a broker whose ibp is higher and restarted just now + val resp2 = sendMetadataRequest(new MetadataRequest(requestData(topic, topicId), 12.toShort), brokerSocketServer(1)) + assertEquals(Errors.UNKNOWN_TOPIC_ID, resp2.topicMetadata.iterator().next().error()) + } + + private def ensureControllerIn(brokerIds: Seq[Int]): Unit = { + while (!brokerIds.contains(controllerSocketServer.config.brokerId)) { + zkClient.deleteController(ZkVersion.MatchAnyVersion) + TestUtils.waitUntilControllerElected(zkClient) + } + } + + private def createConfig(nodeId: Int,interBrokerVersion: ApiVersion): KafkaConfig = { + val props = TestUtils.createBrokerConfig(nodeId, zkConnect) + props.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerVersion.version) + KafkaConfig.fromProps(props) + } + + def requestData(topic: String, topicId: Uuid): MetadataRequestData = { + val data = new MetadataRequestData + data.topics.add(new MetadataRequestData.MetadataRequestTopic().setName(topic).setTopicId(topicId)) + data + } + + private def sendMetadataRequest(request: MetadataRequest, destination: SocketServer): MetadataResponse = { + connectAndReceive[MetadataResponse](request, destination) + } + +} diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 4ce76583e3d95..4a53c67502eaa 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -147,7 +147,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { private def waitUntilTopicGone(adminClient: Admin, topicName: String): Unit = { TestUtils.waitUntilTrue(() => { try { - adminClient.describeTopics(util.Collections.singletonList(topicName)).all().get() + adminClient.describeTopics(util.Collections.singletonList(topicName)).allTopicNames().get() false } catch { case e: ExecutionException => diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala index 9a9fca8bd12e1..19dbd6a7cdbc1 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala @@ -110,7 +110,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi val partitions = adminClient .describeTopics(Collections.singletonList(testTopicName)) - .all() + .allTopicNames() .get() .get(testTopicName) .partitions() @@ -125,7 +125,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi val partitions = adminClient .describeTopics(Collections.singletonList(testTopicName)) - .all() + .allTopicNames() .get() .get(testTopicName) .partitions() @@ -140,7 +140,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi val partitions = adminClient .describeTopics(Collections.singletonList(testTopicName)) - .all() + .allTopicNames() .get() .get(testTopicName) .partitions() @@ -190,7 +190,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi val partitions = adminClient .describeTopics(Collections.singletonList(testTopicName)) - .all() + .allTopicNames() .get() .get(testTopicName) .partitions() @@ -287,7 +287,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi topicService.alterTopic(new TopicCommandOptions( Array("--topic", testTopicName, "--partitions", "3"))) - val topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)).values().get(testTopicName).get() + val topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get() assertTrue(topicDescription.partitions().size() == 3) } @@ -300,7 +300,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi topicService.alterTopic(new TopicCommandOptions( Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2", "--partitions", "3"))) - val topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)).values().get(testTopicName).get() + val topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get() assertTrue(topicDescription.partitions().size() == 3) assertEquals(List(4,2), topicDescription.partitions().get(2).replicas().asScala.map(_.id())) } @@ -491,7 +491,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi try { // check which partition is on broker 0 which we'll kill val testTopicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)) - .all().get().asScala(testTopicName) + .allTopicNames().get().asScala(testTopicName) val partitionOnBroker0 = testTopicDescription.partitions().asScala.find(_.leader().id() == 0).get.partition() killBroker(0) @@ -586,7 +586,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi val brokerIds = servers.map(_.config.brokerId) TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp), throttleBytes = 1) - val testTopicDesc = adminClient.describeTopics(Collections.singleton(testTopicName)).all().get().get(testTopicName) + val testTopicDesc = adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get().get(testTopicName) val firstPartition = testTopicDesc.partitions().asScala.head val replicasOfFirstPartition = firstPartition.replicas().asScala.map(_.id()) @@ -797,7 +797,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi Collections.emptyList(), Collections.emptyList()) val describeResult = AdminClientTestUtils.describeTopicsResult(testTopicName, new TopicDescription( testTopicName, false, Collections.singletonList(topicPartitionInfo))) - when(adminClient.describeTopics(any())).thenReturn(describeResult) + when(adminClient.describeTopics(any(classOf[java.util.Collection[String]]))).thenReturn(describeResult) val result = AdminClientTestUtils.createPartitionsResult(testTopicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception()) when(adminClient.createPartitions(any(), any())).thenReturn(result) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 9739e929627af..1ddd63a854c9d 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -2232,6 +2232,117 @@ class KafkaApisTest { assertTrue(response.topicsByError(Errors.UNKNOWN_TOPIC_OR_PARTITION).isEmpty) } + @Test + def testUnauthorizedTopicMetadataRequest(): Unit = { + // 1. Set up broker information + val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) + val broker = new UpdateMetadataBroker() + .setId(0) + .setRack("rack") + .setEndpoints(Seq( + new UpdateMetadataEndpoint() + .setHost("broker0") + .setPort(9092) + .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) + .setListener(plaintextListener.value) + ).asJava) + + // 2. Set up authorizer + val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer]) + val unauthorizedTopic = "unauthorized-topic" + val authorizedTopic = "authorized-topic" + + val expectedActions = Seq( + new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, PatternType.LITERAL), 1, true, true), + new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), 1, true, true) + ) + + // Here we need to use AuthHelperTest.matchSameElements instead of EasyMock.eq since the order of the request is unknown + EasyMock.expect(authorizer.authorize(anyObject[RequestContext], AuthHelperTest.matchSameElements(expectedActions.asJava))) + .andAnswer { () => + val actions = EasyMock.getCurrentArguments.apply(1).asInstanceOf[util.List[Action]].asScala + actions.map { action => + if (action.resourcePattern().name().equals(authorizedTopic)) + AuthorizationResult.ALLOWED + else + AuthorizationResult.DENIED + }.asJava + }.times(2) + + // 3. Set up MetadataCache + val authorizedTopicId = Uuid.randomUuid() + val unauthorizedTopicId = Uuid.randomUuid(); + + val topicIds = new util.HashMap[String, Uuid]() + topicIds.put(authorizedTopic, authorizedTopicId) + topicIds.put(unauthorizedTopic, unauthorizedTopicId) + + def createDummyPartitionStates(topic: String) = { + new UpdateMetadataPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(0) + .setReplicas(Collections.singletonList(0)) + .setZkVersion(0) + .setIsr(Collections.singletonList(0)) + } + + // Send UpdateMetadataReq to update MetadataCache + val partitionStates = Seq(unauthorizedTopic, authorizedTopic).map(createDummyPartitionStates) + + val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0, + 0, 0, partitionStates.asJava, Seq(broker).asJava, topicIds).build() + metadataCache.asInstanceOf[ZkMetadataCache].updateMetadata(correlationId = 0, updateMetadataRequest) + + // 4. Send TopicMetadataReq using topicId + val metadataReqByTopicId = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopicId, unauthorizedTopicId)).build() + val repByTopicId = buildRequest(metadataReqByTopicId, plaintextListener) + val capturedMetadataByTopicIdResp = expectNoThrottling(repByTopicId) + EasyMock.replay(clientRequestQuotaManager, requestChannel, authorizer) + + createKafkaApis(authorizer = Some(authorizer)).handleTopicMetadataRequest(repByTopicId) + val metadataByTopicIdResp = capturedMetadataByTopicIdResp.getValue.asInstanceOf[MetadataResponse] + + val metadataByTopicId = metadataByTopicIdResp.data().topics().asScala.groupBy(_.topicId()).map(kv => (kv._1, kv._2.head)) + + metadataByTopicId.foreach{ case (topicId, metadataResponseTopic) => + if (topicId == unauthorizedTopicId) { + // Return an TOPIC_AUTHORIZATION_FAILED on unauthorized error regardless of leaking the existence of topic id + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), metadataResponseTopic.errorCode()) + // Do not return topic information on unauthorized error + assertNull(metadataResponseTopic.name()) + } else { + assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode()) + assertEquals(authorizedTopic, metadataResponseTopic.name()) + } + } + + // 4. Send TopicMetadataReq using topic name + EasyMock.reset(clientRequestQuotaManager, requestChannel) + val metadataReqByTopicName = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopic, unauthorizedTopic), false).build() + val repByTopicName = buildRequest(metadataReqByTopicName, plaintextListener) + val capturedMetadataByTopicNameResp = expectNoThrottling(repByTopicName) + EasyMock.replay(clientRequestQuotaManager, requestChannel) + + createKafkaApis(authorizer = Some(authorizer)).handleTopicMetadataRequest(repByTopicName) + val metadataByTopicNameResp = capturedMetadataByTopicNameResp.getValue.asInstanceOf[MetadataResponse] + + val metadataByTopicName = metadataByTopicNameResp.data().topics().asScala.groupBy(_.name()).map(kv => (kv._1, kv._2.head)) + + metadataByTopicName.foreach{ case (topicName, metadataResponseTopic) => + if (topicName == unauthorizedTopic) { + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), metadataResponseTopic.errorCode()) + // Do not return topic Id on unauthorized error + assertEquals(Uuid.ZERO_UUID, metadataResponseTopic.topicId()) + } else { + assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode()) + assertEquals(authorizedTopicId, metadataResponseTopic.topicId()) + } + } + } + /** * Verifies that sending a fetch request with version 9 works correctly when * ReplicaManager.getLogConfig returns None. diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 5620bf7f5200a..64003b79cff2b 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1631,7 +1631,7 @@ object TestUtils extends Logging { waitUntilTrue(() => { try { - val topicResult = client.describeTopics(Arrays.asList(topic)).all.get.get(topic) + val topicResult = client.describeTopics(Arrays.asList(topic)).allTopicNames.get.get(topic) val partitionResult = topicResult.partitions.get(partition) Option(partitionResult.leader).map(_.id) == leader } catch { @@ -1643,7 +1643,7 @@ object TestUtils extends Logging { def waitForBrokersOutOfIsr(client: Admin, partition: Set[TopicPartition], brokerIds: Set[Int]): Unit = { waitUntilTrue( () => { - val description = client.describeTopics(partition.map(_.topic).asJava).all.get.asScala + val description = client.describeTopics(partition.map(_.topic).asJava).allTopicNames.get.asScala val isr = description .values .flatMap(_.partitions.asScala.flatMap(_.isr.asScala)) @@ -1659,7 +1659,7 @@ object TestUtils extends Logging { def waitForBrokersInIsr(client: Admin, partition: TopicPartition, brokerIds: Set[Int]): Unit = { waitUntilTrue( () => { - val description = client.describeTopics(Set(partition.topic).asJava).all.get.asScala + val description = client.describeTopics(Set(partition.topic).asJava).allTopicNames.get.asScala val isr = description .values .flatMap(_.partitions.asScala.flatMap(_.isr.asScala)) @@ -1675,7 +1675,7 @@ object TestUtils extends Logging { def waitForReplicasAssigned(client: Admin, partition: TopicPartition, brokerIds: Seq[Int]): Unit = { waitUntilTrue( () => { - val description = client.describeTopics(Set(partition.topic).asJava).all.get.asScala + val description = client.describeTopics(Set(partition.topic).asJava).allTopicNames.get.asScala val replicas = description .values .flatMap(_.partitions.asScala.flatMap(_.replicas.asScala)) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java index 8d9e2da0b2bc3..6b47e269d4813 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java @@ -399,7 +399,7 @@ private boolean isPartitionsCountSameAsConfigured(AdminClient adminClient, String topicName) throws InterruptedException, ExecutionException { log.debug("Getting topic details to check for partition count and replication factor."); TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(topicName)) - .values().get(topicName).get(); + .topicNameValues().get(topicName).get(); int expectedPartitions = rlmmConfig.metadataTopicPartitionsCount(); int topicPartitionsSize = topicDescription.partitions().size(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index 12e62427e1ca4..695492122a3c7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -157,7 +157,7 @@ public ValidationResult validate(final Map topicCon Map> descriptionsForTopic = Collections.emptyMap(); if (!topicDescriptionsStillToValidate.isEmpty()) { final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicDescriptionsStillToValidate); - descriptionsForTopic = describeTopicsResult.values(); + descriptionsForTopic = describeTopicsResult.topicNameValues(); } Map> configsForTopic = Collections.emptyMap(); if (!topicConfigsStillToValidate.isEmpty()) { @@ -515,7 +515,7 @@ protected Map getNumPartitions(final Set topics, log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics); final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics); - final Map> futures = describeTopicsResult.values(); + final Map> futures = describeTopicsResult.topicNameValues(); final Map existedTopicPartition = new HashMap<>(); for (final Map.Entry> topicFuture : futures.entrySet()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java index c2dee61a57baa..1e7f685debbf1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java @@ -675,7 +675,7 @@ public void shouldGoThroughRebalancingCorrectly() throws Exception { private int getNumberOfPartitionsForTopic(final String topic) throws Exception { try (final AdminClient adminClient = createAdminClient()) { final TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(topic)) - .values() + .topicNameValues() .get(topic) .get(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java index c0fc7963df1fa..512d1c13bd4c0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java @@ -205,7 +205,7 @@ private KafkaStreams startStreams(final StreamsBuilder builder) throws Interrupt private int getNumberOfPartitionsForTopic(final String topic) throws Exception { try (final AdminClient adminClient = createAdminClient()) { final TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(topic)) - .values() + .topicNameValues() .get(topic) .get(IntegrationTestUtils.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java index abfa5c4bd0f95..853d8d77c3a02 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java @@ -632,17 +632,17 @@ public void shouldCreateRequiredTopics() throws Exception { { add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())); } - }), mockAdminClient.describeTopics(Collections.singleton(topic1)).values().get(topic1).get()); + }), mockAdminClient.describeTopics(Collections.singleton(topic1)).topicNameValues().get(topic1).get()); assertEquals(new TopicDescription(topic2, false, new ArrayList() { { add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())); } - }), mockAdminClient.describeTopics(Collections.singleton(topic2)).values().get(topic2).get()); + }), mockAdminClient.describeTopics(Collections.singleton(topic2)).topicNameValues().get(topic2).get()); assertEquals(new TopicDescription(topic3, false, new ArrayList() { { add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())); } - }), mockAdminClient.describeTopics(Collections.singleton(topic3)).values().get(topic3).get()); + }), mockAdminClient.describeTopics(Collections.singleton(topic3)).topicNameValues().get(topic3).get()); final ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic1); final ConfigResource resource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2); @@ -1732,7 +1732,7 @@ private static class MockDeleteTopicsResult extends DeleteTopicsResult { private static class MockDescribeTopicsResult extends DescribeTopicsResult { MockDescribeTopicsResult(final Map> futures) { - super(futures); + super(null, futures); } } diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java index a5d6c7a835dbb..e40ca7ae82ecf 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java +++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java @@ -338,7 +338,7 @@ private void createTopicsResultTest(Admin client, Collection topics) throws InterruptedException, ExecutionException { while (true) { try { - client.describeTopics(topics).all().get(); + client.describeTopics(topics).allTopicNames().get(); break; } catch (ExecutionException e) { if (e.getCause() instanceof UnknownTopicOrPartitionException) diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java index b8f1c9dad182b..10a368a7c5f5c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java @@ -750,7 +750,7 @@ private void findTopicPartitions( List topicPartitions ) throws Exception { try { - Map topicDescriptions = admin.describeTopics(topics).all().get(); + Map topicDescriptions = admin.describeTopics(topics).allTopicNames().get(); topicDescriptions.forEach((topic, description) -> { description.partitions().forEach(partitionInfo -> { if (!brokerId.isPresent() || hasReplica(brokerId.get(), partitionInfo)) { diff --git a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java index 968c34a13a8b8..98704e593e91f 100644 --- a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java @@ -540,7 +540,7 @@ private void expectDescribeTopics( Map descriptions ) { DescribeTopicsResult result = Mockito.mock(DescribeTopicsResult.class); - Mockito.when(result.all()).thenReturn(completedFuture(descriptions)); + Mockito.when(result.allTopicNames()).thenReturn(completedFuture(descriptions)); Mockito.when(admin.describeTopics(new ArrayList<>(descriptions.keySet()))).thenReturn(result); } diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java index 812129e59f9b7..23c0ba4fcd0e8 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java @@ -279,7 +279,7 @@ private static Map topicDescriptions(Collection getMatchingTopicPartitions( List out = new ArrayList<>(); DescribeTopicsResult topicsResult = adminClient.describeTopics( matchedTopics, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT)); - Map topicDescriptionMap = topicsResult.all().get(); + Map topicDescriptionMap = topicsResult.allTopicNames().get(); for (TopicDescription desc: topicDescriptionMap.values()) { List partitions = desc.partitions(); for (TopicPartitionInfo info: partitions) { diff --git a/trogdor/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java b/trogdor/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java index 738ec3c82aa80..a5fbc85950b38 100644 --- a/trogdor/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java +++ b/trogdor/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java @@ -81,7 +81,7 @@ public void testCreateOneTopic() throws Throwable { Collections.singletonList( new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()))), adminClient.describeTopics( - Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get() + Collections.singleton(TEST_TOPIC)).topicNameValues().get(TEST_TOPIC).get() ); } @@ -98,7 +98,7 @@ public void testCreateRetriesOnTimeout() throws Throwable { Collections.singletonList( new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()))), adminClient.describeTopics( - Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get() + Collections.singleton(TEST_TOPIC)).topicNameValues().get(TEST_TOPIC).get() ); } @@ -177,7 +177,7 @@ public void testCreatesNotExistingTopics() throws Throwable { TEST_TOPIC, false, Collections.singletonList( new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()))), - adminClient.describeTopics(Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get() + adminClient.describeTopics(Collections.singleton(TEST_TOPIC)).topicNameValues().get(TEST_TOPIC).get() ); }