Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

peer: clean up reader and peer #36

Merged
merged 1 commit into from
Jul 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/peers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ mod tests {
use super::MessageTimer;

#[tokio::test]
#[ignore = "time wasting"]
async fn test_timer_works() {
let mut timer = MessageTimer::new();
assert!(!timer.unresponsive());
Expand Down
8 changes: 2 additions & 6 deletions src/peers/outbound_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,12 @@ impl MessageGenerator for V1OutboundMessage {
serialize(&data)
}

fn get_addr(&mut self) -> Vec<u8> {
fn addr(&mut self) -> Vec<u8> {
let data = RawNetworkMessage::new(self.network.magic(), NetworkMessage::GetAddr);
serialize(&data)
}

fn get_headers(
&mut self,
locator_hashes: Vec<BlockHash>,
stop_hash: Option<BlockHash>,
) -> Vec<u8> {
fn headers(&mut self, locator_hashes: Vec<BlockHash>, stop_hash: Option<BlockHash>) -> Vec<u8> {
let msg =
GetHeadersMessage::new(locator_hashes, stop_hash.unwrap_or(BlockHash::all_zeros()));
let data =
Expand Down
8 changes: 5 additions & 3 deletions src/peers/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl Peer {
.write_all(&version_message)
.await
.map_err(|_| PeerError::BufferWrite)?;
self.message_timer.track();
let (reader, mut writer) = stream.into_split();
let (tx, mut rx) = mpsc::channel(32);
let mut peer_reader = Reader::new(reader, tx, self.network);
Expand All @@ -104,7 +105,7 @@ impl Peer {
}
select! {
// The peer sent us a message
peer_message = tokio::time::timeout(Duration::from_secs(CONNECTION_TIMEOUT), rx.recv())=> {
peer_message = tokio::time::timeout(Duration::from_secs(CONNECTION_TIMEOUT), rx.recv()) => {
if let Ok(peer_message) = peer_message {
match peer_message {
Some(message) => {
Expand Down Expand Up @@ -157,6 +158,7 @@ impl Peer {
match message {
PeerMessage::Version(version) => {
self.message_counter.got_version();
self.message_timer.untrack();
self.main_thread_sender
.send(PeerThreadMessage {
nonce: self.nonce,
Expand Down Expand Up @@ -288,14 +290,14 @@ impl Peer {
MainThreadMessage::GetAddr => {
self.message_counter.sent_addrs();
writer
.write_all(&message_generator.get_addr())
.write_all(&message_generator.addr())
.await
.map_err(|_| PeerError::BufferWrite)?;
}
MainThreadMessage::GetHeaders(config) => {
self.message_counter.sent_header();
self.message_timer.track();
let message = message_generator.get_headers(config.locators, config.stop_hash);
let message = message_generator.headers(config.locators, config.stop_hash);
writer
.write_all(&message)
.await
Expand Down
52 changes: 22 additions & 30 deletions src/peers/reader.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use bitcoin::consensus::deserialize;
use bitcoin::consensus::deserialize_partial;
use bitcoin::consensus::Decodable;
Expand All @@ -24,28 +21,21 @@ use crate::node::messages::RejectPayload;

const ONE_MONTH: u64 = 2_500_000;
const ONE_MINUTE: u64 = 60;
// The peer must have sent at least 10 messages to trigger DOS
const MINIMUM_DOS_THRESHOLD: u64 = 10;
// We allow up to 5000 messages per second
const RATE_LIMIT: u64 = 5000;
const MAX_MESSAGE_BYTES: u32 = 1024 * 1024 * 32;
// From Bitcoin Core PR #29575
const MAX_ADDR: usize = 1_000;
const MAX_INV: usize = 50_000;
const MAX_HEADERS: usize = 2_000;

pub(crate) struct Reader {
num_messages: u64,
start_time: u64,
stream: OwnedReadHalf,
tx: Sender<PeerMessage>,
network: Network,
}

impl Reader {
pub fn new(stream: OwnedReadHalf, tx: Sender<PeerMessage>, network: Network) -> Self {
let start_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("time went backwards")
.as_secs();
Self {
num_messages: 0,
start_time,
stream,
tx,
network,
Expand All @@ -54,7 +44,7 @@ impl Reader {

pub(crate) async fn read_from_remote(&mut self) -> Result<(), PeerReadError> {
loop {
// v1 headers are 24 bytes
// V1 headers are 24 bytes
let mut message_buf = vec![0_u8; 24];
let _ = self
.stream
Expand All @@ -69,21 +59,9 @@ impl Reader {
return Err(PeerReadError::Deserialization);
}
// Message is too long
if header.length > (1024 * 1024 * 32) as u32 {
if header.length > MAX_MESSAGE_BYTES {
return Err(PeerReadError::Deserialization);
}
// DOS protection
self.num_messages += 1;
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("time went backwards")
.as_secs();
let duration = now - self.start_time;
if self.num_messages > MINIMUM_DOS_THRESHOLD
&& self.num_messages.checked_div(duration).unwrap_or(0) > RATE_LIMIT
{
return Err(PeerReadError::TooManyMessages);
}
let mut contents_buf = vec![0_u8; header.length as usize];
let _ = self.stream.read_exact(&mut contents_buf).await.unwrap();
message_buf.extend_from_slice(&contents_buf);
Expand Down Expand Up @@ -111,6 +89,9 @@ fn parse_message(message: &NetworkMessage) -> Option<PeerMessage> {
})),
NetworkMessage::Verack => Some(PeerMessage::Verack),
NetworkMessage::Addr(addresses) => {
if addresses.len() > MAX_ADDR {
return Some(PeerMessage::Disconnect);
}
let addresses: Vec<Address> = addresses
.iter()
.filter(|f| f.1.services.has(ServiceFlags::COMPACT_FILTERS))
Expand All @@ -120,6 +101,9 @@ fn parse_message(message: &NetworkMessage) -> Option<PeerMessage> {
Some(PeerMessage::Addr(addresses))
}
NetworkMessage::Inv(inventory) => {
if inventory.len() > MAX_INV {
return Some(PeerMessage::Disconnect);
}
let mut hashes = Vec::new();
for i in inventory {
match i {
Expand All @@ -142,7 +126,12 @@ fn parse_message(message: &NetworkMessage) -> Option<PeerMessage> {
NetworkMessage::MemPool => None,
NetworkMessage::Tx(_) => None,
NetworkMessage::Block(block) => Some(PeerMessage::Block(block.clone())),
NetworkMessage::Headers(headers) => Some(PeerMessage::Headers(headers.clone())),
NetworkMessage::Headers(headers) => {
if headers.len() > MAX_HEADERS {
return Some(PeerMessage::Disconnect);
}
Some(PeerMessage::Headers(headers.clone()))
}
NetworkMessage::SendHeaders => None,
NetworkMessage::GetAddr => None,
NetworkMessage::Ping(nonce) => Some(PeerMessage::Ping(*nonce)),
Expand Down Expand Up @@ -174,6 +163,9 @@ fn parse_message(message: &NetworkMessage) -> Option<PeerMessage> {
NetworkMessage::FeeFilter(_) => None,
NetworkMessage::WtxidRelay => None,
NetworkMessage::AddrV2(addresses) => {
if addresses.len() > MAX_ADDR {
return Some(PeerMessage::Disconnect);
}
let addresses: Vec<Address> = addresses
.iter()
.filter(|f| f.services.has(ServiceFlags::COMPACT_FILTERS))
Expand Down
8 changes: 2 additions & 6 deletions src/peers/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,9 @@ pub(crate) trait MessageGenerator {

fn verack(&mut self) -> Vec<u8>;

fn get_addr(&mut self) -> Vec<u8>;
fn addr(&mut self) -> Vec<u8>;

fn get_headers(
&mut self,
locator_hashes: Vec<BlockHash>,
stop_hash: Option<BlockHash>,
) -> Vec<u8>;
fn headers(&mut self, locator_hashes: Vec<BlockHash>, stop_hash: Option<BlockHash>) -> Vec<u8>;

fn cf_headers(&mut self, message: GetCFHeaders) -> Vec<u8>;

Expand Down
Loading