Skip to content

Commit

Permalink
Fixes #455, #587: QOS2 CCE and Deadlock
Browse files Browse the repository at this point in the history
#455: resendInflightNotAcked() assumed all messages in the inflightWindow are
PublishedMessage, but they can also be PubRelMarker. This caused a
ClassCastException.

#587: When sending PubRelMessages, never put them on the queue, since this
deadlocks the system.
  • Loading branch information
hylkevds committed Feb 25, 2021
1 parent 830bbc9 commit cf788ba
Showing 1 changed file with 25 additions and 20 deletions.
45 changes: 25 additions & 20 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,18 +190,14 @@ public void processPubRec(int packetId) {
inflightSlots.incrementAndGet();
}

if (canSkipQueue()) {
inflightSlots.decrementAndGet();
int pubRelPacketId = packetId/*mqttConnection.nextPacketId()*/;
inflightWindow.put(pubRelPacketId, new SessionRegistry.PubRelMarker());
inflightTimeouts.add(new InFlightPacket(pubRelPacketId, FLIGHT_BEFORE_RESEND_MS));
MqttMessage pubRel = MQTTConnection.pubrel(pubRelPacketId);
mqttConnection.sendIfWritableElseDrop(pubRel);
inflightSlots.decrementAndGet();
int pubRelPacketId = packetId/*mqttConnection.nextPacketId()*/;
inflightWindow.put(pubRelPacketId, new SessionRegistry.PubRelMarker());
inflightTimeouts.add(new InFlightPacket(pubRelPacketId, FLIGHT_BEFORE_RESEND_MS));
MqttMessage pubRel = MQTTConnection.pubrel(pubRelPacketId);
mqttConnection.sendIfWritableElseDrop(pubRel);

drainQueueToConnection();
} else {
sessionQueue.add(new SessionRegistry.PubRelMarker());
}
drainQueueToConnection();
}

public void processPubComp(int messageID) {
Expand Down Expand Up @@ -325,14 +321,23 @@ public void resendInflightNotAcked() {
debugLogPacketIds(expired);

for (InFlightPacket notAckPacketId : expired) {
if (inflightWindow.containsKey(notAckPacketId.packetId)) {
final SessionRegistry.PublishedMessage msg =
(SessionRegistry.PublishedMessage) inflightWindow.get(notAckPacketId.packetId);
final Topic topic = msg.topic;
final MqttQoS qos = msg.publishingQos;
final ByteBuf payload = msg.payload;
SessionRegistry.EnqueuedMessage msg = inflightWindow.get(notAckPacketId.packetId);
if (msg == null) {
// Already acked...
continue;
}
if (msg instanceof SessionRegistry.PubRelMarker) {
MqttMessage pubRel = MQTTConnection.pubrel(notAckPacketId.packetId);
inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS));
mqttConnection.sendIfWritableElseDrop(pubRel);
} else {
final SessionRegistry.PublishedMessage pubMsg = (SessionRegistry.PublishedMessage) msg;
final Topic topic = pubMsg.topic;
final MqttQoS qos = pubMsg.publishingQos;
final ByteBuf payload = pubMsg.payload;
final ByteBuf copiedPayload = payload.retainedDuplicate();
MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, copiedPayload);
inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS));
mqttConnection.sendPublish(publishMsg);
}
}
Expand Down Expand Up @@ -366,13 +371,13 @@ private void drainQueueToConnection() {
return;
}
int sendPacketId = mqttConnection.nextPacketId();
inflightSlots.decrementAndGet();
inflightWindow.put(sendPacketId, msg);
inflightTimeouts.add(new InFlightPacket(sendPacketId, FLIGHT_BEFORE_RESEND_MS));
if (msg instanceof SessionRegistry.PubRelMarker) {
MqttMessage pubRel = MQTTConnection.pubrel(sendPacketId);
mqttConnection.sendIfWritableElseDrop(pubRel);
} else {
inflightSlots.decrementAndGet();
inflightWindow.put(sendPacketId, msg);
inflightTimeouts.add(new InFlightPacket(sendPacketId, FLIGHT_BEFORE_RESEND_MS));
final SessionRegistry.PublishedMessage msgPub = (SessionRegistry.PublishedMessage) msg;
// Second pass-on.
msgPub.payload.retain();
Expand Down

0 comments on commit cf788ba

Please sign in to comment.