Skip to content

Commit

Permalink
fix multi-threaded spinning (#867)
Browse files Browse the repository at this point in the history
* replace global spinmutex with SpinnerMonitor

Using a single recursive mutex disables to run several spinners in
parallel (started from different threads) - even if they operate on
different callback queues.

The SpinnerMonitor keeps a list of spinning callback queues, thus
making the monitoring local to callback queues.

* always start spinning, regardless of error

* making the error fatal

* fixup! replace global spinmutex with SpinnerMonitor

* activate unittest for spinners

- moved test/test_roscpp/test/test_spinners.cpp -> test/test_roscpp/test/src/spinners.cpp
- created rostest test/test_roscpp/test/launch/spinners.xml
- use thrown exception to evaluate error conditions

* throw a std::run_time exception when spinner couldn't be started

This correctly indicates the fatality of the error but allows for graceful quitting too.

* correctly count number of started multi-threaded spinners

fixes unittest Spinners.async

* addressed comments

* bug fix: it wasn't reassigned correctly

* removed redundant line of code

* turn sanity check into assertion

* turn error into warning

* rollback strict error checking for backwards compatibility

Previously, we could have the following situations:

1. `S1` (single-threaded spinner started in `thread 1`): will block `thread 1` until shutdown. Any further spinners in different threads were not allowed (with an error message).
2. `M1` (multi-threaded spinner started in `thread 1`): Further spinners started from _different_ threads were not allowed (with an error message).
3. `M1 ... M1 S1` (multi-threaded spinners started in `thread 1` and afterwards a single-threaded one started): This was accepted without any errors. But the new behavior is to reject `S1`!

Restrictions of case 1 + 2 are relaxed with this PR: Other spinners are allowed as long as the operate on a different queue. Thread doesn't matter.

The tricky part is case 3, which - although nonsense - was perfectly valid code before.
In order to maintain the old behavior, I need to remember, which thread the first M-spinner was started in, using the new variable `initial_tid`.

* allow spinning of a single-threaded spinner after some multi-threaded ones, as long as they are started from the same thread
* don't throw exceptions
* disabled corresponding unittests

* code simplification + improved comments

Allow multiple single-threaded spinners (in same thread)
and count their number.
Thus single-threaded and multi-threaded spinners are handled similarly.
  • Loading branch information
rhaschke authored and dirk-thomas committed Sep 17, 2016
1 parent 13008b9 commit cd255f8
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 95 deletions.
8 changes: 6 additions & 2 deletions clients/roscpp/include/ros/spinner.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,13 @@ class ROSCPP_DECL AsyncSpinner


/**
* \brief Check if the spinner can be started. A spinner can't be started if
* another spinner is already running.
* \brief Check if the spinner can be started. The spinner shouldn't be started if
* another single-threaded spinner is already operating on the callback queue.
*
* This function is not necessary anymore. start() will always try to start spinning
* and throw a std::runtime_error if it failed.
*/
// TODO: deprecate in L-turtle
bool canStart();
/**
* \brief Start this spinner spinning asynchronously
Expand Down
154 changes: 118 additions & 36 deletions clients/roscpp/src/libros/spinner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,111 @@
#include "ros/callback_queue.h"

#include <boost/thread/thread.hpp>
#include <boost/thread/recursive_mutex.hpp>
#include <boost/thread/mutex.hpp>

namespace {
boost::recursive_mutex spinmutex;

const std::string DEFAULT_ERROR_MESSAGE =
"\nAttempt to spin a callback queue from two spinners, one of them being single-threaded."
"\nThis will probably result in callbacks being executed out-of-order."
"\nIn future this will throw an exception!";

/** class to monitor running single-threaded spinners.
*
* Calling the callbacks of a callback queue _in order_, requires a unique SingleThreadedSpinner
* spinning on the queue. Other threads accessing the callback queue will probably intercept execution order.
* To avoid multiple SingleThreadedSpinners (started from different threads) to operate on the same callback queue,
* this class stores a map of all spinned callback queues.
* If the spinner is single threaded, the corresponding thread-id is stored in the map
* and if other threads will try to spin the same queue, an error message is issued.
*
* If the spinner is multi-threaded, the stored thread-id is NULL and future SingleThreadedSpinners
* should not spin this queue. However, other multi-threaded spinners are allowed.
*/
struct SpinnerMonitor
{
/* store spinner information per callback queue:
Only alike spinners (single-threaded or multi-threaded) are allowed on a callback queue.
For single-threaded spinners we store their thread id.
We store the number of alike spinners operating on the callback queue.
*/
struct Entry
{
Entry(const boost::thread::id &tid,
const boost::thread::id &initial_tid) : tid(tid), initial_tid(initial_tid), num(0) {}

boost::thread::id tid; // proper thread id of single-threaded spinner
boost::thread::id initial_tid; // to retain old behaviour, store first spinner's thread id
unsigned int num; // number of (alike) spinners serving this queue
};

/// add a queue to the list
bool add(ros::CallbackQueue* queue, bool single_threaded)
{
boost::mutex::scoped_lock lock(mutex_);

boost::thread::id current_tid = boost::this_thread::get_id();
boost::thread::id tid; // current thread id for single-threaded spinners, zero for multi-threaded ones
if (single_threaded)
tid = current_tid;

std::map<ros::CallbackQueue*, Entry>::iterator it = spinning_queues_.find(queue);
bool can_spin = ( it == spinning_queues_.end() || // we will spin on any new queue
it->second.tid == tid ); // otherwise spinner must be alike (all multi-threaded: 0, or single-threaded on same thread id)

if (!can_spin)
{
// Previous behavior (up to Kinetic) was to accept multiple spinners on a queue
// as long as they were started from the same thread. Although this is wrong behavior,
// we retain it here for backwards compatibility, i.e. we allow spinning of a
// single-threaded spinner after several multi-threaded ones, given that they
// were started from the same initial thread
if (it->second.initial_tid == tid)
{
ROS_ERROR_STREAM("SpinnerMonitor: single-threaded spinner after multi-threaded one(s)."
<< DEFAULT_ERROR_MESSAGE
<< " Only allowed for backwards compatibility.");
it->second.tid = tid; // "upgrade" tid to represent single-threaded spinner
}
else
return false;
}

if (it == spinning_queues_.end())
it = spinning_queues_.insert(it, std::make_pair(queue, Entry(tid, current_tid)));

// increment number of active spinners
it->second.num += 1;

return true;
}

/// remove a queue from the list
void remove(ros::CallbackQueue* queue)
{
boost::mutex::scoped_lock lock(mutex_);
std::map<ros::CallbackQueue*, Entry>::iterator it = spinning_queues_.find(queue);
ROS_ASSERT_MSG(it != spinning_queues_.end(), "Call to SpinnerMonitor::remove() without matching call to add().");

if (it->second.tid != boost::thread::id() && it->second.tid != boost::this_thread::get_id())
{
// This doesn't harm, but isn't good practice?
// It was enforced by the previous implementation.
ROS_WARN("SpinnerMonitor::remove() called from different thread than add().");
}

ROS_ASSERT_MSG(it->second.num > 0, "SpinnerMonitor::remove(): Invalid spinner count (0) encountered.");
it->second.num -= 1;
if (it->second.num == 0)
spinning_queues_.erase(it); // erase queue entry to allow future queues with same pointer
}

std::map<ros::CallbackQueue*, Entry> spinning_queues_;
boost::mutex mutex_;
};

SpinnerMonitor spinner_monitor;
}

