diff --git a/components/connectionmanager/src/lib.rs b/components/connectionmanager/src/lib.rs index 27b8c2a..331f6c1 100644 --- a/components/connectionmanager/src/lib.rs +++ b/components/connectionmanager/src/lib.rs @@ -7,7 +7,7 @@ use std::{ }; use duration_string::DurationString; -use futures_util::future::join_all; +use futures_util::future::{join_all, try_join_all}; use itertools::Itertools; use parking_lot::Mutex as ParkingLotMutex; use rand::{seq::SliceRandom, thread_rng}; @@ -227,12 +227,14 @@ impl ConnectionManager { } if missing_connections > 0 && !self.dns_seeders.is_empty() { - let cmgr = self.clone(); - // DNS lookup is a blocking i/o operation, so we spawn it as a blocking task - let _ = tokio::task::spawn_blocking(move || { - cmgr.dns_seed(missing_connections); //TODO: Consider putting a number higher than `missing_connections`. - }) - .await; + if missing_connections > self.outbound_target / 2 { + // If we are missing more than half of our target, query all in parallel. + // This will always be the case on new node start-up and is the most resilient strategy in such a case. + self.dns_seed_many(self.dns_seeders.len()).await; + } else { + // Try to obtain at least twice the number of missing connections + self.dns_seed_with_address_target(2 * missing_connections).await; + } } } @@ -251,26 +253,17 @@ impl ConnectionManager { join_all(futures).await; } - fn dns_seed(self: &Arc, mut min_addresses_to_fetch: usize) { + /// Queries DNS seeders in random order, one after the other, until obtaining `min_addresses_to_fetch` addresses + async fn dns_seed_with_address_target(self: &Arc, min_addresses_to_fetch: usize) { + let cmgr = self.clone(); + tokio::task::spawn_blocking(move || cmgr.dns_seed_with_address_target_blocking(min_addresses_to_fetch)).await.unwrap(); + } + + fn dns_seed_with_address_target_blocking(self: &Arc, mut min_addresses_to_fetch: usize) { let shuffled_dns_seeders = self.dns_seeders.choose_multiple(&mut thread_rng(), self.dns_seeders.len()); for &seeder in shuffled_dns_seeders { - info!("Querying DNS seeder {}", seeder); - // Since the DNS lookup protocol doesn't come with a port, we must assume that the default port is used. - let addrs = match (seeder, self.default_port).to_socket_addrs() { - Ok(addrs) => addrs, - Err(e) => { - warn!("Error connecting to DNS seeder {}: {}", seeder, e); - continue; - } - }; - - let addrs_len = addrs.len(); - info!("Retrieved {} addresses from DNS seeder {}", addrs_len, seeder); - let mut amgr_lock = self.address_manager.lock(); - for addr in addrs { - amgr_lock.add_address(NetAddress::new(addr.ip().into(), addr.port())); - } - + // Query seeders sequentially until reaching the desired number of addresses + let addrs_len = self.dns_seed_single(seeder); if addrs_len >= min_addresses_to_fetch { break; } else { @@ -279,6 +272,42 @@ impl ConnectionManager { } } + /// Queries `num_seeders_to_query` random DNS seeders in parallel + async fn dns_seed_many(self: &Arc, num_seeders_to_query: usize) -> usize { + info!("Querying {} DNS seeders", num_seeders_to_query); + let shuffled_dns_seeders = self.dns_seeders.choose_multiple(&mut thread_rng(), num_seeders_to_query); + let jobs = shuffled_dns_seeders.map(|seeder| { + let cmgr = self.clone(); + tokio::task::spawn_blocking(move || cmgr.dns_seed_single(seeder)) + }); + try_join_all(jobs).await.unwrap().into_iter().sum() + } + + /// Query a single DNS seeder and add the obtained addresses to the address manager. + /// + /// DNS lookup is a blocking i/o operation so this function is assumed to be called + /// from a blocking execution context. + fn dns_seed_single(self: &Arc, seeder: &str) -> usize { + info!("Querying DNS seeder {}", seeder); + // Since the DNS lookup protocol doesn't come with a port, we must assume that the default port is used. + let addrs = match (seeder, self.default_port).to_socket_addrs() { + Ok(addrs) => addrs, + Err(e) => { + warn!("Error connecting to DNS seeder {}: {}", seeder, e); + return 0; + } + }; + + let addrs_len = addrs.len(); + info!("Retrieved {} addresses from DNS seeder {}", addrs_len, seeder); + let mut amgr_lock = self.address_manager.lock(); + for addr in addrs { + amgr_lock.add_address(NetAddress::new(addr.ip().into(), addr.port())); + } + + addrs_len + } + /// Bans the given IP and disconnects from all the peers with that IP. /// /// _GO-SPECTRED: BanByIP_