Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move qos utilities to their own compilation unit #379

Merged
merged 8 commits into from
Apr 17, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/qos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <string>
#include <vector>

#include "rosbag2_transport/logging.hpp"

#include "qos.hpp"

namespace YAML
Expand Down Expand Up @@ -68,3 +73,126 @@ bool convert<rosbag2_transport::Rosbag2QoS>::decode(
return true;
}
} // namespace YAML

namespace rosbag2_transport
{
Rosbag2QoS Rosbag2QoS::adapt_request_to_offers(
const std::string & topic_name, const std::vector<rclcpp::TopicEndpointInfo> & endpoints)
{
static const Rosbag2QoS default_request{};
if (endpoints.empty()) {
return default_request;
}
size_t num_endpoints = endpoints.size();
size_t reliability_reliable_endpoints_count = 0;
size_t durability_transient_local_endpoints_count = 0;
for (const auto & endpoint : endpoints) {
const auto & profile = endpoint.qos_profile().get_rmw_qos_profile();
if (profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
reliability_reliable_endpoints_count++;
}
if (profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
durability_transient_local_endpoints_count++;
}
}

// We set policies in order as defined in rmw_qos_profile_t
Rosbag2QoS request_qos = default_request;
emersonknapp marked this conversation as resolved.
Show resolved Hide resolved
// Policy: history, depth
// History does not affect compatibility

// Policy: reliability
if (reliability_reliable_endpoints_count == num_endpoints) {
request_qos.reliable();
} else {
if (reliability_reliable_endpoints_count > 0) {
ROSBAG2_TRANSPORT_LOG_WARN_STREAM(
"Some, but not all, publishers on topic \"" << topic_name << "\" "
"are offering RMW_QOS_POLICY_RELIABILITY_RELIABLE. "
"Falling back to RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT "
"as it will connect to all publishers. "
"Some messages from Reliable publishers could be dropped.");
}
request_qos.best_effort();
}

// Policy: durability
// If all publishers offer transient_local, we can request it and receive latched messages
if (durability_transient_local_endpoints_count == num_endpoints) {
request_qos.transient_local();
} else {
if (durability_transient_local_endpoints_count > 0) {
ROSBAG2_TRANSPORT_LOG_WARN_STREAM(
"Some, but not all, publishers on topic \"" << topic_name << "\" "
"are offering RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL. "
"Falling back to RMW_QOS_POLICY_DURABILITY_VOLATILE "
"as it will connect to all publishers. "
"Previously-published latched messages will not be retrieved.");
}
request_qos.durability_volatile();
}
// Policy: deadline
// Deadline does not affect delivery of messages,
// and we do not record Deadline"Missed events.
// We can always use unspecified deadline, which will be compatible with all publishers.

// Policy: lifespan
// Lifespan does not affect compatibiliy

// Policy: liveliness, liveliness_lease_duration
// Liveliness does not affect delivery of messages,
// and we do not record LivelinessChanged events.
// We can always use unspecified liveliness, which will be compatible with all publishers.
return request_qos;
}

namespace
{
bool operator==(const rmw_time_t & lhs, const rmw_time_t & rhs)
{
return lhs.sec == rhs.sec && lhs.nsec == rhs.nsec;
}

bool all_profiles_effectively_same(const std::vector<Rosbag2QoS> & profiles)
emersonknapp marked this conversation as resolved.
Show resolved Hide resolved
{
auto iterator = profiles.begin();
const auto p_ref = iterator->get_rmw_qos_profile();
iterator++;
for (; iterator != profiles.end(); iterator++) {
const auto p_next = iterator->get_rmw_qos_profile();
bool compatibility_equals_previous = (
// excluding history
p_ref.reliability == p_next.reliability &&
p_ref.durability == p_next.durability &&
p_ref.deadline == p_next.deadline &&
// excluding lifespan
p_ref.liveliness == p_next.liveliness &&
p_ref.liveliness_lease_duration == p_next.liveliness_lease_duration
);
if (!compatibility_equals_previous) {
return false;
}
}
return true;
}
} // unnamed namespace

Rosbag2QoS Rosbag2QoS::adapt_offer_to_recorded_offers(
const std::string & topic_name, const std::vector<Rosbag2QoS> & profiles)
{
static const Rosbag2QoS default_offer{};
if (profiles.empty()) {
return default_offer;
emersonknapp marked this conversation as resolved.
Show resolved Hide resolved
}
if (all_profiles_effectively_same(profiles)) {
auto result = profiles[0];
emersonknapp marked this conversation as resolved.
Show resolved Hide resolved
return result.default_history();
}

ROSBAG2_TRANSPORT_LOG_WARN_STREAM(
"Not all original publishers on topic " << topic_name << " offered the same QoS profiles. "
"Rosbag2 cannot yet choose an adapted profile to offer for this mixed case. "
"Falling back to the rosbag2_transport default publisher offer.");
return default_offer;
}
} // namespace rosbag2_transport
28 changes: 28 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/qos.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
#ifndef ROSBAG2_TRANSPORT__QOS_HPP_
#define ROSBAG2_TRANSPORT__QOS_HPP_

#include <string>
#include <vector>

#include "rclcpp/node_interfaces/node_graph_interface.hpp"
#include "rclcpp/qos.hpp"

#ifdef _WIN32
Expand Down Expand Up @@ -46,6 +50,30 @@ class Rosbag2QoS : public rclcpp::QoS
keep_last(rmw_qos_profile_default.depth);
return *this;
}

