Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
pow: add Version for quick-check of metadata state and refactor lock …
Browse files Browse the repository at this point in the history
…handling (#9698)

* pow: add Version for quick-check of metadata state and refactor lock handling

* typo: mut self -> self

* Run rustfmt

* typo: grammar
  • Loading branch information
sorpaas authored Oct 12, 2021
1 parent 66ba332 commit e308d0f
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 76 deletions.
18 changes: 6 additions & 12 deletions client/consensus/pow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,12 @@
mod worker;

pub use crate::worker::{MiningBuild, MiningMetadata, MiningWorker};
pub use crate::worker::{MiningBuild, MiningHandle, MiningMetadata};

use crate::worker::UntilImportedOrTimeout;
use codec::{Decode, Encode};
use futures::{Future, StreamExt};
use log::*;
use parking_lot::Mutex;
use prometheus_endpoint::Registry;
use sc_client_api::{self, backend::AuxStore, BlockOf, BlockchainEvents};
use sc_consensus::{
Expand Down Expand Up @@ -525,7 +524,7 @@ pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, L, CIDP, CAW>(
build_time: Duration,
can_author_with: CAW,
) -> (
Arc<Mutex<MiningWorker<Block, Algorithm, C, L, <E::Proposer as Proposer<Block>>::Proof>>>,
MiningHandle<Block, Algorithm, C, L, <E::Proposer as Proposer<Block>>::Proof>,
impl Future<Output = ()>,
)
where
Expand All @@ -543,12 +542,7 @@ where
CAW: CanAuthorWith<Block> + Clone + Send + 'static,
{
let mut timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout);
let worker = Arc::new(Mutex::new(MiningWorker {
build: None,
algorithm: algorithm.clone(),
block_import,
justification_sync_link,
}));
let worker = MiningHandle::new(algorithm.clone(), block_import, justification_sync_link);
let worker_ret = worker.clone();

let task = async move {
Expand All @@ -559,7 +553,7 @@ where

if sync_oracle.is_major_syncing() {
debug!(target: "pow", "Skipping proposal due to sync.");
worker.lock().on_major_syncing();
worker.on_major_syncing();
continue
}

Expand Down Expand Up @@ -587,7 +581,7 @@ where
continue
}

if worker.lock().best_hash() == Some(best_hash) {
if worker.best_hash() == Some(best_hash) {
continue
}

Expand Down Expand Up @@ -682,7 +676,7 @@ where
proposal,
};

worker.lock().on_build(build);
worker.on_build(build);
}
};

Expand Down
210 changes: 146 additions & 64 deletions client/consensus/pow/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use futures::{
};
use futures_timer::Delay;
use log::*;
use parking_lot::Mutex;
use sc_client_api::ImportNotifications;
use sc_consensus::{BlockImportParams, BoxBlockImport, StateAction, StorageChanges};
use sp_consensus::{BlockOrigin, Proposal};
Expand All @@ -30,7 +31,16 @@ use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT},
DigestItem,
};
use std::{borrow::Cow, collections::HashMap, pin::Pin, time::Duration};
use std::{
borrow::Cow,
collections::HashMap,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};

use crate::{PowAlgorithm, PowIntermediate, Seal, INTERMEDIATE_KEY, POW_ENGINE_ID};

Expand Down Expand Up @@ -60,21 +70,26 @@ pub struct MiningBuild<
pub proposal: Proposal<Block, sp_api::TransactionFor<C, Block>, Proof>,
}

/// Version of the mining worker.
#[derive(Eq, PartialEq, Clone, Copy)]
pub struct Version(usize);

/// Mining worker that exposes structs to query the current mining build and submit mined blocks.
pub struct MiningWorker<
pub struct MiningHandle<
Block: BlockT,
Algorithm: PowAlgorithm<Block>,
C: sp_api::ProvideRuntimeApi<Block>,
L: sc_consensus::JustificationSyncLink<Block>,
Proof,
> {
pub(crate) build: Option<MiningBuild<Block, Algorithm, C, Proof>>,
pub(crate) algorithm: Algorithm,
pub(crate) block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
pub(crate) justification_sync_link: L,
version: Arc<AtomicUsize>,
algorithm: Arc<Algorithm>,
justification_sync_link: Arc<L>,
build: Arc<Mutex<Option<MiningBuild<Block, Algorithm, C, Proof>>>>,
block_import: Arc<Mutex<BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>>>,
}

