Skip to content

Commit

Permalink
Adaptive playback qos based on recorded metadata (#364)
Browse files Browse the repository at this point in the history
* Adapt player publisher profile to recorded

Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>
  • Loading branch information
emersonknapp authored Apr 21, 2020
1 parent 12c0cc6 commit d82dd9f
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 46 deletions.
6 changes: 4 additions & 2 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,12 @@ function(create_tests_for_rmw_implementation)
${SKIP_TEST})

rosbag2_transport_add_gmock(test_play
src/rosbag2_transport/qos.cpp
test/rosbag2_transport/test_play.cpp
${SKIP_TEST}
INCLUDE_DIRS $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src/rosbag2_transport>
LINK_LIBS rosbag2_transport
AMENT_DEPS test_msgs rosbag2_test_common)
AMENT_DEPS test_msgs rosbag2_test_common
${SKIP_TEST})

rosbag2_transport_add_gmock(test_play_loop
test/rosbag2_transport/test_play_loop.cpp
Expand Down
23 changes: 15 additions & 8 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "rcl/graph.h"

Expand All @@ -44,22 +45,28 @@ namespace
* Determine which QoS to offer for a topic.
* The priority of the profile selected is:
* 1. The override specified in play_options (if one exists for the topic).
* 2. The default RMW QoS profile.
* 2. A profile automatically adapted to the recorded QoS profiles of publishers on the topic.
*
* \param topic_name The full name of the topic, with namespace (ex. /arm/joint_status).
* \param topic_qos_profile_overrides A map of topic to QoS profile overrides.
* @return The QoS profile to be used for subscribing.
*/
rclcpp::QoS publisher_qos_for_topic(
const std::string & topic_name,
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides)
const rosbag2_storage::TopicMetadata & topic,
const std::unordered_map<std::string, rclcpp::QoS> & topic_qos_profile_overrides)
{
auto qos_it = topic_qos_profile_overrides.find(topic_name);
using rosbag2_transport::Rosbag2QoS;
auto qos_it = topic_qos_profile_overrides.find(topic.name);
if (qos_it != topic_qos_profile_overrides.end()) {
ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Overriding QoS profile for topic " << topic_name);
return rosbag2_transport::Rosbag2QoS{qos_it->second};
ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Overriding QoS profile for topic " << topic.name);
return Rosbag2QoS{qos_it->second};
} else if (topic.offered_qos_profiles.empty()) {
return Rosbag2QoS{};
}
return rosbag2_transport::Rosbag2QoS{};

const auto profiles_yaml = YAML::Load(topic.offered_qos_profiles);
const auto offered_qos_profiles = profiles_yaml.as<std::vector<Rosbag2QoS>>();
return Rosbag2QoS::adapt_offer_to_recorded_offers(topic.name, offered_qos_profiles);
}
} // namespace

Expand Down Expand Up @@ -189,7 +196,7 @@ void Player::prepare_publishers(const PlayOptions & options)

