Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support services run with multiple threads #315

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: ros-tooling/setup-ros@v0.1
- uses: ros-tooling/setup-ros@v0.2
with:
required-ros-distributions: "noetic rolling"
- name: Build and test ros1-bridge
Expand Down
73 changes: 44 additions & 29 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -223,35 +223,44 @@ endif()
macro(targets)
set(TEST_BRIDGE_RMW ${rmw_implementation})

configure_file(
test/test_topics_across_dynamic_bridge.py.in
test_topics_across_dynamic_bridge${target_suffix}.py.genexp
@ONLY
)
file(GENERATE
OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/test_topics_across_dynamic_bridge${target_suffix}_$<CONFIG>.py"
INPUT "${CMAKE_CURRENT_BINARY_DIR}/test_topics_across_dynamic_bridge${target_suffix}.py.genexp"
)
add_launch_test(
"${CMAKE_CURRENT_BINARY_DIR}/test_topics_across_dynamic_bridge${target_suffix}_$<CONFIG>.py"
TARGET test_topics_across_dynamic_bridge${target_suffix}
ENV RMW_IMPLEMENTATION=${rmw_implementation}
TIMEOUT 60)

configure_file(
test/test_services_across_dynamic_bridge.py.in
test_services_across_dynamic_bridge${target_suffix}.py.genexp
@ONLY
)
file(GENERATE
OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/test_services_across_dynamic_bridge${target_suffix}_$<CONFIG>.py"
INPUT "${CMAKE_CURRENT_BINARY_DIR}/test_services_across_dynamic_bridge${target_suffix}.py.genexp"
)
add_launch_test(
"${CMAKE_CURRENT_BINARY_DIR}/test_services_across_dynamic_bridge${target_suffix}_$<CONFIG>.py"
TARGET test_services_across_dynamic_bridge${target_suffix}
ENV RMW_IMPLEMENTATION=${rmw_implementation}
TIMEOUT 60)
list(LENGTH DYNAMIC_BRIDGE_TYPE count)
math(EXPR count "${count}-1")
foreach(i RANGE ${count})
list(GET DYNAMIC_BRIDGE_TYPE ${i} bridge_type)
list(GET DYNAMIC_BRIDGE_ARG ${i} bridge_arg)
set(TEST_BRIDGE_DYNAMIC_BRIDGE_ARG "${bridge_arg}")

configure_file(
test/test_topics_across_dynamic_bridge.py.in
test_topics_across_dynamic_bridge${target_suffix}_${bridge_type}.py.genexp
@ONLY
)
file(GENERATE
OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/test_topics_across_dynamic_bridge${target_suffix}_${bridge_type}_$<CONFIG>.py"
INPUT "${CMAKE_CURRENT_BINARY_DIR}/test_topics_across_dynamic_bridge${target_suffix}_${bridge_type}.py.genexp"
)
add_launch_test(
"${CMAKE_CURRENT_BINARY_DIR}/test_topics_across_dynamic_bridge${target_suffix}_${bridge_type}_$<CONFIG>.py"
TARGET test_topics_across_dynamic_bridge${target_suffix}_${bridge_type}
ENV RMW_IMPLEMENTATION=${rmw_implementation}
TIMEOUT 60)

configure_file(
test/test_services_across_dynamic_bridge.py.in
test_services_across_dynamic_bridge${target_suffix}_${bridge_type}.py.genexp
@ONLY
)
file(GENERATE
OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/test_services_across_dynamic_bridge${target_suffix}_${bridge_type}_$<CONFIG>.py"
INPUT "${CMAKE_CURRENT_BINARY_DIR}/test_services_across_dynamic_bridge${target_suffix}_${bridge_type}.py.genexp"
)
add_launch_test(
"${CMAKE_CURRENT_BINARY_DIR}/test_services_across_dynamic_bridge${target_suffix}_${bridge_type}_$<CONFIG>.py"
TARGET test_services_across_dynamic_bridge${target_suffix}_${bridge_type}
ENV RMW_IMPLEMENTATION=${rmw_implementation}
TIMEOUT 60)
endforeach()

endmacro()

