diff --git a/Cargo.lock b/Cargo.lock index ff9a7d541b4..bd94eeba5c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2117,6 +2117,8 @@ dependencies = [ "fuel-block-producer", "fuel-core-bft", "fuel-core-interfaces", + "fuel-crypto", + "fuel-p2p", "fuel-relayer", "fuel-sync", "fuel-txpool", @@ -2236,6 +2238,7 @@ dependencies = [ name = "fuel-p2p" version = "0.10.1" dependencies = [ + "anyhow", "async-trait", "bincode", "ctor", diff --git a/fuel-core-bft/src/service.rs b/fuel-core-bft/src/service.rs index bb94eecaed2..7dd488759ac 100644 --- a/fuel-core-bft/src/service.rs +++ b/fuel-core-bft/src/service.rs @@ -3,6 +3,7 @@ use fuel_core_interfaces::{ bft::BftMpsc, block_importer::{ImportBlockBroadcast, ImportBlockMpsc}, block_producer::BlockProducerMpsc, + p2p::P2pRequestEvent, relayer, }; use parking_lot::Mutex; @@ -28,7 +29,7 @@ impl Service { pub async fn start( &self, _relayer: relayer::Sender, - _p2p_consensus: (), + _p2p_consensus: mpsc::Sender, _block_producer: mpsc::Sender, _block_importer_sender: mpsc::Sender, _block_importer_broadcast: broadcast::Receiver, diff --git a/fuel-core-interfaces/src/p2p.rs b/fuel-core-interfaces/src/p2p.rs index 3d6af9cb73b..6f45a865d8c 100644 --- a/fuel-core-interfaces/src/p2p.rs +++ b/fuel-core-interfaces/src/p2p.rs @@ -1,5 +1,6 @@ use super::model::{BlockHeight, FuelBlock, SealedFuelBlock}; use crate::model::ConsensusVote; +use async_trait::async_trait; use fuel_tx::Transaction; use std::sync::Arc; use tokio::sync::oneshot; @@ -31,4 +32,10 @@ pub enum P2pRequestEvent { BroadcastConsensusVote { vote: Arc, }, + Stop, +} + +#[async_trait] +pub trait P2pDb: Send + Sync { + async fn get_sealed_block(&self, height: BlockHeight) -> Option>; } diff --git a/fuel-core/Cargo.toml b/fuel-core/Cargo.toml index ba22e114494..1e5d24324cf 100644 --- a/fuel-core/Cargo.toml +++ b/fuel-core/Cargo.toml @@ -40,6 +40,8 @@ fuel-core-bft = { path = "../fuel-core-bft", version = "0.10.1" } fuel-core-interfaces = { path = "../fuel-core-interfaces", version = "0.10.1", features = [ "serde", ] } +fuel-crypto = { version = "0.6", default-features = false, features = [ "random" ] } +fuel-p2p = { path = "../fuel-p2p", version = "0.10.1", optional = true } fuel-relayer = { path = "../fuel-relayer", version = "0.10.1" } fuel-sync = { path = "../fuel-sync", version = "0.10.1" } fuel-txpool = { path = "../fuel-txpool", version = "0.10.1" } @@ -87,3 +89,4 @@ prometheus = ["dep:prometheus"] default = ["rocksdb", "prometheus", "debug"] debug = ["fuel-core-interfaces/debug"] test-helpers = [] +p2p = ["dep:fuel-p2p"] diff --git a/fuel-core/src/cli.rs b/fuel-core/src/cli.rs index dfa60df1477..b99b814bed4 100644 --- a/fuel-core/src/cli.rs +++ b/fuel-core/src/cli.rs @@ -23,6 +23,7 @@ pub struct Opt { command: Fuel, } +#[allow(clippy::large_enum_variant)] #[derive(Debug, Parser)] pub enum Fuel { Run(run::Command), diff --git a/fuel-core/src/cli/run.rs b/fuel-core/src/cli/run.rs index de8caed59d3..ef6345cfc4c 100644 --- a/fuel-core/src/cli/run.rs +++ b/fuel-core/src/cli/run.rs @@ -5,7 +5,8 @@ use fuel_core::service::{Config, DbType, VMConfig}; use std::{env, io, net, path::PathBuf}; use strum::VariantNames; use tracing::{info, trace}; - +#[cfg(feature = "p2p")] +mod p2p; mod relayer; #[derive(Debug, Clone, Parser)] @@ -55,6 +56,10 @@ pub struct Command { #[clap(flatten)] pub relayer_args: relayer::RelayerArgs, + + #[cfg(feature = "p2p")] + #[clap(flatten)] + pub p2p_args: p2p::P2pArgs, } impl Command { @@ -71,9 +76,20 @@ impl Command { min_gas_price, predicates, relayer_args, + #[cfg(feature = "p2p")] + p2p_args, } = self; let addr = net::SocketAddr::new(ip, port); + + #[cfg(feature = "p2p")] + let p2p = { + match p2p_args.into() { + Ok(value) => value, + Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e)), + } + }; + Ok(Config { addr, database_path, @@ -95,6 +111,8 @@ impl Command { relayer: relayer_args.into(), bft: Default::default(), sync: Default::default(), + #[cfg(feature = "p2p")] + p2p, }) } } diff --git a/fuel-core/src/cli/run/p2p.rs b/fuel-core/src/cli/run/p2p.rs new file mode 100644 index 00000000000..0360467ace3 --- /dev/null +++ b/fuel-core/src/cli/run/p2p.rs @@ -0,0 +1,141 @@ +use std::{ + net::{IpAddr, Ipv4Addr}, + path::PathBuf, + time::Duration, +}; + +use clap::Args; + +use fuel_p2p::{config::P2PConfig, Multiaddr}; + +#[derive(Debug, Clone, Args)] +pub struct P2pArgs { + /// Path to the location of DER-encoded Secp256k1 Keypair + #[clap(long = "keypair")] + pub keypair: Option, + + /// The name of the p2p Network + /// If this value is not provided the p2p network won't start + #[clap(long = "network", default_value = "")] + pub network: String, + + /// p2p network's IP Address + #[clap(long = "address")] + pub address: Option, + + /// p2p network's TCP Port + #[clap(long = "peering-port", default_value = "4001")] + pub peering_port: u16, + + /// Max Block size + #[clap(long = "max_block_size", default_value = "100000")] + pub max_block_size: usize, + + /// Addresses of the bootstrap nodes + /// They should contain PeerId at the end of the specified Multiaddr + #[clap(long = "bootstrap_nodes")] + pub bootstrap_nodes: Vec, + + /// Allow nodes to be discoverable on the local network + #[clap(long = "enable_mdns")] + pub enable_mdns: bool, + + /// Maximum amount of allowed connected peers + #[clap(long = "max_peers_connected", default_value = "50")] + pub max_peers_connected: usize, + + /// Enable random walk for p2p node discovery + #[clap(long = "enable_random_walk")] + pub enable_random_walk: bool, + + /// Choose to include private IPv4/IPv6 addresses as discoverable + /// except for the ones stored in `bootstrap_nodes` + #[clap(long = "allow_private_addresses")] + pub allow_private_addresses: bool, + + /// Choose how long will connection keep alive if idle + #[clap(long = "connection_idle_timeout", default_value = "120")] + pub connection_idle_timeout: u64, + + /// Choose how often to recieve PeerInfo from other nodes + #[clap(long = "info_interval", default_value = "3")] + pub info_interval: u64, + + /// Choose the interval at which identification requests are sent to + /// the remote on established connections after the first request + #[clap(long = "identify_interval", default_value = "5")] + pub identify_interval: u64, + + /// Choose which topics to subscribe to via gossipsub protocol + #[clap(long = "topics", default_values = &["new_tx", "new_block", "consensus_vote"])] + pub topics: Vec, + + /// Choose max mesh size for gossipsub protocol + #[clap(long = "max_mesh_size", default_value = "12")] + pub max_mesh_size: usize, + + /// Choose min mesh size for gossipsub protocol + #[clap(long = "min_mesh_size", default_value = "4")] + pub min_mesh_size: usize, + + /// Choose ideal mesh size for gossipsub protocol + #[clap(long = "ideal_mesh_size", default_value = "6")] + pub ideal_mesh_size: usize, + + /// Choose timeout for sent requests in RequestResponse protocol + #[clap(long = "request_timeout", default_value = "20")] + pub request_timeout: u64, + + /// Choose how long RequestResponse protocol connections will live if idle + #[clap(long = "connection_keep_alive", default_value = "20")] + pub connection_keep_alive: u64, +} + +impl From for anyhow::Result { + fn from(args: P2pArgs) -> Self { + let local_keypair = { + match args.keypair { + Some(path) => { + let phrase = std::fs::read_to_string(path)?; + + let secret_key = fuel_crypto::SecretKey::new_from_mnemonic_phrase_with_path( + &phrase, + "m/44'/60'/0'/0/0", + )?; + + fuel_p2p::config::convert_to_libp2p_keypair(&mut secret_key.to_vec())? + } + _ => { + let mut rand = fuel_crypto::rand::thread_rng(); + let secret_key = fuel_crypto::SecretKey::random(&mut rand); + + fuel_p2p::config::convert_to_libp2p_keypair(&mut secret_key.to_vec())? + } + } + }; + + Ok(P2PConfig { + local_keypair, + network_name: args.network, + address: args + .address + .unwrap_or_else(|| IpAddr::V4(Ipv4Addr::from([0, 0, 0, 0]))), + tcp_port: args.peering_port, + max_block_size: args.max_block_size, + bootstrap_nodes: args.bootstrap_nodes, + enable_mdns: args.enable_mdns, + max_peers_connected: args.max_peers_connected, + allow_private_addresses: args.allow_private_addresses, + enable_random_walk: args.enable_random_walk, + connection_idle_timeout: Some(Duration::from_secs(args.connection_idle_timeout)), + topics: args.topics, + max_mesh_size: args.max_mesh_size, + min_mesh_size: args.min_mesh_size, + ideal_mesh_size: args.ideal_mesh_size, + set_request_timeout: Duration::from_secs(args.request_timeout), + set_connection_keep_alive: Duration::from_secs(args.connection_keep_alive), + info_interval: Some(Duration::from_secs(args.info_interval)), + identify_interval: Some(Duration::from_secs(args.identify_interval)), + }) + } +} diff --git a/fuel-core/src/database.rs b/fuel-core/src/database.rs index 6d6eb83ad33..faefd99f090 100644 --- a/fuel-core/src/database.rs +++ b/fuel-core/src/database.rs @@ -18,6 +18,7 @@ use fuel_core_interfaces::{ model::{ BlockHeight, ConsensusId, DaBlockHeight, SealedFuelBlock, ValidatorId, ValidatorStake, }, + p2p::P2pDb, relayer::{RelayerDb, StakingDiff}, txpool::TxPoolDb, }; @@ -131,6 +132,12 @@ unsafe impl Send for Database {} unsafe impl Sync for Database {} impl TxPoolDb for Database {} +#[async_trait] +impl P2pDb for Database { + async fn get_sealed_block(&self, height: BlockHeight) -> Option> { + ::get_sealed_block(self, height).await + } +} impl Database { #[cfg(feature = "rocksdb")] diff --git a/fuel-core/src/service/config.rs b/fuel-core/src/service/config.rs index 4a8acf0c366..d01abe334ff 100644 --- a/fuel-core/src/service/config.rs +++ b/fuel-core/src/service/config.rs @@ -5,6 +5,9 @@ use std::{ }; use strum_macros::{Display, EnumString, EnumVariantNames}; +#[cfg(feature = "p2p")] +use fuel_p2p; + #[derive(Clone, Debug)] pub struct Config { pub addr: SocketAddr, @@ -24,6 +27,8 @@ pub struct Config { pub bft: fuel_core_bft::Config, pub sync: fuel_sync::Config, pub relayer: fuel_relayer::Config, + #[cfg(feature = "p2p")] + pub p2p: fuel_p2p::config::P2PConfig, } impl Config { @@ -44,6 +49,8 @@ impl Config { bft: Default::default(), sync: Default::default(), relayer: Default::default(), + #[cfg(feature = "p2p")] + p2p: fuel_p2p::config::P2PConfig::default_with_network("test_network"), } } } diff --git a/fuel-core/src/service/modules.rs b/fuel-core/src/service/modules.rs index 7ba27053ac0..e0bacfb201e 100644 --- a/fuel-core/src/service/modules.rs +++ b/fuel-core/src/service/modules.rs @@ -2,10 +2,13 @@ use crate::database::Database; use crate::service::Config; use anyhow::Result; +#[cfg(feature = "p2p")] +use fuel_core_interfaces::p2p::P2pDb; use fuel_core_interfaces::relayer::RelayerDb; use fuel_core_interfaces::txpool::TxPoolDb; use futures::future::join_all; use std::sync::Arc; +use tokio::sync::mpsc; use tokio::task::JoinHandle; pub struct Modules { @@ -15,6 +18,8 @@ pub struct Modules { pub bft: Arc, pub sync: Arc, pub relayer: Arc, + #[cfg(feature = "p2p")] + pub network_service: Arc, } impl Modules { @@ -25,6 +30,8 @@ impl Modules { self.block_producer.stop().await, self.bft.stop().await, self.sync.stop().await, + #[cfg(feature = "p2p")] + self.network_service.stop().await, ] .into_iter() .flatten() @@ -61,16 +68,22 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result) .import_block_event(block_importer.subscribe()); - let p2p_mpsc = (); - let p2p_broadcast_consensus = (); - let p2p_broadcast_block = (); + #[cfg(feature = "p2p")] + let (tx_request_event, rx_request_event) = mpsc::channel(100); + #[cfg(feature = "p2p")] + let (tx_block, rx_block) = mpsc::channel(100); + + #[cfg(not(feature = "p2p"))] + let (tx_request_event, _) = mpsc::channel(100); + #[cfg(not(feature = "p2p"))] + let (_, rx_block) = mpsc::channel(100); block_importer.start().await; block_producer.start(txpool_builder.sender().clone()).await; bft.start( relayer_builder.sender().clone(), - p2p_broadcast_consensus, + tx_request_event.clone(), block_producer.sender().clone(), block_importer.sender().clone(), block_importer.subscribe(), @@ -78,8 +91,8 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result Result = Arc::new(database.clone()); + #[cfg(feature = "p2p")] + let (tx_consensus, _) = mpsc::channel(100); + #[cfg(feature = "p2p")] + let (tx_transaction, _) = mpsc::channel(100); + + #[cfg(feature = "p2p")] + let network_service = fuel_p2p::orchestrator::Service::new( + config.p2p.clone(), + p2p_db, + tx_request_event, + rx_request_event, + tx_consensus, + tx_transaction, + tx_block, + ); + + #[cfg(feature = "p2p")] + if !config.p2p.network_name.is_empty() { + network_service.start().await?; + } + Ok(Modules { txpool: Arc::new(txpool), block_importer: Arc::new(block_importer), @@ -103,5 +139,7 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result { } impl FuelBehaviour { - pub fn new(local_keypair: Keypair, p2p_config: &P2PConfig, codec: Codec) -> Self { - let local_public_key = local_keypair.public(); + pub fn new(p2p_config: &P2PConfig, codec: Codec) -> Self { + let local_public_key = p2p_config.local_keypair.public(); let local_peer_id = PeerId::from_public_key(&local_public_key); let discovery_config = { @@ -132,13 +131,8 @@ impl FuelBehaviour { std::iter::once((codec.get_req_res_protocol(), ProtocolSupport::Full)); let mut req_res_config = RequestResponseConfig::default(); - req_res_config - .set_request_timeout(p2p_config.set_request_timeout.unwrap_or(REQ_RES_TIMEOUT)); - req_res_config.set_connection_keep_alive( - p2p_config - .set_connection_keep_alive - .unwrap_or(REQ_RES_TIMEOUT), - ); + req_res_config.set_request_timeout(p2p_config.set_request_timeout); + req_res_config.set_connection_keep_alive(p2p_config.set_connection_keep_alive); let request_response = RequestResponse::new(codec.clone(), req_res_protocol, req_res_config); @@ -148,7 +142,7 @@ impl FuelBehaviour { Self { discovery: discovery_config.finish(), - gossipsub: gossipsub::build_gossipsub(&local_keypair, p2p_config), + gossipsub: gossipsub::build_gossipsub(&p2p_config.local_keypair, p2p_config), peer_info, request_response, diff --git a/fuel-p2p/src/config.rs b/fuel-p2p/src/config.rs index faebe73257b..1cdc4e0e1a2 100644 --- a/fuel-p2p/src/config.rs +++ b/fuel-p2p/src/config.rs @@ -1,11 +1,14 @@ use libp2p::{ core::{muxing::StreamMuxerBox, transport::Boxed}, - identity::Keypair, + identity::{secp256k1::SecretKey, Keypair}, mplex, noise, yamux, Multiaddr, PeerId, Transport, }; -use std::{net::IpAddr, time::Duration}; +use std::{ + net::{IpAddr, Ipv4Addr}, + time::Duration, +}; -pub const REQ_RES_TIMEOUT: Duration = Duration::from_secs(20); +const REQ_RES_TIMEOUT: Duration = Duration::from_secs(20); /// Maximum number of frames buffered per substream. const MAX_NUM_OF_FRAMES_BUFFERED: usize = 256; @@ -16,6 +19,8 @@ const TRANSPORT_TIMEOUT: Duration = Duration::from_secs(20); #[derive(Clone, Debug)] pub struct P2PConfig { + pub local_keypair: Keypair, + /// Name of the Network pub network_name: String, @@ -29,7 +34,7 @@ pub struct P2PConfig { pub max_block_size: usize, // `DiscoveryBehaviour` related fields - pub bootstrap_nodes: Vec<(PeerId, Multiaddr)>, + pub bootstrap_nodes: Vec, pub enable_mdns: bool, pub max_peers_connected: usize, pub allow_private_addresses: bool, @@ -52,16 +57,52 @@ pub struct P2PConfig { // RequestResponse related fields /// Sets the timeout for inbound and outbound requests. - pub set_request_timeout: Option, + pub set_request_timeout: Duration, /// Sets the keep-alive timeout of idle connections. - pub set_connection_keep_alive: Option, + pub set_connection_keep_alive: Duration, +} + +/// Takes secret key bytes generated outside of libp2p. +/// And converts it into libp2p's `Keypair::Secp256k1`. +pub fn convert_to_libp2p_keypair(secret_key_bytes: impl AsMut<[u8]>) -> anyhow::Result { + let secret_key = SecretKey::from_bytes(secret_key_bytes)?; + + Ok(Keypair::Secp256k1(secret_key.into())) +} + +impl P2PConfig { + pub fn default_with_network(network_name: &str) -> Self { + let local_keypair = Keypair::generate_secp256k1(); + + P2PConfig { + local_keypair, + network_name: network_name.into(), + address: IpAddr::V4(Ipv4Addr::from([0, 0, 0, 0])), + tcp_port: 0, + max_block_size: 100_000, + bootstrap_nodes: vec![], + enable_mdns: false, + max_peers_connected: 50, + allow_private_addresses: true, + enable_random_walk: true, + connection_idle_timeout: Some(Duration::from_secs(120)), + topics: vec![], + max_mesh_size: 12, + min_mesh_size: 4, + ideal_mesh_size: 6, + set_request_timeout: REQ_RES_TIMEOUT, + set_connection_keep_alive: REQ_RES_TIMEOUT, + info_interval: Some(Duration::from_secs(3)), + identify_interval: Some(Duration::from_secs(5)), + } + } } /// Transport for libp2p communication: /// TCP/IP, Websocket /// Noise as encryption layer /// mplex or yamux for multiplexing -pub async fn build_transport(local_keypair: Keypair) -> Boxed<(PeerId, StreamMuxerBox)> { +pub(crate) async fn build_transport(local_keypair: Keypair) -> Boxed<(PeerId, StreamMuxerBox)> { let transport = { let tcp = libp2p::tcp::TcpConfig::new().nodelay(true); let ws_tcp = libp2p::websocket::WsConfig::new(tcp.clone()).or_transport(tcp); diff --git a/fuel-p2p/src/discovery.rs b/fuel-p2p/src/discovery.rs index e47372553a9..4fb86311b1d 100644 --- a/fuel-p2p/src/discovery.rs +++ b/fuel-p2p/src/discovery.rs @@ -342,7 +342,7 @@ mod tests { /// helper function for building Discovery Behaviour for testing fn build_fuel_discovery( - bootstrap_nodes: Vec<(PeerId, Multiaddr)>, + bootstrap_nodes: Vec, ) -> (Swarm, Multiaddr, PeerId) { let keypair = Keypair::generate_secp256k1(); let public_key = keypair.public(); @@ -390,7 +390,15 @@ mod tests { let (first_swarm, first_peer_addr, first_peer_id) = build_fuel_discovery(vec![]); let mut discovery_swarms = (0..num_of_swarms - 1) - .map(|_| build_fuel_discovery(vec![(first_peer_id, first_peer_addr.clone())])) + .map(|_| { + build_fuel_discovery(vec![format!( + "{}/p2p/{}", + first_peer_addr.clone(), + first_peer_id + ) + .parse() + .unwrap()]) + }) .collect::>(); discovery_swarms.push_front((first_swarm, first_peer_addr, first_peer_id)); diff --git a/fuel-p2p/src/discovery/discovery_config.rs b/fuel-p2p/src/discovery/discovery_config.rs index 6dd6b168a51..b45823d381d 100644 --- a/fuel-p2p/src/discovery/discovery_config.rs +++ b/fuel-p2p/src/discovery/discovery_config.rs @@ -13,7 +13,7 @@ use tracing::warn; #[derive(Clone, Debug)] pub struct DiscoveryConfig { local_peer_id: PeerId, - bootstrap_nodes: Vec<(PeerId, Multiaddr)>, + bootstrap_nodes: Vec, with_mdns: bool, with_random_walk: bool, allow_private_addresses: bool, @@ -57,7 +57,7 @@ impl DiscoveryConfig { // List of bootstrap nodes to bootstrap the network pub fn with_bootstrap_nodes(&mut self, bootstrap_nodes: I) -> &mut Self where - I: IntoIterator, + I: IntoIterator, { self.bootstrap_nodes.extend(bootstrap_nodes); self @@ -92,8 +92,14 @@ impl DiscoveryConfig { kademlia_config.set_connection_idle_timeout(connection_idle_timeout); let mut kademlia = Kademlia::with_config(local_peer_id, memory_store, kademlia_config); - for (peer_id, addr) in &bootstrap_nodes { - kademlia.add_address(peer_id, addr.clone()); + // bootstrap nodes need to have their peer_id defined in the Multiaddr + let bootstrap_nodes = bootstrap_nodes + .into_iter() + .filter_map(|node| PeerId::try_from_multiaddr(&node).map(|peer_id| (peer_id, node))) + .collect::>(); + + for (peer_id, address) in &bootstrap_nodes { + kademlia.add_address(peer_id, address.clone()); } if let Err(e) = kademlia.bootstrap() { diff --git a/fuel-p2p/src/lib.rs b/fuel-p2p/src/lib.rs index 640227e682f..7cd14cae7e8 100644 --- a/fuel-p2p/src/lib.rs +++ b/fuel-p2p/src/lib.rs @@ -7,4 +7,5 @@ pub mod orchestrator; mod peer_info; mod request_response; mod service; -pub use libp2p::identity::Keypair; + +pub use libp2p::{Multiaddr, PeerId}; diff --git a/fuel-p2p/src/orchestrator.rs b/fuel-p2p/src/orchestrator.rs index a1b382b97eb..5fe84160568 100644 --- a/fuel-p2p/src/orchestrator.rs +++ b/fuel-p2p/src/orchestrator.rs @@ -1,15 +1,14 @@ -use std::error::Error; use std::sync::Arc; -use std::{future::Future, pin::Pin}; -use fuel_core_interfaces::{ - p2p::{BlockBroadcast, ConsensusBroadcast, P2pRequestEvent, TransactionBroadcast}, - relayer::RelayerDb, +use anyhow::anyhow; +use fuel_core_interfaces::p2p::{ + BlockBroadcast, ConsensusBroadcast, P2pDb, P2pRequestEvent, TransactionBroadcast, }; -use futures::{stream::futures_unordered::FuturesUnordered, StreamExt}; -use libp2p::identity::Keypair; + use libp2p::request_response::RequestId; use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::Mutex; +use tokio::task::JoinHandle; use tracing::warn; use crate::{ @@ -20,27 +19,24 @@ use crate::{ service::{FuelP2PEvent, FuelP2PService}, }; -type ResponseFuture = Pin>>>; - pub struct NetworkOrchestrator { - p2p_service: FuelP2PService, + p2p_config: P2PConfig, /// receives messages from different Fuel components rx_request_event: Receiver, + rx_outbound_responses: Receiver>, // senders tx_consensus: Sender, tx_transaction: Sender, tx_block: Sender, + tx_outbound_responses: Sender>, - db: Arc, - - outbound_responses: FuturesUnordered, + db: Arc, } impl NetworkOrchestrator { - pub async fn new( - local_keypair: Keypair, + pub fn new( p2p_config: P2PConfig, rx_request_event: Receiver, @@ -48,30 +44,33 @@ impl NetworkOrchestrator { tx_transaction: Sender, tx_block: Sender, - db: Arc, - ) -> Result> { - let p2p_service = FuelP2PService::new(local_keypair, p2p_config).await?; + db: Arc, + ) -> Self { + let (tx_outbound_responses, rx_outbound_responses) = tokio::sync::mpsc::channel(100); - Ok(Self { - p2p_service, + Self { + p2p_config, rx_request_event, + rx_outbound_responses, tx_block, tx_consensus, tx_transaction, + tx_outbound_responses, db, - outbound_responses: Default::default(), - }) + } } - pub async fn run(&mut self) { + pub async fn run(mut self) -> anyhow::Result { + let mut p2p_service = FuelP2PService::new(self.p2p_config.clone()).await?; + loop { tokio::select! { - next_response = self.outbound_responses.next() => { + next_response = self.rx_outbound_responses.recv() => { if let Some(Some((response, request_id))) = next_response { - let _ = self.p2p_service.send_response_msg(request_id, response); + let _ = p2p_service.send_response_msg(request_id, response); } }, - p2p_event = self.p2p_service.next_event() => { + p2p_event = p2p_service.next_event() => { if let FuelP2PEvent::Behaviour(behaviour_event) = p2p_event { match behaviour_event { FuelBehaviourEvent::GossipsubMessage { message, .. } => { @@ -91,12 +90,12 @@ impl NetworkOrchestrator { match request_message { RequestMessage::RequestBlock(block_height) => { let db = self.db.clone(); + let tx_outbound_response = self.tx_outbound_responses.clone(); - self.outbound_responses.push( - Box::pin(async move { - db.get_sealed_block(block_height).await.map(|block| (OutboundResponse::ResponseBlock(block), request_id)) - }) - ); + tokio::spawn(async move { + let res = db.get_sealed_block(block_height).await.map(|block| (OutboundResponse::ResponseBlock(block), request_id)); + let _ = tx_outbound_response.send(res); + }); } } }, @@ -110,20 +109,21 @@ impl NetworkOrchestrator { P2pRequestEvent::RequestBlock { height, response } => { let request_msg = RequestMessage::RequestBlock(height); let channel_item = ResponseChannelItem::ResponseBlock(response); - let _ = self.p2p_service.send_request_msg(None, request_msg, channel_item); + let _ = p2p_service.send_request_msg(None, request_msg, channel_item); }, P2pRequestEvent::BroadcastNewBlock { block } => { let broadcast = GossipsubBroadcastRequest::NewBlock(block); - let _ = self.p2p_service.publish_message(broadcast); + let _ = p2p_service.publish_message(broadcast); }, P2pRequestEvent::BroadcastNewTransaction { transaction } => { let broadcast = GossipsubBroadcastRequest::NewTx(transaction); - let _ = self.p2p_service.publish_message(broadcast); + let _ = p2p_service.publish_message(broadcast); }, P2pRequestEvent::BroadcastConsensusVote { vote } => { let broadcast = GossipsubBroadcastRequest::ConsensusVote(vote); - let _ = self.p2p_service.publish_message(broadcast); - } + let _ = p2p_service.publish_message(broadcast); + }, + P2pRequestEvent::Stop => break, } } else { warn!(target: "fuel-libp2p", "Failed to receive P2PRequestEvent"); @@ -131,5 +131,137 @@ impl NetworkOrchestrator { } } } + + Ok(self) + } +} + +pub struct Service { + /// Network Orchestrator that handles p2p network and inter-module communication + network_orchestrator: Arc>>, + /// Holds the spawned task when Netowrk Orchestrator is started + join: Mutex>>>, + /// Used for notifying the Network Orchestrator to stop + tx_request_event: Sender, +} + +impl Service { + pub fn new( + p2p_config: P2PConfig, + db: Arc, + tx_request_event: Sender, + rx_request_event: Receiver, + tx_consensus: Sender, + tx_transaction: Sender, + tx_block: Sender, + ) -> Self { + let network_orchestrator = NetworkOrchestrator::new( + p2p_config, + rx_request_event, + tx_consensus, + tx_transaction, + tx_block, + db, + ); + + Self { + join: Mutex::new(None), + network_orchestrator: Arc::new(Mutex::new(Some(network_orchestrator))), + tx_request_event, + } + } + + pub async fn start(&self) -> anyhow::Result<()> { + let mut join = self.join.lock().await; + + if join.is_none() { + if let Some(network_orchestrator) = self.network_orchestrator.lock().await.take() { + *join = Some(tokio::spawn(async { network_orchestrator.run().await })); + + Ok(()) + } else { + Err(anyhow!("Starting Network Orchestrator that is stopping")) + } + } else { + Err(anyhow!("Network Orchestrator already started")) + } + } + + pub async fn stop(&self) -> Option> { + let join_handle = self.join.lock().await.take(); + + if let Some(join_handle) = join_handle { + let network_orchestrator = self.network_orchestrator.clone(); + let _ = self.tx_request_event.send(P2pRequestEvent::Stop).await; + Some(tokio::spawn(async move { + if let Ok(res) = join_handle.await { + *network_orchestrator.lock().await = res.ok(); + } + })) + } else { + None + } + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + use async_trait::async_trait; + use fuel_core_interfaces::model::{ + BlockHeight, FuelBlock, FuelBlockConsensus, SealedFuelBlock, + }; + use tokio::time::{sleep, Duration}; + + #[derive(Clone, Debug)] + struct FakeDb; + + #[async_trait] + impl P2pDb for FakeDb { + async fn get_sealed_block(&self, _height: BlockHeight) -> Option> { + let block = FuelBlock { + header: Default::default(), + transactions: vec![], + }; + + Some(Arc::new(SealedFuelBlock { + block, + consensus: FuelBlockConsensus { + required_stake: 100_000, + validators: Default::default(), + }, + })) + } + } + + #[tokio::test] + async fn start_stop_works() { + let p2p_config = P2PConfig::default_with_network("start_stop_works"); + let db: Arc = Arc::new(FakeDb); + + let (tx_request_event, rx_request_event) = tokio::sync::mpsc::channel(100); + let (tx_consensus, _) = tokio::sync::mpsc::channel(100); + let (tx_transaction, _) = tokio::sync::mpsc::channel(100); + let (tx_block, _) = tokio::sync::mpsc::channel(100); + + let service = Service::new( + p2p_config, + db.clone(), + tx_request_event, + rx_request_event, + tx_consensus, + tx_transaction, + tx_block, + ); + + // Node with p2p service started + assert!(service.start().await.is_ok()); + sleep(Duration::from_secs(1)).await; + // Node with p2p service stopped + assert!(service.stop().await.is_some()); + sleep(Duration::from_secs(1)).await; + + // Node with p2p service successfully restarted + assert!(service.start().await.is_ok()); } } diff --git a/fuel-p2p/src/service.rs b/fuel-p2p/src/service.rs index 9ae82e94936..d2f84a97679 100644 --- a/fuel-p2p/src/service.rs +++ b/fuel-p2p/src/service.rs @@ -12,14 +12,13 @@ use crate::{ use futures::prelude::*; use libp2p::{ gossipsub::{error::PublishError, MessageId, Topic}, - identity::Keypair, multiaddr::Protocol, request_response::RequestId, swarm::SwarmEvent, Multiaddr, PeerId, Swarm, }; use rand::Rng; -use std::{collections::HashMap, error::Error}; +use std::collections::HashMap; /// Listens to the events on the p2p network /// And forwards them to the Orchestrator @@ -38,16 +37,12 @@ pub enum FuelP2PEvent { } impl FuelP2PService { - pub async fn new(local_keypair: Keypair, config: P2PConfig) -> Result> { - let local_peer_id = PeerId::from(local_keypair.public()); + pub async fn new(config: P2PConfig) -> anyhow::Result { + let local_peer_id = PeerId::from(config.local_keypair.public()); // configure and build P2P Service - let transport = build_transport(local_keypair.clone()).await; - let behaviour = FuelBehaviour::new( - local_keypair, - &config, - BincodeCodec::new(config.max_block_size), - ); + let transport = build_transport(config.local_keypair.clone()).await; + let behaviour = FuelBehaviour::new(&config, BincodeCodec::new(config.max_block_size)); let mut swarm = Swarm::new(transport, behaviour, local_peer_id); // set up node's address to listen on @@ -149,41 +144,15 @@ mod tests { use ctor::ctor; use fuel_core_interfaces::common::fuel_tx::Transaction; use fuel_core_interfaces::model::{ConsensusVote, FuelBlock}; - use libp2p::{gossipsub::Topic, identity::Keypair}; + use libp2p::gossipsub::Topic; + use libp2p::identity::Keypair; + use libp2p::{Multiaddr, PeerId}; use std::collections::HashMap; - use std::{ - net::{IpAddr, Ipv4Addr}, - sync::Arc, - time::Duration, - }; + use std::{sync::Arc, time::Duration}; use tokio::sync::{mpsc, oneshot}; use tracing_attributes::instrument; use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter}; - /// helper function for building default testing config - fn build_p2p_config(network_name: &str) -> P2PConfig { - P2PConfig { - network_name: network_name.into(), - address: IpAddr::V4(Ipv4Addr::from([0, 0, 0, 0])), - tcp_port: 4000, - max_block_size: 100_000, - bootstrap_nodes: vec![], - enable_mdns: false, - max_peers_connected: 50, - allow_private_addresses: true, - enable_random_walk: true, - connection_idle_timeout: Some(Duration::from_secs(120)), - topics: vec![], - max_mesh_size: 12, - min_mesh_size: 4, - ideal_mesh_size: 6, - set_request_timeout: None, - set_connection_keep_alive: None, - info_interval: Some(Duration::from_secs(3)), - identify_interval: Some(Duration::from_secs(5)), - } - } - /// Conditionally initializes tracing, depending if RUST_LOG env variable is set /// Logs to stderr & to a file #[ctor] @@ -208,16 +177,21 @@ mod tests { } /// helper function for building FuelP2PService - async fn build_fuel_p2p_service(p2p_config: P2PConfig) -> FuelP2PService { - let keypair = Keypair::generate_secp256k1(); - FuelP2PService::new(keypair, p2p_config).await.unwrap() + async fn build_fuel_p2p_service(mut p2p_config: P2PConfig) -> FuelP2PService { + p2p_config.local_keypair = Keypair::generate_secp256k1(); // change keypair for each Node + FuelP2PService::new(p2p_config).await.unwrap() + } + + /// attaches PeerId to the Multiaddr + fn build_bootstrap_node(peer_id: PeerId, address: Multiaddr) -> Multiaddr { + format!("{}/p2p/{}", address, peer_id).parse().unwrap() } #[tokio::test] #[instrument] async fn p2p_service_works() { let mut fuel_p2p_service = - build_fuel_p2p_service(build_p2p_config("p2p_service_works")).await; + build_fuel_p2p_service(P2PConfig::default_with_network("p2p_service_works")).await; loop { match fuel_p2p_service.next_event().await { @@ -239,13 +213,11 @@ mod tests { #[instrument] async fn nodes_connected_via_mdns() { // Node A - let mut p2p_config = build_p2p_config("nodes_connected_via_mdns"); - p2p_config.tcp_port = 4001; + let mut p2p_config = P2PConfig::default_with_network("nodes_connected_via_mdns"); p2p_config.enable_mdns = true; let mut node_a = build_fuel_p2p_service(p2p_config.clone()).await; // Node B - p2p_config.tcp_port = 4002; let mut node_b = build_fuel_p2p_service(p2p_config).await; loop { @@ -270,8 +242,7 @@ mod tests { #[instrument] async fn nodes_connected_via_identify() { // Node A - let mut p2p_config = build_p2p_config("nodes_connected_via_identify"); - p2p_config.tcp_port = 4003; + let mut p2p_config = P2PConfig::default_with_network("nodes_connected_via_identify"); let mut node_a = build_fuel_p2p_service(p2p_config.clone()).await; let node_a_address = match node_a.next_event().await { @@ -280,12 +251,13 @@ mod tests { }; // Node B - p2p_config.tcp_port = 4004; - p2p_config.bootstrap_nodes = vec![(node_a.local_peer_id, node_a_address.clone().unwrap())]; + p2p_config.bootstrap_nodes = vec![build_bootstrap_node( + node_a.local_peer_id, + node_a_address.clone().unwrap(), + )]; let mut node_b = build_fuel_p2p_service(p2p_config.clone()).await; // Node C - p2p_config.tcp_port = 4005; let mut node_c = build_fuel_p2p_service(p2p_config).await; loop { @@ -316,8 +288,7 @@ mod tests { #[instrument] async fn peer_info_updates_work() { // Node A - let mut p2p_config = build_p2p_config("peer_info_updates_work"); - p2p_config.tcp_port = 4006; + let mut p2p_config = P2PConfig::default_with_network("peer_info_updates_work"); let mut node_a = build_fuel_p2p_service(p2p_config.clone()).await; let node_a_address = match node_a.next_event().await { @@ -326,8 +297,10 @@ mod tests { }; // Node B - p2p_config.tcp_port = 4007; - p2p_config.bootstrap_nodes = vec![(node_a.local_peer_id, node_a_address.clone().unwrap())]; + p2p_config.bootstrap_nodes = vec![build_bootstrap_node( + node_a.local_peer_id, + node_a_address.clone().unwrap(), + )]; let mut node_b = build_fuel_p2p_service(p2p_config).await; loop { @@ -357,43 +330,33 @@ mod tests { #[tokio::test] #[instrument] async fn gossipsub_broadcast_tx() { - gossipsub_broadcast( - GossipsubBroadcastRequest::NewTx(Arc::new(Transaction::default())), - 4008, - 4009, - ) + gossipsub_broadcast(GossipsubBroadcastRequest::NewTx(Arc::new( + Transaction::default(), + ))) .await; } #[tokio::test] #[instrument] async fn gossipsub_broadcast_vote() { - gossipsub_broadcast( - GossipsubBroadcastRequest::ConsensusVote(Arc::new(ConsensusVote::default())), - 4010, - 4011, - ) + gossipsub_broadcast(GossipsubBroadcastRequest::ConsensusVote(Arc::new( + ConsensusVote::default(), + ))) .await; } #[tokio::test] #[instrument] async fn gossipsub_broadcast_block() { - gossipsub_broadcast( - GossipsubBroadcastRequest::NewBlock(Arc::new(FuelBlock::default())), - 4012, - 4013, - ) + gossipsub_broadcast(GossipsubBroadcastRequest::NewBlock(Arc::new( + FuelBlock::default(), + ))) .await; } /// Reusable helper function for Broadcasting Gossipsub requests - async fn gossipsub_broadcast( - broadcast_request: GossipsubBroadcastRequest, - port_a: u16, - port_b: u16, - ) { - let mut p2p_config = build_p2p_config("gossipsub_exchanges_messages"); + async fn gossipsub_broadcast(broadcast_request: GossipsubBroadcastRequest) { + let mut p2p_config = P2PConfig::default_with_network("gossipsub_exchanges_messages"); let topics = vec![ NEW_TX_GOSSIP_TOPIC.into(), NEW_BLOCK_GOSSIP_TOPIC.into(), @@ -413,7 +376,6 @@ mod tests { let mut message_sent = false; // Node A - p2p_config.tcp_port = port_a; p2p_config.topics = topics.clone(); let mut node_a = build_fuel_p2p_service(p2p_config.clone()).await; @@ -423,8 +385,10 @@ mod tests { }; // Node B - p2p_config.tcp_port = port_b; - p2p_config.bootstrap_nodes = vec![(node_a.local_peer_id, node_a_address.clone().unwrap())]; + p2p_config.bootstrap_nodes = vec![build_bootstrap_node( + node_a.local_peer_id, + node_a_address.clone().unwrap(), + )]; let mut node_b = build_fuel_p2p_service(p2p_config.clone()).await; loop { @@ -489,10 +453,9 @@ mod tests { FuelBlock, FuelBlockConsensus, FuelBlockHeader, SealedFuelBlock, }; - let mut p2p_config = build_p2p_config("request_response_works"); + let mut p2p_config = P2PConfig::default_with_network("request_response_works"); // Node A - p2p_config.tcp_port = 4014; let mut node_a = build_fuel_p2p_service(p2p_config.clone()).await; let node_a_address = match node_a.next_event().await { @@ -501,8 +464,10 @@ mod tests { }; // Node B - p2p_config.tcp_port = 4015; - p2p_config.bootstrap_nodes = vec![(node_a.local_peer_id, node_a_address.clone().unwrap())]; + p2p_config.bootstrap_nodes = vec![build_bootstrap_node( + node_a.local_peer_id, + node_a_address.clone().unwrap(), + )]; let mut node_b = build_fuel_p2p_service(p2p_config.clone()).await; let (tx_test_end, mut rx_test_end) = mpsc::channel(1); @@ -576,12 +541,11 @@ mod tests { #[tokio::test] #[instrument] async fn req_res_outbound_timeout_works() { - let mut p2p_config = build_p2p_config("req_res_outbound_timeout_works"); + let mut p2p_config = P2PConfig::default_with_network("req_res_outbound_timeout_works"); // Node A - p2p_config.tcp_port = 4016; // setup request timeout to 0 in order for the Request to fail - p2p_config.set_request_timeout = Some(Duration::from_secs(0)); + p2p_config.set_request_timeout = Duration::from_secs(0); let mut node_a = build_fuel_p2p_service(p2p_config.clone()).await; let node_a_address = match node_a.next_event().await { @@ -590,8 +554,10 @@ mod tests { }; // Node B - p2p_config.tcp_port = 4017; - p2p_config.bootstrap_nodes = vec![(node_a.local_peer_id, node_a_address.clone().unwrap())]; + p2p_config.bootstrap_nodes = vec![build_bootstrap_node( + node_a.local_peer_id, + node_a_address.clone().unwrap(), + )]; let mut node_b = build_fuel_p2p_service(p2p_config.clone()).await; let (tx_test_end, mut rx_test_end) = tokio::sync::mpsc::channel(1); diff --git a/fuel-sync/src/service.rs b/fuel-sync/src/service.rs index 06a70ff7112..4b0fc16e11b 100644 --- a/fuel-sync/src/service.rs +++ b/fuel-sync/src/service.rs @@ -1,6 +1,10 @@ use crate::Config; use fuel_core_interfaces::{ - bft::BftMpsc, block_importer::ImportBlockMpsc, relayer, sync::SyncMpsc, + bft::BftMpsc, + block_importer::ImportBlockMpsc, + p2p::{BlockBroadcast, P2pRequestEvent}, + relayer, + sync::SyncMpsc, }; use parking_lot::Mutex; use tokio::{sync::mpsc, task::JoinHandle}; @@ -21,8 +25,8 @@ impl Service { pub async fn start( &self, - _p2p_block: (), // broadcast::Receiver, - _p2p_request: (), // mpsc::Sender, + _p2p_block: mpsc::Receiver, + _p2p_request: mpsc::Sender, _relayer: relayer::Sender, _bft: mpsc::Sender, _block_importer: mpsc::Sender,