Implement QoS 1 and QoS 2 #589

Merged · 24 commits · Dec 4, 2023
Changes from 17 commits
----------------------------------------------------------------------
@@ -1455,6 +1455,7 @@ private void onMqttSubscribeDataEx(
     {
         final int deferred = subscribe.deferred();
         final String topic = subscribe.topic().asString();
+        final int packetId = subscribe.packetId();
         final int flags = subscribe.flags();
         final Array32FW<Varuint32FW> subscriptionIds = subscribe.subscriptionIds();
         final int expiryInterval = subscribe.expiryInterval();
@@ -1465,8 +1466,8 @@ private void onMqttSubscribeDataEx(
         final Array32FW<MqttUserPropertyFW> properties = subscribe.properties();

         out.printf(verboseFormat, index, offset, timestamp,
-            format("[subscribe] (%d) %s %d %d %s %s %s %s",
-                deferred, topic, flags, expiryInterval, contentType, format.name(), responseTopic, correlation));
+            format("[subscribe] (%d) %s %d %d %d %s %s %s %s",
+                deferred, topic, packetId, flags, expiryInterval, contentType, format.name(), responseTopic, correlation));
         subscriptionIds.forEach(s -> out.printf(verboseFormat, index, offset, timestamp,
             format("Subscription ID: %d ", s.value())));
         properties.forEach(u -> out.printf(verboseFormat, index, offset, timestamp,
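For anyone parsing the dumper output, a self-contained check of the updated format string (all values below are illustrative, not from a real capture): packetId now prints between the topic and the flags.

import static java.lang.String.format;

// Illustrative check of the dump line produced by the new format
// string; every value here is made up for demonstration.
public final class SubscribeDumpFormatExample
{
    public static void main(String[] args)
    {
        System.out.println(
            format("[subscribe] (%d) %s %d %d %d %s %s %s %s",
                0, "sensor/one", 42, 1, 15, "content-type", "TEXT", "reply-topic", "correlation"));
        // prints: [subscribe] (0) sensor/one 42 1 15 content-type TEXT reply-topic correlation
    }
}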
----------------------------------------------------------------------
@@ -1296,8 +1296,9 @@ private void onConsumerInitialFlush(
         KafkaConsumerFlushExFW consumerFlushEx = kafkaFlushEx.consumer();
         final KafkaOffsetFW partition = consumerFlushEx.partition();
         final int leaderEpoch = consumerFlushEx.leaderEpoch();
+        final long correlationId = consumerFlushEx.correlationId();

-        offsetCommit.onOffsetCommitRequest(traceId, authorization, partition, leaderEpoch);
+        offsetCommit.onOffsetCommitRequest(traceId, authorization, partition, leaderEpoch, correlationId);
     }

     private void onConsumerInitialAbort(
@@ -1723,7 +1724,8 @@ private void onOffsetCommitRequest(
         long traceId,
         long authorization,
         KafkaOffsetFW partition,
-        int leaderEpoch)
+        int leaderEpoch,
+        long correlationId)
     {
         doOffsetCommitInitialBegin(traceId, 0);

@@ -1732,7 +1734,8 @@ private void onOffsetCommitRequest(
             partition.partitionOffset(),
             delegate.fanout.generationId,
             leaderEpoch,
-            partition.metadata().asString()));
+            partition.metadata().asString(),
+            correlationId));

         doOffsetCommit(traceId, authorization);
     }
@@ -1750,8 +1753,8 @@ private void onOffsetCommitResponse(
             .partition(p -> p
                 .partitionId(commit.partitionId)
                 .partitionOffset(commit.partitionOffset)
-                .metadata(commit.metadata)
-            ))
+            )
+            .correlationId(commit.correlationId))
             .build()
             .sizeof()));
     }
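For context on why correlationId now travels with the commit: the offset-commit response has to be matched back to the flush that triggered it. A minimal sketch of that request/response matching in plain JDK types (names here are illustrative, not the zilla binding API):

import java.util.HashMap;
import java.util.Map;

// Pending commits keyed by correlationId, so an acknowledgement can
// be routed back to the request that produced it.
public final class OffsetCommitCorrelationExample
{
    private final Map<Long, String> pending = new HashMap<>();
    private long nextCorrelationId = 1L;

    long onOffsetCommitRequest(String topicPartition)
    {
        final long correlationId = nextCorrelationId++;
        pending.put(correlationId, topicPartition);
        return correlationId;
    }

    void onOffsetCommitResponse(long correlationId)
    {
        final String topicPartition = pending.remove(correlationId);
        System.out.println("commit acked for " + topicPartition);
    }

    public static void main(String[] args)
    {
        OffsetCommitCorrelationExample example = new OffsetCommitCorrelationExample();
        final long id = example.onOffsetCommitRequest("messages-0");
        example.onOffsetCommitResponse(id); // prints: commit acked for messages-0
    }
}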
----------------------------------------------------------------------
@@ -2817,7 +2817,8 @@ private void doConsumerInitialFlush(
                 .partitionId(offsetAck.partitionId())
                 .partitionOffset(offsetAck.partitionOffset())
                 .metadata(offsetAck.metadata()))
-            .leaderEpoch(partitionOffset.leaderEpoch))
+            .leaderEpoch(partitionOffset.leaderEpoch)
+            .correlationId(consumer.correlationId()))
             .build();

         final int reserved = initialPad;
----------------------------------------------------------------------
@@ -22,18 +22,31 @@ public final class KafkaPartitionOffset
     public final int generationId;
     public final int leaderEpoch;
     public final String metadata;
+    public final long correlationId;

     public KafkaPartitionOffset(
         int partitionId,
         long partitionOffset,
         int generationId,
         int leaderEpoch,
         String metadata)
     {
+        this(partitionId, partitionOffset, generationId, leaderEpoch, metadata, -1);
+    }
+
+    public KafkaPartitionOffset(
+        int partitionId,
+        long partitionOffset,
+        int generationId,
+        int leaderEpoch,
+        String metadata,
+        long correlationId)
+    {
         this.partitionId = partitionId;
         this.partitionOffset = partitionOffset;
         this.generationId = generationId;
         this.leaderEpoch = leaderEpoch;
         this.metadata = metadata;
+        this.correlationId = correlationId;
     }
 }
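The five-argument constructor is kept as a delegating overload, so existing callers compile unchanged while new call sites can pass a correlationId; -1 reads as the "no correlation id" sentinel. A usage sketch (import omitted, since the package isn't shown in this diff; the -1 meaning is an assumption implied by the delegating overload):

// Both constructors of KafkaPartitionOffset as defined above.
public final class PartitionOffsetUsageExample
{
    public static void main(String[] args)
    {
        KafkaPartitionOffset legacy =
            new KafkaPartitionOffset(0, 100L, 1, 5, "meta");      // correlationId == -1
        KafkaPartitionOffset correlated =
            new KafkaPartitionOffset(0, 100L, 1, 5, "meta", 42L); // explicit correlationId

        System.out.println(legacy.correlationId);     // prints: -1
        System.out.println(correlated.correlationId); // prints: 42
    }
}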
----------------------------------------------------------------------
@@ -32,6 +32,7 @@ public class MqttKafkaHeaderHelper
     private static final String KAFKA_REPLY_FILTER_HEADER_NAME = "zilla:reply-filter";

     private static final String KAFKA_LOCAL_HEADER_NAME = "zilla:local";
+    private static final String KAFKA_QOS_HEADER_NAME = "zilla:qos";

     private static final String KAFKA_TIMEOUT_HEADER_NAME = "zilla:timeout-ms";

@@ -47,6 +48,7 @@ public class MqttKafkaHeaderHelper
     public final OctetsFW kafkaFilterHeaderName;
     public final OctetsFW kafkaReplyFilterHeaderName;
     public final OctetsFW kafkaLocalHeaderName;
+    public final OctetsFW kafkaQosHeaderName;
     public final OctetsFW kafkaTimeoutHeaderName;
     public final OctetsFW kafkaContentTypeHeaderName;
     public final OctetsFW kafkaFormatHeaderName;
@@ -61,6 +63,7 @@ public class MqttKafkaHeaderHelper
     public int timeout;
     public OctetsFW contentType;
     public String format;
+    public String qos;
     public OctetsFW replyTo;
     public OctetsFW replyKey;
     public OctetsFW correlation;
@@ -71,6 +74,7 @@ public MqttKafkaHeaderHelper()
         kafkaFilterHeaderName = stringToOctets(KAFKA_FILTER_HEADER_NAME);
         kafkaReplyFilterHeaderName = stringToOctets(KAFKA_REPLY_FILTER_HEADER_NAME);
         kafkaLocalHeaderName = stringToOctets(KAFKA_LOCAL_HEADER_NAME);
+        kafkaQosHeaderName = stringToOctets(KAFKA_QOS_HEADER_NAME);
         kafkaTimeoutHeaderName = stringToOctets(KAFKA_TIMEOUT_HEADER_NAME);
         kafkaContentTypeHeaderName = stringToOctets(KAFKA_CONTENT_TYPE_HEADER_NAME);
         kafkaFormatHeaderName = stringToOctets(KAFKA_FORMAT_HEADER_NAME);
@@ -82,6 +86,7 @@ public MqttKafkaHeaderHelper()
         visitors.put(kafkaFilterHeaderName, this::skip);
         visitors.put(kafkaReplyFilterHeaderName, this::skip);
         visitors.put(kafkaLocalHeaderName, this::skip);
+        visitors.put(kafkaQosHeaderName, this::visitQos);
         visitors.put(kafkaTimeoutHeaderName, this::visitTimeout);
         visitors.put(kafkaContentTypeHeaderName, this::visitContentType);
         visitors.put(kafkaFormatHeaderName, this::visitFormat);
@@ -137,6 +142,12 @@ private void visitFormat(
         format = value.get((b, o, m) -> b.getStringWithoutLengthUtf8(o, m - o));
     }

+    private void visitQos(
+        OctetsFW value)
+    {
+        qos = value.get((b, o, m) -> b.getStringWithoutLengthUtf8(o, m - o));
+    }
+
     private void visitReplyTo(
         OctetsFW value)
     {
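The helper dispatches each known Kafka header name to a decode method through a visitor map, so supporting zilla:qos is one map entry plus one visitor. A self-contained miniature of the same pattern in plain JDK types (the real class uses OctetsFW keys and buffer visitors, and its handling of unknown headers may differ; the fallback-to-skip here is a simplification):

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Header-name -> visitor dispatch, mirroring MqttKafkaHeaderHelper's
// structure with plain strings instead of flyweights.
public final class HeaderVisitorExample
{
    private final Map<String, Consumer<String>> visitors = new HashMap<>();

    String qos;

    public HeaderVisitorExample()
    {
        visitors.put("zilla:local", this::skip);
        visitors.put("zilla:qos", this::visitQos);
    }

    void visit(String name, String value)
    {
        visitors.getOrDefault(name, this::skip).accept(value);
    }

    private void visitQos(String value)
    {
        qos = value;
    }

    private void skip(String value)
    {
    }

    public static void main(String[] args)
    {
        HeaderVisitorExample helper = new HeaderVisitorExample();
        helper.visit("zilla:qos", "1");
        System.out.println(helper.qos); // prints: 1
    }
}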
----------------------------------------------------------------------
@@ -25,6 +25,7 @@
 import org.agrona.BitUtil;
 import org.agrona.DirectBuffer;
 import org.agrona.MutableDirectBuffer;
+import org.agrona.collections.Int2ObjectHashMap;
 import org.agrona.concurrent.UnsafeBuffer;

 import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.MqttKafkaConfiguration;
@@ -106,6 +107,7 @@ public class MqttKafkaPublishFactory implements MqttKafkaStreamFactory
     private final LongFunction<MqttKafkaBindingConfig> supplyBinding;
     private final String16FW binaryFormat;
     private final String16FW textFormat;
+    private final Int2ObjectHashMap<String16FW> qosLevels;

     public MqttKafkaPublishFactory(
         MqttKafkaConfiguration config,
@@ -123,6 +125,9 @@ public MqttKafkaPublishFactory(
         this.supplyBinding = supplyBinding;
         this.binaryFormat = new String16FW(MqttPayloadFormat.BINARY.name());
         this.textFormat = new String16FW(MqttPayloadFormat.TEXT.name());
+        this.qosLevels = new Int2ObjectHashMap<>();
+        this.qosLevels.put(1, new String16FW("1"));
+        this.qosLevels.put(2, new String16FW("2"));
     }

     @Override
@@ -413,6 +418,11 @@ private void onMqttData(
             addHeader(helper.kafkaCorrelationHeaderName, mqttPublishDataEx.correlation().bytes());
         }

+        if (mqttPublishDataEx.qos() != 0)
+        {
+            addHeader(helper.kafkaQosHeaderName, qosLevels.get(mqttPublishDataEx.qos()));
+        }
+
         mqttPublishDataEx.properties().forEach(property ->
             addHeader(property.key(), property.value()));

@@ -720,7 +730,9 @@ private void addHeader(
         });
     }

-    private void addHeader(String16FW key, String16FW value)
+    private void addHeader(
+        String16FW key,
+        String16FW value)
     {
         DirectBuffer keyBuffer = key.value();
         DirectBuffer valueBuffer = value.value();
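Net effect of the publish-side change: the zilla:qos header is stamped onto the Kafka record only for QoS 1 and 2 publishes, and QoS 0 messages stay exactly as before. A sketch of the resulting headers with plain JDK maps (the real code uses Int2ObjectHashMap and String16FW flyweights):

import java.util.LinkedHashMap;
import java.util.Map;

// Which headers a publish would carry at each QoS level; the
// qos != 0 guard mirrors the one in onMqttData above.
public final class QosHeaderExample
{
    private static final Map<Integer, String> QOS_LEVELS = Map.of(1, "1", 2, "2");

    static Map<String, String> headersFor(int qos)
    {
        final Map<String, String> headers = new LinkedHashMap<>();
        if (qos != 0)
        {
            headers.put("zilla:qos", QOS_LEVELS.get(qos));
        }
        return headers;
    }

    public static void main(String[] args)
    {
        System.out.println(headersFor(0)); // prints: {}
        System.out.println(headersFor(2)); // prints: {zilla:qos=2}
    }
}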
----------------------------------------------------------------------
@@ -129,7 +129,7 @@ public class MqttKafkaSessionFactory implements MqttKafkaStreamFactory
     private static final int WILDCARD_AVAILABLE_MASK = 1 << MqttServerCapabilities.WILDCARD.value();
     private static final int SUBSCRIPTION_IDS_AVAILABLE_MASK = 1 << MqttServerCapabilities.SUBSCRIPTION_IDS.value();
     private static final int SHARED_SUBSCRIPTIONS_AVAILABLE_MASK = 1 << MqttServerCapabilities.SHARED_SUBSCRIPTIONS.value();
-    private static final byte MQTT_KAFKA_MAX_QOS = 0;
+    private static final byte MQTT_KAFKA_MAX_QOS = 2;
     private static final int MQTT_KAFKA_CAPABILITIES = RETAIN_AVAILABLE_MASK | WILDCARD_AVAILABLE_MASK |
         SUBSCRIPTION_IDS_AVAILABLE_MASK;
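The single-line bump of MQTT_KAFKA_MAX_QOS from 0 to 2 is what advertises QoS 1 and 2 support. As a side note on the surrounding constants: capabilities are packed one bit each into an int mask. A miniature of that pattern (enum ordinals stand in for MqttServerCapabilities.value(), whose definition this diff doesn't show):

// One-bit-per-capability masks, OR-ed into the advertised set.
public final class CapabilityMaskExample
{
    enum Capability { RETAIN, WILDCARD, SUBSCRIPTION_IDS, SHARED_SUBSCRIPTIONS }

    static final int RETAIN_AVAILABLE_MASK = 1 << Capability.RETAIN.ordinal();
    static final int WILDCARD_AVAILABLE_MASK = 1 << Capability.WILDCARD.ordinal();
    static final int SUBSCRIPTION_IDS_AVAILABLE_MASK = 1 << Capability.SUBSCRIPTION_IDS.ordinal();

    public static void main(String[] args)
    {
        final int capabilities =
            RETAIN_AVAILABLE_MASK | WILDCARD_AVAILABLE_MASK | SUBSCRIPTION_IDS_AVAILABLE_MASK;

        System.out.println((capabilities & WILDCARD_AVAILABLE_MASK) != 0); // prints: true
        System.out.println(
            (capabilities & (1 << Capability.SHARED_SUBSCRIPTIONS.ordinal())) != 0); // prints: false
    }
}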