Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20120] TCPSendResources cleanup #4300

Merged
merged 27 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7d5b079
Refs #20120: Remove unused include
jepemi Jan 25, 2024
58f6308
Refs #20120: TCP event call
jepemi Jan 25, 2024
4c44fe4
Refs #20120: Sanitize transport
jepemi Jan 25, 2024
21b3037
Refs #20120: Added tests and minor fixes
jepemi Jan 26, 2024
493f3a6
Refs #20120: Extended doxygen description and added to versions.md
jepemi Jan 26, 2024
89a0fc2
Refs #20120: Uncrustify
jepemi Jan 26, 2024
e3df98c
Refs #20120: Add missing header
jepemi Jan 29, 2024
9ea1d8f
Refs #20120: Fix tests
jepemi Feb 1, 2024
d32ceff
Refs #20120: Uncrustify
jepemi Feb 1, 2024
6cd60e5
Refs #20120: After client-server decision making rebase, not working
jepemi Feb 9, 2024
24f32f6
Refs #20120: Update
jepemi Feb 15, 2024
c9d5caf
Refs #20120: Fix for chaining-transports
jepemi Feb 16, 2024
26130ef
Refs #20120: Add new channel connection status and tests
jepemi Feb 19, 2024
d59be2b
Refs #20120: PR refactor, timed event deleted. cleanup on pdp unbinding
jepemi Feb 20, 2024
6169f41
Refs #20120: Uncrustify
jepemi Feb 21, 2024
8abe004
Refs #20120: Add unittests
jepemi Feb 21, 2024
b6b4cbd
Refs #20120: Fix deadlock
jepemi Feb 22, 2024
f54ea87
Refs #20120: Fix unittest
jepemi Feb 22, 2024
099384b
Refs #20120: Fix asio throwing exceptions
jepemi Feb 22, 2024
deeca26
Refs #20120: Unnittest untab
jepemi Feb 22, 2024
4c6cde2
Refs #20120: Apply suggestions
jepemi Mar 1, 2024
5216d94
Refs #20120: Uncrustify
jepemi Mar 1, 2024
be4991d
Refs #20120: Consider wan case + associated tests
jepemi Mar 1, 2024
04e9ee5
Refs #20120: Remove versions.md update
jepemi Mar 1, 2024
98d0d36
Refs #20120: Fix rebasing wrong deletion
jepemi Mar 4, 2024
8e19c6e
Refs #20120: Delete assert clause
jepemi Mar 4, 2024
b479be8
Refs #20120: Apply suggestions
EduPonz Mar 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
#include <fastrtps/types/TypeObjectFactory.h>
#include <fastrtps/types/DynamicPubSubType.h>

#include <fastdds/rtps/common/LocatorList.hpp>

#include <fastrtps/utils/TimeConversion.h>
#include <fastrtps/utils/IPLocator.h>
#include "fastrtps/utils/shared_mutex.hpp"
Expand Down Expand Up @@ -1239,6 +1241,21 @@ bool PDP::remove_remote_participant(

this->mp_mutex->lock();

// Delete from sender resource list (TCP only)
LocatorList_t remote_participant_locators;
for (auto& remote_participant_default_locator : pdata->default_locators.unicast)
{
remote_participant_locators.push_back(remote_participant_default_locator);
}
for (auto& remote_participant_metatraffic_locator : pdata->metatraffic_locators.unicast)
{
remote_participant_locators.push_back(remote_participant_metatraffic_locator);
}
if (!remote_participant_locators.empty())
{
mp_RTPSParticipant->update_removed_participant(remote_participant_locators);
}

// Return reader proxy objects to pool
for (auto pit : *pdata->m_readers)
{
Expand Down Expand Up @@ -1266,6 +1283,7 @@ bool PDP::remove_remote_participant(
participant_proxies_pool_.push_back(pdata);

this->mp_mutex->unlock();

return true;
}

Expand Down
21 changes: 20 additions & 1 deletion src/cpp/rtps/network/NetworkFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
#include <utility>

#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/participant/RTPSParticipant.h>
#include <fastdds/rtps/transport/TransportDescriptorInterface.h>
#include <fastrtps/utils/IPFinder.h>
#include <fastrtps/utils/IPLocator.h>

#include <rtps/transport/UDPv4Transport.h>
#include <rtps/transport/TCPTransportInterface.h>

using namespace std;
using namespace eprosima::fastdds::rtps;
Expand Down Expand Up @@ -471,6 +472,24 @@ void NetworkFactory::update_network_interfaces()
}
}

