Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JSON support to achieve full integration with non-ROS devices for arbitrary ROS message types #51

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 84 additions & 13 deletions README.md

Large diffs are not rendered by default.

20 changes: 17 additions & 3 deletions mqtt_client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ if(${ROS_VERSION} EQUAL 2)
find_package(mqtt_client_interfaces REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rclcpp_components REQUIRED)
find_package(rosbag2_cpp REQUIRED)
find_package(rosidl_typesupport_cpp REQUIRED)
find_package(std_msgs REQUIRED)
find_package(fastcdr REQUIRED)
find_package(rosx_introspection REQUIRED)

# Paho MQTT C++ apt package doesn't include CMake config
# find_package(PahoMqttCpp REQUIRED)
Expand All @@ -35,19 +39,27 @@ if(${ROS_VERSION} EQUAL 2)

target_include_directories(${PROJECT_NAME}_lib PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)
$<INSTALL_INTERFACE:include>
${rosx_introspection_INCLUDE_DIRS}
)

target_link_libraries(${PROJECT_NAME}_lib
${PahoMqttC_LIBRARY}
${PahoMqttCpp_LIBRARY}
${rosx_introspection_LIBRARIES}
fastcdr

)

ament_target_dependencies(${PROJECT_NAME}_lib
fmt
mqtt_client_interfaces
rclcpp
rclcpp_components
rosbag2_cpp
rosidl_typesupport_cpp
std_msgs
rosx_introspection
)

install(TARGETS ${PROJECT_NAME}_lib
Expand Down Expand Up @@ -95,12 +107,13 @@ elseif(${ROS_VERSION} EQUAL 1)
rosfmt ## For fmt::format.
std_msgs
topic_tools
rosx_introspection
)

## System dependencies are found with CMake's conventions
find_package(PahoMqttCpp REQUIRED)
set(PahoMqttCpp_LIBRARIES PahoMqttCpp::paho-mqttpp3)

find_package(fastcdr REQUIRED)

## Uncomment this if the package has a setup.py. This macro ensures
## modules and global scripts declared therein get installed
Expand Down Expand Up @@ -237,7 +250,8 @@ elseif(${ROS_VERSION} EQUAL 1)
target_link_libraries(${PROJECT_NAME}
${catkin_LIBRARIES}
${PahoMqttCpp_LIBRARIES}
)
fastcdr
)

#############
## Install ##
Expand Down
2 changes: 2 additions & 0 deletions mqtt_client/config/params.ros2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ mqtt_client:
- /ping/ros
/ping/ros:
mqtt_topic: pingpong/ros
json: true
mqtt2ros:
mqtt_topics:
- pingpong/ros
pingpong/ros:
ros_topic: /pong/ros
json: true
6 changes: 6 additions & 0 deletions mqtt_client/config/params.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@ bridge:
ros2mqtt:
- ros_topic: /ping/ros
mqtt_topic: pingpong/ros
- ros_topic: /ping/json
mqtt_topic: pingpong/json
json: true
- ros_topic: /ping/primitive
mqtt_topic: pingpong/primitive
primitive: true
mqtt2ros:
- mqtt_topic: pingpong/ros
ros_topic: /pong/ros
- mqtt_topic: pingpong/json
ros_topic: /pong/json
json: true
- mqtt_topic: pingpong/primitive
ros_topic: /pong/primitive
primitive: true
23 changes: 17 additions & 6 deletions mqtt_client/include/mqtt_client/MqttClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ SOFTWARE.
#include <nodelet/nodelet.h>
#include <ros/ros.h>
#include <rosfmt/full.h> // fmt::format, fmt::join
#include <rosx_introspection/ros_parser.hpp>
#include <topic_tools/shape_shifter.h>


