Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Memory Leak in Actions Using EventsExecutor #2661

Open
wants to merge 3 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions rclcpp_action/include/rclcpp_action/create_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/node_interfaces/node_clock_interface.hpp"
#include "rclcpp/node_interfaces/node_logging_interface.hpp"
#include "rclcpp/node_interfaces/node_timers_interface.hpp"
#include "rclcpp/node_interfaces/node_waitables_interface.hpp"

#include "rclcpp_action/server.hpp"
Expand All @@ -42,6 +43,7 @@ namespace rclcpp_action
* \param[in] node_base_interface The node base interface of the corresponding node.
* \param[in] node_clock_interface The node clock interface of the corresponding node.
* \param[in] node_logging_interface The node logging interface of the corresponding node.
* \param[in] node_timers_interface The node timers interface of the corresponding node.
* \param[in] node_waitables_interface The node waitables interface of the corresponding node.
* \param[in] name The action name.
* \param[in] handle_goal A callback that decides if a goal should be accepted or rejected.
Expand All @@ -59,6 +61,7 @@ create_server(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base_interface,
rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock_interface,
rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging_interface,
rclcpp::node_interfaces::NodeTimersInterface::SharedPtr node_timers_interface,
rclcpp::node_interfaces::NodeWaitablesInterface::SharedPtr node_waitables_interface,
const std::string & name,
typename Server<ActionT>::GoalCallback handle_goal,
Expand Down Expand Up @@ -100,6 +103,7 @@ create_server(
node_base_interface,
node_clock_interface,
node_logging_interface,
node_timers_interface,
name,
options,
handle_goal,
Expand Down Expand Up @@ -142,6 +146,7 @@ create_server(
node->get_node_base_interface(),
node->get_node_clock_interface(),
node->get_node_logging_interface(),
node->get_node_timers_interface(),
node->get_node_waitables_interface(),
name,
handle_goal,
Expand Down
40 changes: 40 additions & 0 deletions rclcpp_action/include/rclcpp_action/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
#ifndef RCLCPP_ACTION__SERVER_HPP_
#define RCLCPP_ACTION__SERVER_HPP_

#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <set>

#include "action_msgs/srv/cancel_goal.hpp"
#include "rcl/event_callback.h"
Expand All @@ -30,6 +32,8 @@
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/node_interfaces/node_clock_interface.hpp"
#include "rclcpp/node_interfaces/node_logging_interface.hpp"
#include "rclcpp/node_interfaces/node_timers_interface.hpp"
#include "rclcpp/timer.hpp"
#include "rclcpp/waitable.hpp"

#include "rclcpp_action/visibility_control.hpp"
Expand Down Expand Up @@ -173,6 +177,31 @@ class ServerBase : public rclcpp::Waitable
void
set_on_ready_callback(std::function<void(size_t, int)> callback) override;

/// Initialize the goal expiration timer.
/// \internal
RCLCPP_ACTION_PUBLIC
void
initialize_expire_goal_timer();

/// Set timer to trigger on the goal expiration time
/// \internal
RCLCPP_ACTION_PUBLIC
void
set_expire_goal_timer();

/// Timer callback to handle goal expiration.
/// \internal
RCLCPP_ACTION_PUBLIC
void
expire_goal_timer_callback();

/// Calculates the time until the next goal expiration and updates the timer period.
/// Cancels the timer if no goals remain or invokes the callback directly if expiration is immediate.
/// \internal
RCLCPP_ACTION_PUBLIC
void
reset_timer_to_next_goal();

/// Unset the callback to be called whenever the waitable becomes ready.
RCLCPP_ACTION_PUBLIC
void
Expand All @@ -187,6 +216,7 @@ class ServerBase : public rclcpp::Waitable
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base,
rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock,
rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging,
rclcpp::node_interfaces::NodeTimersInterface::SharedPtr node_timers,
const std::string & name,
const rosidl_action_type_support_t * type_support,
const rcl_action_server_options_t & options);
Expand Down Expand Up @@ -330,6 +360,14 @@ class ServerBase : public rclcpp::Waitable
// Storage for std::function callbacks to keep them in scope
std::unordered_map<EntityType, std::function<void(size_t)>> entity_type_to_on_ready_callback_;

// Store elements required to create goal expiration timers
std::mutex goal_entry_times_mutex_;
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base_;
rclcpp::node_interfaces::NodeTimersInterface::SharedPtr node_timers_;
std::set<std::chrono::steady_clock::time_point> goal_entry_times_;
rclcpp::TimerBase::SharedPtr expire_goal_timer_;
std::chrono::nanoseconds goal_expire_timeout_;

/// Set a callback to be called when the specified entity is ready
RCLCPP_ACTION_PUBLIC
void
Expand Down Expand Up @@ -396,6 +434,7 @@ class Server : public ServerBase, public std::enable_shared_from_this<Server<Act
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base,
rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock,
rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging,
rclcpp::node_interfaces::NodeTimersInterface::SharedPtr node_timers,
const std::string & name,
const rcl_action_server_options_t & options,
GoalCallback handle_goal,
Expand All @@ -406,6 +445,7 @@ class Server : public ServerBase, public std::enable_shared_from_this<Server<Act
node_base,
node_clock,
node_logging,
node_timers,
name,
rosidl_typesupport_cpp::get_action_type_support_handle<ActionT>(),
options),
Expand Down
106 changes: 105 additions & 1 deletion rclcpp_action/src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <chrono>
#include <memory>
#include <mutex>
#include <string>
Expand All @@ -29,6 +30,7 @@

#include "action_msgs/msg/goal_status_array.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/create_timer.hpp"
#include "rclcpp_action/server.hpp"

using rclcpp_action::ServerBase;
Expand Down Expand Up @@ -117,12 +119,15 @@ ServerBase::ServerBase(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base,
rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock,
rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging,
rclcpp::node_interfaces::NodeTimersInterface::SharedPtr node_timers,
const std::string & name,
const rosidl_action_type_support_t * type_support,
const rcl_action_server_options_t & options
)
: pimpl_(new ServerBaseImpl(
node_clock->get_clock(), node_logging->get_logger().get_child("rclcpp_action")))
node_clock->get_clock(), node_logging->get_logger().get_child("rclcpp_action"))),
node_base_(node_base), node_timers_(node_timers),
goal_expire_timeout_(std::chrono::nanoseconds(options.result_timeout.nanoseconds))
{
auto deleter = [node_base](rcl_action_server_t * ptr)
{
Expand Down Expand Up @@ -704,6 +709,100 @@ ServerBase::publish_status()
}
}

