From 8ed75d1487019f933ef9a7d201fb29332c8a8d73 Mon Sep 17 00:00:00 2001 From: JayHerpin Date: Tue, 16 Apr 2024 15:05:28 -0500 Subject: [PATCH 01/15] Added the ability to explicitly set type names and some QoS settings for Mqtt -> ROS --- mqtt_client/config/params.ros2.fixed.yaml | 20 ++ .../include/mqtt_client/MqttClient.ros2.hpp | 66 +++-- mqtt_client/src/MqttClient.ros2.cpp | 250 +++++++++++++++++- 3 files changed, 303 insertions(+), 33 deletions(-) create mode 100644 mqtt_client/config/params.ros2.fixed.yaml diff --git a/mqtt_client/config/params.ros2.fixed.yaml b/mqtt_client/config/params.ros2.fixed.yaml new file mode 100644 index 0000000..99c1524 --- /dev/null +++ b/mqtt_client/config/params.ros2.fixed.yaml @@ -0,0 +1,20 @@ +/**/*: + ros__parameters: + broker: + host: localhost + port: 1883 + bridge: + ros2mqtt: + ros_topics: + - /ping/primitive + /ping/primitive: + mqtt_topic: pingpong/primitive + primitive: true + ros_type: std_msgs/msg/Bool + mqtt2ros: + mqtt_topics: + - pingpong/primitive + pingpong/primitive: + ros_topic: /pong/primitive + primitive: true + ros_type: std_msgs/msg/Bool diff --git a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp index c9a2d38..6ba630b 100644 --- a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp +++ b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp @@ -34,12 +34,13 @@ SOFTWARE. #define FMT_HEADER_ONLY #include +#include #include #include #include -#include #include #include +#include #include @@ -98,8 +99,7 @@ class MqttClient : public rclcpp::Node, * @return true if parameter was successfully retrieved * @return false if parameter was not found or default was used */ - bool loadParameter(const std::string& key, std::string& value, - const std::string& default_value); + bool loadParameter(const std::string& key, std::string& value, const std::string& default_value); /** * @brief Loads requested ROS parameter from parameter server. @@ -242,6 +242,15 @@ class MqttClient : public rclcpp::Node, */ void mqtt2primitive(mqtt::const_message_ptr mqtt_msg); + /** + * @brief Publishes a primitive message received via MQTT to ROS. + * + * Currently not implemented. + * + * @param mqtt_msg MQTT message + */ + void mqtt2fixed(mqtt::const_message_ptr mqtt_msg); + /** * @brief Callback for when the client has successfully connected to the * broker. @@ -351,10 +360,9 @@ class MqttClient : public rclcpp::Node, std::string user; ///< username std::string pass; ///< password struct { - bool enabled; ///< whether to connect via SSL/TLS - std::filesystem::path - ca_certificate; ///< public CA certificate trusted by client - } tls; ///< SSL/TLS-related variables + bool enabled; ///< whether to connect via SSL/TLS + std::filesystem::path ca_certificate; ///< public CA certificate trusted by client + } tls; ///< SSL/TLS-related variables }; /** @@ -377,14 +385,14 @@ class MqttClient : public rclcpp::Node, double keep_alive_interval; ///< keep-alive interval int max_inflight; ///< maximum number of inflight messages struct { - std::filesystem::path certificate; ///< client certificate - std::filesystem::path key; ///< client private keyfile - std::string password; ///< decryption password for private key - int version; ///< TLS version (https://github.com/eclipse/paho.mqtt.cpp/blob/master/src/mqtt/ssl_options.h#L305) - bool verify; ///< Verify the client should conduct - ///< post-connect checks - std::vector alpn_protos; ///< list of ALPN protocols - } tls; ///< SSL/TLS-related variables + std::filesystem::path certificate; ///< client certificate + std::filesystem::path key; ///< client private keyfile + std::string password; ///< decryption password for private key + int version; ///< TLS version (https://github.com/eclipse/paho.mqtt.cpp/blob/master/src/mqtt/ssl_options.h#L305) + bool verify; ///< Verify the client should conduct + ///< post-connect checks + std::vector alpn_protos; ///< list of ALPN protocols + } tls; ///< SSL/TLS-related variables }; /** @@ -403,6 +411,7 @@ class MqttClient : public rclcpp::Node, int qos = 0; ///< MQTT QoS value bool retained = false; ///< whether to retain MQTT message } mqtt; ///< MQTT-related variables + bool fixed_type = false; ///< whether the published message type is specified explicitly bool primitive = false; ///< whether to publish as primitive message bool stamped = false; ///< whether to inject timestamp in MQTT message }; @@ -412,8 +421,8 @@ class MqttClient : public rclcpp::Node, */ struct Mqtt2RosInterface { struct { - int qos = 0; ///< MQTT QoS value - } mqtt; ///< MQTT-related variables + int qos = 0; ///< MQTT QoS value + } mqtt; ///< MQTT-related variables struct { std::string topic; ///< ROS topic std::string msg_type; ///< message type of publisher @@ -421,9 +430,14 @@ class MqttClient : public rclcpp::Node, rclcpp::Publisher::SharedPtr latency_publisher; ///< ROS publisher for latency int queue_size = 1; ///< ROS publisher queue size + struct { + rclcpp::ReliabilityPolicy reliability = rclcpp::ReliabilityPolicy::SystemDefault; + rclcpp::DurabilityPolicy durability = rclcpp::DurabilityPolicy::SystemDefault; + } qos; bool latched = false; ///< whether to latch ROS message bool is_stale = false; ///< whether a new generic publisher/subscriber is required - } ros; ///< ROS-related variables + } ros; ///< ROS-related variables + bool fixed_type = false; ///< whether the published ros message type is specified explicitly bool primitive = false; ///< whether to publish as primitive message (if ///< coming from non-ROS MQTT client) bool stamped = false; ///< whether timestamp is injected @@ -511,7 +525,8 @@ class MqttClient : public rclcpp::Node, template -bool MqttClient::loadParameter(const std::string& key, T& value) { +bool MqttClient::loadParameter(const std::string& key, T& value) +{ bool found = get_parameter(key, value); if (found) RCLCPP_DEBUG(get_logger(), "Retrieved parameter '%s' = '%s'", key.c_str(), @@ -522,7 +537,8 @@ bool MqttClient::loadParameter(const std::string& key, T& value) { template bool MqttClient::loadParameter(const std::string& key, T& value, - const T& default_value) { + const T& default_value) +{ bool found = get_parameter_or(key, value, default_value); if (!found) RCLCPP_WARN(get_logger(), "Parameter '%s' not set, defaulting to '%s'", @@ -540,7 +556,7 @@ bool MqttClient::loadParameter(const std::string& key, std::vector& value) const bool found = get_parameter(key, value); if (found) RCLCPP_WARN(get_logger(), "Retrieved parameter '%s' = '[%s]'", key.c_str(), - fmt::format("{}", fmt::join(value, ", ")).c_str()); + fmt::format("{}", fmt::join(value, ", ")).c_str()); return found; } @@ -555,7 +571,7 @@ bool MqttClient::loadParameter(const std::string& key, std::vector& value, key.c_str(), fmt::format("{}", fmt::join(value, ", ")).c_str()); if (found) RCLCPP_DEBUG(get_logger(), "Retrieved parameter '%s' = '%s'", key.c_str(), - fmt::format("{}", fmt::join(value, ", ")).c_str()); + fmt::format("{}", fmt::join(value, ", ")).c_str()); return found; } @@ -570,7 +586,8 @@ bool MqttClient::loadParameter(const std::string& key, std::vector& value, */ template void serializeRosMessage(const T& msg, - rclcpp::SerializedMessage& serialized_msg) { + rclcpp::SerializedMessage& serialized_msg) +{ rclcpp::Serialization serializer; serializer.serialize_message(&msg, &serialized_msg); @@ -587,7 +604,8 @@ void serializeRosMessage(const T& msg, */ template void deserializeRosMessage(const rclcpp::SerializedMessage& serialized_msg, - T& msg) { + T& msg) +{ rclcpp::Serialization serializer; serializer.deserialize_message(&serialized_msg, &msg); diff --git a/mqtt_client/src/MqttClient.ros2.cpp b/mqtt_client/src/MqttClient.ros2.cpp index 5d6247b..c2fca83 100644 --- a/mqtt_client/src/MqttClient.ros2.cpp +++ b/mqtt_client/src/MqttClient.ros2.cpp @@ -28,6 +28,7 @@ SOFTWARE. #include #include #include +#include #include #include @@ -59,6 +60,109 @@ const std::string MqttClient::kRosMsgTypeMqttTopicPrefix = const std::string MqttClient::kLatencyRosTopicPrefix = "~/latencies/"; +template +T mqtt2int(mqtt::const_message_ptr mqtt_msg) { + auto str_msg = mqtt_msg->to_string (); + std::size_t pos; + const T v = std::stoll(str_msg, &pos); + + if (pos != str_msg.size()) + throw std::invalid_argument ("not all charaters processed"); + + return v; +} + +bool mqtt2bool(mqtt::const_message_ptr mqtt_msg) { + const std::string str_msg = mqtt_msg->to_string (); + std::string bool_str = mqtt_msg->to_string (); + std::transform(str_msg.cbegin(), str_msg.cend(), bool_str.begin(), + ::tolower); + if (bool_str == "true" || bool_str == "1") return true; + if (bool_str == "false" || bool_str == "0") return false; + + throw std::invalid_argument ("unable to decode string"); +} + +bool fixedMqtt2PrimitiveRos(mqtt::const_message_ptr mqtt_msg, + const std::string& msg_type, + rclcpp::SerializedMessage &serialized_msg) { + try { + if (msg_type == "std_msgs/msg/String") { + std_msgs::msg::String msg; + msg.data = mqtt_msg->to_string(); + + serializeRosMessage(msg, serialized_msg); + return true; + } else if (msg_type == "std_msgs/msg/Bool") { + std_msgs::msg::Bool msg; + msg.data = mqtt2bool(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + return true; + } else if (msg_type == "std_msgs/msg/Char") { + std_msgs::msg::Char msg; + msg.data = mqtt2int(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + return true; + } else if (msg_type == "std_msgs/msg/UInt8") { + std_msgs::msg::UInt8 msg; + msg.data = mqtt2int(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + return true; + } else if (msg_type == "std_msgs/msg/UInt16") { + std_msgs::msg::UInt16 msg; + msg.data = mqtt2int(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + return true; + } else if (msg_type == "std_msgs/msg/UInt32") { + std_msgs::msg::UInt32 msg; + msg.data = mqtt2int(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + return true; + } else if (msg_type == "std_msgs/msg/UInt64") { + std_msgs::msg::UInt64 msg; + msg.data = mqtt2int(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + return true; + } else if (msg_type == "std_msgs/msg/Int8") { + std_msgs::msg::Int8 msg; + msg.data = mqtt2int(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + return true; + } else if (msg_type == "std_msgs/msg/Int16") { + std_msgs::msg::Int16 msg; + msg.data = mqtt2int(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + return true; + } else if (msg_type == "std_msgs/msg/Int32") { + std_msgs::msg::Int32 msg; + msg.data = mqtt2int(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + return true; + } else if (msg_type == "std_msgs/msg/Int64") { + std_msgs::msg::Int32 msg; + msg.data = mqtt2int(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + return true; + } else if (msg_type == "std_msgs/msg/Float32") { + /// TODO + } else if (msg_type == "std_msgs/msg/Float64") { + /// TODO + } + } catch (const std::exception &) { + } + + return false; +} /** * @brief Extracts string of primitive data types from ROS message. @@ -200,6 +304,8 @@ void MqttClient::loadParameters() { declare_parameter(fmt::format("bridge.ros2mqtt.{}.mqtt_topic", ros_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); param_desc.description = "whether to publish as primitive message"; declare_parameter(fmt::format("bridge.ros2mqtt.{}.primitive", ros_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc); + param_desc.description = "Explicitly set the ros message type"; + declare_parameter(fmt::format("bridge.ros2mqtt.{}.ros_type", ros_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); param_desc.description = "whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side)"; declare_parameter(fmt::format("bridge.ros2mqtt.{}.inject_timestamp", ros_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc); param_desc.description = "ROS subscriber queue size"; @@ -218,10 +324,16 @@ void MqttClient::loadParameters() { declare_parameter(fmt::format("bridge.mqtt2ros.{}.ros_topic", mqtt_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); param_desc.description = "whether to publish as primitive message (if coming from non-ROS MQTT client)"; declare_parameter(fmt::format("bridge.mqtt2ros.{}.primitive", mqtt_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc); - param_desc.description = "MQTT QoS value"; + param_desc.description = "Explicitly set the ros message type"; + declare_parameter(fmt::format("bridge.mqtt2ros.{}.ros_type", mqtt_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); + param_desc.description = "The encoding to use for the mqtt "; declare_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.mqtt.qos", mqtt_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc); param_desc.description = "ROS publisher queue size"; declare_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.queue_size", mqtt_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc); + param_desc.description = "ROS publisher QoS durability"; + declare_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.qos.durability", mqtt_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); + param_desc.description = "ROS publisher QoS reliability"; + declare_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.qos.reliability", mqtt_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); param_desc.description = "whether to latch ROS message"; declare_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.latched", mqtt_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc); } @@ -294,6 +406,17 @@ void MqttClient::loadParameters() { if (get_parameter(fmt::format("bridge.ros2mqtt.{}.primitive", ros_topic), primitive_param)) ros2mqtt.primitive = primitive_param.as_bool(); + // ros2mqtt[k]/ros_type + rclcpp::Parameter ros_type_param; + if (get_parameter(fmt::format("bridge.ros2mqtt.{}.ros_type", ros_topic), ros_type_param)) { + + + ros2mqtt.ros.msg_type = ros_type_param.as_string(); + ros2mqtt.fixed_type = true; + ros2mqtt.primitive = true; + RCLCPP_DEBUG(get_logger(), "Using explicit ros type %s", ros2mqtt.ros.msg_type.c_str ()); + } + // ros2mqtt[k]/inject_timestamp rclcpp::Parameter stamped_param; if (get_parameter(fmt::format("bridge.ros2mqtt.{}.inject_timestamp", ros_topic), stamped_param)) @@ -345,11 +468,23 @@ void MqttClient::loadParameters() { Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic]; mqtt2ros.ros.topic = ros_topic; + RCLCPP_DEBUG(get_logger(), "MQTT %s to ROS %s", mqtt_topic.c_str(), ros_topic.c_str ()); + // mqtt2ros[k]/primitive rclcpp::Parameter primitive_param; if (get_parameter(fmt::format("bridge.mqtt2ros.{}.primitive", mqtt_topic), primitive_param)) mqtt2ros.primitive = primitive_param.as_bool(); + + rclcpp::Parameter ros_type_param; + if (get_parameter(fmt::format("bridge.mqtt2ros.{}.ros_type", mqtt_topic), ros_type_param)) { + mqtt2ros.ros.msg_type = ros_type_param.as_string(); + mqtt2ros.fixed_type = true; + mqtt2ros.primitive = true; + + RCLCPP_DEBUG(get_logger(), "Using explicit ros type %s for %s", mqtt2ros.ros.msg_type.c_str (), ros_topic.c_str ()); + } + // mqtt2ros[k]/advanced/mqtt/qos rclcpp::Parameter qos_param; if (get_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.mqtt.qos", mqtt_topic), qos_param)) @@ -361,6 +496,33 @@ void MqttClient::loadParameters() { queue_size_param)) mqtt2ros.ros.queue_size = queue_size_param.as_int(); + rclcpp::Parameter durability_param; + if (get_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.qos.durability", mqtt_topic), durability_param)) { + const auto p = durability_param.as_string(); + if (p == "volatile") { + mqtt2ros.ros.qos.durability = rclcpp::DurabilityPolicy::Volatile; + } else if (p == "transient_local") { + mqtt2ros.ros.qos.durability = + rclcpp::DurabilityPolicy::TransientLocal; + } else { + RCLCPP_ERROR(get_logger(), "Unexpected durability %s", p.c_str ()); + exit(EXIT_FAILURE); + } + } + + rclcpp::Parameter reliability_param; + if (get_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.qos.reliability", mqtt_topic), reliability_param)) { + const auto p = reliability_param.as_string(); + if (p == "best_effort") { + mqtt2ros.ros.qos.reliability = rclcpp::ReliabilityPolicy::BestEffort; + } else if (p == "reliable") { + mqtt2ros.ros.qos.reliability = rclcpp::ReliabilityPolicy::Reliable; + } else { + RCLCPP_ERROR(get_logger(), "Unexpected reliability %s", p.c_str ()); + exit(EXIT_FAILURE); + } + } + // mqtt2ros[k]/advanced/ros/latched rclcpp::Parameter latched_param; if (get_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.latched", mqtt_topic), latched_param)) { @@ -473,6 +635,26 @@ void MqttClient::setupSubscriptions() { // check for ros2mqtt topics for (auto& [ros_topic, ros2mqtt] : ros2mqtt_) { + + std::function msg)> + bound_callback_func = + std::bind(&MqttClient::ros2mqtt, this, std::placeholders::_1, ros_topic); + + if (ros2mqtt.fixed_type && ! ros2mqtt.ros.subscriber) { + try { + ros2mqtt.ros.subscriber = create_generic_subscription( + ros_topic, ros2mqtt.ros.msg_type, ros2mqtt.ros.queue_size, bound_callback_func); + ros2mqtt.ros.is_stale = false; + + } catch (rclcpp::exceptions::RCLError& e) { + RCLCPP_ERROR(get_logger(), "Failed to create generic subscriber: %s", + e.what()); + return; + } + RCLCPP_INFO(get_logger(), "Subscribed ROS topic '%s' of type '%s'", + ros_topic.c_str(), ros2mqtt.ros.msg_type.c_str()); + + } if (all_topics_and_types.count(ros_topic)) { // check if message type has changed or if mapping is stale @@ -481,10 +663,6 @@ void MqttClient::setupSubscriptions() { ros2mqtt.ros.is_stale = false; ros2mqtt.ros.msg_type = msg_type; - // create new generic subscription, if message type has changed - std::function msg)> - bound_callback_func = std::bind(&MqttClient::ros2mqtt, this, - std::placeholders::_1, ros_topic); try { ros2mqtt.ros.subscriber = create_generic_subscription( ros_topic, msg_type, ros2mqtt.ros.queue_size, bound_callback_func); @@ -701,7 +879,6 @@ void MqttClient::ros2mqtt( void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg, const rclcpp::Time& arrival_stamp) { - std::string mqtt_topic = mqtt_msg->get_topic(); Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic]; auto& payload = mqtt_msg->get_payload_ref(); @@ -762,8 +939,8 @@ void MqttClient::mqtt2primitive(mqtt::const_message_ptr mqtt_msg) { std::string mqtt_topic = mqtt_msg->get_topic(); Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic]; - const std::string str_msg = mqtt_msg->to_string(); + const std::string str_msg = mqtt_msg->to_string(); bool found_primitive = false; std::string ros_msg_type = "std_msgs/msg/String"; rclcpp::SerializedMessage serialized_msg; @@ -846,8 +1023,11 @@ void MqttClient::mqtt2primitive(mqtt::const_message_ptr mqtt_msg) { // recreate generic publisher try { + const auto qos = rclcpp::QoS(mqtt2ros.ros.queue_size) + .durability(mqtt2ros.ros.qos.durability) + .reliability(mqtt2ros.ros.qos.reliability); mqtt2ros.ros.publisher = create_generic_publisher( - mqtt2ros.ros.topic, ros_msg_type, mqtt2ros.ros.queue_size); + mqtt2ros.ros.topic, ros_msg_type, qos); } catch (rclcpp::exceptions::RCLError& e) { RCLCPP_ERROR(get_logger(), "Failed to create generic publisher: %s", e.what()); @@ -864,6 +1044,49 @@ void MqttClient::mqtt2primitive(mqtt::const_message_ptr mqtt_msg) { mqtt2ros.ros.publisher->publish(serialized_msg); } +void MqttClient::mqtt2fixed(mqtt::const_message_ptr mqtt_msg) { + + std::string mqtt_topic = mqtt_msg->get_topic(); + Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic]; + + rclcpp::SerializedMessage serialized_msg; + + if (!fixedMqtt2PrimitiveRos(mqtt_msg, mqtt2ros.ros.msg_type, serialized_msg)) { + RCLCPP_WARN( + get_logger(), + "Could not convert mqtt message into type %s on topic %s ...", + mqtt2ros.ros.msg_type.c_str(), mqtt2ros.ros.topic.c_str()); + } else { + + if (!mqtt2ros.ros.publisher) + { + RCLCPP_INFO(get_logger(), + "ROS publisher message type on topic '%s' set to '%s'", + mqtt2ros.ros.topic.c_str(), mqtt2ros.ros.msg_type.c_str()); + + // recreate generic publisher + try { + const auto qos = rclcpp::QoS(mqtt2ros.ros.queue_size) + .durability(mqtt2ros.ros.qos.durability) + .reliability(mqtt2ros.ros.qos.reliability); + mqtt2ros.ros.publisher = create_generic_publisher( + mqtt2ros.ros.topic, mqtt2ros.ros.msg_type, qos); + + mqtt2ros.ros.is_stale = false; + } catch (const rclcpp::exceptions::RCLError& e) { + RCLCPP_ERROR(get_logger(), "Failed to create generic publisher: %s", + e.what()); + } + } + + RCLCPP_DEBUG(get_logger(), + "Sending ROS message of type '%s' from MQTT broker to ROS " + "topic '%s' ...", + mqtt2ros.ros.msg_type.c_str(), mqtt2ros.ros.topic.c_str()); + mqtt2ros.ros.publisher->publish(serialized_msg); + } +} + void MqttClient::connected(const std::string& cause) { @@ -993,6 +1216,12 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) { // publish directly if primitive if (mqtt2ros_.count(mqtt_topic) > 0) { Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic]; + + if (mqtt2ros.fixed_type) { + mqtt2fixed(mqtt_msg); + return; + } + if (mqtt2ros.primitive) { mqtt2primitive(mqtt_msg); return; @@ -1033,8 +1262,11 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) { // recreate generic publisher try { + const auto qos = rclcpp::QoS(mqtt2ros.ros.queue_size) + .durability(mqtt2ros.ros.qos.durability) + .reliability(mqtt2ros.ros.qos.reliability); mqtt2ros.ros.publisher = create_generic_publisher( - mqtt2ros.ros.topic, ros_msg_type.name, mqtt2ros.ros.queue_size); + mqtt2ros.ros.topic, ros_msg_type.name, qos); } catch (rclcpp::exceptions::RCLError& e) { RCLCPP_ERROR(get_logger(), "Failed to create generic publisher: %s", e.what()); From 4e5b854dfd6ba334f4a2f807b7db7ef1792664e9 Mon Sep 17 00:00:00 2001 From: JayHerpin Date: Wed, 17 Apr 2024 08:47:46 -0500 Subject: [PATCH 02/15] Automatically deduce QOS paramters for subscriptions based on publications --- mqtt_client/src/MqttClient.ros2.cpp | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/mqtt_client/src/MqttClient.ros2.cpp b/mqtt_client/src/MqttClient.ros2.cpp index c2fca83..0e37c33 100644 --- a/mqtt_client/src/MqttClient.ros2.cpp +++ b/mqtt_client/src/MqttClient.ros2.cpp @@ -28,6 +28,7 @@ SOFTWARE. #include #include #include +#include #include #include @@ -649,27 +650,34 @@ void MqttClient::setupSubscriptions() { } catch (rclcpp::exceptions::RCLError& e) { RCLCPP_ERROR(get_logger(), "Failed to create generic subscriber: %s", e.what()); - return; + continue; } RCLCPP_INFO(get_logger(), "Subscribed ROS topic '%s' of type '%s'", ros_topic.c_str(), ros2mqtt.ros.msg_type.c_str()); - } - if (all_topics_and_types.count(ros_topic)) { + } else { + const auto &pubs = get_publishers_info_by_topic(ros_topic); + + if (pubs.empty()) continue; + const auto endpointInfo = pubs.front (); // check if message type has changed or if mapping is stale - const std::string& msg_type = all_topics_and_types.at(ros_topic)[0]; + const std::string msg_type = endpointInfo.topic_type(); + if (msg_type == ros2mqtt.ros.msg_type && !ros2mqtt.ros.is_stale) continue; ros2mqtt.ros.is_stale = false; ros2mqtt.ros.msg_type = msg_type; try { + auto qos = endpointInfo.qos_profile (); ros2mqtt.ros.subscriber = create_generic_subscription( - ros_topic, msg_type, ros2mqtt.ros.queue_size, bound_callback_func); + ros_topic, msg_type, qos.keep_last(ros2mqtt.ros.queue_size), + bound_callback_func); + } catch (rclcpp::exceptions::RCLError& e) { RCLCPP_ERROR(get_logger(), "Failed to create generic subscriber: %s", e.what()); - return; + continue; } RCLCPP_INFO(get_logger(), "Subscribed ROS topic '%s' of type '%s'", ros_topic.c_str(), msg_type.c_str()); From 49ced1d3c7a7a3565d565220890472b3e678f077 Mon Sep 17 00:00:00 2001 From: JayHerpin Date: Wed, 17 Apr 2024 08:48:07 -0500 Subject: [PATCH 03/15] Changed bool formatting for output to be 1 and 0 --- mqtt_client/src/MqttClient.ros2.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mqtt_client/src/MqttClient.ros2.cpp b/mqtt_client/src/MqttClient.ros2.cpp index 0e37c33..1bd3502 100644 --- a/mqtt_client/src/MqttClient.ros2.cpp +++ b/mqtt_client/src/MqttClient.ros2.cpp @@ -193,7 +193,7 @@ bool primitiveRosMessageToString( } else if (msg_type == "std_msgs/msg/Bool") { std_msgs::msg::Bool msg; deserializeRosMessage(*serialized_msg, msg); - primitive = msg.data ? "true" : "false"; + primitive = msg.data ? "1" : "0"; } else if (msg_type == "std_msgs/msg/Char") { std_msgs::msg::Char msg; deserializeRosMessage(*serialized_msg, msg); From 0bd52b2c04b5e4442410d5300d55cd05c9ae31f0 Mon Sep 17 00:00:00 2001 From: JayHerpin Date: Tue, 23 Apr 2024 13:45:07 -0500 Subject: [PATCH 04/15] Allow auto or explicit selection of ros2mqtt subscriber QoS --- mqtt_client/config/params.ros2.fixed.yaml | 18 +- .../include/mqtt_client/MqttClient.ros2.hpp | 34 ++++ mqtt_client/src/MqttClient.ros2.cpp | 190 +++++++++++++++--- 3 files changed, 208 insertions(+), 34 deletions(-) diff --git a/mqtt_client/config/params.ros2.fixed.yaml b/mqtt_client/config/params.ros2.fixed.yaml index 99c1524..5cb6648 100644 --- a/mqtt_client/config/params.ros2.fixed.yaml +++ b/mqtt_client/config/params.ros2.fixed.yaml @@ -9,12 +9,26 @@ - /ping/primitive /ping/primitive: mqtt_topic: pingpong/primitive - primitive: true + # This is implied by the explicit ros type + #primitive: true ros_type: std_msgs/msg/Bool + advanced: + ros: + queue_size: 10 + qos: + durability: auto + reliability: auto mqtt2ros: mqtt_topics: - pingpong/primitive pingpong/primitive: ros_topic: /pong/primitive - primitive: true + #This is implied by the explicit ros type + #primitive: true ros_type: std_msgs/msg/Bool + advanced: + ros: + queue_size: 10 + qos: + durability: transient_local + reliability: reliable diff --git a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp index 6ba630b..a4084bd 100644 --- a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp +++ b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp @@ -30,6 +30,7 @@ SOFTWARE. #include #include #include +#include #include #define FMT_HEADER_ONLY @@ -72,6 +73,9 @@ class MqttClient : public rclcpp::Node, explicit MqttClient(const rclcpp::NodeOptions& options); protected: + class Ros2MqttInterface; + class Mqtt2RosInterface; + /** * @brief Loads ROS parameters from parameter server. */ @@ -179,6 +183,31 @@ class MqttClient : public rclcpp::Node, */ void setup(); + /** + * @brief Get the resolved compatible QOS from the interface and the endpoint + * + * This uses the two endpoints to decide upon a compatible QoS, resolving any "auto" QoS settings + * + * @param ros_topic the ROS topic we are looking on + * @param tei Topic endpoint info + * @param ros2mqtt the ROS to MQTT interface spec + * + * @returns The compatible QoS or nullopt if no compatible combination is found + */ + std::optional getCompatibleQoS( + const std::string& ros_topic, const rclcpp::TopicEndpointInfo& tei, + const Ros2MqttInterface& ros2mqtt) const; + + /** + * @brief Get the candiate topic endpoints for subscription matching + * + * @param ros2mqtt the ROS to MQTT interface spec + * + * @returns The compatible QoS or nullopt if no compatible combination is found + */ + std::vector getCandidatePublishers( + const std::string& ros_topic, const Ros2MqttInterface& ros2mqtt) const; + /** * @brief Checks all active ROS topics in order to set up generic subscribers. */ @@ -405,6 +434,11 @@ class MqttClient : public rclcpp::Node, std::string msg_type; ///< message type of subscriber int queue_size = 1; ///< ROS subscriber queue size bool is_stale = false; ///< whether a new generic publisher/subscriber is required + struct { + // If these are set to nullopt then that part of the QoS is determine automatically based on discovery + std::optional reliability; + std::optional durability; + } qos; } ros; ///< ROS-related variables struct { std::string topic; ///< MQTT topic diff --git a/mqtt_client/src/MqttClient.ros2.cpp b/mqtt_client/src/MqttClient.ros2.cpp index 1bd3502..d973470 100644 --- a/mqtt_client/src/MqttClient.ros2.cpp +++ b/mqtt_client/src/MqttClient.ros2.cpp @@ -175,6 +175,7 @@ bool fixedMqtt2PrimitiveRos(mqtt::const_message_ptr mqtt_msg, * * @param [in] serialized_msg generic serialized ROS message * @param [in] msg_type ROS message type, e.g. `std_msgs/msg/String` + * @param [in] fixed_encoding Truthfully this is just an ugly hack to change the encoding of bools in some cases * @param [out] primitive string representation of primitive message data * * @return true if primitive ROS message type was found @@ -182,7 +183,7 @@ bool fixedMqtt2PrimitiveRos(mqtt::const_message_ptr mqtt_msg, */ bool primitiveRosMessageToString( const std::shared_ptr& serialized_msg, - const std::string& msg_type, std::string& primitive) { + const std::string& msg_type, bool fixed_encoding, std::string& primitive) { bool found_primitive = true; @@ -193,7 +194,10 @@ bool primitiveRosMessageToString( } else if (msg_type == "std_msgs/msg/Bool") { std_msgs::msg::Bool msg; deserializeRosMessage(*serialized_msg, msg); - primitive = msg.data ? "1" : "0"; + if (fixed_encoding) + primitive = msg.data ? "1" : "0"; + else + primitive = msg.data ? "true" : "false"; } else if (msg_type == "std_msgs/msg/Char") { std_msgs::msg::Char msg; deserializeRosMessage(*serialized_msg, msg); @@ -311,6 +315,10 @@ void MqttClient::loadParameters() { declare_parameter(fmt::format("bridge.ros2mqtt.{}.inject_timestamp", ros_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc); param_desc.description = "ROS subscriber queue size"; declare_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.queue_size", ros_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc); + param_desc.description = "ROS subscriber QoS reliability"; + declare_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.reliability", ros_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); + param_desc.description = "ROS subscriber QoS durability"; + declare_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.durability", ros_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); param_desc.description = "MQTT QoS value"; declare_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.mqtt.qos", ros_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc); param_desc.description = "whether to retain MQTT message"; @@ -437,6 +445,36 @@ void MqttClient::loadParameters() { queue_size_param)) ros2mqtt.ros.queue_size = queue_size_param.as_int(); + rclcpp::Parameter durability_param; + if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.durability", mqtt_topic), durability_param)) { + const auto p = durability_param.as_string(); + if (p == "volatile") { + ros2mqtt.ros.qos.durability = rclcpp::DurabilityPolicy::Volatile; + } else if (p == "transient_local") { + ros2mqtt.ros.qos.durability = rclcpp::DurabilityPolicy::TransientLocal; + } else if (p == "auto") { + ros2mqtt.ros.qos.durability = {}; + } else { + RCLCPP_ERROR(get_logger(), "Unexpected durability %s", p.c_str ()); + exit(EXIT_FAILURE); + } + } + + rclcpp::Parameter reliability_param; + if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.reliability", mqtt_topic), reliability_param)) { + const auto p = reliability_param.as_string(); + if (p == "best_effort") { + ros2mqtt.ros.qos.reliability = rclcpp::ReliabilityPolicy::BestEffort; + } else if (p == "reliable") { + ros2mqtt.ros.qos.reliability = rclcpp::ReliabilityPolicy::Reliable; + } else if (p == "auto") { + ros2mqtt.ros.qos.reliability = {}; + } else { + RCLCPP_ERROR(get_logger(), "Unexpected reliability %s", p.c_str ()); + exit(EXIT_FAILURE); + } + } + // ros2mqtt[k]/advanced/mqtt/qos rclcpp::Parameter qos_param; if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.mqtt.qos", ros_topic), qos_param)) @@ -628,64 +666,152 @@ void MqttClient::setup() { std::bind(&MqttClient::setupSubscriptions, this)); } +std::optional MqttClient::getCompatibleQoS (const std::string &ros_topic, const rclcpp::TopicEndpointInfo &tei, + const Ros2MqttInterface &ros2mqtt) const +{ + // the basic premise here is that we take the QoS from the publisher, overwrite any parts that are explicitly set + // via configuration then check if the result is compatible with the publisher + auto qos = tei.qos_profile (); + + if (auto r = ros2mqtt.ros.qos.reliability) + qos.reliability (*r); + if (auto d = ros2mqtt.ros.qos.durability) + qos.durability (*d); + qos.keep_last (ros2mqtt.ros.queue_size); + + const auto qres = rclcpp::qos_check_compatible (tei.qos_profile (), qos); + + switch (qres.compatibility) + { + case rclcpp::QoSCompatibility::Ok: + return qos; + case rclcpp::QoSCompatibility::Warning: + RCLCPP_DEBUG(get_logger(), "QoS compatibility warning on topic %s - %s", ros_topic.c_str(), qres.reason.c_str ()); + // presumably this is still compatible + return qos; + case rclcpp::QoSCompatibility::Error: + default: + return {}; + } + +} + +std::vector MqttClient::getCandidatePublishers( + const std::string& ros_topic, const Ros2MqttInterface& ros2mqtt) const +{ + const auto &pubs = get_publishers_info_by_topic(ros_topic); + + if (pubs.empty ()) + return {}; + + std::vector ret; + + ret.reserve (pubs.size ()); + + for (const auto &p : pubs) + { + const std::string msg_type = p.topic_type(); + + // if the type isn't set, match aginst all t ypes, otherwise only match against mtching types + if (ros2mqtt.ros.msg_type.empty () || msg_type == ros2mqtt.ros.msg_type) + ret.push_back (p); + } + + // If we found any matching types, return those + if (! ret.empty ()) + return ret; + + // If we didn't and we aren't fix type, then just return the full set of publishers + if (! ros2mqtt.fixed_type) + return pubs; + + // None of these publishers will work... :sad_panda: + return {}; +} void MqttClient::setupSubscriptions() { - // get info of all topics - const auto all_topics_and_types = get_topic_names_and_types(); + // For each ros2mqtt interface, check if we need to do a lazy subscription (eg, lazy subscription would be if we + // are determining either the type of QoS dynamically). If we don't, then just make the fixed subscriber if its not + // already made. + // + // If we do, we check each publisher for a potential match. The first step is to filter down the list of + // publishers based on the type. If we are fixed type then that list only includes publishers with matching types. + // If we aren't its a little trickier. For dynamic types, if we already have matched against a type previously, the + // candidate list will include all publishers which have matching types if any exist. If none of the publishers + // have matching types, then the list will include all publishers. + // + // Then for each candidate publisher, check if their QoS is compatible with the QoS specic in the configuration. This + // is really just needed because there is the potential for someone to set half of the QoS to auto and half to + // explicit. + // Condition for requiring "lazy" subscription where we need to walk the + const auto requires_lazy_subscription = [] (const Ros2MqttInterface &ros2mqtt) + { + if (! ros2mqtt.fixed_type) + return true; + if (ros2mqtt.ros.qos.reliability == std::nullopt || + ros2mqtt.ros.qos.durability == std::nullopt) + return true; + return false; + }; - // check for ros2mqtt topics for (auto& [ros_topic, ros2mqtt] : ros2mqtt_) { - std::function msg)> - bound_callback_func = + std::function msg)> bound_callback_func = std::bind(&MqttClient::ros2mqtt, this, std::placeholders::_1, ros_topic); - if (ros2mqtt.fixed_type && ! ros2mqtt.ros.subscriber) { + if (! requires_lazy_subscription (ros2mqtt)) + { try { + if (ros2mqtt.ros.subscriber) + continue; + ros2mqtt.ros.subscriber = create_generic_subscription( ros_topic, ros2mqtt.ros.msg_type, ros2mqtt.ros.queue_size, bound_callback_func); ros2mqtt.ros.is_stale = false; - + RCLCPP_INFO(get_logger(), "Subscribed ROS topic '%s' of type '%s'", + ros_topic.c_str(), ros2mqtt.ros.msg_type.c_str()); } catch (rclcpp::exceptions::RCLError& e) { RCLCPP_ERROR(get_logger(), "Failed to create generic subscriber: %s", e.what()); continue; } - RCLCPP_INFO(get_logger(), "Subscribed ROS topic '%s' of type '%s'", - ros_topic.c_str(), ros2mqtt.ros.msg_type.c_str()); - } else { - const auto &pubs = get_publishers_info_by_topic(ros_topic); + const auto &pubs = getCandidatePublishers (ros_topic, ros2mqtt); if (pubs.empty()) continue; - const auto endpointInfo = pubs.front (); - // check if message type has changed or if mapping is stale - const std::string msg_type = endpointInfo.topic_type(); + for (auto endpointInfo : pubs) { + try { + // check if message type has changed or if mapping is stale + const std::string msg_type = endpointInfo.topic_type(); - if (msg_type == ros2mqtt.ros.msg_type && !ros2mqtt.ros.is_stale) continue; - ros2mqtt.ros.is_stale = false; - ros2mqtt.ros.msg_type = msg_type; + if (msg_type == ros2mqtt.ros.msg_type && !ros2mqtt.ros.is_stale && ros2mqtt.ros.subscriber) + continue; - try { - auto qos = endpointInfo.qos_profile (); - ros2mqtt.ros.subscriber = create_generic_subscription( - ros_topic, msg_type, qos.keep_last(ros2mqtt.ros.queue_size), - bound_callback_func); + auto const qos = getCompatibleQoS (ros_topic, endpointInfo, ros2mqtt); - } catch (rclcpp::exceptions::RCLError& e) { - RCLCPP_ERROR(get_logger(), "Failed to create generic subscriber: %s", - e.what()); - continue; + if (! qos) + continue; + + ros2mqtt.ros.is_stale = false; + ros2mqtt.ros.msg_type = msg_type; + + ros2mqtt.ros.subscriber = create_generic_subscription( + ros_topic, msg_type, *qos, bound_callback_func); + + RCLCPP_INFO(get_logger(), "Subscribed ROS topic '%s' of type '%s'", + ros_topic.c_str(), msg_type.c_str()); + } catch (rclcpp::exceptions::RCLError& e) { + RCLCPP_ERROR(get_logger(), "Failed to create generic subscriber: %s", + e.what()); + continue; + } } - RCLCPP_INFO(get_logger(), "Subscribed ROS topic '%s' of type '%s'", - ros_topic.c_str(), msg_type.c_str()); } } } - void MqttClient::setupClient() { // basic client connection options @@ -787,7 +913,7 @@ void MqttClient::ros2mqtt( // resolve ROS messages to primitive strings if possible std::string payload; bool found_primitive = - primitiveRosMessageToString(serialized_msg, ros_msg_type.name, payload); + primitiveRosMessageToString(serialized_msg, ros_msg_type.name, ros2mqtt.fixed_type, payload); if (found_primitive) { payload_buffer = std::vector(payload.begin(), payload.end()); } else { From 0f5ec6718ab8e0df04b6be9dbcd2c63805dcd9b6 Mon Sep 17 00:00:00 2001 From: JayHerpin Date: Tue, 23 Apr 2024 14:11:56 -0500 Subject: [PATCH 05/15] Make fixed type publishers ahead of time - Updated the docs - Added in "system_default" qos values - Fixed type publishers are now made right away rather than waiting for an mqtt message to come in --- README.md | 34 ++++++++------ .../include/mqtt_client/MqttClient.ros2.hpp | 9 +++- mqtt_client/src/MqttClient.ros2.cpp | 44 +++++++++++++++++-- 3 files changed, 69 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index c00aa13..2e4e69c 100644 --- a/README.md +++ b/README.md @@ -275,51 +275,56 @@ client: ```yaml bridge: - ros2mqtt: # Array specifying which ROS topics to map to which MQTT topics + ros2mqtt: # Array specifying which ROS topics to map to which MQTT topics - ros_topic: # ROS topic whose messages are transformed to MQTT messages mqtt_topic: # MQTT topic on which the corresponding ROS messages are sent to the broker primitive: # [false] whether to publish as primitive message inject_timestamp: # [false] whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side) advanced: ros: - queue_size: # [1] ROS subscriber queue size + queue_size: # [1] ROS subscriber queue size mqtt: - qos: # [0] MQTT QoS value - retained: # [false] whether to retain MQTT message - mqtt2ros: # Array specifying which MQTT topics to map to which ROS topics + qos: # [0] MQTT QoS value + retained: # [false] whether to retain MQTT message + mqtt2ros: # Array specifying which MQTT topics to map to which ROS topics - mqtt_topic: # MQTT topic on which messages are received from the broker ros_topic: # ROS topic on which corresponding MQTT messages are published primitive: # [false] whether to publish as primitive message (if coming from non-ROS MQTT client) advanced: mqtt: - qos: # [0] MQTT QoS value + qos: # [0] MQTT QoS value ros: - queue_size: # [1] ROS publisher queue size - latched: # [false] whether to latch ROS message + queue_size: # [1] ROS publisher queue size + latched: # [false] whether to latch ROS message ``` ##### ROS 2 ```yaml bridge: - ros2mqtt: # Object specifying which ROS topics to map to which MQTT topics - ros_topics: # Array specifying which ROS topics to bridge + ros2mqtt: # Object specifying which ROS topics to map to which MQTT topics + ros_topics: # Array specifying which ROS topics to bridge - {{ ros_topic_name }} # The ROS topic that should be bridged, corresponds to the sub-object in the YAML {{ ros_topic_name }}: mqtt_topic: # MQTT topic on which the corresponding ROS messages are sent to the broker primitive: # [false] whether to publish as primitive message + ros_type: # [*empty*] If set, the ROS msg type provided will be used. If empty, the type is automatically deduced via the publisher inject_timestamp: # [false] whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side) advanced: ros: - queue_size: # [1] ROS subscriber queue size + queue_size: # [1] ROS subscriber queue size + qos: + reliability: # [auto] One of "auto", "system_default", "reliable", "best_effort". If auto, the QoS is automatically determined via the publisher + durability: # [auto] One of "auto", "system_default", "volatile", "transient_local". If auto, the QoS is automatically determined via the publisher mqtt: - qos: # [0] MQTT QoS value - retained: # [false] whether to retain MQTT message + qos: # [0] MQTT QoS value + retained: # [false] whether to retain MQTT message mqtt2ros: # Object specifying which MQTT topics to map to which ROS topics mqtt_topics: # Array specifying which ROS topics to bridge - {{ mqtt_topic_name }} # The MQTT topic that should be bridged, corresponds to the sub-object in the YAML {{ mqtt_topic_name }}: ros_topic: # ROS topic on which corresponding MQTT messages are published + ros_type: # [*empty*] If set, the ROS msg type provided will be used. If empty, the type is automatically deduced via the MQTT message primitive: # [false] whether to publish as primitive message (if coming from non-ROS MQTT client) advanced: mqtt: @@ -327,6 +332,9 @@ bridge: ros: queue_size: # [1] ROS publisher queue size latched: # [false] whether to latch ROS message + qos: + reliability: # [system_default] One of "system_default", "reliable", "best_effort". + durability: # [system_default] One of "system_default", "volatile", "transient_local". ``` ## Primitive Messages diff --git a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp index a4084bd..5174038 100644 --- a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp +++ b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp @@ -209,10 +209,17 @@ class MqttClient : public rclcpp::Node, const std::string& ros_topic, const Ros2MqttInterface& ros2mqtt) const; /** - * @brief Checks all active ROS topics in order to set up generic subscribers. + * @brief Setup any subscriptions we can. + * + * These may be fixed type/QoS, or dynamically matched against active publisher */ void setupSubscriptions(); + /** + * @brief Setup any publishers that we can + */ + void setupPublishers(); + /** * @brief Sets up the client connection options and initializes the client * object. diff --git a/mqtt_client/src/MqttClient.ros2.cpp b/mqtt_client/src/MqttClient.ros2.cpp index d973470..825c2d7 100644 --- a/mqtt_client/src/MqttClient.ros2.cpp +++ b/mqtt_client/src/MqttClient.ros2.cpp @@ -448,7 +448,9 @@ void MqttClient::loadParameters() { rclcpp::Parameter durability_param; if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.durability", mqtt_topic), durability_param)) { const auto p = durability_param.as_string(); - if (p == "volatile") { + if (p == "system_default") { + ros2mqtt.ros.qos.durability = rclcpp::DurabilityPolicy::SystemDefault; + } else if (p == "volatile") { ros2mqtt.ros.qos.durability = rclcpp::DurabilityPolicy::Volatile; } else if (p == "transient_local") { ros2mqtt.ros.qos.durability = rclcpp::DurabilityPolicy::TransientLocal; @@ -463,7 +465,9 @@ void MqttClient::loadParameters() { rclcpp::Parameter reliability_param; if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.reliability", mqtt_topic), reliability_param)) { const auto p = reliability_param.as_string(); - if (p == "best_effort") { + if (p == "system_default") { + ros2mqtt.ros.qos.reliability = rclcpp::ReliabilityPolicy::SystemDefault; + } else if (p == "best_effort") { ros2mqtt.ros.qos.reliability = rclcpp::ReliabilityPolicy::BestEffort; } else if (p == "reliable") { ros2mqtt.ros.qos.reliability = rclcpp::ReliabilityPolicy::Reliable; @@ -538,7 +542,9 @@ void MqttClient::loadParameters() { rclcpp::Parameter durability_param; if (get_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.qos.durability", mqtt_topic), durability_param)) { const auto p = durability_param.as_string(); - if (p == "volatile") { + if (p == "system_default") { + mqtt2ros.ros.qos.durability = rclcpp::DurabilityPolicy::SystemDefault; + } else if (p == "volatile") { mqtt2ros.ros.qos.durability = rclcpp::DurabilityPolicy::Volatile; } else if (p == "transient_local") { mqtt2ros.ros.qos.durability = @@ -552,7 +558,9 @@ void MqttClient::loadParameters() { rclcpp::Parameter reliability_param; if (get_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.qos.reliability", mqtt_topic), reliability_param)) { const auto p = reliability_param.as_string(); - if (p == "best_effort") { + if (p == "system_default") { + mqtt2ros.ros.qos.reliability = rclcpp::ReliabilityPolicy::SystemDefault; + } else if (p == "best_effort") { mqtt2ros.ros.qos.reliability = rclcpp::ReliabilityPolicy::BestEffort; } else if (p == "reliable") { mqtt2ros.ros.qos.reliability = rclcpp::ReliabilityPolicy::Reliable; @@ -664,6 +672,8 @@ void MqttClient::setup() { check_subscriptions_timer_ = create_wall_timer(std::chrono::duration(1.0), std::bind(&MqttClient::setupSubscriptions, this)); + + setupPublishers (); } std::optional MqttClient::getCompatibleQoS (const std::string &ros_topic, const rclcpp::TopicEndpointInfo &tei, @@ -812,6 +822,31 @@ void MqttClient::setupSubscriptions() { } } +void MqttClient::setupPublishers() { + + for (auto& [mqtt_topic, mqtt2ros] : mqtt2ros_) { + if (mqtt2ros.ros.publisher) + continue; + + // If the type is not fixed, we require a mqtt message in order to deduce the type + if (!mqtt2ros.fixed_type) + continue; + + try { + const auto qos = rclcpp::QoS(mqtt2ros.ros.queue_size) + .durability(mqtt2ros.ros.qos.durability) + .reliability(mqtt2ros.ros.qos.reliability); + mqtt2ros.ros.publisher = create_generic_publisher( + mqtt2ros.ros.topic, mqtt2ros.ros.msg_type, qos); + + mqtt2ros.ros.is_stale = false; + } catch (const rclcpp::exceptions::RCLError& e) { + RCLCPP_ERROR(get_logger(), "Failed to create generic publisher: %s", + e.what()); + } + } +} + void MqttClient::setupClient() { // basic client connection options @@ -1210,6 +1245,7 @@ void MqttClient::mqtt2fixed(mqtt::const_message_ptr mqtt_msg) { } catch (const rclcpp::exceptions::RCLError& e) { RCLCPP_ERROR(get_logger(), "Failed to create generic publisher: %s", e.what()); + return; } } From 41cba103a1adc45ba90135aceb3e20745a2553e3 Mon Sep 17 00:00:00 2001 From: JayHerpin Date: Tue, 23 Apr 2024 14:25:14 -0500 Subject: [PATCH 06/15] Misc minor formatting fixes --- README.md | 14 +++++++------- .../include/mqtt_client/MqttClient.ros2.hpp | 18 ++++++------------ 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 2e4e69c..e2fad69 100644 --- a/README.md +++ b/README.md @@ -319,8 +319,8 @@ bridge: mqtt: qos: # [0] MQTT QoS value retained: # [false] whether to retain MQTT message - mqtt2ros: # Object specifying which MQTT topics to map to which ROS topics - mqtt_topics: # Array specifying which ROS topics to bridge + mqtt2ros: # Object specifying which MQTT topics to map to which ROS topics + mqtt_topics: # Array specifying which ROS topics to bridge - {{ mqtt_topic_name }} # The MQTT topic that should be bridged, corresponds to the sub-object in the YAML {{ mqtt_topic_name }}: ros_topic: # ROS topic on which corresponding MQTT messages are published @@ -328,13 +328,13 @@ bridge: primitive: # [false] whether to publish as primitive message (if coming from non-ROS MQTT client) advanced: mqtt: - qos: # [0] MQTT QoS value + qos: # [0] MQTT QoS value ros: - queue_size: # [1] ROS publisher queue size - latched: # [false] whether to latch ROS message + queue_size: # [1] ROS publisher queue size + latched: # [false] whether to latch ROS message qos: - reliability: # [system_default] One of "system_default", "reliable", "best_effort". - durability: # [system_default] One of "system_default", "volatile", "transient_local". + reliability: # [system_default] One of "system_default", "reliable", "best_effort". + durability: # [system_default] One of "system_default", "volatile", "transient_local". ``` ## Primitive Messages diff --git a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp index 5174038..9ab9488 100644 --- a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp +++ b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp @@ -566,8 +566,7 @@ class MqttClient : public rclcpp::Node, template -bool MqttClient::loadParameter(const std::string& key, T& value) -{ +bool MqttClient::loadParameter(const std::string& key, T& value) { bool found = get_parameter(key, value); if (found) RCLCPP_DEBUG(get_logger(), "Retrieved parameter '%s' = '%s'", key.c_str(), @@ -578,8 +577,7 @@ bool MqttClient::loadParameter(const std::string& key, T& value) template bool MqttClient::loadParameter(const std::string& key, T& value, - const T& default_value) -{ + const T& default_value) { bool found = get_parameter_or(key, value, default_value); if (!found) RCLCPP_WARN(get_logger(), "Parameter '%s' not set, defaulting to '%s'", @@ -592,8 +590,7 @@ bool MqttClient::loadParameter(const std::string& key, T& value, template -bool MqttClient::loadParameter(const std::string& key, std::vector& value) -{ +bool MqttClient::loadParameter(const std::string& key, std::vector& value) { const bool found = get_parameter(key, value); if (found) RCLCPP_WARN(get_logger(), "Retrieved parameter '%s' = '[%s]'", key.c_str(), @@ -604,8 +601,7 @@ bool MqttClient::loadParameter(const std::string& key, std::vector& value) template bool MqttClient::loadParameter(const std::string& key, std::vector& value, - const std::vector& default_value) -{ + const std::vector& default_value) { const bool found = get_parameter_or(key, value, default_value); if (!found) RCLCPP_WARN(get_logger(), "Parameter '%s' not set, defaulting to '%s'", @@ -627,8 +623,7 @@ bool MqttClient::loadParameter(const std::string& key, std::vector& value, */ template void serializeRosMessage(const T& msg, - rclcpp::SerializedMessage& serialized_msg) -{ + rclcpp::SerializedMessage& serialized_msg) { rclcpp::Serialization serializer; serializer.serialize_message(&msg, &serialized_msg); @@ -645,8 +640,7 @@ void serializeRosMessage(const T& msg, */ template void deserializeRosMessage(const rclcpp::SerializedMessage& serialized_msg, - T& msg) -{ + T& msg) { rclcpp::Serialization serializer; serializer.deserialize_message(&serialized_msg, &msg); From 7d8a136f33ca935ea11206f1e160c893e9649953 Mon Sep 17 00:00:00 2001 From: JayHerpin <159202566+JayHerpin@users.noreply.github.com> Date: Tue, 28 May 2024 08:04:46 -0500 Subject: [PATCH 07/15] Apply suggestions from code review Co-authored-by: Lennart Reiher --- mqtt_client/include/mqtt_client/MqttClient.ros2.hpp | 4 ++-- mqtt_client/src/MqttClient.ros2.cpp | 9 ++------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp index 9ab9488..c97ad6c 100644 --- a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp +++ b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp @@ -73,8 +73,8 @@ class MqttClient : public rclcpp::Node, explicit MqttClient(const rclcpp::NodeOptions& options); protected: - class Ros2MqttInterface; - class Mqtt2RosInterface; + struct Ros2MqttInterface; + struct Mqtt2RosInterface; /** * @brief Loads ROS parameters from parameter server. diff --git a/mqtt_client/src/MqttClient.ros2.cpp b/mqtt_client/src/MqttClient.ros2.cpp index 825c2d7..07ca127 100644 --- a/mqtt_client/src/MqttClient.ros2.cpp +++ b/mqtt_client/src/MqttClient.ros2.cpp @@ -418,12 +418,10 @@ void MqttClient::loadParameters() { // ros2mqtt[k]/ros_type rclcpp::Parameter ros_type_param; if (get_parameter(fmt::format("bridge.ros2mqtt.{}.ros_type", ros_topic), ros_type_param)) { - - ros2mqtt.ros.msg_type = ros_type_param.as_string(); ros2mqtt.fixed_type = true; ros2mqtt.primitive = true; - RCLCPP_DEBUG(get_logger(), "Using explicit ros type %s", ros2mqtt.ros.msg_type.c_str ()); + RCLCPP_DEBUG(get_logger(), "Using explicit ROS message type '%s'", ros2mqtt.ros.msg_type.c_str()); } // ros2mqtt[k]/inject_timestamp @@ -511,8 +509,6 @@ void MqttClient::loadParameters() { Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic]; mqtt2ros.ros.topic = ros_topic; - RCLCPP_DEBUG(get_logger(), "MQTT %s to ROS %s", mqtt_topic.c_str(), ros_topic.c_str ()); - // mqtt2ros[k]/primitive rclcpp::Parameter primitive_param; if (get_parameter(fmt::format("bridge.mqtt2ros.{}.primitive", mqtt_topic), primitive_param)) @@ -524,8 +520,7 @@ void MqttClient::loadParameters() { mqtt2ros.ros.msg_type = ros_type_param.as_string(); mqtt2ros.fixed_type = true; mqtt2ros.primitive = true; - - RCLCPP_DEBUG(get_logger(), "Using explicit ros type %s for %s", mqtt2ros.ros.msg_type.c_str (), ros_topic.c_str ()); + RCLCPP_DEBUG(get_logger(), "Using explicit ROS message type '%s' for '%s'", mqtt2ros.ros.msg_type.c_str(), ros_topic.c_str()); } // mqtt2ros[k]/advanced/mqtt/qos From 1e4d1e8e1c80274a965fd0a383bdca690a6eba12 Mon Sep 17 00:00:00 2001 From: JayHerpin Date: Tue, 28 May 2024 09:05:58 -0500 Subject: [PATCH 08/15] Renaming new example config file to be more descriptive --- ...params.ros2.fixed.yaml => params.ros2.fixed-type-and-qos.yaml} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename mqtt_client/config/{params.ros2.fixed.yaml => params.ros2.fixed-type-and-qos.yaml} (100%) diff --git a/mqtt_client/config/params.ros2.fixed.yaml b/mqtt_client/config/params.ros2.fixed-type-and-qos.yaml similarity index 100% rename from mqtt_client/config/params.ros2.fixed.yaml rename to mqtt_client/config/params.ros2.fixed-type-and-qos.yaml From d90cf4ba2602532a9ef45ae41ee52ae55aa56138 Mon Sep 17 00:00:00 2001 From: JayHerpin Date: Tue, 28 May 2024 09:09:00 -0500 Subject: [PATCH 09/15] Removing erroneous "not implemented" documentation --- mqtt_client/include/mqtt_client/MqttClient.ros2.hpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp index c97ad6c..5db137a 100644 --- a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp +++ b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp @@ -272,8 +272,6 @@ class MqttClient : public rclcpp::Node, /** * @brief Publishes a primitive message received via MQTT to ROS. * - * Currently not implemented. - * * @param mqtt_msg MQTT message */ void mqtt2primitive(mqtt::const_message_ptr mqtt_msg); @@ -281,8 +279,6 @@ class MqttClient : public rclcpp::Node, /** * @brief Publishes a primitive message received via MQTT to ROS. * - * Currently not implemented. - * * @param mqtt_msg MQTT message */ void mqtt2fixed(mqtt::const_message_ptr mqtt_msg); From e30fa1a5ae8d6b87bb68fc637305826040b5dfab Mon Sep 17 00:00:00 2001 From: JayHerpin Date: Tue, 28 May 2024 09:18:45 -0500 Subject: [PATCH 10/15] Implemented primitive conversions for floating point values --- mqtt_client/src/MqttClient.ros2.cpp | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/mqtt_client/src/MqttClient.ros2.cpp b/mqtt_client/src/MqttClient.ros2.cpp index 07ca127..98dbbf4 100644 --- a/mqtt_client/src/MqttClient.ros2.cpp +++ b/mqtt_client/src/MqttClient.ros2.cpp @@ -61,6 +61,18 @@ const std::string MqttClient::kRosMsgTypeMqttTopicPrefix = const std::string MqttClient::kLatencyRosTopicPrefix = "~/latencies/"; +template +T mqtt2float(mqtt::const_message_ptr mqtt_msg) { + auto str_msg = mqtt_msg->to_string (); + std::size_t pos; + const T v = std::stold(str_msg, &pos); + + if (pos != str_msg.size()) + throw std::invalid_argument ("not all charaters processed"); + + return v; +} + template T mqtt2int(mqtt::const_message_ptr mqtt_msg) { auto str_msg = mqtt_msg->to_string (); @@ -155,9 +167,17 @@ bool fixedMqtt2PrimitiveRos(mqtt::const_message_ptr mqtt_msg, serializeRosMessage(msg, serialized_msg); return true; } else if (msg_type == "std_msgs/msg/Float32") { - /// TODO + std_msgs::msg::Float32 msg; + msg.data = mqtt2float(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + return true; } else if (msg_type == "std_msgs/msg/Float64") { - /// TODO + std_msgs::msg::Float64 msg; + msg.data = mqtt2float(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + return true; } } catch (const std::exception &) { } From 6bb674f8c2a30079199fc1e7fcbf48e0bb4ff856 Mon Sep 17 00:00:00 2001 From: JayHerpin Date: Tue, 28 May 2024 10:10:36 -0500 Subject: [PATCH 11/15] Minor refactor to reduce the number of lines of code --- mqtt_client/src/MqttClient.ros2.cpp | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/mqtt_client/src/MqttClient.ros2.cpp b/mqtt_client/src/MqttClient.ros2.cpp index 98dbbf4..3f8c121 100644 --- a/mqtt_client/src/MqttClient.ros2.cpp +++ b/mqtt_client/src/MqttClient.ros2.cpp @@ -105,84 +105,77 @@ bool fixedMqtt2PrimitiveRos(mqtt::const_message_ptr mqtt_msg, msg.data = mqtt_msg->to_string(); serializeRosMessage(msg, serialized_msg); - return true; } else if (msg_type == "std_msgs/msg/Bool") { std_msgs::msg::Bool msg; msg.data = mqtt2bool(mqtt_msg); serializeRosMessage(msg, serialized_msg); - return true; } else if (msg_type == "std_msgs/msg/Char") { std_msgs::msg::Char msg; msg.data = mqtt2int(mqtt_msg); serializeRosMessage(msg, serialized_msg); - return true; } else if (msg_type == "std_msgs/msg/UInt8") { std_msgs::msg::UInt8 msg; msg.data = mqtt2int(mqtt_msg); serializeRosMessage(msg, serialized_msg); - return true; } else if (msg_type == "std_msgs/msg/UInt16") { std_msgs::msg::UInt16 msg; msg.data = mqtt2int(mqtt_msg); serializeRosMessage(msg, serialized_msg); - return true; } else if (msg_type == "std_msgs/msg/UInt32") { std_msgs::msg::UInt32 msg; msg.data = mqtt2int(mqtt_msg); serializeRosMessage(msg, serialized_msg); - return true; } else if (msg_type == "std_msgs/msg/UInt64") { std_msgs::msg::UInt64 msg; msg.data = mqtt2int(mqtt_msg); serializeRosMessage(msg, serialized_msg); - return true; } else if (msg_type == "std_msgs/msg/Int8") { std_msgs::msg::Int8 msg; msg.data = mqtt2int(mqtt_msg); serializeRosMessage(msg, serialized_msg); - return true; } else if (msg_type == "std_msgs/msg/Int16") { std_msgs::msg::Int16 msg; msg.data = mqtt2int(mqtt_msg); serializeRosMessage(msg, serialized_msg); - return true; } else if (msg_type == "std_msgs/msg/Int32") { std_msgs::msg::Int32 msg; msg.data = mqtt2int(mqtt_msg); serializeRosMessage(msg, serialized_msg); - return true; } else if (msg_type == "std_msgs/msg/Int64") { std_msgs::msg::Int32 msg; msg.data = mqtt2int(mqtt_msg); serializeRosMessage(msg, serialized_msg); - return true; } else if (msg_type == "std_msgs/msg/Float32") { std_msgs::msg::Float32 msg; msg.data = mqtt2float(mqtt_msg); serializeRosMessage(msg, serialized_msg); - return true; } else if (msg_type == "std_msgs/msg/Float64") { std_msgs::msg::Float64 msg; msg.data = mqtt2float(mqtt_msg); serializeRosMessage(msg, serialized_msg); - return true; + } else { + throw std::domain_error("Unhandled message type (" + msg_type + ")"); } + + return true; + } catch (const std::exception &) { + return false; } - return false; + } /** From d6a8d7922e83b30c8edfaf7620a1624221e17d39 Mon Sep 17 00:00:00 2001 From: JayHerpin Date: Tue, 28 May 2024 10:14:16 -0500 Subject: [PATCH 12/15] Aligning the parameter descriptions with what is in the README file --- mqtt_client/src/MqttClient.ros2.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mqtt_client/src/MqttClient.ros2.cpp b/mqtt_client/src/MqttClient.ros2.cpp index 3f8c121..01db7fb 100644 --- a/mqtt_client/src/MqttClient.ros2.cpp +++ b/mqtt_client/src/MqttClient.ros2.cpp @@ -322,7 +322,7 @@ void MqttClient::loadParameters() { declare_parameter(fmt::format("bridge.ros2mqtt.{}.mqtt_topic", ros_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); param_desc.description = "whether to publish as primitive message"; declare_parameter(fmt::format("bridge.ros2mqtt.{}.primitive", ros_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc); - param_desc.description = "Explicitly set the ros message type"; + param_desc.description = "If set, the ROS msg type provided will be used. If empty, the type is automatically deduced via the publisher"; declare_parameter(fmt::format("bridge.ros2mqtt.{}.ros_type", ros_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); param_desc.description = "whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side)"; declare_parameter(fmt::format("bridge.ros2mqtt.{}.inject_timestamp", ros_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc); @@ -346,9 +346,9 @@ void MqttClient::loadParameters() { declare_parameter(fmt::format("bridge.mqtt2ros.{}.ros_topic", mqtt_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); param_desc.description = "whether to publish as primitive message (if coming from non-ROS MQTT client)"; declare_parameter(fmt::format("bridge.mqtt2ros.{}.primitive", mqtt_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc); - param_desc.description = "Explicitly set the ros message type"; + param_desc.description = "If set, the ROS msg type provided will be used. If empty, the type is automatically deduced via the MQTT message"; declare_parameter(fmt::format("bridge.mqtt2ros.{}.ros_type", mqtt_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); - param_desc.description = "The encoding to use for the mqtt "; + param_desc.description = "MQTT QoS value"; declare_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.mqtt.qos", mqtt_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc); param_desc.description = "ROS publisher queue size"; declare_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.queue_size", mqtt_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc); From 8a6bd378d739a08f270c0f4c08701c19725d4174 Mon Sep 17 00:00:00 2001 From: JayHerpin Date: Tue, 28 May 2024 11:19:49 -0500 Subject: [PATCH 13/15] Allowing non-primitive with fixed types This change now allows non-primitive encoding to work with fixed types on either side of the bridge. Note that if the ros type specified mqtt2ros does not match the one received from the mqtt type topic then the config file specified version will win (eg, the type will not be allowed to change). Note that this is necessary since ROS doesn't really allow us to change the type of topics on the fly and since we used a fixed type, the publisher was created on startup. --- .../params.ros2.fixed-type-and-qos.yaml | 6 +-- ...ams.ros2.primitive-fixed-type-and-qos.yaml | 32 ++++++++++++++ .../include/mqtt_client/MqttClient.ros2.hpp | 2 +- mqtt_client/src/MqttClient.ros2.cpp | 44 +++++++++++++------ 4 files changed, 66 insertions(+), 18 deletions(-) create mode 100644 mqtt_client/config/params.ros2.primitive-fixed-type-and-qos.yaml diff --git a/mqtt_client/config/params.ros2.fixed-type-and-qos.yaml b/mqtt_client/config/params.ros2.fixed-type-and-qos.yaml index 5cb6648..176a62c 100644 --- a/mqtt_client/config/params.ros2.fixed-type-and-qos.yaml +++ b/mqtt_client/config/params.ros2.fixed-type-and-qos.yaml @@ -9,8 +9,7 @@ - /ping/primitive /ping/primitive: mqtt_topic: pingpong/primitive - # This is implied by the explicit ros type - #primitive: true + primitive: false ros_type: std_msgs/msg/Bool advanced: ros: @@ -23,8 +22,7 @@ - pingpong/primitive pingpong/primitive: ros_topic: /pong/primitive - #This is implied by the explicit ros type - #primitive: true + primitive: false ros_type: std_msgs/msg/Bool advanced: ros: diff --git a/mqtt_client/config/params.ros2.primitive-fixed-type-and-qos.yaml b/mqtt_client/config/params.ros2.primitive-fixed-type-and-qos.yaml new file mode 100644 index 0000000..80266b3 --- /dev/null +++ b/mqtt_client/config/params.ros2.primitive-fixed-type-and-qos.yaml @@ -0,0 +1,32 @@ +/**/*: + ros__parameters: + broker: + host: localhost + port: 1883 + bridge: + ros2mqtt: + ros_topics: + - /ping/primitive + /ping/primitive: + mqtt_topic: pingpong/primitive + primitive: true + ros_type: std_msgs/msg/Bool + advanced: + ros: + queue_size: 10 + qos: + durability: auto + reliability: auto + mqtt2ros: + mqtt_topics: + - pingpong/primitive + pingpong/primitive: + ros_topic: /pong/primitive + primitive: true + ros_type: std_msgs/msg/Bool + advanced: + ros: + queue_size: 10 + qos: + durability: transient_local + reliability: reliable diff --git a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp index 5db137a..0d5c6b8 100644 --- a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp +++ b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp @@ -474,7 +474,7 @@ class MqttClient : public rclcpp::Node, bool latched = false; ///< whether to latch ROS message bool is_stale = false; ///< whether a new generic publisher/subscriber is required } ros; ///< ROS-related variables - bool fixed_type = false; ///< whether the published ros message type is specified explicitly + bool fixed_type = false; ///< whether the published ros message type is specified explicitly bool primitive = false; ///< whether to publish as primitive message (if ///< coming from non-ROS MQTT client) bool stamped = false; ///< whether timestamp is injected diff --git a/mqtt_client/src/MqttClient.ros2.cpp b/mqtt_client/src/MqttClient.ros2.cpp index 01db7fb..443006e 100644 --- a/mqtt_client/src/MqttClient.ros2.cpp +++ b/mqtt_client/src/MqttClient.ros2.cpp @@ -433,7 +433,6 @@ void MqttClient::loadParameters() { if (get_parameter(fmt::format("bridge.ros2mqtt.{}.ros_type", ros_topic), ros_type_param)) { ros2mqtt.ros.msg_type = ros_type_param.as_string(); ros2mqtt.fixed_type = true; - ros2mqtt.primitive = true; RCLCPP_DEBUG(get_logger(), "Using explicit ROS message type '%s'", ros2mqtt.ros.msg_type.c_str()); } @@ -532,7 +531,6 @@ void MqttClient::loadParameters() { if (get_parameter(fmt::format("bridge.mqtt2ros.{}.ros_type", mqtt_topic), ros_type_param)) { mqtt2ros.ros.msg_type = ros_type_param.as_string(); mqtt2ros.fixed_type = true; - mqtt2ros.primitive = true; RCLCPP_DEBUG(get_logger(), "Using explicit ROS message type '%s' for '%s'", mqtt2ros.ros.msg_type.c_str(), ros_topic.c_str()); } @@ -1280,11 +1278,17 @@ void MqttClient::connected(const std::string& cause) { // subscribe MQTT topics for (const auto& [mqtt_topic, mqtt2ros] : mqtt2ros_) { - std::string mqtt_topic_to_subscribe = mqtt_topic; - if (!mqtt2ros.primitive) // subscribe topics for ROS message types first - mqtt_topic_to_subscribe = kRosMsgTypeMqttTopicPrefix + mqtt_topic; - client_->subscribe(mqtt_topic_to_subscribe, mqtt2ros.mqtt.qos); - RCLCPP_INFO(get_logger(), "Subscribed MQTT topic '%s'", mqtt_topic_to_subscribe.c_str()); + if (!mqtt2ros.primitive) { + std::string const mqtt_topic_to_subscribe = kRosMsgTypeMqttTopicPrefix + mqtt_topic; + client_->subscribe(mqtt_topic_to_subscribe, mqtt2ros.mqtt.qos); + RCLCPP_INFO(get_logger(), "Subscribed MQTT topic '%s'", mqtt_topic_to_subscribe.c_str()); + } + // If not primitive and not fixed, we need the message type before we can public. In that case + // wait for the type to come in before subscribing to the data topic + if (mqtt2ros.primitive || mqtt2ros.fixed_type) { + client_->subscribe(mqtt_topic, mqtt2ros.mqtt.qos); + RCLCPP_INFO(get_logger(), "Subscribed MQTT topic '%s'", mqtt_topic.c_str()); + } } } @@ -1395,13 +1399,12 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) { if (mqtt2ros_.count(mqtt_topic) > 0) { Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic]; - if (mqtt2ros.fixed_type) { - mqtt2fixed(mqtt_msg); - return; - } - if (mqtt2ros.primitive) { - mqtt2primitive(mqtt_msg); + if (mqtt2ros.fixed_type) { + mqtt2fixed(mqtt_msg); + } else { + mqtt2primitive(mqtt_msg); + } return; } } @@ -1432,6 +1435,21 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) { // if ROS message type has changed or if mapping is stale if (ros_msg_type.name != mqtt2ros.ros.msg_type || mqtt2ros.ros.is_stale) { + if (mqtt2ros.fixed_type) { + // We should never be in this situation if the type has been set explicitly. As fixed_type + // is not currently supported through the service based bridge creation and the type name + // not matching means the fixed type specified in the configuration does not match the + // one we just recieved + if (ros_msg_type.name != mqtt2ros.ros.msg_type) + RCLCPP_ERROR(get_logger(), + "Unexpected type name received for topic %s (expected %s but received %s)", + mqtt2ros.ros.topic.c_str(), mqtt2ros.ros.msg_type.c_str(), ros_msg_type.name.c_str()); + if (mqtt2ros.ros.is_stale) + RCLCPP_ERROR(get_logger(), "Topic %s has been unexpectedly marked stale", + mqtt2ros.ros.topic.c_str()); + return; + } + mqtt2ros.ros.msg_type = ros_msg_type.name; mqtt2ros.stamped = ros_msg_type.stamped; RCLCPP_INFO(get_logger(), From 74d64ee6379392b00b4e8e2a7fb9d7cde5938cd1 Mon Sep 17 00:00:00 2001 From: JayHerpin Date: Tue, 28 May 2024 11:24:26 -0500 Subject: [PATCH 14/15] Removing hack altering bool encodings when in "fixed type" mode --- mqtt_client/src/MqttClient.ros2.cpp | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/mqtt_client/src/MqttClient.ros2.cpp b/mqtt_client/src/MqttClient.ros2.cpp index 443006e..02b1a91 100644 --- a/mqtt_client/src/MqttClient.ros2.cpp +++ b/mqtt_client/src/MqttClient.ros2.cpp @@ -188,7 +188,6 @@ bool fixedMqtt2PrimitiveRos(mqtt::const_message_ptr mqtt_msg, * * @param [in] serialized_msg generic serialized ROS message * @param [in] msg_type ROS message type, e.g. `std_msgs/msg/String` - * @param [in] fixed_encoding Truthfully this is just an ugly hack to change the encoding of bools in some cases * @param [out] primitive string representation of primitive message data * * @return true if primitive ROS message type was found @@ -196,7 +195,7 @@ bool fixedMqtt2PrimitiveRos(mqtt::const_message_ptr mqtt_msg, */ bool primitiveRosMessageToString( const std::shared_ptr& serialized_msg, - const std::string& msg_type, bool fixed_encoding, std::string& primitive) { + const std::string& msg_type, std::string& primitive) { bool found_primitive = true; @@ -207,10 +206,7 @@ bool primitiveRosMessageToString( } else if (msg_type == "std_msgs/msg/Bool") { std_msgs::msg::Bool msg; deserializeRosMessage(*serialized_msg, msg); - if (fixed_encoding) - primitive = msg.data ? "1" : "0"; - else - primitive = msg.data ? "true" : "false"; + primitive = msg.data ? "true" : "false"; } else if (msg_type == "std_msgs/msg/Char") { std_msgs::msg::Char msg; deserializeRosMessage(*serialized_msg, msg); @@ -954,7 +950,7 @@ void MqttClient::ros2mqtt( // resolve ROS messages to primitive strings if possible std::string payload; bool found_primitive = - primitiveRosMessageToString(serialized_msg, ros_msg_type.name, ros2mqtt.fixed_type, payload); + primitiveRosMessageToString(serialized_msg, ros_msg_type.name, payload); if (found_primitive) { payload_buffer = std::vector(payload.begin(), payload.end()); } else { From 1e0af0aef1685259d05e60adeb62f220ae21ca67 Mon Sep 17 00:00:00 2001 From: JayHerpin Date: Tue, 28 May 2024 12:08:57 -0500 Subject: [PATCH 15/15] Resolving issued with explicit QoS for the ros2mqtt ros subscribers --- mqtt_client/src/MqttClient.ros2.cpp | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/mqtt_client/src/MqttClient.ros2.cpp b/mqtt_client/src/MqttClient.ros2.cpp index 02b1a91..c45be95 100644 --- a/mqtt_client/src/MqttClient.ros2.cpp +++ b/mqtt_client/src/MqttClient.ros2.cpp @@ -452,7 +452,7 @@ void MqttClient::loadParameters() { ros2mqtt.ros.queue_size = queue_size_param.as_int(); rclcpp::Parameter durability_param; - if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.durability", mqtt_topic), durability_param)) { + if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.durability", ros_topic), durability_param)) { const auto p = durability_param.as_string(); if (p == "system_default") { ros2mqtt.ros.qos.durability = rclcpp::DurabilityPolicy::SystemDefault; @@ -469,7 +469,7 @@ void MqttClient::loadParameters() { } rclcpp::Parameter reliability_param; - if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.reliability", mqtt_topic), reliability_param)) { + if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.reliability", ros_topic), reliability_param)) { const auto p = reliability_param.as_string(); if (p == "system_default") { ros2mqtt.ros.qos.reliability = rclcpp::ReliabilityPolicy::SystemDefault; @@ -778,8 +778,12 @@ void MqttClient::setupSubscriptions() { if (ros2mqtt.ros.subscriber) continue; + auto const qos = rclcpp::QoS(ros2mqtt.ros.queue_size) + .reliability(*ros2mqtt.ros.qos.reliability) + .durability(*ros2mqtt.ros.qos.durability); + ros2mqtt.ros.subscriber = create_generic_subscription( - ros_topic, ros2mqtt.ros.msg_type, ros2mqtt.ros.queue_size, bound_callback_func); + ros_topic, ros2mqtt.ros.msg_type, qos, bound_callback_func); ros2mqtt.ros.is_stale = false; RCLCPP_INFO(get_logger(), "Subscribed ROS topic '%s' of type '%s'", ros_topic.c_str(), ros2mqtt.ros.msg_type.c_str());