diff --git a/zebra-network/src/config.rs b/zebra-network/src/config.rs
index e6c1b3b90e7..e048129b914 100644
--- a/zebra-network/src/config.rs
+++ b/zebra-network/src/config.rs
@@ -9,7 +9,10 @@ use serde::{de, Deserialize, Deserializer};
 
 use zebra_chain::{parameters::Network, serialization::canonical_socket_addr};
 
-use crate::BoxError;
+use crate::{constants, BoxError};
+
+#[cfg(test)]
+mod tests;
 
 /// The number of times Zebra will retry each initial peer's DNS resolution,
 /// before checking if any other initial peers have returned addresses.
@@ -53,6 +56,8 @@ pub struct Config {
     /// The initial target size for the peer set.
     ///
+    /// Also used to limit the number of inbound and outbound connections made by Zebra.
+    ///
     /// If you have a slow network connection, and Zebra is having trouble
     /// syncing, try reducing the peer set size. You can also reduce the peer
     /// set size to reduce Zebra's bandwidth usage.
@@ -72,6 +77,38 @@ pub struct Config {
 }
 
 impl Config {
+    /// The maximum number of outbound connections that Zebra will open at the same time.
+    /// When this limit is reached, Zebra stops opening outbound connections.
+    ///
+    /// # Security
+    ///
+    /// This is larger than the inbound connection limit,
+    /// so Zebra is more likely to be connected to peers that it has selected.
+    pub fn peerset_outbound_connection_limit(&self) -> usize {
+        let inbound_limit = self.peerset_inbound_connection_limit();
+
+        inbound_limit + inbound_limit / constants::OUTBOUND_PEER_BIAS_DENOMINATOR
+    }
+
+    /// The maximum number of inbound connections that Zebra will accept at the same time.
+    /// When this limit is reached, Zebra drops new inbound connections without handshaking on them.
+    pub fn peerset_inbound_connection_limit(&self) -> usize {
+        self.peerset_initial_target_size
+    }
+
+    /// The maximum number of inbound and outbound connections that Zebra will have at the same time.
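+    ///
+    /// For example, with a hypothetical `peerset_initial_target_size` of 50,
+    /// the inbound limit is 50, the outbound limit is 50 + 50/2 = 75,
+    /// and the total connection limit is 125.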
+    pub fn peerset_total_connection_limit(&self) -> usize {
+        self.peerset_outbound_connection_limit() + self.peerset_inbound_connection_limit()
+    }
+
+    /// Get the initial seed peers based on the configured network.
+    pub async fn initial_peers(&self) -> HashSet<SocketAddr> {
+        match self.network {
+            Network::Mainnet => Config::resolve_peers(&self.initial_mainnet_peers).await,
+            Network::Testnet => Config::resolve_peers(&self.initial_testnet_peers).await,
+        }
+    }
+
     /// Concurrently resolves `peers` into zero or more IP addresses, with a
     /// timeout of a few seconds on each DNS request.
     ///
@@ -115,14 +152,6 @@ impl Config {
         }
     }
 
-    /// Get the initial seed peers based on the configured network.
-    pub async fn initial_peers(&self) -> HashSet<SocketAddr> {
-        match self.network {
-            Network::Mainnet => Config::resolve_peers(&self.initial_mainnet_peers).await,
-            Network::Testnet => Config::resolve_peers(&self.initial_testnet_peers).await,
-        }
-    }
-
     /// Resolves `host` into zero or more IP addresses, retrying up to
     /// `max_retries` times.
     ///
@@ -265,6 +294,7 @@ impl<'de> Deserialize<'de> for Config {
         }
 
         let config = DConfig::deserialize(deserializer)?;
+        // TODO: perform listener DNS lookups asynchronously with a timeout (#1631)
         let listen_addr = match config.listen_addr.parse::<SocketAddr>() {
             Ok(socket) => Ok(socket),
@@ -287,6 +317,3 @@ impl<'de> Deserialize<'de> for Config {
         })
     }
 }
-
-#[cfg(test)]
-mod tests;
diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs
index c798a95cdd7..154c9bb0bf3 100644
--- a/zebra-network/src/constants.rs
+++ b/zebra-network/src/constants.rs
@@ -13,6 +13,24 @@ use zebra_chain::{
     serialization::Duration32,
 };
 
+/// The fractional bias towards outbound peers in the peer set,
+/// if connection limits have been reached.
+///
+/// Inbound and outbound connections are limited based on
+/// [`Config.peerset_initial_target_size`].
+///
+/// The outbound limit is larger than the inbound limit by:
+/// `Config.peerset_initial_target_size / OUTBOUND_PEER_BIAS_DENOMINATOR`.
+///
+/// # Security
+///
+/// This bias helps make sure that Zebra is connected to a majority of peers
+/// that it has chosen from its [`AddressBook`].
+///
+/// Inbound peer connections are initiated by the remote peer,
+/// so inbound peer selection is not controlled by the local node.
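+///
+/// For example, a denominator of 2 makes the outbound limit 1.5 times the
+/// inbound limit, so at most 40% of Zebra's peers are inbound when both
+/// limits are reached. (Illustrative arithmetic, not an additional constant.)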
+pub const OUTBOUND_PEER_BIAS_DENOMINATOR: usize = 2;
+
 /// The buffer size for the peer set.
 ///
 /// This should be greater than 1 to avoid sender contention, but also reasonably
diff --git a/zebra-network/src/peer.rs b/zebra-network/src/peer.rs
index 083246c77b8..11cbcf13d30 100644
--- a/zebra-network/src/peer.rs
+++ b/zebra-network/src/peer.rs
@@ -12,10 +12,9 @@ mod error;
 mod handshake;
 
 use client::{ClientRequest, ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender};
-use error::ErrorSlot;
 
 pub use client::Client;
 pub use connection::Connection;
 pub use connector::{Connector, OutboundConnectorRequest};
-pub use error::{HandshakeError, PeerError, SharedPeerError};
+pub use error::{ErrorSlot, HandshakeError, PeerError, SharedPeerError};
 pub use handshake::{ConnectedAddr, Handshake, HandshakeRequest};
diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs
index 668fcd6230e..ca63d3771d7 100644
--- a/zebra-network/src/peer/client.rs
+++ b/zebra-network/src/peer/client.rs
@@ -17,21 +17,29 @@ use super::{ErrorSlot, PeerError, SharedPeerError};
 
 /// The "client" duplex half of a peer connection.
 pub struct Client {
-    // Used to shut down the corresponding heartbeat.
-    // This is always Some except when we take it on drop.
-    pub(super) shutdown_tx: Option<oneshot::Sender<()>>,
-    pub(super) server_tx: mpsc::Sender<ClientRequest>,
-    pub(super) error_slot: ErrorSlot,
+    /// Used to shut down the corresponding heartbeat.
+    /// This is always Some except when we take it on drop.
+    pub(crate) shutdown_tx: Option<oneshot::Sender<()>>,
+
+    /// Used to send [`Request`]s to the remote peer.
+    pub(crate) server_tx: mpsc::Sender<ClientRequest>,
+
+    /// A slot for an error shared between the Connection and the Client that uses it.
+    ///
+    /// `None` unless the connection or client have errored.
+    pub(crate) error_slot: ErrorSlot,
 }
 
 /// A message from the `peer::Client` to the `peer::Server`.
 #[derive(Debug)]
-pub(super) struct ClientRequest {
-    /// The actual request.
+pub(crate) struct ClientRequest {
+    /// The actual network request for the peer.
     pub request: Request,
-    /// The return message channel, included because `peer::Client::call` returns a
+
+    /// The response [`Message`] channel, included because `peer::Client::call` returns a
     /// future that may be moved around before it resolves.
     pub tx: oneshot::Sender<Result<Response, SharedPeerError>>,
+
     /// The tracing context for the request, so that work the connection task does
     /// processing messages in the context of this request will have correct context.
     pub span: tracing::Span,
diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs
index 4319e8ee2ae..8e59e8b80db 100644
--- a/zebra-network/src/peer/connection.rs
+++ b/zebra-network/src/peer/connection.rs
@@ -331,6 +331,8 @@ pub struct Connection {
     pub(super) client_rx: ClientRequestReceiver,
 
     /// A slot for an error shared between the Connection and the Client that uses it.
+    ///
+    /// `None` unless the connection or client have errored.
     pub(super) error_slot: ErrorSlot,
 
     /// A channel for sending requests to the connected peer.
diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs
index ab1c36db017..8bdaad1ae3e 100644
--- a/zebra-network/src/peer/error.rs
+++ b/zebra-network/src/peer/error.rs
@@ -72,7 +72,7 @@ pub enum PeerError {
 /// mutex should be held for as short a time as possible. This avoids blocking
 /// the async task thread on acquiring the mutex.
 #[derive(Default, Clone)]
-pub(super) struct ErrorSlot(Arc<std::sync::Mutex<Option<SharedPeerError>>>);
+pub struct ErrorSlot(Arc<std::sync::Mutex<Option<SharedPeerError>>>);
 
 impl std::fmt::Debug for ErrorSlot {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs
index b57def9e768..7c0bce0f739 100644
--- a/zebra-network/src/peer_set/initialize.rs
+++ b/zebra-network/src/peer_set/initialize.rs
@@ -63,6 +63,11 @@ type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;
 /// In addition to returning a service for outbound requests, this method
 /// returns a shared [`AddressBook`] updated with last-seen timestamps for
 /// connected peers.
+///
+/// # Panics
+///
+/// If `config.peerset_initial_target_size` is zero.
+/// (zebra-network expects to be able to connect to at least one peer.)
 pub async fn init<S, C>(
     config: Config,
     inbound_service: S,
@@ -76,10 +81,25 @@ where
     S::Future: Send + 'static,
     C: ChainTip + Clone + Send + 'static,
 {
+    // If we want Zebra to operate with no network,
+    // we should implement a `zebrad` command that doesn't use `zebra-network`.
+    assert!(
+        config.peerset_initial_target_size > 0,
+        "Zebra must be allowed to connect to at least one peer"
+    );
+
     let (tcp_listener, listen_addr) = open_listener(&config.clone()).await;
 
     let (address_book, timestamp_collector) = TimestampCollector::spawn(listen_addr);
-    let (inv_sender, inv_receiver) = broadcast::channel(100);
+
+    // Create a broadcast channel for peer inventory advertisements.
+    // If it reaches capacity, this channel drops older inventory advertisements.
+    //
+    // When Zebra is at the chain tip with an up-to-date mempool,
+    // we expect to have at most 1 new transaction per connected peer,
+    // and 1-2 new blocks across the entire network.
+    // (The block syncer and mempool crawler handle bulk fetches of blocks and transactions.)
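+    //
+    // A capacity equal to the total connection limit therefore leaves room
+    // for roughly one pending advertisement per connected peer.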
+    let (inv_sender, inv_receiver) = broadcast::channel(config.peerset_total_connection_limit());
 
     // Construct services that handle inbound handshakes and perform outbound
     // handshakes. These use the same handshake service internally to detect
@@ -106,10 +126,14 @@ where
         )
     };
 
-    // Create an mpsc channel for peer changes, with a generous buffer.
-    let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(100);
-    // Create an mpsc channel for peerset demand signaling.
-    let (mut demand_tx, demand_rx) = mpsc::channel::<MorePeers>(100);
+    // Create an mpsc channel for peer changes,
+    // based on the maximum number of inbound and outbound peers.
+    let (peerset_tx, peerset_rx) =
+        mpsc::channel::<PeerChange>(config.peerset_total_connection_limit());
+    // Create an mpsc channel for peerset demand signaling,
+    // based on the maximum number of outbound peers.
+    let (mut demand_tx, demand_rx) =
+        mpsc::channel::<MorePeers>(config.peerset_outbound_connection_limit());
 
     // Create a oneshot to send background task JoinHandles to the peer set
     let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();
@@ -174,9 +198,10 @@ where
         let _ = demand_tx.try_send(MorePeers);
     }
 
-    let crawl_guard = tokio::spawn(
+    let crawl_fut = {
+        let config = config.clone();
         crawl_and_dial(
-            config.crawl_new_peer_interval,
+            config,
             demand_tx,
             demand_rx,
             candidates,
@@ -184,8 +209,8 @@ where
             peerset_tx,
             active_outbound_connections,
         )
-        .instrument(Span::current()),
-    );
+    };
+    let crawl_guard = tokio::spawn(crawl_fut.instrument(Span::current()));
 
     handle_tx.send(vec![listen_guard, crawl_guard]).unwrap();
 
@@ -294,6 +319,11 @@ where
         peerset_tx
             .send(handshake_result.map_err(|(_addr, e)| e))
             .await?;
+
+        // Security: Let other tasks run after each connection is processed.
+        //
+        // Avoids remote peers starving other Zebra tasks using initial connection successes or errors.
+        tokio::task::yield_now().await;
     }
 
     let outbound_connections = active_outbound_connections.update_count();
@@ -406,7 +436,7 @@ where
         if let Ok((tcp_stream, addr)) = listener.accept().await {
             // The peer already opened a connection, so increment the connection count immediately.
             let connection_tracker = active_inbound_connections.track_connection();
-            info!(
+            debug!(
                 inbound_connections = ?active_inbound_connections.update_count(),
                 "handshaking on an open inbound peer connection"
            );
@@ -449,6 +479,11 @@ where
             // but most OSes also limit the number of queued inbound connections on a listener port.
             tokio::time::sleep(constants::MIN_PEER_CONNECTION_INTERVAL).await;
         }
+
+        // Security: Let other tasks run after each connection is processed.
+        //
+        // Avoids remote peers starving other Zebra tasks using inbound connection successes or errors.
+        tokio::task::yield_now().await;
     }
 }
 
@@ -476,9 +511,9 @@ enum CrawlerAction {
 /// and connect to new peers, and send the resulting `peer::Client`s through the
 /// `peerset_tx` channel.
 ///
-/// Crawl for new peers every `crawl_new_peer_interval`, and whenever there is
-/// demand, but no new peers in `candidates`. After crawling, try to connect to
-/// one new peer using `outbound_connector`.
+/// Crawl for new peers every `config.crawl_new_peer_interval`.
+/// Also crawl whenever there is demand, but no new peers in `candidates`.
+/// After crawling, try to connect to one new peer using `outbound_connector`.
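+///
+/// Stop opening new connections once `config.peerset_outbound_connection_limit()`
+/// active or pending outbound connections have been opened.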
 ///
 /// If a handshake fails, restore the unused demand signal by sending it to
 /// `demand_tx`.
@@ -487,11 +522,19 @@ enum CrawlerAction {
 /// permanent internal error. Transient errors and individual peer errors should
 /// be handled within the crawler.
 ///
-/// Uses `active_outbound_connections` to track active outbound connections
+/// Uses `active_outbound_connections` to track the number of active outbound connections
 /// in both the initial peers and crawler.
-#[instrument(skip(demand_tx, demand_rx, candidates, outbound_connector, peerset_tx,))]
+#[instrument(skip(
+    config,
+    demand_tx,
+    demand_rx,
+    candidates,
+    outbound_connector,
+    peerset_tx,
+    active_outbound_connections,
+))]
 async fn crawl_and_dial<C, S>(
-    crawl_new_peer_interval: std::time::Duration,
+    config: Config,
     mut demand_tx: mpsc::Sender<MorePeers>,
     mut demand_rx: mpsc::Receiver<MorePeers>,
     mut candidates: CandidateSet<S>,
@@ -530,7 +573,7 @@ where
     handshakes.push(future::pending().boxed());
 
     let mut crawl_timer =
-        tokio::time::interval(crawl_new_peer_interval).map(|tick| TimerCrawl { tick });
+        tokio::time::interval(config.crawl_new_peer_interval).map(|tick| TimerCrawl { tick });
 
     loop {
         metrics::gauge!(
@@ -548,8 +591,8 @@ where
             next_timer = crawl_timer.next() => next_timer.expect("timers never terminate"),
             // turn the demand into an action, based on the crawler's current state
             _ = demand_rx.next() => {
-                if handshakes.len() > 50 {
-                    // Too many pending handshakes already
+                if active_outbound_connections.update_count() >= config.peerset_outbound_connection_limit() {
+                    // Too many open connections or pending handshakes already
                     DemandDrop
                 } else if let Some(candidate) = candidates.next().await {
                     // candidates.next has a short delay, and briefly holds the address
@@ -566,13 +609,13 @@ where
                 // This is set to trace level because when the peerset is
                 // congested it can generate a lot of demand signal very
                 // rapidly.
-                trace!("too many in-flight handshakes, dropping demand signal");
+                trace!("too many open connections or in-flight handshakes, dropping demand signal");
                 continue;
             }
             DemandHandshake { candidate } => {
                 // Increment the connection count before we spawn the connection.
                 let outbound_connection_tracker = active_outbound_connections.track_connection();
-                info!(
+                debug!(
                     outbound_connections = ?active_outbound_connections.update_count(),
                     "opening an outbound peer connection"
                 );
@@ -635,6 +678,11 @@ where
                 let _ = demand_tx.try_send(MorePeers);
             }
         }
+
+        // Security: Let other tasks run after each crawler action is processed.
+        //
+        // Avoids remote peers starving other Zebra tasks using outbound connection errors.
+        tokio::task::yield_now().await;
     }
 }
diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs
index 310d07f26f9..beefc2fdc7c 100644
--- a/zebra-network/src/peer_set/initialize/tests/vectors.rs
+++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs
@@ -1,4 +1,4 @@
-//! Specific configs used for zebra-network initialization tests.
+//! zebra-network initialization tests using fixed configs.
 //!
 //! ## Failures due to Port Conflicts
 //!
@@ -13,19 +13,43 @@
 //! If it does not have any IPv4 interfaces, or IPv4 localhost is not on `127.0.0.1`,
 //! skip all the network tests by setting the `ZEBRA_SKIP_NETWORK_TESTS` environmental variable.
 
-use std::{collections::HashSet, net::SocketAddr};
+use std::{
+    collections::HashSet,
+    net::{Ipv4Addr, SocketAddr},
+    sync::Arc,
+    time::Duration,
+};
 
-use tower::service_fn;
+use futures::{
+    channel::{mpsc, oneshot},
+    FutureExt,
+};
+use tower::{discover::Change, service_fn, Service};
+use tracing::Span;
 
-use zebra_chain::{chain_tip::NoChainTip, parameters::Network};
+use zebra_chain::{chain_tip::NoChainTip, parameters::Network, serialization::DateTime32};
 use zebra_test::net::random_known_port;
 
-use crate::Config;
-
-use super::super::init;
+use crate::{
+    init,
+    meta_addr::MetaAddr,
+    peer::{self, ErrorSlot, OutboundConnectorRequest},
+    peer_set::{
+        initialize::{crawl_and_dial, PeerChange},
+        set::MorePeers,
+        ActiveConnectionCounter, CandidateSet,
+    },
+    protocol::types::PeerServices,
+    AddressBook, BoxError, Config, Request, Response,
+};
 
 use Network::*;
 
+/// The amount of time to run the crawler, before testing what it has done.
+///
+/// Using a very short time can make the crawler not run at all.
+const CRAWLER_TEST_DURATION: Duration = Duration::from_secs(10);
+
 /// Test that zebra-network discovers dynamic bind-to-all-interfaces listener ports,
 /// and sends them to the `AddressBook`.
 ///
@@ -98,6 +122,473 @@ async fn local_listener_fixed_port_localhost_addr() {
     local_listener_port_with(SocketAddr::new(localhost_v6, random_known_port()), Testnet).await;
 }
 
+/// Test zebra-network with a peer limit of zero peers on mainnet.
+/// (Zebra does not support this mode of operation.)
+#[tokio::test]
+#[should_panic]
+async fn peer_limit_zero_mainnet() {
+    zebra_test::init();
+
+    // This test should not require network access, because the connection limit is zero.
+
+    let unreachable_inbound_service =
+        service_fn(|_| async { unreachable!("inbound service should never be called") });
+
+    let address_book = init_with_peer_limit(0, unreachable_inbound_service, Mainnet).await;
+    assert_eq!(
+        address_book.lock().unwrap().peers().count(),
+        0,
+        "expected no peers in Mainnet address book, but got: {:?}",
+        address_book.lock().unwrap().address_metrics()
+    );
+}
+
+/// Test zebra-network with a peer limit of zero peers on testnet.
+/// (Zebra does not support this mode of operation.)
+#[tokio::test]
+#[should_panic]
+async fn peer_limit_zero_testnet() {
+    zebra_test::init();
+
+    // This test should not require network access, because the connection limit is zero.
+
+    let unreachable_inbound_service =
+        service_fn(|_| async { unreachable!("inbound service should never be called") });
+
+    let address_book = init_with_peer_limit(0, unreachable_inbound_service, Testnet).await;
+    assert_eq!(
+        address_book.lock().unwrap().peers().count(),
+        0,
+        "expected no peers in Testnet address book, but got: {:?}",
+        address_book.lock().unwrap().address_metrics()
+    );
+}
+
+/// Test zebra-network with a peer limit of one inbound and one outbound peer on mainnet.
+#[tokio::test]
+async fn peer_limit_one_mainnet() {
+    zebra_test::init();
+
+    if zebra_test::net::zebra_skip_network_tests() {
+        return;
+    }
+
+    let nil_inbound_service = service_fn(|_| async { Ok(Response::Nil) });
+
+    let _ = init_with_peer_limit(1, nil_inbound_service, Mainnet).await;
+
+    // Let the crawler run for a while.
+    tokio::time::sleep(CRAWLER_TEST_DURATION).await;
+
+    // Any number of address book peers is valid here, because some peers might have failed.
+}
+
+/// Test zebra-network with a peer limit of one inbound and one outbound peer on testnet.
+#[tokio::test]
+async fn peer_limit_one_testnet() {
+    zebra_test::init();
+
+    if zebra_test::net::zebra_skip_network_tests() {
+        return;
+    }
+
+    let nil_inbound_service = service_fn(|_| async { Ok(Response::Nil) });
+
+    let _ = init_with_peer_limit(1, nil_inbound_service, Testnet).await;
+
+    // Let the crawler run for a while.
+    tokio::time::sleep(CRAWLER_TEST_DURATION).await;
+
+    // Any number of address book peers is valid here, because some peers might have failed.
+}
+
+/// Test zebra-network with a peer limit of two inbound and three outbound peers on mainnet.
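+/// (With a target size of 2, the outbound limit is 2 + 2/2 = 3.)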
+#[tokio::test]
+async fn peer_limit_two_mainnet() {
+    zebra_test::init();
+
+    if zebra_test::net::zebra_skip_network_tests() {
+        return;
+    }
+
+    let nil_inbound_service = service_fn(|_| async { Ok(Response::Nil) });
+
+    let _ = init_with_peer_limit(2, nil_inbound_service, Mainnet).await;
+
+    // Let the crawler run for a while.
+    tokio::time::sleep(CRAWLER_TEST_DURATION).await;
+
+    // Any number of address book peers is valid here, because some peers might have failed.
+}
+
+/// Test zebra-network with a peer limit of two inbound and three outbound peers on testnet.
+#[tokio::test]
+async fn peer_limit_two_testnet() {
+    zebra_test::init();
+
+    if zebra_test::net::zebra_skip_network_tests() {
+        return;
+    }
+
+    let nil_inbound_service = service_fn(|_| async { Ok(Response::Nil) });
+
+    let _ = init_with_peer_limit(2, nil_inbound_service, Testnet).await;
+
+    // Let the crawler run for a while.
+    tokio::time::sleep(CRAWLER_TEST_DURATION).await;
+
+    // Any number of address book peers is valid here, because some peers might have failed.
+}
+
+/// Test the crawler with an outbound peer limit of zero peers, and a connector that panics.
+#[tokio::test]
+async fn crawler_peer_limit_zero_connect_panic() {
+    zebra_test::init();
+
+    // This test does not require network access, because the outbound connector
+    // and peer set are fake.
+
+    let unreachable_outbound_connector = service_fn(|_| async {
+        unreachable!("outbound connector should never be called with a zero peer limit")
+    });
+
+    let (_config, mut peerset_rx) =
+        spawn_crawler_with_peer_limit(0, unreachable_outbound_connector).await;
+
+    let peer_result = peerset_rx.try_next();
+    assert!(
+        // `Err(_)` means that no peers are available, and the sender has not been dropped.
+        // `Ok(None)` means that no peers are available, and the sender has been dropped.
+        matches!(peer_result, Err(_) | Ok(None)),
+        "unexpected peer when outbound limit is zero: {:?}",
+        peer_result,
+    );
+}
+
+/// Test the crawler with an outbound peer limit of one peer, and a connector that always errors.
+#[tokio::test]
+async fn crawler_peer_limit_one_connect_error() {
+    zebra_test::init();
+
+    // This test does not require network access, because the outbound connector
+    // and peer set are fake.
+
+    let error_outbound_connector =
+        service_fn(|_| async { Err("test outbound connector always returns errors".into()) });
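+
+    // A failed handshake restores its unused demand signal, so the crawler
+    // keeps retrying, but no successful peers ever reach the channel.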
+
+    let (_config, mut peerset_rx) =
+        spawn_crawler_with_peer_limit(1, error_outbound_connector).await;
+
+    let peer_result = peerset_rx.try_next();
+    assert!(
+        // `Err(_)` means that no peers are available, and the sender has not been dropped.
+        // `Ok(None)` means that no peers are available, and the sender has been dropped.
+        matches!(peer_result, Err(_) | Ok(None)),
+        "unexpected peer when all connections error: {:?}",
+        peer_result,
+    );
+}
+
+/// Test the crawler with an outbound peer limit of one peer,
+/// and a connector that returns success then disconnects the peer.
+#[tokio::test]
+async fn crawler_peer_limit_one_connect_ok_then_drop() {
+    zebra_test::init();
+
+    // This test does not require network access, because the outbound connector
+    // and peer set are fake.
+
+    let success_disconnect_outbound_connector =
+        service_fn(|req: OutboundConnectorRequest| async move {
+            let OutboundConnectorRequest {
+                addr,
+                connection_tracker,
+            } = req;
+
+            let (server_tx, _server_rx) = mpsc::channel(0);
+            let (shutdown_tx, _shutdown_rx) = oneshot::channel();
+            let error_slot = ErrorSlot::default();
+
+            let fake_client = peer::Client {
+                shutdown_tx: Some(shutdown_tx),
+                server_tx,
+                error_slot,
+            };
+
+            // Fake the connection closing.
+            std::mem::drop(connection_tracker);
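+
+            // Dropping the tracker decrements the shared connection count,
+            // freeing a slot so the crawler can open another connection.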
+
+            // Give the crawler time to get the message.
+            tokio::task::yield_now().await;
+
+            Ok(Change::Insert(addr, fake_client))
+        });
+
+    let (config, mut peerset_rx) =
+        spawn_crawler_with_peer_limit(1, success_disconnect_outbound_connector).await;
+
+    let mut peer_count: usize = 0;
+    loop {
+        let peer_result = peerset_rx.try_next();
+        match peer_result {
+            // A peer handshake succeeded.
+            Ok(Some(peer_result)) => {
+                assert!(
+                    matches!(peer_result, Ok(Change::Insert(_, _))),
+                    "unexpected connection error: {:?}\n\
+                     {} previous peers succeeded",
+                    peer_result,
+                    peer_count,
+                );
+                peer_count += 1;
+            }
+
+            // The channel is closed and there are no messages left in the channel.
+            Ok(None) => break,
+            // The channel is still open, but there are no messages left in the channel.
+            Err(_) => break,
+        }
+    }
+
+    assert!(
+        peer_count > config.peerset_outbound_connection_limit(),
+        "unexpected number of peer connections {}, should be over the limit of {}",
+        peer_count,
+        config.peerset_outbound_connection_limit(),
+    );
+}
+
+/// Test the crawler with an outbound peer limit of one peer,
+/// and a connector that returns success then holds the peer open.
+#[tokio::test]
+async fn crawler_peer_limit_one_connect_ok_stay_open() {
+    zebra_test::init();
+
+    // This test does not require network access, because the outbound connector
+    // and peer set are fake.
+
+    let success_stay_open_outbound_connector =
+        service_fn(|req: OutboundConnectorRequest| async move {
+            let OutboundConnectorRequest {
+                addr,
+                connection_tracker,
+            } = req;
+
+            let (server_tx, _server_rx) = mpsc::channel(0);
+            let (shutdown_tx, _shutdown_rx) = oneshot::channel();
+            let error_slot = ErrorSlot::default();
+
+            let fake_client = peer::Client {
+                shutdown_tx: Some(shutdown_tx),
+                server_tx,
+                error_slot,
+            };
+
+            // Fake the connection being open forever.
+            std::mem::forget(connection_tracker);
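+
+            // A forgotten tracker is never dropped, so the connection count
+            // never decreases, and the crawler stays at its outbound limit.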
+
+            Ok(Change::Insert(addr, fake_client))
+        });
+
+    let (config, mut peerset_rx) =
+        spawn_crawler_with_peer_limit(1, success_stay_open_outbound_connector).await;
+
+    let mut peer_count: usize = 0;
+    loop {
+        let peer_result = peerset_rx.try_next();
+        match peer_result {
+            // A peer handshake succeeded.
+            Ok(Some(peer_result)) => {
+                assert!(
+                    matches!(peer_result, Ok(Change::Insert(_, _))),
+                    "unexpected connection error: {:?}\n\
+                     {} previous peers succeeded",
+                    peer_result,
+                    peer_count,
+                );
+                peer_count += 1;
+            }
+
+            // The channel is closed and there are no messages left in the channel.
+            Ok(None) => break,
+            // The channel is still open, but there are no messages left in the channel.
+            Err(_) => break,
+        }
+    }
+
+    assert!(
+        peer_count <= config.peerset_outbound_connection_limit(),
+        "unexpected number of peer connections {}, over limit of {}",
+        peer_count,
+        config.peerset_outbound_connection_limit(),
+    );
+}
+
+/// Test the crawler with the default outbound peer limit, and a connector that always errors.
+#[tokio::test]
+async fn crawler_peer_limit_default_connect_error() {
+    zebra_test::init();
+
+    // This test does not require network access, because the outbound connector
+    // and peer set are fake.
+
+    let error_outbound_connector =
+        service_fn(|_| async { Err("test outbound connector always returns errors".into()) });
+
+    let (_config, mut peerset_rx) =
+        spawn_crawler_with_peer_limit(None, error_outbound_connector).await;
+
+    let peer_result = peerset_rx.try_next();
+    assert!(
+        // `Err(_)` means that no peers are available, and the sender has not been dropped.
+        // `Ok(None)` means that no peers are available, and the sender has been dropped.
+        matches!(peer_result, Err(_) | Ok(None)),
+        "unexpected peer when all connections error: {:?}",
+        peer_result,
+    );
+}
+
+/// Test the crawler with the default outbound peer limit,
+/// and a connector that returns success then disconnects the peer.
+#[tokio::test]
+async fn crawler_peer_limit_default_connect_ok_then_drop() {
+    zebra_test::init();
+
+    // This test does not require network access, because the outbound connector
+    // and peer set are fake.
+
+    let success_disconnect_outbound_connector =
+        service_fn(|req: OutboundConnectorRequest| async move {
+            let OutboundConnectorRequest {
+                addr,
+                connection_tracker,
+            } = req;
+
+            let (server_tx, _server_rx) = mpsc::channel(0);
+            let (shutdown_tx, _shutdown_rx) = oneshot::channel();
+            let error_slot = ErrorSlot::default();
+
+            let fake_client = peer::Client {
+                shutdown_tx: Some(shutdown_tx),
+                server_tx,
+                error_slot,
+            };
+
+            // Fake the connection closing.
+            std::mem::drop(connection_tracker);
+
+            // Give the crawler time to get the message.
+            tokio::task::yield_now().await;
+
+            Ok(Change::Insert(addr, fake_client))
+        });
+
+    let (config, mut peerset_rx) =
+        spawn_crawler_with_peer_limit(None, success_disconnect_outbound_connector).await;
+
+    let mut peer_count: usize = 0;
+    loop {
+        let peer_result = peerset_rx.try_next();
+        match peer_result {
+            // A peer handshake succeeded.
+            Ok(Some(peer_result)) => {
+                assert!(
+                    matches!(peer_result, Ok(Change::Insert(_, _))),
+                    "unexpected connection error: {:?}\n\
+                     {} previous peers succeeded",
+                    peer_result,
+                    peer_count,
+                );
+                peer_count += 1;
+            }
+
+            // The channel is closed and there are no messages left in the channel.
+            Ok(None) => break,
+            // The channel is still open, but there are no messages left in the channel.
+            Err(_) => break,
+        }
+    }
+
+    // TODO: tweak the crawler timeouts and rate-limits so we get over the actual limit
+    // (currently, getting over the limit can take 30 seconds or more)
+    let lower_limit = config.peerset_outbound_connection_limit() / 3;
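+
+    // For example, if the default target size were 25 (an illustrative figure),
+    // the outbound limit would be 37 and this lower bound would be 12.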
+    assert!(
+        peer_count > lower_limit,
+        "unexpected number of peer connections {}, should be over the lower limit of {}",
+        peer_count,
+        lower_limit,
+    );
+}
+
+/// Test the crawler with the default outbound peer limit,
+/// and a connector that returns success then holds the peer open.
+#[tokio::test]
+async fn crawler_peer_limit_default_connect_ok_stay_open() {
+    zebra_test::init();
+
+    // This test does not require network access, because the outbound connector
+    // and peer set are fake.
+
+    let success_stay_open_outbound_connector =
+        service_fn(|req: OutboundConnectorRequest| async move {
+            let OutboundConnectorRequest {
+                addr,
+                connection_tracker,
+            } = req;
+
+            let (server_tx, _server_rx) = mpsc::channel(0);
+            let (shutdown_tx, _shutdown_rx) = oneshot::channel();
+            let error_slot = ErrorSlot::default();
+
+            let fake_client = peer::Client {
+                shutdown_tx: Some(shutdown_tx),
+                server_tx,
+                error_slot,
+            };
+
+            // Fake the connection being open forever.
+            std::mem::forget(connection_tracker);
+
+            Ok(Change::Insert(addr, fake_client))
+        });
+
+    // The initial target size is multiplied by 1.5 to give the actual limit.
+    let (config, mut peerset_rx) =
+        spawn_crawler_with_peer_limit(None, success_stay_open_outbound_connector).await;
+
+    let mut peer_count: usize = 0;
+    loop {
+        let peer_result = peerset_rx.try_next();
+        match peer_result {
+            // A peer handshake succeeded.
+            Ok(Some(peer_result)) => {
+                assert!(
+                    matches!(peer_result, Ok(Change::Insert(_, _))),
+                    "unexpected connection error: {:?}\n\
+                     {} previous peers succeeded",
+                    peer_result,
+                    peer_count,
+                );
+                peer_count += 1;
+            }
+
+            // The channel is closed and there are no messages left in the channel.
+            Ok(None) => break,
+            // The channel is still open, but there are no messages left in the channel.
+            Err(_) => break,
+        }
+    }
+
+    assert!(
+        peer_count <= config.peerset_outbound_connection_limit(),
+        "unexpected number of peer connections {}, over limit of {}",
+        peer_count,
+        config.peerset_outbound_connection_limit(),
+    );
+}
+
+/// Open a local listener on `listen_addr` for `network`.
+/// Asserts that the local listener address works as expected.
 async fn local_listener_port_with(listen_addr: SocketAddr, network: Network) {
     let config = Config {
         listen_addr,
@@ -133,3 +624,142 @@ async fn local_listener_port_with(listen_addr: SocketAddr, network: Network) {
         "IP addresses are correctly propagated"
     );
 }
+
+/// Initialize a peer set with `peerset_initial_target_size` and `inbound_service` on `network`.
+/// Returns the newly created [`AddressBook`] for testing.
+async fn init_with_peer_limit<S>(
+    peerset_initial_target_size: usize,
+    inbound_service: S,
+    network: Network,
+) -> Arc<std::sync::Mutex<AddressBook>>
+where
+    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
+    S::Future: Send + 'static,
+{
+    // This test might fail on machines with no configured IPv4 addresses
+    // (localhost should be enough).
+    let unused_v4 = "0.0.0.0:0".parse().unwrap();
+
+    let config = Config {
+        peerset_initial_target_size,
+
+        network,
+        listen_addr: unused_v4,
+
+        ..Config::default()
+    };
+
+    let (_peer_service, address_book) = init(config, inbound_service, NoChainTip).await;
+
+    address_book
+}
+
+/// Run a peer crawler with `peerset_initial_target_size` and `outbound_connector`.
+///
+/// Uses the default values for all other config fields.
+///
+/// Returns the generated [`Config`], and the peer set receiver.
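+///
+/// For example, `spawn_crawler_with_peer_limit(1, connector)` uses a target size
+/// of one peer, and `spawn_crawler_with_peer_limit(None, connector)` uses the
+/// default config's target size.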
+async fn spawn_crawler_with_peer_limit<C>(
+    peerset_initial_target_size: impl Into<Option<usize>>,
+    outbound_connector: C,
+) -> (Config, mpsc::Receiver<PeerChange>)
+where
+    C: Service<
+            OutboundConnectorRequest,
+            Response = Change<SocketAddr, peer::Client>,
+            Error = BoxError,
+        > + Clone
+        + Send
+        + 'static,
+    C::Future: Send + 'static,
+{
+    // Create a test config.
+    let mut config = Config::default();
+    if let Some(peerset_initial_target_size) = peerset_initial_target_size.into() {
+        config.peerset_initial_target_size = peerset_initial_target_size;
+    }
+
+    // Manually initialize an address book without a timestamp tracker.
+    let mut address_book = AddressBook::new(config.listen_addr, Span::current());
+
+    // Add enough fake peers to go over the limit, even if the limit is zero.
+    let over_limit_peers = config.peerset_outbound_connection_limit() * 2 + 1;
+    let mut fake_peer = None;
+    for address_number in 0..over_limit_peers {
+        let addr = SocketAddr::new(Ipv4Addr::new(127, 1, 1, address_number as _).into(), 1);
+        let addr =
+            MetaAddr::new_gossiped_meta_addr(addr, PeerServices::NODE_NETWORK, DateTime32::now());
+        fake_peer = Some(addr);
+        let addr = addr.new_gossiped_change();
+
+        address_book.update(addr);
+    }
+
+    // Create a fake peer set.
+    let nil_peer_set = service_fn(move |req| async move {
+        let rsp = match req {
+            // Return the correct response variant for Peers requests,
+            // re-using one of the peers we already provided.
+            Request::Peers => Response::Peers(vec![fake_peer.unwrap()]),
+            _ => unreachable!("unexpected request: {:?}", req),
+        };
+
+        Ok(rsp)
+    });
+
+    let address_book = Arc::new(std::sync::Mutex::new(address_book));
+
+    // Make the channels large enough to hold all the peers.
+    let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(over_limit_peers);
+    let (mut demand_tx, demand_rx) = mpsc::channel::<MorePeers>(over_limit_peers);
+
+    let candidates = CandidateSet::new(address_book.clone(), nil_peer_set);
+
+    // In zebra_network::initialize() the counter would already have some initial peer connections,
+    // but in this test we start with an empty counter.
+    let active_outbound_connections = ActiveConnectionCounter::new_counter();
+
+    // Add fake demand over the limit.
+    for _ in 0..over_limit_peers {
+        let _ = demand_tx.try_send(MorePeers);
+    }
+
+    // Start the crawler.
+    let crawl_fut = crawl_and_dial(
+        config.clone(),
+        demand_tx,
+        demand_rx,
+        candidates,
+        outbound_connector,
+        peerset_tx,
+        active_outbound_connections,
+    );
+    let crawl_task_handle = tokio::spawn(crawl_fut);
+
+    // Let the crawler run for a while.
+    tokio::time::sleep(CRAWLER_TEST_DURATION).await;
+
+    // Stop the crawler and let it finish.
+    crawl_task_handle.abort();
+    tokio::task::yield_now().await;
+
+    // Check for panics or errors in the crawler.
+    let crawl_result = crawl_task_handle.now_or_never();
+    assert!(
+        matches!(crawl_result, None)
+            || matches!(crawl_result, Some(Err(ref e)) if e.is_cancelled()),
+        "unexpected error or panic in peer crawler task: {:?}",
+        crawl_result,
+    );
+
+    // Check the final address book contents.
+    assert_eq!(
+        address_book.lock().unwrap().peers().count(),
+        over_limit_peers,
+        "expected {} peers in Mainnet address book, but got: {:?}",
+        over_limit_peers,
+        address_book.lock().unwrap().address_metrics()
+    );
+
+    (config, peerset_rx)
+}