Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

sc-consensus-beefy: add peer reputation cost/benefit changes #13881

Merged
merged 6 commits into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 205 additions & 59 deletions client/consensus/beefy/src/communication/gossip.rs

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions client/consensus/beefy/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,39 @@ pub fn beefy_peers_set_config(
cfg
}

// cost scalars for reporting peers.
mod cost {
use sc_network::ReputationChange as Rep;
// Message that's for an outdated round.
pub(super) const OUTDATED_MESSAGE: Rep = Rep::new(-50, "BEEFY: Past message");
// Message that's from the future relative to our current set-id.
pub(super) const FUTURE_MESSAGE: Rep = Rep::new(-100, "BEEFY: Future message");
// Vote message containing bad signature.
pub(super) const BAD_SIGNATURE: Rep = Rep::new(-100, "BEEFY: Bad signature");
// Message received with vote from voter not in validator set.
pub(super) const UNKNOWN_VOTER: Rep = Rep::new(-150, "BEEFY: Unknown voter");
// A message received that cannot be evaluated relative to our current state.
pub(super) const OUT_OF_SCOPE_MESSAGE: Rep = Rep::new(-500, "BEEFY: Out-of-scope message");
// Message containing invalid proof.
pub(super) const INVALID_PROOF: Rep = Rep::new(-5000, "BEEFY: Invalid commit");
// Reputation cost per signature checked for invalid proof.
pub(super) const PER_SIGNATURE_CHECKED: i32 = -25;
// Reputation cost per byte for un-decodable message.
pub(super) const PER_UNDECODABLE_BYTE: i32 = -5;
// On-demand request was refused by peer.
pub(super) const REFUSAL_RESPONSE: Rep = Rep::new(-100, "BEEFY: Proof request refused");
// On-demand request for a proof that can't be found in the backend.
pub(super) const UNKOWN_PROOF_REQUEST: Rep = Rep::new(-150, "BEEFY: Unknown proof request");
}

