Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Return a correct metadata response when topic is not found (#1801)
Browse files Browse the repository at this point in the history
### Motivation


https://github.com/apache/kafka/blob/61530d68ce83467de6190a52da37b3c0af84f0ef/core/src/main/scala/kafka/server/KafkaApis.scala#L1203-L1210

When a topic is not found, Kafka only returns `INVALID_TOPIC_EXCEPTION`
when the topic name is invalid. If the topic name is valid,
`UNKNOWN_TOPIC_OR_PARTITION` will be returned.

### Modifications

- Refactor `getPartitionedTopicMetadataAsync` to return the correct
error code (and rename this method). For the case when any error happens
at the broker side, return `UNKNOWN_TOPIC_OR_PARTITION`.
- Do not print error logs when the topic is not found. Kafka also does
not print the logs. The Kafka client will print the logs for
`UNKNOWN_TOPIC_OR_PARTITION`.
- Add `testTopicMetadataNotFound` to verify the changes.
  • Loading branch information
BewareMyPower authored Apr 22, 2023
1 parent f0eda8e commit 4c22dfa
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.AddOffsetsToTxnRequestData;
import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData;
Expand Down Expand Up @@ -513,64 +515,63 @@ protected void handleInactive(KafkaHeaderAndRequest kafkaHeaderAndRequest,

// Leverage pulsar admin to get partitioned topic metadata
// NOTE: the returned future never completes exceptionally
private CompletableFuture<Integer> getPartitionedTopicMetadataAsync(String topicName,
boolean allowAutoTopicCreation) {
final CompletableFuture<Integer> future = new CompletableFuture<>();
admin.topics().getPartitionedTopicMetadataAsync(topicName).whenComplete((metadata, e) -> {
private CompletableFuture<TopicAndMetadata> getTopicMetadataAsync(String topic,
boolean allowAutoTopicCreation) {
final CompletableFuture<TopicAndMetadata> future = new CompletableFuture<>();
final TopicName topicName = TopicName.get(topic);
admin.topics().getPartitionedTopicMetadataAsync(topic).whenComplete((metadata, e) -> {
if (e == null) {
if (metadata.partitions > 0) {
if (log.isDebugEnabled()) {
log.debug("Topic {} has {} partitions", topicName, metadata.partitions);
}
future.complete(metadata.partitions);
} else {
future.complete(TopicAndMetadata.NON_PARTITIONED_NUMBER);
if (log.isDebugEnabled()) {
log.debug("Topic {} has {} partitions", topic, metadata.partitions);
}
future.complete(TopicAndMetadata.success(topic, metadata.partitions));
} else if (e instanceof PulsarAdminException.NotFoundException) {
if (allowAutoTopicCreation) {
String namespace = TopicName.get(topicName).getNamespace();
admin.namespaces().getPoliciesAsync(namespace).whenComplete((policies, err) -> {
if (err != null || policies == null) {
log.error("[{}] Cannot get policies for namespace {}", ctx.channel(), namespace, err);
future.complete(TopicAndMetadata.INVALID_PARTITIONS);
} else {
boolean allowed = kafkaConfig.isAllowAutoTopicCreation();
if (policies.autoTopicCreationOverride != null) {
allowed = policies.autoTopicCreationOverride.isAllowAutoTopicCreation();
}
if (!allowed) {
log.error("[{}] Topic {} doesn't exist and it's not allowed "
+ "to auto create partitioned topic", ctx.channel(), topicName);
future.complete(TopicAndMetadata.INVALID_PARTITIONS);
} else {
log.info("[{}] Topic {} doesn't exist, auto create it with {} partitions",
ctx.channel(), topicName, defaultNumPartitions);
admin.topics().createPartitionedTopicAsync(topicName, defaultNumPartitions)
.whenComplete((__, createException) -> {
if (createException == null) {
future.complete(defaultNumPartitions);
} else {
log.warn("[{}] Failed to create partitioned topic {}: {}",
ctx.channel(), topicName, createException.getMessage());
future.complete(TopicAndMetadata.INVALID_PARTITIONS);
}
});
}
(allowAutoTopicCreation ? checkAllowAutoTopicCreation(topicName.getNamespace())
: CompletableFuture.completedFuture(false)).whenComplete((allowed, err) -> {
if (err != null) {
log.error("[{}] Cannot get policies for namespace {}",
ctx.channel(), topicName.getNamespace(), err);
future.complete(TopicAndMetadata.failure(topic, Errors.UNKNOWN_SERVER_ERROR));
return;
}
if (allowed) {
admin.topics().createPartitionedTopicAsync(topic, defaultNumPartitions)
.whenComplete((__, createException) -> {
if (createException == null) {
future.complete(TopicAndMetadata.success(topic, defaultNumPartitions));
} else {
log.warn("[{}] Failed to create partitioned topic {}: {}",
ctx.channel(), topicName, createException.getMessage());
future.complete(TopicAndMetadata.failure(topic, Errors.UNKNOWN_SERVER_ERROR));
}
});
} else {
try {
Topic.validate(topicName.getLocalName());
future.complete(TopicAndMetadata.failure(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION));
} catch (InvalidTopicException ignored) {
future.complete(TopicAndMetadata.failure(topic, Errors.INVALID_TOPIC_EXCEPTION));
}
});
} else {
log.error("[{}] Topic {} doesn't exist and it's not allowed to auto create partitioned topic",
ctx.channel(), topicName, e);
future.complete(TopicAndMetadata.INVALID_PARTITIONS);
}
}
});
} else {
log.error("[{}] Failed to get partitioned topic {}", ctx.channel(), topicName, e);
future.complete(TopicAndMetadata.INVALID_PARTITIONS);
log.error("[{}] Failed to get partitioned topic {}", ctx.channel(), topic, e);
future.complete(TopicAndMetadata.failure(topic, Errors.UNKNOWN_SERVER_ERROR));
}
});
return future;
}

private CompletableFuture<Boolean> checkAllowAutoTopicCreation(String namespace) {
return admin.namespaces().getPoliciesAsync(namespace).thenApply(policies -> {
if (policies != null && policies.autoTopicCreationOverride != null) {
return policies.autoTopicCreationOverride.isAllowAutoTopicCreation();
} else {
return kafkaConfig.isAllowAutoTopicCreation();
}
});
}

private CompletableFuture<Set<String>> expandAllowedNamespaces(Set<String> allowedNamespaces) {
String currentTenant = getCurrentTenant(kafkaConfig.getKafkaTenant());
return expandAllowedNamespaces(allowedNamespaces, currentTenant, pulsarService);
Expand Down Expand Up @@ -626,10 +627,9 @@ private List<TopicAndMetadata> analyzeFullTopicNames(final Stream<String> fullTo
Collections.sort(partitionIndexes);
final int lastIndex = partitionIndexes.get(partitionIndexes.size() - 1);
if (lastIndex < 0) {
topicAndMetadataList.add(
new TopicAndMetadata(topic, TopicAndMetadata.NON_PARTITIONED_NUMBER));
topicAndMetadataList.add(TopicAndMetadata.success(topic, 0)); // non-partitioned topic
} else if (lastIndex == partitionIndexes.size() - 1) {
topicAndMetadataList.add(new TopicAndMetadata(topic, partitionIndexes.size()));
topicAndMetadataList.add(TopicAndMetadata.success(topic, partitionIndexes.size()));
} else {
// The partitions should be [0, 1, ..., n-1], `n` is the number of partitions. If the last index is not
// `n-1`, there must be some missed partitions.
Expand Down Expand Up @@ -683,17 +683,16 @@ private CompletableFuture<ListPair<String>> authorizeTopicsAsync(final Collectio

private CompletableFuture<List<TopicAndMetadata>> findTopicMetadata(final ListPair<String> listPair,
final boolean allowTopicAutoCreation) {
final Map<String, CompletableFuture<Integer>> futureMap = CoreUtils.listToMap(
final Map<String, CompletableFuture<TopicAndMetadata>> futureMap = CoreUtils.listToMap(
listPair.getSuccessfulList(),
topic -> getPartitionedTopicMetadataAsync(topic, allowTopicAutoCreation)
topic -> getTopicMetadataAsync(topic, allowTopicAutoCreation)
);
return CoreUtils.waitForAll(futureMap.values()).thenApply(__ ->
CoreUtils.mapToList(futureMap, (key, value) -> new TopicAndMetadata(key, value.join()))
CoreUtils.mapToList(futureMap, (___, value) -> value.join())
).thenApply(authorizedTopicAndMetadataList ->
ListUtils.union(authorizedTopicAndMetadataList,
CoreUtils.listToList(listPair.getFailedList(),
topic -> new TopicAndMetadata(topic, TopicAndMetadata.AUTHORIZATION_FAILURE))
)
topic -> TopicAndMetadata.failure(topic, Errors.TOPIC_AUTHORIZATION_FAILED)))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,20 @@
@Getter
public class TopicAndMetadata {

public static final int INVALID_PARTITIONS = -2;
public static final int AUTHORIZATION_FAILURE = -1;
public static final int NON_PARTITIONED_NUMBER = 0;

private final String topic;
private final int numPartitions;
private final Errors error;

public boolean isPartitionedTopic() {
return numPartitions > 0;
public static TopicAndMetadata success(String topic, int numPartitions) {
return new TopicAndMetadata(topic, numPartitions, Errors.NONE);
}

public boolean hasNoError() {
return numPartitions >= 0;
public static TopicAndMetadata failure(String topic, Errors error) {
return new TopicAndMetadata(topic, -1, error);
}

public Errors error() {
if (hasNoError()) {
return Errors.NONE;
} else if (numPartitions == AUTHORIZATION_FAILURE) {
return Errors.TOPIC_AUTHORIZATION_FAILED;
} else {
return Errors.UNKNOWN_TOPIC_OR_PARTITION;
}
public boolean hasNoError() {
return error == Errors.NONE;
}

public CompletableFuture<TopicMetadata> lookupAsync(
Expand All @@ -70,14 +61,14 @@ public CompletableFuture<TopicMetadata> lookupAsync(
.map(lookupFunction)
.collect(Collectors.toList()), partitionMetadataList ->
new TopicMetadata(
error(),
error,
getOriginalTopic.apply(topic),
KopTopic.isInternalTopic(topic, metadataNamespace),
partitionMetadataList
));
}

public Stream<String> stream() {
private Stream<String> stream() {
if (numPartitions > 0) {
return IntStream.range(0, numPartitions)
.mapToObj(i -> topic + "-partition-" + i);
Expand All @@ -89,7 +80,7 @@ public Stream<String> stream() {
public TopicMetadata toTopicMetadata(final Function<String, String> getOriginalTopic,
final String metadataNamespace) {
return new TopicMetadata(
error(),
error,
getOriginalTopic.apply(topic),
KopTopic.isInternalTopic(topic, metadataNamespace),
Collections.emptyList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -106,8 +107,8 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Ignore;
import org.testng.annotations.Test;

Expand All @@ -127,7 +128,7 @@ protected void resetConfig() {
+ SSL_PREFIX + "127.0.0.1:" + kafkaBrokerPortTls);
}

@BeforeMethod
@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
Expand Down Expand Up @@ -161,7 +162,7 @@ protected void setup() throws Exception {
serviceAddress = new InetSocketAddress(pulsar.getBindAddress(), kafkaBrokerPort);
}

@AfterMethod
@AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
Expand Down Expand Up @@ -682,7 +683,7 @@ private List<String> getCreatedTopics(String topicName, int numTopics) {
}

private KafkaHeaderAndRequest createTopicMetadataRequest(List<String> topics) {
AbstractRequest.Builder builder = new MetadataRequest.Builder(topics, true);
MetadataRequest.Builder builder = new MetadataRequest.Builder(topics, false);
return buildRequest(builder);
}

Expand Down Expand Up @@ -1071,7 +1072,7 @@ public void testIllegalManagedLedger() throws Exception {
*/
@Test(timeOut = 60000)
public void testFetchMinBytesSingleConsumer() throws Exception {
final String topic = "testMinBytesTopic";
final String topic = "testMinBytesTopicSingleConsumer";
final TopicPartition topicPartition = new TopicPartition(topic, 0);
admin.topics().createPartitionedTopic(topic, 1);
triggerTopicLookup(topic, 1);
Expand Down Expand Up @@ -1105,4 +1106,18 @@ public void testFetchMinBytesSingleConsumer() throws Exception {
Long waitingFetchesTriggered = kafkaRequestHandler.getRequestStats().getWaitingFetchesTriggered().get();
assertEquals((long) waitingFetchesTriggered, 1);
}

@Test(timeOut = 30000)
public void testTopicMetadataNotFound() {
final Function<String, Errors> getMetadataResponseError = topic -> {
final CompletableFuture<AbstractResponse> future = new CompletableFuture<>();
kafkaRequestHandler.handleTopicMetadataRequest(
createTopicMetadataRequest(Collections.singletonList(topic)), future);
final MetadataResponse response = (MetadataResponse) future.join();
assertTrue(response.errors().containsKey(topic));
return response.errors().get(topic);
};
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, getMetadataResponseError.apply("test-topic-not-found._-"));
assertEquals(Errors.INVALID_TOPIC_EXCEPTION, getMetadataResponseError.apply("???"));
}
}

0 comments on commit 4c22dfa

Please sign in to comment.