diff --git a/broker/src/main/java/io/moquette/broker/MQTTConnection.java b/broker/src/main/java/io/moquette/broker/MQTTConnection.java index 7cf920ae2..9a289487d 100644 --- a/broker/src/main/java/io/moquette/broker/MQTTConnection.java +++ b/broker/src/main/java/io/moquette/broker/MQTTConnection.java @@ -18,6 +18,7 @@ import io.moquette.broker.subscriptions.Topic; import io.moquette.broker.security.IAuthenticator; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufHolder; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -37,6 +38,7 @@ import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*; import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from; import static io.netty.handler.codec.mqtt.MqttQoS.*; +import io.netty.util.ReferenceCountUtil; final class MQTTConnection { @@ -377,7 +379,6 @@ void processPublish(MqttPublishMessage msg) { case EXACTLY_ONCE: { bindedSession.receivedPublishQos2(messageID, msg); postOffice.receivedPublishQos2(this, msg, username); -// msg.release(); break; } default: @@ -419,11 +420,19 @@ void sendIfWritableElseDrop(MqttMessage msg) { LOG.debug("OUT {}", msg.fixedHeader().messageType()); } if (channel.isWritable()) { + + // Sending to external, retain a duplicate. Just retain is not + // enough, since the receiver must have full control. + Object retainedDup = msg; + if (msg instanceof ByteBufHolder) { + retainedDup = ((ByteBufHolder) msg).retainedDuplicate(); + } + ChannelFuture channelFuture; if (brokerConfig.isImmediateBufferFlush()) { - channelFuture = channel.writeAndFlush(msg); + channelFuture = channel.writeAndFlush(retainedDup); } else { - channelFuture = channel.write(msg); + channelFuture = channel.write(retainedDup); } channelFuture.addListener(FIRE_EXCEPTION_ON_FAILURE); } diff --git a/broker/src/main/java/io/moquette/broker/PostOffice.java b/broker/src/main/java/io/moquette/broker/PostOffice.java index ecc378a35..e8d7b4adc 100644 --- a/broker/src/main/java/io/moquette/broker/PostOffice.java +++ b/broker/src/main/java/io/moquette/broker/PostOffice.java @@ -111,6 +111,8 @@ private void publishRetainedMessagesForSubscriptions(String clientID, List topicMatchingSubscriptions = subscriptions.matchQosSharpening(topic); for (final Subscription sub : topicMatchingSubscriptions) { @@ -213,8 +215,6 @@ private void publish2Subscribers(ByteBuf origPayload, Topic topic, MqttQoS publi if (isSessionPresent) { LOG.debug("Sending PUBLISH message to active subscriber CId: {}, topicFilter: {}, qos: {}", sub.getClientId(), sub.getTopicFilter(), qos); - // we need to retain because duplicate only copy r/w indexes and don't retain() causing refCnt = 0 - ByteBuf payload = origPayload.retainedDuplicate(); targetSession.sendPublishOnSessionAtQos(topic, qos, payload); } else { // If we are, the subscriber disconnected after the subscriptions tree selected that session as a diff --git a/broker/src/main/java/io/moquette/broker/Server.java b/broker/src/main/java/io/moquette/broker/Server.java index 859a67ef7..7af171d6a 100644 --- a/broker/src/main/java/io/moquette/broker/Server.java +++ b/broker/src/main/java/io/moquette/broker/Server.java @@ -295,7 +295,7 @@ private T loadClass(String className, Class intrface, Class constru * Use the broker to publish a message. It's intended for embedding applications. It can be used * only after the integration is correctly started with startServer. * - * @param msg the message to forward. + * @param msg the message to forward. The ByteBuf in the message will be released. * @param clientId the id of the sending integration. * @throws IllegalStateException if the integration is not yet started */ @@ -308,6 +308,7 @@ public void internalPublish(MqttPublishMessage msg, final String clientId) { } LOG.trace("Internal publishing message CId: {}, messageId: {}", clientId, messageID); dispatcher.internalPublish(msg); + msg.payload().release(); } public void stopServer() { diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index 140d7b28a..7e5b36542 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -17,6 +17,8 @@ import static io.moquette.BrokerConstants.FLIGHT_BEFORE_RESEND_MS; import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE; +import io.moquette.broker.SessionRegistry.EnqueuedMessage; +import io.moquette.broker.SessionRegistry.PublishedMessage; import io.moquette.broker.subscriptions.Subscription; import io.moquette.broker.subscriptions.Topic; import io.netty.buffer.ByteBuf; @@ -183,7 +185,12 @@ boolean isClean() { } public void processPubRec(int packetId) { - inflightWindow.remove(packetId); + // Message discarded, make sure any buffers in it are released + SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(packetId); + if (removed != null) { + removed.release(); + } + inflightSlots.incrementAndGet(); if (canSkipQueue()) { inflightSlots.decrementAndGet(); @@ -200,7 +207,12 @@ public void processPubRec(int packetId) { } public void processPubComp(int messageID) { - inflightWindow.remove(messageID); + // Message discarded, make sure any buffers in it are released + SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(messageID); + if (removed != null) { + removed.release(); + } + inflightSlots.incrementAndGet(); drainQueueToConnection(); @@ -238,8 +250,17 @@ private void sendPublishQos1(Topic topic, MqttQoS qos, ByteBuf payload) { if (canSkipQueue()) { inflightSlots.decrementAndGet(); int packetId = mqttConnection.nextPacketId(); - inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload)); + + // Adding to a map, retain. + payload.retain(); + EnqueuedMessage old = inflightWindow.put(packetId, new PublishedMessage(topic, qos, payload)); + // If there already was something, release it. + if (old != null) { + old.release(); + inflightSlots.incrementAndGet(); + } inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS)); + MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(topic.toString(), qos, payload, packetId); mqttConnection.sendPublish(publishMsg); @@ -247,6 +268,8 @@ private void sendPublishQos1(Topic topic, MqttQoS qos, ByteBuf payload) { // TODO drainQueueToConnection();? } else { final SessionRegistry.PublishedMessage msg = new SessionRegistry.PublishedMessage(topic, qos, payload); + // Adding to a queue, retain. + msg.retain(); sessionQueue.add(msg); } } @@ -255,8 +278,17 @@ private void sendPublishQos2(Topic topic, MqttQoS qos, ByteBuf payload) { if (canSkipQueue()) { inflightSlots.decrementAndGet(); int packetId = mqttConnection.nextPacketId(); - inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload)); + + // Retain before adding to map + payload.retain(); + EnqueuedMessage old = inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload)); + // If there already was something, release it. + if (old != null) { + old.release(); + inflightSlots.incrementAndGet(); + } inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS)); + MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(topic.toString(), qos, payload, packetId); mqttConnection.sendPublish(publishMsg); @@ -264,6 +296,8 @@ private void sendPublishQos2(Topic topic, MqttQoS qos, ByteBuf payload) { drainQueueToConnection(); } else { final SessionRegistry.PublishedMessage msg = new SessionRegistry.PublishedMessage(topic, qos, payload); + // Adding to a queue, retain. + msg.retain(); sessionQueue.add(msg); } } @@ -283,7 +317,11 @@ private boolean inflighHasSlotsAndConnectionIsUp() { void pubAckReceived(int ackPacketId) { // TODO remain to invoke in somehow m_interceptor.notifyMessageAcknowledged - inflightWindow.remove(ackPacketId); + SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId); + if (removed != null) { + removed.release(); + } + inflightSlots.incrementAndGet(); drainQueueToConnection(); } @@ -305,8 +343,7 @@ public void resendInflightNotAcked() { final Topic topic = msg.topic; final MqttQoS qos = msg.publishingQos; final ByteBuf payload = msg.payload; - final ByteBuf copiedPayload = payload.retainedDuplicate(); - MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, copiedPayload); + MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, payload); mqttConnection.sendPublish(publishMsg); } } @@ -341,7 +378,13 @@ private void drainQueueToConnection() { } inflightSlots.decrementAndGet(); int sendPacketId = mqttConnection.nextPacketId(); - inflightWindow.put(sendPacketId, msg); + + // Putting it in a map, but the retain is cancelled out by the below release. + EnqueuedMessage old = inflightWindow.put(sendPacketId, msg); + if (old != null) { + old.release(); + inflightSlots.incrementAndGet(); + } if (msg instanceof SessionRegistry.PubRelMarker) { MqttMessage pubRel = MQTTConnection.pubrel(sendPacketId); mqttConnection.sendIfWritableElseDrop(pubRel); @@ -352,6 +395,7 @@ private void drainQueueToConnection() { msgPub.payload, sendPacketId); mqttConnection.sendPublish(publishMsg); } + // we fetched msg from a map, but the release is cancelled out by the above retain } } @@ -374,12 +418,18 @@ void sendRetainedPublishOnSessionAtQos(Topic topic, MqttQoS qos, ByteBuf payload } public void receivedPublishQos2(int messageID, MqttPublishMessage msg) { - qos2Receiving.put(messageID, msg); - msg.retain(); // retain to put in the inflight map + // Retain before putting msg in map. + ReferenceCountUtil.retain(msg); + + MqttPublishMessage old = qos2Receiving.put(messageID, msg); + // In case of evil client with duplicate msgid. + ReferenceCountUtil.release(old); + mqttConnection.sendPublishReceived(messageID); } public void receivedPubRelQos2(int messageID) { + // Done with the message, remove from queue and release payload. final MqttPublishMessage removedMsg = qos2Receiving.remove(messageID); ReferenceCountUtil.release(removedMsg); } diff --git a/broker/src/main/java/io/moquette/broker/SessionRegistry.java b/broker/src/main/java/io/moquette/broker/SessionRegistry.java index 505919d01..8c7af1bfe 100644 --- a/broker/src/main/java/io/moquette/broker/SessionRegistry.java +++ b/broker/src/main/java/io/moquette/broker/SessionRegistry.java @@ -38,6 +38,18 @@ public class SessionRegistry { public abstract static class EnqueuedMessage { + + /** + * Releases any held resources. Must be called when the EnqueuedMessage is no + * longer needed. + */ + public void release() {} + + /** + * Retains any held resources. Must be called when the EnqueuedMessage is added + * to a store. + */ + public void retain() {} } public static class PublishedMessage extends EnqueuedMessage { @@ -63,6 +75,17 @@ public MqttQoS getPublishingQos() { public ByteBuf getPayload() { return payload; } + + @Override + public void release() { + payload.release(); + } + + @Override + public void retain() { + payload.retain(); + } + } public static final class PubRelMarker extends EnqueuedMessage { diff --git a/broker/src/main/java/io/moquette/interception/AbstractInterceptHandler.java b/broker/src/main/java/io/moquette/interception/AbstractInterceptHandler.java index e40798c6a..bd6e11f96 100644 --- a/broker/src/main/java/io/moquette/interception/AbstractInterceptHandler.java +++ b/broker/src/main/java/io/moquette/interception/AbstractInterceptHandler.java @@ -48,6 +48,7 @@ public void onConnectionLost(InterceptConnectionLostMessage msg) { @Override public void onPublish(InterceptPublishMessage msg) { + msg.getPayload().release(); } @Override diff --git a/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java b/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java index baad17502..42bc8495a 100644 --- a/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java +++ b/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java @@ -130,7 +130,8 @@ public void notifyTopicPublished(final MqttPublishMessage msg, final String clie for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) { LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, " + "interceptorId={}", clientID, messageId, topic, handler.getID()); - handler.onPublish(new InterceptPublishMessage(msg, clientID, username)); + // Sending to the outside, make a retainedDuplicate. + handler.onPublish(new InterceptPublishMessage(msg.retainedDuplicate(), clientID, username)); } } finally { ReferenceCountUtil.release(msg); diff --git a/broker/src/main/java/io/moquette/interception/InterceptHandler.java b/broker/src/main/java/io/moquette/interception/InterceptHandler.java index 10f2328f8..c5dde510e 100644 --- a/broker/src/main/java/io/moquette/interception/InterceptHandler.java +++ b/broker/src/main/java/io/moquette/interception/InterceptHandler.java @@ -52,6 +52,12 @@ public interface InterceptHandler { void onConnectionLost(InterceptConnectionLostMessage msg); + /** + * Called when a message is published. The receiver MUST release the payload of the message, either + * by calling super.onPublish, or by calling msg.getPayload.release() directly. + * + * @param msg The message that was published. + */ void onPublish(InterceptPublishMessage msg); void onSubscribe(InterceptSubscribeMessage msg); diff --git a/broker/src/main/java/io/moquette/persistence/ByteBufDataType.java b/broker/src/main/java/io/moquette/persistence/ByteBufDataType.java index f017b86d6..2a824f055 100644 --- a/broker/src/main/java/io/moquette/persistence/ByteBufDataType.java +++ b/broker/src/main/java/io/moquette/persistence/ByteBufDataType.java @@ -65,7 +65,7 @@ public void write(WriteBuffer buff, Object obj) { final ByteBuf casted = (ByteBuf) obj; final int payloadSize = casted.readableBytes(); byte[] rawBytes = new byte[payloadSize]; - casted.copy().readBytes(rawBytes); + casted.copy().readBytes(rawBytes).release(); buff.putInt(payloadSize); buff.put(rawBytes); } diff --git a/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java b/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java index 3d2f88610..f6721c65f 100644 --- a/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java +++ b/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java @@ -20,6 +20,7 @@ import io.moquette.broker.subscriptions.ISubscriptionsDirectory; import io.moquette.broker.security.IAuthenticator; import io.moquette.persistence.MemorySubscriptionsRepository; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.embedded.EmbeddedChannel; @@ -81,16 +82,18 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel @Test public void dropConnectionOnPublishWithInvalidTopicFormat() { // Connect message with clean session set to true and client id is null. + final ByteBuf payload = Unpooled.copiedBuffer("Hello MQTT world!".getBytes(UTF_8)); MqttPublishMessage publish = MqttMessageBuilders.publish() .topicName("") .retained(false) .qos(MqttQoS.AT_MOST_ONCE) - .payload(Unpooled.copiedBuffer("Hello MQTT world!".getBytes(UTF_8))).build(); + .payload(payload).build(); sut.processPublish(publish); // Verify assertFalse(channel.isOpen(), "Connection should be closed by the broker"); + payload.release(); } } diff --git a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java index 3234d3724..3f3d91cdc 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java @@ -228,10 +228,13 @@ public void testPublishWithEmptyPayloadClearRetainedStore() { connection.processConnect(connectMessage); ConnectionTestUtils.assertConnectAccepted(channel); + final ByteBuf payload1 = ByteBufUtil.writeAscii(UnpooledByteBufAllocator.DEFAULT, "Hello world!"); this.retainedRepository.retain(new Topic(NEWS_TOPIC), MqttMessageBuilders.publish() - .payload(ByteBufUtil.writeAscii(UnpooledByteBufAllocator.DEFAULT, "Hello world!")) + .payload(payload1) .qos(AT_LEAST_ONCE) .build()); + // Retaining a msg does not release the payload. + payload1.release(); // Exercise final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset()); @@ -241,6 +244,8 @@ public void testPublishWithEmptyPayloadClearRetainedStore() { .qos(MqttQoS.AT_MOST_ONCE) .retained(false) .topicName(NEWS_TOPIC).build()); + // receivedPublishQos0 does not release payload. + anyPayload.release(); // Verify assertTrue(retainedRepository.isEmpty(), "QoS0 MUST clean retained message for topic"); diff --git a/broker/src/test/java/io/moquette/broker/SessionTest.java b/broker/src/test/java/io/moquette/broker/SessionTest.java index fcd6c1495..fd9723273 100644 --- a/broker/src/test/java/io/moquette/broker/SessionTest.java +++ b/broker/src/test/java/io/moquette/broker/SessionTest.java @@ -39,6 +39,13 @@ public void testPubAckDrainMessagesRemainingInQueue() { // Verify assertTrue(queuedMessages.isEmpty(), "Messages should be drained"); + + // release the rest, to avoid leaking buffers + for (int i = 2; i <= 11; i++) { + client.pubAckReceived(i); + } + client.closeImmediately(); + testChannel.close(); } private void sendQoS1To(Session client, Topic destinationTopic, String message) { diff --git a/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoCanPublishOnReadBlockedTopicTest.java b/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoCanPublishOnReadBlockedTopicTest.java index bc9568640..767276773 100644 --- a/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoCanPublishOnReadBlockedTopicTest.java +++ b/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoCanPublishOnReadBlockedTopicTest.java @@ -146,6 +146,8 @@ public void shouldNotInternalPublishOnReadBlockedSubscriptionTopic() throws Exce .payload(Unpooled.copiedBuffer("Hello World!!".getBytes(UTF_8))) .build(); + // We will be sending the same message again, retain the payload. + message.payload().retain(); m_server.internalPublish(message, "INTRLPUB"); Awaitility.await().until(m_messagesCollector::isMessageReceived); diff --git a/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java b/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java index 026dabbc9..1433139ba 100644 --- a/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java +++ b/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java @@ -190,7 +190,7 @@ public void testResendNotAckedPublishes() throws MqttException, InterruptedExcep subscriber.subscribe(topic, 1, (String topic1, org.eclipse.paho.client.mqttv3.MqttMessage message) -> { if (isFirst.getAndSet(false)) { // wait to trigger resending PUBLISH - TimeUnit.SECONDS.sleep(FLIGHT_BEFORE_RESEND_MS * 2); + TimeUnit.MILLISECONDS.sleep(FLIGHT_BEFORE_RESEND_MS * 2); } else { receivedPublish.set(true); } diff --git a/broker/src/test/java/io/moquette/interception/BrokerInterceptorTest.java b/broker/src/test/java/io/moquette/interception/BrokerInterceptorTest.java index 2486c0185..42c52a86d 100644 --- a/broker/src/test/java/io/moquette/interception/BrokerInterceptorTest.java +++ b/broker/src/test/java/io/moquette/interception/BrokerInterceptorTest.java @@ -19,6 +19,7 @@ import io.moquette.interception.messages.*; import io.moquette.broker.subscriptions.Subscription; import io.moquette.broker.subscriptions.Topic; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.mqtt.MqttMessageBuilders; import io.netty.handler.codec.mqtt.MqttQoS; @@ -71,6 +72,7 @@ public void onConnectionLost(InterceptConnectionLostMessage msg) { @Override public void onPublish(InterceptPublishMessage msg) { n.set(60); + msg.getPayload().release(); } @Override @@ -124,13 +126,16 @@ public void testNotifyClientDisconnected() throws Exception { @Test public void testNotifyTopicPublished() throws Exception { + final ByteBuf payload = Unpooled.copiedBuffer("Hello".getBytes(UTF_8)); + // Internal function call, will not release buffers. interceptor.notifyTopicPublished( MqttMessageBuilders.publish().qos(MqttQoS.AT_MOST_ONCE) - .payload(Unpooled.copiedBuffer("Hello".getBytes(UTF_8))).build(), + .payload(payload).build(), "cli1234", "cli1234"); interval(); assertEquals(60, n.get()); + payload.release(); } @Test