File tree 2 files changed +21
-1
lines changed
main/java/org/apache/pulsar/broker/service
test/java/org/apache/pulsar/broker/service
2 files changed +21
-1
lines changed Original file line number Diff line number Diff line change 100
100
import org .apache .pulsar .broker .intercept .BrokerInterceptor ;
101
101
import org .apache .pulsar .broker .intercept .ManagedLedgerInterceptorImpl ;
102
102
import org .apache .pulsar .broker .loadbalance .LoadManager ;
103
+ import org .apache .pulsar .broker .loadbalance .extensions .channel .ServiceUnitStateChannelImpl ;
103
104
import org .apache .pulsar .broker .namespace .NamespaceService ;
104
105
import org .apache .pulsar .broker .resources .DynamicConfigurationResources ;
105
106
import org .apache .pulsar .broker .resources .LocalPoliciesResources ;
@@ -3291,10 +3292,19 @@ private CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final TopicName
3291
3292
topicName .getNamespaceObject ());
3292
3293
return CompletableFuture .completedFuture (false );
3293
3294
}
3294
- //System topic can always be created automatically
3295
+
3296
+ // ServiceUnitStateChannelImpl.TOPIC expects to be a non-partitioned-topic now.
3297
+ // We don't allow the auto-creation here.
3298
+ // ServiceUnitStateChannelImpl.start() is responsible to create the topic.
3299
+ if (ServiceUnitStateChannelImpl .TOPIC .equals (topicName .toString ())) {
3300
+ return CompletableFuture .completedFuture (false );
3301
+ }
3302
+
3303
+ //Other system topics can be created automatically
3295
3304
if (pulsar .getConfiguration ().isSystemTopicEnabled () && isSystemTopic (topicName )) {
3296
3305
return CompletableFuture .completedFuture (true );
3297
3306
}
3307
+
3298
3308
final boolean allowed ;
3299
3309
AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride (topicName , policies );
3300
3310
if (autoTopicCreationOverride != null ) {
Original file line number Diff line number Diff line change 72
72
import org .apache .http .client .methods .HttpGet ;
73
73
import org .apache .http .impl .client .HttpClientBuilder ;
74
74
import org .apache .pulsar .broker .PulsarService ;
75
+ import org .apache .pulsar .broker .loadbalance .extensions .channel .ServiceUnitStateChannelImpl ;
75
76
import org .apache .pulsar .broker .namespace .NamespaceService ;
76
77
import org .apache .pulsar .broker .service .BrokerServiceException .PersistenceException ;
77
78
import org .apache .pulsar .broker .service .persistent .PersistentTopic ;
@@ -1520,4 +1521,13 @@ public void testDynamicConfigurationsForceDeleteTenantAllowed() throws Exception
1520
1521
assertTrue (conf .isForceDeleteTenantAllowed ());
1521
1522
});
1522
1523
}
1524
+
1525
+ @ Test
1526
+ public void testIsSystemTopicAllowAutoTopicCreationAsync () throws Exception {
1527
+ BrokerService brokerService = pulsar .getBrokerService ();
1528
+ assertFalse (brokerService .isAllowAutoTopicCreationAsync (
1529
+ ServiceUnitStateChannelImpl .TOPIC ).get ());
1530
+ assertTrue (brokerService .isAllowAutoTopicCreationAsync (
1531
+ "persistent://pulsar/system/my-system-topic" ).get ());
1532
+ }
1523
1533
}
You can’t perform that action at this time.
0 commit comments