Skip to content

Commit 1080ad5

Browse files
authored
[fix][broker] pre-create non-partitioned system topics for load balance extension (#20370)
PIP: #16691 ### Motivation We need to create system topics without partitions explicitly. Currently, we do not support partitioned system topics. ### Modifications create system topics without partitions explicitly
1 parent 2e6928a commit 1080ad5

File tree

4 files changed

+23
-0
lines changed

4 files changed

+23
-0
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java

+17
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
8080
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
8181
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
82+
import org.apache.pulsar.client.admin.PulsarAdminException;
8283
import org.apache.pulsar.common.naming.NamespaceBundle;
8384
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
8485
import org.apache.pulsar.common.naming.NamespaceName;
@@ -213,6 +214,19 @@ public static boolean debug(ServiceConfiguration config, Logger log) {
213214
return config.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
214215
}
215216

217+
public static void createSystemTopic(PulsarService pulsar, String topic) throws PulsarServerException {
218+
try {
219+
pulsar.getAdminClient().topics().createNonPartitionedTopic(topic);
220+
log.info("Created topic {}.", topic);
221+
} catch (PulsarAdminException.ConflictException ex) {
222+
if (debug(pulsar.getConfiguration(), log)) {
223+
log.info("Topic {} already exists.", topic, ex);
224+
}
225+
} catch (PulsarAdminException e) {
226+
throw new PulsarServerException(e);
227+
}
228+
}
229+
216230
@Override
217231
public void start() throws PulsarServerException {
218232
if (this.started) {
@@ -247,6 +261,9 @@ public void start() throws PulsarServerException {
247261
this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
248262
this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper));
249263

264+
createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC);
265+
createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
266+
250267
try {
251268
this.brokerLoadDataStore = LoadDataStoreFactory
252269
.create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

+2
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,8 @@ public synchronized void start() throws PulsarServerException {
294294
PulsarClusterMetadataSetup.createNamespaceIfAbsent
295295
(pulsar.getPulsarResources(), SYSTEM_NAMESPACE, config.getClusterName());
296296

297+
ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC);
298+
297299
producer = pulsar.getClient().newProducer(schema)
298300
.enableBatching(true)
299301
.compressionType(MSG_COMPRESSION_TYPE)

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
import org.apache.pulsar.common.policies.data.BundlesData;
9898
import org.apache.pulsar.common.policies.data.ClusterData;
9999
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
100+
import org.apache.pulsar.common.policies.data.TopicType;
100101
import org.apache.pulsar.common.stats.Metrics;
101102
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
102103
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
@@ -129,6 +130,7 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
129130
@Override
130131
public void setup() throws Exception {
131132
conf.setForceDeleteNamespaceAllowed(true);
133+
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
132134
conf.setAllowAutoTopicCreation(true);
133135
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
134136
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import org.apache.pulsar.client.api.PulsarClientException;
8787
import org.apache.pulsar.client.api.TypedMessageBuilder;
8888
import org.apache.pulsar.client.impl.TableViewImpl;
89+
import org.apache.pulsar.common.policies.data.TopicType;
8990
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
9091
import org.apache.pulsar.metadata.api.MetadataStoreException;
9192
import org.apache.pulsar.metadata.api.NotificationType;
@@ -129,6 +130,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
129130
@Override
130131
protected void setup() throws Exception {
131132
conf.setAllowAutoTopicCreation(true);
133+
conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
132134
conf.setLoadBalancerDebugModeEnabled(true);
133135
conf.setBrokerServiceCompactionMonitorIntervalInSeconds(10);
134136
super.internalSetup(conf);

0 commit comments

Comments
 (0)