CompletableFuture returned from subscribe on a CONNECTING async client never completes if the client does not connect #612

@jfontsaballs

🐛 Bug Report

The CompletableFuture returned from subscribe on an async client in the CONNECTING state never completes if the client does not connect. After this, the client also cannot be stopped, and reconnection attempts appear to stop.

🔬 How To Reproduce

Run the code sample below without a broker available. The behavior occurs only when the client never manages to connect to the broker.

Similarly suspicious behavior occurs if subscribe is called while the client is reconnecting, although I have not been able to analyze it properly.

Code sample

Kotlin:

import com.hivemq.client.mqtt.lifecycle.MqttClientAutoReconnect
import java.util.UUID
import java.util.concurrent.TimeUnit

fun main() {
    var shouldStop = false
    val client = com.hivemq.client.mqtt.MqttClient.builder()
        .identifier(UUID.randomUUID().toString())
        .serverHost("localhost")
        .automaticReconnect(
            MqttClientAutoReconnect.builder()
                .initialDelay(500, TimeUnit.MILLISECONDS)
                .maxDelay(5000, TimeUnit.MILLISECONDS)
                .build()
        )
        .addDisconnectedListener { context ->
            println("Disconnected")
            if (shouldStop)
                context.reconnector.reconnect(false)
        }
        .useMqttVersion5()
        .buildAsync()

    // Start connection but don't wait for it to complete
    val connectionFuture = client.connectWith()
        .cleanStart(true)
        .keepAlive(10 /*seconds*/)
        .send()

    println(client.state) // CONNECTING

    // Publish, it throws as expected
    try {
        client.publishWith()
            .topic("something")
            .payload("whatever".toByteArray())
            .send().join()
    } catch (e: Throwable) {
        println(e) //com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected.
    }

    val subscribeFuture = client.subscribeWith()
        .topicFilter("something")
        .send()
    Thread.sleep(5000)
    //subscribeFuture.join() !! Never completes
    println(subscribeFuture)

    // Then I tried setting a timeout
    try {
        client.subscribeWith()
            .topicFilter("something")
            .send()
            .orTimeout(2, TimeUnit.SECONDS)
            .join()
    } catch (e: Throwable) {
        println(e)
    }

    shouldStop = true
    connectionFuture.cancel(true)
    try {
        client.disconnect().join()
    } catch (e: Throwable) {
        println(e)
    }
    println("END")
    // After this, the application never exits, nor does it try to reconnect to the broker
}

Output

CONNECTING
java.util.concurrent.CompletionException: com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected.
Disconnected
Disconnected
Disconnected
com.hivemq.client.internal.rx.RxFutureConverter$RxSingleFuture@238d68ff[Not completed]
Disconnected
java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException
java.util.concurrent.CompletionException: com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected.
END

Environment

Where are you running/using this client?

Hardware or Device?
Laptop with Intel i7 Processor

What version of this client are you using?
1.3.0

JVM version?
Java 21, Gradle 8.5

Operating System?
Windows 10

Which MQTT protocol version is being used?
5

Which MQTT broker (name and version)?
None

📈 Expected behavior

Subscribe should fail (its future completing exceptionally) if it cannot get through, as publish does.
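To make the expected contract concrete, here is a minimal sketch of a fail-fast guard built only on java.util.concurrent. The helper name failFastUnlessConnected and its parameters are hypothetical and not part of the HiveMQ client API. It only illustrates the behavior I would expect from subscribe: a future that completes exceptionally instead of staying pending.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical helper, NOT part of the HiveMQ client API.
// Runs `operation` only when `isConnected()` returns true. Otherwise it returns
// a future that is already completed exceptionally, so callers never block on a
// future that stays pending forever.
fun <T> failFastUnlessConnected(
    isConnected: () -> Boolean,
    operation: () -> CompletableFuture<T>
): CompletableFuture<T> =
    if (isConnected()) operation()
    else CompletableFuture.failedFuture<T>(
        // Assumption: a plain IllegalStateException stands in for the client's own exception type.
        IllegalStateException("MQTT client is not connected.")
    )

fun main() {
    // Stand-in for a client stuck in CONNECTING: the operation would return a future that never completes.
    val future = failFastUnlessConnected(isConnected = { false }) { CompletableFuture<String>() }
    println(future.isCompletedExceptionally) // true
}
```

With the client from the code sample, such a guard could presumably be used as failFastUnlessConnected({ client.state.isConnected }) { client.subscribeWith().topicFilter("something").send() }, assuming MqttClientState exposes isConnected as I understand it. This is only a workaround sketch: the state can still change between the check and the call, so it does not replace a fix in the client itself.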

📎 Additional context

Bug found while analyzing the behavior of the client under communication difficulties with the broker.