// Create an adaptive QoS profile to use for subscribing to a set of offers from publishers.
/**
* - Uses rosbag2_transport defaults for History since they do not affect compatibility.
* - Adapts Durability and Reliability, falling back to the least strict publisher when there
* is a mixed offer. This behavior errs on the side of forming connections with all publishers.
* - Does not specify Lifespan, Deadline, or Liveliness to be maximally compatible, because
* these policies do not affect message delivery.
*/
static Rosbag2QoS adapt_request_to_offers(
const std::string & topic_name,
const std::vector<rclcpp::TopicEndpointInfo> & endpoints);

// Create a QoS profile to offer for playback.
/**
* This logic exists because rosbag2 does not record on a per-publisher basis, so we try to
* get as close as possible to the original system's behavior, given a single publisher.
* If all profiles are the same (excepting History & Lifespan, which are purely local),
* that exact value is returned.
* Otherwise, fall back to the rosbag2 default and emit a warning.
*/
static Rosbag2QoS adapt_offer_to_recorded_offers(
const std::string & topic_name,
const std::vector<Rosbag2QoS> & profiles);
};
} // namespace rosbag2_transport

Expand Down
72 changes: 2 additions & 70 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,77 +201,9 @@ rclcpp::QoS Recorder::subscription_qos_for_topic(const std::string & topic_name)
if (topic_qos_profile_overrides_.count(topic_name)) {
ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Overriding subscription profile for " << topic_name);
return topic_qos_profile_overrides_.at(topic_name);
} else {
return adapt_qos_to_publishers(topic_name);
}
return adapt_qos_to_publishers(topic_name);
}

rclcpp::QoS Recorder::adapt_qos_to_publishers(const std::string & topic_name) const
{
auto endpoints = node_->get_publishers_info_by_topic(topic_name);
size_t num_endpoints = endpoints.size();
size_t reliability_reliable_endpoints_count = 0;
size_t durability_transient_local_endpoints_count = 0;
for (const auto & endpoint : endpoints) {
const auto & profile = endpoint.qos_profile().get_rmw_qos_profile();
if (profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
reliability_reliable_endpoints_count++;
}
if (profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
durability_transient_local_endpoints_count++;
}
}

// We set policies in order as defined in rmw_qos_profile_t
Rosbag2QoS request_qos;
// Policy: history, depth
// History does not affect compatibility
request_qos.default_history();

// Policy: reliability
if (reliability_reliable_endpoints_count == num_endpoints) {
request_qos.reliable();
} else {
if (reliability_reliable_endpoints_count > 0) {
ROSBAG2_TRANSPORT_LOG_WARN_STREAM(
"Some, but not all, publishers on topic \"" << topic_name << "\" "
"are offering RMW_QOS_POLICY_RELIABILITY_RELIABLE. "
"Falling back to RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT "
"as it will connect to all publishers. "
"Some messages from Reliable publishers could be dropped.");
}
request_qos.best_effort();
}

// Policy: durability
// If all publishers offer transient_local, we can request it and receive latched messages
if (durability_transient_local_endpoints_count == num_endpoints) {
request_qos.transient_local();
} else {
if (durability_transient_local_endpoints_count > 0) {
ROSBAG2_TRANSPORT_LOG_WARN_STREAM(
"Some, but not all, publishers on topic \"" << topic_name << "\" "
"are offering RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL. "
"Falling back to RMW_QOS_POLICY_DURABILITY_VOLATILE "
"as it will connect to all publishers. "
"Previously-published latched messages will not be retrieved.");
}
request_qos.durability_volatile();
}
// Policy: deadline
// Deadline does not affect delivery of messages,
// and we do not record Deadline"Missed events.
// We can always use unspecified deadline, which will be compatible with all publishers.

// Policy: lifespan
// Lifespan does not affect compatibiliy

// Policy: liveliness, liveliness_lease_duration
// Liveliness does not affect delivery of messages,
// and we do not record LivelinessChanged events.
// We can always use unspecified liveliness, which will be compatible with all publishers.
return request_qos;
return Rosbag2QoS::adapt_request_to_offers(
topic_name, node_->get_publishers_info_by_topic(topic_name));
}

void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_name)
Expand Down
12 changes: 2 additions & 10 deletions rosbag2_transport/src/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,13 @@ class Recorder
/**
* Find the QoS profile that should be used for subscribing.
*
* Profiles are prioritized by:
* 1. The override specified in the record_options, if one exists for the topic.
* 2. Adapted subscription via `adapt_qos_to_publishers`
* Uses the override from record_options, if it is specified for this topic.
* Otherwise, falls back to Rosbag2QoS::adapt_request_to_offers
*
* \param topic_name The full name of the topic, with namespace (ex. /arm/joint_status).
* \return The QoS profile to be used for subscribing.
*/
rclcpp::QoS subscription_qos_for_topic(const std::string & topic_name) const;
/**
* Try to subscribe using publishers' offered QoS policies.
*
* Fall back to sensible defaults when we can't adapt robustly,
* erring in favor of creating compatible connections.
*/
rclcpp::QoS adapt_qos_to_publishers(const std::string & topic_name) const;

// Serialize all currently offered QoS profiles for a topic into a YAML list.
std::string serialized_offered_qos_profiles_for_topic(const std::string & topic_name);
Expand Down
Loading