Skip to content

Commit

Permalink
#1608 revisited: Fix subscription busy wait melodic (#1684)
Browse files Browse the repository at this point in the history
* Fix a busy-wait loop in subscription queue.

* Better fix for slow callbacks CPU throttling.

* More versatile callback queue test.

* Finished cherry-pick merge to melodic-devel.

* libros: moved define

* roscpp: implementet #1608 without ABI/API breaks

* /test_roscpp: fake_message is in a header now...

* test_roscpp: fixed sign-compare warning

* stabilized test

* CallbackQueue: use SteadyTime instead of WallTime to get independent of system-time changes

* style only

* Update clients/roscpp/include/ros/callback_queue.h

Co-authored-by: Johannes Meyer <johannes@intermodalics.eu>

Co-authored-by: Martin Pecka <peci1@seznam.cz>
Co-authored-by: CTU base <robot@ctu-base>
Co-authored-by: Martin Pecka <peckama2@fel.cvut.cz>
Co-authored-by: Christopher Wecht <christopher.wechtstudent.kit.edu>
Co-authored-by: Dirk Thomas <dirk-thomas@users.noreply.github.com>
Co-authored-by: Johannes Meyer <johannes@intermodalics.eu>
  • Loading branch information
6 people authored Jul 31, 2020
1 parent f781def commit 3bca6e3
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 67 deletions.
22 changes: 21 additions & 1 deletion clients/roscpp/src/libros/callback_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ void CallbackQueue::addCallback(const CallbackInterfacePtr& callback, uint64_t r
callbacks_.push_back(info);
}

condition_.notify_one();
if (callback->ready())
{
condition_.notify_one();
}
}

CallbackQueue::IDInfoPtr CallbackQueue::getIDInfo(uint64_t id)
Expand Down Expand Up @@ -225,6 +228,8 @@ CallbackQueue::CallOneResult CallbackQueue::callOne(ros::WallDuration timeout)
return Disabled;
}

ros::SteadyTime start_time(ros::SteadyTime::now());

if (callbacks_.empty())
{
if (!timeout.isZero())
Expand Down Expand Up @@ -266,6 +271,17 @@ CallbackQueue::CallOneResult CallbackQueue::callOne(ros::WallDuration timeout)

if (!cb_info.callback)
{
// do not spend more than `timeout` seconds in the callback; we already waited for some time when waiting for
// nonempty queue
ros::SteadyTime now(ros::SteadyTime::now());
ros::WallDuration time_spent = now - start_time;
ros::WallDuration time_to_wait = timeout - time_spent;

if (time_to_wait.toNSec() > 0)
{
condition_.wait_for(lock, boost::chrono::nanoseconds(time_to_wait.toNSec()));
}

return TryAgain;
}

Expand Down Expand Up @@ -391,6 +407,10 @@ CallbackQueue::CallOneResult CallbackQueue::callOneCB(TLS* tls)
{
tls->cb_it = tls->callbacks.erase(tls->cb_it);
result = cb->call();
if (result == CallbackInterface::Success)
{
condition_.notify_one();
}
}
}

Expand Down
7 changes: 6 additions & 1 deletion clients/roscpp/src/libros/subscription_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,12 @@ CallbackInterface::CallResult SubscriptionQueue::call()

bool SubscriptionQueue::ready()
{
return true;
if (allow_concurrent_callbacks_)
{
return true;
}
boost::recursive_mutex::scoped_try_lock lock(callback_mutex_, boost::try_to_lock);
return lock.owns_lock();
}

bool SubscriptionQueue::full()
Expand Down
91 changes: 91 additions & 0 deletions test/test_roscpp/test/fake_message.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2009, Willow Garage, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Willow Garage, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

/* Author: Josh Faust */

/*
* Subscription queue test helper classes
*/
#ifndef TEST_ROSCPP_FAKE_MESSAGE_H
#define TEST_ROSCPP_FAKE_MESSAGE_H

#include "ros/subscription_callback_helper.h"

class FakeMessage
{
public:
virtual const std::string __getDataType() const { return ""; }
virtual const std::string __getMD5Sum() const { return ""; }
virtual const std::string __getMessageDefinition() const { return ""; }
virtual uint32_t serializationLength() const { return 0; }
virtual uint8_t *serialize(uint8_t *write_ptr, uint32_t seq) const { (void)seq; return write_ptr; }
virtual uint8_t *deserialize(uint8_t *read_ptr) { return read_ptr; }
};

class FakeSubHelper : public ros::SubscriptionCallbackHelper
{
public:
FakeSubHelper()
: calls_(0)
{}

virtual ros::VoidConstPtr deserialize(const ros::SubscriptionCallbackHelperDeserializeParams&)
{
return boost::make_shared<FakeMessage>();
}

virtual std::string getMD5Sum() { return ""; }
virtual std::string getDataType() { return ""; }

virtual void call(ros::SubscriptionCallbackHelperCallParams& params)
{
(void)params;
{
boost::mutex::scoped_lock lock(mutex_);
++calls_;
}

if (cb_)
{
cb_();
}
}

virtual const std::type_info& getTypeInfo() { return typeid(FakeMessage); }
virtual bool isConst() { return true; }
virtual bool hasHeader() { return false; }

boost::mutex mutex_;
uint32_t calls_;

boost::function<void(void)> cb_;
};
typedef boost::shared_ptr<FakeSubHelper> FakeSubHelperPtr;

