Skip to content

Commit

Permalink
fix: Make passing of timer data thread safe
Browse files Browse the repository at this point in the history
Signed-off-by: Janosch Machowinski <J.Machowinski@cellumation.com>
  • Loading branch information
Janosch Machowinski authored and Janosch Machowinski committed Feb 20, 2024
1 parent 7801d99 commit 0b4ec71
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 46 deletions.
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ class Executor
*/
RCLCPP_PUBLIC
static void
execute_timer(rclcpp::TimerBase::SharedPtr timer);
execute_timer(rclcpp::TimerBase::SharedPtr timer, const std::shared_ptr<void> & dataPtr);

/// Run service server executable.
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#ifndef RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_EVENT_TYPES_HPP_
#define RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_EVENT_TYPES_HPP_

#include <memory>

namespace rclcpp
{
namespace experimental
Expand All @@ -34,6 +36,7 @@ enum ExecutorEventType
struct ExecutorEvent
{
const void * entity_key;
std::shared_ptr<void> data;
int waitable_data;
ExecutorEventType type;
size_t num_events;
Expand Down
9 changes: 6 additions & 3 deletions rclcpp/include/rclcpp/experimental/timers_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ class TimersManager
RCLCPP_PUBLIC
TimersManager(
std::shared_ptr<rclcpp::Context> context,
std::function<void(const rclcpp::TimerBase *)> on_ready_callback = nullptr);
std::function<void(const rclcpp::TimerBase *,
const std::shared_ptr<void> &)> on_ready_callback = nullptr);

/**
* @brief Destruct the TimersManager object making sure to stop thread and release memory.
Expand Down Expand Up @@ -164,9 +165,10 @@ class TimersManager
* the TimersManager on_ready_callback was passed during construction.
*
* @param timer_id the timer ID of the timer to execute
* @param data internal data of the timer
*/
RCLCPP_PUBLIC
void execute_ready_timer(const rclcpp::TimerBase * timer_id);
void execute_ready_timer(const rclcpp::TimerBase * timer_id, const std::shared_ptr<void> & data);

/**
* @brief Get the amount of time before the next timer triggers.
Expand Down Expand Up @@ -529,7 +531,8 @@ class TimersManager
void execute_ready_timers_unsafe();

// Callback to be called when timer is ready
std::function<void(const rclcpp::TimerBase *)> on_ready_callback_;
std::function<void(const rclcpp::TimerBase *,
const std::shared_ptr<void> &)> on_ready_callback_ = nullptr;

// Thread used to run the timers execution task
std::thread timers_thread_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
++it;
continue;
}
if (!timer->call()) {
auto data = timer->call();
if (!data) {
// timer was cancelled, skip it.
++it;
continue;
Expand All @@ -377,6 +378,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
any_exec.timer = timer;
any_exec.callback_group = group;
any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
any_exec.data = *data;
timer_handles_.erase(it);
return;
}
Expand Down
43 changes: 24 additions & 19 deletions rclcpp/include/rclcpp/timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <atomic>
#include <chrono>
#include <functional>
#include <optional>
#include <memory>
#include <sstream>
#include <thread>
Expand All @@ -43,6 +44,12 @@
namespace rclcpp
{

struct TimerInfo
{
Time expected_call_time;
Time actual_call_time;
};

class TimerBase
{
public:
Expand Down Expand Up @@ -101,16 +108,19 @@ class TimerBase
* The multithreaded executor takes advantage of this to avoid scheduling
* the callback multiple times.
*
* \return `true` if the callback should be executed, `false` if the timer was canceled.
* \return a shared_ptr if the callback should be executed, std::nullopt if the timer was canceled.
*/
RCLCPP_PUBLIC
virtual bool
virtual std::optional<std::shared_ptr<void>>
call() = 0;

/// Call the callback function when the timer signal is emitted.
/**
* \param[in] data the pointer returned by the call function
*/
RCLCPP_PUBLIC
virtual void
execute_callback() = 0;
execute_callback(const std::shared_ptr<void> & data) = 0;

RCLCPP_PUBLIC
std::shared_ptr<const rcl_timer_t>
Expand Down Expand Up @@ -198,12 +208,6 @@ class TimerBase
set_on_reset_callback(rcl_event_callback_t callback, const void * user_data);
};

struct TimerInfo
{
Time expected_call_time;
Time actual_call_time;
};

using VoidCallbackType = std::function<void ()>;
using TimerCallbackType = std::function<void (TimerBase &)>;
using TimerInfoCallbackType = std::function<void (const TimerInfo &)>;
Expand Down Expand Up @@ -263,27 +267,28 @@ class GenericTimer : public TimerBase
* \sa rclcpp::TimerBase::call
* \throws std::runtime_error if it failed to notify timer that callback will occurr
*/
bool
std::optional<std::shared_ptr<void>>
call() override
{
rcl_timer_call_info_t timer_call_info_;
rcl_ret_t ret = rcl_timer_call_with_info(timer_handle_.get(), &timer_call_info_);
if (ret == RCL_RET_TIMER_CANCELED) {
return false;
return std::nullopt;
}
if (ret != RCL_RET_OK) {
throw std::runtime_error("Failed to notify timer that callback occurred");
}
return true;
return std::make_shared<rcl_timer_call_info_t>(timer_call_info_);
}

