Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Decrease the peer reputation on invalid block requests #8260

Merged
merged 5 commits into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 118 additions & 22 deletions client/network/src/block_request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,30 @@ use crate::protocol::{message::BlockAttributes};
use crate::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig};
use crate::schema::v1::block_request::FromBlock;
use crate::schema::v1::{BlockResponse, Direction};
use crate::{PeerId, ReputationChange};
use futures::channel::{mpsc, oneshot};
use futures::stream::StreamExt;
use log::debug;
use lru::LruCache;
use prost::Message;
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{Block as BlockT, Header, One, Zero};
use std::cmp::min;
use std::sync::{Arc};
use std::sync::Arc;
use std::time::Duration;
use std::hash::{Hasher, Hash};

const LOG_TARGET: &str = "block-request-handler";
const LOG_TARGET: &str = "sync";
const MAX_BLOCKS_IN_RESPONSE: usize = 128;
const MAX_BODY_BYTES: usize = 8 * 1024 * 1024;
const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;

mod rep {
use super::ReputationChange as Rep;

/// Reputation change when a peer sent us the same request multiple times.
pub const SAME_REQUEST: Rep = Rep::new(i32::min_value(), "Same block request multiple times");
Comment on lines +48 to +49
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it so punitive to ask the same thing twice?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still waiting for some answer from the author of the linked issue to see if that fixes the problem. As reported, they assume that someone is spamming them with the same request over and over again.

It is also fine to send the same request twice (or even more times when you wait long enough between the requests), however the same request should not be send multiple times in a short time period. This probably just reveals some bug in the sync code, but I still don't see any problem why we should no restrict this.

}

/// Generates a [`ProtocolConfig`] for the block request protocol, refusing incoming requests.
pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig {
Expand All @@ -61,15 +72,45 @@ pub(crate) fn generate_protocol_name(protocol_id: &ProtocolId) -> String {
s
}

/// The key for [`BlockRequestHandler::seen_requests`].
#[derive(Eq, PartialEq)]
struct SeenRequestsKey<B: BlockT> {
peer: PeerId,
from: BlockId<B>,
max_blocks: usize,
direction: Direction,
}

impl<B: BlockT> Hash for SeenRequestsKey<B> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.peer.hash(state);
self.max_blocks.hash(state);
self.direction.hash(state);

match self.from {
BlockId::Hash(h) => h.hash(state),
BlockId::Number(n) => n.hash(state),
}
}
}

