From a5368e08fb3432592ecac42a2650c2f8791eb0c2 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Mon, 27 Oct 2025 10:51:32 +0100 Subject: [PATCH 1/2] Add new disconnectGracefully method to all interfaces --- .../client/internal/mqtt/MqttAsyncClient.java | 5 ++ .../internal/mqtt/MqttBlockingClient.java | 9 +++ .../client/internal/mqtt/MqttRxClient.java | 5 ++ .../handler/connect/MqttConnAckSingle.java | 4 +- .../MqttDisconnectGracefulCompletable.java | 63 +++++++++++++++++++ .../disconnect/MqttDisconnectHandler.java | 31 +++++++++ .../mqtt/mqtt3/Mqtt3AsyncClientView.java | 13 ++++ .../mqtt/mqtt3/Mqtt3BlockingClientView.java | 9 +++ .../hivemq/client/mqtt/MqttClientState.java | 9 ++- .../client/mqtt/mqtt3/Mqtt3AsyncClient.java | 16 +++++ .../mqtt/mqtt3/Mqtt3BlockingClient.java | 15 +++++ .../client/mqtt/mqtt5/Mqtt5AsyncClient.java | 16 +++++ .../mqtt/mqtt5/Mqtt5BlockingClient.java | 15 +++++ 13 files changed, 207 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/hivemq/client/internal/mqtt/handler/disconnect/MqttDisconnectGracefulCompletable.java diff --git a/src/main/java/com/hivemq/client/internal/mqtt/MqttAsyncClient.java b/src/main/java/com/hivemq/client/internal/mqtt/MqttAsyncClient.java index e9c768431..d8942c720 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/MqttAsyncClient.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/MqttAsyncClient.java @@ -270,6 +270,11 @@ public void publishes( return new MqttDisconnectBuilder.Send<>(this::disconnect); } + @Override + public @NotNull CompletableFuture disconnectGracefully() { + return RxFutureConverter.toFuture(delegate.disconnectGracefullyUnsafe()); + } + @Override public @NotNull MqttClientConfig getConfig() { return delegate.getConfig(); diff --git a/src/main/java/com/hivemq/client/internal/mqtt/MqttBlockingClient.java b/src/main/java/com/hivemq/client/internal/mqtt/MqttBlockingClient.java index a5c86b83e..f498d4be2 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/MqttBlockingClient.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/MqttBlockingClient.java @@ -201,6 +201,15 @@ public void disconnect(final @NotNull Mqtt5Disconnect disconnect) { return new MqttDisconnectBuilder.SendVoid(this::disconnect); } + @Override + public void disconnectGracefully() { + try { + delegate.disconnectGracefullyUnsafe().blockingAwait(); + } catch (final RuntimeException e) { + throw AsyncRuntimeException.fillInStackTrace(e); + } + } + @Override public @NotNull MqttClientConfig getConfig() { return delegate.getConfig(); diff --git a/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java b/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java index c9fa963ea..84f9f0bcd 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java @@ -19,6 +19,7 @@ import com.hivemq.client.internal.mqtt.handler.auth.MqttReAuthCompletable; import com.hivemq.client.internal.mqtt.handler.connect.MqttConnAckSingle; import com.hivemq.client.internal.mqtt.handler.disconnect.MqttDisconnectCompletable; +import com.hivemq.client.internal.mqtt.handler.disconnect.MqttDisconnectGracefulCompletable; import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttGlobalIncomingPublishFlowable; import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttSubscribedPublishFlowable; import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckFlowable; @@ -260,6 +261,10 @@ public MqttRxClient(final @NotNull MqttClientConfig clientConfig) { return new MqttDisconnectCompletable(clientConfig, disconnect); } + @NotNull Completable disconnectGracefullyUnsafe() { + return new MqttDisconnectGracefulCompletable(clientConfig); + } + @Override public MqttDisconnectBuilder.@NotNull Nested disconnectWith() { return new MqttDisconnectBuilder.Nested<>(this::disconnect); diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckSingle.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckSingle.java index b475862b6..68897d8b0 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckSingle.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckSingle.java @@ -150,11 +150,11 @@ private static void reconnect( } } - if (reconnector.isReconnect()) { + if (reconnector.isReconnect() && clientConfig.getRawState().get() != DISCONNECTING_GRACEFULLY) { clientConfig.getRawState().set(DISCONNECTED_RECONNECT); eventLoop.schedule(() -> { reconnector.getFuture().whenComplete((ignored, throwable) -> { - if (reconnector.isReconnect()) { + if (reconnector.isReconnect() && clientConfig.getRawState().get() != DISCONNECTING_GRACEFULLY) { if (clientConfig.getRawState().compareAndSet(DISCONNECTED_RECONNECT, CONNECTING_RECONNECT)) { clientConfig.setCurrentTransportConfig(reconnector.getTransportConfig()); diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/disconnect/MqttDisconnectGracefulCompletable.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/disconnect/MqttDisconnectGracefulCompletable.java new file mode 100644 index 000000000..e2ed3e2ac --- /dev/null +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/disconnect/MqttDisconnectGracefulCompletable.java @@ -0,0 +1,63 @@ +/* + * Copyright 2018-present HiveMQ and the HiveMQ Community + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hivemq.client.internal.mqtt.handler.disconnect; + +import com.hivemq.client.internal.mqtt.MqttClientConfig; +import com.hivemq.client.internal.mqtt.MqttClientConnectionConfig; +import com.hivemq.client.internal.mqtt.exceptions.MqttClientStateExceptions; +import com.hivemq.client.internal.rx.CompletableFlow; +import io.netty.channel.Channel; +import io.reactivex.Completable; +import io.reactivex.CompletableObserver; +import io.reactivex.internal.disposables.EmptyDisposable; +import org.jetbrains.annotations.NotNull; + +/** + * Completable for gracefully disconnecting the client, canceling any ongoing reconnection attempts. + * + * @author Silvio Giebl + * @since 1.4.0 + */ +public class MqttDisconnectGracefulCompletable extends Completable { + + private final @NotNull MqttClientConfig clientConfig; + + public MqttDisconnectGracefulCompletable(final @NotNull MqttClientConfig clientConfig) { + this.clientConfig = clientConfig; + } + + @Override + protected void subscribeActual(final @NotNull CompletableObserver s) { + final MqttClientConnectionConfig connectionConfig = clientConfig.getRawConnectionConfig(); + if (connectionConfig == null) { + // If not connected, just complete successfully for graceful disconnect + EmptyDisposable.complete(s); + return; + } + final Channel channel = connectionConfig.getChannel(); + final MqttDisconnectHandler disconnectHandler = + (MqttDisconnectHandler) channel.pipeline().get(MqttDisconnectHandler.NAME); + if (disconnectHandler == null) { + // If no disconnect handler, just complete successfully for graceful disconnect + EmptyDisposable.complete(s); + return; + } + final CompletableFlow flow = new CompletableFlow(s); + s.onSubscribe(flow); + disconnectHandler.disconnectGracefully(flow); + } +} diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/disconnect/MqttDisconnectHandler.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/disconnect/MqttDisconnectHandler.java index 145c8afe0..dd57cc72d 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/disconnect/MqttDisconnectHandler.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/disconnect/MqttDisconnectHandler.java @@ -30,7 +30,9 @@ import com.hivemq.client.internal.mqtt.message.connect.MqttConnectRestrictions; import com.hivemq.client.internal.mqtt.message.connect.connack.MqttConnAck; import com.hivemq.client.internal.mqtt.message.disconnect.MqttDisconnect; +import com.hivemq.client.internal.mqtt.message.disconnect.MqttDisconnectBuilder; import com.hivemq.client.internal.rx.CompletableFlow; +import com.hivemq.client.mqtt.MqttClientState; import com.hivemq.client.mqtt.MqttVersion; import com.hivemq.client.mqtt.exceptions.ConnectionClosedException; import com.hivemq.client.mqtt.lifecycle.MqttDisconnectSource; @@ -139,6 +141,14 @@ public void disconnect(final @NotNull MqttDisconnect disconnect, final @NotNull } } + public void disconnectGracefully(final @NotNull CompletableFlow flow) { + if (!clientConfig.executeInEventLoop(() -> writeGracefulDisconnect(flow))) { + // If no event loop is available, just complete the flow successfully + // This handles the case where the client is not connected or in an invalid state + flow.onComplete(); + } + } + private void writeDisconnect(final @NotNull MqttDisconnect disconnect, final @NotNull CompletableFlow flow) { final ChannelHandlerContext ctx = this.ctx; if ((ctx != null) && (state == null)) { @@ -149,6 +159,27 @@ private void writeDisconnect(final @NotNull MqttDisconnect disconnect, final @No } } + private void writeGracefulDisconnect(final @NotNull CompletableFlow flow) { + // Set the client state to DISCONNECTING_GRACEFULLY to prevent reconnection + clientConfig.getRawState().set(MqttClientState.DISCONNECTING_GRACEFULLY); + + final ChannelHandlerContext ctx = this.ctx; + if (ctx != null) { + // If we have an active connection, send a disconnect message + if (state == null) { + state = STATE_CLOSED; + final MqttDisconnect disconnect = new MqttDisconnectBuilder.Default().build(); + fireDisconnectEvent(ctx.channel(), new MqttDisconnectEvent.ByUser(disconnect, flow)); + } else { + // If already disconnected, just complete the flow + flow.onComplete(); + } + } else { + // No active connection, just complete the flow + flow.onComplete(); + } + } + @Override protected void onDisconnectEvent( final @NotNull ChannelHandlerContext ctx, final @NotNull MqttDisconnectEvent disconnectEvent) { diff --git a/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3AsyncClientView.java b/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3AsyncClientView.java index 33359785b..16da6b289 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3AsyncClientView.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3AsyncClientView.java @@ -258,6 +258,19 @@ public void publishes( return future; } + @Override + public @NotNull CompletableFuture disconnectGracefully() { + final CompletableFuture future = new CompletableFuture<>(); + delegate.disconnectGracefully().whenComplete((ignored, throwable) -> { + if (throwable != null) { + future.completeExceptionally(Mqtt3ExceptionFactory.map(throwable)); + } else { + future.complete(null); + } + }); + return future; + } + @Override public @NotNull Mqtt3ClientConfig getConfig() { return clientConfig; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3BlockingClientView.java b/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3BlockingClientView.java index 2b732b470..881aac1b6 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3BlockingClientView.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3BlockingClientView.java @@ -153,6 +153,15 @@ public void disconnect() { } } + @Override + public void disconnectGracefully() { + try { + delegate.disconnectGracefully(); + } catch (final RuntimeException e) { + throw Mqtt3ExceptionFactory.mapWithStackTrace(e); + } + } + @Override public @NotNull Mqtt3ClientConfig getConfig() { return clientConfig; diff --git a/src/main/java/com/hivemq/client/mqtt/MqttClientState.java b/src/main/java/com/hivemq/client/mqtt/MqttClientState.java index 4413c4d39..6cd89274d 100644 --- a/src/main/java/com/hivemq/client/mqtt/MqttClientState.java +++ b/src/main/java/com/hivemq/client/mqtt/MqttClientState.java @@ -68,7 +68,14 @@ public enum MqttClientState { * This means the client was {@link #DISCONNECTED_RECONNECT}, a Connect message is sent, but the ConnAck message is * not received yet. */ - CONNECTING_RECONNECT; + CONNECTING_RECONNECT, + /** + * The client is gracefully disconnecting, canceling any ongoing reconnection attempts. + *

+ * This state is used when {@code disconnectGracefully()} is called to ensure the client + * transitions to a clean {@link #DISCONNECTED} state. + */ + DISCONNECTING_GRACEFULLY; private static final @NotNull EnumSet CONNECTED_OR_RECONNECT = EnumSet.of(CONNECTED, DISCONNECTED_RECONNECT, CONNECTING_RECONNECT); diff --git a/src/main/java/com/hivemq/client/mqtt/mqtt3/Mqtt3AsyncClient.java b/src/main/java/com/hivemq/client/mqtt/mqtt3/Mqtt3AsyncClient.java index 28b12ef90..0570b5605 100644 --- a/src/main/java/com/hivemq/client/mqtt/mqtt3/Mqtt3AsyncClient.java +++ b/src/main/java/com/hivemq/client/mqtt/mqtt3/Mqtt3AsyncClient.java @@ -321,6 +321,22 @@ void publishes( */ @NotNull CompletableFuture disconnect(); + /** + * Gracefully disconnects the client, canceling any ongoing reconnection attempts. + *

+ * This method can be called from any client state and will ensure the client + * transitions to a clean {@link com.hivemq.client.mqtt.MqttClientState#DISCONNECTED DISCONNECTED} state. + * Unlike the regular {@link #disconnect()} method, this will not throw an exception + * when the client is in a reconnecting state. + *

+ * This is particularly useful when automatic reconnection is enabled and you need + * to cleanly shut down the client regardless of its current connection state. + * + * @return the {@link CompletableFuture} which completes when the client is fully disconnected + * @since 1.4.0 + */ + @NotNull CompletableFuture disconnectGracefully(); + @Override @CheckReturnValue default @NotNull Mqtt3AsyncClient toAsync() { diff --git a/src/main/java/com/hivemq/client/mqtt/mqtt3/Mqtt3BlockingClient.java b/src/main/java/com/hivemq/client/mqtt/mqtt3/Mqtt3BlockingClient.java index 3dd6ddc81..dc2c308f0 100644 --- a/src/main/java/com/hivemq/client/mqtt/mqtt3/Mqtt3BlockingClient.java +++ b/src/main/java/com/hivemq/client/mqtt/mqtt3/Mqtt3BlockingClient.java @@ -163,6 +163,21 @@ public interface Mqtt3BlockingClient extends Mqtt3Client { */ void disconnect(); + /** + * Gracefully disconnects the client, canceling any ongoing reconnection attempts. + *

+ * This method can be called from any client state and will ensure the client + * transitions to a clean {@link com.hivemq.client.mqtt.MqttClientState#DISCONNECTED DISCONNECTED} state. + * Unlike the regular {@link #disconnect()} method, this will not throw an exception + * when the client is in a reconnecting state. + *

+ * This is particularly useful when automatic reconnection is enabled and you need + * to cleanly shut down the client regardless of its current connection state. + * + * @since 1.4.0 + */ + void disconnectGracefully(); + @Override @CheckReturnValue default @NotNull Mqtt3BlockingClient toBlocking() { diff --git a/src/main/java/com/hivemq/client/mqtt/mqtt5/Mqtt5AsyncClient.java b/src/main/java/com/hivemq/client/mqtt/mqtt5/Mqtt5AsyncClient.java index 5e77444de..db69b949f 100644 --- a/src/main/java/com/hivemq/client/mqtt/mqtt5/Mqtt5AsyncClient.java +++ b/src/main/java/com/hivemq/client/mqtt/mqtt5/Mqtt5AsyncClient.java @@ -368,6 +368,22 @@ void publishes( @CheckReturnValue Mqtt5DisconnectBuilder.@NotNull Send> disconnectWith(); + /** + * Gracefully disconnects the client, canceling any ongoing reconnection attempts. + *

+ * This method can be called from any client state and will ensure the client + * transitions to a clean {@link com.hivemq.client.mqtt.MqttClientState#DISCONNECTED DISCONNECTED} state. + * Unlike the regular {@link #disconnect()} method, this will not throw an exception + * when the client is in a reconnecting state. + *

+ * This is particularly useful when automatic reconnection is enabled and you need + * to cleanly shut down the client regardless of its current connection state. + * + * @return the {@link CompletableFuture} which completes when the client is fully disconnected + * @since 1.4.0 + */ + @NotNull CompletableFuture disconnectGracefully(); + @Override @CheckReturnValue default @NotNull Mqtt5AsyncClient toAsync() { diff --git a/src/main/java/com/hivemq/client/mqtt/mqtt5/Mqtt5BlockingClient.java b/src/main/java/com/hivemq/client/mqtt/mqtt5/Mqtt5BlockingClient.java index 50e228347..b58d260d7 100644 --- a/src/main/java/com/hivemq/client/mqtt/mqtt5/Mqtt5BlockingClient.java +++ b/src/main/java/com/hivemq/client/mqtt/mqtt5/Mqtt5BlockingClient.java @@ -206,6 +206,21 @@ public interface Mqtt5BlockingClient extends Mqtt5Client { @CheckReturnValue Mqtt5DisconnectBuilder.@NotNull SendVoid disconnectWith(); + /** + * Gracefully disconnects the client, canceling any ongoing reconnection attempts. + *

+ * This method can be called from any client state and will ensure the client + * transitions to a clean {@link com.hivemq.client.mqtt.MqttClientState#DISCONNECTED DISCONNECTED} state. + * Unlike the regular {@link #disconnect()} method, this will not throw an exception + * when the client is in a reconnecting state. + *

+ * This is particularly useful when automatic reconnection is enabled and you need + * to cleanly shut down the client regardless of its current connection state. + * + * @since 1.4.0 + */ + void disconnectGracefully(); + @Override @CheckReturnValue default @NotNull Mqtt5BlockingClient toBlocking() { From aec95b692e4c389c01d8fbe9b4556dfec5a90b72 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Mon, 27 Oct 2025 10:52:19 +0100 Subject: [PATCH 2/2] Updated examples and add tests --- .../mqtt/examples/ReconnectStrategy.java | 51 ++++- .../Mqtt3ClientGracefulDisconnectTest.java | 191 ++++++++++++++++++ 2 files changed, 241 insertions(+), 1 deletion(-) create mode 100644 src/test/java/com/hivemq/client/mqtt/lifecycle/Mqtt3ClientGracefulDisconnectTest.java diff --git a/examples/src/main/java/com/hivemq/client/mqtt/examples/ReconnectStrategy.java b/examples/src/main/java/com/hivemq/client/mqtt/examples/ReconnectStrategy.java index dfd65fb26..ab25738d6 100644 --- a/examples/src/main/java/com/hivemq/client/mqtt/examples/ReconnectStrategy.java +++ b/examples/src/main/java/com/hivemq/client/mqtt/examples/ReconnectStrategy.java @@ -35,7 +35,8 @@ public class ReconnectStrategy { public static void main(final String[] args) throws InterruptedException { // defaultReconnect(); // customizedReconnect(); - completelyCustom(); +// completelyCustom(); + gracefulDisconnectExample(); } public static void defaultReconnect() { @@ -109,4 +110,52 @@ private static CompletableFuture getOAuthToken() { return new byte[] {1, 2, 3}; }); } + + /** + * Demonstrates graceful disconnect functionality. + * This example shows how to use disconnectGracefully() to cleanly shut down + * a client even when automatic reconnection is enabled and the client is + * in a reconnecting state. + */ + public static void gracefulDisconnectExample() throws InterruptedException { + System.out.println("=== Graceful Disconnect Example ==="); + + final Mqtt5BlockingClient client = Mqtt5Client.builder() + .serverHost("broker.hivemq.com") + .automaticReconnect() + .initialDelay(1, TimeUnit.SECONDS) + .maxDelay(2, TimeUnit.SECONDS) + .applyAutomaticReconnect() + .addConnectedListener(context -> System.out.println("Connected: " + LocalTime.now())) + .addDisconnectedListener(context -> System.out.println("Disconnected: " + LocalTime.now() + + " (Source: " + context.getSource() + ")")) + .buildBlocking(); + + try { + // Connect the client + System.out.println("Connecting..."); + client.connect(); + System.out.println("Connected successfully!"); + + // Simulate network issues by turning off network (in real scenario) + System.out.println("Simulating network issues..."); + System.out.println("Client state: " + client.getState()); + + // Wait a bit to let reconnection attempts start + TimeUnit.SECONDS.sleep(3); + System.out.println("Client state after network issues: " + client.getState()); + + // Now demonstrate graceful disconnect + System.out.println("Calling disconnectGracefully()..."); + client.disconnectGracefully(); + System.out.println("Graceful disconnect completed!"); + System.out.println("Final client state: " + client.getState()); + + } catch (final Exception e) { + System.err.println("Error during graceful disconnect example: " + e.getMessage()); + e.printStackTrace(); + } + + System.out.println("=== End Graceful Disconnect Example ==="); + } } diff --git a/src/test/java/com/hivemq/client/mqtt/lifecycle/Mqtt3ClientGracefulDisconnectTest.java b/src/test/java/com/hivemq/client/mqtt/lifecycle/Mqtt3ClientGracefulDisconnectTest.java new file mode 100644 index 000000000..e575ccd4a --- /dev/null +++ b/src/test/java/com/hivemq/client/mqtt/lifecycle/Mqtt3ClientGracefulDisconnectTest.java @@ -0,0 +1,191 @@ +/* + * Copyright 2018-present HiveMQ and the HiveMQ Community + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hivemq.client.mqtt.lifecycle; + +import com.hivemq.client.mqtt.MqttClient; +import com.hivemq.client.mqtt.MqttClientState; +import com.hivemq.client.mqtt.mqtt3.Mqtt3BlockingClient; +import com.hivemq.client.mqtt.mqtt3.Mqtt3Client; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Test for graceful disconnect functionality in MQTT 3 clients. + * + * This test addresses the issue described in GitHub issue #675 where + * disconnect() fails with MqttClientStateException when automatic + * reconnection is enabled and the client is in a reconnecting state. + * + * @since 1.4.0 + */ +class Mqtt3ClientGracefulDisconnectTest { + + @Test + @Timeout(30) + void disconnectGracefully_whenNotConnected_shouldSucceed() { + final Mqtt3BlockingClient client = Mqtt3Client.builder() + .serverHost("localhost") + .serverPort(1883) + .buildBlocking(); + + // Should not throw any exception + assertDoesNotThrow(() -> client.disconnectGracefully()); + assertEquals(MqttClientState.DISCONNECTED, client.getState()); + } + + @Test + @Timeout(30) + void disconnectGracefully_whenConnected_shouldSucceed() throws Exception { + final Mqtt3BlockingClient client = Mqtt3Client.builder() + .serverHost("broker.hivemq.com") + .serverPort(1883) + .buildBlocking(); + + try { + // Connect the client + client.connect(); + assertEquals(MqttClientState.CONNECTED, client.getState()); + + // Graceful disconnect should succeed + assertDoesNotThrow(() -> client.disconnectGracefully()); + assertEquals(MqttClientState.DISCONNECTED, client.getState()); + } catch (Exception e) { + // If connection fails (network issues), graceful disconnect should still work + assertDoesNotThrow(() -> client.disconnectGracefully()); + assertEquals(MqttClientState.DISCONNECTED, client.getState()); + } + } + + @Test + @Timeout(30) + void disconnectGracefully_withAutomaticReconnect_shouldCancelReconnection() throws Exception { + final CountDownLatch disconnectedLatch = new CountDownLatch(1); + final AtomicReference finalState = new AtomicReference<>(); + + final Mqtt3BlockingClient client = Mqtt3Client.builder() + .serverHost("broker.hivemq.com") + .serverPort(1883) + .automaticReconnect() + .initialDelay(1, TimeUnit.SECONDS) + .maxDelay(2, TimeUnit.SECONDS) + .applyAutomaticReconnect() + .addDisconnectedListener(context -> { + System.out.println("Disconnected: " + context.getSource()); + finalState.set(MqttClientState.DISCONNECTED); + disconnectedLatch.countDown(); + }) + .buildBlocking(); + + try { + // Connect the client + client.connect(); + assertEquals(MqttClientState.CONNECTED, client.getState()); + + // Disconnect to trigger reconnection + client.disconnect(); + + // Wait a bit for reconnection to start + Thread.sleep(500); + + // Now call graceful disconnect - this should cancel reconnection + assertDoesNotThrow(() -> client.disconnectGracefully()); + + // Wait for disconnection to complete + assertTrue(disconnectedLatch.await(5, TimeUnit.SECONDS)); + + // Final state should be DISCONNECTED, not DISCONNECTED_RECONNECT + assertEquals(MqttClientState.DISCONNECTED, client.getState()); + assertEquals(MqttClientState.DISCONNECTED, finalState.get()); + + } catch (Exception e) { + // If connection fails (network issues), graceful disconnect should still work + assertDoesNotThrow(() -> client.disconnectGracefully()); + assertEquals(MqttClientState.DISCONNECTED, client.getState()); + } + } + + @Test + @Timeout(30) + void disconnectGracefully_async_shouldSucceed() throws Exception { + final Mqtt3BlockingClient client = Mqtt3Client.builder() + .serverHost("broker.hivemq.com") + .serverPort(1883) + .buildBlocking(); + + try { + // Connect the client + client.connect(); + assertEquals(MqttClientState.CONNECTED, client.getState()); + + // Test async graceful disconnect + final var future = client.toAsync().disconnectGracefully(); + assertNotNull(future); + + // Wait for completion + future.get(5, TimeUnit.SECONDS); + assertEquals(MqttClientState.DISCONNECTED, client.getState()); + + } catch (Exception e) { + // If connection fails (network issues), graceful disconnect should still work + final var future = client.toAsync().disconnectGracefully(); + assertNotNull(future); + future.get(5, TimeUnit.SECONDS); + assertEquals(MqttClientState.DISCONNECTED, client.getState()); + } + } + + @Test + @Timeout(30) + void disconnectGracefully_comparedToRegularDisconnect() throws Exception { + final Mqtt3BlockingClient client = Mqtt3Client.builder() + .serverHost("broker.hivemq.com") + .serverPort(1883) + .automaticReconnect() + .initialDelay(1, TimeUnit.SECONDS) + .maxDelay(2, TimeUnit.SECONDS) + .applyAutomaticReconnect() + .buildBlocking(); + + try { + // Connect the client + client.connect(); + assertEquals(MqttClientState.CONNECTED, client.getState()); + + // Disconnect to trigger reconnection + client.disconnect(); + + // Wait a bit for reconnection to start + Thread.sleep(500); + + // Regular disconnect might throw MqttClientStateException in reconnecting state + // Graceful disconnect should not throw any exception + assertDoesNotThrow(() -> client.disconnectGracefully()); + assertEquals(MqttClientState.DISCONNECTED, client.getState()); + + } catch (Exception e) { + // If connection fails (network issues), graceful disconnect should still work + assertDoesNotThrow(() -> client.disconnectGracefully()); + assertEquals(MqttClientState.DISCONNECTED, client.getState()); + } + } +}