Skip to content
This repository was archived by the owner on Feb 3, 2025. It is now read-only.

Replace use of tbb::task with oneapi::tbb::task_group, where available #3146

Closed
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
Alex Dewar <alex.dewar@gmx.co.uk>
Nate Koenig <nkoenig@osrfoundation.org>
John Hsu <hsu@osrfoundation.org>
18 changes: 17 additions & 1 deletion cmake/SearchForStuff.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ if (PKG_CONFIG_FOUND)

#################################################
# Find TBB
pkg_check_modules(TBB tbb<2021)
pkg_check_modules(TBB tbb)
set (TBB_PKG_CONFIG "tbb")
if (NOT TBB_FOUND)
message(STATUS "TBB not found, attempting to detect manually")
Expand All @@ -326,6 +326,22 @@ if (PKG_CONFIG_FOUND)
endif (NOT TBB_FOUND)
endif (NOT TBB_FOUND)

# tbb::task was removed in tbb v2021.01, so in this case we must move to the
# newer tbb::task_group API. This could also be used with earlier versions of
# TBB, but we want to maintain ABI compatibility for Ubuntu 18.04 etc., so
# disable this by default.
if (NOT DEFINED USE_LEGACY_TBB_TASK)
if (${TBB_VERSION} VERSION_LESS 2021)
set (USE_LEGACY_TBB_TASK ON)
else (${TBB_VERSION} VERSION_LESS 2021)
set (USE_LEGACY_TBB_TASK OFF)
endif(${TBB_VERSION} VERSION_LESS 2021)
endif()
set(USE_LEGACY_TBB_TASK ${USE_LEGACY_TBB_TASK} CACHE BOOL "Whether to use the deprecated tbb::task class")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick, feel free to ignore: in my experience cache variable vs normal variables is one aspect of CMake that is most confusing for novice users. To improve readability, it may be worth to use a different name for the normal variable name, like USE_LEGACY_TASK_DEFAULT_VALUE.

if (USE_LEGACY_TBB_TASK)
add_definitions(-DUSE_LEGACY_TBB_TASK)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This define will be used in public headers, hence it will need to be define also in downstream compilation units. For these reason, it could make sense to use a gazebo-specific name here (like GAZEBO_USE_LEGACY_TBB_TASK), to avoid confusion and the risk of collisions.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This definition needs to be propagated to downstream compilation units as it is used in public headers, but unfortunately gazebo does not use modern style CMake targets so there is no straightforward way to do so. I can check if there is any facility currently use to expose definitions to downstream projects, like a GAZEBO_DEFINITIONS CMake variable.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked the docs for building downstream projects that use Gazebo (see http://gazebosim.org/tutorials?tut=plugins_hello_world) and there is no GAZEBO_DEFINITIONS or similar. We could try to encode this definition in Gazebo's GAZEBO_CXX_FLAGS, but this is not straightforward. Thinking about it, a possible alternative solution is to remove all the CMake logic related to USE_LEGACY_TBB_TASK, and just define this macro at the C++ preprocessor level, depending on the tbb version. This is the strategy that we used for supporting Ogre 1.12, see bc30cd9 . In this case we would lose the possibility of compiling with tbb task when using tbb <= 2020, but I am not sure this is actually of interest.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds like a more sensible approach. I'll have a go 😄

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

endif (USE_LEGACY_TBB_TASK)

#################################################
# Find OGRE

Expand Down
1 change: 1 addition & 0 deletions gazebo/transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ set (headers
SubscribeOptions.hh
Subscriber.hh
SubscriptionTransport.hh
TaskGroup.hh
TopicManager.hh
TransportIface.hh
TransportTypes.hh
Expand Down
27 changes: 11 additions & 16 deletions gazebo/transport/Connection.hh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#ifndef _CONNECTION_HH_
#define _CONNECTION_HH_

#include <tbb/task.h>
#include <google/protobuf/message.h>

#include <boost/asio.hpp>
Expand All @@ -37,6 +36,7 @@
#include "gazebo/common/Console.hh"
#include "gazebo/common/Exception.hh"
#include "gazebo/common/WeakBind.hh"
#include "gazebo/transport/TaskGroup.hh"
#include "gazebo/util/system.hh"

#define HEADER_LENGTH 8
Expand All @@ -54,7 +54,7 @@ namespace gazebo
/// \cond
/// \brief A task instance that is created when data is read from
/// a socket and used by TBB
class GZ_TRANSPORT_VISIBLE ConnectionReadTask : public tbb::task
class GZ_TRANSPORT_VISIBLE ConnectionReadTask
{
/// \brief Constructor
/// \param[_in] _func Boost function pointer, which is the function
Expand All @@ -68,13 +68,9 @@ namespace gazebo
{
}

/// \bried Overridden function from tbb::task that exectues the data
/// callback.
public: tbb::task *execute()
{
this->func(this->data);
return NULL;
}
/// \brief Execute the data callback
public: void operator()() const
{ this->func(this->data); }

/// \brief The boost function pointer
private: boost::function<void (const std::string &)> func;
Expand Down Expand Up @@ -310,12 +306,7 @@ namespace gazebo

if (!_e && !transport::is_stopped())
{
ConnectionReadTask *task = new(tbb::task::allocate_root())
ConnectionReadTask(boost::get<0>(_handler), data);
tbb::task::enqueue(*task);

// Non-tbb version:
// boost::get<0>(_handler)(data);
this->taskGroup.run<ConnectionReadTask>(boost::get<0>(_handler), data);
}
}

Expand Down Expand Up @@ -372,7 +363,7 @@ namespace gazebo
private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint() const;

/// \brief Gets hostname
/// \param[in] _ep The end point to get the hostename of
/// \param[in] _ep The end point to get the hostname of
private: static std::string GetHostname(
boost::asio::ip::tcp::endpoint _ep);

Expand Down Expand Up @@ -465,6 +456,10 @@ namespace gazebo

/// \brief True if the connection is open.
private: bool isOpen;


/// \brief For managing asynchronous tasks with tbb
private: TaskGroup taskGroup;
};
/// \}
}
Expand Down
29 changes: 4 additions & 25 deletions gazebo/transport/ConnectionManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,16 @@
using namespace gazebo;
using namespace transport;

