From 22d36757176982befd38bbe2bb51f9b5eff4af7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Tue, 2 Mar 2021 21:30:33 +0100 Subject: [PATCH 1/4] AuRa improvements Hot and fresh AuRa improvements. This pr does the following: - Move code belonging to the import queue etc to import_queue.rs - Introduce `ImportQueueParams` and `StartAuraParams` structs to make it more easier to understand what parameters we pass to AuRa. - Introduce `CheckForEquivocation` to tell AuRa if it should check for equivocation on block import. This is required for parachains, because they are allowed to equivocate when they build two blocks for the same slot, but for different relay chain parents. --- bin/node-template/node/src/service.rs | 50 +- client/consensus/aura/src/import_queue.rs | 540 +++++++++++++++++++++ client/consensus/aura/src/lib.rs | 549 +++------------------- 3 files changed, 644 insertions(+), 495 deletions(-) create mode 100644 client/consensus/aura/src/import_queue.rs diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index 92518ef22dee2..d5c3a383190c3 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -9,6 +9,7 @@ use sp_inherents::InherentDataProviders; use sc_executor::native_executor_instance; pub use sc_executor::NativeExecutor; use sp_consensus_aura::sr25519::{AuthorityPair as AuraPair}; +use sc_consensus_aura::{ImportQueueParams, StartAuraParams}; use sc_finality_grandpa::SharedVoterState; use sc_keystore::LocalKeystore; use sc_telemetry::TelemetrySpan; @@ -67,15 +68,18 @@ pub fn new_partial(config: &Configuration) -> Result( - sc_consensus_aura::slot_duration(&*client)?, - aura_block_import.clone(), - Some(Box::new(grandpa_block_import.clone())), - client.clone(), - inherent_data_providers.clone(), - &task_manager.spawn_essential_handle(), - config.prometheus_registry(), - sp_consensus::CanAuthorWithNativeVersion::new(client.executor().clone()), + let import_queue = sc_consensus_aura::import_queue::( + ImportQueueParams { + block_import: aura_block_import.clone(), + justification_import: Some(Box::new(grandpa_block_import.clone())), + client: client.clone(), + inherent_data_providers: inherent_data_providers.clone(), + spawner: &task_manager.spawn_essential_handle(), + can_author_with: sp_consensus::CanAuthorWithNativeVersion::new(client.executor().clone()), + slot_duration: sc_consensus_aura::slot_duration(&*client)?, + registry: config.prometheus_registry(), + check_for_equivocation: Default::default(), + }, )?; Ok(sc_service::PartialComponents { @@ -185,7 +189,7 @@ pub fn new_full(mut config: Configuration) -> Result )?; if role.is_authority() { - let proposer = sc_basic_authorship::ProposerFactory::new( + let proposer_factory = sc_basic_authorship::ProposerFactory::new( task_manager.spawn_handle(), client.clone(), transaction_pool, @@ -195,18 +199,20 @@ pub fn new_full(mut config: Configuration) -> Result let can_author_with = sp_consensus::CanAuthorWithNativeVersion::new(client.executor().clone()); - let aura = sc_consensus_aura::start_aura::<_, _, _, _, _, AuraPair, _, _, _,_>( - sc_consensus_aura::slot_duration(&*client)?, - client.clone(), - select_chain, - block_import, - proposer, - network.clone(), - inherent_data_providers.clone(), - force_authoring, - backoff_authoring_blocks, - keystore_container.sync_keystore(), - can_author_with, + let aura = sc_consensus_aura::start_aura::( + StartAuraParams { + slot_duration: sc_consensus_aura::slot_duration(&*client)?, + client: client.clone(), + select_chain, + block_import, + proposer_factory, + inherent_data_providers: inherent_data_providers.clone(), + force_authoring, + backoff_authoring_blocks, + keystore: keystore_container.sync_keystore(), + can_author_with, + sync_oracle: network.clone(), + }, )?; // the AURA authoring task is considered essential, i.e. if it diff --git a/client/consensus/aura/src/import_queue.rs b/client/consensus/aura/src/import_queue.rs new file mode 100644 index 0000000000000..ab9a267b1e4a3 --- /dev/null +++ b/client/consensus/aura/src/import_queue.rs @@ -0,0 +1,540 @@ +// This file is part of Substrate. + +// Copyright (C) 2018-2021 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::{ + AuthorityId, find_pre_digest, slot_author, aura_err, Error, AuraSlotCompatible, SlotDuration, + register_aura_inherent_data_provider, authorities, +}; +use std::{ + sync::Arc, time::Duration, thread, marker::PhantomData, hash::Hash, fmt::Debug, + collections::HashMap, +}; +use log::{debug, info, trace}; +use prometheus_endpoint::Registry; +use codec::{Encode, Decode, Codec}; +use sp_consensus::{ + BlockImport, CanAuthorWith, ForkChoiceStrategy, BlockImportParams, + BlockOrigin, Error as ConsensusError, BlockCheckParams, ImportResult, + import_queue::{ + Verifier, BasicQueue, DefaultImportQueue, BoxJustificationImport, + }, +}; +use sc_client_api::{backend::AuxStore, BlockOf}; +use sp_blockchain::{well_known_cache_keys::{self, Id as CacheKeyId}, ProvideCache, HeaderBackend}; +use sp_block_builder::BlockBuilder as BlockBuilderApi; +use sp_runtime::{generic::{BlockId, OpaqueDigestItemId}, Justification}; +use sp_runtime::traits::{Block as BlockT, Header, DigestItemFor, Zero}; +use sp_api::ProvideRuntimeApi; +use sp_core::crypto::Pair; +use sp_inherents::{InherentDataProviders, InherentData}; +use sp_timestamp::InherentError as TIError; +use sc_telemetry::{telemetry, CONSENSUS_TRACE, CONSENSUS_DEBUG, CONSENSUS_INFO}; +use sc_consensus_slots::{CheckedHeader, SlotCompatible, check_equivocation}; +use sp_consensus_slots::Slot; +use sp_api::ApiExt; +use sp_consensus_aura::{ + digests::CompatibleDigestItem, AuraApi, inherents::AuraInherentData, + ConsensusLog, AURA_ENGINE_ID, +}; + +/// check a header has been signed by the right key. If the slot is too far in the future, an error +/// will be returned. If it's successful, returns the pre-header and the digest item +/// containing the seal. +/// +/// This digest item will always return `Some` when used with `as_aura_seal`. +fn check_header( + client: &C, + slot_now: Slot, + mut header: B::Header, + hash: B::Hash, + authorities: &[AuthorityId

], + check_for_equivocation: CheckForEquivocation, +) -> Result)>, Error> where + DigestItemFor: CompatibleDigestItem, + P::Signature: Codec, + C: sc_client_api::backend::AuxStore, + P::Public: Encode + Decode + PartialEq + Clone, +{ + let seal = match header.digest_mut().pop() { + Some(x) => x, + None => return Err(Error::HeaderUnsealed(hash)), + }; + + let sig = seal.as_aura_seal().ok_or_else(|| { + aura_err(Error::HeaderBadSeal(hash)) + })?; + + let slot = find_pre_digest::(&header)?; + + if slot > slot_now { + header.digest_mut().push(seal); + Ok(CheckedHeader::Deferred(header, slot)) + } else { + // check the signature is valid under the expected authority and + // chain state. + let expected_author = match slot_author::

(slot, &authorities) { + None => return Err(Error::SlotAuthorNotFound), + Some(author) => author, + }; + + let pre_hash = header.hash(); + + if P::verify(&sig, pre_hash.as_ref(), expected_author) { + if check_for_equivocation.check_for_equivocation() { + if let Some(equivocation_proof) = check_equivocation( + client, + slot_now, + slot, + &header, + expected_author, + ).map_err(Error::Client)? { + info!( + target: "aura", + "Slot author is equivocating at slot {} with headers {:?} and {:?}", + slot, + equivocation_proof.first_header.hash(), + equivocation_proof.second_header.hash(), + ); + } + } + + Ok(CheckedHeader::Checked(header, (slot, seal))) + } else { + Err(Error::BadSignature(hash)) + } + } +} + +/// A verifier for Aura blocks. +pub struct AuraVerifier { + client: Arc, + phantom: PhantomData

, + inherent_data_providers: InherentDataProviders, + can_author_with: CAW, + check_for_equivocation: CheckForEquivocation, +} + +impl AuraVerifier { + pub(crate) fn new( + client: Arc, + inherent_data_providers: InherentDataProviders, + can_author_with: CAW, + check_for_equivocation: CheckForEquivocation, + ) -> Self { + Self { + client, + inherent_data_providers, + can_author_with, + check_for_equivocation, + phantom: PhantomData, + } + } +} + +impl AuraVerifier where + P: Send + Sync + 'static, + CAW: Send + Sync + 'static, +{ + fn check_inherents( + &self, + block: B, + block_id: BlockId, + inherent_data: InherentData, + timestamp_now: u64, + ) -> Result<(), Error> where + C: ProvideRuntimeApi, C::Api: BlockBuilderApi, + CAW: CanAuthorWith, + { + const MAX_TIMESTAMP_DRIFT_SECS: u64 = 60; + + if let Err(e) = self.can_author_with.can_author_with(&block_id) { + debug!( + target: "aura", + "Skipping `check_inherents` as authoring version is not compatible: {}", + e, + ); + + return Ok(()) + } + + let inherent_res = self.client.runtime_api().check_inherents( + &block_id, + block, + inherent_data, + ).map_err(|e| Error::Client(e.into()))?; + + if !inherent_res.ok() { + inherent_res + .into_errors() + .try_for_each(|(i, e)| match TIError::try_from(&i, &e) { + Some(TIError::ValidAtTimestamp(timestamp)) => { + // halt import until timestamp is valid. + // reject when too far ahead. + if timestamp > timestamp_now + MAX_TIMESTAMP_DRIFT_SECS { + return Err(Error::TooFarInFuture); + } + + let diff = timestamp.saturating_sub(timestamp_now); + info!( + target: "aura", + "halting for block {} seconds in the future", + diff + ); + telemetry!(CONSENSUS_INFO; "aura.halting_for_future_block"; + "diff" => ?diff + ); + thread::sleep(Duration::from_secs(diff)); + Ok(()) + }, + Some(TIError::Other(e)) => Err(Error::Runtime(e.into())), + None => Err(Error::DataProvider( + self.inherent_data_providers.error_to_string(&i, &e) + )), + }) + } else { + Ok(()) + } + } +} + +impl Verifier for AuraVerifier where + C: ProvideRuntimeApi + + Send + + Sync + + sc_client_api::backend::AuxStore + + ProvideCache + + BlockOf, + C::Api: BlockBuilderApi + AuraApi> + ApiExt, + DigestItemFor: CompatibleDigestItem, + P: Pair + Send + Sync + 'static, + P::Public: Send + Sync + Hash + Eq + Clone + Decode + Encode + Debug + 'static, + P::Signature: Encode + Decode, + CAW: CanAuthorWith + Send + Sync + 'static, +{ + fn verify( + &mut self, + origin: BlockOrigin, + header: B::Header, + justification: Option, + mut body: Option>, + ) -> Result<(BlockImportParams, Option)>>), String> { + let mut inherent_data = self.inherent_data_providers + .create_inherent_data() + .map_err(|e| e.into_string())?; + let (timestamp_now, slot_now, _) = AuraSlotCompatible.extract_timestamp_and_slot(&inherent_data) + .map_err(|e| format!("Could not extract timestamp and slot: {:?}", e))?; + let hash = header.hash(); + let parent_hash = *header.parent_hash(); + let authorities = authorities(self.client.as_ref(), &BlockId::Hash(parent_hash)) + .map_err(|e| format!("Could not fetch authorities at {:?}: {:?}", parent_hash, e))?; + + // we add one to allow for some small drift. + // FIXME #1019 in the future, alter this queue to allow deferring of + // headers + let checked_header = check_header::( + &self.client, + slot_now + 1, + header, + hash, + &authorities[..], + self.check_for_equivocation, + ).map_err(|e| e.to_string())?; + match checked_header { + CheckedHeader::Checked(pre_header, (slot, seal)) => { + // if the body is passed through, we need to use the runtime + // to check that the internally-set timestamp in the inherents + // actually matches the slot set in the seal. + if let Some(inner_body) = body.take() { + inherent_data.aura_replace_inherent_data(slot); + let block = B::new(pre_header.clone(), inner_body); + + // skip the inherents verification if the runtime API is old. + if self.client + .runtime_api() + .has_api_with::, _>( + &BlockId::Hash(parent_hash), + |v| v >= 2, + ) + .map_err(|e| format!("{:?}", e))? + { + self.check_inherents( + block.clone(), + BlockId::Hash(parent_hash), + inherent_data, + timestamp_now, + ).map_err(|e| e.to_string())?; + } + + let (_, inner_body) = block.deconstruct(); + body = Some(inner_body); + } + + trace!(target: "aura", "Checked {:?}; importing.", pre_header); + telemetry!(CONSENSUS_TRACE; "aura.checked_and_importing"; "pre_header" => ?pre_header); + + // Look for an authorities-change log. + let maybe_keys = pre_header.digest() + .logs() + .iter() + .filter_map(|l| l.try_to::>>( + OpaqueDigestItemId::Consensus(&AURA_ENGINE_ID) + )) + .find_map(|l| match l { + ConsensusLog::AuthoritiesChange(a) => Some( + vec![(well_known_cache_keys::AUTHORITIES, a.encode())] + ), + _ => None, + }); + + let mut import_block = BlockImportParams::new(origin, pre_header); + import_block.post_digests.push(seal); + import_block.body = body; + import_block.justification = justification; + import_block.fork_choice = Some(ForkChoiceStrategy::LongestChain); + import_block.post_hash = Some(hash); + + Ok((import_block, maybe_keys)) + } + CheckedHeader::Deferred(a, b) => { + debug!(target: "aura", "Checking {:?} failed; {:?}, {:?}.", hash, a, b); + telemetry!(CONSENSUS_DEBUG; "aura.header_too_far_in_future"; + "hash" => ?hash, "a" => ?a, "b" => ?b + ); + Err(format!("Header {:?} rejected: too far in the future", hash)) + } + } + } +} + +fn initialize_authorities_cache(client: &C) -> Result<(), ConsensusError> where + A: Codec + Debug, + B: BlockT, + C: ProvideRuntimeApi + BlockOf + ProvideCache, + C::Api: AuraApi, +{ + // no cache => no initialization + let cache = match client.cache() { + Some(cache) => cache, + None => return Ok(()), + }; + + // check if we already have initialized the cache + let map_err = |error| sp_consensus::Error::from(sp_consensus::Error::ClientImport( + format!( + "Error initializing authorities cache: {}", + error, + ))); + + let genesis_id = BlockId::Number(Zero::zero()); + let genesis_authorities: Option> = cache + .get_at(&well_known_cache_keys::AUTHORITIES, &genesis_id) + .unwrap_or(None) + .and_then(|(_, _, v)| Decode::decode(&mut &v[..]).ok()); + if genesis_authorities.is_some() { + return Ok(()); + } + + let genesis_authorities = authorities(client, &genesis_id)?; + cache.initialize(&well_known_cache_keys::AUTHORITIES, genesis_authorities.encode()) + .map_err(map_err)?; + + Ok(()) +} + +/// A block-import handler for Aura. +pub struct AuraBlockImport, P> { + inner: I, + client: Arc, + _phantom: PhantomData<(Block, P)>, +} + +impl, P> Clone for AuraBlockImport { + fn clone(&self) -> Self { + AuraBlockImport { + inner: self.inner.clone(), + client: self.client.clone(), + _phantom: PhantomData, + } + } +} + +impl, P> AuraBlockImport { + /// New aura block import. + pub fn new( + inner: I, + client: Arc, + ) -> Self { + Self { + inner, + client, + _phantom: PhantomData, + } + } +} + +impl BlockImport for AuraBlockImport where + I: BlockImport> + Send + Sync, + I::Error: Into, + C: HeaderBackend + ProvideRuntimeApi, + P: Pair + Send + Sync + 'static, + P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode, + P::Signature: Encode + Decode, +{ + type Error = ConsensusError; + type Transaction = sp_api::TransactionFor; + + fn check_block( + &mut self, + block: BlockCheckParams, + ) -> Result { + self.inner.check_block(block).map_err(Into::into) + } + + fn import_block( + &mut self, + block: BlockImportParams, + new_cache: HashMap>, + ) -> Result { + let hash = block.post_hash(); + let slot = find_pre_digest::(&block.header) + .expect("valid Aura headers must contain a predigest; \ + header has been already verified; qed"); + + let parent_hash = *block.header.parent_hash(); + let parent_header = self.client.header(BlockId::Hash(parent_hash)) + .map_err(|e| ConsensusError::ChainLookup(e.to_string()))? + .ok_or_else(|| ConsensusError::ChainLookup(aura_err( + Error::::ParentUnavailable(parent_hash, hash) + ).into()))?; + + let parent_slot = find_pre_digest::(&parent_header) + .expect("valid Aura headers contain a pre-digest; \ + parent header has already been verified; qed"); + + // make sure that slot number is strictly increasing + if slot <= parent_slot { + return Err( + ConsensusError::ClientImport(aura_err( + Error::::SlotMustIncrease(parent_slot, slot) + ).into()) + ); + } + + self.inner.import_block(block, new_cache).map_err(Into::into) + } +} + +/// Should we check for equivocation of a block author? +#[derive(Debug, Clone, Copy)] +pub enum CheckForEquivocation { + /// Yes, check for equivocation. + /// + /// This is the default setting for this. + Yes, + /// No, don't check for equivocation. + No, +} + +impl CheckForEquivocation { + /// Should we check for equivocation? + fn check_for_equivocation(self) -> bool { + matches!(self, Self::Yes) + } +} + +impl Default for CheckForEquivocation { + fn default() -> Self { + Self::Yes + } +} + +/// Parameters of [`import_queue`]. +pub struct ImportQueueParams<'a, Block, I, C, S, CAW> { + /// The block import to use. + pub block_import: I, + /// The justification import. + pub justification_import: Option>, + /// The client to interact with the chain. + pub client: Arc, + /// The inherent data provider, to create the inherent data. + pub inherent_data_providers: InherentDataProviders, + /// The spawner to spawn background tasks. + pub spawner: &'a S, + /// The prometheus registry. + pub registry: Option<&'a Registry>, + /// Can we author with the current node? + pub can_author_with: CAW, + /// Should we check for equivocation? + pub check_for_equivocation: CheckForEquivocation, + /// The duration of one slot. + pub slot_duration: SlotDuration, +} + +/// Start an import queue for the Aura consensus algorithm. +pub fn import_queue<'a, P, Block, I, C, S, CAW>( + ImportQueueParams { + block_import, + justification_import, + client, + inherent_data_providers, + spawner, + registry, + can_author_with, + check_for_equivocation, + slot_duration, + }: ImportQueueParams<'a, Block, I, C, S, CAW> +) -> Result, sp_consensus::Error> where + Block: BlockT, + C::Api: BlockBuilderApi + AuraApi> + ApiExt, + C: 'static + + ProvideRuntimeApi + + BlockOf + + ProvideCache + + Send + + Sync + + AuxStore + + HeaderBackend, + I: BlockImport> + + Send + + Sync + + 'static, + DigestItemFor: CompatibleDigestItem, + P: Pair + Send + Sync + 'static, + P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode, + P::Signature: Encode + Decode, + S: sp_core::traits::SpawnEssentialNamed, + CAW: CanAuthorWith + Send + Sync + 'static, +{ + register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?; + initialize_authorities_cache(&*client)?; + + let verifier = AuraVerifier::<_, P, _>::new( + client, + inherent_data_providers, + can_author_with, + check_for_equivocation, + ); + + Ok(BasicQueue::new( + verifier, + Box::new(block_import), + justification_import, + spawner, + registry, + )) +} diff --git a/client/consensus/aura/src/lib.rs b/client/consensus/aura/src/lib.rs index 71aa7bdb7c742..1c30f136ea00f 100644 --- a/client/consensus/aura/src/lib.rs +++ b/client/consensus/aura/src/lib.rs @@ -31,50 +31,34 @@ //! NOTE: Aura itself is designed to be generic over the crypto used. #![forbid(missing_docs, unsafe_code)] use std::{ - sync::Arc, time::Duration, thread, marker::PhantomData, hash::Hash, fmt::Debug, pin::Pin, - collections::HashMap, convert::{TryFrom, TryInto}, + sync::Arc, marker::PhantomData, hash::Hash, fmt::Debug, pin::Pin, convert::{TryFrom, TryInto}, }; use futures::prelude::*; use parking_lot::Mutex; -use log::{debug, info, trace}; -use prometheus_endpoint::Registry; +use log::{debug, trace}; use codec::{Encode, Decode, Codec}; use sp_consensus::{ BlockImport, Environment, Proposer, CanAuthorWith, ForkChoiceStrategy, BlockImportParams, - BlockOrigin, Error as ConsensusError, SelectChain, SlotData, BlockCheckParams, ImportResult, - import_queue::{ - Verifier, BasicQueue, DefaultImportQueue, BoxJustificationImport, - }, + BlockOrigin, Error as ConsensusError, SelectChain, SlotData, }; use sc_client_api::{backend::AuxStore, BlockOf}; -use sp_blockchain::{ - self, Result as CResult, well_known_cache_keys::{self, Id as CacheKeyId}, - ProvideCache, HeaderBackend, -}; -use sp_block_builder::BlockBuilder as BlockBuilderApi; +use sp_blockchain::{Result as CResult, well_known_cache_keys, ProvideCache, HeaderBackend}; use sp_core::crypto::Public; use sp_application_crypto::{AppKey, AppPublic}; -use sp_runtime::{generic::{BlockId, OpaqueDigestItemId}, traits::NumberFor, Justification}; +use sp_runtime::{generic::BlockId, traits::NumberFor}; use sp_runtime::traits::{Block as BlockT, Header, DigestItemFor, Zero, Member}; use sp_api::ProvideRuntimeApi; use sp_core::crypto::Pair; use sp_keystore::{SyncCryptoStorePtr, SyncCryptoStore}; use sp_inherents::{InherentDataProviders, InherentData}; -use sp_timestamp::{ - TimestampInherentData, InherentType as TimestampInherent, InherentError as TIError -}; -use sc_telemetry::{telemetry, CONSENSUS_TRACE, CONSENSUS_DEBUG, CONSENSUS_INFO}; - -use sc_consensus_slots::{ - CheckedHeader, SlotInfo, SlotCompatible, StorageChanges, check_equivocation, - BackoffAuthoringBlocksStrategy, -}; +use sp_timestamp::{TimestampInherentData, InherentType as TimestampInherent}; +use sc_consensus_slots::{SlotInfo, SlotCompatible, StorageChanges, BackoffAuthoringBlocksStrategy}; use sp_consensus_slots::Slot; -use sp_api::ApiExt; +mod import_queue; pub use sp_consensus_aura::{ ConsensusLog, AuraApi, AURA_ENGINE_ID, digests::CompatibleDigestItem, @@ -84,6 +68,7 @@ pub use sp_consensus_aura::{ }, }; pub use sp_consensus::SyncOracle; +pub use import_queue::{ImportQueueParams, import_queue, AuraBlockImport, CheckForEquivocation}; type AuthorityId

=

::Public; @@ -133,26 +118,54 @@ impl SlotCompatible for AuraSlotCompatible { } } +/// Parameters of [`start_aura`]. +pub struct StartAuraParams { + /// The duration of a slot. + pub slot_duration: SlotDuration, + /// The client to interact with the chain. + pub client: Arc, + /// A select chain implementation to select the best block. + pub select_chain: SC, + /// The block import. + pub block_import: I, + /// The proposer factory to build proposer instances. + pub proposer_factory: PF, + /// The sync oracle that can give us the current sync status. + pub sync_oracle: SO, + /// The inherent data providers to create the inherent data. + pub inherent_data_providers: InherentDataProviders, + /// Should we force the authoring of blocks? + pub force_authoring: bool, + /// The backoff strategy when we miss slots. + pub backoff_authoring_blocks: Option, + /// The keystore used by the node. + pub keystore: SyncCryptoStorePtr, + /// Can we author a block with this node? + pub can_author_with: CAW, +} + /// Start the aura worker. The returned future should be run in a futures executor. -pub fn start_aura( - slot_duration: SlotDuration, - client: Arc, - select_chain: SC, - block_import: I, - env: E, - sync_oracle: SO, - inherent_data_providers: InherentDataProviders, - force_authoring: bool, - backoff_authoring_blocks: Option, - keystore: SyncCryptoStorePtr, - can_author_with: CAW, +pub fn start_aura( + StartAuraParams { + slot_duration, + client, + select_chain, + block_import, + proposer_factory: env, + sync_oracle, + inherent_data_providers, + force_authoring, + backoff_authoring_blocks, + keystore, + can_author_with, + }: StartAuraParams, ) -> Result, sp_consensus::Error> where B: BlockT, C: ProvideRuntimeApi + BlockOf + ProvideCache + AuxStore + HeaderBackend + Send + Sync, C::Api: AuraApi>, SC: SelectChain, - E: Environment + Send + Sync + 'static, - E::Proposer: Proposer>, + PF: Environment + Send + Sync + 'static, + PF::Proposer: Proposer>, P: Pair + Send + Sync, P::Public: AppPublic + Hash + Member + Encode + Decode, P::Signature: TryFrom> + Hash + Member + Encode + Decode, @@ -430,290 +443,21 @@ fn find_pre_digest(header: &B::Header) -> Result( - client: &C, - slot_now: Slot, - mut header: B::Header, - hash: B::Hash, - authorities: &[AuthorityId

], -) -> Result)>, Error> where - DigestItemFor: CompatibleDigestItem, - P::Signature: Codec, - C: sc_client_api::backend::AuxStore, - P::Public: Encode + Decode + PartialEq + Clone, -{ - let seal = match header.digest_mut().pop() { - Some(x) => x, - None => return Err(Error::HeaderUnsealed(hash)), - }; - - let sig = seal.as_aura_seal().ok_or_else(|| { - aura_err(Error::HeaderBadSeal(hash)) - })?; - - let slot = find_pre_digest::(&header)?; - - if slot > slot_now { - header.digest_mut().push(seal); - Ok(CheckedHeader::Deferred(header, slot)) +/// Register the aura inherent data provider, if not registered already. +fn register_aura_inherent_data_provider( + inherent_data_providers: &InherentDataProviders, + slot_duration: u64, +) -> Result<(), sp_consensus::Error> { + if !inherent_data_providers.has_provider(&INHERENT_IDENTIFIER) { + inherent_data_providers + .register_provider(InherentDataProvider::new(slot_duration)) + .map_err(Into::into) + .map_err(sp_consensus::Error::InherentData) } else { - // check the signature is valid under the expected authority and - // chain state. - let expected_author = match slot_author::

(slot, &authorities) { - None => return Err(Error::SlotAuthorNotFound), - Some(author) => author, - }; - - let pre_hash = header.hash(); - - if P::verify(&sig, pre_hash.as_ref(), expected_author) { - if let Some(equivocation_proof) = check_equivocation( - client, - slot_now, - slot, - &header, - expected_author, - ).map_err(Error::Client)? { - info!( - "Slot author is equivocating at slot {} with headers {:?} and {:?}", - slot, - equivocation_proof.first_header.hash(), - equivocation_proof.second_header.hash(), - ); - } - - Ok(CheckedHeader::Checked(header, (slot, seal))) - } else { - Err(Error::BadSignature(hash)) - } - } -} - -/// A verifier for Aura blocks. -pub struct AuraVerifier { - client: Arc, - phantom: PhantomData

, - inherent_data_providers: sp_inherents::InherentDataProviders, - can_author_with: CAW, -} - -impl AuraVerifier where - P: Send + Sync + 'static, - CAW: Send + Sync + 'static, -{ - fn check_inherents( - &self, - block: B, - block_id: BlockId, - inherent_data: InherentData, - timestamp_now: u64, - ) -> Result<(), Error> where - C: ProvideRuntimeApi, C::Api: BlockBuilderApi, - CAW: CanAuthorWith, - { - const MAX_TIMESTAMP_DRIFT_SECS: u64 = 60; - - if let Err(e) = self.can_author_with.can_author_with(&block_id) { - debug!( - target: "aura", - "Skipping `check_inherents` as authoring version is not compatible: {}", - e, - ); - - return Ok(()) - } - - let inherent_res = self.client.runtime_api().check_inherents( - &block_id, - block, - inherent_data, - ).map_err(|e| Error::Client(e.into()))?; - - if !inherent_res.ok() { - inherent_res - .into_errors() - .try_for_each(|(i, e)| match TIError::try_from(&i, &e) { - Some(TIError::ValidAtTimestamp(timestamp)) => { - // halt import until timestamp is valid. - // reject when too far ahead. - if timestamp > timestamp_now + MAX_TIMESTAMP_DRIFT_SECS { - return Err(Error::TooFarInFuture); - } - - let diff = timestamp.saturating_sub(timestamp_now); - info!( - target: "aura", - "halting for block {} seconds in the future", - diff - ); - telemetry!(CONSENSUS_INFO; "aura.halting_for_future_block"; - "diff" => ?diff - ); - thread::sleep(Duration::from_secs(diff)); - Ok(()) - }, - Some(TIError::Other(e)) => Err(Error::Runtime(e.into())), - None => Err(Error::DataProvider( - self.inherent_data_providers.error_to_string(&i, &e) - )), - }) - } else { - Ok(()) - } - } -} - -#[forbid(deprecated)] -impl Verifier for AuraVerifier where - C: ProvideRuntimeApi + - Send + - Sync + - sc_client_api::backend::AuxStore + - ProvideCache + - BlockOf, - C::Api: BlockBuilderApi + AuraApi> + ApiExt, - DigestItemFor: CompatibleDigestItem, - P: Pair + Send + Sync + 'static, - P::Public: Send + Sync + Hash + Eq + Clone + Decode + Encode + Debug + 'static, - P::Signature: Encode + Decode, - CAW: CanAuthorWith + Send + Sync + 'static, -{ - fn verify( - &mut self, - origin: BlockOrigin, - header: B::Header, - justification: Option, - mut body: Option>, - ) -> Result<(BlockImportParams, Option)>>), String> { - let mut inherent_data = self.inherent_data_providers - .create_inherent_data() - .map_err(|e| e.into_string())?; - let (timestamp_now, slot_now, _) = AuraSlotCompatible.extract_timestamp_and_slot(&inherent_data) - .map_err(|e| format!("Could not extract timestamp and slot: {:?}", e))?; - let hash = header.hash(); - let parent_hash = *header.parent_hash(); - let authorities = authorities(self.client.as_ref(), &BlockId::Hash(parent_hash)) - .map_err(|e| format!("Could not fetch authorities at {:?}: {:?}", parent_hash, e))?; - - // we add one to allow for some small drift. - // FIXME #1019 in the future, alter this queue to allow deferring of - // headers - let checked_header = check_header::( - &self.client, - slot_now + 1, - header, - hash, - &authorities[..], - ).map_err(|e| e.to_string())?; - match checked_header { - CheckedHeader::Checked(pre_header, (slot, seal)) => { - // if the body is passed through, we need to use the runtime - // to check that the internally-set timestamp in the inherents - // actually matches the slot set in the seal. - if let Some(inner_body) = body.take() { - inherent_data.aura_replace_inherent_data(slot); - let block = B::new(pre_header.clone(), inner_body); - - // skip the inherents verification if the runtime API is old. - if self.client - .runtime_api() - .has_api_with::, _>( - &BlockId::Hash(parent_hash), - |v| v >= 2, - ) - .map_err(|e| format!("{:?}", e))? - { - self.check_inherents( - block.clone(), - BlockId::Hash(parent_hash), - inherent_data, - timestamp_now, - ).map_err(|e| e.to_string())?; - } - - let (_, inner_body) = block.deconstruct(); - body = Some(inner_body); - } - - trace!(target: "aura", "Checked {:?}; importing.", pre_header); - telemetry!(CONSENSUS_TRACE; "aura.checked_and_importing"; "pre_header" => ?pre_header); - - // Look for an authorities-change log. - let maybe_keys = pre_header.digest() - .logs() - .iter() - .filter_map(|l| l.try_to::>>( - OpaqueDigestItemId::Consensus(&AURA_ENGINE_ID) - )) - .find_map(|l| match l { - ConsensusLog::AuthoritiesChange(a) => Some( - vec![(well_known_cache_keys::AUTHORITIES, a.encode())] - ), - _ => None, - }); - - let mut import_block = BlockImportParams::new(origin, pre_header); - import_block.post_digests.push(seal); - import_block.body = body; - import_block.justification = justification; - import_block.fork_choice = Some(ForkChoiceStrategy::LongestChain); - import_block.post_hash = Some(hash); - - Ok((import_block, maybe_keys)) - } - CheckedHeader::Deferred(a, b) => { - debug!(target: "aura", "Checking {:?} failed; {:?}, {:?}.", hash, a, b); - telemetry!(CONSENSUS_DEBUG; "aura.header_too_far_in_future"; - "hash" => ?hash, "a" => ?a, "b" => ?b - ); - Err(format!("Header {:?} rejected: too far in the future", hash)) - } - } - } -} - -fn initialize_authorities_cache(client: &C) -> Result<(), ConsensusError> where - A: Codec + Debug, - B: BlockT, - C: ProvideRuntimeApi + BlockOf + ProvideCache, - C::Api: AuraApi, -{ - // no cache => no initialization - let cache = match client.cache() { - Some(cache) => cache, - None => return Ok(()), - }; - - // check if we already have initialized the cache - let map_err = |error| sp_consensus::Error::from(sp_consensus::Error::ClientImport( - format!( - "Error initializing authorities cache: {}", - error, - ))); - - let genesis_id = BlockId::Number(Zero::zero()); - let genesis_authorities: Option> = cache - .get_at(&well_known_cache_keys::AUTHORITIES, &genesis_id) - .unwrap_or(None) - .and_then(|(_, _, v)| Decode::decode(&mut &v[..]).ok()); - if genesis_authorities.is_some() { - return Ok(()); + Ok(()) } - - let genesis_authorities = authorities(client, &genesis_id)?; - cache.initialize(&well_known_cache_keys::AUTHORITIES, genesis_authorities.encode()) - .map_err(map_err)?; - - Ok(()) } -#[allow(deprecated)] fn authorities(client: &C, at: &BlockId) -> Result, ConsensusError> where A: Codec + Debug, B: BlockT, @@ -731,145 +475,6 @@ fn authorities(client: &C, at: &BlockId) -> Result, Consensus .ok_or_else(|| sp_consensus::Error::InvalidAuthoritiesSet.into()) } -/// Register the aura inherent data provider, if not registered already. -fn register_aura_inherent_data_provider( - inherent_data_providers: &InherentDataProviders, - slot_duration: u64, -) -> Result<(), sp_consensus::Error> { - if !inherent_data_providers.has_provider(&INHERENT_IDENTIFIER) { - inherent_data_providers - .register_provider(InherentDataProvider::new(slot_duration)) - .map_err(Into::into) - .map_err(sp_consensus::Error::InherentData) - } else { - Ok(()) - } -} - -/// A block-import handler for Aura. -pub struct AuraBlockImport, P> { - inner: I, - client: Arc, - _phantom: PhantomData<(Block, P)>, -} - -impl, P> Clone for AuraBlockImport { - fn clone(&self) -> Self { - AuraBlockImport { - inner: self.inner.clone(), - client: self.client.clone(), - _phantom: PhantomData, - } - } -} - -impl, P> AuraBlockImport { - /// New aura block import. - pub fn new( - inner: I, - client: Arc, - ) -> Self { - Self { - inner, - client, - _phantom: PhantomData, - } - } -} - -impl BlockImport for AuraBlockImport where - I: BlockImport> + Send + Sync, - I::Error: Into, - C: HeaderBackend + ProvideRuntimeApi, - P: Pair + Send + Sync + 'static, - P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode, - P::Signature: Encode + Decode, -{ - type Error = ConsensusError; - type Transaction = sp_api::TransactionFor; - - fn check_block( - &mut self, - block: BlockCheckParams, - ) -> Result { - self.inner.check_block(block).map_err(Into::into) - } - - fn import_block( - &mut self, - block: BlockImportParams, - new_cache: HashMap>, - ) -> Result { - let hash = block.post_hash(); - let slot = find_pre_digest::(&block.header) - .expect("valid Aura headers must contain a predigest; \ - header has been already verified; qed"); - - let parent_hash = *block.header.parent_hash(); - let parent_header = self.client.header(BlockId::Hash(parent_hash)) - .map_err(|e| ConsensusError::ChainLookup(e.to_string()))? - .ok_or_else(|| ConsensusError::ChainLookup(aura_err( - Error::::ParentUnavailable(parent_hash, hash) - ).into()))?; - - let parent_slot = find_pre_digest::(&parent_header) - .expect("valid Aura headers contain a pre-digest; \ - parent header has already been verified; qed"); - - // make sure that slot number is strictly increasing - if slot <= parent_slot { - return Err( - ConsensusError::ClientImport(aura_err( - Error::::SlotMustIncrease(parent_slot, slot) - ).into()) - ); - } - - self.inner.import_block(block, new_cache).map_err(Into::into) - } -} - -/// Start an import queue for the Aura consensus algorithm. -pub fn import_queue( - slot_duration: SlotDuration, - block_import: I, - justification_import: Option>, - client: Arc, - inherent_data_providers: InherentDataProviders, - spawner: &S, - registry: Option<&Registry>, - can_author_with: CAW, -) -> Result, sp_consensus::Error> where - B: BlockT, - C::Api: BlockBuilderApi + AuraApi> + ApiExt, - C: 'static + ProvideRuntimeApi + BlockOf + ProvideCache + Send + Sync + AuxStore + HeaderBackend, - I: BlockImport> + Send + Sync + 'static, - DigestItemFor: CompatibleDigestItem, - P: Pair + Send + Sync + 'static, - P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode, - P::Signature: Encode + Decode, - S: sp_core::traits::SpawnEssentialNamed, - CAW: CanAuthorWith + Send + Sync + 'static, -{ - register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?; - initialize_authorities_cache(&*client)?; - - let verifier = AuraVerifier::<_, P, _> { - client, - inherent_data_providers, - phantom: PhantomData, - can_author_with, - }; - - Ok(BasicQueue::new( - verifier, - Box::new(block_import), - justification_import, - spawner, - registry, - )) -} - #[cfg(test)] mod tests { use super::*; @@ -884,7 +489,7 @@ mod tests { use sc_client_api::BlockchainEvents; use sp_consensus_aura::sr25519::AuthorityPair; use sc_consensus_slots::{SimpleSlotWorker, BackoffAuthoringOnFinalizedHeadLagging}; - use std::{task::Poll, time::Instant}; + use std::{task::Poll, time::{Instant, Duration}}; use sc_block_builder::BlockBuilderProvider; use sp_runtime::traits::Header as _; use substrate_test_runtime_client::{TestClient, runtime::{Header, H256}}; @@ -941,7 +546,7 @@ mod tests { } impl TestNetFactory for AuraTestNet { - type Verifier = AuraVerifier; + type Verifier = import_queue::AuraVerifier; type PeerData = (); /// Create new test network with peers and given config. @@ -964,12 +569,12 @@ mod tests { ).expect("Registers aura inherent data provider"); assert_eq!(slot_duration.get(), SLOT_DURATION); - AuraVerifier { + import_queue::AuraVerifier::new( client, inherent_data_providers, - phantom: Default::default(), - can_author_with: AlwaysCanAuthor, - } + AlwaysCanAuthor, + CheckForEquivocation::Yes, + ) }, PeersClient::Light(_, _) => unreachable!("No (yet) tests for light client + Aura"), } @@ -982,14 +587,12 @@ mod tests { fn peers(&self) -> &Vec> { &self.peers } - fn mut_peers>)>(&mut self, closure: F) { closure(&mut self.peers); } } #[test] - #[allow(deprecated)] fn authoring_blocks() { sp_tracing::try_init_simple(); let net = AuraTestNet::new(3); @@ -1033,19 +636,19 @@ mod tests { &inherent_data_providers, slot_duration.get() ).expect("Registers aura inherent data provider"); - aura_futures.push(start_aura::<_, _, _, _, _, AuthorityPair, _, _, _, _>( + aura_futures.push(start_aura::(StartAuraParams { slot_duration, - client.clone(), + block_import: client.clone(), select_chain, client, - environ, - DummyOracle, + proposer_factory: environ, + sync_oracle: DummyOracle, inherent_data_providers, - false, - Some(BackoffAuthoringOnFinalizedHeadLagging::default()), + force_authoring: false, + backoff_authoring_blocks: Some(BackoffAuthoringOnFinalizedHeadLagging::default()), keystore, - sp_consensus::AlwaysCanAuthor, - ).expect("Starts aura")); + can_author_with: sp_consensus::AlwaysCanAuthor, + }).expect("Starts aura")); } futures::executor::block_on(future::select( From 5e2825b7d618e5dd11bc5ebe3697b2f22edd35c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Fri, 5 Mar 2021 14:02:16 +0100 Subject: [PATCH 2/4] Update client/consensus/aura/src/import_queue.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com> --- client/consensus/aura/src/import_queue.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/consensus/aura/src/import_queue.rs b/client/consensus/aura/src/import_queue.rs index ab9a267b1e4a3..638931477a99c 100644 --- a/client/consensus/aura/src/import_queue.rs +++ b/client/consensus/aura/src/import_queue.rs @@ -16,6 +16,8 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +//! Module implementing the logic for verifying and importing AuRa blocks. + use crate::{ AuthorityId, find_pre_digest, slot_author, aura_err, Error, AuraSlotCompatible, SlotDuration, register_aura_inherent_data_provider, authorities, From ea8ad414a21604162e990259e5f450a0efe01333 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Fri, 5 Mar 2021 16:06:41 +0100 Subject: [PATCH 3/4] Fix compilation --- bin/node-template/node/src/service.rs | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index d5c3a383190c3..36248e622c04a 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -8,7 +8,7 @@ use sc_service::{error::Error as ServiceError, Configuration, TaskManager}; use sp_inherents::InherentDataProviders; use sc_executor::native_executor_instance; pub use sc_executor::NativeExecutor; -use sp_consensus_aura::sr25519::{AuthorityPair as AuraPair}; +use sp_consensus_aura::sr25519::AuthorityPair as AuraPair; use sc_consensus_aura::{ImportQueueParams, StartAuraParams}; use sc_finality_grandpa::SharedVoterState; use sc_keystore::LocalKeystore; @@ -44,7 +44,7 @@ pub fn new_partial(config: &Configuration) -> Result(&config)?; @@ -295,15 +295,18 @@ pub fn new_light(mut config: Configuration) -> Result client.clone(), ); - let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _, _>( - sc_consensus_aura::slot_duration(&*client)?, - aura_block_import, - Some(Box::new(grandpa_block_import)), - client.clone(), - InherentDataProviders::new(), - &task_manager.spawn_essential_handle(), - config.prometheus_registry(), - sp_consensus::NeverCanAuthor, + let import_queue = sc_consensus_aura::import_queue::( + ImportQueueParams { + block_import: aura_block_import.clone(), + justification_import: Some(Box::new(grandpa_block_import.clone())), + client: client.clone(), + inherent_data_providers: InherentDataProviders::new(), + spawner: &task_manager.spawn_essential_handle(), + can_author_with: sp_consensus::NeverCanAuthor, + slot_duration: sc_consensus_aura::slot_duration(&*client)?, + registry: config.prometheus_registry(), + check_for_equivocation: Default::default(), + }, )?; let (network, network_status_sinks, system_rpc_tx, network_starter) = From d6d3ee288b73643814d811271cc39ab9e900fc7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Fri, 5 Mar 2021 16:15:09 +0100 Subject: [PATCH 4/4] AAA --- bin/node-template/node/src/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index 36248e622c04a..a5030f1b35175 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -296,7 +296,7 @@ pub fn new_light(mut config: Configuration) -> Result ); let import_queue = sc_consensus_aura::import_queue::( - ImportQueueParams { + ImportQueueParams { block_import: aura_block_import.clone(), justification_import: Some(Box::new(grandpa_block_import.clone())), client: client.clone(),