diff --git a/ros2bag/ros2bag/verb/burst.py b/ros2bag/ros2bag/verb/burst.py index c2c0876879..bbd9f8589c 100644 --- a/ros2bag/ros2bag/verb/burst.py +++ b/ros2bag/ros2bag/verb/burst.py @@ -41,8 +41,12 @@ def add_arguments(self, parser, cli_name): # noqa: D102 'delay of message playback.') parser.add_argument( '--topics', type=str, default=[], nargs='+', - help='topics to replay, separated by space. If none specified, all topics will be ' - 'replayed.') + help='topics to replay, separated by space. At least one topic needs to be ' + "specified. If this parameter isn\'t specified, all topics will be replayed.") + parser.add_argument( + '--services', type=str, default=[], nargs='+', + help='services to replay, separated by space. At least one service needs to be ' + "specified. If this parameter isn\'t specified, all services will be replayed.") parser.add_argument( '--qos-profile-overrides-path', type=FileType('r'), help='Path to a yaml file defining overrides of the QoS profile for specific topics.') @@ -90,6 +94,13 @@ def main(self, *, args): # noqa: D102 play_options.node_prefix = NODE_NAME_PREFIX play_options.rate = 1.0 play_options.topics_to_filter = args.topics + # Convert service name to service event topic name + services = [] + if args.services and len(args.services) != 0: + for s in args.services: + name = '/' + s if s[0] != '/' else s + services.append(name + '/_service_event') + play_options.services_to_filter = services play_options.topic_qos_profile_overrides = qos_profile_overrides play_options.loop = False play_options.topic_remapping_options = topic_remapping diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 5c4e0218d4..e25b720cb6 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -52,14 +52,22 @@ def add_arguments(self, parser, cli_name): # noqa: D102 '--topics', type=str, default=[], nargs='+', help='topics to replay, separated by space. If none specified, all topics will be ' 'replayed.') + parser.add_argument( + '--services', type=str, default=[], nargs='+', + help='services to replay, separated by space. At least one service needs to be ' + 'specified.') parser.add_argument( '-e', '--regex', default='', help='filter topics by regular expression to replay, separated by space. If none ' 'specified, all topics will be replayed.') parser.add_argument( - '-x', '--exclude', default='', + '--exclude-topics', default='', help='regular expressions to exclude topics from replay, separated by space. If none ' 'specified, all topics will be replayed.') + parser.add_argument( + '--exclude-services', default='', + help='regular expressions to exclude services from replay, separated by space. If ' + 'none specified, all services will be replayed.') parser.add_argument( '--qos-profile-overrides-path', type=FileType('r'), help='Path to a yaml file defining overrides of the QoS profile for specific topics.') @@ -183,8 +191,21 @@ def main(self, *, args): # noqa: D102 play_options.node_prefix = NODE_NAME_PREFIX play_options.rate = args.rate play_options.topics_to_filter = args.topics - play_options.topics_regex_to_filter = args.regex - play_options.topics_regex_to_exclude = args.exclude + + # Convert service name to service event topic name + services = [] + if args.services and len(args.services) != 0: + for s in args.services: + name = '/' + s if s[0] != '/' else s + services.append(name + '/_service_event') + play_options.services_to_filter = services + + play_options.regex_to_filter = args.regex + play_options.topics_regex_to_exclude = args.exclude_topics + if args.exclude_services: + play_options.services_regex_to_exclude = args.exclude_services + '/_service_event' + else: + play_options.services_regex_to_exclude = args.exclude_services play_options.topic_qos_profile_overrides = qos_profile_overrides play_options.loop = args.loop play_options.topic_remapping_options = topic_remapping diff --git a/rosbag2_py/src/rosbag2_py/_storage.cpp b/rosbag2_py/src/rosbag2_py/_storage.cpp index 4ecdcb2c4f..109eb3cdda 100644 --- a/rosbag2_py/src/rosbag2_py/_storage.cpp +++ b/rosbag2_py/src/rosbag2_py/_storage.cpp @@ -120,15 +120,22 @@ PYBIND11_MODULE(_storage, m) { pybind11::class_(m, "StorageFilter") .def( - pybind11::init, std::string, std::string>(), + pybind11::init< + std::vector, std::vector, std::string, std::string, std::string>(), pybind11::arg("topics") = std::vector(), - pybind11::arg("topics_regex") = "", - pybind11::arg("topics_regex_to_exclude") = "") + pybind11::arg("services") = std::vector(), + pybind11::arg("regex") = "", + pybind11::arg("topics_regex_to_exclude") = "", + pybind11::arg("services_regex_to_exclude") = "") .def_readwrite("topics", &rosbag2_storage::StorageFilter::topics) - .def_readwrite("topics_regex", &rosbag2_storage::StorageFilter::topics_regex) + .def_readwrite("services", &rosbag2_storage::StorageFilter::services) + .def_readwrite("regex", &rosbag2_storage::StorageFilter::regex) .def_readwrite( "topics_regex_to_exclude", - &rosbag2_storage::StorageFilter::topics_regex_to_exclude); + &rosbag2_storage::StorageFilter::topics_regex_to_exclude) + .def_readwrite( + "services_regex_to_exclude", + &rosbag2_storage::StorageFilter::services_regex_to_exclude); pybind11::class_(m, "MessageDefinition") .def( diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index d9ee4c3833..c17025f19d 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -318,8 +318,10 @@ PYBIND11_MODULE(_transport, m) { .def_readwrite("node_prefix", &PlayOptions::node_prefix) .def_readwrite("rate", &PlayOptions::rate) .def_readwrite("topics_to_filter", &PlayOptions::topics_to_filter) - .def_readwrite("topics_regex_to_filter", &PlayOptions::topics_regex_to_filter) + .def_readwrite("services_to_filter", &PlayOptions::services_to_filter) + .def_readwrite("regex_to_filter", &PlayOptions::regex_to_filter) .def_readwrite("topics_regex_to_exclude", &PlayOptions::topics_regex_to_exclude) + .def_readwrite("services_regex_to_exclude", &PlayOptions::services_regex_to_exclude) .def_property( "topic_qos_profile_overrides", &PlayOptions::getTopicQoSProfileOverrides, diff --git a/rosbag2_storage/include/rosbag2_storage/storage_filter.hpp b/rosbag2_storage/include/rosbag2_storage/storage_filter.hpp index 4c401505c2..6f5aedb3e1 100644 --- a/rosbag2_storage/include/rosbag2_storage/storage_filter.hpp +++ b/rosbag2_storage/include/rosbag2_storage/storage_filter.hpp @@ -25,18 +25,28 @@ struct StorageFilter { // Topic names to whitelist when reading a bag. Only messages matching these // specified topics will be returned. If list is empty, the filter is ignored - // and all messages are returned. + // and all messages of topics are returned. std::vector topics; - // Regular expression of topic names to whitelist when playing a bag. - // Only messages matching these specified topics will be played. + // Service names to whitelist when reading a bag. Only messages matching these + // specified service will be returned. If list is empty, the filter is ignored + // and all messages of services are returned. + std::vector services; + + // Regular expression of topic names and service name to whitelist when playing a bag. + // Only messages matching these specified topics or services will be played. // If list is empty, the filter is ignored and all messages are played. - std::string topics_regex = ""; + std::string regex = ""; // Regular expression of topic names to exclude when playing a bag. // Only messages not matching these specified topics will be played. - // If list is empty, the filter is ignored and all messages are played. + // If list is empty, the filter is ignored and all messages of topics are played. std::string topics_regex_to_exclude = ""; + + // Regular expression of topic names to exclude when playing a bag. + // Only messages not matching these specified services will be played. + // If list is empty, the filter is ignored and all messages of services are played. + std::string services_regex_to_exclude = ""; }; } // namespace rosbag2_storage diff --git a/rosbag2_storage_mcap/src/mcap_storage.cpp b/rosbag2_storage_mcap/src/mcap_storage.cpp index 716967a12b..b3fa432a1a 100644 --- a/rosbag2_storage_mcap/src/mcap_storage.cpp +++ b/rosbag2_storage_mcap/src/mcap_storage.cpp @@ -521,26 +521,63 @@ void MCAPStorage::reset_iterator() options.endTime = mcap::MaxTime; } options.readOrder = read_order_; - if (!storage_filter_.topics.empty()) { - options.topicFilter = [this](std::string_view topic) { + + auto filter_process = [this](std::string_view topic) { + if (!storage_filter_.topics.empty()) { for (const auto & match_topic : storage_filter_.topics) { if (match_topic == topic) { return true; } } - return false; - }; - } + } + + if (!storage_filter_.services.empty()) { + for (const auto & match_service : storage_filter_.services) { + if (match_service == topic) { + return true; + } + } + } + + bool topics_regex_to_exclude_match = false; + bool services_regex_to_exclude_match = false; + std::string topic_string(topic); + + if (!storage_filter_.topics_regex_to_exclude.empty()) { + std::smatch m; + std::regex re(storage_filter_.topics_regex_to_exclude); + topics_regex_to_exclude_match = std::regex_match(topic_string, m, re); + } + + if (!storage_filter_.services_regex_to_exclude.empty()) { + std::smatch m; + std::regex re(storage_filter_.services_regex_to_exclude); + services_regex_to_exclude_match = std::regex_match(topic_string, m, re); + } + #ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_FILTER_TOPIC_REGEX - if (!storage_filter_.topics_regex.empty()) { - options.topicFilter = [this](std::string_view topic) { + if (!storage_filter_.regex.empty()) { std::smatch m; - std::string topic_string(topic); - std::regex re(storage_filter_.topics_regex); - return std::regex_match(topic_string, m, re); - }; - } + std::regex re(storage_filter_.regex); + + if (std::regex_match(topic_string, m, re) && !topics_regex_to_exclude_match && + !services_regex_to_exclude_match) { + return true; + } else { + return false; + } + } #endif + + if ((storage_filter_.topics.empty() && !topics_regex_to_exclude_match) && + (storage_filter_.services.empty() && !services_regex_to_exclude_match)) { + return true; + } + + return false; + }; + options.topicFilter = filter_process; + linear_view_ = std::make_unique(mcap_reader_->readMessages(OnProblem, options)); linear_iterator_ = std::make_unique(linear_view_->begin()); diff --git a/rosbag2_storage_sqlite3/src/rosbag2_storage_sqlite3/sqlite_storage.cpp b/rosbag2_storage_sqlite3/src/rosbag2_storage_sqlite3/sqlite_storage.cpp index d83e8d5883..8c4cb75f20 100644 --- a/rosbag2_storage_sqlite3/src/rosbag2_storage_sqlite3/sqlite_storage.cpp +++ b/rosbag2_storage_sqlite3/src/rosbag2_storage_sqlite3/sqlite_storage.cpp @@ -521,6 +521,7 @@ void SqliteStorage::prepare_for_reading() "FROM messages JOIN topics ON messages.topic_id = topics.id WHERE "; std::vector where_conditions; + std::string topic_and_service_list; // add topic filter if (!storage_filter_.topics.empty()) { // Construct string for selected topics @@ -531,13 +532,39 @@ void SqliteStorage::prepare_for_reading() topic_list += ","; } } - where_conditions.push_back("(topics.name IN (" + topic_list + "))"); + topic_and_service_list = "(topics.name IN (" + topic_list + "))"; } - // add topic filter based on regular expression - if (!storage_filter_.topics_regex.empty()) { + + // add service filter + if (!storage_filter_.services.empty()) { // Construct string for selected topics - where_conditions.push_back("(topics.name REGEXP '" + storage_filter_.topics_regex + "')"); + std::string service_list{""}; + for (auto & service : storage_filter_.services) { + service_list += "'" + service + "'"; + if (&service != &storage_filter_.topics.back()) { + service_list += ","; + } + } + + topic_and_service_list = topic_and_service_list + + std::string(topic_and_service_list.empty() ? "" : " OR ") + + "(topics.name IN (" + service_list + "))"; + } + + std::string list_and_regex = topic_and_service_list; + // add topic filter based on regular expression + if (!storage_filter_.regex.empty()) { + std::string regex = "(topics.name REGEXP '" + storage_filter_.regex + "')"; + list_and_regex = list_and_regex + + std::string(!list_and_regex.empty() ? " OR " : "") + + regex; } + + if (!list_and_regex.empty()) { + where_conditions.push_back(list_and_regex); + } + + std::string exclude_topics_services; // exclude topics based on regular expressions if (!storage_filter_.topics_regex_to_exclude.empty()) { // Construct string for selected topics @@ -546,6 +573,14 @@ void SqliteStorage::prepare_for_reading() "(SELECT topics.name FROM topics WHERE topics.name REGEXP '" + storage_filter_.topics_regex_to_exclude + "'))"); } + // exclude service based on regular expressions + if (!storage_filter_.services_regex_to_exclude.empty()) { + // Construct string for selected topics + where_conditions.push_back( + "(topics.name NOT IN " + "(SELECT topics.name FROM topics WHERE topics.name REGEXP '" + + storage_filter_.services_regex_to_exclude + "'))"); + } const std::string direction_op = read_order_.reverse ? "<" : ">"; const std::string order_direction = read_order_.reverse ? "DESC" : "ASC"; diff --git a/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/test_sqlite_storage.cpp b/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/test_sqlite_storage.cpp index ae8582d02f..3a2d985da5 100644 --- a/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/test_sqlite_storage.cpp +++ b/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/test_sqlite_storage.cpp @@ -566,7 +566,7 @@ TEST_F(StorageTestFixture, read_next_returns_filtered_messages_regex) { readable_storage->open({db_filename, kPluginID}); rosbag2_storage::StorageFilter storage_filter; - storage_filter.topics_regex = "topic.*"; + storage_filter.regex = "topic.*"; readable_storage->set_filter(storage_filter); EXPECT_TRUE(readable_storage->has_next()); diff --git a/rosbag2_test_common/include/rosbag2_test_common/service_client_manager.hpp b/rosbag2_test_common/include/rosbag2_test_common/client_manager.hpp similarity index 93% rename from rosbag2_test_common/include/rosbag2_test_common/service_client_manager.hpp rename to rosbag2_test_common/include/rosbag2_test_common/client_manager.hpp index 2862b1fb98..a7764b4e49 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/service_client_manager.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/client_manager.hpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef ROSBAG2_TEST_COMMON__SERVICE_CLIENT_MANAGER_HPP_ -#define ROSBAG2_TEST_COMMON__SERVICE_CLIENT_MANAGER_HPP_ +#ifndef ROSBAG2_TEST_COMMON__CLIENT_MANAGER_HPP_ +#define ROSBAG2_TEST_COMMON__CLIENT_MANAGER_HPP_ #include #include @@ -23,13 +23,14 @@ #include "rclcpp/rclcpp.hpp" // rclcpp must be included before the Windows specific includes. + namespace rosbag2_test_common { template -class ServiceClientManager : public rclcpp::Node +class ClientManager : public rclcpp::Node { public: - explicit ServiceClientManager( + explicit ClientManager( std::string service_name, size_t client_number = 1, bool service_event_contents = false, @@ -122,4 +123,4 @@ class ServiceClientManager : public rclcpp::Node }; } // namespace rosbag2_test_common -#endif // ROSBAG2_TEST_COMMON__SERVICE_CLIENT_MANAGER_HPP_ +#endif // ROSBAG2_TEST_COMMON__CLIENT_MANAGER_HPP_ diff --git a/rosbag2_test_common/include/rosbag2_test_common/service_manager.hpp b/rosbag2_test_common/include/rosbag2_test_common/service_manager.hpp new file mode 100644 index 0000000000..ea640eccb6 --- /dev/null +++ b/rosbag2_test_common/include/rosbag2_test_common/service_manager.hpp @@ -0,0 +1,89 @@ +// Copyright 2023 Sony Group Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_TEST_COMMON__SERVICE_MANAGER_HPP_ +#define ROSBAG2_TEST_COMMON__SERVICE_MANAGER_HPP_ + +#include +#include +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" + +namespace rosbag2_test_common +{ +class ServiceManager +{ +public: + ServiceManager() + : pub_node_(std::make_shared( + "service_manager_" + std::to_string(rclcpp::Clock().now().nanoseconds()), + rclcpp::NodeOptions().start_parameter_event_publisher(false).enable_rosout(false))) + { + } + + ~ServiceManager() + { + exit_ = true; + if (thread_.joinable()) { + thread_.join(); + } + } + + template + void setup_service( + std::string service_name, + std::vector> & requests) + { + auto callback = [&requests]( + const std::shared_ptr request_header, + const std::shared_ptr request, + std::shared_ptr response) { + (void)request_header; + (void)response; + requests.emplace_back(request); + }; + + auto service = pub_node_->create_service( + service_name, std::forward(callback)); + services_.emplace(service_name, service); + } + + void run_services() + { + auto spin = [this]() { + rclcpp::executors::SingleThreadedExecutor exec; + exec.add_node(pub_node_); + + while (!exit_) { + exec.spin_some(); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + }; + + thread_ = std::thread(spin); + } + +private: + std::shared_ptr pub_node_; + std::unordered_map services_; + bool exit_ = false; + std::thread thread_; +}; + +} // namespace rosbag2_test_common + +#endif // ROSBAG2_TEST_COMMON__SERVICE_MANAGER_HPP_ diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index 4f880d0614..8d0ed448a1 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -37,19 +37,29 @@ struct PlayOptions // Topic names to whitelist when playing a bag. // Only messages matching these specified topics will be played. - // If list is empty, the filter is ignored and all messages are played. + // If list is empty, the filter is ignored and all messages of topics are played. std::vector topics_to_filter = {}; - // Regular expression of topic names to whitelist when playing a bag. - // Only messages matching these specified topics will be played. + // Service names to whitelist when playing a bag. + // Only messages matching these specified service will be played. + // If list is empty, the filter is ignored and all messages of services are played. + std::vector services_to_filter = {}; + + // Regular expression of topic names and service name to whitelist when playing a bag. + // Only messages matching these specified topics and services will be played. // If list is empty, the filter is ignored and all messages are played. - std::string topics_regex_to_filter = ""; + std::string regex_to_filter = ""; // Regular expression of topic names to exclude when playing a bag. // Only messages not matching these specified topics will be played. // If list is empty, the filter is ignored and all messages are played. std::string topics_regex_to_exclude = ""; + // Regular expression of service names to exclude when playing a bag. + // Only messages not matching these specified topics will be played. + // If list is empty, the filter is ignored and all messages are played. + std::string services_regex_to_exclude = ""; + std::unordered_map topic_qos_profile_overrides = {}; bool loop = false; std::vector topic_remapping_options = {}; diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 97884e65f8..eef5fbf114 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -30,11 +30,13 @@ #include "moodycamel/readerwriterqueue.h" +#include "rclcpp/logger.hpp" #include "rclcpp/node.hpp" #include "rclcpp/publisher.hpp" #include "rclcpp/qos.hpp" #include "rosbag2_cpp/clocks/player_clock.hpp" +#include "rosbag2_cpp/service_utils.hpp" #include "rosbag2_interfaces/msg/read_split_event.hpp" #include "rosbag2_interfaces/srv/get_rate.hpp" #include "rosbag2_interfaces/srv/is_paused.hpp" @@ -53,8 +55,12 @@ #include "rosbag2_transport/play_options.hpp" #include "rosbag2_transport/visibility_control.hpp" +#include "rosidl_typesupport_introspection_cpp/identifier.hpp" +#include "rosidl_typesupport_introspection_cpp/message_introspection.hpp" + #include "rosgraph_msgs/msg/clock.hpp" + namespace rosbag2_cpp { class Reader; @@ -242,11 +248,57 @@ class Player : public rclcpp::Node std::shared_ptr publisher_; std::function publish_func_; }; + + class PlayerClient final + { +public: + explicit PlayerClient( + std::shared_ptr cli, + std::string service_name, + std::string service_event_type, + const rclcpp::Logger & logger); + + // Can call this function if check_include_request_message() return true + void async_send_request(const rclcpp::SerializedMessage & message); + + std::shared_ptr generic_client() + { + return client_; + } + + // Check if message can be unpacked to get request message + bool is_include_request_message(const rclcpp::SerializedMessage & message); + +private: + std::shared_ptr client_; + std::string service_name_; + const rclcpp::Logger & logger_; + enum class introspection_type + { + UNKNOW, + METADATA, + CONTENTS + }; + introspection_type client_side_type_ = introspection_type::UNKNOW; + introspection_type service_side_type_ = introspection_type::UNKNOW; + + std::shared_ptr ts_lib_; + const rosidl_message_type_support_t * ts_; + const rosidl_typesupport_introspection_cpp::MessageMembers * message_members_; + + rcutils_allocator_t allocator_ = rcutils_get_default_allocator(); + + uint8_t get_msg_event_type(const rclcpp::SerializedMessage & message); + }; bool is_ready_to_play_from_queue_{false}; std::mutex ready_to_play_from_queue_mutex_; std::condition_variable ready_to_play_from_queue_cv_; rclcpp::Publisher::SharedPtr clock_publisher_; - std::unordered_map> publishers_; + using SharedPlayerPublisher = std::shared_ptr; + using SharedPlayerClient = std::shared_ptr; + std::unordered_map< + std::string, + std::variant> senders_; private: rosbag2_storage::SerializedBagMessageSharedPtr peek_next_message_from_queue(); diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index f767cab6ab..3d55929b39 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include "rcl/graph.h" @@ -32,6 +33,7 @@ #include "rosbag2_cpp/clocks/time_controller_clock.hpp" #include "rosbag2_cpp/reader.hpp" #include "rosbag2_cpp/typesupport_helpers.hpp" +#include "rosbag2_cpp/service_utils.hpp" #include "rosbag2_storage/storage_filter.hpp" @@ -263,13 +265,17 @@ bool Player::play() if (timeout == std::chrono::milliseconds(0)) { timeout = std::chrono::milliseconds(-1); } - for (auto pub : publishers_) { + for (auto pub : senders_) { try { - if (!pub.second->generic_publisher()->wait_for_all_acked(timeout)) { - RCLCPP_ERROR( - get_logger(), - "Timed out while waiting for all published messages to be acknowledged for topic %s", - pub.first.c_str()); + if (pub.second.index() == 0) { // publisher + if (!std::get(pub.second) + ->generic_publisher()->wait_for_all_acked(timeout)) + { + RCLCPP_ERROR( + get_logger(), + "Timed out while waiting for all published messages to be acknowledged for topic %s", + pub.first.c_str()); + } } } catch (std::exception & e) { RCLCPP_ERROR( @@ -601,8 +607,10 @@ void Player::prepare_publishers() { rosbag2_storage::StorageFilter storage_filter; storage_filter.topics = play_options_.topics_to_filter; - storage_filter.topics_regex = play_options_.topics_regex_to_filter; + storage_filter.services = play_options_.services_to_filter; + storage_filter.regex = play_options_.regex_to_filter; storage_filter.topics_regex_to_exclude = play_options_.topics_regex_to_exclude; + storage_filter.services_regex_to_exclude = play_options_.services_regex_to_exclude; reader_->set_filter(storage_filter); // Create /clock publisher @@ -643,39 +651,64 @@ void Player::prepare_publishers() auto topics = reader_->get_all_topics_and_types(); std::string topic_without_support_acked; for (const auto & topic : topics) { - if (publishers_.find(topic.name) != publishers_.end()) { + if (senders_.find(topic.name) != senders_.end()) { continue; } - // filter topics to add publishers if necessary + auto & filter_topics = storage_filter.topics; - if (!filter_topics.empty()) { - auto iter = std::find(filter_topics.begin(), filter_topics.end(), topic.name); - if (iter == filter_topics.end()) { - continue; + auto & filter_services = storage_filter.services; + + if (rosbag2_cpp::is_service_event_topic(topic.name, topic.type)) { + // filter services to add clients if necessary + if (!filter_services.empty()) { + auto iter = std::find(filter_services.begin(), filter_services.end(), topic.name); + if (iter == filter_services.end()) { + continue; + } } - } - - auto topic_qos = publisher_qos_for_topic( - topic, topic_qos_profile_overrides_, - get_logger()); - try { - std::shared_ptr pub = - create_generic_publisher(topic.name, topic.type, topic_qos); - std::shared_ptr player_pub = - std::make_shared( - std::move(pub), play_options_.disable_loan_message); - publishers_.insert(std::make_pair(topic.name, player_pub)); - if (play_options_.wait_acked_timeout >= 0 && - topic_qos.reliability() == rclcpp::ReliabilityPolicy::BestEffort) - { - topic_without_support_acked += topic.name + ", "; + auto service_name = rosbag2_cpp::service_event_topic_name_to_service_name(topic.name); + auto service_type = rosbag2_cpp::service_event_topic_type_to_service_type(topic.type); + try { + auto cli = create_generic_client(service_name, service_type); + auto player_cli = std::make_shared( + std::move(cli), service_name, topic.type, get_logger()); + senders_.insert(std::make_pair(topic.name, player_cli)); + } catch (const std::runtime_error & e) { + RCLCPP_WARN( + get_logger(), + "Ignoring a service '%s', reason: %s.", + service_name.c_str(), e.what()); + } + } else { + // filter topics to add publishers if necessary + if (!filter_topics.empty()) { + auto iter = std::find(filter_topics.begin(), filter_topics.end(), topic.name); + if (iter == filter_topics.end()) { + continue; + } + } + auto topic_qos = publisher_qos_for_topic( + topic, topic_qos_profile_overrides_, + get_logger()); + try { + std::shared_ptr pub = + create_generic_publisher(topic.name, topic.type, topic_qos); + std::shared_ptr player_pub = + std::make_shared( + std::move(pub), play_options_.disable_loan_message); + senders_.insert(std::make_pair(topic.name, player_pub)); + if (play_options_.wait_acked_timeout >= 0 && + topic_qos.reliability() == rclcpp::ReliabilityPolicy::BestEffort) + { + topic_without_support_acked += topic.name + ", "; + } + } catch (const std::runtime_error & e) { + // using a warning log seems better than adding a new option + // to ignore some unknown message type library + RCLCPP_WARN( + get_logger(), + "Ignoring a topic '%s', reason: %s.", topic.name.c_str(), e.what()); } - } catch (const std::runtime_error & e) { - // using a warning log seems better than adding a new option - // to ignore some unknown message type library - RCLCPP_WARN( - get_logger(), - "Ignoring a topic '%s', reason: %s.", topic.name.c_str(), e.what()); } } @@ -709,8 +742,17 @@ void Player::prepare_publishers() bool Player::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message) { bool message_published = false; - auto publisher_iter = publishers_.find(message->topic_name); - if (publisher_iter != publishers_.end()) { + + auto sender_iter = senders_.find(message->topic_name); + if (sender_iter != senders_.end()) { + // For sending requests, ignore service event messages that do not contain request information. + if (sender_iter->second.index() == 1 && + !std::get(sender_iter->second) + ->is_include_request_message(rclcpp::SerializedMessage(*message->serialized_data))) + { + return message_published; + } + { // Calling on play message pre-callbacks std::lock_guard lk(on_play_msg_callbacks_mutex_); for (auto & pre_callback_data : on_play_msg_pre_callbacks_) { @@ -720,13 +762,34 @@ bool Player::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr mess } } - try { - publisher_iter->second->publish(rclcpp::SerializedMessage(*message->serialized_data)); - message_published = true; - } catch (const std::exception & e) { - RCLCPP_ERROR_STREAM( - get_logger(), "Failed to publish message on '" << message->topic_name << - "' topic. \nError: " << e.what()); + switch (sender_iter->second.index()) { + case 0: // publisher + { + try { + std::get(sender_iter->second) + ->publish(rclcpp::SerializedMessage(*message->serialized_data)); + message_published = true; + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + get_logger(), "Failed to publish message on '" << message->topic_name << + "' topic. \nError: " << e.what()); + } + } + break; + case 1: // Client + { + try { + std::get(sender_iter->second) + ->async_send_request(rclcpp::SerializedMessage(*message->serialized_data)); + message_published = true; + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + get_logger(), "Failed to send request on '" << + rosbag2_cpp::service_event_topic_name_to_service_name(message->topic_name) << + "' service. \nError: " << e.what()); + } + } + break; } // Calling on play message post-callbacks @@ -936,4 +999,140 @@ void Player::publish_clock_update(const rclcpp::Time & time) } } +Player::PlayerClient::PlayerClient( + std::shared_ptr cli, + std::string service_name, + std::string service_event_type, + const rclcpp::Logger & logger) +: client_(std::move(cli)), + service_name_(service_name), + logger_(logger) +{ + ts_lib_ = rclcpp::get_typesupport_library( + service_event_type, "rosidl_typesupport_cpp"); + + ts_ = rclcpp::get_typesupport_handle( + service_event_type, + "rosidl_typesupport_cpp", + *ts_lib_); + + auto ts_handle = get_message_typesupport_handle( + ts_, + rosidl_typesupport_introspection_cpp::typesupport_identifier); + + message_members_ = + reinterpret_cast( + ts_handle->data); +} + +bool Player::PlayerClient::is_include_request_message(const rclcpp::SerializedMessage & message) +{ + auto type = get_msg_event_type(message); + + // Ignore response message + if (type == service_msgs::msg::ServiceEventInfo::RESPONSE_SENT || + type == service_msgs::msg::ServiceEventInfo::RESPONSE_RECEIVED) + { + return false; + } + + // Ignore metadata message + if (type == service_msgs::msg::ServiceEventInfo::REQUEST_SENT && + client_side_type_ == introspection_type::METADATA) + { + return false; + } + if (type == service_msgs::msg::ServiceEventInfo::REQUEST_RECEIVED && + service_side_type_ == introspection_type::METADATA) + { + return false; + } + + if (client_side_type_ == introspection_type::UNKNOW && + type == service_msgs::msg::ServiceEventInfo::REQUEST_SENT) + { + if (message.size() <= rosbag2_cpp::get_serialization_size_for_service_metadata_event()) { + client_side_type_ = introspection_type::METADATA; + RCLCPP_WARN( + logger_, + "The configuration of introspection for '%s' client is metadata !", + service_name_.c_str()); + return false; + } else { + client_side_type_ = introspection_type::CONTENTS; + } + } + if (service_side_type_ == introspection_type::UNKNOW && + type == service_msgs::msg::ServiceEventInfo::REQUEST_RECEIVED) + { + if (message.size() <= rosbag2_cpp::get_serialization_size_for_service_metadata_event()) { + service_side_type_ = introspection_type::METADATA; + RCLCPP_WARN( + logger_, + "The configuration of introspection for '%s' service is metadata !", + service_name_.c_str()); + return false; + } else { + service_side_type_ = introspection_type::CONTENTS; + } + } + + // If there are request send info and request receive info, only send request send info. + if (client_side_type_ == introspection_type::CONTENTS && + service_side_type_ == introspection_type::CONTENTS && + type == service_msgs::msg::ServiceEventInfo::REQUEST_RECEIVED) + { + return false; + } + + return true; +} + +void Player::PlayerClient::async_send_request(const rclcpp::SerializedMessage & message) +{ + void * ros_message = new uint8_t[message_members_->size_of_]; + message_members_->init_function( + ros_message, rosidl_runtime_cpp::MessageInitialization::ZERO); + + int ret = rmw_deserialize(&message.get_rcl_serialized_message(), ts_, ros_message); + if (ret == RMW_RET_OK) { + if (client_->service_is_ready()) { + // members_[0]: info, members_[1]: request, members_[2]: response + auto request_offset = message_members_->members_[1].offset_; + auto request_addr = reinterpret_cast(ros_message) + request_offset; + client_->async_send_request( + reinterpret_cast(*reinterpret_cast(request_addr))); + } else { + RCLCPP_ERROR( + logger_, + "'%s' service isn't ready !", + service_name_.c_str()); + } + } else { + throw std::runtime_error( + "Failed to deserialize service event message for " + service_name_ + " !"); + } + + message_members_->fini_function(ros_message); + delete[] static_cast(ros_message); +} + +uint8_t Player::PlayerClient::get_msg_event_type(const rclcpp::SerializedMessage & message) +{ + auto msg = service_msgs::msg::ServiceEventInfo(); + + const rosidl_message_type_support_t * type_support_info = + rosidl_typesupport_cpp:: + get_message_type_support_handle(); + + auto ret = rmw_deserialize( + &message.get_rcl_serialized_message(), + type_support_info, + reinterpret_cast(&msg)); + if (ret != RMW_RET_OK) { + throw std::runtime_error("It failed to deserialize message !"); + } + + return msg.event_type; +} } // namespace rosbag2_transport diff --git a/rosbag2_transport/test/rosbag2_transport/mock_player.hpp b/rosbag2_transport/test/rosbag2_transport/mock_player.hpp index 144bb6f522..ae17de860b 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_player.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_player.hpp @@ -36,10 +36,12 @@ class MockPlayer : public rosbag2_transport::Player std::vector get_list_of_publishers() { std::vector pub_list; - for (const auto & publisher : publishers_) { - pub_list.push_back( - static_cast( - publisher.second->generic_publisher().get())); + for (const auto & sender : senders_) { + if (sender.second.index() == 0) { + pub_list.push_back( + static_cast( + std::get(sender.second)->generic_publisher().get())); + } } if (clock_publisher_) { pub_list.push_back(clock_publisher_.get()); @@ -47,6 +49,20 @@ class MockPlayer : public rosbag2_transport::Player return pub_list; } + std::vector get_list_of_clients() + { + std::vector cli_list; + for (const auto & sender : senders_) { + if (sender.second.index() == 1) { + cli_list.push_back( + static_cast( + std::get(sender.second)->generic_client().get())); + } + } + + return cli_list; + } + void wait_for_playback_to_start() { std::unique_lock lk(ready_to_play_from_queue_mutex_); diff --git a/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp b/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp index 46ac168c01..6551f98929 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp @@ -43,14 +43,24 @@ class MockSequentialReader : public rosbag2_cpp::reader_interfaces::BaseReaderIn bool has_next() override { - if (filter_.topics.empty()) { + if (filter_.topics.empty() && filter_.services.empty()) { return num_read_ < messages_.size(); } while (num_read_ < messages_.size()) { - for (const auto & filter_topic : filter_.topics) { - if (!messages_[num_read_]->topic_name.compare(filter_topic)) { - return true; + if (!filter_.topics.empty()) { + for (const auto & filter_topic : filter_.topics) { + if (!messages_[num_read_]->topic_name.compare(filter_topic)) { + return true; + } + } + } + + if (!filter_.services.empty()) { + for (const auto & filter_service : filter_.services) { + if (!messages_[num_read_]->topic_name.compare(filter_service)) { + return true; + } } } num_read_++; diff --git a/rosbag2_transport/test/rosbag2_transport/rosbag2_play_test_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/rosbag2_play_test_fixture.hpp index c68ab7b01f..7b429e7140 100644 --- a/rosbag2_transport/test/rosbag2_transport/rosbag2_play_test_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/rosbag2_play_test_fixture.hpp @@ -18,6 +18,7 @@ #include #include "rosbag2_test_common/subscription_manager.hpp" +#include "rosbag2_test_common/service_manager.hpp" #include "rosbag2_transport_test_fixture.hpp" class RosBag2PlayTestFixture : public Rosbag2TransportTestFixture @@ -28,6 +29,7 @@ class RosBag2PlayTestFixture : public Rosbag2TransportTestFixture { rclcpp::init(0, nullptr); sub_ = std::make_shared(); + srv_ = std::make_shared(); } ~RosBag2PlayTestFixture() override @@ -36,6 +38,7 @@ class RosBag2PlayTestFixture : public Rosbag2TransportTestFixture } std::shared_ptr sub_; + std::shared_ptr srv_; }; #endif // ROSBAG2_TRANSPORT__ROSBAG2_PLAY_TEST_FIXTURE_HPP_ diff --git a/rosbag2_transport/test/rosbag2_transport/test_burst.cpp b/rosbag2_transport/test/rosbag2_transport/test_burst.cpp index 3e666b56ad..8b6f1c2542 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_burst.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_burst.cpp @@ -29,6 +29,7 @@ using namespace ::testing; // NOLINT using namespace rosbag2_transport; // NOLINT using namespace rosbag2_test_common; // NOLINT +using namespace std::chrono_literals; // NOLINT TEST_F(RosBag2PlayTestFixture, burst_with_false_preconditions) { auto primitive_message = get_messages_basic_types()[0]; @@ -365,3 +366,54 @@ TEST_F(RosBag2PlayTestFixture, burst_bursting_only_filtered_topics) { // All we care is that any messages arrived EXPECT_THAT(replayed_topic2, SizeIs(Eq(EXPECTED_BURST_COUNT))); } + +TEST_F(RosBag2PlayTestFixture, burst_bursting_only_filtered_services) { + auto services_types = std::vector{ + {"test_service1/_service_event", "test_msgs/srv/BasicTypes_Event", "", "", ""}, + {"test_service2/_service_event", "test_msgs/srv/BasicTypes_Event", "", "", ""}, + }; + std::vector> messages = + { + serialize_test_message( + "test_service1/_service_event", 500, get_service_event_message_basic_types()[0]), + serialize_test_message( + "test_service2/_service_event", 600, get_service_event_message_basic_types()[0]), + serialize_test_message( + "test_service1/_service_event", 400, get_service_event_message_basic_types()[1]), + serialize_test_message( + "test_service2/_service_event", 500, get_service_event_message_basic_types()[1]) + }; + + std::vector> service1_receive_requests; + std::vector> service2_receive_requests; + + srv_->setup_service("test_service1", service1_receive_requests); + srv_->setup_service("test_service2", service2_receive_requests); + + srv_->run_services(); + + // Filter allows test_service2, blocks test_service1 + play_options_.services_to_filter = {"test_service2/_service_event"}; + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, services_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + auto player = std::make_shared(std::move(reader), storage_options_, play_options_); + + player->pause(); + ASSERT_TRUE(player->is_paused()); + + auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + ASSERT_TRUE(player->is_paused()); + + const size_t EXPECTED_BURST_COUNT = 2; + ASSERT_EQ(player->burst(EXPECTED_BURST_COUNT), EXPECTED_BURST_COUNT); + + ASSERT_TRUE(player->is_paused()); + player->resume(); + player_future.get(); + + std::this_thread::sleep_for(200ms); + + EXPECT_EQ(service1_receive_requests.size(), 0); + EXPECT_EQ(service2_receive_requests.size(), 2); +} diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index 7b33abbcdc..3df6ee3b17 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -25,12 +25,14 @@ #include "rclcpp/rclcpp.hpp" #include "rosbag2_test_common/subscription_manager.hpp" +#include "rosbag2_test_common/service_manager.hpp" #include "rosbag2_transport/player.hpp" #include "test_msgs/msg/arrays.hpp" #include "test_msgs/msg/basic_types.hpp" #include "test_msgs/message_fixtures.hpp" +#include "test_msgs/srv/basic_types.hpp" #include "rosbag2_transport/qos.hpp" @@ -109,6 +111,104 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics) ElementsAre(40.0f, 2.0f, 0.0f))))); } +TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_services) +{ + auto services_types = std::vector{ + {"test_service1/_service_event", "test_msgs/srv/BasicTypes_Event", "", "", ""}, + {"test_service2/_service_event", "test_msgs/srv/BasicTypes_Event", "", "", ""}, + }; + std::vector> messages = + { + serialize_test_message( + "test_service1/_service_event", 500, get_service_event_message_basic_types()[0]), + serialize_test_message( + "test_service2/_service_event", 600, get_service_event_message_basic_types()[0]), + serialize_test_message( + "test_service1/_service_event", 400, get_service_event_message_basic_types()[1]), + serialize_test_message( + "test_service2/_service_event", 500, get_service_event_message_basic_types()[1]) + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, services_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + std::vector> service1_receive_requests; + std::vector> service2_receive_requests; + + srv_->setup_service("test_service1", service1_receive_requests); + srv_->setup_service("test_service2", service2_receive_requests); + + srv_->run_services(); + + auto player = std::make_shared( + std::move( + reader), storage_options_, play_options_); + player->play(); + + std::this_thread::sleep_for(300ms); + EXPECT_EQ(service1_receive_requests.size(), 2); + EXPECT_EQ(service2_receive_requests.size(), 2); +} + +TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_topics_and_services) +{ + auto topic_msg = get_messages_basic_types()[0]; + topic_msg->int64_value = 1111; + + auto services_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", "", ""}, + {"test_service1/_service_event", "test_msgs/srv/BasicTypes_Event", "", "", ""}, + }; + std::vector> messages = + { + serialize_test_message( + "test_service1/_service_event", 500, get_service_event_message_basic_types()[0]), + serialize_test_message( + "topic1", 600, topic_msg), + serialize_test_message( + "test_service1/_service_event", 550, get_service_event_message_basic_types()[1]), + serialize_test_message( + "topic1", 400, topic_msg), + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, services_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + std::vector> service_receive_requests; + srv_->setup_service("test_service1", service_receive_requests); + srv_->run_services(); + + sub_->add_subscription("/topic1", 2); + auto await_received_messages = sub_->spin_subscriptions(); + + auto player = std::make_shared( + std::move( + reader), storage_options_, play_options_); + player->play(); + + await_received_messages.get(); + + auto replayed_topic_msg = sub_->get_received_messages( + "/topic1"); + EXPECT_THAT(replayed_topic_msg, SizeIs(Ge(2u))); + EXPECT_THAT( + replayed_topic_msg, + Each(Pointee(Field(&test_msgs::msg::BasicTypes::int64_value, 1111)))); + + std::this_thread::sleep_for(200ms); + ASSERT_EQ(service_receive_requests.size(), 2); + for (size_t i = 0; i < 2; i++) { + EXPECT_EQ( + service_receive_requests[i]->int32_value, + get_service_event_message_basic_types()[i]->request[0].int32_value); + EXPECT_EQ( + service_receive_requests[i]->int64_value, + get_service_event_message_basic_types()[i]->request[0].int64_value); + } +} + TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_with_unknown_type) { auto primitive_message1 = get_messages_basic_types()[0]; @@ -302,6 +402,266 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics) } } +TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_services) +{ + auto services_types = std::vector{ + {"test_service1/_service_event", "test_msgs/srv/BasicTypes_Event", "", "", ""}, + {"test_service2/_service_event", "test_msgs/srv/BasicTypes_Event", "", "", ""}, + }; + std::vector> messages = + { + serialize_test_message( + "test_service1/_service_event", 500, get_service_event_message_basic_types()[0]), + serialize_test_message( + "test_service2/_service_event", 600, get_service_event_message_basic_types()[0]), + serialize_test_message( + "test_service1/_service_event", 400, get_service_event_message_basic_types()[1]), + serialize_test_message( + "test_service2/_service_event", 500, get_service_event_message_basic_types()[1]) + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, services_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + // Filter allows /test_service2, blocks /test_service1 + { + play_options_.services_to_filter = {"test_service2/_service_event"}; + + srv_.reset(); + srv_ = std::make_shared(); + + std::vector> service1_receive_requests; + std::vector> service2_receive_requests; + srv_->setup_service("test_service1", service1_receive_requests); + srv_->setup_service("test_service2", service2_receive_requests); + + srv_->run_services(); + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, services_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + auto player = std::make_shared( + std::move( + reader), storage_options_, play_options_); + player->play(); + + std::this_thread::sleep_for(250ms); + EXPECT_EQ(service1_receive_requests.size(), 0); + EXPECT_EQ(service2_receive_requests.size(), 2); + } + + // Filter allows /test_service1, blocks /test_service2 + { + play_options_.services_to_filter = {"test_service1/_service_event"}; + + srv_.reset(); + srv_ = std::make_shared(); + std::vector> service1_receive_requests; + std::vector> service2_receive_requests; + srv_->setup_service("test_service1", service1_receive_requests); + srv_->setup_service("test_service2", service2_receive_requests); + + srv_->run_services(); + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, services_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + auto player = std::make_shared( + std::move( + reader), storage_options_, play_options_); + player->play(); + + std::this_thread::sleep_for(250ms); + EXPECT_EQ(service1_receive_requests.size(), 2); + EXPECT_EQ(service2_receive_requests.size(), 0); + } + + // No filter, receive both services + { + play_options_.services_to_filter.clear(); + + srv_.reset(); + srv_ = std::make_shared(); + std::vector> service1_receive_requests; + std::vector> service2_receive_requests; + srv_->setup_service("test_service1", service1_receive_requests); + srv_->setup_service("test_service2", service2_receive_requests); + + srv_->run_services(); + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, services_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + auto player = std::make_shared( + std::move( + reader), storage_options_, play_options_); + player->play(); + + std::this_thread::sleep_for(250ms); + EXPECT_EQ(service1_receive_requests.size(), 2); + EXPECT_EQ(service2_receive_requests.size(), 2); + } +} + +TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics_and_services) +{ + auto all_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", "", ""}, + {"topic2", "test_msgs/BasicTypes", "", "", ""}, + {"test_service1/_service_event", "test_msgs/srv/BasicTypes_Event", "", "", ""}, + {"test_service2/_service_event", "test_msgs/srv/BasicTypes_Event", "", "", ""}, + }; + + std::vector> messages = + { + serialize_test_message("topic1", 500, get_messages_basic_types()[0]), + serialize_test_message( + "test_service1/_service_event", + 520, + get_service_event_message_basic_types()[0]), + serialize_test_message("topic2", 520, get_messages_basic_types()[0]), + serialize_test_message( + "test_service2/_service_event", + 550, + get_service_event_message_basic_types()[0]), + }; + + // Filter allows all topics, blocks service test_service2 + { + play_options_.topics_to_filter = {"topic1", "topic2"}; + play_options_.services_to_filter = {"test_service1/_service_event"}; + + sub_.reset(); + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", 1); + sub_->add_subscription("/topic2", 1); + + srv_.reset(); + srv_ = std::make_shared(); + std::vector> service1_receive_requests; + std::vector> service2_receive_requests; + srv_->setup_service("test_service1", service1_receive_requests); + srv_->setup_service("test_service2", service2_receive_requests); + srv_->run_services(); + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, all_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + auto await_received_messages = sub_->spin_subscriptions(); + + auto player = std::make_shared( + std::move( + reader), storage_options_, play_options_); + player->play(); + + await_received_messages.get(); + std::this_thread::sleep_for(300ms); + + // Filter allow all topics + auto replayed_topic1 = sub_->get_received_messages("/topic1"); + EXPECT_THAT(replayed_topic1, SizeIs(1u)); + auto replayed_topic2 = sub_->get_received_messages("/topic2"); + EXPECT_THAT(replayed_topic2, SizeIs(1u)); + + // Filter allow test_service1, block test_service2 + EXPECT_EQ(service1_receive_requests.size(), 1); + EXPECT_EQ(service2_receive_requests.size(), 0); + } + + // Filter allows all services, blocks topic2 + { + play_options_.topics_to_filter = {"topic1"}; + play_options_.services_to_filter = { + "test_service1/_service_event", "test_service2/_service_event"}; + + sub_.reset(); + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", 1); + sub_->add_subscription("/topic2", 0); + + srv_.reset(); + srv_ = std::make_shared(); + std::vector> service1_receive_requests; + std::vector> service2_receive_requests; + srv_->setup_service("test_service1", service1_receive_requests); + srv_->setup_service("test_service2", service2_receive_requests); + srv_->run_services(); + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, all_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + auto await_received_messages = sub_->spin_subscriptions(); + + auto player = std::make_shared( + std::move( + reader), storage_options_, play_options_); + player->play(); + + await_received_messages.get(); + std::this_thread::sleep_for(200ms); + + // Filter allow topic2, block topic1 + auto replayed_topic1 = sub_->get_received_messages("/topic1"); + EXPECT_THAT(replayed_topic1, SizeIs(1u)); + auto replayed_topic2 = sub_->get_received_messages("/topic2"); + EXPECT_THAT(replayed_topic2, SizeIs(0u)); + + // Filter allow all services + EXPECT_EQ(service1_receive_requests.size(), 1); + EXPECT_EQ(service2_receive_requests.size(), 1); + } + + // Filter allows all services and topics + { + play_options_.topics_to_filter = {"topic1", "topic2"}; + play_options_.services_to_filter = { + "test_service1/_service_event", "test_service2/_service_event"}; + + sub_.reset(); + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", 1); + sub_->add_subscription("/topic2", 1); + + srv_.reset(); + srv_ = std::make_shared(); + std::vector> service1_receive_requests; + std::vector> service2_receive_requests; + srv_->setup_service("test_service1", service1_receive_requests); + srv_->setup_service("test_service2", service2_receive_requests); + srv_->run_services(); + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, all_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + auto await_received_messages = sub_->spin_subscriptions(); + + auto player = std::make_shared( + std::move( + reader), storage_options_, play_options_); + player->play(); + + await_received_messages.get(); + std::this_thread::sleep_for(200ms); + + // Filter allow all topics + auto replayed_topic1 = sub_->get_received_messages("/topic1"); + EXPECT_THAT(replayed_topic1, SizeIs(1u)); + auto replayed_topic2 = sub_->get_received_messages("/topic2"); + EXPECT_THAT(replayed_topic2, SizeIs(1u)); + + // Filter allow all services + EXPECT_EQ(service1_receive_requests.size(), 1); + EXPECT_EQ(service2_receive_requests.size(), 1); + } +} + TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics_with_unknown_type) { auto primitive_message1 = get_messages_basic_types()[0]; diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp index c35ce0e1b6..1358043059 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp @@ -69,7 +69,7 @@ TEST_F(RosBag2PlayTestFixture, play_bag_file_twice) { auto await_received_messages = sub_->spin_subscriptions(); rosbag2_transport::PlayOptions play_options = { - read_ahead_queue_size, "", rate, {}, {}, {}, {}, loop_playback, {}, + read_ahead_queue_size, "", rate, {}, {}, {}, {}, {}, {}, loop_playback, {}, clock_publish_frequency, clock_publish_per_topic, clock_trigger_topics, delay}; auto player = std::make_shared( std::move( @@ -131,7 +131,7 @@ TEST_F(RosBag2PlayTestFixture, messages_played_in_loop) { auto await_received_messages = sub_->spin_subscriptions(); rosbag2_transport::PlayOptions play_options{read_ahead_queue_size, "", rate, {}, {}, - {}, {}, loop_playback, {}, clock_publish_frequency, clock_publish_per_topic, + {}, {}, {}, {}, loop_playback, {}, clock_publish_frequency, clock_publish_per_topic, clock_trigger_topics, delay}; auto player = std::make_shared( std::move( diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index fc1a7ce220..59829f6754 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -23,7 +23,7 @@ #include "rclcpp/rclcpp.hpp" #include "rosbag2_test_common/publication_manager.hpp" -#include "rosbag2_test_common/service_client_manager.hpp" +#include "rosbag2_test_common/client_manager.hpp" #include "rosbag2_test_common/wait_for.hpp" #include "rosbag2_transport/recorder.hpp" diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp index a5b3933dba..be76ac4ce9 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp @@ -26,7 +26,7 @@ #include "rosbag2_test_common/publication_manager.hpp" #include "rosbag2_test_common/wait_for.hpp" -#include "rosbag2_test_common/service_client_manager.hpp" +#include "rosbag2_test_common/client_manager.hpp" #include "rosbag2_transport/recorder.hpp" @@ -91,12 +91,12 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_services_are_recorded) { - auto service_client_manager_1 = - std::make_shared>( + auto client_manager_1 = + std::make_shared>( "test_service_1"); - auto service_client_manager_2 = - std::make_shared>( + auto client_manager_2 = + std::make_shared>( "test_service_2"); rosbag2_transport::RecordOptions record_options = @@ -107,14 +107,14 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_services_a start_async_spin(recorder); - ASSERT_TRUE(service_client_manager_1->check_service_ready()); + ASSERT_TRUE(client_manager_1->check_service_ready()); - ASSERT_TRUE(service_client_manager_2->check_service_ready()); + ASSERT_TRUE(client_manager_2->check_service_ready()); // By default, only client introspection is enable. // For one request, service event topic get 2 messages. - ASSERT_TRUE(service_client_manager_1->send_request()); - ASSERT_TRUE(service_client_manager_2->send_request()); + ASSERT_TRUE(client_manager_1->send_request()); + ASSERT_TRUE(client_manager_2->send_request()); std::this_thread::sleep_for(std::chrono::milliseconds(100)); @@ -129,8 +129,8 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_services_a TEST_F(RecordIntegrationTestFixture, published_messages_from_topic_and_service_are_recorded) { - auto service_client_manager_1 = - std::make_shared>( + auto client_manager_1 = + std::make_shared>( "test_service"); auto string_message = get_messages_strings()[0]; @@ -150,13 +150,13 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_topic_and_service_a ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); - ASSERT_TRUE(service_client_manager_1->check_service_ready()); + ASSERT_TRUE(client_manager_1->check_service_ready()); pub_manager.run_publishers(); // By default, only client introspection is enable. // For one request, service event topic get 2 messages. - ASSERT_TRUE(service_client_manager_1->send_request()); + ASSERT_TRUE(client_manager_1->send_request()); std::this_thread::sleep_for(std::chrono::milliseconds(100)); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp index 52c21b1060..7af1a6a815 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp @@ -24,7 +24,7 @@ #include "rosbag2_test_common/publication_manager.hpp" #include "rosbag2_test_common/wait_for.hpp" -#include "rosbag2_test_common/service_client_manager.hpp" +#include "rosbag2_test_common/client_manager.hpp" #include "rosbag2_transport/recorder.hpp" @@ -201,19 +201,19 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_service_recording) record_options.exclude_services = services_regex_to_exclude; auto service_manager_v1 = - std::make_shared>(v1); + std::make_shared>(v1); auto service_manager_v2 = - std::make_shared>(v2); + std::make_shared>(v2); auto service_manager_e1 = - std::make_shared>(e1); + std::make_shared>(e1); auto service_manager_b1 = - std::make_shared>(b1); + std::make_shared>(b1); auto service_manager_b2 = - std::make_shared>(b2); + std::make_shared>(b2); auto recorder = std::make_shared( std::move(writer_), storage_options_, record_options);