Skip to content

Commit

Permalink
Merge pull request ros2#31 from mauropasse/mauro/pr-events-executor
Browse files Browse the repository at this point in the history
rename local_event_queue -> execution_event_queue
  • Loading branch information
iRobot ROS authored Nov 20, 2020
2 parents 5971b41 + 0a5f3f2 commit 6c55f0b
Show file tree
Hide file tree
Showing 19 changed files with 45 additions and 45 deletions.
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class ClientBase
void
set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
EventsExecutorCallback executor_callback) const;
rmw_listener_cb_t executor_callback) const;

RCLCPP_PUBLIC
void
Expand Down
20 changes: 10 additions & 10 deletions rclcpp/include/rclcpp/executors/events_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include "rclcpp/executors/timers_manager.hpp"
#include "rclcpp/node.hpp"

#include "rmw/executor_event_types.h"
#include "rmw/listener_event_types.h"

namespace rclcpp
{
Expand Down Expand Up @@ -176,13 +176,13 @@ class EventsExecutor : public rclcpp::Executor

// Event queue implementation is a deque only to
// facilitate the removal of events from expired entities.
using EventQueue = std::deque<ExecutorEvent>;
using EventQueue = std::deque<rmw_listener_event_t>;

// Executor callback: Push new events into the queue and trigger cv.
// This function is called by the DDS entities when an event happened,
// like a subscription receiving a message.
static void
push_event(const void * executor_ptr, ExecutorEvent event)
push_event(const void * executor_ptr, rmw_listener_event_t event)
{
// Cast executor_ptr to this (need to remove constness)
auto this_executor = const_cast<executors::EventsExecutor *>(
Expand Down Expand Up @@ -217,18 +217,18 @@ class EventsExecutor : public rclcpp::Executor
event_queue_.erase(
std::remove_if(
event_queue_.begin(), event_queue_.end(),
[&entity](ExecutorEvent event) {return event.entity == entity;}), event_queue_.end());
[&entity](rmw_listener_event_t event) {return event.entity == entity;}), event_queue_.end());
}

// Remove events associated with this entity from the local event queue
{
std::unique_lock<std::mutex> lock(execution_mutex_);
local_event_queue_.erase(
execution_event_queue_.erase(
std::remove_if(
local_event_queue_.begin(), local_event_queue_.end(),
[&entity](ExecutorEvent event) {
execution_event_queue_.begin(), execution_event_queue_.end(),
[&entity](rmw_listener_event_t event) {
return event.entity == entity;
}), local_event_queue_.end());
}), execution_event_queue_.end());
}
}

Expand All @@ -240,11 +240,11 @@ class EventsExecutor : public rclcpp::Executor
// Execute a single event
RCLCPP_PUBLIC
void
execute_event(const ExecutorEvent & event);
execute_event(const rmw_listener_event_t & event);

// We use two instances of EventQueue to allow threads to push events while we execute them
EventQueue event_queue_;
EventQueue local_event_queue_;
EventQueue execution_event_queue_;

EventsExecutorEntitiesCollector::SharedPtr entities_collector_;
EventsExecutorNotifyWaitable::SharedPtr executor_notifier_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ class EventsExecutorNotifyWaitable final : public EventWaitable
void
set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
EventsExecutorCallback executor_callback) const override
rmw_listener_cb_t executor_callback) const override
{
for (auto gc : notify_guard_conditions_) {
rcl_ret_t ret = rcl_guard_condition_set_events_executor_callback(
rcl_ret_t ret = rcl_guard_condition_set_listener_callback(
executor,
executor_callback,
this,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
void
set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
EventsExecutorCallback executor_callback) const override;
rmw_listener_cb_t executor_callback) const override;

protected:
std::recursive_mutex reentrant_mutex_;
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/qos_event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class QOSEventHandlerBase : public Waitable
void
set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
EventsExecutorCallback executor_callback) const override;
rmw_listener_cb_t executor_callback) const override;

protected:
rcl_event_t event_handle_;
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class ServiceBase
void
set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
EventsExecutorCallback executor_callback) const;
rmw_listener_cb_t executor_callback) const;