// benefit scalars for reporting peers.
mod benefit {
use sc_network::ReputationChange as Rep;
pub(super) const VOTE_MESSAGE: Rep = Rep::new(100, "BEEFY: Round vote message");
pub(super) const KNOWN_VOTE_MESSAGE: Rep = Rep::new(50, "BEEFY: Known vote");
pub(super) const VALIDATED_PROOF: Rep = Rep::new(100, "BEEFY: Justification");
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
12 changes: 8 additions & 4 deletions client/consensus/beefy/src/communication/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@

//! Logic for keeping track of BEEFY peers.

// TODO (issue #12296): replace this naive peer tracking with generic one that infers data
// from multiple network protocols.

use sc_network::PeerId;
use sc_network::{PeerId, ReputationChange};
use sp_runtime::traits::{Block, NumberFor, Zero};
use std::collections::{HashMap, VecDeque};

/// Report specifying a reputation change for a given peer.
#[derive(Debug, PartialEq)]
pub(crate) struct PeerReport {
pub who: PeerId,
pub cost_benefit: ReputationChange,
}

struct PeerData<B: Block> {
last_voted_on: NumberFor<B>,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ use sp_runtime::traits::Block;
use std::{marker::PhantomData, sync::Arc};

use crate::{
communication::request_response::{
on_demand_justifications_protocol_config, Error, JustificationRequest,
BEEFY_SYNC_LOG_TARGET,
communication::{
cost,
request_response::{
on_demand_justifications_protocol_config, Error, JustificationRequest,
BEEFY_SYNC_LOG_TARGET,
},
},
metric_inc,
metrics::{register_metrics, OnDemandIncomingRequestsMetrics},
Expand Down Expand Up @@ -69,17 +72,20 @@ impl<B: Block> IncomingRequest<B> {
/// Params:
/// - The raw request to decode
/// - Reputation changes to apply for the peer in case decoding fails.
pub fn try_from_raw(
pub fn try_from_raw<F>(
raw: netconfig::IncomingRequest,
reputation_changes: Vec<ReputationChange>,
) -> Result<Self, Error> {
reputation_changes_on_err: F,
) -> Result<Self, Error>
where
F: FnOnce(usize) -> Vec<ReputationChange>,
{
let netconfig::IncomingRequest { payload, peer, pending_response } = raw;
let payload = match JustificationRequest::decode(&mut payload.as_ref()) {
Ok(payload) => payload,
Err(err) => {
let response = netconfig::OutgoingResponse {
result: Err(()),
reputation_changes,
reputation_changes: reputation_changes_on_err(payload.len()),
sent_feedback: None,
};
if let Err(_) = pending_response.send(response) {
Expand Down Expand Up @@ -111,11 +117,11 @@ impl IncomingRequestReceiver {
pub async fn recv<B, F>(&mut self, reputation_changes: F) -> Result<IncomingRequest<B>, Error>
where
B: Block,
F: FnOnce() -> Vec<ReputationChange>,
F: FnOnce(usize) -> Vec<ReputationChange>,
{
let req = match self.raw.next().await {
None => return Err(Error::RequestChannelExhausted),
Some(raw) => IncomingRequest::<B>::try_from_raw(raw, reputation_changes())?,
Some(raw) => IncomingRequest::<B>::try_from_raw(raw, reputation_changes)?,
};
Ok(req)
}
Expand Down Expand Up @@ -159,26 +165,20 @@ where

// Sends back justification response if justification found in client backend.
fn handle_request(&self, request: IncomingRequest<B>) -> Result<(), Error> {
// TODO (issue #12293): validate `request` and change peer reputation for invalid requests.

let maybe_encoded_proof = if let Some(hash) =
self.client.block_hash(request.payload.begin).map_err(Error::Client)?
{
self.client
.justifications(hash)
.map_err(Error::Client)?
.and_then(|justifs| justifs.get(BEEFY_ENGINE_ID).cloned())
// No BEEFY justification present.
.ok_or(())
} else {
Err(())
};

let mut reputation_changes = vec![];
let maybe_encoded_proof = self
.client
.block_hash(request.payload.begin)
.ok()
.flatten()
.and_then(|hash| self.client.justifications(hash).ok().flatten())
.and_then(|justifs| justifs.get(BEEFY_ENGINE_ID).cloned())
.ok_or_else(|| reputation_changes.push(cost::UNKOWN_PROOF_REQUEST));
request
.pending_response
.send(netconfig::OutgoingResponse {
result: maybe_encoded_proof,
reputation_changes: Vec::new(),
reputation_changes,
sent_feedback: None,
})
.map_err(|_| Error::SendResponse)
Expand All @@ -188,7 +188,17 @@ where
pub async fn run(mut self) {
trace!(target: BEEFY_SYNC_LOG_TARGET, "🥩 Running BeefyJustifsRequestHandler");

while let Ok(request) = self.request_receiver.recv(|| vec![]).await {
while let Ok(request) = self
.request_receiver
.recv(|bytes| {
let bytes = bytes.min(i32::MAX as usize) as i32;
vec![ReputationChange::new(
bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE),
"BEEFY: Bad request payload",
)]
})
.await
{
let peer = request.peer;
match self.handle_request(request) {
Ok(()) => {
Expand All @@ -199,8 +209,8 @@ where
)
},
Err(e) => {
// peer reputation changes already applied in `self.handle_request()`
metric_inc!(self, beefy_failed_justification_responses);
// TODO (issue #12293): apply reputation changes here based on error type.
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 Failed to handle BEEFY justification request from {:?}: {}", peer, e,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use codec::{Decode, Encode, Error as CodecError};
use sc_network::{config::RequestResponseConfig, PeerId};
use sp_runtime::traits::{Block, NumberFor};

use crate::communication::beefy_protocol_name::justifications_protocol_name;
use crate::communication::{beefy_protocol_name::justifications_protocol_name, peers::PeerReport};
use incoming_requests_handler::IncomingRequestReceiver;

// 10 seems reasonable, considering justifs are explicitly requested only
Expand Down Expand Up @@ -76,7 +76,7 @@ pub struct JustificationRequest<B: Block> {
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
pub(crate) enum Error {
#[error(transparent)]
Client(#[from] sp_blockchain::Error),

Expand All @@ -99,5 +99,8 @@ pub enum Error {
SendResponse,

#[error("Received invalid response.")]
InvalidResponse,
InvalidResponse(PeerReport),

#[error("Internal error while getting response.")]
ResponseError,
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ use sp_runtime::traits::{Block, NumberFor};
use std::{collections::VecDeque, result::Result, sync::Arc};

use crate::{
communication::request_response::{Error, JustificationRequest, BEEFY_SYNC_LOG_TARGET},
communication::{
benefit, cost,
peers::PeerReport,
request_response::{Error, JustificationRequest, BEEFY_SYNC_LOG_TARGET},
},
justification::{decode_and_verify_finality_proof, BeefyVersionedFinalityProof},
metric_inc,
metrics::{register_metrics, OnDemandOutgoingRequestsMetrics},
Expand All @@ -54,6 +58,16 @@ enum State<B: Block> {
AwaitingResponse(PeerId, RequestInfo<B>, ResponseReceiver),
}

/// Possible engine responses.
pub(crate) enum ResponseInfo<B: Block> {
/// No peer response available yet.
Pending,
/// Valid justification provided alongside peer reputation changes.
ValidProof(BeefyVersionedFinalityProof<B>, PeerReport),
/// No justification yet, only peer reputation changes.
PeerReport(PeerReport),
}

pub struct OnDemandJustificationsEngine<B: Block> {
network: Arc<dyn NetworkRequest + Send + Sync>,
protocol_name: ProtocolName,
Expand Down Expand Up @@ -84,12 +98,10 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
}

fn reset_peers_cache_for_block(&mut self, block: NumberFor<B>) {
// TODO (issue #12296): replace peer selection with generic one that involves all protocols.
self.peers_cache = self.live_peers.lock().further_than(block);
}

fn try_next_peer(&mut self) -> Option<PeerId> {
// TODO (issue #12296): replace peer selection with generic one that involves all protocols.
let live = self.live_peers.lock();
while let Some(peer) = self.peers_cache.pop_front() {
if live.contains(&peer) {
Expand Down Expand Up @@ -159,56 +171,65 @@ impl<B: Block> OnDemandJustificationsEngine<B> {

fn process_response(
&mut self,
peer: PeerId,
peer: &PeerId,
req_info: &RequestInfo<B>,
response: Result<Response, Canceled>,
) -> Result<BeefyVersionedFinalityProof<B>, Error> {
response
.map_err(|e| {
metric_inc!(self, beefy_on_demand_justification_peer_hang_up);
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 for on demand justification #{:?}, peer {:?} hung up: {:?}",
req_info.block,
peer,
e
"🥩 on-demand sc-network channel sender closed, err: {:?}", e
);
Error::InvalidResponse
Error::ResponseError
})?
.map_err(|e| {
metric_inc!(self, beefy_on_demand_justification_peer_error);
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 for on demand justification #{:?}, peer {:?} error: {:?}",
req_info.block,
peer,
e
);
Error::InvalidResponse
match e {
RequestFailure::Refused => {
metric_inc!(self, beefy_on_demand_justification_peer_refused);
let peer_report =
PeerReport { who: *peer, cost_benefit: cost::REFUSAL_RESPONSE };
Error::InvalidResponse(peer_report)
},
_ => {
metric_inc!(self, beefy_on_demand_justification_peer_error);
Error::ResponseError
},
}
})
.and_then(|encoded| {
decode_and_verify_finality_proof::<B>(
&encoded[..],
req_info.block,
&req_info.active_set,
)
.map_err(|e| {
.map_err(|(err, signatures_checked)| {
metric_inc!(self, beefy_on_demand_justification_invalid_proof);
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}",
req_info.block, peer, e
req_info.block, peer, err
);
Error::InvalidResponse
let mut cost = cost::INVALID_PROOF;
cost.value +=
cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked as i32);
Error::InvalidResponse(PeerReport { who: *peer, cost_benefit: cost })
})
})
}

pub async fn next(&mut self) -> Option<BeefyVersionedFinalityProof<B>> {
pub(crate) async fn next(&mut self) -> ResponseInfo<B> {
let (peer, req_info, resp) = match &mut self.state {
State::Idle => {
futures::future::pending::<()>().await;
return None
return ResponseInfo::Pending
},
State::AwaitingResponse(peer, req_info, receiver) => {
let resp = receiver.await;
Expand All @@ -220,8 +241,8 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
self.state = State::Idle;

let block = req_info.block;
self.process_response(peer, &req_info, resp)
.map_err(|_| {
match self.process_response(&peer, &req_info, resp) {
Err(err) => {
// No valid justification received, try next peer in our set.
if let Some(peer) = self.try_next_peer() {
self.request_from_peer(peer, req_info);
Expand All @@ -231,15 +252,22 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
"🥩 ran out of peers to request justif #{:?} from", block
);
}
})
.map(|proof| {
// Report peer based on error type.
if let Error::InvalidResponse(peer_report) = err {
ResponseInfo::PeerReport(peer_report)
} else {
ResponseInfo::Pending
}
},
Ok(proof) => {
metric_inc!(self, beefy_on_demand_justification_good_proof);
debug!(
target: BEEFY_SYNC_LOG_TARGET,
"🥩 received valid on-demand justif #{:?} from {:?}", block, peer
);
proof
})
.ok()
let peer_report = PeerReport { who: peer, cost_benefit: benefit::VALIDATED_PROOF };
ResponseInfo::ValidProof(proof, peer_report)
},
}
}
}
1 change: 1 addition & 0 deletions client/consensus/beefy/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ where
.ok_or_else(|| ImportError("Unknown validator set".to_string()))?;

decode_and_verify_finality_proof::<Block>(&encoded[..], number, &validator_set)
.map_err(|(err, _)| err)
}
}

Expand Down
Loading