Skip to content

Commit

Permalink
Merge pull request ros2#16 from alsora/asoragna/events-executor-tests
Browse files Browse the repository at this point in the history
more thread safety and events executor test
  • Loading branch information
iRobot ROS authored Oct 14, 2020
2 parents 2898804 + da55bd9 commit aaaa265
Show file tree
Hide file tree
Showing 14 changed files with 605 additions and 127 deletions.
4 changes: 3 additions & 1 deletion rclcpp/include/rclcpp/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ class ClientBase

RCLCPP_PUBLIC
void
set_events_executor_callback(const void * executor_context, ExecutorEventCallback executor_callback) const;
set_events_executor_callback(
const void * executor_context,
ExecutorEventCallback executor_callback) const;

protected:
RCLCPP_DISABLE_COPY(ClientBase)
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/executors/events_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class EventsExecutor : public rclcpp::Executor
std::mutex event_queue_mutex_;
std::condition_variable event_queue_cv_;

// Timers heap manager
// Timers manager
std::shared_ptr<TimersManager> timers_manager_;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#define RCLCPP__EXECUTORS__EVENTS_EXECUTOR_ENTITIES_COLLECTOR_HPP_

#include <list>
#include <memory>

#include "rclcpp/executors/timers_manager.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
Expand Down Expand Up @@ -90,7 +91,7 @@ class EventsExecutorEntitiesCollector final : public rclcpp::Waitable
add_to_wait_set(rcl_wait_set_t * wait_set) override
{
(void)wait_set;
return true;
return false;
}

private:
Expand Down
37 changes: 7 additions & 30 deletions rclcpp/include/rclcpp/executors/timers_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
#ifndef RCLCPP__EXECUTORS__TIMERS_MANAGER_HPP_
#define RCLCPP__EXECUTORS__TIMERS_MANAGER_HPP_

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <list>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

Expand Down Expand Up @@ -152,7 +150,6 @@ class TimersManager
if (heap_.empty()) {
return MAX_TIME;
}

return (*heap_[0])->time_until_trigger();
}

Expand All @@ -166,26 +163,19 @@ class TimersManager
/**
* @brief Helper function that checks whether a timer was already ready
* at a specific timepoint
* @param tp the timepoint to check for
* @param timer a pointer to the timer to check for
* @param tp the timepoint to check for
* @return true if timer was ready at tp
*/
bool timer_was_ready_at_tp(
std::chrono::time_point<std::chrono::steady_clock> tp,
TimerPtr timer)
TimerPtr timer,
std::chrono::time_point<std::chrono::steady_clock> tp)
{
// A ready timer will return a negative duration when calling time_until_trigger
auto time_ready = std::chrono::steady_clock::now() + (*timer)->time_until_trigger();
return time_ready < tp;
}

/**
* @brief Rebuilds the heap queue from the timers storage
* This function is meant to be called whenever something changes in the timers storage.
* This function is not thread safe, you need to acquire a mutex before calling it.
*/
void rebuild_heap();

