Skip to content

Commit

Permalink
EntitiesCollector GCs to push single event
Browse files Browse the repository at this point in the history
Signed-off-by: Mauro Passerino <mpasserino@irobot.com>
  • Loading branch information
Mauro Passerino committed Apr 12, 2021
1 parent 7cadae3 commit 41c8f8a
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 13 deletions.
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/guard_condition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class GuardCondition

RCLCPP_PUBLIC
void
set_on_trigger_callback(std::function<void(size_t, int)> callback);
set_on_trigger_callback(std::function<void(size_t)> callback);

protected:
rclcpp::Context::SharedPtr context_;
Expand Down
19 changes: 17 additions & 2 deletions rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,21 @@ void
EventsExecutorEntitiesCollector::set_guard_condition_callback(
rclcpp::GuardCondition * guard_condition)
{
guard_condition->set_on_trigger_callback(create_waitable_callback(this));
auto gc_callback = [this](size_t num_events) {
// Override num events (we don't care more than a single event)
num_events = 1;
int gc_id = -1;
ExecutorEvent event = {this, gc_id, WAITABLE_EVENT, num_events};
// Event queue mutex scope
{
std::unique_lock<std::mutex> lock(associated_executor_->push_mutex_);
associated_executor_->events_queue_->push(event);
}
// Notify that the event queue has some events in it.
associated_executor_->events_queue_cv_.notify_one();
};

guard_condition->set_on_trigger_callback(gc_callback);
}

void
Expand Down Expand Up @@ -569,7 +583,8 @@ EventsExecutorEntitiesCollector::add_waitable(rclcpp::Waitable::SharedPtr waitab
{
weak_waitables_map_.emplace(waitable.get(), waitable);

waitable->set_listener_callback([] (size_t, int){});
waitable->set_listener_callback(
create_waitable_callback(waitable.get()));
}

std::function<void(size_t)>
Expand Down
15 changes: 10 additions & 5 deletions rclcpp/src/rclcpp/guard_condition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,18 @@ GuardCondition::add_to_wait_set(rcl_wait_set_t * wait_set) const
}

void
GuardCondition::set_on_trigger_callback(std::function<void(size_t, int)> callback)
GuardCondition::set_on_trigger_callback(std::function<void(size_t)> callback)
{
on_trigger_callback_ = std::bind(callback, std::placeholders::_1, -1);
if (callback) {
on_trigger_callback_ = callback;

if (unread_count_) {
on_trigger_callback_(unread_count_);
unread_count_ = 0;
if (unread_count_) {
callback(unread_count_);
unread_count_ = 0;
}
return;
}

on_trigger_callback_ = nullptr;
}
} // namespace rclcpp
18 changes: 13 additions & 5 deletions rclcpp/src/rclcpp/subscription_intra_process_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,18 @@ SubscriptionIntraProcessBase::get_actual_qos() const
void
SubscriptionIntraProcessBase::set_listener_callback(std::function<void(size_t, int)> callback)
{
on_new_message_callback_ = std::bind(callback, std::placeholders::_1, -1);

if (unread_count_ && on_new_message_callback_) {
on_new_message_callback_(unread_count_);
unread_count_ = 0;
if (callback) {
on_new_message_callback_ = std::bind(callback, std::placeholders::_1, -1);
if (unread_count_) {
if (unread_count_ < qos_profile_.depth) {
on_new_message_callback_(unread_count_);
} else {
on_new_message_callback_(qos_profile_.depth);
}
unread_count_ = 0;
}
return;
}

on_new_message_callback_ = nullptr;
}

0 comments on commit 41c8f8a

Please sign in to comment.