diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index 989eb78f868..5d4fd3eade2 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -100,7 +100,7 @@ pub(crate) enum ClosingReason { #[error("Received a message of type not allowed on this connection.")] DisallowedMessage, #[error("PeerManager requested to close the connection")] - PeerManager, + PeerManagerRequest, #[error("Received DisconnectMessage from peer")] DisconnectMessage, #[error("Peer clock skew exceeded {MAX_CLOCK_SKEW}")] @@ -1551,7 +1551,7 @@ impl actix::Handler> for PeerActor { ctx, match msg.ban_reason { Some(reason) => ClosingReason::Ban(reason), - None => ClosingReason::PeerManager, + None => ClosingReason::PeerManagerRequest, }, ); } diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index a56866c2130..dffe001a964 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -498,10 +498,7 @@ impl PeerManagerActor { let _timer = metrics::PEER_MANAGER_TRIGGER_TIME.with_label_values(&["monitor_peers"]).start_timer(); - self.state.peer_store.unban(&self.clock); - if let Err(err) = self.state.peer_store.update_connected_peers_last_seen(&self.clock) { - tracing::error!(target: "network", ?err, "Failed to update peers last seen time."); - } + self.state.peer_store.update(&self.clock); if self.is_outbound_bootstrap_needed() { let tier2 = self.state.tier2.load(); @@ -547,10 +544,6 @@ impl PeerManagerActor { // If there are too many active connections try to remove some connections self.maybe_stop_active_connection(); - if let Err(err) = self.state.peer_store.remove_expired(&self.clock) { - tracing::error!(target: "network", ?err, "Failed to remove expired peers"); - }; - // Find peers that are not reliable (too much behind) - and make sure that we're not routing messages through them. 
let unreliable_peers = self.unreliable_peers(); metrics::PEER_UNRELIABLE.set(unreliable_peers.len() as i64); diff --git a/chain/network/src/peer_manager/peer_store/mod.rs b/chain/network/src/peer_manager/peer_store/mod.rs index ea536310b6a..206d61c1bb8 100644 --- a/chain/network/src/peer_manager/peer_store/mod.rs +++ b/chain/network/src/peer_manager/peer_store/mod.rs @@ -18,6 +18,9 @@ mod testonly; #[cfg(test)] mod tests; +/// How often to update the KnownPeerState.last_seen in storage. +const UPDATE_LAST_SEEN_INTERVAL: time::Duration = time::Duration::minutes(1); + /// Level of trust we have about a new (PeerId, Addr) pair. #[derive(Eq, PartialEq, Debug, Clone, Copy)] enum TrustLevel { @@ -224,16 +227,73 @@ impl Inner { } Ok(()) } + + /// Removes peers that are not responding for expiration period. + fn remove_expired(&mut self, now: time::Utc) { + let mut to_remove = vec![]; + for (peer_id, peer_status) in self.peer_states.iter() { + if peer_status.status != KnownPeerStatus::Connected + && now > peer_status.last_seen + self.config.peer_expiration_duration + { + tracing::debug!(target: "network", "Removing peer: last seen {:?} ago", now-peer_status.last_seen); + to_remove.push(peer_id.clone()); + } + } + if let Err(err) = self.delete_peers(&to_remove) { + tracing::error!(target: "network", ?err, "Failed to remove expired peers"); + } + } + + fn unban(&mut self, now: time::Utc) { + let mut to_unban = vec![]; + for (peer_id, peer_state) in &self.peer_states { + if let KnownPeerStatus::Banned(_, ban_time) = peer_state.status { + if now < ban_time + self.config.ban_window { + continue; + } + tracing::info!(target: "network", unbanned = ?peer_id, ?ban_time, "unbanning a peer"); + to_unban.push(peer_id.clone()); + } + } + for peer_id in &to_unban { + if let Err(err) = self.peer_unban(&peer_id) { + tracing::error!(target: "network", ?peer_id, ?err, "Failed to unban a peer"); + } + } + } + + /// Update the 'last_seen' time for all the peers that we're currently connected 
to. + fn update_last_seen(&mut self, now: time::Utc) { + for (peer_id, peer_state) in self.peer_states.iter_mut() { + if peer_state.status == KnownPeerStatus::Connected + && now > peer_state.last_seen + UPDATE_LAST_SEEN_INTERVAL + { + peer_state.last_seen = now; + if let Err(err) = self.store.set_peer_state(peer_id, peer_state) { + tracing::error!(target: "network", ?peer_id, ?err, "Failed to update peers last seen time."); + } + } + } + } + + /// Cleans up the state of the PeerStore, due to passing time. + /// * it unbans a peer if config.ban_window has passed + /// * it updates KnownPeerStatus.last_seen of the connected peers + /// * it removes peers which were not seen for config.peer_expiration_duration + /// This function should be called periodically. + pub fn update(&mut self, clock: &time::Clock) { + let now = clock.now_utc(); + // TODO(gprusak): these operations could be put into a single DB write transaction. + self.unban(now); + self.update_last_seen(now); + self.remove_expired(now); + } } pub(crate) struct PeerStore(Mutex); impl PeerStore { - pub(crate) fn new( - clock: &time::Clock, - config: Config, - store: store::Store, - ) -> anyhow::Result { + pub fn new(clock: &time::Clock, config: Config, store: store::Store) -> anyhow::Result { let boot_nodes: HashSet<_> = config.boot_nodes.iter().map(|p| p.id.clone()).collect(); // A mapping from `PeerId` to `KnownPeerState`. 
let mut peerid_2_state = HashMap::default(); @@ -345,29 +405,29 @@ impl PeerStore { self.0.lock().config.blacklist.contains(*addr) } - pub(crate) fn len(&self) -> usize { + pub fn len(&self) -> usize { self.0.lock().peer_states.len() } - pub(crate) fn is_banned(&self, peer_id: &PeerId) -> bool { + pub fn is_banned(&self, peer_id: &PeerId) -> bool { self.0.lock().peer_states.get(peer_id).map_or(false, |s| s.status.is_banned()) } - pub(crate) fn count_banned(&self) -> usize { + pub fn count_banned(&self) -> usize { self.0.lock().peer_states.values().filter(|st| st.status.is_banned()).count() } + pub fn update(&self, clock: &time::Clock) { + self.0.lock().update(clock) + } + #[allow(dead_code)] /// Returns the state of the current peer in memory. - pub(crate) fn get_peer_state(&self, peer_id: &PeerId) -> Option { + pub fn get_peer_state(&self, peer_id: &PeerId) -> Option { self.0.lock().peer_states.get(peer_id).cloned() } - pub(crate) fn peer_connected( - &self, - clock: &time::Clock, - peer_info: &PeerInfo, - ) -> anyhow::Result<()> { + pub fn peer_connected(&self, clock: &time::Clock, peer_info: &PeerInfo) -> anyhow::Result<()> { let mut inner = self.0.lock(); inner.add_signed_peer(clock, peer_info.clone())?; let mut store = inner.store.clone(); @@ -377,29 +437,7 @@ impl PeerStore { Ok(store.set_peer_state(&peer_info.id, entry)?) } - /// Update the 'last_seen' time for all the peers that we're currently connected to. - pub(crate) fn update_connected_peers_last_seen( - &self, - clock: &time::Clock, - ) -> anyhow::Result<()> { - let mut inner = self.0.lock(); - let mut store = inner.store.clone(); - for (peer_id, peer_state) in inner.peer_states.iter_mut() { - if peer_state.status == KnownPeerStatus::Connected - && clock.now_utc() > peer_state.last_seen.saturating_add(time::Duration::minutes(1)) - { - peer_state.last_seen = clock.now_utc(); - store.set_peer_state(peer_id, peer_state)? 
- } - } - Ok(()) - } - - pub(crate) fn peer_disconnected( - &self, - clock: &time::Clock, - peer_id: &PeerId, - ) -> anyhow::Result<()> { + pub fn peer_disconnected(&self, clock: &time::Clock, peer_id: &PeerId) -> anyhow::Result<()> { let mut inner = self.0.lock(); let mut store = inner.store.clone(); if let Some(peer_state) = inner.peer_states.get_mut(peer_id) { @@ -414,7 +452,7 @@ impl PeerStore { /// Records the last attempt to connect to peer. /// Marks the peer as Unknown (as we failed to connect to it). - pub(crate) fn peer_connection_attempt( + pub fn peer_connection_attempt( &self, clock: &time::Clock, peer_id: &PeerId, @@ -437,7 +475,7 @@ impl PeerStore { Ok(()) } - pub(crate) fn peer_ban( + pub fn peer_ban( &self, clock: &time::Clock, peer_id: &PeerId, @@ -459,7 +497,7 @@ impl PeerStore { /// Return unconnected or peers with unknown status that we can try to connect to. /// Peers with unknown addresses are filtered out. - pub(crate) fn unconnected_peer( + pub fn unconnected_peer( &self, ignore_fn: impl Fn(&KnownPeerState) -> bool, prefer_previously_connected_peer: bool, @@ -499,29 +537,12 @@ impl PeerStore { } /// Return healthy known peers up to given amount. - pub(crate) fn healthy_peers(&self, max_count: usize) -> Vec { + pub fn healthy_peers(&self, max_count: usize) -> Vec { self.0 .lock() .find_peers(|p| matches!(p.status, KnownPeerStatus::Banned(_, _)).not(), max_count) } - /// Removes peers that are not responding for expiration period. 
- pub(crate) fn remove_expired(&self, clock: &time::Clock) -> anyhow::Result<()> { - let mut inner = self.0.lock(); - let now = clock.now_utc(); - let mut to_remove = vec![]; - for (peer_id, peer_status) in inner.peer_states.iter() { - let diff = now - peer_status.last_seen; - if peer_status.status != KnownPeerStatus::Connected - && diff > inner.config.peer_expiration_duration - { - tracing::debug!(target: "network", "Removing peer: last seen {:?} ago", diff); - to_remove.push(peer_id.clone()); - } - } - inner.delete_peers(&to_remove) - } - /// Adds peers we’ve learned about from other peers. /// /// Identities of the nodes hasn’t been verified in any way. We don’t even @@ -529,7 +550,7 @@ impl PeerStore { /// are nodes there we haven’t received signatures of their peer ID. /// /// See also [`Self::add_direct_peer`] and [`Self::add_signed_peer`]. - pub(crate) fn add_indirect_peers( + pub fn add_indirect_peers( &self, clock: &time::Clock, peers: impl Iterator, @@ -561,34 +582,10 @@ impl PeerStore { /// confirming that identity yet. /// /// See also [`Self::add_indirect_peers`] and [`Self::add_signed_peer`]. 
- pub(crate) fn add_direct_peer( - &self, - clock: &time::Clock, - peer_info: PeerInfo, - ) -> anyhow::Result<()> { + pub fn add_direct_peer(&self, clock: &time::Clock, peer_info: PeerInfo) -> anyhow::Result<()> { self.0.lock().add_peer(clock, peer_info, TrustLevel::Direct) } - pub fn unban(&self, clock: &time::Clock) { - let mut inner = self.0.lock(); - let now = clock.now_utc(); - let mut to_unban = vec![]; - for (peer_id, peer_state) in &inner.peer_states { - if let KnownPeerStatus::Banned(_, ban_time) = peer_state.status { - if now < ban_time + inner.config.ban_window { - continue; - } - tracing::info!(target: "network", unbanned = ?peer_id, ?ban_time, "unbanning a peer"); - to_unban.push(peer_id.clone()); - } - } - for peer_id in &to_unban { - if let Err(err) = inner.peer_unban(&peer_id) { - tracing::error!(target: "network", ?err, "Failed to unban a peer"); - } - } - } - pub fn load(&self) -> HashMap { self.0.lock().peer_states.clone() } diff --git a/chain/network/src/peer_manager/testonly.rs b/chain/network/src/peer_manager/testonly.rs index 801b0e2e48f..2e686660170 100644 --- a/chain/network/src/peer_manager/testonly.rs +++ b/chain/network/src/peer_manager/testonly.rs @@ -15,6 +15,7 @@ use crate::testonly::fake_client; use crate::time; use crate::types::{ AccountKeys, ChainInfo, KnownPeerStatus, NetworkRequests, PeerManagerMessageRequest, + ReasonForBan, }; use crate::PeerManagerActor; use near_o11y::WithSpanContextExt; @@ -151,7 +152,7 @@ impl ActorHandler { &self, peer_info: &PeerInfo, tier: tcp::Tier, - ) -> impl 'static + Send + Future { + ) -> impl 'static + Send + Future { let addr = self.actix.addr.clone(); let events = self.events.clone(); let peer_info = peer_info.clone(); @@ -165,7 +166,7 @@ impl ActorHandler { Event::PeerManager(PME::HandshakeCompleted(ev)) if ev.stream_id == stream_id => { - Some(()) + Some(stream_id) } Event::PeerManager(PME::ConnectionClosed(ev)) if ev.stream_id == stream_id => { panic!("PeerManager rejected the handshake") @@ 
-324,6 +325,25 @@ impl ActorHandler { self.with_state(move |s| async move { s.tier1_advertise_proxies(&clock).await }).await } + pub async fn disconnect_and_ban( + &self, + clock: &time::Clock, + peer_id: &PeerId, + reason: ReasonForBan, + ) { + // TODO(gprusak): make it wait asynchronously for the connection to get closed. + // TODO(gprusak): figure out how to await for both ends to disconnect. + let clock = clock.clone(); + let peer_id = peer_id.clone(); + self.with_state(move |s| async move { s.disconnect_and_ban(&clock, &peer_id, reason) }) + .await + } + + pub async fn peer_store_update(&self, clock: &time::Clock) { + let clock = clock.clone(); + self.with_state(move |s| async move { s.peer_store.update(&clock) }).await; + } + pub async fn send_ping(&self, nonce: u64, target: PeerId) { self.actix .addr @@ -452,7 +472,7 @@ pub(crate) async fn start( cfg: config::NetworkConfig, chain: Arc, ) -> ActorHandler { - let (send, recv) = broadcast::unbounded_channel(); + let (send, mut recv) = broadcast::unbounded_channel(); let actix = ActixSystem::spawn({ let mut cfg = cfg.clone(); let chain = chain.clone(); @@ -464,9 +484,13 @@ pub(crate) async fn start( } }) .await; - let mut h = ActorHandler { cfg, actix, events: recv }; + let h = ActorHandler { cfg, actix, events: recv.clone() }; // Wait for the server to start. 
- assert_eq!(Event::PeerManager(PME::ServerStarted), h.events.recv().await); + recv.recv_until(|ev| match ev { + Event::PeerManager(PME::ServerStarted) => Some(()), + _ => None, + }) + .await; h.set_chain_info(chain.get_chain_info()).await; h } diff --git a/chain/network/src/peer_manager/tests/routing.rs b/chain/network/src/peer_manager/tests/routing.rs index fa87b27c9f8..2c16145dd68 100644 --- a/chain/network/src/peer_manager/tests/routing.rs +++ b/chain/network/src/peer_manager/tests/routing.rs @@ -14,8 +14,8 @@ use crate::store; use crate::tcp; use crate::testonly::{make_rng, Rng}; use crate::time; -use crate::types::PeerInfo; use crate::types::PeerMessage; +use crate::types::{PeerInfo, ReasonForBan}; use near_o11y::testonly::init_test_logger; use near_primitives::network::PeerId; use near_store::db::TestDB; @@ -600,7 +600,10 @@ async fn test_dropping_duplicate_messages() { wait_for_pong(&mut pm0_ev, Pong { nonce: 1, source: id2.clone() }).await; } -// Awaits until a ConnectionClosed event with the expected reason is seen in the event stream. +/// Awaits until a ConnectionClosed event with the expected reason is seen in the event stream. +/// This helper function should be used in tests with peer manager instances with +/// `config.outbound_enabled = true`, because it makes the order of spawning connections +/// non-deterministic, so we cannot just wait for the first ConnectionClosed event. 
pub(crate) async fn wait_for_connection_closed( events: &mut broadcast::Receiver, want_reason: ClosingReason, @@ -1268,11 +1271,11 @@ async fn archival_node() { tracing::info!(target:"test", "connect node 4 to node 0 and wait for pm0 to close a connection"); pm4.send_outbound_connect(&pm0.peer_info(), tcp::Tier::T2).await; - wait_for_connection_closed(&mut pm0_ev, ClosingReason::PeerManager).await; + wait_for_connection_closed(&mut pm0_ev, ClosingReason::PeerManagerRequest).await; tracing::info!(target:"test", "connect node 1 to node 0 and wait for pm0 to close a connection"); pm1.send_outbound_connect(&pm0.peer_info(), tcp::Tier::T2).await; - wait_for_connection_closed(&mut pm0_ev, ClosingReason::PeerManager).await; + wait_for_connection_closed(&mut pm0_ev, ClosingReason::PeerManagerRequest).await; tracing::info!(target:"test", "check that node 0 and node 1 are still connected"); pm0.wait_for_direct_connection(id1.clone()).await; @@ -1294,9 +1297,69 @@ async fn archival_node() { tracing::info!(target:"test", "[{_step}] connect the chosen node to node 0 and wait for pm0 to close a connection"); chosen.send_outbound_connect(&pm0.peer_info(), tcp::Tier::T2).await; - wait_for_connection_closed(&mut pm0_ev, ClosingReason::PeerManager).await; + wait_for_connection_closed(&mut pm0_ev, ClosingReason::PeerManagerRequest).await; tracing::info!(target:"test", "[{_step}] check that node 0 and node 1 are still connected"); pm0.wait_for_direct_connection(id1.clone()).await; } } + +/// Awaits the ConnectionClosed event for the given `stream_id`. +async fn wait_for_stream_closed( + events: &mut broadcast::Receiver, + stream_id: tcp::StreamId, +) -> ClosingReason { + events + .recv_until(|ev| match ev { + Event::PeerManager(PME::ConnectionClosed(ev)) if ev.stream_id == stream_id => { + Some(ev.reason) + } + _ => None, + }) + .await +} + +/// Check that two peers are able to connect again after one peer is banned and unbanned. 
+#[tokio::test] +async fn connect_to_unbanned_peer() { + init_test_logger(); + let mut rng = make_rng(921853233); + let rng = &mut rng; + let mut clock = time::FakeClock::default(); + let chain = Arc::new(data::Chain::make(&mut clock, rng, 10)); + + let mut pm0 = + start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await; + let mut pm1 = + start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await; + + tracing::info!(target:"test", "pm0 connects to pm1"); + let stream_id = pm0.connect_to(&pm1.peer_info(), tcp::Tier::T2).await; + + tracing::info!(target:"test", "pm1 bans pm0"); + let ban_reason = ReasonForBan::BadBlock; + pm1.disconnect_and_ban(&clock.clock(), &pm0.cfg.node_id(), ban_reason).await; + wait_for_stream_closed(&mut pm0.events, stream_id).await; + assert_eq!( + ClosingReason::Ban(ban_reason), + wait_for_stream_closed(&mut pm1.events, stream_id).await + ); + + tracing::info!(target:"test", "pm0 fails to reconnect to pm1"); + let got_reason = pm1 + .start_inbound(chain.clone(), pm0.cfg.clone()) + .await + .manager_fail_handshake(&clock.clock()) + .await; + assert_eq!(ClosingReason::RejectedByPeerManager(RegisterPeerError::Banned), got_reason); + + tracing::info!(target:"test", "pm1 unbans pm0"); + clock.advance(pm1.cfg.peer_store.ban_window); + pm1.peer_store_update(&clock.clock()).await; + + tracing::info!(target:"test", "pm0 reconnects to pm1"); + pm0.connect_to(&pm1.peer_info(), tcp::Tier::T2).await; + + drop(pm0); + drop(pm1); +} diff --git a/integration-tests/src/tests/network/ban_peers.rs b/integration-tests/src/tests/network/ban_peers.rs index de9f3cfe773..ebe371bb0c2 100644 --- a/integration-tests/src/tests/network/ban_peers.rs +++ b/integration-tests/src/tests/network/ban_peers.rs @@ -22,30 +22,3 @@ fn dont_connect_to_banned_peer() -> anyhow::Result<()> { start_test(runner) } - -/// Check two peers are able to connect again after one peers is banned and unbanned. 
-#[test] -fn connect_to_unbanned_peer() -> anyhow::Result<()> { - let mut runner = Runner::new(2, 2) - .enable_outbound() - .use_boot_nodes(vec![0, 1]) - .ban_window(time::Duration::seconds(2)); - - // Check both peers are connected - runner.push(Action::CheckRoutingTable(0, vec![(1, vec![1])])); - runner.push(Action::CheckRoutingTable(1, vec![(0, vec![0])])); - - // Ban peer 1 - runner.push_action(ban_peer(0, 1)); - - runner.push(Action::Wait(time::Duration::milliseconds(1000))); - // During two seconds peer is banned so no connection is possible. - runner.push(Action::CheckRoutingTable(0, vec![])); - runner.push(Action::CheckRoutingTable(1, vec![])); - - // After two seconds peer is unbanned and they should be able to connect again. - runner.push(Action::CheckRoutingTable(0, vec![(1, vec![1])])); - runner.push(Action::CheckRoutingTable(1, vec![(0, vec![0])])); - - start_test(runner) -}