Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adaptive playback qos based on recorded metadata #364

Merged
merged 7 commits into from
Apr 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,12 @@ function(create_tests_for_rmw_implementation)
${SKIP_TEST})

rosbag2_transport_add_gmock(test_play
src/rosbag2_transport/qos.cpp
test/rosbag2_transport/test_play.cpp
${SKIP_TEST}
INCLUDE_DIRS $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src/rosbag2_transport>
LINK_LIBS rosbag2_transport
AMENT_DEPS test_msgs rosbag2_test_common)
AMENT_DEPS test_msgs rosbag2_test_common
${SKIP_TEST})

rosbag2_transport_add_gmock(test_play_loop
test/rosbag2_transport/test_play_loop.cpp
Expand Down
23 changes: 15 additions & 8 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "rcl/graph.h"

Expand All @@ -44,22 +45,28 @@ namespace
* Determine which QoS to offer for a topic.
* The priority of the profile selected is:
* 1. The override specified in play_options (if one exists for the topic).
* 2. The default RMW QoS profile.
* 2. A profile automatically adapted to the recorded QoS profiles of publishers on the topic.
*
* \param topic_name The full name of the topic, with namespace (ex. /arm/joint_status).
* \param topic_qos_profile_overrides A map of topic to QoS profile overrides.
* @return The QoS profile to be used for subscribing.
*/
rclcpp::QoS publisher_qos_for_topic(
const std::string & topic_name,
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides)
const rosbag2_storage::TopicMetadata & topic,
const std::unordered_map<std::string, rclcpp::QoS> & topic_qos_profile_overrides)
{
auto qos_it = topic_qos_profile_overrides.find(topic_name);
using rosbag2_transport::Rosbag2QoS;
auto qos_it = topic_qos_profile_overrides.find(topic.name);
if (qos_it != topic_qos_profile_overrides.end()) {
ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Overriding QoS profile for topic " << topic_name);
return rosbag2_transport::Rosbag2QoS{qos_it->second};
ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Overriding QoS profile for topic " << topic.name);
return Rosbag2QoS{qos_it->second};
} else if (topic.offered_qos_profiles.empty()) {
return Rosbag2QoS{};
}
return rosbag2_transport::Rosbag2QoS{};

const auto profiles_yaml = YAML::Load(topic.offered_qos_profiles);
const auto offered_qos_profiles = profiles_yaml.as<std::vector<Rosbag2QoS>>();
return Rosbag2QoS::adapt_offer_to_recorded_offers(topic.name, offered_qos_profiles);
}
} // namespace

Expand Down Expand Up @@ -189,7 +196,7 @@ void Player::prepare_publishers(const PlayOptions & options)

auto topics = reader_->get_all_topics_and_types();
for (const auto & topic : topics) {
auto topic_qos = publisher_qos_for_topic(topic.name, topic_qos_profile_overrides_);
auto topic_qos = publisher_qos_for_topic(topic, topic_qos_profile_overrides_);
publishers_.insert(
std::make_pair(
topic.name, rosbag2_transport_->create_generic_publisher(
Expand Down
157 changes: 121 additions & 36 deletions rosbag2_transport/test/rosbag2_transport/test_play.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
#include "test_msgs/msg/basic_types.hpp"
#include "test_msgs/message_fixtures.hpp"

#include "qos.hpp"

#include "rosbag2_play_test_fixture.hpp"
#include "rosbag2_transport_test_fixture.hpp"

using namespace ::testing; // NOLINT
using namespace rosbag2_transport; // NOLINT
Expand Down Expand Up @@ -205,35 +208,60 @@ class RosBag2PlayQosOverrideTestFixture : public RosBag2PlayTestFixture
RosBag2PlayQosOverrideTestFixture()
: RosBag2PlayTestFixture()
{
// Because this test only cares about compatibility (receiving any messages at all)
}

void initialize(const std::vector<rosbag2_transport::Rosbag2QoS> & offered_qos)
{
// Because these tests only cares about compatibility (receiving any messages at all)
// We publish one more message than we expect to receive, to avoid caring about
// shutdown edge behaviors that are not explicitly being tested here.
const size_t num_msgs_to_publish = num_msgs_to_wait_for_ + 1;
topic_timestamps_ms_.reserve(num_msgs_to_publish);
messages_.reserve(num_msgs_to_publish);
for (size_t i = 0; i < num_msgs_to_publish; i++) {
topic_timestamps_ms_.push_back(start_time_ms_ + message_spacing_ms_ * i);
const auto timestamp = start_time_ms_ + message_spacing_ms_ * i;
messages_.push_back(serialize_test_message(topic_name_, timestamp, basic_msg_));
}

messages_.reserve(topic_timestamps_ms_.size());
for (const auto topic_timestamp : topic_timestamps_ms_) {
messages_.push_back(serialize_test_message(topic_name_, topic_timestamp, basic_msg_));
std::string serialized_offered_qos = "";
if (!offered_qos.empty()) {
YAML::Node offered_qos_yaml;
for (const auto & profile : offered_qos) {
offered_qos_yaml.push_back(profile);
}
serialized_offered_qos = YAML::Dump(offered_qos_yaml);
}
topic_types_.push_back(
{topic_name_, msg_type_, "" /*serialization_format*/, serialized_offered_qos});

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages_, topic_types_);
reader_ = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));
}

template<typename Duration>
void play_and_wait(Duration timeout, bool expect_timeout = false)
{
auto await_received_messages = sub_->spin_subscriptions();
Rosbag2Transport transport{reader_, writer_, info_};
transport.play(storage_options_, play_options_);
const auto result = await_received_messages.wait_for(timeout);
// Must EXPECT, can't ASSERT because transport needs to be shutdown if timed out
if (expect_timeout) {
EXPECT_EQ(result, std::future_status::timeout);
} else {
EXPECT_NE(result, std::future_status::timeout);
}
transport.shutdown();
}

const std::string topic_name_{"/test_topic"};
const std::string msg_type_{"test_msgs/BasicTypes"};
const size_t num_msgs_to_wait_for_{3};
// Receiving _any_ messages means we've confirmed compatibility in these tests
const size_t num_msgs_to_wait_for_{1};
test_msgs::msg::BasicTypes::SharedPtr basic_msg_{get_messages_basic_types()[0]};
const std::vector<rosbag2_storage::TopicMetadata> topic_types_{
{topic_name_, msg_type_, "" /*serialization_format*/, "" /*offered_qos_profiles*/}
};
std::vector<rosbag2_storage::TopicMetadata> topic_types_{};
const int64_t start_time_ms_{500};
const int64_t message_spacing_ms_{200};
std::vector<int64_t> topic_timestamps_ms_{};
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages_;
};

Expand All @@ -246,27 +274,20 @@ TEST_F(RosBag2PlayQosOverrideTestFixture, topic_qos_profiles_overridden)
// to receive messages.
const auto qos_request = rclcpp::QoS{rclcpp::KeepAll()}.reliable().transient_local();
const auto qos_playback_override = rclcpp::QoS{rclcpp::KeepAll()}.reliable().transient_local();

const auto topic_qos_profile_overrides = std::unordered_map<std::string, rclcpp::QoS>{
std::pair<std::string, rclcpp::QoS>{topic_name_, qos_playback_override},
};
// This should normally take less than 1s - just making it shorter than 60s default
const auto timeout = 5s;

initialize({});

sub_->add_subscription<test_msgs::msg::BasicTypes>(
topic_name_, num_msgs_to_wait_for_, qos_request);
auto await_received_messages = sub_->spin_subscriptions();
play_options_.topic_qos_profile_overrides = topic_qos_profile_overrides;

Rosbag2Transport rosbag2_transport{reader_, writer_, info_};
rosbag2_transport.play(storage_options_, play_options_);

// This should normally take less than 1s - just making it shorter than 60s default
const auto future_result = await_received_messages.wait_for(5s);
EXPECT_NE(future_result, std::future_status::timeout);
rosbag2_transport.shutdown();

const auto received_messages =
sub_->get_received_messages<test_msgs::msg::BasicTypes>(topic_name_);
EXPECT_FALSE(received_messages.empty());
// Fails if times out
play_and_wait(timeout);
}

TEST_F(RosBag2PlayQosOverrideTestFixture, topic_qos_profiles_overridden_incompatible)
Expand All @@ -277,25 +298,89 @@ TEST_F(RosBag2PlayQosOverrideTestFixture, topic_qos_profiles_overridden_incompat
// which should not be a compatible offer and therefore we should receive no messages.
const auto qos_request = rclcpp::QoS{rclcpp::KeepAll()}.reliable();
const auto qos_playback_override = rclcpp::QoS{rclcpp::KeepAll()}.best_effort();

const auto topic_qos_profile_overrides = std::unordered_map<std::string, rclcpp::QoS>{
std::pair<std::string, rclcpp::QoS>{topic_name_, qos_playback_override},
};
play_options_.topic_qos_profile_overrides = topic_qos_profile_overrides;
// If any messages were going to come in, it should happen in under 1s even in slow scenarios
const auto timeout = 3s;
emersonknapp marked this conversation as resolved.
Show resolved Hide resolved

initialize({});

sub_->add_subscription<test_msgs::msg::BasicTypes>(
topic_name_, num_msgs_to_wait_for_, qos_request);
auto await_received_messages = sub_->spin_subscriptions();
play_options_.topic_qos_profile_overrides = topic_qos_profile_overrides;

Rosbag2Transport rosbag2_transport{reader_, writer_, info_};
rosbag2_transport.play(storage_options_, play_options_);
// Fails if it doesn't time out
play_and_wait(timeout, true /* expect timeout */);
}

TEST_F(RosBag2PlayQosOverrideTestFixture, playback_uses_recorded_transient_local_profile)
{
// In this test, we subscribe requesting DURABILITY_TRANSIENT_LOCAL.
// The bag metadata has this recorded for the original Publisher,
// so playback's offer should be compatible (whereas the default offer would not be)
const auto transient_local_profile = Rosbag2QoS{Rosbag2QoS{}.transient_local()};
// This should normally take less than 1s - just making it shorter than 60s default
const auto timeout = 5s;

initialize({transient_local_profile});

sub_->add_subscription<test_msgs::msg::BasicTypes>(
topic_name_, num_msgs_to_wait_for_, transient_local_profile);

// Fails if times out
play_and_wait(timeout);
}

TEST_F(RosBag2PlayQosOverrideTestFixture, playback_uses_recorded_deadline)
{
// By default, QoS profiles use "unspecified/infinite" offers for duration-based policies
// The subscription in this test requests a finite Deadline, and by receiving messages
// we know that the playback has used the recorded finite Deadline duration.

// The publisher is offering 2Hz. The subscription requests 1Hz, which is less strict of a
// requirement, so they are compatible.
const rclcpp::Duration request_deadline{1s};
const rclcpp::Duration offer_deadline{500ms};
const auto request_profile = Rosbag2QoS{}.deadline(request_deadline);
const auto offer_profile = Rosbag2QoS{Rosbag2QoS{}.deadline(offer_deadline)};
const auto timeout = 5s;

initialize({offer_profile});
sub_->add_subscription<test_msgs::msg::BasicTypes>(
topic_name_, num_msgs_to_wait_for_, request_profile);
play_and_wait(timeout);
}

using namespace std::chrono_literals;
const auto future_result = await_received_messages.wait_for(3s);
EXPECT_EQ(future_result, std::future_status::timeout);
TEST_F(RosBag2PlayQosOverrideTestFixture, override_has_precedence_over_recorded)
{
// In this test, we show that the playback prefers the user override to the recorded values.
// The subscription requests a Liveliness lease_duration that is shorter than the original
// recorded publisher offered, so no messages should be passed.
// However, the override to the publisher offers a shorter duration than the request, so
// it should now be compatible.
const rclcpp::Duration liveliness_request{500ms};
const rclcpp::Duration recorded_liveliness_offer{1000ms};
const rclcpp::Duration override_liveliness_offer{250ms};
ASSERT_LT(liveliness_request, recorded_liveliness_offer);
ASSERT_LT(override_liveliness_offer, liveliness_request);
const auto request_profile = Rosbag2QoS{}.liveliness_lease_duration(liveliness_request);
const auto recorded_offer_profile = Rosbag2QoS{Rosbag2QoS{}.liveliness_lease_duration(
recorded_liveliness_offer)};
const auto override_offer_profile = Rosbag2QoS{Rosbag2QoS{}.liveliness_lease_duration(
override_liveliness_offer)};
const auto topic_qos_profile_overrides = std::unordered_map<std::string, rclcpp::QoS>{
std::pair<std::string, rclcpp::QoS>{topic_name_, override_offer_profile},
};
// This should normally take less than 1s - just making it shorter than 60s default
const auto timeout = 5s;

initialize({recorded_offer_profile});

sub_->add_subscription<test_msgs::msg::BasicTypes>(
topic_name_, num_msgs_to_wait_for_, request_profile);
play_options_.topic_qos_profile_overrides = topic_qos_profile_overrides;

rosbag2_transport.shutdown();
const auto received_messages =
sub_->get_received_messages<test_msgs::msg::BasicTypes>(topic_name_);
EXPECT_EQ(received_messages.size(), 0u);
// Fails if times out
play_and_wait(timeout);
}