From 8fa9fc8c4fddb5917d6600e7a8ca411158d21deb Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Mon, 20 Apr 2020 14:32:01 -0700 Subject: [PATCH] add test for playback using qos profile Signed-off-by: Emerson Knapp --- rosbag2_transport/CMakeLists.txt | 1 + .../test/rosbag2_transport/test_play.cpp | 102 ++++++++++++------ 2 files changed, 69 insertions(+), 34 deletions(-) diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index 762ed7ebbb..92c79c4d72 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -157,6 +157,7 @@ function(create_tests_for_rmw_implementation) rosbag2_transport_add_gmock(test_play test/rosbag2_transport/test_play.cpp + INCLUDE_DIRS $ ${SKIP_TEST} LINK_LIBS rosbag2_transport AMENT_DEPS test_msgs rosbag2_test_common) diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index a2c6599488..73b144abc7 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -32,7 +32,10 @@ #include "test_msgs/msg/basic_types.hpp" #include "test_msgs/message_fixtures.hpp" +#include "qos.hpp" + #include "rosbag2_play_test_fixture.hpp" +#include "rosbag2_transport_test_fixture.hpp" using namespace ::testing; // NOLINT using namespace rosbag2_transport; // NOLINT @@ -205,35 +208,59 @@ class RosBag2PlayQosOverrideTestFixture : public RosBag2PlayTestFixture RosBag2PlayQosOverrideTestFixture() : RosBag2PlayTestFixture() { - // Because this test only cares about compatibility (receiving any messages at all) + } + + void initialize(const std::vector & offered_qos) + { + // Because these tests only cares about compatibility (receiving any messages at all) // We publish one more message than we expect to receive, to avoid caring about // shutdown edge behaviors that are not explicitly being tested here. const size_t num_msgs_to_publish = num_msgs_to_wait_for_ + 1; - topic_timestamps_ms_.reserve(num_msgs_to_publish); + messages_.reserve(num_msgs_to_publish); for (size_t i = 0; i < num_msgs_to_publish; i++) { - topic_timestamps_ms_.push_back(start_time_ms_ + message_spacing_ms_ * i); + const auto timestamp = start_time_ms_ + message_spacing_ms_ * i; + messages_.push_back(serialize_test_message(topic_name_, timestamp, basic_msg_)); } - messages_.reserve(topic_timestamps_ms_.size()); - for (const auto topic_timestamp : topic_timestamps_ms_) { - messages_.push_back(serialize_test_message(topic_name_, topic_timestamp, basic_msg_)); + std::string serialized_offered_qos = ""; + if (!offered_qos.empty()) { + YAML::Node offered_qos_yaml; + for (const auto & profile : offered_qos) { + offered_qos_yaml.push_back(profile); + } + serialized_offered_qos = YAML::Dump(offered_qos_yaml); } + topic_types_.push_back( + {topic_name_, msg_type_, "" /*serialization_format*/, serialized_offered_qos}); auto prepared_mock_reader = std::make_unique(); prepared_mock_reader->prepare(messages_, topic_types_); reader_ = std::make_unique(std::move(prepared_mock_reader)); } + template + void play_and_wait(Duration timeout, bool expect_timeout = false) + { + auto await_received_messages = sub_->spin_subscriptions(); + Rosbag2Transport transport(reader_, writer_, info_); + transport.play(storage_options_, play_options_); + const auto result = await_received_messages.wait_for(timeout); + if (expect_timeout) { + ASSERT_EQ(result, std::future_status::timeout); + } else { + ASSERT_NE(result, std::future_status::timeout); + } + transport.shutdown(); + } + const std::string topic_name_{"/test_topic"}; const std::string msg_type_{"test_msgs/BasicTypes"}; - const size_t num_msgs_to_wait_for_{3}; + // Receiving _any_ messages means we've confirmed compatibility in these tests + const size_t num_msgs_to_wait_for_{1}; test_msgs::msg::BasicTypes::SharedPtr basic_msg_{get_messages_basic_types()[0]}; - const std::vector topic_types_{ - {topic_name_, msg_type_, "" /*serialization_format*/, "" /*offered_qos_profiles*/} - }; + std::vector topic_types_{}; const int64_t start_time_ms_{500}; const int64_t message_spacing_ms_{200}; - std::vector topic_timestamps_ms_{}; std::vector> messages_; }; @@ -246,27 +273,19 @@ TEST_F(RosBag2PlayQosOverrideTestFixture, topic_qos_profiles_overridden) // to receive messages. const auto qos_request = rclcpp::QoS{rclcpp::KeepAll()}.reliable().transient_local(); const auto qos_playback_override = rclcpp::QoS{rclcpp::KeepAll()}.reliable().transient_local(); - const auto topic_qos_profile_overrides = std::unordered_map{ std::pair{topic_name_, qos_playback_override}, }; + // This should normally take less than 1s - just making it shorter than 60s default + const auto timeout = 5s; + + initialize({}); sub_->add_subscription( topic_name_, num_msgs_to_wait_for_, qos_request); - auto await_received_messages = sub_->spin_subscriptions(); play_options_.topic_qos_profile_overrides = topic_qos_profile_overrides; - Rosbag2Transport rosbag2_transport{reader_, writer_, info_}; - rosbag2_transport.play(storage_options_, play_options_); - - // This should normally take less than 1s - just making it shorter than 60s default - const auto future_result = await_received_messages.wait_for(5s); - EXPECT_NE(future_result, std::future_status::timeout); - rosbag2_transport.shutdown(); - - const auto received_messages = - sub_->get_received_messages(topic_name_); - EXPECT_FALSE(received_messages.empty()); + play_and_wait(timeout); } TEST_F(RosBag2PlayQosOverrideTestFixture, topic_qos_profiles_overridden_incompatible) @@ -277,25 +296,40 @@ TEST_F(RosBag2PlayQosOverrideTestFixture, topic_qos_profiles_overridden_incompat // which should not be a compatible offer and therefore we should receive no messages. const auto qos_request = rclcpp::QoS{rclcpp::KeepAll()}.reliable(); const auto qos_playback_override = rclcpp::QoS{rclcpp::KeepAll()}.best_effort(); - const auto topic_qos_profile_overrides = std::unordered_map{ std::pair{topic_name_, qos_playback_override}, }; - play_options_.topic_qos_profile_overrides = topic_qos_profile_overrides; + // If any messages were going to come in, it should happen in under 1s even in slow scenarios + const auto timeout = 3s; + + initialize({}); sub_->add_subscription( topic_name_, num_msgs_to_wait_for_, qos_request); - auto await_received_messages = sub_->spin_subscriptions(); - - Rosbag2Transport rosbag2_transport{reader_, writer_, info_}; - rosbag2_transport.play(storage_options_, play_options_); + play_options_.topic_qos_profile_overrides = topic_qos_profile_overrides; - using namespace std::chrono_literals; - const auto future_result = await_received_messages.wait_for(3s); - EXPECT_EQ(future_result, std::future_status::timeout); + play_and_wait(timeout, true /* expect timeout */); - rosbag2_transport.shutdown(); const auto received_messages = sub_->get_received_messages(topic_name_); EXPECT_EQ(received_messages.size(), 0u); } + +TEST_F(RosBag2PlayQosOverrideTestFixture, playback_uses_recorded_profiles) +{ + // In this test, we subscribe requesting DURABILITY_TRANSIENT_LOCAL. + // The bag metadata has this recorded for the original Publisher, + // so playback's offer should be compatible (whereas the default offer would not be) + const size_t arbitrary_finite_history{5}; + const auto transient_local_profile = Rosbag2QoS{ + rclcpp::QoS{arbitrary_finite_history}.transient_local()}; + // This should normally take less than 1s - just making it shorter than 60s default + const auto timeout = 5s; + + initialize({transient_local_profile}); + + sub_->add_subscription( + topic_name_, num_msgs_to_wait_for_, transient_local_profile); + + play_and_wait(timeout); +}