Skip to content

Commit

Permalink
create parallel tasks extension (paritytech#5249)
Browse files Browse the repository at this point in the history
  • Loading branch information
NikVolf authored and General-Beck committed Mar 17, 2020
1 parent 6bccb5d commit 704335a
Show file tree
Hide file tree
Showing 25 changed files with 189 additions and 26 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/node/testing/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ impl BenchDb {
None,
None,
ExecutionExtensions::new(profile.into_execution_strategies(), None),
sp_core::tasks::executor(),
None,
).expect("Should not fail");

Expand Down
2 changes: 1 addition & 1 deletion client/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub use light::*;
pub use notifications::*;
pub use proof_provider::*;

pub use sp_state_machine::{StorageProof, ExecutionStrategy};
pub use sp_state_machine::{StorageProof, ExecutionStrategy, CloneableSpawn};

/// Utility methods for the client.
pub mod utils {
Expand Down
5 changes: 3 additions & 2 deletions client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use std::io;
use std::collections::HashMap;

use sc_client_api::{
ForkBlocks, UsageInfo, MemoryInfo, BadBlocks, IoInfo, MemorySize,
ForkBlocks, UsageInfo, MemoryInfo, BadBlocks, IoInfo, MemorySize, CloneableSpawn,
execution_extensions::ExecutionExtensions,
backend::{NewBlockState, PrunableStateChangesTrieStorage},
};
Expand Down Expand Up @@ -292,6 +292,7 @@ pub fn new_client<E, Block, RA>(
fork_blocks: ForkBlocks<Block>,
bad_blocks: BadBlocks<Block>,
execution_extensions: ExecutionExtensions<Block>,
spawn_handle: Box<dyn CloneableSpawn>,
prometheus_registry: Option<Registry>,
) -> Result<(
sc_client::Client<
Expand All @@ -309,7 +310,7 @@ pub fn new_client<E, Block, RA>(
E: CodeExecutor + RuntimeInfo,
{
let backend = Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?);
let executor = sc_client::LocalCallExecutor::new(backend.clone(), executor);
let executor = sc_client::LocalCallExecutor::new(backend.clone(), executor, spawn_handle);
Ok((
sc_client::Client::new(
backend.clone(),
Expand Down
3 changes: 3 additions & 0 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ fn new_full_parts<TBl, TRtApi, TExecDisp>(
fork_blocks,
bad_blocks,
extensions,
Box::new(tasks_builder.spawn_handle()),
config.prometheus_config.as_ref().map(|config| config.registry.clone()),
)?
};
Expand Down Expand Up @@ -366,6 +367,7 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> {
sc_client::light::new_fetch_checker::<_, TBl, _>(
light_blockchain.clone(),
executor.clone(),
Box::new(tasks_builder.spawn_handle()),
),
);
let fetcher = Arc::new(sc_network::config::OnDemand::new(fetch_checker));
Expand All @@ -375,6 +377,7 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> {
backend.clone(),
config.expect_chain_spec().as_storage_builder(),
executor,
Box::new(tasks_builder.spawn_handle()),
config.prometheus_config.as_ref().map(|config| config.registry.clone()),
)?);

Expand Down
7 changes: 7 additions & 0 deletions client/service/src/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use futures::{
compat::*,
task::{Spawn, FutureObj, SpawnError},
};
use sc_client_api::CloneableSpawn;

/// Type alias for service task executor (usually runtime).
pub type ServiceTaskExecutor = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>;
Expand Down Expand Up @@ -118,6 +119,12 @@ impl Spawn for SpawnTaskHandle {
}
}

impl sc_client_api::CloneableSpawn for SpawnTaskHandle {
fn clone(&self) -> Box<dyn CloneableSpawn> {
Box::new(Clone::clone(self))
}
}

type Boxed01Future01 = Box<dyn futures01::Future<Item = (), Error = ()> + Send + 'static>;

impl futures01::future::Executor<Boxed01Future01> for SpawnTaskHandle {
Expand Down
10 changes: 9 additions & 1 deletion client/src/call_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,27 @@ use sc_executor::{RuntimeVersion, RuntimeInfo, NativeVersion};
use sp_externalities::Extensions;
use sp_core::{NativeOrEncoded, NeverNativeValue, traits::CodeExecutor};
use sp_api::{ProofRecorder, InitializeBlock, StorageTransactionCache};
use sc_client_api::{backend, call_executor::CallExecutor};
use sc_client_api::{backend, call_executor::CallExecutor, CloneableSpawn};

/// Call executor that executes methods locally, querying all required
/// data from local backend.
pub struct LocalCallExecutor<B, E> {
backend: Arc<B>,
executor: E,
spawn_handle: Box<dyn CloneableSpawn>,
}

impl<B, E> LocalCallExecutor<B, E> {
/// Creates new instance of local call executor.
pub fn new(
backend: Arc<B>,
executor: E,
spawn_handle: Box<dyn CloneableSpawn>,
) -> Self {
LocalCallExecutor {
backend,
executor,
spawn_handle,
}
}
}
Expand All @@ -54,6 +57,7 @@ impl<B, E> Clone for LocalCallExecutor<B, E> where E: Clone {
LocalCallExecutor {
backend: self.backend.clone(),
executor: self.executor.clone(),
spawn_handle: self.spawn_handle.clone(),
}
}
}
Expand Down Expand Up @@ -91,6 +95,7 @@ where
call_data,
extensions.unwrap_or_default(),
&state_runtime_code.runtime_code()?,
self.spawn_handle.clone(),
).execute_using_consensus_failure_handler::<_, NeverNativeValue, fn() -> _>(
strategy.get_manager(),
None,
Expand Down Expand Up @@ -164,6 +169,7 @@ where
call_data,
extensions.unwrap_or_default(),
&runtime_code,
self.spawn_handle.clone(),
)
// TODO: https://github.com/paritytech/substrate/issues/4455
// .with_storage_transaction_cache(storage_transaction_cache.as_mut().map(|c| &mut **c))
Expand All @@ -180,6 +186,7 @@ where
call_data,
extensions.unwrap_or_default(),
&state_runtime_code.runtime_code()?,
self.spawn_handle.clone(),
)
.with_storage_transaction_cache(storage_transaction_cache.as_mut().map(|c| &mut **c))
.execute_using_consensus_failure_handler(execution_manager, native_call)
Expand Down Expand Up @@ -218,6 +225,7 @@ where
trie_state,
overlay,
&self.executor,
self.spawn_handle.clone(),
method,
call_data,
&sp_state_machine::backend::BackendRuntimeCode::new(trie_state).runtime_code()?,
Expand Down
17 changes: 13 additions & 4 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub use sc_client_api::{
},
execution_extensions::{ExecutionExtensions, ExecutionStrategies},
notifications::{StorageNotifications, StorageEventStream},
CallExecutor, ExecutorProvider, ProofProvider,
CallExecutor, ExecutorProvider, ProofProvider, CloneableSpawn,
};
use sp_blockchain::Error;
use prometheus_endpoint::Registry;
Expand Down Expand Up @@ -135,6 +135,7 @@ pub fn new_in_mem<E, Block, S, RA>(
genesis_storage: &S,
keystore: Option<sp_core::traits::BareCryptoStorePtr>,
prometheus_registry: Option<Registry>,
spawn_handle: Box<dyn CloneableSpawn>,
) -> sp_blockchain::Result<Client<
in_mem::Backend<Block>,
LocalCallExecutor<in_mem::Backend<Block>, E>,
Expand All @@ -145,7 +146,7 @@ pub fn new_in_mem<E, Block, S, RA>(
S: BuildStorage,
Block: BlockT,
{
new_with_backend(Arc::new(in_mem::Backend::new()), executor, genesis_storage, keystore, prometheus_registry)
new_with_backend(Arc::new(in_mem::Backend::new()), executor, genesis_storage, keystore, spawn_handle, prometheus_registry)
}

/// Create a client with the explicitly provided backend.
Expand All @@ -155,6 +156,7 @@ pub fn new_with_backend<B, E, Block, S, RA>(
executor: E,
build_genesis_storage: &S,
keystore: Option<sp_core::traits::BareCryptoStorePtr>,
spawn_handle: Box<dyn CloneableSpawn>,
prometheus_registry: Option<Registry>,
) -> sp_blockchain::Result<Client<B, LocalCallExecutor<B, E>, Block, RA>>
where
Expand All @@ -163,7 +165,7 @@ pub fn new_with_backend<B, E, Block, S, RA>(
Block: BlockT,
B: backend::LocalBackend<Block> + 'static,
{
let call_executor = LocalCallExecutor::new(backend.clone(), executor);
let call_executor = LocalCallExecutor::new(backend.clone(), executor, spawn_handle);
let extensions = ExecutionExtensions::new(Default::default(), keystore);
Client::new(
backend,
Expand Down Expand Up @@ -1124,7 +1126,13 @@ impl<B, E, Block, RA> ProofProvider<Block> for Client<B, E, Block, RA> where

let state = self.state_at(id)?;
let header = self.prepare_environment_block(id)?;
prove_execution(state, header, &self.executor, method, call_data).map(|(r, p)| {
prove_execution(
state,
header,
&self.executor,
method,
call_data,
).map(|(r, p)| {
(r, StorageProof::merge(vec![p, code_proof]))
})
}
Expand Down Expand Up @@ -3482,6 +3490,7 @@ pub(crate) mod tests {
&substrate_test_runtime_client::GenesisParameters::default().genesis_storage(),
None,
None,
sp_core::tasks::executor(),
)
.unwrap();

Expand Down
7 changes: 7 additions & 0 deletions client/src/genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ mod tests {
AccountKeyring, Sr25519Keyring,
};
use sp_runtime::traits::BlakeTwo256;
use sp_core::tasks::executor as tasks_executor;
use hex_literal::*;

native_executor_instance!(
Expand Down Expand Up @@ -101,6 +102,7 @@ mod tests {
&header.encode(),
Default::default(),
&runtime_code,
tasks_executor(),
).execute(
ExecutionStrategy::NativeElseWasm,
).unwrap();
Expand All @@ -115,6 +117,7 @@ mod tests {
&tx.encode(),
Default::default(),
&runtime_code,
tasks_executor(),
).execute(
ExecutionStrategy::NativeElseWasm,
).unwrap();
Expand All @@ -129,6 +132,7 @@ mod tests {
&[],
Default::default(),
&runtime_code,
tasks_executor(),
).execute(
ExecutionStrategy::NativeElseWasm,
).unwrap();
Expand Down Expand Up @@ -179,6 +183,7 @@ mod tests {
&b1data,
Default::default(),
&runtime_code,
tasks_executor(),
).execute(
ExecutionStrategy::NativeElseWasm,
).unwrap();
Expand Down Expand Up @@ -210,6 +215,7 @@ mod tests {
&b1data,
Default::default(),
&runtime_code,
tasks_executor(),
).execute(
ExecutionStrategy::AlwaysWasm,
).unwrap();
Expand Down Expand Up @@ -241,6 +247,7 @@ mod tests {
&b1data,
Default::default(),
&runtime_code,
tasks_executor(),
).execute(
ExecutionStrategy::NativeElseWasm,
);
Expand Down
1 change: 1 addition & 0 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
//! LocalCallExecutor::new(
//! backend.clone(),
//! NativeExecutor::<LocalExecutor>::new(WasmExecutionMethod::Interpreted, None, 8),
//! sp_core::tasks::executor(),
//! ),
//! // This parameter provides the storage for the chain genesis.
//! &<Storage>::default(),
Expand Down
11 changes: 9 additions & 2 deletions client/src/light/call_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use sp_runtime::{
use sp_externalities::Extensions;
use sp_state_machine::{
self, Backend as StateBackend, OverlayedChanges, ExecutionStrategy, create_proof_check_backend,
execution_proof_check_on_trie_backend, ExecutionManager, StorageProof,
execution_proof_check_on_trie_backend, ExecutionManager, StorageProof, CloneableSpawn,
};
use hash_db::Hasher;

Expand Down Expand Up @@ -216,6 +216,7 @@ pub fn prove_execution<Block, S, E>(
/// Proof should include both environment preparation proof and method execution proof.
pub fn check_execution_proof<Header, E, H>(
executor: &E,
spawn_handle: Box<dyn CloneableSpawn>,
request: &RemoteCallRequest<Header>,
remote_proof: StorageProof,
) -> ClientResult<Vec<u8>>
Expand All @@ -227,6 +228,7 @@ pub fn check_execution_proof<Header, E, H>(
{
check_execution_proof_with_make_header::<Header, E, H, _>(
executor,
spawn_handle,
request,
remote_proof,
|header| <Header as HeaderT>::new(
Expand All @@ -241,6 +243,7 @@ pub fn check_execution_proof<Header, E, H>(

fn check_execution_proof_with_make_header<Header, E, H, MakeNextHeader: Fn(&Header) -> Header>(
executor: &E,
spawn_handle: Box<dyn CloneableSpawn>,
request: &RemoteCallRequest<Header>,
remote_proof: StorageProof,
make_next_header: MakeNextHeader,
Expand All @@ -267,6 +270,7 @@ fn check_execution_proof_with_make_header<Header, E, H, MakeNextHeader: Fn(&Head
&trie_backend,
&mut changes,
executor,
spawn_handle.clone(),
"Core_initialize_block",
&next_header.encode(),
&runtime_code,
Expand All @@ -277,6 +281,7 @@ fn check_execution_proof_with_make_header<Header, E, H, MakeNextHeader: Fn(&Head
&trie_backend,
&mut changes,
executor,
spawn_handle,
&request.method,
&request.call_data,
&runtime_code,
Expand All @@ -292,7 +297,7 @@ mod tests {
runtime::{Header, Digest, Block}, TestClient, ClientBlockImportExt,
};
use sc_executor::{NativeExecutor, WasmExecutionMethod};
use sp_core::H256;
use sp_core::{H256, tasks::executor as tasks_executor};
use sc_client_api::backend::{Backend, NewBlockState};
use crate::in_mem::Backend as InMemBackend;
use sc_client_api::ProofProvider;
Expand Down Expand Up @@ -387,6 +392,7 @@ mod tests {
// check remote execution proof locally
let local_result = check_execution_proof::<_, _, BlakeTwo256>(
&local_executor(),
tasks_executor(),
&RemoteCallRequest {
block: substrate_test_runtime_client::runtime::Hash::default(),
header: remote_header,
Expand Down Expand Up @@ -414,6 +420,7 @@ mod tests {
// check remote execution proof locally
let execution_result = check_execution_proof_with_make_header::<_, _, BlakeTwo256, _>(
&local_executor(),
tasks_executor(),
&RemoteCallRequest {
block: substrate_test_runtime_client::runtime::Hash::default(),
header: remote_header,
Expand Down
Loading

0 comments on commit 704335a

Please sign in to comment.