diff --git a/node/src/events.rs b/node/src/events.rs index 2d7dcae3..17c458e2 100644 --- a/node/src/events.rs +++ b/node/src/events.rs @@ -167,6 +167,9 @@ pub struct NodeEventInfo { #[serde(tag = "type")] #[serde(rename_all = "snake_case")] pub enum NodeEvent { + /// Node is connecting to bootnodes + ConnectingToBootnodes, + /// Peer just connected PeerConnected { #[serde(serialize_with = "serialize_as_string")] @@ -295,7 +298,8 @@ impl NodeEvent { | NodeEvent::FatalSyncerError { .. } | NodeEvent::FetchingHeadersFailed { .. } | NodeEvent::NetworkCompromised => true, - NodeEvent::PeerConnected { .. } + NodeEvent::ConnectingToBootnodes + | NodeEvent::PeerConnected { .. } | NodeEvent::PeerDisconnected { .. } | NodeEvent::SamplingStarted { .. } | NodeEvent::ShareSamplingResult { .. } @@ -312,6 +316,9 @@ impl NodeEvent { impl fmt::Display for NodeEvent { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { + NodeEvent::ConnectingToBootnodes => { + write!(f, "Connecting to bootnodes") + } NodeEvent::PeerConnected { id, trusted } => { if *trusted { write!(f, "Trusted peer connected: {id}") diff --git a/node/src/p2p.rs b/node/src/p2p.rs index 22b46a34..fc9d718d 100644 --- a/node/src/p2p.rs +++ b/node/src/p2p.rs @@ -39,7 +39,10 @@ use libp2p::{ kad, multiaddr::Protocol, ping, - swarm::{ConnectionId, NetworkBehaviour, NetworkInfo, Swarm, SwarmEvent}, + swarm::{ + dial_opts::{DialOpts, PeerCondition}, + ConnectionId, NetworkBehaviour, NetworkInfo, Swarm, SwarmEvent, + }, Multiaddr, PeerId, }; use smallvec::SmallVec; @@ -54,7 +57,7 @@ pub(crate) mod shwap; mod swarm; use crate::block_ranges::BlockRange; -use crate::events::EventPublisher; +use crate::events::{EventPublisher, NodeEvent}; use crate::executor::{self, spawn, Interval}; use crate::p2p::header_ex::{HeaderExBehaviour, HeaderExConfig}; use crate::p2p::header_session::HeaderSession; @@ -580,6 +583,8 @@ where bitswap_queries: HashMap, P2pError>>, network_compromised_token: CancellationToken, store: Arc, + event_pub: EventPublisher, + bootnodes: HashMap>, } struct HeaderSubState { @@ -639,15 +644,20 @@ where } } + let mut bootnodes = HashMap::<_, Vec<_>>::new(); + for addr in args.bootnodes { - // Bootstrap peers are always trusted - if let Some(peer_id) = addr.peer_id() { - peer_tracker.set_trusted(peer_id, true); - } + let peer_id = addr.peer_id().expect("multiaddr already validated"); + bootnodes.entry(peer_id).or_default().push(addr); + } - if let Err(e) = swarm.dial(addr.clone()) { - error!("Failed to dial on {addr}: {e}"); - } + for (peer_id, addrs) in bootnodes.iter_mut() { + addrs.sort(); + addrs.dedup(); + addrs.shrink_to_fit(); + + // Bootstrap peers are always trusted + peer_tracker.set_trusted(*peer_id, true); } Ok(Worker { @@ -660,6 +670,8 @@ where bitswap_queries: HashMap::new(), network_compromised_token: CancellationToken::new(), store: args.store, + event_pub: args.event_pub, + bootnodes, }) } @@ -667,6 +679,8 @@ where let mut report_interval = Interval::new(Duration::from_secs(60)).await; let mut kademlia_interval = Interval::new(Duration::from_secs(30)).await; + self.dial_bootnodes(); + // Initiate discovery let _ = self.swarm.behaviour_mut().kademlia.bootstrap(); @@ -699,6 +713,26 @@ where } } + fn dial_bootnodes(&mut self) { + self.event_pub.send(NodeEvent::ConnectingToBootnodes); + + for (peer_id, addrs) in &self.bootnodes { + let dial_opts = DialOpts::peer_id(*peer_id) + .addresses(addrs.clone()) + // Without this set, `kademlia::Behaviour` won't be able to canonicalize + // `/tls/ws` to `/wss`. + .extend_addresses_through_behaviour() + // Tell Swarm not to dial if peer is already connected or there + // is an ongoing dialing. + .condition(PeerCondition::DisconnectedAndNotDialing) + .build(); + + if let Err(e) = self.swarm.dial(dial_opts) { + error!("Failed to dial on {addrs:?}: {e}"); + } + } + } + fn prune_canceled_bitswap_queries(&mut self) { let mut cancelled = SmallVec::<[_; 16]>::new(); diff --git a/node/src/syncer.rs b/node/src/syncer.rs index 2262235e..449abe68 100644 --- a/node/src/syncer.rs +++ b/node/src/syncer.rs @@ -379,9 +379,11 @@ where fn spawn_try_init(&self) -> oneshot::Receiver> { let p2p = self.p2p.clone(); let store = self.store.clone(); + let event_pub = self.event_pub.clone(); let (tx, rx) = oneshot::channel(); let fut = async move { + let mut event_reported = false; let now = Instant::now(); let mut backoff = ExponentialBackoffBuilder::default() .with_max_interval(TRY_INIT_BACKOFF_MAX_INTERVAL) @@ -389,7 +391,7 @@ where .build(); loop { - match try_init(&p2p, &*store).await { + match try_init(&p2p, &*store, &event_pub, &mut event_reported).await { Ok(network_head) => { tx.maybe_send(Ok((network_head, now.elapsed()))); break; @@ -410,8 +412,6 @@ where } }; - self.event_pub.send(NodeEvent::FetchingHeadHeaderStarted); - spawn_cancellable( self.cancellation_token.child_token(), fut.instrument(info_span!("try_init")), @@ -603,12 +603,22 @@ fn in_syncing_window(header: &ExtendedHeader) -> bool { header.time().after(syncing_window_start) } -async fn try_init(p2p: &P2p, store: &S) -> Result +async fn try_init( + p2p: &P2p, + store: &S, + event_pub: &EventPublisher, + event_reported: &mut bool, +) -> Result where S: Store, { p2p.wait_connected_trusted().await?; + if !*event_reported { + event_pub.send(NodeEvent::FetchingHeadHeaderStarted); + *event_reported = true; + } + let network_head = p2p.get_head_header().await?; // If the network head and the store head have the same height,