Skip to content

Commit

Permalink
Updates
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Carroll <mjcarroll@intrinsic.ai>
  • Loading branch information
mjcarroll committed Mar 31, 2023
1 parent 9950fb9 commit 14e009e
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 111 deletions.
4 changes: 0 additions & 4 deletions rclcpp/include/rclcpp/callback_group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,6 @@ class CallbackGroup
bool
has_valid_node();

RCLCPP_PUBLIC
bool
has_valid_node();

RCLCPP_PUBLIC
std::atomic_bool &
can_be_taken_from();
Expand Down
6 changes: 6 additions & 0 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "rcl/guard_condition.h"
#include "rcl/wait.h"
#include "rclcpp/executors/executor_notify_waitable.hpp"
#include "rcpputils/scope_exit.hpp"

#include "rclcpp/context.hpp"
Expand Down Expand Up @@ -484,6 +485,10 @@ class Executor
static void
execute_client(rclcpp::ClientBase::SharedPtr client);

RCLCPP_PUBLIC
void
collect_entities();

/// Block until more work becomes avilable or timeout is reached.
/**
* Builds a set of waitable entities, which are passed to the middleware.
Expand Down Expand Up @@ -541,6 +546,7 @@ class Executor
virtual void
spin_once_impl(std::chrono::nanoseconds timeout);

std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;
rclcpp::executors::ExecutorEntitiesCollector collector_;
rclcpp::executors::ExecutorEntitiesCollection current_collection_;

Expand Down
1 change: 1 addition & 0 deletions rclcpp/include/rclcpp/wait_set_template.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli
this->storage_acquire_ownerships();
RCPPUTILS_SCOPE_EXIT({this->storage_release_ownerships();});


// this method comes from the SynchronizationPolicy
return this->template sync_wait<WaitResult<WaitSetTemplate>>(
// pass along the time_to_wait duration as nanoseconds
Expand Down
201 changes: 104 additions & 97 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "rcl/allocator.h"
#include "rcl/error_handling.h"
#include "rclcpp/executors/executor_notify_waitable.hpp"
#include "rclcpp/subscription_wait_set_mask.hpp"
#include "rcpputils/scope_exit.hpp"

Expand All @@ -46,6 +47,11 @@ Executor::Executor(const rclcpp::ExecutorOptions & options)
: spinning(false),
interrupt_guard_condition_(options.context),
shutdown_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
notify_waitable_(std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
[this]() {
this->collect_entities();
})),
collector_(notify_waitable_),
wait_set_(std::make_shared<rclcpp::WaitSet>())
{
// Store the context for later use.
Expand All @@ -59,39 +65,45 @@ Executor::Executor(const rclcpp::ExecutorOptions & options)
}
});

this->collector_.get_executor_notify_waitable()->add_guard_condition(
&interrupt_guard_condition_);
this->collector_.get_executor_notify_waitable()->add_guard_condition(
shutdown_guard_condition_.get());
notify_waitable_->add_guard_condition(&interrupt_guard_condition_);
notify_waitable_->add_guard_condition(shutdown_guard_condition_.get());
}

Executor::~Executor()
{
current_collection_.update_timers({},
[this](auto timer){wait_set_->add_timer(timer);},
[this](auto timer){wait_set_->remove_timer(timer);});

current_collection_.update_subscriptions({},
[this](auto subscription){
wait_set_->add_subscription(subscription, kDefaultSubscriptionMask);},
[this](auto subscription){
wait_set_->remove_subscription(subscription, kDefaultSubscriptionMask);});
current_collection_.timers.update(
{},
[this](auto timer) {wait_set_->add_timer(timer);},
[this](auto timer) {wait_set_->remove_timer(timer);});

current_collection_.subscriptions.update(
{},
[this](auto subscription) {
wait_set_->add_subscription(subscription, kDefaultSubscriptionMask);
},
[this](auto subscription) {
wait_set_->remove_subscription(subscription, kDefaultSubscriptionMask);
});

current_collection_.update_clients({},
[this](auto client){wait_set_->add_client(client);},
[this](auto client){wait_set_->remove_client(client);});
current_collection_.clients.update(
{},
[this](auto client) {wait_set_->add_client(client);},
[this](auto client) {wait_set_->remove_client(client);});

current_collection_.update_services({},
[this](auto service){wait_set_->add_service(service);},
[this](auto service){wait_set_->remove_service(service);});
current_collection_.services.update(
{},
[this](auto service) {wait_set_->add_service(service);},
[this](auto service) {wait_set_->remove_service(service);});

current_collection_.update_guard_conditions({},
[this](auto guard_condition){wait_set_->add_guard_condition(guard_condition);},
[this](auto guard_condition){wait_set_->remove_guard_condition(guard_condition);});
current_collection_.guard_conditions.update(
{},
[this](auto guard_condition) {wait_set_->add_guard_condition(guard_condition);},
[this](auto guard_condition) {wait_set_->remove_guard_condition(guard_condition);});

current_collection_.update_waitables({},
[this](auto waitable){wait_set_->add_waitable(waitable);},
[this](auto waitable){wait_set_->remove_waitable(waitable);});
current_collection_.waitables.update(
{},
[this](auto waitable) {wait_set_->add_waitable(waitable);},
[this](auto waitable) {wait_set_->remove_waitable(waitable);});

// Remove shutdown callback handle registered to Context
if (!context_->remove_on_shutdown_callback(shutdown_callback_handle_)) {
Expand Down Expand Up @@ -129,15 +141,14 @@ Executor::add_callback_group(
(void) node_ptr;
this->collector_.add_callback_group(group_ptr);

if (notify)
{
if (notify) {
// Interrupt waiting to handle removed callback group
try {
interrupt_guard_condition_.trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group add: ") + ex.what());
std::string(
"Failed to trigger guard condition on callback group add: ") + ex.what());
}
}
}
Expand All @@ -146,15 +157,14 @@ void
Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
{
this->collector_.add_node(node_ptr);
if (notify)
{
if (notify) {
// Interrupt waiting to handle removed callback group
try {
interrupt_guard_condition_.trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group remove: ") + ex.what());
std::string(
"Failed to trigger guard condition on callback group remove: ") + ex.what());
}
}
}
Expand All @@ -166,15 +176,14 @@ Executor::remove_callback_group(
{
this->collector_.remove_callback_group(group_ptr);

if (notify)
{
if (notify) {
// Interrupt waiting to handle removed callback group
try {
interrupt_guard_condition_.trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group remove: ") + ex.what());
std::string(
"Failed to trigger guard condition on callback group remove: ") + ex.what());
}
}
}
Expand All @@ -183,34 +192,21 @@ void
Executor::add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
{
this->add_node(node_ptr->get_node_base_interface(), notify);

if (notify)
{
// Interrupt waiting to handle removed callback group
try {
interrupt_guard_condition_.trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on node add: ") + ex.what());
}
}
}

void
Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
{
this->collector_.remove_node(node_ptr);

if (notify)
{
if (notify) {
// Interrupt waiting to handle removed callback group
try {
interrupt_guard_condition_.trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group add: ") + ex.what());
std::string(
"Failed to trigger guard condition on callback group add: ") + ex.what());
}
}
}
Expand Down Expand Up @@ -356,16 +352,10 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
if (any_exec.waitable) {
any_exec.waitable->execute(any_exec.data);
}

// Reset the callback_group, regardless of type
any_exec.callback_group->can_be_taken_from().store(true);
// Wake the wait, because it may need to be recalculated or work that
// was previously blocked is now available.
try {
interrupt_guard_condition_.trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition from execute_any_executable: ") + ex.what());
if (any_exec.callback_group) {
any_exec.callback_group->can_be_taken_from().store(true);
}
}

Expand Down Expand Up @@ -506,47 +496,65 @@ Executor::execute_client(
}

void
Executor::wait_for_work(std::chrono::nanoseconds timeout)
Executor::collect_entities()
{
TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count());
{
std::lock_guard<std::mutex> guard(mutex_);

rclcpp::executors::ExecutorEntitiesCollection collection;
auto callback_groups = this->collector_.get_all_callback_groups();
rclcpp::executors::build_entities_collection(callback_groups, collection);

current_collection_.update_timers(collection.timers,
[this](auto timer){wait_set_->add_timer(timer);},
[this](auto timer){wait_set_->remove_timer(timer);});

current_collection_.update_subscriptions(collection.subscriptions,
[this](auto subscription){
wait_set_->add_subscription(subscription, kDefaultSubscriptionMask);},
[this](auto subscription){
wait_set_->remove_subscription(subscription, kDefaultSubscriptionMask);});

current_collection_.update_clients(collection.clients,
[this](auto client){wait_set_->add_client(client);},
[this](auto client){wait_set_->remove_client(client);});
std::lock_guard<std::mutex> guard(mutex_);

rclcpp::executors::ExecutorEntitiesCollection collection;
this->collector_.update_collections();
auto callback_groups = this->collector_.get_all_callback_groups();
rclcpp::executors::build_entities_collection(callback_groups, collection);

auto notify_waitable = std::static_pointer_cast<rclcpp::Waitable>(notify_waitable_);
collection.waitables.insert({notify_waitable.get(), {notify_waitable, {}}});

current_collection_.timers.update(
collection.timers,
[this](auto timer) {wait_set_->add_timer(timer);},
[this](auto timer) {wait_set_->remove_timer(timer);});

current_collection_.subscriptions.update(
collection.subscriptions,
[this](auto subscription) {
wait_set_->add_subscription(subscription, kDefaultSubscriptionMask);
},
[this](auto subscription) {
wait_set_->remove_subscription(subscription, kDefaultSubscriptionMask);
});

current_collection_.update_services(collection.services,
[this](auto service){wait_set_->add_service(service);},
[this](auto service){wait_set_->remove_service(service);});
current_collection_.clients.update(
collection.clients,
[this](auto client) {wait_set_->add_client(client);},
[this](auto client) {wait_set_->remove_client(client);});

current_collection_.services.update(
collection.services,
[this](auto service) {wait_set_->add_service(service);},
[this](auto service) {wait_set_->remove_service(service);});

current_collection_.guard_conditions.update(
collection.guard_conditions,
[this](auto guard_condition) {wait_set_->add_guard_condition(guard_condition);},
[this](auto guard_condition) {wait_set_->remove_guard_condition(guard_condition);});

current_collection_.waitables.update(
collection.waitables,
[this](auto waitable) {wait_set_->add_waitable(waitable);},
[this](auto waitable) {wait_set_->remove_waitable(waitable);});
}

current_collection_.update_guard_conditions(collection.guard_conditions,
[this](auto guard_condition){wait_set_->add_guard_condition(guard_condition);},
[this](auto guard_condition){wait_set_->remove_guard_condition(guard_condition);});
void
Executor::wait_for_work(std::chrono::nanoseconds timeout)
{
TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count());

current_collection_.update_waitables(collection.waitables,
[this](auto waitable){wait_set_->add_waitable(waitable);},
[this](auto waitable){wait_set_->remove_waitable(waitable);});
if (current_collection_.empty()) {
this->collect_entities();
}

auto wait_result = wait_set_->wait(timeout);

if (wait_result.kind() == WaitResultKind::Empty)
{
if (wait_result.kind() == WaitResultKind::Empty) {
RCUTILS_LOG_WARN_NAMED(
"rclcpp",
"empty wait set received in wait(). This should never happen.");
Expand All @@ -562,8 +570,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
TRACEPOINT(rclcpp_executor_get_next_ready);
std::lock_guard<std::mutex> guard{mutex_};

if (ready_executables_.size() == 0)
{
if (ready_executables_.size() == 0) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ void check_ready(
exec.callback_group = callback_group;
if (fill_executable(exec, entity)) {
executables.push_back(exec);
} else {
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t * wait_set)
bool
ExecutorNotifyWaitable::is_ready(rcl_wait_set_t * wait_set)
{
std::cout << "ExecutorNotifyWaitable::is_ready" << std::endl;
std::lock_guard<std::mutex> lock(guard_condition_mutex_);
for (size_t ii = 0; ii < wait_set->size_of_guard_conditions; ++ii) {
auto rcl_guard_condition = wait_set->guard_conditions[ii];
Expand All @@ -68,6 +69,7 @@ ExecutorNotifyWaitable::is_ready(rcl_wait_set_t * wait_set)
void
ExecutorNotifyWaitable::execute(std::shared_ptr<void> & data)
{
std::cout << "ExecutorNotifyWaitable::execute" << std::endl;
(void) data;
this->execute_callback_();
}
Expand Down
Loading

0 comments on commit 14e009e

Please sign in to comment.