Skip to content

Commit

Permalink
Merge pull request #61 from Chance-Maritime-Technologies/dev-explicit…
Browse files Browse the repository at this point in the history
…Types

Added the ability to explicitly set type names and some QoS settings
  • Loading branch information
lreiher authored May 29, 2024
2 parents f9598de + 1e0af0a commit 0871e9f
Show file tree
Hide file tree
Showing 5 changed files with 623 additions and 74 deletions.
44 changes: 26 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,58 +275,66 @@ client:
```yaml
bridge:
ros2mqtt: # Array specifying which ROS topics to map to which MQTT topics
ros2mqtt: # Array specifying which ROS topics to map to which MQTT topics
- ros_topic: # ROS topic whose messages are transformed to MQTT messages
mqtt_topic: # MQTT topic on which the corresponding ROS messages are sent to the broker
primitive: # [false] whether to publish as primitive message
inject_timestamp: # [false] whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side)
advanced:
ros:
queue_size: # [1] ROS subscriber queue size
queue_size: # [1] ROS subscriber queue size
mqtt:
qos: # [0] MQTT QoS value
retained: # [false] whether to retain MQTT message
mqtt2ros: # Array specifying which MQTT topics to map to which ROS topics
qos: # [0] MQTT QoS value
retained: # [false] whether to retain MQTT message
mqtt2ros: # Array specifying which MQTT topics to map to which ROS topics
- mqtt_topic: # MQTT topic on which messages are received from the broker
ros_topic: # ROS topic on which corresponding MQTT messages are published
primitive: # [false] whether to publish as primitive message (if coming from non-ROS MQTT client)
advanced:
mqtt:
qos: # [0] MQTT QoS value
qos: # [0] MQTT QoS value
ros:
queue_size: # [1] ROS publisher queue size
latched: # [false] whether to latch ROS message
queue_size: # [1] ROS publisher queue size
latched: # [false] whether to latch ROS message
```
##### ROS 2
```yaml
bridge:
ros2mqtt: # Object specifying which ROS topics to map to which MQTT topics
ros_topics: # Array specifying which ROS topics to bridge
ros2mqtt: # Object specifying which ROS topics to map to which MQTT topics
ros_topics: # Array specifying which ROS topics to bridge
- {{ ros_topic_name }} # The ROS topic that should be bridged, corresponds to the sub-object in the YAML
{{ ros_topic_name }}:
mqtt_topic: # MQTT topic on which the corresponding ROS messages are sent to the broker
primitive: # [false] whether to publish as primitive message
ros_type: # [*empty*] If set, the ROS msg type provided will be used. If empty, the type is automatically deduced via the publisher
inject_timestamp: # [false] whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side)
advanced:
ros:
queue_size: # [1] ROS subscriber queue size
queue_size: # [1] ROS subscriber queue size
qos:
reliability: # [auto] One of "auto", "system_default", "reliable", "best_effort". If auto, the QoS is automatically determined via the publisher
durability: # [auto] One of "auto", "system_default", "volatile", "transient_local". If auto, the QoS is automatically determined via the publisher
mqtt:
qos: # [0] MQTT QoS value
retained: # [false] whether to retain MQTT message
mqtt2ros: # Object specifying which MQTT topics to map to which ROS topics
mqtt_topics: # Array specifying which ROS topics to bridge
qos: # [0] MQTT QoS value
retained: # [false] whether to retain MQTT message
mqtt2ros: # Object specifying which MQTT topics to map to which ROS topics
mqtt_topics: # Array specifying which ROS topics to bridge
- {{ mqtt_topic_name }} # The MQTT topic that should be bridged, corresponds to the sub-object in the YAML
{{ mqtt_topic_name }}:
ros_topic: # ROS topic on which corresponding MQTT messages are published
ros_type: # [*empty*] If set, the ROS msg type provided will be used. If empty, the type is automatically deduced via the MQTT message
primitive: # [false] whether to publish as primitive message (if coming from non-ROS MQTT client)
advanced:
mqtt:
qos: # [0] MQTT QoS value
qos: # [0] MQTT QoS value
ros:
queue_size: # [1] ROS publisher queue size
latched: # [false] whether to latch ROS message
queue_size: # [1] ROS publisher queue size
latched: # [false] whether to latch ROS message
qos:
reliability: # [system_default] One of "system_default", "reliable", "best_effort".
durability: # [system_default] One of "system_default", "volatile", "transient_local".
```
## Primitive Messages
Expand Down
32 changes: 32 additions & 0 deletions mqtt_client/config/params.ros2.fixed-type-and-qos.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**/*:
ros__parameters:
broker:
host: localhost
port: 1883
bridge:
ros2mqtt:
ros_topics:
- /ping/primitive
/ping/primitive:
mqtt_topic: pingpong/primitive
primitive: false
ros_type: std_msgs/msg/Bool
advanced:
ros:
queue_size: 10
qos:
durability: auto
reliability: auto
mqtt2ros:
mqtt_topics:
- pingpong/primitive
pingpong/primitive:
ros_topic: /pong/primitive
primitive: false
ros_type: std_msgs/msg/Bool
advanced:
ros:
queue_size: 10
qos:
durability: transient_local
reliability: reliable
32 changes: 32 additions & 0 deletions mqtt_client/config/params.ros2.primitive-fixed-type-and-qos.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**/*:
ros__parameters:
broker:
host: localhost
port: 1883
bridge:
ros2mqtt:
ros_topics:
- /ping/primitive
/ping/primitive:
mqtt_topic: pingpong/primitive
primitive: true
ros_type: std_msgs/msg/Bool
advanced:
ros:
queue_size: 10
qos:
durability: auto
reliability: auto
mqtt2ros:
mqtt_topics:
- pingpong/primitive
pingpong/primitive:
ros_topic: /pong/primitive
primitive: true
ros_type: std_msgs/msg/Bool
advanced:
ros:
queue_size: 10
qos:
durability: transient_local
reliability: reliable
103 changes: 76 additions & 27 deletions mqtt_client/include/mqtt_client/MqttClient.ros2.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,18 @@ SOFTWARE.
#include <filesystem>
#include <map>
#include <memory>
#include <optional>
#include <string>

