Skip to content

Commit

Permalink
[humble] Redesign record_services tests to make them more determinist…
Browse files Browse the repository at this point in the history
…ic (#1122) (#1142)

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
(cherry picked from commit 5f0c7d4)

Co-authored-by: Michael Orlov <michael.orlov@apex.ai>
  • Loading branch information
mergify[bot] and MichaelOrlov authored Apr 12, 2023
1 parent 49eefa1 commit e30daed
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,7 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn
messages_per_topic_[message->topic_name] += 1;
messages_per_file_ += 1;
if (messages_per_file_ == max_messages_per_file_) { // "Split" the bag every few messages
auto info = std::make_shared<rosbag2_cpp::bag_events::BagSplitInfo>();
info->closed_file = "BagFile" + std::to_string(file_number_);
file_number_ += 1;
info->opened_file = "BagFile" + std::to_string(file_number_);
callback_manager_.execute_callbacks(rosbag2_cpp::bag_events::BagEvent::WRITE_SPLIT, info);
messages_per_file_ = 0;
this->split_bagfile();
}
}

Expand All @@ -73,6 +68,16 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn
return true;
}

void split_bagfile()
{
auto info = std::make_shared<rosbag2_cpp::bag_events::BagSplitInfo>();
info->closed_file = "BagFile" + std::to_string(file_number_);
file_number_ += 1;
info->opened_file = "BagFile" + std::to_string(file_number_);
callback_manager_.execute_callbacks(rosbag2_cpp::bag_events::BagEvent::WRITE_SPLIT, info);
messages_per_file_ = 0;
}

void
add_event_callbacks(const rosbag2_cpp::bag_events::WriterEventCallbacks & callbacks) override
{
Expand Down Expand Up @@ -103,6 +108,11 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn
return topics_;
}

void set_max_messages_per_file(size_t max_messages_per_file)
{
max_messages_per_file_ = max_messages_per_file;
}

size_t max_messages_per_file() const
{
return max_messages_per_file_;
Expand All @@ -117,7 +127,7 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn
bool snapshot_mode_ = false;
rosbag2_cpp::bag_events::EventCallbackManager callback_manager_;
size_t file_number_ = 0;
const size_t max_messages_per_file_ = 5;
size_t max_messages_per_file_ = 0;
};

#endif // ROSBAG2_TRANSPORT__MOCK_SEQUENTIAL_WRITER_HPP_
6 changes: 4 additions & 2 deletions rosbag2_transport/test/rosbag2_transport/test_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,9 @@ TEST_F(RecordIntegrationTestFixture, write_split_callback_is_called)
};
writer_->add_event_callbacks(callbacks);

auto & mock_writer = dynamic_cast<MockSequentialWriter &>(writer_->get_implementation_handle());
mock_writer.set_max_messages_per_file(5);

rosbag2_transport::RecordOptions record_options =
{false, false, {string_topic}, "rmw_format", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
Expand All @@ -333,8 +336,7 @@ TEST_F(RecordIntegrationTestFixture, write_split_callback_is_called)
start_async_spin(recorder);

auto & writer = recorder->get_writer_handle();
MockSequentialWriter & mock_writer =
static_cast<MockSequentialWriter &>(writer.get_implementation_handle());
mock_writer = dynamic_cast<MockSequentialWriter &>(writer.get_implementation_handle());

const size_t expected_messages = mock_writer.max_messages_per_file() + 1;

Expand Down
28 changes: 12 additions & 16 deletions rosbag2_transport/test/rosbag2_transport/test_record_services.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@
#include "rosbag2_transport/recorder.hpp"

#include "rosbag2_test_common/publication_manager.hpp"
#include "rosbag2_test_common/wait_for.hpp"

#include "test_msgs/msg/arrays.hpp"
#include "test_msgs/msg/basic_types.hpp"
#include "test_msgs/msg/strings.hpp"
#include "test_msgs/message_fixtures.hpp"

#include "record_integration_fixture.hpp"
Expand All @@ -56,10 +52,6 @@ class RecordSrvsTest : public RecordIntegrationTestFixture
{
}

void subscription_callback(const test_msgs::msg::Strings::SharedPtr)
{
}

/// Use SetUp instead of ctor because we want to ASSERT some preconditions for the tests
void SetUp() override
{
Expand All @@ -76,7 +68,7 @@ class RecordSrvsTest : public RecordIntegrationTestFixture

auto string_message = get_messages_strings()[1];
rosbag2_test_common::PublicationManager pub_manager;
pub_manager.setup_publisher(test_topic_, string_message, 50);
pub_manager.setup_publisher(test_topic_, string_message, 10);

const std::string ns = "/" + recorder_name_;
cli_snapshot_ = client_node_->create_client<Snapshot>(ns + "/snapshot");
Expand Down Expand Up @@ -124,7 +116,7 @@ class RecordSrvsTest : public RecordIntegrationTestFixture
// Basic configuration
const std::string recorder_name_ = "rosbag2_recorder_for_test_srvs";
const std::chrono::seconds service_wait_timeout_ {2};
const std::chrono::seconds service_call_timeout_ {1};
const std::chrono::seconds service_call_timeout_ {2};
const std::string test_topic_ = "/recorder_srvs_test_topic";

// Orchestration
Expand All @@ -140,14 +132,18 @@ class RecordSrvsTest : public RecordIntegrationTestFixture
TEST_F(RecordSrvsTest, trigger_snapshot)
{
auto & writer = recorder_->get_writer_handle();
MockSequentialWriter & mock_writer =
static_cast<MockSequentialWriter &>(writer.get_implementation_handle());
auto & mock_writer = dynamic_cast<MockSequentialWriter &>(writer.get_implementation_handle());
EXPECT_THAT(mock_writer.get_messages().size(), Eq(0u));

// Sleep for 2 seconds to allow messages to accumulate in snapshot buffer
std::chrono::duration<float> duration(2.0);
std::this_thread::sleep_for(duration);
EXPECT_THAT(mock_writer.get_snapshot_buffer().size(), Gt(0u));
// Wait for messages to be appeared in snapshot_buffer
std::chrono::duration<int> timeout = std::chrono::seconds(10);
using clock = std::chrono::steady_clock;
auto start = clock::now();
while (mock_writer.get_snapshot_buffer().empty()) {
std::this_thread::sleep_for(std::chrono::milliseconds(30));
EXPECT_LE((clock::now() - start), timeout) << "Failed to capture messages in time";
}
EXPECT_THAT(mock_writer.get_messages().size(), Eq(0u));

successful_service_request<Snapshot>(cli_snapshot_);
EXPECT_THAT(mock_writer.get_messages().size(), Ne(0u));
Expand Down

0 comments on commit e30daed

Please sign in to comment.