Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a bug Session#resendInflightNotAcked() fails due to handling a freed ByteBuf #464

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

komamitsu
Copy link
Contributor

I ran into this exception when Moquette was running with QoS1 under heavy load.

07:42:33.467 [nioEventLoopGroup-3-1] ERROR i.m.broker.NewNettyMQTTHandler - Unexpected exception while processing MQTT message. Closing Netty channel. CId=internal-client
io.netty.util.IllegalReferenceCountException: refCnt: 0, increment: 1
        at io.netty.buffer.AbstractReferenceCountedByteBuf.retain0(AbstractReferenceCountedByteBuf.java:67)
        at io.netty.buffer.AbstractReferenceCountedByteBuf.retain(AbstractReferenceCountedByteBuf.java:54)
        at io.netty.buffer.AbstractPooledDerivedByteBuf.init(AbstractPooledDerivedByteBuf.java:59)
        at io.netty.buffer.PooledDuplicatedByteBuf.newInstance(PooledDuplicatedByteBuf.java:43)
        at io.netty.buffer.PooledDuplicatedByteBuf.retainedDuplicate(PooledDuplicatedByteBuf.java:102)
        at io.moquette.broker.Session.resendInflightNotAcked(Session.java:297)
        at io.moquette.broker.MQTTConnection.resendNotAckedPublishes(MQTTConnection.java:481)
        at io.moquette.broker.NewNettyMQTTHandler.userEventTriggered(NewNettyMQTTHandler.java:106)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
        at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
        at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
        at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
        at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
        at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
        at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
        at io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:353)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
        at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
        at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
        at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
        at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
        at io.moquette.broker.MoquetteIdleTimeoutHandler.userEventTriggered(MoquetteIdleTimeoutHandler.java:48)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
        at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
        at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
        at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
        at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
        at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
        at io.moquette.broker.InflightResender.resendNotAcked(InflightResender.java:160)
        at io.moquette.broker.InflightResender.access$100(InflightResender.java:32)
        at io.moquette.broker.InflightResender$WriterIdleTimeoutTask.run(InflightResender.java:58)

It looks like Session#resendInflightNotAcked() was triggered, but it dealt with a ByteBuf already freed. After this exception, the channel didn't work at all and I think this is a critical bug.

I reproduced this bug as ServerLowlevelMessagesIntegrationTests#testResendNotAckedPublishes and I tried fixing it somehow by retaining the duplicated buffer. It works so far.

@andsel
Copy link
Collaborator

andsel commented Apr 2, 2019

I have to reason about this more carefully, because this could potentially open to memory leakeage

@komamitsu
Copy link
Contributor Author

Yeah. If you have any concern of memory leak, feel free to give me feedbacks. Thanks.

IMO, this issue occurs 100% when a subscriber takes time and potential memory leak may be better than it.

I think you can easily reproduce this issue by running testResendNotAckedPublishes() introduced by this pull request with commenting out the fix.

@githawks
Copy link

hey brother, me too

@githawks
Copy link

just four sensor, the problem

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants