Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement mqtt message expiry #640

Merged
merged 4 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class MqttKafkaHeaderHelper
private static final String KAFKA_LOCAL_HEADER_NAME = "zilla:local";
private static final String KAFKA_QOS_HEADER_NAME = "zilla:qos";

private static final String KAFKA_TIMEOUT_HEADER_NAME = "zilla:timeout-ms";
private static final String KAFKA_TIMEOUT_HEADER_NAME = "zilla:expiry";

private static final String KAFKA_CONTENT_TYPE_HEADER_NAME = "zilla:content-type";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ private void onMqttData(
if (mqttPublishDataEx.expiryInterval() != -1)
{
final MutableDirectBuffer expiryBuffer = new UnsafeBuffer(new byte[4]);
expiryBuffer.putInt(0, mqttPublishDataEx.expiryInterval() * 1000, ByteOrder.BIG_ENDIAN);
expiryBuffer.putInt(0, mqttPublishDataEx.expiryInterval(), ByteOrder.BIG_ENDIAN);
kafkaHeadersRW.item(h ->
{
h.nameLen(helper.kafkaTimeoutHeaderName.sizeof());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,7 @@ private void onMqttData(

MqttWillMessageFW will = mqttWillRO.tryWrap(buffer, offset, limit);
this.delay = (int) Math.min(SECONDS.toMillis(will.delay()), sessionExpiryMillis);
final int expiryInterval = will.expiryInterval() == -1 ? -1 :
(int) TimeUnit.SECONDS.toMillis(will.expiryInterval());
final int expiryInterval = will.expiryInterval() == -1 ? -1 : will.expiryInterval();
final MqttWillMessageFW.Builder willMessageBuilder =
mqttMessageRW.wrap(willMessageBuffer, 0, willMessageBuffer.capacity())
.topic(will.topic())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static io.aklivity.zilla.runtime.engine.buffer.BufferPool.NO_SLOT;
import static io.aklivity.zilla.runtime.engine.concurrent.Signaler.NO_CANCEL_ID;
import static java.lang.System.currentTimeMillis;
import static java.time.Instant.now;
import static java.util.concurrent.TimeUnit.SECONDS;


Expand Down Expand Up @@ -104,7 +105,10 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory
private static final int RETAIN_FLAG = 1 << RETAIN.ordinal();
private static final int RETAIN_AS_PUBLISHED_FLAG = 1 << RETAIN_AS_PUBLISHED.ordinal();
private static final int SIGNAL_CONNECT_BOOTSTRAP_STREAM = 1;
private static final int DATA_FIN_FLAG = 0x03;
private static final int DATA_FLAG_INIT = 0x02;
private static final int DATA_FLAG_FIN = 0x01;
private static final OctetsFW EMPTY_OCTETS = new OctetsFW().wrap(new UnsafeBuffer(new byte[0]), 0, 0);

private final OctetsFW emptyRO = new OctetsFW().wrap(new UnsafeBuffer(0L, 0), 0, 0);
private final BeginFW beginRO = new BeginFW();
private final DataFW dataRO = new DataFW();
Expand Down Expand Up @@ -1096,6 +1100,7 @@ final class KafkaMessagesProxy extends KafkaProxy
private long replyAck;
private int replyMax;
private int replyPad;
private boolean expiredMessage;

private KafkaMessagesProxy(
long originId,
Expand Down Expand Up @@ -1420,6 +1425,7 @@ private void onKafkaData(

assert replyAck <= replySeq;

sendData:
if (replySeq > replyAck + replyMax)
{
doKafkaReset(traceId);
Expand All @@ -1439,13 +1445,31 @@ private void onKafkaData(
final OctetsFW key = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().key().value() : null;
final long filters = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().filters() : 0;
final KafkaOffsetFW partition = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().partition() : null;
final long timestamp = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().timestamp() : 0;


if (key != null)
Flyweight mqttSubscribeDataEx = EMPTY_OCTETS;
if ((flags & DATA_FLAG_INIT) != 0x00 && key != null)
{
String topicName = kafkaMergedDataEx.fetch().key().value()
.get((b, o, m) -> b.getStringWithoutLengthUtf8(o, m - o));
helper.visit(kafkaMergedDataEx);

long expireInterval;
if (helper.timeout != -1)
{
expireInterval = timestamp + helper.timeout - now().toEpochMilli();
if (expireInterval < 0)
{
expiredMessage = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

long expireInterval;
if (helper.timeout != -1)
{
   ...
}
else
{
    expireInterval = helper.timeout;
}

can this be simplified to:

long expireInterval = -1;
if (helper.timeout != -1)
{
   ...
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally I wanted to write this like you showed. But as we're using the expireInterval in a lambda we'd get:
"Variable used in lambda expression should be final or effectively final".

break sendData;
}
}
else
{
expireInterval = helper.timeout;
}

// If the qos it was created for is 0, set the high watermark, as we won't receive ack
if (mqtt.qos == MqttQoS.AT_MOST_ONCE.value())
{
Expand All @@ -1457,7 +1481,7 @@ private void onKafkaData(
}
}

final MqttDataExFW mqttSubscribeDataEx = mqttDataExRW.wrap(extBuffer, 0, extBuffer.capacity())
mqttSubscribeDataEx = mqttDataExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(mqttTypeId)
.subscribe(b ->
{
Expand Down Expand Up @@ -1487,9 +1511,10 @@ private void onKafkaData(
}
b.flags(flag);
b.subscriptionIds(subscriptionIdsRW.build());
if (helper.timeout != -1)

if (expireInterval != -1)
{
b.expiryInterval(helper.timeout / 1000);
b.expiryInterval((int) expireInterval);
}
if (helper.contentType != null)
{
Expand Down Expand Up @@ -1525,7 +1550,10 @@ private void onKafkaData(
}
});
}).build();
}

if (!expiredMessage)
{
if (!MqttKafkaState.initialOpened(mqtt.retained.state) ||
MqttKafkaState.replyClosed(mqtt.retained.state))
{
Expand Down Expand Up @@ -1555,6 +1583,11 @@ private void onKafkaData(
messageSlotReserved += reserved;
}
}

if ((flags & DATA_FLAG_FIN) != 0x00)
{
expiredMessage = false;
}
}
}

Expand All @@ -1573,7 +1606,7 @@ private void flushData(
{
final MqttSubscribeMessageFW message = mqttSubscribeMessageRO.wrap(dataBuffer, messageSlotOffset,
dataBuffer.capacity());
mqtt.doMqttData(traceId, authorization, budgetId, reserved, DATA_FIN_FLAG, message.payload(),
mqtt.doMqttData(traceId, authorization, budgetId, reserved, DATA_FLAG_FIN, message.payload(),
message.extension());

messageSlotOffset += message.sizeof();
Expand Down Expand Up @@ -1834,6 +1867,7 @@ final class KafkaRetainedProxy extends KafkaProxy
private int replyPad;

private int unAckedPackets;
private boolean expiredMessage;

private KafkaRetainedProxy(
long originId,
Expand Down Expand Up @@ -2138,6 +2172,7 @@ private void onKafkaData(

assert replyAck <= replySeq;

sendData:
if (replySeq > replyAck + replyMax)
{
doKafkaReset(traceId);
Expand All @@ -2157,13 +2192,31 @@ private void onKafkaData(
final OctetsFW key = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().key().value() : null;
final long filters = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().filters() : 0;
final KafkaOffsetFW partition = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().partition() : null;
final long timestamp = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().timestamp() : 0;

if (key != null)
Flyweight mqttSubscribeDataEx = EMPTY_OCTETS;
if ((flags & DATA_FLAG_INIT) != 0x00 && key != null)
{
String topicName = kafkaMergedDataEx.fetch().key().value()
.get((b, o, m) -> b.getStringWithoutLengthUtf8(o, m - o));
helper.visit(kafkaMergedDataEx);
final Flyweight mqttSubscribeDataEx = mqttDataExRW.wrap(extBuffer, 0, extBuffer.capacity())

long expireInterval;
if (helper.timeout != -1)
{
expireInterval = timestamp + helper.timeout - now().toEpochMilli();
if (expireInterval < 0)
{
expiredMessage = true;
break sendData;
}
}
else
{
expireInterval = helper.timeout;
}

mqttSubscribeDataEx = mqttDataExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(mqttTypeId)
.subscribe(b ->
{
Expand Down Expand Up @@ -2199,9 +2252,9 @@ private void onKafkaData(
}
b.flags(flag);
b.subscriptionIds(subscriptionIdsRW.build());
if (helper.timeout != -1)
if (expireInterval != -1)
{
b.expiryInterval(helper.timeout / 1000);
b.expiryInterval((int) expireInterval);
}
if (helper.contentType != null)
{
Expand Down Expand Up @@ -2237,11 +2290,18 @@ private void onKafkaData(
}
});
}).build();
}

if (!expiredMessage)
{
mqtt.doMqttData(traceId, authorization, budgetId, reserved, flags, payload, mqttSubscribeDataEx);

mqtt.mqttSharedBudget -= length;
}

if ((flags & DATA_FLAG_FIN) != 0x00)
{
expiredMessage = false;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,17 @@ public void shouldReceiveOneMessage() throws Exception
k3po.finish();
}

@Test
@Configuration("proxy.yaml")
@Configure(name = WILL_AVAILABLE_NAME, value = "false")
@Specification({
"${mqtt}/subscribe.one.message/client",
"${kafka}/subscribe.one.message.fragmented/server"})
public void shouldReceiveOneMessageFragmented() throws Exception
{
k3po.finish();
}

@Test
@Configuration("proxy.options.yaml")
@Configure(name = WILL_AVAILABLE_NAME, value = "false")
Expand Down Expand Up @@ -251,6 +262,17 @@ public void shouldReceiveRetainedNoRetainAsPublished() throws Exception
k3po.finish();
}

@Test
@Configuration("proxy.yaml")
@Configure(name = WILL_AVAILABLE_NAME, value = "false")
@Specification({
"${mqtt}/subscribe.retain/client",
"${kafka}/subscribe.retain.fragmented/server"})
public void shouldReceiveRetainedFragmented() throws Exception
{
k3po.finish();
}

@Test
@Configuration("proxy.yaml")
@Configure(name = WILL_AVAILABLE_NAME, value = "false")
Expand Down Expand Up @@ -580,4 +602,26 @@ public void shouldReplayRetainedQos2() throws Exception
{
k3po.finish();
}

@Test
@Configuration("proxy.yaml")
@Configure(name = WILL_AVAILABLE_NAME, value = "false")
@Specification({
"${mqtt}/subscribe.expire.message/client",
"${kafka}/subscribe.expire.message/server"})
public void shouldExpireMessage() throws Exception
{
k3po.finish();
}

@Test
@Configuration("proxy.yaml")
@Configure(name = WILL_AVAILABLE_NAME, value = "false")
@Specification({
"${mqtt}/subscribe.expire.message/client",
"${kafka}/subscribe.expire.message.fragmented/server"})
public void shouldExpireMessageFragmented() throws Exception
{
k3po.finish();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ write zilla:data.ext ${kafka:dataEx()
.header("zilla:filter", "sensor")
.header("zilla:filter", "one")
.header("zilla:local", "client")
.headerInt("zilla:timeout-ms", 15000)
.headerInt("zilla:expiry", 15)
.header("zilla:content-type", "message")
.header("zilla:format", "TEXT")
.header("zilla:reply-to", "messages")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ read zilla:data.ext ${kafka:matchDataEx()
.header("zilla:filter", "sensor")
.header("zilla:filter", "one")
.header("zilla:local", "client")
.headerInt("zilla:timeout-ms", 15000)
.headerInt("zilla:expiry", 15)
.header("zilla:content-type", "message")
.header("zilla:format", "TEXT")
.header("zilla:reply-to", "messages")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ write zilla:data.ext ${kafka:dataEx()
.header("zilla:filter", "sensor")
.header("zilla:filter", "one")
.header("zilla:local", "client")
.headerInt("zilla:timeout-ms", 15000)
.headerInt("zilla:expiry", 15)
.header("zilla:content-type", "message")
.header("zilla:format", "TEXT")
.header("zilla:reply-to", "mqtt-messages")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ read zilla:data.ext ${kafka:matchDataEx()
.header("zilla:filter", "sensor")
.header("zilla:filter", "one")
.header("zilla:local", "client")
.headerInt("zilla:timeout-ms", 15000)
.headerInt("zilla:expiry", 15)
.header("zilla:content-type", "message")
.header("zilla:format", "TEXT")
.header("zilla:reply-to", "mqtt-messages")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ write zilla:data.ext ${kafka:dataEx()
write ${mqtt:will()
.topic("obituaries")
.delay(1000)
.expiryInterval(15000)
.expiryInterval(15)
.format("TEXT")
.responseTopic("responses/client1")
.lifetimeId("1e6a1eb5-810a-459d-a12c-a6fa08f228d1")
Expand Down Expand Up @@ -427,7 +427,7 @@ read zilla:data.ext ${kafka:matchDataEx()
read ${mqtt:will()
.topic("obituaries")
.delay(1000)
.expiryInterval(15000)
.expiryInterval(15)
.format("TEXT")
.responseTopic("responses/client1")
.lifetimeId("1e6a1eb5-810a-459d-a12c-a6fa08f228d1")
Expand Down Expand Up @@ -467,7 +467,7 @@ write zilla:data.ext ${kafka:dataEx()
.partition(-1, -1)
.key("obituaries")
.header("zilla:filter", "obituaries")
.headerInt("zilla:timeout-ms", 15000)
.headerInt("zilla:expiry", 15)
.header("zilla:format", "TEXT")
.header("zilla:reply-to", "mqtt-messages")
.header("zilla:reply-key", "responses/client1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ read zilla:data.ext ${kafka:matchDataEx()
read ${mqtt:will()
.topic("obituaries")
.delay(1000)
.expiryInterval(15000)
.expiryInterval(15)
.format("TEXT")
.responseTopic("responses/client1")
.lifetimeId("1e6a1eb5-810a-459d-a12c-a6fa08f228d1")
Expand Down Expand Up @@ -428,7 +428,7 @@ write zilla:data.ext ${kafka:dataEx()
write ${mqtt:will()
.topic("obituaries")
.delay(1000)
.expiryInterval(15000)
.expiryInterval(15)
.format("TEXT")
.responseTopic("responses/client1")
.lifetimeId("1e6a1eb5-810a-459d-a12c-a6fa08f228d1")
Expand Down Expand Up @@ -466,7 +466,7 @@ read zilla:data.ext ${kafka:matchDataEx()
.partition(-1, -1)
.key("obituaries")
.header("zilla:filter", "obituaries")
.headerInt("zilla:timeout-ms", 15000)
.headerInt("zilla:expiry", 15)
.header("zilla:format", "TEXT")
.header("zilla:reply-to", "mqtt-messages")
.header("zilla:reply-key", "responses/client1")
Expand Down
Loading