Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance TopicCache to include QoS Policies of publishers and subscribers #332

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,11 @@ class ParticipantListener : public eprosima::fastrtps::ParticipantListener
{
std::lock_guard<std::mutex> guard(topic_cache.getMutex());
if (is_alive) {
trigger = topic_cache().addTopic(proxyData.RTPSParticipantKey(),
proxyData.topicName().to_string(), proxyData.typeName().to_string());
trigger = topic_cache().addTopic(
proxyData.RTPSParticipantKey(),
proxyData.topicName().to_string(),
proxyData.typeName().to_string(),
proxyData.m_qos);
} else {
trigger = topic_cache().removeTopic(proxyData.RTPSParticipantKey(),
proxyData.topicName().to_string(), proxyData.typeName().to_string());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima).
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -13,8 +13,8 @@
// limitations under the License.


#ifndef QOS_CONVERTER_HPP_
#define QOS_CONVERTER_HPP_
#ifndef RMW_FASTRTPS_SHARED_CPP__QOS_CONVERTER_HPP_
#define RMW_FASTRTPS_SHARED_CPP__QOS_CONVERTER_HPP_

#include "rmw/types.h"

Expand Down Expand Up @@ -83,4 +83,4 @@ dds_qos_to_rmw_qos(
qos->liveliness_lease_duration.nsec = dds_qos.m_liveliness.lease_duration.nanosec;
}

#endif // QOS_CONVERTER_HPP_
#endif // RMW_FASTRTPS_SHARED_CPP__QOS_CONVERTER_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
#include <set>
#include <sstream>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>

#include "fastrtps/participant/Participant.h"
#include "fastrtps/rtps/common/Guid.h"
#include "fastrtps/rtps/common/InstanceHandle.h"
#include "qos_converter.hpp"
#include "rcpputils/thread_safety_annotations.hpp"
#include "rcutils/logging_macros.h"

Expand All @@ -39,31 +41,48 @@ typedef eprosima::fastrtps::rtps::GUID_t GUID_t;
class TopicCache
{
private:
typedef std::map<GUID_t,
std::unordered_map<std::string, std::vector<std::string>>> ParticipantTopicMap;
typedef std::unordered_map<std::string, std::vector<std::string>> TopicToTypes;
typedef std::map<GUID_t, TopicToTypes> ParticipantTopicMap;
typedef std::vector<std::tuple<GUID_t, std::string, rmw_qos_profile_t>> TopicData;
typedef std::unordered_map<std::string, TopicData> TopicNameToTopicData;

/**
* Map of topic names to a vector of types that topic may use.
* Map of topic names to TopicData. Where topic data is vector of tuples containing
* participant GUID, topic type and the qos policy of the respective participant.
* Topics here are represented as one to many, DDS XTypes 1.2
* specifies application code 'generally' uses a 1-1 relationship.
* However, generic services such as logger and monitor, can discover
* multiple types on the same topic.
*
*/
TopicToTypes topic_to_types_;
TopicNameToTopicData topic_name_to_topic_data_;

/**
* Map of participant GUIDS to a set of topic-type.
*/
ParticipantTopicMap participant_to_topics_;

/**
* Helper function to initialize a topic vector.
* Helper function to initialize an empty TopicData for a topic name.
*
* @param topic_name
* @param topic_name the topic name for which the TopicNameToTopicData map should be initialised.
* @param topic_name_to_topic_data the map to initialise.
*/
void initializeTopic(const std::string & topic_name, TopicToTypes & topic_to_types)
void initializeTopicDataMap(
const std::string & topic_name,
TopicNameToTopicData & topic_name_to_topic_data)
{
if (topic_name_to_topic_data.find(topic_name) == topic_name_to_topic_data.end()) {
topic_name_to_topic_data[topic_name] = TopicData();
}
}

/**
* Helper function to initialize a topic vector.
*
* @param topic_name the topic name for which the TopicToTypes map should be initialised.
* @param topic_to_types the map to initialise.
*/
void initializeTopicTypesMap(const std::string & topic_name, TopicToTypes & topic_to_types)
{
if (topic_to_types.find(topic_name) == topic_to_types.end()) {
topic_to_types[topic_name] = std::vector<std::string>();
Expand All @@ -89,9 +108,16 @@ class TopicCache
/**
* @return a map of topic name to the vector of topic types used.
*/
const TopicToTypes & getTopicToTypes() const
const TopicToTypes getTopicToTypes() const
{
return topic_to_types_;
TopicToTypes topic_to_types;
for (const auto & it : topic_name_to_topic_data_) {
topic_to_types[it.first] = std::vector<std::string>();
for (const auto & mit : it.second) {
topic_to_types[it.first].push_back(std::get<1>(mit));
}
}
return topic_to_types;
}

/**
Expand All @@ -102,6 +128,14 @@ class TopicCache
return participant_to_topics_;
}

/**
* @return a map of topic name to a vector of GUID_t, type name and qos profile tuple.
*/
const TopicNameToTopicData & getTopicNameToTopicData() const
{
return topic_name_to_topic_data_;
}

/**
* Add a topic based on discovery.
*
Expand All @@ -110,15 +144,17 @@ class TopicCache
* @param type_name
* @return true if a change has been recorded
*/
template<class T>
bool addTopic(
const eprosima::fastrtps::rtps::InstanceHandle_t & rtpsParticipantKey,
const std::string & topic_name,
const std::string & type_name)
const std::string & type_name,
const T & dds_qos)
{
initializeTopic(topic_name, topic_to_types_);
initializeTopicDataMap(topic_name, topic_name_to_topic_data_);
auto guid = iHandle2GUID(rtpsParticipantKey);
initializeParticipantMap(participant_to_topics_, guid);
initializeTopic(topic_name, participant_to_topics_[guid]);
initializeTopicTypesMap(topic_name, participant_to_topics_[guid]);
if (rcutils_logging_logger_is_enabled_for("rmw_fastrtps_shared_cpp",
RCUTILS_LOG_SEVERITY_DEBUG))
{
Expand All @@ -129,7 +165,9 @@ class TopicCache
"Adding topic '%s' with type '%s' for node '%s'",
topic_name.c_str(), type_name.c_str(), guid_stream.str().c_str());
}
topic_to_types_[topic_name].push_back(type_name);
auto qos_profile = rmw_qos_profile_t();
dds_qos_to_rmw_qos(dds_qos, &qos_profile);
topic_name_to_topic_data_[topic_name].push_back(std::make_tuple(guid, type_name, qos_profile));
participant_to_topics_[guid][topic_name].push_back(type_name);
return true;
}
Expand All @@ -147,22 +185,26 @@ class TopicCache
const std::string & topic_name,
const std::string & type_name)
{
if (topic_to_types_.find(topic_name) == topic_to_types_.end()) {
if (topic_name_to_topic_data_.find(topic_name) == topic_name_to_topic_data_.end()) {
RCUTILS_LOG_DEBUG_NAMED(
"rmw_fastrtps_shared_cpp",
"unexpected removal on topic '%s' with type '%s'",
topic_name.c_str(), type_name.c_str());
return false;
}
auto guid = iHandle2GUID(rtpsParticipantKey);
{
auto & type_vec = topic_to_types_[topic_name];
type_vec.erase(std::find(type_vec.begin(), type_vec.end(), type_name));
auto & type_vec = topic_name_to_topic_data_[topic_name];
type_vec.erase(std::find_if(type_vec.begin(), type_vec.end(),
[type_name, guid](const auto & topic_info) {
return type_name.compare(std::get<1>(topic_info)) == 0 &&
guid == std::get<0>(topic_info);
}));
if (type_vec.empty()) {
topic_to_types_.erase(topic_name);
topic_name_to_topic_data_.erase(topic_name);
}
}

auto guid = iHandle2GUID(rtpsParticipantKey);
auto guid_topics_pair = participant_to_topics_.find(guid);
if (guid_topics_pair != participant_to_topics_.end() &&
guid_topics_pair->second.find(topic_name) != guid_topics_pair->second.end())
Expand All @@ -180,6 +222,7 @@ class TopicCache
"rmw_fastrtps_shared_cpp",
"Unable to remove topic, does not exist '%s' with type '%s'",
topic_name.c_str(), type_name.c_str());
return false;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ivanpauno Added this because the current implementation seems to return true even if removal is not possible.

}
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion rmw_fastrtps_shared_cpp/src/qos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#include <limits>

#include "qos_converter.hpp"
#include "rmw_fastrtps_shared_cpp/qos_converter.hpp"
#include "rmw_fastrtps_shared_cpp/qos.hpp"

#include "fastrtps/attributes/PublisherAttributes.h"
Expand Down
7 changes: 7 additions & 0 deletions rmw_fastrtps_shared_cpp/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
find_package(ament_cmake_gtest REQUIRED)

ament_add_gtest(test_dds_attributes_to_rmw_qos test_dds_attributes_to_rmw_qos.cpp)
if(TARGET test_dds_attributes_to_rmw_qos)
ament_target_dependencies(test_dds_attributes_to_rmw_qos)
target_link_libraries(test_dds_attributes_to_rmw_qos ${PROJECT_NAME})
endif()

ament_add_gtest(test_topic_cache test_topic_cache.cpp)
if(TARGET test_topic_cache)
ament_target_dependencies(test_topic_cache)
target_link_libraries(test_topic_cache ${PROJECT_NAME})
endif()
Loading