/// TBB task to process nodes.
class TopicManagerProcessTask : public tbb::task
{
/// Implements the necessary execute function
public: tbb::task *execute()
{
TopicManager::Instance()->ProcessNodes();
return NULL;
}
};

/// TBB task to establish subscriber to publisher connection.
class TopicManagerConnectionTask : public tbb::task
class TopicManagerConnectionTask
{
/// \brief Constructor.
/// \param[in] _pub Publish message
public: explicit TopicManagerConnectionTask(msgs::Publish _pub) : pub(_pub) {}

/// Implements the necessary execute function
public: tbb::task *execute()
{
TopicManager::Instance()->ConnectSubToPub(pub);
return NULL;
}
public: void operator()() const
{ TopicManager::Instance()->ConnectSubToPub(pub); }

/// \brief Publish message
private: msgs::Publish pub;
Expand Down Expand Up @@ -272,11 +258,6 @@ void ConnectionManager::RunUpdate()
if (this->masterConn)
this->masterConn->ProcessWriteQueue();

// Use TBB to process nodes. Need more testing to see if this makes
// a difference.
// TopicManagerProcessTask *task = new(tbb::task::allocate_root())
// TopicManagerProcessTask();
// tbb::task::enqueue(*task);
boost::recursive_mutex::scoped_lock lock(this->connectionMutex);

TopicManager::Instance()->ProcessNodes();
Expand Down Expand Up @@ -401,9 +382,7 @@ void ConnectionManager::ProcessMessage(const std::string &_data)
if (pub.host() != this->serverConn->GetLocalAddress() ||
pub.port() != this->serverConn->GetLocalPort())
{
TopicManagerConnectionTask *task = new(tbb::task::allocate_root())
TopicManagerConnectionTask(pub);
tbb::task::enqueue(*task);
this->taskGroup.run<TopicManagerConnectionTask>(pub);
}
}
// publisher_subscribe. This occurs when we try to subscribe to a topic, and
Expand Down
6 changes: 5 additions & 1 deletion gazebo/transport/ConnectionManager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
#include "gazebo/msgs/msgs.hh"
#include "gazebo/common/SingletonT.hh"

#include "gazebo/transport/Publisher.hh"
#include "gazebo/transport/Connection.hh"
#include "gazebo/transport/Publisher.hh"
#include "gazebo/transport/TaskGroup.hh"
#include "gazebo/util/system.hh"

/// \brief Explicit instantiation for typed SingletonT.
Expand Down Expand Up @@ -193,6 +194,9 @@ namespace gazebo

