Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

consensus: handle justification sync for blocks authored locally #8698

Merged
16 commits merged into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
let slot_duration = sc_consensus_aura::slot_duration(&*client)?;
let raw_slot_duration = slot_duration.slot_duration();

let aura = sc_consensus_aura::start_aura::<AuraPair, _, _, _, _, _, _, _, _, _, _>(
let aura = sc_consensus_aura::start_aura::<AuraPair, _, _, _, _, _, _, _, _, _, _, _>(
andresilva marked this conversation as resolved.
Show resolved Hide resolved
StartAuraParams {
slot_duration,
client: client.clone(),
Expand All @@ -253,6 +253,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
keystore: keystore_container.sync_keystore(),
can_author_with,
sync_oracle: network.clone(),
justification_sync_link: network.clone(),
block_proposal_slot_portion: SlotProportion::new(2f32 / 3f32),
telemetry: telemetry.as_ref().map(|x| x.handle()),
},
Expand Down
1 change: 1 addition & 0 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ pub fn new_full_base(
env: proposer,
block_import,
sync_oracle: network.clone(),
justification_sync_link: network.clone(),
create_inherent_data_providers: move |parent, ()| {
let client_clone = client_clone.clone();
async move {
Expand Down
33 changes: 20 additions & 13 deletions client/consensus/aura/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ fn slot_author<P: Pair>(slot: Slot, authorities: &[AuthorityId<P>]) -> Option<&A
}

/// Parameters of [`start_aura`].
pub struct StartAuraParams<C, SC, I, PF, SO, BS, CAW, IDP> {
pub struct StartAuraParams<C, SC, I, PF, SO, CIDP, BS, CAW, L> {
/// The duration of a slot.
pub slot_duration: SlotDuration,
/// The client to interact with the chain.
Expand All @@ -120,7 +120,7 @@ pub struct StartAuraParams<C, SC, I, PF, SO, BS, CAW, IDP> {
/// The sync oracle that can give us the current sync status.
pub sync_oracle: SO,
/// Something that can create the inherent data providers.
pub create_inherent_data_providers: IDP,
pub create_inherent_data_providers: CIDP,
/// Should we force the authoring of blocks?
pub force_authoring: bool,
/// The backoff strategy when we miss slots.
Expand All @@ -129,6 +129,8 @@ pub struct StartAuraParams<C, SC, I, PF, SO, BS, CAW, IDP> {
pub keystore: SyncCryptoStorePtr,
/// Can we author a block with this node?
pub can_author_with: CAW,
/// Hook into the sync module to control the justification sync process.
pub justification_sync_link: L,
/// The proportion of the slot dedicated to proposing.
///
/// The block proposing will be limited to this proportion of the slot from the starting of the
Expand All @@ -140,7 +142,7 @@ pub struct StartAuraParams<C, SC, I, PF, SO, BS, CAW, IDP> {
}

/// Start the aura worker. The returned future should be run in a futures executor.
pub fn start_aura<P, B, C, SC, PF, I, SO, CAW, BS, Error, IDP>(
pub fn start_aura<P, B, C, SC, I, PF, SO, CIDP, BS, CAW, L, Error>(
StartAuraParams {
slot_duration,
client,
Expand All @@ -153,26 +155,28 @@ pub fn start_aura<P, B, C, SC, PF, I, SO, CAW, BS, Error, IDP>(
backoff_authoring_blocks,
keystore,
can_author_with,
justification_sync_link,
block_proposal_slot_portion,
telemetry,
}: StartAuraParams<C, SC, I, PF, SO, BS, CAW, IDP>,
}: StartAuraParams<C, SC, I, PF, SO, CIDP, BS, CAW, L>,
) -> Result<impl Future<Output = ()>, sp_consensus::Error> where
P: Pair + Send + Sync,
P::Public: AppPublic + Hash + Member + Encode + Decode,
P::Signature: TryFrom<Vec<u8>> + Hash + Member + Encode + Decode,
B: BlockT,
C: ProvideRuntimeApi<B> + BlockOf + ProvideCache<B> + AuxStore + HeaderBackend<B> + Send + Sync,
C::Api: AuraApi<B, AuthorityId<P>>,
SC: SelectChain<B>,
I: BlockImport<B, Transaction = sp_api::TransactionFor<C, B>> + Send + Sync + 'static,
PF: Environment<B, Error = Error> + Send + Sync + 'static,
PF::Proposer: Proposer<B, Error = Error, Transaction = sp_api::TransactionFor<C, B>>,
P: Pair + Send + Sync,
P::Public: AppPublic + Hash + Member + Encode + Decode,
P::Signature: TryFrom<Vec<u8>> + Hash + Member + Encode + Decode,
I: BlockImport<B, Transaction = sp_api::TransactionFor<C, B>> + Send + Sync + 'static,
Error: std::error::Error + Send + From<sp_consensus::Error> + 'static,
SO: SyncOracle + Send + Sync + Clone,
CAW: CanAuthorWith<B> + Send,
CIDP: CreateInherentDataProviders<B, ()> + Send,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + 'static,
IDP: CreateInherentDataProviders<B, ()> + Send,
IDP::InherentDataProviders: InherentDataProviderExt + Send,
CAW: CanAuthorWith<B> + Send,
L: sp_consensus::JustificationSyncLink<B> + 'static,
Error: std::error::Error + Send + From<sp_consensus::Error> + 'static,
{
let worker = build_aura_worker::<P, _, _, _, _, _, _, _>(BuildAuraWorkerParams {
client: client.clone(),
Expand All @@ -191,6 +195,7 @@ pub fn start_aura<P, B, C, SC, PF, I, SO, CAW, BS, Error, IDP>(
select_chain,
worker,
sync_oracle,
justification_sync_link,
create_inherent_data_providers,
can_author_with,
))
Expand Down Expand Up @@ -726,7 +731,7 @@ mod tests {

let slot_duration = slot_duration(&*client).expect("slot duration available");

aura_futures.push(start_aura::<AuthorityPair, _, _, _, _, _, _, _, _, _, _>(StartAuraParams {
aura_futures.push(start_aura::<AuthorityPair, _, _, _, _, _, _, _, _, _, _, _>(StartAuraParams {
slot_duration,
block_import: client.clone(),
select_chain,
Expand All @@ -746,6 +751,7 @@ mod tests {
backoff_authoring_blocks: Some(BackoffAuthoringOnFinalizedHeadLagging::default()),
keystore,
can_author_with: sp_consensus::AlwaysCanAuthor,
justification_sync_link: (),
block_proposal_slot_portion: SlotProportion::new(0.5),
telemetry: None,
}).expect("Starts aura"));
Expand Down Expand Up @@ -873,6 +879,7 @@ mod tests {
chain_head: head,
block_size_limit: None,
},
&mut (),
)).unwrap();

// The returned block should be imported and we should be able to get its header by now.
Expand Down
40 changes: 27 additions & 13 deletions client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ impl std::ops::Deref for Config {
}

/// Parameters for BABE.
pub struct BabeParams<B: BlockT, C, E, I, SO, SC, CAW, BS, IDP> {
pub struct BabeParams<B: BlockT, C, SC, E, I, SO, CIDP, BS, CAW, L> {
/// The keystore that manages the keys of the node.
pub keystore: SyncCryptoStorePtr,

Expand All @@ -385,7 +385,7 @@ pub struct BabeParams<B: BlockT, C, E, I, SO, SC, CAW, BS, IDP> {
pub sync_oracle: SO,

/// Something that can create the inherent data providers.
pub create_inherent_data_providers: IDP,
pub create_inherent_data_providers: CIDP,

/// Force authoring of blocks even if we are offline
pub force_authoring: bool,
Expand All @@ -399,6 +399,9 @@ pub struct BabeParams<B: BlockT, C, E, I, SO, SC, CAW, BS, IDP> {
/// Checks if the current native implementation can author with a runtime at a given block.
pub can_author_with: CAW,

/// Hook into the sync module to control the justification sync process.
pub justification_sync_link: L,
andresilva marked this conversation as resolved.
Show resolved Hide resolved

/// The proportion of the slot dedicated to proposing.
///
/// The block proposing will be limited to this proportion of the slot from the starting of the
Expand All @@ -411,7 +414,7 @@ pub struct BabeParams<B: BlockT, C, E, I, SO, SC, CAW, BS, IDP> {
}

/// Start the babe worker.
pub fn start_babe<B, C, SC, E, I, SO, CAW, BS, Error, IDP>(BabeParams {
pub fn start_babe<B, C, SC, E, I, SO, CIDP, BS, CAW, L, Error>(BabeParams {
keystore,
client,
select_chain,
Expand All @@ -423,28 +426,38 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, BS, Error, IDP>(BabeParams {
backoff_authoring_blocks,
babe_link,
can_author_with,
justification_sync_link,
block_proposal_slot_portion,
telemetry,
}: BabeParams<B, C, E, I, SO, SC, CAW, BS, IDP>) -> Result<
}: BabeParams<B, C, SC, E, I, SO, CIDP, BS, CAW, L>) -> Result<
BabeWorker<B>,
sp_consensus::Error,
> where
B: BlockT,
C: ProvideRuntimeApi<B> + ProvideCache<B> + ProvideUncles<B> + BlockchainEvents<B>
+ HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>
+ Send + Sync + 'static,
C: ProvideRuntimeApi<B>
+ ProvideCache<B>
+ ProvideUncles<B>
+ BlockchainEvents<B>
+ HeaderBackend<B>
+ HeaderMetadata<B, Error = ClientError>
+ Send
+ Sync
+ 'static,
C::Api: BabeApi<B>,
SC: SelectChain<B> + 'static,
E: Environment<B, Error = Error> + Send + Sync + 'static,
E::Proposer: Proposer<B, Error = Error, Transaction = sp_api::TransactionFor<C, B>>,
I: BlockImport<B, Error = ConsensusError, Transaction = sp_api::TransactionFor<C, B>> + Send
+ Sync + 'static,
Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
I: BlockImport<B, Error = ConsensusError, Transaction = sp_api::TransactionFor<C, B>>
+ Send
+ Sync
+ 'static,
SO: SyncOracle + Send + Sync + Clone + 'static,
CAW: CanAuthorWith<B> + Send + Sync + 'static,
CIDP: CreateInherentDataProviders<B, ()> + Send + Sync + 'static,
CIDP::InherentDataProviders: InherentDataProviderExt + Send,
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + 'static,
IDP: CreateInherentDataProviders<B, ()> + Send + Sync + 'static,
IDP::InherentDataProviders: InherentDataProviderExt + Send,
CAW: CanAuthorWith<B> + Send + Sync + 'static,
L: sp_consensus::JustificationSyncLink<B> + 'static,
Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
{
const HANDLE_BUFFER_SIZE: usize = 1024;

Expand Down Expand Up @@ -472,6 +485,7 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, BS, Error, IDP>(BabeParams {
select_chain,
worker,
sync_oracle,
justification_sync_link,
create_inherent_data_providers,
can_author_with,
);
Expand Down
5 changes: 2 additions & 3 deletions client/consensus/babe/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,7 @@ fn rejects_empty_block() {
})
}

fn run_one_test(
mutator: impl Fn(&mut TestHeader, Stage) + Send + Sync + 'static,
) {
fn run_one_test(mutator: impl Fn(&mut TestHeader, Stage) + Send + Sync + 'static) {
sp_tracing::try_init_simple();
let mutator = Arc::new(mutator) as Mutator;

Expand Down Expand Up @@ -473,6 +471,7 @@ fn run_one_test(
babe_link: data.link.clone(),
keystore,
can_author_with: sp_consensus::AlwaysCanAuthor,
justification_sync_link: (),
block_proposal_slot_portion: SlotProportion::new(0.5),
telemetry: None,
}).expect("Starts babe"));
Expand Down
11 changes: 7 additions & 4 deletions client/consensus/pow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,20 +519,21 @@ pub fn import_queue<B, Transaction, Algorithm>(
///
/// `pre_runtime` is a parameter that allows a custom additional pre-runtime digest to be inserted
/// for blocks being built. This can encode authorship information, or just be a graffiti.
pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, CAW, CIDP>(
pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, L, CIDP, CAW>(
block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
client: Arc<C>,
select_chain: S,
algorithm: Algorithm,
mut env: E,
mut sync_oracle: SO,
justification_sync_link: L,
pre_runtime: Option<Vec<u8>>,
create_inherent_data_providers: CIDP,
timeout: Duration,
build_time: Duration,
can_author_with: CAW,
) -> (
Arc<Mutex<MiningWorker<Block, Algorithm, C, <E::Proposer as Proposer<Block>>::Proof>>>,
Arc<Mutex<MiningWorker<Block, Algorithm, C, L, <E::Proposer as Proposer<Block>>::Proof>>>,
impl Future<Output = ()>,
) where
Block: BlockT,
Expand All @@ -544,14 +545,16 @@ pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, CAW, CIDP>(
E::Error: std::fmt::Debug,
E::Proposer: Proposer<Block, Transaction = sp_api::TransactionFor<C, Block>>,
SO: SyncOracle + Clone + Send + Sync + 'static,
CAW: CanAuthorWith<Block> + Clone + Send + 'static,
L: sp_consensus::JustificationSyncLink<Block>,
CIDP: CreateInherentDataProviders<Block, ()>,
CAW: CanAuthorWith<Block> + Clone + Send + 'static,
{
let mut timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout);
let worker = Arc::new(Mutex::new(MiningWorker::<Block, Algorithm, C, _> {
let worker = Arc::new(Mutex::new(MiningWorker {
build: None,
algorithm: algorithm.clone(),
block_import,
justification_sync_link,
}));
let worker_ret = worker.clone();

Expand Down
19 changes: 15 additions & 4 deletions client/consensus/pow/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@

use std::{pin::Pin, time::Duration, collections::HashMap, borrow::Cow};
use sc_client_api::ImportNotifications;
use sp_runtime::{DigestItem, traits::Block as BlockT, generic::BlockId};
use sp_consensus::{Proposal, BlockOrigin, BlockImportParams, import_queue::BoxBlockImport};
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header as HeaderT},
DigestItem,
};
use futures::{prelude::*, task::{Context, Poll}};
use futures_timer::Delay;
use log::*;
Expand Down Expand Up @@ -57,18 +61,22 @@ pub struct MiningWorker<
Block: BlockT,
Algorithm: PowAlgorithm<Block>,
C: sp_api::ProvideRuntimeApi<Block>,
Proof
L: sp_consensus::JustificationSyncLink<Block>,
Proof,
> {
pub(crate) build: Option<MiningBuild<Block, Algorithm, C, Proof>>,
pub(crate) algorithm: Algorithm,
pub(crate) block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
pub(crate) justification_sync_link: L,
}

impl<Block, Algorithm, C, Proof> MiningWorker<Block, Algorithm, C, Proof> where
impl<Block, Algorithm, C, L, Proof> MiningWorker<Block, Algorithm, C, L, Proof>
where
Block: BlockT,
C: sp_api::ProvideRuntimeApi<Block>,
Algorithm: PowAlgorithm<Block>,
Algorithm::Difficulty: 'static + Send,
L: sp_consensus::JustificationSyncLink<Block>,
sp_api::TransactionFor<C, Block>: Send + 'static,
{
/// Get the current best hash. `None` if the worker has just started or the client is doing
Expand Down Expand Up @@ -139,8 +147,11 @@ impl<Block, Algorithm, C, Proof> MiningWorker<Block, Algorithm, C, Proof> where
Box::new(intermediate) as Box<_>,
);

let header = import_block.post_header();
match self.block_import.import_block(import_block, HashMap::default()).await {
Ok(_) => {
Ok(res) => {
res.handle_justification(&header.hash(), *header.number(), &mut self.justification_sync_link);

info!(
target: "pow",
"✅ Successfully mined block on top of: {}",
Expand Down
Loading