diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index 816f600061568..d38e9ac47b80a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.MetadataRequestData; import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic; @@ -92,6 +93,16 @@ public MetadataRequest build(short version) { if (!data.allowAutoTopicCreation() && version < 4) throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " + "allowAutoTopicCreation field"); + if (data.topics() != null) { + data.topics().forEach(topic -> { + if (topic.name() == null) + throw new UnsupportedVersionException("MetadataRequest version " + version + + " does not support null topic names."); + if (topic.topicId() != Uuid.ZERO_UUID) + throw new UnsupportedVersionException("MetadataRequest version " + version + + " does not support non-zero topic IDs."); + }); + } return new MetadataRequest(data, version); } @@ -117,13 +128,17 @@ public MetadataRequestData data() { public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { Errors error = Errors.forException(e); MetadataResponseData responseData = new MetadataResponseData(); - if (topics() != null) { - for (String topic : topics()) + if (data.topics() != null) { + for (MetadataRequestTopic topic : data.topics()) { + // the response does not allow null, so convert to empty string if necessary + String topicName = topic.name() == null ? "" : topic.name(); responseData.topics().add(new MetadataResponseData.MetadataResponseTopic() - .setName(topic) + .setName(topicName) + .setTopicId(topic.topicId()) .setErrorCode(error.code()) .setIsInternal(false) .setPartitions(Collections.emptyList())); + } } responseData.setThrottleTimeMs(throttleTimeMs); diff --git a/clients/src/main/resources/common/message/MetadataRequest.json b/clients/src/main/resources/common/message/MetadataRequest.json index e5083a84ea343..a1634b19970d7 100644 --- a/clients/src/main/resources/common/message/MetadataRequest.json +++ b/clients/src/main/resources/common/message/MetadataRequest.json @@ -33,7 +33,8 @@ // // Version 9 is the first flexible version. // - // Version 10 adds topicId. + // Version 10 adds topicId and allows name field to be null. However, this functionality was not implemented on the server. + // Versions 10 and 11 should not use the topicId field or set topic name to null. // // Version 11 deprecates IncludeClusterAuthorizedOperations field. This is now exposed // by the DescribeCluster API (KIP-700). diff --git a/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java index e51523297597d..74c217df91f86 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java @@ -16,16 +16,21 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.MetadataRequestData; import org.apache.kafka.common.protocol.ApiKeys; import org.junit.jupiter.api.Test; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertThrows; public class MetadataRequestTest { @@ -65,4 +70,24 @@ public void testMetadataRequestVersion() { assertEquals(minVersion, builder3.oldestAllowedVersion()); assertEquals(maxVersion, builder3.latestAllowedVersion()); } + + @Test + public void testTopicIdAndNullTopicNameRequests() { + // Construct invalid MetadataRequestTopics. We will build each one separately and ensure the error is thrown. + List topics = Arrays.asList( + new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(Uuid.randomUuid()), + new MetadataRequestData.MetadataRequestTopic().setName(null), + new MetadataRequestData.MetadataRequestTopic().setTopicId(Uuid.randomUuid()), + new MetadataRequestData.MetadataRequestTopic().setName("topic").setTopicId(Uuid.randomUuid())); + + // if version is 10 or 11, the invalid topic metadata should return an error + List invalidVersions = Arrays.asList((short) 10, (short) 11); + invalidVersions.forEach(version -> + topics.forEach(topic -> { + MetadataRequestData metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic)); + MetadataRequest.Builder builder = new MetadataRequest.Builder(metadataRequestData); + assertThrows(UnsupportedVersionException.class, () -> builder.build(version)); + }) + ); + } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index bafc0411253b4..2724de39a19a7 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1142,6 +1142,17 @@ class KafkaApis(val requestChannel: RequestChannel, val metadataRequest = request.body[MetadataRequest] val requestVersion = request.header.apiVersion + // Topic IDs are not supported for versions 10 and 11. Topic names can not be null in these versions. + if (!metadataRequest.isAllTopics) { + metadataRequest.data.topics.forEach{ topic => + if (topic.name == null) { + throw new InvalidRequestException(s"Topic name can not be null for version ${metadataRequest.version}") + } else if (topic.topicId != Uuid.ZERO_UUID) { + throw new InvalidRequestException(s"Topic IDs are not supported in requests for version ${metadataRequest.version}") + } + } + } + val topics = if (metadataRequest.isAllTopics) metadataCache.getAllTopics() else diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 9a0bdb04211ca..a6da170a426ad 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -1052,6 +1052,44 @@ class KafkaApisTest { numBrokersNeeded - 1) } + @Test + def testInvalidMetadataRequestReturnsError(): Unit = { + // Construct invalid MetadataRequestTopics. We will try each one separately and ensure the error is thrown. + val topics = List(new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(Uuid.randomUuid()), + new MetadataRequestData.MetadataRequestTopic().setName(null), + new MetadataRequestData.MetadataRequestTopic().setTopicId(Uuid.randomUuid()), + new MetadataRequestData.MetadataRequestTopic().setName("topic1").setTopicId(Uuid.randomUuid())) + + EasyMock.replay(replicaManager, clientRequestQuotaManager, + autoTopicCreationManager, forwardingManager, clientControllerQuotaManager, groupCoordinator, txnCoordinator) + + // if version is 10 or 11, the invalid topic metadata should return an error + val invalidVersions = Set(10, 11) + invalidVersions.foreach( version => + topics.foreach(topic => { + val metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic)) + val request = buildRequest(new MetadataRequest(metadataRequestData, version.toShort)) + val kafkaApis = createKafkaApis() + + val capturedResponse = EasyMock.newCapture[AbstractResponse]() + EasyMock.expect(requestChannel.sendResponse( + EasyMock.eq(request), + EasyMock.capture(capturedResponse), + EasyMock.anyObject() + )) + + EasyMock.replay(requestChannel) + kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching) + + val response = capturedResponse.getValue.asInstanceOf[MetadataResponse] + assertEquals(1, response.topicMetadata.size) + assertEquals(1, response.errorCounts.get(Errors.INVALID_REQUEST)) + response.data.topics.forEach(topic => assertNotEquals(null, topic.name)) + reset(requestChannel) + }) + ) + } + @Test def testOffsetCommitWithInvalidPartition(): Unit = { val topic = "topic"