Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pause/resume options to the bag recorder #905

Merged
merged 10 commits into from
Nov 24, 2021
4 changes: 4 additions & 0 deletions ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'write:'
' pragmas: [\"<setting_name>\" = <setting_value>]'
'For a list of sqlite3 settings, refer to sqlite3 documentation')
parser.add_argument(
'--start-paused', action='store_true', default=False,
help='Start the recorder in a paused state.')
self._subparser = parser

def main(self, *, args): # noqa: D102
Expand Down Expand Up @@ -219,6 +222,7 @@ def main(self, *, args): # noqa: D102
record_options.compression_threads = args.compression_threads
record_options.topic_qos_profile_overrides = qos_profile_overrides
record_options.include_hidden_topics = args.include_hidden_topics
record_options.start_paused = args.start_paused

recorder = Recorder()

Expand Down
1 change: 1 addition & 0 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ PYBIND11_MODULE(_transport, m) {
&RecordOptions::getTopicQoSProfileOverrides,
&RecordOptions::setTopicQoSProfileOverrides)
.def_readwrite("include_hidden_topics", &RecordOptions::include_hidden_topics)
.def_readwrite("start_paused", &RecordOptions::start_paused)
;

py::class_<rosbag2_py::Player>(m, "Player")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <unordered_map>
#include <vector>

#include "keyboard_handler/keyboard_handler.hpp"
#include "rclcpp/rclcpp.hpp"

namespace rosbag2_transport
Expand All @@ -41,6 +42,8 @@ struct RecordOptions
uint64_t compression_threads = 0;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides{};
bool include_hidden_topics = false;
bool start_paused = false;
KeyboardHandler::KeyCode pause_resume_toggle_key = KeyboardHandler::KeyCode::SPACE;
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
};

} // namespace rosbag2_transport
Expand Down
25 changes: 25 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <utility>
#include <vector>

#include "keyboard_handler/keyboard_handler.hpp"

#include "rclcpp/node.hpp"
#include "rclcpp/qos.hpp"

Expand Down Expand Up @@ -80,6 +82,22 @@ class Recorder : public rclcpp::Node
ROSBAG2_TRANSPORT_PUBLIC
const rosbag2_cpp::Writer & get_writer_handle();

/// Pause the recording.
ROSBAG2_TRANSPORT_PUBLIC
void pause();

/// Resume recording.
ROSBAG2_TRANSPORT_PUBLIC
void resume();

/// Pause if it was recording, continue recording if paused.
ROSBAG2_TRANSPORT_PUBLIC
void toogle_paused();
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved

/// Return the current paused state.
ROSBAG2_TRANSPORT_PUBLIC
bool is_paused();

private:
void topics_discovery();

Expand Down Expand Up @@ -124,6 +142,13 @@ class Recorder : public rclcpp::Node
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unordered_set<std::string> topic_unknown_types_;
rclcpp::Service<rosbag2_interfaces::srv::Snapshot>::SharedPtr srv_snapshot_;
// This is a 0/1 flag.
// Using an int, so we can atomically toogle it (atomic_bool doesn't have a fetch_xor operator).
std::atomic<int> paused_ = 0;
// Keyboard handler
std::shared_ptr<KeyboardHandler> keyboard_handler_;
// Toogle paused key callback handle
KeyboardHandler::callback_handle_t toogle_paused_key_callback_handle_;
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
};

} // namespace rosbag2_transport
Expand Down
53 changes: 51 additions & 2 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,15 @@ Recorder::Recorder(
writer_(std::move(writer)),
storage_options_(storage_options),
record_options_(record_options),
stop_discovery_(record_options_.is_discovery_disabled)
stop_discovery_(record_options_.is_discovery_disabled),
paused_(record_options.start_paused),
keyboard_handler_(std::make_shared<KeyboardHandler>())
{
}

Recorder::~Recorder()
{
keyboard_handler_->delete_key_press_callback(toogle_paused_key_callback_handle_);
stop_discovery_ = true;
if (discovery_future_.valid()) {
discovery_future_.wait();
Expand Down Expand Up @@ -117,13 +120,57 @@ void Recorder::record()
discovery_future_ =
std::async(std::launch::async, std::bind(&Recorder::topics_discovery, this));
}
auto key = record_options_.pause_resume_toggle_key;
std::string key_str = enum_key_code_to_str(key);
if (key == KeyboardHandler::KeyCode::UNKNOWN) {
RCLCPP_ERROR_STREAM(
get_logger(),
"Invalid key binding " << key_str << " for pausing/resuming");
throw std::invalid_argument("Invalid key binding.");
}
toogle_paused_key_callback_handle_ =
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
keyboard_handler_->add_key_press_callback(
[this](KeyboardHandler::KeyCode /*key_code*/,
KeyboardHandler::KeyModifiers /*key_modifiers*/) {this->toogle_paused();},
key);
// show instructions
RCLCPP_INFO_STREAM(
get_logger(),
"Press " << key_str << " for pausing/resuming");
}

const rosbag2_cpp::Writer & Recorder::get_writer_handle()
{
return *writer_;
}

void Recorder::pause()
{
paused_.exchange(1);
RCLCPP_INFO_STREAM(get_logger(), "Pausing recording.");
}

void Recorder::resume()
{
paused_.exchange(0);
RCLCPP_INFO_STREAM(get_logger(), "Resuming recording.");
}

void Recorder::toogle_paused()
{
auto was_paused_before = static_cast<bool>(paused_.fetch_xor(1));
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
if (was_paused_before) {
RCLCPP_INFO_STREAM(get_logger(), "Resuming recording.");
} else {
RCLCPP_INFO_STREAM(get_logger(), "Pausing recording.");
}
}

bool Recorder::is_paused()
{
return paused_.load();
}

void Recorder::topics_discovery()
{
while (rclcpp::ok() && stop_discovery_ == false) {
Expand Down Expand Up @@ -236,7 +283,9 @@ Recorder::create_subscription(
topic_type,
qos,
[this, topic_name, topic_type](std::shared_ptr<rclcpp::SerializedMessage> message) {
writer_->write(message, topic_name, topic_type, rclcpp::Clock(RCL_SYSTEM_TIME).now());
if (!paused_.load()) {
writer_->write(message, topic_name, topic_type, rclcpp::Clock(RCL_SYSTEM_TIME).now());
}
});
return subscription;
}
Expand Down