This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

client/beefy: fix on-demand justifications sync for old blocks #12767

Merged: 8 commits, Nov 28, 2022
@@ -18,21 +18,17 @@

//! Generating request logic for request/response protocol for syncing BEEFY justifications.
use beefy_primitives::{crypto::AuthorityId, BeefyApi, ValidatorSet};
use beefy_primitives::{crypto::AuthorityId, ValidatorSet};
use codec::Encode;
use futures::channel::{oneshot, oneshot::Canceled};
use log::{debug, error, warn};
use log::{debug, warn};
use parking_lot::Mutex;
use sc_network::{PeerId, ProtocolName};
use sc_network_common::{
request_responses::{IfDisconnected, RequestFailure},
service::NetworkRequest,
};
use sp_api::ProvideRuntimeApi;
use sp_runtime::{
generic::BlockId,
traits::{Block, NumberFor},
};
use sp_runtime::traits::{Block, NumberFor};
use std::{collections::VecDeque, result::Result, sync::Arc};

use crate::{
@@ -46,14 +42,19 @@ type Response = Result<Vec<u8>, RequestFailure>;
/// Used to receive a response from the network.
type ResponseReceiver = oneshot::Receiver<Response>;

#[derive(Clone, Debug)]
struct RequestInfo<B: Block> {
block: NumberFor<B>,
active_set: ValidatorSet<AuthorityId>,
}

enum State<B: Block> {
Idle,
AwaitingResponse(PeerId, NumberFor<B>, ResponseReceiver),
AwaitingResponse(PeerId, RequestInfo<B>, ResponseReceiver),
}

pub struct OnDemandJustificationsEngine<B: Block, R> {
pub struct OnDemandJustificationsEngine<B: Block> {
network: Arc<dyn NetworkRequest + Send + Sync>,
runtime: Arc<R>,
protocol_name: ProtocolName,

live_peers: Arc<Mutex<KnownPeers<B>>>,
@@ -62,21 +63,14 @@ pub struct OnDemandJustificationsEngine<B: Block, R> {
state: State<B>,
}

impl<B, R> OnDemandJustificationsEngine<B, R>
where
B: Block,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B>,
{
impl<B: Block> OnDemandJustificationsEngine<B> {
pub fn new(
network: Arc<dyn NetworkRequest + Send + Sync>,
runtime: Arc<R>,
protocol_name: ProtocolName,
live_peers: Arc<Mutex<KnownPeers<B>>>,
) -> Self {
Self {
network,
runtime,
protocol_name,
live_peers,
peers_cache: VecDeque::new(),
@@ -100,10 +94,15 @@ where
None
}

fn request_from_peer(&mut self, peer: PeerId, block: NumberFor<B>) {
debug!(target: "beefy::sync", "🥩 requesting justif #{:?} from peer {:?}", block, peer);
fn request_from_peer(&mut self, peer: PeerId, req_info: RequestInfo<B>) {
debug!(
target: "beefy::sync",
"🥩 requesting justif #{:?} from peer {:?}",
req_info.block,
peer,
);

let payload = JustificationRequest::<B> { begin: block }.encode();
let payload = JustificationRequest::<B> { begin: req_info.block }.encode();

let (tx, rx) = oneshot::channel();

@@ -115,11 +114,13 @@ where
IfDisconnected::ImmediateError,
);

self.state = State::AwaitingResponse(peer, block, rx);
self.state = State::AwaitingResponse(peer, req_info, rx);
}

/// If no other request is in progress, start new justification request for `block`.
pub fn request(&mut self, block: NumberFor<B>) {
/// Start new justification request for `block`, if no other request is in progress.
///
/// `active_set` will be used to verify validity of potential responses.
pub fn request(&mut self, block: NumberFor<B>, active_set: ValidatorSet<AuthorityId>) {
// ignore new requests while there's already one pending
if matches!(self.state, State::AwaitingResponse(_, _, _)) {
return
@@ -129,7 +130,7 @@
// Start the requests engine - each unsuccessful received response will automatically
// trigger a new request to the next peer in the `peers_cache` until there are none left.
if let Some(peer) = self.try_next_peer() {
self.request_from_peer(peer, block);
self.request_from_peer(peer, RequestInfo { block, active_set });
} else {
debug!(target: "beefy::sync", "🥩 no good peers to request justif #{:?} from", block);
}
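
For illustration, a minimal caller-side sketch of the reworked `request` API, written as if it lived next to the engine in this module (the helper name and its arguments are placeholders, not part of the diff): the caller now supplies the validator set expected to have signed the justification, instead of the engine querying the runtime when the response arrives.

use beefy_primitives::{crypto::AuthorityId, ValidatorSet};
use sp_runtime::traits::{Block, NumberFor};

// Sketch: ask the engine for a justification of `target_block`, to be
// verified against `active_set`. If a request is already in flight this is
// a no-op; otherwise the engine picks a peer from its cache and sends the
// request, retrying other cached peers on bad responses.
fn ask_for_justification<B: Block>(
    engine: &mut OnDemandJustificationsEngine<B>,
    target_block: NumberFor<B>,
    active_set: ValidatorSet<AuthorityId>,
) {
    engine.request(target_block, active_set);
}
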
@@ -138,11 +139,10 @@
/// Cancel any pending request for block numbers smaller or equal to `block`.
pub fn cancel_requests_older_than(&mut self, block: NumberFor<B>) {
match &self.state {
State::AwaitingResponse(_, number, _) if *number <= block => {
State::AwaitingResponse(_, req_info, _) if req_info.block <= block => {
debug!(
target: "beefy::sync",
"🥩 cancel pending request for justification #{:?}",
number
target: "beefy::sync", "🥩 cancel pending request for justification #{:?}",
req_info.block
);
self.state = State::Idle;
},
@@ -153,77 +153,65 @@
fn process_response(
&mut self,
peer: PeerId,
block: NumberFor<B>,
validator_set: &ValidatorSet<AuthorityId>,
req_info: &RequestInfo<B>,
response: Result<Response, Canceled>,
) -> Result<BeefyVersionedFinalityProof<B>, Error> {
response
.map_err(|e| {
debug!(
target: "beefy::sync",
"🥩 for on demand justification #{:?}, peer {:?} hung up: {:?}",
block, peer, e
req_info.block, peer, e
);
Error::InvalidResponse
})?
.map_err(|e| {
debug!(
target: "beefy::sync",
"🥩 for on demand justification #{:?}, peer {:?} error: {:?}",
block, peer, e
req_info.block, peer, e
);
Error::InvalidResponse
})
.and_then(|encoded| {
decode_and_verify_finality_proof::<B>(&encoded[..], block, &validator_set).map_err(
|e| {
debug!(
target: "beefy::sync",
"🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}",
block, peer, e
);
Error::InvalidResponse
},
decode_and_verify_finality_proof::<B>(
&encoded[..],
req_info.block,
&req_info.active_set,
)
.map_err(|e| {
debug!(
target: "beefy::sync",
"🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}",
req_info.block, peer, e
);
Error::InvalidResponse
})
})
}

pub async fn next(&mut self) -> Option<BeefyVersionedFinalityProof<B>> {
let (peer, block, resp) = match &mut self.state {
let (peer, req_info, resp) = match &mut self.state {
State::Idle => {
futures::pending!();
// Doesn't happen as 'futures::pending!()' is an 'await' barrier that never passes.
return None
},
State::AwaitingResponse(peer, block, receiver) => {
State::AwaitingResponse(peer, req_info, receiver) => {
let resp = receiver.await;
(*peer, *block, resp)
(*peer, req_info.clone(), resp)
},
};
// We received the awaited response. Our 'receiver' will never generate any other response,
// meaning we're done with current state. Move the engine to `State::Idle`.
self.state = State::Idle;

let block_id = BlockId::number(block);
let validator_set = self
.runtime
.runtime_api()
.validator_set(&block_id)
.map_err(|e| {
error!(target: "beefy::sync", "🥩 Runtime API error {:?} in on-demand justif engine.", e);
e
})
.ok()?
.or_else(|| {
error!(target: "beefy::sync", "🥩 BEEFY pallet not available for block {:?}.", block);
None
})?;

self.process_response(peer, block, &validator_set, resp)
let block = req_info.block;
self.process_response(peer, &req_info, resp)
.map_err(|_| {
// No valid justification received, try next peer in our set.
if let Some(peer) = self.try_next_peer() {
self.request_from_peer(peer, block);
self.request_from_peer(peer, req_info);
} else {
warn!(target: "beefy::sync", "🥩 ran out of peers to request justif #{:?} from", block);
}
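
For context, a sketch of how the engine is driven (placeholder code, not part of the diff; the real worker polls `next()` from inside a `futures::select!` loop alongside its other streams):

use sp_runtime::traits::Block;

// Sketch: `next()` pends while the engine is idle; once a request is in
// flight it resolves when a response arrives. Failed or invalid responses
// are not surfaced here: the engine schedules a retry with the next cached
// peer and this call yields `None`, so only proofs verified against the
// request's `active_set` reach the body of the `if let`.
async fn drive_engine<B: Block>(engine: &mut OnDemandJustificationsEngine<B>) {
    loop {
        if let Some(_verified_proof) = engine.next().await {
            // Hand the proof over to finalization/import logic (placeholder).
        }
    }
}
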
29 changes: 16 additions & 13 deletions client/beefy/src/lib.rs
@@ -244,7 +244,6 @@ where
// The `GossipValidator` adds and removes known peers based on valid votes and network events.
let on_demand_justifications = OnDemandJustificationsEngine::new(
network.clone(),
runtime.clone(),
justifications_protocol_name,
known_peers,
);
@@ -295,7 +294,7 @@ where
persisted_state,
};

let worker = worker::BeefyWorker::<_, _, _, _, _>::new(worker_params);
let worker = worker::BeefyWorker::<_, _, _, _>::new(worker_params);

futures::future::join(
worker.run(block_import_justif, finality_notifications),
@@ -377,17 +376,8 @@ where
break state
}

// Check if we should move up the chain.
let parent_hash = *header.parent_hash();
if *header.number() == One::one() ||
runtime
.runtime_api()
.validator_set(&BlockId::hash(parent_hash))
.ok()
.flatten()
.is_none()
{
// We've reached pallet genesis, initialize voter here.
if *header.number() == One::one() {
// We've reached chain genesis, initialize voter here.
let genesis_num = *header.number();
let genesis_set = expect_validator_set(runtime, BlockId::hash(header.hash()))
.and_then(genesis_set_sanity_check)?;
@@ -408,6 +398,19 @@
sessions.push_front(Rounds::new(*header.number(), active));
}

// Check if state is still available if we move up the chain.
let parent_hash = *header.parent_hash();
runtime
.runtime_api()
.validator_set(&BlockId::hash(parent_hash))
.ok()
.flatten()
.ok_or_else(|| {
let msg = format!("{}. Could not initialize BEEFY voter.", parent_hash);
error!(target: "beefy", "🥩 {}", msg);
ClientError::Consensus(sp_consensus::Error::StateUnavailable(msg))
})?;

// Move up the chain.
header = blockchain.expect_header(BlockId::Hash(parent_hash))?;
};
4 changes: 4 additions & 0 deletions client/beefy/src/round.rs
@@ -89,6 +89,10 @@ where
}
}

pub(crate) fn validator_set(&self) -> &ValidatorSet<Public> {
&self.validator_set
}

pub(crate) fn validator_set_id(&self) -> ValidatorSetId {
self.validator_set.id()
}
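
A sketch of what the new getter is for (worker-side pseudocode, not part of the diff; `rounds`, `block`, and `on_demand_justifications` stand in for the worker's own state):

// Sketch: the worker can now clone the active set it already tracks for the
// current session and attach it to an on-demand request, so the response is
// verifiable without a runtime call for that (possibly pruned) block.
let active_set = rounds.validator_set().clone();
on_demand_justifications.request(block, active_set);
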