#define FMT_HEADER_ONLY
#include <fmt/format.h>
#include <mqtt/async_client.h>
#include <mqtt_client_interfaces/srv/is_connected.hpp>
#include <mqtt_client_interfaces/srv/new_mqtt2_ros_bridge.hpp>
#include <mqtt_client_interfaces/srv/new_ros2_mqtt_bridge.hpp>
#include <mqtt/async_client.h>
#include <rclcpp/rclcpp.hpp>
#include <rclcpp/serialization.hpp>
#include <rclcpp/qos.hpp>
#include <std_msgs/msg/float64.hpp>


Expand Down Expand Up @@ -71,6 +73,9 @@ class MqttClient : public rclcpp::Node,
explicit MqttClient(const rclcpp::NodeOptions& options);

protected:
struct Ros2MqttInterface;
struct Mqtt2RosInterface;

/**
* @brief Loads ROS parameters from parameter server.
*/
Expand Down Expand Up @@ -98,8 +103,7 @@ class MqttClient : public rclcpp::Node,
* @return true if parameter was successfully retrieved
* @return false if parameter was not found or default was used
*/
bool loadParameter(const std::string& key, std::string& value,
const std::string& default_value);
bool loadParameter(const std::string& key, std::string& value, const std::string& default_value);

/**
* @brief Loads requested ROS parameter from parameter server.
Expand Down Expand Up @@ -180,10 +184,42 @@ class MqttClient : public rclcpp::Node,
void setup();

/**
* @brief Checks all active ROS topics in order to set up generic subscribers.
* @brief Get the resolved compatible QOS from the interface and the endpoint
*
* This uses the two endpoints to decide upon a compatible QoS, resolving any "auto" QoS settings
*
* @param ros_topic the ROS topic we are looking on
* @param tei Topic endpoint info
* @param ros2mqtt the ROS to MQTT interface spec
*
* @returns The compatible QoS or nullopt if no compatible combination is found
*/
std::optional<rclcpp::QoS> getCompatibleQoS(
const std::string& ros_topic, const rclcpp::TopicEndpointInfo& tei,
const Ros2MqttInterface& ros2mqtt) const;

/**
* @brief Get the candiate topic endpoints for subscription matching
*
* @param ros2mqtt the ROS to MQTT interface spec
*
* @returns The compatible QoS or nullopt if no compatible combination is found
*/
std::vector<rclcpp::TopicEndpointInfo> getCandidatePublishers(
const std::string& ros_topic, const Ros2MqttInterface& ros2mqtt) const;

/**
* @brief Setup any subscriptions we can.
*
* These may be fixed type/QoS, or dynamically matched against active publisher
*/
void setupSubscriptions();

/**
* @brief Setup any publishers that we can
*/
void setupPublishers();

/**
* @brief Sets up the client connection options and initializes the client
* object.
Expand Down Expand Up @@ -236,12 +272,17 @@ class MqttClient : public rclcpp::Node,
/**
* @brief Publishes a primitive message received via MQTT to ROS.
*
* Currently not implemented.
*
* @param mqtt_msg MQTT message
*/
void mqtt2primitive(mqtt::const_message_ptr mqtt_msg);

/**
* @brief Publishes a primitive message received via MQTT to ROS.
*
* @param mqtt_msg MQTT message
*/
void mqtt2fixed(mqtt::const_message_ptr mqtt_msg);

