Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support MQTT fragmented messages #651

Merged
merged 34 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
24d4aa7
large message checkpoint
bmaidics Dec 18, 2023
5e6a779
Large messages checkpoint
bmaidics Dec 22, 2023
e066cb9
checkpoint
bmaidics Dec 22, 2023
5105bb4
fix build
bmaidics Dec 22, 2023
e5698d7
flow control
bmaidics Dec 22, 2023
dd3d97d
flow control
bmaidics Dec 22, 2023
2eb54df
Checkpoint
bmaidics Dec 22, 2023
53df3c4
Merge remote-tracking branch 'upstream/develop' into large_messages
bmaidics Dec 22, 2023
9bc88d6
Merge remote-tracking branch 'upstream/develop' into large_messages
bmaidics Dec 28, 2023
9189eb4
checkpoint
bmaidics Dec 29, 2023
5878c4b
fix invalid message error
bmaidics Dec 29, 2023
0b2ff30
checkpoint
bmaidics Dec 29, 2023
9e0092f
Fix tests
bmaidics Dec 29, 2023
5fd7f26
Fix test
bmaidics Jan 2, 2024
4d57b39
Fix tcp flow control issue
bmaidics Jan 2, 2024
fe87bf6
Merge remote-tracking branch 'upstream/develop' into large_messages
bmaidics Jan 2, 2024
bbd38e3
Checkpoint
bmaidics Jan 2, 2024
80fdd6e
Correct padding
bmaidics Jan 2, 2024
733e9b7
Merge remote-tracking branch 'upstream/develop' into large_messages
bmaidics Jan 3, 2024
674a306
Add deferred
bmaidics Jan 3, 2024
5b12154
Flow control change
bmaidics Jan 3, 2024
a912b2d
Fix delegate flow control issues
bmaidics Jan 3, 2024
9351d17
Merge remote-tracking branch 'upstream/develop' into large_messages
bmaidics Jan 3, 2024
71c0988
Merge remote-tracking branch 'upstream/develop' into large_messages
bmaidics Jan 4, 2024
009c65f
Add missing deferred on mqtt subscribe data
bmaidics Jan 4, 2024
ffe834c
Publish/subscribe working
bmaidics Jan 4, 2024
185d1c6
Will message flow control fixes
bmaidics Jan 5, 2024
f54dc17
Remove zilla.pcap
bmaidics Jan 5, 2024
c984ea9
Revert "Fix tcp flow control issue"
bmaidics Jan 5, 2024
1336c2b
Address PR feedback
bmaidics Jan 8, 2024
d1b4b10
Review items
bmaidics Jan 9, 2024
52c7bb5
Revert copy-paste error
bmaidics Jan 9, 2024
efdab6a
Merge remote-tracking branch 'upstream/develop' into large_messages
bmaidics Jan 9, 2024
ca03ad5
fix dump
attilakreiner Jan 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;

Expand All @@ -48,14 +49,17 @@
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.BeginFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.DataFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.EndFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.ExtensionFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.FlushFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.KafkaBeginExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.KafkaDataExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.KafkaFlushExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.KafkaResetExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttBeginExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttDataExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttPublishBeginExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttPublishDataExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttResetExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.ResetFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.WindowFW;
import io.aklivity.zilla.runtime.engine.EngineContext;
Expand All @@ -67,10 +71,25 @@ public class MqttKafkaPublishFactory implements MqttKafkaStreamFactory
private static final OctetsFW EMPTY_OCTETS = new OctetsFW().wrap(new UnsafeBuffer(new byte[0]), 0, 0);
private static final KafkaAckMode KAFKA_DEFAULT_ACK_MODE = KafkaAckMode.LEADER_ONLY;
private static final String KAFKA_TYPE_NAME = "kafka";
private static final String MQTT_TYPE_NAME = "mqtt";
private static final byte SLASH_BYTE = (byte) '/';
private static final int DATA_FLAG_INIT = 0x02;
private static final int DATA_FLAG_FIN = 0x01;
private static final int DATA_FLAG_COMPLETE = 0x03;
public static final int PUBLISH_FLAGS_RETAINED_MASK = 1 << MqttPublishFlags.RETAIN.value();
public static final int MQTT_PACKET_TOO_LARGE = 0x95;
public static final int MQTT_IMPLEMENTATION_SPECIFIC_ERROR = 0x83;
private static final int KAFKA_ERROR_MESSAGE_TOO_LARGE = 18;
public static final Int2IntHashMap MQTT_REASON_CODES;