#endif // TEST_ROSCPP_FAKE_MESSAGE_H
162 changes: 153 additions & 9 deletions test/test_roscpp/test/test_callback_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
#include <gtest/gtest.h>
#include <ros/callback_queue.h>
#include <ros/console.h>
#include <ros/message_deserializer.h>
#include <ros/subscription_queue.h>
#include <ros/subscription_callback_helper.h>
#include <ros/timer.h>

#include <boost/atomic.hpp>
Expand All @@ -44,6 +47,8 @@
#include <boost/thread.hpp>
#include <boost/function.hpp>

#include "fake_message.h"

using namespace ros;

class CountingCallback : public CallbackInterface
Expand Down Expand Up @@ -258,31 +263,51 @@ TEST(CallbackQueue, recursive4)
EXPECT_EQ(cb->count, 3U);
}

void callAvailableThread(CallbackQueue* queue, bool& done)
void callAvailableThread(
CallbackQueue* queue, bool& done, boost::atomic<size_t>* num_calls,
ros::WallDuration call_timeout = ros::WallDuration(0.1))
{
size_t i = 0;
while (!done)
{
queue->callAvailable(ros::WallDuration(0.1));
queue->callAvailable(call_timeout);
++i;
}

if (num_calls)
{
num_calls->fetch_add(i);
}
}

size_t runThreadedTest(const CountingCallbackPtr& cb, const boost::function<void(CallbackQueue*, bool&)>& threadFunc)
size_t runThreadedTest(
const CallbackInterfacePtr& cb,
const boost::function<void(CallbackQueue*, bool&, boost::atomic<size_t>*, ros::WallDuration)>& threadFunc,
size_t* num_calls = NULL, size_t num_threads = 10,
ros::WallDuration duration = ros::WallDuration(5),
ros::WallDuration pause_between_callbacks = ros::WallDuration(0),
ros::WallDuration call_one_timeout = ros::WallDuration(0.1))
{
CallbackQueue queue;
boost::thread_group tg;
bool done = false;
boost::atomic<size_t> calls(0);

for (uint32_t i = 0; i < 10; ++i)
for (uint32_t i = 0; i < num_threads; ++i)
{
tg.create_thread(boost::bind(threadFunc, &queue, boost::ref(done)));
tg.create_thread(boost::bind(threadFunc, &queue, boost::ref(done), &calls, call_one_timeout));
}

ros::WallTime start = ros::WallTime::now();
size_t i = 0;
while (ros::WallTime::now() - start < ros::WallDuration(5))
while (ros::WallTime::now() - start < duration)
{
queue.addCallback(cb);
++i;
if (!pause_between_callbacks.isZero())
{
pause_between_callbacks.sleep();
}
}

while (!queue.isEmpty())
Expand All @@ -293,6 +318,9 @@ size_t runThreadedTest(const CountingCallbackPtr& cb, const boost::function<void
done = true;
tg.join_all();

if (num_calls)
*num_calls = calls;

return i;
}

Expand All @@ -304,11 +332,20 @@ TEST(CallbackQueue, threadedCallAvailable)
EXPECT_EQ(cb->count, i);
}

void callOneThread(CallbackQueue* queue, bool& done)
void callOneThread(
CallbackQueue* queue, bool& done, boost::atomic<size_t>* num_calls,
ros::WallDuration timeout = ros::WallDuration(0.1))
{
size_t i = 0;
while (!done)
{
queue->callOne(ros::WallDuration(0.1));
queue->callOne(timeout);
++i;
}

if (num_calls)
{
num_calls->fetch_add(i);
}
}

Expand All @@ -320,6 +357,112 @@ TEST(CallbackQueue, threadedCallOne)
EXPECT_EQ(cb->count, i);
}

class CountingSubscriptionQueue : public SubscriptionQueue
{
public:
CountingSubscriptionQueue(
const std::string& topic, int32_t queue_size,
bool allow_concurrent_callbacks)
: SubscriptionQueue(topic, queue_size, allow_concurrent_callbacks),
ready_count(0)
{}

virtual bool ready()
{
ready_count.fetch_add(1);
return SubscriptionQueue::ready();
}

boost::atomic<size_t> ready_count;
};
typedef boost::shared_ptr<CountingSubscriptionQueue> CountingSubscriptionQueuePtr;

struct ThreadedCallOneSlowParams {
ros::WallDuration callback_duration; // long-lasting callback
size_t num_threads;
ros::WallDuration call_one_timeout;
ros::WallDuration test_duration;
ros::WallDuration pause_between_callbacks;
};

