Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
pow: replace the thread-base mining loop with a future-based mining w…
Browse files Browse the repository at this point in the history
…orker (#7060)

* New worker design

* Remove unused thread import

* Add back missing inherent data provider registration

* Add function to get a Cloned metadata

* Add some docs

* Derive Eq and PartialEq for MiningMetadata

* Fix cargo lock

* Fix line width

* Add docs and fix issues in UntilImportedOrTimeout

* Update client/consensus/pow/src/lib.rs

Co-authored-by: David <dvdplm@gmail.com>

* Add back comments

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
Co-authored-by: David <dvdplm@gmail.com>
  • Loading branch information
3 people authored Sep 18, 2020
1 parent a1079f4 commit 618710d
Show file tree
Hide file tree
Showing 4 changed files with 358 additions and 168 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions client/consensus/pow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ sp-consensus-pow = { version = "0.8.0-rc6", path = "../../../primitives/consensu
sp-consensus = { version = "0.8.0-rc6", path = "../../../primitives/consensus/common" }
log = "0.4.8"
futures = { version = "0.3.1", features = ["compat"] }
futures-timer = "3.0.1"
parking_lot = "0.10.0"
sp-timestamp = { version = "2.0.0-rc6", path = "../../../primitives/timestamp" }
derive_more = "0.99.2"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.8.0-rc6"}
309 changes: 141 additions & 168 deletions client/consensus/pow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@
//! as the storage, but it is not recommended as it won't work well with light
//! clients.

use std::sync::Arc;
use std::any::Any;
use std::borrow::Cow;
use std::thread;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::cmp::Ordering;
use sc_client_api::{BlockOf, backend::AuxStore};
mod worker;

pub use crate::worker::{MiningWorker, MiningMetadata, MiningBuild};

use std::{
sync::Arc, any::Any, borrow::Cow, collections::HashMap, marker::PhantomData,
cmp::Ordering, time::Duration,
};
use futures::{prelude::*, future::Either};
use parking_lot::Mutex;
use sc_client_api::{BlockOf, backend::AuxStore, BlockchainEvents};
use sp_blockchain::{HeaderBackend, ProvideCache, well_known_cache_keys::Id as CacheKeyId};
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_runtime::{Justification, RuntimeString};
Expand All @@ -61,6 +64,8 @@ use sc_client_api;
use log::*;
use sp_timestamp::{InherentError as TIError, TimestampInherentData};

use crate::worker::UntilImportedOrTimeout;

#[derive(derive_more::Display, Debug)]
pub enum Error<B: BlockT> {
#[display(fmt = "Header uses the wrong engine {:?}", _0)]
Expand Down Expand Up @@ -193,15 +198,6 @@ pub trait PowAlgorithm<B: BlockT> {
seal: &Seal,
difficulty: Self::Difficulty,
) -> Result<bool, Error<B>>;
/// Mine a seal that satisfies the given difficulty.
fn mine(
&self,
parent: &BlockId<B>,
pre_hash: &B::Hash,
pre_digest: Option<&[u8]>,
difficulty: Self::Difficulty,
round: u32,
) -> Result<Option<Seal>, Error<B>>;
}

/// A block importer for PoW.
Expand Down Expand Up @@ -534,194 +530,171 @@ pub fn import_queue<B, Transaction, Algorithm>(
))
}

