Skip to content

Commit

Permalink
[improve] [broker] Improve CPU resources usege of TopicName Cache (ap…
Browse files Browse the repository at this point in the history
…ache#23052)

Co-authored-by: Zixuan Liu <nodeces@gmail.com>
(cherry picked from commit 81aed6c)
  • Loading branch information
poorbarcode committed Jul 22, 2024
1 parent 3f7206c commit 5a83958
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 17 deletions.
8 changes: 8 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ skipBrokerShutdownOnOOM=false
# Factory class-name to create topic with custom workflow
topicFactoryClassName=

# Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache
# per "maxSecondsToClearTopicNameCache", it does not mean broker will not cache TopicName.
topicNameCacheMaxCapacity=100000

# A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache frequently when
# there are too many topics are in use.
maxSecondsToClearTopicNameCache=7200

# Enable backlog quota check. Enforces action on topic when the quota is reached
backlogQuotaCheckEnabled=true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,21 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
)
private boolean backlogQuotaCheckEnabled = true;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache"
+ " per maxSecondsToClearTopicNameCache, it does not mean broker will not cache TopicName."
)
private int topicNameCacheMaxCapacity = 100_000;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache"
+ " frequently when there are too many topics are in use."
)
private int maxSecondsToClearTopicNameCache = 3600 * 2;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Whether to enable precise time based backlog quota check. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,16 @@ public void start() throws Exception {
this.updateBrokerDispatchThrottlingMaxRate();
this.startCheckReplicationPolicies();
this.startDeduplicationSnapshotMonitor();
this.startClearInvalidateTopicNameCacheTask();
}

protected void startClearInvalidateTopicNameCacheTask() {
final int maxSecondsToClearTopicNameCache = pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache();
inactivityMonitor.scheduleAtFixedRate(
() -> TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheMaxCapacity()),
maxSecondsToClearTopicNameCache,
maxSecondsToClearTopicNameCache,
TimeUnit.SECONDS);
}

protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpdateFrequencyInSecs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ protected void doInitConf() throws Exception {
super.doInitConf();
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
conf.setTopicNameCacheMaxCapacity(5000);
conf.setMaxSecondsToClearTopicNameCache(5);
if (useStaticPorts) {
conf.setBrokerServicePortTls(Optional.of(6651));
conf.setBrokerServicePort(Optional.of(6660));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,7 @@ public void testAdvertised() throws Exception {
assertNull(standalone.getConfig().getAdvertisedAddress());
assertEquals(standalone.getConfig().getAdvertisedListeners(),
"internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651");
assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1);
assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(), 200);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public void testInit() throws Exception {
assertEquals(config.getManagedLedgerDataReadPriority(), "bookkeeper-first");
assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05);
assertEquals(config.getHttpMaxRequestHeaderSize(), 1234);
assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1);
assertEquals(config.getTopicNameCacheMaxCapacity(), 200);
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties());
assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first");
}
Expand Down Expand Up @@ -370,4 +372,15 @@ public void testAllowAutoTopicCreationType() throws Exception {
conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class);
assertEquals(conf.getAllowAutoTopicCreationType(), TopicType.NON_PARTITIONED);
}

@Test
public void testTopicNameCacheConfiguration() throws Exception {
ServiceConfiguration conf;
final Properties properties = new Properties();
properties.setProperty("maxSecondsToClearTopicNameCache", "2");
properties.setProperty("topicNameCacheMaxCapacity", "100");
conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class);
assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 2);
assertEquals(conf.getTopicNameCacheMaxCapacity(), 100);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,5 @@ transactionPendingAckBatchedWriteEnabled=true
transactionPendingAckBatchedWriteMaxRecords=44
transactionPendingAckBatchedWriteMaxSize=55
transactionPendingAckBatchedWriteMaxDelayInMillis=66
topicNameCacheMaxCapacity=200
maxSecondsToClearTopicNameCache=1
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,5 @@ brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up
supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
maxMessagePublishBufferSizeInMB=-1
topicNameCacheMaxCapacity=200
maxSecondsToClearTopicNameCache=1
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,11 @@
package org.apache.pulsar.common.naming;

import com.google.common.base.Splitter;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.UncheckedExecutionException;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.Codec;

Expand All @@ -54,13 +49,17 @@ public class TopicName implements ServiceUnitId {

private final int partitionIndex;

private static final LoadingCache<String, TopicName> cache = CacheBuilder.newBuilder().maximumSize(100000)
.expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<String, TopicName>() {
@Override
public TopicName load(String name) throws Exception {
return new TopicName(name);
}
});
private static final ConcurrentHashMap<String, TopicName> cache = new ConcurrentHashMap<>();

public static void clearIfReachedMaxCapacity(int maxCapacity) {
if (maxCapacity < 0) {
// Unlimited cache.
return;
}
if (cache.size() > maxCapacity) {
cache.clear();
}
}

public static TopicName get(String domain, NamespaceName namespaceName, String topic) {
String name = domain + "://" + namespaceName.toString() + '/' + topic;
Expand All @@ -79,11 +78,11 @@ public static TopicName get(String domain, String tenant, String cluster, String
}

public static TopicName get(String topic) {
try {
return cache.get(topic);
} catch (ExecutionException | UncheckedExecutionException e) {
throw (RuntimeException) e.getCause();
TopicName tp = cache.get(topic);
if (tp != null) {
return tp;
}
return cache.computeIfAbsent(topic, k -> new TopicName(k));
}

public static TopicName getPartitionedTopicName(String topic) {
Expand Down

0 comments on commit 5a83958

Please sign in to comment.