Skip to content

Commit

Permalink
add test for playback using qos profile
Browse files Browse the repository at this point in the history
Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>
  • Loading branch information
emersonknapp committed Apr 20, 2020
1 parent 4652cc3 commit 8fa9fc8
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 34 deletions.
1 change: 1 addition & 0 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ function(create_tests_for_rmw_implementation)

rosbag2_transport_add_gmock(test_play
test/rosbag2_transport/test_play.cpp
INCLUDE_DIRS $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src/rosbag2_transport>
${SKIP_TEST}
LINK_LIBS rosbag2_transport
AMENT_DEPS test_msgs rosbag2_test_common)
Expand Down
102 changes: 68 additions & 34 deletions rosbag2_transport/test/rosbag2_transport/test_play.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
#include "test_msgs/msg/basic_types.hpp"
#include "test_msgs/message_fixtures.hpp"

#include "qos.hpp"

#include "rosbag2_play_test_fixture.hpp"
#include "rosbag2_transport_test_fixture.hpp"

using namespace ::testing; // NOLINT
using namespace rosbag2_transport; // NOLINT
Expand Down Expand Up @@ -205,35 +208,59 @@ class RosBag2PlayQosOverrideTestFixture : public RosBag2PlayTestFixture
RosBag2PlayQosOverrideTestFixture()
: RosBag2PlayTestFixture()
{
// Because this test only cares about compatibility (receiving any messages at all)
}

void initialize(const std::vector<rosbag2_transport::Rosbag2QoS> & offered_qos)
{
// Because these tests only cares about compatibility (receiving any messages at all)
// We publish one more message than we expect to receive, to avoid caring about
// shutdown edge behaviors that are not explicitly being tested here.
const size_t num_msgs_to_publish = num_msgs_to_wait_for_ + 1;
topic_timestamps_ms_.reserve(num_msgs_to_publish);
messages_.reserve(num_msgs_to_publish);
for (size_t i = 0; i < num_msgs_to_publish; i++) {
topic_timestamps_ms_.push_back(start_time_ms_ + message_spacing_ms_ * i);
const auto timestamp = start_time_ms_ + message_spacing_ms_ * i;
messages_.push_back(serialize_test_message(topic_name_, timestamp, basic_msg_));
}

messages_.reserve(topic_timestamps_ms_.size());
for (const auto topic_timestamp : topic_timestamps_ms_) {
messages_.push_back(serialize_test_message(topic_name_, topic_timestamp, basic_msg_));
std::string serialized_offered_qos = "";
if (!offered_qos.empty()) {
YAML::Node offered_qos_yaml;
for (const auto & profile : offered_qos) {
offered_qos_yaml.push_back(profile);
}
serialized_offered_qos = YAML::Dump(offered_qos_yaml);
}
topic_types_.push_back(
{topic_name_, msg_type_, "" /*serialization_format*/, serialized_offered_qos});

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages_, topic_types_);
reader_ = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));
}

template<typename Duration>
void play_and_wait(Duration timeout, bool expect_timeout = false)
{
auto await_received_messages = sub_->spin_subscriptions();
Rosbag2Transport transport(reader_, writer_, info_);
transport.play(storage_options_, play_options_);
const auto result = await_received_messages.wait_for(timeout);
if (expect_timeout) {
ASSERT_EQ(result, std::future_status::timeout);
} else {
ASSERT_NE(result, std::future_status::timeout);
}
transport.shutdown();
}

const std::string topic_name_{"/test_topic"};
const std::string msg_type_{"test_msgs/BasicTypes"};
const size_t num_msgs_to_wait_for_{3};
// Receiving _any_ messages means we've confirmed compatibility in these tests
const size_t num_msgs_to_wait_for_{1};
test_msgs::msg::BasicTypes::SharedPtr basic_msg_{get_messages_basic_types()[0]};
const std::vector<rosbag2_storage::TopicMetadata> topic_types_{
{topic_name_, msg_type_, "" /*serialization_format*/, "" /*offered_qos_profiles*/}
};
std::vector<rosbag2_storage::TopicMetadata> topic_types_{};
const int64_t start_time_ms_{500};
const int64_t message_spacing_ms_{200};
std::vector<int64_t> topic_timestamps_ms_{};
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages_;
};

