-
-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[mqtt] Correctly unsubscribe after message is received #7723
Conversation
Signed-off-by: Aitor Iturrioz <riturrioz@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That bug is amazing.
@@ -46,11 +46,11 @@ public WaitForTopicValue(MqttBrokerConnection connection, String topic) | |||
final MqttMessageSubscriber mqttMessageSubscriber = (t, payload) -> { | |||
future.complete(new String(payload)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default charset varies from system to system so it is better to specify it yourself instead of using the default.
@@ -24,14 +24,14 @@ | |||
import org.eclipse.smarthome.io.transport.mqtt.MqttMessageSubscriber; | |||
|
|||
/** | |||
* Waits for a topic value to appear on a MQTT topic. One-time useable only per instance. | |||
* Waits for a topic value to appear on a MQTT topic. One-time usable only per instance. | |||
* | |||
* @author David Graeff - Initial contribution | |||
*/ | |||
@NonNullByDefault | |||
public class WaitForTopicValue { | |||
private CompletableFuture<String> future = new CompletableFuture<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private CompletableFuture<String> future = new CompletableFuture<>(); | |
private final CompletableFuture<String> future = new CompletableFuture<>(); |
@@ -68,7 +68,7 @@ public void stop() { | |||
*/ | |||
public @Nullable String waitForTopicValue(int timeoutInMS) { | |||
try { | |||
return subscripeFuture.thenCompose(b -> future).get(timeoutInMS, TimeUnit.MILLISECONDS); | |||
return subscribeFuture.thenCompose(b -> future).get(timeoutInMS, TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems a bit wasteful to create a new compose future every time. Could you cache subscribeFuture.thenCompose(b -> future)
in a field?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I will modify it, but keep in mind that the WaitForTopicValue is a one-time use class and a new object should be created in each use. What would be the benefit?
Signed-off-by: Aitor Iturrioz <riturrioz@gmail.com>
Done! |
subscribeFuture = connection.subscribe(topic, mqttMessageSubscriber); | ||
|
||
composeFuture = subscribeFuture.thenCompose(b -> future); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You probably don't need the subscribeFuture
field anymore.
subscribeFuture = connection.subscribe(topic, mqttMessageSubscriber); | |
composeFuture = subscribeFuture.thenCompose(b -> future); | |
composeFuture = connection.subscribe(topic, mqttMessageSubscriber).thenCompose(b -> future); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
* | ||
* @author David Graeff - Initial contribution | ||
*/ | ||
@NonNullByDefault | ||
public class WaitForTopicValue { | ||
private CompletableFuture<String> future = new CompletableFuture<>(); | ||
private final CompletableFuture<Boolean> subscripeFuture; | ||
private final CompletableFuture<String> future = new CompletableFuture<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suspect that you should replace usages of future
in this class with composeFuture
.
Your timeout
method completes the future
but the result of that future isn't available until the subscribeFuture
completes first so your timeout wouldn't actually cause the timeout as you would expect.
However if you caused the timeout on the composeFuture
everything would operate as you would expect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @cpmeister, although we're going to change more lines than the class itself has 😄 Anyway, it seems that the Homie discovery process has another unnoticed error. The Homie300Discovery class does the following:
try {
WaitForTopicValue w = new WaitForTopicValue(connection, topic.replace("$homie", "$name"));
w.waitForTopicValueAsync(scheduler, 700).thenAccept(name -> {
publishDevice(connectionBridge, connection, deviceID, topic, name);
});
} catch (InterruptedException | ExecutionException ignored) {
// The name is nice to have, but not required, use deviceId as fallback
publishDevice(connectionBridge, connection, deviceID, topic, deviceID);
}
But, if I intentionally remove the $name message and thus, trigger the timeout, the composeFuture.completeExceptionally
function doesn't throw any error, so no discoveryResult is published. I'm sorry, but do you know how can I receive the completeExceptionally
message in the w.waitForTopicValueAsync()
sequence?
This CompleteFuture topic is driving me crazy, I'm more used to synchronous programming 😞
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would this work?
WaitForTopicValue w = new WaitForTopicValue(connection, topic.replace("$homie", "$name"));
w.waitForTopicValueAsync(scheduler, 700).whenComplete((name, ex) -> {
String deviceName = ex == null ? deviceID : name;
publishDevice(connectionBridge, connection, deviceID, topic, deviceName);
});
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome, it works! It seems that I need to read more about whenComplete
, exceptionally
, thenApply
... 😉
Signed-off-by: Aitor Iturrioz <riturrioz@gmail.com>
Let's see if this is the final review! 👍 |
Travis tests were successfulHey @bodiroga, |
@@ -44,20 +45,24 @@ | |||
public WaitForTopicValue(MqttBrokerConnection connection, String topic) | |||
throws InterruptedException, ExecutionException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see anywhere this would be thrown. It should be safe to remove these exceptions.
* | ||
* @author David Graeff - Initial contribution | ||
*/ | ||
@NonNullByDefault | ||
public class WaitForTopicValue { | ||
private CompletableFuture<String> future = new CompletableFuture<>(); | ||
private final CompletableFuture<Boolean> subscripeFuture; | ||
private final CompletableFuture<String> future = new CompletableFuture<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this needs to be a field anymore since it isn't used outside of the constructor.
@@ -44,20 +45,24 @@ | |||
public WaitForTopicValue(MqttBrokerConnection connection, String topic) | |||
throws InterruptedException, ExecutionException { | |||
final MqttMessageSubscriber mqttMessageSubscriber = (t, payload) -> { | |||
future.complete(new String(payload)); | |||
try { | |||
future.complete(new String(payload, "UTF-8")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to use the String(byte[] bytes, Charset charset)
method with const StandardCharsets.UTF_8
to omit the UnsupportedEncodingException
?
Signed-off-by: Aitor Iturrioz <riturrioz@gmail.com>
Signed-off-by: Aitor Iturrioz <riturrioz@gmail.com>
Done! Thank you so much for you comments guys! |
Travis tests were successfulHey @bodiroga, |
1 similar comment
Travis tests were successfulHey @bodiroga, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not to myself: futures are not builders :-)
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
* Correctly unsubscribe after message is received * Address review comments * Use StandardCharset for UTF-8 Signed-off-by: Aitor Iturrioz <riturrioz@gmail.com>
* Correctly unsubscribe after message is received * Address review comments * Use StandardCharset for UTF-8 Signed-off-by: Aitor Iturrioz <riturrioz@gmail.com>
* Correctly unsubscribe after message is received * Address review comments * Use StandardCharset for UTF-8 Signed-off-by: Aitor Iturrioz <riturrioz@gmail.com> Signed-off-by: CSchlipp <christian@schlipp.de>
* Correctly unsubscribe after message is received * Address review comments * Use StandardCharset for UTF-8 Signed-off-by: Aitor Iturrioz <riturrioz@gmail.com>
* Correctly unsubscribe after message is received * Address review comments * Use StandardCharset for UTF-8 Signed-off-by: Aitor Iturrioz <riturrioz@gmail.com>
* Correctly unsubscribe after message is received * Address review comments * Use StandardCharset for UTF-8 Signed-off-by: Aitor Iturrioz <riturrioz@gmail.com>
* Correctly unsubscribe after message is received * Address review comments * Use StandardCharset for UTF-8 Signed-off-by: Aitor Iturrioz <riturrioz@gmail.com>
* Correctly unsubscribe after message is received * Address review comments * Use StandardCharset for UTF-8 Signed-off-by: Aitor Iturrioz <riturrioz@gmail.com> Signed-off-by: Daan Meijer <daan@studioseptember.nl>
* Correctly unsubscribe after message is received * Address review comments * Use StandardCharset for UTF-8 Signed-off-by: Aitor Iturrioz <riturrioz@gmail.com>
The
future
attribute was overwritten in the constructor and, as a consequence, thewhenComplete
lambda method was not called. This made it impossible to re-subscribe to the same topic from the same connection. This fix is critical for Homie users, as the extensions uses this function in the discovery process.A couple of typos are also fixed.
Closes #6975
Closes #7252
Reviewers: @jochen314, @J-N-K, @cpmeister
Signed-off-by: Aitor Iturrioz riturrioz@gmail.com