/// Start the background mining thread for PoW. Note that because PoW mining
/// is CPU-intensive, it is not possible to use an async future to define this.
/// However, it's not recommended to use background threads in the rest of the
/// codebase.
/// Start the mining worker for PoW. This function provides the necessary helper functions that can
/// be used to implement a miner. However, it does not do the CPU-intensive mining itself.
///
/// Two values are returned -- a worker, which contains functions that allows querying the current
/// mining metadata and submitting mined blocks, and a future, which must be polled to fill in
/// information in the worker.
///
/// `pre_runtime` is a parameter that allows a custom additional pre-runtime
/// digest to be inserted for blocks being built. This can encode authorship
/// information, or just be a graffiti. `round` is for number of rounds the
/// CPU miner runs each time. This parameter should be tweaked so that each
/// mining round is within sub-second time.
pub fn start_mine<B: BlockT, C, Algorithm, E, SO, S, CAW>(
mut block_import: BoxBlockImport<B, sp_api::TransactionFor<C, B>>,
/// `pre_runtime` is a parameter that allows a custom additional pre-runtime digest to be inserted
/// for blocks being built. This can encode authorship information, or just be a graffiti.
pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, CAW>(
block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
client: Arc<C>,
select_chain: S,
algorithm: Algorithm,
mut env: E,
pre_runtime: Option<Vec<u8>>,
round: u32,
mut sync_oracle: SO,
build_time: std::time::Duration,
select_chain: Option<S>,
pre_runtime: Option<Vec<u8>>,
inherent_data_providers: sp_inherents::InherentDataProviders,
timeout: Duration,
build_time: Duration,
can_author_with: CAW,
) where
C: HeaderBackend<B> + AuxStore + ProvideRuntimeApi<B> + 'static,
Algorithm: PowAlgorithm<B> + Send + Sync + 'static,
E: Environment<B> + Send + Sync + 'static,
) -> (Arc<Mutex<MiningWorker<Block, Algorithm, C>>>, impl Future<Output = ()>) where
Block: BlockT,
C: ProvideRuntimeApi<Block> + BlockchainEvents<Block> + 'static,
S: SelectChain<Block> + 'static,
Algorithm: PowAlgorithm<Block> + Clone,
Algorithm::Difficulty: 'static,
E: Environment<Block> + Send + Sync + 'static,
E::Error: std::fmt::Debug,
E::Proposer: Proposer<B, Transaction = sp_api::TransactionFor<C, B>>,
SO: SyncOracle + Send + Sync + 'static,
S: SelectChain<B> + 'static,
CAW: CanAuthorWith<B> + Send + 'static,
E::Proposer: Proposer<Block, Transaction = sp_api::TransactionFor<C, Block>>,
SO: SyncOracle + Clone + Send + Sync + 'static,
CAW: CanAuthorWith<Block> + Clone + Send + 'static,
{
if let Err(_) = register_pow_inherent_data_provider(&inherent_data_providers) {
warn!("Registering inherent data provider for timestamp failed");
}

thread::spawn(move || {
loop {
match mine_loop(
&mut block_import,
client.as_ref(),
&algorithm,
&mut env,
pre_runtime.as_ref(),
round,
&mut sync_oracle,
build_time.clone(),
select_chain.as_ref(),
&inherent_data_providers,
&can_author_with,
) {
Ok(()) => (),
Err(e) => error!(
"Mining block failed with {:?}. Sleep for 1 second before restarting...",
e
),
}
std::thread::sleep(std::time::Duration::new(1, 0));
}
});
}
let timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout);
let worker = Arc::new(Mutex::new(MiningWorker::<Block, Algorithm, C> {
build: None,
algorithm: algorithm.clone(),
block_import,
}));
let worker_ret = worker.clone();

