Skip to content

Commit

Permalink
fix: Make passing of timer data thread safe
Browse files Browse the repository at this point in the history
Signed-off-by: Janosch Machowinski <J.Machowinski@cellumation.com>
  • Loading branch information
Janosch Machowinski authored and Janosch Machowinski committed Apr 10, 2024
1 parent ca129f5 commit 3328d38
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 50 deletions.
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ class Executor
*/
RCLCPP_PUBLIC
static void
execute_timer(rclcpp::TimerBase::SharedPtr timer);
execute_timer(rclcpp::TimerBase::SharedPtr timer, const std::shared_ptr<void> & dataPtr);

/// Run service server executable.
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#ifndef RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_EVENT_TYPES_HPP_
#define RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_EVENT_TYPES_HPP_

#include <memory>

namespace rclcpp
{
namespace experimental
Expand All @@ -34,6 +36,7 @@ enum ExecutorEventType
struct ExecutorEvent
{
const void * entity_key;
std::shared_ptr<void> data;
int waitable_data;
ExecutorEventType type;
size_t num_events;
Expand Down
9 changes: 6 additions & 3 deletions rclcpp/include/rclcpp/experimental/timers_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ class TimersManager
RCLCPP_PUBLIC
TimersManager(
std::shared_ptr<rclcpp::Context> context,
std::function<void(const rclcpp::TimerBase *)> on_ready_callback = nullptr);
std::function<void(const rclcpp::TimerBase *,
const std::shared_ptr<void> &)> on_ready_callback = nullptr);

/**
* @brief Destruct the TimersManager object making sure to stop thread and release memory.
Expand Down Expand Up @@ -164,9 +165,10 @@ class TimersManager
* the TimersManager on_ready_callback was passed during construction.
*
* @param timer_id the timer ID of the timer to execute
* @param data internal data of the timer
*/
RCLCPP_PUBLIC
void execute_ready_timer(const rclcpp::TimerBase * timer_id);
void execute_ready_timer(const rclcpp::TimerBase * timer_id, const std::shared_ptr<void> & data);

/**
* @brief Get the amount of time before the next timer triggers.
Expand Down Expand Up @@ -529,7 +531,8 @@ class TimersManager
void execute_ready_timers_unsafe();

// Callback to be called when timer is ready
std::function<void(const rclcpp::TimerBase *)> on_ready_callback_;
std::function<void(const rclcpp::TimerBase *,
const std::shared_ptr<void> &)> on_ready_callback_ = nullptr;

// Thread used to run the timers execution task
std::thread timers_thread_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
++it;
continue;
}
if (!timer->call()) {
auto data = timer->call();
if (!data) {
// timer was cancelled, skip it.
++it;
continue;
Expand All @@ -379,6 +380,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
any_exec.timer = timer;
any_exec.callback_group = group;
any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
any_exec.data = data;
timer_handles_.erase(it);
return;
}
Expand Down
44 changes: 25 additions & 19 deletions rclcpp/include/rclcpp/timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <atomic>
#include <chrono>
#include <functional>
#include <optional>
#include <memory>
#include <sstream>
#include <thread>
Expand All @@ -43,6 +44,12 @@
namespace rclcpp
{

struct TimerInfo
{
Time expected_call_time;
Time actual_call_time;
};

class TimerBase
{
public:
Expand Down Expand Up @@ -101,16 +108,20 @@ class TimerBase
* The multithreaded executor takes advantage of this to avoid scheduling
* the callback multiple times.
*
* \return `true` if the callback should be executed, `false` if the timer was canceled.
* \return a valid shared_ptr if the callback should be executed,
* an invalid shared_ptr (nullptr) if the timer was canceled.
*/
RCLCPP_PUBLIC
virtual bool
virtual std::shared_ptr<void>
call() = 0;

/// Call the callback function when the timer signal is emitted.
/**
* \param[in] data the pointer returned by the call function
*/
RCLCPP_PUBLIC
virtual void
execute_callback() = 0;
execute_callback(const std::shared_ptr<void> & data) = 0;

RCLCPP_PUBLIC
std::shared_ptr<const rcl_timer_t>
Expand Down Expand Up @@ -198,12 +209,6 @@ class TimerBase
set_on_reset_callback(rcl_event_callback_t callback, const void * user_data);
};

struct TimerInfo
{
Time expected_call_time;
Time actual_call_time;
};

using VoidCallbackType = std::function<void ()>;
using TimerCallbackType = std::function<void (TimerBase &)>;
using TimerInfoCallbackType = std::function<void (const TimerInfo &)>;
Expand Down Expand Up @@ -263,27 +268,28 @@ class GenericTimer : public TimerBase
* \sa rclcpp::TimerBase::call
* \throws std::runtime_error if it failed to notify timer that callback will occurr
*/
bool
std::shared_ptr<void>
call() override
{
rcl_timer_call_info_t timer_call_info_;
rcl_ret_t ret = rcl_timer_call_with_info(timer_handle_.get(), &timer_call_info_);
if (ret == RCL_RET_TIMER_CANCELED) {
return false;
return nullptr;
}
if (ret != RCL_RET_OK) {
throw std::runtime_error("Failed to notify timer that callback occurred");
}
return true;
return std::make_shared<rcl_timer_call_info_t>(timer_call_info_);
}

