Skip to content

Commit

Permalink
[Issue 9173] [Non Persistent Topics] Auto-create partitions even when…
Browse files Browse the repository at this point in the history
… the auto-creation is disabled
  • Loading branch information
mlyahmed committed Mar 4, 2021
1 parent 7ba9816 commit 1a6273b
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
Expand Down Expand Up @@ -797,9 +798,21 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c
return this.loadOrCreatePersistentTopic(topicName, createIfMissing);
});
} else {
return topics.computeIfAbsent(topic, (topicName) -> {
if (createIfMissing) {
return createNonPersistentTopic(topicName);
return topics.computeIfAbsent(topic, (name) -> {
final TopicName topicName = TopicName.get(name);
if (topicName.isPartitioned()) {
try {
final PartitionedTopicMetadata metadata = pulsar().getAdminClient().topics()
.getPartitionedTopicMetadata(topicName.getPartitionedTopicName());
if (topicName.getPartitionIndex() < metadata.partitions) {
return createNonPersistentTopic(name);
}
return CompletableFuture.completedFuture(Optional.empty());
} catch (PulsarAdminException|PulsarServerException e) {
return FutureUtil.failedFuture(e);
}
} else if (createIfMissing) {
return createNonPersistentTopic(name);
} else {
return CompletableFuture.completedFuture(Optional.empty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

Expand Down Expand Up @@ -52,6 +53,8 @@
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -98,6 +101,58 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test(timeOut = 90000 /* 1.5mn */)
public void testNonPersistentPartitionsAreNotAutoCreatedWhenThePartitionedTopicDoesNotExist() throws Exception {
final boolean defaultAllowAutoTopicCreation = conf.isAllowAutoTopicCreation();
try {
// Given the auto topic creation is disabled
cleanup();
conf.setAllowAutoTopicCreation(false);
setup();

final String topicPartitionName = "non-persistent://public/default/issue-9173-partition-0";

// Then error when subscribe to a partition of a non-persistent topic that does not exist
assertThrows(PulsarClientException.TopicDoesNotExistException.class,
() -> pulsarClient.newConsumer().topic(topicPartitionName).subscriptionName("sub-issue-9173").subscribe());

// Then error when produce to a partition of a non-persistent topic that does not exist
assertThrows(PulsarClientException.TopicDoesNotExistException.class,
() -> pulsarClient.newProducer().topic(topicPartitionName).create());
} finally {
conf.setAllowAutoTopicCreation(defaultAllowAutoTopicCreation);
}
}

@Test(timeOut = 90000 /* 1.5mn */)
public void testAutoCreateNonPersistentPartitionsWhenThePartitionedTopicExists() throws Exception {
final boolean defaultAllowAutoTopicCreation = conf.isAllowAutoTopicCreation();
try {
// Given the auto topic creation is disabled
cleanup();
conf.setAllowAutoTopicCreation(false);
setup();

// Given the non-persistent partitioned topic exists
final String topic = "non-persistent://public/default/issue-9173";
admin.topics().createPartitionedTopic(topic, 3);

// When subscribe, then a sub-consumer is created for each partition which means the partitions are created
final MultiTopicsConsumerImpl<byte[]> consumer = (MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer()
.topic(topic).subscriptionName("sub-issue-9173").subscribe();
assertEquals(consumer.getConsumers().size(), 3);

// When produce, a sub-producer is created for each partition which means the partitions are created
PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer().topic(topic).create();
assertEquals(producer.getProducers().size(), 3);

consumer.close();
producer.close();
} finally {
conf.setAllowAutoTopicCreation(defaultAllowAutoTopicCreation);
}
}

@Test(dataProvider = "subscriptionType")
public void testNonPersistentTopic(SubscriptionType type) throws Exception {
log.info("-- Starting {} test --", methodName);
Expand Down

0 comments on commit 1a6273b

Please sign in to comment.