Skip to content

Commit

Permalink
client/beefy: fix on-demand justifications sync for old blocks (parit…
Browse files Browse the repository at this point in the history
…ytech#12767)

* client/beefy: fix on-demand justif sync for old blocks

When receiving BEEFY justifications for old blocks the state might
be pruned for them, in which case justification verification fails
because BEEFY validator set cannot be retrieved from runtime state.

Fix this by having the voter give the validator set to the
`OnDemandJustificationsEngine` as request information. On receiving
a BEEFY justification for requested block, the provided validator
set will be used to validate the justification.

Signed-off-by: acatangiu <adrian@parity.io>

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* impl review suggestions

* client/beefy: fail initialization if state unavailable

* beefy: remove spammy log

Signed-off-by: acatangiu <adrian@parity.io>
Co-authored-by: parity-processbot <>
Co-authored-by: Bastian Köcher <git@kchr.de>
  • Loading branch information
2 people authored and ltfschoen committed Feb 22, 2023
1 parent 586f098 commit 29d2174
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,17 @@

//! Generating request logic for request/response protocol for syncing BEEFY justifications.
use beefy_primitives::{crypto::AuthorityId, BeefyApi, ValidatorSet};
use beefy_primitives::{crypto::AuthorityId, ValidatorSet};
use codec::Encode;
use futures::channel::{oneshot, oneshot::Canceled};
use log::{debug, error, warn};
use log::{debug, warn};
use parking_lot::Mutex;
use sc_network::{PeerId, ProtocolName};
use sc_network_common::{
request_responses::{IfDisconnected, RequestFailure},
service::NetworkRequest,
};
use sp_api::ProvideRuntimeApi;
use sp_runtime::{
generic::BlockId,
traits::{Block, NumberFor},
};
use sp_runtime::traits::{Block, NumberFor};
use std::{collections::VecDeque, result::Result, sync::Arc};

use crate::{
Expand All @@ -46,14 +42,19 @@ type Response = Result<Vec<u8>, RequestFailure>;
/// Used to receive a response from the network.
type ResponseReceiver = oneshot::Receiver<Response>;

#[derive(Clone, Debug)]
struct RequestInfo<B: Block> {
block: NumberFor<B>,
active_set: ValidatorSet<AuthorityId>,
}

enum State<B: Block> {
Idle,
AwaitingResponse(PeerId, NumberFor<B>, ResponseReceiver),
AwaitingResponse(PeerId, RequestInfo<B>, ResponseReceiver),
}

pub struct OnDemandJustificationsEngine<B: Block, R> {
pub struct OnDemandJustificationsEngine<B: Block> {
network: Arc<dyn NetworkRequest + Send + Sync>,
runtime: Arc<R>,
protocol_name: ProtocolName,

live_peers: Arc<Mutex<KnownPeers<B>>>,
Expand All @@ -62,21 +63,14 @@ pub struct OnDemandJustificationsEngine<B: Block, R> {
state: State<B>,
}

impl<B, R> OnDemandJustificationsEngine<B, R>
where
B: Block,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B>,
{
impl<B: Block> OnDemandJustificationsEngine<B> {
pub fn new(
network: Arc<dyn NetworkRequest + Send + Sync>,
runtime: Arc<R>,
protocol_name: ProtocolName,
live_peers: Arc<Mutex<KnownPeers<B>>>,
) -> Self {
Self {
network,
runtime,
protocol_name,
live_peers,
peers_cache: VecDeque::new(),
Expand All @@ -100,10 +94,15 @@ where
None
}

fn request_from_peer(&mut self, peer: PeerId, block: NumberFor<B>) {
debug!(target: "beefy::sync", "🥩 requesting justif #{:?} from peer {:?}", block, peer);
fn request_from_peer(&mut self, peer: PeerId, req_info: RequestInfo<B>) {
debug!(
target: "beefy::sync",
"🥩 requesting justif #{:?} from peer {:?}",
req_info.block,
peer,
);

let payload = JustificationRequest::<B> { begin: block }.encode();
let payload = JustificationRequest::<B> { begin: req_info.block }.encode();

let (tx, rx) = oneshot::channel();

Expand All @@ -115,11 +114,13 @@ where
IfDisconnected::ImmediateError,
);

self.state = State::AwaitingResponse(peer, block, rx);
self.state = State::AwaitingResponse(peer, req_info, rx);
}

/// If no other request is in progress, start new justification request for `block`.
pub fn request(&mut self, block: NumberFor<B>) {
/// Start new justification request for `block`, if no other request is in progress.
///
/// `active_set` will be used to verify validity of potential responses.
pub fn request(&mut self, block: NumberFor<B>, active_set: ValidatorSet<AuthorityId>) {
// ignore new requests while there's already one pending
if matches!(self.state, State::AwaitingResponse(_, _, _)) {
return
Expand All @@ -129,7 +130,7 @@ where
// Start the requests engine - each unsuccessful received response will automatically
// trigger a new request to the next peer in the `peers_cache` until there are none left.
if let Some(peer) = self.try_next_peer() {
self.request_from_peer(peer, block);
self.request_from_peer(peer, RequestInfo { block, active_set });
} else {
debug!(target: "beefy::sync", "🥩 no good peers to request justif #{:?} from", block);
}
Expand All @@ -138,11 +139,10 @@ where
/// Cancel any pending request for block numbers smaller or equal to `block`.
pub fn cancel_requests_older_than(&mut self, block: NumberFor<B>) {
match &self.state {
State::AwaitingResponse(_, number, _) if *number <= block => {
State::AwaitingResponse(_, req_info, _) if req_info.block <= block => {
debug!(
target: "beefy::sync",
"🥩 cancel pending request for justification #{:?}",
number
target: "beefy::sync", "🥩 cancel pending request for justification #{:?}",
req_info.block
);
self.state = State::Idle;
},
Expand All @@ -153,77 +153,65 @@ where
fn process_response(
&mut self,
peer: PeerId,
block: NumberFor<B>,
validator_set: &ValidatorSet<AuthorityId>,
req_info: &RequestInfo<B>,
response: Result<Response, Canceled>,
) -> Result<BeefyVersionedFinalityProof<B>, Error> {
response
.map_err(|e| {
debug!(
target: "beefy::sync",
"🥩 for on demand justification #{:?}, peer {:?} hung up: {:?}",
block, peer, e
req_info.block, peer, e
);
Error::InvalidResponse
})?
.map_err(|e| {
debug!(
target: "beefy::sync",
"🥩 for on demand justification #{:?}, peer {:?} error: {:?}",
block, peer, e
req_info.block, peer, e
);
Error::InvalidResponse
})
.and_then(|encoded| {
decode_and_verify_finality_proof::<B>(&encoded[..], block, &validator_set).map_err(
|e| {
debug!(
target: "beefy::sync",
"🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}",
block, peer, e
);
Error::InvalidResponse
},
decode_and_verify_finality_proof::<B>(
&encoded[..],
req_info.block,
&req_info.active_set,
)
.map_err(|e| {
debug!(
target: "beefy::sync",
"🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}",
req_info.block, peer, e
);
Error::InvalidResponse
})
})
}

pub async fn next(&mut self) -> Option<BeefyVersionedFinalityProof<B>> {
let (peer, block, resp) = match &mut self.state {
let (peer, req_info, resp) = match &mut self.state {
State::Idle => {
futures::pending!();
// Doesn't happen as 'futures::pending!()' is an 'await' barrier that never passes.
return None
},
State::AwaitingResponse(peer, block, receiver) => {
State::AwaitingResponse(peer, req_info, receiver) => {
let resp = receiver.await;
(*peer, *block, resp)
(*peer, req_info.clone(), resp)
},
};
// We received the awaited response. Our 'receiver' will never generate any other response,
// meaning we're done with current state. Move the engine to `State::Idle`.
self.state = State::Idle;

let block_id = BlockId::number(block);
let validator_set = self
.runtime
.runtime_api()
.validator_set(&block_id)
.map_err(|e| {
error!(target: "beefy::sync", "🥩 Runtime API error {:?} in on-demand justif engine.", e);
e
})
.ok()?
.or_else(|| {
error!(target: "beefy::sync", "🥩 BEEFY pallet not available for block {:?}.", block);
None
})?;

self.process_response(peer, block, &validator_set, resp)
let block = req_info.block;
self.process_response(peer, &req_info, resp)
.map_err(|_| {
// No valid justification received, try next peer in our set.
if let Some(peer) = self.try_next_peer() {
self.request_from_peer(peer, block);
self.request_from_peer(peer, req_info);
} else {
warn!(target: "beefy::sync", "🥩 ran out of peers to request justif #{:?} from", block);
}
Expand Down
29 changes: 16 additions & 13 deletions client/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ where
// The `GossipValidator` adds and removes known peers based on valid votes and network events.
let on_demand_justifications = OnDemandJustificationsEngine::new(
network.clone(),
runtime.clone(),
justifications_protocol_name,
known_peers,
);
Expand Down Expand Up @@ -295,7 +294,7 @@ where
persisted_state,
};

let worker = worker::BeefyWorker::<_, _, _, _, _>::new(worker_params);
let worker = worker::BeefyWorker::<_, _, _, _>::new(worker_params);

futures::future::join(
worker.run(block_import_justif, finality_notifications),
Expand Down Expand Up @@ -377,17 +376,8 @@ where
break state
}

// Check if we should move up the chain.
let parent_hash = *header.parent_hash();
if *header.number() == One::one() ||
runtime
.runtime_api()
.validator_set(&BlockId::hash(parent_hash))
.ok()
.flatten()
.is_none()
{
// We've reached pallet genesis, initialize voter here.
if *header.number() == One::one() {
// We've reached chain genesis, initialize voter here.
let genesis_num = *header.number();
let genesis_set = expect_validator_set(runtime, BlockId::hash(header.hash()))
.and_then(genesis_set_sanity_check)?;
Expand All @@ -408,6 +398,19 @@ where
sessions.push_front(Rounds::new(*header.number(), active));
}

// Check if state is still available if we move up the chain.
let parent_hash = *header.parent_hash();
runtime
.runtime_api()
.validator_set(&BlockId::hash(parent_hash))
.ok()
.flatten()
.ok_or_else(|| {
let msg = format!("{}. Could not initialize BEEFY voter.", parent_hash);
error!(target: "beefy", "🥩 {}", msg);
ClientError::Consensus(sp_consensus::Error::StateUnavailable(msg))
})?;

// Move up the chain.
header = blockchain.expect_header(BlockId::Hash(parent_hash))?;
};
Expand Down
4 changes: 4 additions & 0 deletions client/beefy/src/round.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ where
}
}

pub(crate) fn validator_set(&self) -> &ValidatorSet<Public> {
&self.validator_set
}

pub(crate) fn validator_set_id(&self) -> ValidatorSetId {
self.validator_set.id()
}
Expand Down
Loading

0 comments on commit 29d2174

Please sign in to comment.