Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rcl_publisher_wait_for_all_acked support #1661

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions rclcpp/include/rclcpp/publisher_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <rmw/error_handling.h>
#include <rmw/rmw.h>

#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
Expand Down Expand Up @@ -203,6 +204,69 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
std::vector<rclcpp::NetworkFlowEndpoint>
get_network_flow_endpoints() const;

/// Wait until all published message data is acknowledged or until the specified timeout elapses.
/**
* This function waits until all published message data were acknowledged by all subscribers or
* timeout.
*
* timeout must be less then std::chrono::nanoseconds::max().
* If the timeout is negative then this function will block indefinitely until all published
* message data were acknowledged.
* If the timeout is 0 then this function will be non-blocking; checking all published message
* data were acknowledged (If acknowledged, return true. Otherwise, return false),
* but not waiting.
* If the timeout is greater than 0 then this function will return after that period of time has
* elapsed (return false) or all published message data were acknowledged (return true).
*
* \param[in] timeout the duration to wait for all published message data were acknowledged.
* \return `true` if all published message data were acknowledged before timeout, otherwise
* `false`.
* \throws rclcpp::exceptions::RCLError if middleware doesn't support or internal error occurs
*/
template<typename DurationRepT = int64_t, typename DurationT = std::milli>
bool
wait_for_all_acked(
std::chrono::duration<DurationRepT, DurationT> timeout =
std::chrono::duration<DurationRepT, DurationT>(-1)) const
{
// Casting to a double representation might lose precision and allow the check below to succeed
// but the actual cast to nanoseconds fail. Using 1 DurationT worth of nanoseconds less than max
constexpr auto maximum_safe_cast_ns =
std::chrono::nanoseconds::max() - std::chrono::duration<DurationRepT, DurationT>(1);

// If period is greater than nanoseconds::max(), the duration_cast to nanoseconds will overflow
// a signed integer, which is undefined behavior. Checking whether any std::chrono::duration is
// greater than nanoseconds::max() is a difficult general problem. This is a more conservative
// version of Howard Hinnant's (the <chrono> guy>) response here:
// https://stackoverflow.com/a/44637334/2089061
// However, this doesn't solve the issue for all possible duration types of period.
// Follow-up issue: https://github.com/ros2/rclcpp/issues/1177
constexpr auto ns_max_as_double =
std::chrono::duration_cast<std::chrono::duration<double, std::chrono::nanoseconds::period>>(
maximum_safe_cast_ns);
if (timeout > ns_max_as_double) {
throw std::invalid_argument{
"timeout must be less than std::chrono::nanoseconds::max()"};
}

rcl_duration_value_t rcl_timeout;

if (timeout < std::chrono::duration<DurationRepT, DurationT>::zero()) {
rcl_timeout = -1;
} else {
rcl_timeout = (std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)).count();
}

rcl_ret_t ret = rcl_publisher_wait_for_all_acked(publisher_handle_.get(), rcl_timeout);
if (ret == RCL_RET_OK) {
return true;
} else if (ret == RCL_RET_TIMEOUT) {
return false;
} else {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
}

protected:
template<typename EventCallbackT>
void
Expand Down
45 changes: 44 additions & 1 deletion rclcpp/test/rclcpp/test_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@

#include <gtest/gtest.h>

#include <string>
#include <chrono>
#include <memory>
#include <string>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -536,3 +537,45 @@ TEST_F(TestPublisher, get_network_flow_endpoints_errors) {
EXPECT_NO_THROW(publisher->get_network_flow_endpoints());
}
}

TEST_F(TestPublisher, check_wait_for_all_acked_return) {
initialize();
const rclcpp::QoS publisher_qos(1);
auto publisher = node->create_publisher<test_msgs::msg::Empty>("topic", publisher_qos);

{
// Using 'self' instead of 'lib:rclcpp' because `rcl_publisher_wait_for_all_acked` is entirely
// defined in a header
auto mock = mocking_utils::patch_and_return(
"self", rcl_publisher_wait_for_all_acked, RCL_RET_OK);
EXPECT_TRUE(publisher->wait_for_all_acked(std::chrono::milliseconds(-1)));
}

{
// Using 'self' instead of 'lib:rclcpp' because `rcl_publisher_wait_for_all_acked` is entirely
// defined in a header
auto mock = mocking_utils::patch_and_return(
"self", rcl_publisher_wait_for_all_acked, RCL_RET_TIMEOUT);
EXPECT_FALSE(publisher->wait_for_all_acked(std::chrono::milliseconds(-1)));
}

{
// Using 'self' instead of 'lib:rclcpp' because `rcl_publisher_wait_for_all_acked` is entirely
// defined in a header
auto mock = mocking_utils::patch_and_return(
"self", rcl_publisher_wait_for_all_acked, RCL_RET_UNSUPPORTED);
EXPECT_THROW(
publisher->wait_for_all_acked(std::chrono::milliseconds(-1)),
rclcpp::exceptions::RCLError);
}

{
// Using 'self' instead of 'lib:rclcpp' because `rcl_publisher_wait_for_all_acked` is entirely
// defined in a header
auto mock = mocking_utils::patch_and_return(
"self", rcl_publisher_wait_for_all_acked, RCL_RET_ERROR);
EXPECT_THROW(
publisher->wait_for_all_acked(std::chrono::milliseconds(-1)),
rclcpp::exceptions::RCLError);
}
}