Skip to content

Commit

Permalink
Merge pull request #35 from rustaceanrob/timer
Browse files Browse the repository at this point in the history
peer: add timer for high latency
  • Loading branch information
rustaceanrob authored Jul 12, 2024
2 parents 23492a7 + 9c008ac commit 6f85fff
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 22 deletions.
10 changes: 5 additions & 5 deletions CHECKLIST.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@
- [ ] Set up "timer"
- [x] Check for DOS
- [x] Message counter
- [ ] `Ping` if peer has not been heard from
- [ ] `Disconnect` peers with high latency
- [ ] `Ping` if peer has not been heard from (Probably better to just disconnect)
- [x] `Disconnect` peers with high latency (If we send a critical message and a peer doesn't respond in 5 seconds, disconnect)
- [ ] Add BIP-324 with V1 fallback

#### Transaction Broadcaster
Expand All @@ -105,7 +105,7 @@
- [ ] Chain
- [x] Usual extend
- [x] Fork with less work
- [ ] Orphaned fork
- [x] Orphaned fork
- [x] Fork with equal work
- [x] Fork with more work
- [ ] CF header chain
Expand All @@ -128,11 +128,11 @@
- [x] Valid transaction broadcast
- [x] Live fork
- [x] Fork with SQL
- [x] Fork with a stale anchor checkpoint start
- [x] Fork with a stale anchor checkpoint start
- [x] Depth two fork, SQL
- [ ] CI
- [x] MacOS, Windows, Linux
- [x] 1.63, stable, beta, nightly
- [x] Format and clippy
- [ ] Regtest sync with Bitcoin Core
- [x] On PR
- [x] On PR
56 changes: 56 additions & 0 deletions src/peers/counter.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
use tokio::time::Instant;

const FIVE_SEC: u64 = 5;

// Very simple denial of service protection so a peer cannot spam us with unsolicited messages.
#[derive(Debug, Clone)]
pub(crate) struct MessageCounter {
Expand Down Expand Up @@ -92,3 +96,55 @@ impl MessageCounter {
|| self.tx < 0
}
}

//
#[derive(Debug, Clone)]
pub(crate) struct MessageTimer {
tracked_time: Option<Instant>,
}

impl MessageTimer {
pub(crate) fn new() -> Self {
Self { tracked_time: None }
}

pub(crate) fn track(&mut self) {
self.tracked_time = Some(Instant::now())
}

pub(crate) fn untrack(&mut self) {
self.tracked_time = None;
}

pub(crate) fn unresponsive(&self) -> bool {
match self.tracked_time {
Some(time) => Instant::now().duration_since(time).as_secs() > FIVE_SEC,
None => false,
}
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use tokio::time;

use super::MessageTimer;

#[tokio::test]
async fn test_timer_works() {
let mut timer = MessageTimer::new();
assert!(!timer.unresponsive());
timer.track();
assert!(!timer.unresponsive());
timer.untrack();
assert!(!timer.unresponsive());
timer.untrack();
assert!(!timer.unresponsive());
timer.track();
assert!(!timer.unresponsive());
time::sleep(Duration::from_secs(6)).await;
assert!(timer.unresponsive());
}
}
54 changes: 37 additions & 17 deletions src/peers/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ use crate::{
peers::outbound_messages::V1OutboundMessage,
};

use super::{counter::MessageCounter, reader::Reader, traits::MessageGenerator};
use super::{
counter::{MessageCounter, MessageTimer},
reader::Reader,
traits::MessageGenerator,
};

const CONNECTION_TIMEOUT: u64 = 3;
const CONNECTION_TIMEOUT: u64 = 2;

pub(crate) struct Peer {
nonce: u32,
Expand All @@ -27,6 +31,7 @@ pub(crate) struct Peer {
main_thread_recv: Receiver<MainThreadMessage>,
network: Network,
message_counter: MessageCounter,
message_timer: MessageTimer,
}

impl Peer {
Expand All @@ -39,6 +44,7 @@ impl Peer {
main_thread_recv: Receiver<MainThreadMessage>,
) -> Self {
let message_counter = MessageCounter::new();
let message_timer = MessageTimer::new();
Self {
nonce,
ip_addr,
Expand All @@ -47,6 +53,7 @@ impl Peer {
main_thread_recv,
network,
message_counter,
message_timer,
}
}

Expand Down Expand Up @@ -92,23 +99,28 @@ impl Peer {
if self.message_counter.unsolicited() {
return Ok(());
}
if self.message_timer.unresponsive() {
return Ok(());
}
select! {
// The peer sent us a message
peer_message = rx.recv() => {
match peer_message {
Some(message) => {
match self.handle_peer_message(message, &mut writer, &mut outbound_messages).await {
Ok(()) => continue,
Err(e) => {
match e {
// We were told by the reader thread to disconnect from this peer
PeerError::DisconnectCommand => return Ok(()),
_ => continue,
}
},
}
},
None => continue,
peer_message = tokio::time::timeout(Duration::from_secs(CONNECTION_TIMEOUT), rx.recv())=> {
if let Ok(peer_message) = peer_message {
match peer_message {
Some(message) => {
match self.handle_peer_message(message, &mut writer, &mut outbound_messages).await {
Ok(()) => continue,
Err(e) => {
match e {
// We were told by the reader thread to disconnect from this peer
PeerError::DisconnectCommand => return Ok(()),
_ => continue,
}
},
}
},
None => continue,
}
}
}
// The main thread sent us a message
Expand Down Expand Up @@ -171,6 +183,7 @@ impl Peer {
}
PeerMessage::Headers(headers) => {
self.message_counter.got_header();
self.message_timer.untrack();
self.main_thread_sender
.send(PeerThreadMessage {
nonce: self.nonce,
Expand All @@ -182,6 +195,7 @@ impl Peer {
}
PeerMessage::FilterHeaders(cf_headers) => {
self.message_counter.got_filter_header();
self.message_timer.untrack();
self.main_thread_sender
.send(PeerThreadMessage {
nonce: self.nonce,
Expand All @@ -193,6 +207,7 @@ impl Peer {
}
PeerMessage::Filter(filter) => {
self.message_counter.got_filter();
self.message_timer.untrack();
self.main_thread_sender
.send(PeerThreadMessage {
nonce: self.nonce,
Expand All @@ -204,6 +219,7 @@ impl Peer {
}
PeerMessage::Block(block) => {
self.message_counter.got_block();
self.message_timer.untrack();
self.main_thread_sender
.send(PeerThreadMessage {
nonce: self.nonce,
Expand Down Expand Up @@ -278,6 +294,7 @@ impl Peer {
}
MainThreadMessage::GetHeaders(config) => {
self.message_counter.sent_header();
self.message_timer.track();
let message = message_generator.get_headers(config.locators, config.stop_hash);
writer
.write_all(&message)
Expand All @@ -286,6 +303,7 @@ impl Peer {
}
MainThreadMessage::GetFilterHeaders(config) => {
self.message_counter.sent_filter_header();
self.message_timer.track();
let message = message_generator.cf_headers(config);
writer
.write_all(&message)
Expand All @@ -294,6 +312,7 @@ impl Peer {
}
MainThreadMessage::GetFilters(config) => {
self.message_counter.sent_filters();
self.message_timer.track();
let message = message_generator.filters(config);
writer
.write_all(&message)
Expand All @@ -302,6 +321,7 @@ impl Peer {
}
MainThreadMessage::GetBlock(message) => {
self.message_counter.sent_block();
self.message_timer.track();
let message = message_generator.block(message);
writer
.write_all(&message)
Expand Down

0 comments on commit 6f85fff

Please sign in to comment.