Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic;
Expand Down Expand Up @@ -92,6 +93,16 @@ public MetadataRequest build(short version) {
if (!data.allowAutoTopicCreation() && version < 4)
throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " +
"allowAutoTopicCreation field");
if (data.topics() != null) {
data.topics().forEach(topic -> {
if (topic.name() == null)
throw new UnsupportedVersionException("MetadataRequest version " + version +
" does not support null topic names.");
if (topic.topicId() != Uuid.ZERO_UUID)
throw new UnsupportedVersionException("MetadataRequest version " + version +
" does not support non-zero topic IDs.");
});
}
return new MetadataRequest(data, version);
}

Expand All @@ -117,13 +128,17 @@ public MetadataRequestData data() {
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
MetadataResponseData responseData = new MetadataResponseData();
if (topics() != null) {
for (String topic : topics())
if (data.topics() != null) {
for (MetadataRequestTopic topic : data.topics()) {
// the response does not allow null, so convert to empty string if necessary
String topicName = topic.name() == null ? "" : topic.name();
responseData.topics().add(new MetadataResponseData.MetadataResponseTopic()
.setName(topic)
.setName(topicName)
.setTopicId(topic.topicId())
.setErrorCode(error.code())
.setIsInternal(false)
.setPartitions(Collections.emptyList()));
}
}

responseData.setThrottleTimeMs(throttleTimeMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
//
// Version 9 is the first flexible version.
//
// Version 10 adds topicId.
// Version 10 adds topicId and allows name field to be null. However, this functionality was not implemented on the server.
// Versions 10 and 11 should not use the topicId field or set topic name to null.
//
// Version 11 deprecates IncludeClusterAuthorizedOperations field. This is now exposed
// by the DescribeCluster API (KIP-700).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,21 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class MetadataRequestTest {

Expand Down Expand Up @@ -65,4 +70,24 @@ public void testMetadataRequestVersion() {
assertEquals(minVersion, builder3.oldestAllowedVersion());
assertEquals(maxVersion, builder3.latestAllowedVersion());
}

@Test
public void testTopicIdAndNullTopicNameRequests() {
// Construct invalid MetadataRequestTopics. We will build each one separately and ensure the error is thrown.
List<MetadataRequestData.MetadataRequestTopic> topics = Arrays.asList(
new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(Uuid.randomUuid()),
new MetadataRequestData.MetadataRequestTopic().setName(null),
new MetadataRequestData.MetadataRequestTopic().setTopicId(Uuid.randomUuid()),
new MetadataRequestData.MetadataRequestTopic().setName("topic").setTopicId(Uuid.randomUuid()));

// if version is 10 or 11, the invalid topic metadata should return an error
List<Short> invalidVersions = Arrays.asList((short) 10, (short) 11);
invalidVersions.forEach(version ->
topics.forEach(topic -> {
MetadataRequestData metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic));
MetadataRequest.Builder builder = new MetadataRequest.Builder(metadataRequestData);
assertThrows(UnsupportedVersionException.class, () -> builder.build(version));
})
);
}
}
11 changes: 11 additions & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1142,6 +1142,17 @@ class KafkaApis(val requestChannel: RequestChannel,
val metadataRequest = request.body[MetadataRequest]
val requestVersion = request.header.apiVersion

// Topic IDs are not supported for versions 10 and 11. Topic names can not be null in these versions.
if (!metadataRequest.isAllTopics) {
metadataRequest.data.topics.forEach{ topic =>
if (topic.name == null) {
throw new InvalidRequestException(s"Topic name can not be null for version ${metadataRequest.version}")
} else if (topic.topicId != Uuid.ZERO_UUID) {
throw new InvalidRequestException(s"Topic IDs are not supported in requests for version ${metadataRequest.version}")
}
}
}

val topics = if (metadataRequest.isAllTopics)
metadataCache.getAllTopics()
else
Expand Down
38 changes: 38 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,44 @@ class KafkaApisTest {
numBrokersNeeded - 1)
}

@Test
def testInvalidMetadataRequestReturnsError(): Unit = {
// Construct invalid MetadataRequestTopics. We will try each one separately and ensure the error is thrown.
val topics = List(new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(Uuid.randomUuid()),
new MetadataRequestData.MetadataRequestTopic().setName(null),
new MetadataRequestData.MetadataRequestTopic().setTopicId(Uuid.randomUuid()),
new MetadataRequestData.MetadataRequestTopic().setName("topic1").setTopicId(Uuid.randomUuid()))

EasyMock.replay(replicaManager, clientRequestQuotaManager,
autoTopicCreationManager, forwardingManager, clientControllerQuotaManager, groupCoordinator, txnCoordinator)

// if version is 10 or 11, the invalid topic metadata should return an error
val invalidVersions = Set(10, 11)
invalidVersions.foreach( version =>
topics.foreach(topic => {
val metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic))
val request = buildRequest(new MetadataRequest(metadataRequestData, version.toShort))
val kafkaApis = createKafkaApis()

val capturedResponse = EasyMock.newCapture[AbstractResponse]()
EasyMock.expect(requestChannel.sendResponse(
EasyMock.eq(request),
EasyMock.capture(capturedResponse),
EasyMock.anyObject()
))

EasyMock.replay(requestChannel)
kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)

val response = capturedResponse.getValue.asInstanceOf[MetadataResponse]
assertEquals(1, response.topicMetadata.size)
assertEquals(1, response.errorCounts.get(Errors.INVALID_REQUEST))
response.data.topics.forEach(topic => assertNotEquals(null, topic.name))
reset(requestChannel)
})
)
}

@Test
def testOffsetCommitWithInvalidPartition(): Unit = {
val topic = "topic"
Expand Down