From f983256d5bae1c354bfb0fb84ee8f08ff4989896 Mon Sep 17 00:00:00 2001 From: Justine Date: Tue, 1 Dec 2020 08:18:24 -0800 Subject: [PATCH 01/10] Returns topic ID in CreateTopicsResponse --- .../clients/admin/CreateTopicsResult.java | 24 ++++++++++++++++++- .../kafka/clients/admin/KafkaAdminClient.java | 2 +- .../common/message/CreateTopicsRequest.json | 2 +- .../common/message/CreateTopicsResponse.json | 3 ++- .../main/scala/kafka/server/KafkaApis.scala | 5 ++++ .../main/scala/kafka/zk/KafkaZkClient.scala | 8 +++---- .../kafka/api/BaseAdminIntegrationTest.scala | 9 +++++++ .../api/SaslSslAdminIntegrationTest.scala | 8 ++++++- .../server/ControllerMutationQuotaTest.scala | 2 +- .../unit/kafka/server/KafkaApisTest.scala | 7 ++++-- 10 files changed, 58 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java index eac086a34c7bc..0b75959cd8093 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.errors.ApiException; @@ -68,6 +69,19 @@ public KafkaFuture config(String topic) { return futures.get(topic).thenApply(TopicMetadataAndConfig::config); } + /** + * Returns a future that provides topic ID for the topic when the request completes. + *

+ * If broker version doesn't support topic IDs in the response, throw + * {@link org.apache.kafka.common.errors.UnsupportedVersionException}. + * If broker returned an error for the topic, throw the appropriate exception. For example, + * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if the user does not + * have permission to describe the topic. + */ + public KafkaFuture<Uuid> topicId(String topic) { + return futures.get(topic).thenApply(TopicMetadataAndConfig::topicId); + } + /** * Returns a future that provides number of partitions in the topic when the request completes. *

@@ -96,12 +110,14 @@ public KafkaFuture replicationFactor(String topic) { public static class TopicMetadataAndConfig { private final ApiException exception; + private final Uuid topicId; private final int numPartitions; private final int replicationFactor; private final Config config; - TopicMetadataAndConfig(int numPartitions, int replicationFactor, Config config) { + TopicMetadataAndConfig(Uuid topicId, int numPartitions, int replicationFactor, Config config) { this.exception = null; + this.topicId = topicId; this.numPartitions = numPartitions; this.replicationFactor = replicationFactor; this.config = config; @@ -109,10 +125,16 @@ public static class TopicMetadataAndConfig { TopicMetadataAndConfig(ApiException exception) { this.exception = exception; + this.topicId = Uuid.ZERO_UUID; this.numPartitions = UNKNOWN; this.replicationFactor = UNKNOWN; this.config = null; } + + public Uuid topicId() { + ensureSuccess(); + return topicId; + } public int numPartitions() { ensureSuccess(); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 9c931cd7556f7..82ef8d9048c34 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -1552,7 +1552,7 @@ public void handleResponse(AbstractResponse abstractResponse) { Config topicConfig = new Config(configs.stream() .map(this::configEntry) .collect(Collectors.toSet())); - topicMetadataAndConfig = new TopicMetadataAndConfig(result.numPartitions(), + topicMetadataAndConfig = new TopicMetadataAndConfig(result.topicId(), result.numPartitions(), result.replicationFactor(), topicConfig); } diff --git a/clients/src/main/resources/common/message/CreateTopicsRequest.json b/clients/src/main/resources/common/message/CreateTopicsRequest.json index c80add64a7b48..4f94178380255 100644 --- 
a/clients/src/main/resources/common/message/CreateTopicsRequest.json +++ b/clients/src/main/resources/common/message/CreateTopicsRequest.json @@ -26,7 +26,7 @@ // // Version 6 is identical to version 5 but may return a THROTTLING_QUOTA_EXCEEDED error // in the response if the topics creation is throttled (KIP-599). - "validVersions": "0-6", + "validVersions": "0-7", "flexibleVersions": "5+", "fields": [ { "name": "Topics", "type": "[]CreatableTopic", "versions": "0+", diff --git a/clients/src/main/resources/common/message/CreateTopicsResponse.json b/clients/src/main/resources/common/message/CreateTopicsResponse.json index 9f2296124a783..a0d7a2d4704e8 100644 --- a/clients/src/main/resources/common/message/CreateTopicsResponse.json +++ b/clients/src/main/resources/common/message/CreateTopicsResponse.json @@ -30,7 +30,7 @@ // // Version 6 is identical to version 5 but may return a THROTTLING_QUOTA_EXCEEDED error // in the response if the topics creation is throttled (KIP-599). - "validVersions": "0-6", + "validVersions": "0-7", "flexibleVersions": "5+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true, @@ -39,6 +39,7 @@ "about": "Results for each topic we tried to create.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "about": "The topic name." }, + { "name": "TopicId", "type": "uuid", "versions": "7+", "ignorable": true, "about": "The unique topic ID"}, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." 
}, { "name": "ErrorMessage", "type": "string", "versions": "1+", "nullableVersions": "0+", "ignorable": true, diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 4392406de5c55..accef974d830d 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1896,6 +1896,11 @@ class KafkaApis(val requestChannel: RequestChannel, .setTopicConfigErrorCode(Errors.NONE.code) } } + val topicIds = zkClient.getTopicIdsForTopics(results.asScala.map(result => result.name()).toSet) + topicIds.foreach { case (name, id) => + val result = results.find(name) + result.setTopicId(id) + } sendResponseCallback(results) } adminManager.createTopics( diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index c8122d7f95615..01b812573da8d 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -621,10 +621,10 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo case Code.NONODE => None case _ => throw getDataResponse.resultException.get } - }.map(_.get) - .map(topicIdAssignment => (topicIdAssignment.topic, - topicIdAssignment.topicId.getOrElse( - throw new IllegalStateException("Topic " + topicIdAssignment.topic + " does not have a topic ID.")))) + }.filter(_.isDefined) + .map(_.get) + .map(topicIdAssignment => (topicIdAssignment.topic, topicIdAssignment.topicId.getOrElse( + throw new IllegalStateException("Topic " + topicIdAssignment.topic + " does not have a topic ID.")))) .toMap } diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala index a1fc75bebf8b1..521fa2a7130ad 100644 --- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala +++ 
b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala @@ -25,6 +25,7 @@ import kafka.server.KafkaConfig import kafka.utils.Logging import kafka.utils.TestUtils._ import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateTopicsOptions, CreateTopicsResult, DescribeClusterOptions, DescribeTopicsOptions, NewTopic, TopicDescription} +import org.apache.kafka.common.Uuid import org.apache.kafka.common.acl.AclOperation import org.apache.kafka.common.errors.{TopicExistsException, UnknownTopicOrPartitionException} import org.apache.kafka.common.resource.ResourceType @@ -94,6 +95,14 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg createResult.all.get() waitForTopics(client, topics, List()) validateMetadataAndConfigs(createResult) + val topicIds = zkClient.getTopicIdsForTopics(topics.toSet) + assertNotEquals(Uuid.ZERO_UUID, createResult.topicId("mytopic").get()) + assertNotEquals(Uuid.ZERO_UUID, createResult.topicId("mytopic2").get()) + assertNotEquals(Uuid.ZERO_UUID, createResult.topicId("mytopic3").get()) + assertEquals(topicIds("mytopic"), createResult.topicId("mytopic").get()) + assertEquals(topicIds("mytopic2"), createResult.topicId("mytopic2").get()) + assertEquals(topicIds("mytopic3"), createResult.topicId("mytopic3").get()) + val failedCreateResult = client.createTopics(newTopics.asJava) val results = failedCreateResult.values() diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala index c4e291f3ab92b..ad90d1a25d751 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala @@ -20,6 +20,7 @@ import kafka.server.{Defaults, KafkaConfig} import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils} import kafka.utils.TestUtils._ import org.apache.kafka.clients.admin._ +import 
org.apache.kafka.common.Uuid import org.apache.kafka.common.acl._ import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE} import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} @@ -27,7 +28,7 @@ import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException, TopicAuthorizationException, UnknownTopicOrPartitionException} import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} -import org.junit.Assert.{assertEquals, assertTrue} +import org.junit.Assert.{assertEquals, assertNotEquals, assertTrue} import org.junit.{After, Assert, Before, Test} import scala.annotation.nowarn @@ -427,6 +428,11 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu createResult.all.get() waitForTopics(client, topics, List()) validateMetadataAndConfigs(createResult) + val topicIds = zkClient.getTopicIdsForTopics(topics.toSet) + assertNotEquals(Uuid.ZERO_UUID, createResult.topicId(topic1).get()) + assertEquals(topicIds(topic1), createResult.topicId(topic1).get()) + assertFutureExceptionTypeEquals(createResult.topicId(topic2), classOf[TopicAuthorizationException]) + val createResponseConfig = createResult.config(topic1).get().entries.asScala val describeResponseConfig = describeConfigs(topic1) diff --git a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala index 7407a984b9029..262bf58e2cb8b 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala @@ -176,7 +176,7 @@ class ControllerMutationQuotaTest extends BaseRequestTest { } @Test - def testPermissiveCreateTopicsRequest(): Unit = { + 
def testPermissiveCreateTopicsRequest(): Unit = { asPrincipal(ThrottledPrincipal) { // Create two topics worth of 30 partitions each. As we use a permissive quota, we // expect both topics to be created. diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index f26c5d3ee3c65..7a13ede72c1f0 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -64,7 +64,7 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _} import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType} import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol} import org.apache.kafka.common.utils.ProducerIdAndEpoch -import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition} +import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid} import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} import org.easymock.EasyMock._ import org.easymock.{Capture, EasyMock, IAnswer, IArgumentMatcher} @@ -744,9 +744,12 @@ class KafkaApisTest { anyObject(), EasyMock.eq(UnboundedControllerMutationQuota), EasyMock.capture(capturedCallback))) + + EasyMock.expect(zkClient.getTopicIdsForTopics(Set(authorizedTopic, unauthorizedTopic))).andReturn( + Map(authorizedTopic -> Uuid.ZERO_UUID, unauthorizedTopic -> Uuid.ZERO_UUID)) EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager, - requestChannel, authorizer, adminManager, controller) + requestChannel, authorizer, adminManager, controller, zkClient) createKafkaApis(authorizer = Some(authorizer)).handleCreateTopicsRequest(request) From 8b14ab47fe4ef3b7b0418ab125eda17ebc96e506 Mon Sep 17 00:00:00 2001 From: Justine Date: Thu, 3 Dec 2020 14:31:10 -0800 Subject: [PATCH 02/10] DeleteTopicsRequest allows for specifying topic IDs --- 
.../org/apache/kafka/clients/admin/Admin.java | 36 +++++ .../admin/DeleteTopicsWithIdsResult.java | 54 +++++++ .../kafka/clients/admin/KafkaAdminClient.java | 101 ++++++++++++ .../errors/UnknownTopicIdException.java | 27 ++++ .../apache/kafka/common/protocol/Errors.java | 4 +- .../common/requests/DeleteTopicsRequest.java | 46 +++++- .../common/message/DeleteTopicsRequest.json | 11 +- .../common/message/DeleteTopicsResponse.json | 5 +- .../clients/admin/KafkaAdminClientTest.java | 153 ++++++++++++++++++ .../kafka/clients/admin/MockAdminClient.java | 36 ++++- .../main/scala/kafka/server/KafkaApis.scala | 37 +++-- .../kafka/api/BaseAdminIntegrationTest.scala | 18 +++ .../server/DeleteTopicsRequestTest.scala | 55 +++++++ 13 files changed, 567 insertions(+), 16 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicIdException.java diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 90e9b0b64be68..7d5b7ea2f7eea 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.TopicPartitionReplica; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; @@ -246,6 +247,41 @@ default DeleteTopicsResult deleteTopics(Collection topics) { * @return The DeleteTopicsResult. */ DeleteTopicsResult deleteTopics(Collection topics, DeleteTopicsOptions options); + + /** + * This is a convenience method for {@link #deleteTopicsWithIds(Collection, DeleteTopicsOptions)} + * with default options. 
See the overload for more details. + *

+ * This operation is supported by brokers with version x.x.x.x or higher. + * + * @param topics The topic IDs for the topics to delete. + * @return The DeleteTopicsWithIdsResult. + */ + default DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection<Uuid> topics) { + return deleteTopicsWithIds(topics, new DeleteTopicsOptions()); + } + + /** + * Delete a batch of topics. + *

+ * This operation is not transactional so it may succeed for some topics while fail for others. + *

+ * It may take several seconds after the {@link DeleteTopicsResult} returns + * success for all the brokers to become aware that the topics are gone. + * During this time, {@link #listTopics()} and {@link #describeTopics(Collection)} + * may continue to return information about the deleted topics. + *

+ * If delete.topic.enable is false on the brokers, deleteTopics will mark + * the topics for deletion, but not actually delete them. The futures will + * return successfully in this case. + *

+ * This operation is supported by brokers with version x.x.x.x or higher. + * + * @param topics The topic IDs for the topics to delete. + * @param options The options to use when deleting the topics. + * @return The DeleteTopicsResult. + */ + DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection topics, DeleteTopicsOptions options); /** * List the topics available in the cluster with the default options. diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java new file mode 100644 index 0000000000000..76fc7e0362d84 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collection; +import java.util.Map; + +/** + * The result of the {@link Admin#deleteTopics(Collection)} call. + * + * The API of this class is evolving, see {@link Admin} for details. 
+ */ +@InterfaceStability.Evolving +public class DeleteTopicsWithIdsResult { + final Map> futures; + + DeleteTopicsWithIdsResult(Map> futures) { + this.futures = futures; + } + + /** + * Return a map from topic IDs to futures which can be used to check the status of + * individual deletions. + */ + public Map> values() { + return futures; + } + + /** + * Return a future which succeeds only if all the topic deletions succeed. + */ + public KafkaFuture all() { + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 82ef8d9048c34..6e0fdbc12a1eb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -49,6 +49,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; @@ -108,6 +109,7 @@ import org.apache.kafka.common.message.DeleteRecordsResponseData; import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsTopicResult; import org.apache.kafka.common.message.DeleteTopicsRequestData; +import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState; import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult; import org.apache.kafka.common.message.DescribeConfigsRequestData; import org.apache.kafka.common.message.DescribeConfigsResponseData; @@ -1623,6 +1625,32 @@ public DeleteTopicsResult deleteTopics(final Collection topicNames, return new DeleteTopicsResult(new HashMap<>(topicFutures)); } + @Override + public 
DeleteTopicsWithIdsResult deleteTopicsWithIds(final Collection topicIds, + final DeleteTopicsOptions options) { + final Map> topicFutures = new HashMap<>(topicIds.size()); + final List validTopicIds = new ArrayList<>(topicIds.size()); + for (Uuid topicId : topicIds) { + if (topicId.equals(Uuid.ZERO_UUID)) { + KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.completeExceptionally(new InvalidTopicException("The given topic ID '" + + topicId + "' cannot be represented in a request.")); + topicFutures.put(topicId, future); + } else if (!topicFutures.containsKey(topicId)) { + topicFutures.put(topicId, new KafkaFutureImpl<>()); + validTopicIds.add(topicId); + } + } + if (!validTopicIds.isEmpty()) { + final long now = time.milliseconds(); + final long deadline = calcDeadlineMs(now, options.timeoutMs()); + final Call call = getDeleteTopicsWithIdsCall(options, topicFutures, validTopicIds, + Collections.emptyMap(), now, deadline); + runnable.call(call, now); + } + return new DeleteTopicsWithIdsResult(new HashMap<>(topicFutures)); + } + private Call getDeleteTopicsCall(final DeleteTopicsOptions options, final Map> futures, final List topics, @@ -1694,6 +1722,79 @@ void handleFailure(Throwable throwable) { } }; } + + private Call getDeleteTopicsWithIdsCall(final DeleteTopicsOptions options, + final Map> futures, + final List topicIds, + final Map quotaExceededExceptions, + final long now, + final long deadline) { + return new Call("deleteTopics", deadline, new ControllerNodeProvider()) { + @Override + DeleteTopicsRequest.Builder createRequest(int timeoutMs) { + return new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopics(topicIds.stream().map( + topic -> new DeleteTopicState().setTopicId(topic)).collect(Collectors.toList())) + .setTimeoutMs(timeoutMs)); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + // Check for controller change + handleNotControllerError(abstractResponse); + // Handle server responses for 
particular topics. + final DeleteTopicsResponse response = (DeleteTopicsResponse) abstractResponse; + final List retryTopics = new ArrayList<>(); + final Map retryTopicQuotaExceededExceptions = new HashMap<>(); + for (DeletableTopicResult result : response.data().responses()) { + KafkaFutureImpl future = futures.get(result.topicId()); + if (future == null) { + log.warn("Server response mentioned unknown topic {}", result.name()); + } else { + ApiError error = new ApiError(result.errorCode(), result.errorMessage()); + if (error.isFailure()) { + if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) { + ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException( + response.throttleTimeMs(), error.messageWithFallback()); + if (options.shouldRetryOnQuotaViolation()) { + retryTopics.add(result.topicId()); + retryTopicQuotaExceededExceptions.put(result.topicId(), quotaExceededException); + } else { + future.completeExceptionally(quotaExceededException); + } + } else { + future.completeExceptionally(error.exception()); + } + } else { + future.complete(null); + } + } + } + // If there are topics to retry, retry them; complete unrealized futures otherwise. + if (retryTopics.isEmpty()) { + // The server should send back a response for every topic. But do a sanity check anyway. + completeUnrealizedFutures(futures.entrySet().stream(), + topic -> "The controller response did not contain a result for topic " + topic); + } else { + final long now = time.milliseconds(); + final Call call = getDeleteTopicsWithIdsCall(options, futures, retryTopics, + retryTopicQuotaExceededExceptions, now, deadline); + runnable.call(call, now); + } + } + + @Override + void handleFailure(Throwable throwable) { + // If there were any topics retries due to a quota exceeded exception, we propagate + // the initial error back to the caller if the request timed out. 
+ maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(), + throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now)); + // Fail all the other remaining futures + completeAllExceptionally(futures.values(), throwable); + } + }; + } @Override public ListTopicsResult listTopics(final ListTopicsOptions options) { diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicIdException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicIdException.java new file mode 100644 index 0000000000000..e5023eb7cdbb3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicIdException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.errors; + +public class UnknownTopicIdException extends InvalidMetadataException { + + private static final long serialVersionUID = 1L; + + public UnknownTopicIdException(String message) { + super(message); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 5332369f0f9be..230c7ab5cb8e6 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -108,6 +108,7 @@ import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnknownProducerIdException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnknownTopicIdException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnstableOffsetCommitException; import org.apache.kafka.common.errors.UnsupportedByAuthenticationException; @@ -341,7 +342,8 @@ public enum Errors { INVALID_UPDATE_VERSION(95, "The given update version was invalid.", InvalidUpdateVersionException::new), FEATURE_UPDATE_FAILED(96, "Unable to update finalized features due to an unexpected server error.", FeatureUpdateFailedException::new), PRINCIPAL_DESERIALIZATION_FAILURE(97, "Request principal deserialization failed during forwarding. 
" + - "This indicates an internal error on the broker cluster security setup.", PrincipalDeserializationException::new); + "This indicates an internal error on the broker cluster security setup.", PrincipalDeserializationException::new), + UNKNOWN_TOPIC_ID(98, "This server does not host this topic ID.", UnknownTopicIdException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java index 440acfd2b838a..9eb5d7173cb5e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java @@ -16,13 +16,19 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.DeleteTopicsRequestData; import org.apache.kafka.common.message.DeleteTopicsResponseData; import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult; +import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; public class DeleteTopicsRequest extends AbstractRequest { @@ -39,8 +45,25 @@ public Builder(DeleteTopicsRequestData data) { @Override public DeleteTopicsRequest build(short version) { + if (version >= 6 && data.topicNames().size() != 0) { + data.setTopics(groupByTopic(data.topicNames())); + } else if (version >= 6) { + for (DeleteTopicState topic : data.topics()) { + if (topic.name().equals("")) { + topic.setName(null); + } + } + } return new DeleteTopicsRequest(data, version); } + + private List groupByTopic(List topics) { + List topicStates 
= new ArrayList<>(); + for (String topic : topics) { + topicStates.add(new DeleteTopicState().setName(topic)); + } + return topicStates; + } @Override public String toString() { @@ -76,13 +99,32 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { response.setThrottleTimeMs(throttleTimeMs); } ApiError apiError = ApiError.fromThrowable(e); - for (String topic : data.topicNames()) { + for (DeleteTopicState topic : topics()) { response.responses().add(new DeletableTopicResult() - .setName(topic) + .setName(topic.name()) + .setTopicId(topic.topicId()) .setErrorCode(apiError.error().code())); } return new DeleteTopicsResponse(response); } + + public List topicNames() { + if (version >= 6) + return data.topics().stream().map(topic -> topic.name()).collect(Collectors.toList()); + return data.topicNames(); + } + + public List topicIds() { + if (version >= 6) + return data.topics().stream().map(topic -> topic.topicId()).collect(Collectors.toList()); + return Collections.emptyList(); + } + + public List topics() { + if (version >= 6) + return data.topics(); + return data.topicNames().stream().map(name -> new DeleteTopicState().setName(name)).collect(Collectors.toList()); + } public static DeleteTopicsRequest parse(ByteBuffer buffer, short version) { return new DeleteTopicsRequest(ApiKeys.DELETE_TOPICS.parseRequest(version, buffer), version); diff --git a/clients/src/main/resources/common/message/DeleteTopicsRequest.json b/clients/src/main/resources/common/message/DeleteTopicsRequest.json index 9dfba16515633..0aa911bdcf189 100644 --- a/clients/src/main/resources/common/message/DeleteTopicsRequest.json +++ b/clients/src/main/resources/common/message/DeleteTopicsRequest.json @@ -23,10 +23,17 @@ // // Version 5 adds ErrorMessage in the response and may return a THROTTLING_QUOTA_EXCEEDED error // in the response if the topics deletion is throttled (KIP-599). 
- "validVersions": "0-5", + // + // Version 6 reorganizes topics, adds topic IDs and allows topic names to be null. + "validVersions": "0-6", "flexibleVersions": "4+", "fields": [ - { "name": "TopicNames", "type": "[]string", "versions": "0+", "entityType": "topicName", + { "name": "Topics", "type": "[]DeleteTopicState", "versions": "6+", "about": "The name or topic ID of the topic", + "fields": [ + {"name": "Name", "type": "string", "versions": "6+", "nullableVersions": "6+", "about": "The topic name"}, + {"name": "TopicId", "type": "uuid", "versions": "6+", "about": "The unique topic ID"} + ]}, + { "name": "TopicNames", "type": "[]string", "versions": "0-5", "entityType": "topicName", "ignorable": true, "about": "The names of the topics to delete" }, { "name": "TimeoutMs", "type": "int32", "versions": "0+", "about": "The length of time in milliseconds to wait for the deletions to complete." } diff --git a/clients/src/main/resources/common/message/DeleteTopicsResponse.json b/clients/src/main/resources/common/message/DeleteTopicsResponse.json index 6986b0adb339a..bc68765de12fd 100644 --- a/clients/src/main/resources/common/message/DeleteTopicsResponse.json +++ b/clients/src/main/resources/common/message/DeleteTopicsResponse.json @@ -27,7 +27,9 @@ // // Version 5 adds ErrorMessage in the response and may return a THROTTLING_QUOTA_EXCEEDED error // in the response if the topics deletion is throttled (KIP-599). - "validVersions": "0-5", + // + // Version 6 adds topic ID to responses and an UNKNOWN_TOPIC_ID error code may be returned. 
+ "validVersions": "0-6", "flexibleVersions": "4+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, @@ -36,6 +38,7 @@ "about": "The results for each topic we tried to delete.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "about": "The topic name" }, + {"name": "TopicId", "type": "uuid", "versions": "6+", "ignorable": "true", "about": "the unique topic ID"}, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The deletion error, or 0 if the deletion succeeded." }, { "name": "ErrorMessage", "type": "string", "versions": "5+", "nullableVersions": "5+", "ignorable": true, "default": "null", diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 1d2dd4ae23997..e391acae6ce43 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AccessControlEntryFilter; import org.apache.kafka.common.acl.AclBinding; @@ -65,6 +66,7 @@ import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnknownTopicIdException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.feature.Features; @@ -406,6 +408,12 @@ public static DeletableTopicResult 
deletableTopicResult(String topicName, Errors .setErrorCode(error.code()); } + public static DeletableTopicResult deletableTopicResultWithId(Uuid topicId, Errors error) { + return new DeletableTopicResult() + .setTopicId(topicId) + .setErrorCode(error.code()); + } + public static CreatePartitionsResponse prepareCreatePartitionsResponse(int throttleTimeMs, CreatePartitionsTopicResult... topics) { CreatePartitionsResponseData data = new CreatePartitionsResponseData() .setThrottleTimeMs(throttleTimeMs) @@ -432,6 +440,14 @@ private static DeleteTopicsResponse prepareDeleteTopicsResponse(String topicName return new DeleteTopicsResponse(data); } + private static DeleteTopicsResponse prepareDeleteTopicsResponseWithTopicId(Uuid id, Errors error) { + DeleteTopicsResponseData data = new DeleteTopicsResponseData(); + data.responses().add(new DeletableTopicResult() + .setTopicId(id) + .setErrorCode(error.code())); + return new DeleteTopicsResponse(data); + } + private static FindCoordinatorResponse prepareFindCoordinatorResponse(Errors error, Node node) { return FindCoordinatorResponse.prepareResponse(error, node); } @@ -865,8 +881,33 @@ public void testDeleteTopics() throws Exception { future = env.adminClient().deleteTopics(singletonList("myTopic"), new DeleteTopicsOptions()).all(); TestUtils.assertFutureError(future, UnknownTopicOrPartitionException.class); + + // With topic IDs + Uuid topicId = Uuid.randomUuid(); + + env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId), + prepareDeleteTopicsResponseWithTopicId(topicId, Errors.NONE)); + future = env.adminClient().deleteTopicsWithIds(singletonList(topicId), + new DeleteTopicsOptions()).all(); + assertNull(future.get()); + + env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId), + prepareDeleteTopicsResponseWithTopicId(topicId, Errors.TOPIC_DELETION_DISABLED)); + future = env.adminClient().deleteTopicsWithIds(singletonList(topicId), + new 
DeleteTopicsOptions()).all(); + TestUtils.assertFutureError(future, TopicDeletionDisabledException.class); + + env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId), + prepareDeleteTopicsResponseWithTopicId(topicId, Errors.UNKNOWN_TOPIC_ID)); + future = env.adminClient().deleteTopicsWithIds(singletonList(topicId), + new DeleteTopicsOptions()).all(); + TestUtils.assertFutureError(future, UnknownTopicIdException.class); } } + @Test public void testDeleteTopicsPartialResponse() throws Exception { @@ -883,6 +924,20 @@ public void testDeleteTopicsPartialResponse() throws Exception { result.values().get("myTopic").get(); TestUtils.assertFutureThrows(result.values().get("myOtherTopic"), ApiException.class); + + // With topic IDs + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2), + prepareDeleteTopicsResponse(1000, + deletableTopicResultWithId(topicId1, Errors.NONE))); + + DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds( + asList(topicId1, topicId2), new DeleteTopicsOptions()); + + resultIds.values().get(topicId1).get(); + TestUtils.assertFutureThrows(resultIds.values().get(topicId2), ApiException.class); } } @@ -915,6 +970,36 @@ public void testDeleteTopicsRetryThrottlingExceptionWhenEnabled() throws Excepti assertNull(result.values().get("topic1").get()); assertNull(result.values().get("topic2").get()); TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class); + + // With topic IDs + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + Uuid topicId3 = Uuid.randomUuid(); + + env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2, topicId3), + prepareDeleteTopicsResponse(1000, + deletableTopicResultWithId(topicId1, Errors.NONE), + deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED), + 
deletableTopicResultWithId(topicId3, Errors.TOPIC_ALREADY_EXISTS))); + + env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId2), + prepareDeleteTopicsResponse(1000, + deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED))); + + env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId2), + prepareDeleteTopicsResponse(0, + deletableTopicResultWithId(topicId2, Errors.NONE))); + + DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds( + asList(topicId1, topicId2, topicId3), + new DeleteTopicsOptions().retryOnQuotaViolation(true)); + + assertNull(resultIds.values().get(topicId1).get()); + assertNull(resultIds.values().get(topicId2).get()); + TestUtils.assertFutureThrows(resultIds.values().get(topicId3), TopicExistsException.class); } } @@ -960,6 +1045,43 @@ public void testDeleteTopicsRetryThrottlingExceptionWhenEnabledUntilRequestTimeO ThrottlingQuotaExceededException.class); assertEquals(0, e.throttleTimeMs()); TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class); + + // With topic IDs + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + Uuid topicId3 = Uuid.randomUuid(); + env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2, topicId3), + prepareDeleteTopicsResponse(1000, + deletableTopicResultWithId(topicId1, Errors.NONE), + deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED), + deletableTopicResultWithId(topicId3, Errors.TOPIC_ALREADY_EXISTS))); + + env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId2), + prepareDeleteTopicsResponse(1000, + deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED))); + + DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds( + asList(topicId1, topicId2, topicId3), + new DeleteTopicsOptions().retryOnQuotaViolation(true)); + + // Wait until the prepared 
attempts have consumed + TestUtils.waitForCondition(() -> env.kafkaClient().numAwaitingResponses() == 0, + "Failed awaiting DeleteTopics requests"); + + // Wait until the next request is sent out + TestUtils.waitForCondition(() -> env.kafkaClient().inFlightRequestCount() == 1, + "Failed awaiting next DeleteTopics request"); + + // Advance time past the default api timeout to time out the inflight request + time.sleep(defaultApiTimeout + 1); + + assertNull(resultIds.values().get(topicId1).get()); + e = TestUtils.assertFutureThrows(resultIds.values().get(topicId2), + ThrottlingQuotaExceededException.class); + assertEquals(0, e.throttleTimeMs()); + TestUtils.assertFutureThrows(resultIds.values().get(topicId3), TopicExistsException.class); } } @@ -984,6 +1106,27 @@ public void testDeleteTopicsDontRetryThrottlingExceptionWhenDisabled() throws Ex ThrottlingQuotaExceededException.class); assertEquals(1000, e.throttleTimeMs()); TestUtils.assertFutureError(result.values().get("topic3"), TopicExistsException.class); + + // With topic IDs + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + Uuid topicId3 = Uuid.randomUuid(); + env.kafkaClient().prepareResponse( + expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2, topicId3), + prepareDeleteTopicsResponse(1000, + deletableTopicResultWithId(topicId1, Errors.NONE), + deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED), + deletableTopicResultWithId(topicId3, Errors.TOPIC_ALREADY_EXISTS))); + + DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds( + asList(topicId1, topicId2, topicId3), + new DeleteTopicsOptions().retryOnQuotaViolation(false)); + + assertNull(resultIds.values().get(topicId1).get()); + e = TestUtils.assertFutureThrows(resultIds.values().get(topicId2), + ThrottlingQuotaExceededException.class); + assertEquals(1000, e.throttleTimeMs()); + TestUtils.assertFutureError(resultIds.values().get(topicId3), TopicExistsException.class); } } @@ -997,6 
+1140,16 @@ private MockClient.RequestMatcher expectDeleteTopicsRequestWithTopics(final Stri }; } + private MockClient.RequestMatcher expectDeleteTopicsRequestWithTopicIds(final Uuid... topicIds) { + return body -> { + if (body instanceof DeleteTopicsRequest) { + DeleteTopicsRequest request = (DeleteTopicsRequest) body; + return request.topicIds().equals(Arrays.asList(topicIds)); + } + return false; + }; + } + @Test public void testInvalidTopicNames() throws Exception { try (AdminClientUnitTestEnv env = mockClientEnv()) { diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 3e9f605923bb7..761a3443d9a6b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -17,15 +17,16 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; @@ -60,6 +61,7 @@ public class MockAdminClient extends AdminClient { private final List brokers; private final Map allTopics = new HashMap<>(); + private final Map topicNames = new HashMap<>(); private final Map reassignments = new HashMap<>(); private final Map replicaMoves = @@ -298,6 
+300,7 @@ synchronized public CreateTopicsResult createTopics(Collection newTopi logDirs.add(brokerLogDirs.get(partitions.get(i).leader().id()).get(0)); } allTopics.put(topicName, new TopicMetadata(false, partitions, logDirs, newTopic.configs())); + topicNames.put(Uuid.randomUuid(), topicName); future.complete(null); createTopicResult.put(topicName, future); } @@ -401,6 +404,37 @@ synchronized public DeleteTopicsResult deleteTopics(Collection topicsToD return new DeleteTopicsResult(deleteTopicsResult); } + @Override + synchronized public DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection topicsToDelete, DeleteTopicsOptions options) { + Map> deleteTopicsWithIdsResult = new HashMap<>(); + + if (timeoutNextRequests > 0) { + for (final Uuid topicId : topicsToDelete) { + KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.completeExceptionally(new TimeoutException()); + deleteTopicsWithIdsResult.put(topicId, future); + } + + --timeoutNextRequests; + return new DeleteTopicsWithIdsResult(deleteTopicsWithIdsResult); + } + + for (final Uuid topicId : topicsToDelete) { + KafkaFutureImpl future = new KafkaFutureImpl<>(); + + String name = topicNames.remove(topicId); + if (name == null) { + future.completeExceptionally(new UnknownTopicOrPartitionException(String.format("Topic %s does not exist.", topicId))); + } else { + allTopics.remove(name); + future.complete(null); + } + deleteTopicsWithIdsResult.put(topicId, future); + } + + return new DeleteTopicsWithIdsResult(deleteTopicsWithIdsResult); + } + @Override synchronized public CreatePartitionsResult createPartitions(Map newPartitions, CreatePartitionsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index accef974d830d..a7c3b096bfd05 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -78,7 +78,7 @@ 
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation} import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time, Utils} -import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.{Node, TopicPartition, Uuid} import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic} import org.apache.kafka.server.authorizer._ @@ -1986,42 +1986,61 @@ class KafkaApis(val requestChannel: RequestChannel, val results = new DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size) val toDelete = mutable.Set[String]() if (!controller.isActive) { - deleteTopicRequest.data.topicNames.forEach { topic => + deleteTopicRequest.topics().forEach { topic => results.add(new DeletableTopicResult() - .setName(topic) + .setName(topic.name()) + .setTopicId(topic.topicId()) .setErrorCode(Errors.NOT_CONTROLLER.code)) } sendResponseCallback(results) } else if (!config.deleteTopicEnable) { val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else Errors.TOPIC_DELETION_DISABLED - deleteTopicRequest.data.topicNames.forEach { topic => + deleteTopicRequest.topics().forEach { topic => results.add(new DeletableTopicResult() - .setName(topic) + .setName(topic.name()) + .setTopicId(topic.topicId()) .setErrorCode(error.code)) } sendResponseCallback(results) } else { - deleteTopicRequest.data.topicNames.forEach { topic => + deleteTopicRequest.topics().forEach { topic => + val name = if (topic.name() != null) topic.name() + else controller.controllerContext.topicNames.getOrElse(topic.topicId(), "") results.add(new DeletableTopicResult() - .setName(topic)) + .setName(name) + .setTopicId(topic.topicId())) } + val 
authorizedTopics = filterByAuthorized(request.context, DELETE, TOPIC, results.asScala)(_.name) results.forEach { topic => - if (!authorizedTopics.contains(topic.name)) + val foundTopicId = !topic.topicId().equals(Uuid.ZERO_UUID) && !topic.name().equals("") + val topicIdZero = topic.topicId().equals(Uuid.ZERO_UUID) + if (!foundTopicId && !topicIdZero) + topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code) + else if (!authorizedTopics.contains(topic.name)) topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) else if (!metadataCache.contains(topic.name)) topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) - else + else { + if (topicIdZero) + topic.setTopicId(controller.controllerContext.topicIds(topic.name())) toDelete += topic.name + } } // If no authorized topics return immediately if (toDelete.isEmpty) sendResponseCallback(results) else { def handleDeleteTopicsResults(errors: Map[String, Errors]): Unit = { + //def find( name: String, id: Uuid) = { + // val _key = new DeleteTopicsResponseData.DeletableTopicResult + // _key.setName(name).setTopicId(id) + // results.find(_key) + //} errors.foreach { case (topicName, error) => + //val topic = find(topicName, controller.controllerContext.topicIds(topicName)) results.find(topicName) .setErrorCode(error.code) } diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala index 521fa2a7130ad..161477f6f6e0d 100644 --- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala @@ -161,6 +161,24 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg client.deleteTopics(topics.asJava).all.get() waitForTopics(client, List(), topics) } + + @Test + def testDeleteTopicsWithIds(): Unit = { + client = Admin.create(createConfig) + val topics = Seq("mytopic", "mytopic2", "mytopic3") + val newTopics = Seq( 
+ new NewTopic("mytopic", Map((0: Integer) -> Seq[Integer](1, 2).asJava, (1: Integer) -> Seq[Integer](2, 0).asJava).asJava), + new NewTopic("mytopic2", 3, 3.toShort), + new NewTopic("mytopic3", Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava) + ) + val createResult = client.createTopics(newTopics.asJava) + createResult.all.get() + waitForTopics(client, topics, List()) + val topicIds = zkClient.getTopicIdsForTopics(topics.toSet).values.toSet + + client.deleteTopicsWithIds(topicIds.asJava).all.get() + waitForTopics(client, List(), topics) + } @Test def testAuthorizedOperations(): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala index 2994fd926e4fe..eb80bdc1b99a7 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala @@ -21,7 +21,9 @@ import java.util.{Arrays, Collections} import kafka.network.SocketServer import kafka.utils._ +import org.apache.kafka.common.Uuid import org.apache.kafka.common.message.DeleteTopicsRequestData +import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{DeleteTopicsRequest, DeleteTopicsResponse, MetadataRequest, MetadataResponse} import org.junit.Assert._ @@ -47,6 +49,17 @@ class DeleteTopicsRequestTest extends BaseRequestTest { new DeleteTopicsRequestData() .setTopicNames(Arrays.asList("topic-3", "topic-4")) .setTimeoutMs(timeout)).build()) + + // Topic Ids + createTopic("topic-7", 3, 2) + createTopic("topic-6", 1, 2) + val ids = zkClient.getTopicIdsForTopics(Set("topic-7", "topic-6")) + validateValidDeleteTopicRequestsWithIds(new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopics(Arrays.asList(new DeleteTopicState().setTopicId(ids("topic-7")) + //new 
DeleteTopicState().setTopicId(ids("topic-6")) + ) + ).setTimeoutMs(timeout)).build()) } private def validateValidDeleteTopicRequests(request: DeleteTopicsRequest): Unit = { @@ -58,6 +71,15 @@ class DeleteTopicsRequestTest extends BaseRequestTest { } } + private def validateValidDeleteTopicRequestsWithIds(request: DeleteTopicsRequest): Unit = { + val response = sendDeleteTopicsRequest(request) + val error = response.errorCounts.asScala.find(_._1 != Errors.NONE) + assertTrue(s"There should be no errors, found ${response.data.responses.asScala}", error.isEmpty) + response.data().responses().forEach { response => + validateTopicIsDeleted(response.name()) + } + } + @Test def testErrorDeleteTopicRequests(): Unit = { val timeout = 30000 @@ -81,6 +103,21 @@ class DeleteTopicsRequestTest extends BaseRequestTest { "partial-invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION ) ) + + // Topic IDs + createTopic("topic-id-1", 1, 1) + val validId = zkClient.getTopicIdsForTopics(Set("topic-id-1"))("topic-id-1") + val invalidId = Uuid.randomUuid + validateErrorDeleteTopicRequestsWithIds(new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopics(Arrays.asList(new DeleteTopicState().setTopicId(invalidId), + new DeleteTopicState().setTopicId(validId))) + .setTimeoutMs(timeout)).build(), + Map( + invalidId -> Errors.UNKNOWN_TOPIC_ID, + validId -> Errors.NONE + ) + ) // Timeout createTopic(timeoutTopic, 5, 2) @@ -111,6 +148,24 @@ class DeleteTopicsRequestTest extends BaseRequestTest { } } + private def validateErrorDeleteTopicRequestsWithIds(request: DeleteTopicsRequest, expectedResponse: Map[Uuid, Errors]): Unit = { + val response = sendDeleteTopicsRequest(request) + val responses = response.data.responses + val errors = responses.asScala.map(result => result.topicId() -> result.errorCode()).toMap + val names = responses.asScala.map(result => result.topicId() -> result.name()).toMap + + val errorCount = response.errorCounts().asScala.foldLeft(0)(_+_._2) + 
assertEquals("The response size should match", expectedResponse.size, errorCount) + + expectedResponse.foreach { case (topic, expectedError) => + assertEquals("The response error should match", expectedResponse(topic).code, errors(topic)) + // If no error validate the topic was deleted + if (expectedError == Errors.NONE) { + validateTopicIsDeleted(names(topic)) + } + } + } + @Test def testNotController(): Unit = { val request = new DeleteTopicsRequest.Builder( From b8c1a0a8c8c7e33e7d260835c11b0590384c8e3e Mon Sep 17 00:00:00 2001 From: Justine Date: Thu, 3 Dec 2020 15:25:03 -0800 Subject: [PATCH 03/10] Fix some typos, correctly add/remove topic Ids in MockAdminClient, fix comments --- .../java/org/apache/kafka/clients/admin/Admin.java | 12 ++++++------ .../clients/admin/DeleteTopicsWithIdsResult.java | 2 +- .../common/message/CreateTopicsRequest.json | 2 ++ .../common/message/CreateTopicsResponse.json | 2 ++ .../kafka/clients/admin/KafkaAdminClientTest.java | 2 +- .../apache/kafka/clients/admin/MockAdminClient.java | 12 ++++++++++-- core/src/main/scala/kafka/server/KafkaApis.scala | 6 ------ .../kafka/server/ControllerMutationQuotaTest.scala | 2 +- 8 files changed, 23 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 7d5b7ea2f7eea..3d152d0f91957 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -252,10 +252,10 @@ default DeleteTopicsResult deleteTopics(Collection topics) { * This is a convenience method for {@link #deleteTopicsWithIds(Collection, DeleteTopicsOptions)} * with default options. See the overload for more details. *

- * This operation is supported by brokers with version x.x.x.x or higher. + * This operation is supported by brokers with version 2.8.0 or higher. * * @param topics The topic IDs for the topics to delete. - * @return The DeleteTopicsResult. + * @return The DeleteTopicsWithIdsResult. */ default DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection topics) { return deleteTopicsWithIds(topics, new DeleteTopicsOptions()); @@ -266,20 +266,20 @@ default DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection topics) { *

* This operation is not transactional so it may succeed for some topics while fail for others. *

- * It may take several seconds after the {@link DeleteTopicsResult} returns + * It may take several seconds after the {@link DeleteTopicsWithIdsResult} returns * success for all the brokers to become aware that the topics are gone. * During this time, {@link #listTopics()} and {@link #describeTopics(Collection)} * may continue to return information about the deleted topics. *

- * If delete.topic.enable is false on the brokers, deleteTopics will mark + * If delete.topic.enable is false on the brokers, deleteTopicsWithIds will mark * the topics for deletion, but not actually delete them. The futures will * return successfully in this case. *

- * This operation is supported by brokers with version x.x.x.x or higher. + * This operation is supported by brokers with version 2.8.0 or higher. * * @param topics The topic IDs for the topics to delete. * @param options The options to use when deleting the topics. - * @return The DeleteTopicsResult. + * @return The DeleteTopicsWithIdsResult. */ DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection topics, DeleteTopicsOptions options); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java index 76fc7e0362d84..eeb91194a96ff 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java @@ -25,7 +25,7 @@ import java.util.Map; /** - * The result of the {@link Admin#deleteTopics(Collection)} call. + * The result of the {@link Admin#deleteTopicsWithIds(Collection)} call. * * The API of this class is evolving, see {@link Admin} for details. */ diff --git a/clients/src/main/resources/common/message/CreateTopicsRequest.json b/clients/src/main/resources/common/message/CreateTopicsRequest.json index 4f94178380255..1a8d57a3f0bea 100644 --- a/clients/src/main/resources/common/message/CreateTopicsRequest.json +++ b/clients/src/main/resources/common/message/CreateTopicsRequest.json @@ -26,6 +26,8 @@ // // Version 6 is identical to version 5 but may return a THROTTLING_QUOTA_EXCEEDED error // in the response if the topics creation is throttled (KIP-599). + // + // Version 7 is the same as version 6. 
"validVersions": "0-7", "flexibleVersions": "5+", "fields": [ diff --git a/clients/src/main/resources/common/message/CreateTopicsResponse.json b/clients/src/main/resources/common/message/CreateTopicsResponse.json index a0d7a2d4704e8..0f87d9fb0278b 100644 --- a/clients/src/main/resources/common/message/CreateTopicsResponse.json +++ b/clients/src/main/resources/common/message/CreateTopicsResponse.json @@ -30,6 +30,8 @@ // // Version 6 is identical to version 5 but may return a THROTTLING_QUOTA_EXCEEDED error // in the response if the topics creation is throttled (KIP-599). + // + // Version 7 returns the topic ID of the newly created topic if creation is sucessful. "validVersions": "0-7", "flexibleVersions": "5+", "fields": [ diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index e391acae6ce43..e21f39b3f8bda 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -1134,7 +1134,7 @@ private MockClient.RequestMatcher expectDeleteTopicsRequestWithTopics(final Stri return body -> { if (body instanceof DeleteTopicsRequest) { DeleteTopicsRequest request = (DeleteTopicsRequest) body; - return request.data().topicNames().equals(Arrays.asList(topics)); + return request.topicNames().equals(Arrays.asList(topics)); } return false; }; diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 761a3443d9a6b..042c55d054ed5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -61,6 +61,7 @@ public class MockAdminClient extends AdminClient { private final List brokers; private final Map allTopics = new HashMap<>(); + 
private final Map topicIds = new HashMap<>(); private final Map topicNames = new HashMap<>(); private final Map reassignments = new HashMap<>(); @@ -209,6 +210,9 @@ synchronized public void addTopic(boolean internal, } } allTopics.put(name, new TopicMetadata(internal, partitions, logDirs, configs)); + Uuid id = Uuid.randomUuid(); + topicIds.put(name, id); + topicNames.put(id, name); } synchronized public void markTopicForDeletion(final String name) { @@ -300,7 +304,9 @@ synchronized public CreateTopicsResult createTopics(Collection newTopi logDirs.add(brokerLogDirs.get(partitions.get(i).leader().id()).get(0)); } allTopics.put(topicName, new TopicMetadata(false, partitions, logDirs, newTopic.configs())); - topicNames.put(Uuid.randomUuid(), topicName); + Uuid id = Uuid.randomUuid(); + topicIds.put(topicName, id); + topicNames.put(id, topicName); future.complete(null); createTopicResult.put(topicName, future); } @@ -396,6 +402,7 @@ synchronized public DeleteTopicsResult deleteTopics(Collection topicsToD if (allTopics.remove(topicName) == null) { future.completeExceptionally(new UnknownTopicOrPartitionException(String.format("Topic %s does not exist.", topicName))); } else { + topicNames.remove(topicIds.remove(topicName)); future.complete(null); } deleteTopicsResult.put(topicName, future); @@ -423,9 +430,10 @@ synchronized public DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection future = new KafkaFutureImpl<>(); String name = topicNames.remove(topicId); - if (name == null) { + if (allTopics.remove(name) == null) { future.completeExceptionally(new UnknownTopicOrPartitionException(String.format("Topic %s does not exist.", topicId))); } else { + topicIds.remove(name); allTopics.remove(name); future.complete(null); } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index a7c3b096bfd05..eba9b74b815a7 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ 
b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2033,14 +2033,8 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseCallback(results) else { def handleDeleteTopicsResults(errors: Map[String, Errors]): Unit = { - //def find( name: String, id: Uuid) = { - // val _key = new DeleteTopicsResponseData.DeletableTopicResult - // _key.setName(name).setTopicId(id) - // results.find(_key) - //} errors.foreach { case (topicName, error) => - //val topic = find(topicName, controller.controllerContext.topicIds(topicName)) results.find(topicName) .setErrorCode(error.code) } diff --git a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala index 262bf58e2cb8b..7407a984b9029 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala @@ -176,7 +176,7 @@ class ControllerMutationQuotaTest extends BaseRequestTest { } @Test - def OtestPermissiveCreateTopicsRequest(): Unit = { + def testPermissiveCreateTopicsRequest(): Unit = { asPrincipal(ThrottledPrincipal) { // Create two topics worth of 30 partitions each. As we use a permissive quota, we // expect both topics to be created. 
From bd593cf7dbd2e230fbc8d2adea2dc6adc354b8fb Mon Sep 17 00:00:00 2001 From: Justine Date: Tue, 22 Dec 2020 11:29:50 -0800 Subject: [PATCH 04/10] Fixes to json files and handling in KafkaApis --- .../common/message/CreateTopicsResponse.json | 2 +- .../common/message/DeleteTopicsRequest.json | 6 +++--- .../common/message/DeleteTopicsResponse.json | 4 ++-- core/src/main/scala/kafka/server/KafkaApis.scala | 13 +++++-------- 4 files changed, 11 insertions(+), 14 deletions(-) diff --git a/clients/src/main/resources/common/message/CreateTopicsResponse.json b/clients/src/main/resources/common/message/CreateTopicsResponse.json index 0f87d9fb0278b..c1bf88285a150 100644 --- a/clients/src/main/resources/common/message/CreateTopicsResponse.json +++ b/clients/src/main/resources/common/message/CreateTopicsResponse.json @@ -41,7 +41,7 @@ "about": "Results for each topic we tried to create.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "about": "The topic name." }, - { "name": "TopicId", "type": "uuid", "versions": "7+", "ignorable": true, "about": "The unique topic ID"}, + { "name": "TopicId", "type": "uuid", "versions": "7+", "ignorable": true, "about": "The unique topic ID"}, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." 
}, { "name": "ErrorMessage", "type": "string", "versions": "1+", "nullableVersions": "0+", "ignorable": true, diff --git a/clients/src/main/resources/common/message/DeleteTopicsRequest.json b/clients/src/main/resources/common/message/DeleteTopicsRequest.json index 0aa911bdcf189..7e11554ad480c 100644 --- a/clients/src/main/resources/common/message/DeleteTopicsRequest.json +++ b/clients/src/main/resources/common/message/DeleteTopicsRequest.json @@ -28,10 +28,10 @@ "validVersions": "0-6", "flexibleVersions": "4+", "fields": [ - { "name": "Topics", "type": "[]DeleteTopicState", "versions": "6+", "about": "The name or topic ID of the topic", + { "name": "Topics", "type": "[]DeleteTopicState", "versions": "6+", "about": "The name or topic ID of the topic", "fields": [ - {"name": "Name", "type": "string", "versions": "6+", "nullableVersions": "6+", "about": "The topic name"}, - {"name": "TopicId", "type": "uuid", "versions": "6+", "about": "The unique topic ID"} + {"name": "Name", "type": "string", "versions": "6+", "nullableVersions": "6+", "about": "The topic name"}, + {"name": "TopicId", "type": "uuid", "versions": "6+", "about": "The unique topic ID"} ]}, { "name": "TopicNames", "type": "[]string", "versions": "0-5", "entityType": "topicName", "ignorable": true, "about": "The names of the topics to delete" }, diff --git a/clients/src/main/resources/common/message/DeleteTopicsResponse.json b/clients/src/main/resources/common/message/DeleteTopicsResponse.json index bc68765de12fd..985172018aa1c 100644 --- a/clients/src/main/resources/common/message/DeleteTopicsResponse.json +++ b/clients/src/main/resources/common/message/DeleteTopicsResponse.json @@ -36,9 +36,9 @@ "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." 
}, { "name": "Responses", "type": "[]DeletableTopicResult", "versions": "0+", "about": "The results for each topic we tried to delete.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0+", "nullableVersions": "6+", "mapKey": true, "entityType": "topicName", "about": "The topic name" }, - {"name": "TopicId", "type": "uuid", "versions": "6+", "ignorable": "true", "about": "the unique topic ID"}, + {"name": "TopicId", "type": "uuid", "versions": "6+", "ignorable": "true", "about": "the unique topic ID"}, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The deletion error, or 0 if the deletion succeeded." }, { "name": "ErrorMessage", "type": "string", "versions": "5+", "nullableVersions": "5+", "ignorable": true, "default": "null", diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index eba9b74b815a7..0d4da20482331 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2005,7 +2005,7 @@ class KafkaApis(val requestChannel: RequestChannel, } else { deleteTopicRequest.topics().forEach { topic => val name = if (topic.name() != null) topic.name() - else controller.controllerContext.topicNames.getOrElse(topic.topicId(), "") + else controller.controllerContext.topicNames.getOrElse(topic.topicId(), null) results.add(new DeletableTopicResult() .setName(name) .setTopicId(topic.topicId())) @@ -2014,19 +2014,16 @@ class KafkaApis(val requestChannel: RequestChannel, val authorizedTopics = filterByAuthorized(request.context, DELETE, TOPIC, results.asScala)(_.name) results.forEach { topic => - val foundTopicId = !topic.topicId().equals(Uuid.ZERO_UUID) && !topic.name().equals("") - val topicIdZero = topic.topicId().equals(Uuid.ZERO_UUID) - if (!foundTopicId && !topicIdZero) + val foundTopicId = 
!topic.topicId().equals(Uuid.ZERO_UUID) && topic.name() != null + val topicIdSpecified = !topic.topicId().equals(Uuid.ZERO_UUID) + if (!foundTopicId && topicIdSpecified) topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code) else if (!authorizedTopics.contains(topic.name)) topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) else if (!metadataCache.contains(topic.name)) topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) - else { - if (topicIdZero) - topic.setTopicId(controller.controllerContext.topicIds(topic.name())) + else toDelete += topic.name - } } // If no authorized topics return immediately if (toDelete.isEmpty) From 7f2ebfbd4a77d290526a680a7b6b6d8730926d4b Mon Sep 17 00:00:00 2001 From: Justine Date: Mon, 4 Jan 2021 18:19:38 -0800 Subject: [PATCH 05/10] Addressed comments, added handling for older IBPs --- .../kafka/clients/admin/KafkaAdminClient.java | 5 ++-- .../common/requests/DeleteTopicsRequest.java | 2 +- .../common/message/DeleteTopicsResponse.json | 4 ++- .../src/main/scala/kafka/api/ApiVersion.scala | 11 +++++++- .../main/scala/kafka/server/KafkaApis.scala | 18 ++++++------ .../main/scala/kafka/server/KafkaConfig.scala | 6 +++- .../kafka/api/BaseAdminIntegrationTest.scala | 28 +++---------------- .../api/PlaintextAdminIntegrationTest.scala | 19 +++++++++++++ .../server/DeleteTopicsRequestTest.scala | 26 +++++++++++++++-- .../unit/kafka/server/KafkaApisTest.scala | 22 +++++++++++---- 10 files changed, 93 insertions(+), 48 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 6e0fdbc12a1eb..3ef1ed3db063b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -68,6 +68,7 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnacceptableCredentialException; 
import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnknownTopicIdException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -1633,7 +1634,7 @@ public DeleteTopicsWithIdsResult deleteTopicsWithIds(final Collection topi for (Uuid topicId : topicIds) { if (topicId.equals(Uuid.ZERO_UUID)) { KafkaFutureImpl future = new KafkaFutureImpl<>(); - future.completeExceptionally(new InvalidTopicException("The given topic ID '" + + future.completeExceptionally(new UnknownTopicIdException("The given topic ID '" + topicId + "' cannot be represented in a request.")); topicFutures.put(topicId, future); } else if (!topicFutures.containsKey(topicId)) { @@ -1750,7 +1751,7 @@ void handleResponse(AbstractResponse abstractResponse) { for (DeletableTopicResult result : response.data().responses()) { KafkaFutureImpl future = futures.get(result.topicId()); if (future == null) { - log.warn("Server response mentioned unknown topic {}", result.name()); + log.warn("Server response mentioned unknown topic ID {}", result.topicId()); } else { ApiError error = new ApiError(result.errorCode(), result.errorMessage()); if (error.isFailure()) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java index 9eb5d7173cb5e..c0b82a795cbfe 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java @@ -49,7 +49,7 @@ public DeleteTopicsRequest build(short version) { data.setTopics(groupByTopic(data.topicNames())); } else if (version >= 6) { for (DeleteTopicState topic : data.topics()) { - if (topic.name().equals("")) { + if (topic.name() != null && 
topic.name().equals("")) { topic.setName(null); } } diff --git a/clients/src/main/resources/common/message/DeleteTopicsResponse.json b/clients/src/main/resources/common/message/DeleteTopicsResponse.json index 985172018aa1c..ae96c1d01bc7a 100644 --- a/clients/src/main/resources/common/message/DeleteTopicsResponse.json +++ b/clients/src/main/resources/common/message/DeleteTopicsResponse.json @@ -28,7 +28,9 @@ // Version 5 adds ErrorMessage in the response and may return a THROTTLING_QUOTA_EXCEEDED error // in the response if the topics deletion is throttled (KIP-599). // - // Version 6 adds topic ID to responses and an UNKNOWN_TOPIC_ID error code may be returned. + // Version 6 adds topic ID to responses. An UNSUPPORTED_VERSION error code will be returned when attempting to + // delete using topic IDs when IBP > 2.8. UNKNOWN_TOPIC_ID error code will be returned when IBP is at least 2.8, but + // the topic ID was not found. "validVersions": "0-6", "flexibleVersions": "4+", "fields": [ diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index 819e12c6ec9da..e30cc98bbd9d7 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -108,7 +108,9 @@ object ApiVersion { // Bup Fetch protocol for Raft protocol (KIP-595) KAFKA_2_7_IV1, // Introduced AlterIsr (KIP-497) - KAFKA_2_7_IV2 + KAFKA_2_7_IV2, + // Flexible versioning on ListOffsets, WriteTxnMarkers and OffsetsForLeaderEpoch. 
Also adds topic IDs (KIP-516) + KAFKA_2_8_IV0, ) // Map keys are the union of the short and full versions @@ -424,6 +426,13 @@ case object KAFKA_2_7_IV2 extends DefaultApiVersion { val id: Int = 30 } +case object KAFKA_2_8_IV0 extends DefaultApiVersion { + val shortVersion: String = "2.8" + val subVersion = "IV0" + val recordVersion = RecordVersion.V2 + val id: Int = 31 +} + object ApiVersionValidator extends Validator { override def ensureValid(name: String, value: Any): Unit = { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 0d4da20482331..e201494aaf7b6 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1894,13 +1894,10 @@ class KafkaApis(val requestChannel: RequestChannel, .setNumPartitions(-1) .setReplicationFactor(-1) .setTopicConfigErrorCode(Errors.NONE.code) + } else { + result.setTopicId(controller.controllerContext.topicIds.getOrElse(result.name(), Uuid.ZERO_UUID)) } } - val topicIds = zkClient.getTopicIdsForTopics(results.asScala.map(result => result.name()).toSet) - topicIds.foreach { case (name, id) => - val result = results.find(name) - result.setTopicId(id) - } sendResponseCallback(results) } adminManager.createTopics( @@ -2004,7 +2001,7 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseCallback(results) } else { deleteTopicRequest.topics().forEach { topic => - val name = if (topic.name() != null) topic.name() + val name = if (topic.topicId().equals(Uuid.ZERO_UUID)) topic.name() else controller.controllerContext.topicNames.getOrElse(topic.topicId(), null) results.add(new DeletableTopicResult() .setName(name) @@ -2016,9 +2013,12 @@ class KafkaApis(val requestChannel: RequestChannel, results.forEach { topic => val foundTopicId = !topic.topicId().equals(Uuid.ZERO_UUID) && topic.name() != null val topicIdSpecified = !topic.topicId().equals(Uuid.ZERO_UUID) - if (!foundTopicId && topicIdSpecified) - 
topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code) - else if (!authorizedTopics.contains(topic.name)) + if (!foundTopicId && topicIdSpecified) { + if (config.usesTopicId) + topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code) + else + topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code) + } else if (!authorizedTopics.contains(topic.name)) topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) else if (!metadataCache.contains(topic.name)) topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 93b840576e09a..592457dd3c6a2 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -20,7 +20,7 @@ package kafka.server import java.util import java.util.{Collections, Locale, Properties} -import kafka.api.{ApiVersion, ApiVersionValidator, KAFKA_0_10_0_IV1, KAFKA_2_1_IV0, KAFKA_2_7_IV0} +import kafka.api.{ApiVersion, ApiVersionValidator, KAFKA_0_10_0_IV1, KAFKA_2_1_IV0, KAFKA_2_7_IV0, KAFKA_2_8_IV0} import kafka.cluster.EndPoint import kafka.coordinator.group.OffsetConfig import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager} @@ -1767,6 +1767,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO } } + def usesTopicId: Boolean = { + interBrokerProtocolVersion >= KAFKA_2_8_IV0 + } + validateValues() private def validateValues(): Unit = { diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala index 161477f6f6e0d..a13e055daff29 100644 --- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala @@ -96,12 +96,10 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg waitForTopics(client, topics, List()) 
validateMetadataAndConfigs(createResult) val topicIds = zkClient.getTopicIdsForTopics(topics.toSet) - assertNotEquals(Uuid.ZERO_UUID, createResult.topicId("mytopic").get()) - assertNotEquals(Uuid.ZERO_UUID, createResult.topicId("mytopic2").get()) - assertNotEquals(Uuid.ZERO_UUID, createResult.topicId("mytopic3").get()) - assertEquals(topicIds("mytopic"), createResult.topicId("mytopic").get()) - assertEquals(topicIds("mytopic2"), createResult.topicId("mytopic2").get()) - assertEquals(topicIds("mytopic3"), createResult.topicId("mytopic3").get()) + topics.foreach { topic => + assertNotEquals(Uuid.ZERO_UUID, createResult.topicId(topic).get()) + assertEquals(topicIds(topic), createResult.topicId(topic).get()) + } val failedCreateResult = client.createTopics(newTopics.asJava) @@ -161,24 +159,6 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg client.deleteTopics(topics.asJava).all.get() waitForTopics(client, List(), topics) } - - @Test - def testDeleteTopicsWithIds(): Unit = { - client = Admin.create(createConfig) - val topics = Seq("mytopic", "mytopic2", "mytopic3") - val newTopics = Seq( - new NewTopic("mytopic", Map((0: Integer) -> Seq[Integer](1, 2).asJava, (1: Integer) -> Seq[Integer](2, 0).asJava).asJava), - new NewTopic("mytopic2", 3, 3.toShort), - new NewTopic("mytopic3", Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava) - ) - val createResult = client.createTopics(newTopics.asJava) - createResult.all.get() - waitForTopics(client, topics, List()) - val topicIds = zkClient.getTopicIdsForTopics(topics.toSet).values.toSet - - client.deleteTopicsWithIds(topicIds.asJava).all.get() - waitForTopics(client, List(), topics) - } @Test def testAuthorizedOperations(): Unit = { diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index c0e0de9fe8a4b..3339f2559fd4c 100644 --- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory import scala.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.collection.Seq +import scala.compat.java8.OptionConverters._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} import scala.util.Random @@ -117,6 +118,24 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertTrue(e.getCause.isInstanceOf[TopicExistsException]) } + @Test + def testDeleteTopicsWithIds(): Unit = { + client = Admin.create(createConfig) + val topics = Seq("mytopic", "mytopic2", "mytopic3") + val newTopics = Seq( + new NewTopic("mytopic", Map((0: Integer) -> Seq[Integer](1, 2).asJava, (1: Integer) -> Seq[Integer](2, 0).asJava).asJava), + new NewTopic("mytopic2", 3, 3.toShort), + new NewTopic("mytopic3", Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava) + ) + val createResult = client.createTopics(newTopics.asJava) + createResult.all.get() + waitForTopics(client, topics, List()) + val topicIds = zkClient.getTopicIdsForTopics(topics.toSet).values.toSet + + client.deleteTopicsWithIds(topicIds.asJava).all.get() + waitForTopics(client, List(), topics) + } + @Test def testMetadataRefresh(): Unit = { client = Admin.create(createConfig) diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala index eb80bdc1b99a7..941dd491f8666 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala @@ -17,8 +17,9 @@ package kafka.server -import java.util.{Arrays, Collections} +import java.util.{Arrays, Collections, Properties} +import kafka.api.KAFKA_2_7_IV0 import kafka.network.SocketServer import kafka.utils._ import 
org.apache.kafka.common.Uuid @@ -56,8 +57,8 @@ class DeleteTopicsRequestTest extends BaseRequestTest { val ids = zkClient.getTopicIdsForTopics(Set("topic-7", "topic-6")) validateValidDeleteTopicRequestsWithIds(new DeleteTopicsRequest.Builder( new DeleteTopicsRequestData() - .setTopics(Arrays.asList(new DeleteTopicState().setTopicId(ids("topic-7")) - //new DeleteTopicState().setTopicId(ids("topic-6")) + .setTopics(Arrays.asList(new DeleteTopicState().setTopicId(ids("topic-7")), + new DeleteTopicState().setTopicId(ids("topic-6")) ) ).setTimeoutMs(timeout)).build()) } @@ -132,6 +133,25 @@ class DeleteTopicsRequestTest extends BaseRequestTest { validateTopicIsDeleted(timeoutTopic) } + @Test + def testErrorDeleteTopicRequestsOnOldVersions(): Unit = { + val timeout = 30000 + val props = new Properties + props.put(KafkaConfig.InterBrokerProtocolVersionProp, KAFKA_2_7_IV0.toString) + brokerPropertyOverrides(props) + + createTopic("topic-id-1", 1, 1) + val invalidId = Uuid.randomUuid + validateErrorDeleteTopicRequestsWithIds(new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopics(Arrays.asList(new DeleteTopicState().setTopicId(invalidId))) + .setTimeoutMs(timeout)).build(), + Map( + invalidId -> Errors.UNKNOWN_TOPIC_ID, + ) + ) + } + private def validateErrorDeleteTopicRequests(request: DeleteTopicsRequest, expectedResponse: Map[String, Errors]): Unit = { val response = sendDeleteTopicsRequest(request) val errors = response.data.responses diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 7a13ede72c1f0..d0ba8d5c174d2 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -26,7 +26,7 @@ import java.util.{Collections, Optional, Random} import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1, LeaderAndIsr} import kafka.cluster.Partition -import kafka.controller.KafkaController 
+import kafka.controller.{ControllerContext, KafkaController} import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback} import kafka.coordinator.group.{GroupCoordinator, GroupOverview, GroupSummary, JoinGroupResult, MemberSummary, SyncGroupResult} import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator} @@ -697,6 +697,7 @@ class KafkaApisTest { val authorizedTopic = "authorized-topic" val unauthorizedTopic = "unauthorized-topic" + val authorizedTopicId = Uuid.randomUuid() authorizeResource(authorizer, AclOperation.CREATE, ResourceType.CLUSTER, Resource.CLUSTER_NAME, AuthorizationResult.DENIED, logIfDenied = false) @@ -744,9 +745,12 @@ class KafkaApisTest { anyObject(), EasyMock.eq(UnboundedControllerMutationQuota), EasyMock.capture(capturedCallback))) - - EasyMock.expect(zkClient.getTopicIdsForTopics(Set(authorizedTopic, unauthorizedTopic))).andReturn( - Map(authorizedTopic -> Uuid.ZERO_UUID, unauthorizedTopic -> Uuid.ZERO_UUID)) + + val controllerContext = new ControllerContext + controllerContext.allTopics.add(authorizedTopic) + controllerContext.addTopicId(authorizedTopic, authorizedTopicId) + + EasyMock.expect(controller.controllerContext).andStubReturn(controllerContext) EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager, requestChannel, authorizer, adminManager, controller, zkClient) @@ -757,7 +761,8 @@ class KafkaApisTest { verifyCreateTopicsResult(createTopicsRequest, capturedResponse, Map(authorizedTopic -> Errors.NONE, - unauthorizedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED)) + unauthorizedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED), Map(authorizedTopic -> authorizedTopicId, + unauthorizedTopic -> Uuid.ZERO_UUID)) verify(authorizer, adminManager, clientControllerQuotaManager) } @@ -813,14 +818,19 @@ class KafkaApisTest { private def verifyCreateTopicsResult(createTopicsRequest: CreateTopicsRequest, capturedResponse: 
Capture[RequestChannel.Response], - expectedResults: Map[String, Errors]): Unit = { + expectedResults: Map[String, Errors], + expectedIds: Map[String, Uuid]): Unit = { val response = readResponse(createTopicsRequest, capturedResponse) .asInstanceOf[CreateTopicsResponse] val responseMap = response.data.topics().asScala.map { topicResponse => topicResponse.name() -> Errors.forCode(topicResponse.errorCode) }.toMap + val responseIdMap = response.data.topics().asScala.map { topicResponse => + topicResponse.name() -> topicResponse.topicId() + }.toMap assertEquals(expectedResults, responseMap) + assertEquals(expectedIds, responseIdMap) } @Test From f1a56cc363a9dec14789332acec891c57ce020d6 Mon Sep 17 00:00:00 2001 From: Justine Date: Tue, 5 Jan 2021 16:42:10 -0800 Subject: [PATCH 06/10] Added to a simple createtopics request test --- .../scala/unit/kafka/server/CreateTopicsRequestTest.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala index 245f13f9aca85..3957335f61818 100644 --- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala @@ -18,6 +18,7 @@ package kafka.server import kafka.utils._ +import org.apache.kafka.common.Uuid import org.apache.kafka.common.message.CreateTopicsRequestData import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection import org.apache.kafka.common.protocol.ApiKeys @@ -170,6 +171,11 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest { assertEquals(-1, topicResponse.replicationFactor) assertTrue(topicResponse.configs.isEmpty) } + + if (version >= 7) + assertNotEquals(Uuid.ZERO_UUID, topicResponse.topicId()) + else + assertEquals(Uuid.ZERO_UUID, topicResponse.topicId()) } } } From 0dfd4ee84a3e76ce5153816c9e875bcaccac2929 Mon Sep 17 00:00:00 2001 From: Justine 
Date: Wed, 6 Jan 2021 10:40:51 -0800 Subject: [PATCH 07/10] Removed incorrect test and added error message to clarify unsupported version exception --- .../main/scala/kafka/server/KafkaApis.scala | 4 +++- .../server/DeleteTopicsRequestTest.scala | 22 +------------------ 2 files changed, 4 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index aabf9fbe2bd30..e5e005d00b444 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1995,8 +1995,10 @@ class KafkaApis(val requestChannel: RequestChannel, if (!foundTopicId && topicIdSpecified) { if (config.usesTopicId) topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code) - else + else { topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code) + topic.setErrorMessage("Topic IDs are not supported on the server.") + } } else if (!authorizedTopics.contains(topic.name)) topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) else if (!metadataCache.contains(topic.name)) diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala index 941dd491f8666..755f90349e2ca 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala @@ -17,9 +17,8 @@ package kafka.server -import java.util.{Arrays, Collections, Properties} +import java.util.{Arrays, Collections} -import kafka.api.KAFKA_2_7_IV0 import kafka.network.SocketServer import kafka.utils._ import org.apache.kafka.common.Uuid @@ -133,25 +132,6 @@ class DeleteTopicsRequestTest extends BaseRequestTest { validateTopicIsDeleted(timeoutTopic) } - @Test - def testErrorDeleteTopicRequestsOnOldVersions(): Unit = { - val timeout = 30000 - val props = new Properties - props.put(KafkaConfig.InterBrokerProtocolVersionProp, KAFKA_2_7_IV0.toString) - 
brokerPropertyOverrides(props) - - createTopic("topic-id-1", 1, 1) - val invalidId = Uuid.randomUuid - validateErrorDeleteTopicRequestsWithIds(new DeleteTopicsRequest.Builder( - new DeleteTopicsRequestData() - .setTopics(Arrays.asList(new DeleteTopicState().setTopicId(invalidId))) - .setTimeoutMs(timeout)).build(), - Map( - invalidId -> Errors.UNKNOWN_TOPIC_ID, - ) - ) - } - private def validateErrorDeleteTopicRequests(request: DeleteTopicsRequest, expectedResponse: Map[String, Errors]): Unit = { val response = sendDeleteTopicsRequest(request) val errors = response.data.responses From 40175b371eca4e00b862d127786aece1e5fef02b Mon Sep 17 00:00:00 2001 From: Justine Date: Thu, 21 Jan 2021 14:16:58 -0800 Subject: [PATCH 08/10] Cleanups, added tests for older IBP version --- .../kafka/clients/admin/MockAdminClient.java | 3 +- .../src/main/scala/kafka/api/ApiVersion.scala | 1 - .../kafka/api/BaseAdminIntegrationTest.scala | 2 +- .../api/PlaintextAdminIntegrationTest.scala | 2 +- .../api/SaslSslAdminIntegrationTest.scala | 2 +- .../integration/KafkaServerTestHarness.scala | 15 ++++- .../unit/kafka/server/BaseRequestTest.scala | 2 +- .../server/DeleteTopicsRequestTest.scala | 8 +-- ...opicIdWithOldInterBrokerProtocolTest.scala | 58 +++++++++++++++++-- 9 files changed, 77 insertions(+), 16 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 042c55d054ed5..767ee57333baf 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -430,11 +430,10 @@ synchronized public DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection future = new KafkaFutureImpl<>(); String name = topicNames.remove(topicId); - if (allTopics.remove(name) == null) { + if (name == null || allTopics.remove(name) == null) { future.completeExceptionally(new 
UnknownTopicOrPartitionException(String.format("Topic %s does not exist.", topicId))); } else { topicIds.remove(name); - allTopics.remove(name); future.complete(null); } deleteTopicsWithIdsResult.put(topicId, future); diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index 12f79031ce171..e89f9fb46a555 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -454,7 +454,6 @@ case object KAFKA_2_8_IV1 extends DefaultApiVersion { val id: Int = 32 } - object ApiVersionValidator extends Validator { override def ensureValid(name: String, value: Any): Unit = { diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala index 17a829de6a86d..b9aa2088eac66 100644 --- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala @@ -91,7 +91,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg createResult.all.get() waitForTopics(client, topics, List()) validateMetadataAndConfigs(createResult) - val topicIds = zkClient.getTopicIdsForTopics(topics.toSet) + val topicIds = getTopicIds() topics.foreach { topic => assertNotEquals(Uuid.ZERO_UUID, createResult.topicId(topic).get()) assertEquals(topicIds(topic), createResult.topicId(topic).get()) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 565d42f87d121..87cb25725d409 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -128,7 +128,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val createResult = 
client.createTopics(newTopics.asJava) createResult.all.get() waitForTopics(client, topics, List()) - val topicIds = zkClient.getTopicIdsForTopics(topics.toSet).values.toSet + val topicIds = getTopicIds().values.toSet client.deleteTopicsWithIds(topicIds.asJava).all.get() waitForTopics(client, List(), topics) diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala index e7ef639f12c50..1f15563b012ea 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala @@ -423,7 +423,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu createResult.all.get() waitForTopics(client, topics, List()) validateMetadataAndConfigs(createResult) - val topicIds = zkClient.getTopicIdsForTopics(topics.toSet) + val topicIds = getTopicIds() assertNotEquals(Uuid.ZERO_UUID, createResult.topicId(topic1).get()) assertEquals(topicIds(topic1), createResult.topicId(topic1).get()) assertFutureExceptionTypeEquals(createResult.topicId(topic2), classOf[TopicAuthorizationException]) diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 8eae07e69e56b..bc6717df2cf1e 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -30,7 +30,7 @@ import scala.collection.Seq import scala.collection.mutable.{ArrayBuffer, Buffer} import java.util.Properties -import org.apache.kafka.common.KafkaException +import org.apache.kafka.common.{KafkaException, Uuid} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.scram.ScramCredential import org.apache.kafka.common.utils.Time @@ -168,4 +168,17 @@ abstract class 
KafkaServerTestHarness extends ZooKeeperTestHarness { } } + def getController(): KafkaServer = { + val controllerId = TestUtils.waitUntilControllerElected(zkClient) + servers.filter(s => s.config.brokerId == controllerId).head + } + + def getTopicIds(): Map[String, Uuid] = { + getController().kafkaController.controllerContext.topicIds.toMap + } + + def getTopicNames(): Map[Uuid, String] = { + getController().kafkaController.controllerContext.topicNames.toMap + } + } diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala index 13cd74e5a3b92..4b6012a70fe41 100644 --- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala @@ -21,6 +21,7 @@ import java.io.{DataInputStream, DataOutputStream} import java.net.Socket import java.nio.ByteBuffer import java.util.Properties + import kafka.api.IntegrationTestHarness import kafka.network.SocketServer import kafka.utils.NotNothing @@ -148,5 +149,4 @@ abstract class BaseRequestTest extends IntegrationTestHarness { } new RequestHeader(apiKey, apiVersion, clientId, correlationId) } - } diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala index 943e2698d66a5..a17612170d7a0 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala @@ -53,7 +53,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest { // Topic Ids createTopic("topic-7", 3, 2) createTopic("topic-6", 1, 2) - val ids = zkClient.getTopicIdsForTopics(Set("topic-7", "topic-6")) + val ids = getTopicIds() validateValidDeleteTopicRequestsWithIds(new DeleteTopicsRequest.Builder( new DeleteTopicsRequestData() .setTopics(Arrays.asList(new DeleteTopicState().setTopicId(ids("topic-7")), @@ -75,7 +75,7 @@ class DeleteTopicsRequestTest 
extends BaseRequestTest { val response = sendDeleteTopicsRequest(request) val error = response.errorCounts.asScala.find(_._1 != Errors.NONE) assertTrue(error.isEmpty, s"There should be no errors, found ${response.data.responses.asScala}") - response.data().responses().forEach { response => + response.data.responses.forEach { response => validateTopicIsDeleted(response.name()) } } @@ -106,7 +106,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest { // Topic IDs createTopic("topic-id-1", 1, 1) - val validId = zkClient.getTopicIdsForTopics(Set("topic-id-1"))("topic-id-1") + val validId = getTopicIds()("topic-id-1") val invalidId = Uuid.randomUuid validateErrorDeleteTopicRequestsWithIds(new DeleteTopicsRequest.Builder( new DeleteTopicsRequestData() @@ -174,7 +174,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest { .setTimeoutMs(1000)).build() val response = sendDeleteTopicsRequest(request, notControllerSocketServer) - val error = response.data.responses().find("not-controller").errorCode() + val error = response.data.responses.find("not-controller").errorCode() assertEquals(Errors.NOT_CONTROLLER.code, error, "Expected controller error when routed incorrectly") } diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestIBPTest/TopicIdWithOldInterBrokerProtocolTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestIBPTest/TopicIdWithOldInterBrokerProtocolTest.scala index 9c56818635fe9..5d5996a509c50 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataRequestIBPTest/TopicIdWithOldInterBrokerProtocolTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataRequestIBPTest/TopicIdWithOldInterBrokerProtocolTest.scala @@ -17,15 +17,17 @@ package kafka.server -import java.util.Properties +import java.util.{Arrays, Properties} import kafka.api.KAFKA_2_7_IV0 import kafka.network.SocketServer - +import kafka.utils.TestUtils import org.apache.kafka.common.Uuid +import org.apache.kafka.common.message.DeleteTopicsRequestData +import 
org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse} -import org.junit.jupiter.api.Assertions.assertEquals; +import org.apache.kafka.common.requests.{DeleteTopicsRequest, DeleteTopicsResponse, MetadataRequest, MetadataResponse} +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{BeforeEach, Test} import scala.jdk.CollectionConverters._ @@ -59,8 +61,56 @@ class TopicIdWithOldInterBrokerProtocolTest extends BaseRequestTest { } } + @Test + def testDeleteTopicsWithOldIBP(): Unit = { + val timeout = 10000 + createTopic("topic-3", 5, 2) + createTopic("topic-4", 1, 2) + val request = new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopicNames(Arrays.asList("topic-3", "topic-4")) + .setTimeoutMs(timeout)).build() + val resp = sendDeleteTopicsRequest(request) + val error = resp.errorCounts.asScala.find(_._1 != Errors.NONE) + assertTrue(error.isEmpty, s"There should be no errors, found ${resp.data.responses.asScala}") + request.data.topicNames.forEach { topic => + validateTopicIsDeleted(topic) + } + resp.data.responses.forEach { response => + assertEquals(Uuid.ZERO_UUID, response.topicId()) + } + } + + @Test + def testDeleteTopicsWithOldIBPUsingIDs(): Unit = { + val timeout = 10000 + createTopic("topic-7", 3, 2) + createTopic("topic-6", 1, 2) + val ids = Map("topic-7" -> Uuid.randomUuid(), "topic-6" -> Uuid.randomUuid()) + val request = new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopics(Arrays.asList(new DeleteTopicState().setTopicId(ids("topic-7")), + new DeleteTopicState().setTopicId(ids("topic-6")) + ) + ).setTimeoutMs(timeout)).build() + val response = sendDeleteTopicsRequest(request) + val error = response.errorCounts.asScala + assertEquals(2, error(Errors.UNSUPPORTED_VERSION)) + } + private def sendMetadataRequest(request: 
MetadataRequest, destination: Option[SocketServer]): MetadataResponse = { connectAndReceive[MetadataResponse](request, destination = destination.getOrElse(anySocketServer)) } + private def sendDeleteTopicsRequest(request: DeleteTopicsRequest, socketServer: SocketServer = controllerSocketServer): DeleteTopicsResponse = { + connectAndReceive[DeleteTopicsResponse](request, destination = socketServer) + } + + private def validateTopicIsDeleted(topic: String): Unit = { + val metadata = connectAndReceive[MetadataResponse](new MetadataRequest.Builder( + List(topic).asJava, true).build).topicMetadata.asScala + TestUtils.waitUntilTrue (() => !metadata.exists(p => p.topic.equals(topic) && p.error == Errors.NONE), + s"The topic $topic should not exist") + } + } From d4754e27edb1f3c3969dcbcf28feb6b1eb5b5bfc Mon Sep 17 00:00:00 2001 From: Justine Date: Wed, 27 Jan 2021 18:45:20 -0800 Subject: [PATCH 09/10] Addressed comments--moved topic ID in create to ZkAdminManager, updated authorization logic --- .../kafka/clients/admin/KafkaAdminClient.java | 12 +- .../common/requests/DeleteTopicsRequest.java | 2 +- .../common/message/DeleteTopicsResponse.json | 2 +- .../clients/admin/KafkaAdminClientTest.java | 12 +- .../main/scala/kafka/server/KafkaApis.scala | 32 ++--- .../scala/kafka/server/ZkAdminManager.scala | 9 ++ .../kafka/api/AuthorizerIntegrationTest.scala | 125 +++++++++++++++--- .../unit/kafka/server/BaseRequestTest.scala | 2 +- .../unit/kafka/server/KafkaApisTest.scala | 21 +-- ...opicIdWithOldInterBrokerProtocolTest.scala | 8 +- 10 files changed, 156 insertions(+), 69 deletions(-) rename core/src/test/scala/unit/kafka/server/{MetadataRequestIBPTest => }/TopicIdWithOldInterBrokerProtocolTest.scala (97%) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index a737ddb5e0ca4..db9400bc49f70 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -1630,7 +1630,7 @@ public DeleteTopicsResult deleteTopics(final Collection topicNames, @Override public DeleteTopicsWithIdsResult deleteTopicsWithIds(final Collection topicIds, - final DeleteTopicsOptions options) { + final DeleteTopicsOptions options) { final Map> topicFutures = new HashMap<>(topicIds.size()); final List validTopicIds = new ArrayList<>(topicIds.size()); for (Uuid topicId : topicIds) { @@ -1727,11 +1727,11 @@ void handleFailure(Throwable throwable) { } private Call getDeleteTopicsWithIdsCall(final DeleteTopicsOptions options, - final Map> futures, - final List topicIds, - final Map quotaExceededExceptions, - final long now, - final long deadline) { + final Map> futures, + final List topicIds, + final Map quotaExceededExceptions, + final long now, + final long deadline) { return new Call("deleteTopics", deadline, new ControllerNodeProvider()) { @Override DeleteTopicsRequest.Builder createRequest(int timeoutMs) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java index 646edd4155748..995526d369b93 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java @@ -42,7 +42,7 @@ public Builder(DeleteTopicsRequestData data) { @Override public DeleteTopicsRequest build(short version) { - if (version >= 6 && data.topicNames().size() != 0) { + if (version >= 6 && !data.topicNames().isEmpty()) { data.setTopics(groupByTopic(data.topicNames())); } else if (version >= 6) { for (DeleteTopicState topic : data.topics()) { diff --git a/clients/src/main/resources/common/message/DeleteTopicsResponse.json b/clients/src/main/resources/common/message/DeleteTopicsResponse.json 
index ae96c1d01bc7a..cb531bb13f761 100644 --- a/clients/src/main/resources/common/message/DeleteTopicsResponse.json +++ b/clients/src/main/resources/common/message/DeleteTopicsResponse.json @@ -29,7 +29,7 @@ // in the response if the topics deletion is throttled (KIP-599). // // Version 6 adds topic ID to responses. An UNSUPPORTED_VERSION error code will be returned when attempting to - // delete using topic IDs when IBP > 2.8. UNKNOWN_TOPIC_ID error code will be returned when IBP is at least 2.8, but + // delete using topic IDs when IBP < 2.8. UNKNOWN_TOPIC_ID error code will be returned when IBP is at least 2.8, but // the topic ID was not found. "validVersions": "0-6", "flexibleVersions": "4+", diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index de26bf8115a42..ed62f431e64d2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -984,7 +984,7 @@ public void testDeleteTopicsRetryThrottlingExceptionWhenEnabled() throws Excepti prepareDeleteTopicsResponse(1000, deletableTopicResultWithId(topicId1, Errors.NONE), deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED), - deletableTopicResultWithId(topicId3, Errors.TOPIC_ALREADY_EXISTS))); + deletableTopicResultWithId(topicId3, Errors.UNKNOWN_TOPIC_ID))); env.kafkaClient().prepareResponse( expectDeleteTopicsRequestWithTopicIds(topicId2), @@ -1002,7 +1002,7 @@ public void testDeleteTopicsRetryThrottlingExceptionWhenEnabled() throws Excepti assertNull(resultIds.values().get(topicId1).get()); assertNull(resultIds.values().get(topicId2).get()); - TestUtils.assertFutureThrows(resultIds.values().get(topicId3), TopicExistsException.class); + TestUtils.assertFutureThrows(resultIds.values().get(topicId3), UnknownTopicIdException.class); } } @@ -1058,7 +1058,7 @@ 
public void testDeleteTopicsRetryThrottlingExceptionWhenEnabledUntilRequestTimeO prepareDeleteTopicsResponse(1000, deletableTopicResultWithId(topicId1, Errors.NONE), deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED), - deletableTopicResultWithId(topicId3, Errors.TOPIC_ALREADY_EXISTS))); + deletableTopicResultWithId(topicId3, Errors.UNKNOWN_TOPIC_ID))); env.kafkaClient().prepareResponse( expectDeleteTopicsRequestWithTopicIds(topicId2), @@ -1084,7 +1084,7 @@ public void testDeleteTopicsRetryThrottlingExceptionWhenEnabledUntilRequestTimeO e = TestUtils.assertFutureThrows(resultIds.values().get(topicId2), ThrottlingQuotaExceededException.class); assertEquals(0, e.throttleTimeMs()); - TestUtils.assertFutureThrows(resultIds.values().get(topicId3), TopicExistsException.class); + TestUtils.assertFutureThrows(resultIds.values().get(topicId3), UnknownTopicIdException.class); } } @@ -1119,7 +1119,7 @@ public void testDeleteTopicsDontRetryThrottlingExceptionWhenDisabled() throws Ex prepareDeleteTopicsResponse(1000, deletableTopicResultWithId(topicId1, Errors.NONE), deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED), - deletableTopicResultWithId(topicId3, Errors.TOPIC_ALREADY_EXISTS))); + deletableTopicResultWithId(topicId3, Errors.UNKNOWN_TOPIC_ID))); DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds( asList(topicId1, topicId2, topicId3), @@ -1129,7 +1129,7 @@ public void testDeleteTopicsDontRetryThrottlingExceptionWhenDisabled() throws Ex e = TestUtils.assertFutureThrows(resultIds.values().get(topicId2), ThrottlingQuotaExceededException.class); assertEquals(1000, e.throttleTimeMs()); - TestUtils.assertFutureError(resultIds.values().get(topicId3), TopicExistsException.class); + TestUtils.assertFutureError(resultIds.values().get(topicId3), UnknownTopicIdException.class); } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 
126aa0cb8867d..7e13711358ae1 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1843,8 +1843,6 @@ class KafkaApis(val requestChannel: RequestChannel, .setNumPartitions(-1) .setReplicationFactor(-1) .setTopicConfigErrorCode(Errors.NONE.code) - } else { - result.setTopicId(controller.controllerContext.topicIds.getOrElse(result.name(), Uuid.ZERO_UUID)) } } sendResponseCallback(results) @@ -1949,26 +1947,30 @@ class KafkaApis(val requestChannel: RequestChannel, } sendResponseCallback(results) } else { + val topicIdsFromRequest = deleteTopicRequest.topicIds().asScala.filter(topicId => topicId != Uuid.ZERO_UUID).toSet deleteTopicRequest.topics().forEach { topic => - val name = if (topic.topicId().equals(Uuid.ZERO_UUID)) topic.name() - else controller.controllerContext.topicNames.getOrElse(topic.topicId(), null) + if (topic.name() != null && topic.topicId() != Uuid.ZERO_UUID) + throw new InvalidRequestException("Topic name and topic ID can not both be specified.") + val name = if (topic.topicId() == Uuid.ZERO_UUID) topic.name() + else controller.controllerContext.topicNames.getOrElse(topic.topicId(), null) results.add(new DeletableTopicResult() .setName(name) .setTopicId(topic.topicId())) } - val authorizedTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC, - results.asScala)(_.name) + val authorizedDescribeTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, + results.asScala.filter(result => result.name() != null))(_.name) + val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC, + results.asScala.filter(result => result.name() != null))(_.name) results.forEach { topic => - val foundTopicId = !topic.topicId().equals(Uuid.ZERO_UUID) && topic.name() != null - val topicIdSpecified = !topic.topicId().equals(Uuid.ZERO_UUID) - if (!foundTopicId && topicIdSpecified) { - if (config.usesTopicId) + val unresolvedTopicId = 
!(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null + if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) { + topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code) + topic.setErrorMessage("Topic IDs are not supported on the server.") + } else if (unresolvedTopicId) topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code) - else { - topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code) - topic.setErrorMessage("Topic IDs are not supported on the server.") - } - } else if (!authorizedTopics.contains(topic.name)) + else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) + topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code) + else if (!authorizedDeleteTopics.contains(topic.name)) topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) else if (!metadataCache.contains(topic.name)) topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala b/core/src/main/scala/kafka/server/ZkAdminManager.scala index fd4732f0dd50d..1fa70522872ba 100644 --- a/core/src/main/scala/kafka/server/ZkAdminManager.scala +++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala @@ -30,6 +30,7 @@ import kafka.utils.Implicits._ import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.admin.{AlterConfigOp, ScramMechanism} import org.apache.kafka.clients.admin.AlterConfigOp.OpType +import org.apache.kafka.common.Uuid import org.apache.kafka.common.config.ConfigDef.ConfigKey import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource, LogLevelConfig} import org.apache.kafka.common.errors.ThrottlingQuotaExceededException @@ -133,6 +134,13 @@ class ZkAdminManager(val config: KafkaConfig, } } + private def populateIds(metadataAndConfigs: Map[String, CreatableTopicResult], + topicName: String) : Unit = { + metadataAndConfigs.get(topicName).foreach { result => + 
result.setTopicId(zkClient.getTopicIdsForTopics(Predef.Set(result.name())).getOrElse(result.name(), Uuid.ZERO_UUID)) + } + } + /** * Create topics and wait until the topics have been completely created. * The callback function will be triggered either when timeout, error or the topics are created. @@ -195,6 +203,7 @@ class ZkAdminManager(val config: KafkaConfig, } else { controllerMutationQuota.record(assignments.size) adminZkClient.createTopicWithAssignment(topic.name, configs, assignments, validate = false, config.usesTopicId) + populateIds(includeConfigsAndMetadata, topic.name) CreatePartitionsMetadata(topic.name, assignments.keySet) } } catch { diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index edda7a9838628..1b48be3e23731 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -100,7 +100,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val brokerId: Integer = 0 val topic = "topic" - val topicId = Uuid.randomUuid() val topicPattern = "topic.*" val transactionalId = "transactional.id" val producerId = 83392L @@ -108,8 +107,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val correlationId = 0 val clientId = "client-Id" val tp = new TopicPartition(topic, part) - val topicIds = Collections.singletonMap(topic, topicId) - val topicNames = Collections.singletonMap(topicId, topic) val logDir = "logDir" val group = "my-group" val protocolType = "consumer" @@ -159,7 +156,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } @nowarn("cat=deprecation") - val requestKeyToError = Map[ApiKeys, Nothing => Errors]( + val requestKeyToError = (topicNames: Map[Uuid, String]) => Map[ApiKeys, Nothing => Errors]( ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == 
topic).getOrElse(("test", Errors.NONE))._2), ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses.asScala.find(_._1 == tp).get._2.error), ApiKeys.FETCH -> ((resp: requests.FetchResponse[Records]) => resp.responseData.asScala.find(_._1 == tp).get._2.error), @@ -185,7 +182,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error), ApiKeys.DELETE_GROUPS -> ((resp: DeleteGroupsResponse) => resp.get(group)), ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => Errors.forCode( - resp.topics.asScala.find(t => topicNames.get(t.topicId) == tp.topic).get.partitionErrors.asScala.find( + resp.topics.asScala.find(t => topicNames(t.topicId) == tp.topic).get.partitionErrors.asScala.find( p => p.partitionIndex == tp.partition).get.errorCode)), ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => Errors.forCode( resp.partitionErrors.asScala.find(pe => pe.topicName == tp.topic && pe.partitionIndex == tp.partition).get.errorCode)), @@ -235,6 +232,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest { }) ) + val requestKeysToErrorWithIds = (id: Uuid) => Map[ApiKeys, Nothing => Errors]( + ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => Errors.forCode(resp.data.responses.asScala.find(_.topicId == id).get.errorCode())) + ) + val requestKeysToAcls = Map[ApiKeys, Map[ResourcePattern, Set[AccessControlEntry]]]( ApiKeys.METADATA -> topicDescribeAcl, ApiKeys.PRODUCE -> (topicWriteAcl ++ transactionIdWriteAcl ++ clusterIdempotentWriteAcl), @@ -479,7 +480,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { .setZkVersion(2) .setReplicas(Seq(brokerId).asJava) .setIsNew(false)).asJava, - topicIds, + getTopicIds().asJava, Set(new Node(brokerId, "localhost", 0)).asJava).build() } @@ -518,6 +519,15 @@ class AuthorizerIntegrationTest extends BaseRequestTest { .setTimeoutMs(5000)).build() } + private def deleteTopicsWithIdsRequest(id: Uuid = 
getTopicIds()(topic)): DeleteTopicsRequest = { + new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopics(Collections.singletonList( + new DeleteTopicsRequestData.DeleteTopicState() + .setTopicId(id))) + .setTimeoutMs(5000)).build() + } + private def deleteRecordsRequest = new DeleteRecordsRequest.Builder( new DeleteRecordsRequestData() .setTimeoutMs(5000) @@ -628,12 +638,33 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ) ).build() + private def sendRequests(requestKeyToRequest: mutable.Map[ApiKeys, AbstractRequest], topicExists: Boolean = true) = { + for ((key, request) <- requestKeyToRequest) { + removeAllClientAcls() + val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet + sendRequestAndVerifyResponseError(request, resources, isAuthorized = false, topicExists = topicExists) + + val resourceToAcls = requestKeysToAcls(key) + resourceToAcls.get(topicResource).foreach { acls => + val describeAcls = topicDescribeAcl(topicResource) + val isAuthorized = describeAcls == acls + addAndVerifyAcls(describeAcls, topicResource) + sendRequestAndVerifyResponseError(request, resources, isAuthorized = isAuthorized, topicExists = topicExists) + removeAllClientAcls() + } + + for ((resource, acls) <- resourceToAcls) + addAndVerifyAcls(acls, resource) + sendRequestAndVerifyResponseError(request, resources, isAuthorized = true, topicExists = topicExists) + } + } + @Test def testAuthorizationWithTopicExisting(): Unit = { - val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( - // First create the topic - ApiKeys.CREATE_TOPICS -> createTopicsRequest, + //First create the topic so we have a valid topic ID + sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest)) + val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true), ApiKeys.PRODUCE -> createProduceRequest, ApiKeys.FETCH -> createFetchRequest, @@ -673,23 
+704,32 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DELETE_TOPICS -> deleteTopicsRequest ) - for ((key, request) <- requestKeyToRequest) { + sendRequests(requestKeyToRequest) + } + + @Test + def testAuthorizationDeleteTopicsIdWithTopicExisting(): Unit = { + sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest)) + + val id = getTopicIds()(topic) + + for ((key, request) <- mutable.Map(ApiKeys.DELETE_TOPICS -> deleteTopicsWithIdsRequest())) { removeAllClientAcls() val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet - sendRequestAndVerifyResponseError(request, resources, isAuthorized = false) + sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = false, topicExists = true, describeAuthorized = false, id = id) val resourceToAcls = requestKeysToAcls(key) resourceToAcls.get(topicResource).foreach { acls => val describeAcls = topicDescribeAcl(topicResource) val isAuthorized = describeAcls == acls addAndVerifyAcls(describeAcls, topicResource) - sendRequestAndVerifyResponseError(request, resources, isAuthorized = isAuthorized) + sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = isAuthorized, topicExists = true, describeAuthorized = true, id = id) removeAllClientAcls() } for ((resource, acls) <- resourceToAcls) addAndVerifyAcls(acls, resource) - sendRequestAndVerifyResponseError(request, resources, isAuthorized = true) + sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = true, topicExists = true, describeAuthorized = true, id = id) } } @@ -715,23 +755,33 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.ELECT_LEADERS -> electLeadersRequest ) + sendRequests(requestKeyToRequest, false) + } + + @Test + def testAuthorizationDeleteTopicsIdWithTopicNotExisting(): Unit = { + val id = Uuid.randomUuid() + val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( + ApiKeys.DELETE_TOPICS -> deleteTopicsWithIdsRequest(id), + 
) + for ((key, request) <- requestKeyToRequest) { removeAllClientAcls() val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet - sendRequestAndVerifyResponseError(request, resources, isAuthorized = false, topicExists = false) + sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = false, topicExists = false, describeAuthorized = false, id = id) val resourceToAcls = requestKeysToAcls(key) resourceToAcls.get(topicResource).foreach { acls => val describeAcls = topicDescribeAcl(topicResource) val isAuthorized = describeAcls == acls addAndVerifyAcls(describeAcls, topicResource) - sendRequestAndVerifyResponseError(request, resources, isAuthorized = isAuthorized, topicExists = false) + sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = isAuthorized, topicExists = false, describeAuthorized = true, id = id) removeAllClientAcls() } for ((resource, acls) <- resourceToAcls) addAndVerifyAcls(acls, resource) - sendRequestAndVerifyResponseError(request, resources, isAuthorized = true, topicExists = false) + sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = true, topicExists = false, describeAuthorized = true, id = id) } } @@ -1903,13 +1953,52 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } } + private def sendRequestWithIdAndVerifyResponseError(request: AbstractRequest, + resources: Set[ResourceType], + isAuthorized: Boolean, + topicExists: Boolean, + describeAuthorized: Boolean, + id: Uuid): AbstractResponse = { + val apiKey = request.apiKey + val response = connectAndReceive[AbstractResponse](request) + val error = requestKeysToErrorWithIds(id)(apiKey).asInstanceOf[AbstractResponse => Errors](response) + + val authorizationErrors = resources.flatMap { resourceType => + if (resourceType == TOPIC) { + if (isAuthorized) + Set(Errors.UNKNOWN_TOPIC_ID, AclEntry.authorizationError(ResourceType.TOPIC)) + else if (describeAuthorized) + Set(AclEntry.authorizationError(ResourceType.TOPIC)) + 
else + Set(Errors.UNKNOWN_TOPIC_ID) + } else { + Set(AclEntry.authorizationError(resourceType)) + } + } + + if (topicExists) + if (isAuthorized) + assertFalse(authorizationErrors.contains(error), s"$apiKey should be allowed. Found unexpected authorization error $error") + else + assertTrue(authorizationErrors.contains(error), s"$apiKey should be forbidden. Found error $error but expected one of $authorizationErrors") + else if (resources == Set(TOPIC)) + if (isAuthorized) + assertEquals(Errors.UNKNOWN_TOPIC_ID, error, s"$apiKey had an unexpected error") + else { + assertEquals(Errors.UNKNOWN_TOPIC_ID, error, s"$apiKey had an unexpected error") + } + + response + } + private def sendRequestAndVerifyResponseError(request: AbstractRequest, resources: Set[ResourceType], isAuthorized: Boolean, - topicExists: Boolean = true): AbstractResponse = { + topicExists: Boolean = true, + topicNames: Map[Uuid, String] = getTopicNames()): AbstractResponse = { val apiKey = request.apiKey val response = connectAndReceive[AbstractResponse](request) - val error = requestKeyToError(apiKey).asInstanceOf[AbstractResponse => Errors](response) + val error = requestKeyToError(topicNames)(apiKey).asInstanceOf[AbstractResponse => Errors](response) val authorizationErrors = resources.flatMap { resourceType => if (resourceType == TOPIC) { diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala index 4b6012a70fe41..13cd74e5a3b92 100644 --- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala @@ -21,7 +21,6 @@ import java.io.{DataInputStream, DataOutputStream} import java.net.Socket import java.nio.ByteBuffer import java.util.Properties - import kafka.api.IntegrationTestHarness import kafka.network.SocketServer import kafka.utils.NotNothing @@ -149,4 +148,5 @@ abstract class BaseRequestTest extends IntegrationTestHarness { } new 
RequestHeader(apiKey, apiVersion, clientId, correlationId) } + } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index a52461c6161f7..22a97cee10f79 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -26,7 +26,7 @@ import java.util.{Collections, Optional, Properties, Random} import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1, LeaderAndIsr} import kafka.cluster.{Broker, Partition} -import kafka.controller.{ControllerContext, KafkaController} +import kafka.controller.KafkaController import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback} import kafka.coordinator.group._ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator} @@ -669,7 +669,6 @@ class KafkaApisTest { val authorizedTopic = "authorized-topic" val unauthorizedTopic = "unauthorized-topic" - val authorizedTopicId = Uuid.randomUuid() authorizeResource(authorizer, AclOperation.CREATE, ResourceType.CLUSTER, Resource.CLUSTER_NAME, AuthorizationResult.DENIED, logIfDenied = false) @@ -718,14 +717,8 @@ class KafkaApisTest { EasyMock.eq(UnboundedControllerMutationQuota), EasyMock.capture(capturedCallback))) - val controllerContext = new ControllerContext - controllerContext.allTopics.add(authorizedTopic) - controllerContext.addTopicId(authorizedTopic, authorizedTopicId) - - EasyMock.expect(controller.controllerContext).andStubReturn(controllerContext) - EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager, - requestChannel, authorizer, adminManager, controller, zkClient) + requestChannel, authorizer, adminManager, controller) createKafkaApis(authorizer = Some(authorizer)).handleCreateTopicsRequest(request) @@ -733,8 +726,7 @@ class KafkaApisTest { verifyCreateTopicsResult(createTopicsRequest, capturedResponse, Map(authorizedTopic 
-> Errors.NONE, - unauthorizedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED), Map(authorizedTopic -> authorizedTopicId, - unauthorizedTopic -> Uuid.ZERO_UUID)) + unauthorizedTopic -> Errors.TOPIC_AUTHORIZATION_FAILED)) verify(authorizer, adminManager, clientControllerQuotaManager) } @@ -790,19 +782,14 @@ class KafkaApisTest { private def verifyCreateTopicsResult(createTopicsRequest: CreateTopicsRequest, capturedResponse: Capture[RequestChannel.Response], - expectedResults: Map[String, Errors], - expectedIds: Map[String, Uuid]): Unit = { + expectedResults: Map[String, Errors]): Unit = { val response = readResponse(createTopicsRequest, capturedResponse) .asInstanceOf[CreateTopicsResponse] val responseMap = response.data.topics().asScala.map { topicResponse => topicResponse.name() -> Errors.forCode(topicResponse.errorCode) }.toMap - val responseIdMap = response.data.topics().asScala.map { topicResponse => - topicResponse.name() -> topicResponse.topicId() - }.toMap assertEquals(expectedResults, responseMap) - assertEquals(expectedIds, responseIdMap) } @Test diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestIBPTest/TopicIdWithOldInterBrokerProtocolTest.scala b/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala similarity index 97% rename from core/src/test/scala/unit/kafka/server/MetadataRequestIBPTest/TopicIdWithOldInterBrokerProtocolTest.scala rename to core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala index 5d5996a509c50..265ce87bf1fa2 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataRequestIBPTest/TopicIdWithOldInterBrokerProtocolTest.scala +++ b/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala @@ -15,12 +15,13 @@ * limitations under the License. 
*/ -package kafka.server +package unit.kafka.server import java.util.{Arrays, Properties} import kafka.api.KAFKA_2_7_IV0 import kafka.network.SocketServer +import kafka.server.{BaseRequestTest, KafkaConfig} import kafka.utils.TestUtils import org.apache.kafka.common.Uuid import org.apache.kafka.common.message.DeleteTopicsRequestData @@ -30,8 +31,8 @@ import org.apache.kafka.common.requests.{DeleteTopicsRequest, DeleteTopicsRespon import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{BeforeEach, Test} -import scala.jdk.CollectionConverters._ import scala.collection.Seq +import scala.jdk.CollectionConverters._ class TopicIdWithOldInterBrokerProtocolTest extends BaseRequestTest { @@ -91,8 +92,7 @@ class TopicIdWithOldInterBrokerProtocolTest extends BaseRequestTest { new DeleteTopicsRequestData() .setTopics(Arrays.asList(new DeleteTopicState().setTopicId(ids("topic-7")), new DeleteTopicState().setTopicId(ids("topic-6")) - ) - ).setTimeoutMs(timeout)).build() + )).setTimeoutMs(timeout)).build() val response = sendDeleteTopicsRequest(request) val error = response.errorCounts.asScala assertEquals(2, error(Errors.UNSUPPORTED_VERSION)) From 46b109da445a5a2d51096c4033334e3aeb42a56e Mon Sep 17 00:00:00 2001 From: Justine Date: Thu, 28 Jan 2021 18:45:56 -0800 Subject: [PATCH 10/10] fix package name --- .../kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala b/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala index 265ce87bf1fa2..4d425c011cf8c 100644 --- a/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala +++ b/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package unit.kafka.server +package kafka.server import java.util.{Arrays, Properties}