Skip to content

Commit

Permalink
Crash in callback group pointer vector iterator (#814)
Browse files Browse the repository at this point in the history
Signed-off-by: Guillaume Autran <gautran@clearpath.ai>
guillaumeautran authored and ivanpauno committed Aug 28, 2019

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 17841d6 commit 4feecc5
Showing 7 changed files with 164 additions and 135 deletions.
68 changes: 49 additions & 19 deletions rclcpp/include/rclcpp/callback_group.hpp
Original file line number Diff line number Diff line change
@@ -62,25 +62,40 @@ class CallbackGroup
RCLCPP_PUBLIC
explicit CallbackGroup(CallbackGroupType group_type);

RCLCPP_PUBLIC
const std::vector<rclcpp::SubscriptionBase::WeakPtr> &
get_subscription_ptrs() const;

RCLCPP_PUBLIC
const std::vector<rclcpp::TimerBase::WeakPtr> &
get_timer_ptrs() const;

RCLCPP_PUBLIC
const std::vector<rclcpp::ServiceBase::WeakPtr> &
get_service_ptrs() const;

RCLCPP_PUBLIC
const std::vector<rclcpp::ClientBase::WeakPtr> &
get_client_ptrs() const;

RCLCPP_PUBLIC
const std::vector<rclcpp::Waitable::WeakPtr> &
get_waitable_ptrs() const;
template<typename Function>
rclcpp::SubscriptionBase::SharedPtr
find_subscription_ptrs_if(Function func) const
{
return _find_ptrs_if_impl<rclcpp::SubscriptionBase, Function>(func, subscription_ptrs_);
}

template<typename Function>
rclcpp::TimerBase::SharedPtr
find_timer_ptrs_if(Function func) const
{
return _find_ptrs_if_impl<rclcpp::TimerBase, Function>(func, timer_ptrs_);
}

template<typename Function>
rclcpp::ServiceBase::SharedPtr
find_service_ptrs_if(Function func) const
{
return _find_ptrs_if_impl<rclcpp::ServiceBase, Function>(func, service_ptrs_);
}

template<typename Function>
rclcpp::ClientBase::SharedPtr
find_client_ptrs_if(Function func) const
{
return _find_ptrs_if_impl<rclcpp::ClientBase, Function>(func, client_ptrs_);
}

template<typename Function>
rclcpp::Waitable::SharedPtr
find_waitable_ptrs_if(Function func) const
{
return _find_ptrs_if_impl<rclcpp::Waitable, Function>(func, waitable_ptrs_);
}

RCLCPP_PUBLIC
std::atomic_bool &
@@ -130,6 +145,21 @@ class CallbackGroup
std::vector<rclcpp::ClientBase::WeakPtr> client_ptrs_;
std::vector<rclcpp::Waitable::WeakPtr> waitable_ptrs_;
std::atomic_bool can_be_taken_from_;

private:
template<typename TypeT, typename Function>
typename TypeT::SharedPtr _find_ptrs_if_impl(
Function func, const std::vector<typename TypeT::WeakPtr> & vect_ptrs) const
{
std::lock_guard<std::mutex> lock(mutex_);
for (auto & weak_ptr : vect_ptrs) {
auto ref_ptr = weak_ptr.lock();
if (ref_ptr && func(ref_ptr)) {
return ref_ptr;
}
}
return typename TypeT::SharedPtr();
}
};

} // namespace callback_group
5 changes: 4 additions & 1 deletion rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
#ifndef RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_
#define RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_

#include <chrono>
#include <memory>
#include <mutex>
#include <set>
@@ -53,7 +54,8 @@ class MultiThreadedExecutor : public executor::Executor
MultiThreadedExecutor(
const executor::ExecutorArgs & args = executor::ExecutorArgs(),
size_t number_of_threads = 0,
bool yield_before_execute = false);
bool yield_before_execute = false,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

RCLCPP_PUBLIC
virtual ~MultiThreadedExecutor();
@@ -77,6 +79,7 @@ class MultiThreadedExecutor : public executor::Executor
std::mutex wait_mutex_;
size_t number_of_threads_;
bool yield_before_execute_;
std::chrono::nanoseconds next_exec_timeout_;

