From d65e96ae7039e194ebeeb6e3ac0fb2d5e5f7e6c0 Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 27 Oct 2021 11:59:08 +1000 Subject: [PATCH 1/9] Limit open inbound connections based on the config --- zebra-network/src/peer_set/initialize.rs | 33 +++++++++++++++++------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index d01a6a314bb..91c45343211 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -162,10 +162,12 @@ where // Connect peerset_tx to the 3 peer sources: // // 1. Incoming peer connections, via a listener. - let listen_guard = tokio::spawn( - accept_inbound_connections(tcp_listener, listen_handshaker, peerset_tx.clone()) - .instrument(Span::current()), - ); + let listen_fut = { + let config = config.clone(); + let peerset_tx = peerset_tx.clone(); + accept_inbound_connections(config, tcp_listener, listen_handshaker, peerset_tx) + }; + let listen_guard = tokio::spawn(listen_fut.instrument(Span::current())); // 2. Initial peers, specified in the config. let initial_peers_fut = { @@ -434,8 +436,11 @@ async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) { /// /// Uses `handshaker` to perform a Zcash network protocol handshake, and sends /// the [`peer::Client`] result over `peerset_tx`. -#[instrument(skip(listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))] +/// +/// Limit the number of active inbound connections based on `config`. +#[instrument(skip(config, listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))] async fn accept_inbound_connections( + config: Config, listener: TcpListener, mut handshaker: S, peerset_tx: mpsc::Sender, @@ -448,7 +453,17 @@ where loop { if let Ok((tcp_stream, addr)) = listener.accept().await { - // The peer already opened a connection, so increment the connection count immediately. + if active_inbound_connections.update_count() + >= config.peerset_inbound_connection_limit() + { + // Too many open inbound connections or pending handshakes already. + // Close the connection. + std::mem::drop(tcp_stream); + continue; + } + + // The peer already opened a connection to us. + // So we want to increment the connection count as soon as possible. let connection_tracker = active_inbound_connections.track_connection(); debug!( inbound_connections = ?active_inbound_connections.update_count(), @@ -536,8 +551,8 @@ enum CrawlerAction { /// permanent internal error. Transient errors and individual peer errors should /// be handled within the crawler. /// -/// Uses `active_outbound_connections` to track the number of active outbound connections -/// in both the initial peers and crawler. +/// Uses `active_outbound_connections` to limit the number of active outbound connections +/// across both the initial peers and crawler. The limit is based on `config`. #[instrument(skip( config, demand_tx, @@ -606,7 +621,7 @@ where // turn the demand into an action, based on the crawler's current state _ = demand_rx.next() => { if active_outbound_connections.update_count() >= config.peerset_outbound_connection_limit() { - // Too many open connections or pending handshakes already + // Too many open outbound connections or pending handshakes already DemandDrop } else if let Some(candidate) = candidates.next().await { // candidates.next has a short delay, and briefly holds the address From 5fb8e759e114cf1485da8fddc7d17b1e076a48dc Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 27 Oct 2021 12:02:43 +1000 Subject: [PATCH 2/9] Log inbound connection errors at debug level --- zebra-network/src/peer_set/initialize.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 91c45343211..e39f9f49582 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -452,7 +452,8 @@ where let mut active_inbound_connections = ActiveConnectionCounter::new_counter(); loop { - if let Ok((tcp_stream, addr)) = listener.accept().await { + let inbound_result = listener.accept().await; + if let Ok((tcp_stream, addr)) = inbound_result { if active_inbound_connections.update_count() >= config.peerset_inbound_connection_limit() { @@ -507,6 +508,8 @@ where // Zebra can't control how many queued connections are waiting, // but most OSes also limit the number of queued inbound connections on a listener port. tokio::time::sleep(constants::MIN_PEER_CONNECTION_INTERVAL).await; + } else { + debug!(?inbound_result, "error accepting inbound connection"); } // Security: Let other tasks run after each connection is processed. From f739a7ac8311b418e12f504c70fb9ee59fdd1b93 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 28 Oct 2021 10:16:11 +1000 Subject: [PATCH 3/9] Test inbound connection limits --- .../src/peer_set/initialize/tests/vectors.rs | 471 +++++++++++++++++- 1 file changed, 468 insertions(+), 3 deletions(-) diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index 71366de8abd..87d14851fe2 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -24,7 +24,7 @@ use futures::{ channel::{mpsc, oneshot}, FutureExt, StreamExt, }; -use tokio::task::JoinHandle; +use tokio::{net::TcpStream, task::JoinHandle}; use tower::{discover::Change, service_fn, Service}; use tracing::Span; @@ -34,9 +34,12 @@ use zebra_test::net::random_known_port; use crate::{ constants, init, meta_addr::MetaAddr, - peer::{self, ErrorSlot, OutboundConnectorRequest}, + peer::{self, ErrorSlot, HandshakeRequest, OutboundConnectorRequest}, peer_set::{ - initialize::{add_initial_peers, crawl_and_dial, PeerChange}, + initialize::{ + accept_inbound_connections, add_initial_peers, crawl_and_dial, open_listener, + PeerChange, + }, set::MorePeers, ActiveConnectionCounter, CandidateSet, }, @@ -51,6 +54,11 @@ use Network::*; /// Using a very short time can make the crawler not run at all. const CRAWLER_TEST_DURATION: Duration = Duration::from_secs(10); +/// The amount of time to run the listener, before testing what it has done. +/// +/// Using a very short time can make the listener not run at all. +const LISTENER_TEST_DURATION: Duration = Duration::from_secs(10); + /// Test that zebra-network discovers dynamic bind-to-all-interfaces listener ports, /// and sends them to the `AddressBook`. /// @@ -588,6 +596,367 @@ async fn crawler_peer_limit_default_connect_ok_stay_open() { ); } +/// Test the listener with an inbound peer limit of zero peers, and a handshaker that panics. +#[tokio::test] +async fn listener_peer_limit_zero_handshake_panic() { + zebra_test::init(); + + // This test requires an IPv4 network stack with 127.0.0.1 as localhost. + if zebra_test::net::zebra_skip_network_tests() { + return; + } + + let unreachable_inbound_handshaker = service_fn(|_| async { + unreachable!("inbound handshaker should never be called with a zero peer limit") + }); + + let (_config, mut peerset_tx) = + spawn_inbound_listener_with_peer_limit(0, unreachable_inbound_handshaker).await; + + let peer_result = peerset_tx.try_next(); + assert!( + // `Err(_)` means that no peers are available, and the sender has not been dropped. + // `Ok(None)` means that no peers are available, and the sender has been dropped. + matches!(peer_result, Err(_) | Ok(None)), + "unexpected peer when inbound limit is zero: {:?}", + peer_result, + ); +} + +/// Test the listener with an inbound peer limit of one peer, and a handshaker that always errors. +#[tokio::test] +async fn listener_peer_limit_one_handshake_error() { + zebra_test::init(); + + // This test requires an IPv4 network stack with 127.0.0.1 as localhost. + if zebra_test::net::zebra_skip_network_tests() { + return; + } + + let error_inbound_handshaker = + service_fn(|_| async { Err("test inbound handshaker always returns errors".into()) }); + + let (_config, mut peerset_tx) = + spawn_inbound_listener_with_peer_limit(1, error_inbound_handshaker).await; + + let peer_result = peerset_tx.try_next(); + assert!( + // `Err(_)` means that no peers are available, and the sender has not been dropped. + // `Ok(None)` means that no peers are available, and the sender has been dropped. + matches!(peer_result, Err(_) | Ok(None)), + "unexpected peer when all handshakes error: {:?}", + peer_result, + ); +} + +/// Test the listener with an inbound peer limit of one peer, +/// and a handshaker that returns success then disconnects the peer. +#[tokio::test] +async fn listener_peer_limit_one_handshake_ok_then_drop() { + zebra_test::init(); + + // This test requires an IPv4 network stack with 127.0.0.1 as localhost. + if zebra_test::net::zebra_skip_network_tests() { + return; + } + + let success_disconnect_inbound_handshaker = service_fn(|req: HandshakeRequest| async move { + let HandshakeRequest { + tcp_stream, + connected_addr: _, + connection_tracker, + } = req; + + let (server_tx, _server_rx) = mpsc::channel(0); + let (shutdown_tx, _shutdown_rx) = oneshot::channel(); + let error_slot = ErrorSlot::default(); + + let fake_client = peer::Client { + shutdown_tx: Some(shutdown_tx), + server_tx, + error_slot, + }; + + // Actually close the connection. + std::mem::drop(connection_tracker); + std::mem::drop(tcp_stream); + + // Give the crawler time to get the message. + tokio::task::yield_now().await; + + Ok(fake_client) + }); + + let (config, mut peerset_tx) = + spawn_inbound_listener_with_peer_limit(1, success_disconnect_inbound_handshaker).await; + + let mut peer_count: usize = 0; + loop { + let peer_result = peerset_tx.try_next(); + match peer_result { + // A peer handshake succeeded. + Ok(Some(peer_result)) => { + assert!( + matches!(peer_result, Ok(Change::Insert(_, _))), + "unexpected connection error: {:?}\n\ + {} previous peers succeeded", + peer_result, + peer_count, + ); + peer_count += 1; + } + + // The channel is closed and there are no messages left in the channel. + Ok(None) => break, + // The channel is still open, but there are no messages left in the channel. + Err(_) => break, + } + } + + assert!( + peer_count > config.peerset_inbound_connection_limit(), + "unexpected number of peer connections {}, should be over the limit of {}", + peer_count, + config.peerset_inbound_connection_limit(), + ); +} + +/// Test the listener with an inbound peer limit of one peer, +/// and a handshaker that returns success then holds the peer open. +#[tokio::test] +async fn listener_peer_limit_one_handshake_ok_stay_open() { + zebra_test::init(); + + // This test requires an IPv4 network stack with 127.0.0.1 as localhost. + if zebra_test::net::zebra_skip_network_tests() { + return; + } + + let success_stay_open_inbound_handshaker = service_fn(|req: HandshakeRequest| async move { + let HandshakeRequest { + tcp_stream, + connected_addr: _, + connection_tracker, + } = req; + + let (server_tx, _server_rx) = mpsc::channel(0); + let (shutdown_tx, _shutdown_rx) = oneshot::channel(); + let error_slot = ErrorSlot::default(); + + let fake_client = peer::Client { + shutdown_tx: Some(shutdown_tx), + server_tx, + error_slot, + }; + + // Fake the connection being open forever. + std::mem::forget(connection_tracker); + std::mem::forget(tcp_stream); + + Ok(fake_client) + }); + + let (config, mut peerset_tx) = + spawn_inbound_listener_with_peer_limit(1, success_stay_open_inbound_handshaker).await; + + let mut peer_count: usize = 0; + loop { + let peer_result = peerset_tx.try_next(); + match peer_result { + // A peer handshake succeeded. + Ok(Some(peer_result)) => { + assert!( + matches!(peer_result, Ok(Change::Insert(_, _))), + "unexpected connection error: {:?}\n\ + {} previous peers succeeded", + peer_result, + peer_count, + ); + peer_count += 1; + } + + // The channel is closed and there are no messages left in the channel. + Ok(None) => break, + // The channel is still open, but there are no messages left in the channel. + Err(_) => break, + } + } + + assert!( + peer_count <= config.peerset_inbound_connection_limit(), + "unexpected number of peer connections {}, over limit of {}", + peer_count, + config.peerset_inbound_connection_limit(), + ); +} + +/// Test the listener with the default inbound peer limit, and a handshaker that always errors. +#[tokio::test] +async fn listener_peer_limit_default_handshake_error() { + zebra_test::init(); + + // This test requires an IPv4 network stack with 127.0.0.1 as localhost. + if zebra_test::net::zebra_skip_network_tests() { + return; + } + + let error_inbound_handshaker = + service_fn(|_| async { Err("test inbound handshaker always returns errors".into()) }); + + let (_config, mut peerset_tx) = + spawn_inbound_listener_with_peer_limit(None, error_inbound_handshaker).await; + + let peer_result = peerset_tx.try_next(); + assert!( + // `Err(_)` means that no peers are available, and the sender has not been dropped. + // `Ok(None)` means that no peers are available, and the sender has been dropped. + matches!(peer_result, Err(_) | Ok(None)), + "unexpected peer when all handshakes error: {:?}", + peer_result, + ); +} + +/// Test the listener with the default inbound peer limit, +/// and a handshaker that returns success then disconnects the peer. +#[tokio::test] +async fn listener_peer_limit_default_handshake_ok_then_drop() { + zebra_test::init(); + + // This test requires an IPv4 network stack with 127.0.0.1 as localhost. + if zebra_test::net::zebra_skip_network_tests() { + return; + } + + let success_disconnect_inbound_handshaker = service_fn(|req: HandshakeRequest| async move { + let HandshakeRequest { + tcp_stream, + connected_addr: _, + connection_tracker, + } = req; + + let (server_tx, _server_rx) = mpsc::channel(0); + let (shutdown_tx, _shutdown_rx) = oneshot::channel(); + let error_slot = ErrorSlot::default(); + + let fake_client = peer::Client { + shutdown_tx: Some(shutdown_tx), + server_tx, + error_slot, + }; + + // Actually close the connection. + std::mem::drop(connection_tracker); + std::mem::drop(tcp_stream); + + // Give the crawler time to get the message. + tokio::task::yield_now().await; + + Ok(fake_client) + }); + + let (config, mut peerset_tx) = + spawn_inbound_listener_with_peer_limit(None, success_disconnect_inbound_handshaker).await; + + let mut peer_count: usize = 0; + loop { + let peer_result = peerset_tx.try_next(); + match peer_result { + // A peer handshake succeeded. + Ok(Some(peer_result)) => { + assert!( + matches!(peer_result, Ok(Change::Insert(_, _))), + "unexpected connection error: {:?}\n\ + {} previous peers succeeded", + peer_result, + peer_count, + ); + peer_count += 1; + } + + // The channel is closed and there are no messages left in the channel. + Ok(None) => break, + // The channel is still open, but there are no messages left in the channel. + Err(_) => break, + } + } + + assert!( + peer_count > config.peerset_inbound_connection_limit(), + "unexpected number of peer connections {}, should be over the limit of {}", + peer_count, + config.peerset_inbound_connection_limit(), + ); +} + +/// Test the listener with the default inbound peer limit, +/// and a handshaker that returns success then holds the peer open. +#[tokio::test] +async fn listener_peer_limit_default_handshake_ok_stay_open() { + zebra_test::init(); + + // This test requires an IPv4 network stack with 127.0.0.1 as localhost. + if zebra_test::net::zebra_skip_network_tests() { + return; + } + + let success_stay_open_inbound_handshaker = service_fn(|req: HandshakeRequest| async move { + let HandshakeRequest { + tcp_stream, + connected_addr: _, + connection_tracker, + } = req; + + let (server_tx, _server_rx) = mpsc::channel(0); + let (shutdown_tx, _shutdown_rx) = oneshot::channel(); + let error_slot = ErrorSlot::default(); + + let fake_client = peer::Client { + shutdown_tx: Some(shutdown_tx), + server_tx, + error_slot, + }; + + // Fake the connection being open forever. + std::mem::forget(connection_tracker); + std::mem::forget(tcp_stream); + + Ok(fake_client) + }); + + let (config, mut peerset_tx) = + spawn_inbound_listener_with_peer_limit(None, success_stay_open_inbound_handshaker).await; + + let mut peer_count: usize = 0; + loop { + let peer_result = peerset_tx.try_next(); + match peer_result { + // A peer handshake succeeded. + Ok(Some(peer_result)) => { + assert!( + matches!(peer_result, Ok(Change::Insert(_, _))), + "unexpected connection error: {:?}\n\ + {} previous peers succeeded", + peer_result, + peer_count, + ); + peer_count += 1; + } + + // The channel is closed and there are no messages left in the channel. + Ok(None) => break, + // The channel is still open, but there are no messages left in the channel. + Err(_) => break, + } + } + + assert!( + peer_count <= config.peerset_inbound_connection_limit(), + "unexpected number of peer connections {}, over limit of {}", + peer_count, + config.peerset_inbound_connection_limit(), + ); +} + /// Test if the initial seed peer connections is rate-limited. #[tokio::test] async fn add_initial_peers_is_rate_limited() { @@ -801,6 +1170,102 @@ where (config, peerset_rx) } +/// Run an inbound peer listener with `peerset_initial_target_size` and `handshaker`. +/// +/// Uses the default values for all other config fields. +/// +/// Returns the generated [`Config`], and the peer set receiver. +async fn spawn_inbound_listener_with_peer_limit( + peerset_initial_target_size: impl Into>, + listen_handshaker: S, +) -> (Config, mpsc::Receiver) +where + S: Service + + Clone + + Send + + 'static, + S::Future: Send + 'static, +{ + // Create a test config that listens on any unused port. + let listen_addr = "127.0.0.1:0".parse().unwrap(); + let mut config = Config { + listen_addr, + ..Config::default() + }; + + if let Some(peerset_initial_target_size) = peerset_initial_target_size.into() { + config.peerset_initial_target_size = peerset_initial_target_size; + } + + // Open the listener port. + let (tcp_listener, listen_addr) = open_listener(&config.clone()).await; + + // Make enough inbound connections to go over the limit, even if the limit is zero. + // Make the channels large enough to hold all the connections. + let over_limit_connections = config.peerset_inbound_connection_limit() * 2 + 1; + let (peerset_tx, peerset_rx) = mpsc::channel::(over_limit_connections); + + // Start listening for connections. + let listen_fut = { + let config = config.clone(); + let peerset_tx = peerset_tx.clone(); + accept_inbound_connections(config, tcp_listener, listen_handshaker, peerset_tx) + }; + let listen_task_handle = tokio::spawn(listen_fut); + + // Open inbound connections. + let mut outbound_task_handles = Vec::new(); + for _ in 0..over_limit_connections { + let outbound_fut = async move { + let outbound_result = TcpStream::connect(listen_addr).await; + // Let other tasks run before we block on reading. + tokio::task::yield_now().await; + + if let Ok(outbound_stream) = outbound_result { + // Wait until the listener closes the connection. + // The handshaker is fake, so it never sends any data. + let readable_result = outbound_stream.readable().await; + info!( + ?readable_result, + "outbound connection became readable or errored: \ + closing connection to test inbound listener" + ); + } else { + // If the connection is closed quickly, we might get errors here. + debug!( + ?outbound_result, + "outbound connection error in inbound listener test" + ); + } + }; + + // TODO: Abort all the tasks. + let outbound_task_handle = tokio::spawn(outbound_fut); + outbound_task_handles.push(outbound_task_handle); + } + + // Let the listener run for a while. + tokio::time::sleep(LISTENER_TEST_DURATION).await; + + // Stop the listener and outbound tasks, and let them finish. + listen_task_handle.abort(); + for outbound_task_handle in outbound_task_handles { + outbound_task_handle.abort(); + } + tokio::task::yield_now().await; + + // Check for panics or errors in the listener. + let listen_result = listen_task_handle.now_or_never(); + assert!( + matches!(listen_result, None) + || matches!(listen_result, Some(Err(ref e)) if e.is_cancelled()), + "unexpected error or panic in inbound peer listener task: {:?}", + listen_result, + ); + + (config, peerset_rx) +} + /// Initialize a task that connects to `peer_count` initial peers using the /// given connector. /// From 0fb5bc93e5d93573952a2afb7c84d97b3c9bffe3 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 28 Oct 2021 06:35:37 +1000 Subject: [PATCH 4/9] Use clone directly in function call argument lists --- zebra-network/src/peer_set/initialize.rs | 44 +++++++++---------- .../src/peer_set/initialize/tests/vectors.rs | 11 ++--- 2 files changed, 26 insertions(+), 29 deletions(-) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index e39f9f49582..35485109834 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -162,21 +162,20 @@ where // Connect peerset_tx to the 3 peer sources: // // 1. Incoming peer connections, via a listener. - let listen_fut = { - let config = config.clone(); - let peerset_tx = peerset_tx.clone(); - accept_inbound_connections(config, tcp_listener, listen_handshaker, peerset_tx) - }; + let listen_fut = accept_inbound_connections( + config.clone(), + tcp_listener, + listen_handshaker, + peerset_tx.clone(), + ); let listen_guard = tokio::spawn(listen_fut.instrument(Span::current())); // 2. Initial peers, specified in the config. - let initial_peers_fut = { - let config = config.clone(); - let outbound_connector = outbound_connector.clone(); - let peerset_tx = peerset_tx.clone(); - add_initial_peers(config, outbound_connector, peerset_tx) - }; - + let initial_peers_fut = add_initial_peers( + config.clone(), + outbound_connector.clone(), + peerset_tx.clone(), + ); let initial_peers_join = tokio::spawn(initial_peers_fut.instrument(Span::current())); // 3. Outgoing peers we connect to in response to load. @@ -204,18 +203,15 @@ where let _ = demand_tx.try_send(MorePeers); } - let crawl_fut = { - let config = config.clone(); - crawl_and_dial( - config, - demand_tx, - demand_rx, - candidates, - outbound_connector, - peerset_tx, - active_outbound_connections, - ) - }; + let crawl_fut = crawl_and_dial( + config, + demand_tx, + demand_rx, + candidates, + outbound_connector, + peerset_tx, + active_outbound_connections, + ); let crawl_guard = tokio::spawn(crawl_fut.instrument(Span::current())); handle_tx.send(vec![listen_guard, crawl_guard]).unwrap(); diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index 87d14851fe2..86be5a83f98 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -1206,11 +1206,12 @@ where let (peerset_tx, peerset_rx) = mpsc::channel::(over_limit_connections); // Start listening for connections. - let listen_fut = { - let config = config.clone(); - let peerset_tx = peerset_tx.clone(); - accept_inbound_connections(config, tcp_listener, listen_handshaker, peerset_tx) - }; + let listen_fut = accept_inbound_connections( + config.clone(), + tcp_listener, + listen_handshaker, + peerset_tx.clone(), + ); let listen_task_handle = tokio::spawn(listen_fut); // Open inbound connections. From 741d3e6f8db33884329e7e5fc24f7a9da430c8f2 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 28 Oct 2021 06:38:36 +1000 Subject: [PATCH 5/9] Remove an outdated comment --- zebra-network/src/peer_set/initialize/tests/vectors.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index 86be5a83f98..c550fa874e8 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -1240,7 +1240,6 @@ where } }; - // TODO: Abort all the tasks. let outbound_task_handle = tokio::spawn(outbound_fut); outbound_task_handles.push(outbound_task_handle); } From a13aafc375c085ee5fde4c53374a4496c74a6d11 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 28 Oct 2021 07:54:28 +1000 Subject: [PATCH 6/9] Update tests to use an unbounded channel rather than mem::forget And rename some variables. --- .../src/peer_set/initialize/tests/vectors.rs | 358 ++++++++++++------ 1 file changed, 244 insertions(+), 114 deletions(-) diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index c550fa874e8..39ea02b5049 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -261,10 +261,10 @@ async fn crawler_peer_limit_zero_connect_panic() { unreachable!("outbound connector should never be called with a zero peer limit") }); - let (_config, mut peerset_tx) = + let (_config, mut peerset_rx) = spawn_crawler_with_peer_limit(0, unreachable_outbound_connector).await; - let peer_result = peerset_tx.try_next(); + let peer_result = peerset_rx.try_next(); assert!( // `Err(_)` means that no peers are available, and the sender has not been dropped. // `Ok(None)` means that no peers are available, and the sender has been dropped. @@ -285,10 +285,10 @@ async fn crawler_peer_limit_one_connect_error() { let error_outbound_connector = service_fn(|_| async { Err("test outbound connector always returns errors".into()) }); - let (_config, mut peerset_tx) = + let (_config, mut peerset_rx) = spawn_crawler_with_peer_limit(1, error_outbound_connector).await; - let peer_result = peerset_tx.try_next(); + let peer_result = peerset_rx.try_next(); assert!( // `Err(_)` means that no peers are available, and the sender has not been dropped. // `Ok(None)` means that no peers are available, and the sender has been dropped. @@ -333,12 +333,12 @@ async fn crawler_peer_limit_one_connect_ok_then_drop() { Ok(Change::Insert(addr, fake_client)) }); - let (config, mut peerset_tx) = + let (config, mut peerset_rx) = spawn_crawler_with_peer_limit(1, success_disconnect_outbound_connector).await; let mut peer_count: usize = 0; loop { - let peer_result = peerset_tx.try_next(); + let peer_result = peerset_rx.try_next(); match peer_result { // A peer handshake succeeded. Ok(Some(peer_result)) => { @@ -376,8 +376,11 @@ async fn crawler_peer_limit_one_connect_ok_stay_open() { // This test does not require network access, because the outbound connector // and peer set are fake. - let success_stay_open_outbound_connector = - service_fn(|req: OutboundConnectorRequest| async move { + let (peer_tracker_tx, mut peer_tracker_rx) = mpsc::unbounded(); + + let success_stay_open_outbound_connector = service_fn(move |req: OutboundConnectorRequest| { + let peer_tracker_tx = peer_tracker_tx.clone(); + async move { let OutboundConnectorRequest { addr, connection_tracker, @@ -393,29 +396,49 @@ async fn crawler_peer_limit_one_connect_ok_stay_open() { error_slot, }; - // Fake the connection being open forever. - std::mem::forget(connection_tracker); + // Make the connection staying open. + peer_tracker_tx + .unbounded_send(connection_tracker) + .expect("unexpected error sending to unbounded channel"); Ok(Change::Insert(addr, fake_client)) - }); + } + }); - let (config, mut peerset_tx) = + let (config, mut peerset_rx) = spawn_crawler_with_peer_limit(1, success_stay_open_outbound_connector).await; - let mut peer_count: usize = 0; + let mut peer_change_count: usize = 0; loop { - let peer_result = peerset_tx.try_next(); - match peer_result { + let peer_change_result = peerset_rx.try_next(); + match peer_change_result { // A peer handshake succeeded. - Ok(Some(peer_result)) => { + Ok(Some(peer_change_result)) => { assert!( - matches!(peer_result, Ok(Change::Insert(_, _))), + matches!(peer_change_result, Ok(Change::Insert(_, _))), "unexpected connection error: {:?}\n\ {} previous peers succeeded", - peer_result, - peer_count, + peer_change_result, + peer_change_count, ); - peer_count += 1; + peer_change_count += 1; + } + + // The channel is closed and there are no messages left in the channel. + Ok(None) => break, + // The channel is still open, but there are no messages left in the channel. + Err(_) => break, + } + } + + let mut peer_tracker_count: usize = 0; + loop { + let peer_tracker_result = peer_tracker_rx.try_next(); + match peer_tracker_result { + // We held this peer tracker open until now. + Ok(Some(peer_tracker)) => { + std::mem::drop(peer_tracker); + peer_tracker_count += 1; } // The channel is closed and there are no messages left in the channel. @@ -426,10 +449,19 @@ async fn crawler_peer_limit_one_connect_ok_stay_open() { } assert!( - peer_count <= config.peerset_outbound_connection_limit(), - "unexpected number of peer connections {}, over limit of {}", - peer_count, + peer_change_count <= config.peerset_outbound_connection_limit(), + "unexpected number of peer changes {}, over limit of {}, had {} peer trackers", + peer_change_count, + config.peerset_outbound_connection_limit(), + peer_tracker_count, + ); + + assert!( + peer_tracker_count <= config.peerset_outbound_connection_limit(), + "unexpected number of peer trackers {}, over limit of {}, had {} peer changes", + peer_tracker_count, config.peerset_outbound_connection_limit(), + peer_change_count, ); } @@ -444,10 +476,10 @@ async fn crawler_peer_limit_default_connect_error() { let error_outbound_connector = service_fn(|_| async { Err("test outbound connector always returns errors".into()) }); - let (_config, mut peerset_tx) = + let (_config, mut peerset_rx) = spawn_crawler_with_peer_limit(None, error_outbound_connector).await; - let peer_result = peerset_tx.try_next(); + let peer_result = peerset_rx.try_next(); assert!( // `Err(_)` means that no peers are available, and the sender has not been dropped. // `Ok(None)` means that no peers are available, and the sender has been dropped. @@ -492,12 +524,12 @@ async fn crawler_peer_limit_default_connect_ok_then_drop() { Ok(Change::Insert(addr, fake_client)) }); - let (config, mut peerset_tx) = + let (config, mut peerset_rx) = spawn_crawler_with_peer_limit(None, success_disconnect_outbound_connector).await; let mut peer_count: usize = 0; loop { - let peer_result = peerset_tx.try_next(); + let peer_result = peerset_rx.try_next(); match peer_result { // A peer handshake succeeded. Ok(Some(peer_result)) => { @@ -538,8 +570,11 @@ async fn crawler_peer_limit_default_connect_ok_stay_open() { // This test does not require network access, because the outbound connector // and peer set are fake. - let success_stay_open_outbound_connector = - service_fn(|req: OutboundConnectorRequest| async move { + let (peer_tracker_tx, mut peer_tracker_rx) = mpsc::unbounded(); + + let success_stay_open_outbound_connector = service_fn(move |req: OutboundConnectorRequest| { + let peer_tracker_tx = peer_tracker_tx.clone(); + async move { let OutboundConnectorRequest { addr, connection_tracker, @@ -555,30 +590,50 @@ async fn crawler_peer_limit_default_connect_ok_stay_open() { error_slot, }; - // Fake the connection being open forever. - std::mem::forget(connection_tracker); + // Make the connection staying open. + peer_tracker_tx + .unbounded_send(connection_tracker) + .expect("unexpected error sending to unbounded channel"); Ok(Change::Insert(addr, fake_client)) - }); + } + }); // The initial target size is multiplied by 1.5 to give the actual limit. - let (config, mut peerset_tx) = + let (config, mut peerset_rx) = spawn_crawler_with_peer_limit(None, success_stay_open_outbound_connector).await; - let mut peer_count: usize = 0; + let mut peer_change_count: usize = 0; loop { - let peer_result = peerset_tx.try_next(); - match peer_result { + let peer_change_result = peerset_rx.try_next(); + match peer_change_result { // A peer handshake succeeded. - Ok(Some(peer_result)) => { + Ok(Some(peer_change_result)) => { assert!( - matches!(peer_result, Ok(Change::Insert(_, _))), + matches!(peer_change_result, Ok(Change::Insert(_, _))), "unexpected connection error: {:?}\n\ {} previous peers succeeded", - peer_result, - peer_count, + peer_change_result, + peer_change_count, ); - peer_count += 1; + peer_change_count += 1; + } + + // The channel is closed and there are no messages left in the channel. + Ok(None) => break, + // The channel is still open, but there are no messages left in the channel. + Err(_) => break, + } + } + + let mut peer_tracker_count: usize = 0; + loop { + let peer_tracker_result = peer_tracker_rx.try_next(); + match peer_tracker_result { + // We held this peer tracker open until now. + Ok(Some(peer_tracker)) => { + std::mem::drop(peer_tracker); + peer_tracker_count += 1; } // The channel is closed and there are no messages left in the channel. @@ -589,10 +644,19 @@ async fn crawler_peer_limit_default_connect_ok_stay_open() { } assert!( - peer_count <= config.peerset_outbound_connection_limit(), - "unexpected number of peer connections {}, over limit of {}", - peer_count, + peer_change_count <= config.peerset_outbound_connection_limit(), + "unexpected number of peer changes {}, over limit of {}, had {} peer trackers", + peer_change_count, config.peerset_outbound_connection_limit(), + peer_tracker_count, + ); + + assert!( + peer_tracker_count <= config.peerset_outbound_connection_limit(), + "unexpected number of peer trackers {}, over limit of {}, had {} peer changes", + peer_tracker_count, + config.peerset_outbound_connection_limit(), + peer_change_count, ); } @@ -610,10 +674,10 @@ async fn listener_peer_limit_zero_handshake_panic() { unreachable!("inbound handshaker should never be called with a zero peer limit") }); - let (_config, mut peerset_tx) = + let (_config, mut peerset_rx) = spawn_inbound_listener_with_peer_limit(0, unreachable_inbound_handshaker).await; - let peer_result = peerset_tx.try_next(); + let peer_result = peerset_rx.try_next(); assert!( // `Err(_)` means that no peers are available, and the sender has not been dropped. // `Ok(None)` means that no peers are available, and the sender has been dropped. @@ -636,10 +700,10 @@ async fn listener_peer_limit_one_handshake_error() { let error_inbound_handshaker = service_fn(|_| async { Err("test inbound handshaker always returns errors".into()) }); - let (_config, mut peerset_tx) = + let (_config, mut peerset_rx) = spawn_inbound_listener_with_peer_limit(1, error_inbound_handshaker).await; - let peer_result = peerset_tx.try_next(); + let peer_result = peerset_rx.try_next(); assert!( // `Err(_)` means that no peers are available, and the sender has not been dropped. // `Ok(None)` means that no peers are available, and the sender has been dropped. @@ -687,12 +751,12 @@ async fn listener_peer_limit_one_handshake_ok_then_drop() { Ok(fake_client) }); - let (config, mut peerset_tx) = + let (config, mut peerset_rx) = spawn_inbound_listener_with_peer_limit(1, success_disconnect_inbound_handshaker).await; let mut peer_count: usize = 0; loop { - let peer_result = peerset_tx.try_next(); + let peer_result = peerset_rx.try_next(); match peer_result { // A peer handshake succeeded. Ok(Some(peer_result)) => { @@ -732,47 +796,71 @@ async fn listener_peer_limit_one_handshake_ok_stay_open() { return; } - let success_stay_open_inbound_handshaker = service_fn(|req: HandshakeRequest| async move { - let HandshakeRequest { - tcp_stream, - connected_addr: _, - connection_tracker, - } = req; + let (peer_tracker_tx, mut peer_tracker_rx) = mpsc::unbounded(); - let (server_tx, _server_rx) = mpsc::channel(0); - let (shutdown_tx, _shutdown_rx) = oneshot::channel(); - let error_slot = ErrorSlot::default(); + let success_stay_open_inbound_handshaker = service_fn(move |req: HandshakeRequest| { + let peer_tracker_tx = peer_tracker_tx.clone(); + async move { + let HandshakeRequest { + tcp_stream, + connected_addr: _, + connection_tracker, + } = req; - let fake_client = peer::Client { - shutdown_tx: Some(shutdown_tx), - server_tx, - error_slot, - }; + let (server_tx, _server_rx) = mpsc::channel(0); + let (shutdown_tx, _shutdown_rx) = oneshot::channel(); + let error_slot = ErrorSlot::default(); + + let fake_client = peer::Client { + shutdown_tx: Some(shutdown_tx), + server_tx, + error_slot, + }; - // Fake the connection being open forever. - std::mem::forget(connection_tracker); - std::mem::forget(tcp_stream); + // Make the connection staying open. + peer_tracker_tx + .unbounded_send((tcp_stream, connection_tracker)) + .expect("unexpected error sending to unbounded channel"); - Ok(fake_client) + Ok(fake_client) + } }); - let (config, mut peerset_tx) = + let (config, mut peerset_rx) = spawn_inbound_listener_with_peer_limit(1, success_stay_open_inbound_handshaker).await; - let mut peer_count: usize = 0; + let mut peer_change_count: usize = 0; loop { - let peer_result = peerset_tx.try_next(); - match peer_result { + let peer_change_result = peerset_rx.try_next(); + match peer_change_result { // A peer handshake succeeded. - Ok(Some(peer_result)) => { + Ok(Some(peer_change_result)) => { assert!( - matches!(peer_result, Ok(Change::Insert(_, _))), + matches!(peer_change_result, Ok(Change::Insert(_, _))), "unexpected connection error: {:?}\n\ {} previous peers succeeded", - peer_result, - peer_count, + peer_change_result, + peer_change_count, ); - peer_count += 1; + peer_change_count += 1; + } + + // The channel is closed and there are no messages left in the channel. + Ok(None) => break, + // The channel is still open, but there are no messages left in the channel. + Err(_) => break, + } + } + + let mut peer_tracker_count: usize = 0; + loop { + let peer_tracker_result = peer_tracker_rx.try_next(); + match peer_tracker_result { + // We held this peer connection and tracker open until now. + Ok(Some((peer_connection, peer_tracker))) => { + std::mem::drop(peer_connection); + std::mem::drop(peer_tracker); + peer_tracker_count += 1; } // The channel is closed and there are no messages left in the channel. @@ -783,10 +871,19 @@ async fn listener_peer_limit_one_handshake_ok_stay_open() { } assert!( - peer_count <= config.peerset_inbound_connection_limit(), - "unexpected number of peer connections {}, over limit of {}", - peer_count, + peer_change_count <= config.peerset_inbound_connection_limit(), + "unexpected number of peer changes {}, over limit of {}, had {} peer trackers", + peer_change_count, + config.peerset_inbound_connection_limit(), + peer_tracker_count, + ); + + assert!( + peer_tracker_count <= config.peerset_inbound_connection_limit(), + "unexpected number of peer trackers {}, over limit of {}, had {} peer changes", + peer_tracker_count, config.peerset_inbound_connection_limit(), + peer_change_count, ); } @@ -803,10 +900,10 @@ async fn listener_peer_limit_default_handshake_error() { let error_inbound_handshaker = service_fn(|_| async { Err("test inbound handshaker always returns errors".into()) }); - let (_config, mut peerset_tx) = + let (_config, mut peerset_rx) = spawn_inbound_listener_with_peer_limit(None, error_inbound_handshaker).await; - let peer_result = peerset_tx.try_next(); + let peer_result = peerset_rx.try_next(); assert!( // `Err(_)` means that no peers are available, and the sender has not been dropped. // `Ok(None)` means that no peers are available, and the sender has been dropped. @@ -854,12 +951,12 @@ async fn listener_peer_limit_default_handshake_ok_then_drop() { Ok(fake_client) }); - let (config, mut peerset_tx) = + let (config, mut peerset_rx) = spawn_inbound_listener_with_peer_limit(None, success_disconnect_inbound_handshaker).await; let mut peer_count: usize = 0; loop { - let peer_result = peerset_tx.try_next(); + let peer_result = peerset_rx.try_next(); match peer_result { // A peer handshake succeeded. Ok(Some(peer_result)) => { @@ -899,47 +996,71 @@ async fn listener_peer_limit_default_handshake_ok_stay_open() { return; } - let success_stay_open_inbound_handshaker = service_fn(|req: HandshakeRequest| async move { - let HandshakeRequest { - tcp_stream, - connected_addr: _, - connection_tracker, - } = req; + let (peer_tracker_tx, mut peer_tracker_rx) = mpsc::unbounded(); - let (server_tx, _server_rx) = mpsc::channel(0); - let (shutdown_tx, _shutdown_rx) = oneshot::channel(); - let error_slot = ErrorSlot::default(); + let success_stay_open_inbound_handshaker = service_fn(move |req: HandshakeRequest| { + let peer_tracker_tx = peer_tracker_tx.clone(); + async move { + let HandshakeRequest { + tcp_stream, + connected_addr: _, + connection_tracker, + } = req; - let fake_client = peer::Client { - shutdown_tx: Some(shutdown_tx), - server_tx, - error_slot, - }; + let (server_tx, _server_rx) = mpsc::channel(0); + let (shutdown_tx, _shutdown_rx) = oneshot::channel(); + let error_slot = ErrorSlot::default(); - // Fake the connection being open forever. - std::mem::forget(connection_tracker); - std::mem::forget(tcp_stream); + let fake_client = peer::Client { + shutdown_tx: Some(shutdown_tx), + server_tx, + error_slot, + }; - Ok(fake_client) + // Make the connection staying open. + peer_tracker_tx + .unbounded_send((tcp_stream, connection_tracker)) + .expect("unexpected error sending to unbounded channel"); + + Ok(fake_client) + } }); - let (config, mut peerset_tx) = + let (config, mut peerset_rx) = spawn_inbound_listener_with_peer_limit(None, success_stay_open_inbound_handshaker).await; - let mut peer_count: usize = 0; + let mut peer_change_count: usize = 0; loop { - let peer_result = peerset_tx.try_next(); - match peer_result { + let peer_change_result = peerset_rx.try_next(); + match peer_change_result { // A peer handshake succeeded. - Ok(Some(peer_result)) => { + Ok(Some(peer_change_result)) => { assert!( - matches!(peer_result, Ok(Change::Insert(_, _))), + matches!(peer_change_result, Ok(Change::Insert(_, _))), "unexpected connection error: {:?}\n\ {} previous peers succeeded", - peer_result, - peer_count, + peer_change_result, + peer_change_count, ); - peer_count += 1; + peer_change_count += 1; + } + + // The channel is closed and there are no messages left in the channel. + Ok(None) => break, + // The channel is still open, but there are no messages left in the channel. + Err(_) => break, + } + } + + let mut peer_tracker_count: usize = 0; + loop { + let peer_tracker_result = peer_tracker_rx.try_next(); + match peer_tracker_result { + // We held this peer connection and tracker open until now. + Ok(Some((peer_connection, peer_tracker))) => { + std::mem::drop(peer_connection); + std::mem::drop(peer_tracker); + peer_tracker_count += 1; } // The channel is closed and there are no messages left in the channel. @@ -950,10 +1071,19 @@ async fn listener_peer_limit_default_handshake_ok_stay_open() { } assert!( - peer_count <= config.peerset_inbound_connection_limit(), - "unexpected number of peer connections {}, over limit of {}", - peer_count, + peer_change_count <= config.peerset_inbound_connection_limit(), + "unexpected number of peer changes {}, over limit of {}, had {} peer trackers", + peer_change_count, + config.peerset_inbound_connection_limit(), + peer_tracker_count, + ); + + assert!( + peer_tracker_count <= config.peerset_inbound_connection_limit(), + "unexpected number of peer trackers {}, over limit of {}, had {} peer changes", + peer_tracker_count, config.peerset_inbound_connection_limit(), + peer_change_count, ); } From b50e322308ef80a42ca2136ff0edcdadf3d6bfb8 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 28 Oct 2021 10:35:58 +1000 Subject: [PATCH 7/9] Use a lower limit in a slow test and require that it is exceeded --- .../src/peer_set/initialize/tests/vectors.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index 39ea02b5049..da5bf498ea0 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -524,8 +524,10 @@ async fn crawler_peer_limit_default_connect_ok_then_drop() { Ok(Change::Insert(addr, fake_client)) }); + // TODO: tweak the crawler timeouts and rate-limits so we get over the actual limit + // (currently, getting over the limit can take 30 seconds or more) let (config, mut peerset_rx) = - spawn_crawler_with_peer_limit(None, success_disconnect_outbound_connector).await; + spawn_crawler_with_peer_limit(15, success_disconnect_outbound_connector).await; let mut peer_count: usize = 0; loop { @@ -550,14 +552,11 @@ async fn crawler_peer_limit_default_connect_ok_then_drop() { } } - // TODO: tweak the crawler timeouts and rate-limits so we get over the actual limit - // (currently, getting over the limit can take 30 seconds or more) - let lower_limit = config.peerset_outbound_connection_limit() / 3; assert!( - peer_count > lower_limit, + peer_count > config.peerset_outbound_connection_limit(), "unexpected number of peer connections {}, should be over the limit of {}", peer_count, - lower_limit, + config.peerset_outbound_connection_limit(), ); } From 417ac5f11c6faa69911eaf887bc59806c4e8dfc1 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 28 Oct 2021 10:42:09 +1000 Subject: [PATCH 8/9] Panic if Zebra exceeds its connection limit (#2947) --- zebra-network/src/peer_set/initialize.rs | 1 + zebra-network/src/peer_set/set.rs | 17 ++++++++++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 35485109834..ab355b6fcbb 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -144,6 +144,7 @@ where // Connect the rx end to a PeerSet, wrapping new peers in load instruments. let peer_set = PeerSet::new( + &config, PeakEwmaDiscover::new( // Discover interprets an error as stream termination, // so discard any errored connections... diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index d224d2568d6..6531859960a 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -80,7 +80,7 @@ use crate::{ external::InventoryHash, internal::{Request, Response}, }, - AddressBook, BoxError, + AddressBook, BoxError, Config, }; /// A signal sent by the [`PeerSet`] when it has no ready peers, and gets a request from Zebra. @@ -134,6 +134,8 @@ where /// /// Used for logging diagnostics. address_book: Arc>, + /// The configured limit for inbound and outbound connections. + peerset_total_connection_limit: usize, } impl PeerSet @@ -147,6 +149,7 @@ where { /// Construct a peerset which uses `discover` internally. pub fn new( + config: &Config, discover: D, demand_signal: mpsc::Sender, handle_rx: tokio::sync::oneshot::Receiver>>>, @@ -165,6 +168,7 @@ where inventory_registry: InventoryRegistry::new(inv_stream), last_peer_log: None, address_book, + peerset_total_connection_limit: config.peerset_total_connection_limit(), } } @@ -432,6 +436,17 @@ where metrics::gauge!("pool.num_ready", num_ready as f64); metrics::gauge!("pool.num_unready", num_unready as f64); metrics::gauge!("zcash.net.peers", num_peers as f64); + + // Security: make sure we haven't exceeded the connection limit + if num_peers > self.peerset_total_connection_limit { + let address_metrics = self.address_book.lock().unwrap().address_metrics(); + panic!( + "unexpectedly exceeded configured peer set connection limit: \n\ + peers: {:?}, ready: {:?}, unready: {:?}, \n\ + address_metrics: {:?}", + num_peers, num_ready, num_unready, address_metrics, + ); + } } } From 5200b341b09f250a2d417d51551e79491a8d1beb Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 28 Oct 2021 10:57:49 +1000 Subject: [PATCH 9/9] Run some network test cases in parallel to speed them up (#2960) --- .../src/peer_set/initialize/tests/vectors.rs | 43 +++++++++++++++++-- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index da5bf498ea0..245fb212b00 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -64,7 +64,7 @@ const LISTENER_TEST_DURATION: Duration = Duration::from_secs(10); /// /// Note: This test doesn't cover local interface or public IP address discovery. #[tokio::test] -async fn local_listener_unspecified_port_unspecified_addr() { +async fn local_listener_unspecified_port_unspecified_addr_v4() { zebra_test::init(); if zebra_test::net::zebra_skip_network_tests() { @@ -75,6 +75,19 @@ async fn local_listener_unspecified_port_unspecified_addr() { // (localhost should be enough) local_listener_port_with("0.0.0.0:0".parse().unwrap(), Mainnet).await; local_listener_port_with("0.0.0.0:0".parse().unwrap(), Testnet).await; +} + +/// Test that zebra-network discovers dynamic bind-to-all-interfaces listener ports, +/// and sends them to the `AddressBook`. +/// +/// Note: This test doesn't cover local interface or public IP address discovery. +#[tokio::test] +async fn local_listener_unspecified_port_unspecified_addr_v6() { + zebra_test::init(); + + if zebra_test::net::zebra_skip_network_tests() { + return; + } if zebra_test::net::zebra_skip_ipv6_tests() { return; @@ -88,7 +101,7 @@ async fn local_listener_unspecified_port_unspecified_addr() { /// Test that zebra-network discovers dynamic localhost listener ports, /// and sends them to the `AddressBook`. #[tokio::test] -async fn local_listener_unspecified_port_localhost_addr() { +async fn local_listener_unspecified_port_localhost_addr_v4() { zebra_test::init(); if zebra_test::net::zebra_skip_network_tests() { @@ -98,6 +111,17 @@ async fn local_listener_unspecified_port_localhost_addr() { // these tests might fail on machines with unusual IPv4 localhost configs local_listener_port_with("127.0.0.1:0".parse().unwrap(), Mainnet).await; local_listener_port_with("127.0.0.1:0".parse().unwrap(), Testnet).await; +} + +/// Test that zebra-network discovers dynamic localhost listener ports, +/// and sends them to the `AddressBook`. +#[tokio::test] +async fn local_listener_unspecified_port_localhost_addr_v6() { + zebra_test::init(); + + if zebra_test::net::zebra_skip_network_tests() { + return; + } if zebra_test::net::zebra_skip_ipv6_tests() { return; @@ -110,11 +134,10 @@ async fn local_listener_unspecified_port_localhost_addr() { /// Test that zebra-network propagates fixed localhost listener ports to the `AddressBook`. #[tokio::test] -async fn local_listener_fixed_port_localhost_addr() { +async fn local_listener_fixed_port_localhost_addr_v4() { zebra_test::init(); let localhost_v4 = "127.0.0.1".parse().unwrap(); - let localhost_v6 = "::1".parse().unwrap(); if zebra_test::net::zebra_skip_network_tests() { return; @@ -122,6 +145,18 @@ async fn local_listener_fixed_port_localhost_addr() { local_listener_port_with(SocketAddr::new(localhost_v4, random_known_port()), Mainnet).await; local_listener_port_with(SocketAddr::new(localhost_v4, random_known_port()), Testnet).await; +} + +/// Test that zebra-network propagates fixed localhost listener ports to the `AddressBook`. +#[tokio::test] +async fn local_listener_fixed_port_localhost_addr_v6() { + zebra_test::init(); + + let localhost_v6 = "::1".parse().unwrap(); + + if zebra_test::net::zebra_skip_network_tests() { + return; + } if zebra_test::net::zebra_skip_ipv6_tests() { return;