Skip to content

Commit

Permalink
[improve][broker] Change type of allowAutoTopicCreationType to Topi…
Browse files Browse the repository at this point in the history
…cType (#18814)
  • Loading branch information
yuruguo authored Dec 8, 2022
1 parent e0bdfbc commit 816acc1
Show file tree
Hide file tree
Showing 21 changed files with 90 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1914,7 +1914,7 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
dynamic = true,
doc = "The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)"
)
private String allowAutoTopicCreationType = "non-partitioned";
private TopicType allowAutoTopicCreationType = TopicType.NON_PARTITIONED;
@FieldContext(
category = CATEGORY_STORAGE_ML,
dynamic = true,
Expand Down Expand Up @@ -3201,7 +3201,7 @@ public void setProperties(Properties properties) {
}

public boolean isDefaultTopicTypePartitioned() {
return TopicType.PARTITIONED.toString().equals(allowAutoTopicCreationType);
return TopicType.PARTITIONED.equals(allowAutoTopicCreationType);
}

public int getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
Expand Down Expand Up @@ -1554,17 +1555,17 @@ public void testDeleteTenant() throws Exception {
@AllArgsConstructor
private static class NamespaceAttr {
private boolean systemTopicEnabled;
private String autoTopicCreationType;
private TopicType autoTopicCreationType;
private int defaultNumPartitions;
private boolean forceDeleteNamespaceAllowed;
}

@DataProvider(name = "namespaceAttrs")
public Object[][] namespaceAttributes(){
return new Object[][]{
{new NamespaceAttr(false, "non-partitioned", 0, false)},
{new NamespaceAttr(true, "non-partitioned", 0, false)},
{new NamespaceAttr(true, "partitioned", 3, false)}
{new NamespaceAttr(false, TopicType.NON_PARTITIONED, 0, false)},
{new NamespaceAttr(true, TopicType.NON_PARTITIONED, 0, false)},
{new NamespaceAttr(true, TopicType.PARTITIONED, 3, false)}
};
}

Expand Down Expand Up @@ -2071,7 +2072,7 @@ public void testMaxTopicsPerNamespace() throws Exception {
cleanup();
conf.setMaxTopicsPerNamespace(10);
conf.setDefaultNumPartitions(3);
conf.setAllowAutoTopicCreationType("partitioned");
conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
setup();
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));
Expand All @@ -2089,7 +2090,7 @@ public void testMaxTopicsPerNamespace() throws Exception {
// check producer/consumer auto create non-partitioned topic
cleanup();
conf.setMaxTopicsPerNamespace(3);
conf.setAllowAutoTopicCreationType("non-partitioned");
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
setup();
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Set.of("test"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
Expand Down Expand Up @@ -1682,7 +1683,7 @@ public void testMaxTopicsPerNamespace() throws Exception {
cleanup();
conf.setMaxTopicsPerNamespace(0);
conf.setDefaultNumPartitions(3);
conf.setAllowAutoTopicCreationType("partitioned");
conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
initAndStartBroker();

admin.tenants().createTenant("testTenant", tenantInfo);
Expand Down Expand Up @@ -1711,7 +1712,7 @@ public void testMaxTopicsPerNamespace() throws Exception {
cleanup();
conf.setMaxTopicsPerNamespace(0);
conf.setDefaultNumPartitions(1);
conf.setAllowAutoTopicCreationType("non-partitioned");
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
initAndStartBroker();

admin.tenants().createTenant("testTenant", tenantInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TopicType;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand All @@ -51,7 +52,7 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
@Override
@BeforeMethod
protected void setup() throws Exception {
conf.setAllowAutoTopicCreationType("partitioned");
conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
conf.setAllowAutoTopicCreation(true);
conf.setDefaultNumPartitions(3);
super.internalSetup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
Expand Down Expand Up @@ -544,7 +545,7 @@ protected ServiceConfiguration getDefaultConf() {
configuration.setDefaultNumberOfNamespaceBundles(1);
configuration.setMetadataStoreUrl("zk:localhost:2181");
configuration.setConfigurationMetadataStoreUrl("zk:localhost:3181");
configuration.setAllowAutoTopicCreationType("non-partitioned");
configuration.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
configuration.setBrokerShutdownTimeoutMs(0L);
configuration.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
configuration.setBrokerServicePort(Optional.of(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.CreateMode;
Expand Down Expand Up @@ -104,7 +105,7 @@ protected void startBroker() throws Exception {
conf.setDefaultNumberOfNamespaceBundles(1);
conf.setMetadataStoreUrl("zk:localhost:2181");
conf.setConfigurationMetadataStoreUrl("zk:localhost:3181");
conf.setAllowAutoTopicCreationType("non-partitioned");
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
conf.setBookkeeperClientExposeStatsToPrometheus(true);
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -125,7 +126,7 @@ void setup() throws Exception {
config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER);
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
config.setAllowAutoTopicCreationType("non-partitioned");
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
config.setSystemTopicEnabled(false);
config.setTopicLevelPoliciesEnabled(false);
config.setForceDeleteNamespaceAllowed(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.testng.Assert;
Expand Down Expand Up @@ -88,7 +89,7 @@ protected void setup() throws Exception {
config.setManagedLedgerMaxEntriesPerLedger(5);
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
config.setAdvertisedAddress("127.0.0.1");
config.setAllowAutoTopicCreationType("non-partitioned");
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
config.setMetadataStoreOperationTimeoutSeconds(10);
config.setNumIOThreads(1);
Properties properties = new Properties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.zookeeper.CreateMode;
Expand Down Expand Up @@ -160,7 +161,7 @@ public void testBookieIsolation() throws Exception {
config.setManagedLedgerDefaultWriteQuorum(2);
config.setManagedLedgerDefaultAckQuorum(2);

config.setAllowAutoTopicCreationType("non-partitioned");
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);

int totalEntriesPerLedger = 20;
int totalLedgers = totalPublish / totalEntriesPerLedger;
Expand Down Expand Up @@ -314,7 +315,7 @@ public void testSetRackInfoAndAffinityGroupDuringProduce() throws Exception {
config.setManagedLedgerDefaultWriteQuorum(2);
config.setManagedLedgerDefaultAckQuorum(2);

config.setAllowAutoTopicCreationType("non-partitioned");
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);

int totalEntriesPerLedger = 20;
int totalLedgers = totalPublish / totalEntriesPerLedger;
Expand Down Expand Up @@ -457,7 +458,7 @@ public void testStrictBookieIsolation() throws Exception {
config.setManagedLedgerDefaultWriteQuorum(2);
config.setManagedLedgerDefaultAckQuorum(2);

config.setAllowAutoTopicCreationType("non-partitioned");
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);

int totalEntriesPerLedger = 20;
int totalLedgers = totalPublish / totalEntriesPerLedger;
Expand Down Expand Up @@ -613,7 +614,7 @@ public void testBookieIsolationWithSecondaryGroup() throws Exception {
config.setManagedLedgerDefaultEnsembleSize(2);
config.setManagedLedgerDefaultWriteQuorum(2);
config.setManagedLedgerDefaultAckQuorum(2);
config.setAllowAutoTopicCreationType("non-partitioned");
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);

int totalEntriesPerLedger = 20;
int totalLedgers = totalPublish / totalEntriesPerLedger;
Expand Down Expand Up @@ -754,7 +755,7 @@ public void testDeleteIsolationGroup() throws Exception {
config.setManagedLedgerDefaultEnsembleSize(2);
config.setManagedLedgerDefaultWriteQuorum(2);
config.setManagedLedgerDefaultAckQuorum(2);
config.setAllowAutoTopicCreationType("non-partitioned");
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);

config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
pulsarService = new PulsarService(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected void cleanupTest() throws Exception {
@Test
public void testAutoNonPartitionedTopicCreation() throws Exception{
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);

final String topicString = "persistent://prop/ns-abc/non-partitioned-topic";
final String subscriptionName = "non-partitioned-topic-sub";
Expand All @@ -80,7 +80,7 @@ public void testAutoNonPartitionedTopicCreation() throws Exception{
@Test
public void testAutoNonPartitionedTopicCreationOnProduce() throws Exception{
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);

final String topicString = "persistent://prop/ns-abc/non-partitioned-topic-2";
pulsarClient.newProducer().topic(topicString).create();
Expand All @@ -92,7 +92,7 @@ public void testAutoNonPartitionedTopicCreationOnProduce() throws Exception{
@Test
public void testAutoPartitionedTopicCreation() throws Exception{
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.PARTITIONED);
pulsar.getConfiguration().setDefaultNumPartitions(3);

final String topicString = "persistent://prop/ns-abc/partitioned-topic";
Expand All @@ -108,7 +108,7 @@ public void testAutoPartitionedTopicCreation() throws Exception{
@Test
public void testAutoPartitionedTopicCreationOnProduce() throws Exception{
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.PARTITIONED);
pulsar.getConfiguration().setDefaultNumPartitions(3);

final String topicString = "persistent://prop/ns-abc/partitioned-topic-1";
Expand Down Expand Up @@ -138,7 +138,7 @@ public void testAutoTopicCreationDisable() throws Exception{
@Test
public void testAutoTopicCreationDisableIfNonPartitionedTopicAlreadyExist() throws Exception {
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.PARTITIONED);
pulsar.getConfiguration().setDefaultNumPartitions(3);

final String topicString = "persistent://prop/ns-abc/test-topic-2";
Expand All @@ -162,7 +162,7 @@ public void testAutoTopicCreationDisableIfNonPartitionedTopicAlreadyExist() thro
@Test
public void testGetPartitionedMetadataWithoutCheckAllowAutoCreation() throws Exception{
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.PARTITIONED);
pulsar.getConfiguration().setDefaultNumPartitions(3);

final String topicString = "persistent://prop/ns-abc/test-topic-3";
Expand Down Expand Up @@ -244,7 +244,7 @@ public void testAutoCreationNamespaceOverridesTopicTypePartitioned() throws Exce
final TopicName topicName = TopicName.get(topicString);

pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
AutoTopicCreationOverride.builder()
.allowAutoTopicCreation(true)
Expand All @@ -267,7 +267,7 @@ public void testAutoCreationNamespaceOverridesTopicTypeNonPartitioned() throws E
final TopicName topicName = TopicName.get(topicString);

pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.PARTITIONED);
pulsar.getConfiguration().setDefaultNumPartitions(2);
pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
AutoTopicCreationOverride.builder()
Expand All @@ -288,7 +288,7 @@ public void testAutoCreationNamespaceOverridesDefaultNumPartitions() throws Exce
final TopicName topicName = TopicName.get(topicString);

pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.PARTITIONED);
pulsar.getConfiguration().setDefaultNumPartitions(2);
pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
AutoTopicCreationOverride.builder()
Expand Down Expand Up @@ -385,7 +385,7 @@ public void testAutoCreationNamespaceOverridesSubscriptionTopicCreation() throws
@Test
public void testMaxNumPartitionsPerPartitionedTopicTopicCreation() {
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.PARTITIONED);
pulsar.getConfiguration().setDefaultNumPartitions(3);
pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(2);

Expand Down Expand Up @@ -442,7 +442,7 @@ public void testDynamicConfigurationTopicAutoCreationDisable() throws PulsarAdmi
@Test
public void testDynamicConfigurationTopicAutoCreationNonPartitioned() throws PulsarAdminException, PulsarClientException {
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.PARTITIONED);
final String namespaceName = "prop/ns-abc";
final String topic = "persistent://" + namespaceName + "/test-dynamicConfiguration-topic-auto-creation-"
+ UUID.randomUUID();
Expand All @@ -463,7 +463,7 @@ public void testDynamicConfigurationTopicAutoCreationNonPartitioned() throws Pul
@Test
public void testDynamicConfigurationTopicAutoCreationPartitioned() throws PulsarAdminException, PulsarClientException {
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(0);
final String namespaceName = "prop/ns-abc";
final String topic = "persistent://" + namespaceName + "/test-dynamicConfiguration-topic-auto-creation-"
Expand All @@ -489,7 +489,7 @@ public void testDynamicConfigurationTopicAutoCreationPartitioned() throws Pulsar
@Test
public void testDynamicConfigurationTopicAutoCreationPartitionedWhenDefaultMoreThanMax() throws PulsarAdminException, PulsarClientException {
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.PARTITIONED);
pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(0);
final String namespaceName = "prop/ns-abc";
String topic = "persistent://" + namespaceName + "/test-dynamicConfiguration-topic-auto-creation-"
Expand Down
Loading

0 comments on commit 816acc1

Please sign in to comment.