diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index c6f043e7429d4..7d42129505d32 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -153,6 +153,7 @@ public AbstractTopic(String topic, BrokerService brokerService) { protected void updateTopicPolicy(TopicPolicies data) { topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic()); topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic()); + topicPolicies.getMaxConsumerPerTopic().updateTopicValue(data.getMaxConsumerPerTopic()); topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies()); topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled()); topicPolicies.getSubscriptionTypesEnabled().updateTopicValue( @@ -175,6 +176,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds); topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic); topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic); + topicPolicies.getMaxConsumerPerTopic().updateNamespaceValue(namespacePolicies.max_consumers_per_topic); topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies); topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled); topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue( @@ -194,6 +196,7 @@ private void updateTopicPolicyByBrokerConfig() { updateBrokerSubscriptionTypesEnabled(); topicPolicies.getMaxSubscriptionsPerTopic().updateBrokerValue(config.getMaxSubscriptionsPerTopic()); topicPolicies.getMaxProducersPerTopic().updateBrokerValue(config.getMaxProducersPerTopic()); + topicPolicies.getMaxConsumerPerTopic().updateBrokerValue(config.getMaxConsumersPerTopic()); topicPolicies.getDeduplicationEnabled().updateBrokerValue(config.isBrokerDeduplicationEnabled()); //init backlogQuota topicPolicies.getBackLogQuotaMap() @@ -273,18 +276,7 @@ public int getNumberOfSameAddressProducers(final String clientAddress) { } protected boolean isConsumersExceededOnTopic() { - Integer maxConsumers = getTopicPolicies().map(TopicPolicies::getMaxConsumerPerTopic).orElse(null); - if (maxConsumers == null) { - - // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks - Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached( - TopicName.get(topic).getNamespaceObject()) - .orElseGet(() -> new Policies()); - - maxConsumers = policies.max_consumers_per_topic; - } - final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers - : brokerService.pulsar().getConfiguration().getMaxConsumersPerTopic(); + int maxConsumersPerTopic = topicPolicies.getMaxConsumerPerTopic().get(); if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= getNumberOfConsumers()) { return true; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 67211c6c905a6..ab28238722462 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -838,6 +838,8 @@ public void testAddRemoveConsumerDurableCursor() throws Exception { private void testMaxConsumersShared() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + topic.initialize().join(); + assertEquals((int) topic.getHierarchyTopicPolicies().getMaxConsumerPerTopic().get(), 3); PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock, false); @@ -930,6 +932,9 @@ public void testMaxConsumersSharedForNamespace() throws Exception { when(pulsar.getPulsarResources().getNamespaceResources() .getPoliciesIfCached(TopicName.get(successTopicName).getNamespaceObject())) .thenReturn(Optional.of(policies)); + when(pulsar.getPulsarResources().getNamespaceResources() + .getPoliciesAsync(TopicName.get(successTopicName).getNamespaceObject())) + .thenReturn(CompletableFuture.completedFuture(Optional.of(policies))); testMaxConsumersShared(); } @@ -937,6 +942,8 @@ public void testMaxConsumersSharedForNamespace() throws Exception { private void testMaxConsumersFailover() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + topic.initialize().join(); + assertEquals((int) topic.getHierarchyTopicPolicies().getMaxConsumerPerTopic().get(), 3); PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock, false); @@ -1029,6 +1036,9 @@ public void testMaxConsumersFailoverForNamespace() throws Exception { when(pulsar.getPulsarResources().getNamespaceResources() .getPoliciesIfCached(TopicName.get(successTopicName).getNamespaceObject())) .thenReturn(Optional.of(policies)); + when(pulsar.getPulsarResources().getNamespaceResources() + .getPoliciesAsync(TopicName.get(successTopicName).getNamespaceObject())) + .thenReturn(CompletableFuture.completedFuture(Optional.of(policies))); testMaxConsumersFailover(); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java index d78ef182f5d3f..0393def383455 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java @@ -39,6 +39,7 @@ public class HierarchyTopicPolicies { final Map> backLogQuotaMap; final PolicyHierarchyValue topicMaxMessageSize; final PolicyHierarchyValue messageTTLInSeconds; + final PolicyHierarchyValue maxConsumerPerTopic; public HierarchyTopicPolicies() { deduplicationEnabled = new PolicyHierarchyValue<>(); @@ -46,6 +47,7 @@ public HierarchyTopicPolicies() { subscriptionTypesEnabled = new PolicyHierarchyValue<>(); maxSubscriptionsPerTopic = new PolicyHierarchyValue<>(); maxProducersPerTopic = new PolicyHierarchyValue<>(); + maxConsumerPerTopic = new PolicyHierarchyValue<>(); backLogQuotaMap = new ImmutableMap.Builder>() .put(BacklogQuotaType.destination_storage, new PolicyHierarchyValue<>()) .put(BacklogQuotaType.message_age, new PolicyHierarchyValue<>())