/**
* \sa rclcpp::TimerBase::execute_callback
*/
void
execute_callback() override
execute_callback(const std::shared_ptr<void> & data) override
{
TRACETOOLS_TRACEPOINT(callback_start, reinterpret_cast<const void *>(&callback_), false);
execute_callback_delegate<>();
execute_callback_delegate<>(*static_cast<rcl_timer_call_info_t *>(data.get()));
TRACETOOLS_TRACEPOINT(callback_end, reinterpret_cast<const void *>(&callback_));
}

Expand All @@ -295,7 +300,7 @@ class GenericTimer : public TimerBase
>::type * = nullptr
>
void
execute_callback_delegate()
execute_callback_delegate(const rcl_timer_call_info_t &)
{
callback_();
}
Expand All @@ -307,7 +312,7 @@ class GenericTimer : public TimerBase
>::type * = nullptr
>
void
execute_callback_delegate()
execute_callback_delegate(const rcl_timer_call_info_t &)
{
callback_(*this);
}
Expand All @@ -320,7 +325,7 @@ class GenericTimer : public TimerBase
>::type * = nullptr
>
void
execute_callback_delegate()
execute_callback_delegate(const rcl_timer_call_info_t & timer_call_info_)
{
const TimerInfo info{Time{timer_call_info_.expected_call_time, clock_->get_clock_type()},
Time{timer_call_info_.actual_call_time, clock_->get_clock_type()}};
Expand All @@ -339,14 +344,14 @@ class GenericTimer : public TimerBase
RCLCPP_DISABLE_COPY(GenericTimer)

FunctorT callback_;
rcl_timer_call_info_t timer_call_info_;
};

