Skip to content

Commit

Permalink
[improve][broker] System topic writer/reader connection not counted (#…
Browse files Browse the repository at this point in the history
…18603)

This PR is a supplement to #18369.
- `AbstractTopic.isSameAddressProducersExceeded()`
- `AbstractBaseDispatcher.isConsumersExceededOnSubscription()`

(cherry picked from commit a2fb562)
  • Loading branch information
yuruguo authored and congbobo184 committed Dec 6, 2022
1 parent 1d01714 commit 9ca74fc
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,18 +202,21 @@ private void individualAcknowledgeMessageIfNeeded(Position position, Map<String,
protected abstract boolean isConsumersExceededOnSubscription();

protected boolean isConsumersExceededOnSubscription(BrokerService brokerService,
String topic, int consumerSize) {
AbstractTopic topic, int consumerSize) {
if (topic.isSystemTopic()) {
return false;
}
Policies policies = null;
Integer maxConsumersPerSubscription = null;
try {
maxConsumersPerSubscription = brokerService
.getTopicPolicies(TopicName.get(topic))
.getTopicPolicies(TopicName.get(topic.getName()))
.map(TopicPolicies::getMaxConsumersPerSubscription)
.orElse(null);
if (maxConsumersPerSubscription == null) {
// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
policies = brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(TopicName.get(topic).getNamespaceObject()).orElse(null);
.getPoliciesIfCached(TopicName.get(topic.getName()).getNamespaceObject()).orElse(null);
}
} catch (Exception e) {
log.debug("Get topic or namespace policies fail", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ private long getUserCreatedProducersSize() {
}

protected boolean isSameAddressProducersExceeded(Producer producer) {
if (isSystemTopic() || producer.isRemote()) {
return false;
}
final int maxSameAddressProducers = brokerService.pulsar().getConfiguration()
.getMaxSameAddressProducersPerTopic();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce

@Override
protected boolean isConsumersExceededOnSubscription() {
return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumerList.size());
return isConsumersExceededOnSubscription(topic.getBrokerService(), topic, consumerList.size());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void sendMessages(List<Entry> entries) {

@Override
protected boolean isConsumersExceededOnSubscription() {
return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumers.size());
return isConsumersExceededOnSubscription(topic.getBrokerService(), topic, consumers.size());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce

@Override
protected boolean isConsumersExceededOnSubscription() {
return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumerList.size());
return isConsumersExceededOnSubscription(topic.getBrokerService(), topic, consumerList.size());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ protected void scheduleReadOnActiveConsumer() {

@Override
protected boolean isConsumersExceededOnSubscription() {
return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumers.size());
return isConsumersExceededOnSubscription(topic.getBrokerService(), topic, consumers.size());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ public void testSystemTopicNotCheckExceed() throws Exception {
admin.namespaces().createNamespace(ns, 2);
admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1);

conf.setMaxSameAddressConsumersPerTopic(1);
admin.namespaces().setMaxConsumersPerTopic(ns, 1);
admin.topicPolicies().setMaxConsumers(topic, 1);
NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
Expand All @@ -252,8 +253,9 @@ public void testSystemTopicNotCheckExceed() throws Exception {
SystemTopicClient.Reader reader1 = systemTopicClientForNamespace.newReader();
SystemTopicClient.Reader reader2 = systemTopicClientForNamespace.newReader();

conf.setMaxSameAddressProducersPerTopic(1);
admin.namespaces().setMaxProducersPerTopic(ns, 1);
admin.topicPolicies().setMaxProducers(topic, 1);

CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer1 = systemTopicClientForNamespace.newWriterAsync();
CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 = systemTopicClientForNamespace.newWriterAsync();
CompletableFuture<Void> f1 = admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);
Expand Down

0 comments on commit 9ca74fc

Please sign in to comment.