Skip to content

Commit

Permalink
[improve][broker] System topic writer/reader connection not counted (#…
Browse files Browse the repository at this point in the history
…18603)

This PR is a supplement to #18369.
- `AbstractTopic.isSameAddressProducersExceeded()`
- `AbstractBaseDispatcher.isConsumersExceededOnSubscription()`
  • Loading branch information
yuruguo authored Nov 28, 2022
1 parent b579bb8 commit a2fb562
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,12 @@ protected void acquirePermitsForDeliveredMessages(Topic topic, ManagedCursor cur
protected abstract boolean isConsumersExceededOnSubscription();

protected boolean isConsumersExceededOnSubscription(AbstractTopic topic, int consumerSize) {
if (topic.isSystemTopic()) {
return false;
}
Integer maxConsumersPerSubscription = topic.getHierarchyTopicPolicies().getMaxConsumersPerSubscription().get();
return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerSize;
return maxConsumersPerSubscription != null && maxConsumersPerSubscription > 0
&& maxConsumersPerSubscription <= consumerSize;
}

private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,9 @@ protected void unregisterTopicPolicyListener() {
}

protected boolean isSameAddressProducersExceeded(Producer producer) {
if (isSystemTopic() || producer.isRemote()) {
return false;
}
final int maxSameAddressProducers = brokerService.pulsar().getConfiguration()
.getMaxSameAddressProducersPerTopic();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ public void testSystemTopicNotCheckExceed() throws Exception {
admin.namespaces().createNamespace(ns, 2);
admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1);

conf.setMaxSameAddressConsumersPerTopic(1);
admin.namespaces().setMaxConsumersPerTopic(ns, 1);
admin.topicPolicies().setMaxConsumers(topic, 1);
NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
Expand All @@ -280,8 +281,9 @@ public void testSystemTopicNotCheckExceed() throws Exception {
SystemTopicClient.Reader reader1 = systemTopicClientForNamespace.newReader();
SystemTopicClient.Reader reader2 = systemTopicClientForNamespace.newReader();

conf.setMaxSameAddressProducersPerTopic(1);
admin.namespaces().setMaxProducersPerTopic(ns, 1);
admin.topicPolicies().setMaxProducers(topic, 1);

CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer1 = systemTopicClientForNamespace.newWriterAsync();
CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 = systemTopicClientForNamespace.newWriterAsync();
CompletableFuture<Void> f1 = admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);
Expand Down

0 comments on commit a2fb562

Please sign in to comment.