Expand All @@ -246,27 +273,19 @@ TEST_F(RosBag2PlayQosOverrideTestFixture, topic_qos_profiles_overridden)
// to receive messages.
const auto qos_request = rclcpp::QoS{rclcpp::KeepAll()}.reliable().transient_local();
const auto qos_playback_override = rclcpp::QoS{rclcpp::KeepAll()}.reliable().transient_local();

const auto topic_qos_profile_overrides = std::unordered_map<std::string, rclcpp::QoS>{
std::pair<std::string, rclcpp::QoS>{topic_name_, qos_playback_override},
};
// This should normally take less than 1s - just making it shorter than 60s default
const auto timeout = 5s;

initialize({});

sub_->add_subscription<test_msgs::msg::BasicTypes>(
topic_name_, num_msgs_to_wait_for_, qos_request);
auto await_received_messages = sub_->spin_subscriptions();
play_options_.topic_qos_profile_overrides = topic_qos_profile_overrides;

Rosbag2Transport rosbag2_transport{reader_, writer_, info_};
rosbag2_transport.play(storage_options_, play_options_);

// This should normally take less than 1s - just making it shorter than 60s default
const auto future_result = await_received_messages.wait_for(5s);
EXPECT_NE(future_result, std::future_status::timeout);
rosbag2_transport.shutdown();

const auto received_messages =
sub_->get_received_messages<test_msgs::msg::BasicTypes>(topic_name_);
EXPECT_FALSE(received_messages.empty());
play_and_wait(timeout);
}

TEST_F(RosBag2PlayQosOverrideTestFixture, topic_qos_profiles_overridden_incompatible)
Expand All @@ -277,25 +296,40 @@ TEST_F(RosBag2PlayQosOverrideTestFixture, topic_qos_profiles_overridden_incompat
// which should not be a compatible offer and therefore we should receive no messages.
const auto qos_request = rclcpp::QoS{rclcpp::KeepAll()}.reliable();
const auto qos_playback_override = rclcpp::QoS{rclcpp::KeepAll()}.best_effort();

const auto topic_qos_profile_overrides = std::unordered_map<std::string, rclcpp::QoS>{
std::pair<std::string, rclcpp::QoS>{topic_name_, qos_playback_override},
};
play_options_.topic_qos_profile_overrides = topic_qos_profile_overrides;
// If any messages were going to come in, it should happen in under 1s even in slow scenarios
const auto timeout = 3s;

initialize({});

sub_->add_subscription<test_msgs::msg::BasicTypes>(
topic_name_, num_msgs_to_wait_for_, qos_request);
auto await_received_messages = sub_->spin_subscriptions();

Rosbag2Transport rosbag2_transport{reader_, writer_, info_};
rosbag2_transport.play(storage_options_, play_options_);
play_options_.topic_qos_profile_overrides = topic_qos_profile_overrides;

using namespace std::chrono_literals;
const auto future_result = await_received_messages.wait_for(3s);
EXPECT_EQ(future_result, std::future_status::timeout);
play_and_wait(timeout, true /* expect timeout */);

rosbag2_transport.shutdown();
const auto received_messages =
sub_->get_received_messages<test_msgs::msg::BasicTypes>(topic_name_);
EXPECT_EQ(received_messages.size(), 0u);
}

TEST_F(RosBag2PlayQosOverrideTestFixture, playback_uses_recorded_profiles)
{
// In this test, we subscribe requesting DURABILITY_TRANSIENT_LOCAL.
// The bag metadata has this recorded for the original Publisher,
// so playback's offer should be compatible (whereas the default offer would not be)
const size_t arbitrary_finite_history{5};
const auto transient_local_profile = Rosbag2QoS{
rclcpp::QoS{arbitrary_finite_history}.transient_local()};
// This should normally take less than 1s - just making it shorter than 60s default
const auto timeout = 5s;

initialize({transient_local_profile});

sub_->add_subscription<test_msgs::msg::BasicTypes>(
topic_name_, num_msgs_to_wait_for_, transient_local_profile);

play_and_wait(timeout);
}

0 comments on commit 8fa9fc8

Please sign in to comment.