/**
* \sa rclcpp::TimerBase::execute_callback
*/
void
execute_callback() override
execute_callback(const std::shared_ptr<void> & data) override
{
TRACETOOLS_TRACEPOINT(callback_start, reinterpret_cast<const void *>(&callback_), false);
execute_callback_delegate<>();
execute_callback_delegate<>(*static_cast<rcl_timer_call_info_t *>(data.get()));
TRACETOOLS_TRACEPOINT(callback_end, reinterpret_cast<const void *>(&callback_));
}

Expand All @@ -295,7 +301,7 @@ class GenericTimer : public TimerBase
>::type * = nullptr
>
void
execute_callback_delegate()
execute_callback_delegate(const rcl_timer_call_info_t &)
{
callback_();
}
Expand All @@ -307,7 +313,7 @@ class GenericTimer : public TimerBase
>::type * = nullptr
>
void
execute_callback_delegate()
execute_callback_delegate(const rcl_timer_call_info_t &)
{
callback_(*this);
}
Expand All @@ -320,7 +326,7 @@ class GenericTimer : public TimerBase
>::type * = nullptr
>
void
execute_callback_delegate()
execute_callback_delegate(const rcl_timer_call_info_t & timer_call_info_)
{
const TimerInfo info{Time{timer_call_info_.expected_call_time, clock_->get_clock_type()},
Time{timer_call_info_.actual_call_time, clock_->get_clock_type()}};
Expand All @@ -339,14 +345,14 @@ class GenericTimer : public TimerBase
RCLCPP_DISABLE_COPY(GenericTimer)

FunctorT callback_;
rcl_timer_call_info_t timer_call_info_;
};

template<
typename FunctorT,
typename std::enable_if<
rclcpp::function_traits::same_arguments<FunctorT, VoidCallbackType>::value ||
rclcpp::function_traits::same_arguments<FunctorT, TimerCallbackType>::value
rclcpp::function_traits::same_arguments<FunctorT, TimerCallbackType>::value ||
rclcpp::function_traits::same_arguments<FunctorT, TimerInfoCallbackType>::value
>::type * = nullptr
>
class WallTimer : public GenericTimer<FunctorT>
Expand Down
12 changes: 8 additions & 4 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
TRACETOOLS_TRACEPOINT(
rclcpp_executor_execute,
static_cast<const void *>(any_exec.timer->get_timer_handle().get()));
execute_timer(any_exec.timer);
execute_timer(any_exec.timer, any_exec.data);
}
if (any_exec.subscription) {
TRACETOOLS_TRACEPOINT(
Expand Down Expand Up @@ -547,9 +547,9 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
}

void
Executor::execute_timer(rclcpp::TimerBase::SharedPtr timer)
Executor::execute_timer(rclcpp::TimerBase::SharedPtr timer, const std::shared_ptr<void> & dataPtr)
{
timer->execute_callback();
timer->execute_callback(dataPtr);
}

void
Expand Down Expand Up @@ -690,6 +690,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
if (entity_iter != current_collection_.timers.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (callback_group && !callback_group->can_be_taken_from()) {
current_timer_index++;
continue;
}
// At this point the timer is either ready for execution or was perhaps
Expand All @@ -698,14 +699,17 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
// it from the wait result.
wait_result_->clear_timer_with_index(current_timer_index);
// Check that the timer should be called still, i.e. it wasn't canceled.
if (!timer->call()) {
any_executable.data = timer->call();
if (!any_executable.data) {
current_timer_index++;
continue;
}
any_executable.timer = timer;
any_executable.callback_group = callback_group;
valid_executable = true;
break;
}
current_timer_index++;
}
}

