From 5a839582ac08eb3be110de9b81f1cc7d245d4018 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 22 Jul 2024 17:40:30 +0800 Subject: [PATCH] [improve] [broker] Improve CPU resources usege of TopicName Cache (#23052) Co-authored-by: Zixuan Liu (cherry picked from commit 81aed6c75eba99fb62172b986b0c59e693e6f4b9) --- conf/broker.conf | 8 +++++ .../pulsar/broker/ServiceConfiguration.java | 15 +++++++++ .../pulsar/broker/service/BrokerService.java | 10 ++++++ .../pulsar/broker/PulsarServiceTest.java | 2 ++ .../pulsar/broker/service/StandaloneTest.java | 2 ++ .../naming/ServiceConfigurationTest.java | 13 ++++++++ .../configurations/pulsar_broker_test.conf | 2 ++ .../pulsar_broker_test_standalone.conf | 2 ++ .../pulsar/common/naming/TopicName.java | 33 +++++++++---------- 9 files changed, 70 insertions(+), 17 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index fd1eecf27d3d0..bc026fea9c9eb 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -158,6 +158,14 @@ skipBrokerShutdownOnOOM=false # Factory class-name to create topic with custom workflow topicFactoryClassName= +# Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache +# per "maxSecondsToClearTopicNameCache", it does not mean broker will not cache TopicName. +topicNameCacheMaxCapacity=100000 + +# A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache frequently when +# there are too many topics are in use. +maxSecondsToClearTopicNameCache=7200 + # Enable backlog quota check. Enforces action on topic when the quota is reached backlogQuotaCheckEnabled=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 5087e5b51ac3b..a992170f130d9 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -586,6 +586,21 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, ) private boolean backlogQuotaCheckEnabled = true; + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache" + + " per maxSecondsToClearTopicNameCache, it does not mean broker will not cache TopicName." + ) + private int topicNameCacheMaxCapacity = 100_000; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache" + + " frequently when there are too many topics are in use." + ) + private int maxSecondsToClearTopicNameCache = 3600 * 2; + @FieldContext( category = CATEGORY_POLICIES, doc = "Whether to enable precise time based backlog quota check. " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index cee53e9067863..de0750b695a94 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -582,6 +582,16 @@ public void start() throws Exception { this.updateBrokerDispatchThrottlingMaxRate(); this.startCheckReplicationPolicies(); this.startDeduplicationSnapshotMonitor(); + this.startClearInvalidateTopicNameCacheTask(); + } + + protected void startClearInvalidateTopicNameCacheTask() { + final int maxSecondsToClearTopicNameCache = pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache(); + inactivityMonitor.scheduleAtFixedRate( + () -> TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheMaxCapacity()), + maxSecondsToClearTopicNameCache, + maxSecondsToClearTopicNameCache, + TimeUnit.SECONDS); } protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpdateFrequencyInSecs) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java index 3e0887646e119..a515890dd3061 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java @@ -56,6 +56,8 @@ protected void doInitConf() throws Exception { super.doInitConf(); conf.setBrokerServicePortTls(Optional.of(0)); conf.setWebServicePortTls(Optional.of(0)); + conf.setTopicNameCacheMaxCapacity(5000); + conf.setMaxSecondsToClearTopicNameCache(5); if (useStaticPorts) { conf.setBrokerServicePortTls(Optional.of(6651)); conf.setBrokerServicePort(Optional.of(6660)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java index 5307e1a9ee874..67d188efd2550 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java @@ -56,5 +56,7 @@ public void testAdvertised() throws Exception { assertNull(standalone.getConfig().getAdvertisedAddress()); assertEquals(standalone.getConfig().getAdvertisedListeners(), "internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651"); + assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1); + assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(), 200); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index 55971c15adf68..ae13afb19344b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -73,6 +73,8 @@ public void testInit() throws Exception { assertEquals(config.getManagedLedgerDataReadPriority(), "bookkeeper-first"); assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05); assertEquals(config.getHttpMaxRequestHeaderSize(), 1234); + assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1); + assertEquals(config.getTopicNameCacheMaxCapacity(), 200); OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties()); assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first"); } @@ -370,4 +372,15 @@ public void testAllowAutoTopicCreationType() throws Exception { conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class); assertEquals(conf.getAllowAutoTopicCreationType(), TopicType.NON_PARTITIONED); } + + @Test + public void testTopicNameCacheConfiguration() throws Exception { + ServiceConfiguration conf; + final Properties properties = new Properties(); + properties.setProperty("maxSecondsToClearTopicNameCache", "2"); + properties.setProperty("topicNameCacheMaxCapacity", "100"); + conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class); + assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 2); + assertEquals(conf.getTopicNameCacheMaxCapacity(), 100); + } } diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf index 36f5869d73de6..551a9c88757a4 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf @@ -103,3 +103,5 @@ transactionPendingAckBatchedWriteEnabled=true transactionPendingAckBatchedWriteMaxRecords=44 transactionPendingAckBatchedWriteMaxSize=55 transactionPendingAckBatchedWriteMaxDelayInMillis=66 +topicNameCacheMaxCapacity=200 +maxSecondsToClearTopicNameCache=1 diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf index 0748418be6390..e9aeed1a34da9 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf @@ -94,3 +94,5 @@ brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up supportedNamespaceBundleSplitAlgorithms=[range_equally_divide] defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide maxMessagePublishBufferSizeInMB=-1 +topicNameCacheMaxCapacity=200 +maxSecondsToClearTopicNameCache=1 \ No newline at end of file diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index dd3307b4fa137..e52b21aa35591 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -19,16 +19,11 @@ package org.apache.pulsar.common.naming; import com.google.common.base.Splitter; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.util.concurrent.UncheckedExecutionException; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.List; import java.util.Objects; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.util.Codec; @@ -54,13 +49,17 @@ public class TopicName implements ServiceUnitId { private final int partitionIndex; - private static final LoadingCache cache = CacheBuilder.newBuilder().maximumSize(100000) - .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader() { - @Override - public TopicName load(String name) throws Exception { - return new TopicName(name); - } - }); + private static final ConcurrentHashMap cache = new ConcurrentHashMap<>(); + + public static void clearIfReachedMaxCapacity(int maxCapacity) { + if (maxCapacity < 0) { + // Unlimited cache. + return; + } + if (cache.size() > maxCapacity) { + cache.clear(); + } + } public static TopicName get(String domain, NamespaceName namespaceName, String topic) { String name = domain + "://" + namespaceName.toString() + '/' + topic; @@ -79,11 +78,11 @@ public static TopicName get(String domain, String tenant, String cluster, String } public static TopicName get(String topic) { - try { - return cache.get(topic); - } catch (ExecutionException | UncheckedExecutionException e) { - throw (RuntimeException) e.getCause(); + TopicName tp = cache.get(topic); + if (tp != null) { + return tp; } + return cache.computeIfAbsent(topic, k -> new TopicName(k)); } public static TopicName getPartitionedTopicName(String topic) {