let task = timer.for_each(move |()| {
let worker = worker.clone();

fn mine_loop<B: BlockT, C, Algorithm, E, SO, S, CAW>(
block_import: &mut BoxBlockImport<B, sp_api::TransactionFor<C, B>>,
client: &C,
algorithm: &Algorithm,
env: &mut E,
pre_runtime: Option<&Vec<u8>>,
round: u32,
sync_oracle: &mut SO,
build_time: std::time::Duration,
select_chain: Option<&S>,
inherent_data_providers: &sp_inherents::InherentDataProviders,
can_author_with: &CAW,
) -> Result<(), Error<B>> where
C: HeaderBackend<B> + AuxStore + ProvideRuntimeApi<B>,
Algorithm: PowAlgorithm<B>,
Algorithm::Difficulty: 'static,
E: Environment<B>,
E::Proposer: Proposer<B, Transaction = sp_api::TransactionFor<C, B>>,
E::Error: std::fmt::Debug,
SO: SyncOracle,
S: SelectChain<B>,
sp_api::TransactionFor<C, B>: 'static,
CAW: CanAuthorWith<B>,
{
'outer: loop {
if sync_oracle.is_major_syncing() {
debug!(target: "pow", "Skipping proposal due to sync.");
std::thread::sleep(std::time::Duration::new(1, 0));
continue 'outer
worker.lock().on_major_syncing();
return Either::Left(future::ready(()))
}

let (best_hash, best_header) = match select_chain {
Some(select_chain) => {
let header = select_chain.best_chain()
.map_err(Error::BestHeaderSelectChain)?;
let hash = header.hash();
(hash, header)
},
None => {
let hash = client.info().best_hash;
let header = client.header(BlockId::Hash(hash))
.map_err(Error::BestHeader)?
.ok_or(Error::NoBestHeader)?;
(hash, header)
let best_header = match select_chain.best_chain() {
Ok(x) => x,
Err(err) => {
warn!(
target: "pow",
"Unable to pull new block for authoring. \
Select best chain error: {:?}",
err
);
return Either::Left(future::ready(()))
},
};
let best_hash = best_header.hash();

if let Err(err) = can_author_with.can_author_with(&BlockId::Hash(best_hash)) {
warn!(
target: "pow",
"Skipping proposal `can_author_with` returned: {} \
Probably a node update is required!",
Probably a node update is required!",
err,
);
std::thread::sleep(std::time::Duration::from_secs(1));
continue 'outer
return Either::Left(future::ready(()))
}

let proposer = futures::executor::block_on(env.init(&best_header))
.map_err(|e| Error::Environment(format!("{:?}", e)))?;

let inherent_data = inherent_data_providers
.create_inherent_data().map_err(Error::CreateInherents)?;
let mut inherent_digest = Digest::default();
if let Some(pre_runtime) = &pre_runtime {
inherent_digest.push(DigestItem::PreRuntime(POW_ENGINE_ID, pre_runtime.to_vec()));
if worker.lock().best_hash() == Some(best_hash) {
return Either::Left(future::ready(()))
}
let proposal = futures::executor::block_on(proposer.propose(
inherent_data,
inherent_digest,
build_time.clone(),
RecordProof::No,
)).map_err(|e| Error::BlockProposingError(format!("{:?}", e)))?;

let (header, body) = proposal.block.deconstruct();
let (difficulty, seal) = {
let difficulty = algorithm.difficulty(best_hash)?;

loop {
let seal = algorithm.mine(
&BlockId::Hash(best_hash),
&header.hash(),
pre_runtime.map(|v| &v[..]),
difficulty,
round,
)?;

if let Some(seal) = seal {
break (difficulty, seal)
}

if best_hash != client.info().best_hash {
continue 'outer
}
}
// The worker is locked for the duration of the whole proposing period. Within this period,
// the mining target is outdated and useless anyway.

let difficulty = match algorithm.difficulty(best_hash) {
Ok(x) => x,
Err(err) => {
warn!(
target: "pow",
"Unable to propose new block for authoring. \
Fetch difficulty failed: {:?}",
err,
);
return Either::Left(future::ready(()))
},
};

log::info!("✅ Successfully mined block: {}", best_hash);

let (hash, seal) = {
let seal = DigestItem::Seal(POW_ENGINE_ID, seal);
let mut header = header.clone();
header.digest_mut().push(seal);
let hash = header.hash();
let seal = header.digest_mut().pop()
.expect("Pushed one seal above; length greater than zero; qed");
(hash, seal)
let awaiting_proposer = env.init(&best_header);
let inherent_data = match inherent_data_providers.create_inherent_data() {
Ok(x) => x,
Err(err) => {
warn!(
target: "pow",
"Unable to propose new block for authoring. \
Creating inherent data failed: {:?}",
err,
);
return Either::Left(future::ready(()))
},
};
let mut inherent_digest = Digest::<Block::Hash>::default();
if let Some(pre_runtime) = &pre_runtime {
inherent_digest.push(DigestItem::PreRuntime(POW_ENGINE_ID, pre_runtime.to_vec()));
}

let intermediate = PowIntermediate::<Algorithm::Difficulty> {
difficulty: Some(difficulty),
};
let pre_runtime = pre_runtime.clone();

Either::Right(async move {
let proposer = match awaiting_proposer.await {
Ok(x) => x,
Err(err) => {
warn!(
target: "pow",
"Unable to propose new block for authoring. \
Creating proposer failed: {:?}",
err,
);
return
},
};

let proposal = match proposer.propose(
inherent_data,
inherent_digest,
build_time.clone(),
RecordProof::No,
).await {
Ok(x) => x,
Err(err) => {
warn!(
target: "pow",
"Unable to propose new block for authoring. \
Creating proposal failed: {:?}",
err,
);
return
},
};

let build = MiningBuild::<Block, Algorithm, C> {
metadata: MiningMetadata {
best_hash,
pre_hash: proposal.block.header().hash(),
pre_runtime: pre_runtime.clone(),
difficulty,
},
proposal,
};

let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
import_block.post_digests.push(seal);
import_block.body = Some(body);
import_block.storage_changes = Some(proposal.storage_changes);
import_block.intermediates.insert(
Cow::from(INTERMEDIATE_KEY),
Box::new(intermediate) as Box<dyn Any>
);
import_block.post_hash = Some(hash);
worker.lock().on_build(build);
})
});

block_import.import_block(import_block, HashMap::default())
.map_err(|e| Error::BlockBuiltError(best_hash, e))?;
}
(worker_ret, task)
}

/// Find PoW pre-runtime.
Expand Down
Loading

0 comments on commit 618710d

Please sign in to comment.