
[improve] [broker] Improve CPU resource usage of TopicName cache #23052

Merged · 4 commits · Jul 22, 2024

Conversation

@poorbarcode (Contributor) commented Jul 18, 2024

Motivation

Guava LoadingCache.get (used by TopicName.get) consumes too much CPU, as shown in the thread dump below.

broker_cpu.html.txt

pulsar-io-4-20" #135 prio=5 os_prio=0 cpu=79326083.91ms elapsed=1465985.25s tid=0x00007f5bc40740b0 nid=0xe9 runnable  [0x00007f5ba51fd000]
   java.lang.Thread.State: RUNNABLE
	at java.util.concurrent.ConcurrentLinkedQueue.offer(java.base@17.0.10/ConcurrentLinkedQueue.java:380)
	at java.util.concurrent.ConcurrentLinkedQueue.add(java.base@17.0.10/ConcurrentLinkedQueue.java:283)
	at com.google.common.cache.LocalCache$Segment.recordRead(LocalCache.java:2546)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2068)
	at com.google.common.cache.LocalCache.get(LocalCache.java:4012)
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5011)
	at org.apache.pulsar.common.naming.TopicName.get(TopicName.java:81)
	at org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic(SystemTopicNames.java:62)
	at org.apache.pulsar.common.naming.SystemTopicNames.isSystemTopic(SystemTopicNames.java:86)
	at org.apache.pulsar.common.topics.TopicList.lambda$filterSystemTopic$1(TopicList.java:65)
	at org.apache.pulsar.common.topics.TopicList$$Lambda$2132/0x00007f5d049a4800.test(Unknown Source)
	at java.util.stream.ReferencePipeline$2$1.accept(java.base@17.0.10/ReferencePipeline.java:178)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(java.base@17.0.10/ArrayList.java:1625)
	at java.util.stream.AbstractPipeline.copyInto(java.base@17.0.10/AbstractPipeline.java:509)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@17.0.10/AbstractPipeline.java:499)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(java.base@17.0.10/ReduceOps.java:921)
	at java.util.stream.AbstractPipeline.evaluate(java.base@17.0.10/AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(java.base@17.0.10/ReferencePipeline.java:682)
	at org.apache.pulsar.common.topics.TopicList.filterSystemTopic(TopicList.java:66)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleGetTopicsOfNamespace$47(ServerCnx.java:2126)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$2129/0x00007f5d049b1c80.accept(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniAcceptNow(java.base@17.0.10/CompletableFuture.java:757)
	at java.util.concurrent.CompletableFuture.uniAcceptStage(java.base@17.0.10/CompletableFuture.java:735)
	at java.util.concurrent.CompletableFuture.thenAccept(java.base@17.0.10/CompletableFuture.java:2182)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleGetTopicsOfNamespace$49(ServerCnx.java:2123)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$2128/0x00007f5d049b1a38.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.10/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.10/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.10/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleGetTopicsOfNamespace(ServerCnx.java:2120)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:315)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:202)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:164)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:454)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.10/Thread.java:840)

Modifications

Because the cache does not need a strict invalidation time, change the implementation to a ConcurrentHashMap.
after_improvement.html
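A minimal sketch of the idea (the cap value, the clear-on-overflow policy, and the names here are illustrative assumptions, not the exact code merged in this PR): cache parsed topic names in a plain ConcurrentHashMap instead of paying Guava's per-read expiration bookkeeping.

import java.util.concurrent.ConcurrentHashMap;

public final class TopicNameCache {

    private static final int MAX_CACHE_SIZE = 100_000; // assumed cap
    private static final ConcurrentHashMap<String, String> CACHE =
            new ConcurrentHashMap<>();

    public static String get(String topic) {
        // Cheap overflow guard: losing entries to a concurrent clear() is
        // harmless, because a topic name can always be re-parsed on a miss.
        if (CACHE.size() >= MAX_CACHE_SIZE) {
            CACHE.clear();
        }
        // The identity mapping stands in for the real parse step.
        return CACHE.computeIfAbsent(topic, k -> k);
    }

    private TopicNameCache() {}
}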

Performance review

Env

  • Server: master
    • topic count: 11,000 topics; 110 partitioned topics, each containing 100 partitions
      • topic name: public/default/{uuid}
    • enableBrokerSideSubscriptionPatternEvaluation: false
  • Client: 2.11.4 (see the client sketch after this list)
    • 1 client with 220 pattern consumers
      • without any change, these 220 consumers put enough pressure on the broker to keep its CPU usage at 90%
    • initialize 16 timer tasks for the 220 consumers (by default, there is only a single timer per client)
    • change the time unit of patternAutoDiscoveryPeriod to seconds and set it to 1
    • connectionsPerBroker: 100
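A rough sketch of how one of the 220 pattern consumers might be set up (the service URL, subscription name, and exact pattern are assumptions; the builder methods are standard Pulsar client API):

import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

public class PatternConsumerLoad {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // assumed URL
                .connectionsPerBroker(100)             // as in the test env
                .build();

        // Topics are named public/default/{uuid}, so match the whole namespace.
        Consumer<byte[]> consumer = client.newConsumer()
                .topicsPattern(Pattern.compile("persistent://public/default/.*"))
                .patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS) // 1s rediscovery, as above
                .subscriptionName("pattern-sub-1")               // assumed name
                .subscribe();
    }
}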

Test cases:

[Screenshots: broker CPU usage and perf test results before and after the change]
perf_result.zip

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@poorbarcode (Contributor, Author) commented:

/pulsarbot rerun-failure-checks

@codecov-commenter commented Jul 18, 2024

Codecov Report

Attention: Patch coverage is 89.47368% with 2 lines in your changes missing coverage. Please review.

Project coverage is 73.45%. Comparing base (bbc6224) to head (9fe8dad).
Report is 460 commits behind head on master.

Additional details and impacted files


@@             Coverage Diff              @@
##             master   #23052      +/-   ##
============================================
- Coverage     73.57%   73.45%   -0.12%     
- Complexity    32624    33503     +879     
============================================
  Files          1877     1915      +38     
  Lines        139502   143956    +4454     
  Branches      15299    15727     +428     
============================================
+ Hits         102638   105743    +3105     
- Misses        28908    30113    +1205     
- Partials       7956     8100     +144     
Flag Coverage Δ
inttests 27.74% <68.42%> (+3.15%) ⬆️
systests 24.74% <68.42%> (+0.41%) ⬆️
unittests 72.52% <89.47%> (-0.33%) ⬇️

Flags with carried forward coverage won't be shown.

Files Coverage Δ
...org/apache/pulsar/broker/ServiceConfiguration.java 99.02% <100.00%> (-0.37%) ⬇️
...rg/apache/pulsar/broker/service/BrokerService.java 82.18% <100.00%> (+1.40%) ⬆️
...ava/org/apache/pulsar/common/naming/TopicName.java 94.69% <80.00%> (-1.37%) ⬇️

... and 503 files with indirect coverage changes

Two resolved review comments on conf/broker.conf (outdated)
@poorbarcode poorbarcode requested review from lhotari and removed request for lhotari July 19, 2024 10:11
@poorbarcode poorbarcode force-pushed the improve/topic_name_cache branch from b96dbbd to e794b67 Compare July 19, 2024 10:49
@BewareMyPower (Contributor) commented:

Here is the benchmark's source code: https://github.com/BewareMyPower/CacheBenchmark

A typical output:

CacheReadBenchmark.testCaffine             thrpt    5   7432187.879 ±  746705.561  ops/s
CacheReadBenchmark.testGuava               thrpt    5   6962764.989 ±  222299.106  ops/s
CacheReadBenchmark.testMapComputeIfAbsent  thrpt    5  34219723.889 ±  686448.613  ops/s
CacheReadBenchmark.testMapGet              thrpt    5  72643406.351 ± 1312709.392  ops/s

The overhead for expiration seems significant. @ben-manes @lhotari Please help review if there is something wrong with it.
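For reference, a minimal JMH sketch of this kind of comparison (the method names mirror the output above, but the bodies here are assumptions rather than the linked repository's code; a single hot key keeps the measurement on the cache-hit path):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
public class CacheReadBenchmark {

    private static final String KEY = "persistent://public/default/topic-0";

    // Guava LoadingCache with read-triggered expiration bookkeeping,
    // similar in shape to what TopicName.get used before this PR.
    private final LoadingCache<String, String> guavaCache = CacheBuilder.newBuilder()
            .expireAfterAccess(5, TimeUnit.MINUTES)
            .build(new CacheLoader<String, String>() {
                @Override
                public String load(String key) {
                    return key; // identity stands in for the parse step
                }
            });

    // Caffeine equivalent, also with expiration enabled.
    private final com.github.benmanes.caffeine.cache.LoadingCache<String, String> caffeineCache =
            com.github.benmanes.caffeine.cache.Caffeine.newBuilder()
                    .expireAfterAccess(5, TimeUnit.MINUTES)
                    .build(key -> key);

    private final ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();

    @Benchmark
    public String testGuava() throws Exception {
        return guavaCache.get(KEY);
    }

    @Benchmark
    public String testCaffine() {
        return caffeineCache.get(KEY);
    }

    @Benchmark
    public String testMapComputeIfAbsent() {
        return map.computeIfAbsent(KEY, k -> k);
    }

    @Benchmark
    public String testMapGet() {
        String v = map.get(KEY);
        return v != null ? v : map.computeIfAbsent(KEY, k -> k);
    }
}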

The motivation is that the time cost of parsing a TopicName (with an efficient implementation; the current Pulsar implementation is inefficient) is comparable to the cost of getting the cached object from the Guava LoadingCache.
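To make "efficient implementation" concrete, here is a rough illustrative parser for the persistent://tenant/namespace/local-name form (not Pulsar's actual TopicName code, which also validates and handles partition suffixes): an efficient parse is just a few indexOf and substring calls.

final class SimpleTopicNameParser {

    static String[] parse(String topic) {
        int schemeEnd = topic.indexOf("://");
        int tenantEnd = topic.indexOf('/', schemeEnd + 3);
        int namespaceEnd = topic.indexOf('/', tenantEnd + 1);
        if (schemeEnd < 0 || tenantEnd < 0 || namespaceEnd < 0) {
            throw new IllegalArgumentException("Invalid topic name: " + topic);
        }
        return new String[] {
            topic.substring(0, schemeEnd),                // domain
            topic.substring(schemeEnd + 3, tenantEnd),    // tenant
            topic.substring(tenantEnd + 1, namespaceEnd), // namespace
            topic.substring(namespaceEnd + 1)             // local name
        };
    }
}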

@ben-manes commented Jul 20, 2024

Thanks! I’ll look later today (it’s late in my timezone). If you are running this on Mac/Windows, then reading the current time is a system call, IIRC. That’s much slower than the user-space call on Linux, so it can artificially skew your results by not matching the deployment environment. A custom Ticker can be supplied if it is truly necessary to optimize the clock reads.

@poorbarcode (Contributor, Author) commented:

@ben-manes @lhotari

The implementation Caffeine uses without expireAfterWrite/expireAfterAccess is com.github.benmanes.caffeine.cache.SSMS, which delegates to a ConcurrentHashMap (see the pictures below). So it can never be faster than ConcurrentHashMap itself.

[Screenshot: the get path, delegating to ConcurrentHashMap]
[Screenshot: the get -> load path, delegating to ConcurrentHashMap]

@BewareMyPower

Thanks for your testing. I think we'd better change the code like the one below (BTW, in my test, both implementations perform about the same).

[Screenshot: suggested code change]
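The screenshot itself does not survive on this page. Given the testMapGet vs. testMapComputeIfAbsent numbers above, the suggested change plausibly adds a plain get() fast path that falls back to computeIfAbsent() only on a miss; a hedged sketch, reusing the CACHE map from the sketch under Modifications:

// Hypothetical sketch, not the screenshot's actual content. get() is
// lock-free on a hit, while computeIfAbsent() may take a bin-level lock
// even when the value is already present, so try get() first.
public static String get(String topic) {
    String cached = CACHE.get(topic);
    if (cached != null) {
        return cached; // fast path: plain lock-free read
    }
    return CACHE.computeIfAbsent(topic, k -> k); // slow path: parse and insert
}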

@poorbarcode poorbarcode requested review from ben-manes and lhotari and removed request for lhotari and ben-manes July 21, 2024 05:54
@poorbarcode poorbarcode force-pushed the improve/topic_name_cache branch from 3f19821 to 61a7595 Compare July 21, 2024 17:52
Resolved review comment on conf/broker.conf (outdated)
Co-authored-by: Zixuan Liu <nodeces@gmail.com>
@poorbarcode poorbarcode requested a review from nodece July 22, 2024 03:33
@poorbarcode poorbarcode dismissed lhotari’s stale review July 22, 2024 09:37

The concern is not critical; we can improve the code in the future.

@poorbarcode
Copy link
Contributor Author

Since @ben-manes and @lhotari have not responded, let me merge it first. We can keep improving the code in the future.

@poorbarcode poorbarcode merged commit 81aed6c into apache:master Jul 22, 2024
51 checks passed
@poorbarcode poorbarcode deleted the improve/topic_name_cache branch July 22, 2024 09:41
poorbarcode added a commit that referenced this pull request Jul 22, 2024
…3052)

Co-authored-by: Zixuan Liu <nodeces@gmail.com>
(cherry picked from commit 81aed6c)
poorbarcode added a commit that referenced this pull request Jul 22, 2024
…3052)

Co-authored-by: Zixuan Liu <nodeces@gmail.com>
(cherry picked from commit 81aed6c)
poorbarcode added a commit that referenced this pull request Jul 22, 2024
…3052)

Co-authored-by: Zixuan Liu <nodeces@gmail.com>
(cherry picked from commit 81aed6c)
@hangc0276 (Contributor) commented:
Good job! LGTM.

nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 25, 2024
…ache#23052)

Co-authored-by: Zixuan Liu <nodeces@gmail.com>
(cherry picked from commit 81aed6c)
(cherry picked from commit 5a83958)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 26, 2024
…ache#23052)

Co-authored-by: Zixuan Liu <nodeces@gmail.com>
(cherry picked from commit 81aed6c)
(cherry picked from commit 5a83958)
@lhotari lhotari added this to the 4.0.0 milestone Oct 14, 2024