class CallbackQueueParamTest : public ::testing::TestWithParam<ThreadedCallOneSlowParams>
{};

TEST_P(CallbackQueueParamTest, threadedCallOneSlow)
{
// test for https://github.com/ros/ros_comm/issues/1545
// "roscpp multithreaded spinners eat up CPU when callbacks take too long"

const ThreadedCallOneSlowParams param = GetParam();
const WallDuration& callback_duration = param.callback_duration; // long-lasting callback
const size_t num_threads = param.num_threads;
const ros::WallDuration call_one_timeout = param.call_one_timeout;
const ros::WallDuration test_duration = param.test_duration;
const ros::WallDuration pause_between_callbacks = param.pause_between_callbacks;
// queue_size is chosen such that it is larger than the max number of callbacks we
// are really able to process in 5 secs (since allow_concurrent_callbacks is false,
// it is equal to the number of seconds the queue is running)
const size_t queue_size = static_cast<size_t>(test_duration.toSec()) + 1;

// create a subscription queue counting the number of ready() calls
const CountingSubscriptionQueuePtr cb(
boost::make_shared<CountingSubscriptionQueue>("test", queue_size, false));

// create a slow subscription callback (takes 1 second to complete)
const FakeSubHelperPtr helper(boost::make_shared<FakeSubHelper>());
helper->cb_ = boost::bind(&ros::WallDuration::sleep, callback_duration);
const MessageDeserializerPtr des(boost::make_shared<MessageDeserializer>(
helper, SerializedMessage(), boost::shared_ptr<M_string>()));

// fill the subscription queue to get max performance
for (size_t i = 0; i < queue_size; ++i)
{
cb->push(helper, des, false, VoidConstWPtr(), true);
}

// keep filling the callback queue at maximum speed for 5 seconds and
// spin up 10 processing threads until the queue is empty
size_t num_call_one_calls = 0;
const size_t num_callbacks_to_call = runThreadedTest(
cb, callOneThread, &num_call_one_calls,
num_threads, test_duration, pause_between_callbacks, call_one_timeout);

const uint32_t num_callbacks_called = helper->calls_;
const size_t num_ready_called = cb->ready_count;

// what should happen: even though we have multiple processing threads,
// the subscription queue has a per-topic lock which prevents multiple threads from
// processing the topic's callbacks simultaneously; so even though there were
// tens of thousands of callbacks in the callback queue, we only got time to process
// less than 5 of them because queue_size is quite limited (3), so most callbacks
// get thrown away during processing of the slow callbacks

// we test the number of SubscriptionQueue::ready() calls to see how often do the
// idle threads ask for more work; with bug 1545 unfixed, this gets to millions of
// calls which acts as a busy-wait; with the bug fixed, the number should not be
// higher than number of callbacks (~ 80k), since each newly added callback should
// wake one idle thread and let it ask for work

ROS_INFO_STREAM("Callback queue processed " <<
num_callbacks_called << " callbacks out of " << num_callbacks_to_call);
ROS_INFO_STREAM("callOne() was called " << num_call_one_calls << " times.");
ROS_INFO_STREAM("ready() was called " << num_ready_called << " times.");

EXPECT_EQ(num_callbacks_called, queue_size);
EXPECT_LE(num_call_one_calls, 2 * num_callbacks_to_call + num_threads * (1/call_one_timeout.toSec()) * queue_size);
}

INSTANTIATE_TEST_CASE_P(slow, CallbackQueueParamTest, ::testing::Values(
//ThreadedCallOneSlowParams{callback_duration, num_threads, call_one_timeout, test_duration, pause_between_callbacks}
ThreadedCallOneSlowParams{ros::WallDuration(1.0), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)},
ThreadedCallOneSlowParams{ros::WallDuration(1.0), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0.1)},
ThreadedCallOneSlowParams{ros::WallDuration(1.0), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0.001)},
ThreadedCallOneSlowParams{ros::WallDuration(0.1), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)},
ThreadedCallOneSlowParams{ros::WallDuration(0.001), 10, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)},
ThreadedCallOneSlowParams{ros::WallDuration(1.0), 1, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)},
ThreadedCallOneSlowParams{ros::WallDuration(1.0), 2, ros::WallDuration(0.1), ros::WallDuration(2), ros::WallDuration(0)}
));

// this class is just an ugly hack
// to access the constructor Timer(TimerOptions)
namespace ros
Expand Down Expand Up @@ -384,10 +527,11 @@ TEST(CallbackQueue, recursiveTimer)

boost::thread_group tg;
bool done = false;
boost::atomic<size_t> calls(0);

for (uint32_t i = 0; i < 2; ++i)
{
tg.create_thread(boost::bind(callOneThread, &queue, boost::ref(done)));
tg.create_thread(boost::bind(callOneThread, &queue, boost::ref(done), &calls, ros::WallDuration(0.1)));
}

while (!queue.isEmpty())
Expand Down
Loading

0 comments on commit 3bca6e3

Please sign in to comment.