diff --git a/Cargo.lock b/Cargo.lock index 2830055192..be61e911a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6513,6 +6513,7 @@ dependencies = [ "crypto", "hex", "itertools 0.11.0", + "logging", "rand_chacha 0.3.1", "rstest", "serialization", diff --git a/p2p/p2p-test-utils/src/lib.rs b/p2p/p2p-test-utils/src/lib.rs index 97e34b25dd..a5eca2da4f 100644 --- a/p2p/p2p-test-utils/src/lib.rs +++ b/p2p/p2p-test-utils/src/lib.rs @@ -40,21 +40,23 @@ pub fn start_subsystems( ShutdownTrigger, ManagerJoinHandle, ) { + let time_getter: TimeGetter = Default::default(); let chainstate = make_chainstate( Arc::clone(&chain_config), ChainstateConfig::new(), chainstate_storage::inmemory::Store::new_empty().unwrap(), DefaultTransactionVerificationStrategy::new(), None, - Default::default(), + time_getter.clone(), ) .unwrap(); - start_subsystems_with_chainstate(chainstate, chain_config) + start_subsystems_with_chainstate(chainstate, chain_config, time_getter) } pub fn start_subsystems_with_chainstate( chainstate: ChainstateSubsystem, chain_config: Arc<ChainConfig>, + time_getter: TimeGetter, ) -> ( ChainstateHandle, MempoolHandle, @@ -66,7 +68,7 @@ pub fn start_subsystems_with_chainstate( let chainstate = manager.add_subsystem("p2p-test-chainstate", chainstate); - let mempool = mempool::make_mempool(chain_config, chainstate.clone(), Default::default()); + let mempool = mempool::make_mempool(chain_config, chainstate.clone(), time_getter); let mempool = manager.add_custom_subsystem("p2p-test-mempool", |handle| mempool.init(handle)); let manager_handle = manager.main_in_task(); @@ -90,6 +92,7 @@ pub fn create_n_blocks(tf: &mut TestFramework, n: usize) -> Vec<Block> { blocks } +#[derive(Clone)] pub struct P2pBasicTestTimeGetter { current_time_millis: Arc<SeqCstAtomicU64>, } diff --git a/p2p/src/net/default_backend/backend.rs b/p2p/src/net/default_backend/backend.rs index 3ad99549a4..8bb6553b2a 100644 --- a/p2p/src/net/default_backend/backend.rs +++ b/p2p/src/net/default_backend/backend.rs @@ -413,7 +413,10 @@ where None => return Ok(()), }; + log::debug!("Creating peer {peer_id} after handshake"); + if self.is_connection_from_self(connection_info, handshake_nonce)? { + log::debug!("Peer {peer_id} is a connection from self"); return Ok(()); } diff --git a/p2p/src/net/default_backend/transport/impls/channel.rs b/p2p/src/net/default_backend/transport/impls/channel.rs index 867172efd5..63ad1c107b 100644 --- a/p2p/src/net/default_backend/transport/impls/channel.rs +++ b/p2p/src/net/default_backend/transport/impls/channel.rs @@ -68,7 +68,22 @@ pub struct MpscChannelTransport { impl MpscChannelTransport { pub fn new() -> Self { - let local_address: Ipv4Addr = NEXT_IP_ADDRESS.fetch_add(1, Ordering::Relaxed).into(); + Self::new_with_addr_in_group(0, 0) + } + + /// Create a new transport with a local address in the specified "group", which is represented + /// by a certain number of most significant bits in the ip address. + /// + /// The resulting local address will be: + /// (addr_group_idx << (32 - addr_group_bits)) + NEXT_IP_ADDRESS + pub fn new_with_addr_in_group(addr_group_idx: u32, addr_group_bits: u32) -> Self { + let addr_group_bit_offset = 32 - addr_group_bits; + let next_addr = NEXT_IP_ADDRESS.fetch_add(1, Ordering::Relaxed); + assert!((next_addr as u64) < (1_u64 << addr_group_bit_offset)); + let addr_group = (addr_group_idx as u64) << addr_group_bit_offset; + assert!(addr_group <= u32::MAX as u64); + + let local_address: Ipv4Addr = (next_addr + addr_group as u32).into(); MpscChannelTransport { local_address: local_address.into(), last_port: 1024.into(), diff --git a/p2p/src/net/types.rs b/p2p/src/net/types.rs index 71313a5d18..d07fc844bf 100644 --- a/p2p/src/net/types.rs +++ b/p2p/src/net/types.rs @@ -68,7 +68,7 @@ impl PeerRole { /// wants to keep the connection open or close it and possibly ban the peer from. /// /// If new fields are added, make sure they are limited in size. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub struct PeerInfo { /// Unique ID of the peer pub peer_id: PeerId, diff --git a/p2p/src/peer_manager/address_groups.rs b/p2p/src/peer_manager/address_groups.rs index 98eb5ba851..404bd9d8d2 100644 --- a/p2p/src/peer_manager/address_groups.rs +++ b/p2p/src/peer_manager/address_groups.rs @@ -18,9 +18,9 @@ use std::net::{Ipv4Addr, Ipv6Addr}; use crate::types::peer_address::PeerAddress; // IPv4 addresses grouped into /16 subnets -const IPV4_GROUP_BYTES: usize = 2; +pub const IPV4_GROUP_BYTES: usize = 2; // IPv6 addresses grouped into /32 subnets -const IPV6_GROUP_BYTES: usize = 4; +pub const IPV6_GROUP_BYTES: usize = 4; #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum AddressGroup { diff --git a/p2p/src/peer_manager/dns_seed.rs b/p2p/src/peer_manager/dns_seed.rs new file mode 100644 index 0000000000..b5e7ced9cb --- /dev/null +++ b/p2p/src/peer_manager/dns_seed.rs @@ -0,0 +1,100 @@ +// Copyright (c) 2021-2023 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use common::chain::{config::ChainType, ChainConfig}; +use crypto::random::{make_pseudo_rng, seq::IteratorRandom}; +use logging::log; +use p2p_types::socket_address::SocketAddress; + +use crate::config::P2pConfig; + +#[async_trait] +pub trait DnsSeed: Send + Sync { + async fn obtain_addresses(&self) -> Vec<SocketAddress>; +} + +pub struct DefaultDnsSeed { + chain_config: Arc<ChainConfig>, + p2p_config: Arc<P2pConfig>, +} + +impl DefaultDnsSeed { + pub fn new(chain_config: Arc<ChainConfig>, p2p_config: Arc<P2pConfig>) -> Self { + Self { + chain_config, + p2p_config, + } + } +} + +/// Hardcoded seed DNS hostnames +// TODO: Replace with actual values +const DNS_SEEDS_MAINNET: [&str; 0] = []; +const DNS_SEEDS_TESTNET: [&str; 1] = ["testnet-seed.mintlayer.org"]; + +/// Maximum number of records accepted in a single DNS server response +const MAX_DNS_RECORDS: usize = 10; + +#[async_trait] +impl DnsSeed for DefaultDnsSeed { + async fn obtain_addresses(&self) -> Vec<SocketAddress> { + let dns_seed = match self.chain_config.chain_type() { + ChainType::Mainnet => DNS_SEEDS_MAINNET.as_slice(), + ChainType::Testnet => DNS_SEEDS_TESTNET.as_slice(), + ChainType::Regtest | ChainType::Signet => &[], + }; + + if dns_seed.is_empty() { + return Vec::new(); + } + + log::debug!("Resolve DNS seed..."); + let results = futures::future::join_all( + dns_seed + .iter() + .map(|host| tokio::net::lookup_host((*host, self.chain_config.p2p_port()))), + ) + .await; + + let mut addresses = Vec::new(); + for result in results { + match result { + Ok(list) => { + list.filter_map(|addr| { + SocketAddress::from_peer_address( + // Convert SocketAddr to PeerAddress + &addr.into(), + *self.p2p_config.allow_discover_private_ips, + ) + }) + // Randomize selection because records can be sorted by type (A and AAAA) + .choose_multiple(&mut make_pseudo_rng(), MAX_DNS_RECORDS) + .into_iter() + .for_each(|addr| { + addresses.push(addr); + }); + } + Err(err) => { + log::error!("resolve DNS seed failed: {err}"); + } + } + } + log::debug!("DNS seed records found: {}", addresses.len()); + addresses + } +} diff --git a/p2p/src/peer_manager/mod.rs b/p2p/src/peer_manager/mod.rs index 73264dcb62..9dadde03d6 100644 --- a/p2p/src/peer_manager/mod.rs +++ b/p2p/src/peer_manager/mod.rs @@ -16,10 +16,11 @@ //! Peer manager pub mod address_groups; +pub mod dns_seed; pub mod peer_context; pub mod peerdb; pub mod peerdb_common; -mod peers_eviction; +pub mod peers_eviction; use std::{ collections::{BTreeMap, BTreeSet, HashMap}, @@ -36,8 +37,8 @@ use tokio::sync::mpsc; use chainstate::ban_score::BanScore; use common::{ - chain::{config::ChainType, ChainConfig}, - primitives::time::duration_to_int, + chain::ChainConfig, + primitives::time::{duration_to_int, Time}, time_getter::TimeGetter, }; use crypto::random::{make_pseudo_rng, seq::IteratorRandom, Rng}; @@ -70,30 +71,34 @@ use crate::{ }; use self::{ + dns_seed::{DefaultDnsSeed, DnsSeed}, peer_context::{PeerContext, SentPing}, peerdb::storage::PeerDbStorage, }; /// Desired number of full relay outbound connections. /// This value is constant because users should not change this. -const OUTBOUND_FULL_RELAY_COUNT: usize = 8; +pub const OUTBOUND_FULL_RELAY_COUNT: usize = 8; /// Desired number of block relay outbound connections (two permanent and one temporary). /// This value is constant because users should not change this. -const OUTBOUND_BLOCK_RELAY_COUNT: usize = 3; +pub const OUTBOUND_BLOCK_RELAY_COUNT: usize = 3; /// Desired number of automatic outbound connections -const OUTBOUND_FULL_AND_BLOCK_RELAY_COUNT: usize = +pub const OUTBOUND_FULL_AND_BLOCK_RELAY_COUNT: usize = OUTBOUND_FULL_RELAY_COUNT + OUTBOUND_BLOCK_RELAY_COUNT; /// Lower bound for how often [`PeerManager::heartbeat()`] is called const PEER_MGR_HEARTBEAT_INTERVAL_MIN: Duration = Duration::from_secs(5); /// Upper bound for how often [`PeerManager::heartbeat()`] is called -const PEER_MGR_HEARTBEAT_INTERVAL_MAX: Duration = Duration::from_secs(30); +pub const PEER_MGR_HEARTBEAT_INTERVAL_MAX: Duration = Duration::from_secs(30); /// How often resend own address to a specific peer (on average) const RESEND_OWN_ADDRESS_TO_PEER_PERIOD: Duration = Duration::from_secs(24 * 60 * 60); +/// The interval at which to contact DNS seed servers. +pub const PEER_MGR_DNS_RELOAD_INTERVAL: Duration = Duration::from_secs(60); + /// How many addresses are allowed to be sent const MAX_ADDRESS_COUNT: usize = 1000; @@ -112,14 +117,6 @@ const PEER_ADDRESS_RESEND_COUNT: usize = 2; const PEER_ADDRESSES_ROLLING_BLOOM_FILTER_SIZE: usize = 5000; const PEER_ADDRESSES_ROLLING_BLOOM_FPP: f64 = 0.001; -/// Hardcoded seed DNS hostnames -// TODO: Replace with actual values -const DNS_SEEDS_MAINNET: [&str; 0] = []; -const DNS_SEEDS_TESTNET: [&str; 1] = ["testnet-seed.mintlayer.org"]; - -/// Maximum number of records accepted in a single DNS server response -const MAX_DNS_RECORDS: usize = 10; - enum OutboundConnectType { /// OutboundBlockRelay or OutboundFullRelay Automatic, @@ -174,8 +171,15 @@ where peer_eviction_random_state: peers_eviction::RandomState, + /// Last time when a new tip was added to the chainstate. + last_chainstate_tip_block_time: Time, + /// PeerManager's observer for use by tests. observer: Option<Box<dyn Observer + Send>>, + + /// Normally, this will be DefaultDnsSeed, which performs the actual address lookup, but tests can + /// substitute it with a mock implementation. + dns_seed: Box<dyn DnsSeed>, } /// Takes IP or socket address and converts it to socket address (adding the default peer port if IP address is used) @@ -200,18 +204,20 @@ where time_getter: TimeGetter, peerdb_storage: S, ) -> crate::Result<Self> { - Self::new_with_observer( - chain_config, - p2p_config, + Self::new_generic( + chain_config.clone(), + p2p_config.clone(), handle, peer_mgr_event_rx, time_getter, peerdb_storage, None, + Box::new(DefaultDnsSeed::new(chain_config, p2p_config)), ) } - pub fn new_with_observer( + #[allow(clippy::too_many_arguments)] + pub fn new_generic( chain_config: Arc<ChainConfig>, p2p_config: Arc<P2pConfig>, handle: T::ConnectivityHandle, @@ -219,6 +225,7 @@ where time_getter: TimeGetter, peerdb_storage: S, observer: Option<Box<dyn Observer + Send>>, + dns_seed: Box<dyn DnsSeed + Send>, ) -> crate::Result<Self> { let mut rng = make_pseudo_rng(); let peerdb = peerdb::PeerDb::new( @@ -227,6 +234,7 @@ where time_getter.clone(), peerdb_storage, )?; + let now = time_getter.get_time(); assert!(!p2p_config.outbound_connection_timeout.is_zero()); assert!(!p2p_config.ping_timeout.is_zero()); Ok(PeerManager { @@ -241,7 +249,9 @@ where peerdb, subscribed_to_peer_addresses: BTreeSet::new(), peer_eviction_random_state: peers_eviction::RandomState::new(&mut rng), + last_chainstate_tip_block_time: now, observer, + dns_seed, }) } @@ -817,6 +827,10 @@ where self.peerdb.set_anchors(anchor_addresses); } + if let Some(o) = self.observer.as_mut() { + o.on_connection_accepted(address) + } + Ok(()) } @@ -865,10 +879,19 @@ where log::debug!("connection rejected for peer {peer_id}: {accept_err}"); // Disconnect should always succeed unless the node is shutting down. - // Calling expect here is fine because PeerManager will stop before the backend. - self.peer_connectivity_handle - .disconnect(peer_id) - .expect("disconnect failed unexpectedly"); + // But at this moment there is a possibility for backend to be shut down + // before peer manager, at least in tests, so we don't "expect" and log + // the error instead. + // TODO: investigate why peer manager can be shut down before the backend (it shouldn't + // be this way according to an earlier comment). + // TODO: we probably shouldn't use "log::error" if the error happened during + // shutdown. Probably, peer manager should accept the "shutdown" flag, like other + // p2p components do, and ignore/log::info the errors it it's set (this also applies + // to other places, search for "log::error" in this file). + let disconnect_result = self.peer_connectivity_handle.disconnect(peer_id); + if let Err(err) = disconnect_result { + log::error!("disconnect failed unexpectedly: {err:?}"); + } match peer_role { PeerRole::Inbound => {} @@ -973,49 +996,10 @@ where /// Fill PeerDb with addresses from the DNS seed servers async fn reload_dns_seed(&mut self) { - let dns_seed = match self.chain_config.chain_type() { - ChainType::Mainnet => DNS_SEEDS_MAINNET.as_slice(), - ChainType::Testnet => DNS_SEEDS_TESTNET.as_slice(), - ChainType::Regtest | ChainType::Signet => &[], - }; - - if dns_seed.is_empty() { - return; - } - - log::debug!("Resolve DNS seed..."); - let results = futures::future::join_all( - dns_seed - .iter() - .map(|host| tokio::net::lookup_host((*host, self.chain_config.p2p_port()))), - ) - .await; - - let mut total = 0; - for result in results { - match result { - Ok(list) => { - list.filter_map(|addr| { - SocketAddress::from_peer_address( - // Convert SocketAddr to PeerAddress - &addr.into(), - *self.p2p_config.allow_discover_private_ips, - ) - }) - // Randomize selection because records can be sorted by type (A and AAAA) - .choose_multiple(&mut make_pseudo_rng(), MAX_DNS_RECORDS) - .into_iter() - .for_each(|addr| { - total += 1; - self.peerdb.peer_discovered(addr); - }); - } - Err(err) => { - log::error!("resolve DNS seed failed: {err}"); - } - } + let addresses = self.dns_seed.obtain_addresses().await; + for addr in addresses { + self.peerdb.peer_discovered(addr); } - log::debug!("DNS seed records found: {total}"); } fn automatic_outbound_peers(&self) -> BTreeSet<SocketAddress> { @@ -1075,6 +1059,10 @@ where } self.try_evict_block_relay_peer(); + + if let Some(o) = self.observer.as_mut() { + o.on_heartbeat(); + } } fn handle_incoming_message(&mut self, peer: PeerId, message: PeerManagerMessage) { @@ -1243,13 +1231,17 @@ where } PeerManagerEvent::NewTipReceived { peer_id, block_id } => { if let Some(peer) = self.peers.get_mut(&peer_id) { - log::debug!("new tip {block_id} received from peer {peer_id}",); + log::debug!("new tip {block_id} received from peer {peer_id}"); peer.last_tip_block_time = Some(self.time_getter.get_time()); } } + PeerManagerEvent::NewChainstateTip(block_id) => { + log::debug!("new tip {block_id} added to chainstate"); + self.last_chainstate_tip_block_time = self.time_getter.get_time(); + } PeerManagerEvent::NewValidTransactionReceived { peer_id, txid } => { if let Some(peer) = self.peers.get_mut(&peer_id) { - log::debug!("new transaction {txid} received from peer {peer_id}",); + log::debug!("new transaction {txid} received from peer {peer_id}"); peer.last_tx_time = Some(self.time_getter.get_time()); } } @@ -1287,6 +1279,9 @@ where self.peerdb.unban(&address); response.send(Ok(())); } + PeerManagerEvent::GenericQuery(query_func) => { + query_func(self); + } } } @@ -1480,6 +1475,10 @@ where let mut periodic_interval = tokio::time::interval(Duration::from_secs(1)); + // Note: bitcoin core also uses "3 * block_spacing" for stale tip detection, but their + // "block spacing" is 10 min instead of out 2. TODO: should we use bigger time diff? + let stale_tip_time_diff = *self.chain_config.target_block_spacing() * 3; + if let Some(chan) = loop_started_tx { chan.send(()); } @@ -1530,13 +1529,16 @@ where heartbeat_call_needed = false; } + let time_since_last_chainstate_tip = + (now - self.last_chainstate_tip_block_time).unwrap_or(Duration::ZERO); + // Reload DNS if there are no outbound connections if now >= next_dns_reload - && self.peers.is_empty() && self.pending_outbound_connects.is_empty() + && (self.peers.is_empty() || time_since_last_chainstate_tip > stale_tip_time_diff) { self.reload_dns_seed().await; - next_dns_reload = (now + Duration::from_secs(60)) + next_dns_reload = (now + PEER_MGR_DNS_RELOAD_INTERVAL) .expect("Times derived from local clock; cannot fail"); heartbeat_call_needed = true; } @@ -1574,11 +1576,6 @@ where self.run_internal(None).await } - #[cfg(test)] - pub fn peers(&self) -> &BTreeMap<PeerId, PeerContext> { - &self.peers - } - #[cfg(test)] pub fn peerdb(&self) -> &peerdb::PeerDb<S> { &self.peerdb @@ -1588,6 +1585,27 @@ where pub trait Observer { fn on_peer_ban_score_adjustment(&mut self, address: SocketAddress, new_score: u32); fn on_peer_ban(&mut self, address: BannableAddress); + // This will be called at the end of "heartbeat" function. + fn on_heartbeat(&mut self); + // This will be called for both incoming and outgoing connections. + fn on_connection_accepted(&mut self, address: SocketAddress); +} + +pub trait PeerManagerQueryInterface { + #[cfg(test)] + fn peers(&self) -> &BTreeMap<PeerId, PeerContext>; +} + +impl<T, S> PeerManagerQueryInterface for PeerManager<T, S> +where + T: NetworkingService + 'static, + T::ConnectivityHandle: ConnectivityService<T>, + S: PeerDbStorage, +{ + #[cfg(test)] + fn peers(&self) -> &BTreeMap<PeerId, PeerContext> { + &self.peers + } } #[cfg(test)] diff --git a/p2p/src/peer_manager/peers_eviction/mod.rs b/p2p/src/peer_manager/peers_eviction/mod.rs index e9a793a9fe..234f2bbc73 100644 --- a/p2p/src/peer_manager/peers_eviction/mod.rs +++ b/p2p/src/peer_manager/peers_eviction/mod.rs @@ -25,16 +25,16 @@ use super::{address_groups::AddressGroup, peer_context::PeerContext, OUTBOUND_BL #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] struct NetGroupKeyed(u64); -const PRESERVED_COUNT_ADDRESS_GROUP: usize = 4; -const PRESERVED_COUNT_PING: usize = 8; -const PRESERVED_COUNT_NEW_BLOCKS: usize = 8; -const PRESERVED_COUNT_NEW_TRANSACTIONS: usize = 4; +const PRESERVED_INBOUND_COUNT_ADDRESS_GROUP: usize = 4; +const PRESERVED_INBOUND_COUNT_PING: usize = 8; +const PRESERVED_INBOUND_COUNT_NEW_BLOCKS: usize = 8; +const PRESERVED_INBOUND_COUNT_NEW_TRANSACTIONS: usize = 4; #[cfg(test)] -const PRESERVED_COUNT_TOTAL: usize = PRESERVED_COUNT_ADDRESS_GROUP - + PRESERVED_COUNT_PING - + PRESERVED_COUNT_NEW_BLOCKS - + PRESERVED_COUNT_NEW_TRANSACTIONS; +pub const PRESERVED_INBOUND_COUNT_TOTAL: usize = PRESERVED_INBOUND_COUNT_ADDRESS_GROUP + + PRESERVED_INBOUND_COUNT_PING + + PRESERVED_INBOUND_COUNT_NEW_BLOCKS + + PRESERVED_INBOUND_COUNT_NEW_TRANSACTIONS; /// A copy of `PeerContext` with fields relevant to the eviction logic /// @@ -183,10 +183,11 @@ pub fn select_for_eviction_inbound(candidates: Vec<EvictionCandidate>) -> Option // TODO: Preserve connections from whitelisted IPs let candidates = filter_peer_role(candidates, PeerRole::Inbound); - let candidates = filter_address_group(candidates, PRESERVED_COUNT_ADDRESS_GROUP); - let candidates = filter_fast_ping(candidates, PRESERVED_COUNT_PING); - let candidates = filter_by_last_tip_block_time(candidates, PRESERVED_COUNT_NEW_BLOCKS); - let candidates = filter_by_last_transaction_time(candidates, PRESERVED_COUNT_NEW_TRANSACTIONS); + let candidates = filter_address_group(candidates, PRESERVED_INBOUND_COUNT_ADDRESS_GROUP); + let candidates = filter_fast_ping(candidates, PRESERVED_INBOUND_COUNT_PING); + let candidates = filter_by_last_tip_block_time(candidates, PRESERVED_INBOUND_COUNT_NEW_BLOCKS); + let candidates = + filter_by_last_transaction_time(candidates, PRESERVED_INBOUND_COUNT_NEW_TRANSACTIONS); find_group_most_connections(candidates) } diff --git a/p2p/src/peer_manager/peers_eviction/tests.rs b/p2p/src/peer_manager/peers_eviction/tests.rs index 913e7bd05d..83ad1f07fe 100644 --- a/p2p/src/peer_manager/peers_eviction/tests.rs +++ b/p2p/src/peer_manager/peers_eviction/tests.rs @@ -618,13 +618,13 @@ fn random_eviction_candidate(rng: &mut impl Rng) -> EvictionCandidate { fn test_preserved_by_ping(index: usize, candidate: &mut EvictionCandidate) -> bool { // Check that `PRESERVED_COUNT_PING` peers with the lowest ping times are preserved candidate.ping_min = index as i64; - index < PRESERVED_COUNT_PING + index < PRESERVED_INBOUND_COUNT_PING } fn test_preserved_by_address_group(index: usize, candidate: &mut EvictionCandidate) -> bool { // Check that `PRESERVED_COUNT_ADDRESS_GROUP` peers with the highest net_group_keyed values are preserved candidate.net_group_keyed = NetGroupKeyed(u64::MAX - index as u64); - index < PRESERVED_COUNT_ADDRESS_GROUP + index < PRESERVED_INBOUND_COUNT_ADDRESS_GROUP } #[tracing::instrument(skip(seed))] @@ -654,7 +654,7 @@ fn test_randomized(#[case] seed: Seed) { candidates.shuffle(&mut rng); let peer_id = select_for_eviction_inbound(candidates.clone()); assert_eq!( - count > PRESERVED_COUNT_TOTAL, + count > PRESERVED_INBOUND_COUNT_TOTAL, peer_id.is_some(), "unexpected result, candidates: {candidates:?}, peer_id: {peer_id:?}" ); diff --git a/p2p/src/peer_manager_event.rs b/p2p/src/peer_manager_event.rs index c8368bd56a..c1500290df 100644 --- a/p2p/src/peer_manager_event.rs +++ b/p2p/src/peer_manager_event.rs @@ -22,7 +22,10 @@ use p2p_types::{ socket_address::SocketAddress, }; -use crate::{interface::types::ConnectedPeer, types::peer_id::PeerId, utils::oneshot_nofail}; +use crate::{ + interface::types::ConnectedPeer, peer_manager::PeerManagerQueryInterface, + types::peer_id::PeerId, utils::oneshot_nofail, +}; #[derive(Debug)] pub enum PeerDisconnectionDbAction { @@ -62,6 +65,7 @@ pub enum PeerManagerEvent { AdjustPeerScore(PeerId, u32, oneshot_nofail::Sender<crate::Result<()>>), /// New tip block received. + /// /// In PoW all valid blocks have a cost, but in PoS new blocks are practically free. /// So, unlike Bitcoin Core, we only consider new tips. /// It is used as an eviction criterion. @@ -70,6 +74,14 @@ pub enum PeerManagerEvent { block_id: Id<Block>, }, + /// A new tip block has been added to the chainstate + /// + /// Note: normally, NewTipReceived and NewChainstateTip are dependent in the sense + /// that if NewTipReceived is produced, it will be accompanied by NewChainstateTip. + /// However, peer manager should not use this fact and treat them as independent + /// events instead. + NewChainstateTip(Id<Block>), + /// New valid unseen transaction received. /// It is used as an eviction criterion. NewValidTransactionReceived { @@ -84,4 +96,16 @@ pub enum PeerManagerEvent { ListBanned(oneshot_nofail::Sender<Vec<BannableAddress>>), Ban(BannableAddress, oneshot_nofail::Sender<crate::Result<()>>), Unban(BannableAddress, oneshot_nofail::Sender<crate::Result<()>>), + + GenericQuery(Box<dyn PeerManagerQueryFunc>), +} + +pub trait PeerManagerQueryFunc: FnOnce(&dyn PeerManagerQueryInterface) + Send {} + +impl<F> PeerManagerQueryFunc for F where F: FnOnce(&dyn PeerManagerQueryInterface) + Send {} + +impl std::fmt::Debug for dyn PeerManagerQueryFunc { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "DisplayFunction") + } } diff --git a/p2p/src/sync/mod.rs b/p2p/src/sync/mod.rs index b3ed134c95..863fee66be 100644 --- a/p2p/src/sync/mod.rs +++ b/p2p/src/sync/mod.rs @@ -226,6 +226,10 @@ where /// Announces the header of a new block to peers. async fn handle_new_tip(&mut self, block_id: Id<Block>) -> Result<()> { + self.peer_manager_sender + .send(PeerManagerEvent::NewChainstateTip(block_id)) + .map_err(|_| P2pError::ChannelClosed)?; + if self.chainstate_handle.is_initial_block_download().await? { return Ok(()); } @@ -351,3 +355,8 @@ pub async fn subscribe_to_tx_processed( #[cfg(test)] mod tests; + +#[cfg(test)] +pub mod test_helpers { + pub use super::tests::helpers::*; +} diff --git a/p2p/src/sync/peer_v2.rs b/p2p/src/sync/peer_v2.rs index 28fa6f425a..96b95d3715 100644 --- a/p2p/src/sync/peer_v2.rs +++ b/p2p/src/sync/peer_v2.rs @@ -406,7 +406,7 @@ where } if self.chainstate_handle.is_initial_block_download().await? { - log::debug!("[peer id = {}] Ignoring headers request because the node is in initial block download", self.id()); + log::debug!("[peer id = {}] Responding with empty headers list because the node is in initial block download", self.id()); // Respond with an empty list to avoid being marked as stalled self.send_headers(HeaderList::new(Vec::new()))?; return Ok(()); diff --git a/p2p/src/sync/tests/helpers/mod.rs b/p2p/src/sync/tests/helpers/mod.rs index 7280032b0d..1ecccf3c73 100644 --- a/p2p/src/sync/tests/helpers/mod.rs +++ b/p2p/src/sync/tests/helpers/mod.rs @@ -275,10 +275,12 @@ impl TestNode { | PeerManagerEvent::RemoveReserved(_, _) | PeerManagerEvent::ListBanned(_) | PeerManagerEvent::Ban(_, _) - | PeerManagerEvent::Unban(_, _) => { + | PeerManagerEvent::Unban(_, _) + | PeerManagerEvent::GenericQuery(_) => { panic!("Unexpected peer manager event: {peer_event:?}"); } PeerManagerEvent::NewTipReceived { .. } + | PeerManagerEvent::NewChainstateTip(_) | PeerManagerEvent::NewValidTransactionReceived { .. } => { // Ignored } diff --git a/p2p/src/sync/tests/helpers/test_node_group.rs b/p2p/src/sync/tests/helpers/test_node_group.rs index 60ed93a438..1795e31d5b 100644 --- a/p2p/src/sync/tests/helpers/test_node_group.rs +++ b/p2p/src/sync/tests/helpers/test_node_group.rs @@ -93,9 +93,6 @@ impl TestNodeGroup { /// Receive a SyncMessage from any peer for which delay_sync_messages_from is set to false. /// Panic if a timeout occurs. async fn receive_next_sync_message(&mut self) -> SyncMessageWithNodeIdx { - // TODO: is there a better way to perform select_all on sync_event_receivers (i.e. without - // extra allocations)? - let mut sync_msg_receivers: Vec<_> = self .data .iter_mut() @@ -329,10 +326,12 @@ impl TestNodeGroup { | PeerManagerEvent::RemoveReserved(_, _) | PeerManagerEvent::ListBanned(_) | PeerManagerEvent::Ban(_, _) - | PeerManagerEvent::Unban(_, _) => { + | PeerManagerEvent::Unban(_, _) + | PeerManagerEvent::GenericQuery(_) => { panic!("Unexpected peer manager event: {peer_event:?}"); } PeerManagerEvent::NewTipReceived { .. } + | PeerManagerEvent::NewChainstateTip(_) | PeerManagerEvent::NewValidTransactionReceived { .. } => { // Ignored } diff --git a/p2p/src/sync/tests/mod.rs b/p2p/src/sync/tests/mod.rs index 2b1d20d034..89a6d2aa10 100644 --- a/p2p/src/sync/tests/mod.rs +++ b/p2p/src/sync/tests/mod.rs @@ -19,7 +19,7 @@ mod block_list_request; mod block_response; mod header_list_request; mod header_list_response; -mod helpers; +pub mod helpers; mod network_sync; mod peer_events; mod tx_announcement; diff --git a/p2p/src/testing_utils.rs b/p2p/src/testing_utils.rs index 7b8518aa75..9de27366ab 100644 --- a/p2p/src/testing_utils.rs +++ b/p2p/src/testing_utils.rs @@ -92,6 +92,14 @@ impl TestTransportMaker for TestTransportChannel { } } +impl TestTransportChannel { + pub fn make_transport_with_local_addr_in_group( + addr_group_idx: u32, + addr_group_bit_offset: u32, + ) -> MpscChannelTransport { + MpscChannelTransport::new_with_addr_in_group(addr_group_idx, addr_group_bit_offset) + } +} pub struct TestTransportNoise {} impl TestTransportMaker for TestTransportNoise { diff --git a/p2p/src/tests/correct_handshake.rs b/p2p/src/tests/correct_handshake.rs index d7c9b23c3f..0f43fef26f 100644 --- a/p2p/src/tests/correct_handshake.rs +++ b/p2p/src/tests/correct_handshake.rs @@ -15,6 +15,7 @@ use std::sync::Arc; +use p2p_test_utils::P2pBasicTestTimeGetter; use test_utils::assert_matches; use crate::{ @@ -22,6 +23,7 @@ use crate::{ transport::{BufferedTranscoder, TransportListener, TransportSocket}, types::{HandshakeMessage, Message, P2pTimestamp}, }, + peer_manager::PeerManagerQueryInterface, testing_utils::{ test_p2p_config, TestTransportChannel, TestTransportMaker, TestTransportNoise, TestTransportTcp, TEST_PROTOCOL_VERSION, @@ -37,12 +39,15 @@ where TTM: TestTransportMaker, TTM::Transport: TransportSocket, { + let time_getter = P2pBasicTestTimeGetter::new(); let chain_config = Arc::new(common::chain::config::create_unit_test_config()); let p2p_config = Arc::new(test_p2p_config()); - let mut test_node = TestNode::<TTM>::start( + let mut test_node = TestNode::<TTM::Transport>::start( + time_getter.get_time_getter(), Arc::clone(&chain_config), Arc::clone(&p2p_config), + TTM::make_transport(), TTM::make_address(), TEST_PROTOCOL_VERSION.into(), ) @@ -68,9 +73,7 @@ where software_version: *chain_config.software_version(), services: (*p2p_config.node_type).into(), receiver_address: None, - current_time: P2pTimestamp::from_time( - test_node.time_getter().get_time_getter().get_time(), - ), + current_time: P2pTimestamp::from_time(time_getter.get_time_getter().get_time()), })) .await .unwrap(); @@ -116,12 +119,15 @@ where TTM: TestTransportMaker, TTM::Transport: TransportSocket, { + let time_getter = P2pBasicTestTimeGetter::new(); let chain_config = Arc::new(common::chain::config::create_unit_test_config()); let p2p_config = Arc::new(test_p2p_config()); - let mut test_node = TestNode::<TTM>::start( + let mut test_node = TestNode::<TTM::Transport>::start( + time_getter.get_time_getter(), Arc::clone(&chain_config), Arc::clone(&p2p_config), + TTM::make_transport(), TTM::make_address(), TEST_PROTOCOL_VERSION.into(), ) @@ -141,9 +147,7 @@ where software_version: *chain_config.software_version(), services: (*p2p_config.node_type).into(), receiver_address: None, - current_time: P2pTimestamp::from_time( - test_node.time_getter().get_time_getter().get_time(), - ), + current_time: P2pTimestamp::from_time(time_getter.get_time_getter().get_time()), handshake_nonce: 0, })) .await diff --git a/p2p/src/tests/helpers/mod.rs b/p2p/src/tests/helpers/mod.rs new file mode 100644 index 0000000000..b87c76bd78 --- /dev/null +++ b/p2p/src/tests/helpers/mod.rs @@ -0,0 +1,151 @@ +// Copyright (c) 2021-2023 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + collections::BTreeMap, + sync::{Arc, Mutex}, +}; + +use async_trait::async_trait; +use futures::Future; +use tokio::{sync::mpsc::UnboundedSender, time}; + +use logging::log; +use p2p_test_utils::LONG_TIMEOUT; +use p2p_types::{bannable_address::BannableAddress, socket_address::SocketAddress, PeerId}; + +use crate::{ + net::types::{PeerInfo, PeerRole}, + peer_manager::{self, dns_seed::DnsSeed}, +}; + +pub mod test_node; +pub mod test_node_group; + +pub use test_node::*; +pub use test_node_group::*; + +pub async fn timeout<F>(future: F) +where + F: Future, +{ + // TODO: in the case of timeout, a panic is likely to occur in an unrelated place, + // e.g. "subsystem manager's handle hasn't been joined" is a common one. This can be + // confusing, so we need a way to abort the test before some unrelated code decides to panic. + time::timeout(LONG_TIMEOUT, future).await.unwrap(); +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum PeerManagerNotification { + BanScoreAdjustment { + address: SocketAddress, + new_score: u32, + }, + Ban { + address: BannableAddress, + }, + Heartbeat, + ConnectionAccepted { + address: SocketAddress, + }, +} + +pub struct PeerManagerObserver { + event_tx: UnboundedSender<PeerManagerNotification>, +} + +impl PeerManagerObserver { + pub fn new(event_tx: UnboundedSender<PeerManagerNotification>) -> Self { + Self { event_tx } + } + + fn send_notification(&self, notification: PeerManagerNotification) { + let send_result = self.event_tx.send(notification.clone()); + + if let Err(err) = send_result { + log::warn!("Error sending peer manager notification {notification:?}: {err}"); + } + } +} + +impl peer_manager::Observer for PeerManagerObserver { + fn on_peer_ban_score_adjustment(&mut self, address: SocketAddress, new_score: u32) { + self.send_notification(PeerManagerNotification::BanScoreAdjustment { address, new_score }); + } + + fn on_peer_ban(&mut self, address: BannableAddress) { + self.send_notification(PeerManagerNotification::Ban { address }); + } + + fn on_heartbeat(&mut self) { + self.send_notification(PeerManagerNotification::Heartbeat); + } + + fn on_connection_accepted(&mut self, address: SocketAddress) { + self.send_notification(PeerManagerNotification::ConnectionAccepted { address }); + } +} + +#[derive(Debug)] +pub struct TestPeerInfo { + pub info: PeerInfo, + pub role: PeerRole, +} + +#[derive(Debug)] +pub struct TestPeersInfo { + pub info: BTreeMap<SocketAddress, TestPeerInfo>, +} + +impl TestPeersInfo { + pub fn from_peer_mgr_peer_contexts( + contexts: &BTreeMap<PeerId, peer_manager::peer_context::PeerContext>, + ) -> Self { + let mut info = BTreeMap::new(); + + for ctx in contexts.values() { + info.insert( + ctx.address, + TestPeerInfo { + info: ctx.info.clone(), + role: ctx.peer_role, + }, + ); + } + + Self { info } + } + + pub fn count_peers_by_role(&self, role: PeerRole) -> usize { + self.info.iter().filter(|(_, info)| info.role == role).count() + } +} + +pub struct TestDnsSeed { + addresses: Arc<Mutex<Vec<SocketAddress>>>, +} + +impl TestDnsSeed { + pub fn new(addresses: Arc<Mutex<Vec<SocketAddress>>>) -> Self { + Self { addresses } + } +} + +#[async_trait] +impl DnsSeed for TestDnsSeed { + async fn obtain_addresses(&self) -> Vec<SocketAddress> { + self.addresses.lock().unwrap().clone() + } +} diff --git a/p2p/src/tests/helpers.rs b/p2p/src/tests/helpers/test_node.rs similarity index 60% rename from p2p/src/tests/helpers.rs rename to p2p/src/tests/helpers/test_node.rs index 81c6491590..94ec4a9521 100644 --- a/p2p/src/tests/helpers.rs +++ b/p2p/src/tests/helpers/test_node.rs @@ -1,4 +1,4 @@ -// Copyright (c) 2023 RBB S.r.l +// Copyright (c) 2021-2023 RBB S.r.l // opensource@mintlayer.org // SPDX-License-Identifier: MIT // Licensed under the MIT License; @@ -13,21 +13,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! A module for tests that behave like integration tests but still need access to private data -//! via methods under #[cfg(test)], +use std::sync::{Arc, Mutex}; -use std::sync::Arc; - -use futures::Future; -use p2p_test_utils::{expect_recv, P2pBasicTestTimeGetter, LONG_TIMEOUT, SHORT_TIMEOUT}; -use p2p_types::{ - bannable_address::BannableAddress, p2p_event::P2pEventHandler, socket_address::SocketAddress, +use chainstate::{ + make_chainstate, ChainstateConfig, ChainstateHandle, DefaultTransactionVerificationStrategy, }; +use p2p_test_utils::SHORT_TIMEOUT; +use p2p_types::{p2p_event::P2pEventHandler, socket_address::SocketAddress}; use storage_inmemory::InMemory; use subsystem::ShutdownTrigger; use tokio::{ sync::{ - mpsc::{self, UnboundedSender}, + mpsc::{self}, oneshot, }, task::JoinHandle, @@ -41,81 +38,97 @@ use crate::{ default_backend::{transport::TransportSocket, DefaultNetworkingService}, ConnectivityService, }, - peer_manager::{self, peerdb::storage_impl::PeerDbStorageImpl, PeerManager}, + peer_manager::{ + peerdb::storage_impl::PeerDbStorageImpl, PeerManager, PeerManagerQueryInterface, + }, protocol::ProtocolVersion, sync::BlockSyncManager, - testing_utils::{peerdb_inmemory_store, test_p2p_config, TestTransportMaker}, + testing_utils::peerdb_inmemory_store, types::ip_or_socket_address::IpOrSocketAddress, utils::oneshot_nofail, PeerManagerEvent, }; -use common::chain::ChainConfig; +use common::{chain::ChainConfig, time_getter::TimeGetter}; use utils::atomics::SeqCstAtomicBool; -type PeerMgr<TTM> = PeerManager< - DefaultNetworkingService<<TTM as TestTransportMaker>::Transport>, - PeerDbStorageImpl<InMemory>, ->; +use super::{PeerManagerNotification, PeerManagerObserver, TestDnsSeed, TestPeersInfo}; + +type PeerMgr<Transport> = + PeerManager<DefaultNetworkingService<Transport>, PeerDbStorageImpl<InMemory>>; -pub struct TestNode<TTM> +pub struct TestNode<Transport> where - TTM: TestTransportMaker, - TTM::Transport: TransportSocket, + Transport: TransportSocket, { - time_getter: P2pBasicTestTimeGetter, peer_mgr_event_tx: mpsc::UnboundedSender<PeerManagerEvent>, local_address: SocketAddress, shutdown: Arc<SeqCstAtomicBool>, - shutdown_sender: oneshot::Sender<()>, + backend_shutdown_sender: oneshot::Sender<()>, _subscribers_sender: mpsc::UnboundedSender<P2pEventHandler>, - peer_mgr_join_handle: JoinHandle<(PeerMgr<TTM>, P2pError)>, + backend_join_handle: JoinHandle<()>, + peer_mgr_join_handle: JoinHandle<(PeerMgr<Transport>, P2pError)>, sync_mgr_join_handle: JoinHandle<P2pError>, shutdown_trigger: ShutdownTrigger, subsystem_mgr_join_handle: subsystem::ManagerJoinHandle, peer_mgr_notification_rx: mpsc::UnboundedReceiver<PeerManagerNotification>, + chainstate: ChainstateHandle, + dns_seed_addresses: Arc<Mutex<Vec<SocketAddress>>>, } // This is what's left of a test node after it has been stopped. -// TODO: this is kind of ugly; instead of examining the remnants, tests should be able to observe -// the innards of the p2p components (such as the peer db) on the fly. -pub struct TestNodeRemnants<TTM> +// TODO: it should be possible to use PeerManagerEvent::GenericQuery to examine peer manager's +// internals on the fly. +pub struct TestNodeRemnants<Transport> where - TTM: TestTransportMaker, - TTM::Transport: TransportSocket, + Transport: TransportSocket, { - pub peer_mgr: PeerMgr<TTM>, + pub peer_mgr: PeerMgr<Transport>, pub peer_mgr_error: P2pError, pub sync_mgr_error: P2pError, } -impl<TTM> TestNode<TTM> +impl<Transport> TestNode<Transport> where - TTM: TestTransportMaker, - TTM::Transport: TransportSocket, + Transport: TransportSocket, { pub async fn start( + time_getter: TimeGetter, chain_config: Arc<ChainConfig>, p2p_config: Arc<P2pConfig>, + transport: Transport, bind_address: SocketAddress, protocol_version: ProtocolVersion, ) -> Self { - let time_getter = P2pBasicTestTimeGetter::new(); - let (peer_mgr_event_tx, peer_mgr_event_rx) = mpsc::unbounded_channel(); + let chainstate = make_chainstate( + Arc::clone(&chain_config), + ChainstateConfig::new(), + chainstate_storage::inmemory::Store::new_empty().unwrap(), + DefaultTransactionVerificationStrategy::new(), + None, + time_getter.clone(), + ) + .unwrap(); let (chainstate, mempool, shutdown_trigger, subsystem_mgr_join_handle) = - p2p_test_utils::start_subsystems(Arc::clone(&chain_config)); + p2p_test_utils::start_subsystems_with_chainstate( + chainstate, + Arc::clone(&chain_config), + time_getter.clone(), + ); + + let (peer_mgr_event_tx, peer_mgr_event_rx) = mpsc::unbounded_channel(); let shutdown = Arc::new(SeqCstAtomicBool::new(false)); - let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + let (backend_shutdown_sender, backend_shutdown_receiver) = oneshot::channel(); let (subscribers_sender, subscribers_receiver) = mpsc::unbounded_channel(); - let (conn_handle, messaging_handle, syncing_event_rx, _) = - DefaultNetworkingService::<TTM::Transport>::start_with_version( - TTM::make_transport(), + let (conn_handle, messaging_handle, syncing_event_rx, backend_join_handle) = + DefaultNetworkingService::<Transport>::start_with_version( + transport, vec![bind_address], Arc::clone(&chain_config), - Arc::new(test_p2p_config()), - time_getter.get_time_getter(), + Arc::clone(&p2p_config), + time_getter.clone(), Arc::clone(&shutdown), - shutdown_receiver, + backend_shutdown_receiver, subscribers_receiver, protocol_version, ) @@ -126,15 +139,17 @@ where let (peer_mgr_notification_tx, peer_mgr_notification_rx) = mpsc::unbounded_channel(); let peer_mgr_observer = Box::new(PeerManagerObserver::new(peer_mgr_notification_tx)); + let dns_seed_addresses = Arc::new(Mutex::new(Vec::new())); - let peer_mgr = PeerMgr::<TTM>::new_with_observer( + let peer_mgr = PeerMgr::<Transport>::new_generic( Arc::clone(&chain_config), Arc::clone(&p2p_config), conn_handle, peer_mgr_event_rx, - time_getter.get_time_getter(), + time_getter.clone(), peerdb_inmemory_store(), Some(peer_mgr_observer), + Box::new(TestDnsSeed::new(dns_seed_addresses.clone())), ) .unwrap(); let peer_mgr_join_handle = logging::spawn_in_current_span(async move { @@ -147,15 +162,15 @@ where (peer_mgr, err) }); - let sync_mgr = BlockSyncManager::<DefaultNetworkingService<TTM::Transport>>::new( + let sync_mgr = BlockSyncManager::<DefaultNetworkingService<Transport>>::new( Arc::clone(&chain_config), Arc::clone(&p2p_config), messaging_handle, syncing_event_rx, - chainstate, + chainstate.clone(), mempool, peer_mgr_event_tx.clone(), - time_getter.get_time_getter(), + time_getter.clone(), ); let sync_mgr_join_handle = logging::spawn_in_current_span(async move { match sync_mgr.run().await { @@ -165,17 +180,19 @@ where }); TestNode { - time_getter, peer_mgr_event_tx, local_address, shutdown, - shutdown_sender, + backend_shutdown_sender, _subscribers_sender: subscribers_sender, + backend_join_handle, peer_mgr_join_handle, sync_mgr_join_handle, shutdown_trigger, subsystem_mgr_join_handle, peer_mgr_notification_rx, + chainstate, + dns_seed_addresses, } } @@ -183,8 +200,8 @@ where &self.local_address } - pub fn time_getter(&self) -> &P2pBasicTestTimeGetter { - &self.time_getter + pub fn chainstate(&self) -> &ChainstateHandle { + &self.chainstate } // Note: the returned receiver will become readable only after the handshake is finished. @@ -203,15 +220,8 @@ where connect_result_rx } - pub async fn expect_peer_mgr_notification(&mut self) -> PeerManagerNotification { - expect_recv!(self.peer_mgr_notification_rx) - } - pub async fn expect_no_banning(&mut self) { - // Note: at the moment the loop is useless, because all existing notification types - // are related to banning, but it may change in the future. time::timeout(SHORT_TIMEOUT, async { - #[allow(clippy::never_loop)] loop { match self.peer_mgr_notification_rx.recv().await.unwrap() { PeerManagerNotification::BanScoreAdjustment { @@ -221,6 +231,7 @@ where | PeerManagerNotification::Ban { address: _ } => { break; } + _ => {} } } }) @@ -228,11 +239,40 @@ where .unwrap_err(); } - pub async fn join(self) -> TestNodeRemnants<TTM> { + pub async fn wait_for_ban_score_adjustment(&mut self) -> (SocketAddress, u32) { + loop { + if let PeerManagerNotification::BanScoreAdjustment { address, new_score } = + self.peer_mgr_notification_rx.recv().await.unwrap() + { + return (address, new_score); + } + } + } + + pub async fn get_peers_info(&self) -> TestPeersInfo { + let (tx, mut rx) = mpsc::unbounded_channel(); + + self.peer_mgr_event_tx + .send(PeerManagerEvent::GenericQuery(Box::new( + move |mgr: &dyn PeerManagerQueryInterface| { + tx.send(TestPeersInfo::from_peer_mgr_peer_contexts(mgr.peers())).unwrap(); + }, + ))) + .unwrap(); + + rx.recv().await.unwrap() + } + + pub fn set_dns_seed_addresses(&self, addresses: Vec<SocketAddress>) { + *self.dns_seed_addresses.lock().unwrap() = addresses; + } + + pub async fn join(self) -> TestNodeRemnants<Transport> { self.shutdown.store(true); - let _ = self.shutdown_sender.send(()); + let _ = self.backend_shutdown_sender.send(()); let (peer_mgr, peer_mgr_error) = self.peer_mgr_join_handle.await.unwrap(); let sync_mgr_error = self.sync_mgr_join_handle.await.unwrap(); + self.backend_join_handle.await.unwrap(); self.shutdown_trigger.initiate(); self.subsystem_mgr_join_handle.join().await; @@ -243,46 +283,3 @@ where } } } - -pub async fn timeout<F>(future: F) -where - F: Future, -{ - // TODO: in the case of timeout, a panic is likely to occur in an unrelated place, - // e.g. "subsystem manager's handle hasn't been joined" is a common one. This can be - // confusing, so we need a way to abort the test before some unrelated code decides to panic. - time::timeout(LONG_TIMEOUT, future).await.unwrap(); -} - -#[derive(Debug)] -pub enum PeerManagerNotification { - BanScoreAdjustment { - address: SocketAddress, - new_score: u32, - }, - Ban { - address: BannableAddress, - }, -} - -pub struct PeerManagerObserver { - event_tx: UnboundedSender<PeerManagerNotification>, -} - -impl PeerManagerObserver { - pub fn new(event_tx: UnboundedSender<PeerManagerNotification>) -> Self { - Self { event_tx } - } -} - -impl peer_manager::Observer for PeerManagerObserver { - fn on_peer_ban_score_adjustment(&mut self, address: SocketAddress, new_score: u32) { - self.event_tx - .send(PeerManagerNotification::BanScoreAdjustment { address, new_score }) - .unwrap(); - } - - fn on_peer_ban(&mut self, address: BannableAddress) { - self.event_tx.send(PeerManagerNotification::Ban { address }).unwrap(); - } -} diff --git a/p2p/src/tests/helpers/test_node_group.rs b/p2p/src/tests/helpers/test_node_group.rs new file mode 100644 index 0000000000..0abaa5a0f0 --- /dev/null +++ b/p2p/src/tests/helpers/test_node_group.rs @@ -0,0 +1,104 @@ +// Copyright (c) 2021-2023 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use common::{chain::Block, primitives::Id}; +use p2p_test_utils::{P2pBasicTestTimeGetter, SHORT_TIMEOUT}; +use p2p_types::socket_address::SocketAddress; +use tokio::time; + +use crate::net::default_backend::transport::TransportSocket; + +use super::test_node::TestNode; + +pub struct TestNodeGroup<Transport> +where + Transport: TransportSocket, +{ + nodes: Vec<TestNode<Transport>>, + time_getter: P2pBasicTestTimeGetter, +} + +impl<Transport> TestNodeGroup<Transport> +where + Transport: TransportSocket, +{ + pub fn new(nodes: Vec<TestNode<Transport>>, time_getter: P2pBasicTestTimeGetter) -> Self { + Self { nodes, time_getter } + } + + pub fn nodes(&self) -> &[TestNode<Transport>] { + &self.nodes + } + + pub fn time_getter(&self) -> &P2pBasicTestTimeGetter { + &self.time_getter + } + + pub fn get_adresses(&self) -> Vec<SocketAddress> { + self.nodes.iter().map(|node| *node.local_address()).collect() + } + + pub fn set_dns_seed_addresses(&self, addresses: &[SocketAddress]) { + for node in &self.nodes { + node.set_dns_seed_addresses(addresses.to_vec()); + } + } + + // Wait until the specified block has been propagated to the specified number of nodes. + pub async fn wait_for_block_propagation_advance_time( + &self, + nodes_count: usize, + block_id: Id<Block>, + time_diff: Duration, + ) { + let mut cur_nodes_count = 0; + + loop { + let prev_nodes_count = cur_nodes_count; + cur_nodes_count = 0; + + for node in &self.nodes { + let block = node + .chainstate() + .call(move |cs| cs.get_block(block_id)) + .await + .unwrap() + .unwrap(); + if block.is_some() { + cur_nodes_count += 1; + } + } + + if cur_nodes_count != prev_nodes_count { + println!("Block {block_id} has been propagated to {cur_nodes_count} nodes"); + } + + if cur_nodes_count >= nodes_count { + break; + } + + time::sleep(SHORT_TIMEOUT).await; + self.time_getter.advance_time(time_diff); + } + } + + pub async fn join(self) { + for node in self.nodes { + node.join().await; + } + } +} diff --git a/p2p/src/tests/incorrect_handshake.rs b/p2p/src/tests/incorrect_handshake.rs index b395886c5d..1b339a4e7c 100644 --- a/p2p/src/tests/incorrect_handshake.rs +++ b/p2p/src/tests/incorrect_handshake.rs @@ -15,6 +15,7 @@ use std::sync::Arc; +use p2p_test_utils::P2pBasicTestTimeGetter; use test_utils::assert_matches; use crate::{ @@ -27,7 +28,7 @@ use crate::{ test_p2p_config, TestTransportChannel, TestTransportMaker, TestTransportNoise, TestTransportTcp, TEST_PROTOCOL_VERSION, }, - tests::helpers::{timeout, PeerManagerNotification, TestNode}, + tests::helpers::{timeout, TestNode}, }; async fn incorrect_handshake_outgoing<TTM>() @@ -35,12 +36,15 @@ where TTM: TestTransportMaker, TTM::Transport: TransportSocket, { + let time_getter = P2pBasicTestTimeGetter::new(); let chain_config = Arc::new(common::chain::config::create_unit_test_config()); let p2p_config = Arc::new(test_p2p_config()); - let mut test_node = TestNode::<TTM>::start( + let mut test_node = TestNode::<TTM::Transport>::start( + time_getter.get_time_getter(), Arc::clone(&chain_config), Arc::clone(&p2p_config), + TTM::make_transport(), TTM::make_address(), TEST_PROTOCOL_VERSION.into(), ) @@ -101,12 +105,15 @@ where TTM: TestTransportMaker, TTM::Transport: TransportSocket, { + let time_getter = P2pBasicTestTimeGetter::new(); let chain_config = Arc::new(common::chain::config::create_unit_test_config()); let p2p_config = Arc::new(test_p2p_config()); - let mut test_node = TestNode::<TTM>::start( + let mut test_node = TestNode::<TTM::Transport>::start( + time_getter.get_time_getter(), Arc::clone(&chain_config), Arc::clone(&p2p_config), + TTM::make_transport(), TTM::make_address(), TEST_PROTOCOL_VERSION.into(), ) @@ -126,13 +133,7 @@ where // This is mainly needed to ensure that the corresponding event reaches peer manager before // we end the test. - assert_matches!( - test_node.expect_peer_mgr_notification().await, - PeerManagerNotification::BanScoreAdjustment { - address: _, - new_score: _ - } - ); + test_node.wait_for_ban_score_adjustment().await; // The peer address should be banned. let test_node_remnants = test_node.join().await; diff --git a/p2p/src/tests/misbehavior.rs b/p2p/src/tests/misbehavior.rs index 9c09eb611b..a2c9651ef1 100644 --- a/p2p/src/tests/misbehavior.rs +++ b/p2p/src/tests/misbehavior.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use chainstate::ban_score::BanScore; +use p2p_test_utils::P2pBasicTestTimeGetter; use test_utils::assert_matches; use crate::{ @@ -24,11 +25,12 @@ use crate::{ transport::{BufferedTranscoder, TransportSocket}, types::{HandshakeMessage, Message, P2pTimestamp}, }, + peer_manager::PeerManagerQueryInterface, testing_utils::{ test_p2p_config, TestTransportChannel, TestTransportMaker, TestTransportNoise, TestTransportTcp, TEST_PROTOCOL_VERSION, }, - tests::helpers::{timeout, PeerManagerNotification, TestNode}, + tests::helpers::{timeout, TestNode}, }; async fn unexpected_handshake_message<TTM>() @@ -36,12 +38,15 @@ where TTM: TestTransportMaker, TTM::Transport: TransportSocket, { + let time_getter = P2pBasicTestTimeGetter::new(); let chain_config = Arc::new(common::chain::config::create_unit_test_config()); let p2p_config = Arc::new(test_p2p_config()); - let mut test_node = TestNode::<TTM>::start( + let mut test_node = TestNode::<TTM::Transport>::start( + time_getter.get_time_getter(), Arc::clone(&chain_config), Arc::clone(&p2p_config), + TTM::make_transport(), TTM::make_address(), TEST_PROTOCOL_VERSION.into(), ) @@ -62,9 +67,7 @@ where software_version: *chain_config.software_version(), services: (*p2p_config.node_type).into(), receiver_address: None, - current_time: P2pTimestamp::from_time( - test_node.time_getter().get_time_getter().get_time(), - ), + current_time: P2pTimestamp::from_time(time_getter.get_time_getter().get_time()), handshake_nonce: 0, })) .await @@ -86,9 +89,7 @@ where software_version: *chain_config.software_version(), services: (*p2p_config.node_type).into(), receiver_address: None, - current_time: P2pTimestamp::from_time( - test_node.time_getter().get_time_getter().get_time(), - ), + current_time: P2pTimestamp::from_time(time_getter.get_time_getter().get_time()), handshake_nonce: 0, })) .await @@ -96,13 +97,7 @@ where // This is mainly needed to ensure that the corresponding event reaches peer manager before // we end the test. - assert_matches!( - test_node.expect_peer_mgr_notification().await, - PeerManagerNotification::BanScoreAdjustment { - address: _, - new_score: _ - } - ); + test_node.wait_for_ban_score_adjustment().await; let test_node_remnants = test_node.join().await; diff --git a/p2p/src/tests/mod.rs b/p2p/src/tests/mod.rs index 80bcd3316c..df6bbd9d13 100644 --- a/p2p/src/tests/mod.rs +++ b/p2p/src/tests/mod.rs @@ -19,6 +19,7 @@ mod correct_handshake; mod incorrect_handshake; mod misbehavior; +mod peer_discovery_on_stale_tip; mod unsupported_version; pub mod helpers; diff --git a/p2p/src/tests/peer_discovery_on_stale_tip.rs b/p2p/src/tests/peer_discovery_on_stale_tip.rs new file mode 100644 index 0000000000..4a1540e388 --- /dev/null +++ b/p2p/src/tests/peer_discovery_on_stale_tip.rs @@ -0,0 +1,416 @@ +// Copyright (c) 2021-2023 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{collections::BTreeSet, sync::Arc, time::Duration}; + +use chainstate::BlockSource; +use common::{ + chain::{Block, ChainConfig}, + primitives::{user_agent::mintlayer_core_user_agent, Idable}, +}; +use logging::log; +use p2p_test_utils::P2pBasicTestTimeGetter; +use p2p_types::socket_address::SocketAddress; +use test_utils::random::Seed; + +use crate::{ + config::{MaxClockDiff, P2pConfig, PingTimeout, SyncStallingTimeout}, + net::types::PeerRole, + peer_manager::{ + self, address_groups::AddressGroup, OUTBOUND_BLOCK_RELAY_COUNT, + OUTBOUND_FULL_AND_BLOCK_RELAY_COUNT, OUTBOUND_FULL_RELAY_COUNT, + PEER_MGR_DNS_RELOAD_INTERVAL, PEER_MGR_HEARTBEAT_INTERVAL_MAX, + }, + sync::test_helpers::make_new_block, + testing_utils::{TestTransportChannel, TestTransportMaker, TEST_PROTOCOL_VERSION}, + tests::helpers::{timeout, TestNode, TestNodeGroup}, +}; + +// In these tests we want to create nodes in different "address groups" to ensure that +// the maximum number of connections can be established (peer manager normally won't allow more +// than 1 outbound connection per address group). To do so we must use ip addresses with distinct +// higher bytes; only the channel-based transport allows to use arbitrary ip addresses, so we +// have to use it. +type Transport = <TestTransportChannel as TestTransportMaker>::Transport; + +// Test scenario: +// 1) Create a set of nodes; the number of nodes is equal to the maximum number of outbound +// connections that a single node can establish plus 1. +// The nodes start with a fresh block, so they are not in IBD. +// 2) Announce nodes' addresses via the dns seed; the nodes should connect to each other. +// 3) Wait for one hour; the initial block is now stale, but the nodes are still connected +// to each other. +// 4) Start a new node that has a fresh block; announce its address via the dns seed; +// the old nodes should find the new one; some of them should establish an outbound connection +// to it; eventually, all old nodes should receive the fresh block. +#[tracing::instrument(skip(seed))] +#[rstest::rstest] +#[trace] +#[case(Seed::from_entropy())] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn peer_discovery_on_stale_tip(#[case] seed: Seed) { + timeout(peer_discovery_on_stale_tip_impl(seed)).await; +} + +async fn peer_discovery_on_stale_tip_impl(seed: Seed) { + let mut rng = test_utils::random::make_seedable_rng(seed); + let time_getter = P2pBasicTestTimeGetter::new(); + let chain_config = Arc::new(common::chain::config::create_unit_test_config()); + let two_hours = Duration::from_secs(60 * 60 * 2); + let p2p_config = Arc::new(make_p2p_config( + // Note: we'll be moving mocked time forward by 1 hour once and by smaller intervals + // multiple times; because of this, nodes may see each other as dead or as having invalid + // clocks and disconnect each other. To avoid this, we specify artificially large timeouts + // and clock diff. + two_hours.into(), + two_hours.into(), + two_hours.into(), + )); + + let nodes_count = OUTBOUND_FULL_AND_BLOCK_RELAY_COUNT + 1; + let mut nodes = Vec::with_capacity(nodes_count); + + let initial_block = make_new_block( + &chain_config, + None, + &time_getter.get_time_getter(), + &mut rng, + ); + + for i in 0..nodes_count { + nodes.push( + start_node_with_a_block( + &time_getter, + &chain_config, + &p2p_config, + i + 1, + initial_block.clone(), + ) + .await, + ); + } + + let node_group = TestNodeGroup::new(nodes, time_getter.clone()); + let node_addresses = node_group.get_adresses(); + + let address_groups: BTreeSet<_> = node_addresses + .iter() + .map(|addr| AddressGroup::from_peer_address(&addr.as_peer_address())) + .collect(); + // Sanity check - all addresses belong to separate address groups + assert_eq!(address_groups.len(), nodes_count); + + node_group.set_dns_seed_addresses(&node_addresses); + + time_getter.advance_time(PEER_MGR_DNS_RELOAD_INTERVAL); + + // Wait until the maximum number of outbound connections is established. + wait_for_max_outbound_connections(&node_group).await; + + // Advance the time by 1 hour + log::debug!("Advancing time by 1 hour"); + time_getter.advance_time(Duration::from_secs(60 * 60)); + + // All the connections must still be in place + assert_max_outbound_connections(&node_group).await; + + // Start a new node that would produce a block. + let new_node_idx = node_group.nodes().len() + 1; + let new_node = start_node_with_a_block( + &time_getter, + &chain_config, + &p2p_config, + new_node_idx, + initial_block.clone(), + ) + .await; + let new_node_addr = *new_node.local_address(); + + let new_block = make_new_block( + &chain_config, + Some(&initial_block), + &time_getter.get_time_getter(), + &mut rng, + ); + let new_block_id = new_block.get_id(); + + new_node + .chainstate() + .call_mut(move |cs| { + cs.process_block(new_block, BlockSource::Local).unwrap(); + }) + .await + .unwrap(); + + // Announce the node through the dns seed. + let mut node_addresses = node_addresses; + node_addresses.push(new_node_addr); + node_group.set_dns_seed_addresses(&node_addresses); + + // Wait for some connections to the new node to be established. + wait_for_connections_to(&node_group, new_node_addr, nodes_count / 2).await; + + // Wait for the new block to be propagated to all the nodes. + node_group + .wait_for_block_propagation_advance_time( + nodes_count, + new_block_id, + PEER_MGR_HEARTBEAT_INTERVAL_MAX, + ) + .await; + + log::debug!("shutting down"); + + node_group.join().await; + new_node.join().await; +} + +// Same as peer_discovery_on_stale_tip, but here the "old" nodes start without a fresh block, +// i.e. they are in IBD initially. +#[tracing::instrument(skip(seed))] +#[rstest::rstest] +#[trace] +#[case(Seed::from_entropy())] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn peer_discovery_on_stale_tip_ibd(#[case] seed: Seed) { + timeout(peer_discovery_on_stale_tip_ibd_impl(seed)).await; +} + +async fn peer_discovery_on_stale_tip_ibd_impl(seed: Seed) { + let mut rng = test_utils::random::make_seedable_rng(seed); + let time_getter = P2pBasicTestTimeGetter::new(); + let chain_config = Arc::new(common::chain::config::create_unit_test_config()); + let two_hours = Duration::from_secs(60 * 60 * 2); + let p2p_config = Arc::new(make_p2p_config( + // Note: we'll be moving mocked time forward by 1 hour once and by smaller intervals + // multiple times; because of this, nodes may see each other as dead or as having invalid + // clocks and disconnect each other. To avoid this, we specify artificially large timeouts + // and clock diff. + two_hours.into(), + two_hours.into(), + two_hours.into(), + )); + + let nodes_count = OUTBOUND_FULL_AND_BLOCK_RELAY_COUNT + 1; + let mut nodes = Vec::with_capacity(nodes_count); + + for i in 0..nodes_count { + nodes.push(start_node(&time_getter, &chain_config, &p2p_config, i + 1).await); + } + + let node_group = TestNodeGroup::new(nodes, time_getter.clone()); + let node_addresses = node_group.get_adresses(); + + let address_groups: BTreeSet<_> = node_addresses + .iter() + .map(|addr| AddressGroup::from_peer_address(&addr.as_peer_address())) + .collect(); + // Sanity check - all addresses belong to separate address groups + assert_eq!(address_groups.len(), nodes_count); + + node_group.set_dns_seed_addresses(&node_addresses); + + time_getter.advance_time(PEER_MGR_DNS_RELOAD_INTERVAL); + + // Wait until the maximum number of outbound connections is established. + wait_for_max_outbound_connections(&node_group).await; + + // Advance the time by 1 hour + log::debug!("Advancing time by 1 hour"); + time_getter.advance_time(Duration::from_secs(60 * 60)); + + // All the connections must still be in place + assert_max_outbound_connections(&node_group).await; + + // Start a new node that would produce a block. + let new_node_idx = node_group.nodes().len() + 1; + let new_node = start_node(&time_getter, &chain_config, &p2p_config, new_node_idx).await; + let new_node_addr = *new_node.local_address(); + + let new_block = make_new_block( + &chain_config, + None, + &time_getter.get_time_getter(), + &mut rng, + ); + let new_block_id = new_block.get_id(); + + new_node + .chainstate() + .call_mut(move |cs| { + cs.process_block(new_block, BlockSource::Local).unwrap(); + }) + .await + .unwrap(); + + // Announce the node through the dns seed. + let mut node_addresses = node_addresses; + node_addresses.push(new_node_addr); + node_group.set_dns_seed_addresses(&node_addresses); + + // Wait for some connections to the new node to be established. + wait_for_connections_to(&node_group, new_node_addr, nodes_count / 2).await; + + // Wait for the new block to be propagated to all the nodes. + node_group + .wait_for_block_propagation_advance_time( + nodes_count, + new_block_id, + PEER_MGR_HEARTBEAT_INTERVAL_MAX, + ) + .await; + + log::debug!("shutting down"); + + node_group.join().await; + new_node.join().await; +} + +fn make_transport_with_local_addr_in_group( + group_idx: u32, +) -> <TestTransportChannel as TestTransportMaker>::Transport { + let group_bits = peer_manager::address_groups::IPV4_GROUP_BYTES * 8; + + TestTransportChannel::make_transport_with_local_addr_in_group( + // Make sure that the most significant byte of the address is non-zero + // (all 0.x.x.x addresses get into AddressGroup::Private, but we want all + // addresses to be in different address groups). + group_idx + (1 << (group_bits - 1)), + group_bits as u32, + ) +} + +fn make_p2p_config( + ping_timeout: PingTimeout, + max_clock_diff: MaxClockDiff, + sync_stalling_timeout: SyncStallingTimeout, +) -> P2pConfig { + P2pConfig { + ping_timeout, + max_clock_diff, + sync_stalling_timeout, + + bind_addresses: Default::default(), + socks5_proxy: Default::default(), + disable_noise: Default::default(), + boot_nodes: Default::default(), + reserved_nodes: Default::default(), + max_inbound_connections: Default::default(), + ban_threshold: Default::default(), + ban_duration: Default::default(), + outbound_connection_timeout: Default::default(), + ping_check_period: Default::default(), + node_type: Default::default(), + allow_discover_private_ips: Default::default(), + msg_header_count_limit: Default::default(), + msg_max_locator_count: Default::default(), + max_request_blocks_count: Default::default(), + user_agent: mintlayer_core_user_agent(), + max_message_size: Default::default(), + max_peer_tx_announcements: Default::default(), + max_singular_unconnected_headers: Default::default(), + enable_block_relay_peers: Default::default(), + } +} + +async fn start_node( + time_getter: &P2pBasicTestTimeGetter, + chain_config: &Arc<ChainConfig>, + p2p_config: &Arc<P2pConfig>, + node_index: usize, +) -> TestNode<Transport> { + TestNode::<Transport>::start( + time_getter.get_time_getter(), + Arc::clone(chain_config), + Arc::clone(p2p_config), + make_transport_with_local_addr_in_group(node_index as u32), + TestTransportChannel::make_address(), + TEST_PROTOCOL_VERSION.into(), + ) + .await +} + +async fn start_node_with_a_block( + time_getter: &P2pBasicTestTimeGetter, + chain_config: &Arc<ChainConfig>, + p2p_config: &Arc<P2pConfig>, + node_index: usize, + block: Block, +) -> TestNode<Transport> { + let node = start_node(time_getter, chain_config, p2p_config, node_index).await; + node.chainstate() + .call_mut(move |cs| { + cs.process_block(block, BlockSource::Local).unwrap(); + }) + .await + .unwrap(); + node +} + +async fn wait_for_max_outbound_connections(node_group: &TestNodeGroup<Transport>) { + for node in node_group.nodes() { + let mut outbound_full_relay_peers_count = 0; + let mut outbound_block_relay_peers_count = 0; + while outbound_full_relay_peers_count < OUTBOUND_FULL_RELAY_COUNT + // Note: "-1" is used because one of the block relay connections is not permanent, + // it's dropped and re-established regularly. + || outbound_block_relay_peers_count < OUTBOUND_BLOCK_RELAY_COUNT - 1 + { + tokio::time::sleep(Duration::from_millis(100)).await; + let peers_info = node.get_peers_info().await; + outbound_full_relay_peers_count = + peers_info.count_peers_by_role(PeerRole::OutboundFullRelay); + outbound_block_relay_peers_count = + peers_info.count_peers_by_role(PeerRole::OutboundBlockRelay); + + node_group.time_getter().advance_time(PEER_MGR_HEARTBEAT_INTERVAL_MAX); + } + } +} + +async fn assert_max_outbound_connections(node_group: &TestNodeGroup<Transport>) { + for node in node_group.nodes() { + let peers_info = node.get_peers_info().await; + let outbound_full_relay_peers_count = + peers_info.count_peers_by_role(PeerRole::OutboundFullRelay); + let outbound_block_relay_peers_count = + peers_info.count_peers_by_role(PeerRole::OutboundBlockRelay); + + assert!(outbound_full_relay_peers_count >= OUTBOUND_FULL_RELAY_COUNT); + assert!(outbound_block_relay_peers_count >= OUTBOUND_BLOCK_RELAY_COUNT - 1); + } +} + +async fn wait_for_connections_to( + node_group: &TestNodeGroup<Transport>, + address: SocketAddress, + nodes_count: usize, +) { + let mut connected_nodes_count = 0; + loop { + for node in node_group.nodes() { + let peers_info = node.get_peers_info().await; + if peers_info.info.contains_key(&address) { + connected_nodes_count += 1; + } + } + + if connected_nodes_count >= nodes_count { + break; + } + + node_group.time_getter().advance_time(PEER_MGR_HEARTBEAT_INTERVAL_MAX); + } +} diff --git a/p2p/src/tests/unsupported_version.rs b/p2p/src/tests/unsupported_version.rs index 4c142eb0e7..5f83cc730e 100644 --- a/p2p/src/tests/unsupported_version.rs +++ b/p2p/src/tests/unsupported_version.rs @@ -15,6 +15,7 @@ use std::sync::Arc; +use p2p_test_utils::P2pBasicTestTimeGetter; use test_utils::assert_matches; use crate::{ @@ -35,12 +36,15 @@ where TTM: TestTransportMaker, TTM::Transport: TransportSocket, { + let time_getter = P2pBasicTestTimeGetter::new(); let chain_config = Arc::new(common::chain::config::create_unit_test_config()); let p2p_config = Arc::new(test_p2p_config()); - let test_node = TestNode::<TTM>::start( + let test_node = TestNode::<TTM::Transport>::start( + time_getter.get_time_getter(), Arc::clone(&chain_config), Arc::clone(&p2p_config), + TTM::make_transport(), TTM::make_address(), TEST_PROTOCOL_VERSION.into(), ) @@ -68,9 +72,7 @@ where software_version: *chain_config.software_version(), services: (*p2p_config.node_type).into(), receiver_address: None, - current_time: P2pTimestamp::from_time( - test_node.time_getter().get_time_getter().get_time(), - ), + current_time: P2pTimestamp::from_time(time_getter.get_time_getter().get_time()), })) .await .unwrap(); @@ -110,12 +112,15 @@ where TTM: TestTransportMaker, TTM::Transport: TransportSocket, { + let time_getter = P2pBasicTestTimeGetter::new(); let chain_config = Arc::new(common::chain::config::create_unit_test_config()); let p2p_config = Arc::new(test_p2p_config()); - let test_node = TestNode::<TTM>::start( + let test_node = TestNode::<TTM::Transport>::start( + time_getter.get_time_getter(), Arc::clone(&chain_config), Arc::clone(&p2p_config), + TTM::make_transport(), TTM::make_address(), TEST_PROTOCOL_VERSION.into(), ) @@ -136,9 +141,7 @@ where software_version: *chain_config.software_version(), services: (*p2p_config.node_type).into(), receiver_address: None, - current_time: P2pTimestamp::from_time( - test_node.time_getter().get_time_getter().get_time(), - ), + current_time: P2pTimestamp::from_time(time_getter.get_time_getter().get_time()), handshake_nonce: 0, })) .await @@ -177,12 +180,15 @@ where TTM: TestTransportMaker, TTM::Transport: TransportSocket, { + let time_getter = P2pBasicTestTimeGetter::new(); let chain_config = Arc::new(common::chain::config::create_unit_test_config()); let p2p_config = Arc::new(test_p2p_config()); - let test_node = TestNode::<TTM>::start( + let test_node = TestNode::<TTM::Transport>::start( + time_getter.get_time_getter(), Arc::clone(&chain_config), Arc::clone(&p2p_config), + TTM::make_transport(), TTM::make_address(), TEST_PROTOCOL_VERSION.into(), ) @@ -218,9 +224,7 @@ where software_version: *chain_config.software_version(), services: (*p2p_config.node_type).into(), receiver_address: None, - current_time: P2pTimestamp::from_time( - test_node.time_getter().get_time_getter().get_time(), - ), + current_time: P2pTimestamp::from_time(time_getter.get_time_getter().get_time()), })) .await .unwrap(); @@ -234,9 +238,7 @@ where software_version: *chain_config.software_version(), services: (*p2p_config.node_type).into(), receiver_address: None, - current_time: P2pTimestamp::from_time( - test_node.time_getter().get_time_getter().get_time(), - ), + current_time: P2pTimestamp::from_time(time_getter.get_time_getter().get_time()), })) .await .unwrap(); diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index 401ebb98f7..3f18850f29 100644 --- a/test-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -10,6 +10,7 @@ rust-version.workspace = true [dependencies] common = {path = '../common'} crypto = {path = '../crypto'} +logging = { path = "../logging/" } serialization = { path = '../serialization' } utils = { path = '../utils' }