diff --git a/fuel-core-interfaces/src/p2p.rs b/fuel-core-interfaces/src/p2p.rs index dbe565b0685..9ac56a70cf0 100644 --- a/fuel-core-interfaces/src/p2p.rs +++ b/fuel-core-interfaces/src/p2p.rs @@ -8,7 +8,10 @@ use crate::{ model::ConsensusVote, }; use async_trait::async_trait; -use std::sync::Arc; +use std::{ + fmt::Debug, + sync::Arc, +}; use tokio::sync::oneshot; #[derive(Debug, PartialEq, Eq, Clone)] @@ -16,15 +19,40 @@ pub enum TransactionBroadcast { NewTransaction(Transaction), } +#[derive(Debug, PartialEq, Eq, Clone)] pub enum ConsensusBroadcast { NewVote(ConsensusVote), } +#[derive(Debug, Clone)] pub enum BlockBroadcast { /// fuel block without consensus data NewBlock(FuelBlock), } +#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash)] +pub enum GossipsubMessageAcceptance { + Accept, + Reject, + Ignore, +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct GossipsubMessageInfo { + pub message_id: Vec, + pub peer_id: Vec, +} + +impl From<&GossipData> for GossipsubMessageInfo { + fn from(gossip_data: &GossipData) -> Self { + Self { + message_id: gossip_data.message_id.clone(), + peer_id: gossip_data.peer_id.clone(), + } + } +} + #[derive(Debug)] pub enum P2pRequestEvent { RequestBlock { @@ -40,9 +68,48 @@ pub enum P2pRequestEvent { BroadcastConsensusVote { vote: Arc, }, + GossipsubMessageReport { + message: GossipsubMessageInfo, + acceptance: GossipsubMessageAcceptance, + }, Stop, } +#[derive(Debug, Clone)] +pub struct GossipData { + pub data: Option, + pub peer_id: Vec, + pub message_id: Vec, +} + +pub type ConsensusGossipData = GossipData; +pub type TransactionGossipData = GossipData; +pub type BlockGossipData = GossipData; + +impl GossipData { + pub fn new( + data: T, + peer_id: impl Into>, + message_id: impl Into>, + ) -> Self { + Self { + data: Some(data), + peer_id: peer_id.into(), + message_id: message_id.into(), + } + } +} + +pub trait NetworkData: Debug + Send { + fn take_data(&mut self) -> Option; +} + +impl NetworkData for GossipData { + fn take_data(&mut self) -> Option { + self.data.take() + } +} + #[async_trait] pub trait P2pDb: Send + Sync { async fn get_sealed_block(&self, height: BlockHeight) diff --git a/fuel-core/src/cli/run/p2p.rs b/fuel-core/src/cli/run/p2p.rs index 05a4f5a2584..c9a741d38d7 100644 --- a/fuel-core/src/cli/run/p2p.rs +++ b/fuel-core/src/cli/run/p2p.rs @@ -12,6 +12,7 @@ use clap::Args; use fuel_core_interfaces::common::fuel_crypto::SecretKey; use fuel_p2p::{ config::P2PConfig, + gossipsub_config::default_gossipsub_builder, Multiaddr, }; @@ -89,6 +90,22 @@ pub struct P2pArgs { #[clap(long = "ideal_mesh_size", default_value = "6")] pub ideal_mesh_size: usize, + /// Number of heartbeats to keep in the gossipsub `memcache` + #[clap(long = "history_length", default_value = "5")] + pub history_length: usize, + + /// Number of past heartbeats to gossip about + #[clap(long = "history_gossip", default_value = "3")] + pub history_gossip: usize, + + /// Time between each heartbeat + #[clap(long = "heartbeat_interval", default_value = "1")] + pub heartbeat_interval: u64, + + /// The maximum byte size for each gossip + #[clap(long = "max_transmit_size", default_value = "2048")] + pub max_transmit_size: usize, + /// Choose timeout for sent requests in RequestResponse protocol #[clap(long = "request_timeout", default_value = "20")] pub request_timeout: u64, @@ -122,6 +139,17 @@ impl From for anyhow::Result { } }; + let gossipsub_config = default_gossipsub_builder() + .mesh_n(args.ideal_mesh_size) + .mesh_n_low(args.min_mesh_size) + .mesh_n_high(args.max_mesh_size) + .history_length(args.history_length) + .history_gossip(args.history_gossip) + .heartbeat_interval(Duration::from_secs(args.heartbeat_interval)) + .max_transmit_size(args.max_transmit_size) + .build() + .expect("valid gossipsub configuration"); + Ok(P2PConfig { local_keypair, network_name: args.network, @@ -139,9 +167,7 @@ impl From for anyhow::Result { args.connection_idle_timeout, )), topics: args.topics, - max_mesh_size: args.max_mesh_size, - min_mesh_size: args.min_mesh_size, - ideal_mesh_size: args.ideal_mesh_size, + gossipsub_config, set_request_timeout: Duration::from_secs(args.request_timeout), set_connection_keep_alive: Duration::from_secs(args.connection_keep_alive), info_interval: Some(Duration::from_secs(args.info_interval)), diff --git a/fuel-p2p/Cargo.toml b/fuel-p2p/Cargo.toml index 57839f4d5b0..53f71073824 100644 --- a/fuel-p2p/Cargo.toml +++ b/fuel-p2p/Cargo.toml @@ -30,6 +30,7 @@ tracing = "0.1" [dev-dependencies] ctor = "0.1" +fuel-core-interfaces = { path = "../fuel-core-interfaces", features = ["serde", "test-helpers"], version = "0.11.2" } rand = "0.8" tokio = { version = "1.21", features = ["full"] } tracing-appender = "0.2" diff --git a/fuel-p2p/src/behavior.rs b/fuel-p2p/src/behavior.rs index 042b742cf8d..26c04b863e1 100644 --- a/fuel-p2p/src/behavior.rs +++ b/fuel-p2p/src/behavior.rs @@ -7,7 +7,7 @@ use crate::{ DiscoveryEvent, }, gossipsub::{ - build_gossipsub, + config::build_gossipsub_behaviour, topics::GossipTopic, }, peer_info::{ @@ -28,6 +28,7 @@ use libp2p::{ }, Gossipsub, GossipsubEvent, + MessageAcceptance, MessageId, }, request_response::{ @@ -106,7 +107,7 @@ impl FuelBehaviour { Self { discovery: discovery_config.finish(), - gossipsub: build_gossipsub(&p2p_config.local_keypair, p2p_config), + gossipsub: build_gossipsub_behaviour(p2p_config), peer_info, request_response, } @@ -166,6 +167,19 @@ impl FuelBehaviour { self.request_response.send_response(channel, message) } + pub fn report_message_validation_result( + &mut self, + msg_id: &MessageId, + propagation_source: &PeerId, + acceptance: MessageAcceptance, + ) -> Result { + self.gossipsub.report_message_validation_result( + msg_id, + propagation_source, + acceptance, + ) + } + // Currently only used in testing, but should be useful for the NetworkOrchestrator API #[allow(dead_code)] pub fn get_peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> { diff --git a/fuel-p2p/src/config.rs b/fuel-p2p/src/config.rs index 4bace0e1fb1..14b43bd7ead 100644 --- a/fuel-p2p/src/config.rs +++ b/fuel-p2p/src/config.rs @@ -1,8 +1,12 @@ -use crate::gossipsub::topics::{ - CON_VOTE_GOSSIP_TOPIC, - NEW_BLOCK_GOSSIP_TOPIC, - NEW_TX_GOSSIP_TOPIC, +use crate::gossipsub::{ + config::default_gossipsub_config, + topics::{ + CON_VOTE_GOSSIP_TOPIC, + NEW_BLOCK_GOSSIP_TOPIC, + NEW_TX_GOSSIP_TOPIC, + }, }; + use libp2p::{ core::{ muxing::StreamMuxerBox, @@ -23,6 +27,7 @@ use libp2p::{ PeerId, Transport, }; + use std::{ net::{ IpAddr, @@ -31,6 +36,8 @@ use std::{ time::Duration, }; +use libp2p::gossipsub::GossipsubConfig; + const REQ_RES_TIMEOUT: Duration = Duration::from_secs(20); /// Maximum number of frames buffered per substream. @@ -72,11 +79,9 @@ pub struct P2PConfig { /// and the next outbound ping pub info_interval: Option, - // `Gossipsub` related fields + // `Gossipsub` config and topics + pub gossipsub_config: GossipsubConfig, pub topics: Vec, - pub ideal_mesh_size: usize, - pub min_mesh_size: usize, - pub max_mesh_size: usize, // RequestResponse related fields /// Sets the timeout for inbound and outbound requests. @@ -116,9 +121,7 @@ impl P2PConfig { NEW_BLOCK_GOSSIP_TOPIC.into(), CON_VOTE_GOSSIP_TOPIC.into(), ], - max_mesh_size: 12, - min_mesh_size: 4, - ideal_mesh_size: 6, + gossipsub_config: default_gossipsub_config(), set_request_timeout: REQ_RES_TIMEOUT, set_connection_keep_alive: REQ_RES_TIMEOUT, info_interval: Some(Duration::from_secs(3)), diff --git a/fuel-p2p/src/gossipsub.rs b/fuel-p2p/src/gossipsub.rs index dd485e67b12..7b623bd2f69 100644 --- a/fuel-p2p/src/gossipsub.rs +++ b/fuel-p2p/src/gossipsub.rs @@ -1,5 +1,3 @@ -mod builder; +pub mod config; pub mod messages; pub mod topics; - -pub use builder::build_gossipsub; diff --git a/fuel-p2p/src/gossipsub/builder.rs b/fuel-p2p/src/gossipsub/builder.rs deleted file mode 100644 index a9f3f8f9310..00000000000 --- a/fuel-p2p/src/gossipsub/builder.rs +++ /dev/null @@ -1,52 +0,0 @@ -use libp2p::{ - gossipsub::{ - FastMessageId, - Gossipsub, - GossipsubConfigBuilder, - GossipsubMessage, - MessageAuthenticity, - MessageId, - PeerScoreParams, - PeerScoreThresholds, - RawGossipsubMessage, - }, - identity::Keypair, -}; -use sha2::{ - Digest, - Sha256, -}; - -use crate::config::P2PConfig; - -pub fn build_gossipsub(local_key: &Keypair, p2p_config: &P2PConfig) -> Gossipsub { - let gossip_message_id = move |message: &GossipsubMessage| { - MessageId::from(&Sha256::digest(&message.data)[..20]) - }; - - let fast_gossip_message_id = move |message: &RawGossipsubMessage| { - FastMessageId::from(&Sha256::digest(&message.data)[..8]) - }; - - let gossipsub_config = GossipsubConfigBuilder::default() - .protocol_id_prefix("/meshsub/1.0.0") - .mesh_n(p2p_config.ideal_mesh_size) - .mesh_n_low(p2p_config.min_mesh_size) - .mesh_n_high(p2p_config.max_mesh_size) - .message_id_fn(gossip_message_id) - .fast_message_id_fn(fast_gossip_message_id) - .build() - .expect("valid gossipsub configuration"); - - let mut gossipsub = Gossipsub::new( - MessageAuthenticity::Signed(local_key.clone()), - gossipsub_config, - ) - .expect("gossipsub initialized"); - - gossipsub - .with_peer_score(PeerScoreParams::default(), PeerScoreThresholds::default()) - .expect("gossipsub initialized with peer score"); - - gossipsub -} diff --git a/fuel-p2p/src/gossipsub/config.rs b/fuel-p2p/src/gossipsub/config.rs new file mode 100644 index 00000000000..1fef47c4088 --- /dev/null +++ b/fuel-p2p/src/gossipsub/config.rs @@ -0,0 +1,65 @@ +use libp2p::gossipsub::{ + FastMessageId, + Gossipsub, + GossipsubConfig, + GossipsubConfigBuilder, + GossipsubMessage, + MessageAuthenticity, + MessageId, + PeerScoreParams, + PeerScoreThresholds, + RawGossipsubMessage, +}; +use sha2::{ + Digest, + Sha256, +}; + +use crate::config::P2PConfig; + +/// Creates `GossipsubConfigBuilder` with few of the Gossipsub values already defined +pub fn default_gossipsub_builder() -> GossipsubConfigBuilder { + let gossip_message_id = move |message: &GossipsubMessage| { + MessageId::from(&Sha256::digest(&message.data)[..]) + }; + + let fast_gossip_message_id = move |message: &RawGossipsubMessage| { + FastMessageId::from(&Sha256::digest(&message.data)[..]) + }; + + let mut builder = GossipsubConfigBuilder::default(); + + builder + .protocol_id_prefix("/meshsub/1.0.0") + .message_id_fn(gossip_message_id) + .fast_message_id_fn(fast_gossip_message_id) + .validate_messages(); + + builder +} + +/// Builds a default `GossipsubConfig`. +/// Used in testing. +pub(crate) fn default_gossipsub_config() -> GossipsubConfig { + default_gossipsub_builder() + .mesh_n(6) + .mesh_n_low(4) + .mesh_n_high(12) + .build() + .expect("valid gossipsub configuration") +} + +/// Given a `P2pConfig` creates a Gossipsub Behaviour +pub(crate) fn build_gossipsub_behaviour(p2p_config: &P2PConfig) -> Gossipsub { + let mut gossipsub = Gossipsub::new( + MessageAuthenticity::Signed(p2p_config.local_keypair.clone()), + p2p_config.gossipsub_config.clone(), + ) + .expect("gossipsub initialized"); + + gossipsub + .with_peer_score(PeerScoreParams::default(), PeerScoreThresholds::default()) + .expect("gossipsub initialized with peer score"); + + gossipsub +} diff --git a/fuel-p2p/src/lib.rs b/fuel-p2p/src/lib.rs index 8287eb922a0..16c89d16474 100644 --- a/fuel-p2p/src/lib.rs +++ b/fuel-p2p/src/lib.rs @@ -8,6 +8,8 @@ mod peer_info; mod request_response; mod service; +pub use gossipsub::config as gossipsub_config; + pub use libp2p::{ Multiaddr, PeerId, diff --git a/fuel-p2p/src/orchestrator.rs b/fuel-p2p/src/orchestrator.rs index da58a848722..7d0cabfdc5e 100644 --- a/fuel-p2p/src/orchestrator.rs +++ b/fuel-p2p/src/orchestrator.rs @@ -1,14 +1,24 @@ use std::sync::Arc; use anyhow::anyhow; + use fuel_core_interfaces::p2p::{ BlockBroadcast, + BlockGossipData, ConsensusBroadcast, + ConsensusGossipData, + GossipData, + GossipsubMessageAcceptance, + GossipsubMessageInfo, P2pDb, P2pRequestEvent, TransactionBroadcast, + TransactionGossipData, +}; +use libp2p::{ + gossipsub::MessageAcceptance, + request_response::RequestId, }; -use libp2p::request_response::RequestId; use tokio::{ sync::{ broadcast, @@ -20,10 +30,17 @@ use tokio::{ }, task::JoinHandle, }; -use tracing::warn; +use tracing::{ + debug, + error, + warn, +}; use crate::{ - codecs::bincode::BincodeCodec, + codecs::{ + bincode::BincodeCodec, + NetworkCodec, + }, config::P2PConfig, gossipsub::messages::{ GossipsubBroadcastRequest, @@ -48,11 +65,10 @@ pub struct NetworkOrchestrator { rx_outbound_responses: Receiver>, // senders - tx_consensus: Sender, - tx_transaction: broadcast::Sender, - tx_block: Sender, + tx_consensus: Sender, + tx_transaction: broadcast::Sender, + tx_block: Sender, tx_outbound_responses: Sender>, - db: Arc, } @@ -61,9 +77,9 @@ impl NetworkOrchestrator { p2p_config: P2PConfig, rx_request_event: Receiver, - tx_consensus: Sender, - tx_transaction: broadcast::Sender, - tx_block: Sender, + tx_consensus: Sender, + tx_transaction: broadcast::Sender, + tx_block: Sender, db: Arc, ) -> Self { @@ -97,20 +113,22 @@ impl NetworkOrchestrator { }, p2p_event = p2p_service.next_event() => { match p2p_event { - FuelP2PEvent::GossipsubMessage { message, .. } => { + Some(FuelP2PEvent::GossipsubMessage { message, message_id, peer_id,.. }) => { + let message_id = message_id.0; + match message { GossipsubMessage::NewTx(tx) => { - let _ = self.tx_transaction.send(TransactionBroadcast::NewTransaction(tx)); + let _ = self.tx_transaction.send(GossipData::new(TransactionBroadcast::NewTransaction(tx), peer_id, message_id)); }, GossipsubMessage::NewBlock(block) => { - let _ = self.tx_block.send(BlockBroadcast::NewBlock(block)); + let _ = self.tx_block.send(GossipData::new(BlockBroadcast::NewBlock(block), peer_id, message_id)); }, GossipsubMessage::ConsensusVote(vote) => { - let _ = self.tx_consensus.send(ConsensusBroadcast::NewVote(vote)); + let _ = self.tx_consensus.send(GossipData::new(ConsensusBroadcast::NewVote(vote), peer_id, message_id)); }, } }, - FuelP2PEvent::RequestMessage { request_message, request_id } => { + Some(FuelP2PEvent::RequestMessage { request_message, request_id }) => { match request_message { RequestMessage::RequestBlock(block_height) => { let db = self.db.clone(); @@ -146,6 +164,9 @@ impl NetworkOrchestrator { let broadcast = GossipsubBroadcastRequest::ConsensusVote(vote); let _ = p2p_service.publish_message(broadcast); }, + P2pRequestEvent::GossipsubMessageReport { message, acceptance } => { + report_message(message, acceptance, &mut p2p_service); + } P2pRequestEvent::Stop => break, } } else { @@ -159,6 +180,42 @@ impl NetworkOrchestrator { } } +fn report_message( + message: GossipsubMessageInfo, + acceptance: GossipsubMessageAcceptance, + p2p_service: &mut FuelP2PService, +) { + let GossipsubMessageInfo { + peer_id, + message_id, + } = message; + + let msg_id = message_id.into(); + + if let Ok(peer_id) = peer_id.try_into() { + let acceptance = match acceptance { + GossipsubMessageAcceptance::Accept => MessageAcceptance::Accept, + GossipsubMessageAcceptance::Reject => MessageAcceptance::Reject, + GossipsubMessageAcceptance::Ignore => MessageAcceptance::Ignore, + }; + + match p2p_service.report_message_validation_result(&msg_id, &peer_id, acceptance) + { + Ok(true) => { + debug!(target: "fuel-libp2p", "Sent a report for MessageId: {} from PeerId: {}", msg_id, peer_id); + } + Ok(false) => { + warn!(target: "fuel-libp2p", "Message with MessageId: {} not found in the Gossipsub Message Cache", msg_id); + } + Err(e) => { + error!(target: "fuel-libp2p", "Failed to publish Message with MessageId: {} with Error: {:?}", msg_id, e); + } + } + } else { + warn!(target: "fuel-libp2p", "Failed to read PeerId from received GossipsubMessageId: {}", msg_id); + } +} + pub struct Service { /// Network Orchestrator that handles p2p network and inter-module communication network_orchestrator: Arc>>, @@ -174,9 +231,9 @@ impl Service { db: Arc, tx_request_event: Sender, rx_request_event: Receiver, - tx_consensus: Sender, - tx_transaction: broadcast::Sender, - tx_block: Sender, + tx_consensus: Sender, + tx_transaction: broadcast::Sender, + tx_block: Sender, ) -> Self { let network_orchestrator = NetworkOrchestrator::new( p2p_config, diff --git a/fuel-p2p/src/service.rs b/fuel-p2p/src/service.rs index bc00e5dd7c5..3be96fb7803 100644 --- a/fuel-p2p/src/service.rs +++ b/fuel-p2p/src/service.rs @@ -30,11 +30,13 @@ use crate::{ ResponseMessage, }, }; + use futures::prelude::*; use libp2p::{ gossipsub::{ error::PublishError, GossipsubEvent, + MessageAcceptance, MessageId, Topic, TopicHash, @@ -58,6 +60,7 @@ use rand::Rng; use std::collections::HashMap; use tracing::{ debug, + info, warn, }; @@ -97,6 +100,7 @@ struct NetworkMetadata { pub enum FuelP2PEvent { GossipsubMessage { peer_id: PeerId, + message_id: MessageId, topic_hash: TopicHash, message: FuelGossipsubMessage, }, @@ -238,15 +242,27 @@ impl FuelP2PService { Ok(()) } - pub async fn next_event(&mut self) -> FuelP2PEvent { - loop { - if let SwarmEvent::Behaviour(fuel_behaviour) = - self.swarm.select_next_some().await - { - if let Some(event) = self.handle_behaviour_event(fuel_behaviour) { - return event - } - } + pub fn report_message_validation_result( + &mut self, + msg_id: &MessageId, + propagation_source: &PeerId, + acceptance: MessageAcceptance, + ) -> Result { + self.swarm.behaviour_mut().report_message_validation_result( + msg_id, + propagation_source, + acceptance, + ) + } + + /// Handles P2P Events. + /// Returns only events that are of interest to the Network Orchestrator. + pub async fn next_event(&mut self) -> Option { + if let SwarmEvent::Behaviour(fuel_behaviour) = self.swarm.select_next_some().await + { + self.handle_behaviour_event(fuel_behaviour) + } else { + None } } @@ -272,7 +288,7 @@ impl FuelP2PService { if let GossipsubEvent::Message { propagation_source, message, - .. + message_id, } = gossipsub_event { if let Some(correct_topic) = self @@ -284,12 +300,29 @@ impl FuelP2PService { Ok(decoded_message) => { return Some(FuelP2PEvent::GossipsubMessage { peer_id: propagation_source, + message_id, topic_hash: message.topic, message: decoded_message, }) } Err(err) => { - warn!(target: "fuel-libp2p", "Failed to decode a message: {:?} with error: {:?}", &message.data, err); + warn!(target: "fuel-libp2p", "Failed to decode a message. ID: {}, Message: {:?} with error: {:?}", message_id, &message.data, err); + + match self.report_message_validation_result( + &message_id, + &propagation_source, + MessageAcceptance::Reject, + ) { + Ok(false) => { + warn!(target: "fuel-libp2p", "Message was not found in the cache, peer with PeerId: {} has been reported.", propagation_source); + } + Ok(true) => { + info!(target: "fuel-libp2p", "Message found in the cache, peer with PeerId: {} has been reported.", propagation_source); + } + Err(e) => { + warn!(target: "fuel-libp2p", "Failed to publish the message with following error: {:?}.", e); + } + } } } } else { @@ -414,7 +447,10 @@ mod tests { }; use futures::StreamExt; use libp2p::{ - gossipsub::Topic, + gossipsub::{ + error::PublishError, + Topic, + }, identity::Keypair, swarm::SwarmEvent, Multiaddr, @@ -510,7 +546,7 @@ mod tests { loop { tokio::select! { node_b_event = node_b.next_event() => { - if let FuelP2PEvent::PeerConnected(_) = node_b_event { + if let Some(FuelP2PEvent::PeerConnected(_)) = node_b_event { // successfully connected to Node B break } @@ -558,7 +594,7 @@ mod tests { }, node_c_event = node_c.next_event() => { - if let FuelP2PEvent::PeerConnected(peer_id) = node_c_event { + if let Some(FuelP2PEvent::PeerConnected(peer_id)) = node_c_event { // we have connected to Node B! if peer_id == node_b.local_peer_id { break @@ -594,7 +630,7 @@ mod tests { loop { tokio::select! { node_a_event = node_a.next_event() => { - if let FuelP2PEvent::PeerInfoUpdated(peer_id) = node_a_event { + if let Some(FuelP2PEvent::PeerInfoUpdated(peer_id)) = node_a_event { if let Some(PeerInfo { peer_addresses, latest_ping, client_version, .. }) = node_a.swarm.behaviour().get_peer_info(&peer_id) { // Exits after it verifies that: // 1. Peer Addresses are known @@ -683,7 +719,7 @@ mod tests { loop { tokio::select! { node_a_event = node_a.next_event() => { - if let FuelP2PEvent::PeerInfoUpdated(peer_id) = node_a_event { + if let Some(FuelP2PEvent::PeerInfoUpdated(peer_id)) = node_a_event { if let Some(PeerInfo { peer_addresses, .. }) = node_a.swarm.behaviour().get_peer_info(&peer_id) { // verifies that we've got at least a single peer address to send message to if !peer_addresses.is_empty() && !message_sent { @@ -697,7 +733,7 @@ mod tests { tracing::info!("Node A Event: {:?}", node_a_event); }, node_b_event = node_b.next_event() => { - if let FuelP2PEvent::GossipsubMessage { topic_hash, message, .. } = node_b_event.clone() { + if let Some(FuelP2PEvent::GossipsubMessage { topic_hash, message, .. }) = node_b_event.clone() { if topic_hash != selected_topic.hash() { tracing::error!("Wrong topic hash, expected: {} - actual: {}", selected_topic.hash(), topic_hash); panic!("Wrong Topic"); @@ -725,6 +761,12 @@ mod tests { } } + // Node B received the correct message + // If we try to publish it again we will get `PublishError::Duplicate` + // This asserts that our MessageId calculation is consistant irrespective of which Peer sends it + let broadcast_request = broadcast_request.clone(); + matches!(node_b.publish_message(broadcast_request), Err(PublishError::Duplicate)); + break } @@ -775,7 +817,7 @@ mod tests { break; } node_a_event = node_a.next_event() => { - if let FuelP2PEvent::PeerInfoUpdated(peer_id) = node_a_event { + if let Some(FuelP2PEvent::PeerInfoUpdated(peer_id)) = node_a_event { if let Some(PeerInfo { peer_addresses, .. }) = node_a.swarm.behaviour().get_peer_info(&peer_id) { // 0. verifies that we've got at least a single peer address to request message from if !peer_addresses.is_empty() && !request_sent { @@ -808,12 +850,8 @@ mod tests { }, node_b_event = node_b.next_event() => { // 2. Node B receives the RequestMessage from Node A initiated by the NetworkOrchestrator - if let FuelP2PEvent::RequestMessage{ request_id, .. } = node_b_event { - let block = FuelBlock::new( - PartialFuelBlockHeader::default(), - vec![Transaction::default(), Transaction::default(), Transaction::default(), Transaction::default(), Transaction::default()], - &[] - ); + if let Some(FuelP2PEvent::RequestMessage{ request_id, .. }) = node_b_event { + let block = FuelBlock::new(PartialFuelBlockHeader::default(), vec![Transaction::default(), Transaction::default(), Transaction::default(), Transaction::default(), Transaction::default()], &[]); let sealed_block = SealedFuelBlock { block, @@ -860,7 +898,7 @@ mod tests { loop { tokio::select! { node_a_event = node_a.next_event() => { - if let FuelP2PEvent::PeerInfoUpdated(peer_id) = node_a_event { + if let Some(FuelP2PEvent::PeerInfoUpdated(peer_id)) = node_a_event { if let Some(PeerInfo { peer_addresses, .. }) = node_a.swarm.behaviour().get_peer_info(&peer_id) { // 0. verifies that we've got at least a single peer address to request message from if !peer_addresses.is_empty() && !request_sent { diff --git a/fuel-sync/src/service.rs b/fuel-sync/src/service.rs index 565d5b0b643..cd0f15de63b 100644 --- a/fuel-sync/src/service.rs +++ b/fuel-sync/src/service.rs @@ -2,7 +2,7 @@ use crate::Config; use fuel_core_interfaces::{ block_importer::ImportBlockMpsc, p2p::{ - BlockBroadcast, + BlockGossipData, P2pRequestEvent, }, sync::SyncMpsc, @@ -29,7 +29,7 @@ impl Service { pub async fn start( &self, - _p2p_block: mpsc::Receiver, + _p2p_block: mpsc::Receiver, _p2p_request: mpsc::Sender, // TODO: re-introduce this when sync actually depends on the coordinator // _bft: mpsc::Sender, diff --git a/fuel-txpool/src/service.rs b/fuel-txpool/src/service.rs index 1b83aa74153..f90b3979db8 100644 --- a/fuel-txpool/src/service.rs +++ b/fuel-txpool/src/service.rs @@ -6,8 +6,10 @@ use anyhow::anyhow; use fuel_core_interfaces::{ block_importer::ImportBlockBroadcast, p2p::{ + GossipData, P2pRequestEvent, TransactionBroadcast, + TransactionGossipData, }, txpool::{ self, @@ -35,7 +37,7 @@ pub struct ServiceBuilder { txpool_receiver: Option>, tx_status_sender: Option>, import_block_receiver: Option>, - incoming_tx_receiver: Option>, + incoming_tx_receiver: Option>, network_sender: Option>, } @@ -95,7 +97,7 @@ impl ServiceBuilder { pub fn incoming_tx_receiver( &mut self, - incoming_tx_receiver: broadcast::Receiver, + incoming_tx_receiver: broadcast::Receiver, ) -> &mut Self { self.incoming_tx_receiver = Some(incoming_tx_receiver); self @@ -157,7 +159,7 @@ pub struct Context { pub txpool_receiver: mpsc::Receiver, pub tx_status_sender: broadcast::Sender, pub import_block_receiver: broadcast::Receiver, - pub incoming_tx_receiver: broadcast::Receiver, + pub incoming_tx_receiver: broadcast::Receiver, pub network_sender: mpsc::Sender, } @@ -179,11 +181,9 @@ impl Context { tokio::spawn( async move { let txpool = txpool.as_ref(); - match new_transaction.unwrap() { - TransactionBroadcast::NewTransaction ( tx ) => { - let txs = vec!(Arc::new(tx)); - TxPool::insert(txpool, db.as_ref().as_ref(), tx_status_sender, txs).await - } + if let GossipData { data: Some(TransactionBroadcast::NewTransaction ( tx )), .. } = new_transaction.unwrap() { + let txs = vec!(Arc::new(tx)); + TxPool::insert(txpool, db.as_ref().as_ref(), tx_status_sender, txs).await; } }); }