Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pause/resume options to the bag recorder #905

Merged
merged 10 commits into from
Nov 24, 2021
4 changes: 4 additions & 0 deletions ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'write:'
' pragmas: [\"<setting_name>\" = <setting_value>]'
'For a list of sqlite3 settings, refer to sqlite3 documentation')
parser.add_argument(
'--start-paused', action='store_true', default=False,
help='Start the recorder in a paused state.')
self._subparser = parser

def main(self, *, args): # noqa: D102
Expand Down Expand Up @@ -219,6 +222,7 @@ def main(self, *, args): # noqa: D102
record_options.compression_threads = args.compression_threads
record_options.topic_qos_profile_overrides = qos_profile_overrides
record_options.include_hidden_topics = args.include_hidden_topics
record_options.start_paused = args.start_paused

recorder = Recorder()

Expand Down
1 change: 1 addition & 0 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ PYBIND11_MODULE(_transport, m) {
&RecordOptions::getTopicQoSProfileOverrides,
&RecordOptions::setTopicQoSProfileOverrides)
.def_readwrite("include_hidden_topics", &RecordOptions::include_hidden_topics)
.def_readwrite("start_paused", &RecordOptions::start_paused)
;

py::class_<rosbag2_py::Player>(m, "Player")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,51 @@ TEST_F(RecordFixture, record_end_to_end_test) {
EXPECT_THAT(wrong_topic_messages, IsEmpty());
}

TEST_F(RecordFixture, record_end_to_end_test_start_paused) {
auto message = get_messages_strings()[0];
message->string_value = "test";

rosbag2_test_common::PublicationManager pub_manager;
pub_manager.setup_publisher("/test_topic", message, 10);

auto process_handle = start_execution(
"ros2 bag record --max-cache-size 0 --output " + root_bag_path_.string() +
" --start-paused /test_topic");
auto cleanup_process_handle = rcpputils::make_scope_exit(
[process_handle]() {
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.wait_for_matched("/test_topic")) <<
"Expected find rosbag subscription";

wait_for_db();

pub_manager.run_publishers();

stop_execution(process_handle);
cleanup_process_handle.cancel();

// TODO(Martin-Idel-SI): Find out how to correctly send a Ctrl-C signal on Windows
// This is necessary as the process is killed hard on Windows and doesn't write a metadata file
#ifdef _WIN32
rosbag2_storage::BagMetadata metadata{};
metadata.version = 1;
metadata.storage_identifier = "sqlite3";
metadata.relative_file_paths = {get_bag_file_path(0).string()};
metadata.duration = std::chrono::nanoseconds(0);
metadata.starting_time =
std::chrono::time_point<std::chrono::high_resolution_clock>(std::chrono::nanoseconds(0));
metadata.message_count = 0;
rosbag2_storage::MetadataIo metadata_io;
metadata_io.write_metadata(root_bag_path_.string(), metadata);
#endif

wait_for_metadata();
auto test_topic_messages = get_messages_for_topic<test_msgs::msg::Strings>("/test_topic");
EXPECT_THAT(test_topic_messages, IsEmpty());
}

TEST_F(RecordFixture, record_end_to_end_exits_gracefully_on_sigterm) {
const std::string topic_name = "/test_sigterm";
auto message = get_messages_strings()[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <unordered_map>
#include <vector>

#include "keyboard_handler/keyboard_handler.hpp"
#include "rclcpp/rclcpp.hpp"

namespace rosbag2_transport
Expand All @@ -41,6 +42,7 @@ struct RecordOptions
uint64_t compression_threads = 0;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides{};
bool include_hidden_topics = false;
bool start_paused = false;
};

} // namespace rosbag2_transport
Expand Down
35 changes: 35 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <utility>
#include <vector>

#include "keyboard_handler/keyboard_handler.hpp"

#include "rclcpp/node.hpp"
#include "rclcpp/qos.hpp"

Expand Down Expand Up @@ -59,6 +61,15 @@ class Recorder : public rclcpp::Node
const std::string & node_name = "rosbag2_recorder",
const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions());

ROSBAG2_TRANSPORT_PUBLIC
Recorder(
std::shared_ptr<rosbag2_cpp::Writer> writer,
std::shared_ptr<KeyboardHandler> keyboard_handler,
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_transport::RecordOptions & record_options,
const std::string & node_name = "rosbag2_recorder",
const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions());

ROSBAG2_TRANSPORT_PUBLIC
virtual ~Recorder();

Expand All @@ -80,6 +91,24 @@ class Recorder : public rclcpp::Node
ROSBAG2_TRANSPORT_PUBLIC
const rosbag2_cpp::Writer & get_writer_handle();

/// Pause the recording.
ROSBAG2_TRANSPORT_PUBLIC
void pause();

/// Resume recording.
ROSBAG2_TRANSPORT_PUBLIC
void resume();

/// Pause if it was recording, continue recording if paused.
ROSBAG2_TRANSPORT_PUBLIC
void toggle_paused();

/// Return the current paused state.
ROSBAG2_TRANSPORT_PUBLIC
bool is_paused();

inline constexpr static const auto kPauseResumeToggleKey = KeyboardHandler::KeyCode::SPACE;

private:
void topics_discovery();

