From fba37e05034c30601aa6d7753dea5115a42eb7d0 Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Fri, 7 Oct 2022 15:51:40 +0200 Subject: [PATCH 01/22] remove loop from next_event --- fuel-p2p/src/orchestrator.rs | 4 ++-- fuel-p2p/src/service.rs | 33 ++++++++++++++++----------------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/fuel-p2p/src/orchestrator.rs b/fuel-p2p/src/orchestrator.rs index 435f0cc3c20..463b68882b1 100644 --- a/fuel-p2p/src/orchestrator.rs +++ b/fuel-p2p/src/orchestrator.rs @@ -97,7 +97,7 @@ impl NetworkOrchestrator { }, p2p_event = p2p_service.next_event() => { match p2p_event { - FuelP2PEvent::GossipsubMessage { message, .. } => { + Some(FuelP2PEvent::GossipsubMessage { message, .. }) => { match message { GossipsubMessage::NewTx(tx) => { let _ = self.tx_transaction.send(TransactionBroadcast::NewTransaction(tx)); @@ -110,7 +110,7 @@ impl NetworkOrchestrator { }, } }, - FuelP2PEvent::RequestMessage { request_message, request_id } => { + Some(FuelP2PEvent::RequestMessage { request_message, request_id }) => { match request_message { RequestMessage::RequestBlock(block_height) => { let db = self.db.clone(); diff --git a/fuel-p2p/src/service.rs b/fuel-p2p/src/service.rs index 95c8bb20fc1..c4b6b3a2e44 100644 --- a/fuel-p2p/src/service.rs +++ b/fuel-p2p/src/service.rs @@ -239,16 +239,15 @@ impl FuelP2PService { Ok(()) } - pub async fn next_event(&mut self) -> FuelP2PEvent { - loop { - if let SwarmEvent::Behaviour(fuel_behaviour) = - self.swarm.select_next_some().await - { - if let Some(event) = self.handle_behaviour_event(fuel_behaviour) { - return event - } - } + /// Handles P2P Events. + /// Returns only events that are of interest to the Network Orchestrator. + pub async fn next_event(&mut self) -> Option { + if let SwarmEvent::Behaviour(fuel_behaviour) = self.swarm.select_next_some().await + { + return self.handle_behaviour_event(fuel_behaviour) } + + None } fn handle_behaviour_event( @@ -510,7 +509,7 @@ mod tests { loop { tokio::select! { node_b_event = node_b.next_event() => { - if let FuelP2PEvent::PeerConnected(_) = node_b_event { + if let Some(FuelP2PEvent::PeerConnected(_)) = node_b_event { // successfully connected to Node B break } @@ -558,7 +557,7 @@ mod tests { }, node_c_event = node_c.next_event() => { - if let FuelP2PEvent::PeerConnected(peer_id) = node_c_event { + if let Some(FuelP2PEvent::PeerConnected(peer_id)) = node_c_event { // we have connected to Node B! if peer_id == node_b.local_peer_id { break @@ -594,7 +593,7 @@ mod tests { loop { tokio::select! { node_a_event = node_a.next_event() => { - if let FuelP2PEvent::PeerInfoUpdated(peer_id) = node_a_event { + if let Some(FuelP2PEvent::PeerInfoUpdated(peer_id)) = node_a_event { if let Some(PeerInfo { peer_addresses, latest_ping, client_version, .. }) = node_a.swarm.behaviour().get_peer_info(&peer_id) { // Exits after it verifies that: // 1. Peer Addresses are known @@ -683,7 +682,7 @@ mod tests { loop { tokio::select! { node_a_event = node_a.next_event() => { - if let FuelP2PEvent::PeerInfoUpdated(peer_id) = node_a_event { + if let Some(FuelP2PEvent::PeerInfoUpdated(peer_id)) = node_a_event { if let Some(PeerInfo { peer_addresses, .. }) = node_a.swarm.behaviour().get_peer_info(&peer_id) { // verifies that we've got at least a single peer address to send message to if !peer_addresses.is_empty() && !message_sent { @@ -697,7 +696,7 @@ mod tests { tracing::info!("Node A Event: {:?}", node_a_event); }, node_b_event = node_b.next_event() => { - if let FuelP2PEvent::GossipsubMessage { topic_hash, message, .. } = node_b_event.clone() { + if let Some(FuelP2PEvent::GossipsubMessage { topic_hash, message, .. }) = node_b_event.clone() { if topic_hash != selected_topic.hash() { tracing::error!("Wrong topic hash, expected: {} - actual: {}", selected_topic.hash(), topic_hash); panic!("Wrong Topic"); @@ -776,7 +775,7 @@ mod tests { break; } node_a_event = node_a.next_event() => { - if let FuelP2PEvent::PeerInfoUpdated(peer_id) = node_a_event { + if let Some(FuelP2PEvent::PeerInfoUpdated(peer_id)) = node_a_event { if let Some(PeerInfo { peer_addresses, .. }) = node_a.swarm.behaviour().get_peer_info(&peer_id) { // 0. verifies that we've got at least a single peer address to request message from if !peer_addresses.is_empty() && !request_sent { @@ -809,7 +808,7 @@ mod tests { }, node_b_event = node_b.next_event() => { // 2. Node B receives the RequestMessage from Node A initiated by the NetworkOrchestrator - if let FuelP2PEvent::RequestMessage{ request_id, .. } = node_b_event { + if let Some(FuelP2PEvent::RequestMessage{ request_id, .. }) = node_b_event { let block = FuelBlock { header: FuelBlockHeader::default(), transactions: vec![Transaction::default(), Transaction::default(), Transaction::default(), Transaction::default(), Transaction::default()], @@ -863,7 +862,7 @@ mod tests { loop { tokio::select! { node_a_event = node_a.next_event() => { - if let FuelP2PEvent::PeerInfoUpdated(peer_id) = node_a_event { + if let Some(FuelP2PEvent::PeerInfoUpdated(peer_id)) = node_a_event { if let Some(PeerInfo { peer_addresses, .. }) = node_a.swarm.behaviour().get_peer_info(&peer_id) { // 0. verifies that we've got at least a single peer address to request message from if !peer_addresses.is_empty() && !request_sent { From ee9618e4864ea89903f3a9bcecb766d9ef7813f7 Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Tue, 11 Oct 2022 18:37:38 +0200 Subject: [PATCH 02/22] initial work --- fuel-core-interfaces/src/p2p.rs | 24 +++++++++ fuel-p2p/src/behavior.rs | 16 +++++- fuel-p2p/src/config.rs | 12 ++++- fuel-p2p/src/gossipsub/builder.rs | 9 +++- fuel-p2p/src/orchestrator.rs | 89 +++++++++++++++++++++++++------ fuel-p2p/src/service.rs | 38 ++++++++++++- 6 files changed, 166 insertions(+), 22 deletions(-) diff --git a/fuel-core-interfaces/src/p2p.rs b/fuel-core-interfaces/src/p2p.rs index dbe565b0685..531da3dddf8 100644 --- a/fuel-core-interfaces/src/p2p.rs +++ b/fuel-core-interfaces/src/p2p.rs @@ -8,6 +8,10 @@ use crate::{ model::ConsensusVote, }; use async_trait::async_trait; +use serde::{ + Deserialize, + Serialize, +}; use std::sync::Arc; use tokio::sync::oneshot; @@ -25,6 +29,22 @@ pub enum BlockBroadcast { NewBlock(FuelBlock), } +#[derive(Debug)] +pub enum GossipsubMessageAcceptance { + Accept, + Reject, + Ignore, +} + +#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)] +pub struct GossipsubMessageId(pub Vec); + +impl From> for GossipsubMessageId { + fn from(message_id: Vec) -> Self { + GossipsubMessageId(message_id) + } +} + #[derive(Debug)] pub enum P2pRequestEvent { RequestBlock { @@ -40,6 +60,10 @@ pub enum P2pRequestEvent { BroadcastConsensusVote { vote: Arc, }, + GossipsubMessageReport { + gossip_id: GossipsubMessageId, + acceptance: GossipsubMessageAcceptance, + }, Stop, } diff --git a/fuel-p2p/src/behavior.rs b/fuel-p2p/src/behavior.rs index 1b991fa0b7e..a345b07a375 100644 --- a/fuel-p2p/src/behavior.rs +++ b/fuel-p2p/src/behavior.rs @@ -28,6 +28,7 @@ use libp2p::{ }, Gossipsub, GossipsubEvent, + MessageAcceptance, MessageId, }, request_response::{ @@ -130,7 +131,7 @@ impl FuelBehaviour { Self { discovery: discovery_config.finish(), - gossipsub: build_gossipsub(&p2p_config.local_keypair, p2p_config), + gossipsub: build_gossipsub(&p2p_config.local_keypair, p2p_config.clone()), peer_info, request_response, } @@ -190,6 +191,19 @@ impl FuelBehaviour { self.request_response.send_response(channel, message) } + pub fn report_message_validation_result( + &mut self, + msg_id: &MessageId, + propagation_source: &PeerId, + acceptance: MessageAcceptance, + ) -> Result { + self.gossipsub.report_message_validation_result( + msg_id, + propagation_source, + acceptance, + ) + } + // Currently only used in testing, but should be useful for the NetworkOrchestrator API #[allow(dead_code)] pub fn get_peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> { diff --git a/fuel-p2p/src/config.rs b/fuel-p2p/src/config.rs index 4bace0e1fb1..9f00b2eb8dd 100644 --- a/fuel-p2p/src/config.rs +++ b/fuel-p2p/src/config.rs @@ -28,9 +28,15 @@ use std::{ IpAddr, Ipv4Addr, }, + sync::Arc, time::Duration, }; +pub use libp2p::gossipsub::{ + GossipsubMessage, + MessageId, +}; + const REQ_RES_TIMEOUT: Duration = Duration::from_secs(20); /// Maximum number of frames buffered per substream. @@ -40,7 +46,7 @@ const MAX_NUM_OF_FRAMES_BUFFERED: usize = 256; /// inbound and outbound connections established through the transport. const TRANSPORT_TIMEOUT: Duration = Duration::from_secs(20); -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct P2PConfig { pub local_keypair: Keypair, @@ -77,6 +83,8 @@ pub struct P2PConfig { pub ideal_mesh_size: usize, pub min_mesh_size: usize, pub max_mesh_size: usize, + pub message_id_fn: + Option MessageId + Send + Sync + 'static>>, // RequestResponse related fields /// Sets the timeout for inbound and outbound requests. @@ -119,6 +127,8 @@ impl P2PConfig { max_mesh_size: 12, min_mesh_size: 4, ideal_mesh_size: 6, + // default value set in gossipsub builder + message_id_fn: None, set_request_timeout: REQ_RES_TIMEOUT, set_connection_keep_alive: REQ_RES_TIMEOUT, info_interval: Some(Duration::from_secs(3)), diff --git a/fuel-p2p/src/gossipsub/builder.rs b/fuel-p2p/src/gossipsub/builder.rs index a9f3f8f9310..6f4a1e625dc 100644 --- a/fuel-p2p/src/gossipsub/builder.rs +++ b/fuel-p2p/src/gossipsub/builder.rs @@ -19,9 +19,13 @@ use sha2::{ use crate::config::P2PConfig; -pub fn build_gossipsub(local_key: &Keypair, p2p_config: &P2PConfig) -> Gossipsub { +pub fn build_gossipsub(local_key: &Keypair, p2p_config: P2PConfig) -> Gossipsub { let gossip_message_id = move |message: &GossipsubMessage| { - MessageId::from(&Sha256::digest(&message.data)[..20]) + p2p_config + .message_id_fn + .clone() + .map(|f| f(message)) + .unwrap_or(MessageId::from(&Sha256::digest(&message.data)[..20])) }; let fast_gossip_message_id = move |message: &RawGossipsubMessage| { @@ -35,6 +39,7 @@ pub fn build_gossipsub(local_key: &Keypair, p2p_config: &P2PConfig) -> Gossipsub .mesh_n_high(p2p_config.max_mesh_size) .message_id_fn(gossip_message_id) .fast_message_id_fn(fast_gossip_message_id) + .validate_messages() .build() .expect("valid gossipsub configuration"); diff --git a/fuel-p2p/src/orchestrator.rs b/fuel-p2p/src/orchestrator.rs index 463b68882b1..f67815be237 100644 --- a/fuel-p2p/src/orchestrator.rs +++ b/fuel-p2p/src/orchestrator.rs @@ -1,14 +1,27 @@ -use std::sync::Arc; +use std::{ + collections::HashMap, + sync::Arc, +}; use anyhow::anyhow; + use fuel_core_interfaces::p2p::{ BlockBroadcast, ConsensusBroadcast, + GossipsubMessageAcceptance, + GossipsubMessageId, P2pDb, P2pRequestEvent, TransactionBroadcast, }; -use libp2p::request_response::RequestId; +use libp2p::{ + gossipsub::{ + MessageAcceptance, + MessageId, + }, + request_response::RequestId, + PeerId, +}; use tokio::{ sync::{ broadcast, @@ -20,7 +33,10 @@ use tokio::{ }, task::JoinHandle, }; -use tracing::warn; +use tracing::{ + info, + warn, +}; use crate::{ codecs::bincode::BincodeCodec, @@ -40,6 +56,12 @@ use crate::{ }, }; +type ConsensusWithMsgId = GossipData; +type TransactionWithMsgId = GossipData; +type BlockWithMsgId = GossipData; + +type MessageIdWithPeer = (MessageId, PeerId); + pub struct NetworkOrchestrator { p2p_config: P2PConfig, @@ -48,12 +70,21 @@ pub struct NetworkOrchestrator { rx_outbound_responses: Receiver>, // senders - tx_consensus: Sender, - tx_transaction: broadcast::Sender, - tx_block: Sender, + tx_consensus: Sender, + tx_transaction: broadcast::Sender, + tx_block: Sender, tx_outbound_responses: Sender>, db: Arc, + + message_cache: HashMap, +} + +#[derive(Debug, Clone)] +pub struct GossipData { + pub data: T, + pub peer_id: PeerId, + pub message_id: MessageId, } impl NetworkOrchestrator { @@ -61,9 +92,9 @@ impl NetworkOrchestrator { p2p_config: P2PConfig, rx_request_event: Receiver, - tx_consensus: Sender, - tx_transaction: broadcast::Sender, - tx_block: Sender, + tx_consensus: Sender, + tx_transaction: broadcast::Sender, + tx_block: Sender, db: Arc, ) -> Self { @@ -79,6 +110,7 @@ impl NetworkOrchestrator { tx_transaction, tx_outbound_responses, db, + message_cache: Default::default(), } } @@ -97,16 +129,17 @@ impl NetworkOrchestrator { }, p2p_event = p2p_service.next_event() => { match p2p_event { - Some(FuelP2PEvent::GossipsubMessage { message, .. }) => { + Some(FuelP2PEvent::GossipsubMessage { message, message_id, peer_id,.. }) => { + match message { GossipsubMessage::NewTx(tx) => { - let _ = self.tx_transaction.send(TransactionBroadcast::NewTransaction(tx)); + let _ = self.tx_transaction.send(GossipData { data: TransactionBroadcast::NewTransaction(tx), message_id, peer_id}); }, GossipsubMessage::NewBlock(block) => { - let _ = self.tx_block.send(BlockBroadcast::NewBlock(block)); + let _ = self.tx_block.send(GossipData { data: BlockBroadcast::NewBlock(block), message_id, peer_id }); }, GossipsubMessage::ConsensusVote(vote) => { - let _ = self.tx_consensus.send(ConsensusBroadcast::NewVote(vote)); + let _ = self.tx_consensus.send(GossipData {data: ConsensusBroadcast::NewVote(vote), message_id, peer_id } ); }, } }, @@ -146,6 +179,30 @@ impl NetworkOrchestrator { let broadcast = GossipsubBroadcastRequest::ConsensusVote(vote); let _ = p2p_service.publish_message(broadcast); }, + P2pRequestEvent::GossipsubMessageReport { gossip_id, acceptance } => { + if let Some((msg_id, peer_id)) = self.message_cache.remove(&gossip_id) { + + let acceptance = match acceptance { + GossipsubMessageAcceptance::Accept => MessageAcceptance::Accept, + GossipsubMessageAcceptance::Reject => MessageAcceptance::Reject, + GossipsubMessageAcceptance::Ignore => MessageAcceptance::Ignore + }; + + match p2p_service.report_message_validation_result(&msg_id, &peer_id, acceptance) { + Ok(true) => { + info!(target: "fuel-libp2p", "Sent a report for MessageId: {} from PeerId: {}", msg_id, peer_id); + } + Ok(false) => { + warn!(target: "fuel-libp2p", "Message with MessageId: {} not found in the Gossipsub Message Cache", msg_id); + } + Err(e) => { + warn!(target: "fuel-libp2p", "Failed to publish Message with MessageId: {} with Error: {:?}", msg_id, e); + } + } + } else { + warn!(target: "fuel-libp2p", "Message with GossipMessageId: {:?} not found in the Network Orchestrator Message Cache", gossip_id); + } + } P2pRequestEvent::Stop => break, } } else { @@ -174,9 +231,9 @@ impl Service { db: Arc, tx_request_event: Sender, rx_request_event: Receiver, - tx_consensus: Sender, - tx_transaction: broadcast::Sender, - tx_block: Sender, + tx_consensus: Sender, + tx_transaction: broadcast::Sender, + tx_block: Sender, ) -> Self { let network_orchestrator = NetworkOrchestrator::new( p2p_config, diff --git a/fuel-p2p/src/service.rs b/fuel-p2p/src/service.rs index c4b6b3a2e44..a797b1b916f 100644 --- a/fuel-p2p/src/service.rs +++ b/fuel-p2p/src/service.rs @@ -31,11 +31,13 @@ use crate::{ ResponseMessage, }, }; + use futures::prelude::*; use libp2p::{ gossipsub::{ error::PublishError, GossipsubEvent, + MessageAcceptance, MessageId, Topic, TopicHash, @@ -59,6 +61,7 @@ use rand::Rng; use std::collections::HashMap; use tracing::{ debug, + info, warn, }; @@ -98,6 +101,7 @@ struct NetworkMetadata { pub enum FuelP2PEvent { GossipsubMessage { peer_id: PeerId, + message_id: MessageId, topic_hash: TopicHash, message: FuelGossipsubMessage, }, @@ -239,6 +243,19 @@ impl FuelP2PService { Ok(()) } + pub fn report_message_validation_result( + &mut self, + msg_id: &MessageId, + propagation_source: &PeerId, + acceptance: MessageAcceptance, + ) -> Result { + self.swarm.behaviour_mut().report_message_validation_result( + msg_id, + propagation_source, + acceptance, + ) + } + /// Handles P2P Events. /// Returns only events that are of interest to the Network Orchestrator. pub async fn next_event(&mut self) -> Option { @@ -272,7 +289,7 @@ impl FuelP2PService { if let GossipsubEvent::Message { propagation_source, message, - .. + message_id, } = gossipsub_event { if let Some(correct_topic) = self @@ -284,12 +301,29 @@ impl FuelP2PService { Ok(decoded_message) => { return Some(FuelP2PEvent::GossipsubMessage { peer_id: propagation_source, + message_id, topic_hash: message.topic, message: decoded_message, }) } Err(err) => { - warn!(target: "fuel-libp2p", "Failed to decode a message: {:?} with error: {:?}", &message.data, err); + warn!(target: "fuel-libp2p", "Failed to decode a message. ID: {}, Message: {:?} with error: {:?}", message_id, &message.data, err); + + match self.report_message_validation_result( + &message_id, + &propagation_source, + MessageAcceptance::Reject, + ) { + Ok(false) => { + warn!(target: "fuel-libp2p", "Message was not found in the cache, peer with PeerId: {} has been reported.", propagation_source); + } + Ok(true) => { + info!(target: "fuel-libp2p", "Message found in the cache, peer with PeerId: {} has been reported.", propagation_source); + } + Err(e) => { + warn!(target: "fuel-libp2p", "Failed to publish the message with following error: {:?}.", e); + } + } } } } else { From 4030736e601eb5f0f6bea094ebde0eb2b4fe8254 Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Tue, 11 Oct 2022 18:39:40 +0200 Subject: [PATCH 03/22] more work --- fuel-core-interfaces/src/p2p.rs | 2 ++ fuel-p2p/src/orchestrator.rs | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/fuel-core-interfaces/src/p2p.rs b/fuel-core-interfaces/src/p2p.rs index 531da3dddf8..d6ed1f57ebc 100644 --- a/fuel-core-interfaces/src/p2p.rs +++ b/fuel-core-interfaces/src/p2p.rs @@ -20,10 +20,12 @@ pub enum TransactionBroadcast { NewTransaction(Transaction), } +#[derive(Debug, PartialEq, Eq, Clone)] pub enum ConsensusBroadcast { NewVote(ConsensusVote), } +#[derive(Debug, Clone)] pub enum BlockBroadcast { /// fuel block without consensus data NewBlock(FuelBlock), diff --git a/fuel-p2p/src/orchestrator.rs b/fuel-p2p/src/orchestrator.rs index f67815be237..4286742683e 100644 --- a/fuel-p2p/src/orchestrator.rs +++ b/fuel-p2p/src/orchestrator.rs @@ -70,7 +70,7 @@ pub struct NetworkOrchestrator { rx_outbound_responses: Receiver>, // senders - tx_consensus: Sender, + tx_consensus: Sender>, tx_transaction: broadcast::Sender, tx_block: Sender, tx_outbound_responses: Sender>, @@ -81,7 +81,7 @@ pub struct NetworkOrchestrator { } #[derive(Debug, Clone)] -pub struct GossipData { +pub struct GossipData { pub data: T, pub peer_id: PeerId, pub message_id: MessageId, From 2afb2942d23814256ae0fa4d56c24f530c2796d1 Mon Sep 17 00:00:00 2001 From: Brandon Kite Date: Tue, 11 Oct 2022 10:07:27 -0700 Subject: [PATCH 04/22] trying to use an alternative approach to message id cache --- fuel-core-interfaces/src/p2p.rs | 13 +++++- fuel-p2p/src/orchestrator.rs | 74 ++++++++++++++++++++------------- 2 files changed, 55 insertions(+), 32 deletions(-) diff --git a/fuel-core-interfaces/src/p2p.rs b/fuel-core-interfaces/src/p2p.rs index d6ed1f57ebc..eaa0a8c8e9d 100644 --- a/fuel-core-interfaces/src/p2p.rs +++ b/fuel-core-interfaces/src/p2p.rs @@ -12,7 +12,10 @@ use serde::{ Deserialize, Serialize, }; -use std::sync::Arc; +use std::{ + fmt::Debug, + sync::Arc, +}; use tokio::sync::oneshot; #[derive(Debug, PartialEq, Eq, Clone)] @@ -63,12 +66,18 @@ pub enum P2pRequestEvent { vote: Arc, }, GossipsubMessageReport { - gossip_id: GossipsubMessageId, + message: Box, acceptance: GossipsubMessageAcceptance, }, Stop, } +pub trait NetworkData: Debug + Send { + fn take_data(&mut self) -> Option>; + fn message_id(&self) -> Vec; + fn peer_id(&self) -> Vec; +} + #[async_trait] pub trait P2pDb: Send + Sync { async fn get_sealed_block(&self, height: BlockHeight) diff --git a/fuel-p2p/src/orchestrator.rs b/fuel-p2p/src/orchestrator.rs index 4286742683e..a5f879f26d8 100644 --- a/fuel-p2p/src/orchestrator.rs +++ b/fuel-p2p/src/orchestrator.rs @@ -1,5 +1,9 @@ use std::{ collections::HashMap, + fmt::{ + Debug, + Formatter, + }, sync::Arc, }; @@ -10,6 +14,7 @@ use fuel_core_interfaces::p2p::{ ConsensusBroadcast, GossipsubMessageAcceptance, GossipsubMessageId, + NetworkData, P2pDb, P2pRequestEvent, TransactionBroadcast, @@ -74,19 +79,30 @@ pub struct NetworkOrchestrator { tx_transaction: broadcast::Sender, tx_block: Sender, tx_outbound_responses: Sender>, - db: Arc, - - message_cache: HashMap, } #[derive(Debug, Clone)] -pub struct GossipData { - pub data: T, +pub struct GossipData { + pub data: Option>, pub peer_id: PeerId, pub message_id: MessageId, } +impl NetworkData for GossipData { + fn take_data(&mut self) -> Option> { + self.data.take() + } + + fn message_id(&self) -> Vec { + self.message_id.0.clone() + } + + fn peer_id(&self) -> Vec { + self.peer_id.to_bytes() + } +} + impl NetworkOrchestrator { pub fn new( p2p_config: P2PConfig, @@ -110,7 +126,6 @@ impl NetworkOrchestrator { tx_transaction, tx_outbound_responses, db, - message_cache: Default::default(), } } @@ -133,13 +148,13 @@ impl NetworkOrchestrator { match message { GossipsubMessage::NewTx(tx) => { - let _ = self.tx_transaction.send(GossipData { data: TransactionBroadcast::NewTransaction(tx), message_id, peer_id}); + let _ = self.tx_transaction.send(GossipData { data: Some(TransactionBroadcast::NewTransaction(tx)), message_id, peer_id}); }, GossipsubMessage::NewBlock(block) => { - let _ = self.tx_block.send(GossipData { data: BlockBroadcast::NewBlock(block), message_id, peer_id }); + let _ = self.tx_block.send(GossipData { data: Some(BlockBroadcast::NewBlock(block)), message_id, peer_id }); }, GossipsubMessage::ConsensusVote(vote) => { - let _ = self.tx_consensus.send(GossipData {data: ConsensusBroadcast::NewVote(vote), message_id, peer_id } ); + let _ = self.tx_consensus.send(GossipData {data: Some(ConsensusBroadcast::NewVote(vote)), message_id, peer_id } ); }, } }, @@ -179,29 +194,28 @@ impl NetworkOrchestrator { let broadcast = GossipsubBroadcastRequest::ConsensusVote(vote); let _ = p2p_service.publish_message(broadcast); }, - P2pRequestEvent::GossipsubMessageReport { gossip_id, acceptance } => { - if let Some((msg_id, peer_id)) = self.message_cache.remove(&gossip_id) { - - let acceptance = match acceptance { - GossipsubMessageAcceptance::Accept => MessageAcceptance::Accept, - GossipsubMessageAcceptance::Reject => MessageAcceptance::Reject, - GossipsubMessageAcceptance::Ignore => MessageAcceptance::Ignore - }; - - match p2p_service.report_message_validation_result(&msg_id, &peer_id, acceptance) { - Ok(true) => { - info!(target: "fuel-libp2p", "Sent a report for MessageId: {} from PeerId: {}", msg_id, peer_id); - } - Ok(false) => { - warn!(target: "fuel-libp2p", "Message with MessageId: {} not found in the Gossipsub Message Cache", msg_id); - } - Err(e) => { - warn!(target: "fuel-libp2p", "Failed to publish Message with MessageId: {} with Error: {:?}", msg_id, e); - } + P2pRequestEvent::GossipsubMessageReport { message, acceptance } => { + let msg_id = message.message_id().into(); + let peer_id = message.peer_id().try_into().unwrap(); + + let acceptance = match acceptance { + GossipsubMessageAcceptance::Accept => MessageAcceptance::Accept, + GossipsubMessageAcceptance::Reject => MessageAcceptance::Reject, + GossipsubMessageAcceptance::Ignore => MessageAcceptance::Ignore + }; + + match p2p_service.report_message_validation_result(&msg_id, &peer_id, acceptance) { + Ok(true) => { + info!(target: "fuel-libp2p", "Sent a report for MessageId: {} from PeerId: {}", msg_id, peer_id); + } + Ok(false) => { + warn!(target: "fuel-libp2p", "Message with MessageId: {} not found in the Gossipsub Message Cache", msg_id); + } + Err(e) => { + warn!(target: "fuel-libp2p", "Failed to publish Message with MessageId: {} with Error: {:?}", msg_id, e); } - } else { - warn!(target: "fuel-libp2p", "Message with GossipMessageId: {:?} not found in the Network Orchestrator Message Cache", gossip_id); } + } P2pRequestEvent::Stop => break, } From 7e691fb899438463859be82ae3ea894ddec6f2be Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Tue, 11 Oct 2022 19:24:21 +0200 Subject: [PATCH 05/22] quick break --- fuel-core-interfaces/src/p2p.rs | 6 +++--- fuel-p2p/src/orchestrator.rs | 24 +++++++++++++++++------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/fuel-core-interfaces/src/p2p.rs b/fuel-core-interfaces/src/p2p.rs index eaa0a8c8e9d..9008831a619 100644 --- a/fuel-core-interfaces/src/p2p.rs +++ b/fuel-core-interfaces/src/p2p.rs @@ -66,14 +66,14 @@ pub enum P2pRequestEvent { vote: Arc, }, GossipsubMessageReport { - message: Box, + message: Box>, acceptance: GossipsubMessageAcceptance, }, Stop, } -pub trait NetworkData: Debug + Send { - fn take_data(&mut self) -> Option>; +pub trait NetworkData: Debug + Send { + fn take_data(&mut self) -> Option; fn message_id(&self) -> Vec; fn peer_id(&self) -> Vec; } diff --git a/fuel-p2p/src/orchestrator.rs b/fuel-p2p/src/orchestrator.rs index a5f879f26d8..f4baf3943d9 100644 --- a/fuel-p2p/src/orchestrator.rs +++ b/fuel-p2p/src/orchestrator.rs @@ -83,14 +83,24 @@ pub struct NetworkOrchestrator { } #[derive(Debug, Clone)] -pub struct GossipData { - pub data: Option>, +pub struct GossipData { + pub data: Option, pub peer_id: PeerId, pub message_id: MessageId, } -impl NetworkData for GossipData { - fn take_data(&mut self) -> Option> { +impl GossipData { + pub fn new(value: T, peer_id: PeerId, message_id: MessageId) -> Self { + Self { + data: Some(value), + peer_id, + message_id, + } + } +} + +impl NetworkData for GossipData { + fn take_data(&mut self) -> Option { self.data.take() } @@ -148,13 +158,13 @@ impl NetworkOrchestrator { match message { GossipsubMessage::NewTx(tx) => { - let _ = self.tx_transaction.send(GossipData { data: Some(TransactionBroadcast::NewTransaction(tx)), message_id, peer_id}); + let _ = self.tx_transaction.send(GossipData::new(TransactionBroadcast::NewTransaction(tx), peer_id, message_id) ); }, GossipsubMessage::NewBlock(block) => { - let _ = self.tx_block.send(GossipData { data: Some(BlockBroadcast::NewBlock(block)), message_id, peer_id }); + let _ = self.tx_block.send(GossipData::new(BlockBroadcast::NewBlock(block), peer_id, message_id)); }, GossipsubMessage::ConsensusVote(vote) => { - let _ = self.tx_consensus.send(GossipData {data: Some(ConsensusBroadcast::NewVote(vote)), message_id, peer_id } ); + let _ = self.tx_consensus.send(GossipData::new(ConsensusBroadcast::NewVote(vote), peer_id, message_id)); }, } }, From b676c5b7aa24e162edea35bccda4ff6c9e66d7f5 Mon Sep 17 00:00:00 2001 From: Brandon Kite Date: Tue, 11 Oct 2022 10:44:34 -0700 Subject: [PATCH 06/22] fix sized issue --- fuel-core-interfaces/src/p2p.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fuel-core-interfaces/src/p2p.rs b/fuel-core-interfaces/src/p2p.rs index 9008831a619..7db876551eb 100644 --- a/fuel-core-interfaces/src/p2p.rs +++ b/fuel-core-interfaces/src/p2p.rs @@ -66,7 +66,7 @@ pub enum P2pRequestEvent { vote: Arc, }, GossipsubMessageReport { - message: Box>, + message: Box>>, acceptance: GossipsubMessageAcceptance, }, Stop, From 7acd4cc626ecaf263255438ed6189f5dbcc02584 Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Wed, 12 Oct 2022 17:06:32 +0200 Subject: [PATCH 07/22] finalize gossipsub validation --- fuel-core-interfaces/src/p2p.rs | 45 ++++++++-- fuel-p2p/src/behavior.rs | 2 +- fuel-p2p/src/config.rs | 7 +- fuel-p2p/src/gossipsub/builder.rs | 34 +++---- fuel-p2p/src/orchestrator.rs | 144 ++++++++++++------------------ 5 files changed, 113 insertions(+), 119 deletions(-) diff --git a/fuel-core-interfaces/src/p2p.rs b/fuel-core-interfaces/src/p2p.rs index 7db876551eb..72a79cf2ad9 100644 --- a/fuel-core-interfaces/src/p2p.rs +++ b/fuel-core-interfaces/src/p2p.rs @@ -42,11 +42,17 @@ pub enum GossipsubMessageAcceptance { } #[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)] -pub struct GossipsubMessageId(pub Vec); +pub struct GossipsubMessageInfo { + pub message_id: Vec, + pub peer_id: Vec, +} -impl From> for GossipsubMessageId { - fn from(message_id: Vec) -> Self { - GossipsubMessageId(message_id) +impl From<&GossipData> for GossipsubMessageInfo { + fn from(gossip_data: &GossipData) -> Self { + Self { + message_id: gossip_data.message_id.clone(), + peer_id: gossip_data.peer_id.clone(), + } } } @@ -66,16 +72,41 @@ pub enum P2pRequestEvent { vote: Arc, }, GossipsubMessageReport { - message: Box>>, + message: GossipsubMessageInfo, acceptance: GossipsubMessageAcceptance, }, Stop, } +#[derive(Debug, Clone)] +pub struct GossipData { + pub data: Option, + pub peer_id: Vec, + pub message_id: Vec, +} + +impl GossipData { + pub fn new( + data: T, + peer_id: impl Into>, + message_id: impl Into>, + ) -> Self { + Self { + data: Some(data), + peer_id: peer_id.into(), + message_id: message_id.into(), + } + } +} + pub trait NetworkData: Debug + Send { fn take_data(&mut self) -> Option; - fn message_id(&self) -> Vec; - fn peer_id(&self) -> Vec; +} + +impl NetworkData for GossipData { + fn take_data(&mut self) -> Option { + self.data.take() + } } #[async_trait] diff --git a/fuel-p2p/src/behavior.rs b/fuel-p2p/src/behavior.rs index a345b07a375..add84e62781 100644 --- a/fuel-p2p/src/behavior.rs +++ b/fuel-p2p/src/behavior.rs @@ -131,7 +131,7 @@ impl FuelBehaviour { Self { discovery: discovery_config.finish(), - gossipsub: build_gossipsub(&p2p_config.local_keypair, p2p_config.clone()), + gossipsub: build_gossipsub(p2p_config), peer_info, request_response, } diff --git a/fuel-p2p/src/config.rs b/fuel-p2p/src/config.rs index 9f00b2eb8dd..c238a5e7d31 100644 --- a/fuel-p2p/src/config.rs +++ b/fuel-p2p/src/config.rs @@ -28,7 +28,6 @@ use std::{ IpAddr, Ipv4Addr, }, - sync::Arc, time::Duration, }; @@ -46,7 +45,7 @@ const MAX_NUM_OF_FRAMES_BUFFERED: usize = 256; /// inbound and outbound connections established through the transport. const TRANSPORT_TIMEOUT: Duration = Duration::from_secs(20); -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct P2PConfig { pub local_keypair: Keypair, @@ -83,8 +82,6 @@ pub struct P2PConfig { pub ideal_mesh_size: usize, pub min_mesh_size: usize, pub max_mesh_size: usize, - pub message_id_fn: - Option MessageId + Send + Sync + 'static>>, // RequestResponse related fields /// Sets the timeout for inbound and outbound requests. @@ -127,8 +124,6 @@ impl P2PConfig { max_mesh_size: 12, min_mesh_size: 4, ideal_mesh_size: 6, - // default value set in gossipsub builder - message_id_fn: None, set_request_timeout: REQ_RES_TIMEOUT, set_connection_keep_alive: REQ_RES_TIMEOUT, info_interval: Some(Duration::from_secs(3)), diff --git a/fuel-p2p/src/gossipsub/builder.rs b/fuel-p2p/src/gossipsub/builder.rs index 6f4a1e625dc..e06a22f03f5 100644 --- a/fuel-p2p/src/gossipsub/builder.rs +++ b/fuel-p2p/src/gossipsub/builder.rs @@ -1,16 +1,13 @@ -use libp2p::{ - gossipsub::{ - FastMessageId, - Gossipsub, - GossipsubConfigBuilder, - GossipsubMessage, - MessageAuthenticity, - MessageId, - PeerScoreParams, - PeerScoreThresholds, - RawGossipsubMessage, - }, - identity::Keypair, +use libp2p::gossipsub::{ + FastMessageId, + Gossipsub, + GossipsubConfigBuilder, + GossipsubMessage, + MessageAuthenticity, + MessageId, + PeerScoreParams, + PeerScoreThresholds, + RawGossipsubMessage, }; use sha2::{ Digest, @@ -19,16 +16,13 @@ use sha2::{ use crate::config::P2PConfig; -pub fn build_gossipsub(local_key: &Keypair, p2p_config: P2PConfig) -> Gossipsub { +pub fn build_gossipsub(p2p_config: &P2PConfig) -> Gossipsub { let gossip_message_id = move |message: &GossipsubMessage| { - p2p_config - .message_id_fn - .clone() - .map(|f| f(message)) - .unwrap_or(MessageId::from(&Sha256::digest(&message.data)[..20])) + MessageId::from(&Sha256::digest(&message.data)[..20]) }; let fast_gossip_message_id = move |message: &RawGossipsubMessage| { + // todo: cheaper hash func? FastMessageId::from(&Sha256::digest(&message.data)[..8]) }; @@ -44,7 +38,7 @@ pub fn build_gossipsub(local_key: &Keypair, p2p_config: P2PConfig) -> Gossipsub .expect("valid gossipsub configuration"); let mut gossipsub = Gossipsub::new( - MessageAuthenticity::Signed(local_key.clone()), + MessageAuthenticity::Signed(p2p_config.local_keypair.clone()), gossipsub_config, ) .expect("gossipsub initialized"); diff --git a/fuel-p2p/src/orchestrator.rs b/fuel-p2p/src/orchestrator.rs index f4baf3943d9..d7d5b33c9b1 100644 --- a/fuel-p2p/src/orchestrator.rs +++ b/fuel-p2p/src/orchestrator.rs @@ -1,31 +1,20 @@ -use std::{ - collections::HashMap, - fmt::{ - Debug, - Formatter, - }, - sync::Arc, -}; +use std::sync::Arc; use anyhow::anyhow; use fuel_core_interfaces::p2p::{ BlockBroadcast, ConsensusBroadcast, + GossipData, GossipsubMessageAcceptance, - GossipsubMessageId, - NetworkData, + GossipsubMessageInfo, P2pDb, P2pRequestEvent, TransactionBroadcast, }; use libp2p::{ - gossipsub::{ - MessageAcceptance, - MessageId, - }, + gossipsub::MessageAcceptance, request_response::RequestId, - PeerId, }; use tokio::{ sync::{ @@ -44,7 +33,10 @@ use tracing::{ }; use crate::{ - codecs::bincode::BincodeCodec, + codecs::{ + bincode::BincodeCodec, + NetworkCodec, + }, config::P2PConfig, gossipsub::messages::{ GossipsubBroadcastRequest, @@ -61,11 +53,9 @@ use crate::{ }, }; -type ConsensusWithMsgId = GossipData; -type TransactionWithMsgId = GossipData; -type BlockWithMsgId = GossipData; - -type MessageIdWithPeer = (MessageId, PeerId); +type ConsensusGossipData = GossipData; +type TransactionGossipData = GossipData; +type BlockGossipData = GossipData; pub struct NetworkOrchestrator { p2p_config: P2PConfig, @@ -76,51 +66,20 @@ pub struct NetworkOrchestrator { // senders tx_consensus: Sender>, - tx_transaction: broadcast::Sender, - tx_block: Sender, + tx_transaction: broadcast::Sender, + tx_block: Sender, tx_outbound_responses: Sender>, db: Arc, } -#[derive(Debug, Clone)] -pub struct GossipData { - pub data: Option, - pub peer_id: PeerId, - pub message_id: MessageId, -} - -impl GossipData { - pub fn new(value: T, peer_id: PeerId, message_id: MessageId) -> Self { - Self { - data: Some(value), - peer_id, - message_id, - } - } -} - -impl NetworkData for GossipData { - fn take_data(&mut self) -> Option { - self.data.take() - } - - fn message_id(&self) -> Vec { - self.message_id.0.clone() - } - - fn peer_id(&self) -> Vec { - self.peer_id.to_bytes() - } -} - impl NetworkOrchestrator { pub fn new( p2p_config: P2PConfig, rx_request_event: Receiver, - tx_consensus: Sender, - tx_transaction: broadcast::Sender, - tx_block: Sender, + tx_consensus: Sender, + tx_transaction: broadcast::Sender, + tx_block: Sender, db: Arc, ) -> Self { @@ -155,16 +114,15 @@ impl NetworkOrchestrator { p2p_event = p2p_service.next_event() => { match p2p_event { Some(FuelP2PEvent::GossipsubMessage { message, message_id, peer_id,.. }) => { - match message { GossipsubMessage::NewTx(tx) => { - let _ = self.tx_transaction.send(GossipData::new(TransactionBroadcast::NewTransaction(tx), peer_id, message_id) ); + let _ = self.tx_transaction.send(GossipData::new(TransactionBroadcast::NewTransaction(tx), peer_id, message_id.0) ); }, GossipsubMessage::NewBlock(block) => { - let _ = self.tx_block.send(GossipData::new(BlockBroadcast::NewBlock(block), peer_id, message_id)); + let _ = self.tx_block.send(GossipData::new(BlockBroadcast::NewBlock(block), peer_id, message_id.0)); }, GossipsubMessage::ConsensusVote(vote) => { - let _ = self.tx_consensus.send(GossipData::new(ConsensusBroadcast::NewVote(vote), peer_id, message_id)); + let _ = self.tx_consensus.send(GossipData::new(ConsensusBroadcast::NewVote(vote), peer_id, message_id.0)); }, } }, @@ -205,27 +163,7 @@ impl NetworkOrchestrator { let _ = p2p_service.publish_message(broadcast); }, P2pRequestEvent::GossipsubMessageReport { message, acceptance } => { - let msg_id = message.message_id().into(); - let peer_id = message.peer_id().try_into().unwrap(); - - let acceptance = match acceptance { - GossipsubMessageAcceptance::Accept => MessageAcceptance::Accept, - GossipsubMessageAcceptance::Reject => MessageAcceptance::Reject, - GossipsubMessageAcceptance::Ignore => MessageAcceptance::Ignore - }; - - match p2p_service.report_message_validation_result(&msg_id, &peer_id, acceptance) { - Ok(true) => { - info!(target: "fuel-libp2p", "Sent a report for MessageId: {} from PeerId: {}", msg_id, peer_id); - } - Ok(false) => { - warn!(target: "fuel-libp2p", "Message with MessageId: {} not found in the Gossipsub Message Cache", msg_id); - } - Err(e) => { - warn!(target: "fuel-libp2p", "Failed to publish Message with MessageId: {} with Error: {:?}", msg_id, e); - } - } - + report_message_validation_result(message, acceptance, &mut p2p_service); } P2pRequestEvent::Stop => break, } @@ -240,6 +178,42 @@ impl NetworkOrchestrator { } } +fn report_message_validation_result( + message: GossipsubMessageInfo, + acceptance: GossipsubMessageAcceptance, + p2p_service: &mut FuelP2PService, +) { + let GossipsubMessageInfo { + peer_id, + message_id, + } = message; + + let msg_id = message_id.into(); + + if let Ok(peer_id) = peer_id.try_into() { + let acceptance = match acceptance { + GossipsubMessageAcceptance::Accept => MessageAcceptance::Accept, + GossipsubMessageAcceptance::Reject => MessageAcceptance::Reject, + GossipsubMessageAcceptance::Ignore => MessageAcceptance::Ignore, + }; + + match p2p_service.report_message_validation_result(&msg_id, &peer_id, acceptance) + { + Ok(true) => { + info!(target: "fuel-libp2p", "Sent a report for MessageId: {} from PeerId: {}", msg_id, peer_id); + } + Ok(false) => { + warn!(target: "fuel-libp2p", "Message with MessageId: {} not found in the Gossipsub Message Cache", msg_id); + } + Err(e) => { + warn!(target: "fuel-libp2p", "Failed to publish Message with MessageId: {} with Error: {:?}", msg_id, e); + } + } + } else { + warn!(target: "fuel-libp2p", "Failed to read PeerId from received GossipsubMessageId: {}", msg_id); + } +} + pub struct Service { /// Network Orchestrator that handles p2p network and inter-module communication network_orchestrator: Arc>>, @@ -255,9 +229,9 @@ impl Service { db: Arc, tx_request_event: Sender, rx_request_event: Receiver, - tx_consensus: Sender, - tx_transaction: broadcast::Sender, - tx_block: Sender, + tx_consensus: Sender, + tx_transaction: broadcast::Sender, + tx_block: Sender, ) -> Self { let network_orchestrator = NetworkOrchestrator::new( p2p_config, From a40daf56fca8687907a6ea9e7994971232bd07f0 Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Wed, 12 Oct 2022 17:09:30 +0200 Subject: [PATCH 08/22] remove unused --- fuel-p2p/src/config.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/fuel-p2p/src/config.rs b/fuel-p2p/src/config.rs index c238a5e7d31..4bace0e1fb1 100644 --- a/fuel-p2p/src/config.rs +++ b/fuel-p2p/src/config.rs @@ -31,11 +31,6 @@ use std::{ time::Duration, }; -pub use libp2p::gossipsub::{ - GossipsubMessage, - MessageId, -}; - const REQ_RES_TIMEOUT: Duration = Duration::from_secs(20); /// Maximum number of frames buffered per substream. From 68e51ee0f2711ada748b95706e0b5992a9600838 Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Wed, 12 Oct 2022 17:19:56 +0200 Subject: [PATCH 09/22] update fast_gossip_message_id --- fuel-p2p/src/gossipsub/builder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fuel-p2p/src/gossipsub/builder.rs b/fuel-p2p/src/gossipsub/builder.rs index e06a22f03f5..ba9e9bb5f68 100644 --- a/fuel-p2p/src/gossipsub/builder.rs +++ b/fuel-p2p/src/gossipsub/builder.rs @@ -11,6 +11,7 @@ use libp2p::gossipsub::{ }; use sha2::{ Digest, + Sha224, Sha256, }; @@ -22,8 +23,7 @@ pub fn build_gossipsub(p2p_config: &P2PConfig) -> Gossipsub { }; let fast_gossip_message_id = move |message: &RawGossipsubMessage| { - // todo: cheaper hash func? - FastMessageId::from(&Sha256::digest(&message.data)[..8]) + FastMessageId::from(&Sha224::digest(&message.data)[..16]) }; let gossipsub_config = GossipsubConfigBuilder::default() From 7d02dc32235cbed1a68a52e14a720437f70c51d8 Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Wed, 12 Oct 2022 18:52:18 +0200 Subject: [PATCH 10/22] check MessageId calculation --- fuel-p2p/src/service.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/fuel-p2p/src/service.rs b/fuel-p2p/src/service.rs index a797b1b916f..dcbab78559b 100644 --- a/fuel-p2p/src/service.rs +++ b/fuel-p2p/src/service.rs @@ -446,7 +446,10 @@ mod tests { }; use futures::StreamExt; use libp2p::{ - gossipsub::Topic, + gossipsub::{ + error::PublishError, + Topic, + }, identity::Keypair, swarm::SwarmEvent, Multiaddr, @@ -758,6 +761,12 @@ mod tests { } } + // Node B received the correct message + // If we try to publish it again we will get `PublishError::Duplicate` + // This asserts that our MessageId calculation is consistant irrespective of which Peer sends it + let broadcast_request = broadcast_request.clone(); + matches!(node_b.publish_message(broadcast_request), Err(PublishError::Duplicate)); + break } From d2456c80bf974a59a868d06865126b610baff6f7 Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Wed, 12 Oct 2022 19:21:59 +0200 Subject: [PATCH 11/22] cleanup network orchestrator --- fuel-p2p/src/orchestrator.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/fuel-p2p/src/orchestrator.rs b/fuel-p2p/src/orchestrator.rs index d7d5b33c9b1..5aaeb8165a1 100644 --- a/fuel-p2p/src/orchestrator.rs +++ b/fuel-p2p/src/orchestrator.rs @@ -114,15 +114,17 @@ impl NetworkOrchestrator { p2p_event = p2p_service.next_event() => { match p2p_event { Some(FuelP2PEvent::GossipsubMessage { message, message_id, peer_id,.. }) => { + let message_id = message_id.0; + match message { GossipsubMessage::NewTx(tx) => { - let _ = self.tx_transaction.send(GossipData::new(TransactionBroadcast::NewTransaction(tx), peer_id, message_id.0) ); + let _ = self.tx_transaction.send(GossipData::new(TransactionBroadcast::NewTransaction(tx), peer_id, message_id)); }, GossipsubMessage::NewBlock(block) => { - let _ = self.tx_block.send(GossipData::new(BlockBroadcast::NewBlock(block), peer_id, message_id.0)); + let _ = self.tx_block.send(GossipData::new(BlockBroadcast::NewBlock(block), peer_id, message_id)); }, GossipsubMessage::ConsensusVote(vote) => { - let _ = self.tx_consensus.send(GossipData::new(ConsensusBroadcast::NewVote(vote), peer_id, message_id.0)); + let _ = self.tx_consensus.send(GossipData::new(ConsensusBroadcast::NewVote(vote), peer_id, message_id)); }, } }, @@ -163,7 +165,7 @@ impl NetworkOrchestrator { let _ = p2p_service.publish_message(broadcast); }, P2pRequestEvent::GossipsubMessageReport { message, acceptance } => { - report_message_validation_result(message, acceptance, &mut p2p_service); + report_message(message, acceptance, &mut p2p_service); } P2pRequestEvent::Stop => break, } @@ -178,7 +180,7 @@ impl NetworkOrchestrator { } } -fn report_message_validation_result( +fn report_message( message: GossipsubMessageInfo, acceptance: GossipsubMessageAcceptance, p2p_service: &mut FuelP2PService, From 65eb0968fca104221520bc8e6b2b60bef111b987 Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Wed, 12 Oct 2022 20:08:20 +0200 Subject: [PATCH 12/22] move types to p2p for reuse --- fuel-core-interfaces/src/p2p.rs | 4 ++++ fuel-p2p/src/orchestrator.rs | 7 +++---- fuel-sync/src/service.rs | 4 ++-- fuel-txpool/src/service.rs | 16 ++++++++-------- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/fuel-core-interfaces/src/p2p.rs b/fuel-core-interfaces/src/p2p.rs index 72a79cf2ad9..bd1170a3728 100644 --- a/fuel-core-interfaces/src/p2p.rs +++ b/fuel-core-interfaces/src/p2p.rs @@ -85,6 +85,10 @@ pub struct GossipData { pub message_id: Vec, } +pub type ConsensusGossipData = GossipData; +pub type TransactionGossipData = GossipData; +pub type BlockGossipData = GossipData; + impl GossipData { pub fn new( data: T, diff --git a/fuel-p2p/src/orchestrator.rs b/fuel-p2p/src/orchestrator.rs index 5aaeb8165a1..7c27bd022e7 100644 --- a/fuel-p2p/src/orchestrator.rs +++ b/fuel-p2p/src/orchestrator.rs @@ -4,13 +4,16 @@ use anyhow::anyhow; use fuel_core_interfaces::p2p::{ BlockBroadcast, + BlockGossipData, ConsensusBroadcast, + ConsensusGossipData, GossipData, GossipsubMessageAcceptance, GossipsubMessageInfo, P2pDb, P2pRequestEvent, TransactionBroadcast, + TransactionGossipData, }; use libp2p::{ gossipsub::MessageAcceptance, @@ -53,10 +56,6 @@ use crate::{ }, }; -type ConsensusGossipData = GossipData; -type TransactionGossipData = GossipData; -type BlockGossipData = GossipData; - pub struct NetworkOrchestrator { p2p_config: P2PConfig, diff --git a/fuel-sync/src/service.rs b/fuel-sync/src/service.rs index 774c3fd917a..562cb06a046 100644 --- a/fuel-sync/src/service.rs +++ b/fuel-sync/src/service.rs @@ -3,7 +3,7 @@ use fuel_core_interfaces::{ bft::BftMpsc, block_importer::ImportBlockMpsc, p2p::{ - BlockBroadcast, + BlockGossipData, P2pRequestEvent, }, sync::SyncMpsc, @@ -30,7 +30,7 @@ impl Service { pub async fn start( &self, - _p2p_block: mpsc::Receiver, + _p2p_block: mpsc::Receiver, _p2p_request: mpsc::Sender, _bft: mpsc::Sender, _block_importer: mpsc::Sender, diff --git a/fuel-txpool/src/service.rs b/fuel-txpool/src/service.rs index 0674e4e0ccc..6f225e2c695 100644 --- a/fuel-txpool/src/service.rs +++ b/fuel-txpool/src/service.rs @@ -6,8 +6,10 @@ use anyhow::anyhow; use fuel_core_interfaces::{ block_importer::ImportBlockBroadcast, p2p::{ + GossipData, P2pRequestEvent, TransactionBroadcast, + TransactionGossipData, }, txpool::{ self, @@ -35,7 +37,7 @@ pub struct ServiceBuilder { txpool_receiver: Option>, tx_status_sender: Option>, import_block_receiver: Option>, - incoming_tx_receiver: Option>, + incoming_tx_receiver: Option>, network_sender: Option>, } @@ -95,7 +97,7 @@ impl ServiceBuilder { pub fn incoming_tx_receiver( &mut self, - incoming_tx_receiver: broadcast::Receiver, + incoming_tx_receiver: broadcast::Receiver, ) -> &mut Self { self.incoming_tx_receiver = Some(incoming_tx_receiver); self @@ -157,7 +159,7 @@ pub struct Context { pub txpool_receiver: mpsc::Receiver, pub tx_status_sender: broadcast::Sender, pub import_block_receiver: broadcast::Receiver, - pub incoming_tx_receiver: broadcast::Receiver, + pub incoming_tx_receiver: broadcast::Receiver, pub network_sender: mpsc::Sender, } @@ -179,11 +181,9 @@ impl Context { tokio::spawn( async move { let txpool = txpool.as_ref(); - match new_transaction.unwrap() { - TransactionBroadcast::NewTransaction ( tx ) => { - let txs = vec!(Arc::new(tx)); - TxPool::insert(txpool, db.as_ref().as_ref(), tx_status_sender, txs).await - } + if let GossipData { data: Some(TransactionBroadcast::NewTransaction ( tx )), .. } = new_transaction.unwrap() { + let txs = vec!(Arc::new(tx)); + TxPool::insert(txpool, db.as_ref().as_ref(), tx_status_sender, txs).await; } }); } From dcd5d02de3b9d1e05a66acd914a2791b2d16560d Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Wed, 12 Oct 2022 20:13:16 +0200 Subject: [PATCH 13/22] add consensus gossip data instead --- fuel-p2p/src/orchestrator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fuel-p2p/src/orchestrator.rs b/fuel-p2p/src/orchestrator.rs index d79a40427ef..59d3d56868e 100644 --- a/fuel-p2p/src/orchestrator.rs +++ b/fuel-p2p/src/orchestrator.rs @@ -64,7 +64,7 @@ pub struct NetworkOrchestrator { rx_outbound_responses: Receiver>, // senders - tx_consensus: Sender>, + tx_consensus: Sender, tx_transaction: broadcast::Sender, tx_block: Sender, tx_outbound_responses: Sender>, From b1bf7a752a7f0114953dc9fccce188cc219ab26b Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Fri, 14 Oct 2022 10:41:42 +0200 Subject: [PATCH 14/22] add serde only feature --- fuel-core-interfaces/src/p2p.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fuel-core-interfaces/src/p2p.rs b/fuel-core-interfaces/src/p2p.rs index bd1170a3728..3edd006a83e 100644 --- a/fuel-core-interfaces/src/p2p.rs +++ b/fuel-core-interfaces/src/p2p.rs @@ -41,7 +41,8 @@ pub enum GossipsubMessageAcceptance { Ignore, } -#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)] +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct GossipsubMessageInfo { pub message_id: Vec, pub peer_id: Vec, From 50ce0de4aaae78946ea52de0d7faee9188c046bd Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Fri, 14 Oct 2022 10:44:56 +0200 Subject: [PATCH 15/22] update with suggestions --- fuel-core-interfaces/src/p2p.rs | 2 +- fuel-p2p/src/orchestrator.rs | 6 ++++-- fuel-p2p/src/service.rs | 6 +++--- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/fuel-core-interfaces/src/p2p.rs b/fuel-core-interfaces/src/p2p.rs index 3edd006a83e..f0fe26133f1 100644 --- a/fuel-core-interfaces/src/p2p.rs +++ b/fuel-core-interfaces/src/p2p.rs @@ -34,7 +34,7 @@ pub enum BlockBroadcast { NewBlock(FuelBlock), } -#[derive(Debug)] +#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash)] pub enum GossipsubMessageAcceptance { Accept, Reject, diff --git a/fuel-p2p/src/orchestrator.rs b/fuel-p2p/src/orchestrator.rs index 59d3d56868e..f530d1e8a11 100644 --- a/fuel-p2p/src/orchestrator.rs +++ b/fuel-p2p/src/orchestrator.rs @@ -31,6 +31,8 @@ use tokio::{ task::JoinHandle, }; use tracing::{ + debug, + error, info, warn, }; @@ -201,13 +203,13 @@ fn report_message( match p2p_service.report_message_validation_result(&msg_id, &peer_id, acceptance) { Ok(true) => { - info!(target: "fuel-libp2p", "Sent a report for MessageId: {} from PeerId: {}", msg_id, peer_id); + debug!(target: "fuel-libp2p", "Sent a report for MessageId: {} from PeerId: {}", msg_id, peer_id); } Ok(false) => { warn!(target: "fuel-libp2p", "Message with MessageId: {} not found in the Gossipsub Message Cache", msg_id); } Err(e) => { - warn!(target: "fuel-libp2p", "Failed to publish Message with MessageId: {} with Error: {:?}", msg_id, e); + error!(target: "fuel-libp2p", "Failed to publish Message with MessageId: {} with Error: {:?}", msg_id, e); } } } else { diff --git a/fuel-p2p/src/service.rs b/fuel-p2p/src/service.rs index 7df11e06779..614c177d956 100644 --- a/fuel-p2p/src/service.rs +++ b/fuel-p2p/src/service.rs @@ -261,10 +261,10 @@ impl FuelP2PService { pub async fn next_event(&mut self) -> Option { if let SwarmEvent::Behaviour(fuel_behaviour) = self.swarm.select_next_some().await { - return self.handle_behaviour_event(fuel_behaviour) + self.handle_behaviour_event(fuel_behaviour) + } else { + None } - - None } fn handle_behaviour_event( From c49f264b748b005b0ad34b3963c06448ac72b41c Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Fri, 14 Oct 2022 10:45:35 +0200 Subject: [PATCH 16/22] remove unused --- fuel-p2p/src/orchestrator.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/fuel-p2p/src/orchestrator.rs b/fuel-p2p/src/orchestrator.rs index f530d1e8a11..5efc035e481 100644 --- a/fuel-p2p/src/orchestrator.rs +++ b/fuel-p2p/src/orchestrator.rs @@ -33,7 +33,6 @@ use tokio::{ use tracing::{ debug, error, - info, warn, }; From 7ed6b7e7178cc408243b059d0915c86ecd8eb226 Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Fri, 14 Oct 2022 21:14:08 +0200 Subject: [PATCH 17/22] use gossipsub_config --- fuel-p2p/src/config.rs | 28 +++++++++++++++++----------- fuel-p2p/src/gossipsub.rs | 5 ++++- fuel-p2p/src/gossipsub/builder.rs | 15 ++++++++++----- 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/fuel-p2p/src/config.rs b/fuel-p2p/src/config.rs index 4bace0e1fb1..d8377a19e08 100644 --- a/fuel-p2p/src/config.rs +++ b/fuel-p2p/src/config.rs @@ -1,8 +1,12 @@ -use crate::gossipsub::topics::{ - CON_VOTE_GOSSIP_TOPIC, - NEW_BLOCK_GOSSIP_TOPIC, - NEW_TX_GOSSIP_TOPIC, +use crate::gossipsub::{ + default_gossipsub_config, + topics::{ + CON_VOTE_GOSSIP_TOPIC, + NEW_BLOCK_GOSSIP_TOPIC, + NEW_TX_GOSSIP_TOPIC, + }, }; + use libp2p::{ core::{ muxing::StreamMuxerBox, @@ -23,6 +27,7 @@ use libp2p::{ PeerId, Transport, }; + use std::{ net::{ IpAddr, @@ -31,6 +36,11 @@ use std::{ time::Duration, }; +pub use libp2p::gossipsub::{ + GossipsubConfig, + GossipsubConfigBuilder, +}; + const REQ_RES_TIMEOUT: Duration = Duration::from_secs(20); /// Maximum number of frames buffered per substream. @@ -72,11 +82,9 @@ pub struct P2PConfig { /// and the next outbound ping pub info_interval: Option, - // `Gossipsub` related fields + // `Gossipsub` config and topics + pub gossipsub_config: GossipsubConfig, pub topics: Vec, - pub ideal_mesh_size: usize, - pub min_mesh_size: usize, - pub max_mesh_size: usize, // RequestResponse related fields /// Sets the timeout for inbound and outbound requests. @@ -116,9 +124,7 @@ impl P2PConfig { NEW_BLOCK_GOSSIP_TOPIC.into(), CON_VOTE_GOSSIP_TOPIC.into(), ], - max_mesh_size: 12, - min_mesh_size: 4, - ideal_mesh_size: 6, + gossipsub_config: default_gossipsub_config(), set_request_timeout: REQ_RES_TIMEOUT, set_connection_keep_alive: REQ_RES_TIMEOUT, info_interval: Some(Duration::from_secs(3)), diff --git a/fuel-p2p/src/gossipsub.rs b/fuel-p2p/src/gossipsub.rs index dd485e67b12..2b3e7425e85 100644 --- a/fuel-p2p/src/gossipsub.rs +++ b/fuel-p2p/src/gossipsub.rs @@ -2,4 +2,7 @@ mod builder; pub mod messages; pub mod topics; -pub use builder::build_gossipsub; +pub use builder::{ + build_gossipsub, + default_gossipsub_config, +}; diff --git a/fuel-p2p/src/gossipsub/builder.rs b/fuel-p2p/src/gossipsub/builder.rs index ba9e9bb5f68..aaccbe52ea1 100644 --- a/fuel-p2p/src/gossipsub/builder.rs +++ b/fuel-p2p/src/gossipsub/builder.rs @@ -1,6 +1,7 @@ use libp2p::gossipsub::{ FastMessageId, Gossipsub, + GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage, MessageAuthenticity, @@ -17,7 +18,7 @@ use sha2::{ use crate::config::P2PConfig; -pub fn build_gossipsub(p2p_config: &P2PConfig) -> Gossipsub { +pub fn default_gossipsub_config() -> GossipsubConfig { let gossip_message_id = move |message: &GossipsubMessage| { MessageId::from(&Sha256::digest(&message.data)[..20]) }; @@ -28,18 +29,22 @@ pub fn build_gossipsub(p2p_config: &P2PConfig) -> Gossipsub { let gossipsub_config = GossipsubConfigBuilder::default() .protocol_id_prefix("/meshsub/1.0.0") - .mesh_n(p2p_config.ideal_mesh_size) - .mesh_n_low(p2p_config.min_mesh_size) - .mesh_n_high(p2p_config.max_mesh_size) + .mesh_n(6) + .mesh_n_low(4) + .mesh_n_high(12) .message_id_fn(gossip_message_id) .fast_message_id_fn(fast_gossip_message_id) .validate_messages() .build() .expect("valid gossipsub configuration"); + gossipsub_config +} + +pub fn build_gossipsub(p2p_config: &P2PConfig) -> Gossipsub { let mut gossipsub = Gossipsub::new( MessageAuthenticity::Signed(p2p_config.local_keypair.clone()), - gossipsub_config, + p2p_config.gossipsub_config.clone(), ) .expect("gossipsub initialized"); From 6e6193fbc101b78f53b168b2fa1f7de19f09161f Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Fri, 14 Oct 2022 21:14:23 +0200 Subject: [PATCH 18/22] hash complete messages --- fuel-p2p/src/gossipsub/builder.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/fuel-p2p/src/gossipsub/builder.rs b/fuel-p2p/src/gossipsub/builder.rs index aaccbe52ea1..0f427292416 100644 --- a/fuel-p2p/src/gossipsub/builder.rs +++ b/fuel-p2p/src/gossipsub/builder.rs @@ -12,7 +12,6 @@ use libp2p::gossipsub::{ }; use sha2::{ Digest, - Sha224, Sha256, }; @@ -20,11 +19,11 @@ use crate::config::P2PConfig; pub fn default_gossipsub_config() -> GossipsubConfig { let gossip_message_id = move |message: &GossipsubMessage| { - MessageId::from(&Sha256::digest(&message.data)[..20]) + MessageId::from(&Sha256::digest(&message.data)[..]) }; let fast_gossip_message_id = move |message: &RawGossipsubMessage| { - FastMessageId::from(&Sha224::digest(&message.data)[..16]) + FastMessageId::from(&Sha256::digest(&message.data)[..]) }; let gossipsub_config = GossipsubConfigBuilder::default() From 6beaec821e2496624f8f47be484dc7cf95b11c2a Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Mon, 17 Oct 2022 18:03:41 +0200 Subject: [PATCH 19/22] use gossipsub config and its builder --- fuel-core/src/cli/run/p2p.rs | 32 +++++++++++++++++-- fuel-p2p/src/behavior.rs | 4 +-- fuel-p2p/src/config.rs | 7 ++-- fuel-p2p/src/gossipsub.rs | 7 +--- .../src/gossipsub/{builder.rs => config.rs} | 28 ++++++++++------ fuel-p2p/src/lib.rs | 2 ++ 6 files changed, 55 insertions(+), 25 deletions(-) rename fuel-p2p/src/gossipsub/{builder.rs => config.rs} (66%) diff --git a/fuel-core/src/cli/run/p2p.rs b/fuel-core/src/cli/run/p2p.rs index 05a4f5a2584..c9a741d38d7 100644 --- a/fuel-core/src/cli/run/p2p.rs +++ b/fuel-core/src/cli/run/p2p.rs @@ -12,6 +12,7 @@ use clap::Args; use fuel_core_interfaces::common::fuel_crypto::SecretKey; use fuel_p2p::{ config::P2PConfig, + gossipsub_config::default_gossipsub_builder, Multiaddr, }; @@ -89,6 +90,22 @@ pub struct P2pArgs { #[clap(long = "ideal_mesh_size", default_value = "6")] pub ideal_mesh_size: usize, + /// Number of heartbeats to keep in the gossipsub `memcache` + #[clap(long = "history_length", default_value = "5")] + pub history_length: usize, + + /// Number of past heartbeats to gossip about + #[clap(long = "history_gossip", default_value = "3")] + pub history_gossip: usize, + + /// Time between each heartbeat + #[clap(long = "heartbeat_interval", default_value = "1")] + pub heartbeat_interval: u64, + + /// The maximum byte size for each gossip + #[clap(long = "max_transmit_size", default_value = "2048")] + pub max_transmit_size: usize, + /// Choose timeout for sent requests in RequestResponse protocol #[clap(long = "request_timeout", default_value = "20")] pub request_timeout: u64, @@ -122,6 +139,17 @@ impl From for anyhow::Result { } }; + let gossipsub_config = default_gossipsub_builder() + .mesh_n(args.ideal_mesh_size) + .mesh_n_low(args.min_mesh_size) + .mesh_n_high(args.max_mesh_size) + .history_length(args.history_length) + .history_gossip(args.history_gossip) + .heartbeat_interval(Duration::from_secs(args.heartbeat_interval)) + .max_transmit_size(args.max_transmit_size) + .build() + .expect("valid gossipsub configuration"); + Ok(P2PConfig { local_keypair, network_name: args.network, @@ -139,9 +167,7 @@ impl From for anyhow::Result { args.connection_idle_timeout, )), topics: args.topics, - max_mesh_size: args.max_mesh_size, - min_mesh_size: args.min_mesh_size, - ideal_mesh_size: args.ideal_mesh_size, + gossipsub_config, set_request_timeout: Duration::from_secs(args.request_timeout), set_connection_keep_alive: Duration::from_secs(args.connection_keep_alive), info_interval: Some(Duration::from_secs(args.info_interval)), diff --git a/fuel-p2p/src/behavior.rs b/fuel-p2p/src/behavior.rs index add84e62781..f08955b7d8f 100644 --- a/fuel-p2p/src/behavior.rs +++ b/fuel-p2p/src/behavior.rs @@ -7,7 +7,7 @@ use crate::{ DiscoveryEvent, }, gossipsub::{ - build_gossipsub, + config::build_gossipsub_behaviour, topics::GossipTopic, }, peer_info::{ @@ -131,7 +131,7 @@ impl FuelBehaviour { Self { discovery: discovery_config.finish(), - gossipsub: build_gossipsub(p2p_config), + gossipsub: build_gossipsub_behaviour(p2p_config), peer_info, request_response, } diff --git a/fuel-p2p/src/config.rs b/fuel-p2p/src/config.rs index d8377a19e08..14b43bd7ead 100644 --- a/fuel-p2p/src/config.rs +++ b/fuel-p2p/src/config.rs @@ -1,5 +1,5 @@ use crate::gossipsub::{ - default_gossipsub_config, + config::default_gossipsub_config, topics::{ CON_VOTE_GOSSIP_TOPIC, NEW_BLOCK_GOSSIP_TOPIC, @@ -36,10 +36,7 @@ use std::{ time::Duration, }; -pub use libp2p::gossipsub::{ - GossipsubConfig, - GossipsubConfigBuilder, -}; +use libp2p::gossipsub::GossipsubConfig; const REQ_RES_TIMEOUT: Duration = Duration::from_secs(20); diff --git a/fuel-p2p/src/gossipsub.rs b/fuel-p2p/src/gossipsub.rs index 2b3e7425e85..7b623bd2f69 100644 --- a/fuel-p2p/src/gossipsub.rs +++ b/fuel-p2p/src/gossipsub.rs @@ -1,8 +1,3 @@ -mod builder; +pub mod config; pub mod messages; pub mod topics; - -pub use builder::{ - build_gossipsub, - default_gossipsub_config, -}; diff --git a/fuel-p2p/src/gossipsub/builder.rs b/fuel-p2p/src/gossipsub/config.rs similarity index 66% rename from fuel-p2p/src/gossipsub/builder.rs rename to fuel-p2p/src/gossipsub/config.rs index 0f427292416..1fef47c4088 100644 --- a/fuel-p2p/src/gossipsub/builder.rs +++ b/fuel-p2p/src/gossipsub/config.rs @@ -17,7 +17,8 @@ use sha2::{ use crate::config::P2PConfig; -pub fn default_gossipsub_config() -> GossipsubConfig { +/// Creates `GossipsubConfigBuilder` with few of the Gossipsub values already defined +pub fn default_gossipsub_builder() -> GossipsubConfigBuilder { let gossip_message_id = move |message: &GossipsubMessage| { MessageId::from(&Sha256::digest(&message.data)[..]) }; @@ -26,21 +27,30 @@ pub fn default_gossipsub_config() -> GossipsubConfig { FastMessageId::from(&Sha256::digest(&message.data)[..]) }; - let gossipsub_config = GossipsubConfigBuilder::default() + let mut builder = GossipsubConfigBuilder::default(); + + builder .protocol_id_prefix("/meshsub/1.0.0") + .message_id_fn(gossip_message_id) + .fast_message_id_fn(fast_gossip_message_id) + .validate_messages(); + + builder +} + +/// Builds a default `GossipsubConfig`. +/// Used in testing. +pub(crate) fn default_gossipsub_config() -> GossipsubConfig { + default_gossipsub_builder() .mesh_n(6) .mesh_n_low(4) .mesh_n_high(12) - .message_id_fn(gossip_message_id) - .fast_message_id_fn(fast_gossip_message_id) - .validate_messages() .build() - .expect("valid gossipsub configuration"); - - gossipsub_config + .expect("valid gossipsub configuration") } -pub fn build_gossipsub(p2p_config: &P2PConfig) -> Gossipsub { +/// Given a `P2pConfig` creates a Gossipsub Behaviour +pub(crate) fn build_gossipsub_behaviour(p2p_config: &P2PConfig) -> Gossipsub { let mut gossipsub = Gossipsub::new( MessageAuthenticity::Signed(p2p_config.local_keypair.clone()), p2p_config.gossipsub_config.clone(), diff --git a/fuel-p2p/src/lib.rs b/fuel-p2p/src/lib.rs index 8287eb922a0..16c89d16474 100644 --- a/fuel-p2p/src/lib.rs +++ b/fuel-p2p/src/lib.rs @@ -8,6 +8,8 @@ mod peer_info; mod request_response; mod service; +pub use gossipsub::config as gossipsub_config; + pub use libp2p::{ Multiaddr, PeerId, From cc3c3a73b0d0aa6ce11e50a2e82269c0aae70a0b Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Mon, 17 Oct 2022 18:37:55 +0200 Subject: [PATCH 20/22] import serde only as feature --- fuel-core-interfaces/src/p2p.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/fuel-core-interfaces/src/p2p.rs b/fuel-core-interfaces/src/p2p.rs index f0fe26133f1..9ac56a70cf0 100644 --- a/fuel-core-interfaces/src/p2p.rs +++ b/fuel-core-interfaces/src/p2p.rs @@ -8,10 +8,6 @@ use crate::{ model::ConsensusVote, }; use async_trait::async_trait; -use serde::{ - Deserialize, - Serialize, -}; use std::{ fmt::Debug, sync::Arc, @@ -42,7 +38,7 @@ pub enum GossipsubMessageAcceptance { } #[derive(Debug, Clone, Hash, PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct GossipsubMessageInfo { pub message_id: Vec, pub peer_id: Vec, From b618fe13e6a70beeb83b9297cabbb6b3bb6b9a2f Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Wed, 19 Oct 2022 19:56:24 +0200 Subject: [PATCH 21/22] use test-helpers from fuel-core-interfaces --- fuel-p2p/Cargo.toml | 1 + fuel-p2p/src/service.rs | 5 +---- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/fuel-p2p/Cargo.toml b/fuel-p2p/Cargo.toml index 57839f4d5b0..b26b26c5674 100644 --- a/fuel-p2p/Cargo.toml +++ b/fuel-p2p/Cargo.toml @@ -35,6 +35,7 @@ tokio = { version = "1.21", features = ["full"] } tracing-appender = "0.2" tracing-attributes = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +fuel-core-interfaces = { path = "../fuel-core-interfaces", features = ["serde", "test-helpers"], version = "0.11.2" } [features] test-helpers = [ diff --git a/fuel-p2p/src/service.rs b/fuel-p2p/src/service.rs index 8d0ecfb61bf..3be96fb7803 100644 --- a/fuel-p2p/src/service.rs +++ b/fuel-p2p/src/service.rs @@ -851,10 +851,7 @@ mod tests { node_b_event = node_b.next_event() => { // 2. Node B receives the RequestMessage from Node A initiated by the NetworkOrchestrator if let Some(FuelP2PEvent::RequestMessage{ request_id, .. }) = node_b_event { - let block = FuelBlock { - header: FuelBlockHeader::default(), - transactions: vec![Transaction::default(), Transaction::default(), Transaction::default(), Transaction::default(), Transaction::default()], - }; + let block = FuelBlock::new(PartialFuelBlockHeader::default(), vec![Transaction::default(), Transaction::default(), Transaction::default(), Transaction::default(), Transaction::default()], &[]); let sealed_block = SealedFuelBlock { block, From 4908019bfc89f105882ad642bd55821266166d1c Mon Sep 17 00:00:00 2001 From: leviathanbeak88 Date: Wed, 19 Oct 2022 19:57:53 +0200 Subject: [PATCH 22/22] to the right place in toml --- fuel-p2p/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fuel-p2p/Cargo.toml b/fuel-p2p/Cargo.toml index b26b26c5674..53f71073824 100644 --- a/fuel-p2p/Cargo.toml +++ b/fuel-p2p/Cargo.toml @@ -30,12 +30,12 @@ tracing = "0.1" [dev-dependencies] ctor = "0.1" +fuel-core-interfaces = { path = "../fuel-core-interfaces", features = ["serde", "test-helpers"], version = "0.11.2" } rand = "0.8" tokio = { version = "1.21", features = ["full"] } tracing-appender = "0.2" tracing-attributes = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } -fuel-core-interfaces = { path = "../fuel-core-interfaces", features = ["serde", "test-helpers"], version = "0.11.2" } [features] test-helpers = [