From 86641559e0d9106b8f2bc7c0c2067eae5ebb4daf Mon Sep 17 00:00:00 2001 From: bmaidics Date: Wed, 13 Dec 2023 18:30:42 +0100 Subject: [PATCH 1/4] Implement message expiry --- .../stream/MqttKafkaSubscribeFactory.java | 43 ++++++++++- .../stream/MqttKafkaSubscribeProxyIT.java | 14 ++++ .../kafka/subscribe.expire.message/client.rpt | 70 ++++++++++++++++++ .../kafka/subscribe.expire.message/server.rpt | 74 +++++++++++++++++++ .../client.rpt | 1 - .../server.rpt | 1 - .../mqtt/subscribe.expire.message/client.rpt | 34 +++++++++ .../mqtt/subscribe.expire.message/server.rpt | 35 +++++++++ .../client.rpt | 1 - .../server.rpt | 1 - .../binding/mqtt/kafka/streams/KafkaIT.java | 9 +++ .../binding/mqtt/kafka/streams/MqttIT.java | 9 +++ 12 files changed, 284 insertions(+), 8 deletions(-) create mode 100644 specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/client.rpt create mode 100644 specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/server.rpt create mode 100644 specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/client.rpt create mode 100644 specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/server.rpt diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java index 5a0374be62..81f64d065c 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java @@ -23,6 +23,7 @@ import static io.aklivity.zilla.runtime.engine.buffer.BufferPool.NO_SLOT; import static io.aklivity.zilla.runtime.engine.concurrent.Signaler.NO_CANCEL_ID; import static java.lang.System.currentTimeMillis; +import static java.time.Instant.now; import static java.util.concurrent.TimeUnit.SECONDS; @@ -1420,6 +1421,7 @@ private void onKafkaData( assert replyAck <= replySeq; + sendData: if (replySeq > replyAck + replyMax) { doKafkaReset(traceId); @@ -1439,6 +1441,7 @@ private void onKafkaData( final OctetsFW key = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().key().value() : null; final long filters = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().filters() : 0; final KafkaOffsetFW partition = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().partition() : null; + final long timestamp = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().timestamp() : 0; if (key != null) { @@ -1446,6 +1449,20 @@ private void onKafkaData( .get((b, o, m) -> b.getStringWithoutLengthUtf8(o, m - o)); helper.visit(kafkaMergedDataEx); + long expireInterval; + if (helper.timeout != -1) + { + expireInterval = timestamp + helper.timeout - now().toEpochMilli(); + if (expireInterval < 0) + { + break sendData; + } + } + else + { + expireInterval = helper.timeout; + } + // If the qos it was created for is 0, set the high watermark, as we won't receive ack if (mqtt.qos == MqttQoS.AT_MOST_ONCE.value()) { @@ -1487,9 +1504,10 @@ private void onKafkaData( } b.flags(flag); b.subscriptionIds(subscriptionIdsRW.build()); - if (helper.timeout != -1) + + if (expireInterval != -1) { - b.expiryInterval(helper.timeout / 1000); + b.expiryInterval((int) expireInterval / 1000); } if (helper.contentType != null) { @@ -2138,6 +2156,7 @@ private void onKafkaData( assert replyAck <= replySeq; + sendData: if (replySeq > replyAck + replyMax) { doKafkaReset(traceId); @@ -2157,12 +2176,28 @@ private void onKafkaData( final OctetsFW key = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().key().value() : null; final long filters = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().filters() : 0; final KafkaOffsetFW partition = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().partition() : null; + final long timestamp = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().timestamp() : 0; if (key != null) { String topicName = kafkaMergedDataEx.fetch().key().value() .get((b, o, m) -> b.getStringWithoutLengthUtf8(o, m - o)); helper.visit(kafkaMergedDataEx); + + long expireInterval; + if (helper.timeout != -1) + { + expireInterval = timestamp + helper.timeout - now().toEpochMilli(); + if (expireInterval < 0) + { + break sendData; + } + } + else + { + expireInterval = helper.timeout; + } + final Flyweight mqttSubscribeDataEx = mqttDataExRW.wrap(extBuffer, 0, extBuffer.capacity()) .typeId(mqttTypeId) .subscribe(b -> @@ -2199,9 +2234,9 @@ private void onKafkaData( } b.flags(flag); b.subscriptionIds(subscriptionIdsRW.build()); - if (helper.timeout != -1) + if (expireInterval != -1) { - b.expiryInterval(helper.timeout / 1000); + b.expiryInterval((int) expireInterval / 1000); } if (helper.contentType != null) { diff --git a/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeProxyIT.java b/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeProxyIT.java index 194d071a67..47d232f3f9 100644 --- a/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeProxyIT.java +++ b/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeProxyIT.java @@ -580,4 +580,18 @@ public void shouldReplayRetainedQos2() throws Exception { k3po.finish(); } + + @Test + @Configuration("proxy.yaml") + @Configure(name = WILL_AVAILABLE_NAME, value = "false") + @Specification({ + "${mqtt}/subscribe.expire.message/client", + "${kafka}/subscribe.expire.message/server"}) + public void shouldExpireMessage() throws Exception + { + k3po.start(); + Thread.sleep(1500); + k3po.notifyBarrier("SEND_DATA"); + k3po.finish(); + } } diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/client.rpt new file mode 100644 index 0000000000..91add417ca --- /dev/null +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/client.rpt @@ -0,0 +1,70 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("mqtt-messages") + .filter() + .headers("zilla:filter") + .sequence("sensor") + .sequence("one") + .build() + .headerNot("zilla:qos", "1") + .headerNot("zilla:qos", "2") + .build() + .evaluation("EAGER") + .build() + .build()} + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("mqtt-messages") + .partition(0, 0, 1, 1) + .build() + .build()} + +connected + + +read notify SEND_DATA +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .fetch() + .timestamp(timestamp) + .filters(1) + .partition(0, 2, 2) + .progress(0, 3) + .progress(1, 1) + .key("sensor/one") + .header("zilla:filter", "sensor") + .header("zilla:filter", "one") + .header("zilla:local", "client") + .headerInt("zilla:timeout-ms", 1000) + .header("zilla:format", "TEXT") + .build() + .build()} +read "message" + +write close +read closed diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/server.rpt new file mode 100644 index 0000000000..b61c9bf819 --- /dev/null +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/server.rpt @@ -0,0 +1,74 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +property timestamp ${kafka:timestamp()} + +accept "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("mqtt-messages") + .filter() + .headers("zilla:filter") + .sequence("sensor") + .sequence("one") + .build() + .headerNot("zilla:qos", "1") + .headerNot("zilla:qos", "2") + .build() + .evaluation("EAGER") + .build() + .build()} + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("mqtt-messages") + .partition(0, 0, 1, 1) + .build() + .build()} + +connected + +write await SEND_DATA +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .fetch() + .timestamp(timestamp) + .filters(1) + .partition(0, 2, 2) + .progress(0, 3) + .progress(1, 1) + .key("sensor/one") + .header("zilla:filter", "sensor") + .header("zilla:filter", "one") + .header("zilla:local", "client") + .headerInt("zilla:timeout-ms", 1000) + .header("zilla:format", "TEXT") + .build() + .build()} +write "message" +write flush + +read closed +write close diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.one.message.receive.response.topic.and.correlation.data/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.one.message.receive.response.topic.and.correlation.data/client.rpt index de8fbc0dc3..1b56acd62d 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.one.message.receive.response.topic.and.correlation.data/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.one.message.receive.response.topic.and.correlation.data/client.rpt @@ -46,7 +46,6 @@ read zilla:data.ext ${kafka:matchDataEx() .header("zilla:filter", "sensor") .header("zilla:filter", "one") .header("zilla:local", "client") - .headerInt("zilla:timeout-ms", 15000) .header("zilla:content-type", "message") .header("zilla:format", "TEXT") .header("zilla:reply-to", "sensor/one") diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.one.message.receive.response.topic.and.correlation.data/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.one.message.receive.response.topic.and.correlation.data/server.rpt index 9742cf9769..ec25d52edd 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.one.message.receive.response.topic.and.correlation.data/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.one.message.receive.response.topic.and.correlation.data/server.rpt @@ -49,7 +49,6 @@ write zilla:data.ext ${kafka:dataEx() .header("zilla:filter", "sensor") .header("zilla:filter", "one") .header("zilla:local", "client") - .headerInt("zilla:timeout-ms", 15000) .header("zilla:content-type", "message") .header("zilla:format", "TEXT") .header("zilla:reply-to", "sensor/one") diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/client.rpt new file mode 100644 index 0000000000..3e8afacc6b --- /dev/null +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/client.rpt @@ -0,0 +1,34 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/mqtt0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .subscribe() + .clientId("client") + .qos("AT_MOST_ONCE") + .filter("sensor/one", 1, "AT_LEAST_ONCE") + .build() + .build()} + +connected + + +write await SEND_DATA +write close +read closed diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/server.rpt new file mode 100644 index 0000000000..1cd84ca994 --- /dev/null +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/server.rpt @@ -0,0 +1,35 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/mqtt0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .subscribe() + .clientId("client") + .qos("AT_MOST_ONCE") + .filter("sensor/one", 1, "AT_LEAST_ONCE") + .build() + .build()} + +connected + +read notify SEND_DATA +read closed +write close diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.one.message.receive.response.topic.and.correlation.data/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.one.message.receive.response.topic.and.correlation.data/client.rpt index 0cf15cfa82..ea79a52fda 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.one.message.receive.response.topic.and.correlation.data/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.one.message.receive.response.topic.and.correlation.data/client.rpt @@ -32,7 +32,6 @@ read zilla:data.ext ${mqtt:matchDataEx() .subscribe() .topic("sensor/one") .subscriptionId(1) - .expiryInterval(15) .contentType("message") .format("TEXT") .responseTopic("sensor/one") diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.one.message.receive.response.topic.and.correlation.data/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.one.message.receive.response.topic.and.correlation.data/server.rpt index d173fa405a..71c80a12ea 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.one.message.receive.response.topic.and.correlation.data/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.one.message.receive.response.topic.and.correlation.data/server.rpt @@ -34,7 +34,6 @@ write zilla:data.ext ${mqtt:dataEx() .subscribe() .topic("sensor/one") .subscriptionId(1) - .expiryInterval(15) .contentType("message") .format("TEXT") .responseTopic("sensor/one") diff --git a/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/KafkaIT.java b/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/KafkaIT.java index b777b39bac..3439c2688d 100644 --- a/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/KafkaIT.java +++ b/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/KafkaIT.java @@ -917,4 +917,13 @@ public void shouldReceiveMessageOverlappingWildcardMixedQos() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${kafka}/subscribe.expire.message/client", + "${kafka}/subscribe.expire.message/server"}) + public void shouldExpireMessage() throws Exception + { + k3po.finish(); + } } diff --git a/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/MqttIT.java b/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/MqttIT.java index 837ef28fbe..995d5edc53 100644 --- a/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/MqttIT.java +++ b/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/MqttIT.java @@ -757,4 +757,13 @@ public void shouldReceiveMessageOverlappingWildcardMixedQos() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${mqtt}/subscribe.expire.message/client", + "${mqtt}/subscribe.expire.message/server"}) + public void shouldExpireMessage() throws Exception + { + k3po.finish(); + } } From ff942d30464cffca743a8f444e867b2c51425069 Mon Sep 17 00:00:00 2001 From: bmaidics Date: Thu, 14 Dec 2023 12:14:49 +0100 Subject: [PATCH 2/4] Apply feedback --- .../stream/MqttKafkaSubscribeFactory.java | 39 +++++-- .../stream/MqttKafkaSubscribeProxyIT.java | 38 ++++++- .../client.rpt | 72 ++++++++++++ .../server.rpt | 79 +++++++++++++ .../kafka/subscribe.expire.message/client.rpt | 2 +- .../kafka/subscribe.expire.message/server.rpt | 2 +- .../client.rpt | 55 +++++++++ .../server.rpt | 61 ++++++++++ .../subscribe.retain.fragmented/client.rpt | 103 +++++++++++++++++ .../subscribe.retain.fragmented/server.rpt | 107 ++++++++++++++++++ .../mqtt/subscribe.expire.message/client.rpt | 2 +- .../mqtt/subscribe.expire.message/server.rpt | 2 +- .../binding/mqtt/kafka/streams/KafkaIT.java | 27 +++++ 13 files changed, 577 insertions(+), 12 deletions(-) create mode 100644 specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/client.rpt create mode 100644 specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/server.rpt create mode 100644 specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.one.message.fragmented/client.rpt create mode 100644 specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.one.message.fragmented/server.rpt create mode 100644 specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.retain.fragmented/client.rpt create mode 100644 specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.retain.fragmented/server.rpt diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java index 81f64d065c..5411c2eed7 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java @@ -105,7 +105,10 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory private static final int RETAIN_FLAG = 1 << RETAIN.ordinal(); private static final int RETAIN_AS_PUBLISHED_FLAG = 1 << RETAIN_AS_PUBLISHED.ordinal(); private static final int SIGNAL_CONNECT_BOOTSTRAP_STREAM = 1; - private static final int DATA_FIN_FLAG = 0x03; + private static final int DATA_FLAG_INIT = 0x02; + private static final int DATA_FLAG_FIN = 0x01; + private static final OctetsFW EMPTY_OCTETS = new OctetsFW().wrap(new UnsafeBuffer(new byte[0]), 0, 0); + private final OctetsFW emptyRO = new OctetsFW().wrap(new UnsafeBuffer(0L, 0), 0, 0); private final BeginFW beginRO = new BeginFW(); private final DataFW dataRO = new DataFW(); @@ -1097,6 +1100,7 @@ final class KafkaMessagesProxy extends KafkaProxy private long replyAck; private int replyMax; private int replyPad; + private boolean expiredMessage; private KafkaMessagesProxy( long originId, @@ -1443,7 +1447,9 @@ private void onKafkaData( final KafkaOffsetFW partition = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().partition() : null; final long timestamp = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().timestamp() : 0; - if (key != null) + + Flyweight mqttSubscribeDataEx = EMPTY_OCTETS; + if ((flags & DATA_FLAG_INIT) != 0x00 && key != null) { String topicName = kafkaMergedDataEx.fetch().key().value() .get((b, o, m) -> b.getStringWithoutLengthUtf8(o, m - o)); @@ -1455,6 +1461,7 @@ private void onKafkaData( expireInterval = timestamp + helper.timeout - now().toEpochMilli(); if (expireInterval < 0) { + expiredMessage = true; break sendData; } } @@ -1474,7 +1481,7 @@ private void onKafkaData( } } - final MqttDataExFW mqttSubscribeDataEx = mqttDataExRW.wrap(extBuffer, 0, extBuffer.capacity()) + mqttSubscribeDataEx = mqttDataExRW.wrap(extBuffer, 0, extBuffer.capacity()) .typeId(mqttTypeId) .subscribe(b -> { @@ -1543,7 +1550,10 @@ private void onKafkaData( } }); }).build(); + } + if (!expiredMessage) + { if (!MqttKafkaState.initialOpened(mqtt.retained.state) || MqttKafkaState.replyClosed(mqtt.retained.state)) { @@ -1573,6 +1583,11 @@ private void onKafkaData( messageSlotReserved += reserved; } } + + if ((flags & DATA_FLAG_FIN) != 0x00) + { + expiredMessage = false; + } } } @@ -1591,7 +1606,7 @@ private void flushData( { final MqttSubscribeMessageFW message = mqttSubscribeMessageRO.wrap(dataBuffer, messageSlotOffset, dataBuffer.capacity()); - mqtt.doMqttData(traceId, authorization, budgetId, reserved, DATA_FIN_FLAG, message.payload(), + mqtt.doMqttData(traceId, authorization, budgetId, reserved, DATA_FLAG_FIN, message.payload(), message.extension()); messageSlotOffset += message.sizeof(); @@ -1852,6 +1867,7 @@ final class KafkaRetainedProxy extends KafkaProxy private int replyPad; private int unAckedPackets; + private boolean expiredMessage; private KafkaRetainedProxy( long originId, @@ -2178,7 +2194,8 @@ private void onKafkaData( final KafkaOffsetFW partition = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().partition() : null; final long timestamp = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().timestamp() : 0; - if (key != null) + Flyweight mqttSubscribeDataEx = EMPTY_OCTETS; + if ((flags & DATA_FLAG_INIT) != 0x00 && key != null) { String topicName = kafkaMergedDataEx.fetch().key().value() .get((b, o, m) -> b.getStringWithoutLengthUtf8(o, m - o)); @@ -2190,6 +2207,7 @@ private void onKafkaData( expireInterval = timestamp + helper.timeout - now().toEpochMilli(); if (expireInterval < 0) { + expiredMessage = true; break sendData; } } @@ -2198,7 +2216,7 @@ private void onKafkaData( expireInterval = helper.timeout; } - final Flyweight mqttSubscribeDataEx = mqttDataExRW.wrap(extBuffer, 0, extBuffer.capacity()) + mqttSubscribeDataEx = mqttDataExRW.wrap(extBuffer, 0, extBuffer.capacity()) .typeId(mqttTypeId) .subscribe(b -> { @@ -2272,11 +2290,18 @@ private void onKafkaData( } }); }).build(); + } + if (!expiredMessage) + { mqtt.doMqttData(traceId, authorization, budgetId, reserved, flags, payload, mqttSubscribeDataEx); - mqtt.mqttSharedBudget -= length; } + + if ((flags & DATA_FLAG_FIN) != 0x00) + { + expiredMessage = false; + } } } diff --git a/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeProxyIT.java b/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeProxyIT.java index 47d232f3f9..a894e33daa 100644 --- a/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeProxyIT.java +++ b/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeProxyIT.java @@ -152,6 +152,17 @@ public void shouldReceiveOneMessage() throws Exception k3po.finish(); } + @Test + @Configuration("proxy.yaml") + @Configure(name = WILL_AVAILABLE_NAME, value = "false") + @Specification({ + "${mqtt}/subscribe.one.message/client", + "${kafka}/subscribe.one.message.fragmented/server"}) + public void shouldReceiveOneMessageFragmented() throws Exception + { + k3po.finish(); + } + @Test @Configuration("proxy.options.yaml") @Configure(name = WILL_AVAILABLE_NAME, value = "false") @@ -251,6 +262,17 @@ public void shouldReceiveRetainedNoRetainAsPublished() throws Exception k3po.finish(); } + @Test + @Configuration("proxy.yaml") + @Configure(name = WILL_AVAILABLE_NAME, value = "false") + @Specification({ + "${mqtt}/subscribe.retain/client", + "${kafka}/subscribe.retain.fragmented/server"}) + public void shouldReceiveRetainedFragmented() throws Exception + { + k3po.finish(); + } + @Test @Configuration("proxy.yaml") @Configure(name = WILL_AVAILABLE_NAME, value = "false") @@ -591,7 +613,21 @@ public void shouldExpireMessage() throws Exception { k3po.start(); Thread.sleep(1500); - k3po.notifyBarrier("SEND_DATA"); + k3po.notifyBarrier("WAIT_TIME_ELAPSED"); + k3po.finish(); + } + + @Test + @Configuration("proxy.yaml") + @Configure(name = WILL_AVAILABLE_NAME, value = "false") + @Specification({ + "${mqtt}/subscribe.expire.message/client", + "${kafka}/subscribe.expire.message.fragmented/server"}) + public void shouldExpireMessageFragmented() throws Exception + { + k3po.start(); + Thread.sleep(1500); + k3po.notifyBarrier("WAIT_TIME_ELAPSED"); k3po.finish(); } } diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/client.rpt new file mode 100644 index 0000000000..173d709107 --- /dev/null +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/client.rpt @@ -0,0 +1,72 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("mqtt-messages") + .filter() + .headers("zilla:filter") + .sequence("sensor") + .sequence("one") + .build() + .headerNot("zilla:qos", "1") + .headerNot("zilla:qos", "2") + .build() + .evaluation("EAGER") + .build() + .build()} + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("mqtt-messages") + .partition(0, 0, 1, 1) + .build() + .build()} + +connected + + +read notify WAIT_TIME_ELAPSED +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .fetch() + .timestamp(timestamp) + .filters(1) + .partition(0, 2, 2) + .progress(0, 3) + .progress(1, 1) + .key("sensor/one") + .header("zilla:filter", "sensor") + .header("zilla:filter", "one") + .header("zilla:local", "client") + .headerInt("zilla:timeout-ms", 1000) + .header("zilla:format", "TEXT") + .build() + .build()} +read "mess" + +read "age" + +write close +read closed diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/server.rpt new file mode 100644 index 0000000000..760170ea12 --- /dev/null +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/server.rpt @@ -0,0 +1,79 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +property timestamp ${kafka:timestamp()} + +accept "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("mqtt-messages") + .filter() + .headers("zilla:filter") + .sequence("sensor") + .sequence("one") + .build() + .headerNot("zilla:qos", "1") + .headerNot("zilla:qos", "2") + .build() + .evaluation("EAGER") + .build() + .build()} + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("mqtt-messages") + .partition(0, 0, 1, 1) + .build() + .build()} + +connected + +write await WAIT_TIME_ELAPSED +write option zilla:flags "init" +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .fetch() + .timestamp(timestamp) + .filters(1) + .partition(0, 2, 2) + .progress(0, 3) + .progress(1, 1) + .key("sensor/one") + .header("zilla:filter", "sensor") + .header("zilla:filter", "one") + .header("zilla:local", "client") + .headerInt("zilla:timeout-ms", 1000) + .header("zilla:format", "TEXT") + .build() + .build()} +write "mess" +write flush + +write option zilla:flags "fin" +write "age" +write flush + +read closed +write close diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/client.rpt index 91add417ca..c0f38481a2 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/client.rpt @@ -46,7 +46,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() connected -read notify SEND_DATA +read notify WAIT_TIME_ELAPSED read zilla:data.ext ${kafka:matchDataEx() .typeId(zilla:id("kafka")) .merged() diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/server.rpt index b61c9bf819..9e98198e4c 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/server.rpt @@ -49,7 +49,7 @@ write zilla:begin.ext ${kafka:beginEx() connected -write await SEND_DATA +write await WAIT_TIME_ELAPSED write zilla:data.ext ${kafka:dataEx() .typeId(zilla:id("kafka")) .merged() diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.one.message.fragmented/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.one.message.fragmented/client.rpt new file mode 100644 index 0000000000..cb8138e477 --- /dev/null +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.one.message.fragmented/client.rpt @@ -0,0 +1,55 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("mqtt-messages") + .filter() + .headers("zilla:filter") + .sequence("sensor") + .sequence("one") + .build() + .build() + .evaluation("EAGER") + .build() + .build()} + +connected + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .fetch() + .filters(1) + .partition(0, 1, 2) + .progress(0, 2) + .progress(1, 1) + .key("sensor/one") + .header("zilla:filter", "sensor") + .header("zilla:filter", "one") + .header("zilla:local", "client") + .header("zilla:format", "TEXT") + .build() + .build()} + +read "mess" + +read "age" diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.one.message.fragmented/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.one.message.fragmented/server.rpt new file mode 100644 index 0000000000..b70fdeac42 --- /dev/null +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.one.message.fragmented/server.rpt @@ -0,0 +1,61 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("mqtt-messages") + .filter() + .headers("zilla:filter") + .sequence("sensor") + .sequence("one") + .build() + .build() + .evaluation("EAGER") + .build() + .build()} + +connected + +write option zilla:flags "init" +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .fetch() + .timestamp(kafka:timestamp()) + .filters(1) + .partition(0, 1, 2) + .progress(0, 2) + .progress(1, 1) + .key("sensor/one") + .header("zilla:filter", "sensor") + .header("zilla:filter", "one") + .header("zilla:local", "client") + .header("zilla:format", "TEXT") + .build() + .build()} +write "mess" +write flush + +write option zilla:flags "fin" +write "age" +write flush diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.retain.fragmented/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.retain.fragmented/client.rpt new file mode 100644 index 0000000000..b6624d3f9a --- /dev/null +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.retain.fragmented/client.rpt @@ -0,0 +1,103 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("mqtt-retained") + .filter() + .headers("zilla:filter") + .sequence("sensor") + .sequence("one") + .build() + .build() + .evaluation("EAGER") + .build() + .build()} + +connected + + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .fetch() + .filters(1) + .partition(0, 1, 2) + .progress(0, 2) + .progress(1, 1) + .key("sensor/one") + .header("zilla:filter", "sensor") + .header("zilla:filter", "one") + .header("zilla:local", "client") + .header("zilla:format", "TEXT") + .build() + .build()} + +read "mess" + +read "age" + +read advised zilla:flush + +write close +read closed + +write notify RETAINED_FINISHED + +connect await RETAINED_FINISHED + "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("mqtt-messages") + .filter() + .headers("zilla:filter") + .sequence("sensor") + .sequence("one") + .build() + .build() + .evaluation("EAGER") + .build() + .build()} + +connected + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .fetch() + .filters(1) + .partition(0, 1, 2) + .progress(0, 2) + .progress(1, 1) + .key("sensor/one") + .header("zilla:filter", "sensor") + .header("zilla:filter", "one") + .header("zilla:local", "client") + .header("zilla:format", "TEXT") + .build() + .build()} + +read "message2" diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.retain.fragmented/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.retain.fragmented/server.rpt new file mode 100644 index 0000000000..88749b722e --- /dev/null +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.retain.fragmented/server.rpt @@ -0,0 +1,107 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("mqtt-retained") + .filter() + .headers("zilla:filter") + .sequence("sensor") + .sequence("one") + .build() + .build() + .evaluation("EAGER") + .build() + .build()} + +connected + +write option zilla:flags "init" +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .fetch() + .timestamp(kafka:timestamp()) + .filters(1) + .partition(0, 1, 2) + .progress(0, 2) + .progress(1, 1) + .key("sensor/one") + .header("zilla:filter", "sensor") + .header("zilla:filter", "one") + .header("zilla:local", "client") + .header("zilla:format", "TEXT") + .build() + .build()} + +write "mess" +write flush + +write option zilla:flags "fin" +write "age" +write flush + +write advise zilla:flush + +read closed +write close + +accepted + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("mqtt-messages") + .filter() + .headers("zilla:filter") + .sequence("sensor") + .sequence("one") + .build() + .build() + .evaluation("EAGER") + .build() + .build()} + +connected + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .fetch() + .timestamp(kafka:timestamp()) + .filters(1) + .partition(0, 1, 2) + .progress(0, 2) + .progress(1, 1) + .key("sensor/one") + .header("zilla:filter", "sensor") + .header("zilla:filter", "one") + .header("zilla:local", "client") + .header("zilla:format", "TEXT") + .build() + .build()} + +write "message2" +write flush + diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/client.rpt index 3e8afacc6b..b3e73377f0 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/client.rpt @@ -29,6 +29,6 @@ write zilla:begin.ext ${mqtt:beginEx() connected -write await SEND_DATA +write await WAIT_TIME_ELAPSED write close read closed diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/server.rpt index 1cd84ca994..d474db70f8 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/server.rpt @@ -30,6 +30,6 @@ read zilla:begin.ext ${mqtt:matchBeginEx() connected -read notify SEND_DATA +read notify WAIT_TIME_ELAPSED read closed write close diff --git a/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/KafkaIT.java b/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/KafkaIT.java index 3439c2688d..ecfb8a7917 100644 --- a/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/KafkaIT.java +++ b/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/KafkaIT.java @@ -341,6 +341,15 @@ public void shouldReceiveOneMessage() throws Exception k3po.finish(); } + @Test + @Specification({ + "${kafka}/subscribe.one.message.fragmented/client", + "${kafka}/subscribe.one.message.fragmented/server"}) + public void shouldReceiveOneMessageFragmented() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${kafka}/subscribe.one.message.changed.topic.name/client", @@ -386,6 +395,15 @@ public void shouldReceiveRetained() throws Exception k3po.finish(); } + @Test + @Specification({ + "${kafka}/subscribe.retain.fragmented/client", + "${kafka}/subscribe.retain.fragmented/server"}) + public void shouldReceiveRetainedFragmented() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${kafka}/subscribe.receive.message.wildcard/client", @@ -926,4 +944,13 @@ public void shouldExpireMessage() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${kafka}/subscribe.expire.message.fragmented/client", + "${kafka}/subscribe.expire.message.fragmented/server"}) + public void shouldExpireMessageFragmented() throws Exception + { + k3po.finish(); + } } From 18819f4ffac570396c23f8017092d55757c1a3d1 Mon Sep 17 00:00:00 2001 From: bmaidics Date: Fri, 15 Dec 2023 13:06:18 +0100 Subject: [PATCH 3/4] More feedback --- .../mqtt/kafka/internal/config/MqttKafkaHeaderHelper.java | 2 +- .../kafka/publish.one.message.changed.topic.name/client.rpt | 2 +- .../kafka/publish.one.message.changed.topic.name/server.rpt | 2 +- .../mqtt/kafka/streams/kafka/publish.one.message/client.rpt | 2 +- .../mqtt/kafka/streams/kafka/publish.one.message/server.rpt | 2 +- .../kafka/session.will.message.abort.deliver.will/client.rpt | 2 +- .../kafka/session.will.message.abort.deliver.will/server.rpt | 2 +- .../kafka/subscribe.expire.message.fragmented/client.rpt | 2 +- .../kafka/subscribe.expire.message.fragmented/server.rpt | 2 +- .../kafka/streams/kafka/subscribe.expire.message/client.rpt | 2 +- .../kafka/streams/kafka/subscribe.expire.message/server.rpt | 2 +- 11 files changed, 11 insertions(+), 11 deletions(-) diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaHeaderHelper.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaHeaderHelper.java index 71be641871..4032e365a1 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaHeaderHelper.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaHeaderHelper.java @@ -34,7 +34,7 @@ public class MqttKafkaHeaderHelper private static final String KAFKA_LOCAL_HEADER_NAME = "zilla:local"; private static final String KAFKA_QOS_HEADER_NAME = "zilla:qos"; - private static final String KAFKA_TIMEOUT_HEADER_NAME = "zilla:timeout-ms"; + private static final String KAFKA_TIMEOUT_HEADER_NAME = "zilla:expiry"; private static final String KAFKA_CONTENT_TYPE_HEADER_NAME = "zilla:content-type"; diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message.changed.topic.name/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message.changed.topic.name/client.rpt index 7af4769612..67aa07f6ff 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message.changed.topic.name/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message.changed.topic.name/client.rpt @@ -39,7 +39,7 @@ write zilla:data.ext ${kafka:dataEx() .header("zilla:filter", "sensor") .header("zilla:filter", "one") .header("zilla:local", "client") - .headerInt("zilla:timeout-ms", 15000) + .headerInt("zilla:expiry", 15000) .header("zilla:content-type", "message") .header("zilla:format", "TEXT") .header("zilla:reply-to", "messages") diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message.changed.topic.name/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message.changed.topic.name/server.rpt index 6a6cc098b0..15ca4c4468 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message.changed.topic.name/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message.changed.topic.name/server.rpt @@ -42,7 +42,7 @@ read zilla:data.ext ${kafka:matchDataEx() .header("zilla:filter", "sensor") .header("zilla:filter", "one") .header("zilla:local", "client") - .headerInt("zilla:timeout-ms", 15000) + .headerInt("zilla:expiry", 15000) .header("zilla:content-type", "message") .header("zilla:format", "TEXT") .header("zilla:reply-to", "messages") diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message/client.rpt index 49f76cf963..abc2a1da21 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message/client.rpt @@ -39,7 +39,7 @@ write zilla:data.ext ${kafka:dataEx() .header("zilla:filter", "sensor") .header("zilla:filter", "one") .header("zilla:local", "client") - .headerInt("zilla:timeout-ms", 15000) + .headerInt("zilla:expiry", 15000) .header("zilla:content-type", "message") .header("zilla:format", "TEXT") .header("zilla:reply-to", "mqtt-messages") diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message/server.rpt index 82b379a218..581e1bff62 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message/server.rpt @@ -42,7 +42,7 @@ read zilla:data.ext ${kafka:matchDataEx() .header("zilla:filter", "sensor") .header("zilla:filter", "one") .header("zilla:local", "client") - .headerInt("zilla:timeout-ms", 15000) + .headerInt("zilla:expiry", 15000) .header("zilla:content-type", "message") .header("zilla:format", "TEXT") .header("zilla:reply-to", "mqtt-messages") diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.abort.deliver.will/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.abort.deliver.will/client.rpt index cb20e66539..7f6d6f625a 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.abort.deliver.will/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.abort.deliver.will/client.rpt @@ -467,7 +467,7 @@ write zilla:data.ext ${kafka:dataEx() .partition(-1, -1) .key("obituaries") .header("zilla:filter", "obituaries") - .headerInt("zilla:timeout-ms", 15000) + .headerInt("zilla:expiry", 15000) .header("zilla:format", "TEXT") .header("zilla:reply-to", "mqtt-messages") .header("zilla:reply-key", "responses/client1") diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.abort.deliver.will/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.abort.deliver.will/server.rpt index 68a08801fc..be7725e0f0 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.abort.deliver.will/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.abort.deliver.will/server.rpt @@ -466,7 +466,7 @@ read zilla:data.ext ${kafka:matchDataEx() .partition(-1, -1) .key("obituaries") .header("zilla:filter", "obituaries") - .headerInt("zilla:timeout-ms", 15000) + .headerInt("zilla:expiry", 15000) .header("zilla:format", "TEXT") .header("zilla:reply-to", "mqtt-messages") .header("zilla:reply-key", "responses/client1") diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/client.rpt index 173d709107..54730d4d02 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/client.rpt @@ -60,7 +60,7 @@ read zilla:data.ext ${kafka:matchDataEx() .header("zilla:filter", "sensor") .header("zilla:filter", "one") .header("zilla:local", "client") - .headerInt("zilla:timeout-ms", 1000) + .headerInt("zilla:expiry", 1000) .header("zilla:format", "TEXT") .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/server.rpt index 760170ea12..3560004d4a 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/server.rpt @@ -64,7 +64,7 @@ write zilla:data.ext ${kafka:dataEx() .header("zilla:filter", "sensor") .header("zilla:filter", "one") .header("zilla:local", "client") - .headerInt("zilla:timeout-ms", 1000) + .headerInt("zilla:expiry", 1000) .header("zilla:format", "TEXT") .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/client.rpt index c0f38481a2..052001027d 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/client.rpt @@ -60,7 +60,7 @@ read zilla:data.ext ${kafka:matchDataEx() .header("zilla:filter", "sensor") .header("zilla:filter", "one") .header("zilla:local", "client") - .headerInt("zilla:timeout-ms", 1000) + .headerInt("zilla:expiry", 1000) .header("zilla:format", "TEXT") .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/server.rpt index 9e98198e4c..f88fa9fef6 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/server.rpt @@ -63,7 +63,7 @@ write zilla:data.ext ${kafka:dataEx() .header("zilla:filter", "sensor") .header("zilla:filter", "one") .header("zilla:local", "client") - .headerInt("zilla:timeout-ms", 1000) + .headerInt("zilla:expiry", 1000) .header("zilla:format", "TEXT") .build() .build()} From 3262f0c4096f4eaf1e612f5bd0a5ff10eb992ca6 Mon Sep 17 00:00:00 2001 From: bmaidics Date: Tue, 19 Dec 2023 11:20:21 +0100 Subject: [PATCH 4/4] feedbacks --- .../mqtt/kafka/internal/stream/MqttKafkaPublishFactory.java | 2 +- .../mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java | 3 +-- .../kafka/internal/stream/MqttKafkaSubscribeFactory.java | 4 ++-- .../kafka/internal/stream/MqttKafkaSubscribeProxyIT.java | 6 ------ .../kafka/publish.one.message.changed.topic.name/client.rpt | 2 +- .../kafka/publish.one.message.changed.topic.name/server.rpt | 2 +- .../mqtt/kafka/streams/kafka/publish.one.message/client.rpt | 2 +- .../mqtt/kafka/streams/kafka/publish.one.message/server.rpt | 2 +- .../session.will.message.abort.deliver.will/client.rpt | 6 +++--- .../session.will.message.abort.deliver.will/server.rpt | 6 +++--- .../client.rpt | 4 ++-- .../server.rpt | 4 ++-- .../kafka/subscribe.expire.message.fragmented/client.rpt | 4 +--- .../kafka/subscribe.expire.message.fragmented/server.rpt | 6 +++--- .../kafka/streams/kafka/subscribe.expire.message/client.rpt | 3 +-- .../kafka/streams/kafka/subscribe.expire.message/server.rpt | 6 +++--- .../kafka/streams/mqtt/subscribe.expire.message/client.rpt | 1 - .../kafka/streams/mqtt/subscribe.expire.message/server.rpt | 1 - 18 files changed, 26 insertions(+), 38 deletions(-) diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaPublishFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaPublishFactory.java index f99f590726..40d74512f3 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaPublishFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaPublishFactory.java @@ -386,7 +386,7 @@ private void onMqttData( if (mqttPublishDataEx.expiryInterval() != -1) { final MutableDirectBuffer expiryBuffer = new UnsafeBuffer(new byte[4]); - expiryBuffer.putInt(0, mqttPublishDataEx.expiryInterval() * 1000, ByteOrder.BIG_ENDIAN); + expiryBuffer.putInt(0, mqttPublishDataEx.expiryInterval(), ByteOrder.BIG_ENDIAN); kafkaHeadersRW.item(h -> { h.nameLen(helper.kafkaTimeoutHeaderName.sizeof()); diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java index 5f0613beaa..ad12b9bd54 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java @@ -506,8 +506,7 @@ private void onMqttData( MqttWillMessageFW will = mqttWillRO.tryWrap(buffer, offset, limit); this.delay = (int) Math.min(SECONDS.toMillis(will.delay()), sessionExpiryMillis); - final int expiryInterval = will.expiryInterval() == -1 ? -1 : - (int) TimeUnit.SECONDS.toMillis(will.expiryInterval()); + final int expiryInterval = will.expiryInterval() == -1 ? -1 : will.expiryInterval(); final MqttWillMessageFW.Builder willMessageBuilder = mqttMessageRW.wrap(willMessageBuffer, 0, willMessageBuffer.capacity()) .topic(will.topic()) diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java index 5411c2eed7..7d91f26277 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java @@ -1514,7 +1514,7 @@ private void onKafkaData( if (expireInterval != -1) { - b.expiryInterval((int) expireInterval / 1000); + b.expiryInterval((int) expireInterval); } if (helper.contentType != null) { @@ -2254,7 +2254,7 @@ private void onKafkaData( b.subscriptionIds(subscriptionIdsRW.build()); if (expireInterval != -1) { - b.expiryInterval((int) expireInterval / 1000); + b.expiryInterval((int) expireInterval); } if (helper.contentType != null) { diff --git a/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeProxyIT.java b/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeProxyIT.java index a894e33daa..62e2ac00fc 100644 --- a/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeProxyIT.java +++ b/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeProxyIT.java @@ -611,9 +611,6 @@ public void shouldReplayRetainedQos2() throws Exception "${kafka}/subscribe.expire.message/server"}) public void shouldExpireMessage() throws Exception { - k3po.start(); - Thread.sleep(1500); - k3po.notifyBarrier("WAIT_TIME_ELAPSED"); k3po.finish(); } @@ -625,9 +622,6 @@ public void shouldExpireMessage() throws Exception "${kafka}/subscribe.expire.message.fragmented/server"}) public void shouldExpireMessageFragmented() throws Exception { - k3po.start(); - Thread.sleep(1500); - k3po.notifyBarrier("WAIT_TIME_ELAPSED"); k3po.finish(); } } diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message.changed.topic.name/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message.changed.topic.name/client.rpt index 67aa07f6ff..6faa447bb7 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message.changed.topic.name/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message.changed.topic.name/client.rpt @@ -39,7 +39,7 @@ write zilla:data.ext ${kafka:dataEx() .header("zilla:filter", "sensor") .header("zilla:filter", "one") .header("zilla:local", "client") - .headerInt("zilla:expiry", 15000) + .headerInt("zilla:expiry", 15) .header("zilla:content-type", "message") .header("zilla:format", "TEXT") .header("zilla:reply-to", "messages") diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message.changed.topic.name/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message.changed.topic.name/server.rpt index 15ca4c4468..a9653e2d70 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message.changed.topic.name/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message.changed.topic.name/server.rpt @@ -42,7 +42,7 @@ read zilla:data.ext ${kafka:matchDataEx() .header("zilla:filter", "sensor") .header("zilla:filter", "one") .header("zilla:local", "client") - .headerInt("zilla:expiry", 15000) + .headerInt("zilla:expiry", 15) .header("zilla:content-type", "message") .header("zilla:format", "TEXT") .header("zilla:reply-to", "messages") diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message/client.rpt index abc2a1da21..66637714da 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message/client.rpt @@ -39,7 +39,7 @@ write zilla:data.ext ${kafka:dataEx() .header("zilla:filter", "sensor") .header("zilla:filter", "one") .header("zilla:local", "client") - .headerInt("zilla:expiry", 15000) + .headerInt("zilla:expiry", 15) .header("zilla:content-type", "message") .header("zilla:format", "TEXT") .header("zilla:reply-to", "mqtt-messages") diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message/server.rpt index 581e1bff62..468e8b279f 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/publish.one.message/server.rpt @@ -42,7 +42,7 @@ read zilla:data.ext ${kafka:matchDataEx() .header("zilla:filter", "sensor") .header("zilla:filter", "one") .header("zilla:local", "client") - .headerInt("zilla:expiry", 15000) + .headerInt("zilla:expiry", 15) .header("zilla:content-type", "message") .header("zilla:format", "TEXT") .header("zilla:reply-to", "mqtt-messages") diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.abort.deliver.will/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.abort.deliver.will/client.rpt index 7f6d6f625a..99c5b365fd 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.abort.deliver.will/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.abort.deliver.will/client.rpt @@ -295,7 +295,7 @@ write zilla:data.ext ${kafka:dataEx() write ${mqtt:will() .topic("obituaries") .delay(1000) - .expiryInterval(15000) + .expiryInterval(15) .format("TEXT") .responseTopic("responses/client1") .lifetimeId("1e6a1eb5-810a-459d-a12c-a6fa08f228d1") @@ -427,7 +427,7 @@ read zilla:data.ext ${kafka:matchDataEx() read ${mqtt:will() .topic("obituaries") .delay(1000) - .expiryInterval(15000) + .expiryInterval(15) .format("TEXT") .responseTopic("responses/client1") .lifetimeId("1e6a1eb5-810a-459d-a12c-a6fa08f228d1") @@ -467,7 +467,7 @@ write zilla:data.ext ${kafka:dataEx() .partition(-1, -1) .key("obituaries") .header("zilla:filter", "obituaries") - .headerInt("zilla:expiry", 15000) + .headerInt("zilla:expiry", 15) .header("zilla:format", "TEXT") .header("zilla:reply-to", "mqtt-messages") .header("zilla:reply-key", "responses/client1") diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.abort.deliver.will/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.abort.deliver.will/server.rpt index be7725e0f0..7be3e1ffa0 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.abort.deliver.will/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.abort.deliver.will/server.rpt @@ -298,7 +298,7 @@ read zilla:data.ext ${kafka:matchDataEx() read ${mqtt:will() .topic("obituaries") .delay(1000) - .expiryInterval(15000) + .expiryInterval(15) .format("TEXT") .responseTopic("responses/client1") .lifetimeId("1e6a1eb5-810a-459d-a12c-a6fa08f228d1") @@ -428,7 +428,7 @@ write zilla:data.ext ${kafka:dataEx() write ${mqtt:will() .topic("obituaries") .delay(1000) - .expiryInterval(15000) + .expiryInterval(15) .format("TEXT") .responseTopic("responses/client1") .lifetimeId("1e6a1eb5-810a-459d-a12c-a6fa08f228d1") @@ -466,7 +466,7 @@ read zilla:data.ext ${kafka:matchDataEx() .partition(-1, -1) .key("obituaries") .header("zilla:filter", "obituaries") - .headerInt("zilla:expiry", 15000) + .headerInt("zilla:expiry", 15) .header("zilla:format", "TEXT") .header("zilla:reply-to", "mqtt-messages") .header("zilla:reply-key", "responses/client1") diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.will.id.mismatch.skip.delivery/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.will.id.mismatch.skip.delivery/client.rpt index 03006dbc80..e558c9c0a8 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.will.id.mismatch.skip.delivery/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.will.id.mismatch.skip.delivery/client.rpt @@ -269,7 +269,7 @@ write zilla:data.ext ${kafka:dataEx() write ${mqtt:will() .topic("obituaries") .delay(1000) - .expiryInterval(15000) + .expiryInterval(15) .format("TEXT") .responseTopic("responses/client1") .lifetimeId("1e6a1eb5-810a-459d-a12c-a6fa08f228d1") @@ -399,7 +399,7 @@ read zilla:data.ext ${kafka:matchDataEx() read ${mqtt:will() .topic("obituaries") .delay(1000) - .expiryInterval(15000) + .expiryInterval(15) .format("TEXT") .responseTopic("responses/client1") .lifetimeId("1e6a1eb5-810a-459d-a12c-a6fa08f228d1") diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.will.id.mismatch.skip.delivery/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.will.id.mismatch.skip.delivery/server.rpt index 5c0d7a4597..5ae6d8c246 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.will.id.mismatch.skip.delivery/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.will.message.will.id.mismatch.skip.delivery/server.rpt @@ -269,7 +269,7 @@ read zilla:data.ext ${kafka:matchDataEx() read ${mqtt:will() .topic("obituaries") .delay(1000) - .expiryInterval(15000) + .expiryInterval(15) .format("TEXT") .responseTopic("responses/client1") .lifetimeId("1e6a1eb5-810a-459d-a12c-a6fa08f228d1") @@ -399,7 +399,7 @@ write zilla:data.ext ${kafka:dataEx() write ${mqtt:will() .topic("obituaries") .delay(1000) - .expiryInterval(15000) + .expiryInterval(15) .format("TEXT") .responseTopic("responses/client1") .lifetimeId("1e6a1eb5-810a-459d-a12c-a6fa08f228d1") diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/client.rpt index 54730d4d02..079b44646c 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/client.rpt @@ -45,8 +45,6 @@ read zilla:begin.ext ${kafka:matchBeginEx() connected - -read notify WAIT_TIME_ELAPSED read zilla:data.ext ${kafka:matchDataEx() .typeId(zilla:id("kafka")) .merged() @@ -60,7 +58,7 @@ read zilla:data.ext ${kafka:matchDataEx() .header("zilla:filter", "sensor") .header("zilla:filter", "one") .header("zilla:local", "client") - .headerInt("zilla:expiry", 1000) + .headerInt("zilla:expiry", 1) .header("zilla:format", "TEXT") .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/server.rpt index 3560004d4a..5887c752ea 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message.fragmented/server.rpt @@ -13,7 +13,8 @@ # specific language governing permissions and limitations under the License. # -property timestamp ${kafka:timestamp()} +property deltaMillis 1000L +property timestamp ${kafka:timestamp() - deltaMillis} accept "zilla://streams/kafka0" option zilla:window 8192 @@ -49,7 +50,6 @@ write zilla:begin.ext ${kafka:beginEx() connected -write await WAIT_TIME_ELAPSED write option zilla:flags "init" write zilla:data.ext ${kafka:dataEx() .typeId(zilla:id("kafka")) @@ -64,7 +64,7 @@ write zilla:data.ext ${kafka:dataEx() .header("zilla:filter", "sensor") .header("zilla:filter", "one") .header("zilla:local", "client") - .headerInt("zilla:expiry", 1000) + .headerInt("zilla:expiry", 1) .header("zilla:format", "TEXT") .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/client.rpt index 052001027d..3a2100ce41 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/client.rpt @@ -46,7 +46,6 @@ read zilla:begin.ext ${kafka:matchBeginEx() connected -read notify WAIT_TIME_ELAPSED read zilla:data.ext ${kafka:matchDataEx() .typeId(zilla:id("kafka")) .merged() @@ -60,7 +59,7 @@ read zilla:data.ext ${kafka:matchDataEx() .header("zilla:filter", "sensor") .header("zilla:filter", "one") .header("zilla:local", "client") - .headerInt("zilla:expiry", 1000) + .headerInt("zilla:expiry", 1) .header("zilla:format", "TEXT") .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/server.rpt index f88fa9fef6..ef0389a20e 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/subscribe.expire.message/server.rpt @@ -13,7 +13,8 @@ # specific language governing permissions and limitations under the License. # -property timestamp ${kafka:timestamp()} +property deltaMillis 1000L +property timestamp ${kafka:timestamp() - deltaMillis} accept "zilla://streams/kafka0" option zilla:window 8192 @@ -49,7 +50,6 @@ write zilla:begin.ext ${kafka:beginEx() connected -write await WAIT_TIME_ELAPSED write zilla:data.ext ${kafka:dataEx() .typeId(zilla:id("kafka")) .merged() @@ -63,7 +63,7 @@ write zilla:data.ext ${kafka:dataEx() .header("zilla:filter", "sensor") .header("zilla:filter", "one") .header("zilla:local", "client") - .headerInt("zilla:expiry", 1000) + .headerInt("zilla:expiry", 1) .header("zilla:format", "TEXT") .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/client.rpt index b3e73377f0..e4a549d274 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/client.rpt @@ -29,6 +29,5 @@ write zilla:begin.ext ${mqtt:beginEx() connected -write await WAIT_TIME_ELAPSED write close read closed diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/server.rpt index d474db70f8..6bdcfae9e6 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/mqtt/subscribe.expire.message/server.rpt @@ -30,6 +30,5 @@ read zilla:begin.ext ${mqtt:matchBeginEx() connected -read notify WAIT_TIME_ELAPSED read closed write close