impl<Block, Algorithm, C, L, Proof> MiningWorker<Block, Algorithm, C, L, Proof>
impl<Block, Algorithm, C, L, Proof> MiningHandle<Block, Algorithm, C, L, Proof>
where
Block: BlockT,
C: sp_api::ProvideRuntimeApi<Block>,
Expand All @@ -83,35 +98,65 @@ where
L: sc_consensus::JustificationSyncLink<Block>,
sp_api::TransactionFor<C, Block>: Send + 'static,
{
/// Get the current best hash. `None` if the worker has just started or the client is doing
/// major syncing.
pub fn best_hash(&self) -> Option<Block::Hash> {
self.build.as_ref().map(|b| b.metadata.best_hash)
fn increment_version(&self) {
self.version.fetch_add(1, Ordering::SeqCst);
}

pub(crate) fn on_major_syncing(&mut self) {
self.build = None;
pub(crate) fn new(
algorithm: Algorithm,
block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
justification_sync_link: L,
) -> Self {
Self {
version: Arc::new(AtomicUsize::new(0)),
algorithm: Arc::new(algorithm),
justification_sync_link: Arc::new(justification_sync_link),
build: Arc::new(Mutex::new(None)),
block_import: Arc::new(Mutex::new(block_import)),
}
}

pub(crate) fn on_build(&mut self, build: MiningBuild<Block, Algorithm, C, Proof>) {
self.build = Some(build);
pub(crate) fn on_major_syncing(&self) {
let mut build = self.build.lock();
*build = None;
self.increment_version();
}

pub(crate) fn on_build(&self, value: MiningBuild<Block, Algorithm, C, Proof>) {
let mut build = self.build.lock();
*build = Some(value);
self.increment_version();
}

/// Get the version of the mining worker.
///
/// This returns type `Version` which can only compare equality. If `Version` is unchanged, then
/// it can be certain that `best_hash` and `metadata` were not changed.
pub fn version(&self) -> Version {
Version(self.version.load(Ordering::SeqCst))
}

/// Get the current best hash. `None` if the worker has just started or the client is doing
/// major syncing.
pub fn best_hash(&self) -> Option<Block::Hash> {
self.build.lock().as_ref().map(|b| b.metadata.best_hash)
}

/// Get a copy of the current mining metadata, if available.
pub fn metadata(&self) -> Option<MiningMetadata<Block::Hash, Algorithm::Difficulty>> {
self.build.as_ref().map(|b| b.metadata.clone())
self.build.lock().as_ref().map(|b| b.metadata.clone())
}

/// Submit a mined seal. The seal will be validated again. Returns true if the submission is
/// successful.
pub async fn submit(&mut self, seal: Seal) -> bool {
if let Some(build) = self.build.take() {
pub async fn submit(&self, seal: Seal) -> bool {
if let Some(metadata) = self.metadata() {
match self.algorithm.verify(
&BlockId::Hash(build.metadata.best_hash),
&build.metadata.pre_hash,
build.metadata.pre_runtime.as_ref().map(|v| &v[..]),
&BlockId::Hash(metadata.best_hash),
&metadata.pre_hash,
metadata.pre_runtime.as_ref().map(|v| &v[..]),
&seal,
build.metadata.difficulty,
metadata.difficulty,
) {
Ok(true) => (),
Ok(false) => {
Expand All @@ -130,55 +175,92 @@ where
return false
},
}
} else {
warn!(
target: "pow",
"Unable to import mined block: metadata does not exist",
);
return false
}

let seal = DigestItem::Seal(POW_ENGINE_ID, seal);
let (header, body) = build.proposal.block.deconstruct();

let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
import_block.post_digests.push(seal);
import_block.body = Some(body);
import_block.state_action =
StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes));

let intermediate = PowIntermediate::<Algorithm::Difficulty> {
difficulty: Some(build.metadata.difficulty),
};

import_block
.intermediates
.insert(Cow::from(INTERMEDIATE_KEY), Box::new(intermediate) as Box<_>);

let header = import_block.post_header();
match self.block_import.import_block(import_block, HashMap::default()).await {
Ok(res) => {
res.handle_justification(
&header.hash(),
*header.number(),
&mut self.justification_sync_link,
);

info!(
target: "pow",
"✅ Successfully mined block on top of: {}",
build.metadata.best_hash
);
true
},
Err(err) => {
warn!(
target: "pow",
"Unable to import mined block: {:?}",
err,
);
false
},
let build = if let Some(build) = {
let mut build = self.build.lock();
let value = build.take();
if value.is_some() {
self.increment_version();
}
value
} {
build
} else {
warn!(
target: "pow",
"Unable to import mined block: build does not exist",
);
false
return false
};

let seal = DigestItem::Seal(POW_ENGINE_ID, seal);
let (header, body) = build.proposal.block.deconstruct();

let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
import_block.post_digests.push(seal);
import_block.body = Some(body);
import_block.state_action =
StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes));

let intermediate = PowIntermediate::<Algorithm::Difficulty> {
difficulty: Some(build.metadata.difficulty),
};

import_block
.intermediates
.insert(Cow::from(INTERMEDIATE_KEY), Box::new(intermediate) as Box<_>);

let header = import_block.post_header();
let mut block_import = self.block_import.lock();

match block_import.import_block(import_block, HashMap::default()).await {
Ok(res) => {
res.handle_justification(
&header.hash(),
*header.number(),
&self.justification_sync_link,
);

info!(
target: "pow",
"✅ Successfully mined block on top of: {}",
build.metadata.best_hash
);
true
},
Err(err) => {
warn!(
target: "pow",
"Unable to import mined block: {:?}",
err,
);
false
},
}
}
}

impl<Block, Algorithm, C, L, Proof> Clone for MiningHandle<Block, Algorithm, C, L, Proof>
where
Block: BlockT,
Algorithm: PowAlgorithm<Block>,
C: sp_api::ProvideRuntimeApi<Block>,
L: sc_consensus::JustificationSyncLink<Block>,
{
fn clone(&self) -> Self {
Self {
version: self.version.clone(),
algorithm: self.algorithm.clone(),
justification_sync_link: self.justification_sync_link.clone(),
build: self.build.clone(),
block_import: self.block_import.clone(),
}
}
}
Expand Down

0 comments on commit e308d0f

Please sign in to comment.