if(TEST_ROS1_BRIDGE)
Expand All @@ -266,6 +275,12 @@ if(TEST_ROS1_BRIDGE)
set(TEST_BRIDGE_ROS1_CLIENT "$<TARGET_FILE:test_ros1_client>")
set(TEST_BRIDGE_ROS1_SERVER "$<TARGET_FILE:test_ros1_server>")

list(APPEND DYNAMIC_BRIDGE_TYPE "st")
list(APPEND DYNAMIC_BRIDGE_ARG " ")

list(APPEND DYNAMIC_BRIDGE_TYPE "mt")
list(APPEND DYNAMIC_BRIDGE_ARG "--multi-threads")

call_for_each_rmw_implementation(targets)
endif()

Expand Down
9 changes: 6 additions & 3 deletions include/ros1_bridge/bridge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ create_bridge_from_2_to_1(
const std::string & ros1_type_name,
const std::string & ros1_topic_name,
size_t publisher_queue_size,
rclcpp::PublisherBase::SharedPtr ros2_pub = nullptr);
rclcpp::PublisherBase::SharedPtr ros2_pub = nullptr,
bool custom_callback_group = false);

Bridge2to1Handles
create_bridge_from_2_to_1(
Expand All @@ -116,7 +117,8 @@ create_bridge_from_2_to_1(
const std::string & ros1_type_name,
const std::string & ros1_topic_name,
size_t publisher_queue_size,
rclcpp::PublisherBase::SharedPtr ros2_pub = nullptr);
rclcpp::PublisherBase::SharedPtr ros2_pub = nullptr,
bool custom_callback_group = false);

BridgeHandles
create_bidirectional_bridge(
Expand All @@ -125,7 +127,8 @@ create_bidirectional_bridge(
const std::string & ros1_type_name,
const std::string & ros2_type_name,
const std::string & topic_name,
size_t queue_size = 10);
size_t queue_size = 10,
bool custom_callback_group = false);

} // namespace ros1_bridge

Expand Down
64 changes: 55 additions & 9 deletions include/ros1_bridge/factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#define ROS1_BRIDGE__FACTORY_HPP_

#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
Expand All @@ -33,6 +34,32 @@
namespace ros1_bridge
{

static rclcpp::CallbackGroup::SharedPtr get_callback_group(
rclcpp::Node::SharedPtr ros2_node,
const std::string & topic_name = "")
{
auto node_base = ros2_node->get_node_base_interface();
rclcpp::CallbackGroup::SharedPtr group = nullptr;

typedef std::map<std::string, rclcpp::CallbackGroup::SharedPtr> CallbackGroupMap;
static CallbackGroupMap s_callbackgroups;
auto iter = s_callbackgroups.find(topic_name);
if (iter != s_callbackgroups.end()) {
return iter->second;
}

group = ros2_node->create_callback_group(
topic_name.empty() ?
// create a shared callback group with Reentrant for creating all ros2 clients and services
rclcpp::CallbackGroupType::Reentrant :
// create one CallbackGroup with MutuallyExclusive for each topic
// to ensure that the message data of each topic is received in order
rclcpp::CallbackGroupType::MutuallyExclusive);
s_callbackgroups.insert({topic_name, group});

return group;
}

template<typename ROS1_T, typename ROS2_T>
class Factory : public FactoryInterface
{
Expand Down Expand Up @@ -110,10 +137,12 @@ class Factory : public FactoryInterface
const std::string & topic_name,
size_t queue_size,
ros::Publisher ros1_pub,
rclcpp::PublisherBase::SharedPtr ros2_pub = nullptr)
rclcpp::PublisherBase::SharedPtr ros2_pub = nullptr,
bool custom_callback_group = false)
{
auto qos = rclcpp::SensorDataQoS(rclcpp::KeepLast(queue_size));
return create_ros2_subscriber(node, topic_name, qos, ros1_pub, ros2_pub);
return create_ros2_subscriber(
node, topic_name, qos, ros1_pub, ros2_pub, custom_callback_group);
}

