Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqtt-kafka will message bugfixes #644

Merged
merged 2 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ public class MqttKafkaSessionFactory implements MqttKafkaStreamFactory
private final InstanceId instanceId;
private final boolean willAvailable;
private final int reconnectDelay;
private final Int2ObjectHashMap<String16FW> qosLevels;

private String serverRef;
private int reconnectAttempt;
Expand Down Expand Up @@ -246,6 +247,10 @@ public MqttKafkaSessionFactory(
this.sessionExpiryIds = new Object2LongHashMap<>(-1);
this.instanceId = instanceId;
this.reconnectDelay = config.willStreamReconnectDelay();
this.qosLevels = new Int2ObjectHashMap<>();
this.qosLevels.put(0, new String16FW("0"));
this.qosLevels.put(1, new String16FW("1"));
this.qosLevels.put(2, new String16FW("2"));
}

@Override
Expand Down Expand Up @@ -2069,6 +2074,8 @@ private void sendWill(
will.properties().forEach(property ->
addHeader(property.key(), property.value()));

addHeader(helper.kafkaQosHeaderName, qosLevels.get(will.qos()));

kafkaDataEx = kafkaDataExRW
.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(kafkaTypeId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1907,7 +1907,6 @@ protected void doKafkaConsumerFlush(
final MqttOffsetStateFlags state = offsetCommit.state;
final int packetId = offsetCommit.packetId;

boolean shouldClose = false;
if (qos == MqttQoS.EXACTLY_ONCE.value() && state == MqttOffsetStateFlags.COMPLETE)
{
final IntArrayList incompletes = incompletePacketIds.get(offset.partitionId);
Expand All @@ -1923,11 +1922,6 @@ protected void doKafkaConsumerFlush(
incompletePacketIds.computeIfAbsent(offset.partitionId, c -> new IntArrayList()).add(packetId);
}

if (unAckedPackets == 0 && incompletePacketIds.isEmpty())
{
shouldClose = true;
}

final int correlationId = state == MqttOffsetStateFlags.INCOMPLETE ? packetId : -1;

final KafkaFlushExFW kafkaFlushEx =
Expand All @@ -1949,12 +1943,6 @@ protected void doKafkaConsumerFlush(

doFlush(kafka, originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, authorization, budgetId, reserved, kafkaFlushEx);

if (shouldClose)
{
mqtt.retainedSubscriptionIds.clear();
doKafkaEnd(traceId, authorization);
}
}

private void doKafkaFlush(
Expand Down Expand Up @@ -2299,7 +2287,10 @@ private void onKafkaFlush(
.subscribe(b -> b.packetId((int) correlationId)).build();
mqtt.doMqttFlush(traceId, authorization, budgetId, reserved, mqttSubscribeFlushEx);
}
unAckedPackets--;
else
{
unAckedPackets--;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does "unacked packet" mean in this context?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't close the retained stream until we have received back ALL the acks for any qos1/2 messages. So basically this is just a counter to know when every delivered message has been acked.

}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2893,19 +2893,18 @@ private int onDecodeConnectWillMessage(
reasonCode = RETAIN_NOT_SUPPORTED;
break decode;
}
payload.willRetain = (byte) RETAIN_FLAG;
}

if (payload.willQos > maximumQos)
final int flags = connectFlags;
final int willFlags = decodeWillFlags(flags);
final int willQos = decodeWillQos(flags);

if (willQos > maximumQos)
{
reasonCode = QOS_NOT_SUPPORTED;
break decode;
}

final int flags = connectFlags;
final int willFlags = decodeWillFlags(flags);
final int willQos = decodeWillQos(flags);

if (willFlagSet)
{
final MqttDataExFW.Builder sessionDataExBuilder =
Expand Down Expand Up @@ -6299,8 +6298,7 @@ private static int decodeWillQos(
int willQos = 0;
if (isSetWillQos(flags))
{
//TODO shift by 3?
willQos = (flags & WILL_QOS_MASK) >>> 2;
willQos = (flags & WILL_QOS_MASK) >>> 3;
}
return willQos;
}
Expand Down Expand Up @@ -6418,8 +6416,6 @@ private final class MqttConnectPayload
{
private byte reasonCode = SUCCESS;
private MqttPropertiesFW willProperties;
private byte willQos;
private byte willRetain;
private String16FW willTopic;
private BinaryFW willPayload;
private String16FW username;
Expand All @@ -6436,8 +6432,6 @@ private MqttConnectPayload reset()
{
this.reasonCode = SUCCESS;
this.willProperties = null;
this.willQos = 0;
this.willRetain = 0;
this.willTopic = null;
this.willPayload = null;
this.username = null;
Expand Down Expand Up @@ -6494,12 +6488,6 @@ private int decode(
break;
}

final byte qos = (byte) ((flags & WILL_QOS_MASK) >>> 3);
if (qos != 0)
{
willQos = (byte) (qos << 1);
}

if (willTopic == null || willTopic.asString().isEmpty())
{
reasonCode = MALFORMED_PACKET;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,9 +462,9 @@ write zilla:data.ext ${kafka:dataEx()
.key("obituaries")
.header("zilla:filter", "obituaries")
.header("zilla:format", "TEXT")
.header("zilla:qos", "0")
.build()
.build()}

write "client-1 disconnected abruptly"
write flush

Expand Down Expand Up @@ -498,6 +498,7 @@ write zilla:data.ext ${kafka:dataEx()
.key("obituaries")
.header("zilla:filter", "obituaries")
.header("zilla:format", "TEXT")
.header("zilla:qos", "0")
.build()
.build()}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ read zilla:data.ext ${kafka:matchDataEx()
.key("obituaries")
.header("zilla:filter", "obituaries")
.header("zilla:format", "TEXT")
.header("zilla:qos", "0")
.build()
.build()}
read "client-1 disconnected abruptly"
Expand Down Expand Up @@ -495,6 +496,7 @@ read zilla:data.ext ${kafka:matchDataEx()
.key("obituaries")
.header("zilla:filter", "obituaries")
.header("zilla:format", "TEXT")
.header("zilla:qos", "0")
.build()
.build()}
read "client-1 disconnected abruptly"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ write zilla:data.ext ${kafka:dataEx()
.header("zilla:reply-filter", "responses")
.header("zilla:reply-filter", "client1")
.header("zilla:correlation-id", "info")
.header("zilla:qos", "0")
.build()
.build()}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ read zilla:data.ext ${kafka:matchDataEx()
.header("zilla:reply-filter", "responses")
.header("zilla:reply-filter", "client1")
.header("zilla:correlation-id", "info")
.header("zilla:qos", "0")
.build()
.build()}
read "client-1 disconnected abruptly"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ write zilla:data.ext ${kafka:dataEx()
.key("obituaries")
.header("zilla:filter", "obituaries")
.header("zilla:format", "TEXT")
.header("zilla:qos", "0")
.build()
.build()}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ read zilla:data.ext ${kafka:matchDataEx()
.key("obituaries")
.header("zilla:filter", "obituaries")
.header("zilla:format", "TEXT")
.header("zilla:qos", "0")
.build()
.build()}
read "client-1 disconnected abruptly"
Expand Down