Skip to content

Commit

Permalink
query all DNS seeders if missing many connections
Browse files Browse the repository at this point in the history
  • Loading branch information
x100111010 committed Oct 19, 2024
1 parent 694da52 commit 4e35e0b
Showing 1 changed file with 54 additions and 25 deletions.
79 changes: 54 additions & 25 deletions components/connectionmanager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
};

use duration_string::DurationString;
use futures_util::future::join_all;
use futures_util::future::{join_all, try_join_all};
use itertools::Itertools;
use parking_lot::Mutex as ParkingLotMutex;
use rand::{seq::SliceRandom, thread_rng};
Expand Down Expand Up @@ -227,12 +227,14 @@ impl ConnectionManager {
}

if missing_connections > 0 && !self.dns_seeders.is_empty() {
let cmgr = self.clone();
// DNS lookup is a blocking i/o operation, so we spawn it as a blocking task
let _ = tokio::task::spawn_blocking(move || {
cmgr.dns_seed(missing_connections); //TODO: Consider putting a number higher than `missing_connections`.
})
.await;
if missing_connections > self.outbound_target / 2 {
// If we are missing more than half of our target, query all in parallel.
// This will always be the case on new node start-up and is the most resilient strategy in such a case.
self.dns_seed_many(self.dns_seeders.len()).await;
} else {
// Try to obtain at least twice the number of missing connections
self.dns_seed_with_address_target(2 * missing_connections).await;
}
}
}

Expand All @@ -251,26 +253,17 @@ impl ConnectionManager {
join_all(futures).await;
}

fn dns_seed(self: &Arc<Self>, mut min_addresses_to_fetch: usize) {
/// Queries DNS seeders in random order, one after the other, until obtaining `min_addresses_to_fetch` addresses
async fn dns_seed_with_address_target(self: &Arc<Self>, min_addresses_to_fetch: usize) {
let cmgr = self.clone();
tokio::task::spawn_blocking(move || cmgr.dns_seed_with_address_target_blocking(min_addresses_to_fetch)).await.unwrap();
}

fn dns_seed_with_address_target_blocking(self: &Arc<Self>, mut min_addresses_to_fetch: usize) {
let shuffled_dns_seeders = self.dns_seeders.choose_multiple(&mut thread_rng(), self.dns_seeders.len());
for &seeder in shuffled_dns_seeders {
info!("Querying DNS seeder {}", seeder);
// Since the DNS lookup protocol doesn't come with a port, we must assume that the default port is used.
let addrs = match (seeder, self.default_port).to_socket_addrs() {
Ok(addrs) => addrs,
Err(e) => {
warn!("Error connecting to DNS seeder {}: {}", seeder, e);
continue;
}
};

let addrs_len = addrs.len();
info!("Retrieved {} addresses from DNS seeder {}", addrs_len, seeder);
let mut amgr_lock = self.address_manager.lock();
for addr in addrs {
amgr_lock.add_address(NetAddress::new(addr.ip().into(), addr.port()));
}

// Query seeders sequentially until reaching the desired number of addresses
let addrs_len = self.dns_seed_single(seeder);
if addrs_len >= min_addresses_to_fetch {
break;
} else {
Expand All @@ -279,6 +272,42 @@ impl ConnectionManager {
}
}

/// Queries `num_seeders_to_query` random DNS seeders in parallel
async fn dns_seed_many(self: &Arc<Self>, num_seeders_to_query: usize) -> usize {
info!("Querying {} DNS seeders", num_seeders_to_query);
let shuffled_dns_seeders = self.dns_seeders.choose_multiple(&mut thread_rng(), num_seeders_to_query);
let jobs = shuffled_dns_seeders.map(|seeder| {
let cmgr = self.clone();
tokio::task::spawn_blocking(move || cmgr.dns_seed_single(seeder))
});
try_join_all(jobs).await.unwrap().into_iter().sum()
}

/// Query a single DNS seeder and add the obtained addresses to the address manager.
///
/// DNS lookup is a blocking i/o operation so this function is assumed to be called
/// from a blocking execution context.
fn dns_seed_single(self: &Arc<Self>, seeder: &str) -> usize {
info!("Querying DNS seeder {}", seeder);
// Since the DNS lookup protocol doesn't come with a port, we must assume that the default port is used.
let addrs = match (seeder, self.default_port).to_socket_addrs() {
Ok(addrs) => addrs,
Err(e) => {
warn!("Error connecting to DNS seeder {}: {}", seeder, e);
return 0;
}
};

let addrs_len = addrs.len();
info!("Retrieved {} addresses from DNS seeder {}", addrs_len, seeder);
let mut amgr_lock = self.address_manager.lock();
for addr in addrs {
amgr_lock.add_address(NetAddress::new(addr.ip().into(), addr.port()));
}

addrs_len
}

/// Bans the given IP and disconnects from all the peers with that IP.
///
/// _GO-SPECTRED: BanByIP_
Expand Down

0 comments on commit 4e35e0b

Please sign in to comment.