static
{
    // Translate Kafka error codes to MQTT reason codes for RESET extensions.
    // The constructor argument is the map's missing-value: any Kafka error
    // without an explicit mapping resolves to MQTT_IMPLEMENTATION_SPECIFIC_ERROR.
    final Int2IntHashMap reasonCodes = new Int2IntHashMap(MQTT_IMPLEMENTATION_SPECIFIC_ERROR);

    // Kafka MESSAGE_TOO_LARGE (18) maps to MQTT PACKET_TOO_LARGE (0x95).
    reasonCodes.put(KAFKA_ERROR_MESSAGE_TOO_LARGE, MQTT_PACKET_TOO_LARGE);

    MQTT_REASON_CODES = reasonCodes;
}

private final OctetsFW emptyRO = new OctetsFW().wrap(new UnsafeBuffer(0L, 0), 0, 0);
private final BeginFW beginRO = new BeginFW();
Expand All @@ -91,12 +110,15 @@ public class MqttKafkaPublishFactory implements MqttKafkaStreamFactory
private final WindowFW.Builder windowRW = new WindowFW.Builder();
private final ResetFW.Builder resetRW = new ResetFW.Builder();

private final ExtensionFW extensionRO = new ExtensionFW();
private final MqttBeginExFW mqttBeginExRO = new MqttBeginExFW();
private final MqttDataExFW mqttDataExRO = new MqttDataExFW();
private final KafkaResetExFW kafkaResetExRO = new KafkaResetExFW();

private final KafkaBeginExFW.Builder kafkaBeginExRW = new KafkaBeginExFW.Builder();
private final KafkaFlushExFW.Builder kafkaFlushExRW = new KafkaFlushExFW.Builder();
private final KafkaDataExFW.Builder kafkaDataExRW = new KafkaDataExFW.Builder();
private final MqttResetExFW.Builder mqttResetExRW = new MqttResetExFW.Builder();
private final Array32FW.Builder<KafkaHeaderFW.Builder, KafkaHeaderFW> kafkaHeadersRW =
new Array32FW.Builder<>(new KafkaHeaderFW.Builder(), new KafkaHeaderFW());

Expand All @@ -108,6 +130,7 @@ public class MqttKafkaPublishFactory implements MqttKafkaStreamFactory
private final LongUnaryOperator supplyReplyId;
private final MqttKafkaHeaderHelper helper;
private final int kafkaTypeId;
private final int mqttTypeId;
private final LongFunction<MqttKafkaBindingConfig> supplyBinding;
private final String16FW binaryFormat;
private final String16FW textFormat;
Expand All @@ -119,6 +142,7 @@ public MqttKafkaPublishFactory(
LongFunction<MqttKafkaBindingConfig> supplyBinding)
{
this.kafkaTypeId = context.supplyTypeId(KAFKA_TYPE_NAME);
this.mqttTypeId = context.supplyTypeId(MQTT_TYPE_NAME);
this.writeBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]);
this.extBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]);
this.kafkaHeadersBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]);
Expand Down Expand Up @@ -199,8 +223,7 @@ private final class MqttPublishProxy
private Array32FW<String16FW> topicNameHeaders;
private OctetsFW clientIdOctets;
private boolean retainAvailable;
private boolean retainedData;
private boolean fragmentedData;
private int publishFlags;

private MqttPublishProxy(
MessageConsumer mqtt,
Expand Down Expand Up @@ -447,15 +470,14 @@ private void onMqttData(
.headers(kafkaHeadersRW.build())))
.build();

retainedData = (mqttPublishDataEx.flags() & 1 << MqttPublishFlags.RETAIN.value()) != 0;
publishFlags = mqttPublishDataEx.flags();
}

fragmentedData = (flags & DATA_FLAG_FIN) == 0;
messages.doKafkaData(traceId, authorization, budgetId, reserved, flags, payload, kafkaDataEx);

