# fix(light/response): handle bad responses (#9756)

Changes from all commits.
```diff
@@ -19,7 +19,7 @@
 //! will take the raw data received here and extract meaningful results from it.

 use std::cmp;
-use std::collections::{HashMap, BTreeSet};
+use std::collections::{HashMap, HashSet, BTreeSet};
 use std::marker::PhantomData;
 use std::sync::Arc;

```
```diff
@@ -42,6 +42,9 @@ use self::request::CheckedRequest;

 pub use self::request::{Request, Response, HeaderRef};

+use types::request::ResponseError;
+use self::request::Error as ValidityError;
+
 #[cfg(test)]
 mod tests;

```
```diff
@@ -69,13 +72,19 @@ pub mod error {
 	}

 	errors {
+		#[doc = "Request was faulty"]
+		FaultyRequest(req_id: super::ReqId, bad_responses: usize, num_providers: usize) {
+			description("Faulty request found")
+			display("The request: {} was determined as faulty, {}/{} peer(s) gave bad response", req_id, bad_responses, num_providers)
+		}
+
 		#[doc = "Max number of on-demand query attempts reached without result."]
 		MaxAttemptReach(query_index: usize) {
			description("On-demand query limit reached")
			display("On-demand query limit reached on query #{}", query_index)
 		}

-		#[doc = "No reply with current peer set, time out occured while waiting for new peers for additional query attempt."]
+		#[doc = "No reply with current peer set, time out occurred while waiting for new peers for additional query attempt."]
 		TimeoutOnNewPeers(query_index: usize, remaining_attempts: usize) {
 			description("Timeout for On-demand query")
 			display("Timeout for On-demand query; {} query attempts remain for query #{}", remaining_attempts, query_index)
```
```diff
@@ -124,6 +133,10 @@ struct Pending {
 	required_capabilities: Capabilities,
 	responses: Vec<Response>,
 	sender: oneshot::Sender<PendingResponse>,
+	/// Collects the peers that gave a bad response to this request.
+	// When `bad_responses.len() > peers / 2`, regard the request as faulty.
+	// This can happen for several reasons, such as a request for a hash that doesn't exist.
+	bad_responses: HashSet<PeerId>,
 	base_query_index: usize,
 	remaining_query_count: usize,
 	query_id_history: BTreeSet<PeerId>,
```
```diff
@@ -173,8 +186,7 @@ impl Pending {

 	// supply a response.
 	fn supply_response(&mut self, cache: &Mutex<Cache>, response: &basic_request::Response)
-		-> Result<(), basic_request::ResponseError<self::request::Error>>
-	{
+		-> Result<(), basic_request::ResponseError<self::request::Error>> {
 		match self.requests.supply_response(&cache, response) {
 			Ok(response) => {
 				let idx = self.responses.len();
```
```diff
@@ -227,7 +239,15 @@ impl Pending {
 		self.required_capabilities = capabilities;
 	}

+	fn add_bad_response(&mut self, peer: PeerId) {
+		self.bad_responses.insert(peer);
+	}
+
+	fn is_bad_response(&self, total_peers: usize) -> bool {
+		self.bad_responses.len() > total_peers / 2
+	}
+
-	// returning no reponse, it will result in an error.
+	// returning no response, it will result in an error.
 	// self is consumed on purpose.
 	fn no_response(self) {
 		trace!(target: "on_demand", "Dropping a pending query (no reply) at query #{}", self.query_id_history.len());
```
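Taken together, `add_bad_response` and `is_bad_response` implement a strict-majority rule. A self-contained sketch of just that logic (the `BadResponseTracker` type and integer peer ids are placeholders for this sketch, not the crate's real types):

```rust
use std::collections::HashSet;

// Placeholder type for this sketch; in the diff the set lives on `Pending`.
struct BadResponseTracker {
    bad_responses: HashSet<u64>, // a set, so each peer is counted at most once
}

impl BadResponseTracker {
    fn add_bad_response(&mut self, peer: u64) {
        self.bad_responses.insert(peer);
    }

    // Strict majority: with 4 peers, 3 bad responses are required (3 > 4 / 2).
    fn is_bad_response(&self, total_peers: usize) -> bool {
        self.bad_responses.len() > total_peers / 2
    }
}

fn main() {
    let mut tracker = BadResponseTracker { bad_responses: HashSet::new() };
    tracker.add_bad_response(1);
    tracker.add_bad_response(2);
    tracker.add_bad_response(2); // a repeat offender still counts only once
    assert!(!tracker.is_bad_response(4)); // 2 of 4 is not a strict majority
    tracker.add_bad_response(3);
    assert!(tracker.is_bad_response(4)); // 3 of 4 crosses the `> total_peers / 2` threshold
}
```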
```diff
@@ -237,14 +257,24 @@
 		}
 	}

-	// returning a peer discovery timeout during query attempts
+	// Returning a peer discovery timeout during query attempts
 	fn time_out(self) {
 		trace!(target: "on_demand", "Dropping a pending query (no new peer time out) at query #{}", self.query_id_history.len());
 		let err = self::error::ErrorKind::TimeoutOnNewPeers(self.requests.num_answered(), self.query_id_history.len());
 		if self.sender.send(Err(err.into())).is_err() {
 			debug!(target: "on_demand", "Dropped oneshot channel receiver on time out");
 		}
 	}
+
+	// The given request was determined to be faulty; drop it accordingly.
+	fn set_as_faulty_request(self, total_peers: usize, req_id: ReqId) {
+		let bad_peers = self.bad_responses.len();
+		trace!(target: "on_demand", "The request: {} was determined as faulty, {}/{} peer(s) gave bad response", req_id, bad_peers, total_peers);
+		let err = self::error::ErrorKind::FaultyRequest(req_id, bad_peers, total_peers);
+		if self.sender.send(Err(err.into())).is_err() {
+			debug!(target: "on_demand", "Dropped oneshot channel receiver on faulty request");
+		}
+	}
 }

 // helper to guess capabilities required for a given batch of network requests.
```
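Both `time_out` and `set_as_faulty_request` deliver their error by consuming `self` and sending over the pending request's oneshot channel. A rough sketch of that delivery pattern, written against the futures 0.3 `oneshot` API rather than the crate's internal types:

```rust
use futures::channel::oneshot;
use futures::executor::block_on;

// Illustrative only: the on-demand service answers each pending request
// through a oneshot channel, so "dropping" a faulty request just means
// sending an Err before discarding the Pending value.
fn main() {
    let (sender, receiver) = oneshot::channel::<Result<u32, String>>();

    // Equivalent of `set_as_faulty_request`: consume the sender and report the error.
    // `send` itself fails only if the receiver was already dropped, which the
    // original code logs at debug level instead of propagating.
    if sender.send(Err("faulty request: 3/4 peer(s) gave bad response".into())).is_err() {
        eprintln!("receiver was dropped before the error could be delivered");
    }

    match block_on(receiver) {
        Ok(Err(e)) => println!("request failed: {}", e),
        Ok(Ok(v)) => println!("request succeeded: {}", v),
        Err(_canceled) => println!("sender dropped without answering"),
    }
}
```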
```diff
@@ -403,6 +433,7 @@ impl OnDemand {
 			required_capabilities: capabilities,
 			responses,
 			sender,
+			bad_responses: HashSet::new(),
 			base_query_index: 0,
 			remaining_query_count: 0,
 			query_id_history: BTreeSet::new(),
```
```diff
@@ -443,12 +474,15 @@ impl OnDemand {
 		// iterate over all pending requests, and check them for hang-up.
 		// then, try and find a peer who can serve it.
 		let peers = self.peers.read();
-		*pending = ::std::mem::replace(&mut *pending, Vec::new()).into_iter()
+		*pending = ::std::mem::replace(&mut *pending, Vec::new())
+			.into_iter()
 			.filter(|pending| !pending.sender.is_canceled())
 			.filter_map(|mut pending| {
-				// the peer we dispatch to is chosen randomly
 				let num_peers = peers.len();
 				let history_len = pending.query_id_history.len();
+
+				// The first peer to dispatch the request is chosen at random
 				let offset = if history_len == 0 {
 					pending.remaining_query_count = self.base_retry_count;
 					let rand = rand::random::<usize>();
```
```diff
@@ -457,15 +491,21 @@
 				} else {
 					pending.base_query_index + history_len
 				} % cmp::max(num_peers, 1);
+
 				let init_remaining_query_count = pending.remaining_query_count; // to fail in case of big reduction of nb of peers
-				for (peer_id, peer) in peers.iter().chain(peers.iter())
-					.skip(offset).take(num_peers) {
+
+				for (peer_id, peer) in peers
+					.iter()
+					.cycle()
+					.skip(offset)
+					.take(num_peers)
+				{
```
> **Review comment** (on the `cycle()` loop): I think `cycle` is clearer in this context, and I formatted the code to be easier to read.
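The two loop forms are equivalent here: both visit each peer exactly once, starting at `offset` and wrapping around the end of the collection. A toy demonstration, with plain integers standing in for the real `(PeerId, Peer)` pairs:

```rust
fn main() {
    let peers = vec![10, 20, 30, 40];
    let num_peers = peers.len();
    let offset = 2; // chosen at random in the real code

    // Old form: iterate the collection twice and window into it.
    let old: Vec<_> = peers.iter().chain(peers.iter())
        .skip(offset).take(num_peers).collect();

    // New form: an infinite repetition, windowed the same way.
    let new: Vec<_> = peers.iter().cycle()
        .skip(offset).take(num_peers).collect();

    assert_eq!(old, new);
    assert_eq!(new, vec![&30, &40, &10, &20]); // wraps around past the end
}
```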
```diff
 					// TODO: see which requests can be answered by the cache?
 					if pending.remaining_query_count == 0 {
 						break
 					}

-					if pending.query_id_history.insert(peer_id.clone()) {
+					if pending.query_id_history.insert(*peer_id) {

 						if !peer.can_fulfill(&pending.required_capabilities) {
 							trace!(target: "on_demand", "Peer {} without required capabilities, skipping, {} remaining attempts", peer_id, pending.remaining_query_count);
```
```diff
@@ -588,12 +628,26 @@ impl Handler for OnDemand {
 	}

 	fn on_responses(&self, ctx: &EventContext, req_id: ReqId, responses: &[basic_request::Response]) {
+		// The req_id was not found among the pending requests
 		let mut pending = match self.in_transit.write().remove(&req_id) {
 			Some(req) => req,
 			None => return,
 		};

+		// Handle the case where the response is empty
 		if responses.is_empty() {
+			let total_peers = self.peers.read().len();
+
+			// Register the response as bad for that peer
+			pending.add_bad_response(ctx.peer());
+
+			// If the majority of responses are bad, drop the request
+			if pending.is_bad_response(total_peers) {
+				pending.set_as_faulty_request(total_peers, req_id);
+				return;
+			}
+
+			// no remaining query attempts left on the response
 			if pending.remaining_query_count == 0 {
 				pending.no_response();
 				return;
```
```diff
@@ -608,12 +662,53 @@
 		// 2. pending.requests.supply_response
 		// 3. if extracted on-demand response, keep it for later.
 		for response in responses {
-			if let Err(e) = pending.supply_response(&*self.cache, response) {
-				let peer = ctx.peer();
-				debug!(target: "on_demand", "Peer {} gave bad response: {:?}", peer, e);
-				ctx.disable_peer(peer);
-				break;
-			}
+			match pending.supply_response(&*self.cache, response) {
+				Err(ResponseError::Validity(err)) => {
+					match err {
+						// This can be a malformed request or a bad response, but we can't determine which at the moment.
+						// Thus, register the response as bad and wait for more responses before taking a decision.
+						ValidityError::BadProof | ValidityError::Empty => {
```
> **Review comment** (on the `BadProof | Empty` arm): We can get a
>
> **Reply:** Not sure I understand this. Why would it happen now that we have #9591 (unless someone is running a node older than 2.0.7-stable/2.1.2-beta)? And how can we distinguish this from an invalid proof?
```diff
+							let peer = ctx.peer();
+							trace!(target: "on_demand", "Peer {} gave a potentially bad response on req_id: {} - can't determine whether it was a bad request or a bad response yet", peer, req_id);
+							pending.add_bad_response(peer);
+						}
+						// Bad response, punish the peer
```
```diff
+						ValidityError::Decoder(_)
+						| ValidityError::HeaderByNumber
+						| ValidityError::Trie(_)
+						| ValidityError::TooFewResults(_, _)
+						| ValidityError::TooManyResults(_, _)
+						| ValidityError::UnresolvedHeader(_)
+						| ValidityError::WrongTrieRoot(_, _)
+						| ValidityError::WrongKind
+						| ValidityError::WrongNumber(_, _)
+						| ValidityError::WrongHash(_, _)
+						| ValidityError::WrongHeaderSequence => {
+							let peer = ctx.peer();
+							pending.add_bad_response(peer);
+							debug!(target: "on_demand", "Peer {} gave bad response", peer);
+							ctx.disable_peer(peer);
+						}
+					}
+				}
```
> **Review comment** (on the explicit list of variants): Verbosity, but I think it helps to know which errors can occur.
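A compact illustration of the trade-off the reviewer is pointing at, using a cut-down stand-in enum rather than the crate's real `ValidityError`: with every variant spelled out, adding a new variant later makes the match a compile error instead of silently falling into a `_` arm.

```rust
#[allow(dead_code)]
enum ValidityError {
    // Cut-down stand-in for the real enum; only three variants for this sketch.
    BadProof,
    Empty,
    WrongKind,
    // A variant added here turns `classify` into a compile error,
    // forcing a decision about how the new case should be treated.
}

fn classify(err: &ValidityError) -> &'static str {
    match err {
        // Ambiguous: could be a malformed request rather than a bad response.
        ValidityError::BadProof | ValidityError::Empty => "ambiguous",
        // Unambiguously the peer's fault: punish it.
        ValidityError::WrongKind => "punish peer",
    }
}

fn main() {
    assert_eq!(classify(&ValidityError::Empty), "ambiguous");
}
```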
```diff
+				Err(ResponseError::Unexpected) => {
+					let peer = ctx.peer();
+					pending.add_bad_response(peer);
+					debug!(target: "on_demand", "Peer {} gave bad response", peer);
+					ctx.disable_peer(peer);
+				}
+				// `Good` response, continue
+				_ => (),
+			}
+
+			// If the majority of responses are bad, drop the request
+			let total_peers = self.peers.read().len();
+			if pending.is_bad_response(total_peers) {
+				pending.set_as_faulty_request(total_peers, req_id);
+				return;
+			}
 		}
```
> **Review comment** (on `set_as_faulty_request`): Consider renaming this to `faulty_request` or `bad_request` to follow the same naming convention as `time_out` and `no_response` (or rename those).