Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Discriminate when the Client has gone from when the Client has not completely matched #467

Merged
merged 4 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ rmw_create_service(
delete info->pub_listener_;
}
});
info->pub_listener_ = new (std::nothrow) ServicePubListener();
info->pub_listener_ = new (std::nothrow) ServicePubListener(info);
if (!info->pub_listener_) {
RMW_SET_ERROR_MSG("failed to create service response publisher listener");
return nullptr;
Expand Down
2 changes: 1 addition & 1 deletion rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ rmw_create_service(
RMW_SET_ERROR_MSG("failed to get datawriter qos");
goto fail;
}
info->pub_listener_ = new ServicePubListener();
info->pub_listener_ = new ServicePubListener(info);
info->response_publisher_ =
Domain::createPublisher(participant, publisherParam, info->pub_listener_);
if (!info->response_publisher_) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <list>
#include <mutex>
#include <unordered_set>
#include <unordered_map>

#include "fastcdr/FastBuffer.h"

Expand All @@ -38,6 +39,14 @@
class ServiceListener;
class ServicePubListener;

enum class client_present_t
{
FAILURE, // an error occurred when checking
MAYBE, // reader not matched, writer still present
YES, // reader matched
GONE // neither reader nor writer
};

typedef struct CustomServiceInfo
{
rmw_fastrtps_shared_cpp::TypeSupport * request_type_support_{nullptr};
Expand All @@ -62,6 +71,91 @@ typedef struct CustomServiceRequest
: buffer_(nullptr) {}
} CustomServiceRequest;

class ServicePubListener : public eprosima::fastrtps::PublisherListener
{
using subscriptions_set_t =
std::unordered_set<eprosima::fastrtps::rtps::GUID_t,
rmw_fastrtps_shared_cpp::hash_fastrtps_guid>;
using clients_endpoints_map_t =
std::unordered_map<eprosima::fastrtps::rtps::GUID_t,
eprosima::fastrtps::rtps::GUID_t,
rmw_fastrtps_shared_cpp::hash_fastrtps_guid>;

public:
explicit ServicePubListener(CustomServiceInfo * info)
: info_(info)
{
(void)info_;
}

void
onPublicationMatched(
eprosima::fastrtps::Publisher * pub,
eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
{
(void) pub;
std::lock_guard<std::mutex> lock(mutex_);
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) {
subscriptions_.insert(matchingInfo.remoteEndpointGuid);
} else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
subscriptions_.erase(matchingInfo.remoteEndpointGuid);
auto endpoint = clients_endpoints_.find(matchingInfo.remoteEndpointGuid);
if (endpoint != clients_endpoints_.end()) {
clients_endpoints_.erase(endpoint->second);
clients_endpoints_.erase(matchingInfo.remoteEndpointGuid);
}
} else {
return;
}
cv_.notify_all();
}

template<class Rep, class Period>
bool
wait_for_subscription(
const eprosima::fastrtps::rtps::GUID_t & guid,
const std::chrono::duration<Rep, Period> & rel_time)
{
auto guid_is_present = [this, guid]() RCPPUTILS_TSA_REQUIRES(mutex_)->bool
{
return subscriptions_.find(guid) != subscriptions_.end();
};

std::unique_lock<std::mutex> lock(mutex_);
return cv_.wait_for(lock, rel_time, guid_is_present);
}

client_present_t
check_for_subscription(
const eprosima::fastrtps::rtps::GUID_t & guid)
{
// Check if the guid is still in the map
if (clients_endpoints_.find(guid) == clients_endpoints_.end()) {
// Client is gone
return client_present_t::GONE;
}
// Wait for subscription
if (!wait_for_subscription(guid, std::chrono::milliseconds(100))) {
return client_present_t::MAYBE;
}
return client_present_t::YES;
}

// Accesors
clients_endpoints_map_t & clients_endpoints()
{
std::lock_guard<std::mutex> lock(mutex_);
return clients_endpoints_;
}

private:
CustomServiceInfo * info_;
std::mutex mutex_;
subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
clients_endpoints_map_t clients_endpoints_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::condition_variable cv_;
};

class ServiceListener : public eprosima::fastrtps::SubscriberListener
{
public:
Expand All @@ -72,6 +166,21 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
(void)info_;
}

void
onSubscriptionMatched(
eprosima::fastrtps::Subscriber * sub,
eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
{
(void) sub;
if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
auto endpoint = info_->pub_listener_->clients_endpoints().find(
matchingInfo.remoteEndpointGuid);
if (endpoint != info_->pub_listener_->clients_endpoints().end()) {
info_->pub_listener_->clients_endpoints().erase(endpoint->second);
info_->pub_listener_->clients_endpoints().erase(matchingInfo.remoteEndpointGuid);
}
}
}

void
onNewDataMessage(eprosima::fastrtps::Subscriber * sub)
Expand All @@ -95,6 +204,12 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
request.sample_identity_.writer_guid() = reader_guid;
}

// Save both guids in the clients_endpoints map
const eprosima::fastrtps::rtps::GUID_t & writer_guid =
request.sample_info_.sample_identity.writer_guid();
info_->pub_listener_->clients_endpoints().emplace(reader_guid, writer_guid);
info_->pub_listener_->clients_endpoints().emplace(writer_guid, reader_guid);

std::lock_guard<std::mutex> lock(internalMutex_);

if (conditionMutex_ != nullptr) {
Expand Down Expand Up @@ -169,49 +284,4 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
};

class ServicePubListener : public eprosima::fastrtps::PublisherListener
{
public:
ServicePubListener() = default;

template<class Rep, class Period>
bool wait_for_subscription(
const eprosima::fastrtps::rtps::GUID_t & guid,
const std::chrono::duration<Rep, Period> & rel_time)
{
auto guid_is_present = [this, guid]() RCPPUTILS_TSA_REQUIRES(mutex_)->bool
{
return subscriptions_.find(guid) != subscriptions_.end();
};

std::unique_lock<std::mutex> lock(mutex_);
return cv_.wait_for(lock, rel_time, guid_is_present);
}

void onPublicationMatched(
eprosima::fastrtps::Publisher * pub,
eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
{
(void) pub;
std::lock_guard<std::mutex> lock(mutex_);
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) {
subscriptions_.insert(matchingInfo.remoteEndpointGuid);
} else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
subscriptions_.erase(matchingInfo.remoteEndpointGuid);
} else {
return;
}
cv_.notify_all();
}

private:
using subscriptions_set_t =
std::unordered_set<eprosima::fastrtps::rtps::GUID_t,
rmw_fastrtps_shared_cpp::hash_fastrtps_guid>;

std::mutex mutex_;
subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::condition_variable cv_;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
7 changes: 5 additions & 2 deletions rmw_fastrtps_shared_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,12 @@ __rmw_send_response(
// Related guid is a reader, so it is the response subscription guid.
// Wait for the response writer to be matched with it.
auto listener = info->pub_listener_;
if (!listener->wait_for_subscription(related_guid, std::chrono::milliseconds(100))) {
client_present_t ret = listener->check_for_subscription(related_guid);
if (ret == client_present_t::GONE) {
return RMW_RET_OK;
} else if (ret == client_present_t::MAYBE) {
RMW_SET_ERROR_MSG("client will not receive response");
return RMW_RET_ERROR;
return RMW_RET_TIMEOUT;
}
}

Expand Down