Expand Down
12 changes: 8 additions & 4 deletions rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,15 @@ bool StaticSingleThreadedExecutor::execute_ready_executables(
auto entity_iter = collection.timers.find(timer->get_timer_handle().get());
if (entity_iter != collection.timers.end()) {
wait_result.clear_timer_with_index(current_timer_index);
if (timer->call()) {
execute_timer(timer);
any_ready_executable = true;
if (spin_once) {return any_ready_executable;}
auto data = timer->call();
if (!data) {
// someone canceled the timer between is_ready and call
continue;
}

execute_timer(std::move(timer), data);
any_ready_executable = true;
if (spin_once) {return any_ready_executable;}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ EventsExecutor::EventsExecutor(
// The timers manager can be used either to only track timers (in this case an expired
// timer will generate an executor event and then it will be executed by the executor thread)
// or it can also take care of executing expired timers in its dedicated thread.
std::function<void(const rclcpp::TimerBase *)> timer_on_ready_cb = nullptr;
std::function<void(const rclcpp::TimerBase *,
const std::shared_ptr<void> &)> timer_on_ready_cb = nullptr;
if (!execute_timers_separate_thread) {
timer_on_ready_cb = [this](const rclcpp::TimerBase * timer_id) {
ExecutorEvent event = {timer_id, -1, ExecutorEventType::TIMER_EVENT, 1};
timer_on_ready_cb =
[this](const rclcpp::TimerBase * timer_id, const std::shared_ptr<void> & data) {
ExecutorEvent event = {timer_id, data, -1, ExecutorEventType::TIMER_EVENT, 1};
this->events_queue_->enqueue(event);
};
}
Expand Down Expand Up @@ -88,7 +90,7 @@ EventsExecutor::EventsExecutor(
}

ExecutorEvent event =
{notify_waitable_entity_id, waitable_data, ExecutorEventType::WAITABLE_EVENT, 1};
{notify_waitable_entity_id, nullptr, waitable_data, ExecutorEventType::WAITABLE_EVENT, 1};
this->events_queue_->enqueue(event);
});

Expand Down Expand Up @@ -325,7 +327,7 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
case ExecutorEventType::TIMER_EVENT:
{
timers_manager_->execute_ready_timer(
static_cast<const rclcpp::TimerBase *>(event.entity_key));
static_cast<const rclcpp::TimerBase *>(event.entity_key), event.data);
break;
}
case ExecutorEventType::WAITABLE_EVENT:
Expand Down Expand Up @@ -485,7 +487,7 @@ EventsExecutor::create_entity_callback(
{
std::function<void(size_t)>
callback = [this, entity_key, event_type](size_t num_events) {
ExecutorEvent event = {entity_key, -1, event_type, num_events};
ExecutorEvent event = {entity_key, nullptr, -1, event_type, num_events};
this->events_queue_->enqueue(event);
};
return callback;
Expand All @@ -497,7 +499,7 @@ EventsExecutor::create_waitable_callback(const rclcpp::Waitable * entity_key)
std::function<void(size_t, int)>
callback = [this, entity_key](size_t num_events, int waitable_data) {
ExecutorEvent event =
{entity_key, waitable_data, ExecutorEventType::WAITABLE_EVENT, num_events};
{entity_key, nullptr, waitable_data, ExecutorEventType::WAITABLE_EVENT, num_events};
this->events_queue_->enqueue(event);
};
return callback;
Expand Down
29 changes: 20 additions & 9 deletions rclcpp/src/rclcpp/experimental/timers_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ using rclcpp::experimental::TimersManager;

TimersManager::TimersManager(
std::shared_ptr<rclcpp::Context> context,
std::function<void(const rclcpp::TimerBase *)> on_ready_callback)
std::function<void(const rclcpp::TimerBase *, const std::shared_ptr<void> &)> on_ready_callback)
: on_ready_callback_(on_ready_callback),
context_(context)
{
Expand Down Expand Up @@ -148,24 +148,30 @@ bool TimersManager::execute_head_timer()
if (timer_ready) {
// NOTE: here we always execute the timer, regardless of whether the
// on_ready_callback is set or not.
head_timer->call();
head_timer->execute_callback();
auto data = head_timer->call();
if (!data) {
// someone canceled the timer between is_ready and call
return false;
}
head_timer->execute_callback(data);
timers_heap.heapify_root();
weak_timers_heap_.store(timers_heap);
}

return timer_ready;
}

void TimersManager::execute_ready_timer(const rclcpp::TimerBase * timer_id)
void TimersManager::execute_ready_timer(
const rclcpp::TimerBase * timer_id,
const std::shared_ptr<void> & data)
{
TimerPtr ready_timer;
{
std::unique_lock<std::mutex> lock(timers_mutex_);
ready_timer = weak_timers_heap_.get_timer(timer_id);
}
if (ready_timer) {
ready_timer->execute_callback();
ready_timer->execute_callback(data);
}
}

Expand Down Expand Up @@ -215,11 +221,16 @@ void TimersManager::execute_ready_timers_unsafe()
const size_t number_ready_timers = locked_heap.get_number_ready_timers();
size_t executed_timers = 0;
while (executed_timers < number_ready_timers && head_timer->is_ready()) {
head_timer->call();
if (on_ready_callback_) {
on_ready_callback_(head_timer.get());
auto data = head_timer->call();
if (data) {
if (on_ready_callback_) {
on_ready_callback_(head_timer.get(), data);
} else {
head_timer->execute_callback(data);
}
} else {
head_timer->execute_callback();
// someone canceled the timer between is_ready and call
// we don't do anything, as the timer is now 'processed'
}

executed_timers++;
Expand Down
Loading

0 comments on commit 3328d38

Please sign in to comment.