void NetworkFactory::remove_participant_associated_send_resources(
SendResourceList& send_resource_list,
const LocatorList_t& remote_participant_locators,
const LocatorList_t& participant_initial_peers) const
{
for (auto& transport : mRegisteredTransports)
{
TCPTransportInterface* tcp_transport = dynamic_cast<TCPTransportInterface*>(transport.get());
EduPonz marked this conversation as resolved.
Show resolved Hide resolved
if (tcp_transport)
{
tcp_transport->CloseOutputChannel(
send_resource_list,
remote_participant_locators,
participant_initial_peers);
}
}
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
13 changes: 13 additions & 0 deletions src/cpp/rtps/network/NetworkFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <memory>

#include <fastdds/rtps/common/Locator.h>
#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/common/LocatorSelector.hpp>
#include <fastdds/rtps/messages/MessageReceiver.h>
#include <fastdds/rtps/transport/SenderResource.h>
Expand Down Expand Up @@ -246,6 +247,18 @@ class NetworkFactory
*/
void update_network_interfaces();

/**
* Remove the given participants from the send resource list
*
* @param send_resource_list List of send resources associated to the local participant.
* @param remote_participant_locators List of locators associated to the remote participant.
* @param participant_initial_peers List of locators of the initial peers of the local participant.
*/
void remove_participant_associated_send_resources(
fastdds::rtps::SendResourceList& send_resource_list,
const LocatorList_t& remote_participant_locators,
const LocatorList_t& participant_initial_peers) const;

private:

std::vector<std::unique_ptr<fastdds::rtps::TransportInterface>> mRegisteredTransports;
Expand Down
15 changes: 15 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
#include <fastdds/rtps/writer/StatelessPersistentWriter.h>
#include <fastdds/rtps/writer/StatefulPersistentWriter.h>

#include <fastdds/rtps/common/LocatorList.hpp>

#include <fastrtps/utils/IPFinder.h>
#include <fastrtps/utils/Semaphore.h>
#include <fastrtps/xmlparser/XMLProfileManager.h>
Expand Down Expand Up @@ -2982,6 +2984,19 @@ bool RTPSParticipantImpl::should_match_local_endpoints(
return should_match_local_endpoints;
}

void RTPSParticipantImpl::update_removed_participant(
const LocatorList_t& remote_participant_locators)
{
if (!remote_participant_locators.empty())
{
std::lock_guard<std::timed_mutex> guard(m_send_resources_mutex_);
m_network_Factory.remove_participant_associated_send_resources(
send_resource_list_,
remote_participant_locators,
m_att.builtin.initialPeersList);
}
}

} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */
9 changes: 9 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <fastdds/rtps/builtin/data/ReaderProxyData.h>
#include <fastdds/rtps/builtin/data/WriterProxyData.h>
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/history/IChangePool.h>
#include <fastdds/rtps/history/IPayloadPool.h>
#include <fastdds/rtps/messages/MessageReceiver.h>
Expand Down Expand Up @@ -1266,6 +1267,14 @@ class RTPSParticipantImpl
return match_local_endpoints_;
}

/**
* Method called on participant removal with the set of locators associated to the participant.
*
* @param remote_participant_locators Set of locators associated to the participant removed.
*/
void update_removed_participant(
const LocatorList_t& remote_participant_locators);

};
} // namespace rtps
} /* namespace rtps */
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/transport/TCPSenderResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource
// Implementation functions are bound to the right transport parameters
clean_up = [this, &transport]()
{
transport.CloseOutputChannel(locator_);
transport.SenderResourceHasBeenClosed(locator_);
};

send_lambda_ = [this, &transport](
Expand Down Expand Up @@ -68,7 +68,7 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource
}