RCLCPP_PUBLIC
void
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
void
set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
EventsExecutorCallback executor_callback) const;
rmw_listener_cb_t executor_callback) const;

RCLCPP_PUBLIC
void
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/waitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class Waitable
void
set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
EventsExecutorCallback executor_callback) const;
rmw_listener_cb_t executor_callback) const;

RCLCPP_PUBLIC
void
Expand Down
4 changes: 2 additions & 2 deletions rclcpp/src/rclcpp/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,9 @@ ClientBase::exchange_in_use_by_wait_set_state(bool in_use_state)
void
ClientBase::set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
EventsExecutorCallback executor_callback) const
rmw_listener_cb_t executor_callback) const
{
rcl_ret_t ret = rcl_client_set_events_executor_callback(
rcl_ret_t ret = rcl_client_set_listener_callback(
executor,
executor_callback,
this,
Expand Down
20 changes: 10 additions & 10 deletions rclcpp/src/rclcpp/executors/events_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ EventsExecutor::spin()
event_queue_cv_.wait(push_lock, has_event_predicate);
std::unique_lock<std::mutex> execution_lock(execution_mutex_);
// We got an event! Swap queues while we hold both mutexes
std::swap(local_event_queue_, event_queue_);
std::swap(execution_event_queue_, event_queue_);
// After swapping the queues, we don't need the push lock anymore
push_lock.unlock();
// Consume events while under the execution lock only
this->consume_all_events(local_event_queue_);
this->consume_all_events(execution_event_queue_);
}
timers_manager_->stop();
}
Expand Down Expand Up @@ -108,9 +108,9 @@ EventsExecutor::spin_some(std::chrono::nanoseconds max_duration)
std::unique_lock<std::mutex> push_lock(push_mutex_);
event_queue_cv_.wait_for(push_lock, max_duration, has_event_predicate);
std::unique_lock<std::mutex> execution_lock(execution_mutex_);
std::swap(local_event_queue_, event_queue_);
std::swap(execution_event_queue_, event_queue_);
push_lock.unlock();
this->consume_all_events(local_event_queue_);
this->consume_all_events(execution_event_queue_);
execution_lock.unlock();

timers_manager_->execute_ready_timers();
Expand Down Expand Up @@ -154,18 +154,18 @@ EventsExecutor::spin_all(std::chrono::nanoseconds max_duration)
while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
std::unique_lock<std::mutex> push_lock(push_mutex_);
std::unique_lock<std::mutex> execution_lock(execution_mutex_);
std::swap(local_event_queue_, event_queue_);
std::swap(execution_event_queue_, event_queue_);
push_lock.unlock();

bool ready_timer = timers_manager_->get_head_timeout() < 0ns;
bool has_events = !local_event_queue_.empty();
bool has_events = !execution_event_queue_.empty();
if (!ready_timer && !has_events) {
// Exit as there is no more work to do
break;
}
// Execute all ready work

this->consume_all_events(local_event_queue_);
this->consume_all_events(execution_event_queue_);
execution_lock.unlock();

timers_manager_->execute_ready_timers();
Expand All @@ -189,7 +189,7 @@ EventsExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
// When condition variable is notified, check this predicate to proceed
auto has_event_predicate = [this]() {return !event_queue_.empty();};

ExecutorEvent event;
rmw_listener_event_t event;
bool has_event = false;

{
Expand Down Expand Up @@ -255,15 +255,15 @@ void
EventsExecutor::consume_all_events(EventQueue & event_queue)
{
while (!event_queue.empty()) {
ExecutorEvent event = event_queue.front();
rmw_listener_event_t event = event_queue.front();
event_queue.pop_front();

this->execute_event(event);
}
}

void
EventsExecutor::execute_event(const ExecutorEvent & event)
EventsExecutor::execute_event(const rmw_listener_event_t & event)
{
switch (event.type) {
case SUBSCRIPTION_EVENT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ void
EventsExecutorEntitiesCollector::set_guard_condition_callback(
const rcl_guard_condition_t * guard_condition)
{
rcl_ret_t ret = rcl_guard_condition_set_events_executor_callback(
rcl_ret_t ret = rcl_guard_condition_set_listener_callback(
associated_executor_,
&EventsExecutor::push_event,
this,
Expand All @@ -500,7 +500,7 @@ void
EventsExecutorEntitiesCollector::unset_guard_condition_callback(
const rcl_guard_condition_t * guard_condition)
{
rcl_ret_t ret = rcl_guard_condition_set_events_executor_callback(
rcl_ret_t ret = rcl_guard_condition_set_listener_callback(
nullptr,
nullptr,
nullptr,
Expand Down
4 changes: 2 additions & 2 deletions rclcpp/src/rclcpp/qos_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ QOSEventHandlerBase::is_ready(rcl_wait_set_t * wait_set)
void
QOSEventHandlerBase::set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
EventsExecutorCallback executor_callback) const
rmw_listener_cb_t executor_callback) const
{
rcl_ret_t ret = rcl_event_set_events_executor_callback(
rcl_ret_t ret = rcl_event_set_listener_callback(
executor,
executor_callback,
this,
Expand Down
4 changes: 2 additions & 2 deletions rclcpp/src/rclcpp/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ ServiceBase::exchange_in_use_by_wait_set_state(bool in_use_state)
void
ServiceBase::set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
EventsExecutorCallback executor_callback) const
rmw_listener_cb_t executor_callback) const
{
rcl_ret_t ret = rcl_service_set_events_executor_callback(
rcl_ret_t ret = rcl_service_set_listener_callback(
executor,
executor_callback,
this,
Expand Down
4 changes: 2 additions & 2 deletions rclcpp/src/rclcpp/subscription_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,9 @@ SubscriptionBase::exchange_in_use_by_wait_set_state(
void
SubscriptionBase::set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
EventsExecutorCallback executor_callback) const
rmw_listener_cb_t executor_callback) const
{
rcl_ret_t ret = rcl_subscription_set_events_executor_callback(
rcl_ret_t ret = rcl_subscription_set_listener_callback(
executor,
executor_callback,
this,
Expand Down
4 changes: 2 additions & 2 deletions rclcpp/src/rclcpp/subscription_intra_process_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ SubscriptionIntraProcessBase::get_actual_qos() const
void
SubscriptionIntraProcessBase::set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
EventsExecutorCallback executor_callback) const
rmw_listener_cb_t executor_callback) const
{
rcl_ret_t ret = rcl_guard_condition_set_events_executor_callback(
rcl_ret_t ret = rcl_guard_condition_set_listener_callback(
executor,
executor_callback,
this,
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/src/rclcpp/waitable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ Waitable::exchange_in_use_by_wait_set_state(bool in_use_state)
void
Waitable::set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
EventsExecutorCallback executor_callback) const
rmw_listener_cb_t executor_callback) const
{
(void)executor;
(void)executor_callback;
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/test/rclcpp/executors/test_events_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ TEST_F(TestEventsExecutor, notify_waitable)

{
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_guard_condition_set_events_executor_callback, RCL_RET_ERROR);
"lib:rclcpp", rcl_guard_condition_set_listener_callback, RCL_RET_ERROR);
EXPECT_THROW(std::make_shared<EventsExecutor>(), std::runtime_error);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ TEST_F(TestEventsExecutorEntitiesCollector, test_fancy_name)

{
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_guard_condition_set_events_executor_callback, RCL_RET_ERROR);
"lib:rclcpp", rcl_guard_condition_set_listener_callback, RCL_RET_ERROR);

EXPECT_THROW(
entities_collector_->add_node(node2->get_node_base_interface()),
Expand Down
4 changes: 2 additions & 2 deletions rclcpp/test/rclcpp/executors/test_executors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,9 @@ class TestWaitable : public rclcpp::Waitable
void
set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
EventsExecutorCallback executor_callback) const override
rmw_listener_cb_t executor_callback) const override
{
rcl_ret_t ret = rcl_guard_condition_set_events_executor_callback(
rcl_ret_t ret = rcl_guard_condition_set_listener_callback(
executor,
executor_callback,
this,
Expand Down

0 comments on commit 6c55f0b

Please sign in to comment.