From a3780c26b012b8587a5b2e449f529c8330215c94 Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Thu, 18 Feb 2021 13:18:11 +0100 Subject: [PATCH] Added a test for #573 --- ...ServerLowlevelMessagesIntegrationTest.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java b/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java index c7a1d2e6d..4659cadd7 100644 --- a/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java +++ b/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java @@ -38,6 +38,8 @@ import java.util.concurrent.TimeUnit; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*; import static java.nio.charset.StandardCharsets.UTF_8; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.jupiter.api.Assertions.*; public class ServerLowlevelMessagesIntegrationTest { @@ -171,4 +173,38 @@ public void testWillMessageIsPublishedOnClientBadDisconnection() throws Interrup m_willSubscriber.disconnect(); } + @Test + public void testResendNotAckedPublishes() throws MqttException, InterruptedException { + LOG.info("*** testResendNotAckedPublishes ***"); + String topic = "/test"; + + MqttClient subscriber = new MqttClient("tcp://localhost:1883", "Subscriber"); + MqttClient publisher = new MqttClient("tcp://localhost:1883", "Publisher"); + + try { + subscriber.connect(); + publisher.connect(); + + AtomicBoolean isFirst = new AtomicBoolean(true); + CountDownLatch countDownLatch = new CountDownLatch(2); + subscriber.subscribe(topic, 1, (String topic1, org.eclipse.paho.client.mqttv3.MqttMessage message) -> { + if (isFirst.getAndSet(false)) { + // wait to trigger resending PUBLISH + TimeUnit.SECONDS.sleep(12); + } + countDownLatch.countDown(); + }); + + publisher.publish(topic, "hello".getBytes(), 1, false); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + } + finally { + try { + subscriber.disconnect(); + } + finally { + publisher.disconnect(); + } + } + } }