std::set<TimerBase::SharedPtr> scheduled_timers_;
};
41 changes: 16 additions & 25 deletions rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp
Original file line number Diff line number Diff line change
@@ -164,40 +164,31 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
if (!group || !group->can_be_taken_from().load()) {
continue;
}
for (auto & weak_subscription : group->get_subscription_ptrs()) {
auto subscription = weak_subscription.lock();
if (subscription) {
group->find_subscription_ptrs_if(
[this](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
subscription_handles_.push_back(subscription->get_subscription_handle());
if (subscription->get_intra_process_subscription_handle()) {
subscription_handles_.push_back(
subscription->get_intra_process_subscription_handle());
}
}
}
for (auto & weak_service : group->get_service_ptrs()) {
auto service = weak_service.lock();
if (service) {
return false;
});
group->find_service_ptrs_if([this](const rclcpp::ServiceBase::SharedPtr & service) {
service_handles_.push_back(service->get_service_handle());
}
}
for (auto & weak_client : group->get_client_ptrs()) {
auto client = weak_client.lock();
if (client) {
return false;
});
group->find_client_ptrs_if([this](const rclcpp::ClientBase::SharedPtr & client) {
client_handles_.push_back(client->get_client_handle());
}
}
for (auto & weak_timer : group->get_timer_ptrs()) {
auto timer = weak_timer.lock();
if (timer) {
return false;
});
group->find_timer_ptrs_if([this](const rclcpp::TimerBase::SharedPtr & timer) {
timer_handles_.push_back(timer->get_timer_handle());
}
}
for (auto & weak_waitable : group->get_waitable_ptrs()) {
auto waitable = weak_waitable.lock();
if (waitable) {
return false;
});
group->find_waitable_ptrs_if([this](const rclcpp::Waitable::SharedPtr & waitable) {
waitable_handles_.push_back(waitable);
}
}
return false;
});
}
}
return has_invalid_weak_nodes;
64 changes: 30 additions & 34 deletions rclcpp/src/rclcpp/callback_group.cpp
Original file line number Diff line number Diff line change
@@ -23,40 +23,6 @@ CallbackGroup::CallbackGroup(CallbackGroupType group_type)
: type_(group_type), can_be_taken_from_(true)
{}

const std::vector<rclcpp::SubscriptionBase::WeakPtr> &
CallbackGroup::get_subscription_ptrs() const
{
std::lock_guard<std::mutex> lock(mutex_);
return subscription_ptrs_;
}

const std::vector<rclcpp::TimerBase::WeakPtr> &
CallbackGroup::get_timer_ptrs() const
{
std::lock_guard<std::mutex> lock(mutex_);
return timer_ptrs_;
}

const std::vector<rclcpp::ServiceBase::WeakPtr> &
CallbackGroup::get_service_ptrs() const
{
std::lock_guard<std::mutex> lock(mutex_);
return service_ptrs_;
}

const std::vector<rclcpp::ClientBase::WeakPtr> &
CallbackGroup::get_client_ptrs() const
{
std::lock_guard<std::mutex> lock(mutex_);
return client_ptrs_;
}

const std::vector<rclcpp::Waitable::WeakPtr> &
CallbackGroup::get_waitable_ptrs() const
{
std::lock_guard<std::mutex> lock(mutex_);
return waitable_ptrs_;
}