static TCPSenderResource* cast(
TransportInterface& transport,
const TransportInterface& transport,
SenderResource* sender_resource)
{
TCPSenderResource* returned_resource = nullptr;
Expand Down
100 changes: 88 additions & 12 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <chrono>
#include <cstring>
#include <map>
#include <set>
#include <memory>
#include <mutex>
#include <string>
Expand Down Expand Up @@ -276,6 +277,19 @@ Locator TCPTransportInterface::remote_endpoint_to_locator(
return locator;
}

Locator TCPTransportInterface::local_endpoint_to_locator(
const std::shared_ptr<TCPChannelResource>& channel) const
{
Locator locator;
asio::error_code ec;
endpoint_to_locator(channel->local_endpoint(ec), locator);
if (ec)
{
LOCATOR_INVALID(locator);
}
return locator;
}

void TCPTransportInterface::bind_socket(
std::shared_ptr<TCPChannelResource>& channel)
{
Expand Down Expand Up @@ -634,11 +648,24 @@ bool TCPTransportInterface::transform_remote_locator(
return false;
}

void TCPTransportInterface::CloseOutputChannel(
void TCPTransportInterface::SenderResourceHasBeenClosed(
fastrtps::rtps::Locator_t& locator)
{
locator.set_Invalid_Address();
locator.port = 0;
// The TCPSendResource associated channel cannot be removed from the channel_resources_ map. On transport's destruction
// this map is consulted to send the unbind requests. If not sending it, the other participant wouldn't disconnect the
// socket and keep a connection status of eEstablished. This would prevent new connect calls since it thinks it's already
// connected.
// If moving this unbind send with the respective channel disconnection to this point, the following problem arises:
// If receiving a SenderResourceHasBeenClosed call after receiving an unbinding message from a remote participant (our participant
// isn't disconnecting but we want to erase this send resource), the channel cannot be disconnected here since the listening thread has
// taken the read mutex (permanently waiting at read asio layer). This mutex is also needed to disconnect the socket (deadlock).
// Socket disconnection should always be done in the listening thread (or in the transport cleanup, when receiver resources have
// already been destroyed and the listening thread had consequently finished).
EduPonz marked this conversation as resolved.
Show resolved Hide resolved
// An assert() clause finding the respective channel resource cannot be made since in LARGE DATA scenario, where the PDP discovery is done
// via UDP, a server's send resource can be created with without any associated channel resource until receiving a connection request from
// the client.
// The send resource locator is invalidated to prevent further use of associated channel.
LOCATOR_INVALID(locator);
}

bool TCPTransportInterface::CloseInputChannel(
Expand Down Expand Up @@ -1187,7 +1214,6 @@ bool TCPTransportInterface::Receive(
{
std::shared_ptr<RTCPMessageManager> rtcp_message_manager;
if (TCPChannelResource::eConnectionStatus::eDisconnected != channel->connection_status())

{
std::unique_lock<std::mutex> lock(rtcp_message_manager_mutex_);
rtcp_message_manager = rtcp_manager.lock();
Expand Down Expand Up @@ -1436,10 +1462,8 @@ void TCPTransportInterface::SocketAccepted(
create_listening_thread(channel);

EPROSIMA_LOG_INFO(RTCP, "Accepted connection (local: "
<< channel->local_endpoint().address() << ":"
<< channel->local_endpoint().port() << "), remote: "
<< channel->remote_endpoint().address() << ":"
<< channel->remote_endpoint().port() << ")");
<< local_endpoint_to_locator(channel) << ", remote: "
<< remote_endpoint_to_locator(channel) << ")");
}
else
{
Expand Down Expand Up @@ -1481,10 +1505,8 @@ void TCPTransportInterface::SecureSocketAccepted(
create_listening_thread(secure_channel);

EPROSIMA_LOG_INFO(RTCP, " Accepted connection (local: "
<< socket->lowest_layer().local_endpoint().address() << ":"
<< socket->lowest_layer().local_endpoint().port() << "), remote: "
<< socket->lowest_layer().remote_endpoint().address() << ":"
<< socket->lowest_layer().remote_endpoint().port() << ")");
<< local_endpoint_to_locator(secure_channel) << ", remote: "
<< remote_endpoint_to_locator(secure_channel) << ")");
}
else
{
Expand Down Expand Up @@ -1837,6 +1859,60 @@ void TCPTransportInterface::fill_local_physical_port(
}
}

void TCPTransportInterface::CloseOutputChannel(
SendResourceList& send_resource_list,
const LocatorList& remote_participant_locators,
const LocatorList& participant_initial_peers) const
{
// Since send resources handle physical locators, we need to convert the remote participant locators to physical
std::set<Locator> remote_participant_physical_locators;
for (const Locator& remote_participant_locator : remote_participant_locators)
{
remote_participant_physical_locators.insert(IPLocator::toPhysicalLocator(remote_participant_locator));

// Also add the WANtoLANLocator ([0][WAN] address) if the remote locator is a WAN locator. In WAN scenario,
//initial peer can also work with the WANtoLANLocator of the remote participant.
if (IPLocator::hasWan(remote_participant_locator))
{
remote_participant_physical_locators.insert(IPLocator::toPhysicalLocator(IPLocator::WanToLanLocator(
remote_participant_locator)));
}
}

// Exlude initial peers.
for (const auto& initial_peer : participant_initial_peers)
{
if (std::find(remote_participant_physical_locators.begin(), remote_participant_physical_locators.end(),
IPLocator::toPhysicalLocator(initial_peer)) != remote_participant_physical_locators.end())
{
remote_participant_physical_locators.erase(IPLocator::toPhysicalLocator(initial_peer));
}
}

for (const auto& remote_participant_physical_locator : remote_participant_physical_locators)
{
if (!IsLocatorSupported(remote_participant_physical_locator))
{
continue;
}
// Remove send resources for the associated remote participant locator
for (auto it = send_resource_list.begin(); it != send_resource_list.end();)
{
TCPSenderResource* tcp_sender_resource = TCPSenderResource::cast(*this, it->get());

if (tcp_sender_resource)
{
if (tcp_sender_resource->locator() == remote_participant_physical_locator)
{
it = send_resource_list.erase(it);
continue;
}
}
++it;
}
}
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
20 changes: 19 additions & 1 deletion src/cpp/rtps/transport/TCPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ class TCPTransportInterface : public TransportInterface
Locator remote_endpoint_to_locator(
const std::shared_ptr<TCPChannelResource>& channel) const;

/**
* Converts a local endpoint to a locator if possible. Otherwise, it sets an invalid locator.
*/
Locator local_endpoint_to_locator(
const std::shared_ptr<TCPChannelResource>& channel) const;

/**
* Shutdown method to close the connections of the transports.
*/
Expand Down Expand Up @@ -228,7 +234,7 @@ class TCPTransportInterface : public TransportInterface
const Locator&) override;

//! Resets the locator bound to the sender resource.
void CloseOutputChannel(
void SenderResourceHasBeenClosed(
fastrtps::rtps::Locator_t& locator);

//! Reports whether Locators correspond to the same port.
Expand Down Expand Up @@ -460,6 +466,18 @@ class TCPTransportInterface : public TransportInterface
void fill_local_physical_port(
Locator& locator) const;

/**
* Close the output channel associated to the given remote participant but if its locators belong to the
* given list of initial peers.
*
* @param send_resource_list List of send resources associated to the local participant.
* @param remote_participant_locators Set of locators associated to the remote participant.
* @param participant_initial_peers List of locators associated to the initial peers of the local participant.
*/
void CloseOutputChannel(
SendResourceList& send_resource_list,
const LocatorList& remote_participant_locators,
const LocatorList& participant_initial_peers) const;
};

} // namespace rtps
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/UDPSenderResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class UDPSenderResource : public fastrtps::rtps::SenderResource
// Implementation functions are bound to the right transport parameters
clean_up = [this, &transport]()
{
transport.CloseOutputChannel(socket_);
transport.SenderResourceHasBeenClosed(socket_);
};

send_lambda_ = [this, &transport](
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/UDPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ bool UDPTransportInterface::CloseInputChannel(
return true;
}

void UDPTransportInterface::CloseOutputChannel(
void UDPTransportInterface::SenderResourceHasBeenClosed(
eProsimaUDPSocket& socket)
{
socket.cancel();
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/UDPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class UDPTransportInterface : public TransportInterface
const Locator&) override;

//! Removes all outbound sockets on the given port.
void CloseOutputChannel(
void SenderResourceHasBeenClosed(
eProsimaUDPSocket& socket);

//! Reports whether Locators correspond to the same port.
Expand Down
Loading
Loading