#include "rapidjson/document.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"
/**
* @brief Namespace for the mqtt_client package
*/
Expand Down Expand Up @@ -385,7 +387,12 @@ class MqttClient : public nodelet::Nodelet,
bool retained = false; ///< whether to retain MQTT message
} mqtt; ///< MQTT-related variables
bool primitive = false; ///< whether to publish as primitive message
bool stamped = false; ///< whether to inject timestamp in MQTT message
bool json = false; ///< whether the serial messages flowing through MQTT
///< broker are JSON format
std::shared_ptr< RosMsgParser::ParsersCollection<RosMsgParser::ROS_Deserializer>> json_parser; ///< parser from json to ROS message and vice-versa


bool stamped = false; ///< whether to inject timestamp in MQTT message
};

/**
Expand All @@ -405,7 +412,11 @@ class MqttClient : public nodelet::Nodelet,
} ros; ///< ROS-related variables
bool primitive = false; ///< whether to publish as primitive message (if
///< coming from non-ROS MQTT client)
bool stamped = false; ///< whether timestamp is injected
bool json = false; ///< whether the serial messages flowing through MQTT
///< broker are JSON format
std::shared_ptr< RosMsgParser::ParsersCollection<RosMsgParser::ROS_Deserializer>> json_parser; ///< parser from json to ROS message and vice-versa

bool stamped = false; ///< whether timestamp is injected
};

protected:
Expand Down Expand Up @@ -517,11 +528,11 @@ bool MqttClient::loadParameter(const std::string& key, std::vector<T>& value)
template <typename T>
bool MqttClient::loadParameter(const std::string& key, std::vector<T>& value,
const std::vector<T>& default_value)
{
{
const bool found = private_node_handle_.param<T>(key, value, default_value);
if (!found)
NODELET_WARN("Parameter '%s' not set, defaulting to '%s'", key.c_str(),
fmt::format("{}", fmt::join(value, ", ")).c_str());
fmt::format("{}", fmt::join(value, ", ")).c_str());
if (found)
NODELET_DEBUG("Retrieved parameter '%s' = '%s'", key.c_str(),
fmt::format("{}", fmt::join(value, ", ")).c_str());
Expand Down
12 changes: 11 additions & 1 deletion mqtt_client/include/mqtt_client/MqttClient.ros2.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ SOFTWARE.
#include <rclcpp/rclcpp.hpp>
#include <rclcpp/serialization.hpp>
#include <std_msgs/msg/float64.hpp>

#include <rosx_introspection/ros_parser.hpp>
#include "rosx_introspection/ros_utils/ros2_helpers.hpp"
#include "rapidjson/document.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"

/**
* @brief Namespace for the mqtt_client package
Expand Down Expand Up @@ -404,6 +408,9 @@ class MqttClient : public rclcpp::Node,
bool retained = false; ///< whether to retain MQTT message
} mqtt; ///< MQTT-related variables
bool primitive = false; ///< whether to publish as primitive message
bool json = false; ///< whether the serial messages flowing through MQTT
///< broker are JSON format
std::shared_ptr< RosMsgParser::ParsersCollection<RosMsgParser::ROS2_Deserializer>> json_parser; ///< parser from json to ROS message and vice-versa
bool stamped = false; ///< whether to inject timestamp in MQTT message
};

Expand All @@ -426,6 +433,9 @@ class MqttClient : public rclcpp::Node,
} ros; ///< ROS-related variables
bool primitive = false; ///< whether to publish as primitive message (if
///< coming from non-ROS MQTT client)
bool json = false; ///< whether the serial messages flowing through MQTT
///< broker are JSON format
std::shared_ptr< RosMsgParser::ParsersCollection<RosMsgParser::ROS2_Deserializer>> json_parser; ///< parser from json to ROS message and vice-versa
bool stamped = false; ///< whether timestamp is injected
};

Expand Down
3 changes: 3 additions & 0 deletions mqtt_client/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
<depend>mqtt_client_interfaces</depend>
<depend>ros_environment</depend>
<depend>std_msgs</depend>
<depend>rosx_introspection</depend>

<!-- ROS2 -->
<buildtool_depend condition="$ROS_VERSION == 2">ament_cmake</buildtool_depend>
Expand All @@ -29,6 +30,8 @@
<depend condition="$ROS_VERSION == 2">rclcpp</depend>
<depend condition="$ROS_VERSION == 2">rclcpp_components</depend>
<depend condition="$ROS_VERSION == 2">rcpputils</depend>
<depend condition="$ROS_VERSION == 2">rosbag2_cpp</depend>
<depend condition="$ROS_VERSION == 2">rosidl_typesupport_cpp</depend>

<!-- ROS1 -->
<buildtool_depend condition="$ROS_VERSION == 1">catkin</buildtool_depend>
Expand Down
109 changes: 96 additions & 13 deletions mqtt_client/src/MqttClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ void MqttClient::loadParameters() {
if (ros2mqtt_params[k].hasMember("primitive"))
ros2mqtt.primitive = ros2mqtt_params[k]["primitive"];

// ros2mqtt[k]/json
if (ros2mqtt_params[k].hasMember("json"))
ros2mqtt.json = ros2mqtt_params[k]["json"];

// ros2mqtt[k]/inject_timestamp
if (ros2mqtt_params[k].hasMember("inject_timestamp"))
ros2mqtt.stamped = ros2mqtt_params[k]["inject_timestamp"];
Expand Down Expand Up @@ -295,6 +299,10 @@ void MqttClient::loadParameters() {
if (mqtt2ros_params[k].hasMember("primitive"))
mqtt2ros.primitive = mqtt2ros_params[k]["primitive"];

// mqtt2ros[k]/json
if (mqtt2ros_params[k].hasMember("json"))
mqtt2ros.json = mqtt2ros_params[k]["json"];

// mqtt2ros[k]/advanced
if (mqtt2ros_params[k].hasMember("advanced")) {
XmlRpc::XmlRpcValue& advanced_params =
Expand Down Expand Up @@ -504,7 +512,64 @@ void MqttClient::ros2mqtt(const topic_tools::ShapeShifter::ConstPtr& ros_msg,
NODELET_DEBUG("Received ROS message of type '%s' on topic '%s'",
ros_msg_type.name.c_str(), ros_topic.c_str());

if (ros2mqtt.primitive) { // publish as primitive (string) message
if (ros2mqtt.json) {


// if ROS message type has changed
if (ros_msg_type.md5 !=
mqtt2ros_[ros2mqtt.mqtt.topic].ros.shape_shifter.getMD5Sum()) {

Mqtt2RosInterface& mqtt2ros = mqtt2ros_[ros2mqtt.mqtt.topic];

NODELET_INFO("ROS publisher message type on topic '%s' set to '%s'",
mqtt2ros.ros.topic.c_str(), ros_msg_type.name.c_str());

// configure ShapeShifter
mqtt2ros.ros.shape_shifter.morph(ros_msg_type.md5, ros_msg_type.name,
ros_msg_type.definition, "");

// advertise with ROS publisher
mqtt2ros.ros.publisher.shutdown();
mqtt2ros.ros.publisher = mqtt2ros.ros.shape_shifter.advertise(
node_handle_, mqtt2ros.ros.topic, mqtt2ros.ros.queue_size,
mqtt2ros.ros.latched);

// subscribe to MQTT topic with actual ROS messages
client_->subscribe(ros2mqtt.mqtt.topic, mqtt2ros.mqtt.qos);


// configure json parser
ros2mqtt.json_parser = std::make_shared<RosMsgParser::ParsersCollection<RosMsgParser::ROS_Deserializer>>();
ros2mqtt.json_parser->registerParser(ros2mqtt.mqtt.topic, ros_msg_type.name, ros_msg_type.definition);
mqtt2ros_[ros2mqtt.mqtt.topic].json_parser = ros2mqtt.json_parser;
}

// deserialize the shapeshifter message into json
std::vector<uint8_t> tmp_buffer;
std::string payload;
tmp_buffer.resize(ros_msg->size());
ros::serialization::OStream stream(tmp_buffer.data(), tmp_buffer.size());
ros_msg->write(stream);
payload =*(ros2mqtt.json_parser->deserializeIntoJson(ros2mqtt.mqtt.topic,RosMsgParser::Span<uint8_t>(tmp_buffer),false));

// parse the created json string from format {topic:{t} , msg: {m}} into {m}
size_t startIdx = payload.find("msg");
if (startIdx != std::string::npos) {
startIdx += 6; // Move past "(msg:)"
size_t endIdx = payload.rfind("}");
if (endIdx != std::string::npos)
payload = "{" + payload.substr(startIdx, endIdx - startIdx);
else
NODELET_ERROR("not the expected json format");
} else
NODELET_ERROR("not the expected json format");

// serialize the json string into payload_buffer
payload_buffer = std::vector<uint8_t>(payload.begin(), payload.end());
}


else if (ros2mqtt.primitive) { // publish as primitive (string) message

// resolve ROS messages to primitive strings if possible
std::string payload;
Expand Down Expand Up @@ -606,7 +671,7 @@ void MqttClient::ros2mqtt(const topic_tools::ShapeShifter::ConstPtr& ros_msg,

void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg,
const ros::WallTime& arrival_stamp) {

std::string mqtt_topic = mqtt_msg->get_topic();
Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic];
auto& payload = mqtt_msg->get_payload_ref();
Expand Down Expand Up @@ -652,19 +717,37 @@ void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg,
msg_length -= stamp_length;
msg_offset += stamp_length;
}
if (mqtt2ros.json) {
// the message is a serialized json string
std::vector<uint8_t> msg_buffer;
std::string msgStr = mqtt_msg->get_payload_str();

// convert json into a serialized ROS message, using the json_parser object
msg_buffer= mqtt2ros.json_parser->serializeFromJson( mqtt_topic, &msgStr);
ros::serialization::OStream msg_stream(&msg_buffer[0], msg_buffer.size());
// publish via ShapeShifter
NODELET_DEBUG(
"Sending ROS message of type '%s' from MQTT broker to ROS topic '%s' ...",
mqtt2ros.ros.shape_shifter.getDataType().c_str(),
mqtt2ros.ros.topic.c_str());

// create ROS message buffer on top of MQTT message payload
char* non_const_payload = const_cast<char*>(&payload[msg_offset]);
uint8_t* msg_buffer = reinterpret_cast<uint8_t*>(non_const_payload);
ros::serialization::OStream msg_stream(msg_buffer, msg_length);
mqtt2ros.ros.shape_shifter.read(msg_stream);
mqtt2ros.ros.publisher.publish(mqtt2ros.ros.shape_shifter);
}

// publish via ShapeShifter
NODELET_DEBUG(
"Sending ROS message of type '%s' from MQTT broker to ROS topic '%s' ...",
mqtt2ros.ros.shape_shifter.getDataType().c_str(),
mqtt2ros.ros.topic.c_str());
mqtt2ros.ros.shape_shifter.read(msg_stream);
mqtt2ros.ros.publisher.publish(mqtt2ros.ros.shape_shifter);
else { // create ROS message buffer on top of MQTT message payload
char* non_const_payload = const_cast<char*>(&payload[msg_offset]);
uint8_t* msg_buffer = reinterpret_cast<uint8_t*>(non_const_payload);
ros::serialization::OStream msg_stream(msg_buffer, msg_length);

// publish via ShapeShifter
NODELET_DEBUG(
"Sending ROS message of type '%s' from MQTT broker to ROS topic '%s' ...",
mqtt2ros.ros.shape_shifter.getDataType().c_str(),
mqtt2ros.ros.topic.c_str());
mqtt2ros.ros.shape_shifter.read(msg_stream);
mqtt2ros.ros.publisher.publish(mqtt2ros.ros.shape_shifter);
}
}


Expand Down
Loading