From aab8af264a01eee5e22683530ac03205b72fc2e6 Mon Sep 17 00:00:00 2001 From: crypto523 Date: Fri, 15 Sep 2023 14:00:30 -0700 Subject: [PATCH] Add tracking for peer heartbeats (#1356) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit https://github.com/FuelLabs/fuel-core/issues/1348 In order to track peer heartbeats, this PR adds two new things: - A concept of a rolling/moving average for the time between heartbeats for each peer - This is updated every time a heartbeat is sent from that peer - A regular timeout in the P2P `Task` that will check peers for good heartbeat behavior _Originally I was looking at just reporting on heartbeat, but if a peer never reported then they would be exempt from checks_ There are ~3~ 2 cases in which we will want to punish peers for their heartbeat behavior: - ~They have never sent us a heartbeat~ (If we treat the setup as the first heartbeat, then we can merge this with the next 👇) - They haven't sent us a heartbeat in a long time - They are sending heartbeats very sporadically We can also use this data to determine which peers are performing the best --------- Co-authored-by: Brandon Kite --- CHANGELOG.md | 2 + Cargo.lock | 1 + bin/fuel-core/src/cli/run/p2p.rs | 19 + crates/services/p2p/Cargo.toml | 3 +- crates/services/p2p/src/config.rs | 13 + crates/services/p2p/src/p2p_service.rs | 4 +- crates/services/p2p/src/peer_manager.rs | 100 ++-- .../p2p/src/peer_manager/heartbeat_data.rs | 89 +++ .../p2p/src/request_response/messages.rs | 9 +- crates/services/p2p/src/service.rs | 562 +++++++++++++++++- .../charts/templates/fuel-core-deploy.yaml | 12 + deployment/charts/values.yaml | 3 + 12 files changed, 727 insertions(+), 90 deletions(-) create mode 100644 crates/services/p2p/src/peer_manager/heartbeat_data.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 594be2d9..c4041ed0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Description of the upcoming release here. 
### Added +- [#1356](https://github.com/FuelLabs/fuel-core/pull/1356): Add peer reputation reporting to heartbeat code - [#1355](https://github.com/FuelLabs/fuel-core/pull/1355): Added new metrics related to block importing, such as tps, sync delays etc - [#1339](https://github.com/FuelLabs/fuel-core/pull/1339): Adds `baseAssetId` to `FeeParameters` in the GraphQL API. - [#1331](https://github.com/FuelLabs/fuel-core/pull/1331): Add peer reputation reporting to block import code @@ -20,6 +21,7 @@ Description of the upcoming release here. - [#1286](https://github.com/FuelLabs/fuel-core/pull/1286): Include readable names for test cases where missing. - [#1274](https://github.com/FuelLabs/fuel-core/pull/1274): Added tests to benchmark block synchronization. - [#1263](https://github.com/FuelLabs/fuel-core/pull/1263): Add gas benchmarks for `ED19` and `ECR1` instructions. +- [#1331](https://github.com/FuelLabs/fuel-core/pull/1331): Add peer reputation reporting to block import code ### Changed diff --git a/Cargo.lock b/Cargo.lock index fd9ac84e..c93ffc98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3017,6 +3017,7 @@ dependencies = [ "serde", "serde_with", "sha2 0.10.7", + "thiserror", "tokio", "tracing", "tracing-attributes", diff --git a/bin/fuel-core/src/cli/run/p2p.rs b/bin/fuel-core/src/cli/run/p2p.rs index b48631ea..2e3f5f43 100644 --- a/bin/fuel-core/src/cli/run/p2p.rs +++ b/bin/fuel-core/src/cli/run/p2p.rs @@ -175,6 +175,18 @@ pub struct P2PArgs { /// Cannot be zero. 
#[clap(long = "heartbeat-max-failures", default_value = "5", env)] pub heartbeat_max_failures: NonZeroU32, + + /// For peer reputations, the interval at which to check heartbeat health for all peers + #[clap(long = "heartbeat-check-interval", default_value = "5", env)] + pub heartbeat_check_interval: u64, + + /// For peer reputations, the maximum average interval between heartbeats for a peer before penalty + #[clap(long = "heartbeat-max-avg-interval", default_value = "20", env)] + pub heartbeat_max_avg_interval: u64, + + /// For peer reputations, the maximum time since last heartbeat before penalty + #[clap(long = "heartbeat-max-time-since-last", default_value = "40", env)] + pub heartbeat_max_time_since_last: u64, } #[derive(Debug, Clone, Args)] @@ -302,6 +314,13 @@ impl P2PArgs { heartbeat_config, set_request_timeout: Duration::from_secs(self.request_timeout), set_connection_keep_alive: Duration::from_secs(self.connection_keep_alive), + heartbeat_check_interval: Duration::from_secs(self.heartbeat_check_interval), + heartbeat_max_avg_interval: Duration::from_secs( + self.heartbeat_max_avg_interval, + ), + heartbeat_max_time_since_last: Duration::from_secs( + self.heartbeat_max_time_since_last, + ), info_interval: Some(Duration::from_secs(self.info_interval)), identify_interval: Some(Duration::from_secs(self.identify_interval)), metrics, diff --git a/crates/services/p2p/Cargo.toml b/crates/services/p2p/Cargo.toml index dcdfb4de..86dcda52 100644 --- a/crates/services/p2p/Cargo.toml +++ b/crates/services/p2p/Cargo.toml @@ -58,6 +58,7 @@ rand = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_with = "1.11" sha2 = "0.10" +thiserror = "1.0.47" tokio = { workspace = true, features = ["sync"] } tracing = { workspace = true } @@ -70,7 +71,7 @@ fuel-core-types = { path = "../../types", features = [ "test-helpers" ] } rand = { workspace = true } -tokio = { workspace = true, features = ["full"] } +tokio = { workspace = true, features = ["full",
"test-util"] } tracing-attributes = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } diff --git a/crates/services/p2p/src/config.rs b/crates/services/p2p/src/config.rs index 1f91aa06..1ce3f252 100644 --- a/crates/services/p2p/src/config.rs +++ b/crates/services/p2p/src/config.rs @@ -131,6 +131,13 @@ pub struct Config { /// Sets the keep-alive timeout of idle connections. pub set_connection_keep_alive: Duration, + /// Time between checking heartbeat status for all peers + pub heartbeat_check_interval: Duration, + /// Max avg time between heartbeats for a given peer before getting reputation penalty + pub heartbeat_max_avg_interval: Duration, + /// Max time since a given peer has sent a heartbeat before getting reputation penalty + pub heartbeat_max_time_since_last: Duration, + /// Enables prometheus metrics for this fuel-service pub metrics: bool, @@ -176,6 +183,9 @@ impl Config { heartbeat_config: self.heartbeat_config, set_request_timeout: self.set_request_timeout, set_connection_keep_alive: self.set_connection_keep_alive, + heartbeat_check_interval: self.heartbeat_check_interval, + heartbeat_max_avg_interval: self.heartbeat_max_avg_interval, + heartbeat_max_time_since_last: self.heartbeat_max_time_since_last, metrics: self.metrics, state: Initialized(()), }) @@ -218,6 +228,9 @@ impl Config { heartbeat_config: HeartbeatConfig::default(), set_request_timeout: REQ_RES_TIMEOUT, set_connection_keep_alive: REQ_RES_TIMEOUT, + heartbeat_check_interval: Duration::from_secs(10), + heartbeat_max_avg_interval: Duration::from_secs(20), + heartbeat_max_time_since_last: Duration::from_secs(40), info_interval: Some(Duration::from_secs(3)), identify_interval: Some(Duration::from_secs(5)), metrics: false, diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 967cd09f..83912e91 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -275,7 +275,7 @@
impl FuelP2PService { .collect() } - pub fn get_peers_ids(&self) -> impl Iterator { + pub fn get_peers_ids_iter(&self) -> impl Iterator { self.peer_manager.get_peers_ids() } @@ -309,7 +309,7 @@ impl FuelP2PService { let peer_id = match peer_id { Some(peer_id) => peer_id, _ => { - let peers = self.get_peers_ids(); + let peers = self.get_peers_ids_iter(); let peers_count = self.peer_manager.total_peers_connected(); if peers_count == 0 { diff --git a/crates/services/p2p/src/peer_manager.rs b/crates/services/p2p/src/peer_manager.rs index 6a723e95..ba700267 100644 --- a/crates/services/p2p/src/peer_manager.rs +++ b/crates/services/p2p/src/peer_manager.rs @@ -22,15 +22,18 @@ use std::{ Arc, RwLock, }, - time::Duration, }; -use tokio::time::Instant; use tracing::{ debug, info, }; -use crate::gossipsub_config::GRAYLIST_THRESHOLD; +use crate::{ + gossipsub_config::GRAYLIST_THRESHOLD, + peer_manager::heartbeat_data::HeartbeatData, +}; + +pub mod heartbeat_data; /// At this point we better just ban the peer const MIN_GOSSIPSUB_SCORE_BEFORE_BAN: AppScore = GRAYLIST_THRESHOLD; @@ -44,23 +47,17 @@ pub struct PeerInfo { pub score: AppScore, } -impl Default for PeerInfo { - fn default() -> Self { +impl PeerInfo { + pub fn new(heartbeat_avg_window: u32) -> Self { Self { + peer_addresses: HashSet::new(), + client_version: None, + heartbeat_data: HeartbeatData::new(heartbeat_avg_window), score: DEFAULT_APP_SCORE, - client_version: Default::default(), - heartbeat_data: Default::default(), - peer_addresses: Default::default(), } } } -enum PeerInfoInsert { - Addresses(Vec), - ClientVersion(String), - HeartbeatData(HeartbeatData), -} - /// Manages Peers and their events #[derive(Debug)] pub struct PeerManager { @@ -117,14 +114,13 @@ impl PeerManager { ) { if let Some(time_elapsed) = self .get_peer_info(peer_id) - .and_then(|info| info.heartbeat_data.seconds_since_last_heartbeat()) + .map(|info| info.heartbeat_data.duration_since_last_heartbeat()) { - debug!(target: "fuel-p2p", "Previous 
heartbeat happened {:?} seconds ago", time_elapsed); + debug!(target: "fuel-p2p", "Previous heartbeat happened {:?} milliseconds ago", time_elapsed.as_millis()); } - let heartbeat_data = HeartbeatData::new(block_height); - - self.insert_peer_info(peer_id, PeerInfoInsert::HeartbeatData(heartbeat_data)); + let peers = self.get_assigned_peer_table_mut(peer_id); + update_heartbeat(peers, peer_id, block_height); } /// Returns `true` signaling that the peer should be disconnected @@ -137,7 +133,8 @@ impl PeerManager { if initial_connection { self.handle_initial_connection(peer_id, addresses) } else { - self.insert_peer_info(peer_id, PeerInfoInsert::Addresses(addresses)); + let peers = self.get_assigned_peer_table_mut(peer_id); + insert_peer_addresses(peers, peer_id, addresses); false } } @@ -148,8 +145,9 @@ impl PeerManager { addresses: Vec, agent_version: String, ) { - self.insert_peer_info(peer_id, PeerInfoInsert::ClientVersion(agent_version)); - self.insert_peer_info(peer_id, PeerInfoInsert::Addresses(addresses)); + let peers = self.get_assigned_peer_table_mut(peer_id); + insert_client_version(peers, peer_id, agent_version); + insert_peer_addresses(peers, peer_id, addresses); } pub fn batch_update_score_with_decay(&mut self) { @@ -197,6 +195,12 @@ impl PeerManager { self.non_reserved_connected_peers.get(peer_id) } + pub fn get_all_peers(&self) -> impl Iterator { + self.non_reserved_connected_peers + .iter() + .chain(self.reserved_connected_peers.iter()) + } + pub fn get_disconnected_reserved_peers(&self) -> impl Iterator { self.reserved_peers .iter() @@ -255,6 +259,8 @@ impl PeerManager { peer_id: &PeerId, addresses: Vec, ) -> bool { + const HEARTBEAT_AVG_WINDOW: u32 = 10; + // if the connected Peer is not from the reserved peers if !self.reserved_peers.contains(peer_id) { let non_reserved_peers_connected = self.non_reserved_connected_peers.len(); @@ -272,15 +278,16 @@ impl PeerManager { } self.non_reserved_connected_peers - .insert(*peer_id, PeerInfo::default()); + 
.insert(*peer_id, PeerInfo::new(HEARTBEAT_AVG_WINDOW)); } else { self.reserved_connected_peers - .insert(*peer_id, PeerInfo::default()); + .insert(*peer_id, PeerInfo::new(HEARTBEAT_AVG_WINDOW)); self.send_reserved_peers_update(); } - self.insert_peer_info(peer_id, PeerInfoInsert::Addresses(addresses)); + let peers = self.get_assigned_peer_table_mut(peer_id); + insert_peer_addresses(peers, peer_id, addresses); false } @@ -291,22 +298,14 @@ impl PeerManager { .send(self.reserved_connected_peers.len()); } - fn insert_peer_info(&mut self, peer_id: &PeerId, data: PeerInfoInsert) { - let peers = if self.reserved_peers.contains(peer_id) { + fn get_assigned_peer_table_mut( + &mut self, + peer_id: &PeerId, + ) -> &mut HashMap { + if self.reserved_peers.contains(peer_id) { &mut self.reserved_connected_peers } else { &mut self.non_reserved_connected_peers - }; - match data { - PeerInfoInsert::Addresses(addresses) => { - insert_peer_addresses(peers, peer_id, addresses) - } - PeerInfoInsert::ClientVersion(client_version) => { - insert_client_version(peers, peer_id, client_version) - } - PeerInfoInsert::HeartbeatData(block_height) => { - insert_heartbeat_data(peers, peer_id, block_height) - } } } } @@ -316,25 +315,6 @@ pub struct ConnectionState { peers_allowed: bool, } -#[derive(Debug, Clone, Default)] -pub struct HeartbeatData { - pub block_height: Option, - pub last_heartbeat: Option, -} - -impl HeartbeatData { - pub fn new(block_height: BlockHeight) -> Self { - Self { - block_height: Some(block_height), - last_heartbeat: Some(Instant::now()), - } - } - - pub fn seconds_since_last_heartbeat(&self) -> Option { - self.last_heartbeat.map(|time| time.elapsed()) - } -} - impl ConnectionState { pub fn new() -> Arc> { Arc::new(RwLock::new(Self { @@ -369,13 +349,13 @@ fn insert_peer_addresses( } } -fn insert_heartbeat_data( +fn update_heartbeat( peers: &mut HashMap, peer_id: &PeerId, - heartbeat_data: HeartbeatData, + block_height: BlockHeight, ) { if let Some(peer) = 
peers.get_mut(peer_id) { - peer.heartbeat_data = heartbeat_data; + peer.heartbeat_data.update(block_height); } else { log_missing_peer(peer_id); } diff --git a/crates/services/p2p/src/peer_manager/heartbeat_data.rs b/crates/services/p2p/src/peer_manager/heartbeat_data.rs new file mode 100644 index 00000000..a2889dc2 --- /dev/null +++ b/crates/services/p2p/src/peer_manager/heartbeat_data.rs @@ -0,0 +1,89 @@ +use fuel_core_types::fuel_types::BlockHeight; +use std::{ + collections::VecDeque, + time::Duration, +}; +use tokio::time::Instant; + +#[derive(Debug, Clone)] +pub struct HeartbeatData { + pub block_height: Option, + pub last_heartbeat: Instant, + // Size of moving average window + pub window: u32, + pub durations: VecDeque, +} + +impl HeartbeatData { + pub fn new(window: u32) -> Self { + Self { + block_height: None, + last_heartbeat: Instant::now(), + window, + durations: VecDeque::with_capacity(window as usize), + } + } + + pub fn duration_since_last_heartbeat(&self) -> Duration { + self.last_heartbeat.elapsed() + } + + pub fn average_time_between_heartbeats(&self) -> Duration { + if self.durations.is_empty() { + Duration::from_secs(0) + } else { + self.durations.iter().sum::() / self.durations.len() as u32 + } + } + + fn add_new_duration(&mut self, new_duration: Duration) { + if self.durations.len() == self.window as usize { + self.durations.pop_back(); + } + self.durations.push_front(new_duration); + } + + pub fn update(&mut self, block_height: BlockHeight) { + self.block_height = Some(block_height); + let old_hearbeat = self.last_heartbeat; + self.last_heartbeat = Instant::now(); + let new_duration = self.last_heartbeat - old_hearbeat; + self.add_new_duration(new_duration); + } +} + +#[cfg(test)] +mod tests { + #![allow(non_snake_case)] + use super::*; + + #[tokio::test(start_paused = true)] + async fn duration_since_last_heartbeat__reads_correctly() { + let heartbeat_data = HeartbeatData::new(10); + tokio::time::advance(Duration::from_secs(10)).await; + 
assert_eq!( + heartbeat_data.duration_since_last_heartbeat(), + Duration::from_secs(10) + ); + } + + #[tokio::test(start_paused = true)] + async fn update__works_with_many() { + let intervals: Vec = + vec![5, 40, 19, 400, 23, 36, 33, 22, 11, 10, 9, 8, 72, 16, 5, 4]; + let mut heartbeat_data = HeartbeatData::new(10); + for (i, interval) in intervals.clone().into_iter().enumerate() { + tokio::time::advance(Duration::from_secs(interval)).await; + heartbeat_data.update(1.into()); + let bottom = if i < 10 { 0 } else { i - 9 }; + let range = &intervals[bottom..=i]; + let expected = range + .iter() + .map(|x| Duration::from_secs(*x)) + .sum::() + / range.len() as u32; + let actual = heartbeat_data.average_time_between_heartbeats(); + assert_eq!(actual, expected); + } + } +} diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index d466f16c..56eb6df7 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -21,6 +21,7 @@ use serde_with::{ serde_as, FromInto, }; +use thiserror::Error; use tokio::sync::oneshot; pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &[u8] = b"/fuel/req_res/0.0.1"; @@ -80,14 +81,18 @@ pub enum OutboundResponse { Transactions(Option>>), } -#[derive(Debug)] +#[derive(Debug, Error)] pub enum RequestError { + #[error("Not currently connected to any peers")] NoPeersConnected, } -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug, Eq, PartialEq, Error)] pub enum ResponseError { + #[error("Response channel does not exist")] ResponseChannelDoesNotExist, + #[error("Failed to send response")] SendingResponseFailed, + #[error("Failed to convert response to intermediate format")] ConversionToIntermediateFailed, } diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 1f97cb05..5e9f2c33 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -12,6 +12,7 @@ 
use crate::{ FuelP2PEvent, FuelP2PService, }, + peer_manager::PeerInfo, ports::{ BlockHeightImporter, P2pDb, @@ -53,24 +54,34 @@ use fuel_core_types::{ TransactionGossipData, }, }; -use futures::StreamExt; +use futures::{ + future::BoxFuture, + StreamExt, +}; use libp2p::{ gossipsub::MessageAcceptance, PeerId, }; +use libp2p_request_response::RequestId; use std::{ fmt::Debug, ops::Range, sync::Arc, }; -use tokio::sync::{ - broadcast, - mpsc, - oneshot, +use tokio::{ + sync::{ + broadcast, + mpsc, + oneshot, + }, + time::{ + Duration, + Instant, + }, }; use tracing::warn; -pub type Service = ServiceRunner>; +pub type Service = ServiceRunner, D, SharedState>>; enum TaskRequest { // Broadcast requests to p2p network @@ -107,65 +118,271 @@ impl Debug for TaskRequest { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum HeartBeatPeerReportReason { + OldHeartBeat, + LowHeartBeatFrequency, +} + +impl PeerReport for HeartBeatPeerReportReason { + fn get_score_from_report(&self) -> AppScore { + todo!() + } +} + +pub trait TaskP2PService: Send { + fn get_peer_ids(&self) -> Vec; + fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)>; + fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option; + + fn next_event(&mut self) -> BoxFuture<'_, Option>; + + fn publish_message( + &mut self, + message: GossipsubBroadcastRequest, + ) -> anyhow::Result<()>; + fn send_request_msg( + &mut self, + peer_id: Option, + request_msg: RequestMessage, + channel_item: ResponseChannelItem, + ) -> anyhow::Result<()>; + + fn send_response_msg( + &mut self, + request_id: RequestId, + message: OutboundResponse, + ) -> anyhow::Result<()>; + fn report_message( + &mut self, + message: GossipsubMessageInfo, + acceptance: GossipsubMessageAcceptance, + ) -> anyhow::Result<()>; + + fn report_peer( + &mut self, + peer_id: PeerId, + score: AppScore, + reporting_service: &str, + ) -> anyhow::Result<()>; + + fn update_block_height(&mut self, height: BlockHeight) -> anyhow::Result<()>; +} + 
+impl TaskP2PService for FuelP2PService { + fn get_peer_ids(&self) -> Vec { + self.get_peers_ids_iter().copied().collect() + } + + fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)> { + self.peer_manager().get_all_peers().collect() + } + + fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option { + self.peer_manager().get_peer_id_with_height(height) + } + + fn next_event(&mut self) -> BoxFuture<'_, Option> { + Box::pin(self.next_event()) + } + + fn publish_message( + &mut self, + message: GossipsubBroadcastRequest, + ) -> anyhow::Result<()> { + self.publish_message(message)?; + Ok(()) + } + + fn send_request_msg( + &mut self, + peer_id: Option, + request_msg: RequestMessage, + channel_item: ResponseChannelItem, + ) -> anyhow::Result<()> { + self.send_request_msg(peer_id, request_msg, channel_item)?; + Ok(()) + } + + fn send_response_msg( + &mut self, + request_id: RequestId, + message: OutboundResponse, + ) -> anyhow::Result<()> { + self.send_response_msg(request_id, message)?; + Ok(()) + } + + fn report_message( + &mut self, + message: GossipsubMessageInfo, + acceptance: GossipsubMessageAcceptance, + ) -> anyhow::Result<()> { + report_message(self, message, acceptance); + Ok(()) + } + + fn report_peer( + &mut self, + peer_id: PeerId, + score: AppScore, + reporting_service: &str, + ) -> anyhow::Result<()> { + self.report_peer(peer_id, score, reporting_service); + Ok(()) + } + + fn update_block_height(&mut self, height: BlockHeight) -> anyhow::Result<()> { + self.update_block_height(height); + Ok(()) + } +} + +pub trait Broadcast: Send { + fn report_peer( + &self, + peer_id: FuelPeerId, + report: HeartBeatPeerReportReason, + reporting_service: &'static str, + ) -> anyhow::Result<()>; + + fn block_height_broadcast( + &self, + block_height_data: BlockHeightHeartbeatData, + ) -> anyhow::Result<()>; + + fn tx_broadcast(&self, transaction: TransactionGossipData) -> anyhow::Result<()>; +} + +impl Broadcast for SharedState { + fn report_peer( + &self, + 
peer_id: FuelPeerId, + report: HeartBeatPeerReportReason, + reporting_service: &'static str, + ) -> anyhow::Result<()> { + self.report_peer(peer_id, report, reporting_service) + } + + fn block_height_broadcast( + &self, + block_height_data: BlockHeightHeartbeatData, + ) -> anyhow::Result<()> { + self.block_height_broadcast.send(block_height_data)?; + Ok(()) + } + + fn tx_broadcast(&self, transaction: TransactionGossipData) -> anyhow::Result<()> { + self.tx_broadcast.send(transaction)?; + Ok(()) + } +} + /// Orchestrates various p2p-related events between the inner `P2pService` /// and the top level `NetworkService`. -pub struct Task { - p2p_service: FuelP2PService, +pub struct Task { + p2p_service: P, db: Arc, next_block_height: BoxStream, /// Receive internal Task Requests request_receiver: mpsc::Receiver, - shared: SharedState, + broadcast: B, max_headers_per_request: u32, + // wait time between peer heartbeat reputation checks + heartbeat_check_interval: Duration, + heartbeat_max_avg_interval: Duration, + heartbeat_max_time_since_last: Duration, + next_check_time: Instant, } -impl Task { +impl Task, D, SharedState> { pub fn new( config: Config, db: Arc, block_importer: Arc, ) -> Self { + let Config { + max_block_size, + max_headers_per_request, + heartbeat_check_interval, + heartbeat_max_avg_interval, + heartbeat_max_time_since_last, + .. 
+ } = config; let (request_sender, request_receiver) = mpsc::channel(100); let (tx_broadcast, _) = broadcast::channel(100); let (block_height_broadcast, _) = broadcast::channel(100); let next_block_height = block_importer.next_block_height(); - let max_block_size = config.max_block_size; - let max_headers_per_request = config.max_headers_per_request; let p2p_service = FuelP2PService::new(config, PostcardCodec::new(max_block_size)); let reserved_peers_broadcast = p2p_service.peer_manager().reserved_peers_updates(); + let next_check_time = Instant::now() + heartbeat_check_interval; + Self { p2p_service, db, request_receiver, next_block_height, - shared: SharedState { + broadcast: SharedState { request_sender, tx_broadcast, reserved_peers_broadcast, block_height_broadcast, }, max_headers_per_request, + heartbeat_check_interval, + heartbeat_max_avg_interval, + heartbeat_max_time_since_last, + next_check_time, } } } +impl Task { + fn peer_heartbeat_reputation_checks(&self) -> anyhow::Result<()> { + for (peer_id, peer_info) in self.p2p_service.get_all_peer_info() { + if peer_info.heartbeat_data.duration_since_last_heartbeat() + > self.heartbeat_max_time_since_last + { + tracing::debug!("Peer {:?} has old heartbeat", peer_id); + let report = HeartBeatPeerReportReason::OldHeartBeat; + let service = "p2p"; + let peer_id = convert_peer_id(peer_id)?; + self.broadcast.report_peer(peer_id, report, service)?; + } else if peer_info.heartbeat_data.average_time_between_heartbeats() + > self.heartbeat_max_avg_interval + { + tracing::debug!("Peer {:?} has low heartbeat frequency", peer_id); + let report = HeartBeatPeerReportReason::LowHeartBeatFrequency; + let service = "p2p"; + let peer_id = convert_peer_id(peer_id)?; + self.broadcast.report_peer(peer_id, report, service)?; + } + } + Ok(()) + } +} + +fn convert_peer_id(peer_id: &PeerId) -> anyhow::Result { + let inner = Vec::try_from(*peer_id)?; + Ok(FuelPeerId::from(inner)) +} #[async_trait::async_trait] -impl RunnableService for 
Task +impl RunnableService for Task, D, SharedState> where Self: RunnableTask, { const NAME: &'static str = "P2P"; type SharedData = SharedState; - type Task = Task; + type Task = Task, D, SharedState>; type TaskParams = (); fn shared_data(&self) -> Self::SharedData { - self.shared.clone() + self.broadcast.clone() } async fn into_task( @@ -180,12 +397,16 @@ where // TODO: Add tests https://github.com/FuelLabs/fuel-core/issues/1275 #[async_trait::async_trait] -impl RunnableTask for Task +impl RunnableTask for Task where + P: TaskP2PService + 'static, D: P2pDb + 'static, + B: Broadcast + 'static, { async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { + tracing::debug!("P2P task is running"); let should_continue; + tokio::select! { biased; @@ -218,13 +439,13 @@ where } } Some(TaskRequest::GetPeerIds(channel)) => { - let peer_ids = self.p2p_service.get_peers_ids().copied().collect(); + let peer_ids = self.p2p_service.get_peer_ids(); let _ = channel.send(peer_ids); } Some(TaskRequest::GetBlock { height, channel }) => { let request_msg = RequestMessage::Block(height); let channel_item = ResponseChannelItem::Block(channel); - let peer = self.p2p_service.peer_manager().get_peer_id_with_height(&height); + let peer = self.p2p_service.get_peer_id_with_height(&height); let _ = self.p2p_service.send_request_msg(peer, request_msg, channel_item); } Some(TaskRequest::GetSealedHeaders { block_height_range, channel: response}) => { @@ -234,7 +455,7 @@ where // Note: this range has already been check for // validity in `SharedState::get_sealed_block_headers`. 
let block_height = BlockHeight::from(block_height_range.end - 1); - let peer = self.p2p_service.peer_manager() + let peer = self.p2p_service .get_peer_id_with_height(&block_height); let _ = self.p2p_service.send_request_msg(peer, request_msg, channel_item); } @@ -244,10 +465,11 @@ where let _ = self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel_item); } Some(TaskRequest::RespondWithGossipsubMessageReport((message, acceptance))) => { - report_message(&mut self.p2p_service, message, acceptance); + // report_message(&mut self.p2p_service, message, acceptance); + self.p2p_service.report_message(message, acceptance)?; } Some(TaskRequest::RespondWithPeerReport { peer_id, score, reporting_service }) => { - self.p2p_service.report_peer(peer_id, score, reporting_service) + let _ = self.p2p_service.report_peer(peer_id, score, reporting_service); } None => { unreachable!("The `Task` is holder of the `Sender`, so it should not be possible"); @@ -264,7 +486,7 @@ where block_height, }; - let _ = self.shared.block_height_broadcast.send(block_height_data); + let _ = self.broadcast.block_height_broadcast(block_height_data); } Some(FuelP2PEvent::GossipsubMessage { message, message_id, peer_id,.. 
}) => { let message_id = message_id.0; @@ -272,7 +494,7 @@ where match message { GossipsubMessage::NewTx(transaction) => { let next_transaction = GossipData::new(transaction, peer_id, message_id); - let _ = self.shared.tx_broadcast.send(next_transaction); + let _ = self.broadcast.tx_broadcast(next_transaction); }, GossipsubMessage::NewBlock(block) => { // todo: add logic to gossip newly received blocks @@ -341,6 +563,17 @@ where _ => (), } }, + _ = tokio::time::sleep_until(self.next_check_time) => { + should_continue = true; + let res = self.peer_heartbeat_reputation_checks(); + match res { + Ok(_) => tracing::debug!("Peer heartbeat reputation checks completed"), + Err(e) => { + tracing::error!("Failed to perform peer heartbeat reputation checks: {:?}", e); + } + } + self.next_check_time += self.heartbeat_check_interval; + }, latest_block_height = self.next_block_height.next() => { if let Some(latest_block_height) = latest_block_height { let _ = self.p2p_service.update_block_height(latest_block_height); @@ -351,6 +584,7 @@ where } } + tracing::debug!("P2P task is finished"); Ok(should_continue) } @@ -571,13 +805,20 @@ fn report_message( #[cfg(test)] pub mod tests { + #![allow(non_snake_case)] use crate::ports::P2pDb; use super::*; - use fuel_core_services::Service; + use crate::peer_manager::heartbeat_data::HeartbeatData; + use fuel_core_services::{ + Service, + State, + }; use fuel_core_storage::Result as StorageResult; use fuel_core_types::fuel_types::BlockHeight; + use futures::FutureExt; + use std::collections::VecDeque; #[derive(Clone, Debug)] struct FakeDb; @@ -631,4 +872,275 @@ pub mod tests { // Node with p2p service stopped assert!(service.stop_and_await().await.unwrap().stopped()); } + + struct FakeP2PService { + peer_info: Vec<(PeerId, PeerInfo)>, + } + + impl TaskP2PService for FakeP2PService { + fn get_peer_ids(&self) -> Vec { + todo!() + } + + fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)> { + self.peer_info + .iter() + .map(|(peer_id, 
peer_info)| (peer_id, peer_info)) + .collect() + } + + fn get_peer_id_with_height(&self, _height: &BlockHeight) -> Option { + todo!() + } + + fn next_event(&mut self) -> BoxFuture<'_, Option> { + std::future::pending().boxed() + } + + fn publish_message( + &mut self, + _message: GossipsubBroadcastRequest, + ) -> anyhow::Result<()> { + todo!() + } + + fn send_request_msg( + &mut self, + _peer_id: Option, + _request_msg: RequestMessage, + _channel_item: ResponseChannelItem, + ) -> anyhow::Result<()> { + todo!() + } + + fn send_response_msg( + &mut self, + _request_id: RequestId, + _message: OutboundResponse, + ) -> anyhow::Result<()> { + todo!() + } + + fn report_message( + &mut self, + _message: GossipsubMessageInfo, + _acceptance: GossipsubMessageAcceptance, + ) -> anyhow::Result<()> { + todo!() + } + + fn report_peer( + &mut self, + _peer_id: PeerId, + _score: AppScore, + _reporting_service: &str, + ) -> anyhow::Result<()> { + todo!() + } + + fn update_block_height(&mut self, _height: BlockHeight) -> anyhow::Result<()> { + todo!() + } + } + + struct FakeDB; + + impl P2pDb for FakeDB { + fn get_sealed_block( + &self, + _height: &BlockHeight, + ) -> StorageResult> { + todo!() + } + + fn get_sealed_header( + &self, + _height: &BlockHeight, + ) -> StorageResult> { + todo!() + } + + fn get_sealed_headers( + &self, + _block_height_range: Range, + ) -> StorageResult> { + todo!() + } + + fn get_transactions( + &self, + _block_id: &BlockId, + ) -> StorageResult>> { + todo!() + } + } + + struct FakeBroadcast { + pub peer_reports: mpsc::Sender<(FuelPeerId, HeartBeatPeerReportReason, String)>, + } + + impl Broadcast for FakeBroadcast { + fn report_peer( + &self, + peer_id: FuelPeerId, + report: HeartBeatPeerReportReason, + reporting_service: &'static str, + ) -> anyhow::Result<()> { + self.peer_reports.try_send(( + peer_id, + report, + reporting_service.to_string(), + ))?; + Ok(()) + } + + fn block_height_broadcast( + &self, + _block_height_data: BlockHeightHeartbeatData, + ) 
-> anyhow::Result<()> { + todo!() + } + + fn tx_broadcast( + &self, + _transaction: TransactionGossipData, + ) -> anyhow::Result<()> { + todo!() + } + } + + #[tokio::test] + async fn peer_heartbeat_reputation_checks__slow_heartbeat_sends_reports() { + // given + let peer_id = PeerId::random(); + // more than limit + let last_duration = Duration::from_secs(30); + let mut durations = VecDeque::new(); + durations.push_front(last_duration); + + let heartbeat_data = HeartbeatData { + block_height: None, + last_heartbeat: Instant::now(), + window: 0, + durations, + }; + let peer_info = PeerInfo { + peer_addresses: Default::default(), + client_version: None, + heartbeat_data, + score: 100.0, + }; + let peer_info = vec![(peer_id, peer_info)]; + let p2p_service = FakeP2PService { peer_info }; + let (_request_sender, request_receiver) = mpsc::channel(100); + + let (report_sender, mut report_receiver) = mpsc::channel(100); + let broadcast = FakeBroadcast { + peer_reports: report_sender, + }; + + // Less than actual + let heartbeat_max_avg_interval = Duration::from_secs(20); + // Greater than actual + let heartbeat_max_time_since_last = Duration::from_secs(40); + + let mut task = Task { + p2p_service, + db: Arc::new(FakeDB), + next_block_height: FakeBlockImporter.next_block_height(), + request_receiver, + broadcast, + max_headers_per_request: 0, + heartbeat_check_interval: Duration::from_secs(0), + heartbeat_max_avg_interval, + heartbeat_max_time_since_last, + next_check_time: Instant::now(), + }; + let (watch_sender, watch_receiver) = tokio::sync::watch::channel(State::Started); + let mut watcher = StateWatcher::from(watch_receiver); + + // when + task.run(&mut watcher).await.unwrap(); + + // then + let (report_peer_id, report, reporting_service) = + report_receiver.recv().await.unwrap(); + + watch_sender.send(State::Stopped).unwrap(); + + assert_eq!( + FuelPeerId::from(peer_id.to_bytes().to_vec()), + report_peer_id + ); + assert_eq!(report, 
HeartBeatPeerReportReason::LowHeartBeatFrequency); + assert_eq!(reporting_service, "p2p"); + } + + #[tokio::test] + async fn peer_heartbeat_reputation_checks__old_heartbeat_sends_reports() { + // given + let peer_id = PeerId::random(); + // under the limit + let last_duration = Duration::from_secs(5); + let last_heartbeat = Instant::now() - Duration::from_secs(50); + let mut durations = VecDeque::new(); + durations.push_front(last_duration); + + let heartbeat_data = HeartbeatData { + block_height: None, + last_heartbeat, + window: 0, + durations, + }; + let peer_info = PeerInfo { + peer_addresses: Default::default(), + client_version: None, + heartbeat_data, + score: 100.0, + }; + let peer_info = vec![(peer_id, peer_info)]; + let p2p_service = FakeP2PService { peer_info }; + let (_request_sender, request_receiver) = mpsc::channel(100); + + let (report_sender, mut report_receiver) = mpsc::channel(100); + let broadcast = FakeBroadcast { + peer_reports: report_sender, + }; + + // Greater than actual + let heartbeat_max_avg_interval = Duration::from_secs(20); + // Less than actual + let heartbeat_max_time_since_last = Duration::from_secs(40); + + let mut task = Task { + p2p_service, + db: Arc::new(FakeDB), + next_block_height: FakeBlockImporter.next_block_height(), + request_receiver, + broadcast, + max_headers_per_request: 0, + heartbeat_check_interval: Duration::from_secs(0), + heartbeat_max_avg_interval, + heartbeat_max_time_since_last, + next_check_time: Instant::now(), + }; + let (watch_sender, watch_receiver) = tokio::sync::watch::channel(State::Started); + let mut watcher = StateWatcher::from(watch_receiver); + + // when + task.run(&mut watcher).await.unwrap(); + + // then + let (report_peer_id, report, reporting_service) = + report_receiver.recv().await.unwrap(); + + watch_sender.send(State::Stopped).unwrap(); + + assert_eq!( + FuelPeerId::from(peer_id.to_bytes().to_vec()), + report_peer_id + ); + assert_eq!(report, HeartBeatPeerReportReason::OldHeartBeat); + 
assert_eq!(reporting_service, "p2p"); + } } diff --git a/deployment/charts/templates/fuel-core-deploy.yaml b/deployment/charts/templates/fuel-core-deploy.yaml index a44d75f3..67e59542 100644 --- a/deployment/charts/templates/fuel-core-deploy.yaml +++ b/deployment/charts/templates/fuel-core-deploy.yaml @@ -220,6 +220,18 @@ spec: - "--max-transmit-size" - "{{ .Values.app.max_transmit_size }}" {{- end }} + {{- if .Values.app.heartbeat_check_interval }} + - "--heartbeat-check-interval" + - "{{ .Values.app.heartbeat_check_interval }}" + {{- end }} + {{- if .Values.app.heartbeat_max_avg_interval }} + - "--heartbeat-max-avg-interval" + - "{{ .Values.app.heartbeat_max_avg_interval }}" + {{- end }} + {{- if .Values.app.heartbeat_max_time_since_last }} + - "--heartbeat-max-time-since-last" + - "{{ .Values.app.heartbeat_max_time_since_last }}" + {{- end }} {{- if .Values.app.sync_block_stream_size }} - "--sync-block-stream-size" - "{{ .Values.app.sync_block_stream_size }}" diff --git a/deployment/charts/values.yaml b/deployment/charts/values.yaml index 62f50f2e..9842650f 100644 --- a/deployment/charts/values.yaml +++ b/deployment/charts/values.yaml @@ -21,6 +21,9 @@ app: max_headers_per_request: "${fuel_core_max_headers_per_request}" max_database_cache_size: "${fuel_core_max_database_cache_size}" max_transmit_size: "${fuel_core_max_buffer_size}" + heartbeat_check_interval: "${fuel_core_heartbeat_check_interval}" + heartbeat_max_avg_interval: "${fuel_core_heartbeat_max_avg_interval}" + heartbeat_max_time_since_last: "${fuel_core_heartbeat_max_time_since_last}" sync_block_stream_size: "${fuel_core_sync_block_stream_size}" sync_header_batch_size: "${fuel_core_sync_header_batch_size}" p2p_key: ${fuel_core_p2p_key}