Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[5.0.3] P2P: Resolve on reconnect #2410

Draft
wants to merge 6 commits into
base: release/5.0
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 101 additions & 105 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,6 @@ namespace eosio {
// Per-connection record stored by connections_manager (see connections_manager::add).
// NOTE(review): the hunk header for this struct (-347,8 +347,6) indicates two of
// these lines are dropped by the change — presumably active_ip and ips; confirm.
struct connection_detail {
// Peer address string; filled from c->peer_address() or the requested peer address.
std::string host;
// The connection this record describes.
connection_ptr c;
// Endpoint currently associated with the connection: the socket's remote
// endpoint at add() time, later overwritten by update_connection_endpoint().
tcp::endpoint active_ip;
// Resolver results saved when the connection is created, so that
// connections_manager::connect() can reuse them instead of resolving again.
tcp::resolver::results_type ips;
};

using connection_details_index = multi_index_container<
Expand Down Expand Up @@ -400,6 +398,8 @@ namespace eosio {
boost::asio::steady_timer::duration conn_period,
uint32_t maximum_client_count);

// Accessor for the heartbeat timeout stored on this object.
// connection::resolve_and_connect() reads it to configure a connection when its
// resolve handler runs.
std::chrono::milliseconds get_heartbeat_timeout() const { return heartbeat_timeout; }

// Accessor for the stored max_client_count value.
uint32_t get_max_client_count() const { return max_client_count; }

fc::microseconds get_connector_period() const;
Expand All @@ -417,8 +417,6 @@ namespace eosio {
void add(connection_ptr c);
string connect(const string& host, const string& p2p_address);
string resolve_and_connect(const string& host, const string& p2p_address);
void update_connection_endpoint(connection_ptr c, const tcp::endpoint& endpoint);
void connect(const connection_ptr& c);
string disconnect(const string& host);
void close_all();

Expand Down Expand Up @@ -998,7 +996,7 @@ namespace eosio {

bool populate_handshake( handshake_message& hello ) const;

bool reconnect();
bool resolve_and_connect();
void connect( const tcp::resolver::results_type& endpoints );
void start_read_message();

Expand Down Expand Up @@ -1178,16 +1176,21 @@ namespace eosio {



std::tuple<std::string, std::string, std::string> split_host_port_type(const std::string& peer_add) {
std::tuple<std::string, std::string, std::string> split_host_port_type(const std::string& peer_add, bool incoming) {
// host:port:[<trx>|<blk>]
if (peer_add.empty()) return {};

string::size_type p = peer_add[0] == '[' ? peer_add.find(']') : 0;
if (p == string::npos) {
fc_wlog( logger, "Invalid peer address: ${peer}", ("peer", peer_add) );
string::size_type colon = p != string::npos ? peer_add.find(':', p) : string::npos;
if (colon == std::string::npos || colon == 0) {
// if incoming then not an error this peer can do anything about
if (incoming) {
fc_dlog( logger, "Invalid peer address. must be \"host:port[:<blk>|<trx>]\": ${p}", ("p", peer_add) );
} else {
fc_elog( logger, "Invalid peer address. must be \"host:port[:<blk>|<trx>]\": ${p}", ("p", peer_add) );
}
return {};
}
string::size_type colon = peer_add.find(':', p);
string::size_type colon2 = peer_add.find(':', colon + 1);
string::size_type end = colon2 == string::npos
? string::npos : peer_add.find_first_of( " :+=.,<>!$%^&(*)|-#@\t", colon2 + 1 ); // future proof by including most symbols without using regex
Expand Down Expand Up @@ -1264,9 +1267,9 @@ namespace eosio {
last_handshake_sent(),
p2p_address( endpoint )
{
set_connection_type( peer_address() );
my_impl->mark_bp_connection(this);
update_endpoints();
fc_ilog( logger, "created connection ${c} to ${n}", ("c", connection_id)("n", endpoint) );
fc_ilog( logger, "created connection - ${c} to ${n}", ("c", connection_id)("n", endpoint) );
}

connection::connection(tcp::socket&& s, const string& listen_address, size_t block_sync_rate_limit)
Expand All @@ -1280,8 +1283,8 @@ namespace eosio {
last_handshake_recv(),
last_handshake_sent()
{
update_endpoints();
fc_dlog( logger, "new connection object created for peer ${address}:${port} from listener ${addr}", ("address", log_remote_endpoint_ip)("port", log_remote_endpoint_port)("addr", listen_address) );
fc_dlog( logger, "new connection - ${c} object created for peer ${address}:${port} from listener ${addr}",
("c", connection_id)("address", log_remote_endpoint_ip)("port", log_remote_endpoint_port)("addr", listen_address) );
}

void connection::update_endpoints(const tcp::endpoint& endpoint) {
Expand Down Expand Up @@ -1312,7 +1315,7 @@ namespace eosio {

// called from connection strand
void connection::set_connection_type( const std::string& peer_add ) {
auto [host, port, type] = split_host_port_type(peer_add);
auto [host, port, type] = split_host_port_type(peer_add, false);
if( type.empty() ) {
fc_dlog( logger, "Setting connection ${c} type for: ${peer} to both transactions and blocks", ("c", connection_id)("peer", peer_add) );
connection_type = both;
Expand Down Expand Up @@ -1370,6 +1373,7 @@ namespace eosio {
bool connection::start_session() {
verify_strand_in_this_thread( strand, __func__, __LINE__ );

update_endpoints();
boost::asio::ip::tcp::no_delay nodelay( true );
boost::system::error_code ec;
socket->set_option( nodelay, ec );
Expand Down Expand Up @@ -2753,31 +2757,6 @@ namespace eosio {

//------------------------------------------------------------------------

// Decides whether this connection may be retried and, if so, schedules the retry.
//
// Returns false only when the recorded go_away reason (no_retry) is not one of the
// retryable reasons below; the caller then treats the connection as removable.
// Returns true both when a connect was scheduled and when the attempt was merely
// deferred by the back-off check, so the connection is kept either way.
bool connection::reconnect() {
// Only these go_away reasons are eligible for another connection attempt.
switch ( no_retry ) {
case no_reason:
case wrong_version:
case benign_other:
case duplicate: // attempt reconnect in case connection has been dropped, should quickly disconnect if duplicate
break;
default:
fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( no_retry )));
return false;
}
// Back-off: after too many consecutive immediate closes, or after a benign_other
// go_away, do not retry until at least one connector period has passed since the
// last close. last_close is read under conn_mtx.
if( consecutive_immediate_connection_close > def_max_consecutive_immediate_connection_close || no_retry == benign_other ) {
fc::microseconds connector_period = my_impl->connections.get_connector_period();
fc::lock_guard g( conn_mtx );
// A default-constructed last_close (no close recorded) also defers the attempt.
if( last_close == fc::time_point() || last_close > fc::time_point::now() - connector_period ) {
return true; // true so doesn't remove from valid connections
}
}
// Hold a shared_ptr to this connection in the posted task so it stays alive,
// then ask the connections manager to connect it from the connection's strand.
connection_ptr c = shared_from_this();
strand.post([c]() {
my_impl->connections.connect(c);
});
return true;
}

// called from connection strand
void connection::connect( const tcp::resolver::results_type& endpoints ) {
set_state(connection_state::connecting);
Expand All @@ -2787,7 +2766,6 @@ namespace eosio {
boost::asio::bind_executor( strand,
[c = shared_from_this(), socket=socket]( const boost::system::error_code& err, const tcp::endpoint& endpoint ) {
if( !err && socket->is_open() && socket == c->socket ) {
my_impl->connections.update_connection_endpoint(c, endpoint);
c->update_endpoints(endpoint);
if( c->start_session() ) {
c->send_handshake();
Expand Down Expand Up @@ -2835,7 +2813,7 @@ namespace eosio {
fc_ilog(logger, "Accepted new connection: " + paddr_str);

connections.any_of_supplied_peers([&listen_address, &paddr_str, &limit](const string& peer_addr) {
auto [host, port, type] = split_host_port_type(peer_addr);
auto [host, port, type] = split_host_port_type(peer_addr, false);
if (host == paddr_str) {
if (limit > 0) {
fc_dlog(logger, "Connection inbound to ${la} from ${a} is a configured p2p-peer-address and will not be throttled", ("la", listen_address)("a", paddr_str));
Expand Down Expand Up @@ -3302,9 +3280,9 @@ namespace eosio {
}

if( incoming() ) {
auto [host, port, type] = split_host_port_type(msg.p2p_address);
auto [host, port, type] = split_host_port_type(msg.p2p_address, true);
if (host.size())
set_connection_type( msg.p2p_address );
set_connection_type( msg.p2p_address);

peer_dlog( this, "checking for duplicate" );
auto is_duplicate = [&](const connection_ptr& check) {
Expand Down Expand Up @@ -4430,7 +4408,7 @@ namespace eosio {
//----------------------------------------------------------------------------

size_t connections_manager::number_connections() const {
std::lock_guard g(connections_mtx);
std::shared_lock g(connections_mtx);
return connections.size();
}

Expand Down Expand Up @@ -4459,8 +4437,9 @@ namespace eosio {
update_p2p_connection_metrics = std::move(fun);
}

// can be called from any thread
void connections_manager::connect_supplied_peers(const string& p2p_address) {
std::unique_lock g(connections_mtx);
std::shared_lock g(connections_mtx);
chain::flat_set<string> peers = supplied_peers;
g.unlock();
for (const auto& peer : peers) {
Expand All @@ -4470,12 +4449,9 @@ namespace eosio {

void connections_manager::add( connection_ptr c ) {
std::lock_guard g( connections_mtx );
boost::system::error_code ec;
auto endpoint = c->socket->remote_endpoint(ec);
connections.insert( connection_detail{
.host = c->peer_address(),
.c = std::move(c),
.active_ip = endpoint} );
.c = std::move(c)} );
}

// called by API
Expand All @@ -4487,62 +4463,72 @@ namespace eosio {
}

string connections_manager::resolve_and_connect( const string& peer_address, const string& listen_address ) {
string::size_type colon = peer_address.find(':');
if (colon == std::string::npos || colon == 0) {
fc_elog( logger, "Invalid peer address. must be \"host:port[:<blk>|<trx>]\": ${p}", ("p", peer_address) );
auto [host, port, type] = split_host_port_type(peer_address, false);
if (host.empty()) {
return "invalid peer address";
}

std::lock_guard g( connections_mtx );
if( find_connection_i( peer_address ) )
return "already connected";

auto [host, port, type] = split_host_port_type(peer_address);

auto resolver = std::make_shared<tcp::resolver>( my_impl->thread_pool.get_executor() );

resolver->async_resolve(host, port,
[resolver, host = host, port = port, peer_address = peer_address, listen_address = listen_address, this]( const boost::system::error_code& err, const tcp::resolver::results_type& results ) {
connection_ptr c = std::make_shared<connection>( peer_address, listen_address );
c->set_heartbeat_timeout( heartbeat_timeout );
std::lock_guard g( connections_mtx );
auto [it, inserted] = connections.emplace( connection_detail{
.host = peer_address,
.c = std::move(c),
.ips = results
});
if( !err ) {
it->c->connect( results );
} else {
fc_wlog( logger, "Unable to resolve ${host}:${port} ${error}",
("host", host)("port", port)( "error", err.message() ) );
it->c->set_state(connection::connection_state::closed);
++(it->c->consecutive_immediate_connection_close);
}
} );
{
std::shared_lock g( connections_mtx );
if( find_connection_i( peer_address ) )
return "already connected";
}

return "added connection";
}
connection_ptr c = std::make_shared<connection>( peer_address, listen_address );
if (c->resolve_and_connect()) {
add(std::move(c));

void connections_manager::update_connection_endpoint(connection_ptr c,
const tcp::endpoint& endpoint) {
std::unique_lock g( connections_mtx );
auto& index = connections.get<by_connection>();
const auto& it = index.find(c);
if( it != index.end() ) {
index.modify(it, [endpoint](connection_detail& cd) {
cd.active_ip = endpoint;
});
return "added connection";
}

return "connection failed";
}

void connections_manager::connect(const connection_ptr& c) {
std::lock_guard g( connections_mtx );
const auto& index = connections.get<by_connection>();
const auto& it = index.find(c);
if( it != index.end() ) {
it->c->connect( it->ips );
// called from any thread
//
// Resolves this connection's peer address and starts a connect to the results.
// The host/port are re-resolved on every call rather than reusing earlier results.
//
// Returns false when the connection must not be retried (non-retryable go_away
// reason) or when peer_address() cannot be split into a host; the caller uses
// false to drop the connection. Returns true when a resolve was scheduled and
// also when the attempt was deferred by the back-off check below.
bool connection::resolve_and_connect() {
// Only these go_away reasons are eligible for another connection attempt.
switch ( no_retry ) {
case no_reason:
case wrong_version:
case benign_other:
case duplicate: // attempt reconnect in case connection has been dropped, should quickly disconnect if duplicate
break;
default:
fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( no_retry )));
return false;
}

// An empty host means the address could not be parsed; type is not used here.
auto [host, port, type] = split_host_port_type(peer_address(), false);
if (host.empty())
return false;

// Captured by the posted task and the resolve handler to keep this connection alive.
connection_ptr c = shared_from_this();

// Back-off: after too many consecutive immediate closes, or after a benign_other
// go_away, skip this attempt unless at least one connector period has passed
// since the last close. last_close is read under conn_mtx.
if( consecutive_immediate_connection_close > def_max_consecutive_immediate_connection_close || no_retry == benign_other ) {
fc::microseconds connector_period = my_impl->connections.get_connector_period();
fc::lock_guard g( conn_mtx );
// A default-constructed last_close (no close recorded) also defers the attempt.
if( last_close == fc::time_point() || last_close > fc::time_point::now() - connector_period ) {
return true; // true so doesn't remove from valid connections
}
}

// Start the resolve from the connection's strand. The resolver itself is created
// on the plugin thread pool's executor and is captured by its own handler so it
// outlives the asynchronous operation.
// NOTE(review): the resolve handler is not wrapped with bind_executor(strand, ...),
// while connection::connect is annotated "called from connection strand" —
// confirm which executor this handler actually runs on.
strand.post([c, host, port]() {
auto resolver = std::make_shared<tcp::resolver>( my_impl->thread_pool.get_executor() );
resolver->async_resolve(host, port,
[resolver, c, host, port]
( const boost::system::error_code& err, const tcp::resolver::results_type& results ) {
// Applied on every attempt, whether or not resolution succeeded.
c->set_heartbeat_timeout( my_impl->connections.get_heartbeat_timeout() );
if( !err ) {
c->connect( results );
} else {
// Resolution failed: mark the connection closed and count it as an
// immediate close so the back-off check above applies to later retries.
fc_wlog( logger, "Unable to resolve ${host}:${port} ${error}",
("host", host)("port", port)( "error", err.message() ) );
c->set_state(connection::connection_state::closed);
++c->consecutive_immediate_connection_close;
}
} );
} );

return true;
}

// called by API
Expand Down Expand Up @@ -4571,21 +4557,31 @@ namespace eosio {
}

std::optional<connection_status> connections_manager::status( const string& host )const {
std::shared_lock g( connections_mtx );
auto con = find_connection_i( host );
connection_ptr con;
{
std::shared_lock g( connections_mtx );
con = find_connection_i( host );
}
if( con ) {
return con->get_status();
}
return {};
}

vector<connection_status> connections_manager::connection_statuses()const {
vector<connection_ptr> conns;
vector<connection_status> result;
std::shared_lock g( connections_mtx );
auto& index = connections.get<by_connection>();
result.reserve( index.size() );
for( const connection_detail& cd : index ) {
result.emplace_back( cd.c->get_status() );
{
std::shared_lock g( connections_mtx );
auto& index = connections.get<by_connection>();
result.reserve( index.size() );
conns.reserve( index.size() );
for( const connection_detail& cd : index ) {
conns.emplace_back( cd.c );
}
}
for (const auto& c : conns) {
result.push_back( c->get_status() );
}
return result;
}
Expand Down Expand Up @@ -4647,7 +4643,7 @@ namespace eosio {
auto cleanup = [&num_peers, &num_rm, this](vector<connection_ptr>&& reconnecting,
vector<connection_ptr>&& removing) {
for( auto& c : reconnecting ) {
if (!c->reconnect()) {
if (!c->resolve_and_connect()) {
--num_peers;
++num_rm;
removing.push_back(c);
Expand Down Expand Up @@ -4713,7 +4709,7 @@ namespace eosio {
assert(update_p2p_connection_metrics);
auto from = from_connection.lock();
std::shared_lock g(connections_mtx);
auto& index = connections.get<by_connection>();
const auto& index = connections.get<by_connection>();
size_t num_clients = 0, num_peers = 0, num_bp_peers = 0;
net_plugin::p2p_per_connection_metrics per_connection(index.size());
for (auto it = index.begin(); it != index.end(); ++it) {
Expand Down
Loading