if (retainAvailable)
{
if (retainedData)
if (hasPublishFlagRetained(publishFlags))
{
retained.doKafkaData(traceId, authorization, budgetId, reserved, flags, payload, kafkaDataEx);
}
Expand All @@ -475,10 +497,16 @@ private void onMqttData(

if ((flags & DATA_FLAG_FIN) != 0x00)
{
retainedData = false;
publishFlags = 0;
}
}

// Reports whether the RETAIN bit is set in the given MQTT publish flags.
private boolean hasPublishFlagRetained(
    int publishFlags)
{
    final int retainedBit = publishFlags & PUBLISH_FLAGS_RETAINED_MASK;
    return retainedBit != 0;
}

private void setHashKey(
KafkaKeyFW.Builder builder)
{
Expand Down Expand Up @@ -670,8 +698,9 @@ private void doMqttWindow(
int padding,
int capabilities)
{
final long newInitialAck = retainedData ? Math.min(messages.initialAck, retained.initialAck) : messages.initialAck;
final int newInitialMax = retainedData ? Math.max(messages.initialMax, retained.initialMax) : messages.initialMax;
final boolean retainedFlag = hasPublishFlagRetained(publishFlags);
final long newInitialAck = retainedFlag ? Math.min(messages.initialAck, retained.initialAck) : messages.initialAck;
final int newInitialMax = retainedFlag ? Math.max(messages.initialMax, retained.initialMax) : messages.initialMax;

if (initialAck != newInitialAck || initialMax != newInitialMax)
{
Expand All @@ -686,13 +715,14 @@ private void doMqttWindow(
}

private void doMqttReset(
long traceId)
long traceId,
Flyweight extension)
{
if (!MqttKafkaState.initialClosed(state))
{
state = MqttKafkaState.closeInitial(state);

doReset(mqtt, originId, routedId, initialId, initialSeq, initialAck, initialMax, traceId);
doReset(mqtt, originId, routedId, initialId, initialSeq, initialAck, initialMax, traceId, extension);
}
}
}
Expand Down Expand Up @@ -1060,7 +1090,22 @@ private void onKafkaReset(

assert delegate.initialAck <= delegate.initialSeq;

delegate.doMqttReset(traceId);
final OctetsFW extension = reset.extension();
final ExtensionFW resetEx = extension.get(extensionRO::tryWrap);
final KafkaResetExFW kafkaResetEx =
resetEx != null && resetEx.typeId() == kafkaTypeId ? extension.get(kafkaResetExRO::tryWrap) : null;

Flyweight mqttResetEx = EMPTY_OCTETS;
if (kafkaResetEx != null)
{
mqttResetEx = mqttResetExRW
.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(mqttTypeId)
.reasonCode(MQTT_REASON_CODES.get(kafkaResetEx.error()))
.build();
}

delegate.doMqttReset(traceId, mqttResetEx);
}

private void doKafkaReset(
Expand All @@ -1070,7 +1115,7 @@ private void doKafkaReset(
{
state = MqttKafkaState.closeReply(state);

doReset(kafka, originId, routedId, replyId, replySeq, replyAck, replyMax, traceId);
doReset(kafka, originId, routedId, replyId, replySeq, replyAck, replyMax, traceId, EMPTY_OCTETS);
}
}

Expand Down Expand Up @@ -1372,13 +1417,28 @@ private void onKafkaReset(
final long traceId = reset.traceId();

assert acknowledge <= sequence;
assert acknowledge >= initialAck;
assert acknowledge >= delegate.initialAck;

initialAck = acknowledge;
delegate.initialAck = acknowledge;

assert initialAck <= initialSeq;
assert delegate.initialAck <= delegate.initialSeq;

final OctetsFW extension = reset.extension();
final ExtensionFW resetEx = extension.get(extensionRO::tryWrap);
final KafkaResetExFW kafkaResetEx =
resetEx != null && resetEx.typeId() == kafkaTypeId ? extension.get(kafkaResetExRO::tryWrap) : null;

delegate.doMqttReset(traceId);
Flyweight mqttResetEx = EMPTY_OCTETS;
if (kafkaResetEx != null)
{
mqttResetEx = mqttResetExRW
.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(mqttTypeId)
.reasonCode(MQTT_REASON_CODES.get(kafkaResetEx.error()))
.build();
}

delegate.doMqttReset(traceId, mqttResetEx);
}

private void doKafkaReset(
Expand All @@ -1388,7 +1448,7 @@ private void doKafkaReset(
{
state = MqttKafkaState.closeReply(state);

doReset(kafka, originId, routedId, replyId, replySeq, replyAck, replyMax, traceId);
doReset(kafka, originId, routedId, replyId, replySeq, replyAck, replyMax, traceId, EMPTY_OCTETS);
}
}

Expand Down Expand Up @@ -1639,7 +1699,8 @@ private void doReset(
long sequence,
long acknowledge,
int maximum,
long traceId)
long traceId,
Flyweight extension)
{
final ResetFW reset = resetRW.wrap(writeBuffer, 0, writeBuffer.capacity())
.originId(originId)
Expand All @@ -1649,6 +1710,7 @@ private void doReset(
.acknowledge(acknowledge)
.maximum(maximum)
.traceId(traceId)
.extension(extension.buffer(), extension.offset(), extension.sizeof())
.build();

sender.accept(reset.typeId(), reset.buffer(), reset.offset(), reset.sizeof());
Expand Down
Loading
Loading