Skip to content

Commit

Permalink
[mqtt] Correctly unsubscribe after message is received (openhab#7723)
Browse files Browse the repository at this point in the history
* Correctly unsubscribe after message is received
* Address review comments
* Use StandardCharset for UTF-8

Signed-off-by: Aitor Iturrioz <riturrioz@gmail.com>
  • Loading branch information
bodiroga authored and J-N-K committed Jul 14, 2020
1 parent 8ebbc6c commit 73efe4a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.openhab.binding.mqtt.generic.tools;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -24,40 +25,37 @@
import org.eclipse.smarthome.io.transport.mqtt.MqttMessageSubscriber;

/**
* Waits for a topic value to appear on a MQTT topic. One-time useable only per instance.
* Waits for a topic value to appear on a MQTT topic. One-time usable only per instance.
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
public class WaitForTopicValue {
private CompletableFuture<String> future = new CompletableFuture<>();
private final CompletableFuture<Boolean> subscripeFuture;
private final CompletableFuture<String> composeFuture;

/**
* Creates an a instance.
*
* @param connection A broker connection.
* @param topic The topic
* @throws InterruptedException
* @throws ExecutionException
*/
public WaitForTopicValue(MqttBrokerConnection connection, String topic)
throws InterruptedException, ExecutionException {
public WaitForTopicValue(MqttBrokerConnection connection, String topic) {
final CompletableFuture<String> future = new CompletableFuture<>();
final MqttMessageSubscriber mqttMessageSubscriber = (t, payload) -> {
future.complete(new String(payload));
future.complete(new String(payload, StandardCharsets.UTF_8));
};
future = future.whenComplete((r, e) -> {
future.whenComplete((r, e) -> {
connection.unsubscribe(topic, mqttMessageSubscriber);
});

subscripeFuture = connection.subscribe(topic, mqttMessageSubscriber);
composeFuture = connection.subscribe(topic, mqttMessageSubscriber).thenCompose(b -> future);
}

/**
* Free any resources
*/
public void stop() {
future.completeExceptionally(new Exception("Stopped"));
composeFuture.completeExceptionally(new Exception("Stopped"));
}

/**
Expand All @@ -68,15 +66,15 @@ public void stop() {
*/
public @Nullable String waitForTopicValue(int timeoutInMS) {
try {
return subscripeFuture.thenCompose(b -> future).get(timeoutInMS, TimeUnit.MILLISECONDS);
return composeFuture.get(timeoutInMS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
return null;
}
}

private void timeout() {
if (!future.isDone()) {
future.completeExceptionally(new TimeoutException());
if (!composeFuture.isDone()) {
composeFuture.completeExceptionally(new TimeoutException());
}
}

Expand All @@ -89,6 +87,6 @@ private void timeout() {
*/
public CompletableFuture<String> waitForTopicValueAsync(ScheduledExecutorService scheduler, int timeoutInMS) {
scheduler.schedule(this::timeout, timeoutInMS, TimeUnit.MILLISECONDS);
return subscripeFuture.thenCompose(b -> future);
return composeFuture;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
Expand Down Expand Up @@ -94,15 +93,11 @@ public void receivedMessage(ThingUID connectionBridge, MqttBrokerConnection conn
}

// Retrieve name and update found discovery
try {
WaitForTopicValue w = new WaitForTopicValue(connection, topic.replace("$homie", "$name"));
w.waitForTopicValueAsync(scheduler, 700).thenAccept(name -> {
publishDevice(connectionBridge, connection, deviceID, topic, name);
});
} catch (InterruptedException | ExecutionException ignored) {
// The name is nice to have, but not required, use deviceId as fallback
publishDevice(connectionBridge, connection, deviceID, topic, deviceID);
}
WaitForTopicValue w = new WaitForTopicValue(connection, topic.replace("$homie", "$name"));
w.waitForTopicValueAsync(scheduler, 700).whenComplete((name, ex) -> {
String deviceName = ex == null ? name : deviceID;
publishDevice(connectionBridge, connection, deviceID, topic, deviceName);
});
}

void publishDevice(ThingUID connectionBridge, MqttBrokerConnection connection, String deviceID, String topic,
Expand Down

0 comments on commit 73efe4a

Please sign in to comment.