namespace ros
Expand All @@ -42,25 +143,24 @@ namespace ros

void SingleThreadedSpinner::spin(CallbackQueue* queue)
{
boost::recursive_mutex::scoped_try_lock spinlock(spinmutex);
if(!spinlock.owns_lock()) {
ROS_ERROR("SingleThreadedSpinner: You've attempted to call spin "
"from multiple threads. Use a MultiThreadedSpinner instead.");
return;
}

ros::WallDuration timeout(0.1f);

if (!queue)
{
queue = getGlobalCallbackQueue();
}

if (!spinner_monitor.add(queue, true))
{
ROS_ERROR_STREAM("SingleThreadedSpinner: " << DEFAULT_ERROR_MESSAGE);
return;
}

ros::WallDuration timeout(0.1f);
ros::NodeHandle n;
while (n.ok())
{
queue->callAvailable(timeout);
}
spinner_monitor.remove(queue);
}

MultiThreadedSpinner::MultiThreadedSpinner(uint32_t thread_count)
Expand All @@ -70,13 +170,6 @@ MultiThreadedSpinner::MultiThreadedSpinner(uint32_t thread_count)

void MultiThreadedSpinner::spin(CallbackQueue* queue)
{
boost::recursive_mutex::scoped_try_lock spinlock(spinmutex);
if (!spinlock.owns_lock()) {
ROS_ERROR("MultiThreadeSpinner: You've attempted to call ros::spin "
"from multiple threads... "
"but this spinner is already multithreaded.");
return;
}
AsyncSpinner s(thread_count_, queue);
s.start();

Expand All @@ -97,7 +190,6 @@ class AsyncSpinnerImpl
void threadFunc();

boost::mutex mutex_;
boost::recursive_mutex::scoped_try_lock member_spinlock;
boost::thread_group threads_;

uint32_t thread_count_;
Expand Down Expand Up @@ -136,27 +228,21 @@ AsyncSpinnerImpl::~AsyncSpinnerImpl()

bool AsyncSpinnerImpl::canStart()
{
boost::recursive_mutex::scoped_try_lock spinlock(spinmutex);
return spinlock.owns_lock();
return true;
}

void AsyncSpinnerImpl::start()
{
boost::mutex::scoped_lock lock(mutex_);

if (continue_)
return;
return; // already spinning

boost::recursive_mutex::scoped_try_lock spinlock(spinmutex);
if (!spinlock.owns_lock()) {
ROS_WARN("AsyncSpinnerImpl: Attempt to start() an AsyncSpinner failed "
"because another AsyncSpinner is already running. Note that the "
"other AsyncSpinner might not be using the same callback queue "
"as this AsyncSpinner, in which case no callbacks in your "
"callback queue will be serviced.");
if (!spinner_monitor.add(callback_queue_, false))
{
ROS_ERROR_STREAM("AsyncSpinnerImpl: " << DEFAULT_ERROR_MESSAGE);
return;
}
spinlock.swap(member_spinlock);

continue_ = true;

Expand All @@ -172,14 +258,10 @@ void AsyncSpinnerImpl::stop()
if (!continue_)
return;

ROS_ASSERT_MSG(member_spinlock.owns_lock(),
"Async spinner's member lock doesn't own the global spinlock, hrm.");
ROS_ASSERT_MSG(member_spinlock.mutex() == &spinmutex,
"Async spinner's member lock owns a lock on the wrong mutex?!?!?");
member_spinlock.unlock();

continue_ = false;
threads_.join_all();

spinner_monitor.remove(callback_queue_);
}

void AsyncSpinnerImpl::threadFunc()
Expand Down
18 changes: 3 additions & 15 deletions test/test_roscpp/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,6 @@ if(TARGET ${PROJECT_NAME}-test_version)
target_link_libraries(${PROJECT_NAME}-test_version)
endif()

# WARNING test_spinners is not actually run. This is because this
# infrastructure won't let me pass arguments from here through to
# these test units. otherwise one would have to build ten executables.
# also the output of test_spinners has to be visually inspected
# because there is no automatic way to ensure that an error condition
# provokes a particular message.
if(GTEST_FOUND)
include_directories(${GTEST_INCLUDE_DIRS})
add_executable(${PROJECT_NAME}-test_spinners EXCLUDE_FROM_ALL test_spinners.cpp)
add_dependencies(tests ${PROJECT_NAME}-test_spinners)
endif()
if(TARGET ${PROJECT_NAME}-test_spinners)
target_link_libraries(${PROJECT_NAME}-test_spinners ${catkin_LIBRARIES} ${GTEST_LIBRARIES})
endif()

catkin_add_gtest(${PROJECT_NAME}-test_header test_header.cpp)
if(TARGET ${PROJECT_NAME}-test_header)
target_link_libraries(${PROJECT_NAME}-test_header ${catkin_LIBRARIES})
Expand Down Expand Up @@ -161,3 +146,6 @@ add_rostest(launch/ns_node_remapping.xml)
add_rostest(launch/search_param.xml)

add_rostest(launch/stamped_topic_statistics_with_empty_timestamp.xml)

# Test spinners
add_rostest(launch/spinners.xml)
4 changes: 4 additions & 0 deletions test/test_roscpp/test/launch/spinners.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<launch>
<test pkg="test_roscpp" type="test_roscpp-spinners" test-name="Spinners_spin" args="--gtest_filter=Spinners.spin"/>
<test pkg="test_roscpp" type="test_roscpp-spinners" test-name="Spinners_multi" args="--gtest_filter=Spinners.multi"/>
</launch>
6 changes: 6 additions & 0 deletions test/test_roscpp/test/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ target_link_libraries(${PROJECT_NAME}-namespaces ${GTEST_LIBRARIES} ${catkin_LIB
add_executable(${PROJECT_NAME}-params EXCLUDE_FROM_ALL params.cpp)
target_link_libraries(${PROJECT_NAME}-params ${GTEST_LIBRARIES} ${catkin_LIBRARIES})

# Test spinners
add_executable(${PROJECT_NAME}-spinners EXCLUDE_FROM_ALL spinners.cpp)
target_link_libraries(${PROJECT_NAME}-spinners ${GTEST_LIBRARIES} ${catkin_LIBRARIES})

# Test getting information from the master
add_executable(${PROJECT_NAME}-get_master_information EXCLUDE_FROM_ALL get_master_information.cpp)
target_link_libraries(${PROJECT_NAME}-get_master_information ${GTEST_LIBRARIES} ${catkin_LIBRARIES})
Expand Down Expand Up @@ -251,6 +255,7 @@ if(TARGET tests)
${PROJECT_NAME}-name_remapping_with_ns
${PROJECT_NAME}-namespaces
${PROJECT_NAME}-params
${PROJECT_NAME}-spinners
${PROJECT_NAME}-get_master_information
${PROJECT_NAME}-multiple_subscriptions
${PROJECT_NAME}-check_master
Expand Down Expand Up @@ -316,6 +321,7 @@ add_dependencies(${PROJECT_NAME}-name_remapping ${${PROJECT_NAME}_EXPORTED_TARGE
add_dependencies(${PROJECT_NAME}-name_remapping_with_ns ${${PROJECT_NAME}_EXPORTED_TARGETS})
add_dependencies(${PROJECT_NAME}-namespaces ${${PROJECT_NAME}_EXPORTED_TARGETS})
add_dependencies(${PROJECT_NAME}-params ${${PROJECT_NAME}_EXPORTED_TARGETS})
add_dependencies(${PROJECT_NAME}-spinners ${${PROJECT_NAME}_EXPORTED_TARGETS})
add_dependencies(${PROJECT_NAME}-get_master_information ${${PROJECT_NAME}_EXPORTED_TARGETS})
add_dependencies(${PROJECT_NAME}-multiple_subscriptions ${${PROJECT_NAME}_EXPORTED_TARGETS})
add_dependencies(${PROJECT_NAME}-check_master ${${PROJECT_NAME}_EXPORTED_TARGETS})
Expand Down
Loading

0 comments on commit cd255f8

Please sign in to comment.