Skip to content

Commit

Permalink
Add pause and resume service calls for rosbag2 recorder (ros2#1131)
Browse files Browse the repository at this point in the history
* feat(recorder): Add pause and resume service calls.

Add service calls to pause recording and resume recording.

Fixes ros2#1130

Signed-off-by: Rick Shanor <rickshanor@gmail.com>

* feat(recorder): pause, resume, and is_paused PR comments.

Address PR comments. Add is_paused service. Update tests accordingly.

Signed-off-by: Rick Shanor <rickshanor@gmail.com>

Signed-off-by: Rick Shanor <rickshanor@gmail.com>
  • Loading branch information
rshanor authored and haudren-woven committed Aug 24, 2023
1 parent 58aa739 commit 5b5e632
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 0 deletions.
6 changes: 6 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

#include "rosbag2_cpp/writer.hpp"

#include "rosbag2_interfaces/srv/is_paused.hpp"
#include "rosbag2_interfaces/srv/pause.hpp"
#include "rosbag2_interfaces/srv/resume.hpp"
#include "rosbag2_interfaces/srv/snapshot.hpp"

#include "rosbag2_interfaces/msg/write_split_event.hpp"
Expand Down Expand Up @@ -162,6 +165,9 @@ class Recorder : public rclcpp::Node
std::string serialization_format_;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unordered_set<std::string> topic_unknown_types_;
rclcpp::Service<rosbag2_interfaces::srv::IsPaused>::SharedPtr srv_is_paused_;
rclcpp::Service<rosbag2_interfaces::srv::Pause>::SharedPtr srv_pause_;
rclcpp::Service<rosbag2_interfaces::srv::Resume>::SharedPtr srv_resume_;
rclcpp::Service<rosbag2_interfaces::srv::Snapshot>::SharedPtr srv_snapshot_;
std::atomic<bool> paused_ = false;

Expand Down
30 changes: 30 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,36 @@ void Recorder::record()
split_event_pub_ =
create_publisher<rosbag2_interfaces::msg::WriteSplitEvent>("events/write_split", 1);

srv_pause_ = create_service<rosbag2_interfaces::srv::Pause>(
"~/pause",
[this](
const std::shared_ptr<rmw_request_id_t>/* request_header */,
const std::shared_ptr<rosbag2_interfaces::srv::Pause::Request>/* request */,
const std::shared_ptr<rosbag2_interfaces::srv::Pause::Response>/* response */)
{
pause();
});

srv_resume_ = create_service<rosbag2_interfaces::srv::Resume>(
"~/resume",
[this](
const std::shared_ptr<rmw_request_id_t>/* request_header */,
const std::shared_ptr<rosbag2_interfaces::srv::Resume::Request>/* request */,
const std::shared_ptr<rosbag2_interfaces::srv::Resume::Response>/* response */)
{
resume();
});

srv_is_paused_ = create_service<rosbag2_interfaces::srv::IsPaused>(
"~/is_paused",
[this](
const std::shared_ptr<rmw_request_id_t>/* request_header */,
const std::shared_ptr<rosbag2_interfaces::srv::IsPaused::Request>/* request */,
const std::shared_ptr<rosbag2_interfaces::srv::IsPaused::Response> response)
{
response->paused = is_paused();
});

// Start the thread that will publish events
event_publisher_thread_ = std::thread(&Recorder::event_publisher_thread_main, this);

Expand Down
29 changes: 29 additions & 0 deletions rosbag2_transport/test/rosbag2_transport/test_record_services.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

#include "rclcpp/rclcpp.hpp"

#include "rosbag2_interfaces/srv/is_paused.hpp"
#include "rosbag2_interfaces/srv/pause.hpp"
#include "rosbag2_interfaces/srv/resume.hpp"
#include "rosbag2_interfaces/srv/snapshot.hpp"
#include "rosbag2_transport/recorder.hpp"

Expand All @@ -35,6 +38,9 @@ using namespace ::testing; // NOLINT
class RecordSrvsTest : public RecordIntegrationTestFixture
{
public:
using IsPaused = rosbag2_interfaces::srv::IsPaused;
using Pause = rosbag2_interfaces::srv::Pause;
using Resume = rosbag2_interfaces::srv::Resume;
using Snapshot = rosbag2_interfaces::srv::Snapshot;

RecordSrvsTest()
Expand Down Expand Up @@ -71,6 +77,9 @@ class RecordSrvsTest : public RecordIntegrationTestFixture
pub_manager.setup_publisher(test_topic_, string_message, 10);

const std::string ns = "/" + recorder_name_;
cli_is_paused_ = client_node_->create_client<IsPaused>(ns + "/is_paused");
cli_pause_ = client_node_->create_client<Pause>(ns + "/pause");
cli_resume_ = client_node_->create_client<Resume>(ns + "/resume");
cli_snapshot_ = client_node_->create_client<Snapshot>(ns + "/snapshot");

exec_ = std::make_shared<rclcpp::executors::SingleThreadedExecutor>();
Expand Down Expand Up @@ -126,6 +135,9 @@ class RecordSrvsTest : public RecordIntegrationTestFixture

// Service clients
rclcpp::Node::SharedPtr client_node_;
rclcpp::Client<IsPaused>::SharedPtr cli_is_paused_;
rclcpp::Client<Pause>::SharedPtr cli_pause_;
rclcpp::Client<Resume>::SharedPtr cli_resume_;
rclcpp::Client<Snapshot>::SharedPtr cli_snapshot_;
};

Expand All @@ -148,3 +160,20 @@ TEST_F(RecordSrvsTest, trigger_snapshot)
successful_service_request<Snapshot>(cli_snapshot_);
EXPECT_THAT(mock_writer.get_messages().size(), Ne(0u));
}

TEST_F(RecordSrvsTest, pause_resume)
{
EXPECT_FALSE(recorder_->is_paused());
auto is_paused_response = successful_service_request<IsPaused>(cli_is_paused_);
EXPECT_FALSE(is_paused_response->paused);

successful_service_request<Pause>(cli_pause_);
EXPECT_TRUE(recorder_->is_paused());
is_paused_response = successful_service_request<IsPaused>(cli_is_paused_);
EXPECT_TRUE(is_paused_response->paused);

successful_service_request<Resume>(cli_resume_);
EXPECT_FALSE(recorder_->is_paused());
is_paused_response = successful_service_request<IsPaused>(cli_is_paused_);
EXPECT_FALSE(is_paused_response->paused);
}

0 comments on commit 5b5e632

Please sign in to comment.