Skip to content

Commit

Permalink
Merge pull request #36 from rustaceanrob/reader
Browse files Browse the repository at this point in the history
peer: clean up reader and peer
  • Loading branch information
rustaceanrob authored Jul 13, 2024
2 parents 6f85fff + 52bbf26 commit a3c7701
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 45 deletions.
1 change: 1 addition & 0 deletions src/peers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ mod tests {
use super::MessageTimer;

#[tokio::test]
#[ignore = "time wasting"]
async fn test_timer_works() {
let mut timer = MessageTimer::new();
assert!(!timer.unresponsive());
Expand Down
8 changes: 2 additions & 6 deletions src/peers/outbound_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,12 @@ impl MessageGenerator for V1OutboundMessage {
serialize(&data)
}

fn get_addr(&mut self) -> Vec<u8> {
fn addr(&mut self) -> Vec<u8> {
let data = RawNetworkMessage::new(self.network.magic(), NetworkMessage::GetAddr);
serialize(&data)
}

fn get_headers(
&mut self,
locator_hashes: Vec<BlockHash>,
stop_hash: Option<BlockHash>,
) -> Vec<u8> {
fn headers(&mut self, locator_hashes: Vec<BlockHash>, stop_hash: Option<BlockHash>) -> Vec<u8> {
let msg =
GetHeadersMessage::new(locator_hashes, stop_hash.unwrap_or(BlockHash::all_zeros()));
let data =
Expand Down
8 changes: 5 additions & 3 deletions src/peers/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl Peer {
.write_all(&version_message)
.await
.map_err(|_| PeerError::BufferWrite)?;
self.message_timer.track();
let (reader, mut writer) = stream.into_split();
let (tx, mut rx) = mpsc::channel(32);
let mut peer_reader = Reader::new(reader, tx, self.network);
Expand All @@ -104,7 +105,7 @@ impl Peer {
}
select! {
// The peer sent us a message
peer_message = tokio::time::timeout(Duration::from_secs(CONNECTION_TIMEOUT), rx.recv())=> {
peer_message = tokio::time::timeout(Duration::from_secs(CONNECTION_TIMEOUT), rx.recv()) => {
if let Ok(peer_message) = peer_message {
match peer_message {
Some(message) => {
Expand Down Expand Up @@ -157,6 +158,7 @@ impl Peer {
match message {
PeerMessage::Version(version) => {
self.message_counter.got_version();
self.message_timer.untrack();
self.main_thread_sender
.send(PeerThreadMessage {
nonce: self.nonce,
Expand Down Expand Up @@ -288,14 +290,14 @@ impl Peer {
MainThreadMessage::GetAddr => {
self.message_counter.sent_addrs();
writer
.write_all(&message_generator.get_addr())
.write_all(&message_generator.addr())
.await
.map_err(|_| PeerError::BufferWrite)?;
}
MainThreadMessage::GetHeaders(config) => {
self.message_counter.sent_header();
self.message_timer.track();
let message = message_generator.get_headers(config.locators, config.stop_hash);
let message = message_generator.headers(config.locators, config.stop_hash);
writer
.write_all(&message)
.await
Expand Down
52 changes: 22 additions & 30 deletions src/peers/reader.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use bitcoin::consensus::deserialize;
use bitcoin::consensus::deserialize_partial;
use bitcoin::consensus::Decodable;
Expand All @@ -24,28 +21,21 @@ use crate::node::messages::RejectPayload;

const ONE_MONTH: u64 = 2_500_000;
const ONE_MINUTE: u64 = 60;
// The peer must have sent at least 10 messages to trigger DOS
const MINIMUM_DOS_THRESHOLD: u64 = 10;
// We allow up to 5000 messages per second
const RATE_LIMIT: u64 = 5000;
const MAX_MESSAGE_BYTES: u32 = 1024 * 1024 * 32;
// From Bitcoin Core PR #29575
const MAX_ADDR: usize = 1_000;
const MAX_INV: usize = 50_000;
const MAX_HEADERS: usize = 2_000;

pub(crate) struct Reader {
num_messages: u64,
start_time: u64,
stream: OwnedReadHalf,
tx: Sender<PeerMessage>,
network: Network,
}

impl Reader {
pub fn new(stream: OwnedReadHalf, tx: Sender<PeerMessage>, network: Network) -> Self {
let start_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("time went backwards")
.as_secs();
Self {
num_messages: 0,
start_time,
stream,
tx,
network,
Expand All @@ -54,7 +44,7 @@ impl Reader {

pub(crate) async fn read_from_remote(&mut self) -> Result<(), PeerReadError> {
loop {
// v1 headers are 24 bytes
// V1 headers are 24 bytes
let mut message_buf = vec![0_u8; 24];
let _ = self
.stream
Expand All @@ -69,21 +59,9 @@ impl Reader {
return Err(PeerReadError::Deserialization);
}
// Message is too long
if header.length > (1024 * 1024 * 32) as u32 {
if header.length > MAX_MESSAGE_BYTES {
return Err(PeerReadError::Deserialization);
}
// DOS protection
self.num_messages += 1;
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("time went backwards")
.as_secs();
let duration = now - self.start_time;
if self.num_messages > MINIMUM_DOS_THRESHOLD
&& self.num_messages.checked_div(duration).unwrap_or(0) > RATE_LIMIT
{
return Err(PeerReadError::TooManyMessages);
}
let mut contents_buf = vec![0_u8; header.length as usize];
let _ = self.stream.read_exact(&mut contents_buf).await.unwrap();
message_buf.extend_from_slice(&contents_buf);
Expand Down Expand Up @@ -111,6 +89,9 @@ fn parse_message(message: &NetworkMessage) -> Option<PeerMessage> {
})),
NetworkMessage::Verack => Some(PeerMessage::Verack),
NetworkMessage::Addr(addresses) => {
if addresses.len() > MAX_ADDR {
return Some(PeerMessage::Disconnect);
}
let addresses: Vec<Address> = addresses
.iter()
.filter(|f| f.1.services.has(ServiceFlags::COMPACT_FILTERS))
Expand All @@ -120,6 +101,9 @@ fn parse_message(message: &NetworkMessage) -> Option<PeerMessage> {
Some(PeerMessage::Addr(addresses))
}
NetworkMessage::Inv(inventory) => {
if inventory.len() > MAX_INV {
return Some(PeerMessage::Disconnect);
}
let mut hashes = Vec::new();
for i in inventory {
match i {
Expand All @@ -142,7 +126,12 @@ fn parse_message(message: &NetworkMessage) -> Option<PeerMessage> {
NetworkMessage::MemPool => None,
NetworkMessage::Tx(_) => None,
NetworkMessage::Block(block) => Some(PeerMessage::Block(block.clone())),
NetworkMessage::Headers(headers) => Some(PeerMessage::Headers(headers.clone())),
NetworkMessage::Headers(headers) => {
if headers.len() > MAX_HEADERS {
return Some(PeerMessage::Disconnect);
}
Some(PeerMessage::Headers(headers.clone()))
}
NetworkMessage::SendHeaders => None,
NetworkMessage::GetAddr => None,
NetworkMessage::Ping(nonce) => Some(PeerMessage::Ping(*nonce)),
Expand Down Expand Up @@ -174,6 +163,9 @@ fn parse_message(message: &NetworkMessage) -> Option<PeerMessage> {
NetworkMessage::FeeFilter(_) => None,
NetworkMessage::WtxidRelay => None,
NetworkMessage::AddrV2(addresses) => {
if addresses.len() > MAX_ADDR {
return Some(PeerMessage::Disconnect);
}
let addresses: Vec<Address> = addresses
.iter()
.filter(|f| f.services.has(ServiceFlags::COMPACT_FILTERS))
Expand Down
8 changes: 2 additions & 6 deletions src/peers/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,9 @@ pub(crate) trait MessageGenerator {

fn verack(&mut self) -> Vec<u8>;

fn get_addr(&mut self) -> Vec<u8>;
fn addr(&mut self) -> Vec<u8>;

fn get_headers(
&mut self,
locator_hashes: Vec<BlockHash>,
stop_hash: Option<BlockHash>,
) -> Vec<u8>;
fn headers(&mut self, locator_hashes: Vec<BlockHash>, stop_hash: Option<BlockHash>) -> Vec<u8>;

fn cf_headers(&mut self, message: GetCFHeaders) -> Vec<u8>;

Expand Down

0 comments on commit a3c7701

Please sign in to comment.