From f69a499d16b0e77ee527edb964f76b9506d228b2 Mon Sep 17 00:00:00 2001 From: Turner Date: Thu, 7 Sep 2023 15:51:54 -0700 Subject: [PATCH 01/16] Add rolling average to heartbeat data, add future to select for heartbeat checks --- crates/services/p2p/src/peer_manager.rs | 102 ++++++++++++++---------- crates/services/p2p/src/service.rs | 40 +++++++++- 2 files changed, 97 insertions(+), 45 deletions(-) diff --git a/crates/services/p2p/src/peer_manager.rs b/crates/services/p2p/src/peer_manager.rs index 6a723e9566d..f2aae1d146e 100644 --- a/crates/services/p2p/src/peer_manager.rs +++ b/crates/services/p2p/src/peer_manager.rs @@ -40,7 +40,7 @@ const MIN_GOSSIPSUB_SCORE_BEFORE_BAN: AppScore = GRAYLIST_THRESHOLD; pub struct PeerInfo { pub peer_addresses: HashSet, pub client_version: Option, - pub heartbeat_data: HeartbeatData, + pub heartbeat_data: Option, pub score: AppScore, } @@ -55,12 +55,6 @@ impl Default for PeerInfo { } } -enum PeerInfoInsert { - Addresses(Vec), - ClientVersion(String), - HeartbeatData(HeartbeatData), -} - /// Manages Peers and their events #[derive(Debug)] pub struct PeerManager { @@ -117,14 +111,14 @@ impl PeerManager { ) { if let Some(time_elapsed) = self .get_peer_info(peer_id) - .and_then(|info| info.heartbeat_data.seconds_since_last_heartbeat()) + .and_then(|info| info.heartbeat_data.as_ref()) + .map(|data| data.seconds_since_last_heartbeat()) { - debug!(target: "fuel-p2p", "Previous heartbeat happened {:?} seconds ago", time_elapsed); + debug!(target: "fuel-p2p", "Previous heartbeat happened {:?} milliseconds ago", time_elapsed.as_millis()); } - let heartbeat_data = HeartbeatData::new(block_height); - - self.insert_peer_info(peer_id, PeerInfoInsert::HeartbeatData(heartbeat_data)); + let peers = self.get_relevant_peers(peer_id); + update_heartbeat(peers, peer_id, block_height); } /// Returns `true` signaling that the peer should be disconnected @@ -137,7 +131,8 @@ impl PeerManager { if initial_connection { self.handle_initial_connection(peer_id, addresses) } else { - self.insert_peer_info(peer_id, PeerInfoInsert::Addresses(addresses)); + let peers = self.get_relevant_peers(peer_id); + insert_peer_addresses(peers, peer_id, addresses); false } } @@ -148,8 +143,9 @@ impl PeerManager { addresses: Vec, agent_version: String, ) { - self.insert_peer_info(peer_id, PeerInfoInsert::ClientVersion(agent_version)); - self.insert_peer_info(peer_id, PeerInfoInsert::Addresses(addresses)); + let peers = self.get_relevant_peers(peer_id); + insert_client_version(peers, peer_id, agent_version); + insert_peer_addresses(peers, peer_id, addresses); } pub fn batch_update_score_with_decay(&mut self) { @@ -243,7 +239,11 @@ impl PeerManager { .iter() .chain(self.reserved_connected_peers.iter()) .filter(|(_, peer_info)| { - peer_info.heartbeat_data.block_height >= Some(*height) + peer_info + .heartbeat_data + .as_ref() + .map(|data| data.block_height) + >= Some(*height) }) .map(|(peer_id, _)| *peer_id) .choose(&mut range) @@ -280,7 +280,8 @@ impl PeerManager { self.send_reserved_peers_update(); } - self.insert_peer_info(peer_id, PeerInfoInsert::Addresses(addresses)); + let peers = self.get_relevant_peers(peer_id); + insert_peer_addresses(peers, peer_id, addresses); false } @@ -291,22 +292,11 @@ impl PeerManager { .send(self.reserved_connected_peers.len()); } - fn insert_peer_info(&mut self, peer_id: &PeerId, data: PeerInfoInsert) { - let peers = if self.reserved_peers.contains(peer_id) { + fn get_relevant_peers(&mut self, peer_id: &PeerId) -> &mut HashMap { + if self.reserved_peers.contains(peer_id) { &mut self.reserved_connected_peers } else { &mut self.non_reserved_connected_peers - }; - match data { - PeerInfoInsert::Addresses(addresses) => { - insert_peer_addresses(peers, peer_id, addresses) - } - PeerInfoInsert::ClientVersion(client_version) => { - insert_client_version(peers, peer_id, client_version) - } - PeerInfoInsert::HeartbeatData(block_height) => { - insert_heartbeat_data(peers, peer_id, block_height) - } } } } @@ -316,22 +306,43 @@ pub struct ConnectionState { peers_allowed: bool, } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct HeartbeatData { - pub block_height: Option, - pub last_heartbeat: Option, + pub block_height: BlockHeight, + pub last_heartbeat: Instant, + // Size of moving average window + window: u32, + // How many heartbeats into the first window have been received + count: u32, + // Moving average of duration between heartbeats + moving_average: Duration, } impl HeartbeatData { - pub fn new(block_height: BlockHeight) -> Self { + pub fn new(block_height: BlockHeight, window: u32) -> Self { Self { - block_height: Some(block_height), - last_heartbeat: Some(Instant::now()), + block_height, + last_heartbeat: Instant::now(), + window, + count: 0, + moving_average: Duration::from_secs(0), } } - pub fn seconds_since_last_heartbeat(&self) -> Option { - self.last_heartbeat.map(|time| time.elapsed()) + pub fn seconds_since_last_heartbeat(&self) -> Duration { + self.last_heartbeat.elapsed() + } + + pub fn update(&mut self, block_height: BlockHeight) { + self.block_height = block_height; + let old_hearbeat = self.last_heartbeat; + self.last_heartbeat = Instant::now(); + let new_duration = self.last_heartbeat - old_hearbeat; + if self.count < self.window { + self.count += 1; + } + self.moving_average = + (self.moving_average * (self.count - 1) + new_duration) / self.count; } } @@ -369,13 +380,22 @@ fn insert_peer_addresses( } } -fn insert_heartbeat_data( +fn update_heartbeat( peers: &mut HashMap, peer_id: &PeerId, - heartbeat_data: HeartbeatData, + block_height: BlockHeight, ) { + const HEARTBEAT_AVG_WINDOW: u32 = 10; if let Some(peer) = peers.get_mut(peer_id) { - peer.heartbeat_data = heartbeat_data; + match peer.heartbeat_data { + Some(ref mut heartbeat_data) => { + heartbeat_data.update(block_height); + } + None => { + peer.heartbeat_data = + Some(HeartbeatData::new(block_height, HEARTBEAT_AVG_WINDOW)); + } + } } else { log_missing_peer(peer_id); } diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 1f97cb05690..a3b4cbccfb6 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -63,10 +63,16 @@ use std::{ ops::Range, sync::Arc, }; -use tokio::sync::{ - broadcast, - mpsc, - oneshot, +use tokio::{ + sync::{ + broadcast, + mpsc, + oneshot, + }, + time::{ + Duration, + Instant, + }, }; use tracing::warn; @@ -117,6 +123,9 @@ pub struct Task { request_receiver: mpsc::Receiver, shared: SharedState, max_headers_per_request: u32, + // milliseconds wait time between peer heartbeat reputation checks + check_frequency: Duration, + next_check_time: Instant, } impl Task { @@ -137,6 +146,11 @@ impl Task { let reserved_peers_broadcast = p2p_service.peer_manager().reserved_peers_updates(); + // TODO: Parameterize + let check_frequency = 10_000; // ten seconds + let check_frequency = Duration::from_millis(check_frequency); + let next_check_time = Instant::now() + check_frequency; + Self { p2p_service, db, @@ -149,8 +163,14 @@ impl Task { block_height_broadcast, }, max_headers_per_request, + check_frequency, + next_check_time, } } + + fn peer_heartbeat_reputation_checks(&self) -> anyhow::Result<()> { + todo!() + } } #[async_trait::async_trait] @@ -186,6 +206,7 @@ where { async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { let should_continue; + tokio::select! { biased; @@ -341,6 +362,17 @@ where _ => (), } }, + _ = tokio::time::sleep_until(self.next_check_time) => { + should_continue = true; + let res = self.peer_heartbeat_reputation_checks(); + match res { + Ok(_) => tracing::debug!("Peer heartbeat reputation checks completed"), + Err(e) => { + tracing::error!("Failed to perform peer heartbeat reputation checks: {:?}", e); + } + } + self.next_check_time = self.next_check_time + self.check_frequency; + }, latest_block_height = self.next_block_height.next() => { if let Some(latest_block_height) = latest_block_height { let _ = self.p2p_service.update_block_height(latest_block_height); From 602b5f5b9506955b1c92c0591d3bb4a0ee8d2aeb Mon Sep 17 00:00:00 2001 From: Turner Date: Thu, 7 Sep 2023 17:43:31 -0700 Subject: [PATCH 02/16] Add punishment code... without tests :( --- crates/services/p2p/src/peer_manager.rs | 58 ++++++++++++------------- crates/services/p2p/src/service.rs | 38 +++++++++++++++- 2 files changed, 65 insertions(+), 31 deletions(-) diff --git a/crates/services/p2p/src/peer_manager.rs b/crates/services/p2p/src/peer_manager.rs index f2aae1d146e..140452499c0 100644 --- a/crates/services/p2p/src/peer_manager.rs +++ b/crates/services/p2p/src/peer_manager.rs @@ -40,17 +40,17 @@ const MIN_GOSSIPSUB_SCORE_BEFORE_BAN: AppScore = GRAYLIST_THRESHOLD; pub struct PeerInfo { pub peer_addresses: HashSet, pub client_version: Option, - pub heartbeat_data: Option, + pub heartbeat_data: HeartbeatData, pub score: AppScore, } -impl Default for PeerInfo { - fn default() -> Self { +impl PeerInfo { + pub fn new(heartbeat_avg_window: u32) -> Self { Self { + peer_addresses: HashSet::new(), + client_version: None, + heartbeat_data: HeartbeatData::new(heartbeat_avg_window), score: DEFAULT_APP_SCORE, - client_version: Default::default(), - heartbeat_data: Default::default(), - peer_addresses: Default::default(), } } } @@ -111,8 +111,7 @@ impl PeerManager { ) { if let Some(time_elapsed) = self .get_peer_info(peer_id) - .and_then(|info| info.heartbeat_data.as_ref()) - .map(|data| data.seconds_since_last_heartbeat()) + .map(|info| info.heartbeat_data.duration_since_last_heartbeat()) { debug!(target: "fuel-p2p", "Previous heartbeat happened {:?} milliseconds ago", time_elapsed.as_millis()); } @@ -193,6 +192,12 @@ impl PeerManager { self.non_reserved_connected_peers.get(peer_id) } + pub fn get_all_peers(&self) -> impl Iterator { + self.non_reserved_connected_peers + .iter() + .chain(self.reserved_connected_peers.iter()) + } + pub fn get_disconnected_reserved_peers(&self) -> impl Iterator { self.reserved_peers .iter() @@ -239,11 +244,7 @@ impl PeerManager { .iter() .chain(self.reserved_connected_peers.iter()) .filter(|(_, peer_info)| { - peer_info - .heartbeat_data - .as_ref() - .map(|data| data.block_height) - >= Some(*height) + peer_info.heartbeat_data.block_height >= Some(*height) }) .map(|(peer_id, _)| *peer_id) .choose(&mut range) @@ -255,6 +256,8 @@ impl PeerManager { peer_id: &PeerId, addresses: Vec, ) -> bool { + const HEARTBEAT_AVG_WINDOW: u32 = 10; + // if the connected Peer is not from the reserved peers if !self.reserved_peers.contains(peer_id) { let non_reserved_peers_connected = self.non_reserved_connected_peers.len(); @@ -272,10 +275,10 @@ impl PeerManager { } self.non_reserved_connected_peers - .insert(*peer_id, PeerInfo::default()); + .insert(*peer_id, PeerInfo::new(HEARTBEAT_AVG_WINDOW)); } else { self.reserved_connected_peers - .insert(*peer_id, PeerInfo::default()); + .insert(*peer_id, PeerInfo::new(HEARTBEAT_AVG_WINDOW)); self.send_reserved_peers_update(); } @@ -308,7 +311,7 @@ pub struct ConnectionState { #[derive(Debug, Clone)] pub struct HeartbeatData { - pub block_height: BlockHeight, + pub block_height: Option, pub last_heartbeat: Instant, // Size of moving average window window: u32, @@ -319,9 +322,9 @@ pub struct HeartbeatData { } impl HeartbeatData { - pub fn new(block_height: BlockHeight, window: u32) -> Self { + pub fn new(window: u32) -> Self { Self { - block_height, + block_height: None, last_heartbeat: Instant::now(), window, count: 0, @@ -329,12 +332,16 @@ impl HeartbeatData { } } - pub fn seconds_since_last_heartbeat(&self) -> Duration { + pub fn duration_since_last_heartbeat(&self) -> Duration { self.last_heartbeat.elapsed() } + pub fn average_time_between_heartbeats(&self) -> Duration { + self.moving_average + } + pub fn update(&mut self, block_height: BlockHeight) { - self.block_height = block_height; + self.block_height = Some(block_height); let old_hearbeat = self.last_heartbeat; self.last_heartbeat = Instant::now(); let new_duration = self.last_heartbeat - old_hearbeat; @@ -385,17 +392,8 @@ fn update_heartbeat( peer_id: &PeerId, block_height: BlockHeight, ) { - const HEARTBEAT_AVG_WINDOW: u32 = 10; if let Some(peer) = peers.get_mut(peer_id) { - match peer.heartbeat_data { - Some(ref mut heartbeat_data) => { - heartbeat_data.update(block_height); - } - None => { - peer.heartbeat_data = - Some(HeartbeatData::new(block_height, HEARTBEAT_AVG_WINDOW)); - } - } + peer.heartbeat_data.update(block_height); } else { log_missing_peer(peer_id); } diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index a3b4cbccfb6..1ed9f634651 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -113,6 +113,18 @@ impl Debug for TaskRequest { } } +pub enum HeartBeatPeerReportReason { + OldHeartBeat, + LowHeartBeatFrequency, + NoHeartBeats, +} + +impl PeerReport for HeartBeatPeerReportReason { + fn get_score_from_report(&self) -> AppScore { + todo!() + } +} + /// Orchestrates various p2p-related events between the inner `P2pService` /// and the top level `NetworkService`. pub struct Task { @@ -169,10 +181,34 @@ impl Task { } fn peer_heartbeat_reputation_checks(&self) -> anyhow::Result<()> { - todo!() + const MAX_HEARTBEAT_AGE: Duration = Duration::from_millis(5_000); + const MAX_AVG_TIME_BETWEEN_HEARTBEATS: Duration = Duration::from_millis(1_000); + for (peer_id, peer_info) in self.p2p_service.peer_manager().get_all_peers() { + if peer_info.heartbeat_data.duration_since_last_heartbeat() + > MAX_HEARTBEAT_AGE + { + let report = HeartBeatPeerReportReason::OldHeartBeat; + let service = "p2p"; + let peer_id = convert_peer_id(peer_id)?; + self.shared.report_peer(peer_id, report, service)?; + } else if peer_info.heartbeat_data.average_time_between_heartbeats() + > MAX_AVG_TIME_BETWEEN_HEARTBEATS + { + let report = HeartBeatPeerReportReason::LowHeartBeatFrequency; + let service = "p2p"; + let peer_id = convert_peer_id(peer_id)?; + self.shared.report_peer(peer_id, report, service)?; + } + } + Ok(()) } } +fn convert_peer_id(peer_id: &PeerId) -> anyhow::Result { + let inner = Vec::try_from(peer_id.clone())?; + Ok(FuelPeerId::from(inner)) +} + #[async_trait::async_trait] impl RunnableService for Task where From 0f0bd2f7ea6d2edb8be515afe3a9447650a7064e Mon Sep 17 00:00:00 2001 From: Turner Date: Thu, 7 Sep 2023 17:48:02 -0700 Subject: [PATCH 03/16] Update changelog, remove unused variant --- CHANGELOG.md | 1 + crates/services/p2p/src/service.rs | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1bdbd931655..b1e80b37061 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ Description of the upcoming release here. - [#1274](https://github.com/FuelLabs/fuel-core/pull/1274): Added tests to benchmark block synchronization. - [#1263](https://github.com/FuelLabs/fuel-core/pull/1263): Add gas benchmarks for `ED19` and `ECR1` instructions. - [#1331](https://github.com/FuelLabs/fuel-core/pull/1331): Add peer reputation reporting to block import code +- [#1356](https://github.com/FuelLabs/fuel-core/pull/1356): Add peer reputation reporting to heartbeat code ### Changed diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 1ed9f634651..c542dd21c14 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -116,7 +116,6 @@ impl Debug for TaskRequest { pub enum HeartBeatPeerReportReason { OldHeartBeat, LowHeartBeatFrequency, - NoHeartBeats, } impl PeerReport for HeartBeatPeerReportReason { From 644cb791b8c29eb56a006ff5afb6f5e23d229e2e Mon Sep 17 00:00:00 2001 From: Turner Date: Wed, 13 Sep 2023 15:17:13 -0700 Subject: [PATCH 04/16] Add config values for heartbeat checks --- crates/services/p2p/src/config.rs | 13 +++++++++++++ crates/services/p2p/src/service.rs | 31 +++++++++++++++++------------- 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/crates/services/p2p/src/config.rs b/crates/services/p2p/src/config.rs index 1f91aa06d33..116dc55a120 100644 --- a/crates/services/p2p/src/config.rs +++ b/crates/services/p2p/src/config.rs @@ -131,6 +131,13 @@ pub struct Config { /// Sets the keep-alive timeout of idle connections. pub set_connection_keep_alive: Duration, + /// Time between checking heartbeat avgs for each peer + pub heartbeat_check_interval: Duration, + /// Max avg time between heartbeats for a given peer before getting reputation penalty + pub heartbeat_max_avg_interval: Duration, + /// Max time since a given peer has sent a heartbeat before getting reputation penalty + pub heartbeat_max_time_since_last: Duration, + /// Enables prometheus metrics for this fuel-service pub metrics: bool, @@ -176,6 +183,9 @@ impl Config { heartbeat_config: self.heartbeat_config, set_request_timeout: self.set_request_timeout, set_connection_keep_alive: self.set_connection_keep_alive, + heartbeat_check_interval: self.heartbeat_check_interval, + heartbeat_max_avg_interval: self.heartbeat_max_time_since_last, + heartbeat_max_time_since_last: self.heartbeat_max_time_since_last, metrics: self.metrics, state: Initialized(()), }) @@ -218,6 +228,9 @@ impl Config { heartbeat_config: HeartbeatConfig::default(), set_request_timeout: REQ_RES_TIMEOUT, set_connection_keep_alive: REQ_RES_TIMEOUT, + heartbeat_check_interval: Duration::from_secs(10), + heartbeat_max_avg_interval: Duration::from_secs(20), + heartbeat_max_time_since_last: Duration::from_secs(40), info_interval: Some(Duration::from_secs(3)), identify_interval: Some(Duration::from_secs(5)), metrics: false, diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index c542dd21c14..6b43878ef62 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -135,7 +135,9 @@ pub struct Task { shared: SharedState, max_headers_per_request: u32, // milliseconds wait time between peer heartbeat reputation checks - check_frequency: Duration, + heartbeat_check_interval: Duration, + heartbeat_max_avg_interval: Duration, + heartbeat_max_time_since_last: Duration, next_check_time: Instant, } @@ -145,22 +147,25 @@ impl Task { db: Arc, block_importer: Arc, ) -> Self { + let Config { + max_block_size, + max_headers_per_request, + heartbeat_check_interval, + heartbeat_max_avg_interval, + heartbeat_max_time_since_last, + .. + } = config; let (request_sender, request_receiver) = mpsc::channel(100); let (tx_broadcast, _) = broadcast::channel(100); let (block_height_broadcast, _) = broadcast::channel(100); let next_block_height = block_importer.next_block_height(); - let max_block_size = config.max_block_size; - let max_headers_per_request = config.max_headers_per_request; let p2p_service = FuelP2PService::new(config, PostcardCodec::new(max_block_size)); let reserved_peers_broadcast = p2p_service.peer_manager().reserved_peers_updates(); - // TODO: Parameterize - let check_frequency = 10_000; // ten seconds - let check_frequency = Duration::from_millis(check_frequency); - let next_check_time = Instant::now() + check_frequency; + let next_check_time = Instant::now() + heartbeat_check_interval; Self { p2p_service, @@ -174,24 +179,24 @@ impl Task { block_height_broadcast, }, max_headers_per_request, - check_frequency, + heartbeat_check_interval, + heartbeat_max_avg_interval, + heartbeat_max_time_since_last, next_check_time, } } fn peer_heartbeat_reputation_checks(&self) -> anyhow::Result<()> { - const MAX_HEARTBEAT_AGE: Duration = Duration::from_millis(5_000); - const MAX_AVG_TIME_BETWEEN_HEARTBEATS: Duration = Duration::from_millis(1_000); for (peer_id, peer_info) in self.p2p_service.peer_manager().get_all_peers() { if peer_info.heartbeat_data.duration_since_last_heartbeat() - > MAX_HEARTBEAT_AGE + > self.heartbeat_max_time_since_last { let report = HeartBeatPeerReportReason::OldHeartBeat; let service = "p2p"; let peer_id = convert_peer_id(peer_id)?; self.shared.report_peer(peer_id, report, service)?; } else if peer_info.heartbeat_data.average_time_between_heartbeats() - > MAX_AVG_TIME_BETWEEN_HEARTBEATS + > self.heartbeat_max_avg_interval { let report = HeartBeatPeerReportReason::LowHeartBeatFrequency; let service = "p2p"; @@ -406,7 +411,7 @@ where tracing::error!("Failed to perform peer heartbeat reputation checks: {:?}", e); } } - self.next_check_time = self.next_check_time + self.check_frequency; + self.next_check_time = self.next_check_time + self.heartbeat_check_interval; }, latest_block_height = self.next_block_height.next() => { if let Some(latest_block_height) = latest_block_height { From 092702073503eed4bb27e5b97e96718f41067912 Mon Sep 17 00:00:00 2001 From: Turner Date: Wed, 13 Sep 2023 15:30:36 -0700 Subject: [PATCH 05/16] Update docstring --- crates/services/p2p/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/services/p2p/src/config.rs b/crates/services/p2p/src/config.rs index 116dc55a120..1ce3f25294c 100644 --- a/crates/services/p2p/src/config.rs +++ b/crates/services/p2p/src/config.rs @@ -131,7 +131,7 @@ pub struct Config { /// Sets the keep-alive timeout of idle connections. pub set_connection_keep_alive: Duration, - /// Time between checking heartbeat avgs for each peer + /// Time between checking heartbeat status for all peers pub heartbeat_check_interval: Duration, /// Max avg time between heartbeats for a given peer before getting reputation penalty pub heartbeat_max_avg_interval: Duration, From b60341bfc8a2622bbc8cdc5844765d1fef95a5f1 Mon Sep 17 00:00:00 2001 From: Turner Date: Wed, 13 Sep 2023 17:48:37 -0700 Subject: [PATCH 06/16] Abstract Task P2P to enable tests --- Cargo.lock | 1 + crates/services/p2p/Cargo.toml | 1 + crates/services/p2p/src/p2p_service.rs | 4 +- .../p2p/src/request_response/messages.rs | 9 +- crates/services/p2p/src/service.rs | 146 ++++++++++++++++-- 5 files changed, 142 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fd9ac84ee2c..c93ffc98c17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3017,6 +3017,7 @@ dependencies = [ "serde", "serde_with", "sha2 0.10.7", + "thiserror", "tokio", "tracing", "tracing-attributes", diff --git a/crates/services/p2p/Cargo.toml b/crates/services/p2p/Cargo.toml index dcdfb4de3ec..1a1d75d9510 100644 --- a/crates/services/p2p/Cargo.toml +++ b/crates/services/p2p/Cargo.toml @@ -60,6 +60,7 @@ serde_with = "1.11" sha2 = "0.10" tokio = { workspace = true, features = ["sync"] } tracing = { workspace = true } +thiserror = "1.0.47" [dev-dependencies] ctor = "0.1" diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 967cd09f1c1..83912e9199b 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -275,7 +275,7 @@ impl FuelP2PService { .collect() } - pub fn get_peers_ids(&self) -> impl Iterator { + pub fn get_peers_ids_iter(&self) -> impl Iterator { self.peer_manager.get_peers_ids() } @@ -309,7 +309,7 @@ impl FuelP2PService { let peer_id = match peer_id { Some(peer_id) => peer_id, _ => { - let peers = self.get_peers_ids(); + let peers = self.get_peers_ids_iter(); let peers_count = self.peer_manager.total_peers_connected(); if peers_count == 0 { diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index d466f16cfa5..56eb6df75dc 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -21,6 +21,7 @@ use serde_with::{ serde_as, FromInto, }; +use thiserror::Error; use tokio::sync::oneshot; pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &[u8] = b"/fuel/req_res/0.0.1"; @@ -80,14 +81,18 @@ pub enum OutboundResponse { Transactions(Option>>), } -#[derive(Debug)] +#[derive(Debug, Error)] pub enum RequestError { + #[error("Not currently connected to any peers")] NoPeersConnected, } -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug, Eq, PartialEq, Error)] pub enum ResponseError { + #[error("Response channel does not exist")] ResponseChannelDoesNotExist, + #[error("Failed to send response")] SendingResponseFailed, + #[error("Failed to convert response to intermediate format")] ConversionToIntermediateFailed, } diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 6b43878ef62..86b64faa3f8 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -12,6 +12,7 @@ use crate::{ FuelP2PEvent, FuelP2PService, }, + peer_manager::PeerInfo, ports::{ BlockHeightImporter, P2pDb, @@ -53,11 +54,15 @@ use fuel_core_types::{ TransactionGossipData, }, }; -use futures::StreamExt; +use futures::{ + future::BoxFuture, + StreamExt, +}; use libp2p::{ gossipsub::MessageAcceptance, PeerId, }; +use libp2p_request_response::RequestId; use std::{ fmt::Debug, ops::Range, @@ -76,7 +81,7 @@ use tokio::{ }; use tracing::warn; -pub type Service = ServiceRunner>; +pub type Service = ServiceRunner, D>>; enum TaskRequest { // Broadcast requests to p2p network @@ -124,10 +129,118 @@ impl PeerReport for HeartBeatPeerReportReason { } } +pub trait TaskP2PService { + fn get_peer_ids(&self) -> Vec; + fn get_peer_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)>; + fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option; + + fn next_event<'a>(&'a mut self) -> BoxFuture<'a, Option>; + + fn publish_message( + &mut self, + message: GossipsubBroadcastRequest, + ) -> anyhow::Result<()>; + fn send_request_msg( + &mut self, + peer_id: Option, + request_msg: RequestMessage, + channel_item: ResponseChannelItem, + ) -> anyhow::Result<()>; + + fn send_response_msg( + &mut self, + request_id: RequestId, + message: OutboundResponse, + ) -> anyhow::Result<()>; + fn report_message( + &mut self, + message: GossipsubMessageInfo, + acceptance: GossipsubMessageAcceptance, + ) -> anyhow::Result<()>; + + fn report_peer( + &mut self, + peer_id: PeerId, + score: AppScore, + reporting_service: &'static str, + ) -> anyhow::Result<()>; + + fn update_block_height(&mut self, height: BlockHeight) -> anyhow::Result<()>; +} + +impl TaskP2PService for FuelP2PService { + fn get_peer_ids(&self) -> Vec { + self.get_peers_ids_iter().copied().collect() + } + + fn get_peer_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)> { + self.peer_manager().get_all_peers().collect() + } + + fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option { + self.peer_manager().get_peer_id_with_height(height) + } + + fn next_event<'a>(&'a mut self) -> BoxFuture<'a, Option> { + Box::pin(self.next_event()) + } + + fn publish_message( + &mut self, + message: GossipsubBroadcastRequest, + ) -> anyhow::Result<()> { + self.publish_message(message)?; + Ok(()) + } + + fn send_request_msg( + &mut self, + peer_id: Option, + request_msg: RequestMessage, + channel_item: ResponseChannelItem, + ) -> anyhow::Result<()> { + self.send_request_msg(peer_id, request_msg, channel_item)?; + Ok(()) + } + + fn send_response_msg( + &mut self, + request_id: RequestId, + message: OutboundResponse, + ) -> anyhow::Result<()> { + self.send_response_msg(request_id, message)?; + Ok(()) + } + + fn report_message( + &mut self, + message: GossipsubMessageInfo, + acceptance: GossipsubMessageAcceptance, + ) -> anyhow::Result<()> { + report_message(self, message, acceptance); + Ok(()) + } + + fn report_peer( + &mut self, + peer_id: PeerId, + score: AppScore, + reporting_service: &'static str, + ) -> anyhow::Result<()> { + self.report_peer(peer_id, score, reporting_service); + Ok(()) + } + + fn update_block_height(&mut self, height: BlockHeight) -> anyhow::Result<()> { + self.update_block_height(height); + Ok(()) + } +} + /// Orchestrates various p2p-related events between the inner `P2pService` /// and the top level `NetworkService`. -pub struct Task { - p2p_service: FuelP2PService, +pub struct Task { + p2p_service: P, db: Arc, next_block_height: BoxStream, /// Receive internal Task Requests @@ -141,7 +254,7 @@ pub struct Task { next_check_time: Instant, } -impl Task { +impl Task, D> { pub fn new( config: Config, db: Arc, @@ -185,9 +298,10 @@ impl Task { next_check_time, } } - +} +impl Task { fn peer_heartbeat_reputation_checks(&self) -> anyhow::Result<()> { - for (peer_id, peer_info) in self.p2p_service.peer_manager().get_all_peers() { + for (peer_id, peer_info) in self.p2p_service.get_peer_all_peer_info() { if peer_info.heartbeat_data.duration_since_last_heartbeat() > self.heartbeat_max_time_since_last { @@ -214,14 +328,14 @@ fn convert_peer_id(peer_id: &PeerId) -> anyhow::Result { } #[async_trait::async_trait] -impl RunnableService for Task +impl RunnableService for Task, D> where Self: RunnableTask, { const NAME: &'static str = "P2P"; type SharedData = SharedState; - type Task = Task; + type Task = Task, D>; type TaskParams = (); fn shared_data(&self) -> Self::SharedData { @@ -240,8 +354,9 @@ where // TODO: Add tests https://github.com/FuelLabs/fuel-core/issues/1275 #[async_trait::async_trait] -impl RunnableTask for Task +impl RunnableTask for Task where + P: TaskP2PService + Send + 'static, D: P2pDb + 'static, { async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { @@ -279,13 +394,13 @@ where } } Some(TaskRequest::GetPeerIds(channel)) => { - let peer_ids = self.p2p_service.get_peers_ids().copied().collect(); + let peer_ids = self.p2p_service.get_peer_ids(); let _ = channel.send(peer_ids); } Some(TaskRequest::GetBlock { height, channel }) => { let request_msg = RequestMessage::Block(height); let channel_item = ResponseChannelItem::Block(channel); - let peer = self.p2p_service.peer_manager().get_peer_id_with_height(&height); + let peer = self.p2p_service.get_peer_id_with_height(&height); let _ = self.p2p_service.send_request_msg(peer, request_msg, channel_item); } Some(TaskRequest::GetSealedHeaders { block_height_range, channel: response}) => { @@ -295,7 +410,7 @@ where // Note: this range has already been check for // validity in `SharedState::get_sealed_block_headers`. let block_height = BlockHeight::from(block_height_range.end - 1); - let peer = self.p2p_service.peer_manager() + let peer = self.p2p_service .get_peer_id_with_height(&block_height); let _ = self.p2p_service.send_request_msg(peer, request_msg, channel_item); } @@ -305,10 +420,11 @@ where let _ = self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel_item); } Some(TaskRequest::RespondWithGossipsubMessageReport((message, acceptance))) => { - report_message(&mut self.p2p_service, message, acceptance); + // report_message(&mut self.p2p_service, message, acceptance); + self.p2p_service.report_message(message, acceptance)?; } Some(TaskRequest::RespondWithPeerReport { peer_id, score, reporting_service }) => { - self.p2p_service.report_peer(peer_id, score, reporting_service) + let _ = self.p2p_service.report_peer(peer_id, score, reporting_service); } None => { unreachable!("The `Task` is holder of the `Sender`, so it should not be possible"); From 8265bc05906462ce5b7611071d0095196cc7224c Mon Sep 17 00:00:00 2001 From: Turner Date: Wed, 13 Sep 2023 17:53:02 -0700 Subject: [PATCH 07/16] Remove weird lifetime --- crates/services/p2p/src/service.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 86b64faa3f8..2e7fcc2573c 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -162,7 +162,7 @@ pub trait TaskP2PService { &mut self, peer_id: PeerId, score: AppScore, - reporting_service: &'static str, + reporting_service: &str, ) -> anyhow::Result<()>; fn update_block_height(&mut self, height: BlockHeight) -> anyhow::Result<()>; @@ -225,7 +225,7 @@ impl TaskP2PService for FuelP2PService { &mut self, peer_id: PeerId, score: AppScore, - reporting_service: &'static str, + reporting_service: &str, ) -> anyhow::Result<()> { self.report_peer(peer_id, score, reporting_service); Ok(()) From 2183dd1bdcce1ab820fa24304324c7be18981c4e Mon Sep 17 00:00:00 2001 From: Turner Date: Thu, 14 Sep 2023 11:21:19 -0700 Subject: [PATCH 08/16] Add broadcast trait and incomplete test --- crates/services/p2p/src/service.rs | 176 ++++++++++++++++++++++++++--- 1 file changed, 159 insertions(+), 17 deletions(-) diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 2e7fcc2573c..05783d7bff2 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -81,7 +81,7 @@ use tokio::{ }; use tracing::warn; -pub type Service = ServiceRunner, D>>; +pub type Service = ServiceRunner, D, SharedState>>; enum TaskRequest { // Broadcast requests to p2p network @@ -129,7 +129,7 @@ impl PeerReport for HeartBeatPeerReportReason { } } -pub trait TaskP2PService { +pub trait TaskP2PService: Send { fn get_peer_ids(&self) -> Vec; fn get_peer_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)>; fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option; @@ -237,15 +237,55 @@ impl TaskP2PService for FuelP2PService { } } +pub trait Broadcast: Send { + fn report_peer( + &self, + peer_id: FuelPeerId, + report: T, + reporting_service: &'static str, + ) -> anyhow::Result<()>; + + fn block_height_broadcast( + &self, + block_height_data: BlockHeightHeartbeatData, + ) -> anyhow::Result<()>; + + fn tx_broadcast(&self, transaction: TransactionGossipData) -> anyhow::Result<()>; +} + +impl Broadcast for SharedState { + fn report_peer( + &self, + peer_id: FuelPeerId, + report: T, + reporting_service: &'static str, + ) -> anyhow::Result<()> { + self.report_peer(peer_id, report, reporting_service) + } + + fn block_height_broadcast( + &self, + block_height_data: BlockHeightHeartbeatData, + ) -> anyhow::Result<()> { + self.block_height_broadcast.send(block_height_data)?; + Ok(()) + } + + fn tx_broadcast(&self, transaction: TransactionGossipData) -> anyhow::Result<()> { + self.tx_broadcast.send(transaction)?; + Ok(()) + } +} + /// Orchestrates various p2p-related events between the inner `P2pService` /// and the top level `NetworkService`. -pub struct Task { +pub struct Task { p2p_service: P, db: Arc, next_block_height: BoxStream, /// Receive internal Task Requests request_receiver: mpsc::Receiver, - shared: SharedState, + broadcast: B, max_headers_per_request: u32, // milliseconds wait time between peer heartbeat reputation checks heartbeat_check_interval: Duration, @@ -254,7 +294,7 @@ pub struct Task { next_check_time: Instant, } -impl Task, D> { +impl Task, D, SharedState> { pub fn new( config: Config, db: Arc, @@ -285,7 +325,7 @@ impl Task, D> { db, request_receiver, next_block_height, - shared: SharedState { + broadcast: SharedState { request_sender, tx_broadcast, reserved_peers_broadcast, @@ -299,7 +339,7 @@ impl Task, D> { } } } -impl Task { +impl Task { fn peer_heartbeat_reputation_checks(&self) -> anyhow::Result<()> { for (peer_id, peer_info) in self.p2p_service.get_peer_all_peer_info() { if peer_info.heartbeat_data.duration_since_last_heartbeat() @@ -308,14 +348,14 @@ impl Task { let report = HeartBeatPeerReportReason::OldHeartBeat; let service = "p2p"; let peer_id = convert_peer_id(peer_id)?; - self.shared.report_peer(peer_id, report, service)?; + self.broadcast.report_peer(peer_id, report, service)?; } else if peer_info.heartbeat_data.average_time_between_heartbeats() > self.heartbeat_max_avg_interval { let report = HeartBeatPeerReportReason::LowHeartBeatFrequency; let service = "p2p"; let peer_id = convert_peer_id(peer_id)?; - self.shared.report_peer(peer_id, report, service)?; + self.broadcast.report_peer(peer_id, report, service)?; } } Ok(()) @@ -328,18 +368,18 @@ fn convert_peer_id(peer_id: &PeerId) -> anyhow::Result { } #[async_trait::async_trait] -impl RunnableService for Task, D> +impl RunnableService for Task, D, SharedState> where Self: RunnableTask, { const NAME: &'static str = "P2P"; type SharedData = SharedState; - type Task = Task, D>; + type Task = Task, D, SharedState>; type TaskParams = (); fn shared_data(&self) -> Self::SharedData { - self.shared.clone() + self.broadcast.clone() } async fn into_task( @@ -354,10 +394,11 @@ where // TODO: Add tests https://github.com/FuelLabs/fuel-core/issues/1275 #[async_trait::async_trait] -impl RunnableTask for Task +impl RunnableTask for Task where - P: TaskP2PService + Send + 'static, + P: TaskP2PService + 'static, D: P2pDb + 'static, + B: Broadcast + 'static, { async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { let should_continue; @@ -441,7 +482,7 @@ where block_height, }; - let _ = self.shared.block_height_broadcast.send(block_height_data); + let _ = self.broadcast.block_height_broadcast(block_height_data); } Some(FuelP2PEvent::GossipsubMessage { message, message_id, peer_id,.. }) => { let message_id = message_id.0; @@ -449,7 +490,7 @@ where match message { GossipsubMessage::NewTx(transaction) => { let next_transaction = GossipData::new(transaction, peer_id, message_id); - let _ = self.shared.tx_broadcast.send(next_transaction); + let _ = self.broadcast.tx_broadcast(next_transaction); }, GossipsubMessage::NewBlock(block) => { // todo: add logic to gossip newly received blocks @@ -763,7 +804,10 @@ pub mod tests { use super::*; - use fuel_core_services::Service; + use fuel_core_services::{ + Service, + State, + }; use fuel_core_storage::Result as StorageResult; use fuel_core_types::fuel_types::BlockHeight; @@ -819,4 +863,102 @@ pub mod tests { // Node with p2p service stopped assert!(service.stop_and_await().await.unwrap().stopped()); } + + struct FakeP2PService; + + impl TaskP2PService for FakeP2PService { + fn get_peer_ids(&self) -> Vec { + todo!() + } + + fn get_peer_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)> { + todo!() + } + + fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option { + todo!() + } + + fn next_event<'a>(&'a mut self) -> BoxFuture<'a, Option> { + todo!() + } + + fn publish_message( + &mut self, + message: GossipsubBroadcastRequest, + ) -> anyhow::Result<()> { + todo!() + } + + fn send_request_msg( + &mut self, + peer_id: Option, + request_msg: RequestMessage, + channel_item: ResponseChannelItem, + ) -> anyhow::Result<()> { + todo!() + } + + fn send_response_msg( + &mut self, + request_id: RequestId, + message: OutboundResponse, + ) -> anyhow::Result<()> { + todo!() + } + + fn report_message( + &mut self, + message: GossipsubMessageInfo, + acceptance: GossipsubMessageAcceptance, + ) -> anyhow::Result<()> { + todo!() + } + + fn report_peer( + &mut self, + peer_id: PeerId, + score: AppScore, + reporting_service: &str, + ) -> anyhow::Result<()> { + todo!() + } + + fn update_block_height(&mut self, height: BlockHeight) -> anyhow::Result<()> { + todo!() + } + } + + #[tokio::test] + async fn peer_heartbeat_reputation_checks__sends_reports() { + // given + let p2p_service = FakeP2PService; + let (request_sender, request_receiver) = mpsc::channel(100); + let broadcast = SharedState { + request_sender, + tx_broadcast: broadcast::channel(100).0, + reserved_peers_broadcast: broadcast::channel(100).0, + block_height_broadcast: broadcast::channel(100).0, + }; + + let mut task = Task { + p2p_service, + db: Arc::new(()), + next_block_height: Pin {}, + request_receiver, + broadcast, + max_headers_per_request: 0, + heartbeat_check_interval: Duration::from_secs(0), + heartbeat_max_avg_interval: Default::default(), + heartbeat_max_time_since_last: Default::default(), + next_check_time: Instant::now(), + }; + let (watch_sender, watch_receiver) = watch::channel(State::Started); + let mut watcher = StateWatcher::from(watch_receiver); + // when + task.run(&mut watcher).await.unwrap(); + + // then + todo!() + } } From 61187a9b161abf321dcd80853b3720caac83ab47 Mon Sep 17 00:00:00 2001 From: Turner Date: Thu, 14 Sep 2023 13:25:52 -0700 Subject: [PATCH 09/16] Add a couple tests to verify checks behavior --- crates/services/p2p/src/peer_manager.rs | 6 +- crates/services/p2p/src/service.rs | 226 +++++++++++++++++++++--- 2 files changed, 205 insertions(+), 27 deletions(-) diff --git a/crates/services/p2p/src/peer_manager.rs b/crates/services/p2p/src/peer_manager.rs index 140452499c0..b281427425d 100644 --- a/crates/services/p2p/src/peer_manager.rs +++ b/crates/services/p2p/src/peer_manager.rs @@ -314,11 +314,11 @@ pub struct HeartbeatData { pub block_height: Option, pub last_heartbeat: Instant, // Size of moving average window - window: u32, + pub window: u32, // How many heartbeats into the first window have been received - count: u32, + pub count: u32, // Moving average of duration between heartbeats - moving_average: Duration, + pub moving_average: Duration, } impl HeartbeatData { diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 05783d7bff2..fe9d49305dd 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -118,6 +118,7 @@ impl Debug for TaskRequest { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum HeartBeatPeerReportReason { OldHeartBeat, LowHeartBeatFrequency, @@ -131,7 +132,7 @@ impl PeerReport for HeartBeatPeerReportReason { pub trait TaskP2PService: Send { fn get_peer_ids(&self) -> Vec; - fn get_peer_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)>; + fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)>; fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option; fn next_event<'a>(&'a mut self) -> BoxFuture<'a, Option>; @@ -173,7 +174,7 @@ impl TaskP2PService for FuelP2PService { self.get_peers_ids_iter().copied().collect() } - fn get_peer_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)> { + fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)> { self.peer_manager().get_all_peers().collect() } @@ -238,10 +239,10 @@ impl TaskP2PService for FuelP2PService { } pub trait Broadcast: Send { - fn report_peer( + fn report_peer( &self, peer_id: FuelPeerId, - report: T, + report: HeartBeatPeerReportReason, reporting_service: &'static str, ) -> anyhow::Result<()>; @@ -254,10 +255,10 @@ pub trait Broadcast: Send { } impl Broadcast for SharedState { - fn report_peer( + fn report_peer( &self, peer_id: FuelPeerId, - report: T, + report: HeartBeatPeerReportReason, reporting_service: &'static str, ) -> anyhow::Result<()> { self.report_peer(peer_id, report, reporting_service) @@ -341,10 +342,14 @@ impl Task, D, SharedState> { } impl Task { fn peer_heartbeat_reputation_checks(&self) -> anyhow::Result<()> { - for (peer_id, peer_info) in self.p2p_service.get_peer_all_peer_info() { + for (peer_id, peer_info) in self.p2p_service.get_all_peer_info() { + let average_time_between_heartbeats = + peer_info.heartbeat_data.average_time_between_heartbeats(); + let heartbeat_max_avg_interval = self.heartbeat_max_avg_interval; if peer_info.heartbeat_data.duration_since_last_heartbeat() > self.heartbeat_max_time_since_last { + tracing::debug!("Peer {:?} has old heartbeat", peer_id); let report = HeartBeatPeerReportReason::OldHeartBeat; let service = "p2p"; let peer_id = convert_peer_id(peer_id)?; @@ -352,6 +357,7 @@ impl Task { } else if peer_info.heartbeat_data.average_time_between_heartbeats() > self.heartbeat_max_avg_interval { + tracing::debug!("Peer {:?} has low heartbeat frequency", peer_id); let report = HeartBeatPeerReportReason::LowHeartBeatFrequency; let service = "p2p"; let peer_id = convert_peer_id(peer_id)?; @@ -401,6 +407,7 @@ where B: Broadcast + 'static, { async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { + tracing::debug!("P2P task is running"); let should_continue; tokio::select! { @@ -580,6 +587,7 @@ where } } + tracing::debug!("P2P task is finished"); Ok(should_continue) } @@ -804,12 +812,14 @@ pub mod tests { use super::*; + use crate::peer_manager::HeartbeatData; use fuel_core_services::{ Service, State, }; use fuel_core_storage::Result as StorageResult; use fuel_core_types::fuel_types::BlockHeight; + use futures::FutureExt; #[derive(Clone, Debug)] struct FakeDb; @@ -864,15 +874,20 @@ pub mod tests { assert!(service.stop_and_await().await.unwrap().stopped()); } - struct FakeP2PService; + struct FakeP2PService { + peer_info: Vec<(PeerId, PeerInfo)>, + } impl TaskP2PService for FakeP2PService { fn get_peer_ids(&self) -> Vec { todo!() } - fn get_peer_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)> { - todo!() + fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)> { + self.peer_info + .iter() + .map(|(peer_id, peer_info)| (peer_id, peer_info)) + .collect() } fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option { @@ -880,7 +895,7 @@ pub mod tests { } fn next_event<'a>(&'a mut self) -> BoxFuture<'a, Option> { - todo!() + std::future::pending().boxed() } fn publish_message( @@ -929,36 +944,199 @@ pub mod tests { } } + struct FakeDB; + + impl P2pDb for FakeDB { + fn get_sealed_block( + &self, + height: &BlockHeight, + ) -> StorageResult> { + todo!() + } + + fn get_sealed_header( + &self, + height: &BlockHeight, + ) -> StorageResult> { + todo!() + } + + fn get_sealed_headers( + &self, + block_height_range: Range, + ) -> StorageResult> { + todo!() + } + + fn get_transactions( + &self, + block_id: &BlockId, + ) -> StorageResult>> { + todo!() + } + } + + struct FakeBroadcast { + pub peer_reports: mpsc::Sender<(FuelPeerId, HeartBeatPeerReportReason, String)>, + } + + impl Broadcast for FakeBroadcast { + fn report_peer( + &self, + peer_id: FuelPeerId, + report: HeartBeatPeerReportReason, + reporting_service: &'static str, + ) -> anyhow::Result<()> { + self.peer_reports.try_send(( + peer_id, + report, + reporting_service.to_string(), + ))?; + Ok(()) + } + + fn block_height_broadcast( + &self, + block_height_data: BlockHeightHeartbeatData, + ) -> anyhow::Result<()> { + todo!() + } + + fn tx_broadcast(&self, transaction: TransactionGossipData) -> anyhow::Result<()> { + todo!() + } + } + #[tokio::test] - async fn peer_heartbeat_reputation_checks__sends_reports() { + async fn peer_heartbeat_reputation_checks__slow_heartbeat_sends_reports() { // given - let p2p_service = FakeP2PService; + let peer_id = PeerId::random(); + // more than limit + let moving_average = Duration::from_secs(30); + + let heartbeat_data = HeartbeatData { + block_height: None, + last_heartbeat: Instant::now(), + window: 0, + count: 0, + moving_average, + }; + let peer_info = PeerInfo { + peer_addresses: Default::default(), + client_version: None, + heartbeat_data, + score: 100.0, + }; + let peer_info = vec![(peer_id, peer_info)]; + let p2p_service = FakeP2PService { peer_info }; let (request_sender, request_receiver) = mpsc::channel(100); - let broadcast = SharedState { - request_sender, - tx_broadcast: broadcast::channel(100).0, - reserved_peers_broadcast: broadcast::channel(100).0, - block_height_broadcast: broadcast::channel(100).0, + + let (report_sender, mut report_receiver) = mpsc::channel(100); + let broadcast = FakeBroadcast { + peer_reports: report_sender, }; + // Less than actual + let heartbeat_max_avg_interval = Duration::from_secs(20); + // Greater than actual + let heartbeat_max_time_since_last = Duration::from_secs(40); + let mut task = Task { p2p_service, - db: Arc::new(()), - next_block_height: Pin {}, + db: Arc::new(FakeDB), + next_block_height: FakeBlockImporter.next_block_height(), request_receiver, broadcast, max_headers_per_request: 0, heartbeat_check_interval: Duration::from_secs(0), - heartbeat_max_avg_interval: Default::default(), - heartbeat_max_time_since_last: Default::default(), + heartbeat_max_avg_interval, + heartbeat_max_time_since_last, next_check_time: Instant::now(), }; - let (watch_sender, watch_receiver) = watch::channel(State::Started); + let (watch_sender, watch_receiver) = tokio::sync::watch::channel(State::Started); let mut watcher = StateWatcher::from(watch_receiver); + // when task.run(&mut watcher).await.unwrap(); // then - todo!() + let (report_peer_id, report, reporting_service) = + report_receiver.recv().await.unwrap(); + + watch_sender.send(State::Stopped).unwrap(); + + assert_eq!( + FuelPeerId::from(peer_id.to_bytes().to_vec()), + report_peer_id + ); + assert_eq!(report, HeartBeatPeerReportReason::LowHeartBeatFrequency); + assert_eq!(reporting_service, "p2p"); + } + + #[tokio::test] + async fn peer_heartbeat_reputation_checks__old_heartbeat_sends_reports() { + // given + let peer_id = PeerId::random(); + // under the limit + let moving_average = Duration::from_secs(5); + let last_heartbeat = Instant::now() - Duration::from_secs(50); + + let heartbeat_data = HeartbeatData { + block_height: None, + last_heartbeat, + window: 0, + count: 0, + moving_average, + }; + let peer_info = PeerInfo { + peer_addresses: Default::default(), + client_version: None, + heartbeat_data, + score: 100.0, + }; + let peer_info = vec![(peer_id, peer_info)]; + let p2p_service = FakeP2PService { peer_info }; + let (request_sender, request_receiver) = mpsc::channel(100); + + let (report_sender, mut report_receiver) = mpsc::channel(100); + let broadcast = FakeBroadcast { + peer_reports: report_sender, + }; + + // Greater than actual + let heartbeat_max_avg_interval = Duration::from_secs(20); + // Less than actual + let heartbeat_max_time_since_last = Duration::from_secs(40); + + let mut task = Task { + p2p_service, + db: Arc::new(FakeDB), + next_block_height: FakeBlockImporter.next_block_height(), + request_receiver, + broadcast, + max_headers_per_request: 0, + heartbeat_check_interval: Duration::from_secs(0), + heartbeat_max_avg_interval, + heartbeat_max_time_since_last, + next_check_time: Instant::now(), + }; + let (watch_sender, watch_receiver) = tokio::sync::watch::channel(State::Started); + let mut watcher = StateWatcher::from(watch_receiver); + + // when + task.run(&mut watcher).await.unwrap(); + + // then + let (report_peer_id, report, reporting_service) = + report_receiver.recv().await.unwrap(); + + watch_sender.send(State::Stopped).unwrap(); + + assert_eq!( + FuelPeerId::from(peer_id.to_bytes().to_vec()), + report_peer_id + ); + assert_eq!(report, HeartBeatPeerReportReason::OldHeartBeat); + assert_eq!(reporting_service, "p2p"); } } From bd46994d0c22973a85ad620b846e56e9486d25a0 Mon Sep 17 00:00:00 2001 From: Turner Date: Thu, 14 Sep 2023 13:31:15 -0700 Subject: [PATCH 10/16] Fix warnings --- crates/services/p2p/src/service.rs | 49 +++++++++++++++--------------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index fe9d49305dd..acccf484be3 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -343,9 +343,6 @@ impl Task, D, SharedState> { impl Task { fn peer_heartbeat_reputation_checks(&self) -> anyhow::Result<()> { for (peer_id, peer_info) in self.p2p_service.get_all_peer_info() { - let average_time_between_heartbeats = - peer_info.heartbeat_data.average_time_between_heartbeats(); - let heartbeat_max_avg_interval = self.heartbeat_max_avg_interval; if peer_info.heartbeat_data.duration_since_last_heartbeat() > self.heartbeat_max_time_since_last { @@ -808,6 +805,7 @@ fn report_message( #[cfg(test)] pub mod tests { + #![allow(non_snake_case)] use crate::ports::P2pDb; use super::*; @@ -890,7 +888,7 @@ pub mod tests { .collect() } - fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option { + fn get_peer_id_with_height(&self, _height: &BlockHeight) -> Option { todo!() } @@ -900,46 +898,46 @@ pub mod tests { fn publish_message( &mut self, - message: GossipsubBroadcastRequest, + _message: GossipsubBroadcastRequest, ) -> anyhow::Result<()> { todo!() } fn send_request_msg( &mut self, - peer_id: Option, - request_msg: RequestMessage, - channel_item: ResponseChannelItem, + _peer_id: Option, + _request_msg: RequestMessage, + _channel_item: ResponseChannelItem, ) -> anyhow::Result<()> { todo!() } fn send_response_msg( &mut self, - request_id: RequestId, - message: OutboundResponse, + _request_id: RequestId, + _message: OutboundResponse, ) -> anyhow::Result<()> { todo!() } fn report_message( &mut self, - message: GossipsubMessageInfo, - acceptance: GossipsubMessageAcceptance, + _message: GossipsubMessageInfo, + _acceptance: GossipsubMessageAcceptance, ) -> anyhow::Result<()> { todo!() } fn report_peer( &mut self, - peer_id: PeerId, - score: AppScore, - reporting_service: &str, + _peer_id: PeerId, + _score: AppScore, + _reporting_service: &str, ) -> anyhow::Result<()> { todo!() } - fn update_block_height(&mut self, height: BlockHeight) -> anyhow::Result<()> { + fn update_block_height(&mut self, _height: BlockHeight) -> anyhow::Result<()> { todo!() } } @@ -949,28 +947,28 @@ pub mod tests { impl P2pDb for FakeDB { fn get_sealed_block( &self, - height: &BlockHeight, + _height: &BlockHeight, ) -> StorageResult> { todo!() } fn get_sealed_header( &self, - height: &BlockHeight, + _height: &BlockHeight, ) -> StorageResult> { todo!() } fn get_sealed_headers( &self, - block_height_range: Range, + _block_height_range: Range, ) -> StorageResult> { todo!() } fn get_transactions( &self, - block_id: &BlockId, + _block_id: &BlockId, ) -> StorageResult>> { todo!() } @@ -997,12 +995,15 @@ pub mod tests { fn block_height_broadcast( &self, - block_height_data: BlockHeightHeartbeatData, + _block_height_data: BlockHeightHeartbeatData, ) -> anyhow::Result<()> { todo!() } - fn tx_broadcast(&self, transaction: TransactionGossipData) -> anyhow::Result<()> { + fn tx_broadcast( + &self, + _transaction: TransactionGossipData, + ) -> anyhow::Result<()> { todo!() } } @@ -1029,7 +1030,7 @@ pub mod tests { }; let peer_info = vec![(peer_id, peer_info)]; let p2p_service = FakeP2PService { peer_info }; - let (request_sender, request_receiver) = mpsc::channel(100); + let (_request_sender, request_receiver) = mpsc::channel(100); let (report_sender, mut report_receiver) = mpsc::channel(100); let broadcast = FakeBroadcast { @@ -1096,7 +1097,7 @@ pub mod tests { }; let peer_info = vec![(peer_id, peer_info)]; let p2p_service = FakeP2PService { peer_info }; - let (request_sender, request_receiver) = mpsc::channel(100); + let (_request_sender, request_receiver) = mpsc::channel(100); let (report_sender, mut report_receiver) = mpsc::channel(100); let broadcast = FakeBroadcast { From 4c598730d4f7bdbf6a894a4c4da3c341867c9e75 Mon Sep 17 00:00:00 2001 From: Turner Date: Thu, 14 Sep 2023 13:50:53 -0700 Subject: [PATCH 11/16] Appease Clippy-sama, rename function --- bin/fuel-core/src/cli/run/p2p.rs | 19 +++++++++++++++++++ crates/services/p2p/Cargo.toml | 2 +- crates/services/p2p/src/peer_manager.rs | 13 ++++++++----- crates/services/p2p/src/service.rs | 10 +++++----- 4 files changed, 33 insertions(+), 11 deletions(-) diff --git a/bin/fuel-core/src/cli/run/p2p.rs b/bin/fuel-core/src/cli/run/p2p.rs index b48631eaa06..2e3f5f43a58 100644 --- a/bin/fuel-core/src/cli/run/p2p.rs +++ b/bin/fuel-core/src/cli/run/p2p.rs @@ -175,6 +175,18 @@ pub struct P2PArgs { /// Cannot be zero. #[clap(long = "heartbeat-max-failures", default_value = "5", env)] pub heartbeat_max_failures: NonZeroU32, + + /// For peer reputations, the interval at which to check heartbeat health for all peers + #[clap(long = "heartbeat-check-interval", default_value = "5", env)] + pub heartbeat_check_interval: u64, + + /// For peer reputations, the maximum average interval between heartbeats for a peer before penalty + #[clap(long = "heartbeat-max-avg-interval", default_value = "20", env)] + pub heartbeat_max_avg_interval: u64, + + /// For peer reputations, the maximum time since last heartbeat before penalty + #[clap(long = "heartbeat-max-time-since-last", default_value = "40", env)] + pub heartbeat_max_time_since_last: u64, } #[derive(Debug, Clone, Args)] @@ -302,6 +314,13 @@ impl P2PArgs { heartbeat_config, set_request_timeout: Duration::from_secs(self.request_timeout), set_connection_keep_alive: Duration::from_secs(self.connection_keep_alive), + heartbeat_check_interval: Duration::from_secs(self.heartbeat_check_interval), + heartbeat_max_avg_interval: Duration::from_secs( + self.heartbeat_max_avg_interval, + ), + heartbeat_max_time_since_last: Duration::from_secs( + self.heartbeat_max_time_since_last, + ), info_interval: Some(Duration::from_secs(self.info_interval)), identify_interval: Some(Duration::from_secs(self.identify_interval)), metrics, diff --git a/crates/services/p2p/Cargo.toml b/crates/services/p2p/Cargo.toml index 1a1d75d9510..06e82564e75 100644 --- a/crates/services/p2p/Cargo.toml +++ b/crates/services/p2p/Cargo.toml @@ -58,9 +58,9 @@ rand = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_with = "1.11" sha2 = "0.10" +thiserror = "1.0.47" tokio = { workspace = true, features = ["sync"] } tracing = { workspace = true } -thiserror = "1.0.47" [dev-dependencies] ctor = "0.1" diff --git a/crates/services/p2p/src/peer_manager.rs b/crates/services/p2p/src/peer_manager.rs index b281427425d..25f7090c9d8 100644 --- a/crates/services/p2p/src/peer_manager.rs +++ b/crates/services/p2p/src/peer_manager.rs @@ -116,7 +116,7 @@ impl PeerManager { debug!(target: "fuel-p2p", "Previous heartbeat happened {:?} milliseconds ago", time_elapsed.as_millis()); } - let peers = self.get_relevant_peers(peer_id); + let peers = self.get_relevant_peers_mut(peer_id); update_heartbeat(peers, peer_id, block_height); } @@ -130,7 +130,7 @@ impl PeerManager { if initial_connection { self.handle_initial_connection(peer_id, addresses) } else { - let peers = self.get_relevant_peers(peer_id); + let peers = self.get_relevant_peers_mut(peer_id); insert_peer_addresses(peers, peer_id, addresses); false } @@ -142,7 +142,7 @@ impl PeerManager { addresses: Vec, agent_version: String, ) { - let peers = self.get_relevant_peers(peer_id); + let peers = self.get_relevant_peers_mut(peer_id); insert_client_version(peers, peer_id, agent_version); insert_peer_addresses(peers, peer_id, addresses); } @@ -283,7 +283,7 @@ impl PeerManager { self.send_reserved_peers_update(); } - let peers = self.get_relevant_peers(peer_id); + let peers = self.get_relevant_peers_mut(peer_id); insert_peer_addresses(peers, peer_id, addresses); false @@ -295,7 +295,10 @@ impl PeerManager { .send(self.reserved_connected_peers.len()); } - fn get_relevant_peers(&mut self, peer_id: &PeerId) -> &mut HashMap { + fn get_relevant_peers_mut( + &mut self, + peer_id: &PeerId, + ) -> &mut HashMap { if self.reserved_peers.contains(peer_id) { &mut self.reserved_connected_peers } else { diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index acccf484be3..d47543a03e8 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -135,7 +135,7 @@ pub trait TaskP2PService: Send { fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)>; fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option; - fn next_event<'a>(&'a mut self) -> BoxFuture<'a, Option>; + fn next_event(&mut self) -> BoxFuture<'_, Option>; fn publish_message( &mut self, @@ -182,7 +182,7 @@ impl TaskP2PService for FuelP2PService { self.peer_manager().get_peer_id_with_height(height) } - fn next_event<'a>(&'a mut self) -> BoxFuture<'a, Option> { + fn next_event(&mut self) -> BoxFuture<'_, Option> { Box::pin(self.next_event()) } @@ -366,7 +366,7 @@ impl Task { } fn convert_peer_id(peer_id: &PeerId) -> anyhow::Result { - let inner = Vec::try_from(peer_id.clone())?; + let inner = Vec::try_from(*peer_id)?; Ok(FuelPeerId::from(inner)) } @@ -572,7 +572,7 @@ where tracing::error!("Failed to perform peer heartbeat reputation checks: {:?}", e); } } - self.next_check_time = self.next_check_time + self.heartbeat_check_interval; + self.next_check_time += self.heartbeat_check_interval; }, latest_block_height = self.next_block_height.next() => { if let Some(latest_block_height) = latest_block_height { @@ -892,7 +892,7 @@ pub mod tests { todo!() } - fn next_event<'a>(&'a mut self) -> BoxFuture<'a, Option> { + fn next_event(&mut self) -> BoxFuture<'_, Option> { std::future::pending().boxed() } From 19d750a61e5bb169471909f3a90e34736fa272a4 Mon Sep 17 00:00:00 2001 From: Turner Date: Thu, 14 Sep 2023 13:57:17 -0700 Subject: [PATCH 12/16] Add values to deployment config --- deployment/charts/templates/fuel-core-deploy.yaml | 12 ++++++++++++ deployment/charts/values.yaml | 3 +++ 2 files changed, 15 insertions(+) diff --git a/deployment/charts/templates/fuel-core-deploy.yaml b/deployment/charts/templates/fuel-core-deploy.yaml index a44d75f3c52..b683e1e4312 100644 --- a/deployment/charts/templates/fuel-core-deploy.yaml +++ b/deployment/charts/templates/fuel-core-deploy.yaml @@ -220,6 +220,18 @@ spec: - "--max-transmit-size" - "{{ .Values.app.max_transmit_size }}" {{- end }} + {{- if .Values.app.heartbeat_check_interval }} + - "--heartbeat-check-interval" + - "{{ .Values.app.heartbeat_check_interval }}" + {{- end }} + {{- if .Values.app.heartbeat_max_avg_interval }} + - "--heartbeat-max-avg-interval" + - "{{ .Values.app.heartbeat_max_avg_interval }}" + {{- end }} + {{- if .Values.app.heartbeat_max_time_since_last }} + - "--heartbeat-max-time-since-last" + - "{{ .Values.app.heartbeat_max_time_since_last }}" + {{- end }} {{- if .Values.app.sync_block_stream_size }} - "--sync-block-stream-size" - "{{ .Values.app.sync_block_stream_size }}" diff --git a/deployment/charts/values.yaml b/deployment/charts/values.yaml index 62f50f2e72e..9842650f3bc 100644 --- a/deployment/charts/values.yaml +++ b/deployment/charts/values.yaml @@ -21,6 +21,9 @@ app: max_headers_per_request: "${fuel_core_max_headers_per_request}" max_database_cache_size: "${fuel_core_max_database_cache_size}" max_transmit_size: "${fuel_core_max_buffer_size}" + heartbeat_check_interval: "${fuel_core_heartbeat_check_interval}" + heartbeat_max_avg_interval: "${fuel_core_heartbeat_max_avg_interval}" + heartbeat_max_time_since_last: "${fuel_core_heartbeat_max_time_since_last}" sync_block_stream_size: "${fuel_core_sync_block_stream_size}" sync_header_batch_size: "${fuel_core_sync_header_batch_size}" p2p_key: ${fuel_core_p2p_key} From 05d707a6d51beeebac6dcd92476d93186ef38997 Mon Sep 17 00:00:00 2001 From: Turner Date: Thu, 14 Sep 2023 14:09:58 -0700 Subject: [PATCH 13/16] Fix formating --- deployment/charts/templates/fuel-core-deploy.yaml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/deployment/charts/templates/fuel-core-deploy.yaml b/deployment/charts/templates/fuel-core-deploy.yaml index b683e1e4312..67e59542e91 100644 --- a/deployment/charts/templates/fuel-core-deploy.yaml +++ b/deployment/charts/templates/fuel-core-deploy.yaml @@ -221,16 +221,16 @@ spec: - "{{ .Values.app.max_transmit_size }}" {{- end }} {{- if .Values.app.heartbeat_check_interval }} - - "--heartbeat-check-interval" - - "{{ .Values.app.heartbeat_check_interval }}" + - "--heartbeat-check-interval" + - "{{ .Values.app.heartbeat_check_interval }}" {{- end }} {{- if .Values.app.heartbeat_max_avg_interval }} - - "--heartbeat-max-avg-interval" - - "{{ .Values.app.heartbeat_max_avg_interval }}" + - "--heartbeat-max-avg-interval" + - "{{ .Values.app.heartbeat_max_avg_interval }}" {{- end }} {{- if .Values.app.heartbeat_max_time_since_last }} - - "--heartbeat-max-time-since-last" - - "{{ .Values.app.heartbeat_max_time_since_last }}" + - "--heartbeat-max-time-since-last" + - "{{ .Values.app.heartbeat_max_time_since_last }}" {{- end }} {{- if .Values.app.sync_block_stream_size }} - "--sync-block-stream-size" From a3f578d9a895574665be4f2aff516da368e88438 Mon Sep 17 00:00:00 2001 From: Mitchell Turner Date: Thu, 14 Sep 2023 18:04:17 -0700 Subject: [PATCH 14/16] Update crates/services/p2p/src/peer_manager.rs Co-authored-by: Brandon Kite --- crates/services/p2p/src/peer_manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/services/p2p/src/peer_manager.rs b/crates/services/p2p/src/peer_manager.rs index 25f7090c9d8..e12c4770b84 100644 --- a/crates/services/p2p/src/peer_manager.rs +++ b/crates/services/p2p/src/peer_manager.rs @@ -295,7 +295,7 @@ impl PeerManager { .send(self.reserved_connected_peers.len()); } - fn get_relevant_peers_mut( + fn get_assigned_peer_table_mut( &mut self, peer_id: &PeerId, ) -> &mut HashMap { From f62d4ca196eaeb35b4137ede66f0cc75ef766fbc Mon Sep 17 00:00:00 2001 From: mtchtrnr Date: Thu, 14 Sep 2023 18:19:15 -0700 Subject: [PATCH 15/16] Fix names --- crates/services/p2p/src/peer_manager.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/services/p2p/src/peer_manager.rs b/crates/services/p2p/src/peer_manager.rs index e12c4770b84..c4dc734488a 100644 --- a/crates/services/p2p/src/peer_manager.rs +++ b/crates/services/p2p/src/peer_manager.rs @@ -116,7 +116,7 @@ impl PeerManager { debug!(target: "fuel-p2p", "Previous heartbeat happened {:?} milliseconds ago", time_elapsed.as_millis()); } - let peers = self.get_relevant_peers_mut(peer_id); + let peers = self.get_assigned_peer_table_mut(peer_id); update_heartbeat(peers, peer_id, block_height); } @@ -130,7 +130,7 @@ impl PeerManager { if initial_connection { self.handle_initial_connection(peer_id, addresses) } else { - let peers = self.get_relevant_peers_mut(peer_id); + let peers = self.get_assigned_peer_table_mut(peer_id); insert_peer_addresses(peers, peer_id, addresses); false } @@ -142,7 +142,7 @@ impl PeerManager { addresses: Vec, agent_version: String, ) { - let peers = self.get_relevant_peers_mut(peer_id); + let peers = self.get_assigned_peer_table_mut(peer_id); insert_client_version(peers, peer_id, agent_version); insert_peer_addresses(peers, peer_id, addresses); } @@ -283,7 +283,7 @@ impl PeerManager { self.send_reserved_peers_update(); } - let peers = self.get_relevant_peers_mut(peer_id); + let peers = self.get_assigned_peer_table_mut(peer_id); insert_peer_addresses(peers, peer_id, addresses); false From 623a2054d3cdb4f89c5569c66cfe6968baec6f7d Mon Sep 17 00:00:00 2001 From: Turner Date: Fri, 15 Sep 2023 12:48:36 -0700 Subject: [PATCH 16/16] Redesign moving avg with tests --- crates/services/p2p/Cargo.toml | 2 +- crates/services/p2p/src/peer_manager.rs | 53 ++--------- .../p2p/src/peer_manager/heartbeat_data.rs | 89 +++++++++++++++++++ crates/services/p2p/src/service.rs | 17 ++-- 4 files changed, 106 insertions(+), 55 deletions(-) create mode 100644 crates/services/p2p/src/peer_manager/heartbeat_data.rs diff --git a/crates/services/p2p/Cargo.toml b/crates/services/p2p/Cargo.toml index 06e82564e75..86dcda5266c 100644 --- a/crates/services/p2p/Cargo.toml +++ b/crates/services/p2p/Cargo.toml @@ -71,7 +71,7 @@ fuel-core-types = { path = "../../types", features = [ "test-helpers" ] } rand = { workspace = true } -tokio = { workspace = true, features = ["full"] } +tokio = { workspace = true, features = ["full", "test-util"] } tracing-attributes = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } diff --git a/crates/services/p2p/src/peer_manager.rs b/crates/services/p2p/src/peer_manager.rs index c4dc734488a..ba700267031 100644 --- a/crates/services/p2p/src/peer_manager.rs +++ b/crates/services/p2p/src/peer_manager.rs @@ -22,15 +22,18 @@ use std::{ Arc, RwLock, }, - time::Duration, }; -use tokio::time::Instant; use tracing::{ debug, info, }; -use crate::gossipsub_config::GRAYLIST_THRESHOLD; +use crate::{ + gossipsub_config::GRAYLIST_THRESHOLD, + peer_manager::heartbeat_data::HeartbeatData, +}; + +pub mod heartbeat_data; /// At this point we better just ban the peer const MIN_GOSSIPSUB_SCORE_BEFORE_BAN: AppScore = GRAYLIST_THRESHOLD; @@ -312,50 +315,6 @@ pub struct ConnectionState { peers_allowed: bool, } -#[derive(Debug, Clone)] -pub struct HeartbeatData { - pub block_height: Option, - pub last_heartbeat: Instant, - // Size of moving average window - pub window: u32, - // How many heartbeats into the first window have been received - pub count: u32, - // Moving average of duration between heartbeats - pub moving_average: Duration, -} - -impl HeartbeatData { - pub fn new(window: u32) -> Self { - Self { - block_height: None, - last_heartbeat: Instant::now(), - window, - count: 0, - moving_average: Duration::from_secs(0), - } - } - - pub fn duration_since_last_heartbeat(&self) -> Duration { - self.last_heartbeat.elapsed() - } - - pub fn average_time_between_heartbeats(&self) -> Duration { - self.moving_average - } - - pub fn update(&mut self, block_height: BlockHeight) { - self.block_height = Some(block_height); - let old_hearbeat = self.last_heartbeat; - self.last_heartbeat = Instant::now(); - let new_duration = self.last_heartbeat - old_hearbeat; - if self.count < self.window { - self.count += 1; - } - self.moving_average = - (self.moving_average * (self.count - 1) + new_duration) / self.count; - } -} - impl ConnectionState { pub fn new() -> Arc> { Arc::new(RwLock::new(Self { diff --git a/crates/services/p2p/src/peer_manager/heartbeat_data.rs b/crates/services/p2p/src/peer_manager/heartbeat_data.rs new file mode 100644 index 00000000000..a2889dc2df9 --- /dev/null +++ b/crates/services/p2p/src/peer_manager/heartbeat_data.rs @@ -0,0 +1,89 @@ +use fuel_core_types::fuel_types::BlockHeight; +use std::{ + collections::VecDeque, + time::Duration, +}; +use tokio::time::Instant; + +#[derive(Debug, Clone)] +pub struct HeartbeatData { + pub block_height: Option, + pub last_heartbeat: Instant, + // Size of moving average window + pub window: u32, + pub durations: VecDeque, +} + +impl HeartbeatData { + pub fn new(window: u32) -> Self { + Self { + block_height: None, + last_heartbeat: Instant::now(), + window, + durations: VecDeque::with_capacity(window as usize), + } + } + + pub fn duration_since_last_heartbeat(&self) -> Duration { + self.last_heartbeat.elapsed() + } + + pub fn average_time_between_heartbeats(&self) -> Duration { + if self.durations.is_empty() { + Duration::from_secs(0) + } else { + self.durations.iter().sum::() / self.durations.len() as u32 + } + } + + fn add_new_duration(&mut self, new_duration: Duration) { + if self.durations.len() == self.window as usize { + self.durations.pop_back(); + } + self.durations.push_front(new_duration); + } + + pub fn update(&mut self, block_height: BlockHeight) { + self.block_height = Some(block_height); + let old_hearbeat = self.last_heartbeat; + self.last_heartbeat = Instant::now(); + let new_duration = self.last_heartbeat - old_hearbeat; + self.add_new_duration(new_duration); + } +} + +#[cfg(test)] +mod tests { + #![allow(non_snake_case)] + use super::*; + + #[tokio::test(start_paused = true)] + async fn duration_since_last_heartbeat__reads_correctly() { + let heartbeat_data = HeartbeatData::new(10); + tokio::time::advance(Duration::from_secs(10)).await; + assert_eq!( + heartbeat_data.duration_since_last_heartbeat(), + Duration::from_secs(10) + ); + } + + #[tokio::test(start_paused = true)] + async fn update__works_with_many() { + let intervals: Vec = + vec![5, 40, 19, 400, 23, 36, 33, 22, 11, 10, 9, 8, 72, 16, 5, 4]; + let mut heartbeat_data = HeartbeatData::new(10); + for (i, interval) in intervals.clone().into_iter().enumerate() { + tokio::time::advance(Duration::from_secs(interval)).await; + heartbeat_data.update(1.into()); + let bottom = if i < 10 { 0 } else { i - 9 }; + let range = &intervals[bottom..=i]; + let expected = range + .iter() + .map(|x| Duration::from_secs(*x)) + .sum::() + / range.len() as u32; + let actual = heartbeat_data.average_time_between_heartbeats(); + assert_eq!(actual, expected); + } + } +} diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index d47543a03e8..5e9f2c33bdb 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -810,7 +810,7 @@ pub mod tests { use super::*; - use crate::peer_manager::HeartbeatData; + use crate::peer_manager::heartbeat_data::HeartbeatData; use fuel_core_services::{ Service, State, @@ -818,6 +818,7 @@ pub mod tests { use fuel_core_storage::Result as StorageResult; use fuel_core_types::fuel_types::BlockHeight; use futures::FutureExt; + use std::collections::VecDeque; #[derive(Clone, Debug)] struct FakeDb; @@ -1013,14 +1014,15 @@ pub mod tests { // given let peer_id = PeerId::random(); // more than limit - let moving_average = Duration::from_secs(30); + let last_duration = Duration::from_secs(30); + let mut durations = VecDeque::new(); + durations.push_front(last_duration); let heartbeat_data = HeartbeatData { block_height: None, last_heartbeat: Instant::now(), window: 0, - count: 0, - moving_average, + durations, }; let peer_info = PeerInfo { peer_addresses: Default::default(), @@ -1079,15 +1081,16 @@ pub mod tests { // given let peer_id = PeerId::random(); // under the limit - let moving_average = Duration::from_secs(5); + let last_duration = Duration::from_secs(5); let last_heartbeat = Instant::now() - Duration::from_secs(50); + let mut durations = VecDeque::new(); + durations.push_front(last_duration); let heartbeat_data = HeartbeatData { block_height: None, last_heartbeat, window: 0, - count: 0, - moving_average, + durations, }; let peer_info = PeerInfo { peer_addresses: Default::default(),