rclcpp::SubscriptionBase::SharedPtr
Expand All @@ -122,12 +151,13 @@ class Factory : public FactoryInterface
const std::string & topic_name,
const rmw_qos_profile_t & qos,
ros::Publisher ros1_pub,
rclcpp::PublisherBase::SharedPtr ros2_pub = nullptr)
rclcpp::PublisherBase::SharedPtr ros2_pub = nullptr,
bool custom_callback_group = false)
{
auto rclcpp_qos = rclcpp::QoS(rclcpp::QoSInitialization::from_rmw(qos));
rclcpp_qos.get_rmw_qos_profile() = qos;
return create_ros2_subscriber(
node, topic_name, rclcpp_qos, ros1_pub, ros2_pub);
node, topic_name, rclcpp_qos, ros1_pub, ros2_pub, custom_callback_group);
}

rclcpp::SubscriptionBase::SharedPtr
Expand All @@ -136,7 +166,8 @@ class Factory : public FactoryInterface
const std::string & topic_name,
const rclcpp::QoS & qos,
ros::Publisher ros1_pub,
rclcpp::PublisherBase::SharedPtr ros2_pub = nullptr)
rclcpp::PublisherBase::SharedPtr ros2_pub = nullptr,
bool custom_callback_group = false)
{
std::function<
void(const typename ROS2_T::SharedPtr msg, const rclcpp::MessageInfo & msg_info)> callback;
Expand All @@ -145,6 +176,9 @@ class Factory : public FactoryInterface
ros1_pub, ros1_type_name_, ros2_type_name_, node->get_logger(), ros2_pub);
rclcpp::SubscriptionOptions options;
options.ignore_local_publications = true;
if (custom_callback_group) {
options.callback_group = ros1_bridge::get_callback_group(node, topic_name);
}
return node->create_subscription<ROS2_T>(
topic_name, qos, callback, options);
}
Expand Down Expand Up @@ -325,10 +359,16 @@ class ServiceFactory : public ServiceFactoryInterface
}

ServiceBridge1to2 service_bridge_1_to_2(
ros::NodeHandle & ros1_node, rclcpp::Node::SharedPtr ros2_node, const std::string & name)
ros::NodeHandle & ros1_node, rclcpp::Node::SharedPtr ros2_node, const std::string & name,
bool custom_callback_group = false)
{
ServiceBridge1to2 bridge;
bridge.client = ros2_node->create_client<ROS2_T>(name);
rclcpp::CallbackGroup::SharedPtr group = nullptr;
if (custom_callback_group) {
group = ros1_bridge::get_callback_group(ros2_node);
}
bridge.client = ros2_node->create_client<ROS2_T>(
name, rmw_qos_profile_services_default, group);
auto m = &ServiceFactory<ROS1_T, ROS2_T>::forward_1_to_2;
auto f = std::bind(
m, this, bridge.client, ros2_node->get_logger(), std::placeholders::_1,
Expand All @@ -338,7 +378,8 @@ class ServiceFactory : public ServiceFactoryInterface
}

ServiceBridge2to1 service_bridge_2_to_1(
ros::NodeHandle & ros1_node, rclcpp::Node::SharedPtr ros2_node, const std::string & name)
ros::NodeHandle & ros1_node, rclcpp::Node::SharedPtr ros2_node, const std::string & name,
bool custom_callback_group = false)
{
ServiceBridge2to1 bridge;
bridge.client = ros1_node.serviceClient<ROS1_T>(name);
Expand All @@ -351,7 +392,12 @@ class ServiceFactory : public ServiceFactoryInterface
f = std::bind(
m, this, bridge.client, ros2_node->get_logger(), std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3);
bridge.server = ros2_node->create_service<ROS2_T>(name, f);
rclcpp::CallbackGroup::SharedPtr group = nullptr;
if (custom_callback_group) {
group = ros1_bridge::get_callback_group(ros2_node);
}
bridge.server = ros2_node->create_service<ROS2_T>(
name, f, rmw_qos_profile_services_default, group);
return bridge;
}

Expand Down
15 changes: 10 additions & 5 deletions include/ros1_bridge/factory_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ class FactoryInterface
const std::string & topic_name,
size_t queue_size,
ros::Publisher ros1_pub,
rclcpp::PublisherBase::SharedPtr ros2_pub = nullptr) = 0;
rclcpp::PublisherBase::SharedPtr ros2_pub = nullptr,
bool custom_callback_group = false) = 0;

