From 0d847d6317fbba0b8989fb8607a1fd85d1d3aa35 Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Fri, 12 Jul 2024 07:31:15 -1000 Subject: [PATCH 1/2] peer: add timer for high latency --- CHECKLIST.md | 10 ++++---- src/peers/counter.rs | 56 ++++++++++++++++++++++++++++++++++++++++++++ src/peers/peer.rs | 22 +++++++++++++++-- 3 files changed, 81 insertions(+), 7 deletions(-) diff --git a/CHECKLIST.md b/CHECKLIST.md index ff9318b..df15899 100644 --- a/CHECKLIST.md +++ b/CHECKLIST.md @@ -84,8 +84,8 @@ - [ ] Set up "timer" - [x] Check for DOS - [x] Message counter - - [ ] `Ping` if peer has not been heard from -- [ ] `Disconnect` peers with high latency + - [ ] `Ping` if peer has not been heard from (Probably better to just disconnect) +- [x] `Disconnect` peers with high latency (If we send a critical message and a peer doesn't respond in 5 seconds, disconnect) - [ ] Add BIP-324 with V1 fallback #### Transaction Broadcaster @@ -105,7 +105,7 @@ - [ ] Chain - [x] Usual extend - [x] Fork with less work - - [ ] Orphaned fork + - [x] Orphaned fork - [x] Fork with equal work - [x] Fork with more work - [ ] CF header chain @@ -128,11 +128,11 @@ - [x] Valid transaction broadcast - [x] Live fork - [x] Fork with SQL - - [x] Fork with a stale anchor checkpoint start + - [x] Fork with a stale anchor checkpoint start - [x] Depth two fork, SQL - [ ] CI - [x] MacOS, Windows, Linux - [x] 1.63, stable, beta, nightly - [x] Format and clippy - [ ] Regtest sync with Bitcoin Core - - [x] On PR \ No newline at end of file + - [x] On PR diff --git a/src/peers/counter.rs b/src/peers/counter.rs index e44a18c..ce78c30 100644 --- a/src/peers/counter.rs +++ b/src/peers/counter.rs @@ -1,3 +1,7 @@ +use tokio::time::Instant; + +const FIVE_SEC: u64 = 5; + // Very simple denial of service protection so a peer cannot spam us with unsolicited messages. #[derive(Debug, Clone)] pub(crate) struct MessageCounter { @@ -92,3 +96,55 @@ impl MessageCounter { || self.tx < 0 } } + +// +#[derive(Debug, Clone)] +pub(crate) struct MessageTimer { + tracked_time: Option, +} + +impl MessageTimer { + pub(crate) fn new() -> Self { + Self { tracked_time: None } + } + + pub(crate) fn track(&mut self) { + self.tracked_time = Some(Instant::now()) + } + + pub(crate) fn untrack(&mut self) { + self.tracked_time = None; + } + + pub(crate) fn unresponsive(&self) -> bool { + match self.tracked_time { + Some(time) => Instant::now().duration_since(time).as_secs() > FIVE_SEC, + None => false, + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use tokio::time; + + use super::MessageTimer; + + #[tokio::test] + async fn test_timer_works() { + let mut timer = MessageTimer::new(); + assert!(!timer.unresponsive()); + timer.track(); + assert!(!timer.unresponsive()); + timer.untrack(); + assert!(!timer.unresponsive()); + timer.untrack(); + assert!(!timer.unresponsive()); + timer.track(); + assert!(!timer.unresponsive()); + time::sleep(Duration::from_secs(6)).await; + assert!(timer.unresponsive()); + } +} diff --git a/src/peers/peer.rs b/src/peers/peer.rs index 5506a56..80a4897 100644 --- a/src/peers/peer.rs +++ b/src/peers/peer.rs @@ -15,9 +15,13 @@ use crate::{ peers::outbound_messages::V1OutboundMessage, }; -use super::{counter::MessageCounter, reader::Reader, traits::MessageGenerator}; +use super::{ + counter::{MessageCounter, MessageTimer}, + reader::Reader, + traits::MessageGenerator, +}; -const CONNECTION_TIMEOUT: u64 = 3; +const CONNECTION_TIMEOUT: u64 = 2; pub(crate) struct Peer { nonce: u32, @@ -27,6 +31,7 @@ pub(crate) struct Peer { main_thread_recv: Receiver, network: Network, message_counter: MessageCounter, + message_timer: MessageTimer, } impl Peer { @@ -39,6 +44,7 @@ impl Peer { main_thread_recv: Receiver, ) -> Self { let message_counter = MessageCounter::new(); + let message_timer = MessageTimer::new(); Self { nonce, ip_addr, @@ -47,6 +53,7 @@ impl Peer { main_thread_recv, network, message_counter, + message_timer, } } @@ -92,6 +99,9 @@ impl Peer { if self.message_counter.unsolicited() { return Ok(()); } + if self.message_timer.unresponsive() { + return Ok(()); + } select! { // The peer sent us a message peer_message = rx.recv() => { @@ -171,6 +181,7 @@ impl Peer { } PeerMessage::Headers(headers) => { self.message_counter.got_header(); + self.message_timer.untrack(); self.main_thread_sender .send(PeerThreadMessage { nonce: self.nonce, @@ -182,6 +193,7 @@ impl Peer { } PeerMessage::FilterHeaders(cf_headers) => { self.message_counter.got_filter_header(); + self.message_timer.untrack(); self.main_thread_sender .send(PeerThreadMessage { nonce: self.nonce, @@ -193,6 +205,7 @@ impl Peer { } PeerMessage::Filter(filter) => { self.message_counter.got_filter(); + self.message_timer.untrack(); self.main_thread_sender .send(PeerThreadMessage { nonce: self.nonce, @@ -204,6 +217,7 @@ impl Peer { } PeerMessage::Block(block) => { self.message_counter.got_block(); + self.message_timer.untrack(); self.main_thread_sender .send(PeerThreadMessage { nonce: self.nonce, @@ -278,6 +292,7 @@ impl Peer { } MainThreadMessage::GetHeaders(config) => { self.message_counter.sent_header(); + self.message_timer.track(); let message = message_generator.get_headers(config.locators, config.stop_hash); writer .write_all(&message) @@ -286,6 +301,7 @@ impl Peer { } MainThreadMessage::GetFilterHeaders(config) => { self.message_counter.sent_filter_header(); + self.message_timer.track(); let message = message_generator.cf_headers(config); writer .write_all(&message) @@ -294,6 +310,7 @@ impl Peer { } MainThreadMessage::GetFilters(config) => { self.message_counter.sent_filters(); + self.message_timer.track(); let message = message_generator.filters(config); writer .write_all(&message) @@ -302,6 +319,7 @@ impl Peer { } MainThreadMessage::GetBlock(message) => { self.message_counter.sent_block(); + self.message_timer.track(); let message = message_generator.block(message); writer .write_all(&message) From 9c008ac379e5f4bd0bbd4afa8c482eb6a971a6f3 Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Fri, 12 Jul 2024 07:54:55 -1000 Subject: [PATCH 2/2] peer: timeout reader thread --- src/peers/peer.rs | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/peers/peer.rs b/src/peers/peer.rs index 80a4897..4fec27d 100644 --- a/src/peers/peer.rs +++ b/src/peers/peer.rs @@ -104,21 +104,23 @@ impl Peer { } select! { // The peer sent us a message - peer_message = rx.recv() => { - match peer_message { - Some(message) => { - match self.handle_peer_message(message, &mut writer, &mut outbound_messages).await { - Ok(()) => continue, - Err(e) => { - match e { - // We were told by the reader thread to disconnect from this peer - PeerError::DisconnectCommand => return Ok(()), - _ => continue, - } - }, - } - }, - None => continue, + peer_message = tokio::time::timeout(Duration::from_secs(CONNECTION_TIMEOUT), rx.recv())=> { + if let Ok(peer_message) = peer_message { + match peer_message { + Some(message) => { + match self.handle_peer_message(message, &mut writer, &mut outbound_messages).await { + Ok(()) => continue, + Err(e) => { + match e { + // We were told by the reader thread to disconnect from this peer + PeerError::DisconnectCommand => return Ok(()), + _ => continue, + } + }, + } + }, + None => continue, + } } } // The main thread sent us a message