Skip to content

Commit

Permalink
add unit tests for callback groups and fix some issues
Browse files Browse the repository at this point in the history
Signed-off-by: Soragna, Alberto <alberto.soragna@gmail.com>
  • Loading branch information
alsora committed Oct 20, 2020
1 parent 8688d3b commit 91f1a3b
Show file tree
Hide file tree
Showing 11 changed files with 289 additions and 88 deletions.
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ class Executor
* \throws std::runtime_error if there is an issue triggering the guard condition
*/
RCLCPP_PUBLIC
virtual void
void
cancel();

/// Support dynamic switching of the memory strategy.
Expand Down
9 changes: 2 additions & 7 deletions rclcpp/include/rclcpp/executors/event_waitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ namespace executors
* @brief This class provides a wrapper around the waitable object, that is
* meant to be used with the EventsExecutor.
* The waitset related methods are stubbed out as they should not be called.
* This class is abstract as the execute method of rclcpp::Waitable is not implemented.
* Nodes who want to implement a custom EventWaitable, can derive from this class and override
* the execute function.
* the execute method.
*/
class EventWaitable : public rclcpp::Waitable
{
Expand All @@ -42,12 +43,6 @@ class EventWaitable : public rclcpp::Waitable
RCLCPP_PUBLIC
virtual ~EventWaitable() = default;

// Executing an EventWaitable is a no-op.
// Derive from this class to implement execute function.
RCLCPP_PUBLIC
virtual void
execute() = 0;

// Stub API: not used by EventsExecutor
RCLCPP_PUBLIC
bool
Expand Down
19 changes: 8 additions & 11 deletions rclcpp/include/rclcpp/executors/events_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ namespace executors
*/
class EventsExecutor : public rclcpp::Executor
{
friend class EventsExecutorEntitiesCollector;

public:
RCLCPP_SMART_PTR_DEFINITIONS(EventsExecutor)

Expand Down Expand Up @@ -121,15 +123,6 @@ class EventsExecutor : public rclcpp::Executor
void
remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true) override;

/// Cancel any running spin* function, causing it to return.
/**
* This function can be called asynchonously from any thread.
* \throws std::runtime_error if there is an issue triggering the guard condition
*/
RCLCPP_PUBLIC
void
cancel() override;

// Executor callback: Push new events into the queue and trigger cv.
// This function is called by the DDS entities when an event happened,
// like a subscription receiving a message.
Expand Down Expand Up @@ -184,11 +177,15 @@ class EventsExecutor : public rclcpp::Executor
}
}

/// Add a callback group to an executor.
/**
* \sa rclcpp::Executor::add_callback_group
*/
void
add_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify) override;
bool notify = true) override;

