diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index 762ed7ebbb..ebf9db7cdf 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -156,10 +156,12 @@ function(create_tests_for_rmw_implementation) ${SKIP_TEST}) rosbag2_transport_add_gmock(test_play + src/rosbag2_transport/qos.cpp test/rosbag2_transport/test_play.cpp - ${SKIP_TEST} + INCLUDE_DIRS $ LINK_LIBS rosbag2_transport - AMENT_DEPS test_msgs rosbag2_test_common) + AMENT_DEPS test_msgs rosbag2_test_common + ${SKIP_TEST}) rosbag2_transport_add_gmock(test_play_loop test/rosbag2_transport/test_play_loop.cpp diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index c6e1a4be87..4309e46053 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "rcl/graph.h" @@ -44,22 +45,28 @@ namespace * Determine which QoS to offer for a topic. * The priority of the profile selected is: * 1. The override specified in play_options (if one exists for the topic). - * 2. The default RMW QoS profile. + * 2. A profile automatically adapted to the recorded QoS profiles of publishers on the topic. * * \param topic_name The full name of the topic, with namespace (ex. /arm/joint_status). * \param topic_qos_profile_overrides A map of topic to QoS profile overrides. * @return The QoS profile to be used for subscribing. */ rclcpp::QoS publisher_qos_for_topic( - const std::string & topic_name, - std::unordered_map topic_qos_profile_overrides) + const rosbag2_storage::TopicMetadata & topic, + const std::unordered_map & topic_qos_profile_overrides) { - auto qos_it = topic_qos_profile_overrides.find(topic_name); + using rosbag2_transport::Rosbag2QoS; + auto qos_it = topic_qos_profile_overrides.find(topic.name); if (qos_it != topic_qos_profile_overrides.end()) { - ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Overriding QoS profile for topic " << topic_name); - return rosbag2_transport::Rosbag2QoS{qos_it->second}; + ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Overriding QoS profile for topic " << topic.name); + return Rosbag2QoS{qos_it->second}; + } else if (topic.offered_qos_profiles.empty()) { + return Rosbag2QoS{}; } - return rosbag2_transport::Rosbag2QoS{}; + + const auto profiles_yaml = YAML::Load(topic.offered_qos_profiles); + const auto offered_qos_profiles = profiles_yaml.as>(); + return Rosbag2QoS::adapt_offer_to_recorded_offers(topic.name, offered_qos_profiles); } } // namespace @@ -189,7 +196,7 @@ void Player::prepare_publishers(const PlayOptions & options) auto topics = reader_->get_all_topics_and_types(); for (const auto & topic : topics) { - auto topic_qos = publisher_qos_for_topic(topic.name, topic_qos_profile_overrides_); + auto topic_qos = publisher_qos_for_topic(topic, topic_qos_profile_overrides_); publishers_.insert( std::make_pair( topic.name, rosbag2_transport_->create_generic_publisher( diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index a2c6599488..58e47a4e36 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -32,7 +32,10 @@ #include "test_msgs/msg/basic_types.hpp" #include "test_msgs/message_fixtures.hpp" +#include "qos.hpp" + #include "rosbag2_play_test_fixture.hpp" +#include "rosbag2_transport_test_fixture.hpp" using namespace ::testing; // NOLINT using namespace rosbag2_transport; // NOLINT @@ -205,35 +208,60 @@ class RosBag2PlayQosOverrideTestFixture : public RosBag2PlayTestFixture RosBag2PlayQosOverrideTestFixture() : RosBag2PlayTestFixture() { - // Because this test only cares about compatibility (receiving any messages at all) + } + + void initialize(const std::vector & offered_qos) + { + // Because these tests only cares about compatibility (receiving any messages at all) // We publish one more message than we expect to receive, to avoid caring about // shutdown edge behaviors that are not explicitly being tested here. const size_t num_msgs_to_publish = num_msgs_to_wait_for_ + 1; - topic_timestamps_ms_.reserve(num_msgs_to_publish); + messages_.reserve(num_msgs_to_publish); for (size_t i = 0; i < num_msgs_to_publish; i++) { - topic_timestamps_ms_.push_back(start_time_ms_ + message_spacing_ms_ * i); + const auto timestamp = start_time_ms_ + message_spacing_ms_ * i; + messages_.push_back(serialize_test_message(topic_name_, timestamp, basic_msg_)); } - messages_.reserve(topic_timestamps_ms_.size()); - for (const auto topic_timestamp : topic_timestamps_ms_) { - messages_.push_back(serialize_test_message(topic_name_, topic_timestamp, basic_msg_)); + std::string serialized_offered_qos = ""; + if (!offered_qos.empty()) { + YAML::Node offered_qos_yaml; + for (const auto & profile : offered_qos) { + offered_qos_yaml.push_back(profile); + } + serialized_offered_qos = YAML::Dump(offered_qos_yaml); } + topic_types_.push_back( + {topic_name_, msg_type_, "" /*serialization_format*/, serialized_offered_qos}); auto prepared_mock_reader = std::make_unique(); prepared_mock_reader->prepare(messages_, topic_types_); reader_ = std::make_unique(std::move(prepared_mock_reader)); } + template + void play_and_wait(Duration timeout, bool expect_timeout = false) + { + auto await_received_messages = sub_->spin_subscriptions(); + Rosbag2Transport transport{reader_, writer_, info_}; + transport.play(storage_options_, play_options_); + const auto result = await_received_messages.wait_for(timeout); + // Must EXPECT, can't ASSERT because transport needs to be shutdown if timed out + if (expect_timeout) { + EXPECT_EQ(result, std::future_status::timeout); + } else { + EXPECT_NE(result, std::future_status::timeout); + } + transport.shutdown(); + } + const std::string topic_name_{"/test_topic"}; const std::string msg_type_{"test_msgs/BasicTypes"}; - const size_t num_msgs_to_wait_for_{3}; + // Receiving _any_ messages means we've confirmed compatibility in these tests + const size_t num_msgs_to_wait_for_{1}; test_msgs::msg::BasicTypes::SharedPtr basic_msg_{get_messages_basic_types()[0]}; - const std::vector topic_types_{ - {topic_name_, msg_type_, "" /*serialization_format*/, "" /*offered_qos_profiles*/} - }; + std::vector topic_types_{}; const int64_t start_time_ms_{500}; const int64_t message_spacing_ms_{200}; - std::vector topic_timestamps_ms_{}; std::vector> messages_; }; @@ -246,27 +274,20 @@ TEST_F(RosBag2PlayQosOverrideTestFixture, topic_qos_profiles_overridden) // to receive messages. const auto qos_request = rclcpp::QoS{rclcpp::KeepAll()}.reliable().transient_local(); const auto qos_playback_override = rclcpp::QoS{rclcpp::KeepAll()}.reliable().transient_local(); - const auto topic_qos_profile_overrides = std::unordered_map{ std::pair{topic_name_, qos_playback_override}, }; + // This should normally take less than 1s - just making it shorter than 60s default + const auto timeout = 5s; + + initialize({}); sub_->add_subscription( topic_name_, num_msgs_to_wait_for_, qos_request); - auto await_received_messages = sub_->spin_subscriptions(); play_options_.topic_qos_profile_overrides = topic_qos_profile_overrides; - Rosbag2Transport rosbag2_transport{reader_, writer_, info_}; - rosbag2_transport.play(storage_options_, play_options_); - - // This should normally take less than 1s - just making it shorter than 60s default - const auto future_result = await_received_messages.wait_for(5s); - EXPECT_NE(future_result, std::future_status::timeout); - rosbag2_transport.shutdown(); - - const auto received_messages = - sub_->get_received_messages(topic_name_); - EXPECT_FALSE(received_messages.empty()); + // Fails if times out + play_and_wait(timeout); } TEST_F(RosBag2PlayQosOverrideTestFixture, topic_qos_profiles_overridden_incompatible) @@ -277,25 +298,89 @@ TEST_F(RosBag2PlayQosOverrideTestFixture, topic_qos_profiles_overridden_incompat // which should not be a compatible offer and therefore we should receive no messages. const auto qos_request = rclcpp::QoS{rclcpp::KeepAll()}.reliable(); const auto qos_playback_override = rclcpp::QoS{rclcpp::KeepAll()}.best_effort(); - const auto topic_qos_profile_overrides = std::unordered_map{ std::pair{topic_name_, qos_playback_override}, }; - play_options_.topic_qos_profile_overrides = topic_qos_profile_overrides; + // If any messages were going to come in, it should happen in under 1s even in slow scenarios + const auto timeout = 3s; + + initialize({}); sub_->add_subscription( topic_name_, num_msgs_to_wait_for_, qos_request); - auto await_received_messages = sub_->spin_subscriptions(); + play_options_.topic_qos_profile_overrides = topic_qos_profile_overrides; - Rosbag2Transport rosbag2_transport{reader_, writer_, info_}; - rosbag2_transport.play(storage_options_, play_options_); + // Fails if it doesn't time out + play_and_wait(timeout, true /* expect timeout */); +} + +TEST_F(RosBag2PlayQosOverrideTestFixture, playback_uses_recorded_transient_local_profile) +{ + // In this test, we subscribe requesting DURABILITY_TRANSIENT_LOCAL. + // The bag metadata has this recorded for the original Publisher, + // so playback's offer should be compatible (whereas the default offer would not be) + const auto transient_local_profile = Rosbag2QoS{Rosbag2QoS{}.transient_local()}; + // This should normally take less than 1s - just making it shorter than 60s default + const auto timeout = 5s; + + initialize({transient_local_profile}); + + sub_->add_subscription( + topic_name_, num_msgs_to_wait_for_, transient_local_profile); + + // Fails if times out + play_and_wait(timeout); +} + +TEST_F(RosBag2PlayQosOverrideTestFixture, playback_uses_recorded_deadline) +{ + // By default, QoS profiles use "unspecified/infinite" offers for duration-based policies + // The subscription in this test requests a finite Deadline, and by receiving messages + // we know that the playback has used the recorded finite Deadline duration. + + // The publisher is offering 2Hz. The subscription requests 1Hz, which is less strict of a + // requirement, so they are compatible. + const rclcpp::Duration request_deadline{1s}; + const rclcpp::Duration offer_deadline{500ms}; + const auto request_profile = Rosbag2QoS{}.deadline(request_deadline); + const auto offer_profile = Rosbag2QoS{Rosbag2QoS{}.deadline(offer_deadline)}; + const auto timeout = 5s; + + initialize({offer_profile}); + sub_->add_subscription( + topic_name_, num_msgs_to_wait_for_, request_profile); + play_and_wait(timeout); +} - using namespace std::chrono_literals; - const auto future_result = await_received_messages.wait_for(3s); - EXPECT_EQ(future_result, std::future_status::timeout); +TEST_F(RosBag2PlayQosOverrideTestFixture, override_has_precedence_over_recorded) +{ + // In this test, we show that the playback prefers the user override to the recorded values. + // The subscription requests a Liveliness lease_duration that is shorter than the original + // recorded publisher offered, so no messages should be passed. + // However, the override to the publisher offers a shorter duration than the request, so + // it should now be compatible. + const rclcpp::Duration liveliness_request{500ms}; + const rclcpp::Duration recorded_liveliness_offer{1000ms}; + const rclcpp::Duration override_liveliness_offer{250ms}; + ASSERT_LT(liveliness_request, recorded_liveliness_offer); + ASSERT_LT(override_liveliness_offer, liveliness_request); + const auto request_profile = Rosbag2QoS{}.liveliness_lease_duration(liveliness_request); + const auto recorded_offer_profile = Rosbag2QoS{Rosbag2QoS{}.liveliness_lease_duration( + recorded_liveliness_offer)}; + const auto override_offer_profile = Rosbag2QoS{Rosbag2QoS{}.liveliness_lease_duration( + override_liveliness_offer)}; + const auto topic_qos_profile_overrides = std::unordered_map{ + std::pair{topic_name_, override_offer_profile}, + }; + // This should normally take less than 1s - just making it shorter than 60s default + const auto timeout = 5s; + + initialize({recorded_offer_profile}); + + sub_->add_subscription( + topic_name_, num_msgs_to_wait_for_, request_profile); + play_options_.topic_qos_profile_overrides = topic_qos_profile_overrides; - rosbag2_transport.shutdown(); - const auto received_messages = - sub_->get_received_messages(topic_name_); - EXPECT_EQ(received_messages.size(), 0u); + // Fails if times out + play_and_wait(timeout); }