This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[BUG] Kafka producer client cannot connect to KoP (Removing node xxxx:9092 (id: 2057312963 rack: null) from least loaded node selection since it is neither ready for sending or connecting) #618

Closed
xiaotongwang1 opened this issue Jul 22, 2021 · 3 comments · Fixed by #620

xiaotongwang1 commented Jul 22, 2021

"pulsar-io-26-32" #287 prio=5 os_prio=0 tid=0x00007f10e8767000 nid=0x4ae waiting on condition [0x00007f10c95ba000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b910ae0> (a org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section)
	at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
	at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.remove(ConcurrentLongHashMap.java:340)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.access$300(ConcurrentLongHashMap.java:194)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap.remove(ConcurrentLongHashMap.java:141)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.deleteOneExpiredCursor(KafkaTopicConsumerManager.java:91)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.lambda$deleteExpiredCursor$0(KafkaTopicConsumerManager.java:76)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager$$Lambda$1120/2085128569.accept(Unknown Source)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.forEach(ConcurrentLongHashMap.java:431)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap.forEach(ConcurrentLongHashMap.java:164)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.deleteExpiredCursor(KafkaTopicConsumerManager.java:74)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicManager.lambda$null$0(KafkaTopicManager.java:100)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicManager$$Lambda$1119/1991303689.accept(Unknown Source)
	at java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4707)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicManager.lambda$new$1(KafkaTopicManager.java:98)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicManager$$Lambda$893/144794386.run(Unknown Source)
	at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
	at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:176)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)


"pulsar-io-26-31" #286 prio=5 os_prio=0 tid=0x00007f10e8445000 nid=0x4ad waiting on condition [0x00007f10c96bb000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b910ae0> (a org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section)
	at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
	at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.remove(ConcurrentLongHashMap.java:340)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.access$300(ConcurrentLongHashMap.java:194)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap.remove(ConcurrentLongHashMap.java:141)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.remove(KafkaTopicConsumerManager.java:142)
	at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$null$2(MessageFetchContext.java:146)
	at io.streamnative.pulsar.handlers.kop.MessageFetchContext$$Lambda$945/1999044484.apply(Unknown Source)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
	at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$handleFetch$4(MessageFetchContext.java:167)
	at io.streamnative.pulsar.handlers.kop.MessageFetchContext$$Lambda$944/1214167246.accept(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792)
	at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153)
	at io.streamnative.pulsar.handlers.kop.MessageFetchContext.handleFetch(MessageFetchContext.java:109)
	at io.streamnative.pulsar.handlers.kop.KafkaRequestHandler.handleFetchRequest(KafkaRequestHandler.java:966)
	at io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.channelRead(KafkaCommandDecoder.java:203)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

"pulsar-io-26-30" #285 prio=5 os_prio=0 tid=0x00007f10e8443800 nid=0x4ac waiting on condition [0x00007f10c97bd000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b9108a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:872)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1201)
	at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.close(KafkaTopicConsumerManager.java:244)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicManager.close(KafkaTopicManager.java:336)
	- locked <0x000000052e940220> (a io.streamnative.pulsar.handlers.kop.KafkaTopicManager)
	at io.streamnative.pulsar.handlers.kop.KafkaRequestHandler.close(KafkaRequestHandler.java:212)
	at io.streamnative.pulsar.handlers.kop.KafkaRequestHandler.channelInactive(KafkaRequestHandler.java:202)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:389)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
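
One possible reading of the first trace above: the same thread is inside `ConcurrentLongHashMap$Section.forEach` and, from the callback, calls `ConcurrentLongHashMap$Section.remove`, which asks for the `StampedLock` write lock. `StampedLock` is not reentrant, so if the iteration is holding the read lock on that section at that moment, the nested `writeLock()` can never be granted. The dump alone does not show who holds the lock, so treat this as an assumption. Below is a minimal sketch of that pattern using only `java.util.concurrent.locks.StampedLock`; the class and method names are hypothetical and are not KoP or Pulsar code. It uses the non-blocking `tryWriteLock()` so the demo returns instead of parking the way the threads in the dump do.

```java
import java.util.concurrent.locks.StampedLock;

// Hypothetical demo class; it only mimics the locking pattern, not the real map.
public class StampedLockReentrancyDemo {

    // Stands in for the lock guarding one map section.
    private final StampedLock lock = new StampedLock();

    /**
     * Mimics "remove from inside forEach": the read lock is held while the
     * same thread asks for the write lock. Returns true if the nested
     * write-lock attempt succeeded.
     */
    public boolean removeDuringIteration() {
        long readStamp = lock.readLock();
        try {
            // tryWriteLock() returns 0 immediately when the lock is unavailable.
            // A blocking writeLock() here would park this thread indefinitely,
            // because StampedLock does not let a reader re-enter as a writer.
            long writeStamp = lock.tryWriteLock();
            if (writeStamp == 0L) {
                return false;
            }
            lock.unlockWrite(writeStamp);
            return true;
        } finally {
            lock.unlockRead(readStamp);
        }
    }

    /** Mimics "remove after the iteration has finished": no read lock is held. */
    public boolean removeOutsideIteration() {
        long writeStamp = lock.tryWriteLock();
        if (writeStamp == 0L) {
            return false;
        }
        lock.unlockWrite(writeStamp);
        return true;
    }

    public static void main(String[] args) {
        StampedLockReentrancyDemo demo = new StampedLockReentrancyDemo();
        System.out.println("write lock inside read section: " + demo.removeDuringIteration());
        System.out.println("write lock after read section:  " + demo.removeOutsideIteration());
    }
}
```

If this reading is right, a common way to avoid the pattern is to collect the keys to delete during the iteration and remove them only after `forEach` returns.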

xiaotongwang1 commented Jul 22, 2021

cat txt|grep -A 10 pulsar-io
"pulsar-io-26-32" #287 prio=5 os_prio=0 tid=0x00007f10e8767000 nid=0x4ae waiting on condition [0x00007f10c95ba000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b910ae0> (a org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section)
	at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
	at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.remove(ConcurrentLongHashMap.java:340)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.access$300(ConcurrentLongHashMap.java:194)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap.remove(ConcurrentLongHashMap.java:141)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.deleteOneExpiredCursor(KafkaTopicConsumerManager.java:91)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.lambda$deleteExpiredCursor$0(KafkaTopicConsumerManager.java:76)
--
"pulsar-io-26-31" #286 prio=5 os_prio=0 tid=0x00007f10e8445000 nid=0x4ad waiting on condition [0x00007f10c96bb000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b910ae0> (a org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section)
	at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
	at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.remove(ConcurrentLongHashMap.java:340)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.access$300(ConcurrentLongHashMap.java:194)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap.remove(ConcurrentLongHashMap.java:141)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.remove(KafkaTopicConsumerManager.java:142)
	at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$null$2(MessageFetchContext.java:146)
--
"pulsar-io-26-30" #285 prio=5 os_prio=0 tid=0x00007f10e8443800 nid=0x4ac waiting on condition [0x00007f10c97bd000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b9108a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:872)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1201)
	at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.close(KafkaTopicConsumerManager.java:244)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicManager.close(KafkaTopicManager.java:336)
--
"pulsar-io-26-29" #284 prio=5 os_prio=0 tid=0x00007f10ec615800 nid=0x4ab waiting on condition [0x00007f10c98bd000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b9108a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:969)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1285)
	at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.deleteOneExpiredCursor(KafkaTopicConsumerManager.java:85)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.lambda$deleteExpiredCursor$0(KafkaTopicConsumerManager.java:76)
--
"pulsar-io-26-28" #283 prio=5 os_prio=0 tid=0x00007f10ec610800 nid=0x4aa waiting on condition [0x00007f10c99bf000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b9108a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:872)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1201)
	at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.close(KafkaTopicConsumerManager.java:244)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicManager.close(KafkaTopicManager.java:336)
--
"pulsar-io-26-27" #282 prio=5 os_prio=0 tid=0x00007f10ecad9800 nid=0x4a9 waiting on condition [0x00007f10c9abf000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b9108a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:969)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1285)
	at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.deleteOneExpiredCursor(KafkaTopicConsumerManager.java:85)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.lambda$deleteExpiredCursor$0(KafkaTopicConsumerManager.java:76)
--
"pulsar-io-26-26" #281 prio=5 os_prio=0 tid=0x00007f10ecad7000 nid=0x4a8 waiting on condition [0x00007f10c9bc1000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b9108a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:872)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1201)
	at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.close(KafkaTopicConsumerManager.java:244)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicManager.close(KafkaTopicManager.java:336)
--
"pulsar-io-26-25" #280 prio=5 os_prio=0 tid=0x00007f10ecad5000 nid=0x4a7 waiting on condition [0x00007f10c9cc1000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b9108a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:969)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1285)
	at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.remove(KafkaTopicConsumerManager.java:136)
	at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$null$2(MessageFetchContext.java:146)
--
"pulsar-io-26-24" #279 prio=5 os_prio=0 tid=0x0000560e1d726000 nid=0x4a3 waiting on condition [0x00007f10cbdc4000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b9108a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:969)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1285)
	at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.deleteOneExpiredCursor(KafkaTopicConsumerManager.java:85)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.lambda$deleteExpiredCursor$0(KafkaTopicConsumerManager.java:76)
--
"pulsar-io-26-23" #278 prio=5 os_prio=0 tid=0x00007f10e855c800 nid=0x4a2 waiting on condition [0x00007f10cbec6000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b9108a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:872)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1201)
	at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.close(KafkaTopicConsumerManager.java:244)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicManager.close(KafkaTopicManager.java:336)
--
"pulsar-io-26-22" #277 prio=5 os_prio=0 tid=0x00007f10e855a800 nid=0x4a1 waiting on condition [0x00007f10cbfc6000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b9108a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:969)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1285)
	at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.deleteOneExpiredCursor(KafkaTopicConsumerManager.java:85)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.lambda$deleteExpiredCursor$0(KafkaTopicConsumerManager.java:76)
--
"pulsar-io-26-21" #276 prio=5 os_prio=0 tid=0x00007f10e8559000 nid=0x4a0 waiting on condition [0x00007f10cc0c7000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b9108a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:872)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1201)
	at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.close(KafkaTopicConsumerManager.java:244)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicManager.close(KafkaTopicManager.java:336)
--
"pulsar-io-26-20" #275 prio=5 os_prio=0 tid=0x00007f10e8557800 nid=0x49f waiting on condition [0x00007f10cc1c8000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b9108a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:969)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1285)
	at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.deleteOneExpiredCursor(KafkaTopicConsumerManager.java:85)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.lambda$deleteExpiredCursor$0(KafkaTopicConsumerManager.java:76)
--
"pulsar-io-26-19" #274 prio=5 os_prio=0 tid=0x00007f10e8cc8800 nid=0x49e waiting on condition [0x00007f10cc2c9000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b9108a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:872)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1201)
	at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.close(KafkaTopicConsumerManager.java:244)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicManager.close(KafkaTopicManager.java:336)
--
"pulsar-io-26-18" #273 prio=5 os_prio=0 tid=0x00007f10e8cc7000 nid=0x49d waiting on condition [0x00007f10cc3ca000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b9108a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:969)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1285)
	at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.deleteOneExpiredCursor(KafkaTopicConsumerManager.java:85)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.lambda$deleteExpiredCursor$0(KafkaTopicConsumerManager.java:76)
--
"pulsar-io-26-17" #272 prio=5 os_prio=0 tid=0x00007f10e803f800 nid=0x49c waiting on condition [0x00007f10cc4cb000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b9108a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:872)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1201)
	at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.close(KafkaTopicConsumerManager.java:244)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicManager.close(KafkaTopicManager.java:336)
--
"pulsar-io-26-16" #271 prio=5 os_prio=0 tid=0x00007f10e803d800 nid=0x49b waiting on condition [0x00007f10cc5cc000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b910ae0> (a org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section)
	at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
	at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.remove(ConcurrentLongHashMap.java:340)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.access$300(ConcurrentLongHashMap.java:194)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap.remove(ConcurrentLongHashMap.java:141)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.deleteOneExpiredCursor(KafkaTopicConsumerManager.java:91)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.lambda$deleteExpiredCursor$0(KafkaTopicConsumerManager.java:76)
--
"pulsar-io-26-15" #270 prio=5 os_prio=0 tid=0x00007f10ec658000 nid=0x49a waiting on condition [0x00007f10cc6cd000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b9108a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:872)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1201)
	at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.close(KafkaTopicConsumerManager.java:244)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicManager.close(KafkaTopicManager.java:336)
--
"pulsar-io-26-14" #269 prio=5 os_prio=0 tid=0x00007f10ec082000 nid=0x499 waiting on condition [0x00007f10cc7ce000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b910ae0> (a org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section)
	at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
	at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.remove(ConcurrentLongHashMap.java:340)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.access$300(ConcurrentLongHashMap.java:194)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap.remove(ConcurrentLongHashMap.java:141)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.deleteOneExpiredCursor(KafkaTopicConsumerManager.java:91)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.lambda$deleteExpiredCursor$0(KafkaTopicConsumerManager.java:76)
--
"pulsar-io-26-13" #268 prio=5 os_prio=0 tid=0x00007f10ec081000 nid=0x498 waiting on condition [0x00007f10cc8d0000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b9108a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:872)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1201)
	at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.close(KafkaTopicConsumerManager.java:244)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicManager.close(KafkaTopicManager.java:336)
--
"pulsar-io-26-12" #267 prio=5 os_prio=0 tid=0x00007f10ecaa2800 nid=0x497 waiting on condition [0x00007f10cc9d0000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b910ae0> (a org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section)
	at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
	at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.remove(ConcurrentLongHashMap.java:340)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.access$300(ConcurrentLongHashMap.java:194)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap.remove(ConcurrentLongHashMap.java:141)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.deleteOneExpiredCursor(KafkaTopicConsumerManager.java:91)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.lambda$deleteExpiredCursor$0(KafkaTopicConsumerManager.java:76)
--
"pulsar-io-26-11" #266 prio=5 os_prio=0 tid=0x0000560e1cc83800 nid=0x496 waiting on condition [0x00007f10ccad1000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b910ae0> (a org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section)
	at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
	at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.remove(ConcurrentLongHashMap.java:340)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.access$300(ConcurrentLongHashMap.java:194)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap.remove(ConcurrentLongHashMap.java:141)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.remove(KafkaTopicConsumerManager.java:142)
	at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$null$2(MessageFetchContext.java:146)
--
"pulsar-io-26-10" #265 prio=5 os_prio=0 tid=0x0000560e1cc81800 nid=0x495 waiting on condition [0x00007f10ccbd2000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b910ae0> (a org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section)
	at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
	at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.remove(ConcurrentLongHashMap.java:340)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.access$300(ConcurrentLongHashMap.java:194)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap.remove(ConcurrentLongHashMap.java:141)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.deleteOneExpiredCursor(KafkaTopicConsumerManager.java:91)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.lambda$deleteExpiredCursor$0(KafkaTopicConsumerManager.java:76)
--
"pulsar-io-26-9" #264 prio=5 os_prio=0 tid=0x0000560e1cc7f800 nid=0x494 waiting on condition [0x00007f10cccd3000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b910ae0> (a org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section)
	at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
	at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.remove(ConcurrentLongHashMap.java:340)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.access$300(ConcurrentLongHashMap.java:194)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap.remove(ConcurrentLongHashMap.java:141)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.remove(KafkaTopicConsumerManager.java:142)
	at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$null$2(MessageFetchContext.java:146)
--
"pulsar-io-26-8" #263 prio=5 os_prio=0 tid=0x0000560e1cc7d000 nid=0x490 waiting on condition [0x00007f10ccdd4000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b910ae0> (a org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section)
	at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
	at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.remove(ConcurrentLongHashMap.java:340)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.access$300(ConcurrentLongHashMap.java:194)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap.remove(ConcurrentLongHashMap.java:141)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.deleteOneExpiredCursor(KafkaTopicConsumerManager.java:91)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.lambda$deleteExpiredCursor$0(KafkaTopicConsumerManager.java:76)
--
"pulsar-io-26-7" #262 prio=5 os_prio=0 tid=0x0000560e1cc25000 nid=0x48e waiting on condition [0x00007f10cced5000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b910ae0> (a org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section)
	at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
	at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.remove(ConcurrentLongHashMap.java:340)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.access$300(ConcurrentLongHashMap.java:194)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap.remove(ConcurrentLongHashMap.java:141)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.remove(KafkaTopicConsumerManager.java:142)
	at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$null$2(MessageFetchContext.java:146)
--
"pulsar-io-26-6" #261 prio=5 os_prio=0 tid=0x0000560e1cc23000 nid=0x48d waiting on condition [0x00007f10ccfd6000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b910ae0> (a org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section)
	at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
	at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.remove(ConcurrentLongHashMap.java:340)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.access$300(ConcurrentLongHashMap.java:194)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap.remove(ConcurrentLongHashMap.java:141)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.deleteOneExpiredCursor(KafkaTopicConsumerManager.java:91)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.lambda$deleteExpiredCursor$0(KafkaTopicConsumerManager.java:76)
--
"pulsar-io-26-5" #260 prio=5 os_prio=0 tid=0x0000560e1cc21000 nid=0x48c waiting on condition [0x00007f10cd0d7000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b910ae0> (a org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section)
	at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
	at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.remove(ConcurrentLongHashMap.java:340)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.access$300(ConcurrentLongHashMap.java:194)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap.remove(ConcurrentLongHashMap.java:141)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.remove(KafkaTopicConsumerManager.java:142)
	at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$null$2(MessageFetchContext.java:146)
--
"pulsar-io-26-4" #259 prio=5 os_prio=0 tid=0x0000560e1cd82800 nid=0x48b waiting on condition [0x00007f10cd1d8000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b910ae0> (a org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section)
	at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
	at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.remove(ConcurrentLongHashMap.java:340)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.access$300(ConcurrentLongHashMap.java:194)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap.remove(ConcurrentLongHashMap.java:141)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.deleteOneExpiredCursor(KafkaTopicConsumerManager.java:91)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.lambda$deleteExpiredCursor$0(KafkaTopicConsumerManager.java:76)
--
"pulsar-io-26-3" #258 prio=5 os_prio=0 tid=0x0000560e1d439800 nid=0x485 waiting on condition [0x00007f10ce2da000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b9108a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:872)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1201)
	at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.close(KafkaTopicConsumerManager.java:244)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicManager.close(KafkaTopicManager.java:336)
--
"pulsar-io-26-2" #257 prio=5 os_prio=0 tid=0x0000560e1d0b1000 nid=0x484 waiting on condition [0x00007f10cf3dc000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b910ae0> (a org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section)
	at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
	at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.remove(ConcurrentLongHashMap.java:340)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap$Section.access$300(ConcurrentLongHashMap.java:194)
	at org.apache.pulsar.common.util.collections.ConcurrentLongHashMap.remove(ConcurrentLongHashMap.java:141)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.deleteOneExpiredCursor(KafkaTopicConsumerManager.java:91)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.lambda$deleteExpiredCursor$0(KafkaTopicConsumerManager.java:76)
--
"pulsar-io-26-1" #256 prio=5 os_prio=0 tid=0x0000560e1cdbe800 nid=0x47d waiting on condition [0x00007f10d16e0000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000052b9108a0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:872)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1201)
	at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager.close(KafkaTopicConsumerManager.java:244)
	at io.streamnative.pulsar.handlers.kop.KafkaTopicManager.close(KafkaTopicManager.java:336)

@xiaotongwang1
Author

KoP version: 2.7.2.4 + patch: https://github.com/streamnative/kop/pull/586/files

@BewareMyPower
Collaborator

There are two deadlock cases:

  1. Some threads are stuck in ConcurrentLongHashMap.remove.
  2. Some threads are stuck acquiring the read-write lock of KafkaTopicConsumerManager.

However, the second deadlock is caused by the first. For example:

public Pair<ManagedCursor, Long> remove(long offset) {
    Pair<ManagedCursor, Long> cursor;
    // should not return cursor for Fetch to read, since this tcm already in closing state.
    rwLock.readLock().lock();
    try {
        if (closed) {
            return null;
        }
        cursor = consumers.remove(offset);
        lastAccessTimes.remove(offset);

Once consumers.remove(offset) blocks, the current thread never releases the read lock, so the close() method gets stuck acquiring the write lock.
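To illustrate the dependency between the two locks, here is a minimal standalone sketch. It is not KoP code: `ReadLockBlocksClose` and `closeCanAcquireWriteLock` are hypothetical names, and a `CountDownLatch` stands in for the blocked `consumers.remove(offset)` call. It only shows that a writer cannot get a `ReentrantReadWriteLock` while some reader is parked with the read lock held.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadLockBlocksClose {
    /**
     * Returns true if a "close" caller could take the write lock within
     * waitMillis while another thread holds the read lock and is stuck.
     */
    static boolean closeCanAcquireWriteLock(long waitMillis) {
        ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
        CountDownLatch readLockHeld = new CountDownLatch(1);
        // Stand-in for the blocked consumers.remove(offset) call (assumption).
        CountDownLatch stuckCall = new CountDownLatch(1);

        Thread reader = new Thread(() -> {
            rwLock.readLock().lock();
            try {
                readLockHeld.countDown();
                stuckCall.await(); // parked while still holding the read lock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                rwLock.readLock().unlock();
            }
        });
        reader.start();

        try {
            readLockHeld.await();
            // close() would call writeLock().lock() and wait forever;
            // the timed variant is used here so the demo terminates.
            boolean acquired = rwLock.writeLock().tryLock(waitMillis, TimeUnit.MILLISECONDS);
            if (acquired) {
                rwLock.writeLock().unlock();
            }
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            stuckCall.countDown(); // release the demo reader
            try {
                reader.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        // prints "close() acquired write lock: false"
        System.out.println("close() acquired write lock: " + closeCanAcquireWriteLock(200));
    }
}
```

With an untimed `lock()` call, as in the `close()` frames of the thread dump, the writer would simply stay parked for as long as the reader is stuck.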

For the first deadlock, consumers.remove() is called in two cases:

  1. MessageFetchContext looks up the cursor for each partition, which calls KafkaTopicConsumerManager#remove.
  2. The periodic task that deletes expired cursors, scheduled in KafkaTopicManager's constructor. Each time a client connection is established, a KafkaTopicManager instance is created and a task to delete expired cursors is scheduled at a fixed interval of 1 minute.

    this.cursorExpireTask = brokerService.executor().scheduleWithFixedDelay(() -> {
        long current = System.currentTimeMillis();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Schedule a check of expired cursor",
                    requestHandler.ctx.channel());
        }
        consumerTopicManagers.values().forEach(future -> {
            if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
                future.join().deleteExpiredCursor(current, expirePeriodMillis);
            }
        });
    }, checkPeriodMillis, checkPeriodMillis, TimeUnit.MILLISECONDS);
}

Let's use thread N to represent pulsar-io-26-N.

Threads stuck in ConcurrentLongHashMap.remove are:

  • MessageFetchContext: 5, 7, 9, 11, 31
  • cursorExpireTask: 2, 4, 6, 8, 10, 12, 14, 16, 32

Threads stuck acquiring the read-write lock of KafkaTopicConsumerManager are:

  • Write lock: 1, 3, 13, 15, 17, 19, 21, 23, 26, 28
  • Read lock: 18, 20, 22, 24, 25, 27, 29

There's one deadlock fix in apache/pulsar#9787. Since KoP 2.7.x.y depends on Pulsar 2.7.x, KoP 2.7.x.y doesn't contain the fix. For branch-2.7, we can use ConcurrentHashMap<Long, V> from the Java standard library to replace ConcurrentLongHashMap<V> from pulsar-common here to avoid the deadlock.
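As a rough sketch of what the replacement could look like (not the actual patch), the class below keeps cursors and last access times in two `ConcurrentHashMap<Long, V>` instances. `ExpiringCursors` is a hypothetical name, and a plain `String` stands in for `Pair<ManagedCursor, Long>`. The point it shows is that `ConcurrentHashMap` allows entries to be removed from inside a `forEach` traversal, which is the call pattern `deleteExpiredCursor` uses in the stack traces.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ExpiringCursors {
    // offset -> cursor; a String stands in for Pair<ManagedCursor, Long> (assumption)
    private final Map<Long, String> consumers = new ConcurrentHashMap<>();
    // offset -> last access time in milliseconds
    private final Map<Long, Long> lastAccessTimes = new ConcurrentHashMap<>();

    void add(long offset, String cursor, long nowMillis) {
        consumers.put(offset, cursor);
        lastAccessTimes.put(offset, nowMillis);
    }

    /** Removes and returns the cursor at the given offset, or null if absent. */
    String remove(long offset) {
        lastAccessTimes.remove(offset);
        return consumers.remove(offset);
    }

    /** Removes every cursor not accessed within expirePeriodMillis; returns how many were removed. */
    int deleteExpiredCursor(long current, long expirePeriodMillis) {
        int[] removed = {0};
        // ConcurrentHashMap traversal is weakly consistent, so removing
        // entries from inside forEach is permitted.
        lastAccessTimes.forEach((offset, lastAccess) -> {
            if (current - lastAccess > expirePeriodMillis && remove(offset) != null) {
                removed[0]++;
            }
        });
        return removed[0];
    }

    int size() {
        return consumers.size();
    }

    public static void main(String[] args) {
        ExpiringCursors cursors = new ExpiringCursors();
        cursors.add(0L, "cursor-0", 0L);
        cursors.add(10L, "cursor-10", 5_000L);
        int removed = cursors.deleteExpiredCursor(6_000L, 3_000L);
        // prints "removed=1 remaining=1"
        System.out.println("removed=" + removed + " remaining=" + cursors.size());
    }
}
```

The trade-off is boxing of `long` keys, which `ConcurrentLongHashMap` was designed to avoid.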

For the deadlock in KafkaTopicConsumerManager, we should use an atomic boolean instead of the read-write lock.
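A minimal sketch of that idea, assuming a simplified manager (`ClosableCursorManager` is a hypothetical name and a `String` again stands in for the cursor pair): the closed state becomes an `AtomicBoolean`, so neither `remove()` nor `close()` ever waits on a lock.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

class ClosableCursorManager {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    // offset -> cursor; a String stands in for Pair<ManagedCursor, Long> (assumption)
    private final Map<Long, String> consumers = new ConcurrentHashMap<>();

    void add(long offset, String cursor) {
        if (!closed.get()) {
            consumers.put(offset, cursor);
        }
    }

    /** Returns null once the manager is closed, mirroring the `if (closed) return null` check. */
    String remove(long offset) {
        if (closed.get()) {
            return null;
        }
        return consumers.remove(offset);
    }

    /** Only the first caller performs the cleanup; returns whether this call did the close. */
    boolean close() {
        if (!closed.compareAndSet(false, true)) {
            return false;
        }
        consumers.clear();
        return true;
    }

    public static void main(String[] args) {
        ClosableCursorManager tcm = new ClosableCursorManager();
        tcm.add(1L, "cursor-1");
        System.out.println(tcm.close());    // prints "true"
        System.out.println(tcm.close());    // prints "false"
        System.out.println(tcm.remove(1L)); // prints "null"
    }
}
```

Unlike the read-write lock, this gives no mutual exclusion between `remove()` and `close()`: a `remove()` that has already passed the `closed` check can still run concurrently with the cleanup, so the cleanup has to tolerate that.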

Furthermore, we should avoid creating a cursorExpireTask for each KafkaTopicManager, because consumerTopicManagers is currently global while a KafkaTopicManager is created for each connection.
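One way to run the check only once, sketched under the assumption that a static holder is acceptable (`SharedExpireTask`, `startOnce` and `stop` are hypothetical names, not KoP APIs): every connection calls `startOnce`, but only the first call actually schedules the task.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class SharedExpireTask {
    // Single task shared by all connections; guarded by the class monitor.
    private static ScheduledFuture<?> task;

    /** Schedules the check only if none is registered yet; returns true if this call scheduled it. */
    static synchronized boolean startOnce(ScheduledExecutorService executor,
                                          Runnable check, long periodMillis) {
        if (task != null) {
            return false;
        }
        task = executor.scheduleWithFixedDelay(check, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return true;
    }

    /** Cancels the shared task, if any, so that it can be scheduled again later. */
    static synchronized void stop() {
        if (task != null) {
            task.cancel(false);
            task = null;
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        for (int connection = 1; connection <= 3; connection++) {
            boolean scheduled = startOnce(executor, () -> { }, 60_000L);
            // prints true for connection 1, false for connections 2 and 3
            System.out.println("connection " + connection + " scheduled task: " + scheduled);
        }
        stop();
        executor.shutdownNow();
    }
}
```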

BewareMyPower added a commit that referenced this issue Jul 23, 2021
Fixes #618 

### Motivation

See #618 (comment) for the deadlock analysis.

### Modifications
- Use `ConcurrentHashMap` instead of `ConcurrentLongHashMap`. Though this bug may already be fixed in apache/pulsar#9787, the `ConcurrentHashMap` from the Java standard library is more reliable. The possible performance enhancement brought by `ConcurrentLongHashMap` still needs to be proved.
- Use `AtomicBoolean` as `KafkaTopicConsumerManager`'s state instead of the read-write lock, so that `close()` no longer blocks while trying to acquire the write lock.
- Run a single cursor expire task instead of one task per channel. Since #404 changed `consumerTopicManagers` to a static field, there's no reason to run a task for each connection.
BewareMyPower added a commit to BewareMyPower/kop that referenced this issue Jul 25, 2021
BewareMyPower added a commit that referenced this issue Jul 25, 2021