Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

If we stay long without recieving any message, send out a ping #101

Merged
merged 1 commit into from
Nov 29, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 50 additions & 11 deletions crates/floresta-wire/src/p2p_wire/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,19 @@ use bitcoin::{
BlockHash, BlockHeader, Network, Transaction,
};
use futures::{AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt};
use log::{debug, error, warn};
use log::{error, warn};
use std::{
fmt::Debug,
sync::Arc,
time::{Duration, Instant},
};

/// If we send a ping, and our peer takes more than PING_TIMEOUT to
/// reply, disconnect.
const PING_TIMEOUT: u64 = 30;
/// If the last message we've got was more than XX, send out a ping
const SEND_PING_TIMEOUT: u64 = 60;

#[derive(Debug, PartialEq)]
enum State {
None,
Expand Down Expand Up @@ -65,8 +71,9 @@ pub struct Peer<T: Transport> {
user_agent: String,
messages: u64,
start_time: Instant,
last_message: Instant,
current_best_block: i32,
last_ping: Instant,
last_ping: Option<Instant>,
id: u32,
node_tx: Sender<NodeNotification>,
state: State,
Expand All @@ -75,6 +82,7 @@ pub struct Peer<T: Transport> {
address_id: usize,
feeler: bool,
wants_addrv2: bool,
shutdown: bool,
}
#[derive(Debug, Error)]
pub enum PeerError {
Expand All @@ -92,6 +100,8 @@ pub enum PeerError {
MagicBitsMismatch,
#[error("Peer sent us too many message in a short period of time")]
TooManyMessages,
#[error("Peer timed a ping out")]
Timeout,
}
impl Debug for Peer<TcpStream> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -105,7 +115,7 @@ type Result<T> = std::result::Result<T, PeerError>;
impl<T: Transport> Peer<T> {
pub async fn read_loop(mut self) -> Result<()> {
let err = self.peer_loop_inner().await;
debug!("Peer connection loop closed: {err:?}");
warn!("Peer connection loop closed: {err:?}");
self.send_to_node(PeerMessages::Disconnected(self.address_id))
.await;
Ok(())
Expand Down Expand Up @@ -133,18 +143,40 @@ impl<T: Transport> Peer<T> {
}
}
};
if self.shutdown {
return Ok(());
}
// If we send a ping and our peer doesn't respond in time, disconnect
if let Some(when) = self.last_ping {
if when.elapsed().as_secs() > PING_TIMEOUT {
return Err(PeerError::Timeout);
}
}

// Send a ping to check if this peer is still good
let last_message = self.last_message.elapsed().as_secs();
if last_message > SEND_PING_TIMEOUT {
let nonce = rand::random();
self.last_ping = Some(Instant::now());
self.write(NetworkMessage::Ping(nonce)).await?;
}

// divide the number of messages by the number of seconds we've been connected,
// if it's more than 100 msg/sec, this peer is sending us too many messages, and we should
// disconnect.
if self.messages > 0
&& self.messages / Instant::now().duration_since(self.start_time).as_secs() > 10
{
let msg_sec = self
.messages
.checked_div(Instant::now().duration_since(self.start_time).as_secs())
.unwrap_or(0);

if msg_sec > 10 {
error!(
"Peer {} is sending us too many messages, disconnecting",
self.id
);
return Err(PeerError::TooManyMessages);
}

if let State::SentVersion(when) = self.state {
if Instant::now().duration_since(when) > Duration::from_secs(10) {
return Err(PeerError::UnexpectedMessage);
Expand Down Expand Up @@ -181,6 +213,7 @@ impl<T: Transport> Peer<T> {
.await;
}
NodeRequest::Shutdown => {
self.shutdown = true;
let _ = self.stream.shutdown();
}
NodeRequest::GetAddresses => {
Expand All @@ -201,6 +234,8 @@ impl<T: Transport> Peer<T> {
Ok(())
}
pub async fn handle_peer_message(&mut self, message: RawNetworkMessage) -> Result<()> {
self.last_message = Instant::now();

match self.state {
State::Connected => match message.payload {
NetworkMessage::Inv(inv) => {
Expand Down Expand Up @@ -262,10 +297,12 @@ impl<T: Transport> Peer<T> {
self.wants_addrv2 = true;
self.write(NetworkMessage::SendAddrV2).await?;
}
NetworkMessage::Pong(_) => {
self.last_ping = None;
}
NetworkMessage::Unknown { command, payload } => {
warn!("Unknown message: {} {:?}", command, payload);
}

// Explicitly ignore these messages, if something changes in the future
// this would cause a compile error.
NetworkMessage::Verack
Expand All @@ -288,7 +325,6 @@ impl<T: Transport> Peer<T> {
| NetworkMessage::GetCFilters(_)
| NetworkMessage::MemPool
| NetworkMessage::MerkleBlock(_)
| NetworkMessage::Pong(_)
| NetworkMessage::SendCmpct(_) => {}
},
State::None | State::SentVersion(_) => match message.payload {
Expand Down Expand Up @@ -381,7 +417,8 @@ impl<T: Transport> Peer<T> {
current_best_block: -1,
id,
mempool,
last_ping: Instant::now(),
last_ping: None,
last_message: Instant::now(),
network,
node_tx,
services: ServiceFlags::NONE,
Expand All @@ -394,6 +431,7 @@ impl<T: Transport> Peer<T> {
node_requests,
feeler,
wants_addrv2: false,
shutdown: false,
};
spawn(peer.read_loop());
}
Expand Down Expand Up @@ -426,7 +464,8 @@ impl<T: Transport> Peer<T> {
current_best_block: -1,
id,
mempool,
last_ping: Instant::now(),
last_ping: None,
last_message: Instant::now(),
network,
node_tx,
services: ServiceFlags::NONE,
Expand All @@ -439,11 +478,11 @@ impl<T: Transport> Peer<T> {
node_requests,
feeler,
wants_addrv2: false,
shutdown: false,
};
spawn(peer.read_loop());
}
async fn handle_ping(&mut self, nonce: u64) -> Result<()> {
self.last_ping = Instant::now();
let pong = make_pong(nonce);
self.write(pong).await
}
Expand Down
Loading