Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Reduce the consumers list sort by priority level #16243

Merged
merged 1 commit into from
Jun 28, 2022

Conversation

codelipenghui
Copy link
Contributor

Motivation

While create many consumers (> 10000), the IO thread run into BLOCK state for long time which will
affect the message publish and subsequent consumer creation.

"pulsar-io-15-24" #195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry  [0x0000700019642000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207)
	- waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
"pulsar-io-15-8" #157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable  [0x00007000171d5000]
   java.lang.Thread.State: RUNNABLE
	at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360)
	at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234)
	at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307)
	at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896)
	at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888)
	- locked <0x00001000158237d8> (a java.lang.Object)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159)
	- locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287)
	- locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)

Profile without this PR:
image

perf_broker_subscribe_0.html.txt

Profile with this PR:
image

perf_broker_subscribe_1.html.txt

Modification

  • Sort the consumer list only if the new consumer with high priority than the last element in the consumer list,
    this can avoid the sort operation for all the consumers without priority level (the client-side always pass 0 if priority level absent).

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

Check the box below or label this PR directly.

Need to update docs?

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

### Motivation

While create many consumers (> 10000), the IO thread run into BLOCK state for long time which will
affect the message publish and subsequent consumer creation.

```
"pulsar-io-15-24" apache#195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry  [0x0000700019642000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207)
	- waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

```
"pulsar-io-15-8" apache#157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable  [0x00007000171d5000]
   java.lang.Thread.State: RUNNABLE
	at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360)
	at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234)
	at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307)
	at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896)
	at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888)
	- locked <0x00001000158237d8> (a java.lang.Object)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159)
	- locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287)
	- locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

### Modification

- Sort the consumer list only if the new consumer with high priority than the last element in the consumer list,
  this can avoid the sort operation for all the consumers without priority level (the client-side always pass 0 if priority level absent).
@codelipenghui codelipenghui self-assigned this Jun 27, 2022
@codelipenghui codelipenghui added this to the 2.11.0 milestone Jun 27, 2022
@codelipenghui codelipenghui added type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages area/broker release/2.10.2 release/2.9.4 labels Jun 27, 2022
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jun 27, 2022
@merlimat merlimat merged commit 291fedc into apache:master Jun 28, 2022
merlimat pushed a commit that referenced this pull request Jun 28, 2022
…16243)

### Motivation

While create many consumers (> 10000), the IO thread run into BLOCK state for long time which will
affect the message publish and subsequent consumer creation.

```
"pulsar-io-15-24" #195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry  [0x0000700019642000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207)
	- waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

```
"pulsar-io-15-8" #157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable  [0x00007000171d5000]
   java.lang.Thread.State: RUNNABLE
	at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360)
	at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234)
	at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307)
	at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896)
	at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888)
	- locked <0x00001000158237d8> (a java.lang.Object)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159)
	- locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287)
	- locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

### Modification

- Sort the consumer list only if the new consumer with high priority than the last element in the consumer list,
  this can avoid the sort operation for all the consumers without priority level (the client-side always pass 0 if priority level absent).
@hangc0276
Copy link
Contributor

Good job!

@codelipenghui codelipenghui deleted the penghui/reduce-consumers-sort branch June 28, 2022 01:45
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Jun 28, 2022
…pache#16243)

### Motivation

While create many consumers (> 10000), the IO thread run into BLOCK state for long time which will
affect the message publish and subsequent consumer creation.

```
"pulsar-io-15-24" #195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry  [0x0000700019642000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207)
	- waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

```
"pulsar-io-15-8" #157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable  [0x00007000171d5000]
   java.lang.Thread.State: RUNNABLE
	at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360)
	at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234)
	at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307)
	at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896)
	at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888)
	- locked <0x00001000158237d8> (a java.lang.Object)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159)
	- locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287)
	- locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

### Modification

- Sort the consumer list only if the new consumer with high priority than the last element in the consumer list,
  this can avoid the sort operation for all the consumers without priority level (the client-side always pass 0 if priority level absent).

(cherry picked from commit 4bdaa32)
congbobo184 pushed a commit that referenced this pull request Nov 10, 2022
…16243)

### Motivation

While create many consumers (> 10000), the IO thread run into BLOCK state for long time which will
affect the message publish and subsequent consumer creation.

```
"pulsar-io-15-24" #195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry  [0x0000700019642000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207)
	- waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

```
"pulsar-io-15-8" #157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable  [0x00007000171d5000]
   java.lang.Thread.State: RUNNABLE
	at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360)
	at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234)
	at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307)
	at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896)
	at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888)
	- locked <0x00001000158237d8> (a java.lang.Object)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159)
	- locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287)
	- locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

### Modification

- Sort the consumer list only if the new consumer with high priority than the last element in the consumer list,
  this can avoid the sort operation for all the consumers without priority level (the client-side always pass 0 if priority level absent).

(cherry picked from commit 291fedc)
@congbobo184 congbobo184 added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label Nov 10, 2022
congbobo184 pushed a commit that referenced this pull request Nov 26, 2022
…16243)

### Motivation

While create many consumers (> 10000), the IO thread run into BLOCK state for long time which will
affect the message publish and subsequent consumer creation.

```
"pulsar-io-15-24" #195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry  [0x0000700019642000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207)
	- waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

```
"pulsar-io-15-8" #157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable  [0x00007000171d5000]
   java.lang.Thread.State: RUNNABLE
	at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360)
	at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234)
	at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307)
	at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896)
	at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888)
	- locked <0x00001000158237d8> (a java.lang.Object)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159)
	- locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287)
	- locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

### Modification

- Sort the consumer list only if the new consumer with high priority than the last element in the consumer list,
  this can avoid the sort operation for all the consumers without priority level (the client-side always pass 0 if priority level absent).

(cherry picked from commit 291fedc)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/broker cherry-picked/branch-2.9 Archived: 2.9 is end of life cherry-picked/branch-2.10 doc-not-needed Your PR changes do not impact docs release/2.9.4 release/2.10.2 type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants