diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index 825f5bcf..4ce5e3d7 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -1,8 +1,3 @@ -use std::sync::{ - Arc, - RwLock, -}; - use crate::{ codecs::NetworkCodec, config::Config, @@ -15,11 +10,9 @@ use crate::{ config::build_gossipsub_behaviour, topics::GossipTopic, }, - peer_manager::{ - ConnectionState, - PeerInfo, - PeerInfoEvent, - PeerManagerBehaviour, + peer_report::{ + PeerReportBehaviour, + PeerReportEvent, }, request_response::messages::{ NetworkResponse, @@ -29,10 +22,7 @@ use crate::{ use fuel_core_types::fuel_types::BlockHeight; use libp2p::{ gossipsub::{ - error::{ - PublishError, - SubscriptionError, - }, + error::PublishError, Gossipsub, GossipsubEvent, MessageAcceptance, @@ -50,11 +40,16 @@ use libp2p::{ Multiaddr, PeerId, }; +use tracing::{ + debug, + error, + log::warn, +}; #[derive(Debug)] pub enum FuelBehaviourEvent { Discovery(DiscoveryEvent), - PeerInfo(PeerInfoEvent), + PeerReport(PeerReportEvent), Gossipsub(GossipsubEvent), RequestResponse(RequestResponseEvent), } @@ -66,9 +61,8 @@ pub struct FuelBehaviour { /// Node discovery discovery: DiscoveryBehaviour, - /// Handles Peer Connections /// Identifies and periodically requests `BlockHeight` from connected nodes - peer_manager: PeerManagerBehaviour, + peer_report: PeerReportBehaviour, /// Message propagation for p2p gossipsub: Gossipsub, @@ -78,11 +72,7 @@ pub struct FuelBehaviour { } impl FuelBehaviour { - pub(crate) fn new( - p2p_config: &Config, - codec: Codec, - connection_state: Arc>, - ) -> Self { + pub(crate) fn new(p2p_config: &Config, codec: Codec) -> Self { let local_public_key = p2p_config.keypair.public(); let local_peer_id = PeerId::from_public_key(&local_public_key); @@ -109,7 +99,9 @@ impl FuelBehaviour { discovery_config }; - let peer_manager = PeerManagerBehaviour::new(p2p_config, connection_state); + let gossipsub = build_gossipsub_behaviour(p2p_config); + + let peer_report = PeerReportBehaviour::new(p2p_config); let req_res_protocol = std::iter::once((codec.get_req_res_protocol(), ProtocolSupport::Full)); @@ -123,20 +115,12 @@ impl FuelBehaviour { Self { discovery: discovery_config.finish(), - gossipsub: build_gossipsub_behaviour(p2p_config), - peer_manager, + gossipsub, + peer_report, request_response, } } - pub fn add_addresses_to_peer_info( - &mut self, - peer_id: &PeerId, - addresses: Vec, - ) { - self.peer_manager.insert_peer_addresses(peer_id, addresses); - } - pub fn add_addresses_to_discovery( &mut self, peer_id: &PeerId, @@ -147,14 +131,6 @@ impl FuelBehaviour { } } - pub fn get_peers_ids(&self) -> impl Iterator { - self.peer_manager.get_peers_ids() - } - - pub fn total_peers_connected(&self) -> usize { - self.peer_manager.total_peers_connected() - } - pub fn publish_message( &mut self, topic: GossipTopic, @@ -163,13 +139,6 @@ impl FuelBehaviour { self.gossipsub.publish(topic, encoded_data) } - pub fn subscribe_to_topic( - &mut self, - topic: &GossipTopic, - ) -> Result { - self.gossipsub.subscribe(topic) - } - pub fn send_request_msg( &mut self, message_request: RequestMessage, @@ -191,26 +160,33 @@ impl FuelBehaviour { msg_id: &MessageId, propagation_source: &PeerId, acceptance: MessageAcceptance, - ) -> Result { - self.gossipsub.report_message_validation_result( + ) -> Option { + let should_check_score = matches!(acceptance, MessageAcceptance::Reject); + + match self.gossipsub.report_message_validation_result( msg_id, propagation_source, acceptance, - ) - 
} - - pub fn update_block_height(&mut self, block_height: BlockHeight) { - self.peer_manager.update_block_height(block_height); - } + ) { + Ok(true) => { + debug!(target: "fuel-p2p", "Sent a report for MessageId: {} from PeerId: {}", msg_id, propagation_source); + if should_check_score { + return self.gossipsub.peer_score(propagation_source) + } + } + Ok(false) => { + warn!(target: "fuel-p2p", "Message with MessageId: {} not found in the Gossipsub Message Cache", msg_id); + } + Err(e) => { + error!(target: "fuel-p2p", "Failed to report Message with MessageId: {} with Error: {:?}", msg_id, e); + } + } - // Currently only used in testing, but should be useful for the P2P Service API - #[allow(dead_code)] - pub fn get_peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> { - self.peer_manager.get_peer_info(peer_id) + None } - pub fn peer_manager(&self) -> &PeerManagerBehaviour { - &self.peer_manager + pub fn update_block_height(&mut self, block_height: BlockHeight) { + self.peer_report.update_block_height(block_height); } } @@ -220,9 +196,9 @@ impl From for FuelBehaviourEvent { } } -impl From for FuelBehaviourEvent { - fn from(event: PeerInfoEvent) -> Self { - FuelBehaviourEvent::PeerInfo(event) +impl From for FuelBehaviourEvent { + fn from(event: PeerReportEvent) -> Self { + FuelBehaviourEvent::PeerReport(event) } } diff --git a/crates/services/p2p/src/gossipsub/config.rs b/crates/services/p2p/src/gossipsub/config.rs index 7c2ba966..77f0403e 100644 --- a/crates/services/p2p/src/gossipsub/config.rs +++ b/crates/services/p2p/src/gossipsub/config.rs @@ -15,7 +15,10 @@ use libp2p::gossipsub::{ PeerScoreParams, PeerScoreThresholds, RawGossipsubMessage, + Topic, + TopicScoreParams, }; + use prometheus_client::registry::Registry; use sha2::{ Digest, @@ -23,6 +26,10 @@ use sha2::{ }; use std::time::Duration; +use super::topics::GossipTopic; + +const MAX_GOSSIP_SCORE: f64 = 100.0; + /// Creates `GossipsubConfigBuilder` with few of the Gossipsub values already defined pub fn default_gossipsub_builder() -> GossipsubConfigBuilder { let gossip_message_id = move |message: &GossipsubMessage| { @@ -57,6 +64,144 @@ pub(crate) fn default_gossipsub_config() -> GossipsubConfig { .expect("valid gossipsub configuration") } +fn initialize_topic_score_params() -> TopicScoreParams { + TopicScoreParams { + /// Each topic will weigh this much + topic_weight: 0.5, + + // Reflects positive on the Score + + // Time in the mesh + // This is the time the peer has been grafted in the mesh. + // The value of of the parameter is the `time/time_in_mesh_quantum`, capped by `time_in_mesh_cap` + // The weight of the parameter must be positive (or zero to disable). + time_in_mesh_weight: 1.0, + time_in_mesh_quantum: Duration::from_millis(1), + time_in_mesh_cap: 50.0, + + /// First message deliveries + /// This is the number of message deliveries in the topic. + /// The value of the parameter is a counter, decaying with `first_message_deliveries_decay`, and capped + /// by `first_message_deliveries_cap`. + /// The weight of the parameter MUST be positive (or zero to disable). + first_message_deliveries_weight: 1.0, + first_message_deliveries_decay: 0.5, + first_message_deliveries_cap: 50.0, + + // Reflects negative on the Score + /// Mesh message deliveries + /// This is the number of message deliveries in the mesh, within the + /// `mesh_message_deliveries_window` of message validation; deliveries during validation also + /// count and are retroactively applied when validation succeeds. 
+        /// This window accounts for the minimum time before a hostile mesh peer trying to game the
+        /// score could replay back a valid message we just sent them.
+        /// It effectively tracks first and near-first deliveries, i.e. a message seen from a mesh peer
+        /// before we have forwarded it to them.
+        /// The parameter has an associated counter, decaying with `mesh_message_deliveries_decay`.
+        /// If the counter exceeds the threshold, its value is 0.
+        /// If the counter is below the `mesh_message_deliveries_threshold`, the value is the square of
+        /// the deficit, i.e. `(message_deliveries_threshold - counter)^2`
+        /// The penalty is only activated after `mesh_message_deliveries_activation` time in the mesh.
+        /// The weight of the parameter MUST be negative (or zero to disable).
+        mesh_message_deliveries_weight: -1.0,
+        mesh_message_deliveries_decay: 0.5,
+        mesh_message_deliveries_cap: 10.0,
+        mesh_message_deliveries_threshold: 20.0,
+        mesh_message_deliveries_window: Duration::from_millis(10),
+        mesh_message_deliveries_activation: Duration::from_secs(5),
+
+        /// Sticky mesh propagation failures
+        /// This is a sticky penalty that applies when a peer gets pruned from the mesh with an active
+        /// mesh message delivery penalty.
+        /// The weight of the parameter MUST be negative (or zero to disable)
+        mesh_failure_penalty_weight: -1.0,
+        mesh_failure_penalty_decay: 0.5,
+
+        /// Invalid messages
+        /// This is the number of invalid messages in the topic.
+        /// The value of the parameter is the square of the counter, decaying with
+        /// `invalid_message_deliveries_decay`.
+        /// The weight of the parameter MUST be negative (or zero to disable).
+        invalid_message_deliveries_weight: -1.0,
+        invalid_message_deliveries_decay: 0.3,
+    }
+}
+
+/// Sets `topic_score_cap` to `MAX_GOSSIP_SCORE`.
+/// Because the app-specific weight is set to 0, the maximum peer score
+/// a peer can reach is the topic score cap.
+fn initialize_peer_score_params() -> PeerScoreParams {
+    PeerScoreParams {
+        // topics are added later
+        topics: Default::default(),
+
+        /// Aggregate topic score cap; this limits the total contribution of topics towards a positive
+        /// score. It must be positive (or 0 for no cap).
+        topic_score_cap: MAX_GOSSIP_SCORE,
+
+        // Application-specific peer scoring
+        // We keep app score separate from gossipsub score
+        app_specific_weight: 0.0,
+
+        /// IP-colocation factor.
+        /// The parameter has an associated counter which counts the number of peers with the same IP.
+        /// If the number of peers in the same IP exceeds `ip_colocation_factor_threshold`, then the value
+        /// is the square of the difference, i.e. `(peers_in_same_ip - ip_colocation_threshold)^2`.
+        /// If the number of peers in the same IP is less than the threshold, then the value is 0.
+        /// The weight of the parameter MUST be negative, unless you want to disable it for testing.
+        ip_colocation_factor_weight: -5.0,
+        ip_colocation_factor_threshold: 10.0,
+        ip_colocation_factor_whitelist: Default::default(),
+
+        /// Behavioural pattern penalties.
+        /// This parameter has an associated counter which tracks misbehaviour as detected by the
+        /// router. The router currently applies penalties for the following behaviors:
+        /// - attempting to re-graft before the prune backoff time has elapsed.
+        /// - not following up in IWANT requests for messages advertised with IHAVE.
+        ///
+        /// The value of the parameter is the square of the counter over the threshold, which decays
+        /// with BehaviourPenaltyDecay.
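+        /// As an illustrative (not normative) example using the values configured below: a decayed
+        /// counter of 3 over the `behaviour_penalty_threshold` of 0.0 contributes (3 - 0)^2 = 9,
+        /// which the `behaviour_penalty_weight` of -10.0 then scales to -90 on the peer's score.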
+ /// The weight of the parameter MUST be negative (or zero to disable). + behaviour_penalty_weight: -10.0, + behaviour_penalty_threshold: 0.0, + behaviour_penalty_decay: 0.2, + + /// The decay interval for parameter counters. + decay_interval: Duration::from_secs(1), + + /// Counter value below which it is considered 0. + decay_to_zero: 0.1, + + /// Time to remember counters for a disconnected peer. + retain_score: Duration::from_secs(3600), + } +} + +fn initialize_peer_score_thresholds() -> PeerScoreThresholds { + PeerScoreThresholds { + /// The score threshold below which gossip propagation is suppressed; + /// should be negative. + gossip_threshold: -10.0, + + /// The score threshold below which we shouldn't publish when using flood + /// publishing (also applies to fanout peers); should be negative and <= `gossip_threshold`. + publish_threshold: -50.0, + + /// The score threshold below which message processing is suppressed altogether, + /// implementing an effective graylist according to peer score; should be negative and + /// <= `publish_threshold`. + graylist_threshold: -80.0, + + /// The score threshold below which px will be ignored; this should be positive + /// and limited to scores attainable by bootstrappers and other trusted nodes. + accept_px_threshold: 10.0, + + /// The median mesh score threshold before triggering opportunistic + /// grafting; this should have a small positive value. + opportunistic_graft_threshold: 20.0, + } +} + /// Given a `P2pConfig` containing `GossipsubConfig` creates a Gossipsub Behaviour pub(crate) fn build_gossipsub_behaviour(p2p_config: &Config) -> Gossipsub { if p2p_config.metrics { @@ -79,9 +224,7 @@ pub(crate) fn build_gossipsub_behaviour(p2p_config: &Config) -> Gossipsub { .set(Box::new(p2p_registry)) .unwrap_or(()); - gossipsub - .with_peer_score(PeerScoreParams::default(), PeerScoreThresholds::default()) - .expect("gossipsub initialized with peer score"); + initialize_gossipsub(&mut gossipsub, p2p_config); gossipsub } else { @@ -91,10 +234,33 @@ pub(crate) fn build_gossipsub_behaviour(p2p_config: &Config) -> Gossipsub { ) .expect("gossipsub initialized"); + initialize_gossipsub(&mut gossipsub, p2p_config); + + gossipsub + } +} + +fn initialize_gossipsub(gossipsub: &mut Gossipsub, p2p_config: &Config) { + let peer_score_params = initialize_peer_score_params(); + + let peer_score_thresholds = initialize_peer_score_thresholds(); + + let topic_score_params = initialize_topic_score_params(); + + gossipsub + .with_peer_score(peer_score_params, peer_score_thresholds) + .expect("gossipsub initialized with peer score"); + + // subscribe to gossipsub topics with the network name suffix + for topic in &p2p_config.topics { + let t: GossipTopic = Topic::new(format!("{}/{}", topic, p2p_config.network_name)); + gossipsub - .with_peer_score(PeerScoreParams::default(), PeerScoreThresholds::default()) - .expect("gossipsub initialized with peer score"); + .set_topic_params(t.clone(), topic_score_params.clone()) + .expect("First time initializing Topic Score"); gossipsub + .subscribe(&t) + .expect("Subscription to Topic: {topic} successful"); } } diff --git a/crates/services/p2p/src/lib.rs b/crates/services/p2p/src/lib.rs index 6e5cc85b..83ad440d 100644 --- a/crates/services/p2p/src/lib.rs +++ b/crates/services/p2p/src/lib.rs @@ -6,6 +6,7 @@ mod gossipsub; mod heartbeat; mod p2p_service; mod peer_manager; +mod peer_report; pub mod ports; mod request_response; pub mod service; diff --git a/crates/services/p2p/src/p2p_service.rs 
b/crates/services/p2p/src/p2p_service.rs index 772b52be..0ab16544 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -8,7 +8,6 @@ use crate::{ build_transport, Config, }, - discovery::DiscoveryEvent, gossipsub::{ messages::{ GossipsubBroadcastRequest, @@ -17,9 +16,10 @@ use crate::{ topics::GossipsubTopics, }, peer_manager::{ - PeerInfoEvent, - PeerManagerBehaviour, + PeerManager, + Punisher, }, + peer_report::PeerReportEvent, request_response::messages::{ NetworkResponse, OutboundResponse, @@ -31,7 +31,10 @@ use crate::{ }, }; use fuel_core_metrics::p2p_metrics::P2P_METRICS; -use fuel_core_types::fuel_types::BlockHeight; +use fuel_core_types::{ + fuel_types::BlockHeight, + services::p2p::peer_reputation::AppScore, +}; use futures::prelude::*; use libp2p::{ gossipsub::{ @@ -39,7 +42,6 @@ use libp2p::{ GossipsubEvent, MessageAcceptance, MessageId, - Topic, TopicHash, }, multiaddr::Protocol, @@ -63,10 +65,15 @@ use rand::seq::IteratorRandom; use std::collections::HashMap; use tracing::{ debug, - info, warn, }; +impl Punisher for Swarm> { + fn ban_peer(&mut self, peer_id: PeerId) { + self.ban_peer_id(peer_id) + } +} + /// Listens to the events on the p2p network /// And forwards them to the Orchestrator pub struct FuelP2PService { @@ -100,12 +107,26 @@ pub struct FuelP2PService { /// Whether or not metrics collection is enabled metrics: bool, + + /// Holds peers' information, and manages existing connections + peer_manager: PeerManager, +} + +#[derive(Debug)] +struct GossipsubData { + topics: GossipsubTopics, +} + +impl GossipsubData { + pub fn with_topics(topics: GossipsubTopics) -> Self { + Self { topics } + } } /// Holds additional Network data for FuelBehavior #[derive(Debug)] struct NetworkMetadata { - gossipsub_topics: GossipsubTopics, + gossipsub_data: GossipsubData, } #[derive(Debug, Clone)] @@ -133,9 +154,13 @@ impl FuelP2PService { pub fn new(config: Config, codec: Codec) -> Self { let local_peer_id = PeerId::from(config.keypair.public()); + let gossipsub_data = + GossipsubData::with_topics(GossipsubTopics::new(&config.network_name)); + let network_metadata = NetworkMetadata { gossipsub_data }; + // configure and build P2P Service let (transport, connection_state) = build_transport(&config); - let behaviour = FuelBehaviour::new(&config, codec.clone(), connection_state); + let behaviour = FuelBehaviour::new(&config, codec.clone()); let total_connections = { // Reserved nodes do not count against the configured peer input/output limits. 
@@ -170,21 +195,18 @@ impl FuelP2PService {
            .connection_limits(connection_limits)
            .build();
-        // subscribe to gossipsub topics with the network name suffix
-        for topic in config.topics {
-            let t = Topic::new(format!("{}/{}", topic, config.network_name));
-            swarm.behaviour_mut().subscribe_to_topic(&t).unwrap();
-        }
-
-        let gossipsub_topics = GossipsubTopics::new(&config.network_name);
-        let network_metadata = NetworkMetadata { gossipsub_topics };
-
        let metrics = config.metrics;

        if let Some(public_address) = config.public_address {
            let _ = swarm.add_external_address(public_address, AddressScore::Infinite);
        }

+        let reserved_peers = config
+            .reserved_nodes
+            .iter()
+            .filter_map(PeerId::try_from_multiaddr)
+            .collect();
+
        Self {
            local_peer_id,
            local_address: config.address,
@@ -195,6 +217,11 @@
            inbound_requests_table: HashMap::default(),
            network_metadata,
            metrics,
+            peer_manager: PeerManager::new(
+                reserved_peers,
+                connection_state,
+                config.max_peers_connected as usize,
+            ),
        }
    }

@@ -222,7 +249,7 @@
    }

    pub fn get_peers_ids(&self) -> impl Iterator {
-        self.swarm.behaviour().get_peers_ids()
+        self.peer_manager.get_peers_ids()
    }

    pub fn publish_message(
@@ -231,7 +258,8 @@
    ) -> Result {
        let topic = self
            .network_metadata
-            .gossipsub_topics
+            .gossipsub_data
+            .topics
            .get_gossipsub_topic(&message);

        match self.network_codec.encode(message) {
@@ -255,7 +283,7 @@
            Some(peer_id) => peer_id,
            _ => {
                let peers = self.get_peers_ids();
-                let peers_count = self.swarm.behaviour().total_peers_connected();
+                let peers_count = self.peer_manager.total_peers_connected();

                if peers_count == 0 {
                    return Err(RequestError::NoPeersConnected)
@@ -311,21 +339,47 @@
        Ok(())
    }

+    pub fn update_block_height(&mut self, block_height: BlockHeight) {
+        self.swarm.behaviour_mut().update_block_height(block_height)
+    }
+
+    /// The report is forwarded to the gossipsub behaviour.
+    /// If acceptance is "Rejected", the gossipsub peer score is calculated,
+    /// and if it's below the allowed threshold the peer is banned
    pub fn report_message_validation_result(
        &mut self,
        msg_id: &MessageId,
-        propagation_source: &PeerId,
+        propagation_source: PeerId,
        acceptance: MessageAcceptance,
-    ) -> Result {
-        self.swarm.behaviour_mut().report_message_validation_result(
-            msg_id,
-            propagation_source,
-            acceptance,
-        )
+    ) {
+        if let Some(gossip_score) = self
+            .swarm
+            .behaviour_mut()
+            .report_message_validation_result(msg_id, &propagation_source, acceptance)
+        {
+            self.peer_manager.handle_gossip_score_update(
+                propagation_source,
+                gossip_score,
+                &mut self.swarm,
+            );
+        }
    }

-    pub fn update_block_height(&mut self, block_height: BlockHeight) {
-        self.swarm.behaviour_mut().update_block_height(block_height)
+    /// Report application score.
+    /// If the application peer score is below the allowed threshold,
+    /// the peer is banned
+    pub fn report_peer(
+        &mut self,
+        peer_id: PeerId,
+        app_score: AppScore,
+        reporting_service: &str,
+    ) {
+        self.peer_manager.update_app_score(
+            peer_id,
+            app_score,
+            reporting_service,
+            &mut self.swarm,
+        );
    }

    #[tracing::instrument(skip_all,
@@ -363,8 +417,8 @@
        }
    }

-    pub fn peer_manager(&self) -> &PeerManagerBehaviour {
-        self.swarm.behaviour().peer_manager()
+    pub fn peer_manager(&self) -> &PeerManager {
+        &self.peer_manager
    }

    fn handle_behaviour_event(
@@ -372,100 +426,113 @@
        &mut self,
        event: FuelBehaviourEvent,
    ) -> Option {
        match event {
-            FuelBehaviourEvent::Discovery(discovery_event)
=> { - if let DiscoveryEvent::PeerInfoOnConnect { peer_id, addresses } = - discovery_event + FuelBehaviourEvent::Gossipsub(GossipsubEvent::Message { + propagation_source, + message, + message_id, + }) => { + if let Some(correct_topic) = self + .network_metadata + .gossipsub_data + .topics + .get_gossipsub_tag(&message.topic) { - self.swarm - .behaviour_mut() - .add_addresses_to_peer_info(&peer_id, addresses); - } - } - FuelBehaviourEvent::Gossipsub(gossipsub_event) => { - if let GossipsubEvent::Message { - propagation_source, - message, - message_id, - } = gossipsub_event - { - if let Some(correct_topic) = self - .network_metadata - .gossipsub_topics - .get_gossipsub_tag(&message.topic) - { - match self.network_codec.decode(&message.data, correct_topic) { - Ok(decoded_message) => { - return Some(FuelP2PEvent::GossipsubMessage { - peer_id: propagation_source, - message_id, - topic_hash: message.topic, - message: decoded_message, - }) - } - Err(err) => { - warn!(target: "fuel-libp2p", "Failed to decode a message. ID: {}, Message: {:?} with error: {:?}", message_id, &message.data, err); - - match self.report_message_validation_result( - &message_id, - &propagation_source, - MessageAcceptance::Reject, - ) { - Ok(false) => { - warn!(target: "fuel-libp2p", "Message was not found in the cache, peer with PeerId: {} has been reported.", propagation_source); - } - Ok(true) => { - info!(target: "fuel-libp2p", "Message found in the cache, peer with PeerId: {} has been reported.", propagation_source); - } - Err(e) => { - warn!(target: "fuel-libp2p", "Failed to publish the message with following error: {:?}.", e); - } - } - } + match self.network_codec.decode(&message.data, correct_topic) { + Ok(decoded_message) => { + return Some(FuelP2PEvent::GossipsubMessage { + peer_id: propagation_source, + message_id, + topic_hash: message.topic, + message: decoded_message, + }) + } + Err(err) => { + warn!(target: "fuel-p2p", "Failed to decode a message. 
ID: {}, Message: {:?} with error: {:?}", message_id, &message.data, err); + + self.report_message_validation_result( + &message_id, + propagation_source, + MessageAcceptance::Reject, + ); } - } else { - warn!(target: "fuel-libp2p", "GossipTopicTag does not exist for {:?}", &message.topic); } + } else { + warn!(target: "fuel-p2p", "GossipTopicTag does not exist for {:?}", &message.topic); } } - FuelBehaviourEvent::PeerInfo(peer_info_event) => match peer_info_event { - PeerInfoEvent::PeerIdentified { peer_id, addresses } => { - if self.metrics { - P2P_METRICS.unique_peers.inc(); + FuelBehaviourEvent::PeerReport(peer_report_event) => { + match peer_report_event { + PeerReportEvent::PeerIdentified { + peer_id, + addresses, + agent_version, + } => { + if self.metrics { + P2P_METRICS.unique_peers.inc(); + } + + self.peer_manager.handle_peer_identified( + &peer_id, + addresses.clone(), + agent_version, + ); + + self.swarm + .behaviour_mut() + .add_addresses_to_discovery(&peer_id, addresses); } - self.swarm - .behaviour_mut() - .add_addresses_to_discovery(&peer_id, addresses); - } - PeerInfoEvent::PeerInfoUpdated { - peer_id, - block_height, - } => { - return Some(FuelP2PEvent::PeerInfoUpdated { + PeerReportEvent::PerformDecay => { + self.peer_manager.batch_update_score_with_decay() + } + PeerReportEvent::CheckReservedNodesHealth => { + let disconnected_peers: Vec<_> = self + .peer_manager + .get_disconnected_reserved_peers() + .copied() + .collect(); + + for peer_id in disconnected_peers { + debug!(target: "fuel-p2p", "Trying to reconnect to reserved peer {:?}", peer_id); + + let _ = self.swarm.dial(peer_id); + } + } + PeerReportEvent::PeerInfoUpdated { peer_id, block_height, - }) - } - PeerInfoEvent::PeerConnected(peer_id) => { - return Some(FuelP2PEvent::PeerConnected(peer_id)) - } - PeerInfoEvent::ReconnectToPeer(peer_id) => { - let _ = self.swarm.dial(peer_id); - } - PeerInfoEvent::PeerDisconnected { - peer_id, - should_reconnect, - } => { - if should_reconnect { - let _ = self.swarm.dial(peer_id); + } => { + self.peer_manager + .handle_peer_info_updated(&peer_id, block_height); + + return Some(FuelP2PEvent::PeerInfoUpdated { + peer_id, + block_height, + }) + } + PeerReportEvent::PeerConnected { + peer_id, + addresses, + initial_connection, + } => { + if self.peer_manager.handle_peer_connected( + &peer_id, + addresses, + initial_connection, + ) { + let _ = self.swarm.disconnect_peer_id(peer_id); + } else if initial_connection { + return Some(FuelP2PEvent::PeerConnected(peer_id)) + } + } + PeerReportEvent::PeerDisconnected { peer_id } => { + if self.peer_manager.handle_peer_disconnect(peer_id) { + let _ = self.swarm.dial(peer_id); + } + return Some(FuelP2PEvent::PeerDisconnected(peer_id)) } - return Some(FuelP2PEvent::PeerDisconnected(peer_id)) - } - PeerInfoEvent::TooManyPeers { peer_to_disconnect } => { - // disconnect the surplus peer - let _ = self.swarm.disconnect_peer_id(peer_to_disconnect); } - }, + } FuelBehaviourEvent::RequestResponse(req_res_event) => match req_res_event { RequestResponseEvent::Message { peer, message } => match message { RequestResponseMessage::Request { @@ -550,6 +617,8 @@ impl FuelP2PService { } _ => {} }, + + _ => {} } None @@ -725,9 +794,10 @@ mod tests { // listener address registered, we are good to go break } + SwarmEvent::Behaviour(_) => {} other_event => { tracing::error!("Unexpected event: {:?}", other_event); - panic!("Unexpected event") + panic!("Unexpected event {other_event:?}") } } } @@ -799,7 +869,7 @@ mod tests { tokio::select! 
{ sentry_node_event = sentry_node.next_event() => { // we've connected to all other peers - if sentry_node.swarm.behaviour().total_peers_connected() >= 5 { + if sentry_node.peer_manager.total_peers_connected() >= 5 { // if the `reserved_node` is not included, // create and insert it, to be polled with rest of the nodes if !all_nodes_ids @@ -1156,7 +1226,7 @@ mod tests { tokio::select! { node_a_event = node_a.next_event() => { if let Some(FuelP2PEvent::PeerInfoUpdated { peer_id, block_height: _ }) = node_a_event { - if let Some(PeerInfo { peer_addresses, heartbeat_data, client_version, .. }) = node_a.swarm.behaviour().get_peer_info(&peer_id) { + if let Some(PeerInfo { peer_addresses, heartbeat_data, client_version, .. }) = node_a.peer_manager.get_peer_info(&peer_id) { // Exits after it verifies that: // 1. Peer Addresses are known // 2. Client Version is known @@ -1281,7 +1351,7 @@ mod tests { tokio::select! { node_a_event = node_a.next_event() => { if let Some(FuelP2PEvent::PeerInfoUpdated { peer_id, block_height: _ }) = node_a_event { - if let Some(PeerInfo { peer_addresses, .. }) = node_a.swarm.behaviour().get_peer_info(&peer_id) { + if let Some(PeerInfo { peer_addresses, .. }) = node_a.peer_manager.get_peer_info(&peer_id) { // verifies that we've got at least a single peer address to send message to if !peer_addresses.is_empty() && !message_sent { message_sent = true; @@ -1299,7 +1369,7 @@ mod tests { // If it's `Accept`, Node B will propagate the message to Node C // If it's `Ignore` or `Reject`, Node C should not receive anything let msg_acceptance = to_message_acceptance(&acceptance); - let _ = node_b.report_message_validation_result(&message_id, &peer_id, msg_acceptance); + node_b.report_message_validation_result(&message_id, peer_id, msg_acceptance); if topic_hash != selected_topic.hash() { tracing::error!("Wrong topic hash, expected: {} - actual: {}", selected_topic.hash(), topic_hash); panic!("Wrong Topic"); @@ -1388,7 +1458,7 @@ mod tests { } node_a_event = node_a.next_event() => { if let Some(FuelP2PEvent::PeerInfoUpdated { peer_id, block_height: _ }) = node_a_event { - if let Some(PeerInfo { peer_addresses, .. }) = node_a.swarm.behaviour().get_peer_info(&peer_id) { + if let Some(PeerInfo { peer_addresses, .. }) = node_a.peer_manager.get_peer_info(&peer_id) { // 0. verifies that we've got at least a single peer address to request message from if !peer_addresses.is_empty() && !request_sent { request_sent = true; @@ -1533,7 +1603,7 @@ mod tests { tokio::select! { node_a_event = node_a.next_event() => { if let Some(FuelP2PEvent::PeerInfoUpdated { peer_id, block_height: _ }) = node_a_event { - if let Some(PeerInfo { peer_addresses, .. }) = node_a.swarm.behaviour().get_peer_info(&peer_id) { + if let Some(PeerInfo { peer_addresses, .. }) = node_a.peer_manager.get_peer_info(&peer_id) { // 0. 
verifies that we've got at least a single peer address to request message from if !peer_addresses.is_empty() && !request_sent { request_sent = true; diff --git a/crates/services/p2p/src/peer_manager.rs b/crates/services/p2p/src/peer_manager.rs index 6f7c5d16..d9eb082f 100644 --- a/crates/services/p2p/src/peer_manager.rs +++ b/crates/services/p2p/src/peer_manager.rs @@ -1,523 +1,53 @@ -use crate::{ - config::Config, - heartbeat::{ - Heartbeat, - HeartbeatEvent, +use fuel_core_types::{ + fuel_types::BlockHeight, + services::p2p::peer_reputation::{ + AppScore, + DECAY_APP_SCORE, + DEFAULT_APP_SCORE, + MAX_APP_SCORE, + MIN_APP_SCORE, }, }; -use fuel_core_types::fuel_types::BlockHeight; use libp2p::{ - core::{ - connection::ConnectionId, - either::EitherOutput, - }, - identify::{ - Behaviour as Identify, - Config as IdentifyConfig, - Event as IdentifyEvent, - Info as IdentifyInfo, - }, - swarm::{ - derive_prelude::{ - ConnectionClosed, - ConnectionEstablished, - DialFailure, - FromSwarm, - ListenFailure, - }, - ConnectionHandler, - IntoConnectionHandler, - IntoConnectionHandlerSelect, - NetworkBehaviour, - NetworkBehaviourAction, - PollParameters, - }, Multiaddr, PeerId, }; use rand::seq::IteratorRandom; - use std::{ collections::{ HashMap, HashSet, - VecDeque, }, sync::{ Arc, RwLock, }, - task::{ - Context, - Poll, - }, - time::{ - Duration, - Instant, - }, + time::Duration, +}; +use tokio::time::Instant; +use tracing::{ + debug, + info, }; -use tokio::time::Interval; -use tracing::debug; - -/// Maximum amount of peer's addresses that we are ready to store per peer -const MAX_IDENTIFY_ADDRESSES: usize = 10; -const HEALTH_CHECK_INTERVAL_IN_SECONDS: u64 = 10; -/// Events emitted by PeerInfoBehaviour +// Info about a single Peer that we're connected to #[derive(Debug, Clone)] -pub enum PeerInfoEvent { - PeerConnected(PeerId), - PeerDisconnected { - peer_id: PeerId, - should_reconnect: bool, - }, - TooManyPeers { - peer_to_disconnect: PeerId, - }, - ReconnectToPeer(PeerId), - PeerIdentified { - peer_id: PeerId, - addresses: Vec, - }, - PeerInfoUpdated { - peer_id: PeerId, - block_height: BlockHeight, - }, -} - -// `Behaviour` that holds info about peers -pub struct PeerManagerBehaviour { - heartbeat: Heartbeat, - identify: Identify, - peer_manager: PeerManager, - // regulary checks if reserved nodes are connected - health_check: Interval, +pub struct PeerInfo { + pub peer_addresses: HashSet, + pub client_version: Option, + pub heartbeat_data: HeartbeatData, + pub score: AppScore, } -impl PeerManagerBehaviour { - pub(crate) fn new( - config: &Config, - connection_state: Arc>, - ) -> Self { - let identify = { - let identify_config = - IdentifyConfig::new("/fuel/1.0".to_string(), config.keypair.public()); - if let Some(interval) = config.identify_interval { - Identify::new(identify_config.with_interval(interval)) - } else { - Identify::new(identify_config) - } - }; - - let heartbeat = - Heartbeat::new(config.heartbeat_config.clone(), BlockHeight::default()); - - let reserved_peers: HashSet = config - .reserved_nodes - .iter() - .filter_map(PeerId::try_from_multiaddr) - .collect(); - - let peer_manager = PeerManager::new( - reserved_peers, - connection_state, - config.max_peers_connected as usize, - ); - +impl Default for PeerInfo { + fn default() -> Self { Self { - heartbeat, - identify, - peer_manager, - health_check: tokio::time::interval(Duration::from_secs( - HEALTH_CHECK_INTERVAL_IN_SECONDS, - )), + score: DEFAULT_APP_SCORE, + client_version: Default::default(), + heartbeat_data: 
Default::default(), + peer_addresses: Default::default(), } } - - pub fn total_peers_connected(&self) -> usize { - self.peer_manager.total_peers_connected() - } - - /// returns an iterator over the connected peers - pub fn get_peers_ids(&self) -> impl Iterator { - self.peer_manager.get_peers_ids() - } - - pub fn get_peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> { - self.peer_manager.get_peer_info(peer_id) - } - - pub fn insert_peer_addresses(&mut self, peer_id: &PeerId, addresses: Vec) { - self.peer_manager - .insert_peer_info(peer_id, PeerInfoInsert::Addresses(addresses)); - } - - pub fn update_block_height(&mut self, block_height: BlockHeight) { - self.heartbeat.update_block_height(block_height); - } - - /// Find a peer that is holding the given block height. - pub fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option { - self.peer_manager.get_peer_id_with_height(height) - } -} - -impl NetworkBehaviour for PeerManagerBehaviour { - type ConnectionHandler = IntoConnectionHandlerSelect< - ::ConnectionHandler, - ::ConnectionHandler, - >; - type OutEvent = PeerInfoEvent; - - fn new_handler(&mut self) -> Self::ConnectionHandler { - IntoConnectionHandler::select( - self.heartbeat.new_handler(), - self.identify.new_handler(), - ) - } - - fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - self.identify.addresses_of_peer(peer_id) - } - - fn on_swarm_event(&mut self, event: FromSwarm) { - match event { - FromSwarm::ConnectionEstablished(connection_established) => { - let ConnectionEstablished { - peer_id, - other_established, - .. - } = connection_established; - - self.heartbeat - .on_swarm_event(FromSwarm::ConnectionEstablished( - connection_established, - )); - self.identify - .on_swarm_event(FromSwarm::ConnectionEstablished( - connection_established, - )); - - if other_established == 0 { - // this is the first connection to a given Peer - self.peer_manager.handle_initial_connection(peer_id); - } - - let addresses = self.addresses_of_peer(&peer_id); - self.insert_peer_addresses(&peer_id, addresses); - } - FromSwarm::ConnectionClosed(connection_closed) => { - let ConnectionClosed { - remaining_established, - peer_id, - connection_id, - endpoint, - .. 
- } = connection_closed; - - let (ping_handler, identity_handler) = - connection_closed.handler.into_inner(); - - let ping_event = ConnectionClosed { - handler: ping_handler, - peer_id, - connection_id, - endpoint, - remaining_established, - }; - self.heartbeat - .on_swarm_event(FromSwarm::ConnectionClosed(ping_event)); - - let identify_event = ConnectionClosed { - handler: identity_handler, - peer_id, - connection_id, - endpoint, - remaining_established, - }; - - self.identify - .on_swarm_event(FromSwarm::ConnectionClosed(identify_event)); - - if remaining_established == 0 { - // this was the last connection to a given Peer - self.peer_manager.handle_peer_disconnect(peer_id); - } - } - FromSwarm::AddressChange(e) => { - self.heartbeat.on_swarm_event(FromSwarm::AddressChange(e)); - self.identify.on_swarm_event(FromSwarm::AddressChange(e)); - } - FromSwarm::DialFailure(e) => { - let (ping_handler, identity_handler) = e.handler.into_inner(); - let ping_event = DialFailure { - peer_id: e.peer_id, - handler: ping_handler, - error: e.error, - }; - let identity_event = DialFailure { - peer_id: e.peer_id, - handler: identity_handler, - error: e.error, - }; - self.heartbeat - .on_swarm_event(FromSwarm::DialFailure(ping_event)); - self.identify - .on_swarm_event(FromSwarm::DialFailure(identity_event)); - } - FromSwarm::ListenFailure(e) => { - let (ping_handler, identity_handler) = e.handler.into_inner(); - let ping_event = ListenFailure { - handler: ping_handler, - local_addr: e.local_addr, - send_back_addr: e.send_back_addr, - }; - let identity_event = ListenFailure { - handler: identity_handler, - local_addr: e.local_addr, - send_back_addr: e.send_back_addr, - }; - self.heartbeat - .on_swarm_event(FromSwarm::ListenFailure(ping_event)); - self.identify - .on_swarm_event(FromSwarm::ListenFailure(identity_event)); - } - FromSwarm::NewListener(e) => { - self.heartbeat.on_swarm_event(FromSwarm::NewListener(e)); - self.identify.on_swarm_event(FromSwarm::NewListener(e)); - } - FromSwarm::ExpiredListenAddr(e) => { - self.heartbeat - .on_swarm_event(FromSwarm::ExpiredListenAddr(e)); - self.identify - .on_swarm_event(FromSwarm::ExpiredListenAddr(e)); - } - FromSwarm::ListenerError(e) => { - self.heartbeat.on_swarm_event(FromSwarm::ListenerError(e)); - self.identify.on_swarm_event(FromSwarm::ListenerError(e)); - } - FromSwarm::ListenerClosed(e) => { - self.heartbeat.on_swarm_event(FromSwarm::ListenerClosed(e)); - self.identify.on_swarm_event(FromSwarm::ListenerClosed(e)); - } - FromSwarm::NewExternalAddr(e) => { - self.heartbeat.on_swarm_event(FromSwarm::NewExternalAddr(e)); - self.identify.on_swarm_event(FromSwarm::NewExternalAddr(e)); - } - FromSwarm::ExpiredExternalAddr(e) => { - self.heartbeat - .on_swarm_event(FromSwarm::ExpiredExternalAddr(e)); - self.identify - .on_swarm_event(FromSwarm::ExpiredExternalAddr(e)); - } - FromSwarm::NewListenAddr(e) => { - self.heartbeat.on_swarm_event(FromSwarm::NewListenAddr(e)); - self.identify.on_swarm_event(FromSwarm::NewListenAddr(e)); - } - } - } - - fn poll( - &mut self, - cx: &mut Context<'_>, - params: &mut impl PollParameters, - ) -> Poll> { - if self.health_check.poll_tick(cx).is_ready() { - let disconnected_peers: Vec<_> = self - .peer_manager - .get_disconnected_reserved_peers() - .copied() - .collect(); - - for peer_id in disconnected_peers { - debug!(target: "fuel-libp2p", "Trying to reconnect to reserved peer {:?}", peer_id); - - self.peer_manager - .pending_events - .push_back(PeerInfoEvent::ReconnectToPeer(peer_id)); - } - } - - if let Some(event) = 
self.peer_manager.pending_events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) - } - - match self.heartbeat.poll(cx, params) { - Poll::Pending => {} - Poll::Ready(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event, - }) => { - return Poll::Ready(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event: EitherOutput::First(event), - }) - } - Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { - address, - score, - }) => { - return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { - address, - score, - }) - } - Poll::Ready(NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - }) => { - return Poll::Ready(NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - }) - } - Poll::Ready(NetworkBehaviourAction::Dial { handler, opts }) => { - let handler = - IntoConnectionHandler::select(handler, self.identify.new_handler()); - - return Poll::Ready(NetworkBehaviourAction::Dial { handler, opts }) - } - Poll::Ready(NetworkBehaviourAction::GenerateEvent(HeartbeatEvent { - peer_id, - latest_block_height, - })) => { - let heartbeat_data = HeartbeatData::new(latest_block_height); - - if let Some(previous_heartbeat) = self - .get_peer_info(&peer_id) - .and_then(|info| info.heartbeat_data.seconds_since_last_heartbeat()) - { - debug!(target: "fuel-libp2p", "Previous hearbeat happened {:?} seconds ago", previous_heartbeat); - } - - self.peer_manager.insert_peer_info( - &peer_id, - PeerInfoInsert::HeartbeatData(heartbeat_data), - ); - - let event = PeerInfoEvent::PeerInfoUpdated { - peer_id, - block_height: latest_block_height, - }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) - } - } - - loop { - match self.identify.poll(cx, params) { - Poll::Pending => break, - Poll::Ready(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event, - }) => { - return Poll::Ready(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event: EitherOutput::Second(event), - }) - } - Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { - address, - score, - }) => { - return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { - address, - score, - }) - } - Poll::Ready(NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - }) => { - return Poll::Ready(NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - }) - } - Poll::Ready(NetworkBehaviourAction::Dial { handler, opts }) => { - let handler = IntoConnectionHandler::select( - self.heartbeat.new_handler(), - handler, - ); - return Poll::Ready(NetworkBehaviourAction::Dial { handler, opts }) - } - Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { - match event { - IdentifyEvent::Received { - peer_id, - info: - IdentifyInfo { - protocol_version, - agent_version, - mut listen_addrs, - .. 
- }, - } => { - if listen_addrs.len() > MAX_IDENTIFY_ADDRESSES { - debug!( - target: "fuel-libp2p", - "Node {:?} has reported more than {} addresses; it is identified by {:?} and {:?}", - peer_id, MAX_IDENTIFY_ADDRESSES, protocol_version, agent_version - ); - listen_addrs.truncate(MAX_IDENTIFY_ADDRESSES); - } - - self.peer_manager.insert_peer_info( - &peer_id, - PeerInfoInsert::ClientVersion(agent_version), - ); - self.peer_manager.insert_peer_info( - &peer_id, - PeerInfoInsert::Addresses(listen_addrs.clone()), - ); - - let event = PeerInfoEvent::PeerIdentified { - peer_id, - addresses: listen_addrs, - }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent( - event, - )) - } - IdentifyEvent::Error { peer_id, error } => { - debug!(target: "fuel-libp2p", "Identification with peer {:?} failed => {}", peer_id, error) - } - _ => {} - } - } - } - } - - Poll::Pending - } - - fn on_connection_handler_event( - &mut self, - peer_id: PeerId, - connection_id: ConnectionId, - event: <::Handler as - ConnectionHandler>::OutEvent, - ) { - match event { - EitherOutput::First(heartbeat_event) => self - .heartbeat - .on_connection_handler_event(peer_id, connection_id, heartbeat_event), - EitherOutput::Second(identify_event) => self - .identify - .on_connection_handler_event(peer_id, connection_id, identify_event), - } - } -} - -// Info about a single Peer that we're connected to -#[derive(Debug, Default, Clone)] -pub struct PeerInfo { - pub peer_addresses: HashSet, - pub client_version: Option, - pub heartbeat_data: HeartbeatData, } enum PeerInfoInsert { @@ -527,9 +57,9 @@ enum PeerInfoInsert { } /// Manages Peers and their events -#[derive(Debug, Default, Clone)] -struct PeerManager { - pending_events: VecDeque, +#[derive(Debug)] +pub struct PeerManager { + score_config: ScoreConfig, non_reserved_connected_peers: HashMap, reserved_connected_peers: HashMap, reserved_peers: HashSet, @@ -538,13 +68,13 @@ struct PeerManager { } impl PeerManager { - fn new( + pub fn new( reserved_peers: HashSet, connection_state: Arc>, max_non_reserved_peers: usize, ) -> Self { Self { - pending_events: VecDeque::default(), + score_config: ScoreConfig::default(), non_reserved_connected_peers: HashMap::with_capacity(max_non_reserved_peers), reserved_connected_peers: HashMap::with_capacity(reserved_peers.len()), reserved_peers, @@ -553,98 +83,125 @@ impl PeerManager { } } - fn total_peers_connected(&self) -> usize { - self.reserved_connected_peers.len() + self.non_reserved_connected_peers.len() - } - - fn get_peers_ids(&self) -> impl Iterator { - self.non_reserved_connected_peers - .keys() - .chain(self.reserved_connected_peers.keys()) + pub fn handle_gossip_score_update( + &self, + peer_id: PeerId, + gossip_score: f64, + punisher: &mut T, + ) { + if gossip_score < self.score_config.min_gossip_score_allowed + && !self.reserved_peers.contains(&peer_id) + { + punisher.ban_peer(peer_id); + } } - fn get_peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> { - if self.reserved_peers.contains(peer_id) { - return self.reserved_connected_peers.get(peer_id) + pub fn handle_peer_info_updated( + &mut self, + peer_id: &PeerId, + block_height: BlockHeight, + ) { + if let Some(previous_heartbeat) = self + .get_peer_info(peer_id) + .and_then(|info| info.heartbeat_data.seconds_since_last_heartbeat()) + { + debug!(target: "fuel-p2p", "Previous hearbeat happened {:?} seconds ago", previous_heartbeat); } - self.non_reserved_connected_peers.get(peer_id) + + let heartbeat_data = HeartbeatData::new(block_height); + + 
self.insert_peer_info(peer_id, PeerInfoInsert::HeartbeatData(heartbeat_data)); } - fn insert_peer_info(&mut self, peer_id: &PeerId, data: PeerInfoInsert) { - let peers = if self.reserved_peers.contains(peer_id) { - &mut self.reserved_connected_peers + /// Returns `true` signaling that the peer should be disconnected + pub fn handle_peer_connected( + &mut self, + peer_id: &PeerId, + addresses: Vec, + initial_connection: bool, + ) -> bool { + if initial_connection { + self.handle_initial_connection(peer_id, addresses) } else { - &mut self.non_reserved_connected_peers - }; - match data { - PeerInfoInsert::Addresses(addresses) => { - insert_peer_addresses(peers, peer_id, addresses) - } - PeerInfoInsert::ClientVersion(client_version) => { - insert_client_version(peers, peer_id, client_version) - } - PeerInfoInsert::HeartbeatData(block_height) => { - insert_heartbeat_data(peers, peer_id, block_height) - } + self.insert_peer_info(peer_id, PeerInfoInsert::Addresses(addresses)); + false } } - fn get_disconnected_reserved_peers(&self) -> impl Iterator { - self.reserved_peers - .iter() - .filter(|peer_id| !self.reserved_connected_peers.contains_key(peer_id)) + pub fn handle_peer_identified( + &mut self, + peer_id: &PeerId, + addresses: Vec, + agent_version: String, + ) { + self.insert_peer_info(peer_id, PeerInfoInsert::ClientVersion(agent_version)); + self.insert_peer_info(peer_id, PeerInfoInsert::Addresses(addresses)); } - /// Handles the first connnection established with a Peer - fn handle_initial_connection(&mut self, peer_id: PeerId) { - let non_reserved_peers_connected = self.non_reserved_connected_peers.len(); + pub fn batch_update_score_with_decay(&mut self) { + for peer_info in self.non_reserved_connected_peers.values_mut() { + peer_info.score *= DECAY_APP_SCORE; + } + } - // if the connected Peer is not from the reserved peers - if !self.reserved_peers.contains(&peer_id) { - // check if all the slots are already taken - if non_reserved_peers_connected >= self.max_non_reserved_peers { - // Too many peers already connected, disconnect the Peer with the first priority. 
-            self.pending_events.push_front(PeerInfoEvent::TooManyPeers {
-                peer_to_disconnect: peer_id,
-            });
-
-                // early exit, we don't want to report new peer connection
-                // nor insert it in `connected_peers`
-                // since we're going to disconnect it anyways
-                return
-            }
+    pub fn update_app_score(
+        &mut self,
+        peer_id: PeerId,
+        score: AppScore,
+        reporting_service: &str,
+        punisher: &mut T,
+    ) {
+        if let Some(peer) = self.non_reserved_connected_peers.get_mut(&peer_id) {
+            // score should not go over `max_score`
+            let new_score = self.score_config.max_app_score.min(peer.score + score);
+            peer.score = new_score;
-            if non_reserved_peers_connected + 1 == self.max_non_reserved_peers {
-                // this is the last non-reserved peer allowed
-                if let Ok(mut connection_state) = self.connection_state.write() {
-                    connection_state.deny_new_peers();
-                }
-            }
+            info!(target: "fuel-p2p", "{reporting_service} updated {peer_id} with new score {score}");
-            self.non_reserved_connected_peers
-                .insert(peer_id, PeerInfo::default());
+            if new_score < self.score_config.min_app_score_allowed {
+                punisher.ban_peer(peer_id);
+            }
        } else {
-            self.reserved_connected_peers
-                .insert(peer_id, PeerInfo::default());
+            log_missing_peer(&peer_id);
        }
+    }
+
+    pub fn total_peers_connected(&self) -> usize {
+        self.reserved_connected_peers.len() + self.non_reserved_connected_peers.len()
+    }
-        self.pending_events
-            .push_back(PeerInfoEvent::PeerConnected(peer_id));
+    pub fn get_peers_ids(&self) -> impl Iterator {
+        self.non_reserved_connected_peers
+            .keys()
+            .chain(self.reserved_connected_peers.keys())
+    }
+
+    pub fn get_peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> {
+        if self.reserved_peers.contains(peer_id) {
+            return self.reserved_connected_peers.get(peer_id)
+        }
+        self.non_reserved_connected_peers.get(peer_id)
+    }
+
+    pub fn get_disconnected_reserved_peers(&self) -> impl Iterator {
+        self.reserved_peers
+            .iter()
+            .filter(|peer_id| !self.reserved_connected_peers.contains_key(peer_id))
    }

    /// Handles on peer's last connection getting disconnected
-    fn handle_peer_disconnect(&mut self, peer_id: PeerId) {
+    /// Returns 'true' signaling we should try reconnecting
+    pub fn handle_peer_disconnect(&mut self, peer_id: PeerId) -> bool {
        // try immediate reconnect if it's a reserved peer
        let is_reserved = self.reserved_peers.contains(&peer_id);
-        let is_removed;

        if !is_reserved {
-            is_removed = self.non_reserved_connected_peers.remove(&peer_id).is_some();
+            // check whether all the slots were taken prior to this disconnect
+            let all_slots_taken = self.max_non_reserved_peers
+                == self.non_reserved_connected_peers.len() + 1;
-            // check were all the slots full prior to this disconnect
-            if is_removed
-                && self.max_non_reserved_peers
-                    == self.non_reserved_connected_peers.len() + 1
+            if self.non_reserved_connected_peers.remove(&peer_id).is_some()
+                && all_slots_taken
            {
                // since all the slots were full prior to this disconnect
                // let's allow new peer non-reserved peers connections
@@ -652,16 +209,10 @@
                    connection_state.allow_new_peers();
                }
            }
-        } else {
-            is_removed = self.reserved_connected_peers.remove(&peer_id).is_some();
-        }
-        if is_removed {
-            self.pending_events
-                .push_back(PeerInfoEvent::PeerDisconnected {
-                    peer_id,
-                    should_reconnect: is_reserved,
-                })
+            false
+        } else {
+            self.reserved_connected_peers.remove(&peer_id).is_some()
        }
    }

@@ -680,10 +231,63 @@
            .map(|(peer_id, _)| *peer_id)
            .choose(&mut range)
    }
+
+    /// Handles the first connection established with a Peer
+    fn handle_initial_connection(
&mut self, + peer_id: &PeerId, + addresses: Vec, + ) -> bool { + // if the connected Peer is not from the reserved peers + if !self.reserved_peers.contains(peer_id) { + let non_reserved_peers_connected = self.non_reserved_connected_peers.len(); + // check if all the slots are already taken + if non_reserved_peers_connected >= self.max_non_reserved_peers { + // Too many peers already connected, disconnect the Peer + return true + } + + if non_reserved_peers_connected + 1 == self.max_non_reserved_peers { + // this is the last non-reserved peer allowed + if let Ok(mut connection_state) = self.connection_state.write() { + connection_state.deny_new_peers(); + } + } + + self.non_reserved_connected_peers + .insert(*peer_id, PeerInfo::default()); + } else { + self.reserved_connected_peers + .insert(*peer_id, PeerInfo::default()); + } + + self.insert_peer_info(peer_id, PeerInfoInsert::Addresses(addresses)); + + false + } + + fn insert_peer_info(&mut self, peer_id: &PeerId, data: PeerInfoInsert) { + let peers = if self.reserved_peers.contains(peer_id) { + &mut self.reserved_connected_peers + } else { + &mut self.non_reserved_connected_peers + }; + match data { + PeerInfoInsert::Addresses(addresses) => { + insert_peer_addresses(peers, peer_id, addresses) + } + PeerInfoInsert::ClientVersion(client_version) => { + insert_client_version(peers, peer_id, client_version) + } + PeerInfoInsert::HeartbeatData(block_height) => { + insert_heartbeat_data(peers, peer_id, block_height) + } + } + } } #[derive(Debug, Default, Clone, Copy)] -pub(crate) struct ConnectionState { +pub struct ConnectionState { peers_allowed: bool, } @@ -694,7 +298,7 @@ pub struct HeartbeatData { } impl HeartbeatData { - fn new(block_height: BlockHeight) -> Self { + pub fn new(block_height: BlockHeight) -> Self { Self { block_height: Some(block_height), last_heartbeat: Some(Instant::now()), @@ -765,7 +369,34 @@ fn insert_client_version( } fn log_missing_peer(peer_id: &PeerId) { - debug!(target: "fuel-libp2p", "Peer with PeerId: {:?} is not among the connected peers", peer_id) + debug!(target: "fuel-p2p", "Peer with PeerId: {:?} is not among the connected peers", peer_id) +} + +#[derive(Clone, Debug, Copy)] +struct ScoreConfig { + max_app_score: AppScore, + min_app_score_allowed: AppScore, + min_gossip_score_allowed: f64, +} + +impl Default for ScoreConfig { + fn default() -> Self { + Self::new() + } +} + +impl ScoreConfig { + pub fn new() -> Self { + Self { + max_app_score: MAX_APP_SCORE, + min_app_score_allowed: MIN_APP_SCORE, + min_gossip_score_allowed: -100.0, + } + } +} + +pub trait Punisher { + fn ban_peer(&mut self, peer_id: PeerId); } #[cfg(test)] @@ -798,7 +429,7 @@ mod tests { // try connecting all the random peers for peer_id in &random_peers { - peer_manager.handle_initial_connection(*peer_id); + peer_manager.handle_initial_connection(peer_id, vec![]); } assert_eq!(peer_manager.total_peers_connected(), max_non_reserved_peers); @@ -813,7 +444,7 @@ mod tests { // try connecting all the reserved peers for peer_id in &reserved_peers { - peer_manager.handle_initial_connection(*peer_id); + peer_manager.handle_initial_connection(peer_id, vec![]); } assert_eq!(peer_manager.total_peers_connected(), reserved_peers.len()); @@ -821,7 +452,7 @@ mod tests { // try connecting random peers let random_peers = get_random_peers(10); for peer_id in &random_peers { - peer_manager.handle_initial_connection(*peer_id); + peer_manager.handle_initial_connection(peer_id, vec![]); } // the number should stay the same @@ -837,7 +468,7 @@ mod tests { // try 
connecting all the reserved peers for peer_id in &reserved_peers { - peer_manager.handle_initial_connection(*peer_id); + peer_manager.handle_initial_connection(peer_id, vec![]); } // disconnect a single reserved peer @@ -846,7 +477,7 @@ mod tests { // try connecting random peers let random_peers = get_random_peers(max_non_reserved_peers * 2); for peer_id in &random_peers { - peer_manager.handle_initial_connection(*peer_id); + peer_manager.handle_initial_connection(peer_id, vec![]); } // there should be an available slot for a reserved peer @@ -856,7 +487,7 @@ mod tests { ); // reconnect the disconnected reserved peer - peer_manager.handle_initial_connection(*reserved_peers.first().unwrap()); + peer_manager.handle_initial_connection(reserved_peers.first().unwrap(), vec![]); // all the slots should be taken now assert_eq!( diff --git a/crates/services/p2p/src/peer_report.rs b/crates/services/p2p/src/peer_report.rs new file mode 100644 index 00000000..4ec48652 --- /dev/null +++ b/crates/services/p2p/src/peer_report.rs @@ -0,0 +1,483 @@ +use crate::{ + config::Config, + heartbeat::{ + Heartbeat, + HeartbeatEvent, + }, +}; +use fuel_core_types::fuel_types::BlockHeight; +use libp2p::{ + core::{ + connection::ConnectionId, + either::EitherOutput, + }, + identify::{ + Behaviour as Identify, + Config as IdentifyConfig, + Event as IdentifyEvent, + Info as IdentifyInfo, + }, + swarm::{ + derive_prelude::{ + ConnectionClosed, + ConnectionEstablished, + DialFailure, + FromSwarm, + ListenFailure, + }, + ConnectionHandler, + IntoConnectionHandler, + IntoConnectionHandlerSelect, + NetworkBehaviour, + NetworkBehaviourAction, + PollParameters, + }, + Multiaddr, + PeerId, +}; +use std::{ + collections::VecDeque, + task::{ + Context, + Poll, + }, + time::Duration, +}; +use tokio::time::{ + self, + Interval, +}; + +use tracing::debug; + +/// Maximum amount of peer's addresses that we are ready to store per peer +const MAX_IDENTIFY_ADDRESSES: usize = 10; +const HEALTH_CHECK_INTERVAL_IN_SECONDS: u64 = 10; +const REPUTATION_DECAY_INTERVAL_IN_SECONDS: u64 = 1; + +/// Events emitted by PeerReportBehavior +#[derive(Debug, Clone)] +pub enum PeerReportEvent { + PeerConnected { + peer_id: PeerId, + addresses: Vec, + initial_connection: bool, + }, + PeerDisconnected { + peer_id: PeerId, + }, + PeerIdentified { + peer_id: PeerId, + agent_version: String, + addresses: Vec, + }, + PeerInfoUpdated { + peer_id: PeerId, + block_height: BlockHeight, + }, + /// Informs p2p service / PeerManager to check health of reserved nodes' connections + CheckReservedNodesHealth, + /// Informs p2p service / PeerManager to perform reputation decay of connected nodes + PerformDecay, +} + +// `Behaviour` that reports events about peers +pub struct PeerReportBehaviour { + heartbeat: Heartbeat, + identify: Identify, + pending_events: VecDeque, + // regulary checks if reserved nodes are connected + health_check: Interval, + decay_interval: Interval, +} + +impl PeerReportBehaviour { + pub(crate) fn new(config: &Config) -> Self { + let identify = { + let identify_config = + IdentifyConfig::new("/fuel/1.0".to_string(), config.keypair.public()); + if let Some(interval) = config.identify_interval { + Identify::new(identify_config.with_interval(interval)) + } else { + Identify::new(identify_config) + } + }; + + let heartbeat = + Heartbeat::new(config.heartbeat_config.clone(), BlockHeight::default()); + + Self { + heartbeat, + identify, + pending_events: VecDeque::default(), + health_check: time::interval(Duration::from_secs( + 
HEALTH_CHECK_INTERVAL_IN_SECONDS, + )), + decay_interval: time::interval(Duration::from_secs( + REPUTATION_DECAY_INTERVAL_IN_SECONDS, + )), + } + } + + pub fn update_block_height(&mut self, block_height: BlockHeight) { + self.heartbeat.update_block_height(block_height); + } +} + +impl NetworkBehaviour for PeerReportBehaviour { + type ConnectionHandler = IntoConnectionHandlerSelect< + ::ConnectionHandler, + ::ConnectionHandler, + >; + type OutEvent = PeerReportEvent; + + fn new_handler(&mut self) -> Self::ConnectionHandler { + IntoConnectionHandler::select( + self.heartbeat.new_handler(), + self.identify.new_handler(), + ) + } + + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + self.identify.addresses_of_peer(peer_id) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::ConnectionEstablished(connection_established) => { + let ConnectionEstablished { + peer_id, + other_established, + .. + } = connection_established; + + self.heartbeat + .on_swarm_event(FromSwarm::ConnectionEstablished( + connection_established, + )); + self.identify + .on_swarm_event(FromSwarm::ConnectionEstablished( + connection_established, + )); + + let addresses = self.addresses_of_peer(&peer_id); + self.pending_events + .push_back(PeerReportEvent::PeerConnected { + peer_id, + addresses, + initial_connection: other_established == 0, + }); + } + FromSwarm::ConnectionClosed(connection_closed) => { + let ConnectionClosed { + remaining_established, + peer_id, + connection_id, + endpoint, + .. + } = connection_closed; + + let (ping_handler, identity_handler) = + connection_closed.handler.into_inner(); + + let ping_event = ConnectionClosed { + handler: ping_handler, + peer_id, + connection_id, + endpoint, + remaining_established, + }; + self.heartbeat + .on_swarm_event(FromSwarm::ConnectionClosed(ping_event)); + + let identify_event = ConnectionClosed { + handler: identity_handler, + peer_id, + connection_id, + endpoint, + remaining_established, + }; + + self.identify + .on_swarm_event(FromSwarm::ConnectionClosed(identify_event)); + + if remaining_established == 0 { + // this was the last connection to a given Peer + self.pending_events + .push_back(PeerReportEvent::PeerDisconnected { peer_id }) + } + } + FromSwarm::DialFailure(e) => { + let (ping_handler, identity_handler) = e.handler.into_inner(); + let ping_event = DialFailure { + peer_id: e.peer_id, + handler: ping_handler, + error: e.error, + }; + let identity_event = DialFailure { + peer_id: e.peer_id, + handler: identity_handler, + error: e.error, + }; + self.heartbeat + .on_swarm_event(FromSwarm::DialFailure(ping_event)); + self.identify + .on_swarm_event(FromSwarm::DialFailure(identity_event)); + } + FromSwarm::ListenFailure(e) => { + let (ping_handler, identity_handler) = e.handler.into_inner(); + let ping_event = ListenFailure { + handler: ping_handler, + local_addr: e.local_addr, + send_back_addr: e.send_back_addr, + }; + let identity_event = ListenFailure { + handler: identity_handler, + local_addr: e.local_addr, + send_back_addr: e.send_back_addr, + }; + self.heartbeat + .on_swarm_event(FromSwarm::ListenFailure(ping_event)); + self.identify + .on_swarm_event(FromSwarm::ListenFailure(identity_event)); + } + _ => { + self.heartbeat.handle_swarm_event(&event); + self.identify.handle_swarm_event(&event); + } + } + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + params: &mut impl PollParameters, + ) -> Poll> { + if let Some(event) = self.pending_events.pop_front() { + return 
Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) + } + + match self.heartbeat.poll(cx, params) { + Poll::Pending => {} + Poll::Ready(action) => { + let action = + >::convert_action( + self, action, + ); + if let Some(action) = action { + return Poll::Ready(action) + } + } + } + + loop { + // poll until we've either exhausted the events or found one of interest + match self.identify.poll(cx, params) { + Poll::Pending => break, + Poll::Ready(action) => { + if let Some(action) = + >::convert_action( + self, action, + ) + { + return Poll::Ready(action) + } + } + } + } + + if self.decay_interval.poll_tick(cx).is_ready() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent( + PeerReportEvent::PerformDecay, + )) + } + + if self.health_check.poll_tick(cx).is_ready() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent( + PeerReportEvent::CheckReservedNodesHealth, + )) + } + + Poll::Pending + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: <::Handler as + ConnectionHandler>::OutEvent, + ) { + match event { + EitherOutput::First(heartbeat_event) => self + .heartbeat + .on_connection_handler_event(peer_id, connection_id, heartbeat_event), + EitherOutput::Second(identify_event) => self + .identify + .on_connection_handler_event(peer_id, connection_id, identify_event), + } + } +} + +impl FromAction for PeerReportBehaviour { + fn convert_action( + &mut self, + action: NetworkBehaviourAction< + ::OutEvent, + ::ConnectionHandler, + >, + ) -> Option> { + match action { + NetworkBehaviourAction::GenerateEvent(HeartbeatEvent { + peer_id, + latest_block_height, + }) => { + let event = PeerReportEvent::PeerInfoUpdated { + peer_id, + block_height: latest_block_height, + }; + Some(NetworkBehaviourAction::GenerateEvent(event)) + } + NetworkBehaviourAction::Dial { handler, opts } => { + let handler = + IntoConnectionHandler::select(handler, self.identify.new_handler()); + Some(NetworkBehaviourAction::Dial { handler, opts }) + } + NetworkBehaviourAction::NotifyHandler { + peer_id, + handler, + event, + } => Some(NetworkBehaviourAction::NotifyHandler { + peer_id, + handler, + event: EitherOutput::First(event), + }), + NetworkBehaviourAction::ReportObservedAddr { address, score } => { + Some(NetworkBehaviourAction::ReportObservedAddr { address, score }) + } + NetworkBehaviourAction::CloseConnection { + peer_id, + connection, + } => Some(NetworkBehaviourAction::CloseConnection { + peer_id, + connection, + }), + } + } +} + +impl FromAction for PeerReportBehaviour { + fn convert_action( + &mut self, + action: NetworkBehaviourAction< + ::OutEvent, + ::ConnectionHandler, + >, + ) -> Option> { + match action { + NetworkBehaviourAction::GenerateEvent(event) => match event { + IdentifyEvent::Received { + peer_id, + info: + IdentifyInfo { + protocol_version, + agent_version, + mut listen_addrs, + .. 
+ }, + } => { + if listen_addrs.len() > MAX_IDENTIFY_ADDRESSES { + debug!( + target: "fuel-p2p", + "Node {:?} has reported more than {} addresses; it is identified by {:?} and {:?}", + peer_id, MAX_IDENTIFY_ADDRESSES, protocol_version, agent_version + ); + listen_addrs.truncate(MAX_IDENTIFY_ADDRESSES); + } + + let event = PeerReportEvent::PeerIdentified { + peer_id, + agent_version, + addresses: listen_addrs, + }; + + Some(NetworkBehaviourAction::GenerateEvent(event)) + } + IdentifyEvent::Error { peer_id, error } => { + debug!(target: "fuel-p2p", "Identification with peer {:?} failed => {}", peer_id, error); + None + } + _ => None, + }, + NetworkBehaviourAction::Dial { handler, opts } => { + let handler = + IntoConnectionHandler::select(self.heartbeat.new_handler(), handler); + Some(NetworkBehaviourAction::Dial { handler, opts }) + } + NetworkBehaviourAction::NotifyHandler { + peer_id, + handler, + event, + } => Some(NetworkBehaviourAction::NotifyHandler { + peer_id, + handler, + event: EitherOutput::Second(event), + }), + NetworkBehaviourAction::ReportObservedAddr { address, score } => { + Some(NetworkBehaviourAction::ReportObservedAddr { address, score }) + } + NetworkBehaviourAction::CloseConnection { + peer_id, + connection, + } => Some(NetworkBehaviourAction::CloseConnection { + peer_id, + connection, + }), + } + } +} + +trait FromAction: NetworkBehaviour { + fn convert_action( + &mut self, + action: NetworkBehaviourAction, + ) -> Option>; +} + +impl FromSwarmEvent for Heartbeat {} +impl FromSwarmEvent for Identify {} + +trait FromSwarmEvent: NetworkBehaviour { + fn handle_swarm_event( + &mut self, + event: &FromSwarm<::ConnectionHandler>, + ) { + match event { + FromSwarm::NewListener(e) => { + self.on_swarm_event(FromSwarm::NewListener(*e)); + } + FromSwarm::ExpiredListenAddr(e) => { + self.on_swarm_event(FromSwarm::ExpiredListenAddr(*e)); + } + FromSwarm::ListenerError(e) => { + self.on_swarm_event(FromSwarm::ListenerError(*e)); + } + FromSwarm::ListenerClosed(e) => { + self.on_swarm_event(FromSwarm::ListenerClosed(*e)); + } + FromSwarm::NewExternalAddr(e) => { + self.on_swarm_event(FromSwarm::NewExternalAddr(*e)); + } + FromSwarm::ExpiredExternalAddr(e) => { + self.on_swarm_event(FromSwarm::ExpiredExternalAddr(*e)); + } + FromSwarm::NewListenAddr(e) => { + self.on_swarm_event(FromSwarm::NewListenAddr(*e)); + } + FromSwarm::AddressChange(e) => { + self.on_swarm_event(FromSwarm::AddressChange(*e)); + } + _ => {} + } + } +} diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 7d1d1c3f..3afe42df 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -41,10 +41,15 @@ use fuel_core_types::{ fuel_tx::Transaction, fuel_types::BlockHeight, services::p2p::{ + peer_reputation::{ + AppScore, + PeerReport, + }, BlockHeightHeartbeatData, GossipData, GossipsubMessageAcceptance, GossipsubMessageInfo, + PeerId as FuelPeerId, TransactionGossipData, }, }; @@ -62,11 +67,7 @@ use tokio::sync::{ mpsc, oneshot, }; -use tracing::{ - debug, - error, - warn, -}; +use tracing::warn; pub type Service = ServiceRunner>; @@ -92,6 +93,11 @@ enum TaskRequest { }, // Responds back to the p2p network RespondWithGossipsubMessageReport((GossipsubMessageInfo, GossipsubMessageAcceptance)), + RespondWithPeerReport { + peer_id: PeerId, + score: AppScore, + reporting_service: &'static str, + }, } impl Debug for TaskRequest { @@ -220,6 +226,9 @@ where Some(TaskRequest::RespondWithGossipsubMessageReport((message, acceptance))) => { 
                        report_message(&mut self.p2p_service, message, acceptance);
                    }
+                    Some(TaskRequest::RespondWithPeerReport { peer_id, score, reporting_service }) => {
+                        self.p2p_service.report_peer(peer_id, score, reporting_service)
+                    }
                    None => {
                        unreachable!("The `Task` is holder of the `Sender`, so it should not be possible");
                    }
@@ -425,6 +434,32 @@ impl SharedState {
     ) -> broadcast::Receiver<BlockHeightHeartbeatData> {
         self.block_height_broadcast.subscribe()
     }
+
+    pub fn report_peer<T: PeerReport>(
+        &self,
+        peer_id: FuelPeerId,
+        peer_report: T,
+        reporting_service: &'static str,
+    ) -> anyhow::Result<()> {
+        match Vec::from(peer_id).try_into() {
+            Ok(peer_id) => {
+                let score = peer_report.get_score_from_report();
+
+                self.request_sender
+                    .try_send(TaskRequest::RespondWithPeerReport {
+                        peer_id,
+                        score,
+                        reporting_service,
+                    })?;
+
+                Ok(())
+            }
+            Err(e) => {
+                warn!(target: "fuel-p2p", "Failed to read PeerId from {e:?}");
+                Err(anyhow::anyhow!("Failed to read PeerId from {e:?}"))
+            }
+        }
+    }
 }

 pub fn new_service<D, B>(p2p_config: Config, db: D, block_importer: B) -> Service<D>
@@ -464,21 +499,9 @@ fn report_message(
     if let Ok(peer_id) = peer_id.try_into() {
         let acceptance = to_message_acceptance(&acceptance);
-
-        match p2p_service.report_message_validation_result(&msg_id, &peer_id, acceptance)
-        {
-            Ok(true) => {
-                debug!(target: "fuel-libp2p", "Sent a report for MessageId: {} from PeerId: {}", msg_id, peer_id);
-            }
-            Ok(false) => {
-                warn!(target: "fuel-libp2p", "Message with MessageId: {} not found in the Gossipsub Message Cache", msg_id);
-            }
-            Err(e) => {
-                error!(target: "fuel-libp2p", "Failed to publish Message with MessageId: {} with Error: {:?}", msg_id, e);
-            }
-        }
+        p2p_service.report_message_validation_result(&msg_id, peer_id, acceptance);
     } else {
-        warn!(target: "fuel-libp2p", "Failed to read PeerId from received GossipsubMessageId: {}", msg_id);
+        warn!(target: "fuel-p2p", "Failed to read PeerId from received GossipsubMessageId: {}", msg_id);
     }
 }
diff --git a/crates/types/src/services/p2p.rs b/crates/types/src/services/p2p.rs
index a2f1616f..d0f8e604 100644
--- a/crates/types/src/services/p2p.rs
+++ b/crates/types/src/services/p2p.rs
@@ -5,6 +5,8 @@ use crate::{
     fuel_types::BlockHeight,
 };
 use std::fmt::Debug;
+/// Contains types and logic for Peer Reputation
+pub mod peer_reputation;

 /// Lightweight representation of gossipped data that only includes IDs
 #[derive(Debug, Clone, Hash, PartialEq, Eq)]
diff --git a/crates/types/src/services/p2p/peer_reputation.rs b/crates/types/src/services/p2p/peer_reputation.rs
new file mode 100644
index 00000000..9fd02b67
--- /dev/null
+++ b/crates/types/src/services/p2p/peer_reputation.rs
@@ -0,0 +1,56 @@
+/// PeerScore type used for Peer Reputation
+pub type AppScore = f64;
+
+/// Minimum allowed peer score before peer is banned
+pub const MIN_APP_SCORE: AppScore = -50.0;
+/// Default value for peer score
+pub const DEFAULT_APP_SCORE: AppScore = 0.0;
+/// Maximum value a Peer can reach with its PeerScore
+pub const MAX_APP_SCORE: AppScore = 150.0;
+/// Score by which we slowly decrease active peer reputation
+pub const DECAY_APP_SCORE: AppScore = 0.9;
+
+/// Types implementing this can report new PeerScore
+pub trait PeerReport {
+    /// Extracts PeerScore from the Report
+    fn get_score_from_report(&self) -> AppScore;
+}
+
+/// Example of negative PeerReport
+#[derive(Debug, Clone)]
+pub enum NegativePeerReport {
+    /// Worst offense, peer should likely be banned after this
+    Fatal,
+    /// Minor offense, deduct few points
+    Minor,
+    /// Major offense, deduct reasonable amount of points
+    Major,
+}
+
+impl PeerReport for NegativePeerReport {
+    fn get_score_from_report(&self) -> AppScore {
+        match self {
+            Self::Fatal => -MAX_APP_SCORE - 10.0,
+            Self::Major => -10.0,
+            Self::Minor => -5.0,
+        }
+    }
+}
+
+/// Example of positive PeerReport
+#[derive(Debug, Clone)]
+pub enum PositivePeerReport {
+    /// Minor positive feedback, increase reputation slightly
+    Minor,
+    /// Major positive feedback, increase reputation
+    Major,
+}
+
+impl PeerReport for PositivePeerReport {
+    fn get_score_from_report(&self) -> AppScore {
+        match self {
+            Self::Major => 5.0,
+            Self::Minor => 1.0,
+        }
+    }
+}
diff --git a/tests/tests/tx_gossip.rs b/tests/tests/tx_gossip.rs
index d2388030..f1661493 100644
--- a/tests/tests/tx_gossip.rs
+++ b/tests/tests/tx_gossip.rs
@@ -100,7 +100,7 @@ async fn test_tx_gossiping() {
     let node_two = FuelService::new_node(node_config).await.unwrap();
     let client_two = FuelClient::from(node_two.bound_address);

-    let wait_time = Duration::from_secs(6);
+    let wait_time = Duration::from_secs(10);
     tokio::time::sleep(wait_time).await;
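The peer_reputation module fixes the application-level score range: a peer starts at DEFAULT_APP_SCORE (0.0), can climb at most to MAX_APP_SCORE (150.0), decays toward zero by DECAY_APP_SCORE (0.9) on every tick of REPUTATION_DECAY_INTERVAL_IN_SECONDS, and the ScoreConfig / Punisher::ban_peer plumbing kicks in once the score falls below MIN_APP_SCORE (-50.0). The manager's actual update routine is not part of these hunks, so the snippet below is only a minimal sketch under those assumptions; apply_report and decay are hypothetical helper names, not functions from the patch.

    type AppScore = f64;

    const MIN_APP_SCORE: AppScore = -50.0;
    const DEFAULT_APP_SCORE: AppScore = 0.0;
    const MAX_APP_SCORE: AppScore = 150.0;
    const DECAY_APP_SCORE: AppScore = 0.9;

    /// Applies a reported score delta, clamping at the allowed maximum.
    /// Returns the new score and whether the peer should be banned.
    fn apply_report(current: AppScore, reported: AppScore) -> (AppScore, bool) {
        let new_score = (current + reported).min(MAX_APP_SCORE);
        (new_score, new_score < MIN_APP_SCORE)
    }

    /// One decay tick, pulling an idle score back toward zero.
    fn decay(current: AppScore) -> AppScore {
        current * DECAY_APP_SCORE
    }

    fn main() {
        // A fatal report (-MAX_APP_SCORE - 10.0 = -160.0) takes a fresh peer
        // well below MIN_APP_SCORE, so a Punisher would ban it.
        let (after_fatal, banned) = apply_report(DEFAULT_APP_SCORE, -MAX_APP_SCORE - 10.0);
        assert!(banned);
        println!("after fatal report: {after_fatal}, banned: {banned}");

        // A few major positive reports (+5.0 each) raise the score,
        // while the periodic decay pulls it back toward zero.
        let mut score = DEFAULT_APP_SCORE;
        for _ in 0..3 {
            let (raised, _) = apply_report(score, 5.0);
            score = decay(raised);
        }
        println!("after positive reports and decay: {score:.2}");
    }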
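NegativePeerReport and PositivePeerReport are only examples; any service can define its own report type and hand it to SharedState::report_peer together with a label for the reporting service. A small sketch of what a custom report could look like, assuming the trait shape shown above; the SlowHeaderResponse type, its missed_deadlines field, and the "sync" label are hypothetical and not part of this patch.

    type AppScore = f64;

    /// Mirrors the `PeerReport` trait added in
    /// crates/types/src/services/p2p/peer_reputation.rs.
    trait PeerReport {
        fn get_score_from_report(&self) -> AppScore;
    }

    /// Hypothetical report emitted by a syncing service when a peer
    /// keeps answering header requests too slowly.
    struct SlowHeaderResponse {
        missed_deadlines: u32,
    }

    impl PeerReport for SlowHeaderResponse {
        fn get_score_from_report(&self) -> AppScore {
            // Each missed deadline costs one point, capped at ten.
            -(self.missed_deadlines.min(10) as AppScore)
        }
    }

    fn main() {
        let report = SlowHeaderResponse { missed_deadlines: 3 };
        // In the p2p service this value would end up in
        // TaskRequest::RespondWithPeerReport { score, .. } via a call like
        // shared_state.report_peer(peer_id, report, "sync").
        println!("score delta: {}", report.get_score_from_report());
    }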
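The connection bookkeeping earlier in this patch (handle_initial_connection) works the same way before and after the refactor: reserved peers always get a slot, a non-reserved peer is disconnected once max_non_reserved_peers slots are in use, and ConnectionState::deny_new_peers is called as soon as the last free slot is taken. A condensed model of that accounting follows, with the PeerInfo bookkeeping and locking stripped out; the Slots struct, its accepting_new_peers field, and the numeric peer ids are simplifications for illustration only.

    use std::collections::HashSet;

    /// Condensed model of the peer manager's slot accounting:
    /// returns `true` when the newly connected peer should be disconnected.
    struct Slots {
        reserved: HashSet<u32>,
        non_reserved_connected: HashSet<u32>,
        max_non_reserved: usize,
        accepting_new_peers: bool,
    }

    impl Slots {
        fn handle_initial_connection(&mut self, peer: u32) -> bool {
            if !self.reserved.contains(&peer) {
                let connected = self.non_reserved_connected.len();
                // All non-reserved slots taken: disconnect this peer.
                if connected >= self.max_non_reserved {
                    return true;
                }
                // This peer takes the last free slot: stop accepting new peers.
                if connected + 1 == self.max_non_reserved {
                    self.accepting_new_peers = false;
                }
                self.non_reserved_connected.insert(peer);
            }
            false
        }
    }

    fn main() {
        let mut slots = Slots {
            reserved: HashSet::from([100]),
            non_reserved_connected: HashSet::new(),
            max_non_reserved: 2,
            accepting_new_peers: true,
        };

        assert!(!slots.handle_initial_connection(1)); // first slot
        assert!(!slots.handle_initial_connection(2)); // last slot, stop dialing
        assert!(!slots.accepting_new_peers);
        assert!(slots.handle_initial_connection(3)); // over capacity: disconnect
        assert!(!slots.handle_initial_connection(100)); // reserved peer always allowed
        println!("non-reserved connected: {}", slots.non_reserved_connected.len());
    }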