From 2a888438e917a1512207166138462ef60e51cfa2 Mon Sep 17 00:00:00 2001 From: JLBuenoLopez-eProsima Date: Wed, 21 Oct 2020 12:08:45 +0200 Subject: [PATCH 1/4] Workaround when the client is gone before server sends response Signed-off-by: JLBuenoLopez-eProsima --- diff_eprosima.txt | 256 ++++++++++++++++++ rmw_fastrtps_cpp/src/rmw_service.cpp | 2 +- rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp | 2 +- .../custom_service_info.hpp | 155 ++++++++--- rmw_fastrtps_shared_cpp/src/rmw_request.cpp | 8 + rmw_fastrtps_shared_cpp/src/rmw_response.cpp | 7 +- 6 files changed, 381 insertions(+), 49 deletions(-) create mode 100644 diff_eprosima.txt diff --git a/diff_eprosima.txt b/diff_eprosima.txt new file mode 100644 index 000000000..a418adb3f --- /dev/null +++ b/diff_eprosima.txt @@ -0,0 +1,256 @@ +diff --git a/rmw_fastrtps_cpp/src/rmw_service.cpp b/rmw_fastrtps_cpp/src/rmw_service.cpp +index 7fd67d4..144b24f 100644 +--- a/rmw_fastrtps_cpp/src/rmw_service.cpp ++++ b/rmw_fastrtps_cpp/src/rmw_service.cpp +@@ -241,7 +241,7 @@ rmw_create_service( + delete info->pub_listener_; + } + }); +- info->pub_listener_ = new (std::nothrow) ServicePubListener(); ++ info->pub_listener_ = new (std::nothrow) ServicePubListener(info); + if (!info->pub_listener_) { + RMW_SET_ERROR_MSG("failed to create service response publisher listener"); + return nullptr; +diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp +index 20c2ef6..14923a2 100644 +--- a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp ++++ b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp +@@ -231,7 +231,7 @@ rmw_create_service( + RMW_SET_ERROR_MSG("failed to get datawriter qos"); + goto fail; + } +- info->pub_listener_ = new ServicePubListener(); ++ info->pub_listener_ = new ServicePubListener(info); + info->response_publisher_ = + Domain::createPublisher(participant, publisherParam, info->pub_listener_); + if (!info->response_publisher_) { +diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp +index e2e7ff8..d85f0a6 100644 +--- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp ++++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp +@@ -20,6 +20,7 @@ + #include + #include + #include ++#include + + #include "fastcdr/FastBuffer.h" + +@@ -38,6 +39,14 @@ + class ServiceListener; + class ServicePubListener; + ++enum class client_present_t ++{ ++ FAILURE, // an error occurred when checking ++ MAYBE, // reader not matched, writer still present ++ YES, // reader matched ++ GONE // neither reader nor writer ++}; ++ + typedef struct CustomServiceInfo + { + rmw_fastrtps_shared_cpp::TypeSupport * request_type_support_{nullptr}; +@@ -62,6 +71,92 @@ typedef struct CustomServiceRequest + : buffer_(nullptr) {} + } CustomServiceRequest; + ++class ServicePubListener : public eprosima::fastrtps::PublisherListener ++{ ++ using subscriptions_set_t = ++ std::unordered_set; ++ using clients_endpoints_map_t = ++ std::unordered_map; ++ ++public: ++ explicit ServicePubListener(CustomServiceInfo * info) ++ : info_(info) ++ { ++ (void)info_; ++ } ++ ++ void ++ onPublicationMatched( ++ eprosima::fastrtps::Publisher * pub, ++ eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) ++ { ++ (void) pub; ++ std::lock_guard lock(mutex_); ++ if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) { ++ subscriptions_.insert(matchingInfo.remoteEndpointGuid); ++ } else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) { ++ subscriptions_.erase(matchingInfo.remoteEndpointGuid); ++ auto endpoint = clients_endpoints_.find(matchingInfo.remoteEndpointGuid); ++ if (endpoint != clients_endpoints_.end()) { ++ clients_endpoints_.erase(endpoint->second); ++ clients_endpoints_.erase(matchingInfo.remoteEndpointGuid); ++ } ++ } else { ++ return; ++ } ++ cv_.notify_all(); ++ } ++ ++ template ++ bool ++ wait_for_subscription( ++ const eprosima::fastrtps::rtps::GUID_t & guid, ++ const std::chrono::duration & rel_time) ++ { ++ auto guid_is_present = [this, guid]() RCPPUTILS_TSA_REQUIRES(mutex_)->bool ++ { ++ return subscriptions_.find(guid) != subscriptions_.end(); ++ }; ++ ++ std::unique_lock lock(mutex_); ++ return cv_.wait_for(lock, rel_time, guid_is_present); ++ } ++ ++ client_present_t ++ check_for_subscription( ++ const eprosima::fastrtps::rtps::GUID_t & guid) ++ { ++ // Check if the guid is still in the map ++ if (clients_endpoints_.find(guid) != clients_endpoints_.end()) { ++ // Wait for subscription ++ if (!wait_for_subscription(guid, std::chrono::milliseconds(100))) { ++ return client_present_t::MAYBE; ++ } ++ } else { ++ // Client has gone ++ return client_present_t::GONE; ++ } ++ return client_present_t::YES; ++ } ++ ++ // Accesors ++ clients_endpoints_map_t & clients_endpoints() ++ { ++ std::lock_guard lock(mutex_); ++ return clients_endpoints_; ++ } ++ ++private: ++ CustomServiceInfo * info_; ++ std::mutex mutex_; ++ subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); ++ clients_endpoints_map_t clients_endpoints_ RCPPUTILS_TSA_GUARDED_BY(mutex_); ++ std::condition_variable cv_; ++}; ++ + class ServiceListener : public eprosima::fastrtps::SubscriberListener + { + public: +@@ -72,6 +167,21 @@ public: + (void)info_; + } + ++ void ++ onSubscriptionMatched( ++ eprosima::fastrtps::Subscriber * sub, ++ eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) ++ { ++ (void) sub; ++ if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) { ++ auto endpoint = info_->pub_listener_->clients_endpoints().find( ++ matchingInfo.remoteEndpointGuid); ++ if (endpoint != info_->pub_listener_->clients_endpoints().end()) { ++ info_->pub_listener_->clients_endpoints().erase(endpoint->second); ++ info_->pub_listener_->clients_endpoints().erase(matchingInfo.remoteEndpointGuid); ++ } ++ } ++ } + + void + onNewDataMessage(eprosima::fastrtps::Subscriber * sub) +@@ -169,49 +279,4 @@ private: + std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + }; + +-class ServicePubListener : public eprosima::fastrtps::PublisherListener +-{ +-public: +- ServicePubListener() = default; +- +- template +- bool wait_for_subscription( +- const eprosima::fastrtps::rtps::GUID_t & guid, +- const std::chrono::duration & rel_time) +- { +- auto guid_is_present = [this, guid]() RCPPUTILS_TSA_REQUIRES(mutex_)->bool +- { +- return subscriptions_.find(guid) != subscriptions_.end(); +- }; +- +- std::unique_lock lock(mutex_); +- return cv_.wait_for(lock, rel_time, guid_is_present); +- } +- +- void onPublicationMatched( +- eprosima::fastrtps::Publisher * pub, +- eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) +- { +- (void) pub; +- std::lock_guard lock(mutex_); +- if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) { +- subscriptions_.insert(matchingInfo.remoteEndpointGuid); +- } else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) { +- subscriptions_.erase(matchingInfo.remoteEndpointGuid); +- } else { +- return; +- } +- cv_.notify_all(); +- } +- +-private: +- using subscriptions_set_t = +- std::unordered_set; +- +- std::mutex mutex_; +- subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); +- std::condition_variable cv_; +-}; +- + #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_ +diff --git a/rmw_fastrtps_shared_cpp/src/rmw_request.cpp b/rmw_fastrtps_shared_cpp/src/rmw_request.cpp +index 2c1ef8a..1516acc 100644 +--- a/rmw_fastrtps_shared_cpp/src/rmw_request.cpp ++++ b/rmw_fastrtps_shared_cpp/src/rmw_request.cpp +@@ -112,6 +112,14 @@ __rmw_take_request( + delete request.buffer_; + + *taken = true; ++ ++ // Save both endpoint GUIDs in the clients_endpoints map ++ ServicePubListener * listener = info->pub_listener_; ++ eprosima::fastrtps::rtps::GUID_t related_guid = request.sample_identity_.writer_guid(); ++ eprosima::fastrtps::rtps::GUID_t writer_guid = ++ request.sample_info_.sample_identity.writer_guid(); ++ listener->clients_endpoints().emplace(related_guid, writer_guid); ++ listener->clients_endpoints().emplace(writer_guid, related_guid); + } + + return RMW_RET_OK; +diff --git a/rmw_fastrtps_shared_cpp/src/rmw_response.cpp b/rmw_fastrtps_shared_cpp/src/rmw_response.cpp +index 4ee96af..cb36b27 100644 +--- a/rmw_fastrtps_shared_cpp/src/rmw_response.cpp ++++ b/rmw_fastrtps_shared_cpp/src/rmw_response.cpp +@@ -119,9 +119,12 @@ __rmw_send_response( + // Related guid is a reader, so it is the response subscription guid. + // Wait for the response writer to be matched with it. + auto listener = info->pub_listener_; +- if (!listener->wait_for_subscription(related_guid, std::chrono::milliseconds(100))) { ++ client_present_t ret = listener->check_for_subscription(related_guid); ++ if (ret == client_present_t::GONE) { ++ return RMW_RET_OK; ++ } else if (ret == client_present_t::MAYBE) { + RMW_SET_ERROR_MSG("client will not receive response"); +- return RMW_RET_ERROR; ++ return RMW_RET_TIMEOUT; + } + } + diff --git a/rmw_fastrtps_cpp/src/rmw_service.cpp b/rmw_fastrtps_cpp/src/rmw_service.cpp index 7fd67d478..144b24f04 100644 --- a/rmw_fastrtps_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_cpp/src/rmw_service.cpp @@ -241,7 +241,7 @@ rmw_create_service( delete info->pub_listener_; } }); - info->pub_listener_ = new (std::nothrow) ServicePubListener(); + info->pub_listener_ = new (std::nothrow) ServicePubListener(info); if (!info->pub_listener_) { RMW_SET_ERROR_MSG("failed to create service response publisher listener"); return nullptr; diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp index 20c2ef6e8..14923a29c 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp @@ -231,7 +231,7 @@ rmw_create_service( RMW_SET_ERROR_MSG("failed to get datawriter qos"); goto fail; } - info->pub_listener_ = new ServicePubListener(); + info->pub_listener_ = new ServicePubListener(info); info->response_publisher_ = Domain::createPublisher(participant, publisherParam, info->pub_listener_); if (!info->response_publisher_) { diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp index e2e7ff86a..d85f0a64e 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "fastcdr/FastBuffer.h" @@ -38,6 +39,14 @@ class ServiceListener; class ServicePubListener; +enum class client_present_t +{ + FAILURE, // an error occurred when checking + MAYBE, // reader not matched, writer still present + YES, // reader matched + GONE // neither reader nor writer +}; + typedef struct CustomServiceInfo { rmw_fastrtps_shared_cpp::TypeSupport * request_type_support_{nullptr}; @@ -62,6 +71,92 @@ typedef struct CustomServiceRequest : buffer_(nullptr) {} } CustomServiceRequest; +class ServicePubListener : public eprosima::fastrtps::PublisherListener +{ + using subscriptions_set_t = + std::unordered_set; + using clients_endpoints_map_t = + std::unordered_map; + +public: + explicit ServicePubListener(CustomServiceInfo * info) + : info_(info) + { + (void)info_; + } + + void + onPublicationMatched( + eprosima::fastrtps::Publisher * pub, + eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) + { + (void) pub; + std::lock_guard lock(mutex_); + if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) { + subscriptions_.insert(matchingInfo.remoteEndpointGuid); + } else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) { + subscriptions_.erase(matchingInfo.remoteEndpointGuid); + auto endpoint = clients_endpoints_.find(matchingInfo.remoteEndpointGuid); + if (endpoint != clients_endpoints_.end()) { + clients_endpoints_.erase(endpoint->second); + clients_endpoints_.erase(matchingInfo.remoteEndpointGuid); + } + } else { + return; + } + cv_.notify_all(); + } + + template + bool + wait_for_subscription( + const eprosima::fastrtps::rtps::GUID_t & guid, + const std::chrono::duration & rel_time) + { + auto guid_is_present = [this, guid]() RCPPUTILS_TSA_REQUIRES(mutex_)->bool + { + return subscriptions_.find(guid) != subscriptions_.end(); + }; + + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, rel_time, guid_is_present); + } + + client_present_t + check_for_subscription( + const eprosima::fastrtps::rtps::GUID_t & guid) + { + // Check if the guid is still in the map + if (clients_endpoints_.find(guid) != clients_endpoints_.end()) { + // Wait for subscription + if (!wait_for_subscription(guid, std::chrono::milliseconds(100))) { + return client_present_t::MAYBE; + } + } else { + // Client has gone + return client_present_t::GONE; + } + return client_present_t::YES; + } + + // Accesors + clients_endpoints_map_t & clients_endpoints() + { + std::lock_guard lock(mutex_); + return clients_endpoints_; + } + +private: + CustomServiceInfo * info_; + std::mutex mutex_; + subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + clients_endpoints_map_t clients_endpoints_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + std::condition_variable cv_; +}; + class ServiceListener : public eprosima::fastrtps::SubscriberListener { public: @@ -72,6 +167,21 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener (void)info_; } + void + onSubscriptionMatched( + eprosima::fastrtps::Subscriber * sub, + eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) + { + (void) sub; + if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) { + auto endpoint = info_->pub_listener_->clients_endpoints().find( + matchingInfo.remoteEndpointGuid); + if (endpoint != info_->pub_listener_->clients_endpoints().end()) { + info_->pub_listener_->clients_endpoints().erase(endpoint->second); + info_->pub_listener_->clients_endpoints().erase(matchingInfo.remoteEndpointGuid); + } + } + } void onNewDataMessage(eprosima::fastrtps::Subscriber * sub) @@ -169,49 +279,4 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); }; -class ServicePubListener : public eprosima::fastrtps::PublisherListener -{ -public: - ServicePubListener() = default; - - template - bool wait_for_subscription( - const eprosima::fastrtps::rtps::GUID_t & guid, - const std::chrono::duration & rel_time) - { - auto guid_is_present = [this, guid]() RCPPUTILS_TSA_REQUIRES(mutex_)->bool - { - return subscriptions_.find(guid) != subscriptions_.end(); - }; - - std::unique_lock lock(mutex_); - return cv_.wait_for(lock, rel_time, guid_is_present); - } - - void onPublicationMatched( - eprosima::fastrtps::Publisher * pub, - eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) - { - (void) pub; - std::lock_guard lock(mutex_); - if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) { - subscriptions_.insert(matchingInfo.remoteEndpointGuid); - } else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) { - subscriptions_.erase(matchingInfo.remoteEndpointGuid); - } else { - return; - } - cv_.notify_all(); - } - -private: - using subscriptions_set_t = - std::unordered_set; - - std::mutex mutex_; - subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); - std::condition_variable cv_; -}; - #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/rmw_request.cpp b/rmw_fastrtps_shared_cpp/src/rmw_request.cpp index 2c1ef8a64..1516accd5 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_request.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_request.cpp @@ -112,6 +112,14 @@ __rmw_take_request( delete request.buffer_; *taken = true; + + // Save both endpoint GUIDs in the clients_endpoints map + ServicePubListener * listener = info->pub_listener_; + eprosima::fastrtps::rtps::GUID_t related_guid = request.sample_identity_.writer_guid(); + eprosima::fastrtps::rtps::GUID_t writer_guid = + request.sample_info_.sample_identity.writer_guid(); + listener->clients_endpoints().emplace(related_guid, writer_guid); + listener->clients_endpoints().emplace(writer_guid, related_guid); } return RMW_RET_OK; diff --git a/rmw_fastrtps_shared_cpp/src/rmw_response.cpp b/rmw_fastrtps_shared_cpp/src/rmw_response.cpp index 4ee96af24..cb36b27a6 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_response.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_response.cpp @@ -119,9 +119,12 @@ __rmw_send_response( // Related guid is a reader, so it is the response subscription guid. // Wait for the response writer to be matched with it. auto listener = info->pub_listener_; - if (!listener->wait_for_subscription(related_guid, std::chrono::milliseconds(100))) { + client_present_t ret = listener->check_for_subscription(related_guid); + if (ret == client_present_t::GONE) { + return RMW_RET_OK; + } else if (ret == client_present_t::MAYBE) { RMW_SET_ERROR_MSG("client will not receive response"); - return RMW_RET_ERROR; + return RMW_RET_TIMEOUT; } } From 2ece75b793ebd33c84d0c3652d70464b84971c99 Mon Sep 17 00:00:00 2001 From: JLBuenoLopez-eProsima Date: Wed, 21 Oct 2020 12:39:09 +0200 Subject: [PATCH 2/4] Change add to the map to listener callback Signed-off-by: JLBuenoLopez-eProsima --- diff_eprosima.txt | 256 ------------------ .../custom_service_info.hpp | 6 + rmw_fastrtps_shared_cpp/src/rmw_request.cpp | 8 - 3 files changed, 6 insertions(+), 264 deletions(-) delete mode 100644 diff_eprosima.txt diff --git a/diff_eprosima.txt b/diff_eprosima.txt deleted file mode 100644 index a418adb3f..000000000 --- a/diff_eprosima.txt +++ /dev/null @@ -1,256 +0,0 @@ -diff --git a/rmw_fastrtps_cpp/src/rmw_service.cpp b/rmw_fastrtps_cpp/src/rmw_service.cpp -index 7fd67d4..144b24f 100644 ---- a/rmw_fastrtps_cpp/src/rmw_service.cpp -+++ b/rmw_fastrtps_cpp/src/rmw_service.cpp -@@ -241,7 +241,7 @@ rmw_create_service( - delete info->pub_listener_; - } - }); -- info->pub_listener_ = new (std::nothrow) ServicePubListener(); -+ info->pub_listener_ = new (std::nothrow) ServicePubListener(info); - if (!info->pub_listener_) { - RMW_SET_ERROR_MSG("failed to create service response publisher listener"); - return nullptr; -diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp -index 20c2ef6..14923a2 100644 ---- a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp -+++ b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp -@@ -231,7 +231,7 @@ rmw_create_service( - RMW_SET_ERROR_MSG("failed to get datawriter qos"); - goto fail; - } -- info->pub_listener_ = new ServicePubListener(); -+ info->pub_listener_ = new ServicePubListener(info); - info->response_publisher_ = - Domain::createPublisher(participant, publisherParam, info->pub_listener_); - if (!info->response_publisher_) { -diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp -index e2e7ff8..d85f0a6 100644 ---- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp -+++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp -@@ -20,6 +20,7 @@ - #include - #include - #include -+#include - - #include "fastcdr/FastBuffer.h" - -@@ -38,6 +39,14 @@ - class ServiceListener; - class ServicePubListener; - -+enum class client_present_t -+{ -+ FAILURE, // an error occurred when checking -+ MAYBE, // reader not matched, writer still present -+ YES, // reader matched -+ GONE // neither reader nor writer -+}; -+ - typedef struct CustomServiceInfo - { - rmw_fastrtps_shared_cpp::TypeSupport * request_type_support_{nullptr}; -@@ -62,6 +71,92 @@ typedef struct CustomServiceRequest - : buffer_(nullptr) {} - } CustomServiceRequest; - -+class ServicePubListener : public eprosima::fastrtps::PublisherListener -+{ -+ using subscriptions_set_t = -+ std::unordered_set; -+ using clients_endpoints_map_t = -+ std::unordered_map; -+ -+public: -+ explicit ServicePubListener(CustomServiceInfo * info) -+ : info_(info) -+ { -+ (void)info_; -+ } -+ -+ void -+ onPublicationMatched( -+ eprosima::fastrtps::Publisher * pub, -+ eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) -+ { -+ (void) pub; -+ std::lock_guard lock(mutex_); -+ if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) { -+ subscriptions_.insert(matchingInfo.remoteEndpointGuid); -+ } else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) { -+ subscriptions_.erase(matchingInfo.remoteEndpointGuid); -+ auto endpoint = clients_endpoints_.find(matchingInfo.remoteEndpointGuid); -+ if (endpoint != clients_endpoints_.end()) { -+ clients_endpoints_.erase(endpoint->second); -+ clients_endpoints_.erase(matchingInfo.remoteEndpointGuid); -+ } -+ } else { -+ return; -+ } -+ cv_.notify_all(); -+ } -+ -+ template -+ bool -+ wait_for_subscription( -+ const eprosima::fastrtps::rtps::GUID_t & guid, -+ const std::chrono::duration & rel_time) -+ { -+ auto guid_is_present = [this, guid]() RCPPUTILS_TSA_REQUIRES(mutex_)->bool -+ { -+ return subscriptions_.find(guid) != subscriptions_.end(); -+ }; -+ -+ std::unique_lock lock(mutex_); -+ return cv_.wait_for(lock, rel_time, guid_is_present); -+ } -+ -+ client_present_t -+ check_for_subscription( -+ const eprosima::fastrtps::rtps::GUID_t & guid) -+ { -+ // Check if the guid is still in the map -+ if (clients_endpoints_.find(guid) != clients_endpoints_.end()) { -+ // Wait for subscription -+ if (!wait_for_subscription(guid, std::chrono::milliseconds(100))) { -+ return client_present_t::MAYBE; -+ } -+ } else { -+ // Client has gone -+ return client_present_t::GONE; -+ } -+ return client_present_t::YES; -+ } -+ -+ // Accesors -+ clients_endpoints_map_t & clients_endpoints() -+ { -+ std::lock_guard lock(mutex_); -+ return clients_endpoints_; -+ } -+ -+private: -+ CustomServiceInfo * info_; -+ std::mutex mutex_; -+ subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); -+ clients_endpoints_map_t clients_endpoints_ RCPPUTILS_TSA_GUARDED_BY(mutex_); -+ std::condition_variable cv_; -+}; -+ - class ServiceListener : public eprosima::fastrtps::SubscriberListener - { - public: -@@ -72,6 +167,21 @@ public: - (void)info_; - } - -+ void -+ onSubscriptionMatched( -+ eprosima::fastrtps::Subscriber * sub, -+ eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) -+ { -+ (void) sub; -+ if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) { -+ auto endpoint = info_->pub_listener_->clients_endpoints().find( -+ matchingInfo.remoteEndpointGuid); -+ if (endpoint != info_->pub_listener_->clients_endpoints().end()) { -+ info_->pub_listener_->clients_endpoints().erase(endpoint->second); -+ info_->pub_listener_->clients_endpoints().erase(matchingInfo.remoteEndpointGuid); -+ } -+ } -+ } - - void - onNewDataMessage(eprosima::fastrtps::Subscriber * sub) -@@ -169,49 +279,4 @@ private: - std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); - }; - --class ServicePubListener : public eprosima::fastrtps::PublisherListener --{ --public: -- ServicePubListener() = default; -- -- template -- bool wait_for_subscription( -- const eprosima::fastrtps::rtps::GUID_t & guid, -- const std::chrono::duration & rel_time) -- { -- auto guid_is_present = [this, guid]() RCPPUTILS_TSA_REQUIRES(mutex_)->bool -- { -- return subscriptions_.find(guid) != subscriptions_.end(); -- }; -- -- std::unique_lock lock(mutex_); -- return cv_.wait_for(lock, rel_time, guid_is_present); -- } -- -- void onPublicationMatched( -- eprosima::fastrtps::Publisher * pub, -- eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) -- { -- (void) pub; -- std::lock_guard lock(mutex_); -- if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) { -- subscriptions_.insert(matchingInfo.remoteEndpointGuid); -- } else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) { -- subscriptions_.erase(matchingInfo.remoteEndpointGuid); -- } else { -- return; -- } -- cv_.notify_all(); -- } -- --private: -- using subscriptions_set_t = -- std::unordered_set; -- -- std::mutex mutex_; -- subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); -- std::condition_variable cv_; --}; -- - #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_ -diff --git a/rmw_fastrtps_shared_cpp/src/rmw_request.cpp b/rmw_fastrtps_shared_cpp/src/rmw_request.cpp -index 2c1ef8a..1516acc 100644 ---- a/rmw_fastrtps_shared_cpp/src/rmw_request.cpp -+++ b/rmw_fastrtps_shared_cpp/src/rmw_request.cpp -@@ -112,6 +112,14 @@ __rmw_take_request( - delete request.buffer_; - - *taken = true; -+ -+ // Save both endpoint GUIDs in the clients_endpoints map -+ ServicePubListener * listener = info->pub_listener_; -+ eprosima::fastrtps::rtps::GUID_t related_guid = request.sample_identity_.writer_guid(); -+ eprosima::fastrtps::rtps::GUID_t writer_guid = -+ request.sample_info_.sample_identity.writer_guid(); -+ listener->clients_endpoints().emplace(related_guid, writer_guid); -+ listener->clients_endpoints().emplace(writer_guid, related_guid); - } - - return RMW_RET_OK; -diff --git a/rmw_fastrtps_shared_cpp/src/rmw_response.cpp b/rmw_fastrtps_shared_cpp/src/rmw_response.cpp -index 4ee96af..cb36b27 100644 ---- a/rmw_fastrtps_shared_cpp/src/rmw_response.cpp -+++ b/rmw_fastrtps_shared_cpp/src/rmw_response.cpp -@@ -119,9 +119,12 @@ __rmw_send_response( - // Related guid is a reader, so it is the response subscription guid. - // Wait for the response writer to be matched with it. - auto listener = info->pub_listener_; -- if (!listener->wait_for_subscription(related_guid, std::chrono::milliseconds(100))) { -+ client_present_t ret = listener->check_for_subscription(related_guid); -+ if (ret == client_present_t::GONE) { -+ return RMW_RET_OK; -+ } else if (ret == client_present_t::MAYBE) { - RMW_SET_ERROR_MSG("client will not receive response"); -- return RMW_RET_ERROR; -+ return RMW_RET_TIMEOUT; - } - } - diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp index d85f0a64e..edf8a0699 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp @@ -205,6 +205,12 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener request.sample_identity_.writer_guid() = reader_guid; } + // Save both guids in the clients_endpoints map + const eprosima::fastrtps::rtps::GUID_t & writer_guid = + request.sample_info_.sample_identity.writer_guid(); + info_->pub_listener_->clients_endpoints().emplace(reader_guid, writer_guid); + info_->pub_listener_->clients_endpoints().emplace(writer_guid, reader_guid); + std::lock_guard lock(internalMutex_); if (conditionMutex_ != nullptr) { diff --git a/rmw_fastrtps_shared_cpp/src/rmw_request.cpp b/rmw_fastrtps_shared_cpp/src/rmw_request.cpp index 1516accd5..2c1ef8a64 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_request.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_request.cpp @@ -112,14 +112,6 @@ __rmw_take_request( delete request.buffer_; *taken = true; - - // Save both endpoint GUIDs in the clients_endpoints map - ServicePubListener * listener = info->pub_listener_; - eprosima::fastrtps::rtps::GUID_t related_guid = request.sample_identity_.writer_guid(); - eprosima::fastrtps::rtps::GUID_t writer_guid = - request.sample_info_.sample_identity.writer_guid(); - listener->clients_endpoints().emplace(related_guid, writer_guid); - listener->clients_endpoints().emplace(writer_guid, related_guid); } return RMW_RET_OK; From 445a52647cf7e5acbc9285eb4b0660db795f73be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Bueno=20L=C3=B3pez?= <69244257+JLBuenoLopez-eProsima@users.noreply.github.com> Date: Thu, 22 Oct 2020 07:51:42 +0200 Subject: [PATCH 3/4] Apply suggestions from code review Co-authored-by: Michel Hidalgo Signed-off-by: JLBuenoLopez-eProsima --- .../rmw_fastrtps_shared_cpp/custom_service_info.hpp | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp index edf8a0699..d8315f2f9 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp @@ -130,14 +130,13 @@ class ServicePubListener : public eprosima::fastrtps::PublisherListener const eprosima::fastrtps::rtps::GUID_t & guid) { // Check if the guid is still in the map - if (clients_endpoints_.find(guid) != clients_endpoints_.end()) { - // Wait for subscription - if (!wait_for_subscription(guid, std::chrono::milliseconds(100))) { - return client_present_t::MAYBE; - } - } else { - // Client has gone + if (clients_endpoints_.find(guid) == clients_endpoints_.end()) { + // Client is gone return client_present_t::GONE; + } + // Wait for subscription + if (!wait_for_subscription(guid, std::chrono::milliseconds(100))) { + return client_present_t::MAYBE; } return client_present_t::YES; } From caee90dbc0e08de614a93b978e48d6331afde3c6 Mon Sep 17 00:00:00 2001 From: JLBuenoLopez-eProsima Date: Thu, 22 Oct 2020 11:13:46 +0200 Subject: [PATCH 4/4] Uncrustify Signed-off-by: JLBuenoLopez-eProsima --- .../include/rmw_fastrtps_shared_cpp/custom_service_info.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp index d8315f2f9..4e43cd28b 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp @@ -133,7 +133,7 @@ class ServicePubListener : public eprosima::fastrtps::PublisherListener if (clients_endpoints_.find(guid) == clients_endpoints_.end()) { // Client is gone return client_present_t::GONE; - } + } // Wait for subscription if (!wait_for_subscription(guid, std::chrono::milliseconds(100))) { return client_present_t::MAYBE;