/// Remove callback group from the executor
/**
Expand All @@ -198,7 +195,7 @@ class EventsExecutor : public rclcpp::Executor
void
remove_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
bool notify) override;
bool notify = true) override;

RCLCPP_PUBLIC
std::vector<rclcpp::CallbackGroup::WeakPtr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ class EventsExecutorEntitiesCollector final : public EventWaitable
// Constructor
RCLCPP_PUBLIC
EventsExecutorEntitiesCollector(
EventsExecutor * executor_context,
std::shared_ptr<TimersManager> timers_manager);
EventsExecutor * executor);

// Destructor
RCLCPP_PUBLIC
Expand Down
16 changes: 1 addition & 15 deletions rclcpp/src/rclcpp/executors/events_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ EventsExecutor::EventsExecutor(
: rclcpp::Executor(options)
{
timers_manager_ = std::make_shared<TimersManager>(context_);
entities_collector_ = std::make_shared<EventsExecutorEntitiesCollector>(this, timers_manager_);
entities_collector_ = std::make_shared<EventsExecutorEntitiesCollector>(this);

// This API uses the wait_set only as a token to identify different executors.
auto context_interrupt_gc = options.context->get_interrupt_guard_condition(&wait_set_);
Expand Down Expand Up @@ -251,20 +251,6 @@ EventsExecutor::remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
this->remove_node(node_ptr->get_node_base_interface(), notify);
}

void
EventsExecutor::cancel()
{
spinning.store(false);
rcl_ret_t ret = rcl_trigger_guard_condition(&interrupt_guard_condition_);
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to trigger guard condition in cancel");
}

// This makes sure that the timers manager is stopped when we return from this function
// otherwise applications may call rclcpp::shutdown() while that thread is still running.
timers_manager_->stop();
}

void
EventsExecutor::consume_all_events(EventQueue & event_queue)
{
Expand Down
84 changes: 43 additions & 41 deletions rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
using rclcpp::executors::EventsExecutorEntitiesCollector;

EventsExecutorEntitiesCollector::EventsExecutorEntitiesCollector(
EventsExecutor * executor_context,
TimersManager::SharedPtr timers_manager)
EventsExecutor * executor)
{
associated_executor_ = executor_context;
timers_manager_ = timers_manager;
if (executor == nullptr) {
throw std::runtime_error("Received NULL executor in EventsExecutorEntitiesCollector.");
}

associated_executor_ = executor;
timers_manager_ = associated_executor_->timers_manager_;
}

EventsExecutorEntitiesCollector::~EventsExecutorEntitiesCollector()
Expand Down Expand Up @@ -80,7 +83,6 @@ EventsExecutorEntitiesCollector::add_node(
{
// Check if the node already has an executor and if not, set this to true
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();

if (has_executor.exchange(true)) {
throw std::runtime_error("Node has already been added to an executor.");
}
Expand All @@ -99,7 +101,6 @@ EventsExecutorEntitiesCollector::add_node(
weak_nodes_.push_back(node_ptr);
}


void
EventsExecutorEntitiesCollector::add_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
Expand Down Expand Up @@ -346,35 +347,36 @@ EventsExecutorEntitiesCollector::remove_callback_group_from_map(

// Look for the group to remove in the map
auto iter = weak_groups_to_nodes.find(weak_group_ptr);
if (iter != weak_groups_to_nodes.end()) {
// Group found, get its associated node.
node_ptr = iter->second.lock();
if (node_ptr == nullptr) {
throw std::runtime_error("Node must not be deleted before its callback group(s).");
}
// Remove group from map
weak_groups_to_nodes.erase(iter);

// For all the entities in the group, unset their callbacks
unset_callback_group_entities_callbacks(group_ptr);

// Check if this node still has other callback groups associated with the executor
bool node_has_associated_callback_groups =
!has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) &&
!has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_);

if (!node_has_associated_callback_groups) {
// Node doesn't have more callback groups associated to the executor.
// Unset the event callback for the node's notify guard condition, to stop
// receiving events if entities are added or removed to this node.
unset_guard_condition_callback(node_ptr->get_notify_guard_condition());

// Remove guard condition from list
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr weak_node_ptr(node_ptr);
weak_nodes_to_guard_conditions_.erase(weak_node_ptr);
}
} else {
throw std::runtime_error("Callback group needs to be associated with executor.");
if (iter == weak_groups_to_nodes.end()) {
// Group not found.
throw std::runtime_error("Callback group needs to be associated with this executor.");
}

// Group found, get its associated node.
node_ptr = iter->second.lock();
if (node_ptr == nullptr) {
throw std::runtime_error("Node must not be deleted before its callback group(s).");
}
// Remove group from map
weak_groups_to_nodes.erase(iter);

// For all the entities in the group, unset their callbacks
unset_callback_group_entities_callbacks(group_ptr);

// Check if this node still has other callback groups associated with the executor
bool node_has_associated_callback_groups =
has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) ||
has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_);

if (!node_has_associated_callback_groups) {
// Node doesn't have more callback groups associated to the executor.
// Unset the event callback for the node's notify guard condition, to stop
// receiving events if entities are added or removed to this node.
unset_guard_condition_callback(node_ptr->get_notify_guard_condition());

// Remove guard condition from list
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr weak_node_ptr(node_ptr);
weak_nodes_to_guard_conditions_.erase(weak_node_ptr);
}
}

Expand All @@ -383,26 +385,27 @@ EventsExecutorEntitiesCollector::remove_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
{
if (!node_ptr->get_associated_with_executor_atomic().load()) {
throw std::runtime_error("Node needs to be associated with this executor.");
throw std::runtime_error("Node needs to be associated with an executor.");
return;
}
// Check if this node is currently stored here
auto node_it = weak_nodes_.begin();
while (node_it != weak_nodes_.end()) {
if (node_it->lock() == node_ptr) {
bool matched = (node_it->lock() == node_ptr);
if (matched) {
weak_nodes_.erase(node_it);
break;
}
++node_it;
}
if (node_it == weak_nodes_.end()) {
// The node is not stored here, so nothing to do
throw std::runtime_error("Tried to remove node not stored in executor.");
// The node is not stored here
throw std::runtime_error("Tried to remove node not stored in this executor.");
return;
}

// Find callback groups belonging to the node to remove
std::vector<rclcpp::CallbackGroup::SharedPtr> found_group_ptrs;

std::for_each(
weak_groups_to_nodes_associated_with_executor_.begin(),
weak_groups_to_nodes_associated_with_executor_.end(),
Expand All @@ -415,7 +418,6 @@ EventsExecutorEntitiesCollector::remove_node(
found_group_ptrs.push_back(group_ptr);
}
});

// Remove those callback groups
std::for_each(
found_group_ptrs.begin(), found_group_ptrs.end(), [this]
Expand Down
9 changes: 9 additions & 0 deletions rclcpp/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,15 @@ if(TARGET test_events_executor)
target_link_libraries(test_events_executor ${PROJECT_NAME} mimick)
endif()

ament_add_gtest(test_events_executor_entities_collector rclcpp/executors/test_events_executor_entities_collector.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}")
if(TARGET test_events_executor_entities_collector)
ament_target_dependencies(test_events_executor_entities_collector
"rcl"
"test_msgs")
target_link_libraries(test_events_executor_entities_collector ${PROJECT_NAME} mimick)
endif()

ament_add_gtest(test_static_single_threaded_executor rclcpp/executors/test_static_single_threaded_executor.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}")
if(TARGET test_static_single_threaded_executor)
Expand Down
26 changes: 26 additions & 0 deletions rclcpp/test/rclcpp/executors/test_events_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
#include "test_msgs/srv/empty.hpp"
#include "test_msgs/msg/empty.hpp"

#include "../../mocking_utils/patch.hpp"

using namespace std::chrono_literals;

using rclcpp::executors::EventsExecutor;
using rclcpp::executors::EventsExecutorNotifyWaitable;

class TestEventsExecutor : public ::testing::Test
{
Expand All @@ -40,6 +43,29 @@ class TestEventsExecutor : public ::testing::Test
}
};

TEST_F(TestEventsExecutor, notify_waitable)
{
auto notifier = std::make_shared<EventsExecutorNotifyWaitable>();

// Waitset methods can't be used on EventsWaitable
rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
EXPECT_THROW(notifier->add_to_wait_set(&wait_set), std::runtime_error);
EXPECT_THROW(notifier->is_ready(&wait_set), std::runtime_error);

EventsExecutor executor;
rcl_guard_condition_t gc = rcl_get_zero_initialized_guard_condition();
notifier->add_guard_condition(&gc);
{
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_guard_condition_set_events_executor_callback, RCL_RET_ERROR);
EXPECT_THROW(
notifier->set_events_executor_callback(
&executor,
&EventsExecutor::push_event),
std::runtime_error);
}
}

TEST_F(TestEventsExecutor, run_clients_servers)
{
auto node = std::make_shared<rclcpp::Node>("node");
Expand Down
Loading

0 comments on commit 91f1a3b

Please sign in to comment.