/// \brief Condition used for synchronization
private: boost::condition_variable namespaceCondition;

/// \brief For managing asynchronous tasks with tbb
private: TaskGroup taskGroup;

// Singleton implementation
private: friend class SingletonT<ConnectionManager>;
Expand Down
19 changes: 8 additions & 11 deletions gazebo/transport/Node.hh
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
#ifndef GAZEBO_TRANSPORT_NODE_HH_
#define GAZEBO_TRANSPORT_NODE_HH_

#include <tbb/task.h>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <map>
#include <list>
#include <string>
#include <vector>

#include "gazebo/transport/TaskGroup.hh"
#include "gazebo/transport/TransportTypes.hh"
#include "gazebo/transport/TopicManager.hh"
#include "gazebo/util/system.hh"
Expand All @@ -36,7 +36,7 @@ namespace gazebo
{
/// \cond
/// \brief Task used by Node::Publish to publish on a one-time publisher
class GZ_TRANSPORT_VISIBLE PublishTask : public tbb::task
class GZ_TRANSPORT_VISIBLE PublishTask
{
/// \brief Constructor
/// \param[in] _pub Publisher to publish the message on.
Expand All @@ -49,16 +49,14 @@ namespace gazebo
this->msg->CopyFrom(_message);
}

/// \brief Overridden function from tbb::task that exectues the
/// publish task.
public: tbb::task *execute()
/// \brief Executes the publish task.
public: void operator()()
{
this->pub->WaitForConnection();
this->pub->Publish(*this->msg, true);
this->pub->SendMessage();
delete this->msg;
this->pub.reset();
return NULL;
}

/// \brief Pointer to the publisher.
Expand Down Expand Up @@ -159,11 +157,7 @@ namespace gazebo
const google::protobuf::Message &_message)
{
transport::PublisherPtr pub = this->Advertise<M>(_topic);
PublishTask *task = new(tbb::task::allocate_root())
PublishTask(pub, _message);

tbb::task::enqueue(*task);
return;
this->taskGroup.run<PublishTask>(pub, _message);
}

/// \brief Advertise a topic
Expand Down Expand Up @@ -418,6 +412,9 @@ namespace gazebo

/// \brief List of newly arrive messages
private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;

/// \brief For managing asynchronous tasks with tbb
private: TaskGroup taskGroup;

private: boost::mutex publisherMutex;
private: boost::mutex publisherDeleteMutex;
Expand Down
84 changes: 84 additions & 0 deletions gazebo/transport/TaskGroup.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (C) 2021 Open Source Robotics Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef _TASK_GROUP_HH_
#define _TASK_GROUP_HH_

#include <utility>

#ifndef USE_LEGACY_TBB_TASK

// Emit is both a macro in Qt and a function defined by tbb
#undef emit
#include <oneapi/tbb/task_group.h>
#define emit

namespace gazebo {
namespace transport {
class TaskGroup
{
public: ~TaskGroup() noexcept
{
// Wait for running tasks to finish
this->taskGroup.wait();
}

public: template<class Functor, class... Args> void run(Args&&... args)
{
this->taskGroup.run(Functor(std::forward<Args>(args)...));
}

private: oneapi::tbb::task_group taskGroup;
};
}
}
#else
#include <tbb/task.h>

namespace gazebo {
namespace transport {
class TaskGroup
{
/// \brief A helper class which provides the requisite execute() method
/// required by tbb.
private: template<class T> class TaskWrapper : public tbb::task
{
public: template<class... Args> TaskWrapper<T>(Args&&... args)
: functor(std::forward<Args>(args)...)
{
}

public: tbb::task *execute()
{
this->functor();
return nullptr;
}

private: T functor;
};

public: template<class Functor, class... Args> void run(Args&&... args)
{
TaskWrapper<Functor> *task = new (tbb::task::allocate_root())
TaskWrapper<Functor>(std::forward<Args>(args)...);
tbb::task::enqueue(*task);
}
};
}
}

#endif
#endif
6 changes: 6 additions & 0 deletions gazebo/transport/transport_pch.hh
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
#include <string>
#include <tbb/blocked_range.h>
#include <tbb/parallel_for.h>
#ifdef USE_LEGACY_TBB_TASK
#include <tbb/task.h>
#else
#undef emit
#include <oneapi/tbb/task_group.h>
#define emit
#endif
#include <utility>
#include <vector>