Automatically match QoS settings across the bridge #5

Merged
merged 15 commits into from
Mar 24, 2021
71 changes: 68 additions & 3 deletions doc/design.md
@@ -43,9 +43,11 @@ All ROS communication entities (e.g. publishers and service clients) have [Quali
The bridge should faithfully map the QoS settings of data coming from one domain into another domain.
For example, a publisher with reliability policy set to "best effort" should continue to publish as "best effort" in the other domains when bridged.

If there are multiple publishers on the same topic, but with different QoS settings, the bridge should have two streams of data, each with their own QoS settings.
If there are multiple publishers on the same topic, but with different QoS settings, it would be nice if the bridge could preserve each stream of data.
For example, if there is one publisher using "best effort" and another publisher using "reliable" on the same topic (with the same domain ID) and a bridge is made,
then the bridge should forward data as "best effort" for the first publisher and as "reliable" for the second publisher into the output domain.
Unfortunately, this may not be possible due to technical limitations, so we leave this as a soft requirement.
Refer to the *QoS mapping* section of the proposed approach below for a more detailed discussion.

Since remotely querying the *history* and *depth* QoS policies is not possible in many implementations (e.g. it is not required by the DDS spec),
the bridge should offer a way to configure these values explicitly.
@@ -99,7 +101,6 @@ For example, the following diagram illustrates a configuration where the "/chatt
A C++ library with a public API is provided for users to call in their own process and extend as they like.
The public API is expected to evolve over time, but at the very least users should be able to bridge ROS network primitives.


For convenience, a standalone binary executable is also provided for easy integration into ROS systems.

### Supporting generic types
@@ -112,7 +113,58 @@ In fact, the generic publisher and subscription implementation is being moved to

### QoS mapping

TODO
The QoS settings of a publisher can be queried by the bridge.
With this information, the bridge can create a subscription using the same QoS settings for receiving data.
The bridge can also create a publisher for the domain being bridged with the same QoS settings.
In this way, for the single publisher case, the QoS settings are preserved from one domain to another domain.
However, there are issues related to the *liveliness* policy as well as multiple publishers per topic that are discussed below.

We must consider the scenario where the domain bridge starts *before* a publisher of a bridged topic becomes available.
In this case, the bridge cannot know what QoS settings to use for the bridged topic.
The solution is to have the bridge wait until a publisher becomes available before creating its own subscription and publisher for that topic.
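
The waiting behavior described above can be sketched as a small callback registry. This is only an illustration under stated assumptions: `QosSketch` and `QosWaiterSketch` are hypothetical names, the QoS is reduced to a single flag, and discovery is simulated by calling `publisher_discovered` by hand rather than by querying a ROS graph.

```cpp
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for the QoS settings of a publisher.
struct QosSketch
{
  bool reliable;
};

// Hypothetical helper that defers bridge creation until a publisher's QoS is known.
class QosWaiterSketch
{
public:
  using Callback = std::function<void (const QosSketch &)>;

  // Run the callback immediately if a publisher is already known for the topic,
  // otherwise store it until one is discovered.
  void on_qos_ready(const std::string & topic, Callback callback)
  {
    auto known = known_qos_.find(topic);
    if (known != known_qos_.end()) {
      callback(known->second);
      return;
    }
    pending_[topic].push_back(std::move(callback));
  }

  // Simulates discovery reporting a publisher on the topic.
  void publisher_discovered(const std::string & topic, const QosSketch & qos)
  {
    // In this sketch only the first publisher seen decides the QoS.
    auto inserted = known_qos_.emplace(topic, qos);
    if (!inserted.second) {
      return;
    }
    auto waiting = pending_.find(topic);
    if (waiting == pending_.end()) {
      return;
    }
    // Move the callbacks out before invoking them so the registry stays consistent.
    std::vector<Callback> callbacks = std::move(waiting->second);
    pending_.erase(waiting);
    const QosSketch decided = inserted.first->second;
    for (const auto & callback : callbacks) {
      callback(decided);
    }
  }

private:
  std::map<std::string, QosSketch> known_qos_;
  std::map<std::string, std::vector<Callback>> pending_;
};
```

A caller would register the code that creates the bridge's subscription and publisher through `on_qos_ready`, so that it runs either right away or once the first publisher appears.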

#### Liveliness

The liveliness QoS policy can either be set to "automatic" or "manual by topic".
If it is "automatic", the system will use the published messages as a heartbeat to determine whether the publisher's node is alive.
If it is "manual by topic", the node needs to manually call a publisher API to assert it is alive.

This poses a problem for the domain bridge.
If the liveliness of a publisher is "manual by topic", then the bridge cannot mimic the QoS behavior into another domain.
It would require the bridge to know when the original publisher is asserting its liveliness (i.e. calling the publisher API).
Since there is no mechanism in ROS 2 to get this information at the subscription site, the bridge cannot support "manual by topic" liveliness.
Therefore, the bridge will always use "automatic" liveliness, regardless of the original publisher's policy.
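
As a minimal sketch of this rule, assuming a hypothetical `LivelinessPolicy` enum and `map_liveliness` helper (neither is part of the bridge's API), the mapping always yields "automatic" and records whether that differs from the original policy, for example so that a warning could be printed:

```cpp
// Hypothetical model of the two liveliness policies discussed above.
enum class LivelinessPolicy
{
  Automatic,
  ManualByTopic
};

// Result of mapping a publisher's liveliness onto the bridged entities.
struct LivelinessMapping
{
  LivelinessPolicy bridged;
  bool differs_from_original;
};

// The bridged side always uses automatic liveliness, whatever the original was.
LivelinessMapping map_liveliness(LivelinessPolicy original)
{
  return LivelinessMapping{
    LivelinessPolicy::Automatic,
    original != LivelinessPolicy::Automatic};
}
```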

#### Multiple publishers

As mentioned in the requirements, if there are multiple publishers on the same topic, it would be nice if the bridge could preserve multiple streams of data, each with their own QoS settings.
Unfortunately, this is technically challenging because it is difficult to associate a ROS message received by a subscription with the publisher that originally published the message.
Consider the following scenario:

1. Publisher *A* publishes on topic "chatter" with a QoS reliability setting of *reliable*.
2. Publisher *B* publishes on topic "chatter" with a QoS reliability setting of *best effort*.
3. In order to receive a message from *B*, we must create a subscription with a QoS reliability setting of *best effort*.
4. The subscription will also receive messages from *A*, since *best effort* subscriptions also match with *reliable* publishers.

If we cannot distinguish whether a message came from publisher *A* or publisher *B*, then we cannot know what QoS settings to use for the message when publishing into another domain.

Instead, the proposed approach will do its best to ensure all messages make it across the bridge.
The bridge will evaluate the QoS settings of all publishers and modify the QoS settings of the bridge's subscription and publisher
such that they match the majority of the available publishers for a given topic.
For example, given publisher *A* and publisher *B* from the aforementioned scenario, the bridge would select a reliability setting of best effort since it matches with both publishers.

For the *deadline* and *lifespan* policies, the bridge will use the maximum value of the policy across all publishers.
For example, if a publisher *A* has a larger deadline than publisher *B* (and *A* and *B* publish to the same topic), then the bridge will use the deadline value of publisher *A*.
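
One way to read the selection rules above is sketched below. The names `PublisherQos` and `select_bridge_qos` are hypothetical and not part of the bridge's API. The reliability rule used here (fall back to best effort unless every publisher is reliable) is an assumption that agrees with the *A* and *B* example, not a statement of how the bridge is implemented.

```cpp
#include <algorithm>
#include <chrono>
#include <stdexcept>
#include <vector>

// Hypothetical, reduced model of the policies discussed above.
enum class Reliability
{
  BestEffort,
  Reliable
};

struct PublisherQos
{
  Reliability reliability;
  std::chrono::milliseconds deadline;
  std::chrono::milliseconds lifespan;
};

// Pick QoS settings for the bridge from all publishers on one topic.
PublisherQos select_bridge_qos(const std::vector<PublisherQos> & publishers)
{
  if (publishers.empty()) {
    throw std::invalid_argument("at least one publisher is required");
  }
  PublisherQos result = publishers.front();
  for (const auto & publisher : publishers) {
    // Assumption: a single best effort publisher forces best effort,
    // because a best effort subscription matches both kinds of publisher.
    if (publisher.reliability == Reliability::BestEffort) {
      result.reliability = Reliability::BestEffort;
    }
    // Deadline and lifespan take the largest value seen.
    result.deadline = std::max(result.deadline, publisher.deadline);
    result.lifespan = std::max(result.lifespan, publisher.lifespan);
  }
  return result;
}
```

With a reliable publisher whose deadline is 200 ms and a best effort publisher whose deadline is 100 ms, this sketch selects best effort with a 200 ms deadline.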

The bridge will decide on the QoS settings as soon as one or more publishers are available.
This means it is possible that publishers joining after the topic bridge is created may have compatibility issues, and fail to have their messages bridged.
For example, in the following scenario, messages coming from publisher *B* will not be bridged:

1. Publisher *A* for topic "chatter" with a QoS reliability setting of *reliable* is created.
2. A topic bridge is created for topic "chatter".
3. Publisher *B* for topic "chatter" with a QoS reliability setting of *best effort* is created.
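
The failure in this scenario follows from the matching rule for reliability: a subscription that requests reliable delivery does not match a publisher that only offers best effort. A minimal sketch of that check, using the hypothetical names `ReliabilityPolicy` and `reliability_matches`:

```cpp
// Hypothetical model of the reliability policy.
enum class ReliabilityPolicy
{
  BestEffort,
  Reliable
};

// Returns true if a subscription with the requested reliability can receive
// data from a publisher with the offered reliability.
bool reliability_matches(ReliabilityPolicy offered, ReliabilityPolicy requested)
{
  // The only incompatible pairing is a reliable request against a best effort offer.
  return !(requested == ReliabilityPolicy::Reliable &&
         offered == ReliabilityPolicy::BestEffort);
}
```

In the scenario above, the bridge's subscription was created as *reliable* to mirror publisher *A*, so `reliability_matches(ReliabilityPolicy::BestEffort, ReliabilityPolicy::Reliable)` is false for publisher *B*.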

We consider having multiple publishers with different QoS policies on the same topic to be rare in practice, and so do not try to handle the above scenario in the proposed approach.

### Remapping

@@ -181,3 +233,16 @@ Pros:
Cons:
- Requires generating/building SOSS type support for every interface being bridged.
- Slightly less performant due to extra data marshaling.

### Handling multiple publishers

It's possible that the bridge could dynamically adjust to new publishers joining the network for an existing topic bridge.
For example, the bridge could continuously monitor for new publishers and re-create its subscription and publisher to reflect any necessary changes to QoS settings.
However, some messages may be missed while the subscription is being re-created.

Pros:
- Better suited to systems where publishers with different QoS settings are created/destroyed during runtime.

Cons:
- Not obvious how to handle missing messages during subscription re-creation.
- Substantially more complex to implement.
101 changes: 79 additions & 22 deletions src/domain_bridge/domain_bridge.cpp
@@ -20,6 +20,7 @@
#include <memory>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
@@ -28,13 +29,15 @@
#include "domain_bridge/topic_bridge.hpp"
#include "domain_bridge/topic_bridge_options.hpp"

#include "rclcpp/rclcpp.hpp"
#include "rclcpp/executor.hpp"
#include "rcutils/logging_macros.h"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/rclcpp.hpp"
#include "rosbag2_cpp/typesupport_helpers.hpp"
#include "rmw/types.h"

#include "generic_publisher.hpp"
#include "generic_subscription.hpp"
#include "wait_for_qos_handler.hpp"

namespace domain_bridge
{
@@ -47,6 +50,8 @@ class DomainBridgeImpl
using TopicBridgeMap = std::map<
TopicBridge,
std::pair<std::shared_ptr<GenericPublisher>, std::shared_ptr<GenericSubscription>>>;
using TypesupportMap = std::unordered_map<
std::string, std::shared_ptr<rcpputils::SharedLibrary>>;

explicit DomainBridgeImpl(const DomainBridgeOptions & options)
: options_(options)
@@ -90,17 +95,29 @@ class DomainBridgeImpl
return domain_id_node_pair->second;
}

/// Load typesupport library into a cache.
void load_typesupport_library(std::string type)
{
if (loaded_typesupports_.find(type) != loaded_typesupports_.end()) {
// Typesupport library already loaded
return;
}
loaded_typesupports_[type] = rosbag2_cpp::get_typesupport_library(
type, "rosidl_typesupport_cpp");
}

std::shared_ptr<GenericPublisher> create_publisher(
rclcpp::Node::SharedPtr node,
const std::string & topic_name,
const rclcpp::QoS & qos,
const rosidl_message_type_support_t & typesupport_handle,
rclcpp::CallbackGroup::SharedPtr group)
{
auto publisher = std::make_shared<GenericPublisher>(
node->get_node_base_interface().get(),
typesupport_handle,
topic_name,
rclcpp::QoS(10));
qos);
node->get_node_topics_interface()->add_publisher(publisher, std::move(group));
return publisher;
}
@@ -109,6 +126,7 @@ class DomainBridgeImpl
rclcpp::Node::SharedPtr node,
std::shared_ptr<GenericPublisher> publisher,
const std::string & topic_name,
const rclcpp::QoS & qos,
const rosidl_message_type_support_t & typesupport_handle,
rclcpp::CallbackGroup::SharedPtr group)
{
@@ -117,7 +135,7 @@ class DomainBridgeImpl
node->get_node_base_interface().get(),
typesupport_handle,
topic_name,
rclcpp::QoS(10),
qos,
[publisher](std::shared_ptr<rclcpp::SerializedMessage> msg) {
// Publish message into the other domain
auto serialized_data_ptr = std::make_shared<rcl_serialized_message_t>(
@@ -130,13 +148,20 @@ class DomainBridgeImpl

void bridge_topic(
const TopicBridge & topic_bridge,
const TopicBridgeOptions & options)
const TopicBridgeOptions & topic_options)
{
const std::string & topic = topic_bridge.topic_name;
// Validate topic name
const std::string & topic = rclcpp::expand_topic_or_service_name(
topic_bridge.topic_name, options_.name(), "/");

const std::string & type = topic_bridge.type_name;
const std::size_t & from_domain_id = topic_bridge.from_domain_id;
const std::size_t & to_domain_id = topic_bridge.to_domain_id;

// Validate type name by loading library support (if not already loaded)
// Front-loading lets us fail early on invalid type names
load_typesupport_library(type);

// Check if already bridged
if (bridged_topics_.find(topic_bridge) != bridged_topics_.end()) {
std::cerr << "Topic '" << topic << "' with type '" << type << "'" <<
@@ -145,25 +170,51 @@ class DomainBridgeImpl
return;
}

// Create a null entry to avoid duplicate bridges
bridged_topics_[topic_bridge] = {nullptr, nullptr};

rclcpp::Node::SharedPtr from_domain_node = get_node_for_domain(from_domain_id);
rclcpp::Node::SharedPtr to_domain_node = get_node_for_domain(to_domain_id);

// Get typesupport
auto typesupport_library = rosbag2_cpp::get_typesupport_library(
type, "rosidl_typesupport_cpp");
auto typesupport_handle = rosbag2_cpp::get_typesupport_handle(
type, "rosidl_typesupport_cpp", typesupport_library);

// Create publisher for the 'to_domain'
// The publisher should be created first so it is available to the subscription callback
auto publisher = this->create_publisher(
to_domain_node, topic, *typesupport_handle, options.callback_group());

// Create subscription for the 'from_domain'
auto subscription = this->create_subscription(
from_domain_node, publisher, topic, *typesupport_handle, options.callback_group());

bridged_topics_[topic_bridge] = {publisher, subscription};
// Register a callback to be triggered when QoS settings are available for one or more
// publishers on the 'from' side of the bridge
// The callback may be triggered immediately if a publisher is available
auto create_bridge =
[this, topic, topic_bridge, topic_options, from_domain_node, to_domain_node]
(const QosMatchInfo & qos_match)
{
const std::string & type = topic_bridge.type_name;

// Print any match warnings
for (const auto & warning : qos_match.warnings) {
std::cerr << warning << std::endl;
}

// Get typesupport handle
auto typesupport_handle = rosbag2_cpp::get_typesupport_handle(
type, "rosidl_typesupport_cpp", loaded_typesupports_.at(type));

// Create publisher for the 'to_domain'
// The publisher should be created first so it is available to the subscription callback
auto publisher = this->create_publisher(
to_domain_node,
topic, qos_match.qos,
*typesupport_handle,
topic_options.callback_group());

// Create subscription for the 'from_domain'
auto subscription = this->create_subscription(
from_domain_node,
publisher,
topic,
qos_match.qos,
*typesupport_handle,
topic_options.callback_group());

this->bridged_topics_[topic_bridge] = {publisher, subscription};
};
wait_for_qos_handler_.register_on_publisher_qos_ready_callback(
topic, from_domain_node, create_bridge);
}

void add_to_executor(rclcpp::Executor & executor)
@@ -189,6 +240,12 @@ class DomainBridgeImpl

/// Set of bridged topics
TopicBridgeMap bridged_topics_;

/// Cache of typesupport libraries
TypesupportMap loaded_typesupports_;

/// QoS event handler
WaitForQosHandler wait_for_qos_handler_;
}; // class DomainBridgeImpl

DomainBridge::DomainBridge(const DomainBridgeOptions & options)