Skip to content

Commit

Permalink
correctly count number of started multi-threaded spinners
Browse files Browse the repository at this point in the history
fixes unittest Spinners.async
  • Loading branch information
rhaschke committed Sep 9, 2016
1 parent e7f8c4b commit 0dc06b2
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 15 deletions.
50 changes: 37 additions & 13 deletions clients/roscpp/src/libros/spinner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,20 @@ namespace {
* If the spinner is multi-threaded, the stored thread-id is NULL and future SingleThreadedSpinners
* should not spin this queue.
*/
struct SpinnerMonitor {
struct SpinnerMonitor
{
/* store spinner information per callback queue:
Only as unique single-threaded spinner is allowed and we store its thread id.
As multiple multi-threaded spinners are allowed in parallel, their number is counted.
*/
struct Entry
{
Entry(const boost::thread::id &tid) : tid(tid) {}

boost::thread::id tid; // thread id of single-threaded spinner
unsigned int num_multi_threaded; // number of multi-threaded spinners
};

/// add a queue to the list
bool add(ros::CallbackQueue* queue, bool single_threaded)
{
Expand All @@ -57,33 +70,43 @@ struct SpinnerMonitor {
if (single_threaded)
tid = boost::this_thread::get_id();

std::map<ros::CallbackQueue*, boost::thread::id>::iterator it = spinning_queues_.find(queue);
bool can_spin = ( it == spinning_queues_.end() || it->second == tid );
std::map<ros::CallbackQueue*, Entry>::iterator it = spinning_queues_.find(queue);
bool can_spin = ( it == spinning_queues_.end() || it->second.tid == tid );
if (!can_spin)
return false;

if (it == spinning_queues_.end())
spinning_queues_.insert(it, std::make_pair(queue, tid));
else if (single_threaded)
it->second = tid; // "upgrade" thread id to "single-threaded"
spinning_queues_.insert(it, std::make_pair(queue, Entry(tid)));

if (!single_threaded) // increment number of multi-threaded spinners
it->second.num_multi_threaded += 1;

return true;
}

/// remove a queue from the list
void remove(ros::CallbackQueue* queue)
void remove(ros::CallbackQueue* queue, bool single_threaded)
{
boost::mutex::scoped_lock lock(mutex_);
std::map<ros::CallbackQueue*, boost::thread::id>::const_iterator it = spinning_queues_.find(queue);
std::map<ros::CallbackQueue*, Entry>::iterator it = spinning_queues_.find(queue);
if (it != spinning_queues_.end())
{
if (it->second != boost::thread::id() && it->second != boost::this_thread::get_id())
if (it->second.tid != boost::thread::id() && it->second.tid != boost::this_thread::get_id())
ROS_ERROR("SpinnerMonitor::remove() called from different thread than add().");
spinning_queues_.erase(it);

if (single_threaded)
spinning_queues_.erase(it);
else
{
ROS_ASSERT(it->second.num_multi_threaded > 0);
it->second.num_multi_threaded -= 1;
if (it->second.num_multi_threaded == 0)
spinning_queues_.erase(it);
}
}
}

std::map<ros::CallbackQueue*, boost::thread::id> spinning_queues_;
std::map<ros::CallbackQueue*, Entry> spinning_queues_;
boost::mutex mutex_;
};

Expand Down Expand Up @@ -116,7 +139,7 @@ void SingleThreadedSpinner::spin(CallbackQueue* queue)
{
queue->callAvailable(timeout);
}
spinner_monitor.remove(queue);
spinner_monitor.remove(queue, true);
}

MultiThreadedSpinner::MultiThreadedSpinner(uint32_t thread_count)
Expand All @@ -130,6 +153,7 @@ void MultiThreadedSpinner::spin(CallbackQueue* queue)
s.start();

ros::waitForShutdown();
s.stop();
}

class AsyncSpinnerImpl
Expand Down Expand Up @@ -217,7 +241,7 @@ void AsyncSpinnerImpl::stop()
continue_ = false;
threads_.join_all();

spinner_monitor.remove(callback_queue_);
spinner_monitor.remove(callback_queue_, false);
}

void AsyncSpinnerImpl::threadFunc()
Expand Down
2 changes: 0 additions & 2 deletions test/test_roscpp/test/src/spinners.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,10 @@ TEST(Spinners, async)
AsyncSpinner as1(2);
as1.start();

#if 0
// running another AsyncSpinner is allowed
AsyncSpinner as2(2);
as2.start();
as2.stop();
#endif

SingleThreadedSpinner ss;
EXPECT_THROW(ros::spin(ss), std::runtime_error);
Expand Down

0 comments on commit 0dc06b2

Please sign in to comment.