/// Handler for incoming block requests from a remote peer.
pub struct BlockRequestHandler<B> {
pub struct BlockRequestHandler<B: BlockT> {
client: Arc<dyn Client<B>>,
request_receiver: mpsc::Receiver<IncomingRequest>,
/// Maps from request to number of times we have seen this request.
///
/// This is used to check if a peer is spamming us with the same request.
seen_requests: LruCache<SeenRequestsKey<B>, usize>,
}

impl <B: BlockT> BlockRequestHandler<B> {
impl<B: BlockT> BlockRequestHandler<B> {
/// Create a new [`BlockRequestHandler`].
pub fn new(protocol_id: &ProtocolId, client: Arc<dyn Client<B>>) -> (Self, ProtocolConfig) {
pub fn new(
protocol_id: &ProtocolId,
client: Arc<dyn Client<B>>,
num_peer_hint: usize,
) -> (Self, ProtocolConfig) {
// Rate of arrival multiplied with the waiting time in the queue equals the queue length.
//
// An average Polkadot node serves less than 5 requests per second. The 95th percentile
Expand All @@ -82,29 +123,33 @@ impl <B: BlockT> BlockRequestHandler<B> {
let mut protocol_config = generate_protocol_config(protocol_id);
protocol_config.inbound_queue = Some(tx);

(Self { client, request_receiver }, protocol_config)
let seen_requests = LruCache::new(num_peer_hint * 2);

(Self { client, request_receiver, seen_requests }, protocol_config)
}

/// Run [`BlockRequestHandler`].
pub async fn run(mut self) {
while let Some(request) = self.request_receiver.next().await {
let IncomingRequest { peer, payload, pending_response } = request;

match self.handle_request(payload, pending_response) {
match self.handle_request(payload, pending_response, &peer) {
Ok(()) => debug!(target: LOG_TARGET, "Handled block request from {}.", peer),
Err(e) => debug!(
target: LOG_TARGET,
"Failed to handle block request from {}: {}",
peer, e,
peer,
e,
),
}
}
}

fn handle_request(
&self,
&mut self,
payload: Vec<u8>,
pending_response: oneshot::Sender<OutgoingResponse>
pending_response: oneshot::Sender<OutgoingResponse>,
peer: &PeerId,
) -> Result<(), HandleRequestError> {
let request = crate::schema::v1::BlockRequest::decode(&payload[..])?;

Expand All @@ -127,16 +172,75 @@ impl <B: BlockT> BlockRequestHandler<B> {

let direction = Direction::from_i32(request.direction)
.ok_or(HandleRequestError::ParseDirection)?;

let key = SeenRequestsKey {
peer: *peer,
max_blocks,
direction,
from: from_block_id.clone(),
};

let mut reputation_changes = Vec::new();

if let Some(requests) = self.seen_requests.get_mut(&key) {
*requests = requests.saturating_add(1);

if *requests > MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER {
reputation_changes.push(rep::SAME_REQUEST);
}
} else {
self.seen_requests.put(key, 1);
}

debug!(
target: LOG_TARGET,
"Handling block request from {}: Starting at `{:?}` with maximum blocks \
of `{}` and direction `{:?}`.",
peer,
from_block_id,
max_blocks,
direction,
);

let attributes = BlockAttributes::from_be_u32(request.fields)?;

let result = if reputation_changes.is_empty() {
let block_response = self.get_block_response(
attributes,
from_block_id,
direction,
max_blocks,
)?;

let mut data = Vec::with_capacity(block_response.encoded_len());
block_response.encode(&mut data)?;

Ok(data)
} else {
Err(())
};

pending_response.send(OutgoingResponse {
result,
reputation_changes,
}).map_err(|_| HandleRequestError::SendResponse)
}

fn get_block_response(
&self,
attributes: BlockAttributes,
mut block_id: BlockId<B>,
direction: Direction,
max_blocks: usize,
) -> Result<BlockResponse, HandleRequestError> {
let get_header = attributes.contains(BlockAttributes::HEADER);
let get_body = attributes.contains(BlockAttributes::BODY);
let get_justification = attributes.contains(BlockAttributes::JUSTIFICATION);

let mut blocks = Vec::new();
let mut block_id = from_block_id;

let mut total_size: usize = 0;
while let Some(header) = self.client.header(block_id).unwrap_or(None) {
while let Some(header) = self.client.header(block_id).unwrap_or_default() {
let number = *header.number();
let hash = header.hash();
let parent_hash = *header.parent_hash();
Expand All @@ -153,7 +257,7 @@ impl <B: BlockT> BlockRequestHandler<B> {
.map(|extrinsic| extrinsic.encode())
.collect(),
None => {
log::trace!(target: "sync", "Missing data for block request.");
log::trace!(target: LOG_TARGET, "Missing data for block request.");
break;
}
}
Expand Down Expand Up @@ -195,15 +299,7 @@ impl <B: BlockT> BlockRequestHandler<B> {
}
}

let res = BlockResponse { blocks };

let mut data = Vec::with_capacity(res.encoded_len());
res.encode(&mut data)?;

pending_response.send(OutgoingResponse {
result: Ok(data),
reputation_changes: Vec::new(),
}).map_err(|_| HandleRequestError::SendResponse)
Ok(BlockResponse { blocks })
}
}

Expand Down
1 change: 1 addition & 0 deletions client/network/src/gossip/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ fn build_test_full_node(network_config: config::NetworkConfiguration)
let (handler, protocol_config) = BlockRequestHandler::new(
&protocol_id,
client.clone(),
50,
);
async_std::task::spawn(handler.run().boxed());
protocol_config
Expand Down
1 change: 1 addition & 0 deletions client/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ fn build_test_full_node(config: config::NetworkConfiguration)
let (handler, protocol_config) = BlockRequestHandler::new(
&protocol_id,
client.clone(),
50,
);
async_std::task::spawn(handler.run().boxed());
protocol_config
Expand Down
6 changes: 5 additions & 1 deletion client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,11 @@ pub trait TestNetFactory: Sized {
let protocol_id = ProtocolId::from("test-protocol-name");

let block_request_protocol_config = {
let (handler, protocol_config) = BlockRequestHandler::new(&protocol_id, client.clone());
let (handler, protocol_config) = BlockRequestHandler::new(
&protocol_id,
client.clone(),
50,
);
self.spawn_task(handler.run().boxed());
protocol_config
};
Expand Down
2 changes: 2 additions & 0 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,8 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
let (handler, protocol_config) = BlockRequestHandler::new(
&protocol_id,
client.clone(),
config.network.default_peers_set.in_peers as usize
+ config.network.default_peers_set.out_peers as usize,
);
spawn_handle.spawn("block_request_handler", handler.run());
protocol_config
Expand Down