From cd255f833b561cfcf7f54ec06174d9e5b8f4a2f7 Mon Sep 17 00:00:00 2001 From: Robert Haschke Date: Sat, 17 Sep 2016 16:52:28 +0200 Subject: [PATCH] fix multi-threaded spinning (#867) * replace global spinmutex with SpinnerMonitor Using a single recursive mutex disables to run several spinners in parallel (started from different threads) - even if they operate on different callback queues. The SpinnerMonitor keeps a list of spinning callback queues, thus making the monitoring local to callback queues. * always start spinning, regardless of error * making the error fatal * fixup! replace global spinmutex with SpinnerMonitor * activate unittest for spinners - moved test/test_roscpp/test/test_spinners.cpp -> test/test_roscpp/test/src/spinners.cpp - created rostest test/test_roscpp/test/launch/spinners.xml - use thrown exception to evaluate error conditions * throw a std::run_time exception when spinner couldn't be started This correctly indicates the fatality of the error but allows for graceful quitting too. * correctly count number of started multi-threaded spinners fixes unittest Spinners.async * addressed comments * bug fix: it wasn't reassigned correctly * removed redundant line of code * turn sanity check into assertion * turn error into warning * rollback strict error checking for backwards compatibility Previously, we could have the following situations: 1. `S1` (single-threaded spinner started in `thread 1`): will block `thread 1` until shutdown. Any further spinners in different threads were not allowed (with an error message). 2. `M1` (multi-threaded spinner started in `thread 1`): Further spinners started from _different_ threads were not allowed (with an error message). 3. `M1 ... M1 S1` (multi-threaded spinners started in `thread 1` and afterwards a single-threaded one started): This was accepted without any errors. But the new behavior is to reject `S1`! Restrictions of case 1 + 2 are relaxed with this PR: Other spinners are allowed as long as the operate on a different queue. Thread doesn't matter. The tricky part is case 3, which - although nonsense - was perfectly valid code before. In order to maintain the old behavior, I need to remember, which thread the first M-spinner was started in, using the new variable `initial_tid`. * allow spinning of a single-threaded spinner after some multi-threaded ones, as long as they are started from the same thread * don't throw exceptions * disabled corresponding unittests * code simplification + improved comments Allow multiple single-threaded spinners (in same thread) and count their number. Thus single-threaded and multi-threaded spinners are handled similarly. --- clients/roscpp/include/ros/spinner.h | 8 +- clients/roscpp/src/libros/spinner.cpp | 154 ++++++++++++++---- test/test_roscpp/test/CMakeLists.txt | 18 +- test/test_roscpp/test/launch/spinners.xml | 4 + test/test_roscpp/test/src/CMakeLists.txt | 6 + .../{test_spinners.cpp => src/spinners.cpp} | 102 +++++++----- 6 files changed, 197 insertions(+), 95 deletions(-) create mode 100644 test/test_roscpp/test/launch/spinners.xml rename test/test_roscpp/test/{test_spinners.cpp => src/spinners.cpp} (63%) diff --git a/clients/roscpp/include/ros/spinner.h b/clients/roscpp/include/ros/spinner.h index 597204942e..d09f0a090e 100644 --- a/clients/roscpp/include/ros/spinner.h +++ b/clients/roscpp/include/ros/spinner.h @@ -109,9 +109,13 @@ class ROSCPP_DECL AsyncSpinner /** - * \brief Check if the spinner can be started. A spinner can't be started if - * another spinner is already running. + * \brief Check if the spinner can be started. The spinner shouldn't be started if + * another single-threaded spinner is already operating on the callback queue. + * + * This function is not necessary anymore. start() will always try to start spinning + * and throw a std::runtime_error if it failed. */ + // TODO: deprecate in L-turtle bool canStart(); /** * \brief Start this spinner spinning asynchronously diff --git a/clients/roscpp/src/libros/spinner.cpp b/clients/roscpp/src/libros/spinner.cpp index 9981aaa68d..4bae89a236 100644 --- a/clients/roscpp/src/libros/spinner.cpp +++ b/clients/roscpp/src/libros/spinner.cpp @@ -30,10 +30,111 @@ #include "ros/callback_queue.h" #include -#include +#include namespace { - boost::recursive_mutex spinmutex; + +const std::string DEFAULT_ERROR_MESSAGE = + "\nAttempt to spin a callback queue from two spinners, one of them being single-threaded." + "\nThis will probably result in callbacks being executed out-of-order." + "\nIn future this will throw an exception!"; + +/** class to monitor running single-threaded spinners. + * + * Calling the callbacks of a callback queue _in order_, requires a unique SingleThreadedSpinner + * spinning on the queue. Other threads accessing the callback queue will probably intercept execution order. + + * To avoid multiple SingleThreadedSpinners (started from different threads) to operate on the same callback queue, + * this class stores a map of all spinned callback queues. + * If the spinner is single threaded, the corresponding thread-id is stored in the map + * and if other threads will try to spin the same queue, an error message is issued. + * + * If the spinner is multi-threaded, the stored thread-id is NULL and future SingleThreadedSpinners + * should not spin this queue. However, other multi-threaded spinners are allowed. + */ +struct SpinnerMonitor +{ + /* store spinner information per callback queue: + Only alike spinners (single-threaded or multi-threaded) are allowed on a callback queue. + For single-threaded spinners we store their thread id. + We store the number of alike spinners operating on the callback queue. + */ + struct Entry + { + Entry(const boost::thread::id &tid, + const boost::thread::id &initial_tid) : tid(tid), initial_tid(initial_tid), num(0) {} + + boost::thread::id tid; // proper thread id of single-threaded spinner + boost::thread::id initial_tid; // to retain old behaviour, store first spinner's thread id + unsigned int num; // number of (alike) spinners serving this queue + }; + + /// add a queue to the list + bool add(ros::CallbackQueue* queue, bool single_threaded) + { + boost::mutex::scoped_lock lock(mutex_); + + boost::thread::id current_tid = boost::this_thread::get_id(); + boost::thread::id tid; // current thread id for single-threaded spinners, zero for multi-threaded ones + if (single_threaded) + tid = current_tid; + + std::map::iterator it = spinning_queues_.find(queue); + bool can_spin = ( it == spinning_queues_.end() || // we will spin on any new queue + it->second.tid == tid ); // otherwise spinner must be alike (all multi-threaded: 0, or single-threaded on same thread id) + + if (!can_spin) + { + // Previous behavior (up to Kinetic) was to accept multiple spinners on a queue + // as long as they were started from the same thread. Although this is wrong behavior, + // we retain it here for backwards compatibility, i.e. we allow spinning of a + // single-threaded spinner after several multi-threaded ones, given that they + // were started from the same initial thread + if (it->second.initial_tid == tid) + { + ROS_ERROR_STREAM("SpinnerMonitor: single-threaded spinner after multi-threaded one(s)." + << DEFAULT_ERROR_MESSAGE + << " Only allowed for backwards compatibility."); + it->second.tid = tid; // "upgrade" tid to represent single-threaded spinner + } + else + return false; + } + + if (it == spinning_queues_.end()) + it = spinning_queues_.insert(it, std::make_pair(queue, Entry(tid, current_tid))); + + // increment number of active spinners + it->second.num += 1; + + return true; + } + + /// remove a queue from the list + void remove(ros::CallbackQueue* queue) + { + boost::mutex::scoped_lock lock(mutex_); + std::map::iterator it = spinning_queues_.find(queue); + ROS_ASSERT_MSG(it != spinning_queues_.end(), "Call to SpinnerMonitor::remove() without matching call to add()."); + + if (it->second.tid != boost::thread::id() && it->second.tid != boost::this_thread::get_id()) + { + // This doesn't harm, but isn't good practice? + // It was enforced by the previous implementation. + ROS_WARN("SpinnerMonitor::remove() called from different thread than add()."); + } + + ROS_ASSERT_MSG(it->second.num > 0, "SpinnerMonitor::remove(): Invalid spinner count (0) encountered."); + it->second.num -= 1; + if (it->second.num == 0) + spinning_queues_.erase(it); // erase queue entry to allow future queues with same pointer + } + + std::map spinning_queues_; + boost::mutex mutex_; +}; + +SpinnerMonitor spinner_monitor; } namespace ros @@ -42,25 +143,24 @@ namespace ros void SingleThreadedSpinner::spin(CallbackQueue* queue) { - boost::recursive_mutex::scoped_try_lock spinlock(spinmutex); - if(!spinlock.owns_lock()) { - ROS_ERROR("SingleThreadedSpinner: You've attempted to call spin " - "from multiple threads. Use a MultiThreadedSpinner instead."); - return; - } - - ros::WallDuration timeout(0.1f); - if (!queue) { queue = getGlobalCallbackQueue(); } + if (!spinner_monitor.add(queue, true)) + { + ROS_ERROR_STREAM("SingleThreadedSpinner: " << DEFAULT_ERROR_MESSAGE); + return; + } + + ros::WallDuration timeout(0.1f); ros::NodeHandle n; while (n.ok()) { queue->callAvailable(timeout); } + spinner_monitor.remove(queue); } MultiThreadedSpinner::MultiThreadedSpinner(uint32_t thread_count) @@ -70,13 +170,6 @@ MultiThreadedSpinner::MultiThreadedSpinner(uint32_t thread_count) void MultiThreadedSpinner::spin(CallbackQueue* queue) { - boost::recursive_mutex::scoped_try_lock spinlock(spinmutex); - if (!spinlock.owns_lock()) { - ROS_ERROR("MultiThreadeSpinner: You've attempted to call ros::spin " - "from multiple threads... " - "but this spinner is already multithreaded."); - return; - } AsyncSpinner s(thread_count_, queue); s.start(); @@ -97,7 +190,6 @@ class AsyncSpinnerImpl void threadFunc(); boost::mutex mutex_; - boost::recursive_mutex::scoped_try_lock member_spinlock; boost::thread_group threads_; uint32_t thread_count_; @@ -136,8 +228,7 @@ AsyncSpinnerImpl::~AsyncSpinnerImpl() bool AsyncSpinnerImpl::canStart() { - boost::recursive_mutex::scoped_try_lock spinlock(spinmutex); - return spinlock.owns_lock(); + return true; } void AsyncSpinnerImpl::start() @@ -145,18 +236,13 @@ void AsyncSpinnerImpl::start() boost::mutex::scoped_lock lock(mutex_); if (continue_) - return; + return; // already spinning - boost::recursive_mutex::scoped_try_lock spinlock(spinmutex); - if (!spinlock.owns_lock()) { - ROS_WARN("AsyncSpinnerImpl: Attempt to start() an AsyncSpinner failed " - "because another AsyncSpinner is already running. Note that the " - "other AsyncSpinner might not be using the same callback queue " - "as this AsyncSpinner, in which case no callbacks in your " - "callback queue will be serviced."); + if (!spinner_monitor.add(callback_queue_, false)) + { + ROS_ERROR_STREAM("AsyncSpinnerImpl: " << DEFAULT_ERROR_MESSAGE); return; } - spinlock.swap(member_spinlock); continue_ = true; @@ -172,14 +258,10 @@ void AsyncSpinnerImpl::stop() if (!continue_) return; - ROS_ASSERT_MSG(member_spinlock.owns_lock(), - "Async spinner's member lock doesn't own the global spinlock, hrm."); - ROS_ASSERT_MSG(member_spinlock.mutex() == &spinmutex, - "Async spinner's member lock owns a lock on the wrong mutex?!?!?"); - member_spinlock.unlock(); - continue_ = false; threads_.join_all(); + + spinner_monitor.remove(callback_queue_); } void AsyncSpinnerImpl::threadFunc() diff --git a/test/test_roscpp/test/CMakeLists.txt b/test/test_roscpp/test/CMakeLists.txt index e510c692b8..7328a59828 100644 --- a/test/test_roscpp/test/CMakeLists.txt +++ b/test/test_roscpp/test/CMakeLists.txt @@ -3,21 +3,6 @@ if(TARGET ${PROJECT_NAME}-test_version) target_link_libraries(${PROJECT_NAME}-test_version) endif() -# WARNING test_spinners is not actually run. This is because this -# infrastructure won't let me pass arguments from here through to -# these test units. otherwise one would have to build ten executables. -# also the output of test_spinners has to be visually inspected -# because there is no automatic way to ensure that an error condition -# provokes a particular message. -if(GTEST_FOUND) - include_directories(${GTEST_INCLUDE_DIRS}) - add_executable(${PROJECT_NAME}-test_spinners EXCLUDE_FROM_ALL test_spinners.cpp) - add_dependencies(tests ${PROJECT_NAME}-test_spinners) -endif() -if(TARGET ${PROJECT_NAME}-test_spinners) - target_link_libraries(${PROJECT_NAME}-test_spinners ${catkin_LIBRARIES} ${GTEST_LIBRARIES}) -endif() - catkin_add_gtest(${PROJECT_NAME}-test_header test_header.cpp) if(TARGET ${PROJECT_NAME}-test_header) target_link_libraries(${PROJECT_NAME}-test_header ${catkin_LIBRARIES}) @@ -161,3 +146,6 @@ add_rostest(launch/ns_node_remapping.xml) add_rostest(launch/search_param.xml) add_rostest(launch/stamped_topic_statistics_with_empty_timestamp.xml) + +# Test spinners +add_rostest(launch/spinners.xml) diff --git a/test/test_roscpp/test/launch/spinners.xml b/test/test_roscpp/test/launch/spinners.xml new file mode 100644 index 0000000000..15650a8b3f --- /dev/null +++ b/test/test_roscpp/test/launch/spinners.xml @@ -0,0 +1,4 @@ + + + + diff --git a/test/test_roscpp/test/src/CMakeLists.txt b/test/test_roscpp/test/src/CMakeLists.txt index 54f086d1e4..332f49be74 100644 --- a/test/test_roscpp/test/src/CMakeLists.txt +++ b/test/test_roscpp/test/src/CMakeLists.txt @@ -125,6 +125,10 @@ target_link_libraries(${PROJECT_NAME}-namespaces ${GTEST_LIBRARIES} ${catkin_LIB add_executable(${PROJECT_NAME}-params EXCLUDE_FROM_ALL params.cpp) target_link_libraries(${PROJECT_NAME}-params ${GTEST_LIBRARIES} ${catkin_LIBRARIES}) +# Test spinners +add_executable(${PROJECT_NAME}-spinners EXCLUDE_FROM_ALL spinners.cpp) +target_link_libraries(${PROJECT_NAME}-spinners ${GTEST_LIBRARIES} ${catkin_LIBRARIES}) + # Test getting information from the master add_executable(${PROJECT_NAME}-get_master_information EXCLUDE_FROM_ALL get_master_information.cpp) target_link_libraries(${PROJECT_NAME}-get_master_information ${GTEST_LIBRARIES} ${catkin_LIBRARIES}) @@ -251,6 +255,7 @@ if(TARGET tests) ${PROJECT_NAME}-name_remapping_with_ns ${PROJECT_NAME}-namespaces ${PROJECT_NAME}-params + ${PROJECT_NAME}-spinners ${PROJECT_NAME}-get_master_information ${PROJECT_NAME}-multiple_subscriptions ${PROJECT_NAME}-check_master @@ -316,6 +321,7 @@ add_dependencies(${PROJECT_NAME}-name_remapping ${${PROJECT_NAME}_EXPORTED_TARGE add_dependencies(${PROJECT_NAME}-name_remapping_with_ns ${${PROJECT_NAME}_EXPORTED_TARGETS}) add_dependencies(${PROJECT_NAME}-namespaces ${${PROJECT_NAME}_EXPORTED_TARGETS}) add_dependencies(${PROJECT_NAME}-params ${${PROJECT_NAME}_EXPORTED_TARGETS}) +add_dependencies(${PROJECT_NAME}-spinners ${${PROJECT_NAME}_EXPORTED_TARGETS}) add_dependencies(${PROJECT_NAME}-get_master_information ${${PROJECT_NAME}_EXPORTED_TARGETS}) add_dependencies(${PROJECT_NAME}-multiple_subscriptions ${${PROJECT_NAME}_EXPORTED_TARGETS}) add_dependencies(${PROJECT_NAME}-check_master ${${PROJECT_NAME}_EXPORTED_TARGETS}) diff --git a/test/test_roscpp/test/test_spinners.cpp b/test/test_roscpp/test/src/spinners.cpp similarity index 63% rename from test/test_roscpp/test/test_spinners.cpp rename to test/test_roscpp/test/src/spinners.cpp index 8645399800..b94ae0d389 100644 --- a/test/test_roscpp/test/test_spinners.cpp +++ b/test/test_roscpp/test/src/spinners.cpp @@ -67,76 +67,96 @@ void fire_shutdown(const ros::WallTimerEvent&) { TEST(Spinners, spin) { DOIT(); - ros::spin(); + ros::spin(); // will block until ROS shutdown } TEST(Spinners, spinfail) { DOIT(); boost::thread th(boost::bind(&ros::spin)); - ros::spin(); -} + ros::WallDuration(0.1).sleep(); // wait for thread to be started -TEST(Spinners, single) -{ - DOIT(); - SingleThreadedSpinner s; - ros::spin(s); + EXPECT_THROW(ros::spin(), std::runtime_error); + + SingleThreadedSpinner ss; + EXPECT_THROW(ros::spin(ss), std::runtime_error); + EXPECT_THROW(ss.spin(), std::runtime_error); + + MultiThreadedSpinner ms; + EXPECT_THROW(ros::spin(ms), std::runtime_error); + EXPECT_THROW(ms.spin(), std::runtime_error); + + AsyncSpinner as(2); + EXPECT_THROW(as.start(), std::runtime_error); + + ros::waitForShutdown(); } TEST(Spinners, singlefail) { DOIT(); - boost::thread th(boost::bind(&ros::spin)); - SingleThreadedSpinner s; - ros::spin(s); -} + SingleThreadedSpinner ss; + boost::thread th(boost::bind(&ros::spin, ss)); + ros::WallDuration(0.1).sleep(); // wait for thread to be started -TEST(Spinners, singlefail2) -{ - DOIT(); - SingleThreadedSpinner s; - boost::thread th(boost::bind(&ros::spin, s)); - ros::spin(s); + EXPECT_THROW(ros::spin(), std::runtime_error); + + SingleThreadedSpinner ss2; + EXPECT_THROW(ros::spin(ss2), std::runtime_error); + EXPECT_THROW(ss2.spin(), std::runtime_error); + + MultiThreadedSpinner ms; + EXPECT_THROW(ros::spin(ms), std::runtime_error); + EXPECT_THROW(ms.spin(), std::runtime_error); + + AsyncSpinner as(2); + EXPECT_THROW(as.start(), std::runtime_error); + + ros::waitForShutdown(); } TEST(Spinners, multi) { DOIT(); - MultiThreadedSpinner s; - ros::spin(s); + MultiThreadedSpinner ms; + ros::spin(ms); // will block until ROS shutdown } TEST(Spinners, multifail) { DOIT(); - boost::thread th(boost::bind(&ros::spin)); - MultiThreadedSpinner s; - ros::spin(s); -} + MultiThreadedSpinner ms; + boost::thread th(boost::bind(&ros::spin, ms)); + ros::WallDuration(0.1).sleep(); // wait for thread to be started -TEST(Spinners, multifail2) -{ - DOIT(); - MultiThreadedSpinner s; - boost::thread th(boost::bind(&ros::spin, s)); - ros::spin(s); + SingleThreadedSpinner ss2; + EXPECT_THROW(ros::spin(ss2), std::runtime_error); + EXPECT_THROW(ss2.spin(), std::runtime_error); + + // running another multi-threaded spinner is allowed + MultiThreadedSpinner ms2; + ros::spin(ms2); // will block until ROS shutdown } TEST(Spinners, async) { DOIT(); - AsyncSpinner s(2); - s.start(); - ros::waitForShutdown(); -} + AsyncSpinner as1(2); + as1.start(); + + // running another AsyncSpinner is allowed + AsyncSpinner as2(2); + as2.start(); + as2.stop(); + + SingleThreadedSpinner ss; + EXPECT_THROW(ros::spin(ss), std::runtime_error); + EXPECT_THROW(ss.spin(), std::runtime_error); + + // running a multi-threaded spinner is allowed + MultiThreadedSpinner ms; + ros::spin(ms); // will block until ROS shutdown -TEST(Spinners, asyncfail) -{ - DOIT(); - boost::thread th(boost::bind(&ros::spin)); - AsyncSpinner s(2); - s.start(); ros::waitForShutdown(); } @@ -149,5 +169,3 @@ main(int argc, char** argv) argv_ = argv; return RUN_ALL_TESTS(); } - -