std::atomic_bool &
CallbackGroup::can_be_taken_from()
@@ -76,34 +42,64 @@ CallbackGroup::add_subscription(
{
std::lock_guard<std::mutex> lock(mutex_);
subscription_ptrs_.push_back(subscription_ptr);
subscription_ptrs_.erase(
std::remove_if(
subscription_ptrs_.begin(),
subscription_ptrs_.end(),
[](rclcpp::SubscriptionBase::WeakPtr x) {return x.expired();}),
subscription_ptrs_.end());
}

void
CallbackGroup::add_timer(const rclcpp::TimerBase::SharedPtr timer_ptr)
{
std::lock_guard<std::mutex> lock(mutex_);
timer_ptrs_.push_back(timer_ptr);
timer_ptrs_.erase(
std::remove_if(
timer_ptrs_.begin(),
timer_ptrs_.end(),
[](rclcpp::TimerBase::WeakPtr x) {return x.expired();}),
timer_ptrs_.end());
}

void
CallbackGroup::add_service(const rclcpp::ServiceBase::SharedPtr service_ptr)
{
std::lock_guard<std::mutex> lock(mutex_);
service_ptrs_.push_back(service_ptr);
service_ptrs_.erase(
std::remove_if(
service_ptrs_.begin(),
service_ptrs_.end(),
[](rclcpp::ServiceBase::WeakPtr x) {return x.expired();}),
service_ptrs_.end());
}

void
CallbackGroup::add_client(const rclcpp::ClientBase::SharedPtr client_ptr)
{
std::lock_guard<std::mutex> lock(mutex_);
client_ptrs_.push_back(client_ptr);
client_ptrs_.erase(
std::remove_if(
client_ptrs_.begin(),
client_ptrs_.end(),
[](rclcpp::ClientBase::WeakPtr x) {return x.expired();}),
client_ptrs_.end());
}

void
CallbackGroup::add_waitable(const rclcpp::Waitable::SharedPtr waitable_ptr)
{
std::lock_guard<std::mutex> lock(mutex_);
waitable_ptrs_.push_back(waitable_ptr);
waitable_ptrs_.erase(
std::remove_if(
waitable_ptrs_.begin(),
waitable_ptrs_.end(),
[](rclcpp::Waitable::WeakPtr x) {return x.expired();}),
waitable_ptrs_.end());
}

void
28 changes: 15 additions & 13 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
@@ -511,11 +511,12 @@ Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer)
if (!group) {
continue;
}
for (auto & weak_timer : group->get_timer_ptrs()) {
auto t = weak_timer.lock();
if (t == timer) {
return group;
}
auto timer_ref = group->find_timer_ptrs_if(
[timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool {
return timer_ptr == timer;
});
if (timer_ref) {
return group;
}
}
}
@@ -535,14 +536,15 @@ Executor::get_next_timer(AnyExecutable & any_exec)
if (!group || !group->can_be_taken_from().load()) {
continue;
}
for (auto & timer_ref : group->get_timer_ptrs()) {
auto timer = timer_ref.lock();
if (timer && timer->is_ready()) {
any_exec.timer = timer;
any_exec.callback_group = group;
any_exec.node_base = node;
return;
}
auto timer_ref = group->find_timer_ptrs_if(
[](const rclcpp::TimerBase::SharedPtr & timer) -> bool {
return timer->is_ready();
});
if (timer_ref) {
any_exec.timer = timer_ref;
any_exec.callback_group = group;
any_exec.node_base = node;
return;
}
}
}
9 changes: 6 additions & 3 deletions rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
@@ -27,8 +27,11 @@ using rclcpp::executors::MultiThreadedExecutor;
MultiThreadedExecutor::MultiThreadedExecutor(
const rclcpp::executor::ExecutorArgs & args,
size_t number_of_threads,
bool yield_before_execute)
: executor::Executor(args), yield_before_execute_(yield_before_execute)
bool yield_before_execute,
std::chrono::nanoseconds next_exec_timeout)
: executor::Executor(args),
yield_before_execute_(yield_before_execute),
next_exec_timeout_(next_exec_timeout)
{
number_of_threads_ = number_of_threads ? number_of_threads : std::thread::hardware_concurrency();
if (number_of_threads_ == 0) {
@@ -77,7 +80,7 @@ MultiThreadedExecutor::run(size_t)
if (!rclcpp::ok(this->context_) || !spinning.load()) {
return;
}
if (!get_next_executable(any_exec)) {
if (!get_next_executable(any_exec, next_exec_timeout_)) {
continue;
}
if (any_exec.timer) {
Loading

0 comments on commit 4feecc5

Please sign in to comment.