From 36ab1a0bfeecbfde7c35fb0ca4d9bd145fb35460 Mon Sep 17 00:00:00 2001 From: Matt Condino <36555625+mwcondino@users.noreply.github.com> Date: Tue, 30 Jan 2024 06:16:37 -0800 Subject: [PATCH] [events executor] - Fix Behavior with Timer Cancel (#2375) * fix Signed-off-by: Matt Condino * add timer cancel tests Signed-off-by: Matt Condino * cleanup header include Signed-off-by: Matt Condino * reverting change to timer_greater function Signed-off-by: Gus Brigantino * use std::optional, and handle edgecase of 1 cancelled timer Signed-off-by: Matt Condino * clean up run_timers func Signed-off-by: Gus Brigantino * some fixes and added tests for cancel then reset of timers. Signed-off-by: Matt Condino * refactor and clean up. remove cancelled timer tracking. Signed-off-by: Matt Condino * remove unused method for size() Signed-off-by: Matt Condino * linting Signed-off-by: Matt Condino * relax timing constraints in tests Signed-off-by: Matt Condino * further relax timing constraints to ensure windows tests are not flaky. Signed-off-by: Matt Condino * use sim clock for tests, pub clock at .25 realtime rate. Signed-off-by: Matt Condino --------- Signed-off-by: Matt Condino Signed-off-by: Gus Brigantino Co-authored-by: Gus Brigantino (cherry picked from commit 99f0fc938b2d0c441787113a3920c16d3614a197) --- .../rclcpp/experimental/timers_manager.hpp | 12 +- .../events_executor/events_executor.cpp | 7 +- .../rclcpp/experimental/timers_manager.cpp | 45 ++- .../test/rclcpp/executors/test_executors.cpp | 379 ++++++++++++++++++ rclcpp/test/rclcpp/test_timers_manager.cpp | 46 +++ 5 files changed, 468 insertions(+), 21 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/timers_manager.hpp b/rclcpp/include/rclcpp/experimental/timers_manager.hpp index e8830c01a0..44aa07ecd5 100644 --- a/rclcpp/include/rclcpp/experimental/timers_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/timers_manager.hpp @@ -22,10 +22,10 @@ #include #include #include +#include #include #include #include - #include "rclcpp/context.hpp" #include "rclcpp/timer.hpp" @@ -172,13 +172,14 @@ class TimersManager * @brief Get the amount of time before the next timer triggers. * This function is thread safe. * - * @return std::chrono::nanoseconds to wait, + * @return std::optional to wait, * the returned value could be negative if the timer is already expired * or std::chrono::nanoseconds::max() if there are no timers stored in the object. + * If the head timer was cancelled, then this will return a nullopt. * @throws std::runtime_error if the timers thread was already running. */ RCLCPP_PUBLIC - std::chrono::nanoseconds get_head_timeout(); + std::optional get_head_timeout(); private: RCLCPP_DISABLE_COPY(TimersManager) @@ -512,12 +513,13 @@ class TimersManager * @brief Get the amount of time before the next timer triggers. * This function is not thread safe, acquire a mutex before calling it. * - * @return std::chrono::nanoseconds to wait, + * @return std::optional to wait, * the returned value could be negative if the timer is already expired * or std::chrono::nanoseconds::max() if the heap is empty. + * If the head timer was cancelled, then this will return a nullopt. * This function is not thread safe, acquire the timers_mutex_ before calling it. */ - std::chrono::nanoseconds get_head_timeout_unsafe(); + std::optional get_head_timeout_unsafe(); /** * @brief Executes all the timers currently ready when the function is invoked diff --git a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp index 33e9cb67bd..538e37cd5a 100644 --- a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp +++ b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp @@ -206,11 +206,12 @@ EventsExecutor::spin_once_impl(std::chrono::nanoseconds timeout) timeout = std::chrono::nanoseconds::max(); } - // Select the smallest between input timeout and timer timeout + // Select the smallest between input timeout and timer timeout. + // Cancelled timers are not considered. bool is_timer_timeout = false; auto next_timer_timeout = timers_manager_->get_head_timeout(); - if (next_timer_timeout < timeout) { - timeout = next_timer_timeout; + if (next_timer_timeout.has_value() && next_timer_timeout.value() < timeout) { + timeout = next_timer_timeout.value(); is_timer_timeout = true; } diff --git a/rclcpp/src/rclcpp/experimental/timers_manager.cpp b/rclcpp/src/rclcpp/experimental/timers_manager.cpp index f4ecd16b76..423287b1cc 100644 --- a/rclcpp/src/rclcpp/experimental/timers_manager.cpp +++ b/rclcpp/src/rclcpp/experimental/timers_manager.cpp @@ -100,7 +100,7 @@ void TimersManager::stop() } } -std::chrono::nanoseconds TimersManager::get_head_timeout() +std::optional TimersManager::get_head_timeout() { // Do not allow to interfere with the thread running if (running_) { @@ -169,7 +169,7 @@ void TimersManager::execute_ready_timer(const rclcpp::TimerBase * timer_id) } } -std::chrono::nanoseconds TimersManager::get_head_timeout_unsafe() +std::optional TimersManager::get_head_timeout_unsafe() { // If we don't have any weak pointer, then we just return maximum timeout if (weak_timers_heap_.empty()) { @@ -191,7 +191,9 @@ std::chrono::nanoseconds TimersManager::get_head_timeout_unsafe() } head_timer = locked_heap.front(); } - + if (head_timer->is_canceled()) { + return std::nullopt; + } return head_timer->time_until_trigger(); } @@ -242,17 +244,34 @@ void TimersManager::run_timers() // Lock mutex std::unique_lock lock(timers_mutex_); - std::chrono::nanoseconds time_to_sleep = get_head_timeout_unsafe(); + std::optional time_to_sleep = get_head_timeout_unsafe(); + + // If head timer was cancelled, try to reheap and get a new head. + // This avoids an edge condition where head timer is cancelled, but other + // valid timers remain in the heap. + if (!time_to_sleep.has_value()) { + // Re-heap to (possibly) move cancelled timer from head of heap. If + // entire heap is cancelled, this will still result in a nullopt. + TimersHeap locked_heap = weak_timers_heap_.validate_and_lock(); + locked_heap.heapify(); + weak_timers_heap_.store(locked_heap); + time_to_sleep = get_head_timeout_unsafe(); + } - // No need to wait if a timer is already available - if (time_to_sleep > std::chrono::nanoseconds::zero()) { - if (time_to_sleep != std::chrono::nanoseconds::max()) { - // Wait until timeout or notification that timers have been updated - timers_cv_.wait_for(lock, time_to_sleep, [this]() {return timers_updated_;}); - } else { - // Wait until notification that timers have been updated - timers_cv_.wait(lock, [this]() {return timers_updated_;}); - } + // If no timers, or all timers cancelled, wait for an update. + if (!time_to_sleep.has_value() || (time_to_sleep.value() == std::chrono::nanoseconds::max()) ) { + // Wait until notification that timers have been updated + timers_cv_.wait(lock, [this]() {return timers_updated_;}); + + // Re-heap in case ordering changed due to a cancelled timer + // re-activating. + TimersHeap locked_heap = weak_timers_heap_.validate_and_lock(); + locked_heap.heapify(); + weak_timers_heap_.store(locked_heap); + } else if (time_to_sleep.value() != std::chrono::nanoseconds::zero()) { + // If time_to_sleep is zero, we immediately execute. Otherwise, wait + // until timeout or notification that timers have been updated + timers_cv_.wait_for(lock, time_to_sleep.value(), [this]() {return timers_updated_;}); } // Reset timers updated flag diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 68138eb82b..fdf1136d74 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -36,8 +36,10 @@ #include "rclcpp/duration.hpp" #include "rclcpp/guard_condition.hpp" #include "rclcpp/rclcpp.hpp" +#include "rclcpp/time_source.hpp" #include "test_msgs/msg/empty.hpp" +#include "rosgraph_msgs/msg/clock.hpp" using namespace std::chrono_literals; @@ -939,3 +941,380 @@ TYPED_TEST(TestIntraprocessExecutors, testIntraprocessRetrigger) { executor.spin(); EXPECT_EQ(kNumMessages, this->callback_count.load()); } + +class TimerNode : public rclcpp::Node +{ +public: + explicit TimerNode(std::string subname) + : Node("timer_node", subname) + { + timer1_ = rclcpp::create_timer( + this->get_node_base_interface(), get_node_timers_interface(), + get_clock(), 1ms, + std::bind(&TimerNode::Timer1Callback, this)); + + timer2_ = + rclcpp::create_timer( + this->get_node_base_interface(), get_node_timers_interface(), + get_clock(), 1ms, + std::bind(&TimerNode::Timer2Callback, this)); + } + + int GetTimer1Cnt() {return cnt1_;} + int GetTimer2Cnt() {return cnt2_;} + + void ResetTimer1() + { + timer1_->reset(); + } + + void ResetTimer2() + { + timer2_->reset(); + } + + void CancelTimer1() + { + RCLCPP_DEBUG(this->get_logger(), "Timer 1 cancelling!"); + timer1_->cancel(); + } + + void CancelTimer2() + { + RCLCPP_DEBUG(this->get_logger(), "Timer 2 cancelling!"); + timer2_->cancel(); + } + +private: + void Timer1Callback() + { + RCLCPP_DEBUG(this->get_logger(), "Timer 1!"); + cnt1_++; + } + + void Timer2Callback() + { + RCLCPP_DEBUG(this->get_logger(), "Timer 2!"); + cnt2_++; + } + + rclcpp::TimerBase::SharedPtr timer1_; + rclcpp::TimerBase::SharedPtr timer2_; + int cnt1_ = 0; + int cnt2_ = 0; +}; + +// Sets up a separate thread to publish /clock messages. +// Clock rate relative to real clock is controlled by realtime_update_rate. +// This is set conservatively slow to ensure unit tests are reliable on Windows +// environments, where timing performance is subpar. +// +// Use `sleep_for` in tests to advance the clock. Clock should run and be published +// in separate thread continuously to ensure correct behavior in node under test. +class ClockPublisher : public rclcpp::Node +{ +public: + explicit ClockPublisher(float simulated_clock_step = .001f, float realtime_update_rate = 0.25f) + : Node("clock_publisher"), + ros_update_duration_(0, 0), + realtime_clock_step_(0, 0), + rostime_(0, 0) + { + clock_publisher_ = this->create_publisher("clock", 10); + realtime_clock_step_ = + rclcpp::Duration::from_seconds(simulated_clock_step / realtime_update_rate); + ros_update_duration_ = rclcpp::Duration::from_seconds(simulated_clock_step); + + timer_thread_ = std::thread(&ClockPublisher::RunTimer, this); + } + + ~ClockPublisher() + { + running_ = false; + if (timer_thread_.joinable()) { + timer_thread_.join(); + } + } + + void sleep_for(rclcpp::Duration duration) + { + rclcpp::Time start_time(0, 0, RCL_ROS_TIME); + { + const std::lock_guard lock(mutex_); + start_time = rostime_; + } + rclcpp::Time current_time = start_time; + + while (true) { + { + const std::lock_guard lock(mutex_); + current_time = rostime_; + } + if ((current_time - start_time) >= duration) { + return; + } + std::this_thread::sleep_for(realtime_clock_step_.to_chrono()); + rostime_ += ros_update_duration_; + } + } + +private: + void RunTimer() + { + while (running_) { + PublishClock(); + std::this_thread::sleep_for(realtime_clock_step_.to_chrono()); + } + } + + void PublishClock() + { + const std::lock_guard lock(mutex_); + auto message = rosgraph_msgs::msg::Clock(); + message.clock = rostime_; + clock_publisher_->publish(message); + } + + rclcpp::Publisher::SharedPtr clock_publisher_; + + rclcpp::Duration ros_update_duration_; + rclcpp::Duration realtime_clock_step_; + // Rostime must be guarded by a mutex, since accessible in running thread + // as well as sleep_for + rclcpp::Time rostime_; + std::mutex mutex_; + std::thread timer_thread_; + std::atomic running_ = true; +}; + + +template +class TestTimerCancelBehavior : public ::testing::Test +{ +public: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); + } + + void SetUp() + { + const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info(); + std::stringstream test_name; + test_name << test_info->test_case_name() << "_" << test_info->name(); + node = std::make_shared(test_name.str()); + param_client = std::make_shared(node); + ASSERT_TRUE(param_client->wait_for_service(5s)); + + auto set_parameters_results = param_client->set_parameters( + {rclcpp::Parameter("use_sim_time", false)}); + for (auto & result : set_parameters_results) { + ASSERT_TRUE(result.successful); + } + + // Run standalone thread to publish clock time + sim_clock_node = std::make_shared(); + + // Spin the executor in a standalone thread + executor.add_node(this->node); + standalone_thread = std::thread( + [this]() { + executor.spin(); + }); + } + + void TearDown() + { + node.reset(); + + // Clean up thread object + if (standalone_thread.joinable()) { + standalone_thread.join(); + } + } + + std::shared_ptr node; + std::shared_ptr sim_clock_node; + rclcpp::SyncParametersClient::SharedPtr param_client; + std::thread standalone_thread; + T executor; +}; + +TYPED_TEST_SUITE(TestTimerCancelBehavior, ExecutorTypes, ExecutorTypeNames); + +TYPED_TEST(TestTimerCancelBehavior, testTimer1CancelledWithExecutorSpin) { + // Validate that cancelling one timer yields no change in behavior for other + // timers. Specifically, this tests the behavior when using spin() to run the + // executor, which is the most common usecase. + + // Cancel to stop the spin after some time. + this->sim_clock_node->sleep_for(50ms); + this->node->CancelTimer1(); + this->sim_clock_node->sleep_for(150ms); + this->executor.cancel(); + + int t1_runs = this->node->GetTimer1Cnt(); + int t2_runs = this->node->GetTimer2Cnt(); + EXPECT_NE(t1_runs, t2_runs); + // Check that t2 has significantly more calls + EXPECT_LT(t1_runs + 50, t2_runs); +} + +TYPED_TEST(TestTimerCancelBehavior, testTimer2CancelledWithExecutorSpin) { + // Validate that cancelling one timer yields no change in behavior for other + // timers. Specifically, this tests the behavior when using spin() to run the + // executor, which is the most common usecase. + + // Cancel to stop the spin after some time. + this->sim_clock_node->sleep_for(50ms); + this->node->CancelTimer2(); + this->sim_clock_node->sleep_for(150ms); + this->executor.cancel(); + + int t1_runs = this->node->GetTimer1Cnt(); + int t2_runs = this->node->GetTimer2Cnt(); + EXPECT_NE(t1_runs, t2_runs); + // Check that t1 has significantly more calls + EXPECT_LT(t2_runs + 50, t1_runs); +} + +TYPED_TEST(TestTimerCancelBehavior, testHeadTimerCancelThenResetBehavior) { + // Validate that cancelling timer doesn't affect operation of other timers, + // and that the cancelled timer starts executing normally once reset manually. + + // Cancel to stop the spin after some time. + this->sim_clock_node->sleep_for(50ms); + this->node->CancelTimer1(); + this->sim_clock_node->sleep_for(150ms); + int t1_runs_initial = this->node->GetTimer1Cnt(); + int t2_runs_initial = this->node->GetTimer2Cnt(); + + // Manually reset timer 1, then sleep again + // Counts should update. + this->node->ResetTimer1(); + this->sim_clock_node->sleep_for(150ms); + int t1_runs_final = this->node->GetTimer1Cnt(); + int t2_runs_final = this->node->GetTimer2Cnt(); + + this->executor.cancel(); + + // T1 should have been restarted, and execute about 15 additional times. + // Check 10 greater than initial, to account for some timing jitter. + EXPECT_LT(t1_runs_initial + 50, t1_runs_final); + + EXPECT_LT(t1_runs_initial + 50, t2_runs_initial); + // Check that t2 has significantly more calls, and keeps getting called. + EXPECT_LT(t2_runs_initial + 50, t2_runs_final); +} + +TYPED_TEST(TestTimerCancelBehavior, testBackTimerCancelThenResetBehavior) { + // Validate that cancelling timer doesn't affect operation of other timers, + // and that the cancelled timer starts executing normally once reset manually. + + // Cancel to stop the spin after some time. + this->sim_clock_node->sleep_for(50ms); + this->node->CancelTimer2(); + this->sim_clock_node->sleep_for(150ms); + int t1_runs_initial = this->node->GetTimer1Cnt(); + int t2_runs_initial = this->node->GetTimer2Cnt(); + + // Manually reset timer 1, then sleep again + // Counts should update. + this->node->ResetTimer2(); + this->sim_clock_node->sleep_for(150ms); + int t1_runs_final = this->node->GetTimer1Cnt(); + int t2_runs_final = this->node->GetTimer2Cnt(); + + this->executor.cancel(); + + // T2 should have been restarted, and execute about 15 additional times. + // Check 10 greater than initial, to account for some timing jitter. + EXPECT_LT(t2_runs_initial + 50, t2_runs_final); + + EXPECT_LT(t2_runs_initial + 50, t1_runs_initial); + // Check that t1 has significantly more calls, and keeps getting called. + EXPECT_LT(t1_runs_initial + 50, t1_runs_final); +} + +TYPED_TEST(TestTimerCancelBehavior, testBothTimerCancelThenResetT1Behavior) { + // Validate behavior from cancelling 2 timers, then only re-enabling one of them. + // Ensure that only the reset timer is executed. + + // Cancel to stop the spin after some time. + this->sim_clock_node->sleep_for(50ms); + this->node->CancelTimer1(); + this->node->CancelTimer2(); + this->sim_clock_node->sleep_for(150ms); + int t1_runs_initial = this->node->GetTimer1Cnt(); + int t2_runs_initial = this->node->GetTimer2Cnt(); + + // Manually reset timer 1, then sleep again + // Counts should update. + this->node->ResetTimer1(); + this->sim_clock_node->sleep_for(150ms); + int t1_runs_intermediate = this->node->GetTimer1Cnt(); + int t2_runs_intermediate = this->node->GetTimer2Cnt(); + + this->node->ResetTimer2(); + this->sim_clock_node->sleep_for(150ms); + int t1_runs_final = this->node->GetTimer1Cnt(); + int t2_runs_final = this->node->GetTimer2Cnt(); + + this->executor.cancel(); + + // T1 and T2 should have the same initial count. + EXPECT_LE(std::abs(t1_runs_initial - t2_runs_initial), 1); + + // Expect that T1 has up to 15 more calls than t2. Add some buffer + // to account for jitter. + EXPECT_EQ(t2_runs_initial, t2_runs_intermediate); + EXPECT_LT(t1_runs_initial + 50, t1_runs_intermediate); + + // Expect that by end of test, both are running properly again. + EXPECT_LT(t1_runs_intermediate + 50, t1_runs_final); + EXPECT_LT(t2_runs_intermediate + 50, t2_runs_final); +} + +TYPED_TEST(TestTimerCancelBehavior, testBothTimerCancelThenResetT2Behavior) { + // Validate behavior from cancelling 2 timers, then only re-enabling one of them. + // Ensure that only the reset timer is executed. + + // Cancel to stop the spin after some time. + this->sim_clock_node->sleep_for(50ms); + this->node->CancelTimer1(); + this->node->CancelTimer2(); + this->sim_clock_node->sleep_for(150ms); + int t1_runs_initial = this->node->GetTimer1Cnt(); + int t2_runs_initial = this->node->GetTimer2Cnt(); + + // Manually reset timer 1, then sleep again + // Counts should update. + this->node->ResetTimer2(); + this->sim_clock_node->sleep_for(150ms); + int t1_runs_intermediate = this->node->GetTimer1Cnt(); + int t2_runs_intermediate = this->node->GetTimer2Cnt(); + + this->node->ResetTimer1(); + this->sim_clock_node->sleep_for(150ms); + int t1_runs_final = this->node->GetTimer1Cnt(); + int t2_runs_final = this->node->GetTimer2Cnt(); + + this->executor.cancel(); + + // T1 and T2 should have the same initial count. + EXPECT_LE(std::abs(t1_runs_initial - t2_runs_initial), 1); + + // Expect that T1 has up to 15 more calls than t2. Add some buffer + // to account for jitter. + EXPECT_EQ(t1_runs_initial, t1_runs_intermediate); + EXPECT_LT(t2_runs_initial + 50, t2_runs_intermediate); + + // Expect that by end of test, both are running properly again. + EXPECT_LT(t1_runs_intermediate + 50, t1_runs_final); + EXPECT_LT(t2_runs_intermediate + 50, t2_runs_final); +} diff --git a/rclcpp/test/rclcpp/test_timers_manager.cpp b/rclcpp/test/rclcpp/test_timers_manager.cpp index 635ec322c0..fcd8d74398 100644 --- a/rclcpp/test/rclcpp/test_timers_manager.cpp +++ b/rclcpp/test/rclcpp/test_timers_manager.cpp @@ -417,3 +417,49 @@ TEST_F(TestTimersManager, infinite_loop) EXPECT_LT(0u, t1_runs); EXPECT_LT(0u, t2_runs); } + +// Validate that cancelling one timer yields no change in behavior for other +// timers. +TEST_F(TestTimersManager, check_one_timer_cancel_doesnt_affect_other_timers) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + size_t t1_runs = 0; + std::shared_ptr t1; + // After a while cancel t1. Don't remove it though. + // Simulates typical usage in a Node where a timer is cancelled but not removed, + // since typical users aren't going to mess around with the timer manager. + t1 = TimerT::make_shared( + 1ms, + [&t1_runs, &t1]() { + t1_runs++; + if (t1_runs == 5) { + t1->cancel(); + } + }, + rclcpp::contexts::get_global_default_context()); + + size_t t2_runs = 0; + auto t2 = TimerT::make_shared( + 1ms, + [&t2_runs]() { + t2_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + // Add timers + timers_manager->add_timer(t1); + timers_manager->add_timer(t2); + + // Start timers thread + timers_manager->start(); + + std::this_thread::sleep_for(15ms); + + // t1 has stopped running + EXPECT_NE(t1_runs, t2_runs); + // Check that t2 has significantly more calls + EXPECT_LT(t1_runs + 5, t2_runs); + timers_manager->stop(); +}