Skip to content

Commit

Permalink
* Fix a busy-wait loop in subscription queue.
Browse files Browse the repository at this point in the history
* Better fix for slow callbacks CPU throttling.

* More versatile callback queue test.

* Finished cherry-pick merge to melodic-devel.

* libros: moved define

* roscpp: implementet ros#1608 without ABI/API breaks

* /test_roscpp: fake_message is in a header now...

* test_roscpp: fixed sign-compare warning

* stabilized test

* CallbackQueue: use SteadyTime instead of WallTime to get independent of system-time changes

* style only

* Update clients/roscpp/include/ros/callback_queue.h

Co-authored-by: Johannes Meyer <johannes@intermodalics.eu>

Co-authored-by: Martin Pecka <peci1@seznam.cz>
Co-authored-by: CTU base <robot@ctu-base>
Co-authored-by: Martin Pecka <peckama2@fel.cvut.cz>
Co-authored-by: Christopher Wecht <christopher.wechtstudent.kit.edu>
Co-authored-by: Dirk Thomas <dirk-thomas@users.noreply.github.com>
Co-authored-by: Johannes Meyer <johannes@intermodalics.eu>
  • Loading branch information
6 people committed Oct 7, 2024
1 parent 8250c7d commit a60cca0
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 68 deletions.
22 changes: 21 additions & 1 deletion clients/roscpp/src/libros/callback_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ void CallbackQueue::addCallback(const CallbackInterfacePtr& callback, uint64_t r
callbacks_.push_back(info);
}

condition_.notify_one();
if (callback->ready())
{
condition_.notify_one();
}
}

CallbackQueue::IDInfoPtr CallbackQueue::getIDInfo(uint64_t id)
Expand Down Expand Up @@ -242,6 +245,8 @@ CallbackQueue::CallOneResult CallbackQueue::callOne(ros::WallDuration timeout)
return Disabled;
}

ros::SteadyTime start_time(ros::SteadyTime::now());

if (callbacks_.empty())
{
if (!timeout.isZero())
Expand Down Expand Up @@ -283,6 +288,17 @@ CallbackQueue::CallOneResult CallbackQueue::callOne(ros::WallDuration timeout)

if (!cb_info.callback)
{
// do not spend more than `timeout` seconds in the callback; we already waited for some time when waiting for
// nonempty queue
ros::SteadyTime now(ros::SteadyTime::now());
ros::WallDuration time_spent = now - start_time;
ros::WallDuration time_to_wait = timeout - time_spent;

if (time_to_wait.toNSec() > 0)
{
condition_.wait_for(lock, boost::chrono::nanoseconds(time_to_wait.toNSec()));
}

return TryAgain;
}

Expand Down Expand Up @@ -408,6 +424,10 @@ CallbackQueue::CallOneResult CallbackQueue::callOneCB(TLS* tls)
{
tls->cb_it = tls->callbacks.erase(tls->cb_it);
result = cb->call();
if (result == CallbackInterface::Success)
{
condition_.notify_one();
}
}
}

Expand Down
7 changes: 6 additions & 1 deletion clients/roscpp/src/libros/subscription_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,12 @@ CallbackInterface::CallResult SubscriptionQueue::call()

bool SubscriptionQueue::ready()
{
return true;
if (allow_concurrent_callbacks_)
{
return true;
}
boost::recursive_mutex::scoped_try_lock lock(callback_mutex_, boost::try_to_lock);
return lock.owns_lock();
}

bool SubscriptionQueue::full()
Expand Down
91 changes: 91 additions & 0 deletions test/test_roscpp/test/fake_message.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2009, Willow Garage, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Willow Garage, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

/* Author: Josh Faust */

/*
* Subscription queue test helper classes
*/
#ifndef TEST_ROSCPP_FAKE_MESSAGE_H
#define TEST_ROSCPP_FAKE_MESSAGE_H

#include "ros/subscription_callback_helper.h"

class FakeMessage
{
public:
virtual const std::string __getDataType() const { return ""; }
virtual const std::string __getMD5Sum() const { return ""; }
virtual const std::string __getMessageDefinition() const { return ""; }
virtual uint32_t serializationLength() const { return 0; }
virtual uint8_t *serialize(uint8_t *write_ptr, uint32_t seq) const { (void)seq; return write_ptr; }
virtual uint8_t *deserialize(uint8_t *read_ptr) { return read_ptr; }
};

class FakeSubHelper : public ros::SubscriptionCallbackHelper
{
public:
FakeSubHelper()
: calls_(0)
{}

virtual ros::VoidConstPtr deserialize(const ros::SubscriptionCallbackHelperDeserializeParams&)
{
return boost::make_shared<FakeMessage>();
}

virtual std::string getMD5Sum() { return ""; }
virtual std::string getDataType() { return ""; }

virtual void call(ros::SubscriptionCallbackHelperCallParams& params)
{
(void)params;
{
boost::mutex::scoped_lock lock(mutex_);
++calls_;
}

if (cb_)
{
cb_();
}
}

virtual const std::type_info& getTypeInfo() { return typeid(FakeMessage); }
virtual bool isConst() { return true; }
virtual bool hasHeader() { return false; }

boost::mutex mutex_;
uint32_t calls_;

boost::function<void(void)> cb_;
};
typedef boost::shared_ptr<FakeSubHelper> FakeSubHelperPtr;

#endif // TEST_ROSCPP_FAKE_MESSAGE_H
Loading

0 comments on commit a60cca0

Please sign in to comment.