virtual
rclcpp::SubscriptionBase::SharedPtr
Expand All @@ -99,7 +100,8 @@ class FactoryInterface
const std::string & topic_name,
const rmw_qos_profile_t & qos_profile,
ros::Publisher ros1_pub,
rclcpp::PublisherBase::SharedPtr ros2_pub = nullptr) = 0;
rclcpp::PublisherBase::SharedPtr ros2_pub = nullptr,
bool custom_callback_group = false) = 0;

virtual
rclcpp::SubscriptionBase::SharedPtr
Expand All @@ -108,7 +110,8 @@ class FactoryInterface
const std::string & topic_name,
const rclcpp::QoS & qos,
ros::Publisher ros1_pub,
rclcpp::PublisherBase::SharedPtr ros2_pub = nullptr) = 0;
rclcpp::PublisherBase::SharedPtr ros2_pub = nullptr,
bool custom_callback_group = false) = 0;

virtual
void
Expand All @@ -123,10 +126,12 @@ class ServiceFactoryInterface
{
public:
virtual ServiceBridge1to2 service_bridge_1_to_2(
ros::NodeHandle &, rclcpp::Node::SharedPtr, const std::string &) = 0;
ros::NodeHandle &, rclcpp::Node::SharedPtr, const std::string &,
bool custom_callback_group = false) = 0;

virtual ServiceBridge2to1 service_bridge_2_to_1(
ros::NodeHandle &, rclcpp::Node::SharedPtr, const std::string &) = 0;
ros::NodeHandle &, rclcpp::Node::SharedPtr, const std::string &,
bool custom_callback_group = false) = 0;
};

} // namespace ros1_bridge
Expand Down
16 changes: 10 additions & 6 deletions src/bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ create_bridge_from_2_to_1(
const std::string & ros1_type_name,
const std::string & ros1_topic_name,
size_t publisher_queue_size,
rclcpp::PublisherBase::SharedPtr ros2_pub)
rclcpp::PublisherBase::SharedPtr ros2_pub,
bool custom_callback_group)
{
auto subscriber_qos = rclcpp::SensorDataQoS(rclcpp::KeepLast(subscriber_queue_size));
return create_bridge_from_2_to_1(
Expand All @@ -88,7 +89,8 @@ create_bridge_from_2_to_1(
ros1_type_name,
ros1_topic_name,
publisher_queue_size,
ros2_pub);
ros2_pub,
custom_callback_group);
}

Bridge2to1Handles
Expand All @@ -101,14 +103,15 @@ create_bridge_from_2_to_1(
const std::string & ros1_type_name,
const std::string & ros1_topic_name,
size_t publisher_queue_size,
rclcpp::PublisherBase::SharedPtr ros2_pub)
rclcpp::PublisherBase::SharedPtr ros2_pub,
bool custom_callback_group)
{
auto factory = get_factory(ros1_type_name, ros2_type_name);
auto ros1_pub = factory->create_ros1_publisher(
ros1_node, ros1_topic_name, publisher_queue_size);

auto ros2_sub = factory->create_ros2_subscriber(
ros2_node, ros2_topic_name, subscriber_qos, ros1_pub, ros2_pub);
ros2_node, ros2_topic_name, subscriber_qos, ros1_pub, ros2_pub, custom_callback_group);

Bridge2to1Handles handles;
handles.ros2_subscriber = ros2_sub;
Expand All @@ -123,7 +126,8 @@ create_bidirectional_bridge(
const std::string & ros1_type_name,
const std::string & ros2_type_name,
const std::string & topic_name,
size_t queue_size)
size_t queue_size,
bool custom_callback_group)
{
RCLCPP_INFO(
ros2_node->get_logger(), "create bidirectional bridge for topic %s",
Expand All @@ -135,7 +139,7 @@ create_bidirectional_bridge(
handles.bridge2to1 = create_bridge_from_2_to_1(
ros2_node, ros1_node,
ros2_type_name, topic_name, queue_size, ros1_type_name, topic_name, queue_size,
handles.bridge1to2.ros2_publisher);
handles.bridge1to2.ros2_publisher, custom_callback_group);
return handles;
}

Expand Down
Loading