Skip to content

Commit 16eea03

Browse files
committed
guarantees the behavior of non-persistent topic is the same as before
1 parent 196069b commit 16eea03

File tree

2 files changed

+62
-22
lines changed

2 files changed

+62
-22
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -556,16 +556,7 @@ protected CompletableFuture<PartitionedTopicMetadata> internalGetPartitionedMeta
556556
} else {
557557
// If it does not exist, response a Not Found error.
558558
// Otherwise, response a non-partitioned metadata.
559-
if (topicName.isPersistent()) {
560-
return internalCheckTopicExists(topicName).thenApply(__ -> metadata);
561-
} else {
562-
// Regarding non-persistent topic, we do not know whether it exists or not.
563-
// Just return a non-partitioned metadata if partitioned metadata does not
564-
// exist.
565-
// Broker will respond a not found error when doing subscribing or producing if
566-
// broker not allow to auto create topics.
567-
return CompletableFuture.completedFuture(metadata);
568-
}
559+
return internalCheckTopicExists(topicName).thenApply(__ -> metadata);
569560
}
570561
});
571562
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java

Lines changed: 61 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.Semaphore;
2828
import lombok.extern.slf4j.Slf4j;
2929
import org.apache.pulsar.broker.BrokerTestUtil;
30+
import org.apache.pulsar.broker.ServiceConfiguration;
3031
import org.apache.pulsar.client.api.ProducerConsumerBase;
3132
import org.apache.pulsar.client.api.PulsarClient;
3233
import org.apache.pulsar.client.api.PulsarClientException;
@@ -38,6 +39,7 @@
3839
import org.apache.pulsar.common.policies.data.TopicType;
3940
import org.apache.pulsar.common.util.FutureUtil;
4041
import org.awaitility.Awaitility;
42+
import org.testng.Assert;
4143
import org.testng.annotations.AfterMethod;
4244
import org.testng.annotations.DataProvider;
4345
import org.testng.annotations.Test;
@@ -325,17 +327,20 @@ public Object[][] autoCreationParamsNotAllow(){
325327
{true, false, false},
326328
{false, false, true},
327329
{false, false, false},
328-
// These test cases are for the following PR.
329-
// Which was described in the Motivation of https://github.com/apache/pulsar/pull/22206.
330-
//{false, true, true},
331-
//{false, true, false},
330+
{false, true, true},
331+
{false, true, false},
332332
};
333333
}
334334

335335
@Test(dataProvider = "autoCreationParamsNotAllow")
336336
public void testGetMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreation,
337337
boolean paramMetadataAutoCreationEnabled,
338338
boolean isUsingHttpLookup) throws Exception {
339+
if (!configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) {
340+
// These test cases are for the following PR.
341+
// Which was described in the Motivation of https://github.com/apache/pulsar/pull/22206.
342+
return;
343+
}
339344
conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
340345
conf.setDefaultNumPartitions(3);
341346
conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation);
@@ -379,7 +384,34 @@ public void testGetMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreati
379384
client.close();
380385
}
381386

382-
@Test(dataProvider = "autoCreationParamsNotAllow")
387+
@DataProvider(name = "autoCreationParamsForNonPersistentTopic")
388+
public Object[][] autoCreationParamsForNonPersistentTopic(){
389+
return new Object[][]{
390+
// configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, isUsingHttpLookup.
391+
{true, true, true},
392+
{true, true, false},
393+
{false, true, true},
394+
{false, true, false},
395+
{false, false, true}
396+
};
397+
}
398+
399+
/**
400+
* Regarding the API "get partitioned metadata" about non-persistent topic.
401+
* The original behavior is:
402+
* param-auto-create = true, broker-config-auto-create = true
403+
* HTTP API: default configuration {@link ServiceConfiguration#getDefaultNumPartitions()}
404+
* binary API: default configuration {@link ServiceConfiguration#getDefaultNumPartitions()}
405+
* param-auto-create = true, broker-config-auto-create = false
406+
* HTTP API: {partitions: 0}
407+
* binary API: {partitions: 0}
408+
* param-auto-create = false
409+
* HTTP API: not found error
410+
* binary API: not support
411+
* This test only guarantees that the behavior is the same as before. The following separated PR will fix the
412+
* incorrect behavior.
413+
*/
414+
@Test(dataProvider = "autoCreationParamsForNonPersistentTopic")
383415
public void testGetNonPersistentMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreation,
384416
boolean paramMetadataAutoCreationEnabled,
385417
boolean isUsingHttpLookup) throws Exception {
@@ -399,17 +431,34 @@ public void testGetNonPersistentMetadataIfNotAllowedCreate(boolean configAllowAu
399431
// Regarding non-persistent topic, we do not know whether it exists or not.
400432
// Broker will return a non-partitioned metadata if partitioned metadata does not exist.
401433
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
434+
435+
if (!configAllowAutoTopicCreation && !paramMetadataAutoCreationEnabled && isUsingHttpLookup) {
436+
try {
437+
lookup.getPartitionedTopicMetadata(TopicName.get(topicNameStr), paramMetadataAutoCreationEnabled)
438+
.join();
439+
Assert.fail("Expected a not found ex");
440+
} catch (Exception ex) {
441+
// Cleanup.
442+
client.close();
443+
return;
444+
}
445+
}
446+
402447
PartitionedTopicMetadata metadata = lookup
403448
.getPartitionedTopicMetadata(TopicName.get(topicNameStr), paramMetadataAutoCreationEnabled).join();
404-
assertEquals(metadata.partitions, 0);
449+
if (configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) {
450+
assertEquals(metadata.partitions, 3);
451+
} else {
452+
assertEquals(metadata.partitions, 0);
453+
}
405454

406455
List<String> partitionedTopics = admin.topics().getPartitionedTopicList("public/default");
407-
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().partitionedTopicExists(topicName);
408-
assertFalse(partitionedTopics.contains(topicNameStr));
409-
List<String> topicList = admin.topics().getList("public/default");
410-
assertFalse(topicList.contains(topicNameStr));
411-
for (int i = 0; i < 3; i++) {
412-
assertFalse(topicList.contains(topicName.getPartition(i)));
456+
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
457+
.partitionedTopicExists(topicName);
458+
if (configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) {
459+
assertTrue(partitionedTopics.contains(topicNameStr));
460+
} else {
461+
assertFalse(partitionedTopics.contains(topicNameStr));
413462
}
414463

415464
// Verify: lookup semaphore has been releases.

0 commit comments

Comments
 (0)