auto topics = reader_->get_all_topics_and_types();
for (const auto & topic : topics) {
auto topic_qos = publisher_qos_for_topic(topic.name, topic_qos_profile_overrides_);
auto topic_qos = publisher_qos_for_topic(topic, topic_qos_profile_overrides_);
publishers_.insert(
std::make_pair(
topic.name, rosbag2_transport_->create_generic_publisher(
Expand Down
157 changes: 121 additions & 36 deletions rosbag2_transport/test/rosbag2_transport/test_play.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
#include "test_msgs/msg/basic_types.hpp"
#include "test_msgs/message_fixtures.hpp"

#include "qos.hpp"

#include "rosbag2_play_test_fixture.hpp"
#include "rosbag2_transport_test_fixture.hpp"

using namespace ::testing; // NOLINT
using namespace rosbag2_transport; // NOLINT
Expand Down Expand Up @@ -205,35 +208,60 @@ class RosBag2PlayQosOverrideTestFixture : public RosBag2PlayTestFixture
RosBag2PlayQosOverrideTestFixture()
: RosBag2PlayTestFixture()
{
// Because this test only cares about compatibility (receiving any messages at all)
}

void initialize(const std::vector<rosbag2_transport::Rosbag2QoS> & offered_qos)
{
// Because these tests only cares about compatibility (receiving any messages at all)
// We publish one more message than we expect to receive, to avoid caring about
// shutdown edge behaviors that are not explicitly being tested here.
const size_t num_msgs_to_publish = num_msgs_to_wait_for_ + 1;
topic_timestamps_ms_.reserve(num_msgs_to_publish);
messages_.reserve(num_msgs_to_publish);
for (size_t i = 0; i < num_msgs_to_publish; i++) {
topic_timestamps_ms_.push_back(start_time_ms_ + message_spacing_ms_ * i);
const auto timestamp = start_time_ms_ + message_spacing_ms_ * i;
messages_.push_back(serialize_test_message(topic_name_, timestamp, basic_msg_));
}

messages_.reserve(topic_timestamps_ms_.size());
for (const auto topic_timestamp : topic_timestamps_ms_) {
messages_.push_back(serialize_test_message(topic_name_, topic_timestamp, basic_msg_));
std::string serialized_offered_qos = "";
if (!offered_qos.empty()) {
YAML::Node offered_qos_yaml;
for (const auto & profile : offered_qos) {
offered_qos_yaml.push_back(profile);
}
serialized_offered_qos = YAML::Dump(offered_qos_yaml);
}
topic_types_.push_back(
{topic_name_, msg_type_, "" /*serialization_format*/, serialized_offered_qos});

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages_, topic_types_);
reader_ = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));
}

template<typename Duration>
void play_and_wait(Duration timeout, bool expect_timeout = false)
{
auto await_received_messages = sub_->spin_subscriptions();
Rosbag2Transport transport{reader_, writer_, info_};
transport.play(storage_options_, play_options_);
const auto result = await_received_messages.wait_for(timeout);
// Must EXPECT, can't ASSERT because transport needs to be shutdown if timed out
if (expect_timeout) {
EXPECT_EQ(result, std::future_status::timeout);
} else {
EXPECT_NE(result, std::future_status::timeout);
}
transport.shutdown();
}

const std::string topic_name_{"/test_topic"};
const std::string msg_type_{"test_msgs/BasicTypes"};
const size_t num_msgs_to_wait_for_{3};
// Receiving _any_ messages means we've confirmed compatibility in these tests
const size_t num_msgs_to_wait_for_{1};
test_msgs::msg::BasicTypes::SharedPtr basic_msg_{get_messages_basic_types()[0]};
const std::vector<rosbag2_storage::TopicMetadata> topic_types_{
{topic_name_, msg_type_, "" /*serialization_format*/, "" /*offered_qos_profiles*/}
};
std::vector<rosbag2_storage::TopicMetadata> topic_types_{};
const int64_t start_time_ms_{500};
const int64_t message_spacing_ms_{200};
std::vector<int64_t> topic_timestamps_ms_{};
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages_;
};

Expand All @@ -246,27 +274,20 @@ TEST_F(RosBag2PlayQosOverrideTestFixture, topic_qos_profiles_overridden)
// to receive messages.
const auto qos_request = rclcpp::QoS{rclcpp::KeepAll()}.reliable().transient_local();
const auto qos_playback_override = rclcpp::QoS{rclcpp::KeepAll()}.reliable().transient_local();

const auto topic_qos_profile_overrides = std::unordered_map<std::string, rclcpp::QoS>{
std::pair<std::string, rclcpp::QoS>{topic_name_, qos_playback_override},
};
// This should normally take less than 1s - just making it shorter than 60s default
const auto timeout = 5s;

initialize({});

sub_->add_subscription<test_msgs::msg::BasicTypes>(
topic_name_, num_msgs_to_wait_for_, qos_request);
auto await_received_messages = sub_->spin_subscriptions();
play_options_.topic_qos_profile_overrides = topic_qos_profile_overrides;

Rosbag2Transport rosbag2_transport{reader_, writer_, info_};
rosbag2_transport.play(storage_options_, play_options_);

// This should normally take less than 1s - just making it shorter than 60s default
const auto future_result = await_received_messages.wait_for(5s);
EXPECT_NE(future_result, std::future_status::timeout);
rosbag2_transport.shutdown();

const auto received_messages =
sub_->get_received_messages<test_msgs::msg::BasicTypes>(topic_name_);
EXPECT_FALSE(received_messages.empty());
// Fails if times out
play_and_wait(timeout);
}

TEST_F(RosBag2PlayQosOverrideTestFixture, topic_qos_profiles_overridden_incompatible)
Expand All @@ -277,25 +298,89 @@ TEST_F(RosBag2PlayQosOverrideTestFixture, topic_qos_profiles_overridden_incompat
// which should not be a compatible offer and therefore we should receive no messages.
const auto qos_request = rclcpp::QoS{rclcpp::KeepAll()}.reliable();
const auto qos_playback_override = rclcpp::QoS{rclcpp::KeepAll()}.best_effort();

const auto topic_qos_profile_overrides = std::unordered_map<std::string, rclcpp::QoS>{
std::pair<std::string, rclcpp::QoS>{topic_name_, qos_playback_override},
};
play_options_.topic_qos_profile_overrides = topic_qos_profile_overrides;
// If any messages were going to come in, it should happen in under 1s even in slow scenarios
const auto timeout = 3s;

initialize({});

sub_->add_subscription<test_msgs::msg::BasicTypes>(
topic_name_, num_msgs_to_wait_for_, qos_request);
auto await_received_messages = sub_->spin_subscriptions();
play_options_.topic_qos_profile_overrides = topic_qos_profile_overrides;

Rosbag2Transport rosbag2_transport{reader_, writer_, info_};
rosbag2_transport.play(storage_options_, play_options_);
// Fails if it doesn't time out
play_and_wait(timeout, true /* expect timeout */);
}

TEST_F(RosBag2PlayQosOverrideTestFixture, playback_uses_recorded_transient_local_profile)
{
// In this test, we subscribe requesting DURABILITY_TRANSIENT_LOCAL.
// The bag metadata has this recorded for the original Publisher,
// so playback's offer should be compatible (whereas the default offer would not be)
const auto transient_local_profile = Rosbag2QoS{Rosbag2QoS{}.transient_local()};
// This should normally take less than 1s - just making it shorter than 60s default
const auto timeout = 5s;

initialize({transient_local_profile});

sub_->add_subscription<test_msgs::msg::BasicTypes>(
topic_name_, num_msgs_to_wait_for_, transient_local_profile);

// Fails if times out
play_and_wait(timeout);
}

TEST_F(RosBag2PlayQosOverrideTestFixture, playback_uses_recorded_deadline)
{
// By default, QoS profiles use "unspecified/infinite" offers for duration-based policies
// The subscription in this test requests a finite Deadline, and by receiving messages
// we know that the playback has used the recorded finite Deadline duration.

// The publisher is offering 2Hz. The subscription requests 1Hz, which is less strict of a
// requirement, so they are compatible.
const rclcpp::Duration request_deadline{1s};
const rclcpp::Duration offer_deadline{500ms};
const auto request_profile = Rosbag2QoS{}.deadline(request_deadline);
const auto offer_profile = Rosbag2QoS{Rosbag2QoS{}.deadline(offer_deadline)};
const auto timeout = 5s;

initialize({offer_profile});
sub_->add_subscription<test_msgs::msg::BasicTypes>(
topic_name_, num_msgs_to_wait_for_, request_profile);
play_and_wait(timeout);
}

using namespace std::chrono_literals;
const auto future_result = await_received_messages.wait_for(3s);
EXPECT_EQ(future_result, std::future_status::timeout);
TEST_F(RosBag2PlayQosOverrideTestFixture, override_has_precedence_over_recorded)
{
// In this test, we show that the playback prefers the user override to the recorded values.
// The subscription requests a Liveliness lease_duration that is shorter than the original
// recorded publisher offered, so no messages should be passed.
// However, the override to the publisher offers a shorter duration than the request, so
// it should now be compatible.
const rclcpp::Duration liveliness_request{500ms};
const rclcpp::Duration recorded_liveliness_offer{1000ms};
const rclcpp::Duration override_liveliness_offer{250ms};
ASSERT_LT(liveliness_request, recorded_liveliness_offer);
ASSERT_LT(override_liveliness_offer, liveliness_request);
const auto request_profile = Rosbag2QoS{}.liveliness_lease_duration(liveliness_request);
const auto recorded_offer_profile = Rosbag2QoS{Rosbag2QoS{}.liveliness_lease_duration(
recorded_liveliness_offer)};
const auto override_offer_profile = Rosbag2QoS{Rosbag2QoS{}.liveliness_lease_duration(
override_liveliness_offer)};
const auto topic_qos_profile_overrides = std::unordered_map<std::string, rclcpp::QoS>{
std::pair<std::string, rclcpp::QoS>{topic_name_, override_offer_profile},
};
// This should normally take less than 1s - just making it shorter than 60s default
const auto timeout = 5s;

initialize({recorded_offer_profile});

sub_->add_subscription<test_msgs::msg::BasicTypes>(
topic_name_, num_msgs_to_wait_for_, request_profile);
play_options_.topic_qos_profile_overrides = topic_qos_profile_overrides;

rosbag2_transport.shutdown();
const auto received_messages =
sub_->get_received_messages<test_msgs::msg::BasicTypes>(topic_name_);
EXPECT_EQ(received_messages.size(), 0u);
// Fails if times out
play_and_wait(timeout);
}

0 comments on commit d82dd9f

Please sign in to comment.