void
ServerBase::initialize_expire_goal_timer()
{
expire_goal_timer_ = rclcpp::create_wall_timer(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a slight behavior difference to the rcl implementation here. RCL uses the clock of the node.
Perhaps use create_timer with the node clock ?

std::chrono::nanoseconds(goal_expire_timeout_),
std::bind(&ServerBase::expire_goal_timer_callback, this),
nullptr,
node_base_.get(),
node_timers_.get()
);

expire_goal_timer_->cancel();
}

void
ServerBase::expire_goal_timer_callback()
{
{
std::lock_guard<std::mutex> lock(goal_entry_times_mutex_);
auto now = std::chrono::steady_clock::now();
auto it = goal_entry_times_.begin();

// Remove expired time entries for goals.
while (it != goal_entry_times_.end() && *it + goal_expire_timeout_ <= now) {
it = goal_entry_times_.erase(it);
}
}

execute_check_expired_goals();
reset_timer_to_next_goal();
}

void
ServerBase::reset_timer_to_next_goal()
{
std::lock_guard<std::mutex> lock(goal_entry_times_mutex_);

// If no more goals, cancel the timer.
if (goal_entry_times_.empty()) {
expire_goal_timer_->cancel();
return;
}

auto now = std::chrono::steady_clock::now();
auto earliest_entry_time = *goal_entry_times_.begin();
auto expiration_time = earliest_entry_time + goal_expire_timeout_;

// Calculate time until expiration of the next goal.
auto time_until_expiration = expiration_time > now
? expiration_time - now
: std::chrono::nanoseconds(0);

// If the time is zero, directly call the callback.
if (time_until_expiration.count() == 0) {
expire_goal_timer_->cancel();
expire_goal_timer_callback();
return;
}

// Update the timer to the calculated next expiration time.
rcl_timer_t * rcl_timer_handle = const_cast<rcl_timer_t *>(expire_goal_timer_->get_timer_handle().get());
int64_t old_period;
rcl_ret_t ret = rcl_timer_exchange_period(rcl_timer_handle, time_until_expiration.count(), &old_period);

if (ret != RCL_RET_OK) {
RCLCPP_ERROR(pimpl_->logger_, "Failed to update expire timer period");
}

// Reset timer to count down from the new period.
expire_goal_timer_->reset();
}

void
ServerBase::set_expire_goal_timer()
{
{
std::lock_guard<std::mutex> lock(goal_entry_times_mutex_);
goal_entry_times_.insert(std::chrono::steady_clock::now());
}

// Only reset timer if it is canceled; otherwise, let the current timer run its full duration.
if (expire_goal_timer_->is_canceled()) {
rcl_timer_t * rcl_timer_handle = const_cast<rcl_timer_t *>(expire_goal_timer_->get_timer_handle().get());
int64_t old_period;
rcl_ret_t ret = rcl_timer_exchange_period(rcl_timer_handle, goal_expire_timeout_.count(), &old_period);

if (ret != RCL_RET_OK) {
RCLCPP_ERROR(pimpl_->logger_, "Failed to reset timer period to goal_expire_timeout_");
}

expire_goal_timer_->reset();
}
}

void
ServerBase::publish_result(const GoalUUID & uuid, std::shared_ptr<void> result_msg)
{
Expand All @@ -720,6 +819,10 @@ ServerBase::publish_result(const GoalUUID & uuid, std::shared_ptr<void> result_m
throw std::runtime_error("Asked to publish result for goal that does not exist");
}

if (on_ready_callback_set_) {
set_expire_goal_timer();
}

{
/**
* NOTE: There is a potential deadlock issue if both unordered_map_mutex_ and
Expand Down Expand Up @@ -788,6 +891,7 @@ ServerBase::set_on_ready_callback(std::function<void(size_t, int)> callback)
set_callback_to_entity(EntityType::GoalService, callback);
set_callback_to_entity(EntityType::ResultService, callback);
set_callback_to_entity(EntityType::CancelService, callback);
initialize_expire_goal_timer();
}

void
Expand Down