From 802bfc2c7442e7d285cd9ac3aaa88e3358683b54 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Tue, 21 Dec 2021 12:36:01 +0800 Subject: [PATCH] Add wait_for_all_acked support (#1662) * Add wait_for_all_acked support Signed-off-by: Barry Xu * Update the description of wait_for_all_acked Signed-off-by: Barry Xu * Use rcpputils::convert_to_nanoseconds() for time conversion Signed-off-by: Barry Xu --- rclcpp/include/rclcpp/publisher_base.hpp | 42 +++++++++++++ rclcpp/test/rclcpp/test_publisher.cpp | 79 +++++++++++++++++++++++- 2 files changed, 120 insertions(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp index c67a0439e0..01bdbb585f 100644 --- a/rclcpp/include/rclcpp/publisher_base.hpp +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -33,6 +34,7 @@ #include "rclcpp/qos_event.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" +#include "rcpputils/time.hpp" namespace rclcpp { @@ -203,6 +205,46 @@ class PublisherBase : public std::enable_shared_from_this std::vector get_network_flow_endpoints() const; + /// Wait until all published messages are acknowledged or until the specified timeout elapses. + /** + * This method waits until all published messages are acknowledged by all matching + * subscriptions or the given timeout elapses. + * + * If the timeout is negative then this method will block indefinitely until all published + * messages are acknowledged. + * If the timeout is zero then this method will not block, it will check if all published + * messages are acknowledged and return immediately. + * If the timeout is greater than zero, this method will wait until all published messages are + * acknowledged or the timeout elapses. + * + * This method only waits for acknowledgments if the publisher's QoS profile is RELIABLE. + * Otherwise this method will immediately return `true`. + * + * \param[in] timeout the duration to wait for all published messages to be acknowledged. + * \return `true` if all published messages were acknowledged before the given timeout + * elapsed, otherwise `false`. + * \throws rclcpp::exceptions::RCLError if middleware doesn't support or internal error occurs + * \throws std::invalid_argument if timeout is greater than std::chrono::nanoseconds::max() or + * less than std::chrono::nanoseconds::min() + */ + template + bool + wait_for_all_acked( + std::chrono::duration timeout = + std::chrono::duration(-1)) const + { + rcl_duration_value_t rcl_timeout = rcpputils::convert_to_nanoseconds(timeout).count(); + + rcl_ret_t ret = rcl_publisher_wait_for_all_acked(publisher_handle_.get(), rcl_timeout); + if (ret == RCL_RET_OK) { + return true; + } else if (ret == RCL_RET_TIMEOUT) { + return false; + } else { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + } + protected: template void diff --git a/rclcpp/test/rclcpp/test_publisher.cpp b/rclcpp/test/rclcpp/test_publisher.cpp index bec8bbb17a..3be64e9647 100644 --- a/rclcpp/test/rclcpp/test_publisher.cpp +++ b/rclcpp/test/rclcpp/test_publisher.cpp @@ -14,8 +14,9 @@ #include -#include +#include #include +#include #include #include @@ -28,6 +29,7 @@ #include "../utils/rclcpp_gtest_macros.hpp" #include "test_msgs/msg/empty.hpp" +#include "test_msgs/msg/strings.hpp" class TestPublisher : public ::testing::Test { @@ -536,3 +538,78 @@ TEST_F(TestPublisher, get_network_flow_endpoints_errors) { EXPECT_NO_THROW(publisher->get_network_flow_endpoints()); } } + +TEST_F(TestPublisher, check_wait_for_all_acked_return) { + initialize(); + const rclcpp::QoS publisher_qos(1); + auto publisher = node->create_publisher("topic", publisher_qos); + + { + // Using 'self' instead of 'lib:rclcpp' because `rcl_publisher_wait_for_all_acked` is entirely + // defined in a header + auto mock = mocking_utils::patch_and_return( + "self", rcl_publisher_wait_for_all_acked, RCL_RET_OK); + EXPECT_TRUE(publisher->wait_for_all_acked(std::chrono::milliseconds(-1))); + } + + { + // Using 'self' instead of 'lib:rclcpp' because `rcl_publisher_wait_for_all_acked` is entirely + // defined in a header + auto mock = mocking_utils::patch_and_return( + "self", rcl_publisher_wait_for_all_acked, RCL_RET_TIMEOUT); + EXPECT_FALSE(publisher->wait_for_all_acked(std::chrono::milliseconds(-1))); + } + + { + // Using 'self' instead of 'lib:rclcpp' because `rcl_publisher_wait_for_all_acked` is entirely + // defined in a header + auto mock = mocking_utils::patch_and_return( + "self", rcl_publisher_wait_for_all_acked, RCL_RET_UNSUPPORTED); + EXPECT_THROW( + publisher->wait_for_all_acked(std::chrono::milliseconds(-1)), + rclcpp::exceptions::RCLError); + } + + { + // Using 'self' instead of 'lib:rclcpp' because `rcl_publisher_wait_for_all_acked` is entirely + // defined in a header + auto mock = mocking_utils::patch_and_return( + "self", rcl_publisher_wait_for_all_acked, RCL_RET_ERROR); + EXPECT_THROW( + publisher->wait_for_all_acked(std::chrono::milliseconds(-1)), + rclcpp::exceptions::RCLError); + } +} + +class TestPublisherWaitForAllAcked + : public TestPublisher, public ::testing::WithParamInterface> +{ +}; + +TEST_P(TestPublisherWaitForAllAcked, check_wait_for_all_acked_with_QosPolicy) { + initialize(); + + auto do_nothing = [](std::shared_ptr) {}; + auto pub = node->create_publisher("topic", std::get<0>(GetParam())); + auto sub = node->create_subscription( + "topic", + std::get<1>(GetParam()), + do_nothing); + + auto msg = std::make_shared(); + for (int i = 0; i < 20; i++) { + ASSERT_NO_THROW(pub->publish(*msg)); + } + EXPECT_TRUE(pub->wait_for_all_acked(std::chrono::milliseconds(500))); +} + +INSTANTIATE_TEST_SUITE_P( + TestWaitForAllAckedWithParm, + TestPublisherWaitForAllAcked, + ::testing::Values( + std::pair( + rclcpp::QoS(1).reliable(), rclcpp::QoS(1).reliable()), + std::pair( + rclcpp::QoS(1).best_effort(), rclcpp::QoS(1).best_effort()), + std::pair( + rclcpp::QoS(1).reliable(), rclcpp::QoS(1).best_effort())));