From 118773c9e618f6dc8ae0a5c104028fbcb62344e4 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Sun, 9 Jun 2024 21:55:07 +0800 Subject: [PATCH 1/5] Release ownership of entities after spinning cancelled Signed-off-by: Barry Xu --- rclcpp/src/rclcpp/executor.cpp | 2 ++ rclcpp/test/rclcpp/CMakeLists.txt | 2 +- rclcpp/test/rclcpp/test_executor.cpp | 23 +++++++++++++++++++++++ 3 files changed, 26 insertions(+), 1 deletion(-) diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 95b0d7fcc8..eefe8cef01 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -885,6 +885,8 @@ Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanos // Wait for subscriptions or timers to work on wait_for_work(timeout); if (!spinning.load()) { + // Clear wait result to release ownership of entity + wait_result_.reset(); return false; } // Try again diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index b950bd149b..b171b215d9 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -641,7 +641,7 @@ ament_add_gtest(test_executor test_executor.cpp TIMEOUT 120) ament_add_test_label(test_executor mimick) if(TARGET test_executor) - target_link_libraries(test_executor ${PROJECT_NAME} mimick) + target_link_libraries(test_executor ${PROJECT_NAME} mimick ${test_msgs_TARGETS}) endif() ament_add_gtest(test_graph_listener test_graph_listener.cpp) diff --git a/rclcpp/test/rclcpp/test_executor.cpp b/rclcpp/test/rclcpp/test_executor.cpp index 668ab96797..a3554be131 100644 --- a/rclcpp/test/rclcpp/test_executor.cpp +++ b/rclcpp/test/rclcpp/test_executor.cpp @@ -23,6 +23,7 @@ #include "rclcpp/memory_strategy.hpp" #include "rclcpp/executors/single_threaded_executor.hpp" #include "rclcpp/strategies/allocator_memory_strategy.hpp" +#include "test_msgs/srv/empty.hpp" #include "../mocking_utils/patch.hpp" #include "../utils/rclcpp_gtest_macros.hpp" @@ -508,3 +509,25 @@ TEST_F(TestExecutor, is_spinning) { ASSERT_TRUE(timer_called); } + +TEST_F(TestExecutor, release_ownership_entity_after_spinning_cancel) { + using namespace std::chrono_literals; + + // Create an Executor + rclcpp::executors::SingleThreadedExecutor executor; + + auto future = std::async(std::launch::async, [&executor] {executor.spin();}); + + auto node = std::make_shared("test_node"); + auto callback = []( + const test_msgs::srv::Empty::Request::SharedPtr, test_msgs::srv::Empty::Response::SharedPtr) { + }; + auto server = node->create_service("test_service", callback); + executor.add_node(node); + std::this_thread::sleep_for(50ms); + executor.cancel(); + std::future_status future_status = future.wait_for(1s); + EXPECT_EQ(future_status, std::future_status::ready); + + EXPECT_EQ(server.use_count(), 1); +} From 429fc32d2a655fb4d00162717625396444f502e9 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Thu, 13 Jun 2024 11:03:58 +0800 Subject: [PATCH 2/5] Move release action to every exit point in different spin functions Signed-off-by: Barry Xu --- rclcpp/src/rclcpp/executor.cpp | 8 +++----- rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp | 2 +- rclcpp/src/rclcpp/executors/single_threaded_executor.cpp | 2 +- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index eefe8cef01..20718492ec 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -275,7 +275,7 @@ Executor::spin_until_future_complete_impl( if (spinning.exchange(true)) { throw std::runtime_error("spin_until_future_complete() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false);wait_result_.reset();); while (rclcpp::ok(this->context_) && spinning.load()) { // Do one item of work. spin_once_impl(timeout_left); @@ -364,7 +364,7 @@ Executor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive) if (spinning.exchange(true)) { throw std::runtime_error("spin_some() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false);wait_result_.reset();); // clear the wait result and wait for work without blocking to collect the work // for the first time @@ -431,7 +431,7 @@ Executor::spin_once(std::chrono::nanoseconds timeout) if (spinning.exchange(true)) { throw std::runtime_error("spin_once() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false);wait_result_.reset();); spin_once_impl(timeout); } @@ -885,8 +885,6 @@ Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanos // Wait for subscriptions or timers to work on wait_for_work(timeout); if (!spinning.load()) { - // Clear wait result to release ownership of entity - wait_result_.reset(); return false; } // Try again diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 2d5c76e817..e742faf99c 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -55,7 +55,7 @@ MultiThreadedExecutor::spin() if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false);); + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false);wait_result_.reset();); std::vector threads; size_t thread_id = 0; { diff --git a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp index 975733b497..f4bbe654cf 100644 --- a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp @@ -30,7 +30,7 @@ SingleThreadedExecutor::spin() if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); wait_result_.reset();); // Clear any previous result and rebuild the waitset this->wait_result_.reset(); From 50bce781a43565f6efda062f04609887531254b2 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Fri, 14 Jun 2024 10:56:27 +0800 Subject: [PATCH 3/5] Move wait_result_.reset() before setting spinning to false Signed-off-by: Barry Xu --- rclcpp/src/rclcpp/executor.cpp | 6 +++--- rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp | 2 +- rclcpp/src/rclcpp/executors/single_threaded_executor.cpp | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 20718492ec..181f0c9a6a 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -275,7 +275,7 @@ Executor::spin_until_future_complete_impl( if (spinning.exchange(true)) { throw std::runtime_error("spin_until_future_complete() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false);wait_result_.reset();); + RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false);); while (rclcpp::ok(this->context_) && spinning.load()) { // Do one item of work. spin_once_impl(timeout_left); @@ -364,7 +364,7 @@ Executor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive) if (spinning.exchange(true)) { throw std::runtime_error("spin_some() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false);wait_result_.reset();); + RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false);); // clear the wait result and wait for work without blocking to collect the work // for the first time @@ -431,7 +431,7 @@ Executor::spin_once(std::chrono::nanoseconds timeout) if (spinning.exchange(true)) { throw std::runtime_error("spin_once() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false);wait_result_.reset();); + RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false);); spin_once_impl(timeout); } diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index e742faf99c..130b4d953f 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -55,7 +55,7 @@ MultiThreadedExecutor::spin() if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false);wait_result_.reset();); + RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false);); std::vector threads; size_t thread_id = 0; { diff --git a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp index f4bbe654cf..689bbae398 100644 --- a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp @@ -30,7 +30,7 @@ SingleThreadedExecutor::spin() if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); wait_result_.reset();); + RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false);); // Clear any previous result and rebuild the waitset this->wait_result_.reset(); From 815de21632e27a94c33c89df25e5d89e1ad26d6b Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Wed, 3 Jul 2024 16:14:14 +0800 Subject: [PATCH 4/5] Update test code Signed-off-by: Barry Xu --- rclcpp/test/rclcpp/test_executor.cpp | 48 +++++++++++++++++++++++++--- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/rclcpp/test/rclcpp/test_executor.cpp b/rclcpp/test/rclcpp/test_executor.cpp index a3554be131..e934847f0e 100644 --- a/rclcpp/test/rclcpp/test_executor.cpp +++ b/rclcpp/test/rclcpp/test_executor.cpp @@ -21,6 +21,7 @@ #include "rclcpp/executor.hpp" #include "rclcpp/memory_strategy.hpp" +#include "rclcpp/executors/multi_threaded_executor.hpp" #include "rclcpp/executors/single_threaded_executor.hpp" #include "rclcpp/strategies/allocator_memory_strategy.hpp" #include "test_msgs/srv/empty.hpp" @@ -510,24 +511,61 @@ TEST_F(TestExecutor, is_spinning) { ASSERT_TRUE(timer_called); } -TEST_F(TestExecutor, release_ownership_entity_after_spinning_cancel) { +class TestAllThreadedExecutor + : public ::testing::Test, public ::testing::WithParamInterface +{ +public: + void SetUp() override + { + rclcpp::init(0, nullptr); + } + + void TearDown() override + { + rclcpp::shutdown(); + } +}; + +TEST_P(TestAllThreadedExecutor, release_ownership_entity_after_spinning_cancel) { using namespace std::chrono_literals; + const std::string single_threaded_executor( + typeid(rclcpp::executors::SingleThreadedExecutor).name()); + const std::string multi_threaded_executor( + typeid(rclcpp::executors::MultiThreadedExecutor).name()); // Create an Executor - rclcpp::executors::SingleThreadedExecutor executor; + auto type_name = GetParam(); + std::shared_ptr executor; + if (single_threaded_executor == type_name) { + executor = std::make_shared(); + } else if (multi_threaded_executor == type_name) { + executor = std::make_shared(); + } else { + FAIL() << "Unsupported Executor Type !"; + } - auto future = std::async(std::launch::async, [&executor] {executor.spin();}); + auto future = std::async(std::launch::async, [&executor] {executor->spin();}); auto node = std::make_shared("test_node"); auto callback = []( const test_msgs::srv::Empty::Request::SharedPtr, test_msgs::srv::Empty::Response::SharedPtr) { }; auto server = node->create_service("test_service", callback); - executor.add_node(node); + while (!executor->is_spinning()) { + std::this_thread::sleep_for(50ms); + } + executor->add_node(node); std::this_thread::sleep_for(50ms); - executor.cancel(); + executor->cancel(); std::future_status future_status = future.wait_for(1s); EXPECT_EQ(future_status, std::future_status::ready); EXPECT_EQ(server.use_count(), 1); } + +INSTANTIATE_TEST_SUITE_P( + TestAllThreadedExecutorWithParam, + TestAllThreadedExecutor, + ::testing::Values( + std::string(typeid(rclcpp::executors::SingleThreadedExecutor).name()), + std::string(typeid(rclcpp::executors::MultiThreadedExecutor).name()))); From cc70aad3083691211705a2acd35d2db8d35a6986 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Thu, 4 Jul 2024 13:10:54 +0800 Subject: [PATCH 5/5] Move test code to test_executors.cpp Signed-off-by: Barry Xu --- .../test/rclcpp/executors/test_executors.cpp | 25 ++++++++ rclcpp/test/rclcpp/test_executor.cpp | 61 ------------------- 2 files changed, 25 insertions(+), 61 deletions(-) diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 5aae03c5bc..a7129d4488 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -39,6 +39,7 @@ #include "rclcpp/time_source.hpp" #include "test_msgs/msg/empty.hpp" +#include "test_msgs/srv/empty.hpp" #include "./executor_types.hpp" #include "./test_waitable.hpp" @@ -831,3 +832,27 @@ TEST(TestExecutors, testSpinWithNonDefaultContext) rclcpp::shutdown(non_default_context); } + +TYPED_TEST(TestExecutors, release_ownership_entity_after_spinning_cancel) +{ + using ExecutorType = TypeParam; + ExecutorType executor; + + auto future = std::async(std::launch::async, [&executor] {executor.spin();}); + + auto node = std::make_shared("test_node"); + auto callback = []( + const test_msgs::srv::Empty::Request::SharedPtr, test_msgs::srv::Empty::Response::SharedPtr) { + }; + auto server = node->create_service("test_service", callback); + while (!executor.is_spinning()) { + std::this_thread::sleep_for(50ms); + } + executor.add_node(node); + std::this_thread::sleep_for(50ms); + executor.cancel(); + std::future_status future_status = future.wait_for(1s); + EXPECT_EQ(future_status, std::future_status::ready); + + EXPECT_EQ(server.use_count(), 1); +} diff --git a/rclcpp/test/rclcpp/test_executor.cpp b/rclcpp/test/rclcpp/test_executor.cpp index e934847f0e..668ab96797 100644 --- a/rclcpp/test/rclcpp/test_executor.cpp +++ b/rclcpp/test/rclcpp/test_executor.cpp @@ -21,10 +21,8 @@ #include "rclcpp/executor.hpp" #include "rclcpp/memory_strategy.hpp" -#include "rclcpp/executors/multi_threaded_executor.hpp" #include "rclcpp/executors/single_threaded_executor.hpp" #include "rclcpp/strategies/allocator_memory_strategy.hpp" -#include "test_msgs/srv/empty.hpp" #include "../mocking_utils/patch.hpp" #include "../utils/rclcpp_gtest_macros.hpp" @@ -510,62 +508,3 @@ TEST_F(TestExecutor, is_spinning) { ASSERT_TRUE(timer_called); } - -class TestAllThreadedExecutor - : public ::testing::Test, public ::testing::WithParamInterface -{ -public: - void SetUp() override - { - rclcpp::init(0, nullptr); - } - - void TearDown() override - { - rclcpp::shutdown(); - } -}; - -TEST_P(TestAllThreadedExecutor, release_ownership_entity_after_spinning_cancel) { - using namespace std::chrono_literals; - const std::string single_threaded_executor( - typeid(rclcpp::executors::SingleThreadedExecutor).name()); - const std::string multi_threaded_executor( - typeid(rclcpp::executors::MultiThreadedExecutor).name()); - - // Create an Executor - auto type_name = GetParam(); - std::shared_ptr executor; - if (single_threaded_executor == type_name) { - executor = std::make_shared(); - } else if (multi_threaded_executor == type_name) { - executor = std::make_shared(); - } else { - FAIL() << "Unsupported Executor Type !"; - } - - auto future = std::async(std::launch::async, [&executor] {executor->spin();}); - - auto node = std::make_shared("test_node"); - auto callback = []( - const test_msgs::srv::Empty::Request::SharedPtr, test_msgs::srv::Empty::Response::SharedPtr) { - }; - auto server = node->create_service("test_service", callback); - while (!executor->is_spinning()) { - std::this_thread::sleep_for(50ms); - } - executor->add_node(node); - std::this_thread::sleep_for(50ms); - executor->cancel(); - std::future_status future_status = future.wait_for(1s); - EXPECT_EQ(future_status, std::future_status::ready); - - EXPECT_EQ(server.use_count(), 1); -} - -INSTANTIATE_TEST_SUITE_P( - TestAllThreadedExecutorWithParam, - TestAllThreadedExecutor, - ::testing::Values( - std::string(typeid(rclcpp::executors::SingleThreadedExecutor).name()), - std::string(typeid(rclcpp::executors::MultiThreadedExecutor).name())));