Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #455, #587: qos2 cce and deadlock #588

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 27 additions & 29 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,18 +196,12 @@ public void processPubRec(int pubRecPacketId) {
return;
}

inflightSlots.incrementAndGet();
if (canSkipQueue()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hylkevds could elaborate more on the reason why this if and the management of the slots created deadlock or any other problem?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(assuming I understand the inner workings correctly)

Lets say we have 10 inflight slots taken by QoS2 messages, and there are 1000 QoS2 messages in the queue waiting to be sent after a burst of activity.
We've just received a message in the QoS2 workflow, so the workflow of this message is taking up a message slot.
If we give up our slot, we're not getting it back, since the messageQueue is not empty. canSkipQueue will return false.

The last step in one of our first 10 QoS2 workflows will now end up as message 1001 on the queue. We are going to try to start 1000 more QoS2 workflows before finishing the 10 we're already working on! But the client is most likely not going to respond to most of those 1000 new workflows, since the client also already has 10 workflows open (assuming the client also has 10 inflight slots).
So we're going to have to wait for all 1000 new workflows to time out before we get a free slot again to finish the first 10 workflows.

But:
The fact that we received this message means we have an inflight slot for this workflow.
The fact that we received this message also means we want to send a message and that the connection is open.
So why give up this slot, that we know we need for a message that the client really needs to close an open workflow? It's better to finish a workflow then to start a new one, so responses on messages in open workflows should have priority over messages that start new workflows and should thus not be put on the queue.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for explanation and good catch! I'll try to figure out hot to create a test for this case

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition is difficult to test unit

inflightSlots.decrementAndGet();
inflightWindow.put(pubRecPacketId, new SessionRegistry.PubRelMarker());
inflightTimeouts.add(new InFlightPacket(pubRecPacketId, FLIGHT_BEFORE_RESEND_MS));
MqttMessage pubRel = MQTTConnection.pubrel(pubRecPacketId);
mqttConnection.sendIfWritableElseDrop(pubRel);
inflightWindow.put(pubRecPacketId, new SessionRegistry.PubRelMarker());
inflightTimeouts.add(new InFlightPacket(pubRecPacketId, FLIGHT_BEFORE_RESEND_MS));
MqttMessage pubRel = MQTTConnection.pubrel(pubRecPacketId);
mqttConnection.sendIfWritableElseDrop(pubRel);

drainQueueToConnection();
} else {
sessionQueue.add(new SessionRegistry.PubRelMarker());
}
drainQueueToConnection();
}

public void processPubComp(int messageID) {
Expand Down Expand Up @@ -348,18 +342,25 @@ public void resendInflightNotAcked() {
debugLogPacketIds(expired);

for (InFlightPacket notAckPacketId : expired) {
final SessionRegistry.PublishedMessage msg =
(SessionRegistry.PublishedMessage) inflightWindow.get(notAckPacketId.packetId);
final SessionRegistry.EnqueuedMessage msg = inflightWindow.get(notAckPacketId.packetId);
if (msg == null) {
// Already acked...
continue;
}
final Topic topic = msg.topic;
final MqttQoS qos = msg.publishingQos;
final ByteBuf payload = msg.payload;
MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, payload);
inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS));
mqttConnection.sendPublish(publishMsg);
if (msg instanceof SessionRegistry.PubRelMarker) {
MqttMessage pubRel = MQTTConnection.pubrel(notAckPacketId.packetId);
inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS));
mqttConnection.sendIfWritableElseDrop(pubRel);
} else {
final SessionRegistry.PublishedMessage pubMsg = (SessionRegistry.PublishedMessage) msg;
final Topic topic = pubMsg.topic;
final MqttQoS qos = pubMsg.publishingQos;
final ByteBuf payload = pubMsg.payload;
final ByteBuf copiedPayload = payload.retainedDuplicate();
MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, copiedPayload);
inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS));
mqttConnection.sendPublish(publishMsg);
}
}
}

Expand Down Expand Up @@ -400,16 +401,13 @@ private void drainQueueToConnection() {
inflightSlots.incrementAndGet();
}
inflightTimeouts.add(new InFlightPacket(sendPacketId, FLIGHT_BEFORE_RESEND_MS));
if (msg instanceof SessionRegistry.PubRelMarker) {
MqttMessage pubRel = MQTTConnection.pubrel(sendPacketId);
mqttConnection.sendIfWritableElseDrop(pubRel);
} else {
final SessionRegistry.PublishedMessage msgPub = (SessionRegistry.PublishedMessage) msg;
MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(msgPub.topic.toString(),
msgPub.publishingQos,
msgPub.payload, sendPacketId);
mqttConnection.sendPublish(publishMsg);
}
final SessionRegistry.PublishedMessage msgPub = (SessionRegistry.PublishedMessage) msg;
MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(
msgPub.topic.toString(),
msgPub.publishingQos,
msgPub.payload, sendPacketId);
mqttConnection.sendPublish(publishMsg);

// we fetched msg from a map, but the release is cancelled out by the above retain
}
}
Expand Down