From d257c23a123b96a9c2ac32edd8d7ce9e8e21e9fd Mon Sep 17 00:00:00 2001 From: Vid Kersic Date: Wed, 3 Apr 2024 09:51:24 +0200 Subject: [PATCH] chore: p2p peer management and discovery mechanism --- Cargo.lock | 2 + crates/grpc/src/uopool.rs | 18 +- crates/p2p/Cargo.toml | 2 + crates/p2p/src/discovery/enr_ext.rs | 33 ++- crates/p2p/src/discovery/mod.rs | 160 +++++++++++---- crates/p2p/src/peer_manager/mod.rs | 191 +++++++++++++++++- .../p2p/src/peer_manager/network_behaviour.rs | 141 ++++++++++--- crates/p2p/src/peer_manager/peer/peer_info.rs | 116 ++++++++++- crates/p2p/src/peer_manager/peerdb.rs | 131 ++++++++++-- crates/p2p/src/rpc/methods.rs | 2 +- crates/p2p/src/service/behaviour.rs | 20 +- crates/p2p/src/service/mod.rs | 175 ++++++++++------ crates/p2p/src/types/globals.rs | 4 + crates/p2p/tests/common.rs | 22 +- crates/p2p/tests/rpc.rs | 35 +--- crates/primitives/src/constants.rs | 12 +- docs/P2P.md | 8 +- 17 files changed, 834 insertions(+), 238 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7689222e..5379443a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7065,6 +7065,7 @@ dependencies = [ "lazy_static", "libp2p", "libp2p-mplex", + "lru 0.12.2", "parking_lot 0.12.1", "sha2 0.10.8", "silius-primitives", @@ -7074,6 +7075,7 @@ dependencies = [ "tempdir", "test-log", "thiserror", + "tiny-keccak", "tokio", "tokio-util", "tracing", diff --git a/crates/grpc/src/uopool.rs b/crates/grpc/src/uopool.rs index 051d10e0..a3d7c87f 100644 --- a/crates/grpc/src/uopool.rs +++ b/crates/grpc/src/uopool.rs @@ -471,23 +471,13 @@ where mempool_channels.push((ep, waiting_to_pub_rv, p2p_userop_sd)) } - let listen_addrs = config.listen_addr.to_multi_addr(); - let mut p2p_network = - Network::new(config.clone(), mempool_channels).expect("p2p network init failed"); - - for listen_addr in listen_addrs.into_iter() { - info!("P2P node listened on {}", listen_addr); - p2p_network.listen_on(listen_addr).expect("Listen on p2p network failed"); - } - if config.bootnodes.is_empty() { - info!("Start p2p mode without bootnodes"); + info!("Starting p2p mode without bootnodes"); } - for enr in config.bootnodes.into_iter() { - info!("Trying to dial p2p node {enr:}"); - p2p_network.dial(enr).expect("Dial bootnode failed"); - } + let mut p2p_network = Network::new(config.clone(), mempool_channels) + .await + .expect("p2p network init failed"); tokio::spawn(async move { loop { diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml index d5ac0eb0..f9f66440 100644 --- a/crates/p2p/Cargo.toml +++ b/crates/p2p/Cargo.toml @@ -39,6 +39,7 @@ libp2p-mplex = { version = "0.41.0" } # cryptography sha2 = "0.10.8" +tiny-keccak = "2" # async async-trait = { workspace = true } @@ -54,6 +55,7 @@ tokio-util = { version = "0.7.10", features = ["codec"] } delay_map = "0.3.0" eyre = { workspace = true } lazy_static = { workspace = true } +lru = "0.12" snap = "1.1.1" thiserror = { workspace = true } tracing = { workspace = true } diff --git a/crates/p2p/src/discovery/enr_ext.rs b/crates/p2p/src/discovery/enr_ext.rs index 49ae9f98..a9d255e1 100644 --- a/crates/p2p/src/discovery/enr_ext.rs +++ b/crates/p2p/src/discovery/enr_ext.rs @@ -1,9 +1,10 @@ use discv5::{enr::CombinedPublicKey, Enr}; use libp2p::{ - identity::{ed25519, secp256k1, PublicKey}, + identity::{ed25519, secp256k1, KeyType, PublicKey}, multiaddr::Protocol, Multiaddr, PeerId, }; +use tiny_keccak::{Hasher, Keccak}; pub trait EnrExt { /// PeerId of the ENR @@ -58,3 +59,33 @@ impl CombinedPublicKeyExt for CombinedPublicKey { } } } + +pub fn peer_id_to_node_id(peer_id: &PeerId) -> Result { + let pk_bytes = &peer_id.to_bytes()[2..]; + + let public_key = PublicKey::try_decode_protobuf(pk_bytes) + .map_err(|e| format!(" Cannot parse libp2p public key public key from peer id: {e}"))?; + + match public_key.key_type() { + KeyType::Secp256k1 => { + let pk = public_key.clone().try_into_secp256k1().expect("right key type"); + let uncompressed_key_bytes = &pk.to_bytes_uncompressed()[1..]; + let mut output = [0_u8; 32]; + let mut hasher = Keccak::v256(); + hasher.update(uncompressed_key_bytes); + hasher.finalize(&mut output); + Ok(discv5::enr::NodeId::parse(&output).expect("Must be correct length")) + } + KeyType::Ed25519 => { + let pk = public_key.clone().try_into_ed25519().expect("right key type"); + let uncompressed_key_bytes = pk.to_bytes(); + let mut output = [0_u8; 32]; + let mut hasher = Keccak::v256(); + hasher.update(&uncompressed_key_bytes); + hasher.finalize(&mut output); + Ok(discv5::enr::NodeId::parse(&output).expect("Must be correct length")) + } + + _ => Err(format!("Unsupported public key from peer {peer_id}")), + } +} diff --git a/crates/p2p/src/discovery/mod.rs b/crates/p2p/src/discovery/mod.rs index fd9c0a13..1f5e9dbb 100644 --- a/crates/p2p/src/discovery/mod.rs +++ b/crates/p2p/src/discovery/mod.rs @@ -1,19 +1,33 @@ pub mod enr; pub mod enr_ext; -use crate::config::Config; +use self::enr_ext::{peer_id_to_node_id, EnrExt}; +use crate::{config::Config, types::globals::NetworkGlobals}; use discv5::{ enr::{CombinedKey, NodeId}, - ConfigBuilder, Discv5, Enr, Event, + Discv5, Enr, Event, }; -use futures::{stream::FuturesUnordered, Future, FutureExt, StreamExt}; -use libp2p::swarm::{dummy::ConnectionHandler, NetworkBehaviour}; -use std::{collections::HashSet, pin::Pin, task::Poll}; +use futures::{stream::FuturesUnordered, Future, FutureExt, StreamExt, TryFutureExt}; +use libp2p::{ + core::Endpoint, + swarm::{ + dummy::ConnectionHandler, ConnectionDenied, ConnectionId, DialError, DialFailure, + FromSwarm, NetworkBehaviour, THandler, THandlerOutEvent, ToSwarm, + }, + Multiaddr, PeerId, +}; +use lru::LruCache; +use std::{num::NonZeroUsize, pin::Pin, sync::Arc, task::Poll}; use tokio::sync::mpsc; use tracing::{debug, warn}; type QueryResult = Result, discv5::QueryError>; +#[derive(Debug)] +pub struct DiscoveredPeers { + pub peers: Vec, +} + pub enum EventStream { /// Awaiting an event stream to be generated. This is required due to the poll nature of /// `Discovery` @@ -26,30 +40,51 @@ pub enum EventStream { pub struct Discovery { /// Core discv5 service. - pub discovery: Discv5, + discovery: Discv5, + + /// Network globals. + _network_globals: Arc, /// Active discovery queries. active_queries: FuturesUnordered + Send>>>, /// A cache of discovered ENRs. - cached_enrs: HashSet, + cached_enrs: LruCache, /// The event stream of discv5. event_stream: EventStream, } impl Discovery { - pub fn new(enr: Enr, key: CombinedKey, config: Config) -> eyre::Result { - let config = ConfigBuilder::new(config.listen_addr.to_listen_config()).build(); - let discovery: Discv5<_> = Discv5::new(enr, key, config).map_err(|e| eyre::anyhow!(e))?; + pub async fn new( + key: CombinedKey, + config: Config, + network_globals: Arc, + ) -> eyre::Result { + let enr = network_globals.local_enr(); + + let mut discovery: Discv5<_> = + Discv5::new(enr, key, config.discv5_config).map_err(|e| eyre::anyhow!(e))?; + + // adding bootnodes + for bootnode in config.bootnodes { + if bootnode.peer_id() == network_globals.peer_id() { + continue; + } - let event_stream_fut = discovery.event_stream().boxed(); + let _ = discovery.add_enr(bootnode); + } + + // start the discv5 service + discovery.start().map_err(|e| eyre::format_err!(e.to_string())).await?; + let event_stream = EventStream::Awaiting(Box::pin(discovery.event_stream())); Ok(Self { discovery, + _network_globals: network_globals, active_queries: Default::default(), - cached_enrs: HashSet::new(), - event_stream: EventStream::Awaiting(event_stream_fut), + cached_enrs: LruCache::new(NonZeroUsize::new(50).expect("50 is a valid value")), + event_stream, }) } @@ -58,9 +93,25 @@ impl Discovery { self.discovery.local_enr() } + /// Adds an ENR to the discovery. + pub fn add_enr(&mut self, enr: Enr) -> eyre::Result<()> { + self.discovery.add_enr(enr).map_err(|e| eyre::eyre!(e.to_string())) + } + + /// Return cached ENRs. + pub fn cached_enrs(&self) -> impl Iterator { + self.cached_enrs.iter().map(|(_, enr)| enr) + } + + /// Remove cached ENR. + pub fn remove_enr(&mut self, peer_id: &PeerId) { + self.cached_enrs.pop(peer_id); + } + /// Discovers peers on the network. pub fn discover_peers(&mut self, target_peers: usize) { debug!("Starting a peer discovery request target_peers {target_peers:}"); + // Generate a random target node id. let random_node = NodeId::random(); let predicate: Box bool + Send> = @@ -71,11 +122,28 @@ impl Discovery { self.active_queries.push(Box::pin(query_future)); } -} -#[derive(Debug)] -pub struct DiscoveredPeers { - pub peers: Vec, + pub fn disconnect_peer(&mut self, peer_id: &PeerId) { + if let Ok(node_id) = peer_id_to_node_id(peer_id) { + self.discovery.disconnect_node(&node_id); + } + self.cached_enrs.pop(peer_id); + } + + pub fn on_dial_failure(&mut self, peer_id: Option, error: &DialError) { + if let Some(peer_id) = peer_id { + match error { + DialError::LocalPeerId { .. } | + DialError::Denied { .. } | + DialError::NoAddresses | + DialError::Transport(_) | + DialError::WrongPeerId { .. } => { + self.disconnect_peer(&peer_id); + } + DialError::DialPeerConditionFalse(_) | DialError::Aborted => {} + } + } + } } impl NetworkBehaviour for Discovery { @@ -88,10 +156,11 @@ impl NetworkBehaviour for Discovery { ) -> Poll>> { while let Poll::Ready(Some(query_result)) = self.active_queries.poll_next_unpin(cx) { match query_result { - Ok(enrs) => { - for enr in enrs.into_iter() { - self.cached_enrs.insert(enr); + Ok(peers) => { + for enr in peers.iter() { + self.cached_enrs.put(enr.peer_id(), enr.clone()); } + return Poll::Ready(ToSwarm::GenerateEvent(DiscoveredPeers { peers })); } Err(e) => warn!("Discovery query failed: {:?}", e), } @@ -123,55 +192,60 @@ impl NetworkBehaviour for Discovery { } EventStream::InActive => {} }; + Poll::Pending } - fn on_swarm_event(&mut self, _event: libp2p::swarm::FromSwarm) {} + fn on_swarm_event(&mut self, event: FromSwarm) { + if let FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) = event { + self.on_dial_failure(peer_id, error) + } + } fn on_connection_handler_event( &mut self, - _peer_id: libp2p::PeerId, - _connection_id: libp2p::swarm::ConnectionId, - _event: libp2p::swarm::THandlerOutEvent, + _peer_id: PeerId, + _connection_id: ConnectionId, + _event: THandlerOutEvent, ) { } fn handle_established_inbound_connection( &mut self, - _connection_id: libp2p::swarm::ConnectionId, - _peer: libp2p::PeerId, - _local_addr: &libp2p::Multiaddr, - _remote_addr: &libp2p::Multiaddr, - ) -> Result, libp2p::swarm::ConnectionDenied> { + _connection_id: ConnectionId, + _peer: PeerId, + _local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { Ok(ConnectionHandler) } fn handle_established_outbound_connection( &mut self, - _connection_id: libp2p::swarm::ConnectionId, - _peer: libp2p::PeerId, - _addr: &libp2p::Multiaddr, - _role_override: libp2p::core::Endpoint, - ) -> Result, libp2p::swarm::ConnectionDenied> { + _connection_id: ConnectionId, + _peer: PeerId, + _addr: &Multiaddr, + _role_override: Endpoint, + ) -> Result, ConnectionDenied> { Ok(ConnectionHandler) } fn handle_pending_inbound_connection( &mut self, - _connection_id: libp2p::swarm::ConnectionId, - _local_addr: &libp2p::Multiaddr, - _remote_addr: &libp2p::Multiaddr, - ) -> Result<(), libp2p::swarm::ConnectionDenied> { + _connection_id: ConnectionId, + _local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> Result<(), ConnectionDenied> { Ok(()) } fn handle_pending_outbound_connection( &mut self, - _connection_id: libp2p::swarm::ConnectionId, - _maybe_peer: Option, - _addresses: &[libp2p::Multiaddr], - _effective_role: libp2p::core::Endpoint, - ) -> Result, libp2p::swarm::ConnectionDenied> { + _connection_id: ConnectionId, + _maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: Endpoint, + ) -> Result, ConnectionDenied> { Ok(Vec::new()) } } diff --git a/crates/p2p/src/peer_manager/mod.rs b/crates/p2p/src/peer_manager/mod.rs index c9e23da8..7bba9ca5 100644 --- a/crates/p2p/src/peer_manager/mod.rs +++ b/crates/p2p/src/peer_manager/mod.rs @@ -2,11 +2,20 @@ pub mod network_behaviour; pub mod peer; pub mod peerdb; -use crate::types::globals::NetworkGlobals; +use self::peer::peer_info::ConnectionDirection; +use crate::{ + discovery::enr_ext::EnrExt, + rpc::methods::{GoodbyeReason, MetaData}, + types::globals::NetworkGlobals, +}; use delay_map::HashSetDelay; -use libp2p::PeerId; -use silius_primitives::constants::p2p::PING_INTERVAL; +use discv5::Enr; +use libp2p::{Multiaddr, PeerId}; +use silius_primitives::constants::p2p::{ + HEARTBEAT_INTERVAL, PING_INTERVAL_INBOUND, PING_INTERVAL_OUTBOUND, TARGET_PEERS, +}; use std::{collections::VecDeque, sync::Arc, time::Duration}; +use tracing::debug; /// The events that the `PeerManager` outputs (requests). #[derive(Debug)] @@ -17,30 +26,190 @@ pub enum PeerManagerEvent { PeerConnectedOutgoing(PeerId), /// A peer has disconnected. PeerDisconnected(PeerId), - /// Sends a PING to a peer. + /// Sends a ping to a peer. Ping(PeerId), + /// Request metadata fro a peer. + MetaData(PeerId), /// Request the behaviour to discover more peers and the amount of peers to discover. DiscoverPeers(usize), + /// Discconnecting from peer. + DisconnectPeer(PeerId, GoodbyeReason), +} + +enum ConnectingType { + Dialing, + IngoingConnected { multiaddr: Multiaddr }, + OutgoingConnected { multiaddr: Multiaddr }, } pub struct PeerManager { /// Accessing `PeerDB` through network globals. network_globals: Arc, - /// A list of peers that we need to ping. - ping_peers: HashSetDelay, - /// the target peers we want to connect, - _target_peers: usize, /// Events that the `PeerManager` outputs. events: VecDeque, + /// List of inbound peers we need to ping. + inbound_ping_peers: HashSetDelay, + /// List of outbound peers we need to ping. + outbound_ping_peers: HashSetDelay, + /// the target peers we want to connect, + target_peers: usize, + /// Peers needs to be dialed. + peers_to_dial: Vec, + /// The heartbeat interval for peer management. + heartbeat: tokio::time::Interval, } impl PeerManager { - pub fn new(network_globals: Arc, target_peers: usize) -> Self { + pub fn new(network_globals: Arc) -> Self { Self { network_globals, - ping_peers: HashSetDelay::new(Duration::from_secs(PING_INTERVAL)), - _target_peers: target_peers, events: Default::default(), + inbound_ping_peers: HashSetDelay::new(Duration::from_secs(PING_INTERVAL_INBOUND)), + outbound_ping_peers: HashSetDelay::new(Duration::from_secs(PING_INTERVAL_OUTBOUND)), + target_peers: TARGET_PEERS, + peers_to_dial: Vec::new(), + heartbeat: tokio::time::interval(Duration::from_secs(HEARTBEAT_INTERVAL)), + } + } + + pub fn is_connected(&self, peer_id: &PeerId) -> bool { + self.network_globals.peers.read().is_connected(peer_id) + } + + pub fn dial_peer(&mut self, enr: Enr) -> bool { + if self.network_globals.peers.read().should_dial(&enr.peer_id()) { + self.peers_to_dial.push(enr); + true + } else { + false + } + } + + pub fn maintain_peer_count(&mut self, dialing_peers: usize) { + if self.network_globals.connected_or_dialing_peers() < + self.target_peers.saturating_sub(dialing_peers) && + dialing_peers != 0 + { + self.events.push_back(PeerManagerEvent::DiscoverPeers(dialing_peers)); + } + } + + pub fn peers_discovered(&mut self, results: Vec) { + let mut to_dial_peers = 0; + + for enr in results { + if !self.peers_to_dial.contains(&enr) { + let peer_id = enr.peer_id(); + if self.dial_peer(enr) { + debug! {"Dialing discovered peer: {}", peer_id}; + to_dial_peers += 1; + } + } } + + self.maintain_peer_count(to_dial_peers); + } + + /// Ping request has been received. + pub fn ping_request(&mut self, peer_id: &PeerId, seq: u64) { + if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) { + match peer_info.connection_direction() { + Some(ConnectionDirection::Incoming) => { + self.inbound_ping_peers.insert(*peer_id); + } + Some(ConnectionDirection::Outgoing) => { + self.outbound_ping_peers.insert(*peer_id); + } + None => {} + } + + if let Some(metadata) = &peer_info.metadata() { + if metadata.seq_number < seq { + self.events.push_back(PeerManagerEvent::MetaData(*peer_id)); + } + } else { + self.events.push_back(PeerManagerEvent::MetaData(*peer_id)); + } + } + } + + /// The peer has responded with a pong. + pub fn pong_response(&mut self, peer_id: &PeerId, seq: u64) { + if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) { + if let Some(metadata) = &peer_info.metadata() { + if metadata.seq_number < seq { + self.events.push_back(PeerManagerEvent::MetaData(*peer_id)); + } + } else { + self.events.push_back(PeerManagerEvent::MetaData(*peer_id)); + } + } + } + + /// The peer has responded with metadata. + pub fn metadata_response(&mut self, peer_id: &PeerId, metadata: MetaData) { + if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { + peer_info.set_metadata(metadata); + } + } + + fn inject_connect_ingoing( + &mut self, + peer_id: &PeerId, + multiaddr: Multiaddr, + enr: Option, + ) -> bool { + self.inject_peer_connection(peer_id, ConnectingType::IngoingConnected { multiaddr }, enr) + } + + fn inject_connect_outgoing( + &mut self, + peer_id: &PeerId, + multiaddr: Multiaddr, + enr: Option, + ) -> bool { + self.inject_peer_connection(peer_id, ConnectingType::OutgoingConnected { multiaddr }, enr) + } + + fn inject_disconnect(&mut self, peer_id: &PeerId) { + self.network_globals.peers.write().inject_disconnect(peer_id); + + self.inbound_ping_peers.remove(peer_id); + self.outbound_ping_peers.remove(peer_id); + } + + fn inject_peer_connection( + &mut self, + peer_id: &PeerId, + connection: ConnectingType, + enr: Option, + ) -> bool { + let mut peer_db = self.network_globals.peers.write(); + + match connection { + ConnectingType::Dialing => { + peer_db.dialing_peer(peer_id, enr); + return true; + } + ConnectingType::IngoingConnected { multiaddr } => { + peer_db.connect_ingoing(peer_id, multiaddr, enr); + self.inbound_ping_peers.insert(*peer_id); + } + ConnectingType::OutgoingConnected { multiaddr } => { + peer_db.connect_outgoing(peer_id, multiaddr, enr); + self.outbound_ping_peers.insert(*peer_id); + } + } + + true + } + + fn _disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { + self.events.push_back(PeerManagerEvent::DisconnectPeer(peer_id, reason)); + self.network_globals.peers.write().notify_disconnecting(&peer_id); + } + + fn heartbeat(&mut self) { + // TODO: optionally run discovery } } diff --git a/crates/p2p/src/peer_manager/network_behaviour.rs b/crates/p2p/src/peer_manager/network_behaviour.rs index 55a594dd..1dfbb71e 100644 --- a/crates/p2p/src/peer_manager/network_behaviour.rs +++ b/crates/p2p/src/peer_manager/network_behaviour.rs @@ -1,7 +1,14 @@ -use super::{PeerManager, PeerManagerEvent}; +use super::{ConnectingType, PeerManager, PeerManagerEvent}; +use crate::discovery::enr_ext::EnrExt; use futures::StreamExt; use libp2p::{ - swarm::{dummy::ConnectionHandler, NetworkBehaviour, ToSwarm}, + core::ConnectedPoint, + swarm::{ + behaviour::ConnectionEstablished, + dial_opts::{DialOpts, PeerCondition}, + dummy::ConnectionHandler, + ConnectionClosed, DialFailure, FromSwarm, NetworkBehaviour, ToSwarm, + }, PeerId, }; use std::task::Poll; @@ -12,9 +19,23 @@ impl NetworkBehaviour for PeerManager { type ToSwarm = PeerManagerEvent; fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) { - if let libp2p::swarm::FromSwarm::ConnectionClosed(close_info) = event { - self.network_globals.peers.write().disconnect(close_info.peer_id); - self.events.push_back(PeerManagerEvent::PeerDisconnected(close_info.peer_id)); + match event { + FromSwarm::ConnectionEstablished(ConnectionEstablished { + peer_id, + endpoint, + other_established, + .. + }) => self.on_connection_established(peer_id, endpoint, other_established), + FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + endpoint, + remaining_established, + .. + }) => self.on_connection_closed(peer_id, endpoint, remaining_established), + FromSwarm::DialFailure(DialFailure { peer_id, error: _, connection_id: _ }) => { + self.on_dial_failure(peer_id) + } + _ => {} } } @@ -29,26 +50,20 @@ impl NetworkBehaviour for PeerManager { fn handle_established_inbound_connection( &mut self, _connection_id: libp2p::swarm::ConnectionId, - peer: PeerId, + _peer: PeerId, _local_addr: &libp2p::Multiaddr, _remote_addr: &libp2p::Multiaddr, ) -> Result, libp2p::swarm::ConnectionDenied> { - self.network_globals.peers.write().new_connected(peer); - self.ping_peers.insert(peer); - self.events.push_back(PeerManagerEvent::PeerConnectedIncoming(peer)); Ok(ConnectionHandler) } fn handle_established_outbound_connection( &mut self, _connection_id: libp2p::swarm::ConnectionId, - peer: PeerId, + _peer: PeerId, _addr: &libp2p::Multiaddr, _role_override: libp2p::core::Endpoint, ) -> Result, libp2p::swarm::ConnectionDenied> { - self.network_globals.peers.write().new_connected(peer); - self.ping_peers.insert(peer); - self.events.push_back(PeerManagerEvent::PeerConnectedOutgoing(peer)); Ok(ConnectionHandler) } @@ -75,22 +90,102 @@ impl NetworkBehaviour for PeerManager { &mut self, cx: &mut std::task::Context<'_>, ) -> Poll>> { + while self.heartbeat.poll_tick(cx).is_ready() { + self.heartbeat(); + } + + loop { + match self.inbound_ping_peers.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(peer_id))) => { + self.inbound_ping_peers.insert(peer_id); + self.events.push_back(PeerManagerEvent::Ping(peer_id)); + } + Poll::Ready(Some(Err(e))) => { + error!("Failed to check inbound ping peer with {e:?}"); + } + Poll::Ready(None) | Poll::Pending => break, + } + } + loop { - match self.ping_peers.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(peer))) => { - self.events.push_back(PeerManagerEvent::Ping(peer)); - self.ping_peers.insert(peer); + match self.outbound_ping_peers.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(peer_id))) => { + self.outbound_ping_peers.insert(peer_id); + self.events.push_back(PeerManagerEvent::Ping(peer_id)); } Poll::Ready(Some(Err(e))) => { - error!("Failed to check ping peer with {e:?}") + error!("Failed to check inbound ping peer with {e:?}"); } - _ => break, - }; + Poll::Ready(None) | Poll::Pending => break, + } + } + + if !self.events.is_empty() { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(ToSwarm::GenerateEvent(event)); + } + } + + if let Some(enr) = self.peers_to_dial.pop() { + let peer_id = enr.peer_id(); + self.inject_peer_connection(&peer_id, ConnectingType::Dialing, Some(enr.clone())); + return Poll::Ready(ToSwarm::Dial { + opts: DialOpts::peer_id(peer_id) + .condition(PeerCondition::Disconnected) + .addresses(enr.multiaddr()) + .build(), + }); + } + + Poll::Pending + } +} + +impl PeerManager { + fn on_connection_established( + &mut self, + peer_id: PeerId, + endpoint: &ConnectedPoint, + other_established: usize, + ) { + if other_established == 0 { + self.events.push_back(PeerManagerEvent::MetaData(peer_id)); } - match self.events.pop_front() { - Some(event) => Poll::Ready(ToSwarm::GenerateEvent(event)), - _ => Poll::Pending, + match endpoint { + ConnectedPoint::Listener { send_back_addr, .. } => { + self.inject_connect_ingoing(&peer_id, send_back_addr.clone(), None); + self.events.push_back(PeerManagerEvent::PeerConnectedIncoming(peer_id)); + } + ConnectedPoint::Dialer { address, .. } => { + self.inject_connect_outgoing(&peer_id, address.clone(), None); + self.events.push_back(PeerManagerEvent::PeerConnectedOutgoing(peer_id)); + } + } + } + + fn on_connection_closed( + &mut self, + peer_id: PeerId, + _endpoint: &ConnectedPoint, + remaining_established: usize, + ) { + if remaining_established > 0 { + return; + } + + if self.network_globals.peers.read().is_connected_or_disconnecting(&peer_id) { + self.events.push_back(PeerManagerEvent::PeerDisconnected(peer_id)); + } + + self.inject_disconnect(&peer_id); + } + + fn on_dial_failure(&mut self, peer_id: Option) { + if let Some(peer_id) = peer_id { + if !self.network_globals.peers.read().is_connected(&peer_id) { + self.inject_disconnect(&peer_id); + } } } } diff --git a/crates/p2p/src/peer_manager/peer/peer_info.rs b/crates/p2p/src/peer_manager/peer/peer_info.rs index 7ea6a449..e0e53cf7 100644 --- a/crates/p2p/src/peer_manager/peer/peer_info.rs +++ b/crates/p2p/src/peer_manager/peer/peer_info.rs @@ -1,17 +1,123 @@ use crate::rpc::methods::MetaData; +use discv5::Enr; +use eyre::Result; +use libp2p::Multiaddr; /// Information about a peer. -#[derive(Debug, Clone, Default)] +#[derive(Default, Debug, Clone)] pub struct PeerInfo { /// Connection status of the peer. - pub connection_status: ConnectionStatus, + connection_status: PeerConnectionStatus, + /// ENR of the peer. + enr: Option, /// Metadata of the peer. - pub _metadata: Option, // TODO: need to handle metadata updates + metadata: Option, + /// Connection direction (ingoing or outgoing). + connection_direction: Option, +} + +impl PeerInfo { + pub fn connection_status(&self) -> &PeerConnectionStatus { + &self.connection_status + } + + pub fn enr(&self) -> &Option { + &self.enr + } + + pub fn metadata(&self) -> &Option { + &self.metadata + } + + pub fn set_metadata(&mut self, metadata: MetaData) { + self.metadata = Some(metadata); + } + + pub fn connection_direction(&self) -> &Option { + &self.connection_direction + } + + pub fn is_connected(&self) -> bool { + matches!(self.connection_status, PeerConnectionStatus::Connected) + } + + pub fn is_disconnected(&self) -> bool { + matches!(self.connection_status, PeerConnectionStatus::Disconnected) + } + + pub fn is_dialing(&self) -> bool { + matches!(self.connection_status, PeerConnectionStatus::Dialing) + } + + pub fn is_connected_or_dialing(&self) -> bool { + self.is_connected() || self.is_dialing() + } + + pub fn set_enr(&mut self, enr: Option) { + self.enr = enr; + } + + pub fn set_connection_status(&mut self, connection_status: PeerConnectionStatus) { + self.connection_status = connection_status; + } + + pub fn set_dialing_peer(&mut self) -> Result<()> { + match &mut self.connection_status { + PeerConnectionStatus::Connected => { + return Err(eyre::eyre!("Dialing peer is already connected")); + } + PeerConnectionStatus::Disconnecting => { + return Err(eyre::eyre!("Dialing peer is disconnecting")); + } + PeerConnectionStatus::Dialing => { + return Err(eyre::eyre!("Dialing peer is already dialing")); + } + PeerConnectionStatus::Disconnected | PeerConnectionStatus::Unknown => { + self.connection_status = PeerConnectionStatus::Dialing; + } + } + Ok(()) + } + + pub fn connect_ingoing(&mut self, _multiaddr: Multiaddr) { + match &mut self.connection_status { + PeerConnectionStatus::Connected | + PeerConnectionStatus::Disconnected | + PeerConnectionStatus::Disconnecting | + PeerConnectionStatus::Dialing | + PeerConnectionStatus::Unknown => { + self.connection_status = PeerConnectionStatus::Connected; + self.connection_direction = Some(ConnectionDirection::Incoming); + } + } + } + + pub fn connect_outgoing(&mut self, _multiaddr: Multiaddr) { + match &mut self.connection_status { + PeerConnectionStatus::Connected | + PeerConnectionStatus::Disconnected | + PeerConnectionStatus::Disconnecting | + PeerConnectionStatus::Dialing | + PeerConnectionStatus::Unknown => { + self.connection_status = PeerConnectionStatus::Connected; + self.connection_direction = Some(ConnectionDirection::Outgoing); + } + } + } +} + +#[derive(Debug, Clone)] +pub enum ConnectionDirection { + Incoming, + Outgoing, } #[derive(Debug, Clone, Default)] -pub enum ConnectionStatus { +pub enum PeerConnectionStatus { Connected, - #[default] Disconnected, + Disconnecting, + Dialing, + #[default] + Unknown, } diff --git a/crates/p2p/src/peer_manager/peerdb.rs b/crates/p2p/src/peer_manager/peerdb.rs index 65e6578c..dcfc9392 100644 --- a/crates/p2p/src/peer_manager/peerdb.rs +++ b/crates/p2p/src/peer_manager/peerdb.rs @@ -1,6 +1,8 @@ -use super::peer::peer_info::{ConnectionStatus, PeerInfo}; -use libp2p::PeerId; +use super::peer::peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo}; +use discv5::Enr; +use libp2p::{Multiaddr, PeerId}; use std::collections::HashMap; +use tracing::error; #[derive(Default)] pub struct PeerDB { @@ -16,19 +18,122 @@ impl PeerDB { Self { peers } } - pub fn new_connected(&mut self, peer_id: PeerId) { - self.peers - .entry(peer_id) - .and_modify(|info| info.connection_status = ConnectionStatus::Connected) - .or_insert(PeerInfo { - connection_status: ConnectionStatus::Connected, - _metadata: None, - }); + pub fn peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> { + self.peers.get(peer_id) + } + + pub fn peer_info_mut(&mut self, peer_id: &PeerId) -> Option<&mut PeerInfo> { + self.peers.get_mut(peer_id) + } + + pub fn connection_status(&self, peer_id: &PeerId) -> Option { + self.peer_info(peer_id).map(|info| info.connection_status().clone()) + } + + pub fn is_connected_or_disconnecting(&self, peer_id: &PeerId) -> bool { + matches!( + self.connection_status(peer_id), + Some(PeerConnectionStatus::Connected | PeerConnectionStatus::Disconnecting) + ) + } + + pub fn is_connected(&self, peer_id: &PeerId) -> bool { + matches!(self.connection_status(peer_id), Some(PeerConnectionStatus::Connected)) + } + + pub fn is_connected_or_dialing(&self, peer_id: &PeerId) -> bool { + matches!( + self.connection_status(peer_id), + Some(PeerConnectionStatus::Connected | PeerConnectionStatus::Dialing) + ) } - pub fn disconnect(&mut self, peer_id: PeerId) { + pub fn connected_or_dialing_peers(&self) -> impl Iterator { self.peers - .entry(peer_id) - .and_modify(|info| info.connection_status = ConnectionStatus::Disconnected); + .iter() + .filter(|(_, info)| info.is_connected() || info.is_dialing()) + .map(|(peer_id, _)| peer_id) + } + + pub fn should_dial(&self, peer_id: &PeerId) -> bool { + matches!( + self.connection_status(peer_id), + Some(PeerConnectionStatus::Disconnected | PeerConnectionStatus::Unknown) | None + ) } + + pub fn update_connection_state(&mut self, peer_id: &PeerId, new_state: NewConnectionState) { + let info = self.peers.entry(*peer_id).or_default(); + + match (info.connection_status().clone(), new_state) { + (_current_state, NewConnectionState::Connected { enr, direction, seen_address }) => { + info.set_enr(enr); + + match direction { + ConnectionDirection::Incoming => info.connect_ingoing(seen_address), + ConnectionDirection::Outgoing => info.connect_outgoing(seen_address), + } + } + (_old_state, NewConnectionState::Dialing { enr }) => { + info.set_enr(enr); + + if let Err(e) = info.set_dialing_peer() { + error!("Error dialing peer: {:?}", e); + } + } + (_old_state, NewConnectionState::Disconnected) => { + info.set_connection_status(PeerConnectionStatus::Disconnected) + } + (_old_state, NewConnectionState::Disconnecting) => { + info.set_connection_status(PeerConnectionStatus::Disconnecting) + } + } + } + + pub fn inject_disconnect(&mut self, peer_id: &PeerId) { + self.update_connection_state(peer_id, NewConnectionState::Disconnected) + } + + pub fn dialing_peer(&mut self, peer_id: &PeerId, enr: Option) { + self.update_connection_state(peer_id, NewConnectionState::Dialing { enr }) + } + + pub fn connect_ingoing(&mut self, peer_id: &PeerId, seen_address: Multiaddr, enr: Option) { + self.update_connection_state( + peer_id, + NewConnectionState::Connected { + enr, + direction: ConnectionDirection::Incoming, + seen_address, + }, + ) + } + + pub fn connect_outgoing( + &mut self, + peer_id: &PeerId, + seen_address: Multiaddr, + enr: Option, + ) { + self.update_connection_state( + peer_id, + NewConnectionState::Connected { + enr, + direction: ConnectionDirection::Outgoing, + seen_address, + }, + ) + } + + pub fn notify_disconnecting(&mut self, peer_id: &PeerId) { + self.update_connection_state(peer_id, NewConnectionState::Disconnecting) + } +} + +#[derive(Debug)] +pub enum NewConnectionState { + Connected { enr: Option, direction: ConnectionDirection, seen_address: Multiaddr }, + Disconnected, + Disconnecting, + Dialing { enr: Option }, } diff --git a/crates/p2p/src/rpc/methods.rs b/crates/p2p/src/rpc/methods.rs index d6491412..93ec850d 100644 --- a/crates/p2p/src/rpc/methods.rs +++ b/crates/p2p/src/rpc/methods.rs @@ -76,7 +76,7 @@ impl From for u64 { #[derive(ssz_rs_derive::Serializable, Clone, Debug, PartialEq, Default)] pub struct Ping { - data: u64, + pub data: u64, } impl Ping { diff --git a/crates/p2p/src/service/behaviour.rs b/crates/p2p/src/service/behaviour.rs index ac81e53d..d07f4460 100644 --- a/crates/p2p/src/service/behaviour.rs +++ b/crates/p2p/src/service/behaviour.rs @@ -13,7 +13,7 @@ pub type Gossipsub = gossipsub::Behaviour), /// Request-response protocol event @@ -24,33 +24,33 @@ pub enum Event { PeerManager(PeerManagerEvent), } -impl From for Event { +impl From for BehaviourEvent { fn from(value: gossipsub::Event) -> Self { - Event::GossipSub(Box::new(value)) + BehaviourEvent::GossipSub(Box::new(value)) } } -impl From for Event { +impl From for BehaviourEvent { fn from(value: rpc::RPCEvent) -> Self { - Event::RPC(value) + BehaviourEvent::RPC(value) } } -impl From for Event { +impl From for BehaviourEvent { fn from(value: discovery::DiscoveredPeers) -> Self { - Event::Discovery(value) + BehaviourEvent::Discovery(value) } } -impl From for Event { +impl From for BehaviourEvent { fn from(value: PeerManagerEvent) -> Self { - Event::PeerManager(value) + BehaviourEvent::PeerManager(value) } } /// The behaviour of the p2p network. #[derive(NetworkBehaviour)] -#[behaviour(to_swarm = "Event", event_process = false)] +#[behaviour(to_swarm = "BehaviourEvent", event_process = false)] pub struct Behaviour { /// Peer manager pub peer_manager: PeerManager, diff --git a/crates/p2p/src/service/mod.rs b/crates/p2p/src/service/mod.rs index 81d314ee..9e24ac68 100644 --- a/crates/p2p/src/service/mod.rs +++ b/crates/p2p/src/service/mod.rs @@ -15,12 +15,12 @@ use crate::{ }, peer_manager::{PeerManager, PeerManagerEvent}, rpc::{ - methods::{MetaData, Ping, RPCResponse, RequestId}, + methods::{MetaData, MetaDataRequest, Ping, RPCResponse, RequestId}, outbound::OutboundRequest, protocol::InboundRequest, RPCEvent, RPC, }, - service::utils::save_private_key_to_file, + service::{behaviour::BehaviourEvent, utils::save_private_key_to_file}, types::{ globals::NetworkGlobals, pubsub::{create_gossipsub, PubsubMessage}, @@ -34,22 +34,22 @@ use futures::channel::{ oneshot::Sender, }; use libp2p::{ - core::{transport::ListenerId, upgrade}, + core::upgrade, futures::StreamExt, gossipsub::{self, MessageId, PublishError, SubscriptionError, TopicHash}, identity::{secp256k1, Keypair}, noise, swarm::SwarmEvent, - Multiaddr, PeerId, Swarm, SwarmBuilder, TransportError, + Multiaddr, PeerId, Swarm, SwarmBuilder, }; use libp2p_mplex::{MaxBufferBehaviour, MplexConfig}; use silius_primitives::{ - constants::p2p::{MAX_IPFS_CID_LENGTH, MAX_SUPPORTED_MEMPOOLS}, + constants::p2p::{FIND_NODE_QUERY_CLOSEST_PEERS, MAX_IPFS_CID_LENGTH, MAX_SUPPORTED_MEMPOOLS}, UserOperation, VerifiedUserOperation, }; use ssz_rs::{Deserialize, List, Serialize, Vector}; use std::{ - env, io, + env, sync::Arc, task::{Context, Poll}, }; @@ -60,6 +60,12 @@ pub type MempoolChannels = #[derive(Debug)] pub enum NetworkEvent { + /// We successfully connected to a peer. + PeerConnectedOutgoing(PeerId), + /// A peer successfully connected to us. + PeerConnectedIncoming(PeerId), + /// A peer was disconnected. + PeerDisconnected(PeerId), /// A peer successfully connected with us. PeerConnected(PeerId), /// Gossipsub message from the network @@ -113,7 +119,7 @@ impl From for Swarm { } impl Network { - pub fn new(config: Config, mempool_channels: MempoolChannels) -> eyre::Result { + pub async fn new(config: Config, mempool_channels: MempoolChannels) -> eyre::Result { // Handle private key let key = if let Some(key) = load_private_key_from_file(&config.node_key_file) { key @@ -177,10 +183,18 @@ impl Network { )) }; - let gossipsub = create_gossipsub(canonical_mempools).map_err(|e| eyre::anyhow!(e))?; + let mut gossipsub = create_gossipsub(canonical_mempools).map_err(|e| eyre::anyhow!(e))?; + for bootnode in &config.bootnodes { + gossipsub.add_explicit_peer(&bootnode.peer_id()); + } + let rpc = RPC::new(); - let peer_manager = PeerManager::new(network_globals.clone(), config.target_peers); - let discovery = Discovery::new(enr, combined_key, config)?; + + let peer_manager = PeerManager::new(network_globals.clone()); + + let mut discovery = + Discovery::new(combined_key, config.clone(), network_globals.clone()).await?; + discovery.discover_peers(FIND_NODE_QUERY_CLOSEST_PEERS); let behaviour = Behaviour { peer_manager, rpc, discovery, gossipsub }; @@ -201,7 +215,34 @@ impl Network { .expect("building p2p behaviour failed") .build(); - Ok(Self { swarm, network_globals, mempool_channels }) + let mut network = Network { swarm, network_globals, mempool_channels }; + + network.start(&config).await?; + + Ok(network) + } + + async fn start(&mut self, config: &Config) -> eyre::Result<()> { + let listen_addrs = config.listen_addr.to_multi_addr(); + + for listen_addr in listen_addrs { + let _ = self.swarm.listen_on(listen_addr); + } + + for bootnode_enr in &config.bootnodes { + for multiaddr in &bootnode_enr.multiaddr() { + if !self + .network_globals + .peers + .read() + .is_connected_or_dialing(&bootnode_enr.peer_id()) + { + let _ = self.swarm.dial(multiaddr.clone()); + } + } + } + + Ok(()) } pub fn metadata(&self) -> MetaData { @@ -254,53 +295,74 @@ impl Network { } /// handle reqrep event - fn handle_rpc_event(&self, event: RPCEvent) -> Option { + fn handle_rpc_event(&mut self, event: RPCEvent) -> Option { match event { - RPCEvent::Request { peer_id, request, sender, .. } => { - match request { - InboundRequest::Ping(_ping) => { - // TODO: need to update metadata of peer (based on ping value) - let response = RPCResponse::Pong(Ping::new(self.metadata().seq_number)); - sender.send(response).expect("channel should exist"); - None - } - InboundRequest::MetaData(_) => { - let response = RPCResponse::MetaData(self.metadata()); - sender.send(response).expect("channel should exist"); - None - } - _ => Some(NetworkEvent::RequestMessage { peer_id, request, sender }), + RPCEvent::Request { peer_id, request, sender, .. } => match request { + InboundRequest::Ping(ping) => { + self.swarm.behaviour_mut().peer_manager.ping_request(&peer_id, ping.data); + sender + .send(RPCResponse::Pong(Ping::new(self.metadata().seq_number))) + .expect("channel should exist"); + None } - } - RPCEvent::Response { peer_id, response, .. } => { - Some(NetworkEvent::ResponseMessage { peer_id, response }) - } + InboundRequest::MetaData(_) => { + sender + .send(RPCResponse::MetaData(self.metadata())) + .expect("channel should exist"); + None + } + InboundRequest::Goodbye(_) => None, + _ => Some(NetworkEvent::RequestMessage { peer_id, request, sender }), + }, + RPCEvent::Response { peer_id, response, .. } => match response { + RPCResponse::Pong(ping) => { + self.swarm.behaviour_mut().peer_manager.pong_response(&peer_id, ping.data); + None + } + RPCResponse::MetaData(metadata) => { + self.swarm.behaviour_mut().peer_manager.metadata_response(&peer_id, metadata); + None + } + _ => Some(NetworkEvent::ResponseMessage { peer_id, response }), + }, _ => None, } } - // TODO: discovery peer connect - fn handle_discovery_event(&self, _event: DiscoveredPeers) -> Option { + // handle discovery event + fn handle_discovery_event(&mut self, event: DiscoveredPeers) -> Option { + self.swarm.behaviour_mut().peer_manager.peers_discovered(event.peers); None } /// handle peer manager event fn handler_peer_manager_event(&mut self, event: PeerManagerEvent) -> Option { match event { - PeerManagerEvent::Ping(peer) => { + PeerManagerEvent::PeerConnectedIncoming(peer_id) => { + Some(NetworkEvent::PeerConnectedIncoming(peer_id)) + } + PeerManagerEvent::PeerConnectedOutgoing(peer_id) => { + Some(NetworkEvent::PeerConnectedOutgoing(peer_id)) + } + PeerManagerEvent::PeerDisconnected(peer_id) => { + Some(NetworkEvent::PeerDisconnected(peer_id)) + } + PeerManagerEvent::DiscoverPeers(peers_to_find) => { + self.swarm.behaviour_mut().discovery.discover_peers(peers_to_find); + None + } + PeerManagerEvent::Ping(peer_id) => { self.send_request( - &peer, + &peer_id, OutboundRequest::Ping(Ping::new(self.metadata().seq_number)), ); None } - PeerManagerEvent::PeerConnectedIncoming(peer) | - PeerManagerEvent::PeerConnectedOutgoing(peer) => { - self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer); - Some(NetworkEvent::PeerConnected(peer)) + PeerManagerEvent::MetaData(peer_id) => { + self.send_request(&peer_id, OutboundRequest::MetaData(MetaDataRequest)); + None } - PeerManagerEvent::PeerDisconnected(_) => None, - PeerManagerEvent::DiscoverPeers(_) => None, + _ => None, } } @@ -325,7 +387,10 @@ impl Network { Err(err) => match err { PublishError::InsufficientPeers => { warn!("Currently no peers to publish message"); - self.swarm.behaviour_mut().discovery.discover_peers(16usize); + self.swarm + .behaviour_mut() + .discovery + .discover_peers(FIND_NODE_QUERY_CLOSEST_PEERS); } e => error!("Error in publish message {e:?}"), }, @@ -337,12 +402,11 @@ impl Network { info!("Swarm event {swarm_event:?}"); let event_opt = match swarm_event { SwarmEvent::Behaviour(event) => match event { - behaviour::Event::GossipSub(event) => self.handle_gossipsub_event(event), - behaviour::Event::RPC(event) => self.handle_rpc_event(event), - behaviour::Event::Discovery(event) => self.handle_discovery_event(event), - behaviour::Event::PeerManager(event) => self.handler_peer_manager_event(event), + BehaviourEvent::GossipSub(event) => self.handle_gossipsub_event(event), + BehaviourEvent::RPC(event) => self.handle_rpc_event(event), + BehaviourEvent::Discovery(event) => self.handle_discovery_event(event), + BehaviourEvent::PeerManager(event) => self.handler_peer_manager_event(event), }, - SwarmEvent::NewListenAddr { address, .. } => { Some(NetworkEvent::NewListenAddr(address)) } @@ -361,10 +425,6 @@ impl Network { Poll::Pending } - pub fn listen_on(&mut self, addr: Multiaddr) -> Result> { - self.swarm.listen_on(addr) - } - pub async fn next_event(&mut self) -> NetworkEvent { futures::future::poll_fn(|cx| self.poll_network(cx)).await } @@ -384,21 +444,6 @@ impl Network { self.swarm.behaviour_mut().gossipsub.publish(topic_hash, buf) } - /// Dial a peer. - pub fn dial(&mut self, enr: Enr) -> eyre::Result<()> { - let addrs = enr.multiaddr(); - for addr in addrs { - self.swarm.dial(addr)?; - } - self.swarm - .behaviour_mut() - .discovery - .discovery - .add_enr(enr) - .map_err(|e| eyre::eyre!(e.to_string()))?; - Ok(()) - } - /// Subscribe to a topic. pub fn subscribe(&mut self, mempool_id: &str) -> Result { self.swarm.behaviour_mut().gossipsub.subscribe(&topic(mempool_id)) diff --git a/crates/p2p/src/types/globals.rs b/crates/p2p/src/types/globals.rs index 136723ef..17f93614 100644 --- a/crates/p2p/src/types/globals.rs +++ b/crates/p2p/src/types/globals.rs @@ -58,4 +58,8 @@ impl NetworkGlobals { pub fn chain_spec(&self) -> ChainSpec { self.chain_spec.read().clone() } + + pub fn connected_or_dialing_peers(&self) -> usize { + self.peers.read().connected_or_dialing_peers().count() + } } diff --git a/crates/p2p/tests/common.rs b/crates/p2p/tests/common.rs index ae41ac4c..6e7bdf40 100644 --- a/crates/p2p/tests/common.rs +++ b/crates/p2p/tests/common.rs @@ -1,3 +1,4 @@ +use discv5::Enr; use futures::channel::mpsc::unbounded; use silius_p2p::{ config::{gossipsub_config, Config}, @@ -29,7 +30,7 @@ pub fn get_available_port() -> Option { Some(unused_port) } -fn build_p2p_instance() -> eyre::Result { +async fn build_p2p_instance(bootnode: Option) -> eyre::Result { let dir = TempDir::new("test-silius-p2p").unwrap(); let node_key_file = dir.path().join("node_key"); let node_enr_file = dir.path().join("node_enr"); @@ -57,25 +58,20 @@ fn build_p2p_instance() -> eyre::Result { discv5_config: discv5::ConfigBuilder::new(listen_addr.to_listen_config()).build(), chain_spec: chain_spec.clone(), target_peers: TARGET_PEERS, - bootnodes: vec![], + bootnodes: if let Some(bootnode) = bootnode { vec![bootnode] } else { vec![] }, }; let (_, rv) = unbounded(); let (sd, _) = unbounded(); - let mut network = Network::new(config, vec![(Default::default(), rv, sd)])?; - - for listen_addr in listen_addr.to_multi_addr() { - println!("listen on {listen_addr:?}"); - network.listen_on(listen_addr)?; - } + let network = Network::new(config, vec![(Default::default(), rv, sd)]).await?; Ok(network) } pub async fn build_connnected_p2p_pair() -> eyre::Result<(Network, Network)> { - let mut peer1 = build_p2p_instance()?; - let mut peer2 = build_p2p_instance()?; + let mut peer1 = build_p2p_instance(None).await?; + let mut peer2 = build_p2p_instance(Some(peer1.local_enr())).await?; // let the two nodes set up listeners let peer1_fut = async { @@ -97,13 +93,9 @@ pub async fn build_connnected_p2p_pair() -> eyre::Result<(Network, Network)> { // wait for either both nodes to listen or a timeout tokio::select! { - _ = tokio::time::sleep(Duration::from_millis(500)) => {} + _ = tokio::time::sleep(Duration::from_millis(500)) => {} _ = joined => {} } - let peer2_enr = peer2.local_enr(); - println!("peer1 dial peer2"); - peer1.dial(peer2_enr)?; - Ok((peer1, peer2)) } diff --git a/crates/p2p/tests/rpc.rs b/crates/p2p/tests/rpc.rs index e4526901..bbc09664 100644 --- a/crates/p2p/tests/rpc.rs +++ b/crates/p2p/tests/rpc.rs @@ -19,8 +19,9 @@ async fn rpc_case(request_case: OutboundRequest, response_case: RPCResponse) -> let sender_fut = async { loop { - match peer1.next_event().await { - NetworkEvent::PeerConnected(_) => { + let event = peer1.next_event().await; + match event { + NetworkEvent::PeerConnectedIncoming(_) => { println!("Send request"); peer1.send_request(&peer2_id, request_case.clone()); } @@ -46,7 +47,8 @@ async fn rpc_case(request_case: OutboundRequest, response_case: RPCResponse) -> let receiver_fut = async { loop { - match peer2.next_event().await { + let event = peer2.next_event().await; + match event { NetworkEvent::RequestMessage { peer_id, request, sender } => { println!("Received request"); assert_eq!(request, request_case.clone()); @@ -81,33 +83,6 @@ async fn rpc_status() -> eyre::Result<()> { Ok(()) } -#[tokio::test] -async fn rpc_goodbye() -> eyre::Result<()> { - rpc_case( - OutboundRequest::Goodbye(Default::default()), - RPCResponse::Goodbye(Default::default()), - ) - .await?; - Ok(()) -} - -#[tokio::test] -async fn rpc_ping_pong() -> eyre::Result<()> { - rpc_case(OutboundRequest::Ping(Default::default()), RPCResponse::Pong(Default::default())) - .await?; - Ok(()) -} - -#[tokio::test] -async fn rpc_metadata() -> eyre::Result<()> { - rpc_case( - OutboundRequest::MetaData(Default::default()), - RPCResponse::MetaData(Default::default()), - ) - .await?; - Ok(()) -} - #[tokio::test] async fn rpc_pooled_userops() -> eyre::Result<()> { rpc_case( diff --git a/crates/primitives/src/constants.rs b/crates/primitives/src/constants.rs index f56ed64d..20930c02 100644 --- a/crates/primitives/src/constants.rs +++ b/crates/primitives/src/constants.rs @@ -144,10 +144,16 @@ pub mod p2p { pub const NODE_KEY_FILE_NAME: &str = "p2p/node-key"; /// The default path for storing the node enr pub const NODE_ENR_FILE_NAME: &str = "p2p/node-enr"; + /// Default number of peers to find on the initial discovery. + pub const FIND_NODE_QUERY_CLOSEST_PEERS: usize = 16; /// Default target peers. - pub const TARGET_PEERS: usize = 10; - /// Default ping interval. - pub const PING_INTERVAL: u64 = 10; + pub const TARGET_PEERS: usize = 50; + /// Default heartbeat interval (how often we perform discovery and peer management). + pub const HEARTBEAT_INTERVAL: u64 = 30; + /// Default outbound ping interval. + pub const PING_INTERVAL_OUTBOUND: u64 = 15; + /// Default inbound ping interval. + pub const PING_INTERVAL_INBOUND: u64 = 20; /// Request message size maximum pub const REQUEST_SIZE_MAXIMUM: u64 = 1024 * 1024; // bytes /// Response message size maximum diff --git a/docs/P2P.md b/docs/P2P.md index 5657013e..6c567873 100644 --- a/docs/P2P.md +++ b/docs/P2P.md @@ -11,13 +11,13 @@ cargo run --release -- node --eth-client-address http://127.0.0.1:8545 --mnemoni ### Run peer node ```bash -cargo run --release -- node --eth-client-address http://127.0.0.1:8545 --mnemonic-file ./bundler-spec-tests/keys/0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 --beneficiary 0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 --entry-points 0x5FF137D4b0FDCD49DcA30c7CF57E578a026d2789 --http --http.port 4000 --eth-client-proxy-address http://127.0.0.1:8545 --p2p.baddr 127.0.0.1 --bootnodes "enr:-J24QFyIGX9IG6_4WO6F40-BXH0b4gChUm3zTOkYNoYBOWX5LTq7ubqm5oaFjwcg5r1YesmllbqNvKAapeM2JK8fkKoBiGNoYWluX2lkiDkFAAAAAAAAgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQMm_tiGzC78d2_BvxJAUX9hRzBQv9QUmgU4OB4Pv1eVE4N0Y3CCIyiDdWRwgiMo" --enable-p2p --discovery.port 4338 --p2p.port 4338 --datadir ./.local/node1 +cargo run --release -- node --eth-client-address http://127.0.0.1:8545 --mnemonic-file ./bundler-spec-tests/keys/0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 --beneficiary 0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 --entry-points 0x5FF137D4b0FDCD49DcA30c7CF57E578a026d2789 --http --http.port 4000 --eth-client-proxy-address http://127.0.0.1:8545 --p2p.baddr 127.0.0.1 --bootnodes "enr:-J24QMMKCYqEBAs659G2f4MtvjI8wp3dbAvrvRbTxIEaapZfb9Pi0La0QOs6HoGfVeGk8fsFvZF7WiM_arx43rxSHwQBiGNoYWluX2lkiDkFAAAAAAAAgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQLigwYFOcf1lit2x918h4_6upE1lZ1kK3tD029ZZioW0IN0Y3CCIyiDdWRwgiMo" --enable-p2p --discovery.port 4338 --p2p.port 4338 --datadir ./.local/node1 ``` ### Run Silius bundler with env seed set (used for generation of P2P peer keys) ```bash -P2P_SEED=1 cargo run --release -- node --eth-client-address http://127.0.0.1:8545 --mnemonic-file ./bundler-spec-tests/keys/0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 --beneficiary 0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 --entry-points 0x5FF137D4b0FDCD49DcA30c7CF57E578a026d2789 --http --http.port 4000 --eth-client-proxy-address http://127.0.0.1:8545 --p2p.baddr 127.0.0.1 --bootnodes "enr:-J24QFyIGX9IG6_4WO6F40-BXH0b4gChUm3zTOkYNoYBOWX5LTq7ubqm5oaFjwcg5r1YesmllbqNvKAapeM2JK8fkKoBiGNoYWluX2lkiDkFAAAAAAAAgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQMm_tiGzC78d2_BvxJAUX9hRzBQv9QUmgU4OB4Pv1eVE4N0Y3CCIyiDdWRwgiMo" --enable-p2p --discovery.port 4338 --p2p.port 4338 --datadir ./.local/node1 +P2P_SEED=1 cargo run --release -- node --eth-client-address http://127.0.0.1:8545 --mnemonic-file ./bundler-spec-tests/keys/0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 --beneficiary 0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 --entry-points 0x5FF137D4b0FDCD49DcA30c7CF57E578a026d2789 --http --http.port 4000 --eth-client-proxy-address http://127.0.0.1:8545 --p2p.baddr 127.0.0.1 --bootnodes "enr:-J24QMMKCYqEBAs659G2f4MtvjI8wp3dbAvrvRbTxIEaapZfb9Pi0La0QOs6HoGfVeGk8fsFvZF7WiM_arx43rxSHwQBiGNoYWluX2lkiDkFAAAAAAAAgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQLigwYFOcf1lit2x918h4_6upE1lZ1kK3tD029ZZioW0IN0Y3CCIyiDdWRwgiMo" --enable-p2p --discovery.port 4338 --p2p.port 4338 --datadir ./.local/node1 ``` ### Run cluster of Silius bundlers @@ -31,11 +31,11 @@ cargo run --release -- node --eth-client-address http://127.0.0.1:8545 --mnemoni Run first peer node ```bash -cargo run --release -- node --eth-client-address http://127.0.0.1:8545 --mnemonic-file ./bundler-spec-tests/keys/0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 --beneficiary 0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 --entry-points 0x5FF137D4b0FDCD49DcA30c7CF57E578a026d2789 --uopool.port 3004 --bundler.port 3005 --http --http.port 4001 --eth-client-proxy-address http://127.0.0.1:8545 --p2p.baddr 127.0.0.1 --bootnodes "enr:-J24QFyIGX9IG6_4WO6F40-BXH0b4gChUm3zTOkYNoYBOWX5LTq7ubqm5oaFjwcg5r1YesmllbqNvKAapeM2JK8fkKoBiGNoYWluX2lkiDkFAAAAAAAAgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQMm_tiGzC78d2_BvxJAUX9hRzBQv9QUmgU4OB4Pv1eVE4N0Y3CCIyiDdWRwgiMo" --enable-p2p --discovery.port 4338 --p2p.port 4338 --datadir ./.local/node1 +cargo run --release -- node --eth-client-address http://127.0.0.1:8545 --mnemonic-file ./bundler-spec-tests/keys/0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 --beneficiary 0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 --entry-points 0x5FF137D4b0FDCD49DcA30c7CF57E578a026d2789 --uopool.port 3004 --bundler.port 3005 --http --http.port 4001 --eth-client-proxy-address http://127.0.0.1:8545 --p2p.baddr 127.0.0.1 --bootnodes "enr:-J24QMMKCYqEBAs659G2f4MtvjI8wp3dbAvrvRbTxIEaapZfb9Pi0La0QOs6HoGfVeGk8fsFvZF7WiM_arx43rxSHwQBiGNoYWluX2lkiDkFAAAAAAAAgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQLigwYFOcf1lit2x918h4_6upE1lZ1kK3tD029ZZioW0IN0Y3CCIyiDdWRwgiMo" --enable-p2p --discovery.port 4338 --p2p.port 4338 --datadir ./.local/node1 ``` Run second peer node ```bash -cargo run --release -- node --eth-client-address http://127.0.0.1:8545 --mnemonic-file ./bundler-spec-tests/keys/0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 --beneficiary 0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 --entry-points 0x5FF137D4b0FDCD49DcA30c7CF57E578a026d2789 --uopool.port 3006 --bundler.port 3007 --http --http.port 4002 --eth-client-proxy-address http://127.0.0.1:8545 --p2p.baddr 127.0.0.1 --bootnodes "enr:-J24QFyIGX9IG6_4WO6F40-BXH0b4gChUm3zTOkYNoYBOWX5LTq7ubqm5oaFjwcg5r1YesmllbqNvKAapeM2JK8fkKoBiGNoYWluX2lkiDkFAAAAAAAAgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQMm_tiGzC78d2_BvxJAUX9hRzBQv9QUmgU4OB4Pv1eVE4N0Y3CCIyiDdWRwgiMo" --enable-p2p --discovery.port 4339 --p2p.port 4339 --datadir ./.local/node2 +cargo run --release -- node --eth-client-address http://127.0.0.1:8545 --mnemonic-file ./bundler-spec-tests/keys/0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 --beneficiary 0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 --entry-points 0x5FF137D4b0FDCD49DcA30c7CF57E578a026d2789 --uopool.port 3006 --bundler.port 3007 --http --http.port 4002 --eth-client-proxy-address http://127.0.0.1:8545 --p2p.baddr 127.0.0.1 --bootnodes "enr:-J24QMMKCYqEBAs659G2f4MtvjI8wp3dbAvrvRbTxIEaapZfb9Pi0La0QOs6HoGfVeGk8fsFvZF7WiM_arx43rxSHwQBiGNoYWluX2lkiDkFAAAAAAAAgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQLigwYFOcf1lit2x918h4_6upE1lZ1kK3tD029ZZioW0IN0Y3CCIyiDdWRwgiMo" --enable-p2p --discovery.port 4339 --p2p.port 4339 --datadir ./.local/node2 ```