Skip to content

Commit

Permalink
[mqtt] Keep track of retained messages
Browse files Browse the repository at this point in the history
Closes #1279

Signed-off-by: Jochen Klein <git@jochen.susca.de>
  • Loading branch information
jochen314 authored and cweitkamp committed Jan 15, 2020
1 parent 18d0a52 commit cc3f140
Show file tree
Hide file tree
Showing 14 changed files with 177 additions and 402 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.io.transport.mqtt.internal.ClientCallback;
import org.openhab.core.io.transport.mqtt.internal.TopicSubscribers;
import org.openhab.core.io.transport.mqtt.internal.Subscription;
import org.openhab.core.io.transport.mqtt.internal.client.Mqtt3AsyncClientWrapper;
import org.openhab.core.io.transport.mqtt.internal.client.Mqtt5AsyncClientWrapper;
import org.openhab.core.io.transport.mqtt.internal.client.MqttAsyncClientWrapper;
Expand Down Expand Up @@ -103,7 +108,7 @@ public enum MqttVersion {
protected @Nullable MqttAsyncClientWrapper client;
protected boolean isConnecting = false;
protected final List<MqttConnectionObserver> connectionObservers = new CopyOnWriteArrayList<>();
protected final Map<String, TopicSubscribers> subscribers = new ConcurrentHashMap<>();
protected final Map<String, Subscription> subscribers = new ConcurrentHashMap<>();

// Connection timeout handling
protected final AtomicReference<@Nullable ScheduledFuture<?>> timeoutFuture = new AtomicReference<>(null);
Expand All @@ -126,6 +131,7 @@ public ConnectionCallback(MqttBrokerConnection mqttBrokerConnectionImpl) {
this.cancelTimeoutFuture = mqttBrokerConnectionImpl::cancelTimeoutFuture;
}

@Override
public void onConnected(@Nullable MqttClientConnectedContext context) {
cancelTimeoutFuture.run();

Expand All @@ -134,8 +140,8 @@ public void onConnected(@Nullable MqttClientConnectedContext context) {
connection.reconnectStrategy.connectionEstablished();
}
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
connection.subscribers.forEach((topic, subscriberList) -> {
futures.add(connection.subscribeRaw(topic));
connection.subscribers.forEach((topic, subscription) -> {
futures.add(connection.subscribeRaw(topic, subscription));
});

// As soon as all subscriptions are performed, turn the connection future complete.
Expand All @@ -146,6 +152,7 @@ public void onConnected(@Nullable MqttClientConnectedContext context) {
});
}

@Override
public void onDisconnected(@Nullable MqttClientDisconnectedContext context) {
if (context != null) {
final Throwable throwable = context.getCause();
Expand Down Expand Up @@ -178,8 +185,6 @@ public CompletableFuture<Boolean> createFuture() {
}
}

// Client callback object
protected ClientCallback clientCallback = new ClientCallback(this, connectionObservers, subscribers);
// Connection callback object
protected ConnectionCallback connectionCallback;

Expand Down Expand Up @@ -542,31 +547,20 @@ public boolean hasSubscribers() {
* @return Completes with true if successful. Completes with false if not connected yet. Exceptionally otherwise.
*/
public CompletableFuture<Boolean> subscribe(String topic, MqttMessageSubscriber subscriber) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
final Subscription subscription;
final boolean needsSubscribe;
synchronized (subscribers) {
TopicSubscribers subscriberList = subscribers.getOrDefault(topic, new TopicSubscribers(topic));
subscribers.put(topic, subscriberList);
subscriberList.add(subscriber);
}
final MqttAsyncClientWrapper client = this.client;
if (client == null) {
future.completeExceptionally(new Exception("No MQTT client"));
return future;
subscription = subscribers.computeIfAbsent(topic, t -> new Subscription());

needsSubscribe = subscription.isEmpty();

subscription.add(subscriber);
}
if (client.getState().isConnected()) {
client.subscribe(topic, qos, clientCallback).whenComplete((s, t) -> {
if (t == null) {
future.complete(true);
} else {
logger.warn("Failed subscribing to topic {}", topic, t);
future.completeExceptionally(new MqttException(t));
}
});
} else {
// The subscription will be performed on connecting.
future.complete(false);

if (needsSubscribe) {
return subscribeRaw(topic, subscription);
}
return future;
return CompletableFuture.completedFuture(true);
}

/**
Expand All @@ -575,13 +569,14 @@ public CompletableFuture<Boolean> subscribe(String topic, MqttMessageSubscriber
* @param topic The topic to subscribe to.
* @return Completes with true if successful. Exceptionally otherwise.
*/
protected CompletableFuture<Boolean> subscribeRaw(String topic) {
protected CompletableFuture<Boolean> subscribeRaw(String topic, Subscription subscription) {
logger.trace("subscribeRaw message consumer for topic '{}' from broker '{}'", topic, host);
CompletableFuture<Boolean> future = new CompletableFuture<>();
final MqttAsyncClientWrapper mqttClient = this.client;
if (mqttClient != null && mqttClient.getState().isConnected()) {
mqttClient.subscribe(topic, qos, clientCallback).whenComplete((s, t) -> {
mqttClient.subscribe(topic, qos, subscription).whenComplete((s, t) -> {
if (t == null) {
logger.trace("Successfully subscribed to topic {}", topic);
future.complete(true);
} else {
logger.warn("Failed subscribing to topic {}", topic, t);
Expand All @@ -604,30 +599,32 @@ protected CompletableFuture<Boolean> subscribeRaw(String topic) {
*/
@SuppressWarnings({ "null", "unused" })
public CompletableFuture<Boolean> unsubscribe(String topic, MqttMessageSubscriber subscriber) {
final boolean needsUnsubscribe;

synchronized (subscribers) {
final @Nullable List<MqttMessageSubscriber> list = subscribers.get(topic);
if (list == null) {
final @Nullable Subscription subscription = subscribers.get(topic);
if (subscription == null) {
logger.trace("Tried to unsubscribe {} from topic {}, but subscriber list is empty", subscriber, topic);
return CompletableFuture.completedFuture(true);
}
list.remove(subscriber);
if (!list.isEmpty()) {
logger.trace("Removed {} from topic subscribers for topic {}, but other subscribers present",
subscriber, topic);
return CompletableFuture.completedFuture(true);
subscription.remove(subscriber);

if (subscription.isEmpty()) {
needsUnsubscribe = true;
subscribers.remove(topic);
} else {
needsUnsubscribe = false;
}
// Remove from subscriber list
subscribers.remove(topic);
// No more subscribers to this topic. Unsubscribe topic on the broker
}
if (needsUnsubscribe) {
MqttAsyncClientWrapper mqttClient = this.client;
if (mqttClient != null) {
logger.trace("Subscriber list is empty after removing {}, unsubscribing topic {} from client",
subscriber, topic);
return unsubscribeRaw(mqttClient, topic);
} else {
return CompletableFuture.completedFuture(false);
}
}
return CompletableFuture.completedFuture(true);
}

/**
Expand Down Expand Up @@ -772,7 +769,7 @@ public CompletableFuture<Void> unsubscribeAll() {
MqttAsyncClientWrapper client = this.client;
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
if (client != null) {
subscribers.forEach((topic, subList) -> {
subscribers.forEach((topic, subscription) -> {
futures.add(unsubscribeRaw(client, topic));
});
subscribers.clear();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.core.io.transport.mqtt.internal;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;

import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;

/**
* This class keeps track of all the subscribers to a specific topic.
* <p>
* <b>Retained</b> messages for the topic are stored so they can be replayed to new subscribers.
*
* @author Jochen Klein - Initial contribution
*/
@NonNullByDefault
public class Subscription {
private final Map<String, byte[]> retainedMessages = new ConcurrentHashMap<>();
private final Collection<MqttMessageSubscriber> subscribers = ConcurrentHashMap.newKeySet();

/**
* Add a new subscriber.
* <p>
* If there are any retained messages, they will be delivered to the subscriber.
*
* @param subscriber
*/
public void add(MqttMessageSubscriber subscriber) {
if (subscribers.add(subscriber)) {
// new subscriber. deliver all known retained messages
retainedMessages.entrySet().parallelStream()
.forEach(entry -> processMessage(subscriber, entry.getKey(), entry.getValue()));
}
}

/**
* Remove a subscriber from the list.
*
* @param subscriber
*/
public void remove(MqttMessageSubscriber subscriber) {
subscribers.remove(subscriber);
}

public boolean isEmpty() {
return subscribers.isEmpty();
}

public void messageArrived(Mqtt3Publish message) {
messageArrived(message.getTopic().toString(), message.getPayloadAsBytes(), message.isRetain());
}

public void messageArrived(Mqtt5Publish message) {
messageArrived(message.getTopic().toString(), message.getPayloadAsBytes(), message.isRetain());
}

public void messageArrived(String topic, byte[] payload, boolean retain) {
if (retain) {
if (payload.length > 0) {
retainedMessages.put(topic, payload);
} else {
retainedMessages.remove(topic);
}
}
subscribers.parallelStream().forEach(subscriber -> processMessage(subscriber, topic, payload));
}

private void processMessage(MqttMessageSubscriber subscriber, String topic, byte[] payload) {
subscriber.processMessage(topic, payload);
}
}
Loading

0 comments on commit cc3f140

Please sign in to comment.