/**
* @brief Callback for when the client has successfully connected to the
* broker.
Expand Down Expand Up @@ -351,10 +392,9 @@ class MqttClient : public rclcpp::Node,
std::string user; ///< username
std::string pass; ///< password
struct {
bool enabled; ///< whether to connect via SSL/TLS
std::filesystem::path
ca_certificate; ///< public CA certificate trusted by client
} tls; ///< SSL/TLS-related variables
bool enabled; ///< whether to connect via SSL/TLS
std::filesystem::path ca_certificate; ///< public CA certificate trusted by client
} tls; ///< SSL/TLS-related variables
};

/**
Expand All @@ -377,14 +417,14 @@ class MqttClient : public rclcpp::Node,
double keep_alive_interval; ///< keep-alive interval
int max_inflight; ///< maximum number of inflight messages
struct {
std::filesystem::path certificate; ///< client certificate
std::filesystem::path key; ///< client private keyfile
std::string password; ///< decryption password for private key
int version; ///< TLS version (https://github.com/eclipse/paho.mqtt.cpp/blob/master/src/mqtt/ssl_options.h#L305)
bool verify; ///< Verify the client should conduct
///< post-connect checks
std::vector<std::string> alpn_protos; ///< list of ALPN protocols
} tls; ///< SSL/TLS-related variables
std::filesystem::path certificate; ///< client certificate
std::filesystem::path key; ///< client private keyfile
std::string password; ///< decryption password for private key
int version; ///< TLS version (https://github.com/eclipse/paho.mqtt.cpp/blob/master/src/mqtt/ssl_options.h#L305)
bool verify; ///< Verify the client should conduct
///< post-connect checks
std::vector<std::string> alpn_protos; ///< list of ALPN protocols
} tls; ///< SSL/TLS-related variables
};

/**
Expand All @@ -397,12 +437,18 @@ class MqttClient : public rclcpp::Node,
std::string msg_type; ///< message type of subscriber
int queue_size = 1; ///< ROS subscriber queue size
bool is_stale = false; ///< whether a new generic publisher/subscriber is required
struct {
// If these are set to nullopt then that part of the QoS is determine automatically based on discovery
std::optional<rclcpp::ReliabilityPolicy> reliability;
std::optional<rclcpp::DurabilityPolicy> durability;
} qos;
} ros; ///< ROS-related variables
struct {
std::string topic; ///< MQTT topic
int qos = 0; ///< MQTT QoS value
bool retained = false; ///< whether to retain MQTT message
} mqtt; ///< MQTT-related variables
bool fixed_type = false; ///< whether the published message type is specified explicitly
bool primitive = false; ///< whether to publish as primitive message
bool stamped = false; ///< whether to inject timestamp in MQTT message
};
Expand All @@ -412,18 +458,23 @@ class MqttClient : public rclcpp::Node,
*/
struct Mqtt2RosInterface {
struct {
int qos = 0; ///< MQTT QoS value
} mqtt; ///< MQTT-related variables
int qos = 0; ///< MQTT QoS value
} mqtt; ///< MQTT-related variables
struct {
std::string topic; ///< ROS topic
std::string msg_type; ///< message type of publisher
rclcpp::GenericPublisher::SharedPtr publisher; ///< generic ROS publisher
rclcpp::Publisher<std_msgs::msg::Float64>::SharedPtr
latency_publisher; ///< ROS publisher for latency
int queue_size = 1; ///< ROS publisher queue size
struct {
rclcpp::ReliabilityPolicy reliability = rclcpp::ReliabilityPolicy::SystemDefault;
rclcpp::DurabilityPolicy durability = rclcpp::DurabilityPolicy::SystemDefault;
} qos;
bool latched = false; ///< whether to latch ROS message
bool is_stale = false; ///< whether a new generic publisher/subscriber is required
} ros; ///< ROS-related variables
} ros; ///< ROS-related variables
bool fixed_type = false; ///< whether the published ros message type is specified explicitly
bool primitive = false; ///< whether to publish as primitive message (if
///< coming from non-ROS MQTT client)
bool stamped = false; ///< whether timestamp is injected
Expand Down Expand Up @@ -535,27 +586,25 @@ bool MqttClient::loadParameter(const std::string& key, T& value,


template <typename T>
bool MqttClient::loadParameter(const std::string& key, std::vector<T>& value)
{
bool MqttClient::loadParameter(const std::string& key, std::vector<T>& value) {
const bool found = get_parameter(key, value);
if (found)
RCLCPP_WARN(get_logger(), "Retrieved parameter '%s' = '[%s]'", key.c_str(),
fmt::format("{}", fmt::join(value, ", ")).c_str());
fmt::format("{}", fmt::join(value, ", ")).c_str());
return found;
}


template <typename T>
bool MqttClient::loadParameter(const std::string& key, std::vector<T>& value,
const std::vector<T>& default_value)
{
const std::vector<T>& default_value) {
const bool found = get_parameter_or(key, value, default_value);
if (!found)
RCLCPP_WARN(get_logger(), "Parameter '%s' not set, defaulting to '%s'",
key.c_str(), fmt::format("{}", fmt::join(value, ", ")).c_str());
if (found)
RCLCPP_DEBUG(get_logger(), "Retrieved parameter '%s' = '%s'", key.c_str(),
fmt::format("{}", fmt::join(value, ", ")).c_str());
fmt::format("{}", fmt::join(value, ", ")).c_str());
return found;
}

Expand Down
Loading

0 comments on commit 0871e9f

Please sign in to comment.