Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed ServerLowLevel tests not running #580

Merged
merged 2 commits into from
May 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions broker/src/main/java/io/moquette/BrokerConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ public final class BrokerConstants {

public static final String STORAGE_CLASS_NAME = "storage_class";

public static final int FLIGHT_BEFORE_RESEND_MS = 5_000;
public static final int INFLIGHT_WINDOW_SIZE = 10;

private BrokerConstants() {
}
}
4 changes: 2 additions & 2 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.moquette.broker;

import static io.moquette.BrokerConstants.FLIGHT_BEFORE_RESEND_MS;
import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
Expand All @@ -34,8 +36,6 @@
class Session {

private static final Logger LOG = LoggerFactory.getLogger(Session.class);
private static final int FLIGHT_BEFORE_RESEND_MS = 5_000;
private static final int INFLIGHT_WINDOW_SIZE = 10;

static class InFlightPacket implements Delayed {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.moquette.integration;

import static io.moquette.BrokerConstants.FLIGHT_BEFORE_RESEND_MS;
import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
Expand All @@ -38,11 +39,12 @@
import java.util.concurrent.TimeUnit;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.jupiter.api.Assertions.*;

public class ServerLowlevelMessagesIntegrationTests {
public class ServerLowlevelMessagesIntegrationTest {

private static final Logger LOG = LoggerFactory.getLogger(ServerLowlevelMessagesIntegrationTests.class);
private static final Logger LOG = LoggerFactory.getLogger(ServerLowlevelMessagesIntegrationTest.class);
static MqttClientPersistence s_dataStore;
Server m_server;
Client m_client;
Expand Down Expand Up @@ -171,4 +173,43 @@ public void testWillMessageIsPublishedOnClientBadDisconnection() throws Interrup
m_willSubscriber.disconnect();
}

@Test
public void testResendNotAckedPublishes() throws MqttException, InterruptedException {
LOG.info("*** testResendNotAckedPublishes ***");
String topic = "/test";

MqttClient subscriber = new MqttClient("tcp://localhost:1883", "Subscriber");
MqttClient publisher = new MqttClient("tcp://localhost:1883", "Publisher");

try {
subscriber.connect();
publisher.connect();

AtomicBoolean isFirst = new AtomicBoolean(true);
AtomicBoolean receivedPublish = new AtomicBoolean(false);
subscriber.subscribe(topic, 1, (String topic1, org.eclipse.paho.client.mqttv3.MqttMessage message) -> {
if (isFirst.getAndSet(false)) {
// wait to trigger resending PUBLISH
TimeUnit.SECONDS.sleep(FLIGHT_BEFORE_RESEND_MS * 2);
} else {
receivedPublish.set(true);
}
});

publisher.publish(topic, "hello".getBytes(), 1, false);
Awaitility.await("Waiting for resend.")
.atMost(FLIGHT_BEFORE_RESEND_MS * 3, TimeUnit.MILLISECONDS)
.pollDelay(FLIGHT_BEFORE_RESEND_MS * 2, TimeUnit.MILLISECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.untilTrue(receivedPublish);
} finally {
try {
if (subscriber.isConnected()) {
subscriber.disconnect();
}
} finally {
publisher.disconnect();
}
}
}
}