diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 551c7dfe34658..8fd02948d2473 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.TopicPartitionReplica; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; @@ -251,6 +252,41 @@ default DeleteTopicsResult deleteTopics(Collection topics) { * @return The DeleteTopicsResult. */ DeleteTopicsResult deleteTopics(Collection topics, DeleteTopicsOptions options); + + /** + * This is a convenience method for {@link #deleteTopicsWithIds(Collection, DeleteTopicsOptions)} + * with default options. See the overload for more details. + *

+ * This operation is supported by brokers with version 2.8.0 or higher. + * + * @param topics The topic IDs for the topics to delete. + * @return The DeleteTopicsWithIdsResult. + */ + default DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection topics) { + return deleteTopicsWithIds(topics, new DeleteTopicsOptions()); + } + + /** + * Delete a batch of topics. + *

+ * This operation is not transactional so it may succeed for some topics while fail for others. + *

+ * It may take several seconds after the {@link DeleteTopicsWithIdsResult} returns + * success for all the brokers to become aware that the topics are gone. + * During this time, {@link #listTopics()} and {@link #describeTopics(Collection)} + * may continue to return information about the deleted topics. + *

+ * If delete.topic.enable is false on the brokers, deleteTopicsWithIds will mark + * the topics for deletion, but not actually delete them. The futures will + * return successfully in this case. + *

+ * This operation is supported by brokers with version 2.8.0 or higher. + * + * @param topics The topic IDs for the topics to delete. + * @param options The options to use when deleting the topics. + * @return The DeleteTopicsWithIdsResult. + */ + DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection topics, DeleteTopicsOptions options); /** * List the topics available in the cluster with the default options. diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java index eac086a34c7bc..0b75959cd8093 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.errors.ApiException; @@ -68,6 +69,19 @@ public KafkaFuture config(String topic) { return futures.get(topic).thenApply(TopicMetadataAndConfig::config); } + /** + * Returns a future that provides topic ID for the topic when the request completes. + *

+ * If broker version doesn't support topic IDs in the response, throw + {@link org.apache.kafka.common.errors.UnsupportedVersionException}. + * If broker returned an error for topic configs, throw appropriate exception. For example, + * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not + * have permission to describe topic configs. + */ + public KafkaFuture topicId(String topic) { + return futures.get(topic).thenApply(TopicMetadataAndConfig::topicId); + } + /** * Returns a future that provides number of partitions in the topic when the request completes. *

@@ -96,12 +110,14 @@ public KafkaFuture replicationFactor(String topic) { public static class TopicMetadataAndConfig { private final ApiException exception; + private final Uuid topicId; private final int numPartitions; private final int replicationFactor; private final Config config; - TopicMetadataAndConfig(int numPartitions, int replicationFactor, Config config) { + TopicMetadataAndConfig(Uuid topicId, int numPartitions, int replicationFactor, Config config) { this.exception = null; + this.topicId = topicId; this.numPartitions = numPartitions; this.replicationFactor = replicationFactor; this.config = config; @@ -109,10 +125,16 @@ public static class TopicMetadataAndConfig { TopicMetadataAndConfig(ApiException exception) { this.exception = exception; + this.topicId = Uuid.ZERO_UUID; this.numPartitions = UNKNOWN; this.replicationFactor = UNKNOWN; this.config = null; } + + public Uuid topicId() { + ensureSuccess(); + return topicId; + } public int numPartitions() { ensureSuccess(); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java new file mode 100644 index 0000000000000..eeb91194a96ff --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collection; +import java.util.Map; + +/** + * The result of the {@link Admin#deleteTopicsWithIds(Collection)} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DeleteTopicsWithIdsResult { + final Map> futures; + + DeleteTopicsWithIdsResult(Map> futures) { + this.futures = futures; + } + + /** + * Return a map from topic IDs to futures which can be used to check the status of + * individual deletions. + */ + public Map> values() { + return futures; + } + + /** + * Return a future which succeeds only if all the topic deletions succeed. 
+ */ + public KafkaFuture all() { + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 489f74da0afc2..db9400bc49f70 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -49,6 +49,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; @@ -67,6 +68,7 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnacceptableCredentialException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnknownTopicIdException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -108,6 +110,7 @@ import org.apache.kafka.common.message.DeleteRecordsResponseData; import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsTopicResult; import org.apache.kafka.common.message.DeleteTopicsRequestData; +import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState; import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult; import org.apache.kafka.common.message.DescribeClusterRequestData; import org.apache.kafka.common.message.DescribeConfigsRequestData; @@ -1554,7 +1557,7 @@ public void handleResponse(AbstractResponse abstractResponse) { Config topicConfig = new 
Config(configs.stream() .map(this::configEntry) .collect(Collectors.toSet())); - topicMetadataAndConfig = new TopicMetadataAndConfig(result.numPartitions(), + topicMetadataAndConfig = new TopicMetadataAndConfig(result.topicId(), result.numPartitions(), result.replicationFactor(), topicConfig); } @@ -1625,6 +1628,32 @@ public DeleteTopicsResult deleteTopics(final Collection topicNames, return new DeleteTopicsResult(new HashMap<>(topicFutures)); } + @Override + public DeleteTopicsWithIdsResult deleteTopicsWithIds(final Collection topicIds, + final DeleteTopicsOptions options) { + final Map> topicFutures = new HashMap<>(topicIds.size()); + final List validTopicIds = new ArrayList<>(topicIds.size()); + for (Uuid topicId : topicIds) { + if (topicId.equals(Uuid.ZERO_UUID)) { + KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.completeExceptionally(new UnknownTopicIdException("The given topic ID '" + + topicId + "' cannot be represented in a request.")); + topicFutures.put(topicId, future); + } else if (!topicFutures.containsKey(topicId)) { + topicFutures.put(topicId, new KafkaFutureImpl<>()); + validTopicIds.add(topicId); + } + } + if (!validTopicIds.isEmpty()) { + final long now = time.milliseconds(); + final long deadline = calcDeadlineMs(now, options.timeoutMs()); + final Call call = getDeleteTopicsWithIdsCall(options, topicFutures, validTopicIds, + Collections.emptyMap(), now, deadline); + runnable.call(call, now); + } + return new DeleteTopicsWithIdsResult(new HashMap<>(topicFutures)); + } + private Call getDeleteTopicsCall(final DeleteTopicsOptions options, final Map> futures, final List topics, @@ -1696,6 +1725,79 @@ void handleFailure(Throwable throwable) { } }; } + + private Call getDeleteTopicsWithIdsCall(final DeleteTopicsOptions options, + final Map> futures, + final List topicIds, + final Map quotaExceededExceptions, + final long now, + final long deadline) { + return new Call("deleteTopics", deadline, new ControllerNodeProvider()) { + @Override + 
DeleteTopicsRequest.Builder createRequest(int timeoutMs) { + return new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopics(topicIds.stream().map( + topic -> new DeleteTopicState().setTopicId(topic)).collect(Collectors.toList())) + .setTimeoutMs(timeoutMs)); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + // Check for controller change + handleNotControllerError(abstractResponse); + // Handle server responses for particular topics. + final DeleteTopicsResponse response = (DeleteTopicsResponse) abstractResponse; + final List retryTopics = new ArrayList<>(); + final Map retryTopicQuotaExceededExceptions = new HashMap<>(); + for (DeletableTopicResult result : response.data().responses()) { + KafkaFutureImpl future = futures.get(result.topicId()); + if (future == null) { + log.warn("Server response mentioned unknown topic ID {}", result.topicId()); + } else { + ApiError error = new ApiError(result.errorCode(), result.errorMessage()); + if (error.isFailure()) { + if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) { + ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException( + response.throttleTimeMs(), error.messageWithFallback()); + if (options.shouldRetryOnQuotaViolation()) { + retryTopics.add(result.topicId()); + retryTopicQuotaExceededExceptions.put(result.topicId(), quotaExceededException); + } else { + future.completeExceptionally(quotaExceededException); + } + } else { + future.completeExceptionally(error.exception()); + } + } else { + future.complete(null); + } + } + } + // If there are topics to retry, retry them; complete unrealized futures otherwise. + if (retryTopics.isEmpty()) { + // The server should send back a response for every topic. But do a sanity check anyway. 
+ completeUnrealizedFutures(futures.entrySet().stream(), + topic -> "The controller response did not contain a result for topic " + topic); + } else { + final long now = time.milliseconds(); + final Call call = getDeleteTopicsWithIdsCall(options, futures, retryTopics, + retryTopicQuotaExceededExceptions, now, deadline); + runnable.call(call, now); + } + } + + @Override + void handleFailure(Throwable throwable) { + // If there were any topics retries due to a quota exceeded exception, we propagate + // the initial error back to the caller if the request timed out. + maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(), + throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now)); + // Fail all the other remaining futures + completeAllExceptionally(futures.values(), throwable); + } + }; + } @Override public ListTopicsResult listTopics(final ListTopicsOptions options) { diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicIdException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicIdException.java new file mode 100644 index 0000000000000..e5023eb7cdbb3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicIdException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class UnknownTopicIdException extends InvalidMetadataException { + + private static final long serialVersionUID = 1L; + + public UnknownTopicIdException(String message) { + super(message); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index ed505f7195c35..3f7f750ae378b 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -111,6 +111,7 @@ import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnknownProducerIdException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnknownTopicIdException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnstableOffsetCommitException; import org.apache.kafka.common.errors.UnsupportedByAuthenticationException; @@ -350,7 +351,8 @@ public enum Errors { POSITION_OUT_OF_RANGE( 99, "Requested position is not greater than or equal to zero, and less than the size of the snapshot.", - PositionOutOfRangeException::new); + PositionOutOfRangeException::new), + UNKNOWN_TOPIC_ID(100, "This server does not host this topic ID.", UnknownTopicIdException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java index dfd2e72df03d5..995526d369b93 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java @@ 
-16,13 +16,19 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.DeleteTopicsRequestData; import org.apache.kafka.common.message.DeleteTopicsResponseData; import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult; +import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; public class DeleteTopicsRequest extends AbstractRequest { @@ -36,8 +42,25 @@ public Builder(DeleteTopicsRequestData data) { @Override public DeleteTopicsRequest build(short version) { + if (version >= 6 && !data.topicNames().isEmpty()) { + data.setTopics(groupByTopic(data.topicNames())); + } else if (version >= 6) { + for (DeleteTopicState topic : data.topics()) { + if (topic.name() != null && topic.name().equals("")) { + topic.setName(null); + } + } + } return new DeleteTopicsRequest(data, version); } + + private List groupByTopic(List topics) { + List topicStates = new ArrayList<>(); + for (String topic : topics) { + topicStates.add(new DeleteTopicState().setName(topic)); + } + return topicStates; + } @Override public String toString() { @@ -64,13 +87,32 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { response.setThrottleTimeMs(throttleTimeMs); } ApiError apiError = ApiError.fromThrowable(e); - for (String topic : data.topicNames()) { + for (DeleteTopicState topic : topics()) { response.responses().add(new DeletableTopicResult() - .setName(topic) + .setName(topic.name()) + .setTopicId(topic.topicId()) .setErrorCode(apiError.error().code())); } return new DeleteTopicsResponse(response); } + + public List topicNames() { + if (version() >= 6) + return data.topics().stream().map(topic -> 
topic.name()).collect(Collectors.toList()); + return data.topicNames(); + } + + public List topicIds() { + if (version() >= 6) + return data.topics().stream().map(topic -> topic.topicId()).collect(Collectors.toList()); + return Collections.emptyList(); + } + + public List topics() { + if (version() >= 6) + return data.topics(); + return data.topicNames().stream().map(name -> new DeleteTopicState().setName(name)).collect(Collectors.toList()); + } public static DeleteTopicsRequest parse(ByteBuffer buffer, short version) { return new DeleteTopicsRequest(new DeleteTopicsRequestData(new ByteBufferAccessor(buffer), version), version); diff --git a/clients/src/main/resources/common/message/CreateTopicsRequest.json b/clients/src/main/resources/common/message/CreateTopicsRequest.json index c80add64a7b48..1a8d57a3f0bea 100644 --- a/clients/src/main/resources/common/message/CreateTopicsRequest.json +++ b/clients/src/main/resources/common/message/CreateTopicsRequest.json @@ -26,7 +26,9 @@ // // Version 6 is identical to version 5 but may return a THROTTLING_QUOTA_EXCEEDED error // in the response if the topics creation is throttled (KIP-599). - "validVersions": "0-6", + // + // Version 7 is the same as version 6. + "validVersions": "0-7", "flexibleVersions": "5+", "fields": [ { "name": "Topics", "type": "[]CreatableTopic", "versions": "0+", diff --git a/clients/src/main/resources/common/message/CreateTopicsResponse.json b/clients/src/main/resources/common/message/CreateTopicsResponse.json index 9f2296124a783..c1bf88285a150 100644 --- a/clients/src/main/resources/common/message/CreateTopicsResponse.json +++ b/clients/src/main/resources/common/message/CreateTopicsResponse.json @@ -30,7 +30,9 @@ // // Version 6 is identical to version 5 but may return a THROTTLING_QUOTA_EXCEEDED error // in the response if the topics creation is throttled (KIP-599). - "validVersions": "0-6", + // + // Version 7 returns the topic ID of the newly created topic if creation is successful. 
+ "validVersions": "0-7", "flexibleVersions": "5+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true, @@ -39,6 +41,7 @@ "about": "Results for each topic we tried to create.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "about": "The topic name." }, + { "name": "TopicId", "type": "uuid", "versions": "7+", "ignorable": true, "about": "The unique topic ID"}, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "1+", "nullableVersions": "0+", "ignorable": true, diff --git a/clients/src/main/resources/common/message/DeleteTopicsRequest.json b/clients/src/main/resources/common/message/DeleteTopicsRequest.json index 9dfba16515633..7e11554ad480c 100644 --- a/clients/src/main/resources/common/message/DeleteTopicsRequest.json +++ b/clients/src/main/resources/common/message/DeleteTopicsRequest.json @@ -23,10 +23,17 @@ // // Version 5 adds ErrorMessage in the response and may return a THROTTLING_QUOTA_EXCEEDED error // in the response if the topics deletion is throttled (KIP-599). - "validVersions": "0-5", + // + // Version 6 reorganizes topics, adds topic IDs and allows topic names to be null. 
+ "validVersions": "0-6", "flexibleVersions": "4+", "fields": [ - { "name": "TopicNames", "type": "[]string", "versions": "0+", "entityType": "topicName", + { "name": "Topics", "type": "[]DeleteTopicState", "versions": "6+", "about": "The name or topic ID of the topic", + "fields": [ + {"name": "Name", "type": "string", "versions": "6+", "nullableVersions": "6+", "about": "The topic name"}, + {"name": "TopicId", "type": "uuid", "versions": "6+", "about": "The unique topic ID"} + ]}, + { "name": "TopicNames", "type": "[]string", "versions": "0-5", "entityType": "topicName", "ignorable": true, "about": "The names of the topics to delete" }, { "name": "TimeoutMs", "type": "int32", "versions": "0+", "about": "The length of time in milliseconds to wait for the deletions to complete." } diff --git a/clients/src/main/resources/common/message/DeleteTopicsResponse.json b/clients/src/main/resources/common/message/DeleteTopicsResponse.json index 6986b0adb339a..cb531bb13f761 100644 --- a/clients/src/main/resources/common/message/DeleteTopicsResponse.json +++ b/clients/src/main/resources/common/message/DeleteTopicsResponse.json @@ -27,15 +27,20 @@ // // Version 5 adds ErrorMessage in the response and may return a THROTTLING_QUOTA_EXCEEDED error // in the response if the topics deletion is throttled (KIP-599). - "validVersions": "0-5", + // + // Version 6 adds topic ID to responses. An UNSUPPORTED_VERSION error code will be returned when attempting to + // delete using topic IDs when IBP < 2.8. UNKNOWN_TOPIC_ID error code will be returned when IBP is at least 2.8, but + // the topic ID was not found. + "validVersions": "0-6", "flexibleVersions": "4+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." 
}, { "name": "Responses", "type": "[]DeletableTopicResult", "versions": "0+", "about": "The results for each topic we tried to delete.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0+", "nullableVersions": "6+", "mapKey": true, "entityType": "topicName", "about": "The topic name" }, + {"name": "TopicId", "type": "uuid", "versions": "6+", "ignorable": true, "about": "The unique topic ID"}, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The deletion error, or 0 if the deletion succeeded." }, { "name": "ErrorMessage", "type": "string", "versions": "5+", "nullableVersions": "5+", "ignorable": true, "default": "null", diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index d23b9f6932474..ed62f431e64d2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -65,6 +65,7 @@ import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnknownTopicIdException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.feature.Features; @@ -410,6 +411,12 @@ public static DeletableTopicResult deletableTopicResult(String topicName, Errors .setErrorCode(error.code()); } + public static DeletableTopicResult deletableTopicResultWithId(Uuid topicId, Errors error) { + return new DeletableTopicResult() + .setTopicId(topicId) + .setErrorCode(error.code()); + } + public static CreatePartitionsResponse 
prepareCreatePartitionsResponse(int throttleTimeMs, CreatePartitionsTopicResult... topics) { CreatePartitionsResponseData data = new CreatePartitionsResponseData() .setThrottleTimeMs(throttleTimeMs) @@ -436,6 +443,14 @@ private static DeleteTopicsResponse prepareDeleteTopicsResponse(String topicName return new DeleteTopicsResponse(data); } + private static DeleteTopicsResponse prepareDeleteTopicsResponseWithTopicId(Uuid id, Errors error) { + DeleteTopicsResponseData data = new DeleteTopicsResponseData(); + data.responses().add(new DeletableTopicResult() + .setTopicId(id) + .setErrorCode(error.code())); + return new DeleteTopicsResponse(data); + } + private static FindCoordinatorResponse prepareFindCoordinatorResponse(Errors error, Node node) { return FindCoordinatorResponse.prepareResponse(error, node); } @@ -869,8 +884,33 @@ public void testDeleteTopics() throws Exception { future = env.adminClient().deleteTopics(singletonList("myTopic"), new DeleteTopicsOptions()).all(); TestUtils.assertFutureError(future, UnknownTopicOrPartitionException.class); + + // With topic IDs + Uuid topicId = Uuid.randomUuid(); + + env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId), + prepareDeleteTopicsResponseWithTopicId(topicId, Errors.NONE)); + future = env.adminClient().deleteTopicsWithIds(singletonList(topicId), + new DeleteTopicsOptions()).all(); + assertNull(future.get()); + + env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId), + prepareDeleteTopicsResponseWithTopicId(topicId, Errors.TOPIC_DELETION_DISABLED)); + future = env.adminClient().deleteTopicsWithIds(singletonList(topicId), + new DeleteTopicsOptions()).all(); + TestUtils.assertFutureError(future, TopicDeletionDisabledException.class); + + env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId), + prepareDeleteTopicsResponseWithTopicId(topicId, Errors.UNKNOWN_TOPIC_ID)); + future = 
env.adminClient().deleteTopicsWithIds(singletonList(topicId), + new DeleteTopicsOptions()).all(); + TestUtils.assertFutureError(future, UnknownTopicIdException.class); } } + @Test public void testDeleteTopicsPartialResponse() throws Exception { @@ -887,6 +927,20 @@ public void testDeleteTopicsPartialResponse() throws Exception { result.values().get("myTopic").get(); TestUtils.assertFutureThrows(result.values().get("myOtherTopic"), ApiException.class); + + // With topic IDs + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2), + prepareDeleteTopicsResponse(1000, + deletableTopicResultWithId(topicId1, Errors.NONE))); + + DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds( + asList(topicId1, topicId2), new DeleteTopicsOptions()); + + resultIds.values().get(topicId1).get(); + TestUtils.assertFutureThrows(resultIds.values().get(topicId2), ApiException.class); } } @@ -919,6 +973,36 @@ public void testDeleteTopicsRetryThrottlingExceptionWhenEnabled() throws Excepti assertNull(result.values().get("topic1").get()); assertNull(result.values().get("topic2").get()); TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class); + + // With topic IDs + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + Uuid topicId3 = Uuid.randomUuid(); + + env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2, topicId3), + prepareDeleteTopicsResponse(1000, + deletableTopicResultWithId(topicId1, Errors.NONE), + deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED), + deletableTopicResultWithId(topicId3, Errors.UNKNOWN_TOPIC_ID))); + + env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId2), + prepareDeleteTopicsResponse(1000, + deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED))); + + 
env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId2), + prepareDeleteTopicsResponse(0, + deletableTopicResultWithId(topicId2, Errors.NONE))); + + DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds( + asList(topicId1, topicId2, topicId3), + new DeleteTopicsOptions().retryOnQuotaViolation(true)); + + assertNull(resultIds.values().get(topicId1).get()); + assertNull(resultIds.values().get(topicId2).get()); + TestUtils.assertFutureThrows(resultIds.values().get(topicId3), UnknownTopicIdException.class); } } @@ -964,6 +1048,43 @@ public void testDeleteTopicsRetryThrottlingExceptionWhenEnabledUntilRequestTimeO ThrottlingQuotaExceededException.class); assertEquals(0, e.throttleTimeMs()); TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class); + + // With topic IDs + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + Uuid topicId3 = Uuid.randomUuid(); + env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2, topicId3), + prepareDeleteTopicsResponse(1000, + deletableTopicResultWithId(topicId1, Errors.NONE), + deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED), + deletableTopicResultWithId(topicId3, Errors.UNKNOWN_TOPIC_ID))); + + env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId2), + prepareDeleteTopicsResponse(1000, + deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED))); + + DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds( + asList(topicId1, topicId2, topicId3), + new DeleteTopicsOptions().retryOnQuotaViolation(true)); + + // Wait until the prepared attempts have consumed + TestUtils.waitForCondition(() -> env.kafkaClient().numAwaitingResponses() == 0, + "Failed awaiting DeleteTopics requests"); + + // Wait until the next request is sent out + TestUtils.waitForCondition(() -> env.kafkaClient().inFlightRequestCount() == 1, + 
"Failed awaiting next DeleteTopics request"); + + // Advance time past the default api timeout to time out the inflight request + time.sleep(defaultApiTimeout + 1); + + assertNull(resultIds.values().get(topicId1).get()); + e = TestUtils.assertFutureThrows(resultIds.values().get(topicId2), + ThrottlingQuotaExceededException.class); + assertEquals(0, e.throttleTimeMs()); + TestUtils.assertFutureThrows(resultIds.values().get(topicId3), UnknownTopicIdException.class); } } @@ -988,6 +1109,27 @@ public void testDeleteTopicsDontRetryThrottlingExceptionWhenDisabled() throws Ex ThrottlingQuotaExceededException.class); assertEquals(1000, e.throttleTimeMs()); TestUtils.assertFutureError(result.values().get("topic3"), TopicExistsException.class); + + // With topic IDs + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + Uuid topicId3 = Uuid.randomUuid(); + env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2, topicId3), + prepareDeleteTopicsResponse(1000, + deletableTopicResultWithId(topicId1, Errors.NONE), + deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED), + deletableTopicResultWithId(topicId3, Errors.UNKNOWN_TOPIC_ID))); + + DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds( + asList(topicId1, topicId2, topicId3), + new DeleteTopicsOptions().retryOnQuotaViolation(false)); + + assertNull(resultIds.values().get(topicId1).get()); + e = TestUtils.assertFutureThrows(resultIds.values().get(topicId2), + ThrottlingQuotaExceededException.class); + assertEquals(1000, e.throttleTimeMs()); + TestUtils.assertFutureError(resultIds.values().get(topicId3), UnknownTopicIdException.class); } } @@ -995,7 +1137,17 @@ private MockClient.RequestMatcher expectDeleteTopicsRequestWithTopics(final Stri return body -> { if (body instanceof DeleteTopicsRequest) { DeleteTopicsRequest request = (DeleteTopicsRequest) body; - return request.data().topicNames().equals(Arrays.asList(topics)); + 
return request.topicNames().equals(Arrays.asList(topics)); + } + return false; + }; + } + + private MockClient.RequestMatcher expectDeleteTopicsRequestWithTopicIds(final Uuid... topicIds) { + return body -> { + if (body instanceof DeleteTopicsRequest) { + DeleteTopicsRequest request = (DeleteTopicsRequest) body; + return request.topicIds().equals(Arrays.asList(topicIds)); } return false; }; diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 3e9f605923bb7..767ee57333baf 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -17,15 +17,16 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; @@ -60,6 +61,8 @@ public class MockAdminClient extends AdminClient { private final List brokers; private final Map allTopics = new HashMap<>(); + private final Map topicIds = new HashMap<>(); + private final Map topicNames = new HashMap<>(); private final Map reassignments = new HashMap<>(); private final Map replicaMoves = @@ -207,6 +210,9 @@ synchronized public void addTopic(boolean internal, } } allTopics.put(name, new 
TopicMetadata(internal, partitions, logDirs, configs)); + Uuid id = Uuid.randomUuid(); + topicIds.put(name, id); + topicNames.put(id, name); } synchronized public void markTopicForDeletion(final String name) { @@ -298,6 +304,9 @@ synchronized public CreateTopicsResult createTopics(Collection newTopi logDirs.add(brokerLogDirs.get(partitions.get(i).leader().id()).get(0)); } allTopics.put(topicName, new TopicMetadata(false, partitions, logDirs, newTopic.configs())); + Uuid id = Uuid.randomUuid(); + topicIds.put(topicName, id); + topicNames.put(id, topicName); future.complete(null); createTopicResult.put(topicName, future); } @@ -393,6 +402,7 @@ synchronized public DeleteTopicsResult deleteTopics(Collection topicsToD if (allTopics.remove(topicName) == null) { future.completeExceptionally(new UnknownTopicOrPartitionException(String.format("Topic %s does not exist.", topicName))); } else { + topicNames.remove(topicIds.remove(topicName)); future.complete(null); } deleteTopicsResult.put(topicName, future); @@ -401,6 +411,37 @@ synchronized public DeleteTopicsResult deleteTopics(Collection topicsToD return new DeleteTopicsResult(deleteTopicsResult); } + @Override + synchronized public DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection topicsToDelete, DeleteTopicsOptions options) { + Map> deleteTopicsWithIdsResult = new HashMap<>(); + + if (timeoutNextRequests > 0) { + for (final Uuid topicId : topicsToDelete) { + KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.completeExceptionally(new TimeoutException()); + deleteTopicsWithIdsResult.put(topicId, future); + } + + --timeoutNextRequests; + return new DeleteTopicsWithIdsResult(deleteTopicsWithIdsResult); + } + + for (final Uuid topicId : topicsToDelete) { + KafkaFutureImpl future = new KafkaFutureImpl<>(); + + String name = topicNames.remove(topicId); + if (name == null || allTopics.remove(name) == null) { + future.completeExceptionally(new UnknownTopicOrPartitionException(String.format("Topic %s does not 
exist.", topicId))); + } else { + topicIds.remove(name); + future.complete(null); + } + deleteTopicsWithIdsResult.put(topicId, future); + } + + return new DeleteTopicsWithIdsResult(deleteTopicsWithIdsResult); + } + @Override synchronized public CreatePartitionsResult createPartitions(Map newPartitions, CreatePartitionsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 09c1ba49aca99..7e13711358ae1 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -77,7 +77,7 @@ import org.apache.kafka.common.resource.{Resource, ResourceType} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation} import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time} -import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.{Node, TopicPartition, Uuid} import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic} import org.apache.kafka.server.authorizer._ @@ -1930,29 +1930,47 @@ class KafkaApis(val requestChannel: RequestChannel, val results = new DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size) val toDelete = mutable.Set[String]() if (!controller.isActive) { - deleteTopicRequest.data.topicNames.forEach { topic => + deleteTopicRequest.topics().forEach { topic => results.add(new DeletableTopicResult() - .setName(topic) + .setName(topic.name()) + .setTopicId(topic.topicId()) .setErrorCode(Errors.NOT_CONTROLLER.code)) } sendResponseCallback(results) } else if (!config.deleteTopicEnable) { val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else 
Errors.TOPIC_DELETION_DISABLED - deleteTopicRequest.data.topicNames.forEach { topic => + deleteTopicRequest.topics().forEach { topic => results.add(new DeletableTopicResult() - .setName(topic) + .setName(topic.name()) + .setTopicId(topic.topicId()) .setErrorCode(error.code)) } sendResponseCallback(results) } else { - deleteTopicRequest.data.topicNames.forEach { topic => + val topicIdsFromRequest = deleteTopicRequest.topicIds().asScala.filter(topicId => topicId != Uuid.ZERO_UUID).toSet + deleteTopicRequest.topics().forEach { topic => + if (topic.name() != null && topic.topicId() != Uuid.ZERO_UUID) + throw new InvalidRequestException("Topic name and topic ID can not both be specified.") + val name = if (topic.topicId() == Uuid.ZERO_UUID) topic.name() + else controller.controllerContext.topicNames.getOrElse(topic.topicId(), null) results.add(new DeletableTopicResult() - .setName(topic)) + .setName(name) + .setTopicId(topic.topicId())) } - val authorizedTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC, - results.asScala)(_.name) + val authorizedDescribeTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, + results.asScala.filter(result => result.name() != null))(_.name) + val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC, + results.asScala.filter(result => result.name() != null))(_.name) results.forEach { topic => - if (!authorizedTopics.contains(topic.name)) + val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null + if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) { + topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code) + topic.setErrorMessage("Topic IDs are not supported on the server.") + } else if (unresolvedTopicId) + topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code) + else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) + topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code) + else if 
(!authorizedDeleteTopics.contains(topic.name)) topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) else if (!metadataCache.contains(topic.name)) topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala b/core/src/main/scala/kafka/server/ZkAdminManager.scala index fd4732f0dd50d..1fa70522872ba 100644 --- a/core/src/main/scala/kafka/server/ZkAdminManager.scala +++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala @@ -30,6 +30,7 @@ import kafka.utils.Implicits._ import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.admin.{AlterConfigOp, ScramMechanism} import org.apache.kafka.clients.admin.AlterConfigOp.OpType +import org.apache.kafka.common.Uuid import org.apache.kafka.common.config.ConfigDef.ConfigKey import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource, LogLevelConfig} import org.apache.kafka.common.errors.ThrottlingQuotaExceededException @@ -133,6 +134,13 @@ class ZkAdminManager(val config: KafkaConfig, } } + private def populateIds(metadataAndConfigs: Map[String, CreatableTopicResult], + topicName: String) : Unit = { + metadataAndConfigs.get(topicName).foreach { result => + result.setTopicId(zkClient.getTopicIdsForTopics(Predef.Set(result.name())).getOrElse(result.name(), Uuid.ZERO_UUID)) + } + } + /** * Create topics and wait until the topics have been completely created. * The callback function will be triggered either when timeout, error or the topics are created. 
@@ -195,6 +203,7 @@ class ZkAdminManager(val config: KafkaConfig, } else { controllerMutationQuota.record(assignments.size) adminZkClient.createTopicWithAssignment(topic.name, configs, assignments, validate = false, config.usesTopicId) + populateIds(includeConfigsAndMetadata, topic.name) CreatePartitionsMetadata(topic.name, assignments.keySet) } } catch { diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index edda7a9838628..1b48be3e23731 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -100,7 +100,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val brokerId: Integer = 0 val topic = "topic" - val topicId = Uuid.randomUuid() val topicPattern = "topic.*" val transactionalId = "transactional.id" val producerId = 83392L @@ -108,8 +107,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val correlationId = 0 val clientId = "client-Id" val tp = new TopicPartition(topic, part) - val topicIds = Collections.singletonMap(topic, topicId) - val topicNames = Collections.singletonMap(topicId, topic) val logDir = "logDir" val group = "my-group" val protocolType = "consumer" @@ -159,7 +156,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } @nowarn("cat=deprecation") - val requestKeyToError = Map[ApiKeys, Nothing => Errors]( + val requestKeyToError = (topicNames: Map[Uuid, String]) => Map[ApiKeys, Nothing => Errors]( ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2), ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses.asScala.find(_._1 == tp).get._2.error), ApiKeys.FETCH -> ((resp: requests.FetchResponse[Records]) => resp.responseData.asScala.find(_._1 == tp).get._2.error), @@ -185,7 +182,7 @@ class 
AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error), ApiKeys.DELETE_GROUPS -> ((resp: DeleteGroupsResponse) => resp.get(group)), ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => Errors.forCode( - resp.topics.asScala.find(t => topicNames.get(t.topicId) == tp.topic).get.partitionErrors.asScala.find( + resp.topics.asScala.find(t => topicNames(t.topicId) == tp.topic).get.partitionErrors.asScala.find( p => p.partitionIndex == tp.partition).get.errorCode)), ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => Errors.forCode( resp.partitionErrors.asScala.find(pe => pe.topicName == tp.topic && pe.partitionIndex == tp.partition).get.errorCode)), @@ -235,6 +232,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest { }) ) + val requestKeysToErrorWithIds = (id: Uuid) => Map[ApiKeys, Nothing => Errors]( + ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => Errors.forCode(resp.data.responses.asScala.find(_.topicId == id).get.errorCode())) + ) + val requestKeysToAcls = Map[ApiKeys, Map[ResourcePattern, Set[AccessControlEntry]]]( ApiKeys.METADATA -> topicDescribeAcl, ApiKeys.PRODUCE -> (topicWriteAcl ++ transactionIdWriteAcl ++ clusterIdempotentWriteAcl), @@ -479,7 +480,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { .setZkVersion(2) .setReplicas(Seq(brokerId).asJava) .setIsNew(false)).asJava, - topicIds, + getTopicIds().asJava, Set(new Node(brokerId, "localhost", 0)).asJava).build() } @@ -518,6 +519,15 @@ class AuthorizerIntegrationTest extends BaseRequestTest { .setTimeoutMs(5000)).build() } + private def deleteTopicsWithIdsRequest(id: Uuid = getTopicIds()(topic)): DeleteTopicsRequest = { + new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopics(Collections.singletonList( + new DeleteTopicsRequestData.DeleteTopicState() + .setTopicId(id))) + .setTimeoutMs(5000)).build() + } + private def deleteRecordsRequest = new 
DeleteRecordsRequest.Builder( new DeleteRecordsRequestData() .setTimeoutMs(5000) @@ -628,12 +638,33 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ) ).build() + private def sendRequests(requestKeyToRequest: mutable.Map[ApiKeys, AbstractRequest], topicExists: Boolean = true) = { + for ((key, request) <- requestKeyToRequest) { + removeAllClientAcls() + val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet + sendRequestAndVerifyResponseError(request, resources, isAuthorized = false, topicExists = topicExists) + + val resourceToAcls = requestKeysToAcls(key) + resourceToAcls.get(topicResource).foreach { acls => + val describeAcls = topicDescribeAcl(topicResource) + val isAuthorized = describeAcls == acls + addAndVerifyAcls(describeAcls, topicResource) + sendRequestAndVerifyResponseError(request, resources, isAuthorized = isAuthorized, topicExists = topicExists) + removeAllClientAcls() + } + + for ((resource, acls) <- resourceToAcls) + addAndVerifyAcls(acls, resource) + sendRequestAndVerifyResponseError(request, resources, isAuthorized = true, topicExists = topicExists) + } + } + @Test def testAuthorizationWithTopicExisting(): Unit = { - val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( - // First create the topic - ApiKeys.CREATE_TOPICS -> createTopicsRequest, + //First create the topic so we have a valid topic ID + sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest)) + val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true), ApiKeys.PRODUCE -> createProduceRequest, ApiKeys.FETCH -> createFetchRequest, @@ -673,23 +704,32 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DELETE_TOPICS -> deleteTopicsRequest ) - for ((key, request) <- requestKeyToRequest) { + sendRequests(requestKeyToRequest) + } + + @Test + def testAuthorizationDeleteTopicsIdWithTopicExisting(): Unit = { + 
sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest)) + + val id = getTopicIds()(topic) + + for ((key, request) <- mutable.Map(ApiKeys.DELETE_TOPICS -> deleteTopicsWithIdsRequest())) { removeAllClientAcls() val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet - sendRequestAndVerifyResponseError(request, resources, isAuthorized = false) + sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = false, topicExists = true, describeAuthorized = false, id = id) val resourceToAcls = requestKeysToAcls(key) resourceToAcls.get(topicResource).foreach { acls => val describeAcls = topicDescribeAcl(topicResource) val isAuthorized = describeAcls == acls addAndVerifyAcls(describeAcls, topicResource) - sendRequestAndVerifyResponseError(request, resources, isAuthorized = isAuthorized) + sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = isAuthorized, topicExists = true, describeAuthorized = true, id = id) removeAllClientAcls() } for ((resource, acls) <- resourceToAcls) addAndVerifyAcls(acls, resource) - sendRequestAndVerifyResponseError(request, resources, isAuthorized = true) + sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = true, topicExists = true, describeAuthorized = true, id = id) } } @@ -715,23 +755,33 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.ELECT_LEADERS -> electLeadersRequest ) + sendRequests(requestKeyToRequest, false) + } + + @Test + def testAuthorizationDeleteTopicsIdWithTopicNotExisting(): Unit = { + val id = Uuid.randomUuid() + val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( + ApiKeys.DELETE_TOPICS -> deleteTopicsWithIdsRequest(id), + ) + for ((key, request) <- requestKeyToRequest) { removeAllClientAcls() val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet - sendRequestAndVerifyResponseError(request, resources, isAuthorized = false, topicExists = false) + 
sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = false, topicExists = false, describeAuthorized = false, id = id) val resourceToAcls = requestKeysToAcls(key) resourceToAcls.get(topicResource).foreach { acls => val describeAcls = topicDescribeAcl(topicResource) val isAuthorized = describeAcls == acls addAndVerifyAcls(describeAcls, topicResource) - sendRequestAndVerifyResponseError(request, resources, isAuthorized = isAuthorized, topicExists = false) + sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = isAuthorized, topicExists = false, describeAuthorized = true, id = id) removeAllClientAcls() } for ((resource, acls) <- resourceToAcls) addAndVerifyAcls(acls, resource) - sendRequestAndVerifyResponseError(request, resources, isAuthorized = true, topicExists = false) + sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = true, topicExists = false, describeAuthorized = true, id = id) } } @@ -1903,13 +1953,52 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } } + private def sendRequestWithIdAndVerifyResponseError(request: AbstractRequest, + resources: Set[ResourceType], + isAuthorized: Boolean, + topicExists: Boolean, + describeAuthorized: Boolean, + id: Uuid): AbstractResponse = { + val apiKey = request.apiKey + val response = connectAndReceive[AbstractResponse](request) + val error = requestKeysToErrorWithIds(id)(apiKey).asInstanceOf[AbstractResponse => Errors](response) + + val authorizationErrors = resources.flatMap { resourceType => + if (resourceType == TOPIC) { + if (isAuthorized) + Set(Errors.UNKNOWN_TOPIC_ID, AclEntry.authorizationError(ResourceType.TOPIC)) + else if (describeAuthorized) + Set(AclEntry.authorizationError(ResourceType.TOPIC)) + else + Set(Errors.UNKNOWN_TOPIC_ID) + } else { + Set(AclEntry.authorizationError(resourceType)) + } + } + + if (topicExists) + if (isAuthorized) + assertFalse(authorizationErrors.contains(error), s"$apiKey should be allowed. 
Found unexpected authorization error $error") + else + assertTrue(authorizationErrors.contains(error), s"$apiKey should be forbidden. Found error $error but expected one of $authorizationErrors") + else if (resources == Set(TOPIC)) + if (isAuthorized) + assertEquals(Errors.UNKNOWN_TOPIC_ID, error, s"$apiKey had an unexpected error") + else { + assertEquals(Errors.UNKNOWN_TOPIC_ID, error, s"$apiKey had an unexpected error") + } + + response + } + private def sendRequestAndVerifyResponseError(request: AbstractRequest, resources: Set[ResourceType], isAuthorized: Boolean, - topicExists: Boolean = true): AbstractResponse = { + topicExists: Boolean = true, + topicNames: Map[Uuid, String] = getTopicNames()): AbstractResponse = { val apiKey = request.apiKey val response = connectAndReceive[AbstractResponse](request) - val error = requestKeyToError(apiKey).asInstanceOf[AbstractResponse => Errors](response) + val error = requestKeyToError(topicNames)(apiKey).asInstanceOf[AbstractResponse => Errors](response) val authorizationErrors = resources.flatMap { resourceType => if (resourceType == TOPIC) { diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala index e5d4af5ff6bf3..b9aa2088eac66 100644 --- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala @@ -24,6 +24,7 @@ import kafka.server.KafkaConfig import kafka.utils.Logging import kafka.utils.TestUtils._ import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateTopicsOptions, CreateTopicsResult, DescribeClusterOptions, DescribeTopicsOptions, NewTopic, TopicDescription} +import org.apache.kafka.common.Uuid import org.apache.kafka.common.acl.AclOperation import org.apache.kafka.common.errors.{TopicExistsException, UnknownTopicOrPartitionException} import org.apache.kafka.common.resource.ResourceType @@ -90,6 
+91,12 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg createResult.all.get() waitForTopics(client, topics, List()) validateMetadataAndConfigs(createResult) + val topicIds = getTopicIds() + topics.foreach { topic => + assertNotEquals(Uuid.ZERO_UUID, createResult.topicId(topic).get()) + assertEquals(topicIds(topic), createResult.topicId(topic).get()) + } + val failedCreateResult = client.createTopics(newTopics.asJava) val results = failedCreateResult.values() diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 049ef67f90df3..87cb25725d409 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory import scala.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.collection.Seq +import scala.compat.java8.OptionConverters._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} import scala.util.Random @@ -115,6 +116,24 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertTrue(e.getCause.isInstanceOf[TopicExistsException]) } + @Test + def testDeleteTopicsWithIds(): Unit = { + client = Admin.create(createConfig) + val topics = Seq("mytopic", "mytopic2", "mytopic3") + val newTopics = Seq( + new NewTopic("mytopic", Map((0: Integer) -> Seq[Integer](1, 2).asJava, (1: Integer) -> Seq[Integer](2, 0).asJava).asJava), + new NewTopic("mytopic2", 3, 3.toShort), + new NewTopic("mytopic3", Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava) + ) + val createResult = client.createTopics(newTopics.asJava) + createResult.all.get() + waitForTopics(client, topics, List()) + val topicIds = getTopicIds().values.toSet + + client.deleteTopicsWithIds(topicIds.asJava).all.get() + 
waitForTopics(client, List(), topics) + } + @Test def testMetadataRefresh(): Unit = { client = Admin.create(createConfig) diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala index 916ffbc9cbca8..1f15563b012ea 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala @@ -19,6 +19,7 @@ import kafka.server.{Defaults, KafkaConfig} import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils} import kafka.utils.TestUtils._ import org.apache.kafka.clients.admin._ +import org.apache.kafka.common.Uuid import org.apache.kafka.common.acl._ import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE} import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} @@ -422,6 +423,11 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu createResult.all.get() waitForTopics(client, topics, List()) validateMetadataAndConfigs(createResult) + val topicIds = getTopicIds() + assertNotEquals(Uuid.ZERO_UUID, createResult.topicId(topic1).get()) + assertEquals(topicIds(topic1), createResult.topicId(topic1).get()) + assertFutureExceptionTypeEquals(createResult.topicId(topic2), classOf[TopicAuthorizationException]) + val createResponseConfig = createResult.config(topic1).get().entries.asScala val describeResponseConfig = describeConfigs(topic1) diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 8eae07e69e56b..bc6717df2cf1e 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -30,7 +30,7 @@ import scala.collection.Seq import scala.collection.mutable.{ArrayBuffer, 
Buffer} import java.util.Properties -import org.apache.kafka.common.KafkaException +import org.apache.kafka.common.{KafkaException, Uuid} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.scram.ScramCredential import org.apache.kafka.common.utils.Time @@ -168,4 +168,17 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness { } } + def getController(): KafkaServer = { + val controllerId = TestUtils.waitUntilControllerElected(zkClient) + servers.filter(s => s.config.brokerId == controllerId).head + } + + def getTopicIds(): Map[String, Uuid] = { + getController().kafkaController.controllerContext.topicIds.toMap + } + + def getTopicNames(): Map[Uuid, String] = { + getController().kafkaController.controllerContext.topicNames.toMap + } + } diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala index e48ea58ff9d7f..6283410942fe5 100644 --- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala @@ -18,6 +18,7 @@ package kafka.server import kafka.utils._ +import org.apache.kafka.common.Uuid import org.apache.kafka.common.message.CreateTopicsRequestData import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection import org.apache.kafka.common.protocol.ApiKeys @@ -170,6 +171,11 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest { assertEquals(-1, topicResponse.replicationFactor) assertTrue(topicResponse.configs.isEmpty) } + + if (version >= 7) + assertNotEquals(Uuid.ZERO_UUID, topicResponse.topicId()) + else + assertEquals(Uuid.ZERO_UUID, topicResponse.topicId()) } } } diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala index 1bf0bc0577757..a17612170d7a0 100644 --- 
a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala @@ -21,7 +21,9 @@ import java.util.{Arrays, Collections} import kafka.network.SocketServer import kafka.utils._ +import org.apache.kafka.common.Uuid import org.apache.kafka.common.message.DeleteTopicsRequestData +import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{DeleteTopicsRequest, DeleteTopicsResponse, MetadataRequest, MetadataResponse} import org.junit.jupiter.api.Assertions._ @@ -47,6 +49,17 @@ class DeleteTopicsRequestTest extends BaseRequestTest { new DeleteTopicsRequestData() .setTopicNames(Arrays.asList("topic-3", "topic-4")) .setTimeoutMs(timeout)).build()) + + // Topic Ids + createTopic("topic-7", 3, 2) + createTopic("topic-6", 1, 2) + val ids = getTopicIds() + validateValidDeleteTopicRequestsWithIds(new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopics(Arrays.asList(new DeleteTopicState().setTopicId(ids("topic-7")), + new DeleteTopicState().setTopicId(ids("topic-6")) + ) + ).setTimeoutMs(timeout)).build()) } private def validateValidDeleteTopicRequests(request: DeleteTopicsRequest): Unit = { @@ -58,6 +71,15 @@ class DeleteTopicsRequestTest extends BaseRequestTest { } } + private def validateValidDeleteTopicRequestsWithIds(request: DeleteTopicsRequest): Unit = { + val response = sendDeleteTopicsRequest(request) + val error = response.errorCounts.asScala.find(_._1 != Errors.NONE) + assertTrue(error.isEmpty, s"There should be no errors, found ${response.data.responses.asScala}") + response.data.responses.forEach { response => + validateTopicIsDeleted(response.name()) + } + } + @Test def testErrorDeleteTopicRequests(): Unit = { val timeout = 30000 @@ -81,6 +103,21 @@ class DeleteTopicsRequestTest extends BaseRequestTest { "partial-invalid-topic" -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION ) ) + + // Topic IDs + createTopic("topic-id-1", 1, 1) + val validId = getTopicIds()("topic-id-1") + val invalidId = Uuid.randomUuid + validateErrorDeleteTopicRequestsWithIds(new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopics(Arrays.asList(new DeleteTopicState().setTopicId(invalidId), + new DeleteTopicState().setTopicId(validId))) + .setTimeoutMs(timeout)).build(), + Map( + invalidId -> Errors.UNKNOWN_TOPIC_ID, + validId -> Errors.NONE + ) + ) // Timeout createTopic(timeoutTopic, 5, 2) @@ -111,6 +148,24 @@ class DeleteTopicsRequestTest extends BaseRequestTest { } } + private def validateErrorDeleteTopicRequestsWithIds(request: DeleteTopicsRequest, expectedResponse: Map[Uuid, Errors]): Unit = { + val response = sendDeleteTopicsRequest(request) + val responses = response.data.responses + val errors = responses.asScala.map(result => result.topicId() -> result.errorCode()).toMap + val names = responses.asScala.map(result => result.topicId() -> result.name()).toMap + + val errorCount = response.errorCounts().asScala.foldLeft(0)(_+_._2) + assertEquals(expectedResponse.size, errorCount, "The response size should match") + + expectedResponse.foreach { case (topic, expectedError) => + assertEquals(expectedResponse(topic).code, errors(topic), "The response error should match") + // If no error validate the topic was deleted + if (expectedError == Errors.NONE) { + validateTopicIsDeleted(names(topic)) + } + } + } + @Test def testNotController(): Unit = { val request = new DeleteTopicsRequest.Builder( @@ -119,7 +174,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest { .setTimeoutMs(1000)).build() val response = sendDeleteTopicsRequest(request, notControllerSocketServer) - val error = response.data.responses().find("not-controller").errorCode() + val error = response.data.responses.find("not-controller").errorCode() assertEquals(Errors.NOT_CONTROLLER.code, error, "Expected controller error when routed 
incorrectly") } diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestIBPTest/TopicIdWithOldInterBrokerProtocolTest.scala b/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala similarity index 50% rename from core/src/test/scala/unit/kafka/server/MetadataRequestIBPTest/TopicIdWithOldInterBrokerProtocolTest.scala rename to core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala index 9c56818635fe9..4d425c011cf8c 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataRequestIBPTest/TopicIdWithOldInterBrokerProtocolTest.scala +++ b/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala @@ -17,19 +17,22 @@ package kafka.server -import java.util.Properties +import java.util.{Arrays, Properties} import kafka.api.KAFKA_2_7_IV0 import kafka.network.SocketServer - +import kafka.server.{BaseRequestTest, KafkaConfig} +import kafka.utils.TestUtils import org.apache.kafka.common.Uuid +import org.apache.kafka.common.message.DeleteTopicsRequestData +import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse} -import org.junit.jupiter.api.Assertions.assertEquals; +import org.apache.kafka.common.requests.{DeleteTopicsRequest, DeleteTopicsResponse, MetadataRequest, MetadataResponse} +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{BeforeEach, Test} -import scala.jdk.CollectionConverters._ import scala.collection.Seq +import scala.jdk.CollectionConverters._ class TopicIdWithOldInterBrokerProtocolTest extends BaseRequestTest { @@ -59,8 +62,55 @@ class TopicIdWithOldInterBrokerProtocolTest extends BaseRequestTest { } } + @Test + def testDeleteTopicsWithOldIBP(): Unit = { + val timeout = 10000 + createTopic("topic-3", 5, 2) + createTopic("topic-4", 1, 2) + val request = new 
DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopicNames(Arrays.asList("topic-3", "topic-4")) + .setTimeoutMs(timeout)).build() + val resp = sendDeleteTopicsRequest(request) + val error = resp.errorCounts.asScala.find(_._1 != Errors.NONE) + assertTrue(error.isEmpty, s"There should be no errors, found ${resp.data.responses.asScala}") + request.data.topicNames.forEach { topic => + validateTopicIsDeleted(topic) + } + resp.data.responses.forEach { response => + assertEquals(Uuid.ZERO_UUID, response.topicId()) + } + } + + @Test + def testDeleteTopicsWithOldIBPUsingIDs(): Unit = { + val timeout = 10000 + createTopic("topic-7", 3, 2) + createTopic("topic-6", 1, 2) + val ids = Map("topic-7" -> Uuid.randomUuid(), "topic-6" -> Uuid.randomUuid()) + val request = new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopics(Arrays.asList(new DeleteTopicState().setTopicId(ids("topic-7")), + new DeleteTopicState().setTopicId(ids("topic-6")) + )).setTimeoutMs(timeout)).build() + val response = sendDeleteTopicsRequest(request) + val error = response.errorCounts.asScala + assertEquals(2, error(Errors.UNSUPPORTED_VERSION)) + } + private def sendMetadataRequest(request: MetadataRequest, destination: Option[SocketServer]): MetadataResponse = { connectAndReceive[MetadataResponse](request, destination = destination.getOrElse(anySocketServer)) } + private def sendDeleteTopicsRequest(request: DeleteTopicsRequest, socketServer: SocketServer = controllerSocketServer): DeleteTopicsResponse = { + connectAndReceive[DeleteTopicsResponse](request, destination = socketServer) + } + + private def validateTopicIsDeleted(topic: String): Unit = { + val metadata = connectAndReceive[MetadataResponse](new MetadataRequest.Builder( + List(topic).asJava, true).build).topicMetadata.asScala + TestUtils.waitUntilTrue (() => !metadata.exists(p => p.topic.equals(topic) && p.error == Errors.NONE), + s"The topic $topic should not exist") + } + }