From 48416d305338c0d66d39ad9158ddc67ea55c7c43 Mon Sep 17 00:00:00 2001 From: Alberto Soragna Date: Fri, 12 Mar 2021 11:20:10 +0000 Subject: [PATCH] Add EventsExecutor Signed-off-by: Alberto Soragna --- rclcpp/CMakeLists.txt | 3 + .../rclcpp/any_subscription_callback.hpp | 12 + rclcpp/include/rclcpp/executor.hpp | 2 +- rclcpp/include/rclcpp/executors.hpp | 1 + .../rclcpp/executors/event_waitable.hpp | 68 ++ .../rclcpp/executors/events_executor.hpp | 259 +++++++ .../events_executor_entities_collector.hpp | 288 ++++++++ .../executors/events_executor_event_types.hpp | 79 +++ .../events_executor_notify_waitable.hpp | 95 +++ .../rclcpp/executors/timers_manager.hpp | 475 +++++++++++++ .../experimental/buffers/events_queue.hpp | 119 ++++ .../buffers/ring_buffer_implementation.hpp | 7 +- .../buffers/simple_events_queue.hpp | 138 ++++ .../src/rclcpp/executors/events_executor.cpp | 333 +++++++++ .../events_executor_entities_collector.cpp | 651 ++++++++++++++++++ .../src/rclcpp/executors/timers_manager.cpp | 257 +++++++ rclcpp/test/rclcpp/CMakeLists.txt | 35 + .../rclcpp/executors/test_events_executor.cpp | 374 ++++++++++ ...est_events_executor_entities_collector.cpp | 178 +++++ .../rclcpp/executors/test_events_queue.cpp | 65 ++ .../test/rclcpp/executors/test_executors.cpp | 98 ++- .../rclcpp/executors/test_timers_manager.cpp | 435 ++++++++++++ .../test_add_callback_groups_to_executor.cpp | 24 +- 23 files changed, 3977 insertions(+), 19 deletions(-) create mode 100644 rclcpp/include/rclcpp/executors/event_waitable.hpp create mode 100644 rclcpp/include/rclcpp/executors/events_executor.hpp create mode 100644 rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp create mode 100644 rclcpp/include/rclcpp/executors/events_executor_event_types.hpp create mode 100644 rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp create mode 100644 rclcpp/include/rclcpp/executors/timers_manager.hpp create mode 100644 rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp create mode 100644 rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp create mode 100644 rclcpp/src/rclcpp/executors/events_executor.cpp create mode 100644 rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp create mode 100644 rclcpp/src/rclcpp/executors/timers_manager.cpp create mode 100644 rclcpp/test/rclcpp/executors/test_events_executor.cpp create mode 100644 rclcpp/test/rclcpp/executors/test_events_executor_entities_collector.cpp create mode 100644 rclcpp/test/rclcpp/executors/test_events_queue.cpp create mode 100644 rclcpp/test/rclcpp/executors/test_timers_manager.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 448113bb93..509335ad0a 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -51,10 +51,13 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/executor.cpp src/rclcpp/executors.cpp src/rclcpp/expand_topic_or_service_name.cpp + src/rclcpp/executors/events_executor.cpp + src/rclcpp/executors/events_executor_entities_collector.cpp src/rclcpp/executors/multi_threaded_executor.cpp src/rclcpp/executors/single_threaded_executor.cpp src/rclcpp/executors/static_executor_entities_collector.cpp src/rclcpp/executors/static_single_threaded_executor.cpp + src/rclcpp/executors/timers_manager.cpp src/rclcpp/future_return_code.cpp src/rclcpp/graph_listener.cpp src/rclcpp/guard_condition.cpp diff --git a/rclcpp/include/rclcpp/any_subscription_callback.hpp b/rclcpp/include/rclcpp/any_subscription_callback.hpp index 5a708435bd..124215bdca 100644 --- a/rclcpp/include/rclcpp/any_subscription_callback.hpp +++ b/rclcpp/include/rclcpp/any_subscription_callback.hpp @@ -188,6 +188,12 @@ class AnySubscriptionCallback ConstMessageSharedPtr message, const rclcpp::MessageInfo & message_info) { TRACEPOINT(callback_start, static_cast(this), true); + + // If the message is not valid, return. + if (!message) { + return; + } + if (const_shared_ptr_callback_) { const_shared_ptr_callback_(message); } else if (const_shared_ptr_with_info_callback_) { @@ -211,6 +217,12 @@ class AnySubscriptionCallback MessageUniquePtr message, const rclcpp::MessageInfo & message_info) { TRACEPOINT(callback_start, static_cast(this), true); + + // If the message is not valid, return. + if (!message) { + return; + } + if (shared_ptr_callback_) { typename std::shared_ptr shared_message = std::move(message); shared_ptr_callback_(shared_message); diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 946aa18851..9321f679fc 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -544,7 +544,7 @@ class Executor RCLCPP_DISABLE_COPY(Executor) RCLCPP_PUBLIC - void + virtual void spin_once_impl(std::chrono::nanoseconds timeout); typedef std::map #include +#include "rclcpp/executors/events_executor.hpp" #include "rclcpp/executors/multi_threaded_executor.hpp" #include "rclcpp/executors/single_threaded_executor.hpp" #include "rclcpp/executors/static_single_threaded_executor.hpp" diff --git a/rclcpp/include/rclcpp/executors/event_waitable.hpp b/rclcpp/include/rclcpp/executors/event_waitable.hpp new file mode 100644 index 0000000000..d52b664b7f --- /dev/null +++ b/rclcpp/include/rclcpp/executors/event_waitable.hpp @@ -0,0 +1,68 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__EVENT_WAITABLE_HPP_ +#define RCLCPP__EXECUTORS__EVENT_WAITABLE_HPP_ + +#include "rclcpp/waitable.hpp" + +namespace rclcpp +{ +namespace executors +{ + +/** + * @brief This class provides a wrapper around the waitable object, that is + * meant to be used with the EventsExecutor. + * The waitset related methods are stubbed out as they should not be called. + * This class is abstract as the execute method of rclcpp::Waitable is not implemented. + * Nodes who want to implement a custom EventWaitable, can derive from this class and override + * the execute method. + */ +class EventWaitable : public rclcpp::Waitable +{ +public: + // Constructor + RCLCPP_PUBLIC + EventWaitable() = default; + + // Destructor + RCLCPP_PUBLIC + virtual ~EventWaitable() = default; + + // Stub API: not used by EventsExecutor + RCLCPP_PUBLIC + bool + is_ready(rcl_wait_set_t * wait_set) final + { + (void)wait_set; + throw std::runtime_error("EventWaitable can't be checked if it's ready"); + return false; + } + + // Stub API: not used by EventsExecutor + RCLCPP_PUBLIC + bool + add_to_wait_set(rcl_wait_set_t * wait_set) final + { + (void)wait_set; + throw std::runtime_error("EventWaitable can't be added to a wait_set"); + return false; + } +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__EVENT_WAITABLE_HPP_ diff --git a/rclcpp/include/rclcpp/executors/events_executor.hpp b/rclcpp/include/rclcpp/executors/events_executor.hpp new file mode 100644 index 0000000000..375206fca0 --- /dev/null +++ b/rclcpp/include/rclcpp/executors/events_executor.hpp @@ -0,0 +1,259 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__EVENTS_EXECUTOR_HPP_ +#define RCLCPP__EXECUTORS__EVENTS_EXECUTOR_HPP_ + +#include +#include +#include +#include + +#include "rclcpp/executor.hpp" +#include "rclcpp/executors/events_executor_entities_collector.hpp" +#include "rclcpp/executors/events_executor_event_types.hpp" +#include "rclcpp/executors/events_executor_notify_waitable.hpp" +#include "rclcpp/executors/timers_manager.hpp" +#include "rclcpp/experimental/buffers/events_queue.hpp" +#include "rclcpp/experimental/buffers/simple_events_queue.hpp" +#include "rclcpp/node.hpp" + +#include "rmw/listener_callback_type.h" + +namespace rclcpp +{ +namespace executors +{ + +/// Events executor implementation +/** + * This executor uses an events queue and a timers manager to execute entities from its + * associated nodes and callback groups. + * The RMW listener APIs are used to collect new events. + * + * This executor tries to reduce as much as possible the amount of maintenance operations. + * This allows to use customized `EventsQueue` classes to achieve different goals such + * as very low CPU usage, bounded memory requirement, determinism, etc. + * + * The executor uses a weak ownership model and it locks entities only while executing + * their related events. + * + * To run this executor: + * rclcpp::executors::EventsExecutor executor; + * executor.add_node(node); + * executor.spin(); + * executor.remove_node(node); + */ +class EventsExecutor : public rclcpp::Executor +{ + friend class EventsExecutorEntitiesCollector; + +public: + RCLCPP_SMART_PTR_DEFINITIONS(EventsExecutor) + + /// Default constructor. See the default constructor for Executor. + /** + * \param[in] events_queue The queue used to store events. + * \param[in] options Options used to configure the executor. + */ + RCLCPP_PUBLIC + explicit EventsExecutor( + rclcpp::experimental::buffers::EventsQueue::UniquePtr events_queue = std::make_unique< + rclcpp::experimental::buffers::SimpleEventsQueue>(), + const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions()); + + /// Default destrcutor. + RCLCPP_PUBLIC + virtual ~EventsExecutor() = default; + + /// Events executor implementation of spin. + /** + * This function will block until work comes in, execute it, and keep blocking. + * It will only be interrupted by a CTRL-C (managed by the global signal handler). + * \throws std::runtime_error when spin() called while already spinning + */ + RCLCPP_PUBLIC + void + spin() override; + + /// Events executor implementation of spin some + /** + * This non-blocking function will execute the timers and events + * that were ready when this API was called, until timeout or no + * more work available. New ready-timers/events arrived while + * executing work, won't be taken into account here. + * + * Example: + * while(condition) { + * spin_some(); + * sleep(); // User should have some sync work or + * // sleep to avoid a 100% CPU usage + * } + */ + RCLCPP_PUBLIC + void + spin_some(std::chrono::nanoseconds max_duration = std::chrono::nanoseconds(0)) override; + + /// Events executor implementation of spin all + /** + * This non-blocking function will execute timers and events + * until timeout or no more work available. If new ready-timers/events + * arrive while executing work available, they will be executed + * as long as the timeout hasn't expired. + * + * Example: + * while(condition) { + * spin_all(); + * sleep(); // User should have some sync work or + * // sleep to avoid a 100% CPU usage + * } + */ + RCLCPP_PUBLIC + void + spin_all(std::chrono::nanoseconds max_duration) override; + + /// Add a node to the executor. + /** + * \sa rclcpp::Executor::add_node + */ + RCLCPP_PUBLIC + void + add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify = true) override; + + /// Convenience function which takes Node and forwards NodeBaseInterface. + /** + * \sa rclcpp::EventsExecutor::add_node + */ + RCLCPP_PUBLIC + void + add_node(std::shared_ptr node_ptr, bool notify = true) override; + + /// Remove a node from the executor. + /** + * \sa rclcpp::Executor::remove_node + */ + RCLCPP_PUBLIC + void + remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify = true) override; + + /// Convenience function which takes Node and forwards NodeBaseInterface. + /** + * \sa rclcpp::Executor::remove_node + */ + RCLCPP_PUBLIC + void + remove_node(std::shared_ptr node_ptr, bool notify = true) override; + + /// Add a callback group to an executor. + /** + * \sa rclcpp::Executor::add_callback_group + */ + void + add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify = true) override; + + /// Remove callback group from the executor + /** + * \sa rclcpp::Executor::remove_callback_group + */ + RCLCPP_PUBLIC + void + remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + bool notify = true) override; + + RCLCPP_PUBLIC + std::vector + get_all_callback_groups() override; + + /// Get callback groups that belong to executor. + /** + * \sa rclcpp::Executor::get_manually_added_callback_groups() + */ + RCLCPP_PUBLIC + std::vector + get_manually_added_callback_groups() override; + + /// Get callback groups that belong to executor. + /** + * \sa rclcpp::Executor::get_automatically_added_callback_groups_from_nodes() + */ + RCLCPP_PUBLIC + std::vector + get_automatically_added_callback_groups_from_nodes() override; + +protected: + RCLCPP_PUBLIC + void + spin_once_impl(std::chrono::nanoseconds timeout) override; + + RCLCPP_PUBLIC + void + spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive); + +private: + RCLCPP_DISABLE_COPY(EventsExecutor) + + // Executor callback: Push new events into the queue and trigger cv. + // This function is called by the DDS entities when an event happened, + // like a subscription receiving a message. + static void + push_event(const void * event_data) + { + if (!event_data) { + throw std::runtime_error("Executor event data not valid."); + } + + auto data = static_cast(event_data); + + executors::EventsExecutor * this_executor = data->executor; + + // Event queue mutex scope + { + std::unique_lock lock(this_executor->push_mutex_); + this_executor->events_queue_->push(data->event); + } + // Notify that the event queue has some events in it. + this_executor->events_queue_cv_.notify_one(); + } + + // Execute a single event + RCLCPP_PUBLIC + void + execute_event(const ExecutorEvent & event); + + // Queue where entities can push events + rclcpp::experimental::buffers::EventsQueue::SharedPtr events_queue_; + + EventsExecutorEntitiesCollector::SharedPtr entities_collector_; + EventsExecutorNotifyWaitable::SharedPtr executor_notifier_; + + // Mutex to protect the insertion of events in the queue + std::mutex push_mutex_; + // Variable used to notify when an event is added to the queue + std::condition_variable events_queue_cv_; + // Timers manager + std::shared_ptr timers_manager_; +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__EVENTS_EXECUTOR_HPP_ diff --git a/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp new file mode 100644 index 0000000000..2781026492 --- /dev/null +++ b/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp @@ -0,0 +1,288 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__EVENTS_EXECUTOR_ENTITIES_COLLECTOR_HPP_ +#define RCLCPP__EXECUTORS__EVENTS_EXECUTOR_ENTITIES_COLLECTOR_HPP_ + +#include +#include +#include +#include +#include + +#include "rclcpp/executors/event_waitable.hpp" +#include "rclcpp/executors/events_executor_event_types.hpp" +#include "rclcpp/executors/timers_manager.hpp" +#include "rclcpp/node_interfaces/node_base_interface.hpp" + +namespace rclcpp +{ +namespace executors +{ +typedef std::map> WeakCallbackGroupsToNodesMap; + +// forward declaration of EventsExecutor to avoid circular dependency +class EventsExecutor; + +/** + * @brief This class provides a waitable object that is used for managing the + * entities (i.e. nodes and their subscriptions, timers, services, etc) + * added to an EventsExecutor. + * The add/remove node APIs are used when a node is added/removed from + * the associated EventsExecutor and result in setting/unsetting the + * events callbacks and adding timers to the timers manager. + * + * Being this class derived from Waitable, it can be used to wake up an + * executor thread while it's spinning. + * When this occurs, the execute API takes care of handling changes + * in the entities currently used by the executor. + */ +class EventsExecutorEntitiesCollector final + : public EventWaitable, + public std::enable_shared_from_this +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(EventsExecutorEntitiesCollector) + + // Constructor + RCLCPP_PUBLIC + EventsExecutorEntitiesCollector( + EventsExecutor * executor); + + // Destructor + RCLCPP_PUBLIC + ~EventsExecutorEntitiesCollector(); + + // Initialize entities collector + RCLCPP_PUBLIC + void init(); + + // The purpose of "execute" is handling the situation of a new entity added to + // a node, while the executor is already spinning. + // This consists in setting that entitiy's callback. + // If a entity is removed from a node, we should unset its callback + RCLCPP_PUBLIC + void + execute(std::shared_ptr & data) override; + + RCLCPP_PUBLIC + void + add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + RCLCPP_PUBLIC + void + remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + /// Add a callback group to the entities collector + /** + * \see rclcpp::Executor::add_callback_group + */ + RCLCPP_PUBLIC + void + add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + /// Add a callback group to the entities collector + /** + * \see rclcpp::Executor::add_callback_group + * \return boolean whether the node from the callback group is new + */ + RCLCPP_PUBLIC + void + add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); + + /// Remove a callback group from the entities collector + /** + * \see rclcpp::Executor::remove_callback_group + */ + RCLCPP_PUBLIC + void + remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr); + + /// Remove a callback group from the entities collector + /** + * \see rclcpp::Executor::remove_callback_group_from_map + */ + RCLCPP_PUBLIC + void + remove_callback_group_from_map( + rclcpp::CallbackGroup::SharedPtr group_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); + + RCLCPP_PUBLIC + std::vector + get_all_callback_groups(); + + /// Get manually added callback groups that belong to the entities collector + /** + * \see rclcpp::Executor::get_manually_added_callback_groups() + */ + RCLCPP_PUBLIC + std::vector + get_manually_added_callback_groups(); + + /// Get autmatically added callback groups that belong to the entities collector + /** + * \see rclcpp::Executor::get_automatically_added_callback_groups_from_nodes() + */ + RCLCPP_PUBLIC + std::vector + get_automatically_added_callback_groups_from_nodes(); + + /// + /** + * Get the subscription shared pointer corresponding + * to a subscription identifier + */ + RCLCPP_PUBLIC + rclcpp::SubscriptionBase::SharedPtr + get_subscription(const void * subscription_id); + + /// + /** + * Get the client shared pointer corresponding + * to a client identifier + */ + RCLCPP_PUBLIC + rclcpp::ClientBase::SharedPtr + get_client(const void * client_id); + + /// + /** + * Get the service shared pointer corresponding + * to a service identifier + */ + RCLCPP_PUBLIC + rclcpp::ServiceBase::SharedPtr + get_service(const void * service_id); + + /// + /** + * Get the waitable shared pointer corresponding + * to a waitable identifier + */ + RCLCPP_PUBLIC + rclcpp::Waitable::SharedPtr + get_waitable(const void * waitable_id); + + /// + /** + * Add a weak pointer to a waitable + */ + RCLCPP_PUBLIC + void + add_waitable(rclcpp::Waitable::SharedPtr waitable); + + RCLCPP_PUBLIC + std::shared_ptr + take_data() + { + // This waitable doesn't handle any data + return nullptr; + } + +private: + void + set_callback_group_entities_callbacks(rclcpp::CallbackGroup::SharedPtr group); + + void + unset_callback_group_entities_callbacks(rclcpp::CallbackGroup::SharedPtr group); + + void + set_guard_condition_callback(const rcl_guard_condition_t * guard_condition); + + void + unset_guard_condition_callback(const rcl_guard_condition_t * guard_condition); + + void + remove_callback_data(void * entity_id, ExecutorEventType type); + + const EventsExecutorCallbackData * + get_callback_data(void * entity_id, ExecutorEventType type); + + /// Return true if the node belongs to the collector + /** + * \param[in] group_ptr a node base interface shared pointer + * \return boolean whether a node belongs the collector + */ + bool + has_node( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const; + + /// Add all callback groups that can be automatically added by any executor + /// and is not already associated with an executor from nodes + /// that are associated with executor + /** + * \see rclcpp::Executor::add_callback_groups_from_nodes_associated_to_executor() + */ + void + add_callback_groups_from_nodes_associated_to_executor(); + + void + set_entities_event_callbacks_from_map( + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); + + // maps callback groups to nodes. + WeakCallbackGroupsToNodesMap weak_groups_associated_with_executor_to_nodes_; + // maps callback groups to nodes. + WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_; + + typedef std::map> + WeakNodesToGuardConditionsMap; + WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_; + + /// List of weak nodes registered in the events executor + std::list weak_nodes_; + + // Maps: entity identifiers to weak pointers from the entities registered in the executor + // so in the case of an event providing and ID, we can retrieve and own the corresponding + // entity while it performs work + std::unordered_map weak_subscriptions_map_; + std::unordered_map weak_services_map_; + std::unordered_map weak_clients_map_; + std::unordered_map weak_waitables_map_; + + /// Executor using this entities collector object + EventsExecutor * associated_executor_ = nullptr; + /// Instance of the timers manager used by the associated executor + TimersManager::SharedPtr timers_manager_; + + /// Callback data objects mapped to the number of listeners sharing the same object. + /// When no more listeners use the object, it can be removed from the map. + /// For example, the entities collector holds every node's guard condition, which + /// share the same EventsExecutorCallbackData object ptr to use as their callback arg: + /// cb_data_object = {executor_ptr, entities_collector_ptr, WAITABLE_EVENT}; + /// Node1->gc(&cb_data_object) + /// Node2->gc(&cb_data_object) + /// So the maps has: (cb_data_object, 2) + /// When both nodes are removed (counter = 0), the cb_data_object can be destroyed. + std::unordered_map callback_data_map_; +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__EVENTS_EXECUTOR_ENTITIES_COLLECTOR_HPP_ diff --git a/rclcpp/include/rclcpp/executors/events_executor_event_types.hpp b/rclcpp/include/rclcpp/executors/events_executor_event_types.hpp new file mode 100644 index 0000000000..045cc51d57 --- /dev/null +++ b/rclcpp/include/rclcpp/executors/events_executor_event_types.hpp @@ -0,0 +1,79 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__EVENTS_EXECUTOR_EVENT_TYPES_HPP_ +#define RCLCPP__EXECUTORS__EVENTS_EXECUTOR_EVENT_TYPES_HPP_ + +namespace rclcpp +{ +namespace executors +{ + +// forward declaration of EventsExecutor to avoid circular dependency +class EventsExecutor; + +enum ExecutorEventType +{ + SUBSCRIPTION_EVENT, + SERVICE_EVENT, + CLIENT_EVENT, + WAITABLE_EVENT +}; + +struct ExecutorEvent +{ + const void * entity_id; + ExecutorEventType type; +}; + +// The EventsExecutorCallbackData struct is what the listeners +// will use as argument when calling their callbacks from the +// RMW implementation. The listeners get a (void *) of this struct, +// and the executor is in charge to cast it back and use the data. +struct EventsExecutorCallbackData +{ + EventsExecutorCallbackData( + EventsExecutor * _executor, + ExecutorEvent _event) + { + executor = _executor; + event = _event; + } + + // Equal operator + bool operator==(const EventsExecutorCallbackData & other) const + { + return (event.entity_id == other.event.entity_id); + } + + // Struct members + EventsExecutor * executor; + ExecutorEvent event; +}; + +// To be able to use std::unordered_map with an EventsExecutorCallbackData +// as key, we need a hasher. We use the entity ID as hash, since it is +// unique for each EventsExecutorCallbackData object. +struct KeyHasher +{ + size_t operator()(const EventsExecutorCallbackData & key) const + { + return std::hash()(key.event.entity_id); + } +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__EVENTS_EXECUTOR_EVENT_TYPES_HPP_ diff --git a/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp b/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp new file mode 100644 index 0000000000..1a9a72ebff --- /dev/null +++ b/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp @@ -0,0 +1,95 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__EVENTS_EXECUTOR_NOTIFY_WAITABLE_HPP_ +#define RCLCPP__EXECUTORS__EVENTS_EXECUTOR_NOTIFY_WAITABLE_HPP_ + +#include +#include + +#include "rcl/guard_condition.h" +#include "rclcpp/executors/event_waitable.hpp" + +namespace rclcpp +{ +namespace executors +{ + +/** + * @brief This class provides an EventWaitable that allows to + * wake up an EventsExecutor when a guard condition is notified. + */ +class EventsExecutorNotifyWaitable final : public EventWaitable +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(EventsExecutorNotifyWaitable) + + // Constructor + RCLCPP_PUBLIC + EventsExecutorNotifyWaitable() = default; + + // Destructor + RCLCPP_PUBLIC + virtual ~EventsExecutorNotifyWaitable() = default; + + // The function is a no-op, since we only care of waking up the executor + RCLCPP_PUBLIC + void + execute(std::shared_ptr & data) override + { + (void)data; + } + + RCLCPP_PUBLIC + void + add_guard_condition(const rcl_guard_condition_t * guard_condition) + { + notify_guard_conditions_.push_back(guard_condition); + } + + RCLCPP_PUBLIC + void + set_listener_callback( + rmw_listener_callback_t callback, + const void * user_data) const override + { + for (auto gc : notify_guard_conditions_) { + rcl_ret_t ret = rcl_guard_condition_set_listener_callback( + gc, + callback, + user_data, + false); + + if (RCL_RET_OK != ret) { + throw std::runtime_error("Couldn't set guard condition events callback"); + } + } + } + + RCLCPP_PUBLIC + std::shared_ptr + take_data() + { + // This waitable doesn't handle any data + return nullptr; + } + +private: + std::list notify_guard_conditions_; +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__EVENTS_EXECUTOR_NOTIFY_WAITABLE_HPP_ diff --git a/rclcpp/include/rclcpp/executors/timers_manager.hpp b/rclcpp/include/rclcpp/executors/timers_manager.hpp new file mode 100644 index 0000000000..2630fdaf2d --- /dev/null +++ b/rclcpp/include/rclcpp/executors/timers_manager.hpp @@ -0,0 +1,475 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__TIMERS_MANAGER_HPP_ +#define RCLCPP__EXECUTORS__TIMERS_MANAGER_HPP_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/context.hpp" +#include "rclcpp/timer.hpp" + +namespace rclcpp +{ +namespace executors +{ + +/** + * @brief This class provides a way for storing and executing timer objects. + * It APIs to suit the needs of different applications and execution models. + * All public APIs provided by this class are thread-safe. + * + * Timers management + * This class provides APIs to add and remove timers. + * It keeps a list of weak pointers from added timers, and locks them only when + * they need to be executed or modified. + * Timers are kept ordered in a binary-heap priority queue. + * Calls to add/remove APIs will temporarily block the execution of the timers and + * will require to reorder the internal priority queue. + * Because of this, they have a not-negligible impact on the performance. + * + * Timers execution + * The most efficient implementation consists in letting a TimersManager object + * to spawn a thread where timers are monitored and periodically executed. + * Besides this, other APIs allow to either execute a single timer or all the + * currently ready ones. + * This class assumes that the `execute_callback()` API of the stored timers is never + * called by other entities, but it can only be called from here. + * If this assumption is not respected, the heap property may be invalidated, + * so timers may be executed out of order, without this object noticing it. + * + */ +class TimersManager +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(TimersManager) + + /** + * @brief Construct a new TimersManager object + * + * @param context custom context to be used. + * Shared ownership of the context is held until destruction. + */ + explicit TimersManager(std::shared_ptr context); + + /** + * @brief Destruct the TimersManager object making sure to stop thread and release memory. + */ + ~TimersManager(); + + /** + * @brief Adds a new timer to the storage, maintaining weak ownership of it. + * Function is thread safe and it can be called regardless of the state of the timers thread. + * + * @param timer the timer to add. + */ + void add_timer(rclcpp::TimerBase::SharedPtr timer); + + /** + * @brief Remove a single timer from the object storage. + * Will do nothing if the timer was not being stored here. + * Function is thread safe and it can be called regardless of the state of the timers thread. + * + * @param timer the timer to remove. + */ + void remove_timer(rclcpp::TimerBase::SharedPtr timer); + + /** + * @brief Remove all the timers stored in the object. + * Function is thread safe and it can be called regardless of the state of the timers thread. + */ + void clear(); + + /** + * @brief Starts a thread that takes care of executing the timers stored in this object. + * Function will throw an error if the timers thread was already running. + */ + void start(); + + /** + * @brief Stops the timers thread. + * Will do nothing if the timer thread was not running. + */ + void stop(); + + /** + * @brief Executes all the timers currently ready when the function was invoked. + * This function will lock all the stored timers throughout its duration. + * Function is thread safe, but it will throw an error if the timers thread is running. + */ + void execute_ready_timers(); + + /** + * @brief Executes head timer if ready at time point. + * Function is thread safe, but it will throw an error if the timers thread is running. + * + * @param tp the time point to check for, where `max()` denotes that no check will be performed. + * @return true if head timer was ready at time point. + */ + bool execute_head_timer( + std::chrono::time_point tp = + std::chrono::time_point::max()); + + /** + * @brief Get the amount of time before the next timer expires. + * Function is thread safe, but it will throw an error if the timers thread is running. + * + * @return std::chrono::nanoseconds to wait, + * the returned value could be negative if the timer is already expired + * or MAX_TIME if there are no timers stored in the object. + */ + std::chrono::nanoseconds get_head_timeout(); + + // This is what the TimersManager uses to denote a duration forever. + // We don't use std::chrono::nanoseconds::max because it will overflow. + // See https://en.cppreference.com/w/cpp/thread/condition_variable/wait_for + static constexpr std::chrono::nanoseconds MAX_TIME = std::chrono::hours(90); + +private: + RCLCPP_DISABLE_COPY(TimersManager) + + using TimerPtr = rclcpp::TimerBase::SharedPtr; + using WeakTimerPtr = rclcpp::TimerBase::WeakPtr; + + // Forward declaration + class TimersHeap; + + /** + * @brief This class allows to store weak pointers to timers in a heap-like data structure. + * The root of the heap is the timer that expires first. + * Since this class uses weak ownership, it is not guaranteed that it represents a valid heap + * at any point in time as timers could go out of scope, thus invalidating it. + * The "validate_and_lock" API allows to get ownership of the timers and also makes sure that + * the heap property is respected. + * This class is not thread safe and requires external mutexes to protect its usage. + */ + class WeakTimersHeap + { +public: + /** + * @brief Add a new timer to the heap. After the addition, the heap property is enforced. + * + * @param timer new timer to add. + * @return true if timer has been added, false if it was already there. + */ + bool add_timer(TimerPtr timer) + { + TimersHeap locked_heap = this->validate_and_lock(); + bool added = locked_heap.add_timer(std::move(timer)); + + if (added) { + // Re-create the weak heap with the new timer added + this->store(locked_heap); + } + + return added; + } + + /** + * @brief Remove a timer from the heap. After the removal, the heap property is enforced. + * + * @param timer timer to remove. + * @return true if timer has been removed, false if it was not there. + */ + bool remove_timer(TimerPtr timer) + { + TimersHeap locked_heap = this->validate_and_lock(); + bool removed = locked_heap.remove_timer(std::move(timer)); + + if (removed) { + // Re-create the weak heap with the timer removed + this->store(locked_heap); + } + + return removed; + } + + /** + * @brief Returns a const reference to the front element + */ + const WeakTimerPtr & front() const + { + return weak_heap_.front(); + } + + /** + * @brief Returns whether the heap is empty or not + */ + bool empty() const + { + return weak_heap_.empty(); + } + + /** + * @brief This function restores the current object as a valid heap + * and it returns a locked version of it. + * It is the only public API to access and manipulate the stored timers. + * + * @return TimersHeap owned timers corresponding to the current object + */ + TimersHeap validate_and_lock() + { + TimersHeap locked_heap; + bool any_timer_destroyed = false; + + auto it = weak_heap_.begin(); + + while (it != weak_heap_.end()) { + if (auto timer_shared_ptr = it->lock()) { + // This timer is valid, add it to the locked heap + // Note: we access private `owned_heap_` member field. + locked_heap.owned_heap_.push_back(std::move(timer_shared_ptr)); + it++; + } else { + // This timer went out of scope, remove it + it = weak_heap_.erase(it); + any_timer_destroyed = true; + } + } + + // If a timer has gone out of scope, then the remaining elements may not represent + // a valid heap anymore. We need to re heapify the timers heap. + if (any_timer_destroyed) { + locked_heap.heapify(); + // Re-create the weak heap now that elements have been heapified again + this->store(locked_heap); + } + + return locked_heap; + } + + /** + * @brief This function allows to recreate the heap of weak pointers + * from an heap of owned pointers. + * It is required to be called after a locked TimersHeap generated from this object + * has been modified in any way (e.g. timers triggered, added, removed). + * + * @param heap timers heap to store as weak pointers + */ + void store(const TimersHeap & heap) + { + weak_heap_.clear(); + // Note: we access private `owned_heap_` member field. + for (auto t : heap.owned_heap_) { + weak_heap_.push_back(t); + } + } + + /** + * @brief Remove all timers from the heap. + */ + void clear() + { + weak_heap_.clear(); + } + +private: + std::vector weak_heap_; + }; + + /** + * @brief This class is the equivalent of WeakTimersHeap but with ownership of the timers. + * It can be generated by locking the weak version. + * It provides operations to manipulate the heap. + * This class is not thread safe and requires external mutexes to protect its usage. + */ + class TimersHeap + { +public: + /** + * @brief Try to add a new timer to the heap. + * After the addition, the heap property is preserved. + * @param timer new timer to add + * @return true if timer has been added, false if it was already there + */ + bool add_timer(TimerPtr timer) + { + // Nothing to do if the timer is already stored here + auto it = std::find(owned_heap_.begin(), owned_heap_.end(), timer); + if (it != owned_heap_.end()) { + return false; + } + + owned_heap_.push_back(std::move(timer)); + std::push_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater); + + return true; + } + + /** + * @brief Try to remove a timer from the heap. + * After the removal, the heap property is preserved. + * @param timer timer to remove + * @return true if timer has been removed, false if it was not there + */ + bool remove_timer(TimerPtr timer) + { + // Nothing to do if the timer is not stored here + auto it = std::find(owned_heap_.begin(), owned_heap_.end(), timer); + if (it == owned_heap_.end()) { + return false; + } + + owned_heap_.erase(it); + std::make_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater); + + return true; + } + + /** + * @brief Returns a reference to the front element + */ + TimerPtr & front() + { + return owned_heap_.front(); + } + + /** + * @brief Returns a const reference to the front element + */ + const TimerPtr & front() const + { + return owned_heap_.front(); + } + + /** + * @brief Returns whether the heap is empty or not + */ + bool empty() const + { + return owned_heap_.empty(); + } + + /** + * @brief Restore a valid heap after the root value has been replaced (e.g. timer triggered). + */ + void heapify_root() + { + // The following code is a more efficient version of doing + // pop_heap(); + // pop_back(); + // push_back(); + // push_heap(); + // as it removes the need for the last push_heap + + // Push the modified element (i.e. the current root) at the bottom of the heap + owned_heap_.push_back(owned_heap_[0]); + // Exchange first and last-1 elements and reheapify + std::pop_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater); + // Remove last element + owned_heap_.pop_back(); + } + + /** + * @brief Completely restores the structure to a valid heap + */ + void heapify() + { + std::make_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater); + } + + /** + * @brief Friend declaration to allow the `validate_and_lock()` function to access the + * underlying heap container + */ + friend TimersHeap WeakTimersHeap::validate_and_lock(); + + /** + * @brief Friend declaration to allow the `store()` function to access the + * underlying heap container + */ + friend void WeakTimersHeap::store(const TimersHeap & heap); + +private: + /** + * @brief Comparison function between timers. + * Returns true if `a` expires after `b`. + */ + static bool timer_greater(TimerPtr a, TimerPtr b) + { + return a->time_until_trigger() > b->time_until_trigger(); + } + + std::vector owned_heap_; + }; + + /** + * @brief Implements a loop that keeps executing ready timers. + * This function is executed in the timers thread. + */ + void run_timers(); + + /** + * @brief Get the amount of time before the next timer expires. + * This function is not thread safe, acquire a mutex before calling it. + * + * @return std::chrono::nanoseconds to wait, + * the returned value could be negative if the timer is already expired + * or MAX_TIME if the heap is empty. + * This function is not thread safe, acquire the timers_mutex_ before calling it. + */ + std::chrono::nanoseconds get_head_timeout_unsafe(); + + /** + * @brief Executes all the timers currently ready when the function is invoked + * while keeping the heap correctly sorted. + * This function is not thread safe, acquire the timers_mutex_ before calling it. + */ + void execute_ready_timers_unsafe(); + + /** + * @brief Helper function that checks whether a timer was already ready + * at a specific time point. + + * @param timer a pointer to the timer to check for + * @param tp the time point to check for + * @return true if timer was ready at tp + */ + bool timer_was_ready_at_tp( + TimerPtr timer, + std::chrono::time_point tp) + { + // A ready timer will return a negative duration when calling time_until_trigger + auto time_ready = std::chrono::steady_clock::now() + timer->time_until_trigger(); + return time_ready < tp; + } + + // Thread used to run the timers execution task + std::thread timers_thread_; + // Protects access to timers + std::mutex timers_mutex_; + // Notifies the timers thread whenever timers are added/removed + std::condition_variable timers_cv_; + // Flag used as predicate by timers_cv + bool timers_updated_ {false}; + // Indicates whether the timers thread is currently running or requested to stop + std::atomic running_ {false}; + // Context of the parent executor + std::shared_ptr context_; + // Timers heap with weak ownership + WeakTimersHeap weak_timers_heap_; +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__TIMERS_MANAGER_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp new file mode 100644 index 0000000000..f0958d36bd --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp @@ -0,0 +1,119 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__EVENTS_QUEUE_HPP_ +#define RCLCPP__EXPERIMENTAL__BUFFERS__EVENTS_QUEUE_HPP_ + +#include + +#include "rclcpp/macros.hpp" +#include "rclcpp/visibility_control.hpp" + +#include "rclcpp/executors/events_executor_event_types.hpp" + +namespace rclcpp +{ +namespace experimental +{ +namespace buffers +{ + +/** + * @brief This abstract class can be used to implement different types of queues + * where `ExecutorEvent` can be stored. + * The derived classes should choose which underlying container to use and + * the strategy for pushing and popping events. + * For example a queue implementation may be bounded or unbounded and have + * different pruning strategies. + * Implementations may or may not check the validity of events and decide how to handle + * the situation where an event is not valid anymore (e.g. a subscription history cache overruns) + */ +class EventsQueue +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(EventsQueue) + + /** + * @brief Destruct the object. + */ + RCLCPP_PUBLIC + virtual ~EventsQueue() = default; + + /** + * @brief push event into the queue + * @param event The event to push into the queue + */ + RCLCPP_PUBLIC + virtual + void + push(const rclcpp::executors::ExecutorEvent & event) = 0; + + /** + * @brief removes front element from the queue. + */ + RCLCPP_PUBLIC + virtual + void + pop() = 0; + + /** + * @brief gets the front event from the queue + * @return the front event + */ + RCLCPP_PUBLIC + virtual + rclcpp::executors::ExecutorEvent + front() const = 0; + + /** + * @brief Test whether queue is empty + * @return true if the queue's size is 0, false otherwise. + */ + RCLCPP_PUBLIC + virtual + bool + empty() const = 0; + + /** + * @brief Returns the number of elements in the queue. + * @return the number of elements in the queue. + */ + RCLCPP_PUBLIC + virtual + size_t + size() const = 0; + + /** + * @brief Initializes the queue + */ + RCLCPP_PUBLIC + virtual + void + init() = 0; + + /** + * @brief pops out all events stored in the object into an output queue. + * @return queue with events + */ + RCLCPP_PUBLIC + virtual + std::queue + pop_all_events() = 0; +}; + +} // namespace buffers +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__BUFFERS__EVENTS_QUEUE_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp b/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp index ad01946193..4ed7245c48 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp @@ -91,7 +91,12 @@ class RingBufferImplementation : public BufferImplementationBase if (!has_data_()) { RCLCPP_ERROR(rclcpp::get_logger("rclcpp"), "Calling dequeue on empty intra-process buffer"); - throw std::runtime_error("Calling dequeue on empty intra-process buffer"); + // This situation can happen on the EventsExecutor if we have more events in the queue + // than messages in the history cache (set by the qos_policies.depth of the subscription) + // For example if we set depth=1 and we get 2 messages really fast (so no time for + // processing), we could end up with 2 events in the queue but only 1 msg is + // actually stored on the buffer. In this case we return an empty buffer. + return BufferT(); } auto request = std::move(ring_buffer_[read_index_]); diff --git a/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp new file mode 100644 index 0000000000..4f560a91cf --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp @@ -0,0 +1,138 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__SIMPLE_EVENTS_QUEUE_HPP_ +#define RCLCPP__EXPERIMENTAL__BUFFERS__SIMPLE_EVENTS_QUEUE_HPP_ + +#include +#include + +#include "rclcpp/experimental/buffers/events_queue.hpp" + +namespace rclcpp +{ +namespace experimental +{ +namespace buffers +{ + +/** + * @brief This class implements an EventsQueue as a simple wrapper around a std::queue. + * It does not perform any checks about the size of queue, which can grow + * unbounded without being pruned. + * The simplicity of this implementation makes it suitable for optimizing CPU usage. + */ +class SimpleEventsQueue : public EventsQueue +{ +public: + RCLCPP_PUBLIC + ~SimpleEventsQueue() = default; + + /** + * @brief push event into the queue + * @param event The event to push into the queue + */ + RCLCPP_PUBLIC + virtual + void + push(const rclcpp::executors::ExecutorEvent & event) + { + event_queue_.push(event); + } + + /** + * @brief removes front event from the queue. + */ + RCLCPP_PUBLIC + virtual + void + pop() + { + event_queue_.pop(); + } + + /** + * @brief gets the front event from the queue + * @return the front event + */ + RCLCPP_PUBLIC + virtual + rclcpp::executors::ExecutorEvent + front() const + { + return event_queue_.front(); + } + + /** + * @brief Test whether queue is empty + * @return true if the queue's size is 0, false otherwise. + */ + RCLCPP_PUBLIC + virtual + bool + empty() const + { + return event_queue_.empty(); + } + + /** + * @brief Returns the number of elements in the queue. + * @return the number of elements in the queue. + */ + RCLCPP_PUBLIC + virtual + size_t + size() const + { + return event_queue_.size(); + } + + /** + * @brief Initializes the queue + */ + RCLCPP_PUBLIC + virtual + void + init() + { + // Make sure the queue is empty when we start + std::queue local_queue; + std::swap(event_queue_, local_queue); + } + + /** + * @brief gets a queue with all events accumulated on it since + * the last call. The member queue is empty when the call returns. + * @return std::queue with events + */ + RCLCPP_PUBLIC + virtual + std::queue + pop_all_events() + { + std::queue local_queue; + std::swap(event_queue_, local_queue); + return local_queue; + } + +private: + std::queue event_queue_; +}; + +} // namespace buffers +} // namespace experimental +} // namespace rclcpp + + +#endif // RCLCPP__EXPERIMENTAL__BUFFERS__SIMPLE_EVENTS_QUEUE_HPP_ diff --git a/rclcpp/src/rclcpp/executors/events_executor.cpp b/rclcpp/src/rclcpp/executors/events_executor.cpp new file mode 100644 index 0000000000..eb53c2e3c7 --- /dev/null +++ b/rclcpp/src/rclcpp/executors/events_executor.cpp @@ -0,0 +1,333 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +#include "rclcpp/exceptions/exceptions.hpp" +#include "rclcpp/executors/events_executor.hpp" + +using namespace std::chrono_literals; + +using rclcpp::executors::EventsExecutor; + +EventsExecutor::EventsExecutor( + rclcpp::experimental::buffers::EventsQueue::UniquePtr events_queue, + const rclcpp::ExecutorOptions & options) +: rclcpp::Executor(options) +{ + // Get ownership of the queue used to store events. + if (!events_queue) { + throw std::invalid_argument("events_queue can't be a null pointer"); + } + events_queue_ = std::move(events_queue); + events_queue_->init(); + + // Create timers manager + timers_manager_ = std::make_shared(context_); + + // Create entities collector + entities_collector_ = std::make_shared(this); + entities_collector_->init(); + + // Setup the executor notifier to wake up the executor when some guard conditions are tiggered. + // The added guard conditions are guaranteed to not go out of scope before the executor itself. + executor_notifier_ = std::make_shared(); + executor_notifier_->add_guard_condition(&shutdown_guard_condition_->get_rcl_guard_condition()); + executor_notifier_->add_guard_condition(&interrupt_guard_condition_); + + entities_collector_->add_waitable(executor_notifier_); +} + +void +EventsExecutor::spin() +{ + if (spinning.exchange(true)) { + throw std::runtime_error("spin() called while already spinning"); + } + RCLCPP_SCOPE_EXIT(this->spinning.store(false);); + + // When condition variable is notified, check this predicate to proceed + auto has_event_predicate = [this]() {return !events_queue_->empty();}; + + timers_manager_->start(); + + while (rclcpp::ok(context_) && spinning.load()) { + std::unique_lock push_lock(push_mutex_); + // We wait here until something has been pushed to the event queue + events_queue_cv_.wait(push_lock, has_event_predicate); + // Move all events into a local events queue to allow entities to push while we execute them + std::queue execution_events_queue = events_queue_->pop_all_events(); + // Unlock the mutex + push_lock.unlock(); + // Consume all available events + while (!execution_events_queue.empty()) { + ExecutorEvent event = execution_events_queue.front(); + execution_events_queue.pop(); + this->execute_event(event); + } + } + + // Stop the timers manager thread when we are done spinning + timers_manager_->stop(); +} + +void +EventsExecutor::spin_some(std::chrono::nanoseconds max_duration) +{ + // In this context a 0 input max_duration means no duration limit + if (std::chrono::nanoseconds(0) == max_duration) { + max_duration = timers_manager_->MAX_TIME; + } + + return this->spin_some_impl(max_duration, false); +} + +void +EventsExecutor::spin_all(std::chrono::nanoseconds max_duration) +{ + if (max_duration <= 0ns) { + throw std::invalid_argument("max_duration must be positive"); + } + return this->spin_some_impl(max_duration, true); +} + +void +EventsExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive) +{ + if (spinning.exchange(true)) { + throw std::runtime_error("spin_some() called while already spinning"); + } + + RCLCPP_SCOPE_EXIT(this->spinning.store(false);); + + auto start = std::chrono::steady_clock::now(); + + auto max_duration_not_elapsed = [max_duration, start]() { + auto elapsed_time = std::chrono::steady_clock::now() - start; + return elapsed_time < max_duration; + }; + + size_t ready_events_at_start = 0; + size_t executed_events = 0; + + if (!exhaustive) { + // Get the number of events ready at start + std::unique_lock lock(push_mutex_); + ready_events_at_start = events_queue_->size(); + lock.unlock(); + } + + while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) { + // Execute first ready event from queue if exists + if (exhaustive || (executed_events < ready_events_at_start)) { + std::unique_lock lock(push_mutex_); + bool has_event = !events_queue_->empty(); + + if (has_event) { + ExecutorEvent event = events_queue_->front(); + events_queue_->pop(); + this->execute_event(event); + executed_events++; + continue; + } + } + + bool timer_executed; + + if (exhaustive) { + // Execute timer if is ready + timer_executed = timers_manager_->execute_head_timer(); + } else { + // Execute timer if was ready at start + timer_executed = timers_manager_->execute_head_timer(start); + } + + if (timer_executed) { + continue; + } + + // If there's no more work available, exit + break; + } +} + +void +EventsExecutor::spin_once_impl(std::chrono::nanoseconds timeout) +{ + // In this context a negative input timeout means no timeout + if (timeout < 0ns) { + timeout = timers_manager_->MAX_TIME; + } + + // Select the smallest between input timeout and timer timeout + auto next_timer_timeout = timers_manager_->get_head_timeout(); + if (next_timer_timeout < timeout) { + timeout = next_timer_timeout; + } + + // When condition variable is notified, check this predicate to proceed + auto has_event_predicate = [this]() {return !events_queue_->empty();}; + + ExecutorEvent event; + bool has_event = false; + + { + // Wait until timeout or event arrives + std::unique_lock lock(push_mutex_); + events_queue_cv_.wait_for(lock, timeout, has_event_predicate); + + // Grab first event from queue if it exists + has_event = !events_queue_->empty(); + if (has_event) { + event = events_queue_->front(); + events_queue_->pop(); + } + } + + // If we wake up from the wait with an event, it means that it + // arrived before any of the timers expired. + if (has_event) { + this->execute_event(event); + } else { + timers_manager_->execute_head_timer(); + } +} + +void +EventsExecutor::add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) +{ + // This field is unused because we don't have to wake up the executor when a node is added. + (void) notify; + + // Add node to entities collector + entities_collector_->add_node(node_ptr); +} + +void +EventsExecutor::add_node(std::shared_ptr node_ptr, bool notify) +{ + this->add_node(node_ptr->get_node_base_interface(), notify); +} + +void +EventsExecutor::remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) +{ + // This field is unused because we don't have to wake up the executor when a node is removed. + (void)notify; + + // Remove node from entities collector. + // This will result in un-setting all the event callbacks from its entities. + // After this function returns, this executor will not receive any more events associated + // to these entities. + entities_collector_->remove_node(node_ptr); +} + +void +EventsExecutor::remove_node(std::shared_ptr node_ptr, bool notify) +{ + this->remove_node(node_ptr->get_node_base_interface(), notify); +} + +void +EventsExecutor::execute_event(const ExecutorEvent & event) +{ + switch (event.type) { + case SUBSCRIPTION_EVENT: + { + auto subscription = entities_collector_->get_subscription(event.entity_id); + + if (subscription) { + execute_subscription(subscription); + } + break; + } + + case SERVICE_EVENT: + { + auto service = entities_collector_->get_service(event.entity_id); + + if (service) { + execute_service(service); + } + break; + } + + case CLIENT_EVENT: + { + auto client = entities_collector_->get_client(event.entity_id); + + if (client) { + execute_client(client); + } + break; + } + + case WAITABLE_EVENT: + { + auto waitable = entities_collector_->get_waitable(event.entity_id); + + if (waitable) { + auto data = waitable->take_data(); + waitable->execute(data); + } + break; + } + } +} + +void +EventsExecutor::add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify) +{ + // This field is unused because we don't have to wake up + // the executor when a callback group is added. + (void)notify; + entities_collector_->add_callback_group(group_ptr, node_ptr); +} + +void +EventsExecutor::remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify) +{ + // This field is unused because we don't have to wake up + // the executor when a callback group is removed. + (void)notify; + entities_collector_->remove_callback_group(group_ptr); +} + +std::vector +EventsExecutor::get_all_callback_groups() +{ + return entities_collector_->get_all_callback_groups(); +} + +std::vector +EventsExecutor::get_manually_added_callback_groups() +{ + return entities_collector_->get_manually_added_callback_groups(); +} + +std::vector +EventsExecutor::get_automatically_added_callback_groups_from_nodes() +{ + return entities_collector_->get_automatically_added_callback_groups_from_nodes(); +} diff --git a/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp new file mode 100644 index 0000000000..50d5d72537 --- /dev/null +++ b/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp @@ -0,0 +1,651 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include "rclcpp/executors/events_executor.hpp" +#include "rclcpp/executors/events_executor_entities_collector.hpp" + +using rclcpp::executors::EventsExecutorCallbackData; +using rclcpp::executors::EventsExecutorEntitiesCollector; + +EventsExecutorEntitiesCollector::EventsExecutorEntitiesCollector( + EventsExecutor * executor) +{ + if (executor == nullptr) { + throw std::runtime_error("Received NULL executor in EventsExecutorEntitiesCollector."); + } + + associated_executor_ = executor; + timers_manager_ = associated_executor_->timers_manager_; +} + +EventsExecutorEntitiesCollector::~EventsExecutorEntitiesCollector() +{ + // Disassociate all callback groups + for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) { + auto group = pair.first.lock(); + if (group) { + std::atomic_bool & has_executor = group->get_associated_with_executor_atomic(); + has_executor.store(false); + unset_callback_group_entities_callbacks(group); + } + } + for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) { + auto group = pair.first.lock(); + if (group) { + std::atomic_bool & has_executor = group->get_associated_with_executor_atomic(); + has_executor.store(false); + unset_callback_group_entities_callbacks(group); + } + } + + // Disassociate all nodes + for (const auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (node) { + std::atomic_bool & has_executor = node->get_associated_with_executor_atomic(); + has_executor.store(false); + } + } + + // Unset nodes notify guard condition executor callback + for (const auto & pair : weak_nodes_to_guard_conditions_) { + auto node = pair.first.lock(); + if (node) { + auto node_gc = pair.second; + unset_guard_condition_callback(node_gc); + } + } + + // Clear all containers + weak_nodes_.clear(); + weak_clients_map_.clear(); + weak_services_map_.clear(); + callback_data_map_.clear(); + weak_waitables_map_.clear(); + weak_subscriptions_map_.clear(); + weak_nodes_to_guard_conditions_.clear(); + weak_groups_associated_with_executor_to_nodes_.clear(); + weak_groups_to_nodes_associated_with_executor_.clear(); +} + +void +EventsExecutorEntitiesCollector::init() +{ + // Add the EventsExecutorEntitiesCollector shared_ptr to waitables map + weak_waitables_map_.emplace(this, this->shared_from_this()); +} + +void +EventsExecutorEntitiesCollector::add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + // Check if the node already has an executor and if not, set this to true + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error("Node has already been added to an executor."); + } + + // Get node callback groups, add them to weak_groups_to_nodes_associated_with_executor_ + for (const auto & weak_group : node_ptr->get_callback_groups()) { + auto group_ptr = weak_group.lock(); + if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() && + group_ptr->automatically_add_to_executor_with_node()) + { + add_callback_group(group_ptr, node_ptr, weak_groups_to_nodes_associated_with_executor_); + } + } + + // Add node to weak_nodes_ + weak_nodes_.push_back(node_ptr); +} + +void +EventsExecutorEntitiesCollector::add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + add_callback_group(group_ptr, node_ptr, weak_groups_associated_with_executor_to_nodes_); +} + +void +EventsExecutorEntitiesCollector::add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) +{ + // If the callback_group already has an executor, throw error + std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error("Callback group has already been added to an executor."); + } + + bool is_new_node = !has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) && + !has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_); + + if (is_new_node) { + // Set an event callback for the node's notify guard condition, so if new entities are added + // or removed to this node we will receive an event. + set_guard_condition_callback(node_ptr->get_notify_guard_condition()); + + // Store node's notify guard condition + weak_nodes_to_guard_conditions_[node_ptr] = node_ptr->get_notify_guard_condition(); + } + + // Add callback group to weak_groups_to_node + rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; + + auto insert_info = weak_groups_to_nodes.insert(std::make_pair(weak_group_ptr, node_ptr)); + + // Throw error if the group was already registered in the executor + bool was_inserted = insert_info.second; + if (!was_inserted) { + throw std::runtime_error("Callback group was already added to executor."); + } + + // For all entities in the callback group, set their event callback + set_callback_group_entities_callbacks(group_ptr); +} + +void +EventsExecutorEntitiesCollector::execute(std::shared_ptr & data) +{ + (void)data; + // This function is called when the associated executor is notified that something changed. + // We do not know if an entity has been added or remode so we have to rebuild everything. + + timers_manager_->clear(); + + // If a registered node has a new callback group, register the group. + add_callback_groups_from_nodes_associated_to_executor(); + + // For all groups registered in the executor, set their event callbacks. + set_entities_event_callbacks_from_map(weak_groups_associated_with_executor_to_nodes_); + set_entities_event_callbacks_from_map(weak_groups_to_nodes_associated_with_executor_); +} + +void +EventsExecutorEntitiesCollector::add_callback_groups_from_nodes_associated_to_executor() +{ + // Register new callback groups added to a node while already spinning + for (const auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (node) { + auto group_ptrs = node->get_callback_groups(); + std::for_each( + group_ptrs.begin(), group_ptrs.end(), + [this, node](rclcpp::CallbackGroup::WeakPtr group_ptr) + { + auto shared_group_ptr = group_ptr.lock(); + if (shared_group_ptr && shared_group_ptr->automatically_add_to_executor_with_node() && + !shared_group_ptr->get_associated_with_executor_atomic().load()) + { + add_callback_group( + shared_group_ptr, + node, + weak_groups_to_nodes_associated_with_executor_); + } + }); + } + } +} + +void +EventsExecutorEntitiesCollector::set_entities_event_callbacks_from_map( + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) +{ + for (const auto & pair : weak_groups_to_nodes) { + auto group = pair.first.lock(); + auto node = pair.second.lock(); + if (!node || !group || !group->can_be_taken_from().load()) { + continue; + } + set_callback_group_entities_callbacks(group); + } +} + +void +EventsExecutorEntitiesCollector::set_callback_group_entities_callbacks( + rclcpp::CallbackGroup::SharedPtr group) +{ + // Timers are handled by the timers manager + group->find_timer_ptrs_if( + [this](const rclcpp::TimerBase::SharedPtr & timer) { + if (timer) { + timers_manager_->add_timer(timer); + } + return false; + }); + + // Set callbacks for all other entity types + group->find_subscription_ptrs_if( + [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { + if (subscription) { + weak_subscriptions_map_.emplace(subscription.get(), subscription); + + subscription->set_listener_callback( + &EventsExecutor::push_event, + get_callback_data(subscription.get(), SUBSCRIPTION_EVENT)); + } + return false; + }); + group->find_service_ptrs_if( + [this](const rclcpp::ServiceBase::SharedPtr & service) { + if (service) { + weak_services_map_.emplace(service.get(), service); + + service->set_listener_callback( + &EventsExecutor::push_event, + get_callback_data(service.get(), SERVICE_EVENT)); + } + return false; + }); + group->find_client_ptrs_if( + [this](const rclcpp::ClientBase::SharedPtr & client) { + if (client) { + weak_clients_map_.emplace(client.get(), client); + + client->set_listener_callback( + &EventsExecutor::push_event, + get_callback_data(client.get(), CLIENT_EVENT)); + } + return false; + }); + group->find_waitable_ptrs_if( + [this](const rclcpp::Waitable::SharedPtr & waitable) { + if (waitable) { + weak_waitables_map_.emplace(waitable.get(), waitable); + + waitable->set_listener_callback( + &EventsExecutor::push_event, + get_callback_data(waitable.get(), WAITABLE_EVENT)); + } + return false; + }); +} + +void +EventsExecutorEntitiesCollector::unset_callback_group_entities_callbacks( + rclcpp::CallbackGroup::SharedPtr group) +{ + // Timers are handled by the timers manager + group->find_timer_ptrs_if( + [this](const rclcpp::TimerBase::SharedPtr & timer) { + if (timer) { + timers_manager_->remove_timer(timer); + } + return false; + }); + + // Unset callbacks for all other entity types + group->find_subscription_ptrs_if( + [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { + if (subscription) { + subscription->set_listener_callback(nullptr, nullptr); + weak_subscriptions_map_.erase(subscription.get()); + remove_callback_data(subscription.get(), SUBSCRIPTION_EVENT); + } + return false; + }); + group->find_service_ptrs_if( + [this](const rclcpp::ServiceBase::SharedPtr & service) { + if (service) { + service->set_listener_callback(nullptr, nullptr); + weak_services_map_.erase(service.get()); + remove_callback_data(service.get(), SERVICE_EVENT); + } + return false; + }); + group->find_client_ptrs_if( + [this](const rclcpp::ClientBase::SharedPtr & client) { + if (client) { + client->set_listener_callback(nullptr, nullptr); + weak_clients_map_.erase(client.get()); + remove_callback_data(client.get(), CLIENT_EVENT); + } + return false; + }); + group->find_waitable_ptrs_if( + [this](const rclcpp::Waitable::SharedPtr & waitable) { + if (waitable) { + waitable->set_listener_callback(nullptr, nullptr); + weak_waitables_map_.erase(waitable.get()); + remove_callback_data(waitable.get(), WAITABLE_EVENT); + } + return false; + }); +} + +void +EventsExecutorEntitiesCollector::remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr) +{ + this->remove_callback_group_from_map( + group_ptr, + weak_groups_associated_with_executor_to_nodes_); +} + +void +EventsExecutorEntitiesCollector::remove_callback_group_from_map( + rclcpp::CallbackGroup::SharedPtr group_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) +{ + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr; + rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; + + // Look for the group to remove in the map + auto iter = weak_groups_to_nodes.find(weak_group_ptr); + if (iter == weak_groups_to_nodes.end()) { + // Group not found. + throw std::runtime_error("Callback group needs to be associated with this executor."); + } + + // Group found, get its associated node. + node_ptr = iter->second.lock(); + if (node_ptr == nullptr) { + throw std::runtime_error("Node must not be deleted before its callback group(s)."); + } + // Remove group from map + weak_groups_to_nodes.erase(iter); + + // For all the entities in the group, unset their callbacks + unset_callback_group_entities_callbacks(group_ptr); + + // Check if this node still has other callback groups associated with the executor + bool node_has_associated_callback_groups = + has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) || + has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_); + + if (!node_has_associated_callback_groups) { + // Node doesn't have more callback groups associated to the executor. + // Unset the event callback for the node's notify guard condition, to stop + // receiving events if entities are added or removed to this node. + unset_guard_condition_callback(node_ptr->get_notify_guard_condition()); + + // Remove guard condition from list + rclcpp::node_interfaces::NodeBaseInterface::WeakPtr weak_node_ptr(node_ptr); + weak_nodes_to_guard_conditions_.erase(weak_node_ptr); + } +} + +void +EventsExecutorEntitiesCollector::remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + if (!node_ptr->get_associated_with_executor_atomic().load()) { + throw std::runtime_error("Node needs to be associated with an executor."); + return; + } + // Check if this node is currently stored here + auto node_it = weak_nodes_.begin(); + while (node_it != weak_nodes_.end()) { + bool matched = (node_it->lock() == node_ptr); + if (matched) { + weak_nodes_.erase(node_it); + break; + } + ++node_it; + } + if (node_it == weak_nodes_.end()) { + // The node is not stored here + throw std::runtime_error("Tried to remove node not stored in this executor."); + return; + } + + // Find callback groups belonging to the node to remove + std::vector found_group_ptrs; + std::for_each( + weak_groups_to_nodes_associated_with_executor_.begin(), + weak_groups_to_nodes_associated_with_executor_.end(), + [&found_group_ptrs, node_ptr](std::pair key_value_pair) { + auto & weak_node_ptr = key_value_pair.second; + auto shared_node_ptr = weak_node_ptr.lock(); + auto group_ptr = key_value_pair.first.lock(); + if (shared_node_ptr == node_ptr) { + found_group_ptrs.push_back(group_ptr); + } + }); + // Remove those callback groups + std::for_each( + found_group_ptrs.begin(), found_group_ptrs.end(), [this] + (rclcpp::CallbackGroup::SharedPtr group_ptr) { + this->remove_callback_group_from_map( + group_ptr, + weak_groups_to_nodes_associated_with_executor_); + }); + + // Set that the node does not have an executor anymore + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + has_executor.store(false); +} + +// Returns true if the map has the node_ptr +bool +EventsExecutorEntitiesCollector::has_node( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const +{ + return std::find_if( + weak_groups_to_nodes.begin(), + weak_groups_to_nodes.end(), + [&](const WeakCallbackGroupsToNodesMap::value_type & other) -> bool { + auto other_ptr = other.second.lock(); + return other_ptr == node_ptr; + }) != weak_groups_to_nodes.end(); +} + +std::vector +EventsExecutorEntitiesCollector::get_all_callback_groups() +{ + std::vector groups; + for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { + groups.push_back(group_node_ptr.first); + } + for (const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { + groups.push_back(group_node_ptr.first); + } + return groups; +} + +std::vector +EventsExecutorEntitiesCollector::get_manually_added_callback_groups() +{ + std::vector groups; + for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { + groups.push_back(group_node_ptr.first); + } + return groups; +} + +std::vector +EventsExecutorEntitiesCollector::get_automatically_added_callback_groups_from_nodes() +{ + std::vector groups; + for (const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { + groups.push_back(group_node_ptr.first); + } + return groups; +} + +void +EventsExecutorEntitiesCollector::set_guard_condition_callback( + const rcl_guard_condition_t * guard_condition) +{ + rcl_ret_t ret = rcl_guard_condition_set_listener_callback( + guard_condition, + &EventsExecutor::push_event, + get_callback_data(this, WAITABLE_EVENT), + false /* Discard previous events */); + + if (ret != RCL_RET_OK) { + throw std::runtime_error("Couldn't set guard condition event callback"); + } +} + +void +EventsExecutorEntitiesCollector::unset_guard_condition_callback( + const rcl_guard_condition_t * guard_condition) +{ + rcl_ret_t ret = rcl_guard_condition_set_listener_callback( + guard_condition, + nullptr, + nullptr, + false /* Discard previous events */); + + if (ret != RCL_RET_OK) { + throw std::runtime_error("Couldn't unset guard condition event callback"); + } + + remove_callback_data(this, WAITABLE_EVENT); +} + +rclcpp::SubscriptionBase::SharedPtr +EventsExecutorEntitiesCollector::get_subscription(const void * subscription_id) +{ + auto it = weak_subscriptions_map_.find(subscription_id); + + if (it != weak_subscriptions_map_.end()) { + auto subscription_weak_ptr = it->second; + auto subscription_shared_ptr = subscription_weak_ptr.lock(); + + if (subscription_shared_ptr) { + return subscription_shared_ptr; + } + + // The subscription expired, remove from map + weak_subscriptions_map_.erase(it); + } + return nullptr; +} + +rclcpp::ClientBase::SharedPtr +EventsExecutorEntitiesCollector::get_client(const void * client_id) +{ + auto it = weak_clients_map_.find(client_id); + + if (it != weak_clients_map_.end()) { + auto client_weak_ptr = it->second; + auto client_shared_ptr = client_weak_ptr.lock(); + + if (client_shared_ptr) { + return client_shared_ptr; + } + + // The client expired, remove from map + weak_clients_map_.erase(it); + } + return nullptr; +} + +rclcpp::ServiceBase::SharedPtr +EventsExecutorEntitiesCollector::get_service(const void * service_id) +{ + auto it = weak_services_map_.find(service_id); + + if (it != weak_services_map_.end()) { + auto service_weak_ptr = it->second; + auto service_shared_ptr = service_weak_ptr.lock(); + + if (service_shared_ptr) { + return service_shared_ptr; + } + + // The service expired, remove from map + weak_services_map_.erase(it); + } + return nullptr; +} + +rclcpp::Waitable::SharedPtr +EventsExecutorEntitiesCollector::get_waitable(const void * waitable_id) +{ + auto it = weak_waitables_map_.find(waitable_id); + + if (it != weak_waitables_map_.end()) { + auto waitable_weak_ptr = it->second; + auto waitable_shared_ptr = waitable_weak_ptr.lock(); + + if (waitable_shared_ptr) { + return waitable_shared_ptr; + } + + // The waitable expired, remove from map + weak_waitables_map_.erase(it); + } + return nullptr; +} + +void +EventsExecutorEntitiesCollector::add_waitable(rclcpp::Waitable::SharedPtr waitable) +{ + weak_waitables_map_.emplace(waitable.get(), waitable); + + waitable->set_listener_callback( + &EventsExecutor::push_event, + get_callback_data(waitable.get(), WAITABLE_EVENT)); +} + +const EventsExecutorCallbackData * +EventsExecutorEntitiesCollector::get_callback_data( + void * entity_id, ExecutorEventType event_type) +{ + // Create an entity callback data object and check if + // we already have stored one like it + ExecutorEvent event = {entity_id, event_type}; + EventsExecutorCallbackData data(associated_executor_, event); + + auto it = callback_data_map_.find(data); + + if (it != callback_data_map_.end()) { + // We found a callback data matching entity ID and type. + // Increment callback data counter and return pointer to data + it->second++; + return &it->first; + } + + // There was no callback data object matching ID and type, + // create one and set counter to 1. + callback_data_map_.emplace(data, 1); + + // Return a pointer to the just added entity callback data. + it = callback_data_map_.find(data); + return &it->first; +} + +void +EventsExecutorEntitiesCollector::remove_callback_data( + void * entity_id, ExecutorEventType event_type) +{ + // Create an entity callback data object and check if + // we already have stored one like it + ExecutorEvent event = {entity_id, event_type}; + EventsExecutorCallbackData data(associated_executor_, event); + + auto it = callback_data_map_.find(data); + + if (it != callback_data_map_.end()) { + // We found a callback data matching entity ID and type. + // If we have more than 1 decrement counter, otherwise remove it. + if (it->second > 1) { + it->second--; + } else { + callback_data_map_.erase(it); + } + } +} diff --git a/rclcpp/src/rclcpp/executors/timers_manager.cpp b/rclcpp/src/rclcpp/executors/timers_manager.cpp new file mode 100644 index 0000000000..66c6a9e09c --- /dev/null +++ b/rclcpp/src/rclcpp/executors/timers_manager.cpp @@ -0,0 +1,257 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "rclcpp/executors/timers_manager.hpp" + +using rclcpp::executors::TimersManager; + +constexpr std::chrono::nanoseconds TimersManager::MAX_TIME; + +TimersManager::TimersManager(std::shared_ptr context) +{ + context_ = context; +} + +TimersManager::~TimersManager() +{ + // Remove all timers + this->clear(); + + // Make sure timers thread is stopped before destroying this object + this->stop(); +} + +void TimersManager::add_timer(rclcpp::TimerBase::SharedPtr timer) +{ + if (!timer) { + throw std::invalid_argument("TimersManager::add_timer() trying to add nullptr timer"); + } + + bool added = false; + { + std::unique_lock timers_lock(timers_mutex_); + added = weak_timers_heap_.add_timer(timer); + timers_updated_ = timers_updated_ || added; + } + + if (added) { + // Notify that a timer has been added + timers_cv_.notify_one(); + } +} + +void TimersManager::start() +{ + // Make sure that the thread is not already running + if (running_.exchange(true)) { + throw std::runtime_error("TimersManager::start() can't start timers thread as already running"); + } + + timers_thread_ = std::thread(&TimersManager::run_timers, this); +} + +void TimersManager::stop() +{ + // Nothing to do if the timers thread is not running + // or if another thread already signaled to stop. + if (!running_.exchange(false)) { + return; + } + + // Notify the timers manager thread to wake up + { + std::unique_lock timers_lock(timers_mutex_); + timers_updated_ = true; + } + timers_cv_.notify_one(); + + // Join timers thread if it's running + if (timers_thread_.joinable()) { + timers_thread_.join(); + } +} + +std::chrono::nanoseconds TimersManager::get_head_timeout() +{ + // Do not allow to interfere with the thread running + if (running_) { + throw std::runtime_error( + "TimersManager::get_head_timeout() can't be used while timers thread is running"); + } + + std::unique_lock lock(timers_mutex_); + return this->get_head_timeout_unsafe(); +} + +void TimersManager::execute_ready_timers() +{ + // Do not allow to interfere with the thread running + if (running_) { + throw std::runtime_error( + "TimersManager::execute_ready_timers() can't be used while timers thread is running"); + } + + std::unique_lock lock(timers_mutex_); + this->execute_ready_timers_unsafe(); +} + +bool TimersManager::execute_head_timer( + std::chrono::time_point tp) +{ + // Do not allow to interfere with the thread running + if (running_) { + throw std::runtime_error( + "TimersManager::execute_head_timer() can't be used while timers thread is running"); + } + + std::unique_lock lock(timers_mutex_); + + TimersHeap timers_heap = weak_timers_heap_.validate_and_lock(); + + // Nothing to do if we don't have any timer + if (timers_heap.empty()) { + return false; + } + + TimerPtr head = timers_heap.front(); + + bool timer_ready = false; + if (tp != std::chrono::time_point::max()) { + timer_ready = timer_was_ready_at_tp(head, tp); + } else { + timer_ready = head->is_ready(); + } + + if (timer_ready) { + // Head timer is ready, execute + head->execute_callback(); + timers_heap.heapify_root(); + weak_timers_heap_.store(timers_heap); + return true; + } + + // Head timer was not ready yet + return false; +} + +std::chrono::nanoseconds TimersManager::get_head_timeout_unsafe() +{ + // If we don't have any weak pointer, then we just return maximum timeout + if (weak_timers_heap_.empty()) { + return MAX_TIME; + } + + // Weak heap is not empty, so try to lock the first element + TimerPtr head_timer = weak_timers_heap_.front().lock(); + // If it is still a valid pointer, it is guaranteed to be the correct head + if (head_timer != nullptr) { + return head_timer->time_until_trigger(); + } + + // If the first elements has expired, we can't make other assumptions on the heap + // and we need to entirely validate it. + TimersHeap locked_heap = weak_timers_heap_.validate_and_lock(); + + // NOTE: the following operations will not modify any element in the heap, so we + // don't have to call `weak_timers_heap_.store(locked_heap)` at the end. + + if (locked_heap.empty()) { + return MAX_TIME; + } + return locked_heap.front()->time_until_trigger(); +} + +void TimersManager::execute_ready_timers_unsafe() +{ + // We start by locking the timers + TimersHeap locked_heap = weak_timers_heap_.validate_and_lock(); + + // Nothing to do if we don't have any timer + if (locked_heap.empty()) { + return; + } + + // Keep executing timers until they are ready and they were already ready when we started. + // The second check prevents this function from blocking indefinitely if the + // time required for executing the timers is longer than their period. + + TimerPtr head = locked_heap.front(); + auto start_time = std::chrono::steady_clock::now(); + while (head->is_ready() && this->timer_was_ready_at_tp(head, start_time)) { + // Execute head timer + head->execute_callback(); + // Executing a timer will result in updating its time_until_trigger, so re-heapify + locked_heap.heapify_root(); + // Get new head timer + head = locked_heap.front(); + } + + // After having performed work on the locked heap we reflect the changes to weak one. + // Timers will be already sorted the next time we need them if none went out of scope. + weak_timers_heap_.store(locked_heap); +} + +void TimersManager::run_timers() +{ + while (rclcpp::ok(context_) && running_) { + // Lock mutex + std::unique_lock timers_lock(timers_mutex_); + + std::chrono::nanoseconds time_to_sleep = get_head_timeout_unsafe(); + // Wait until timeout or notification that timers have been updated + timers_cv_.wait_for(timers_lock, time_to_sleep, [this]() {return timers_updated_;}); + // Reset timers updated flag + timers_updated_ = false; + + // Execute timers + this->execute_ready_timers_unsafe(); + } + + // Make sure the running flag is set to false when we exit from this function + // to allow restarting the timers thread. + running_ = false; +} + +void TimersManager::clear() +{ + { + // Lock mutex and then clear all data structures + std::unique_lock timers_lock(timers_mutex_); + weak_timers_heap_.clear(); + + timers_updated_ = true; + } + + // Notify timers thread such that it can re-compute its timeout + timers_cv_.notify_one(); +} + +void TimersManager::remove_timer(TimerPtr timer) +{ + bool removed = false; + { + std::unique_lock timers_lock(timers_mutex_); + removed = weak_timers_heap_.remove_timer(timer); + + timers_updated_ = timers_updated_ || removed; + } + + if (removed) { + // Notify timers thread such that it can re-compute its timeout + timers_cv_.notify_one(); + } +} diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index d26c3ed07d..fba6117904 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -569,6 +569,33 @@ if(TARGET test_executors) target_link_libraries(test_executors ${PROJECT_NAME}) endif() +ament_add_gtest(test_events_executor executors/test_events_executor.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}") +if(TARGET test_events_executor) + ament_target_dependencies(test_events_executor + "rcl" + "test_msgs") + target_link_libraries(test_events_executor ${PROJECT_NAME} mimick) +endif() + +ament_add_gtest(test_events_queue executors/test_events_queue.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}") +if(TARGET test_events_queue) + ament_target_dependencies(test_events_queue + "rcl" + "test_msgs") + target_link_libraries(test_events_queue ${PROJECT_NAME}) +endif() + +ament_add_gtest(test_events_executor_entities_collector executors/test_events_executor_entities_collector.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}") +if(TARGET test_events_executor_entities_collector) + ament_target_dependencies(test_events_executor_entities_collector + "rcl" + "test_msgs") + target_link_libraries(test_events_executor_entities_collector ${PROJECT_NAME} mimick) +endif() + ament_add_gtest(test_static_single_threaded_executor executors/test_static_single_threaded_executor.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}") if(TARGET test_static_single_threaded_executor) @@ -594,6 +621,14 @@ if(TARGET test_static_executor_entities_collector) target_link_libraries(test_static_executor_entities_collector ${PROJECT_NAME} mimick) endif() +ament_add_gtest(test_timers_manager executors/test_timers_manager.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}") +if(TARGET test_timers_manager) + ament_target_dependencies(test_timers_manager + "rcl") + target_link_libraries(test_timers_manager ${PROJECT_NAME}) +endif() + ament_add_gtest(test_guard_condition test_guard_condition.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}") if(TARGET test_guard_condition) diff --git a/rclcpp/test/rclcpp/executors/test_events_executor.cpp b/rclcpp/test/rclcpp/executors/test_events_executor.cpp new file mode 100644 index 0000000000..949b0ee994 --- /dev/null +++ b/rclcpp/test/rclcpp/executors/test_events_executor.cpp @@ -0,0 +1,374 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include + +#include "rclcpp/executors/events_executor.hpp" + +#include "test_msgs/srv/empty.hpp" +#include "test_msgs/msg/empty.hpp" + +#include "../../mocking_utils/patch.hpp" + +using namespace std::chrono_literals; + +using rclcpp::executors::EventsExecutor; +using rclcpp::executors::EventsExecutorNotifyWaitable; + +class TestEventsExecutor : public ::testing::Test +{ +public: + void SetUp() + { + rclcpp::init(0, nullptr); + } + + void TearDown() + { + rclcpp::shutdown(); + } +}; + +TEST_F(TestEventsExecutor, notify_waitable) +{ + auto notifier = std::make_shared(); + + // Waitset methods can't be used on EventsWaitable + rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); + EXPECT_THROW(notifier->add_to_wait_set(&wait_set), std::runtime_error); + EXPECT_THROW(notifier->is_ready(&wait_set), std::runtime_error); + + { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_guard_condition_set_listener_callback, RCL_RET_ERROR); + EXPECT_THROW(std::make_shared(), std::runtime_error); + } +} + +TEST_F(TestEventsExecutor, run_clients_servers) +{ + auto node = std::make_shared("node"); + + bool request_received = false; + bool response_received = false; + auto service = + node->create_service( + "service", + [&request_received]( + const test_msgs::srv::Empty::Request::SharedPtr, + test_msgs::srv::Empty::Response::SharedPtr) + { + request_received = true; + }); + auto client = node->create_client("service"); + + EventsExecutor executor; + executor.add_node(node); + + bool spin_exited = false; + std::thread spinner([&spin_exited, &executor, this]() { + executor.spin(); + spin_exited = true; + }); + + auto request = std::make_shared(); + client->async_send_request( + request, + [&response_received](rclcpp::Client::SharedFuture result_future) { + (void)result_future; + response_received = true; + }); + + // Wait some time for the client-server to be invoked + auto start = std::chrono::steady_clock::now(); + while ( + !response_received && + !spin_exited && + (std::chrono::steady_clock::now() - start < 1s)) + { + std::this_thread::sleep_for(5ms); + } + + executor.cancel(); + spinner.join(); + executor.remove_node(node); + + EXPECT_TRUE(request_received); + EXPECT_TRUE(response_received); + EXPECT_TRUE(spin_exited); +} + +TEST_F(TestEventsExecutor, spin_once_max_duration) +{ + { + auto node = std::make_shared("node"); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 10s, + [&]() { + t_runs++; + }); + + EventsExecutor executor; + executor.add_node(node); + + auto start = std::chrono::steady_clock::now(); + executor.spin_once(10ms); + + EXPECT_EQ(0u, t_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms); + } + + { + auto node = std::make_shared("node"); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 10ms, + [&]() { + t_runs++; + }); + + EventsExecutor executor; + executor.add_node(node); + + auto start = std::chrono::steady_clock::now(); + executor.spin_once(10s); + + EXPECT_EQ(1u, t_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms); + } +} + +TEST_F(TestEventsExecutor, spin_some_max_duration) +{ + { + auto node = std::make_shared("node"); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 10s, + [&]() { + t_runs++; + }); + + EventsExecutor executor; + executor.add_node(node); + + auto start = std::chrono::steady_clock::now(); + executor.spin_some(10ms); + + EXPECT_EQ(0u, t_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms); + } + + { + auto node = std::make_shared("node"); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 10ms, + [&]() { + t_runs++; + }); + + // Sleep some time for the timer to be ready when spin + std::this_thread::sleep_for(10ms); + + EventsExecutor executor; + executor.add_node(node); + + auto start = std::chrono::steady_clock::now(); + executor.spin_some(10s); + + EXPECT_EQ(1u, t_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms); + } +} + +TEST_F(TestEventsExecutor, spin_some_zero_duration) +{ + auto node = std::make_shared("node"); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 20ms, + [&]() { + t_runs++; + }); + + // Sleep some time for the timer to be ready when spin + std::this_thread::sleep_for(20ms); + + EventsExecutor executor; + executor.add_node(node); + executor.spin_some(0ms); + + EXPECT_EQ(1u, t_runs); +} + +TEST_F(TestEventsExecutor, spin_all_max_duration) +{ + { + auto node = std::make_shared("node"); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 10s, + [&]() { + t_runs++; + }); + + EventsExecutor executor; + executor.add_node(node); + + auto start = std::chrono::steady_clock::now(); + executor.spin_all(10ms); + + EXPECT_EQ(0u, t_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms); + } + + { + auto node = std::make_shared("node"); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 10ms, + [&]() { + t_runs++; + }); + + // Sleep some time for the timer to be ready when spin + std::this_thread::sleep_for(10ms); + + EventsExecutor executor; + executor.add_node(node); + + auto start = std::chrono::steady_clock::now(); + executor.spin_all(10s); + + EXPECT_EQ(1u, t_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms); + } + + EventsExecutor executor; + EXPECT_THROW(executor.spin_all(0ms), std::invalid_argument); + EXPECT_THROW(executor.spin_all(-5ms), std::invalid_argument); +} + +TEST_F(TestEventsExecutor, cancel_while_timers_running) +{ + auto node = std::make_shared("node"); + + size_t t1_runs = 0; + auto t1 = node->create_wall_timer( + 1ms, + [&]() { + t1_runs++; + std::this_thread::sleep_for(25ms); + }); + + size_t t2_runs = 0; + auto t2 = node->create_wall_timer( + 1ms, + [&]() { + t2_runs++; + std::this_thread::sleep_for(25ms); + }); + + EventsExecutor executor; + executor.add_node(node); + + std::thread spinner([&executor, this]() {executor.spin();}); + + std::this_thread::sleep_for(10ms); + // Call cancel while t1 callback is still being executed + executor.cancel(); + spinner.join(); + + // Depending on the latency on the system, t2 may start to execute before cancel is signaled + EXPECT_GE(1u, t1_runs); + EXPECT_GE(1u, t2_runs); +} + +TEST_F(TestEventsExecutor, cancel_while_timers_waiting) +{ + auto node = std::make_shared("node"); + + size_t t1_runs = 0; + auto t1 = node->create_wall_timer( + 100s, + [&]() { + t1_runs++; + }); + + EventsExecutor executor; + executor.add_node(node); + + auto start = std::chrono::steady_clock::now(); + std::thread spinner([&executor, this]() {executor.spin();}); + + std::this_thread::sleep_for(10ms); + executor.cancel(); + spinner.join(); + + EXPECT_EQ(0u, t1_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 1s); +} + +TEST_F(TestEventsExecutor, destroy_entities) +{ + // Create a publisher node and start publishing messages + auto node_pub = std::make_shared("node_pub"); + auto publisher = node_pub->create_publisher("topic", rclcpp::QoS(10)); + auto timer = node_pub->create_wall_timer( + 2ms, [&]() {publisher->publish(std::make_unique());}); + EventsExecutor executor_pub; + executor_pub.add_node(node_pub); + std::thread spinner([&executor_pub, this]() {executor_pub.spin();}); + + // Create a node with two different subscriptions to the topic + auto node_sub = std::make_shared("node_sub"); + size_t callback_count_1 = 0; + auto subscription_1 = + node_sub->create_subscription( + "topic", rclcpp::QoS(10), [&](test_msgs::msg::Empty::SharedPtr) {callback_count_1++;}); + size_t callback_count_2 = 0; + auto subscription_2 = + node_sub->create_subscription( + "topic", rclcpp::QoS(10), [&](test_msgs::msg::Empty::SharedPtr) {callback_count_2++;}); + EventsExecutor executor_sub; + executor_sub.add_node(node_sub); + + // Wait some time while messages are published + std::this_thread::sleep_for(10ms); + + // Destroy one of the two subscriptions + subscription_1.reset(); + + // Let subscriptions executor spin + executor_sub.spin_some(10ms); + + // The callback count of the destroyed subscription remained at 0 + EXPECT_EQ(0u, callback_count_1); + EXPECT_LT(0u, callback_count_2); + + executor_pub.cancel(); + spinner.join(); +} diff --git a/rclcpp/test/rclcpp/executors/test_events_executor_entities_collector.cpp b/rclcpp/test/rclcpp/executors/test_events_executor_entities_collector.cpp new file mode 100644 index 0000000000..3f11f983fe --- /dev/null +++ b/rclcpp/test/rclcpp/executors/test_events_executor_entities_collector.cpp @@ -0,0 +1,178 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include "rclcpp/rclcpp.hpp" + +#include "../../mocking_utils/patch.hpp" + +class TestEventsExecutorEntitiesCollector : public ::testing::Test +{ +public: + void SetUp() + { + rclcpp::init(0, nullptr); + dummy_executor_ = std::make_shared(); + entities_collector_ = + std::make_shared(dummy_executor_.get()); + } + + void TearDown() + { + rclcpp::shutdown(); + } + + rclcpp::executors::EventsExecutorEntitiesCollector::SharedPtr entities_collector_; + +private: + rclcpp::executors::EventsExecutor::SharedPtr dummy_executor_; +}; + +TEST_F(TestEventsExecutorEntitiesCollector, bad_init) +{ + EXPECT_THROW( + std::make_shared(nullptr), + std::runtime_error); +} + +TEST_F(TestEventsExecutorEntitiesCollector, add_remove_node) +{ + auto node1 = std::make_shared("node1", "ns"); + EXPECT_NO_THROW(entities_collector_->add_node(node1->get_node_base_interface())); + + // Check adding second time + EXPECT_THROW( + entities_collector_->add_node(node1->get_node_base_interface()), + std::runtime_error); + + auto node2 = std::make_shared("node2", "ns"); + EXPECT_THROW( + entities_collector_->remove_node(node2->get_node_base_interface()), + std::runtime_error); + EXPECT_NO_THROW(entities_collector_->add_node(node2->get_node_base_interface())); + + EXPECT_NO_THROW(entities_collector_->remove_node(node1->get_node_base_interface())); + EXPECT_THROW( + entities_collector_->remove_node(node1->get_node_base_interface()), + std::runtime_error); + EXPECT_NO_THROW(entities_collector_->remove_node(node2->get_node_base_interface())); + + auto node3 = std::make_shared("node3", "ns"); + node3->get_node_base_interface()->get_associated_with_executor_atomic().exchange(true); + EXPECT_THROW( + entities_collector_->remove_node(node3->get_node_base_interface()), + std::runtime_error); +} + +TEST_F(TestEventsExecutorEntitiesCollector, add_callback_group) +{ + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()); + ASSERT_EQ(entities_collector_->get_all_callback_groups().size(), 1u); +} + +TEST_F(TestEventsExecutorEntitiesCollector, add_callback_group_after_add_node) +{ + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector_->add_node(node->get_node_base_interface()); + EXPECT_THROW( + entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()), + std::runtime_error); +} + +TEST_F(TestEventsExecutorEntitiesCollector, add_callback_group_twice) +{ + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()); + ASSERT_EQ(entities_collector_->get_all_callback_groups().size(), 1u); + cb_group->get_associated_with_executor_atomic().exchange(false); + EXPECT_THROW( + entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()), + std::runtime_error); +} + +TEST_F(TestEventsExecutorEntitiesCollector, remove_callback_group_after_node) +{ + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()); + ASSERT_EQ(entities_collector_->get_all_callback_groups().size(), 1u); + + node.reset(); + + EXPECT_THROW( + entities_collector_->remove_callback_group(cb_group), + std::runtime_error); +} + +TEST_F(TestEventsExecutorEntitiesCollector, remove_callback_group_twice) +{ + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()); + ASSERT_EQ(entities_collector_->get_all_callback_groups().size(), 1u); + + entities_collector_->remove_callback_group(cb_group); + + EXPECT_THROW( + entities_collector_->remove_callback_group(cb_group), + std::runtime_error); +} + +TEST_F(TestEventsExecutorEntitiesCollector, remove_node_opposite_order) +{ + auto node1 = std::make_shared("node1", "ns"); + EXPECT_NO_THROW(entities_collector_->add_node(node1->get_node_base_interface())); + + auto node2 = std::make_shared("node2", "ns"); + EXPECT_NO_THROW(entities_collector_->add_node(node2->get_node_base_interface())); + + EXPECT_NO_THROW(entities_collector_->remove_node(node2->get_node_base_interface())); +} + +TEST_F(TestEventsExecutorEntitiesCollector, test_rcl_exception) +{ + auto node1 = std::make_shared("node1", "ns"); + auto node2 = std::make_shared("node2", "ns"); + entities_collector_->add_node(node1->get_node_base_interface()); + + { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_guard_condition_set_listener_callback, RCL_RET_ERROR); + + EXPECT_THROW( + entities_collector_->add_node(node2->get_node_base_interface()), + std::runtime_error); + + EXPECT_THROW( + entities_collector_->remove_node(node1->get_node_base_interface()), + std::runtime_error); + } +} diff --git a/rclcpp/test/rclcpp/executors/test_events_queue.cpp b/rclcpp/test/rclcpp/executors/test_events_queue.cpp new file mode 100644 index 0000000000..9d600e61f7 --- /dev/null +++ b/rclcpp/test/rclcpp/executors/test_events_queue.cpp @@ -0,0 +1,65 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#include +#include +#include + +#include "rclcpp/experimental/buffers/simple_events_queue.hpp" + +using namespace std::chrono_literals; + +TEST(TestEventsQueue, SimpleQueueTest) +{ + // Create a SimpleEventsQueue and a local queue + auto simple_queue = std::make_unique(); + std::queue local_events_queue; + + // Make sure the queue is empty after init + simple_queue->init(); + EXPECT_TRUE(simple_queue->empty()); + + // Push 11 messages + for (int i = 0; i < 11; i++) { + rclcpp::executors::ExecutorEvent stub_event; + simple_queue->push(stub_event); + } + + // Pop one message + simple_queue->pop(); + + local_events_queue = simple_queue->pop_all_events(); + + // We should have (11 - 1) events in the local queue + size_t local_queue_size = local_events_queue.size(); + + // The local queue size should be 10 + EXPECT_EQ(10u, local_queue_size); + + // The simple queue should be empty after taking all events + EXPECT_TRUE(simple_queue->empty()); + + // Lets push an event into the queue and get it back + rclcpp::executors::ExecutorEvent push_event = {simple_queue.get(), + rclcpp::executors::ExecutorEventType::SUBSCRIPTION_EVENT}; + + simple_queue->push(push_event); + + rclcpp::executors::ExecutorEvent front_event = simple_queue->front(); + + // The events should be equal + EXPECT_EQ(push_event.entity_id, front_event.entity_id); + EXPECT_EQ(push_event.type, front_event.type); +} diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index ad34273f68..ab681692d5 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -90,7 +90,8 @@ using ExecutorTypes = ::testing::Types< rclcpp::executors::SingleThreadedExecutor, rclcpp::executors::MultiThreadedExecutor, - rclcpp::executors::StaticSingleThreadedExecutor>; + rclcpp::executors::StaticSingleThreadedExecutor, + rclcpp::executors::EventsExecutor>; class ExecutorTypeNames { @@ -111,6 +112,10 @@ class ExecutorTypeNames return "StaticSingleThreadedExecutor"; } + if (std::is_same()) { + return "EventsExecutor"; + } + return ""; } }; @@ -123,9 +128,10 @@ TYPED_TEST_SUITE(TestExecutors, ExecutorTypes, ExecutorTypeNames); // https://github.com/ros2/rclcpp/issues/1219 using StandardExecutors = ::testing::Types< - rclcpp::executors::SingleThreadedExecutor, + rclcpp::executors::EventsExecutor, rclcpp::executors::MultiThreadedExecutor>; -TYPED_TEST_SUITE(TestExecutorsStable, StandardExecutors, ExecutorTypeNames); + rclcpp::executors::SingleThreadedExecutor>; +TYPED_TEST_CASE(TestExecutorsStable, StandardExecutors, ExecutorTypeNames); // Make sure that executors detach from nodes when destructing TYPED_TEST(TestExecutors, detachOnDestruction) { @@ -156,7 +162,7 @@ TYPED_TEST(TestExecutorsStable, addTemporaryNode) { // Sleep for a short time to verify executor.spin() is going, and didn't throw. std::thread spinner([&]() {EXPECT_NO_THROW(executor.spin());}); - std::this_thread::sleep_for(50ms); + std::this_thread::sleep_for(200ms); executor.cancel(); spinner.join(); } @@ -422,10 +428,12 @@ class TestWaitable : public rclcpp::Waitable add_to_wait_set(rcl_wait_set_t * wait_set) override { rcl_ret_t ret = rcl_wait_set_add_guard_condition(wait_set, &gc_, NULL); - if (RCL_RET_OK != ret) { - return false; - } - ret = rcl_trigger_guard_condition(&gc_); + return RCL_RET_OK == ret; + } + + bool trigger() + { + rcl_ret_t ret = rcl_trigger_guard_condition(&gc_); return RCL_RET_OK == ret; } @@ -447,7 +455,7 @@ class TestWaitable : public rclcpp::Waitable { (void) data; count_++; - std::this_thread::sleep_for(1ms); + std::this_thread::sleep_for(3ms); } size_t @@ -459,6 +467,22 @@ class TestWaitable : public rclcpp::Waitable return count_; } + void + set_listener_callback( + rmw_listener_callback_t callback, + const void * user_data) const override + { + rcl_ret_t ret = rcl_guard_condition_set_listener_callback( + &gc_, + callback, + user_data, + true /*Use previous events*/); + + if (RCL_RET_OK != ret) { + throw std::runtime_error(std::string("Couldn't set guard condition callback")); + } + } + private: size_t count_ = 0; rcl_guard_condition_t gc_; @@ -476,6 +500,7 @@ TYPED_TEST(TestExecutorsStable, spinAll) { // executor. bool spin_exited = false; std::thread spinner([&spin_exited, &executor, this]() { + std::this_thread::sleep_for(10ms); executor.spin_all(1s); executor.remove_node(this->node, true); spin_exited = true; @@ -488,6 +513,7 @@ TYPED_TEST(TestExecutorsStable, spinAll) { !spin_exited && (std::chrono::steady_clock::now() - start < 1s)) { + my_waitable->trigger(); this->publisher->publish(test_msgs::msg::Empty()); std::this_thread::sleep_for(1ms); } @@ -525,10 +551,11 @@ TYPED_TEST(TestExecutorsStable, spinSome) { // count becomes too large, spin exits, or the 1 second timeout completes. auto start = std::chrono::steady_clock::now(); while ( - my_waitable->get_count() <= 1 && + my_waitable->get_count() <= 10 && !spin_exited && (std::chrono::steady_clock::now() - start < 1s)) { + my_waitable->trigger(); this->publisher->publish(test_msgs::msg::Empty()); std::this_thread::sleep_for(1ms); } @@ -572,6 +599,57 @@ TYPED_TEST(TestExecutorsStable, testSpinNodeUntilFutureCompleteNodePtr) { EXPECT_EQ(rclcpp::FutureReturnCode::SUCCESS, ret); } +TYPED_TEST(TestExecutorsStable, testSpinSomeWhileSpinning) { + using ExecutorType = TypeParam; + ExecutorType executor; + + std::thread spinner([&]() {executor.spin();}); + + // Wait to make sure thread started + do { + std::this_thread::sleep_for(5ms); + } while (!spinner.joinable()); + + EXPECT_THROW(executor.spin_some(1s), std::runtime_error); + + executor.cancel(); + spinner.join(); +} + +TYPED_TEST(TestExecutorsStable, testSpinAllWhileSpinning) { + using ExecutorType = TypeParam; + ExecutorType executor; + + std::thread spinner([&]() {executor.spin();}); + + // Wait to make sure thread started + do { + std::this_thread::sleep_for(5ms); + } while (!spinner.joinable()); + + EXPECT_THROW(executor.spin_all(1s), std::runtime_error); + + executor.cancel(); + spinner.join(); +} + +TYPED_TEST(TestExecutorsStable, testSpinOnceWhileSpinning) { + using ExecutorType = TypeParam; + ExecutorType executor; + + std::thread spinner([&]() {executor.spin();}); + + // Wait to make sure thread started + do { + std::this_thread::sleep_for(5ms); + } while (!spinner.joinable()); + + EXPECT_THROW(executor.spin_once(), std::runtime_error); + + executor.cancel(); + spinner.join(); +} + // Check spin_until_future_complete with node base pointer (instantiates its own executor) TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr) { rclcpp::init(0, nullptr); diff --git a/rclcpp/test/rclcpp/executors/test_timers_manager.cpp b/rclcpp/test/rclcpp/executors/test_timers_manager.cpp new file mode 100644 index 0000000000..ccafcd9c47 --- /dev/null +++ b/rclcpp/test/rclcpp/executors/test_timers_manager.cpp @@ -0,0 +1,435 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rclcpp/contexts/default_context.hpp" +#include "rclcpp/executors/timers_manager.hpp" + +using namespace std::chrono_literals; + +using rclcpp::executors::TimersManager; + +using CallbackT = std::function; +using TimerT = rclcpp::WallTimer; + +class TestTimersManager : public ::testing::Test +{ +public: + void SetUp() + { + rclcpp::init(0, nullptr); + } + + void TearDown() + { + rclcpp::shutdown(); + } +}; + +TEST_F(TestTimersManager, empty_manager) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + EXPECT_EQ(TimersManager::MAX_TIME, timers_manager->get_head_timeout()); + EXPECT_FALSE(timers_manager->execute_head_timer()); + EXPECT_NO_THROW(timers_manager->execute_ready_timers()); + EXPECT_NO_THROW(timers_manager->clear()); + EXPECT_NO_THROW(timers_manager->start()); + EXPECT_NO_THROW(timers_manager->stop()); +} + +TEST_F(TestTimersManager, max_time) +{ + // Timers manager max time should be a very big duration that does not cause overflow + + EXPECT_LT(std::chrono::hours(50).count(), TimersManager::MAX_TIME.count()); + auto tp = std::chrono::steady_clock::now() + TimersManager::MAX_TIME; + EXPECT_TRUE(std::chrono::steady_clock::now() < tp); +} + +TEST_F(TestTimersManager, add_run_remove_timer) +{ + size_t t_runs = 0; + auto t = TimerT::make_shared( + 1ms, + [&t_runs]() { + t_runs++; + }, + rclcpp::contexts::get_global_default_context()); + std::weak_ptr t_weak = t; + + // Add the timer to the timers manager + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + timers_manager->add_timer(t); + + // Sleep for more 3 times the timer period + std::this_thread::sleep_for(3ms); + + // The timer is executed only once, even if we slept 3 times the period + timers_manager->execute_ready_timers(); + EXPECT_EQ(1u, t_runs); + + // Remove the timer from the manager + timers_manager->remove_timer(t); + + t.reset(); + // The timer is now not valid anymore + EXPECT_FALSE(t_weak.lock() != nullptr); +} + +TEST_F(TestTimersManager, clear) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + auto t1 = TimerT::make_shared(1ms, CallbackT(), rclcpp::contexts::get_global_default_context()); + std::weak_ptr t1_weak = t1; + auto t2 = TimerT::make_shared(1ms, CallbackT(), rclcpp::contexts::get_global_default_context()); + std::weak_ptr t2_weak = t2; + + timers_manager->add_timer(t1); + timers_manager->add_timer(t2); + + EXPECT_TRUE(t1_weak.lock() != nullptr); + EXPECT_TRUE(t2_weak.lock() != nullptr); + + timers_manager->clear(); + + t1.reset(); + t2.reset(); + + EXPECT_FALSE(t1_weak.lock() != nullptr); + EXPECT_FALSE(t2_weak.lock() != nullptr); +} + +TEST_F(TestTimersManager, remove_not_existing_timer) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + // Try to remove a nullptr timer + EXPECT_NO_THROW(timers_manager->remove_timer(nullptr)); + + auto t = TimerT::make_shared(1ms, CallbackT(), rclcpp::contexts::get_global_default_context()); + timers_manager->add_timer(t); + + // Remove twice the same timer + timers_manager->remove_timer(t); + EXPECT_NO_THROW(timers_manager->remove_timer(t)); +} + +TEST_F(TestTimersManager, timers_thread_exclusive_usage) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + timers_manager->start(); + + EXPECT_THROW(timers_manager->start(), std::exception); + EXPECT_THROW(timers_manager->get_head_timeout(), std::exception); + EXPECT_THROW(timers_manager->execute_ready_timers(), std::exception); + EXPECT_THROW(timers_manager->execute_head_timer(), std::exception); + + timers_manager->stop(); + + EXPECT_NO_THROW(timers_manager->get_head_timeout()); + EXPECT_NO_THROW(timers_manager->execute_ready_timers()); + EXPECT_NO_THROW(timers_manager->execute_head_timer()); +} + +TEST_F(TestTimersManager, add_timer_twice) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + auto t = TimerT::make_shared(1ms, CallbackT(), rclcpp::contexts::get_global_default_context()); + + timers_manager->add_timer(t); + EXPECT_NO_THROW(timers_manager->add_timer(t)); +} + +TEST_F(TestTimersManager, add_nullptr) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + EXPECT_THROW(timers_manager->add_timer(nullptr), std::exception); +} + +TEST_F(TestTimersManager, head_not_ready) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + size_t t_runs = 0; + auto t = TimerT::make_shared( + 10s, + [&t_runs]() { + t_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + timers_manager->add_timer(t); + + // Timer will take 10s to get ready, so nothing to execute here + bool ret = timers_manager->execute_head_timer(); + EXPECT_FALSE(ret); + EXPECT_EQ(0u, t_runs); +} + +TEST_F(TestTimersManager, timers_order) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + size_t t1_runs = 0; + auto t1 = TimerT::make_shared( + 1ms, + [&t1_runs]() { + t1_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + size_t t2_runs = 0; + auto t2 = TimerT::make_shared( + 3ms, + [&t2_runs]() { + t2_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + size_t t3_runs = 0; + auto t3 = TimerT::make_shared( + 10ms, + [&t3_runs]() { + t3_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + // Add timers in a random order + timers_manager->add_timer(t2); + timers_manager->add_timer(t3); + timers_manager->add_timer(t1); + + std::this_thread::sleep_for(1ms); + timers_manager->execute_ready_timers(); + EXPECT_EQ(1u, t1_runs); + EXPECT_EQ(0u, t2_runs); + EXPECT_EQ(0u, t3_runs); + + std::this_thread::sleep_for(3ms); + timers_manager->execute_ready_timers(); + EXPECT_EQ(2u, t1_runs); + EXPECT_EQ(1u, t2_runs); + EXPECT_EQ(0u, t3_runs); + + std::this_thread::sleep_for(10ms); + timers_manager->execute_ready_timers(); + EXPECT_EQ(3u, t1_runs); + EXPECT_EQ(2u, t2_runs); + EXPECT_EQ(1u, t3_runs); + + timers_manager->remove_timer(t1); + + std::this_thread::sleep_for(3ms); + timers_manager->execute_ready_timers(); + EXPECT_EQ(3u, t1_runs); + EXPECT_EQ(3u, t2_runs); + EXPECT_EQ(1u, t3_runs); +} + +TEST_F(TestTimersManager, start_stop_timers_thread) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + auto t = TimerT::make_shared(1ms, []() {}, rclcpp::contexts::get_global_default_context()); + timers_manager->add_timer(t); + + // Calling start multiple times will throw an error + EXPECT_NO_THROW(timers_manager->start()); + EXPECT_THROW(timers_manager->start(), std::exception); + + // Calling stop multiple times does not throw an error + EXPECT_NO_THROW(timers_manager->stop()); + EXPECT_NO_THROW(timers_manager->stop()); +} + +TEST_F(TestTimersManager, timers_thread) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + size_t t1_runs = 0; + auto t1 = TimerT::make_shared( + 1ms, + [&t1_runs]() { + t1_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + size_t t2_runs = 0; + auto t2 = TimerT::make_shared( + 1ms, + [&t2_runs]() { + t2_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + // Add timers + timers_manager->add_timer(t1); + timers_manager->add_timer(t2); + + // Run timers thread for a while + timers_manager->start(); + std::this_thread::sleep_for(5ms); + timers_manager->stop(); + + EXPECT_LT(1u, t1_runs); + EXPECT_LT(1u, t2_runs); + EXPECT_EQ(t1_runs, t2_runs); +} + +TEST_F(TestTimersManager, destructor) +{ + size_t t_runs = 0; + auto t = TimerT::make_shared( + 1ms, + [&t_runs]() { + t_runs++; + }, + rclcpp::contexts::get_global_default_context()); + std::weak_ptr t_weak = t; + + // When the timers manager is destroyed, it will stop the thread + // and clear the timers + { + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + timers_manager->add_timer(t); + + timers_manager->start(); + std::this_thread::sleep_for(3ms); + + EXPECT_LT(1u, t_runs); + } + + // The thread is not running anymore, so this value does not increase + size_t runs = t_runs; + std::this_thread::sleep_for(3ms); + EXPECT_EQ(runs, t_runs); + t.reset(); + EXPECT_FALSE(t_weak.lock() != nullptr); +} + +TEST_F(TestTimersManager, add_remove_while_thread_running) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + size_t t1_runs = 0; + auto t1 = TimerT::make_shared( + 1ms, + [&t1_runs]() { + t1_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + size_t t2_runs = 0; + auto t2 = TimerT::make_shared( + 1ms, + [&t2_runs]() { + t2_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + // Add timers + timers_manager->add_timer(t1); + + // Start timers thread + timers_manager->start(); + + // After a while remove t1 and add t2 + std::this_thread::sleep_for(5ms); + timers_manager->remove_timer(t1); + size_t tmp_t1 = t1_runs; + timers_manager->add_timer(t2); + + // Wait some more time and then stop + std::this_thread::sleep_for(5ms); + timers_manager->stop(); + + // t1 has stopped running + EXPECT_EQ(tmp_t1, t1_runs); + // t2 is correctly running + EXPECT_LT(1u, t2_runs); +} + +TEST_F(TestTimersManager, infinite_loop) +{ + // This test makes sure that even if timers have a period shorter than the duration + // of their callback the functions never block indefinitely. + + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + size_t t1_runs = 0; + auto t1 = TimerT::make_shared( + 1ms, + [&t1_runs]() { + t1_runs++; + std::this_thread::sleep_for(5ms); + }, + rclcpp::contexts::get_global_default_context()); + + size_t t2_runs = 0; + auto t2 = TimerT::make_shared( + 1ms, + [&t2_runs]() { + t2_runs++; + std::this_thread::sleep_for(5ms); + }, + rclcpp::contexts::get_global_default_context()); + + timers_manager->add_timer(t1); + timers_manager->add_timer(t2); + + // Sleep for enough time to trigger timers + std::this_thread::sleep_for(3ms); + timers_manager->execute_ready_timers(); + EXPECT_EQ(1u, t1_runs); + EXPECT_EQ(1u, t2_runs); + + // Due to the long execution of timer callbacks, timers are already ready + bool ret = timers_manager->execute_head_timer(); + EXPECT_TRUE(ret); + EXPECT_EQ(3u, t1_runs + t2_runs); + + // Start a timers thread + timers_manager->start(); + std::this_thread::sleep_for(10ms); + timers_manager->stop(); + + EXPECT_LT(3u, t1_runs + t2_runs); + EXPECT_LT(1u, t1_runs); + EXPECT_LT(1u, t2_runs); +} diff --git a/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp b/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp index 25bd2aba03..f25da5cb3d 100644 --- a/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp +++ b/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp @@ -50,7 +50,8 @@ using ExecutorTypes = ::testing::Types< rclcpp::executors::SingleThreadedExecutor, rclcpp::executors::MultiThreadedExecutor, - rclcpp::executors::StaticSingleThreadedExecutor>; + rclcpp::executors::StaticSingleThreadedExecutor, + rclcpp::executors::EventsExecutor>; class ExecutorTypeNames { @@ -71,6 +72,10 @@ class ExecutorTypeNames return "StaticSingleThreadedExecutor"; } + if (std::is_same()) { + return "EventsExecutor"; + } + return ""; } }; @@ -158,7 +163,8 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, remove_callback_groups) { */ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_duplicate_callback_groups) { - rclcpp::executors::MultiThreadedExecutor executor; + using ExecutorType = TypeParam; + ExecutorType executor; auto node = std::make_shared("my_node", "/ns"); auto timer_callback = []() {}; rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( @@ -176,7 +182,8 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_duplicate_callback_groups) */ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups_after_add_node_to_executor) { - rclcpp::executors::MultiThreadedExecutor executor; + using ExecutorType = TypeParam; + ExecutorType executor; auto node = std::make_shared("my_node", "/ns"); executor.add_node(node->get_node_base_interface()); ASSERT_EQ(executor.get_all_callback_groups().size(), 1u); @@ -210,13 +217,14 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups_after_add_node_t */ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_unallowable_callback_groups) { + using ExecutorType = TypeParam; + ExecutorType executor; auto node = std::make_shared("my_node", "/ns"); auto timer_callback = []() {}; rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive, false); rclcpp::TimerBase::SharedPtr timer_ = node->create_wall_timer( 2s, timer_callback, cb_grp); - rclcpp::executors::MultiThreadedExecutor executor; executor.add_callback_group(cb_grp, node->get_node_base_interface()); ASSERT_EQ(executor.get_all_callback_groups().size(), 1u); @@ -245,14 +253,15 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_unallowable_callback_groups) */ TYPED_TEST(TestAddCallbackGroupsToExecutor, one_node_many_callback_groups_many_executors) { + using ExecutorType = TypeParam; auto node = std::make_shared("my_node", "/ns"); auto timer_callback = []() {}; rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive, false); rclcpp::TimerBase::SharedPtr timer_ = node->create_wall_timer( 2s, timer_callback, cb_grp); - rclcpp::executors::MultiThreadedExecutor timer_executor; - rclcpp::executors::MultiThreadedExecutor sub_executor; + ExecutorType timer_executor; + ExecutorType sub_executor; timer_executor.add_callback_group(cb_grp, node->get_node_base_interface()); const rclcpp::QoS qos(10); auto options = rclcpp::SubscriptionOptions(); @@ -281,7 +290,8 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, one_node_many_callback_groups_many_e */ TYPED_TEST(TestAddCallbackGroupsToExecutor, remove_callback_group) { - rclcpp::executors::MultiThreadedExecutor executor; + using ExecutorType = TypeParam; + ExecutorType executor; auto node = std::make_shared("my_node", "/ns"); auto timer_callback = []() {}; rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group(