Skip to content

Commit c9df1f8

Browse files
Radiancebobonicklixinyang
authored and
nicklixinyang
committed
[fix][admin]allowAutoTopicCreation and allowAutoTopicCreationType etc should be dynamically configured (apache#16023)
1 parent 014900a commit c9df1f8

File tree

6 files changed

+229
-9
lines changed

6 files changed

+229
-9
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

+10
Original file line numberDiff line numberDiff line change
@@ -544,12 +544,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
544544

545545
@FieldContext(
546546
category = CATEGORY_POLICIES,
547+
dynamic = true,
547548
doc = "Enable the deletion of inactive topics.\n"
548549
+ "If only enable this option, will not clean the metadata of partitioned topic."
549550
)
550551
private boolean brokerDeleteInactiveTopicsEnabled = true;
551552
@FieldContext(
552553
category = CATEGORY_POLICIES,
554+
dynamic = true,
553555
doc = "Metadata of inactive partitioned topic will not be automatically cleaned up by default.\n"
554556
+ "Note: If `allowAutoTopicCreation` and this option are enabled at the same time,\n"
555557
+ "it may appear that a partitioned topic has just been deleted but is automatically created as a "
@@ -558,12 +560,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
558560
private boolean brokerDeleteInactivePartitionedTopicMetadataEnabled = false;
559561
@FieldContext(
560562
category = CATEGORY_POLICIES,
563+
dynamic = true,
561564
doc = "How often to check for inactive topics"
562565
)
563566
private int brokerDeleteInactiveTopicsFrequencySeconds = 60;
564567

565568
@FieldContext(
566569
category = CATEGORY_POLICIES,
570+
dynamic = true,
567571
doc = "Set the inactive topic delete mode. Default is delete_when_no_subscriptions\n"
568572
+ "'delete_when_no_subscriptions' mode only delete the topic which has no subscriptions and no active "
569573
+ "producers\n"
@@ -575,6 +579,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
575579

576580
@FieldContext(
577581
category = CATEGORY_POLICIES,
582+
dynamic = true,
578583
doc = "Max duration of topic inactivity in seconds, default is not present\n"
579584
+ "If not present, 'brokerDeleteInactiveTopicsFrequencySeconds' will be used\n"
580585
+ "Topics that are inactive for longer than this value will be deleted"
@@ -1207,6 +1212,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
12071212

12081213
@FieldContext(
12091214
category = CATEGORY_SERVER,
1215+
dynamic = true,
12101216
doc = "The number of partitions per partitioned topic.\n"
12111217
+ "If try to create or update partitioned topics by exceeded number of partitions, then fail."
12121218
)
@@ -1779,21 +1785,25 @@ public class ServiceConfiguration implements PulsarConfiguration {
17791785
private double managedLedgerDefaultMarkDeleteRateLimit = 1.0;
17801786
@FieldContext(
17811787
category = CATEGORY_STORAGE_ML,
1788+
dynamic = true,
17821789
doc = "Allow automated creation of topics if set to true (default value)."
17831790
)
17841791
private boolean allowAutoTopicCreation = true;
17851792
@FieldContext(
17861793
category = CATEGORY_STORAGE_ML,
1794+
dynamic = true,
17871795
doc = "The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)"
17881796
)
17891797
private String allowAutoTopicCreationType = "non-partitioned";
17901798
@FieldContext(
17911799
category = CATEGORY_STORAGE_ML,
1800+
dynamic = true,
17921801
doc = "Allow automated creation of subscriptions if set to true (default value)."
17931802
)
17941803
private boolean allowAutoSubscriptionCreation = true;
17951804
@FieldContext(
17961805
category = CATEGORY_STORAGE_ML,
1806+
dynamic = true,
17971807
doc = "The number of partitioned topics that is allowed to be automatically created"
17981808
+ "if allowAutoTopicCreationType is partitioned."
17991809
)

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

+30
Original file line numberDiff line numberDiff line change
@@ -2319,9 +2319,39 @@ private void updateConfigurationAndRegisterListeners() {
23192319
// add listener to notify topic subscriptionTypesEnabled changed.
23202320
registerConfigurationListener("subscriptionTypesEnabled", this::updateBrokerSubscriptionTypesEnabled);
23212321

2322+
// add listener to notify partitioned topic defaultNumPartitions changed
2323+
registerConfigurationListener("defaultNumPartitions", defaultNumPartitions -> {
2324+
this.updateDefaultNumPartitions((int) defaultNumPartitions);
2325+
});
2326+
2327+
// add listener to notify partitioned topic maxNumPartitionsPerPartitionedTopic changed
2328+
registerConfigurationListener("maxNumPartitionsPerPartitionedTopic", maxNumPartitions -> {
2329+
this.updateMaxNumPartitionsPerPartitionedTopic((int) maxNumPartitions);
2330+
});
2331+
23222332
// add more listeners here
23232333
}
23242334

2335+
private void updateDefaultNumPartitions(int numPartitions) {
2336+
int maxNumPartitions = pulsar.getConfiguration().getMaxNumPartitionsPerPartitionedTopic();
2337+
if (maxNumPartitions == 0 || maxNumPartitions > numPartitions) {
2338+
this.pulsar.getConfiguration().setDefaultNumPartitions(numPartitions);
2339+
} else {
2340+
this.pulsar.getConfiguration().setDefaultNumPartitions(maxNumPartitions);
2341+
}
2342+
}
2343+
2344+
private void updateMaxNumPartitionsPerPartitionedTopic(int maxNumPartitions) {
2345+
if (maxNumPartitions == 0) {
2346+
this.pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(maxNumPartitions);
2347+
return;
2348+
}
2349+
if (this.pulsar.getConfiguration().getDefaultNumPartitions() > maxNumPartitions) {
2350+
this.pulsar.getConfiguration().setDefaultNumPartitions(maxNumPartitions);
2351+
}
2352+
this.pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(maxNumPartitions);
2353+
}
2354+
23252355
private void updateBrokerDispatchThrottlingMaxRate() {
23262356
if (brokerDispatchRateLimiter == null) {
23272357
brokerDispatchRateLimiter = new DispatchRateLimiter(this);

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java

+18
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import static org.testng.Assert.assertFalse;
2222
import static org.testng.Assert.assertTrue;
2323
import static org.testng.Assert.fail;
24+
import java.util.UUID;
2425
import java.util.concurrent.atomic.AtomicInteger;
26+
import org.apache.pulsar.client.admin.PulsarAdminException;
2527
import org.apache.pulsar.client.api.MessageId;
2628
import org.apache.pulsar.client.api.PulsarClientException;
2729
import org.apache.pulsar.common.naming.TopicName;
@@ -152,4 +154,20 @@ public void testNonPersistentTopicSubscriptionCreationWithAutoCreationDisable()
152154
assertTrue(admin.topics().getSubscriptions(topicName).contains(subscriptionName));
153155
}
154156

157+
@Test
158+
public void testDynamicConfigurationTopicAutoSubscriptionCreation()
159+
throws PulsarAdminException, PulsarClientException {
160+
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
161+
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
162+
admin.brokers().updateDynamicConfiguration("allowAutoSubscriptionCreation", "false");
163+
String topicString = "persistent://prop/ns-abc/non-partitioned-topic" + UUID.randomUUID();
164+
String subscriptionName = "non-partitioned-topic-sub";
165+
admin.topics().createNonPartitionedTopic(topicString);
166+
Assert.assertThrows(PulsarClientException.class,
167+
()-> pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe());
168+
admin.brokers().updateDynamicConfiguration("allowAutoSubscriptionCreation", "true");
169+
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
170+
assertTrue(admin.topics().getSubscriptions(topicString).contains(subscriptionName));
171+
}
172+
155173
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java

+101
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import static org.testng.Assert.assertEquals;
2122
import static org.testng.Assert.assertFalse;
2223
import static org.testng.Assert.assertTrue;
2324
import static org.testng.Assert.fail;
2425

26+
27+
import java.util.List;
28+
import java.util.UUID;
2529
import lombok.Cleanup;
2630
import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
2731
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -30,6 +34,7 @@
3034
import org.apache.pulsar.client.api.PulsarClientException;
3135
import org.apache.pulsar.common.naming.SystemTopicNames;
3236
import org.apache.pulsar.common.naming.TopicName;
37+
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
3338
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
3439
import org.apache.pulsar.common.policies.data.TopicType;
3540
import org.testng.Assert;
@@ -420,4 +425,100 @@ public void testAutoCreationOfSystemTopicNamespaceEvents() throws Exception {
420425
ListNamespaceTopicsOptions.builder().includeSystemTopic(true).build()).contains(topicString));
421426
assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
422427
}
428+
429+
@Test
430+
public void testDynamicConfigurationTopicAutoCreationDisable() throws PulsarAdminException {
431+
// test disable AllowAutoTopicCreation
432+
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
433+
admin.brokers().updateDynamicConfiguration("allowAutoTopicCreation", "false");
434+
final String namespaceName = "prop/ns-abc";
435+
final String topic = "persistent://" + namespaceName + "/test-dynamicConfiguration-topic-auto-creation-"
436+
+ UUID.randomUUID();
437+
Assert.assertThrows(PulsarClientException.NotFoundException.class,
438+
()-> pulsarClient.newProducer().topic(topic).create());
439+
}
440+
441+
@Test
442+
public void testDynamicConfigurationTopicAutoCreationNonPartitioned() throws PulsarAdminException, PulsarClientException {
443+
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
444+
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
445+
final String namespaceName = "prop/ns-abc";
446+
final String topic = "persistent://" + namespaceName + "/test-dynamicConfiguration-topic-auto-creation-"
447+
+ UUID.randomUUID();
448+
// test enable AllowAutoTopicCreation, non-partitioned
449+
admin.brokers().updateDynamicConfiguration("allowAutoTopicCreation", "true");
450+
admin.brokers().updateDynamicConfiguration("allowAutoTopicCreationType", "non-partitioned");
451+
Producer<byte[]> producer = pulsarClient.newProducer()
452+
.topic(topic)
453+
.create();
454+
List<String> topics = admin.topics().getList(namespaceName);
455+
List<String> partitionedTopicList = admin.topics().getPartitionedTopicList(namespaceName);
456+
assertEquals(topics.size(), 1);
457+
assertEquals(partitionedTopicList.size(), 0);
458+
producer.close();
459+
admin.topics().delete(topic);
460+
}
461+
462+
@Test
463+
public void testDynamicConfigurationTopicAutoCreationPartitioned() throws PulsarAdminException, PulsarClientException {
464+
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
465+
pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
466+
pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(0);
467+
final String namespaceName = "prop/ns-abc";
468+
final String topic = "persistent://" + namespaceName + "/test-dynamicConfiguration-topic-auto-creation-"
469+
+ UUID.randomUUID();
470+
// test enable AllowAutoTopicCreation, partitioned
471+
admin.brokers().updateDynamicConfigurationAsync("allowAutoTopicCreation", "true");
472+
admin.brokers().updateDynamicConfiguration("maxNumPartitionsPerPartitionedTopic", "6");
473+
admin.brokers().updateDynamicConfiguration("allowAutoTopicCreationType", "partitioned");
474+
admin.brokers().updateDynamicConfiguration("defaultNumPartitions", "4");
475+
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
476+
List<String> topics = admin.topics().getList(namespaceName);
477+
List<String> partitionedTopicList = admin.topics().getPartitionedTopicList(namespaceName);
478+
PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topic);
479+
assertEquals(topics.size(), 4);
480+
assertEquals(partitionedTopicList.size(), 1);
481+
assertEquals(partitionedTopicMetadata.partitions, 4);
482+
producer.close();
483+
for (String t : topics) {
484+
admin.topics().delete(t);
485+
}
486+
}
487+
488+
@Test
489+
public void testDynamicConfigurationTopicAutoCreationPartitionedWhenDefaultMoreThanMax() throws PulsarAdminException, PulsarClientException {
490+
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
491+
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
492+
pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(0);
493+
final String namespaceName = "prop/ns-abc";
494+
String topic = "persistent://" + namespaceName + "/test-dynamicConfiguration-topic-auto-creation-"
495+
+ UUID.randomUUID();
496+
// test enable AllowAutoTopicCreation, partitioned when maxNumPartitionsPerPartitionedTopic < defaultNumPartitions
497+
admin.brokers().updateDynamicConfiguration("maxNumPartitionsPerPartitionedTopic", "2");
498+
admin.brokers().updateDynamicConfiguration("defaultNumPartitions", "6");
499+
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
500+
List<String> topics = admin.topics().getList(namespaceName);
501+
List<String> partitionedTopicList = admin.topics().getPartitionedTopicList(namespaceName);
502+
PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topic);
503+
assertEquals(topics.size(), 2);
504+
assertEquals(partitionedTopicList.size(), 1);
505+
assertEquals(partitionedTopicMetadata.partitions, 2);
506+
producer.close();
507+
for (String t : topics) {
508+
admin.topics().delete(t);
509+
}
510+
511+
// set maxNumPartitionsPerPartitionedTopic, make maxNumPartitionsPerPartitionedTopic < defaultNumPartitions
512+
admin.brokers().updateDynamicConfiguration("maxNumPartitionsPerPartitionedTopic", "1");
513+
topic = "persistent://" + namespaceName + "/test-dynamicConfiguration-topic-auto-creation-"
514+
+ UUID.randomUUID();
515+
producer = pulsarClient.newProducer().topic(topic).create();
516+
topics = admin.topics().getList(namespaceName);
517+
assertEquals(topics.size(), 1);
518+
producer.close();
519+
for (String t : topics) {
520+
admin.topics().delete(t);
521+
}
522+
}
523+
423524
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java

+60
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.testng.Assert.assertFalse;
2323
import static org.testng.Assert.assertNotNull;
2424
import static org.testng.Assert.assertNull;
25+
import static org.testng.Assert.assertTrue;
2526
import com.google.common.collect.Sets;
2627
import java.util.Arrays;
2728
import java.util.HashMap;
@@ -596,4 +597,63 @@ public void testHealthTopicInactiveNotClean() throws Exception {
596597
Assert.assertTrue(V1Partitions.contains(healthCheckTopicV1));
597598
Assert.assertTrue(V2Partitions.contains(healthCheckTopicV2));
598599
}
600+
601+
@Test
602+
public void testDynamicConfigurationBrokerDeleteInactiveTopicsEnabled() throws Exception {
603+
conf.setBrokerDeleteInactiveTopicsEnabled(true);
604+
super.baseSetup();
605+
admin.brokers().updateDynamicConfiguration("brokerDeleteInactiveTopicsEnabled", "false");
606+
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(()->{
607+
assertFalse(conf.isBrokerDeleteInactiveTopicsEnabled());
608+
});
609+
}
610+
611+
@Test
612+
public void testDynamicConfigurationBrokerDeleteInactiveTopicsFrequencySeconds() throws Exception {
613+
conf.setBrokerDeleteInactiveTopicsFrequencySeconds(30);
614+
super.baseSetup();
615+
admin.brokers()
616+
.updateDynamicConfiguration("brokerDeleteInactiveTopicsFrequencySeconds", "60");
617+
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(()->{
618+
assertEquals(conf.getBrokerDeleteInactiveTopicsFrequencySeconds(), 60);
619+
});
620+
}
621+
622+
@Test
623+
public void testDynamicConfigurationBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds() throws Exception {
624+
conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(30);
625+
super.baseSetup();
626+
admin.brokers()
627+
.updateDynamicConfiguration("brokerDeleteInactiveTopicsMaxInactiveDurationSeconds", "60");
628+
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(()->{
629+
assertEquals(conf.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), 60);
630+
});
631+
}
632+
633+
@Test
634+
public void testDynamicConfigurationBrokerDeleteInactiveTopicsMode() throws Exception {
635+
conf.setBrokerDeleteInactiveTopicsMode (InactiveTopicDeleteMode.delete_when_no_subscriptions);
636+
super.baseSetup();
637+
String expect = InactiveTopicDeleteMode.delete_when_subscriptions_caught_up.toString();
638+
admin.brokers()
639+
.updateDynamicConfiguration("brokerDeleteInactiveTopicsMode",
640+
expect);
641+
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(()->{
642+
assertEquals(conf.getBrokerDeleteInactiveTopicsMode().toString(), expect);
643+
});
644+
}
645+
646+
@Test
647+
public void testBrokerDeleteInactivePartitionedTopicMetadataEnabled() throws Exception {
648+
conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(false);
649+
super.baseSetup();
650+
admin.brokers()
651+
.updateDynamicConfiguration("brokerDeleteInactivePartitionedTopicMetadataEnabled",
652+
"true");
653+
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(()->{
654+
assertTrue(conf.isBrokerDeleteInactivePartitionedTopicMetadataEnabled());
655+
});
656+
}
657+
658+
599659
}

0 commit comments

Comments
 (0)