Expand Down Expand Up @@ -124,6 +153,12 @@ class Recorder : public rclcpp::Node
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unordered_set<std::string> topic_unknown_types_;
rclcpp::Service<rosbag2_interfaces::srv::Snapshot>::SharedPtr srv_snapshot_;
std::atomic<bool> paused_ = false;
// Keyboard handler
std::shared_ptr<KeyboardHandler> keyboard_handler_;
// Toogle paused key callback handle
KeyboardHandler::callback_handle_t toggle_paused_key_callback_handle_ =
KeyboardHandler::invalid_handle;
};

} // namespace rosbag2_transport
Expand Down
61 changes: 59 additions & 2 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,45 @@ Recorder::Recorder(
const rosbag2_transport::RecordOptions & record_options,
const std::string & node_name,
const rclcpp::NodeOptions & node_options)
: Recorder(
std::move(writer),
std::make_shared<KeyboardHandler>(),
storage_options,
record_options,
node_name,
node_options)
{}

Recorder::Recorder(
std::shared_ptr<rosbag2_cpp::Writer> writer,
std::shared_ptr<KeyboardHandler> keyboard_handler,
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_transport::RecordOptions & record_options,
const std::string & node_name,
const rclcpp::NodeOptions & node_options)
: rclcpp::Node(node_name, rclcpp::NodeOptions(node_options).start_parameter_event_publisher(false)),
writer_(std::move(writer)),
storage_options_(storage_options),
record_options_(record_options),
stop_discovery_(record_options_.is_discovery_disabled)
stop_discovery_(record_options_.is_discovery_disabled),
paused_(record_options.start_paused),
keyboard_handler_(std::move(keyboard_handler))
{
std::string key_str = enum_key_code_to_str(Recorder::kPauseResumeToggleKey);
toggle_paused_key_callback_handle_ =
keyboard_handler_->add_key_press_callback(
[this](KeyboardHandler::KeyCode /*key_code*/,
KeyboardHandler::KeyModifiers /*key_modifiers*/) {this->toggle_paused();},
Recorder::kPauseResumeToggleKey);
// show instructions
RCLCPP_INFO_STREAM(
get_logger(),
"Press " << key_str << " for pausing/resuming");
}

Recorder::~Recorder()
{
keyboard_handler_->delete_key_press_callback(toggle_paused_key_callback_handle_);
stop_discovery_ = true;
if (discovery_future_.valid()) {
discovery_future_.wait();
Expand Down Expand Up @@ -124,6 +153,32 @@ const rosbag2_cpp::Writer & Recorder::get_writer_handle()
return *writer_;
}

void Recorder::pause()
{
paused_.store(true);
RCLCPP_INFO_STREAM(get_logger(), "Pausing recording.");
}

void Recorder::resume()
{
paused_.store(false);
RCLCPP_INFO_STREAM(get_logger(), "Resuming recording.");
}

void Recorder::toggle_paused()
{
if (paused_.load()) {
this->resume();
} else {
this->pause();
}
}

bool Recorder::is_paused()
{
return paused_.load();
}

void Recorder::topics_discovery()
{
while (rclcpp::ok() && stop_discovery_ == false) {
Expand Down Expand Up @@ -236,7 +291,9 @@ Recorder::create_subscription(
topic_type,
qos,
[this, topic_name, topic_type](std::shared_ptr<rclcpp::SerializedMessage> message) {
writer_->write(message, topic_name, topic_type, rclcpp::Clock(RCL_SYSTEM_TIME).now());
if (!paused_.load()) {
writer_->write(message, topic_name, topic_type, rclcpp::Clock(RCL_SYSTEM_TIME).now());
}
});
return subscription;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
#include "rosbag2_test_common/subscription_manager.hpp"

#include "rosbag2_transport/player.hpp"
#include "rosbag2_transport/recorder.hpp"

#include "test_msgs/msg/arrays.hpp"
#include "test_msgs/msg/basic_types.hpp"
#include "test_msgs/message_fixtures.hpp"

#include "record_integration_fixture.hpp"
#include "rosbag2_play_test_fixture.hpp"
#include "rosbag2_transport_test_fixture.hpp"
#include "mock_keyboard_handler.hpp"
Expand Down Expand Up @@ -177,3 +179,29 @@ TEST_F(RosBag2PlayTestFixture, test_keyboard_controls)
EXPECT_THAT(player->num_played_next, 1);
EXPECT_THAT(player->num_set_rate, 2);
}

TEST_F(RecordIntegrationTestFixture, test_keyboard_controls)
{
auto mock_sequential_writer = std::make_unique<MockSequentialWriter>();
auto writer = std::make_unique<rosbag2_cpp::Writer>(std::move(mock_sequential_writer));
auto keyboard_handler = std::make_shared<MockKeyboardHandler>();

rosbag2_transport::RecordOptions record_options = {true, false, {}, "rmw_format", 100ms};
record_options.start_paused = true;

auto recorder = std::make_shared<Recorder>(
std::move(writer), keyboard_handler, storage_options_,
record_options, "test_count_recorder");

recorder->record();

this->start_async_spin(recorder);

EXPECT_THAT(recorder->is_paused(), true);
keyboard_handler->simulate_key_press(rosbag2_transport::Recorder::kPauseResumeToggleKey);
EXPECT_THAT(recorder->is_paused(), false);
keyboard_handler->simulate_key_press(rosbag2_transport::Recorder::kPauseResumeToggleKey);
EXPECT_THAT(recorder->is_paused(), true);

this->stop_spinning();
}