Skip to content

Commit

Permalink
[broker] Optimize TopicPolicy#maxProducersPerTopic with HierarchyTopi…
Browse files Browse the repository at this point in the history
…cPolicies (#13082)
  • Loading branch information
Jason918 authored Dec 3, 2021
1 parent 4dd4bd6 commit de51284
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public AbstractTopic(String topic, BrokerService brokerService) {

protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic());
topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies());
topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled());
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(type ->
Expand All @@ -164,6 +165,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
return;
}
topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
Expand All @@ -179,6 +181,7 @@ private void updateTopicPolicyByBrokerConfig(HierarchyTopicPolicies topicPolicie
config.isBrokerDeleteInactiveTopicsEnabled()));

topicPolicies.getMaxSubscriptionsPerTopic().updateBrokerValue(config.getMaxSubscriptionsPerTopic());
topicPolicies.getMaxProducersPerTopic().updateBrokerValue(config.getMaxProducersPerTopic());
topicPolicies.getDeduplicationEnabled().updateBrokerValue(config.isBrokerDeduplicationEnabled());
//init backlogQuota
topicPolicies.getBackLogQuotaMap()
Expand All @@ -192,16 +195,7 @@ private void updateTopicPolicyByBrokerConfig(HierarchyTopicPolicies topicPolicie
}

protected boolean isProducersExceeded() {
Integer maxProducers = getTopicPolicies().map(TopicPolicies::getMaxProducerPerTopic).orElse(null);

if (maxProducers == null) {
Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());
maxProducers = policies.max_producers_per_topic;
}
maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar()
.getConfiguration().getMaxProducersPerTopic();
Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get();
if (maxProducers > 0 && maxProducers <= producers.size()) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
Expand Down Expand Up @@ -965,16 +966,51 @@ public void testGetMaxProducerApplied() throws Exception {
assertEquals(admin.topicPolicies().getMaxProducers(topic, true).intValue(), conf.getMaxProducersPerTopic());
}

private void waitTopicPoliciesApplied(String topic, int partitions,
java.util.function.Consumer<HierarchyTopicPolicies> condition) {
TopicName topicName = TopicName.get(topic);
if (partitions > 0) {
for (int i = 0; i < partitions; i++) {
String partition = topicName.getPartition(i).toString();
Awaitility.await().untilAsserted(() -> {
Topic t = pulsar.getBrokerService().getTopicIfExists(partition).get().get();
assertTrue(t instanceof AbstractTopic);
condition.accept(((AbstractTopic) t).getHierarchyTopicPolicies());
});
}
} else {
Awaitility.await().untilAsserted(() -> {
Topic t = pulsar.getBrokerService().getTopicIfExists(topic).get().get();
assertTrue(t instanceof AbstractTopic);
condition.accept(((AbstractTopic) t).getHierarchyTopicPolicies());
});
}
}

@Test
public void testSetMaxProducers() throws Exception {
Integer maxProducers = 2;
log.info("MaxProducers: {} will set to the topic: {}", maxProducers, persistenceTopic);

//broker level setting is 4
conf.setMaxProducersPerTopic(4);
admin.topics().createPartitionedTopic(persistenceTopic, 2);
waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> {
assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 4);
});
//ns level setting is 3
admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> {
assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 3);
});
//topic level setting is 2
admin.topicPolicies().setMaxProducers(persistenceTopic, maxProducers);
waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> {
assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 2);
});

Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getMaxProducers(persistenceTopic), maxProducers));
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getMaxProducers(persistenceTopic),
maxProducers));

Producer<byte[]> producer1 = pulsarClient.newProducer().topic(persistenceTopic).create();
Producer<byte[]> producer2 = pulsarClient.newProducer().topic(persistenceTopic).create();
Expand All @@ -990,6 +1026,37 @@ public void testSetMaxProducers() throws Exception {
Assert.assertNotNull(producer2);
Assert.assertNull(producer3);

admin.topicPolicies().removeMaxProducers(persistenceTopic);
waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> {
assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 3);
});
producer3 = pulsarClient.newProducer().topic(persistenceTopic).create();

Producer<byte[]> producer4;
try {
producer4 = pulsarClient.newProducer().topic(persistenceTopic).create();
Assert.fail();
} catch (PulsarClientException e) {
log.info("Topic reached max producers limit on topic level.");
}

admin.namespaces().removeMaxProducersPerTopic(myNamespace);
waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> {
assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 4);
});
producer4 = pulsarClient.newProducer().topic(persistenceTopic).create();

try {
Producer<byte[]> producer5 = pulsarClient.newProducer().topic(persistenceTopic).create();
Assert.fail();
} catch (PulsarClientException e) {
log.info("Topic reached max producers limit on topic level.");
}
producer1.close();
producer2.close();
producer3.close();
producer4.close();

admin.topics().deletePartitionedTopic(persistenceTopic, true);
admin.topics().deletePartitionedTopic(testTopic, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ public void testProducerOverwrite() throws Exception {

private void testMaxProducers() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
topic.initialize();
String role = "appid1";
// 1. add producer1
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role,
Expand Down Expand Up @@ -577,8 +578,11 @@ public void testMaxProducersForNamespace() throws Exception {
.thenReturn(Optional.of(policies));

when(pulsar.getPulsarResources().getNamespaceResources()
.getPolicies(TopicName.get(successTopicName).getNamespaceObject()))
.thenReturn(Optional.of(policies));
.getPolicies(TopicName.get(successTopicName).getNamespaceObject()))
.thenReturn(Optional.of(policies));
when(pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(successTopicName).getNamespaceObject()))
.thenReturn(CompletableFuture.completedFuture(Optional.of(policies)));
testMaxProducers();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ public class HierarchyTopicPolicies {
final PolicyHierarchyValue<Boolean> deduplicationEnabled;
final PolicyHierarchyValue<InactiveTopicPolicies> inactiveTopicPolicies;
final PolicyHierarchyValue<Integer> maxSubscriptionsPerTopic;
final PolicyHierarchyValue<Integer> maxProducersPerTopic;
final Map<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>> backLogQuotaMap;
final PolicyHierarchyValue<Integer> topicMaxMessageSize;

public HierarchyTopicPolicies() {
deduplicationEnabled = new PolicyHierarchyValue<>();
inactiveTopicPolicies = new PolicyHierarchyValue<>();
maxSubscriptionsPerTopic = new PolicyHierarchyValue<>();
maxProducersPerTopic = new PolicyHierarchyValue<>();
backLogQuotaMap = new ImmutableMap.Builder<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>>()
.put(BacklogQuotaType.destination_storage, new PolicyHierarchyValue<>())
.put(BacklogQuotaType.message_age, new PolicyHierarchyValue<>())
Expand Down

0 comments on commit de51284

Please sign in to comment.