diff --git a/rmw_cyclonedds_cpp/CMakeLists.txt b/rmw_cyclonedds_cpp/CMakeLists.txt index 60bd8170..cffd7f75 100644 --- a/rmw_cyclonedds_cpp/CMakeLists.txt +++ b/rmw_cyclonedds_cpp/CMakeLists.txt @@ -50,7 +50,6 @@ ament_export_dependencies(rosidl_typesupport_introspection_cpp) add_library(rmw_cyclonedds_cpp src/rmw_node.cpp - src/rmw_get_topic_endpoint_info.cpp src/serdata.cpp src/serdes.cpp src/graphrhc.cpp diff --git a/rmw_cyclonedds_cpp/src/rmw_get_topic_endpoint_info.cpp b/rmw_cyclonedds_cpp/src/rmw_get_topic_endpoint_info.cpp deleted file mode 100644 index 16176a97..00000000 --- a/rmw_cyclonedds_cpp/src/rmw_get_topic_endpoint_info.cpp +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "rmw_cyclonedds_cpp/rmw_version_test.hpp" - -#if RMW_VERSION_GTE(0, 8, 2) - -#include "rmw/error_handling.h" -#include "rmw/get_topic_endpoint_info.h" -#include "rmw/topic_endpoint_info_array.h" - -extern "C" -{ -rmw_ret_t -rmw_get_publishers_info_by_topic( - const rmw_node_t * /* unused_param */, - rcutils_allocator_t * /* unused_param */, - const char * /* unused_param */, - bool /* unused_param */, - rmw_topic_endpoint_info_array_t * /* unused_param */) -{ - return RMW_RET_UNSUPPORTED; -} - -rmw_ret_t -rmw_get_subscriptions_info_by_topic( - const rmw_node_t * /* unused_param */, - rcutils_allocator_t * /* unused_param */, - const char * /* unused_param */, - bool /* unused_param */, - rmw_topic_endpoint_info_array_t * /* unused_param */) -{ - return RMW_RET_UNSUPPORTED; -} -} // extern "C" - -#endif diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index 3b0adcc9..cc6ffdd6 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -51,6 +51,11 @@ #include "rmw_cyclonedds_cpp/MessageTypeSupport.hpp" #include "rmw_cyclonedds_cpp/ServiceTypeSupport.hpp" +#if RMW_VERSION_GTE(0, 8, 2) +#include "rmw/get_topic_endpoint_info.h" +#include "rmw/topic_endpoint_info_array.h" +#endif + #include "namespace_prefix.hpp" #include "dds/dds.h" @@ -1110,20 +1115,16 @@ static dds_qos_t * create_readwrite_qos( return qos; } -static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * qos_policies) +static bool dds_qos_to_rmw_qos(const dds_qos_t * dds_qos, rmw_qos_profile_t * qos_policies) { - dds_qos_t * qos = dds_create_qos(); - if (dds_get_qos(handle, qos) < 0) { - RMW_SET_ERROR_MSG("get_readwrite_qos: invalid handle"); - goto error; - } - + assert(dds_qos); + assert(qos_policies); { dds_history_kind_t kind; int32_t depth; - if (!dds_qget_history(qos, &kind, &depth)) { + if (!dds_qget_history(dds_qos, &kind, &depth)) { RMW_SET_ERROR_MSG("get_readwrite_qos: history not set"); - goto error; + return false; } switch (kind) { case DDS_HISTORY_KEEP_LAST: @@ -1142,9 +1143,9 @@ static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * qos_polic { dds_reliability_kind_t kind; dds_duration_t max_blocking_time; - if (!dds_qget_reliability(qos, &kind, &max_blocking_time)) { + if (!dds_qget_reliability(dds_qos, &kind, &max_blocking_time)) { RMW_SET_ERROR_MSG("get_readwrite_qos: history not set"); - goto error; + return false; } switch (kind) { case DDS_RELIABILITY_BEST_EFFORT: @@ -1160,9 +1161,9 @@ static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * qos_polic { dds_durability_kind_t kind; - if (!dds_qget_durability(qos, &kind)) { + if (!dds_qget_durability(dds_qos, &kind)) { RMW_SET_ERROR_MSG("get_readwrite_qos: durability not set"); - goto error; + return false; } switch (kind) { case DDS_DURABILITY_VOLATILE: @@ -1182,9 +1183,9 @@ static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * qos_polic { dds_duration_t deadline; - if (!dds_qget_deadline(qos, &deadline)) { + if (!dds_qget_deadline(dds_qos, &deadline)) { RMW_SET_ERROR_MSG("get_readwrite_qos: deadline not set"); - goto error; + return false; } qos_policies->deadline.sec = (uint64_t) deadline / 1000000000; qos_policies->deadline.nsec = (uint64_t) deadline % 1000000000; @@ -1192,7 +1193,7 @@ static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * qos_polic { dds_duration_t lifespan; - if (!dds_qget_lifespan(qos, &lifespan)) { + if (!dds_qget_lifespan(dds_qos, &lifespan)) { lifespan = DDS_INFINITY; } qos_policies->lifespan.sec = (uint64_t) lifespan / 1000000000; @@ -1202,9 +1203,9 @@ static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * qos_polic { dds_liveliness_kind_t kind; dds_duration_t lease_duration; - if (!dds_qget_liveliness(qos, &kind, &lease_duration)) { + if (!dds_qget_liveliness(dds_qos, &kind, &lease_duration)) { RMW_SET_ERROR_MSG("get_readwrite_qos: liveliness not set"); - goto error; + return false; } switch (kind) { case DDS_LIVELINESS_AUTOMATIC: @@ -1223,11 +1224,20 @@ static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * qos_polic qos_policies->liveliness_lease_duration.nsec = (uint64_t) lease_duration % 1000000000; } - dds_delete_qos(qos); return true; -error: +} + +static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * rmw_qos_policies) +{ + dds_qos_t * qos = dds_create_qos(); + dds_return_t ret = false; + if (dds_get_qos(handle, qos) < 0) { + RMW_SET_ERROR_MSG("get_readwrite_qos: invalid handle"); + } else { + ret = dds_qos_to_rmw_qos(qos, rmw_qos_policies); + } dds_delete_qos(qos); - return false; + return ret; } static CddsPublisher * create_cdds_publisher( @@ -2691,8 +2701,69 @@ extern "C" rmw_ret_t rmw_destroy_service(rmw_node_t * node, rmw_service_t * serv /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// +enum topic_kind +{ + DEFAULT, + REQUEST_RESPONSE +}; + +static bool demangle_topic_name( + const char * topic_name, + topic_kind topic_kind, + std::string & demangled_topic_name, + bool * is_request) +{ + std::string re; + bool is_def_kind = topic_kind == topic_kind::DEFAULT; + if (is_def_kind) { + re = "^" + std::string(ROS_TOPIC_PREFIX) + "(/.*)"; + } else { + re = "^(" + std::string(ROS_SERVICE_REQUESTER_PREFIX) + "|" + + std::string(ROS_SERVICE_RESPONSE_PREFIX) + ")(/.*)(Request|Reply)$"; + } + const auto re_tp = std::regex(re, std::regex::extended); + std::cmatch cm_tp; + if (std::regex_search(topic_name, cm_tp, re_tp)) { + demangled_topic_name = std::string(cm_tp[is_def_kind ? 1 : 2]); + if (is_request) { + assert(!is_def_kind); + *is_request = (std::string(cm_tp[3]) == "Request"); + } + return true; + } + return false; +} + +static bool demangle_topic_name( + const char * topic_name, + topic_kind topic_kind, + std::string & demangled_topic_name) +{ + return demangle_topic_name(topic_name, topic_kind, demangled_topic_name, NULL); +} + +static bool demangle_topic_type( + const char * type_name, + topic_kind topic_kind, + std::string & demangled_type_name) +{ + bool is_def_kind = topic_kind == topic_kind::DEFAULT; + const auto re_typ = std::regex("^(.*::)dds_::(.*)_" + + (is_def_kind ? std::string() : std::string("(Response|Request)_")) + + std::string("$"), + std::regex::extended); + std::cmatch cm_typ; + if (std::regex_search(type_name, cm_typ, re_typ)) { + demangled_type_name = + std::regex_replace(std::string(cm_typ[1]), std::regex("::"), "/") + + std::string(cm_typ[2]); + return true; + } + return false; +} + static rmw_ret_t do_for_node( - CddsNode * node_impl, + const CddsNode * node_impl, std::function oper) { dds_entity_t rd; @@ -2723,7 +2794,7 @@ static rmw_ret_t do_for_node( } static rmw_ret_t do_for_node_user_data( - CddsNode * node_impl, + const CddsNode * node_impl, std::function oper) { auto f = [oper](const dds_builtintopic_participant_t & sample) -> bool { @@ -2812,19 +2883,17 @@ extern "C" rmw_ret_t rmw_get_node_names( return RMW_RET_BAD_ALLOC; } -static rmw_ret_t rmw_collect_tptyp_for_kind( - std::map> & tt, +static rmw_ret_t rmw_collect_data_for_endpoint( CddsNode * node_impl, dds_entity_t builtin_topic, - std::function filter_and_map) + std::function filter_and_map) { assert( builtin_topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION || builtin_topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION); dds_entity_t rd; if ((rd = dds_create_reader(node_impl->enth, builtin_topic, NULL, NULL)) < 0) { - RMW_SET_ERROR_MSG("rmw_collect_tptyp_for_kind failed to create reader"); + RMW_SET_ERROR_MSG("rmw_collect_data_for_endpoint failed to create reader"); return RMW_RET_ERROR; } dds_sample_info_t info; @@ -2833,10 +2902,7 @@ static rmw_ret_t rmw_collect_tptyp_for_kind( while ((n = dds_take(rd, &msg, &info, 1, 1)) == 1) { if (info.valid_data && info.instance_state == DDS_IST_ALIVE) { auto sample = static_cast(msg); - std::string topic_name, type_name; - if (filter_and_map(*sample, topic_name, type_name)) { - tt[topic_name].insert(type_name); - } + filter_and_map(*sample); } dds_return_loan(rd, &msg, n); } @@ -2844,11 +2910,237 @@ static rmw_ret_t rmw_collect_tptyp_for_kind( if (n == 0) { return RMW_RET_OK; } else { - RMW_SET_ERROR_MSG("rmw_collect_tptyp_for_kind dds_take failed"); + RMW_SET_ERROR_MSG("rmw_collect_data_for_endpoint dds_take failed"); return RMW_RET_ERROR; } } +struct endpoint_info_t +{ + std::string topic_type; + dds_builtintopic_guid_t endpoint_key; + dds_builtintopic_guid_t node_key; + dds_instance_handle_t node_insth; + rmw_qos_profile_t qos_profile; +}; + +typedef std::map> node_cache_t; + +static void get_node_name( + dds_entity_t ppant_rd, + const dds_builtintopic_guid_t & pp_guid, + const dds_instance_handle_t pp_insth, + std::string & node_name, + std::string & node_ns) +{ + static_cast(pp_guid); // only used in assert() + bool node_found = false; + const auto re_ud = std::regex("^name=(.*);namespace=(.*);$", std::regex::extended); + size_t udsz; + dds_sample_info_t info; + void * msg = NULL; + int32_t n; + if ((n = dds_read_instance(ppant_rd, &msg, &info, 1, 1, pp_insth)) == 1) { + void * ud; + auto sample = static_cast(msg); + assert(!memcmp(&sample->key, &pp_guid, sizeof(dds_builtintopic_guid_t))); + if (dds_qget_userdata(sample->qos, &ud, &udsz) && ud != nullptr) { + std::cmatch cm; + if (std::regex_search(reinterpret_cast(ud), cm, re_ud)) { + node_found = true; + node_name = std::string(cm[1]); + node_ns = std::string(cm[2]); + } + } + dds_return_loan(ppant_rd, &msg, n); + } + if (!node_found) { + node_name = "_NODE_NAME_UNKNOWN_"; + node_ns = "_NODE_NAMESPACE_UNKNOWN_"; + } +} + +static rmw_ret_t +set_rmw_topic_endpoint_info( + node_cache_t & node_cache, + dds_entity_t ppant_rd, + rcutils_allocator_t * allocator, + const endpoint_info_t & epinfo, + bool is_publisher, + rmw_topic_endpoint_info_t & topic_endpoint_info) +{ + static_assert( + sizeof(dds_builtintopic_guid_t) <= RMW_GID_STORAGE_SIZE, + "RMW_GID_STORAGE_SIZE insufficient to store the rmw_cyclonedds_cpp GID implementation." + ); + rmw_ret_t ret; + std::string node_name; + std::string node_ns; + const auto it = node_cache.find(epinfo.node_key); + if (it != node_cache.end()) { + node_name = it->second.first; + node_ns = it->second.second; + } else { + get_node_name(ppant_rd, epinfo.node_key, epinfo.node_insth, node_name, node_ns); + node_cache[epinfo.node_key] = std::make_pair(node_name, node_ns); + } + if ((ret = rmw_topic_endpoint_info_set_node_name( + &topic_endpoint_info, + node_name.c_str(), + allocator)) != RMW_RET_OK) + { + return ret; + } + if ((ret = rmw_topic_endpoint_info_set_node_namespace( + &topic_endpoint_info, + node_ns.c_str(), + allocator)) != RMW_RET_OK) + { + return ret; + } + if ((ret = rmw_topic_endpoint_info_set_topic_type( + &topic_endpoint_info, + epinfo.topic_type.c_str(), + allocator)) != RMW_RET_OK) + { + return ret; + } + if ((ret = rmw_topic_endpoint_info_set_endpoint_type( + &topic_endpoint_info, + is_publisher ? RMW_ENDPOINT_PUBLISHER : RMW_ENDPOINT_SUBSCRIPTION)) != RMW_RET_OK) + { + return ret; + } + if ((ret = rmw_topic_endpoint_info_set_gid( + &topic_endpoint_info, + epinfo.endpoint_key.v, + sizeof(dds_builtintopic_guid_t))) != RMW_RET_OK) + { + return ret; + } + if ((ret = rmw_topic_endpoint_info_set_qos_profile( + &topic_endpoint_info, + &epinfo.qos_profile)) != RMW_RET_OK) + { + return ret; + } + return RMW_RET_OK; +} + +static void handle_topic_endpoint_info_array_fini( + rmw_topic_endpoint_info_array_t * topic_endpoint_info_array, + rcutils_allocator_t * allocator) +{ + std::string err_msg; + rmw_ret_t ret; + bool err_set = rmw_error_is_set(); + if (err_set) { + err_msg = rmw_get_error_string().str; + } + rmw_reset_error(); + if ((ret = + rmw_topic_endpoint_info_array_fini(topic_endpoint_info_array, allocator)) != RMW_RET_OK) + { + RCUTILS_LOG_ERROR_NAMED( + "rmw_cyclonedds_cpp", + "handle_topic_endpoint_info_array_fini failed: %s", + rmw_get_error_string().str); + rmw_reset_error(); + } + if (err_set) { + RMW_SET_ERROR_MSG(err_msg.c_str()); + } +} + +static rmw_ret_t +get_endpoint_info_by_topic( + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_mangle, + bool is_publisher, + rmw_topic_endpoint_info_array_t * endpoint_info) +{ + auto node_impl = static_cast(node->data); + const std::string fqtopic_name = make_fqtopic(ROS_TOPIC_PREFIX, topic_name, "", no_mangle); + std::vector endpoint_info_vector; + rmw_ret_t ret; + + const auto filter_and_map = + [fqtopic_name, no_mangle, &endpoint_info_vector]( + const dds_builtintopic_endpoint_t & sample) -> void + { + endpoint_info_t info; + if (std::string(sample.topic_name) == fqtopic_name) { + info.endpoint_key = sample.key; + dds_qos_to_rmw_qos(sample.qos, &info.qos_profile); + if (no_mangle) { + info.topic_type = std::string(sample.type_name); + } else if (!demangle_topic_type(sample.type_name, topic_kind::DEFAULT, info.topic_type)) { + // skip a non-ROS2 endpoint + return; + } + info.node_key = sample.participant_key; + info.node_insth = sample.participant_instance_handle; + endpoint_info_vector.push_back(info); + } + }; + if ((ret = rmw_collect_data_for_endpoint( + node_impl, + is_publisher ? DDS_BUILTIN_TOPIC_DCPSPUBLICATION : DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, + filter_and_map)) != RMW_RET_OK) + { + return ret; + } + + size_t epi_count = endpoint_info_vector.size(); + if ((ret = rmw_topic_endpoint_info_array_init_with_size( + endpoint_info, + epi_count, + allocator)) != RMW_RET_OK) + { + rmw_error_string_t err_msg = rmw_get_error_string(); + rmw_reset_error(); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "rmw_topic_endpoint_info_array_init_with_size failed to allocate memory: %s", + err_msg.str); + return ret; + } + endpoint_info->count = 0; + + node_cache_t node_cache; + dds_entity_t ppant_rd; + if ( + (ppant_rd = dds_create_reader( + node_impl->enth, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL)) < 0) + { + RMW_SET_ERROR_MSG("get_endpoint_info_by_topic: failed to create reader"); + handle_topic_endpoint_info_array_fini(endpoint_info, allocator); + return RMW_RET_ERROR; + } + for (const auto & epi : endpoint_info_vector) { + endpoint_info->info_array[endpoint_info->count] = + rmw_get_zero_initialized_topic_endpoint_info(); + if ((ret = set_rmw_topic_endpoint_info( + node_cache, + ppant_rd, + allocator, + epi, + is_publisher, + endpoint_info->info_array[endpoint_info->count])) != RMW_RET_OK) + { + endpoint_info->count++; + handle_topic_endpoint_info_array_fini(endpoint_info, allocator); + dds_delete(ppant_rd); + return ret; + } + endpoint_info->count++; + } + assert(endpoint_info->count == epi_count); + dds_delete(ppant_rd); + return RMW_RET_OK; +} + static rmw_ret_t make_names_and_types( rmw_names_and_types_t * tptyp, const std::map> & source, @@ -2887,7 +3179,7 @@ static rmw_ret_t make_names_and_types( fail_mem: if (rmw_names_and_types_fini(tptyp) != RMW_RET_OK) { - RMW_SET_ERROR_MSG("rmw_collect_tptyp_for_kind: rmw_names_and_types_fini failed"); + RMW_SET_ERROR_MSG("make_names_and_types: rmw_names_and_types_fini failed"); } return RMW_RET_BAD_ALLOC; } @@ -2953,46 +3245,32 @@ static rmw_ret_t get_endpoint_names_and_types_by_node( { return ret; } - const auto re_tp = - std::regex("^" + std::string(ROS_TOPIC_PREFIX) + "(/.*)", std::regex::extended); - const auto re_typ = std::regex("^(.*::)dds_::(.*)_$", std::regex::extended); + std::map> tt; const auto filter_and_map = - [re_tp, re_typ, guids, node_name, no_demangle](const dds_builtintopic_endpoint_t & sample, - std::string & topic_name, std::string & type_name) -> bool { - std::cmatch cm_tp, cm_typ; - if (node_name != nullptr && guids.count(sample.participant_key) == 0) { - return false; - } else { - if ( - !std::regex_search(sample.topic_name, cm_tp, re_tp) || - !std::regex_search(sample.type_name, cm_typ, re_typ)) - { - return false; - } else { - topic_name = std::string(cm_tp[1]); - if (no_demangle) { - type_name = std::string(type_name); - } else { - std::string demangled_type = std::regex_replace( - std::string(cm_typ[1]), std::regex("::"), "/"); - type_name = std::string(demangled_type) + std::string(cm_typ[2]); - } - return true; - } + [&tt, guids, node_name, no_demangle]( + const dds_builtintopic_endpoint_t & sample) -> void + { + std::string demangled_topic_name, demangled_type_name; + if ( + (node_name == nullptr || guids.count(sample.participant_key) != 0) && + demangle_topic_name(sample.topic_name, topic_kind::DEFAULT, demangled_topic_name) && + demangle_topic_type(sample.type_name, topic_kind::DEFAULT, demangled_type_name)) + { + tt[demangled_topic_name].insert( + no_demangle ? std::string(sample.type_name) : demangled_type_name); } }; - std::map> tt; if (subs && (ret = - rmw_collect_tptyp_for_kind( - tt, node_impl, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, filter_and_map)) != RMW_RET_OK) + rmw_collect_data_for_endpoint( + node_impl, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, filter_and_map)) != RMW_RET_OK) { return ret; } if (pubs && (ret = - rmw_collect_tptyp_for_kind( - tt, node_impl, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, filter_and_map)) != RMW_RET_OK) + rmw_collect_data_for_endpoint( + node_impl, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, filter_and_map)) != RMW_RET_OK) { return ret; } @@ -3045,45 +3323,26 @@ static rmw_ret_t get_cs_names_and_types_by_node( { return ret; } - const auto re_tp = std::regex( - "^(" + std::string(ROS_SERVICE_REQUESTER_PREFIX) + "|" + - std::string(ROS_SERVICE_RESPONSE_PREFIX) + ")(/.*)(Request|Reply)$", - std::regex::extended); - const auto re_typ = std::regex("^(.*::)dds_::(.*)_(Response|Request)_$", std::regex::extended); - const auto filter_and_map = [re_tp, re_typ, guids, node_name, looking_for_services]( - const dds_builtintopic_endpoint_t & sample, - std::string & topic_name, std::string & type_name) -> bool { - std::cmatch cm_tp, cm_typ; - if (node_name != nullptr && guids.count(sample.participant_key) == 0) { - return false; - } - if ( - !std::regex_search(sample.topic_name, cm_tp, re_tp) || - !std::regex_search(sample.type_name, cm_typ, re_typ)) + std::map> tt; + const auto filter_and_map = [&tt, guids, node_name, looking_for_services]( + const dds_builtintopic_endpoint_t & sample) -> void { + std::string topic_name, type_name; + bool is_request; + if ((node_name == nullptr || guids.count(sample.participant_key) != 0) && + demangle_topic_name( + sample.topic_name, topic_kind::REQUEST_RESPONSE, topic_name, &is_request) && + demangle_topic_type(sample.type_name, topic_kind::REQUEST_RESPONSE, type_name) && + looking_for_services == endpoint_is_from_service(is_request, sample.key)) { - return false; - } else { - if ( - looking_for_services != - endpoint_is_from_service(std::string(cm_tp[3]) == "Request", sample.key)) - { - return false; - } else { - std::string demangled_type = - std::regex_replace(std::string(cm_typ[1]), std::regex("::"), "/"); - topic_name = std::string(cm_tp[2]); - type_name = std::string(demangled_type) + std::string(cm_typ[2]); - return true; - } + tt[topic_name].insert(type_name); } }; - std::map> tt; if ((ret = - rmw_collect_tptyp_for_kind( - tt, node_impl, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, filter_and_map)) != RMW_RET_OK || + rmw_collect_data_for_endpoint( + node_impl, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, filter_and_map)) != RMW_RET_OK || (ret = - rmw_collect_tptyp_for_kind( - tt, node_impl, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, filter_and_map)) != RMW_RET_OK) + rmw_collect_data_for_endpoint( + node_impl, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, filter_and_map)) != RMW_RET_OK) { return ret; } @@ -3220,3 +3479,40 @@ extern "C" rmw_ret_t rmw_get_client_names_and_types_by_node( { return get_cs_names_and_types_by_node(node, allocator, node_name, node_namespace, sntyp, false); } + + +#if RMW_VERSION_GTE(0, 8, 2) + +extern "C" rmw_ret_t rmw_get_publishers_info_by_topic( + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_mangle, + rmw_topic_endpoint_info_array_t * publishers_info) +{ + return get_endpoint_info_by_topic( + node, + allocator, + topic_name, + no_mangle, + true, + publishers_info); +} + +extern "C" rmw_ret_t rmw_get_subscriptions_info_by_topic( + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_mangle, + rmw_topic_endpoint_info_array_t * subscriptions_info) +{ + return get_endpoint_info_by_topic( + node, + allocator, + topic_name, + no_mangle, + false, + subscriptions_info); +} + +#endif