Skip to content

Commit

Permalink
Fix possible Race condition: change contains + get to get + null check.
Browse files Browse the repository at this point in the history
Between the contains and get calls, another thread could steal the
value. This also saves one hash lookup.
  • Loading branch information
hylkevds committed May 9, 2021
1 parent df9307f commit 696ab80
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,17 +299,19 @@ public void resendInflightNotAcked() {
debugLogPacketIds(expired);

for (InFlightPacket notAckPacketId : expired) {
if (inflightWindow.containsKey(notAckPacketId.packetId)) {
final SessionRegistry.PublishedMessage msg =
(SessionRegistry.PublishedMessage) inflightWindow.get(notAckPacketId.packetId);
final Topic topic = msg.topic;
final MqttQoS qos = msg.publishingQos;
final ByteBuf payload = msg.payload;
final ByteBuf copiedPayload = payload.retainedDuplicate();
MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, copiedPayload);
inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS));
mqttConnection.sendPublish(publishMsg);
final SessionRegistry.PublishedMessage msg =
(SessionRegistry.PublishedMessage) inflightWindow.get(notAckPacketId.packetId);
if (msg == null) {
// Already acked...
continue;
}
final Topic topic = msg.topic;
final MqttQoS qos = msg.publishingQos;
final ByteBuf payload = msg.payload;
final ByteBuf copiedPayload = payload.retainedDuplicate();
MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, copiedPayload);
inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS));
mqttConnection.sendPublish(publishMsg);
}
}

Expand Down

0 comments on commit 696ab80

Please sign in to comment.