diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index 226c12a1f..f41fecd5e 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -299,17 +299,19 @@ public void resendInflightNotAcked() { debugLogPacketIds(expired); for (InFlightPacket notAckPacketId : expired) { - if (inflightWindow.containsKey(notAckPacketId.packetId)) { - final SessionRegistry.PublishedMessage msg = - (SessionRegistry.PublishedMessage) inflightWindow.get(notAckPacketId.packetId); - final Topic topic = msg.topic; - final MqttQoS qos = msg.publishingQos; - final ByteBuf payload = msg.payload; - final ByteBuf copiedPayload = payload.retainedDuplicate(); - MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, copiedPayload); - inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS)); - mqttConnection.sendPublish(publishMsg); + final SessionRegistry.PublishedMessage msg = + (SessionRegistry.PublishedMessage) inflightWindow.get(notAckPacketId.packetId); + if (msg == null) { + // Already acked... + continue; } + final Topic topic = msg.topic; + final MqttQoS qos = msg.publishingQos; + final ByteBuf payload = msg.payload; + final ByteBuf copiedPayload = payload.retainedDuplicate(); + MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, copiedPayload); + inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS)); + mqttConnection.sendPublish(publishMsg); } }