template<
typename FunctorT,
typename std::enable_if<
rclcpp::function_traits::same_arguments<FunctorT, VoidCallbackType>::value ||
rclcpp::function_traits::same_arguments<FunctorT, TimerCallbackType>::value
rclcpp::function_traits::same_arguments<FunctorT, TimerCallbackType>::value ||
rclcpp::function_traits::same_arguments<FunctorT, TimerInfoCallbackType>::value
>::type * = nullptr
>
class WallTimer : public GenericTimer<FunctorT>
Expand Down
6 changes: 3 additions & 3 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
TRACETOOLS_TRACEPOINT(
rclcpp_executor_execute,
static_cast<const void *>(any_exec.timer->get_timer_handle().get()));
execute_timer(any_exec.timer);
execute_timer(any_exec.timer, any_exec.data);
}
if (any_exec.subscription) {
TRACETOOLS_TRACEPOINT(
Expand Down Expand Up @@ -714,9 +714,9 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
}

void
Executor::execute_timer(rclcpp::TimerBase::SharedPtr timer)
Executor::execute_timer(rclcpp::TimerBase::SharedPtr timer, const std::shared_ptr<void> & dataPtr)
{
timer->execute_callback();
timer->execute_callback(dataPtr);
}

void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,13 @@ StaticSingleThreadedExecutor::execute_ready_executables(bool spin_once)
if (i < entities_collector_->get_number_of_timers()) {
if (wait_set_.timers[i] && entities_collector_->get_timer(i)->is_ready()) {
auto timer = entities_collector_->get_timer(i);
timer->call();
execute_timer(std::move(timer));
auto data = timer->call();
if (!data) {
throw std::runtime_error(
"StaticSingleThreadedExecutor::execute_ready_executables() Error,"
"timer is ready, but call returned false");
}
execute_timer(std::move(timer), *data);
if (spin_once) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ EventsExecutor::EventsExecutor(
// The timers manager can be used either to only track timers (in this case an expired
// timer will generate an executor event and then it will be executed by the executor thread)
// or it can also take care of executing expired timers in its dedicated thread.
std::function<void(const rclcpp::TimerBase *)> timer_on_ready_cb = nullptr;
std::function<void(const rclcpp::TimerBase *,
const std::shared_ptr<void> &)> timer_on_ready_cb = nullptr;
if (!execute_timers_separate_thread) {
timer_on_ready_cb = [this](const rclcpp::TimerBase * timer_id) {
ExecutorEvent event = {timer_id, -1, ExecutorEventType::TIMER_EVENT, 1};
timer_on_ready_cb =
[this](const rclcpp::TimerBase * timer_id, const std::shared_ptr<void> & data) {
ExecutorEvent event = {timer_id, data, -1, ExecutorEventType::TIMER_EVENT, 1};
this->events_queue_->enqueue(event);
};
}
Expand Down Expand Up @@ -88,7 +90,7 @@ EventsExecutor::EventsExecutor(
}

ExecutorEvent event =
{notify_waitable_entity_id, waitable_data, ExecutorEventType::WAITABLE_EVENT, 1};
{notify_waitable_entity_id, nullptr, waitable_data, ExecutorEventType::WAITABLE_EVENT, 1};
this->events_queue_->enqueue(event);
});

Expand Down Expand Up @@ -325,7 +327,7 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
case ExecutorEventType::TIMER_EVENT:
{
timers_manager_->execute_ready_timer(
static_cast<const rclcpp::TimerBase *>(event.entity_key));
static_cast<const rclcpp::TimerBase *>(event.entity_key), event.data);
break;
}
case ExecutorEventType::WAITABLE_EVENT:
Expand Down Expand Up @@ -485,7 +487,7 @@ EventsExecutor::create_entity_callback(
{
std::function<void(size_t)>
callback = [this, entity_key, event_type](size_t num_events) {
ExecutorEvent event = {entity_key, -1, event_type, num_events};
ExecutorEvent event = {entity_key, nullptr, -1, event_type, num_events};
this->events_queue_->enqueue(event);
};
return callback;
Expand All @@ -497,7 +499,7 @@ EventsExecutor::create_waitable_callback(const rclcpp::Waitable * entity_key)
std::function<void(size_t, int)>
callback = [this, entity_key](size_t num_events, int waitable_data) {
ExecutorEvent event =
{entity_key, waitable_data, ExecutorEventType::WAITABLE_EVENT, num_events};
{entity_key, nullptr, waitable_data, ExecutorEventType::WAITABLE_EVENT, num_events};
this->events_queue_->enqueue(event);
};
return callback;
Expand Down
26 changes: 18 additions & 8 deletions rclcpp/src/rclcpp/experimental/timers_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ using rclcpp::experimental::TimersManager;

TimersManager::TimersManager(
std::shared_ptr<rclcpp::Context> context,
std::function<void(const rclcpp::TimerBase *)> on_ready_callback)
std::function<void(const rclcpp::TimerBase *, const std::shared_ptr<void> &)> on_ready_callback)
: on_ready_callback_(on_ready_callback),
context_(context)
{
Expand Down Expand Up @@ -148,24 +148,29 @@ bool TimersManager::execute_head_timer()
if (timer_ready) {
// NOTE: here we always execute the timer, regardless of whether the
// on_ready_callback is set or not.
head_timer->call();
head_timer->execute_callback();
auto data = head_timer->call();
if (!data) {
throw std::runtime_error("execute_head_timer() timer repoted ready, but call returned false");
}
head_timer->execute_callback(*data);
timers_heap.heapify_root();
weak_timers_heap_.store(timers_heap);
}

return timer_ready;
}

void TimersManager::execute_ready_timer(const rclcpp::TimerBase * timer_id)
void TimersManager::execute_ready_timer(
const rclcpp::TimerBase * timer_id,
const std::shared_ptr<void> & data)
{
TimerPtr ready_timer;
{
std::unique_lock<std::mutex> lock(timers_mutex_);
ready_timer = weak_timers_heap_.get_timer(timer_id);
}
if (ready_timer) {
ready_timer->execute_callback();
ready_timer->execute_callback(data);
}
}

Expand Down Expand Up @@ -215,11 +220,16 @@ void TimersManager::execute_ready_timers_unsafe()
const size_t number_ready_timers = locked_heap.get_number_ready_timers();
size_t executed_timers = 0;
while (executed_timers < number_ready_timers && head_timer->is_ready()) {
head_timer->call();
auto data = head_timer->call();
if (!data) {
throw std::runtime_error(
"TimersManager::execute_ready_timers_unsafe(): Error, timer was"
"ready but call returned false");
}
if (on_ready_callback_) {
on_ready_callback_(head_timer.get());
on_ready_callback_(head_timer.get(), *data);
} else {
head_timer->execute_callback();
head_timer->execute_callback(*data);
}

executed_timers++;
Expand Down
1 change: 1 addition & 0 deletions rclcpp/test/rclcpp/executors/test_events_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ TEST(TestEventsQueue, SimpleQueueTest)
// Lets push an event into the queue and get it back
rclcpp::experimental::executors::ExecutorEvent push_event = {
simple_queue.get(),
nullptr,
99,
rclcpp::experimental::executors::ExecutorEventType::SUBSCRIPTION_EVENT,
1};
Expand Down
4 changes: 2 additions & 2 deletions rclcpp/test/rclcpp/node_interfaces/test_node_timers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class TestTimer : public rclcpp::TimerBase
: TimerBase(node->get_clock(), std::chrono::nanoseconds(1),
node->get_node_base_interface()->get_context()) {}

bool call() override {return true;}
void execute_callback() override {}
std::optional<std::shared_ptr<void>> call() override {return nullptr;}
void execute_callback(const std::shared_ptr<void> &) override {}
bool is_steady() override {return false;}
};

Expand Down
Loading

0 comments on commit 0b4ec71

Please sign in to comment.