Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Broker] Optimize TopicPolicies#maxConsumerPerTopic with HierarchyTopicPolicies #13361

Merged
merged 3 commits into from
Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public AbstractTopic(String topic, BrokerService brokerService) {
protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic());
topicPolicies.getMaxConsumerPerTopic().updateTopicValue(data.getMaxConsumerPerTopic());
topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies());
topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled());
topicPolicies.getSubscriptionTypesEnabled().updateTopicValue(
Expand All @@ -175,6 +176,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds);
topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
topicPolicies.getMaxConsumerPerTopic().updateNamespaceValue(namespacePolicies.max_consumers_per_topic);
topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue(
Expand All @@ -194,6 +196,7 @@ private void updateTopicPolicyByBrokerConfig() {
updateBrokerSubscriptionTypesEnabled();
topicPolicies.getMaxSubscriptionsPerTopic().updateBrokerValue(config.getMaxSubscriptionsPerTopic());
topicPolicies.getMaxProducersPerTopic().updateBrokerValue(config.getMaxProducersPerTopic());
topicPolicies.getMaxConsumerPerTopic().updateBrokerValue(config.getMaxConsumersPerTopic());
topicPolicies.getDeduplicationEnabled().updateBrokerValue(config.isBrokerDeduplicationEnabled());
//init backlogQuota
topicPolicies.getBackLogQuotaMap()
Expand Down Expand Up @@ -273,18 +276,7 @@ public int getNumberOfSameAddressProducers(final String clientAddress) {
}

protected boolean isConsumersExceededOnTopic() {
Integer maxConsumers = getTopicPolicies().map(TopicPolicies::getMaxConsumerPerTopic).orElse(null);
if (maxConsumers == null) {

// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks
Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(
TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());

maxConsumers = policies.max_consumers_per_topic;
}
final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers
: brokerService.pulsar().getConfiguration().getMaxConsumersPerTopic();
int maxConsumersPerTopic = topicPolicies.getMaxConsumerPerTopic().get();
if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= getNumberOfConsumers()) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ public class HierarchyTopicPolicies {
final Map<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>> backLogQuotaMap;
final PolicyHierarchyValue<Integer> topicMaxMessageSize;
final PolicyHierarchyValue<Integer> messageTTLInSeconds;
final PolicyHierarchyValue<Integer> maxConsumerPerTopic;

public HierarchyTopicPolicies() {
deduplicationEnabled = new PolicyHierarchyValue<>();
inactiveTopicPolicies = new PolicyHierarchyValue<>();
subscriptionTypesEnabled = new PolicyHierarchyValue<>();
maxSubscriptionsPerTopic = new PolicyHierarchyValue<>();
maxProducersPerTopic = new PolicyHierarchyValue<>();
maxConsumerPerTopic = new PolicyHierarchyValue<>();
backLogQuotaMap = new ImmutableMap.Builder<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>>()
.put(BacklogQuotaType.destination_storage, new PolicyHierarchyValue<>())
.put(BacklogQuotaType.message_age, new PolicyHierarchyValue<>())
Expand Down