/**
* @brief Add a new timer to the heap and sort it.
* @param x pointer to a timer owned by this object.
Expand Down Expand Up @@ -245,29 +235,16 @@ class TimersManager
heap_[i] = updated_timer;
}

// Helper function to check the correctness of the heap.
void verify()
{
for (size_t i = 0; i < heap_.size()/2; ++i) {
size_t left = 2*i + 1;
if (left < heap_.size()) {
assert(((*heap_[left])->time_until_trigger().count() >= (*heap_[i])->time_until_trigger().count()));
}
size_t right = left + 1;
if (right < heap_.size()) {
assert(((*heap_[right])->time_until_trigger().count() >= (*heap_[i])->time_until_trigger().count()));
}
}
}

// Thread used to run the timers monitoring and execution
// Thread used to run the timers monitoring and execution task
std::thread timers_thread_;
// Protects access to timers
std::mutex timers_mutex_;
// Notifies the timers thread whenever timers are added/removed
std::condition_variable timers_cv_;
// Flag used as predicate by timers_cv
bool timers_updated_ {false};
// Indicates whether the timers thread is currently running or requested to stop
std::atomic<bool> running_ {false};
bool running_ {false};
// Context of the parent executor
std::shared_ptr<rclcpp::Context> context_;
// Container to keep ownership of the timers
Expand Down
4 changes: 3 additions & 1 deletion rclcpp/include/rclcpp/service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ class ServiceBase

RCLCPP_PUBLIC
void
set_events_executor_callback(const void * executor_context, ExecutorEventCallback executor_callback) const;
set_events_executor_callback(
const void * executor_context,
ExecutorEventCallback executor_callback) const;

protected:
RCLCPP_DISABLE_COPY(ServiceBase)
Expand Down
4 changes: 3 additions & 1 deletion rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,9 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>

RCLCPP_PUBLIC
void
set_events_executor_callback(const void * executor_context, ExecutorEventCallback executor_callback) const;
set_events_executor_callback(
const void * executor_context,
ExecutorEventCallback executor_callback) const;

protected:
template<typename EventCallbackT>
Expand Down
4 changes: 0 additions & 4 deletions rclcpp/src/rclcpp/executors/events_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,5 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
waitable->execute();
break;
}

default:
throw std::runtime_error("EventsExecutor received unrecognized event");
break;
}
}
26 changes: 18 additions & 8 deletions rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <string>

#include "rclcpp/executors/events_executor.hpp"
#include "rclcpp/executors/events_executor_entities_collector.hpp"

Expand Down Expand Up @@ -110,7 +112,7 @@ EventsExecutorEntitiesCollector::set_entities_callbacks(
if (!group || !group->can_be_taken_from().load()) {
continue;
}

// Timers are handled by the timers manager
group->find_timer_ptrs_if(
[this](const rclcpp::TimerBase::SharedPtr & timer) {
Expand All @@ -119,45 +121,53 @@ EventsExecutorEntitiesCollector::set_entities_callbacks(
}
return false;
});

// Set callbacks for all other entity types
group->find_subscription_ptrs_if(
[this](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
if (subscription) {
subscription->set_events_executor_callback(associated_executor_, &EventsExecutor::push_event);
subscription->set_events_executor_callback(
associated_executor_,
&EventsExecutor::push_event);
}
return false;
});
group->find_service_ptrs_if(
[this](const rclcpp::ServiceBase::SharedPtr & service) {
if (service) {
service->set_events_executor_callback(associated_executor_, &EventsExecutor::push_event);
service->set_events_executor_callback(
associated_executor_,
&EventsExecutor::push_event);
}
return false;
});
group->find_client_ptrs_if(
[this](const rclcpp::ClientBase::SharedPtr & client) {
if (client) {
client->set_events_executor_callback(associated_executor_, &EventsExecutor::push_event);
client->set_events_executor_callback(
associated_executor_,
&EventsExecutor::push_event);
}
return false;
});
group->find_waitable_ptrs_if(
[this](const rclcpp::Waitable::SharedPtr & waitable) {
if (waitable) {
waitable->set_events_executor_callback(associated_executor_, &EventsExecutor::push_event);
waitable->set_events_executor_callback(
associated_executor_,
&EventsExecutor::push_event);
}
return false;
});
}

// Set an event callback for the node's notify guard condition, so if new entities are added
// or remove to this node we will receive an event.
// or removed to this node we will receive an event.
rcl_ret_t ret = rcl_guard_condition_set_events_executor_callback(
associated_executor_,
&EventsExecutor::push_event,
this,
node_ptr->get_notify_guard_condition(),
node->get_notify_guard_condition(),
false /* Discard previous events */);

if (ret != RCL_RET_OK) {
Expand Down
Loading

0 comments on commit aaaa265

Please sign in to comment.