Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Broker] Direct memory leak when using replicated subscriptions and Key_Shared consumers #11383

Closed
lhotari opened this issue Jul 20, 2021 · 0 comments · Fixed by #11396
Closed
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

@lhotari
Copy link
Member

lhotari commented Jul 20, 2021

Describe the bug

The broker leaks direct memory when using replicated subscriptions and Key_Shared consumers. The Pulsar version 2.7.2 .

Some Netty leak detector logs were captured passing these JVM options to the broker in broker.configData.PULSAR_EXTRA_OPTS :

      -Dpulsar.allocator.leak_detection=Advanced
      -Dio.netty.leakDetectionLevel=advanced
      -Dio.netty.leakDetection.targetRecords=30
Full leak detector log entry (click to expand)
19:20:10.644 [pulsar-io-23-4] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
	io.netty.buffer.AbstractPooledDerivedByteBuf.deallocate(AbstractPooledDerivedByteBuf.java:86)
	io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
	io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
	org.apache.pulsar.common.protocol.ByteBufPair.deallocate(ByteBufPair.java:99)
	io.netty.util.AbstractReferenceCounted.handleRelease(AbstractReferenceCounted.java:86)
	io.netty.util.AbstractReferenceCounted.release(AbstractReferenceCounted.java:76)
	io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
	io.netty.util.ReferenceCountUtil.safeRelease(ReferenceCountUtil.java:113)
	org.apache.pulsar.client.impl.ProducerImpl.ackReceived(ProducerImpl.java:988)
	org.apache.pulsar.client.impl.ClientCnx.handleSendReceipt(ClientCnx.java:394)
	org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:249)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1534)
	io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1295)
	io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1332)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#2:
	io.netty.buffer.AbstractPooledDerivedByteBuf.deallocate(AbstractPooledDerivedByteBuf.java:86)
	io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
	io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
	org.apache.bookkeeper.mledger.impl.EntryImpl.deallocate(EntryImpl.java:163)
	org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted.release0(AbstractCASReferenceCounted.java:104)
	org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted.release(AbstractCASReferenceCounted.java:87)
	org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:155)
	org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:103)
	org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.java:215)
	org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:496)
	org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:150)
	org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#3:
	io.netty.buffer.AdvancedLeakAwareByteBuf.retainedDuplicate(AdvancedLeakAwareByteBuf.java:100)
	org.apache.bookkeeper.mledger.impl.EntryImpl.create(EntryImpl.java:97)
	org.apache.bookkeeper.mledger.impl.EntryCacheImpl.asyncReadEntry0(EntryCacheImpl.java:280)
	org.apache.bookkeeper.mledger.impl.EntryCacheImpl.asyncReadEntry(EntryCacheImpl.java:249)
	org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntry(ManagedLedgerImpl.java:1757)
	org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalReadFromLedger(ManagedLedgerImpl.java:1729)
	org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntries(ManagedLedgerImpl.java:1552)
	org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.notifyEntriesAvailable(ManagedCursorImpl.java:2624)
	org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#4:
	io.netty.buffer.AbstractPooledDerivedByteBuf.deallocate(AbstractPooledDerivedByteBuf.java:86)
	io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
	io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
	org.apache.bookkeeper.util.ByteBufList.deallocate(ByteBufList.java:262)
	io.netty.util.AbstractReferenceCounted.handleRelease(AbstractReferenceCounted.java:86)
	io.netty.util.AbstractReferenceCounted.release(AbstractReferenceCounted.java:76)
	io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
	org.apache.bookkeeper.client.PendingAddOp.maybeRecycle(PendingAddOp.java:493)
	org.apache.bookkeeper.client.PendingAddOp.submitCallback(PendingAddOp.java:434)
	org.apache.bookkeeper.client.LedgerHandle.sendAddSuccessCallbacks(LedgerHandle.java:1832)
	org.apache.bookkeeper.client.PendingAddOp.sendAddSuccessCallbacks(PendingAddOp.java:415)
	org.apache.bookkeeper.client.PendingAddOp.writeComplete(PendingAddOp.java:409)
	org.apache.bookkeeper.proto.PerChannelBookieClient$AddCompletion.writeComplete(PerChannelBookieClient.java:2149)
	org.apache.bookkeeper.proto.PerChannelBookieClient$AddCompletion.handleResponse(PerChannelBookieClient.java:2206)
	org.apache.bookkeeper.proto.PerChannelBookieClient$AddCompletion.handleV2Response(PerChannelBookieClient.java:2185)
	org.apache.bookkeeper.proto.PerChannelBookieClient$ReadV2ResponseCallback.safeRun(PerChannelBookieClient.java:1380)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#5:
	org.apache.bookkeeper.mledger.impl.OpAddEntry.safeRun(OpAddEntry.java:179)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#6:
	org.apache.bookkeeper.mledger.impl.EntryImpl.deallocate(EntryImpl.java:163)
	org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted.release0(AbstractCASReferenceCounted.java:104)
	org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted.release(AbstractCASReferenceCounted.java:87)
	org.apache.bookkeeper.mledger.impl.OpAddEntry.safeRun(OpAddEntry.java:175)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#7:
	org.apache.bookkeeper.mledger.impl.EntryCacheImpl.insert(EntryCacheImpl.java:113)
	org.apache.bookkeeper.mledger.impl.OpAddEntry.safeRun(OpAddEntry.java:174)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#8:
	org.apache.bookkeeper.mledger.impl.EntryImpl.create(EntryImpl.java:87)
	org.apache.bookkeeper.mledger.impl.EntryCacheImpl.insert(EntryCacheImpl.java:112)
	org.apache.bookkeeper.mledger.impl.OpAddEntry.safeRun(OpAddEntry.java:174)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#9:
	org.apache.bookkeeper.mledger.impl.EntryCacheImpl.insert(EntryCacheImpl.java:108)
	org.apache.bookkeeper.mledger.impl.OpAddEntry.safeRun(OpAddEntry.java:174)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#10:
	org.apache.bookkeeper.mledger.impl.EntryImpl.create(EntryImpl.java:76)
	org.apache.bookkeeper.mledger.impl.OpAddEntry.safeRun(OpAddEntry.java:171)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#11:
	io.netty.buffer.AdvancedLeakAwareByteBuf.retainedDuplicate(AdvancedLeakAwareByteBuf.java:100)
	org.apache.bookkeeper.mledger.impl.OpAddEntry.initiate(OpAddEntry.java:106)
	org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalAsyncAddEntry(ManagedLedgerImpl.java:700)
	org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncAddEntry$3(ManagedLedgerImpl.java:633)
	org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
	org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#12:
	org.apache.bookkeeper.mledger.impl.OpAddEntry.create(OpAddEntry.java:80)
	org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncAddEntry(ManagedLedgerImpl.java:630)
	org.apache.pulsar.broker.service.persistent.PersistentTopic.publishMessage(PersistentTopic.java:360)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.writeMarker(ReplicatedSubscriptionsController.java:231)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:86)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#13:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeBytes(AdvancedLeakAwareByteBuf.java:592)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1968)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#14:
	io.netty.buffer.AdvancedLeakAwareByteBuf.setInt(AdvancedLeakAwareByteBuf.java:304)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1965)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#15:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint32(ByteBufCodedOutputStream.java:94)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeInt32NoTag(ByteBufCodedOutputStream.java:220)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeInt32(ByteBufCodedOutputStream.java:111)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4393)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1951)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#16:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint32(ByteBufCodedOutputStream.java:94)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeTag(ByteBufCodedOutputStream.java:105)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeInt32(ByteBufCodedOutputStream.java:110)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4393)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1951)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#17:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint32(ByteBufCodedOutputStream.java:97)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeTag(ByteBufCodedOutputStream.java:105)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeInt32(ByteBufCodedOutputStream.java:110)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4393)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1951)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#18:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint64(ByteBufCodedOutputStream.java:149)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeUInt64NoTag(ByteBufCodedOutputStream.java:142)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeUInt64(ByteBufCodedOutputStream.java:121)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4345)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1951)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#19:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint64(ByteBufCodedOutputStream.java:152)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeUInt64NoTag(ByteBufCodedOutputStream.java:142)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeUInt64(ByteBufCodedOutputStream.java:121)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4345)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1951)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#20:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint32(ByteBufCodedOutputStream.java:94)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeTag(ByteBufCodedOutputStream.java:105)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeUInt64(ByteBufCodedOutputStream.java:120)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4345)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1951)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#21:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint64(ByteBufCodedOutputStream.java:149)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeUInt64NoTag(ByteBufCodedOutputStream.java:142)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeUInt64(ByteBufCodedOutputStream.java:121)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4342)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1951)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#22:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint32(ByteBufCodedOutputStream.java:94)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeTag(ByteBufCodedOutputStream.java:105)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeUInt64(ByteBufCodedOutputStream.java:120)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4342)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1951)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#23:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeBytes(AdvancedLeakAwareByteBuf.java:616)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawBytes(ByteBufCodedOutputStream.java:182)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeBytesNoTag(ByteBufCodedOutputStream.java:167)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeBytes(ByteBufCodedOutputStream.java:161)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4339)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1951)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#24:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint32(ByteBufCodedOutputStream.java:94)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeBytesNoTag(ByteBufCodedOutputStream.java:166)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeBytes(ByteBufCodedOutputStream.java:161)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4339)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1951)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#25:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:544)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawByte(ByteBufCodedOutputStream.java:85)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeRawVarint32(ByteBufCodedOutputStream.java:94)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeTag(ByteBufCodedOutputStream.java:105)
	org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.writeBytes(ByteBufCodedOutputStream.java:160)
	org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata.writeTo(PulsarApi.java:4339)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1951)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#26:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeInt(AdvancedLeakAwareByteBuf.java:562)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1950)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
#27:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeShort(AdvancedLeakAwareByteBuf.java:550)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1943)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
Created at:
	io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:385)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
	org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164)
	org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.buffer(ByteBufAllocatorImpl.java:135)
	org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload(Commands.java:1937)
	org.apache.pulsar.common.protocol.Markers.newMessage(Markers.java:57)
	org.apache.pulsar.common.protocol.Markers.newReplicatedSubscriptionsSnapshotRequest(Markers.java:90)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder.start(ReplicatedSubscriptionsSnapshotBuilder.java:87)
	org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController.startNewSnapshot(ReplicatedSubscriptionsController.java:206)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)
: 4 leak records were discarded because they were duplicates
: 2 leak records were discarded because the leak record count is targeted to 30. Use system property io.netty.leakDetection.targetRecords to increase the limit.

Expected behavior

Netty buffers should be released so that direct memory doesn't leak.

@lhotari lhotari added the type/bug The PR fixed a bug or issue reported a bug label Jul 20, 2021
@lhotari lhotari changed the title [Broker] Direct memory leak when using replicated subscriptions [Broker] Direct memory leak when using replicated subscriptions and Key_Shared consumers Jul 20, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant