Netty reports LEAK while using reactor (Rsocket) with Sleuth #2256

Closed
progys opened this issue Jan 25, 2023 · 5 comments · Fixed by #2262

progys commented Jan 25, 2023

Describe the bug
When using Spring Cloud Sleuth with RSocket, Netty reports memory leak errors. With Sleuth disabled, no leak errors are reported. The leak appears to originate around these frames:

org.springframework.cloud.sleuth.instrument.rsocket.TracingRequesterRSocketProxy.injectDefaultZipkinRSocketHeaders(TracingRequesterRSocketProxy.java:123)
org.springframework.cloud.sleuth.instrument.rsocket.TracingRequesterRSocketProxy.lambda$setSpan$3(TracingRequesterRSocketProxy.java:103)
       

A similar bug was reported and closed before: #2102
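
For readers unfamiliar with this kind of report, here is a minimal sketch of the ownership pattern that usually sits behind a `LEAK: ByteBuf.release() was not called` message: a buffer is allocated by an encoder, handed to a caller, and never released. It is an illustration only, not the Sleuth or RSocket code. `CountedBuf`, `encodeTraceIds`, `injectLeaky` and `injectAndRelease` are hypothetical names, and `CountedBuf` is a stdlib-only stand-in that models only the `refCnt()`/`release()` contract of a reference-counted buffer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a reference-counted buffer such as Netty's ByteBuf.
// It is NOT the Netty class; it only models the refCnt/release contract.
final class CountedBuf {
    private int refCnt = 1; // a freshly allocated buffer starts with one reference
    private final List<Long> data = new ArrayList<>();

    CountedBuf writeLong(long value) {
        if (refCnt == 0) {
            throw new IllegalStateException("buffer already released");
        }
        data.add(value);
        return this;
    }

    int refCnt() {
        return refCnt;
    }

    // Returns true when the count reaches zero, i.e. the memory could be reclaimed.
    boolean release() {
        if (refCnt == 0) {
            throw new IllegalStateException("buffer already released");
        }
        return --refCnt == 0;
    }
}

final class LeakSketch {
    // Hypothetical encoder: allocates a buffer and hands ownership to the caller.
    static CountedBuf encodeTraceIds(long traceId, long spanId) {
        return new CountedBuf().writeLong(traceId).writeLong(spanId);
    }

    // Leaky pattern: the encoded buffer is used, but nobody calls release().
    static CountedBuf injectLeaky(long traceId, long spanId) {
        CountedBuf metadata = encodeTraceIds(traceId, spanId);
        // ... contents would be copied into the outgoing payload here ...
        return metadata; // returned only so the caller can inspect refCnt
    }

    // Fixed pattern: release in finally once the contents have been copied.
    static CountedBuf injectAndRelease(long traceId, long spanId) {
        CountedBuf metadata = encodeTraceIds(traceId, spanId);
        try {
            // ... contents would be copied into the outgoing payload here ...
        } finally {
            metadata.release();
        }
        return metadata;
    }

    public static void main(String[] args) {
        System.out.println("leaky refCnt = " + injectLeaky(1L, 2L).refCnt());
        System.out.println("released refCnt = " + injectAndRelease(1L, 2L).refCnt());
    }
}
```

In the leaky variant the count stays at 1 after the call, which is the state a leak detector would flag once the object is garbage-collected. Whether the actual fix releases the buffer this way or avoids the intermediate buffer altogether is not stated in this report.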

Versions:

  • spring-cloud-sleuth: 3.1.6
  • spring-boot: 2.6.14
  • rsocket-core: 1.1.3

Details
Here are the more detailed logs from Netty:

2023-01-25 16:25:45.463 ERROR [,,] 19692 --- [actor-tcp-nio-2] io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
#1:
        io.netty.buffer.AdvancedLeakAwareByteBuf.writeLong(AdvancedLeakAwareByteBuf.java:569)
        io.rsocket.metadata.TracingMetadataCodec.encode(TracingMetadataCodec.java:110)
        io.rsocket.metadata.TracingMetadataCodec.encode64(TracingMetadataCodec.java:46)
        org.springframework.cloud.sleuth.instrument.rsocket.TracingRequesterRSocketProxy.injectDefaultZipkinRSocketHeaders(TracingRequesterRSocketProxy.java:123)
        org.springframework.cloud.sleuth.instrument.rsocket.TracingRequesterRSocketProxy.lambda$setSpan$3(TracingRequesterRSocketProxy.java:103)
        reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:47)
        io.rsocket.core.DefaultRSocketClient$FlattingInner.accept(DefaultRSocketClient.java:433)
        io.rsocket.core.DefaultRSocketClient$FlattingInner.accept(DefaultRSocketClient.java:366)
        io.rsocket.core.ResolvingOperator.complete(ResolvingOperator.java:260)
        io.rsocket.core.DefaultRSocketClient.onComplete(DefaultRSocketClient.java:146)
        reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1817)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
        reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.request(FluxDoFinally.java:157)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:238)
        reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onSubscribe(FluxDoFinally.java:124)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
        reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
        reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:165)
        io.rsocket.core.DefaultClientSetup.lambda$null$0(ClientSetup.java:20)
        reactor.core.publisher.MonoCreate$DefaultMonoSink.onRequest(MonoCreate.java:214)
        io.rsocket.core.DefaultClientSetup.lambda$init$1(ClientSetup.java:20)
        reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
        reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
        reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
        reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
        reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
        reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:354)
        reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
        reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:238)
        reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)
        reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152)
        reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:263)
        reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
        reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
        reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
        reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)
        reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174)
        reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:165)
        reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:414)
        reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:677)
        reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onStateChange(DefaultPooledConnectionProvider.java:184)
        reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onStateChange(DefaultPooledConnectionProvider.java:440)
        reactor.netty.http.client.WebsocketClientOperations.onInboundNext(WebsocketClientOperations.java:119)
        reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
        io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
        io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        java.base/java.lang.Thread.run(Thread.java:829)
#2:
        io.netty.buffer.AdvancedLeakAwareByteBuf.writeLong(AdvancedLeakAwareByteBuf.java:569)
        io.rsocket.metadata.TracingMetadataCodec.encode(TracingMetadataCodec.java:107)
        io.rsocket.metadata.TracingMetadataCodec.encode64(TracingMetadataCodec.java:46)
        org.springframework.cloud.sleuth.instrument.rsocket.TracingRequesterRSocketProxy.injectDefaultZipkinRSocketHeaders(TracingRequesterRSocketProxy.java:123)
        org.springframework.cloud.sleuth.instrument.rsocket.TracingRequesterRSocketProxy.lambda$setSpan$3(TracingRequesterRSocketProxy.java:103)
        reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:47)
        io.rsocket.core.DefaultRSocketClient$FlattingInner.accept(DefaultRSocketClient.java:433)
        io.rsocket.core.DefaultRSocketClient$FlattingInner.accept(DefaultRSocketClient.java:366)
        io.rsocket.core.ResolvingOperator.complete(ResolvingOperator.java:260)
        io.rsocket.core.DefaultRSocketClient.onComplete(DefaultRSocketClient.java:146)
        reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1817)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
        reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.request(FluxDoFinally.java:157)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:238)
        reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onSubscribe(FluxDoFinally.java:124)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
        reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
        reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:165)
        io.rsocket.core.DefaultClientSetup.lambda$null$0(ClientSetup.java:20)
        reactor.core.publisher.MonoCreate$DefaultMonoSink.onRequest(MonoCreate.java:214)
        io.rsocket.core.DefaultClientSetup.lambda$init$1(ClientSetup.java:20)
        reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
        reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
        reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
        reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
        reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
        reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:354)
        reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
        reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:238)
        reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)
        reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152)
        reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:263)
        reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
        reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
        reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
        reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)
        reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174)
        reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:165)
        reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:414)
        reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:677)
        reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onStateChange(DefaultPooledConnectionProvider.java:184)
        reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onStateChange(DefaultPooledConnectionProvider.java:440)
        reactor.netty.http.client.WebsocketClientOperations.onInboundNext(WebsocketClientOperations.java:119)
        reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
        io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
        io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        java.base/java.lang.Thread.run(Thread.java:829)
#3:
        io.netty.buffer.AdvancedLeakAwareByteBuf.writeByte(AdvancedLeakAwareByteBuf.java:545)
        io.rsocket.metadata.TracingMetadataCodec.encode(TracingMetadataCodec.java:101)
        io.rsocket.metadata.TracingMetadataCodec.encode64(TracingMetadataCodec.java:46)
        org.springframework.cloud.sleuth.instrument.rsocket.TracingRequesterRSocketProxy.injectDefaultZipkinRSocketHeaders(TracingRequesterRSocketProxy.java:123)
        org.springframework.cloud.sleuth.instrument.rsocket.TracingRequesterRSocketProxy.lambda$setSpan$3(TracingRequesterRSocketProxy.java:103)
        reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:47)
        io.rsocket.core.DefaultRSocketClient$FlattingInner.accept(DefaultRSocketClient.java:433)
        io.rsocket.core.DefaultRSocketClient$FlattingInner.accept(DefaultRSocketClient.java:366)
        io.rsocket.core.ResolvingOperator.complete(ResolvingOperator.java:260)
        io.rsocket.core.DefaultRSocketClient.onComplete(DefaultRSocketClient.java:146)
        reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1817)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
        reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.request(FluxDoFinally.java:157)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:238)
        reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onSubscribe(FluxDoFinally.java:124)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
        reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
        reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:165)
        io.rsocket.core.DefaultClientSetup.lambda$null$0(ClientSetup.java:20)
        reactor.core.publisher.MonoCreate$DefaultMonoSink.onRequest(MonoCreate.java:214)
        io.rsocket.core.DefaultClientSetup.lambda$init$1(ClientSetup.java:20)
        reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
        reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
        reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
        reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
        reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
        reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:354)
        reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
        reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:238)
        reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)
        reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152)
        reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:263)
        reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
        reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
        reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
        reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)
        reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174)
        reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:165)
        reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:414)
        reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:677)
        reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onStateChange(DefaultPooledConnectionProvider.java:184)
        reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onStateChange(DefaultPooledConnectionProvider.java:440)
        reactor.netty.http.client.WebsocketClientOperations.onInboundNext(WebsocketClientOperations.java:119)
        reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
        io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
        io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        java.base/java.lang.Thread.run(Thread.java:829)
Created at:
        io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:401)
        io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
        io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
        io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:116)
        io.rsocket.metadata.TracingMetadataCodec.encode(TracingMetadataCodec.java:72)
        io.rsocket.metadata.TracingMetadataCodec.encode64(TracingMetadataCodec.java:46)
        org.springframework.cloud.sleuth.instrument.rsocket.TracingRequesterRSocketProxy.injectDefaultZipkinRSocketHeaders(TracingRequesterRSocketProxy.java:123)
        org.springframework.cloud.sleuth.instrument.rsocket.TracingRequesterRSocketProxy.lambda$setSpan$3(TracingRequesterRSocketProxy.java:103)
        reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:47)
        io.rsocket.core.DefaultRSocketClient$FlattingInner.accept(DefaultRSocketClient.java:433)
        io.rsocket.core.DefaultRSocketClient$FlattingInner.accept(DefaultRSocketClient.java:366)
        io.rsocket.core.ResolvingOperator.complete(ResolvingOperator.java:260)
        io.rsocket.core.DefaultRSocketClient.onComplete(DefaultRSocketClient.java:146)
        reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1817)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
        reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.request(FluxDoFinally.java:157)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:238)
        reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onSubscribe(FluxDoFinally.java:124)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
        reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
        reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:165)
        io.rsocket.core.DefaultClientSetup.lambda$null$0(ClientSetup.java:20)
        reactor.core.publisher.MonoCreate$DefaultMonoSink.onRequest(MonoCreate.java:214)
        io.rsocket.core.DefaultClientSetup.lambda$init$1(ClientSetup.java:20)
        reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
        reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
        reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
        reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
        reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
        reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:354)
        reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
        reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
        reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:238)
        reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)
        reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152)
        reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:263)
        reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
        reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
        reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
        reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)
        reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174)
        reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:165)
        reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:414)
        reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:677)
        reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onStateChange(DefaultPooledConnectionProvider.java:184)
        reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onStateChange(DefaultPooledConnectionProvider.java:440)
        reactor.netty.http.client.WebsocketClientOperations.onInboundNext(WebsocketClientOperations.java:119)
        reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
        io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
        io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        java.base/java.lang.Thread.run(Thread.java:829)
: 1 leak records were discarded because they were duplicates

Sample
sleuth-memory-leak-main.zip, or from GitHub: https://github.com/progys/sleuth-memory-leak

To run this app, extract the archive, go to the extracted directory, and execute:
mvnw test

Observe that the test runs and finishes successfully, but the log contains Netty leak reports.

Now go to src/main/resources/application.yml and disable Sleuth completely by setting spring.sleuth.enabled to false:

spring:
   sleuth:
     enabled: false

Re-run the same command, mvnw test, and observe that the log no longer contains any Netty leak reports.

@marcingrzejszczak
Contributor

Hey @OlegDokuka I thought we fixed this. Do you recall this problem?

@msche

msche commented Feb 14, 2023

Any update regarding this issue? We are encountering the same issue in our prod environment.

@OlegDokuka
Contributor

OlegDokuka commented Feb 15, 2023

@marcingrzejszczak, looking into it.


	private void injectDefaultZipkinRSocketHeaders(Payload newPayload, TraceContext traceContext) {
		TracingMetadataCodec.Flags flags = traceContext.sampled() == null ? TracingMetadataCodec.Flags.UNDECIDED
				: traceContext.sampled() ? TracingMetadataCodec.Flags.SAMPLE : TracingMetadataCodec.Flags.NOT_SAMPLE;
		String traceId = traceContext.traceId();
		long[] traceIds = EncodingUtils.fromString(traceId);
		long[] spanId = EncodingUtils.fromString(traceContext.spanId());
		long[] parentSpanId = EncodingUtils.fromString(traceContext.parentId());
		boolean isTraceId128Bit = traceIds.length == 2;
		if (isTraceId128Bit) {
			TracingMetadataCodec.encode128(newPayload.metadata().alloc(), traceIds[0], traceIds[1], spanId[0],
					EncodingUtils.fromString(traceContext.parentId())[0], flags);
		}
		else {
			TracingMetadataCodec.encode64(newPayload.metadata().alloc(), traceIds[0], spanId[0], parentSpanId[0],
					flags);
		}
	}

I see that the allocated ByteBuf is not used anywhere, so it is the main cause of the leak.
Now I wonder what we had in mind when we wrote that 🤔 (both encode128 and encode64 return a ByteBuf).
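
To illustrate the pattern described above, here is a minimal, self-contained sketch. It does not use Netty or Sleuth: `CountedBuffer`, `encode`, `leaky`, and `balanced` are hypothetical stand-ins invented for this illustration, and the sketch is not the change made in #2262. It only shows why ignoring the return value of a method that allocates a reference-counted buffer leaves that buffer unreleased.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class DiscardedBufferSketch {

    /** Hypothetical stand-in for a reference-counted buffer (NOT Netty's ByteBuf). */
    static final class CountedBuffer {
        /** Number of buffers allocated and not yet fully released. */
        static final AtomicInteger LIVE = new AtomicInteger();
        private int refCnt = 1;

        CountedBuffer() {
            LIVE.incrementAndGet();
        }

        void release() {
            if (refCnt <= 0) {
                throw new IllegalStateException("already released");
            }
            if (--refCnt == 0) {
                LIVE.decrementAndGet();
            }
        }
    }

    /** Hypothetical encoder: it allocates a buffer and RETURNS it to the caller. */
    static CountedBuffer encode(long traceId, long spanId) {
        return new CountedBuffer();
    }

    /** Same shape as the quoted method: the returned buffer is ignored. */
    static void leaky() {
        encode(1L, 2L); // result dropped, so nobody can ever release it
    }

    /** One way to balance the allocation: keep the result and release it when done. */
    static void balanced() {
        CountedBuffer buf = encode(1L, 2L);
        try {
            // use the encoded bytes here (for example, copy them somewhere)
        } finally {
            buf.release();
        }
    }

    public static void main(String[] args) {
        leaky();
        System.out.println("live after leaky: " + CountedBuffer.LIVE.get());
        balanced();
        System.out.println("live after balanced: " + CountedBuffer.LIVE.get());
    }
}
```

In this sketch the live count stays at 1 after `leaky()` and does not grow after `balanced()`. With a real reference-counted buffer, an alternative to releasing it locally is to hand it to whatever component takes ownership of it; which option applies to the actual Sleuth code is not shown here.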

@OlegDokuka
Contributor

should be fixed with #2262

@progys
Author

progys commented Feb 16, 2023

Thanks @OlegDokuka and @marcingrzejszczak for the quick fix 💯
