diff --git a/Cargo.lock b/Cargo.lock index 842b3450..54053a0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -581,6 +581,7 @@ dependencies = [ "prost", "rand", "serde_json", + "smallvec", "tendermint-proto", "thiserror", "tokio", @@ -3657,9 +3658,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" [[package]] name = "smol_str" diff --git a/node/Cargo.toml b/node/Cargo.toml index 8126ae6a..c822fbed 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -22,6 +22,7 @@ libp2p = { version = "0.52.3", features = [ "kad", ] } prost = "0.12.0" +smallvec = { version = "1.11.1", features = ["union", "const_generics"] } thiserror = "1.0.48" tokio = { version = "1.32.0", features = ["macros", "sync"] } tracing = "0.1.37" diff --git a/node/src/exchange/client.rs b/node/src/exchange/client.rs index 6f11f6d5..a61697d7 100644 --- a/node/src/exchange/client.rs +++ b/node/src/exchange/client.rs @@ -295,6 +295,7 @@ mod tests { use celestia_types::consts::HASH_SIZE; use celestia_types::hash::Hash; use celestia_types::test_utils::{invalidate, unverify, ExtendedHeaderGenerator}; + use libp2p::swarm::ConnectionId; use std::collections::VecDeque; use std::sync::atomic::{AtomicU64, Ordering}; @@ -1053,8 +1054,10 @@ mod tests { fn peer_tracker_with_n_peers(amount: usize) -> Arc { let peers = Arc::new(PeerTracker::new()); - for _ in 0..amount { - peers.connected(PeerId::random(), None); + for i in 0..amount { + let peer = PeerId::random(); + peers.set_trusted(peer); + peers.set_connected(peer, ConnectionId::new_unchecked(i), None); } peers diff --git a/node/src/p2p.rs b/node/src/p2p.rs index 6135bba3..95671609 100644 --- a/node/src/p2p.rs +++ b/node/src/p2p.rs @@ -17,8 +17,8 @@ use libp2p::{ multiaddr::Protocol, ping, swarm::{ - keep_alive, DialError, NetworkBehaviour, NetworkInfo, Swarm, SwarmBuilder, SwarmEvent, - THandlerErr, + keep_alive, ConnectionId, DialError, NetworkBehaviour, NetworkInfo, Swarm, SwarmBuilder, + SwarmEvent, THandlerErr, }, Multiaddr, PeerId, TransportError, }; @@ -29,6 +29,7 @@ use tracing::{debug, info, instrument, trace, warn}; use crate::exchange::{ExchangeBehaviour, ExchangeConfig}; use crate::executor::{spawn, Executor}; use crate::peer_tracker::PeerTracker; +use crate::peer_tracker::PeerTrackerInfo; use crate::store::Store; use crate::utils::{ celestia_protocol_id, gossipsub_ident_topic, MultiaddrExt, OneshotResultSender, @@ -77,6 +78,7 @@ impl From for P2pError { pub struct P2p { cmd_tx: mpsc::Sender, header_sub_watcher: watch::Receiver>, + peer_tracker_info_watcher: watch::Receiver, _store: PhantomData, } @@ -99,9 +101,6 @@ pub enum P2pCmd { request: HeaderRequest, respond_to: OneshotResultSender, P2pError>, }, - WaitConnected { - respond_to: oneshot::Sender<()>, - }, Listeners { respond_to: oneshot::Sender>, }, @@ -122,7 +121,11 @@ where async fn start(args: P2pArgs) -> Result { let (cmd_tx, cmd_rx) = mpsc::channel(16); let (header_sub_tx, header_sub_rx) = watch::channel(None); - let mut worker = Worker::new(args, cmd_rx, header_sub_tx)?; + + let peer_tracker = Arc::new(PeerTracker::new()); + let peer_tracker_info_watcher = peer_tracker.info_watcher(); + + let mut worker = Worker::new(args, cmd_rx, header_sub_tx, peer_tracker)?; spawn(async move { worker.run().await; @@ -131,6 +134,7 @@ where Ok(P2p { cmd_tx, header_sub_watcher: header_sub_rx, + peer_tracker_info_watcher, _store: PhantomData, }) } @@ -155,14 +159,22 @@ pub trait P2pService: type Store: Store; fn new_header_sub_watcher(&self) -> watch::Receiver>; + fn peer_tracker_info_watcher(&self) -> watch::Receiver; async fn wait_connected(&self) -> Result<()> { - let (tx, rx) = oneshot::channel(); - - self.send_command(P2pCmd::WaitConnected { respond_to: tx }) - .await?; + self.peer_tracker_info_watcher() + .wait_for(|info| info.num_connected_peers > 0) + .await + .map(drop) + .map_err(|_| P2pError::WorkerDied) + } - Ok(rx.await?) + async fn wait_connected_trusted(&self) -> Result<()> { + self.peer_tracker_info_watcher() + .wait_for(|info| info.num_connected_trusted_peers > 0) + .await + .map(drop) + .map_err(|_| P2pError::WorkerDied) } async fn network_info(&self) -> Result { @@ -263,6 +275,10 @@ where fn new_header_sub_watcher(&self) -> watch::Receiver> { self.header_sub_watcher.clone() } + + fn peer_tracker_info_watcher(&self) -> watch::Receiver { + self.peer_tracker_info_watcher.clone() + } } /// Our network behaviour. @@ -288,7 +304,6 @@ where header_sub_topic_hash: TopicHash, cmd_rx: mpsc::Receiver, peer_tracker: Arc, - wait_connected_tx: Option>>, header_sub_watcher: watch::Sender>, store: Arc, } @@ -301,8 +316,8 @@ where args: P2pArgs, cmd_rx: mpsc::Receiver, header_sub_watcher: watch::Sender>, + peer_tracker: Arc, ) -> Result { - let peer_tracker = Arc::new(PeerTracker::new()); let local_peer_id = PeerId::from(args.local_keypair.public()); let autonat = autonat::Behaviour::new(local_peer_id, autonat::Config::default()); @@ -343,6 +358,10 @@ where } for addr in args.bootstrap_peers { + // Bootstrap peers are always trusted + if let Some(peer_id) = addr.peer_id() { + peer_tracker.set_trusted(peer_id); + } swarm.dial(addr)?; } @@ -351,7 +370,6 @@ where swarm, header_sub_topic_hash: header_sub_topic.hash(), peer_tracker, - wait_connected_tx: None, header_sub_watcher, store: args.store.clone(), }) @@ -389,12 +407,19 @@ where | BehaviourEvent::HeaderEx(_) => {} }, SwarmEvent::ConnectionEstablished { - peer_id, endpoint, .. + peer_id, + connection_id, + endpoint, + .. } => { - self.on_peer_connected(peer_id, endpoint); + self.on_peer_connected(peer_id, connection_id, endpoint); } - SwarmEvent::ConnectionClosed { peer_id, .. } => { - self.on_peer_disconnected(peer_id); + SwarmEvent::ConnectionClosed { + peer_id, + connection_id, + .. + } => { + self.on_peer_disconnected(peer_id, connection_id); } _ => {} } @@ -416,9 +441,6 @@ where .header_ex .send_request(request, respond_to); } - P2pCmd::WaitConnected { respond_to } => { - self.on_wait_connected(respond_to); - } P2pCmd::Listeners { respond_to } => { let local_peer_id = self.swarm.local_peer_id().to_owned(); let listeners = self @@ -450,7 +472,7 @@ where let kademlia = &mut self.swarm.behaviour_mut().kademlia; // Inform peer tracker - self.peer_tracker.identified(peer_id, &info); + self.peer_tracker.set_identified(peer_id, &info); // Inform Kademlia for addr in info.listen_addrs { @@ -522,7 +544,7 @@ where #[instrument(skip_all, fields(peer_id = %peer_id))] fn peer_maybe_discovered(&mut self, peer_id: PeerId) { - if !self.peer_tracker.maybe_discovered(peer_id) { + if !self.peer_tracker.set_maybe_discovered(peer_id) { return; } @@ -536,7 +558,12 @@ where } #[instrument(skip_all, fields(peer_id = %peer_id))] - fn on_peer_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { + fn on_peer_connected( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + endpoint: ConnectedPoint, + ) { info!("Peer connected"); // Inform PeerTracker about the dialed address. @@ -552,26 +579,17 @@ where _ => None, }; - self.peer_tracker.connected(peer_id, dialed_addr); - - for tx in self.wait_connected_tx.take().into_iter().flatten() { - tx.maybe_send(()); - } + self.peer_tracker + .set_connected(peer_id, connection_id, dialed_addr); } #[instrument(skip_all, fields(peer_id = %peer_id))] - fn on_peer_disconnected(&mut self, peer_id: PeerId) { - info!("Peer disconnected"); - self.peer_tracker.disconnected(peer_id); - } - - fn on_wait_connected(&mut self, respond_to: oneshot::Sender<()>) { - if self.peer_tracker.is_anyone_connected() { - respond_to.maybe_send(()); - } else { - self.wait_connected_tx - .get_or_insert_with(Vec::new) - .push(respond_to); + fn on_peer_disconnected(&mut self, peer_id: PeerId, connection_id: ConnectionId) { + if self + .peer_tracker + .set_maybe_disconnected(peer_id, connection_id) + { + info!("Peer disconnected"); } } diff --git a/node/src/peer_tracker.rs b/node/src/peer_tracker.rs index 850c879c..8414a631 100644 --- a/node/src/peer_tracker.rs +++ b/node/src/peer_tracker.rs @@ -3,17 +3,31 @@ use std::borrow::Borrow; use dashmap::mapref::entry::Entry; use dashmap::mapref::one::RefMut; use dashmap::DashMap; -use libp2p::{identify, Multiaddr, PeerId}; +use libp2p::{identify, swarm::ConnectionId, Multiaddr, PeerId}; +use smallvec::SmallVec; +use tokio::sync::watch; +/// Keeps track various information about peers. #[derive(Debug)] pub struct PeerTracker { peers: DashMap, + info_tx: watch::Sender, +} + +#[derive(Debug, Clone, Default)] +pub struct PeerTrackerInfo { + /// Number of the connected peers. + pub num_connected_peers: u64, + /// Number of the connected trusted peers. + pub num_connected_trusted_peers: u64, } #[derive(Debug)] struct PeerInfo { - addrs: Vec, state: PeerState, + addrs: SmallVec<[Multiaddr; 4]>, + connections: SmallVec<[ConnectionId; 1]>, + trusted: bool, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -31,18 +45,35 @@ impl PeerInfo { } impl PeerTracker { + /// Constructs an empty PeerTracker. pub fn new() -> Self { PeerTracker { peers: DashMap::new(), + info_tx: watch::channel(PeerTrackerInfo::default()).0, } } - pub fn maybe_discovered(&self, peer: PeerId) -> bool { + /// Returns the current [`PeerTrackerInfo`]. + pub fn info(&self) -> PeerTrackerInfo { + self.info_tx.borrow().to_owned() + } + + /// Returns a watcher for any [`PeerTrackerInfo`] changes. + pub fn info_watcher(&self) -> watch::Receiver { + self.info_tx.subscribe() + } + + /// Sets peer as discovered if this is it's first appearance. + /// + /// Returns `true` if peer was not known from before. + pub fn set_maybe_discovered(&self, peer: PeerId) -> bool { match self.peers.entry(peer) { Entry::Vacant(entry) => { entry.insert(PeerInfo { - addrs: Vec::new(), state: PeerState::Discovered, + addrs: SmallVec::new(), + connections: SmallVec::new(), + trusted: false, }); true } @@ -55,11 +86,14 @@ impl PeerTracker { /// If peer is not found it is added as `PeerState::Discovered`. fn get(&self, peer: PeerId) -> RefMut { self.peers.entry(peer).or_insert_with(|| PeerInfo { - addrs: Vec::new(), state: PeerState::Discovered, + addrs: SmallVec::new(), + connections: SmallVec::new(), + trusted: false, }) } + /// Add an address for a peer. pub fn add_addresses(&self, peer: PeerId, addrs: I) where I: IntoIterator, @@ -81,56 +115,87 @@ impl PeerTracker { } } - pub fn connected(&self, peer: PeerId, address: impl Into>) { - let mut state = self.get(peer); + /// Sets peer as trusted. + pub fn set_trusted(&self, peer: PeerId) { + self.get(peer).value_mut().trusted = true; + } + + /// Sets peer as connected. + pub fn set_connected( + &self, + peer: PeerId, + connection_id: ConnectionId, + address: impl Into>, + ) { + let mut peer_info = self.get(peer); if let Some(address) = address.into() { - if !state.addrs.contains(&address) { - state.addrs.push(address); + if !peer_info.addrs.contains(&address) { + peer_info.addrs.push(address); } } - state.state = PeerState::Connected; + peer_info.connections.push(connection_id); + + // If peer was not already connected from before + if !peer_info.is_connected() { + peer_info.state = PeerState::Connected; + increment_connected_peers(&self.info_tx, peer_info.trusted); + } } - pub fn disconnected(&self, peer: PeerId) { - let mut state = self.get(peer); + /// Sets peer as disconnected if `connection_id` was the last connection. + /// + /// Returns `true` if was set to disconnected. + pub fn set_maybe_disconnected(&self, peer: PeerId, connection_id: ConnectionId) -> bool { + let mut peer_info = self.get(peer); + + peer_info.connections.retain(|id| *id != connection_id); - if state.addrs.is_empty() { - state.state = PeerState::Discovered; + // If this is the last connection from the peer + if peer_info.connections.is_empty() { + if peer_info.addrs.is_empty() { + peer_info.state = PeerState::Discovered; + } else { + peer_info.state = PeerState::AddressesFound; + } + + decrement_connected_peers(&self.info_tx, peer_info.trusted); + true } else { - state.state = PeerState::AddressesFound; + false } } - pub fn identified(&self, peer: PeerId, info: &identify::Info) { - let mut state = self.get(peer); + /// Sets peer as identified. + pub fn set_identified(&self, peer: PeerId, info: &identify::Info) { + let mut peer_info = self.get(peer); for addr in &info.listen_addrs { - if !state.addrs.contains(addr) { - state.addrs.push(addr.to_owned()); + if !peer_info.addrs.contains(addr) { + peer_info.addrs.push(addr.to_owned()); } } - state.state = PeerState::Identified - } - - pub fn is_anyone_connected(&self) -> bool { - self.peers.iter().any(|pair| pair.value().is_connected()) + peer_info.state = PeerState::Identified; } + /// Returns true if peer is connected. pub fn is_connected(&self, peer: PeerId) -> bool { self.get(peer).is_connected() } - pub fn addresses(&self, peer: PeerId) -> Vec { + /// Returns the addresses of the peer. + pub fn addresses(&self, peer: PeerId) -> SmallVec<[Multiaddr; 4]> { self.get(peer).addrs.clone() } + /// Removes a peer. pub fn remove(&self, peer: PeerId) { self.peers.remove(&peer); } + /// Returns connected peers. pub fn connected_peers(&self) -> Vec { self.peers .iter() @@ -139,6 +204,7 @@ impl PeerTracker { .collect() } + /// Returns one of the best peers. pub fn best_peer(&self) -> Option { // TODO: Implement peer score and return the best. self.peers @@ -147,6 +213,7 @@ impl PeerTracker { .map(|pair| pair.key().to_owned()) } + /// Returns up to N amount of best peers. pub fn best_n_peers(&self, limit: usize) -> Vec { // TODO: Implement peer score and return the best N peers. self.peers @@ -157,6 +224,17 @@ impl PeerTracker { // collect instead of returning an iter to not block the dashmap .collect() } + + /// Returns up to N amount of trusted peers. + pub fn trusted_n_peers(&self, limit: usize) -> Vec { + self.peers + .iter() + .filter(|pair| pair.value().is_connected() && pair.value().trusted) + .take(limit) + .map(|pair| pair.key().to_owned()) + // collect instead of returning an iter to not block the dashmap + .collect() + } } impl Default for PeerTracker { @@ -164,3 +242,23 @@ impl Default for PeerTracker { PeerTracker::new() } } + +fn increment_connected_peers(info_tx: &watch::Sender, trusted: bool) { + info_tx.send_modify(|tracker_info| { + tracker_info.num_connected_peers += 1; + + if trusted { + tracker_info.num_connected_trusted_peers += 1; + } + }); +} + +fn decrement_connected_peers(info_tx: &watch::Sender, trusted: bool) { + info_tx.send_modify(|tracker_info| { + tracker_info.num_connected_peers -= 1; + + if trusted { + tracker_info.num_connected_trusted_peers -= 1; + } + }); +}