diff --git a/Cargo.lock b/Cargo.lock index 82f43ac5d54f5..086df23fd2a57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3025,7 +3025,6 @@ dependencies = [ "ctrlc 3.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "frame-support 2.0.0", "frame-system 2.0.0", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 14.0.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3077,7 +3076,7 @@ dependencies = [ "structopt 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-build-script-utils 2.0.0", "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "vergen 3.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "wasm-bindgen 0.2.57 (registry+https://github.com/rust-lang/crates.io-index)", "wasm-bindgen-futures 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3217,7 +3216,6 @@ name = "node-template" version = "2.0.0" dependencies = [ "ctrlc 3.1.3 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "node-template-runtime 2.0.0", @@ -3241,7 +3239,7 @@ dependencies = [ "sp-runtime 2.0.0", "sp-transaction-pool 2.0.0", "substrate-build-script-utils 2.0.0", - "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "trie-root 0.15.2 (registry+https://github.com/rust-lang/crates.io-index)", "vergen 3.0.4 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -5738,6 +5736,7 @@ dependencies = [ "exit-future 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-timer 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "grafana-data-source 2.0.0", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -5775,9 +5774,8 @@ dependencies = [ "substrate-test-runtime-client 2.0.0", "sysinfo 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)", "target_info 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-executor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", "tracing 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -7230,6 +7228,7 @@ dependencies = [ "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "memchr 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.11.1 (registry+https://github.com/rust-lang/crates.io-index)", "pin-project-lite 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/bin/node-template/Cargo.toml b/bin/node-template/Cargo.toml index aaaae647cf564..0333e887ec4b8 100644 --- a/bin/node-template/Cargo.toml +++ b/bin/node-template/Cargo.toml @@ -11,10 +11,9 @@ path = "src/main.rs" [dependencies] futures = "0.3.1" -futures01 = { package = "futures", version = "0.1.29" } ctrlc = { version = "3.1.3", features = ["termination"] } log = "0.4.8" -tokio = "0.1.22" +tokio = { version = "0.2", features = ["rt-threaded"] } parking_lot = "0.9.0" codec = { package = "parity-scale-codec", version = "1.0.0" } trie-root = "0.15.2" diff --git a/bin/node-template/src/cli.rs b/bin/node-template/src/cli.rs index 16638c4af955d..44764e5c9db41 100644 --- a/bin/node-template/src/cli.rs +++ b/bin/node-template/src/cli.rs @@ -1,5 +1,5 @@ use crate::service; -use futures::{future::{select, Map}, FutureExt, TryFutureExt, channel::oneshot, compat::Future01CompatExt}; +use futures::{future::{select, Map, Either}, FutureExt, channel::oneshot}; use std::cell::RefCell; use tokio::runtime::Runtime; pub use sc_cli::{VersionInfo, IntoExit, error}; @@ -75,36 +75,23 @@ where let informant = informant::build(&service); - let future = select(exit, informant) - .map(|_| Ok(())) - .compat(); - - runtime.executor().spawn(future); + let handle = runtime.spawn(select(exit, informant)); // we eagerly drop the service so that the internal exit future is fired, // but we need to keep holding a reference to the global telemetry guard let _telemetry = service.telemetry(); - let service_res = { - let exit = e.into_exit(); - let service = service - .map_err(|err| error::Error::Service(err)) - .compat(); - let select = select(service, exit) - .map(|_| Ok(())) - .compat(); - runtime.block_on(select) - }; + let exit = e.into_exit(); + let service_res = runtime.block_on(select(service, exit)); let _ = exit_send.send(()); - // TODO [andre]: timeout this future #1318 - - use futures01::Future; + runtime.block_on(handle); - let _ = runtime.shutdown_on_idle().wait(); - - service_res + match service_res { + Either::Left((res, _)) => res.map_err(error::Error::Service), + Either::Right((_, _)) => Ok(()) + } } // handles ctrl-c diff --git a/bin/node-template/src/service.rs b/bin/node-template/src/service.rs index 92db95b5c7d89..7f6ff67d282d1 100644 --- a/bin/node-template/src/service.rs +++ b/bin/node-template/src/service.rs @@ -12,6 +12,7 @@ pub use sc_executor::NativeExecutor; use sp_consensus_aura::sr25519::{AuthorityPair as AuraPair}; use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider}; use sc_basic_authority; +use futures::{FutureExt, compat::Future01CompatExt}; // Our native executor instance. native_executor_instance!( @@ -163,7 +164,7 @@ pub fn new_full(config: Configuration { // start the full GRANDPA voter @@ -180,7 +181,7 @@ pub fn new_full(config: Configuration { grandpa::setup_disabled_grandpa( diff --git a/bin/node/cli/Cargo.toml b/bin/node/cli/Cargo.toml index 1f7c90bc3a6c5..ec54f5b374873 100644 --- a/bin/node/cli/Cargo.toml +++ b/bin/node/cli/Cargo.toml @@ -25,7 +25,6 @@ crate-type = ["cdylib", "rlib"] # third-party dependencies codec = { package = "parity-scale-codec", version = "1.0.6" } serde = { version = "1.0.102", features = ["derive"] } -futures01 = { package = "futures", version = "0.1.29" } futures = { version = "0.3.1", features = ["compat"] } hex-literal = "0.2.1" jsonrpc-core = "14.0.3" @@ -81,7 +80,7 @@ node-primitives = { version = "2.0.0", path = "../primitives" } node-executor = { version = "2.0.0", path = "../executor" } # CLI-specific dependencies -tokio = { version = "0.1.22", optional = true } +tokio = { version = "0.2", features = ["rt-threaded"], optional = true } sc-cli = { version = "2.0.0", optional = true, path = "../../../client/cli" } ctrlc = { version = "3.1.3", features = ["termination"], optional = true } node-transaction-factory = { version = "2.0.0", optional = true, path = "../transaction-factory" } diff --git a/bin/node/cli/src/cli.rs b/bin/node/cli/src/cli.rs index 8fb95bed687bb..7a4321802cf36 100644 --- a/bin/node/cli/src/cli.rs +++ b/bin/node/cli/src/cli.rs @@ -15,7 +15,6 @@ // along with Substrate. If not, see . pub use sc_cli::VersionInfo; -use tokio::prelude::Future; use tokio::runtime::{Builder as RuntimeBuilder, Runtime}; use sc_cli::{IntoExit, NoCustom, SharedParams, ImportParams, error}; use sc_service::{AbstractService, Roles as ServiceRoles, Configuration}; @@ -25,6 +24,7 @@ use sc_cli::{display_role, parse_and_prepare, GetSharedParams, ParseAndPrepare}; use crate::{service, ChainSpec, load_spec}; use crate::factory_impl::FactoryState; use node_transaction_factory::RuntimeAdapter; +use futures::{channel::oneshot, future::{select, Either}}; /// Custom subcommands. #[derive(Clone, Debug, StructOpt)] @@ -105,7 +105,10 @@ pub fn run(args: I, exit: E, version: sc_cli::VersionInfo) -> error::Re info!("Chain specification: {}", config.chain_spec.name()); info!("Node name: {}", config.name); info!("Roles: {}", display_role(&config)); - let runtime = RuntimeBuilder::new().name_prefix("main-tokio-").build() + let runtime = RuntimeBuilder::new() + .thread_name("main-tokio-") + .threaded_scheduler() + .build() .map_err(|e| format!("{:?}", e))?; match config.roles { ServiceRoles::LIGHT => run_until_exit( @@ -172,37 +175,25 @@ where T: AbstractService, E: IntoExit, { - use futures::{FutureExt, TryFutureExt, channel::oneshot, future::select, compat::Future01CompatExt}; - let (exit_send, exit) = oneshot::channel(); let informant = sc_cli::informant::build(&service); - let future = select(informant, exit) - .map(|_| Ok(())) - .compat(); - - runtime.executor().spawn(future); + let handle = runtime.spawn(select(exit, informant)); // we eagerly drop the service so that the internal exit future is fired, // but we need to keep holding a reference to the global telemetry guard let _telemetry = service.telemetry(); - let service_res = { - let exit = e.into_exit(); - let service = service - .map_err(|err| error::Error::Service(err)) - .compat(); - let select = select(service, exit) - .map(|_| Ok(())) - .compat(); - runtime.block_on(select) - }; + let exit = e.into_exit(); + let service_res = runtime.block_on(select(service, exit)); let _ = exit_send.send(()); - // TODO [andre]: timeout this future #1318 - let _ = runtime.shutdown_on_idle().wait(); + runtime.block_on(handle); - service_res + match service_res { + Either::Left((res, _)) => res.map_err(error::Error::Service), + Either::Right((_, _)) => Ok(()) + } } diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 97ecb7a38f2f7..2f53fb7637d64 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -113,8 +113,8 @@ macro_rules! new_full_start { macro_rules! new_full { ($config:expr, $with_startup_data: expr) => {{ use futures::{ - stream::StreamExt, - future::{FutureExt, TryFutureExt}, + prelude::*, + compat::Future01CompatExt }; use sc_network::Event; @@ -191,9 +191,8 @@ macro_rules! new_full { service.keystore(), dht_event_stream, ); - let future01_authority_discovery = authority_discovery.map(|x| Ok(x)).compat(); - service.spawn_task(future01_authority_discovery); + service.spawn_task(authority_discovery); } // if the node isn't actively participating in consensus then it doesn't @@ -223,7 +222,7 @@ macro_rules! new_full { service.network(), service.on_exit(), service.spawn_task_handle(), - )?); + )?.compat().map(drop)); }, (true, false) => { // start the full GRANDPA voter @@ -239,7 +238,9 @@ macro_rules! new_full { }; // the GRANDPA voter task is considered infallible, i.e. // if it fails we take down the service with it. - service.spawn_essential_task(grandpa::run_grandpa_voter(grandpa_config)?); + service.spawn_essential_task( + grandpa::run_grandpa_voter(grandpa_config)?.compat().map(drop) + ); }, (_, true) => { grandpa::setup_disabled_grandpa( diff --git a/client/cli/Cargo.toml b/client/cli/Cargo.toml index 460cc2a05a32e..b653f982e6694 100644 --- a/client/cli/Cargo.toml +++ b/client/cli/Cargo.toml @@ -16,8 +16,8 @@ time = "0.1.42" ansi_term = "0.12.1" lazy_static = "1.4.0" app_dirs = "1.2.1" -tokio = "0.2.1" -futures = { version = "0.3.1", features = ["compat"] } +tokio = "0.2" +futures = "0.3.1" fdlimit = "0.1.1" serde_json = "1.0.41" sp-panic-handler = { version = "2.0.0", path = "../../primitives/panic-handler" } diff --git a/client/cli/src/informant.rs b/client/cli/src/informant.rs index de7c376f09fee..312e4017d5ff0 100644 --- a/client/cli/src/informant.rs +++ b/client/cli/src/informant.rs @@ -17,7 +17,7 @@ //! Console informant. Prints sync progress and block events. Runs on the calling thread. use sc_client_api::BlockchainEvents; -use futures::{StreamExt, TryStreamExt, FutureExt, future, compat::Stream01CompatExt}; +use futures::prelude::*; use log::{info, warn, trace}; use sp_runtime::traits::Header; use sc_service::AbstractService; @@ -33,8 +33,7 @@ pub fn build(service: &impl AbstractService) -> impl futures::Future impl futures::Future ParseAndPrepareExport<'a> { }); let mut export_fut = builder(config)? - .export_blocks(file, from.into(), to, json) - .compat(); + .export_blocks(file, from.into(), to, json); let fut = futures::future::poll_fn(|cx| { if exit_recv.try_recv().is_ok() { return Poll::Ready(Ok(())); @@ -485,8 +484,7 @@ impl<'a> ParseAndPrepareImport<'a> { }); let mut import_fut = builder(config)? - .import_blocks(file, false) - .compat(); + .import_blocks(file, false); let fut = futures::future::poll_fn(|cx| { if exit_recv.try_recv().is_ok() { return Poll::Ready(Ok(())); @@ -537,8 +535,7 @@ impl<'a> CheckBlock<'a> { let start = std::time::Instant::now(); let check = builder(config)? - .check_block(block_id) - .compat(); + .check_block(block_id); let mut runtime = tokio::runtime::Runtime::new().unwrap(); runtime.block_on(check)?; println!("Completed in {} ms.", start.elapsed().as_millis()); diff --git a/client/consensus/aura/Cargo.toml b/client/consensus/aura/Cargo.toml index ddae989b4170f..bcbf622cd3abc 100644 --- a/client/consensus/aura/Cargo.toml +++ b/client/consensus/aura/Cargo.toml @@ -14,8 +14,7 @@ sc-client-api = { version = "2.0.0", path = "../../api" } codec = { package = "parity-scale-codec", version = "1.0.0" } sp-consensus = { version = "0.8", path = "../../../primitives/consensus/common" } derive_more = "0.99.2" -futures = { version = "0.3.1", features = ["compat"] } -futures01 = { package = "futures", version = "0.1" } +futures = "0.3.1" futures-timer = "0.4.0" sp-inherents = { version = "2.0.0", path = "../../../primitives/inherents" } sc-keystore = { version = "2.0.0", path = "../../keystore" } @@ -41,3 +40,4 @@ substrate-test-runtime-client = { version = "2.0.0", path = "../../../test-utils tokio = "0.1.22" env_logger = "0.7.0" tempfile = "3.1.0" +futures01 = { package = "futures", version = "0.1" } diff --git a/client/consensus/aura/src/lib.rs b/client/consensus/aura/src/lib.rs index 13a4c5a777144..c49837972134e 100644 --- a/client/consensus/aura/src/lib.rs +++ b/client/consensus/aura/src/lib.rs @@ -153,7 +153,7 @@ pub fn start_aura( force_authoring: bool, keystore: KeyStorePtr, can_author_with: CAW, -) -> Result, sp_consensus::Error> where +) -> Result, sp_consensus::Error> where B: BlockT, C: ProvideRuntimeApi + BlockOf + ProvideCache + AuxStore + Send + Sync, C::Api: AuraApi>, @@ -189,7 +189,7 @@ pub fn start_aura( inherent_data_providers, AuraSlotCompatible, can_author_with, - ).map(|()| Ok::<(), ()>(())).compat()) + )) } struct AuraWorker { @@ -1019,7 +1019,10 @@ mod tests { false, keystore, sp_consensus::AlwaysCanAuthor, - ).expect("Starts aura"); + ) + .expect("Starts aura") + .unit_error() + .compat(); runtime.spawn(aura); } @@ -1030,7 +1033,7 @@ mod tests { })); runtime.block_on(future::join_all(import_notifications) - .map(|_| Ok::<(), ()>(())).compat()).unwrap(); + .unit_error().compat()).unwrap(); } #[test] diff --git a/client/consensus/babe/Cargo.toml b/client/consensus/babe/Cargo.toml index eb2875b3d37ff..53f60c76f909a 100644 --- a/client/consensus/babe/Cargo.toml +++ b/client/consensus/babe/Cargo.toml @@ -29,8 +29,7 @@ sc-consensus-uncles = { version = "0.8", path = "../uncles" } sc-consensus-slots = { version = "0.8", path = "../slots" } sp-runtime = { version = "2.0.0", path = "../../../primitives/runtime" } fork-tree = { version = "2.0.0", path = "../../../utils/fork-tree" } -futures = { version = "0.3.1", features = ["compat"] } -futures01 = { package = "futures", version = "0.1" } +futures = "0.3.1" futures-timer = "0.4.0" parking_lot = "0.9.0" log = "0.4.8" @@ -51,6 +50,7 @@ sc-block-builder = { version = "2.0.0", path = "../../block-builder" } tokio = "0.1.22" env_logger = "0.7.0" tempfile = "3.1.0" +futures01 = { package = "futures", version = "0.1" } [features] test-helpers = [] diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index bbf19b706601f..4eb1e3915b674 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -289,7 +289,7 @@ pub fn start_babe(BabeParams { babe_link, can_author_with, }: BabeParams) -> Result< - impl futures01::Future, + impl futures::Future, sp_consensus::Error, > where B: BlockT, @@ -325,7 +325,7 @@ pub fn start_babe(BabeParams { )?; babe_info!("Starting BABE Authorship worker"); - let slot_worker = sc_consensus_slots::start_slot_worker( + Ok(sc_consensus_slots::start_slot_worker( config.0, select_chain, worker, @@ -333,9 +333,7 @@ pub fn start_babe(BabeParams { inherent_data_providers, babe_link.time_source, can_author_with, - ); - - Ok(slot_worker.map(|_| Ok::<(), ()>(())).compat()) + )) } struct BabeWorker { diff --git a/client/consensus/babe/src/tests.rs b/client/consensus/babe/src/tests.rs index 2ddb67fe4796e..305c89398269e 100644 --- a/client/consensus/babe/src/tests.rs +++ b/client/consensus/babe/src/tests.rs @@ -419,7 +419,7 @@ fn run_one_test( babe_link: data.link.clone(), keystore, can_author_with: sp_consensus::AlwaysCanAuthor, - }).expect("Starts babe")); + }).expect("Starts babe").unit_error().compat()); } runtime.spawn(futures01::future::poll_fn(move || { @@ -428,7 +428,7 @@ fn run_one_test( })); runtime.block_on(future::join_all(import_notifications) - .map(|_| Ok::<(), ()>(())).compat()).unwrap(); + .unit_error().compat()).unwrap(); } #[test] diff --git a/client/finality-grandpa/src/lib.rs b/client/finality-grandpa/src/lib.rs index 809e0ab88a3a3..dbecd9c9a4b7f 100644 --- a/client/finality-grandpa/src/lib.rs +++ b/client/finality-grandpa/src/lib.rs @@ -53,6 +53,7 @@ //! included in the newly-finalized chain. use futures::prelude::*; +use futures03::{StreamExt, future::ready}; use log::{debug, error, info}; use futures::sync::mpsc; use sc_client_api::{BlockchainEvents, CallExecutor, backend::{AuxStore, Backend}, ExecutionStrategy}; @@ -535,7 +536,7 @@ pub struct GrandpaParams { /// Handle to a future that will resolve on exit. pub on_exit: X, /// If supplied, can be used to hook on telemetry connection established events. - pub telemetry_on_connect: Option>, + pub telemetry_on_connect: Option>, /// A voting rule used to potentially restrict target votes. pub voting_rule: VR, /// How to spawn background tasks. @@ -608,9 +609,10 @@ pub fn run_grandpa_voter( .expect("authorities is always at least an empty vector; elements are always of type string") } ); - Ok(()) + ready(()) }) - .then(|_| -> Result<(), ()> { Ok(()) }); + .unit_error() + .compat(); futures::future::Either::A(events) } else { futures::future::Either::B(futures::future::empty()) diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml index ce27b0995c5ef..c417479511330 100644 --- a/client/service/Cargo.toml +++ b/client/service/Cargo.toml @@ -15,14 +15,14 @@ wasmtime = [ [dependencies] derive_more = "0.99.2" -futures = "0.1.29" -futures03 = { package = "futures", version = "0.3.1", features = ["compat"] } +futures01 = { package = "futures", version = "0.1.29" } +futures = "0.3.1" parking_lot = "0.9.0" lazy_static = "1.4.0" log = "0.4.8" slog = { version = "2.5.2", features = ["nested-values"] } tokio-executor = "0.1.8" -tokio-timer = "0.2.11" +futures-timer = "2" exit-future = "0.2.0" serde = "1.0.101" serde_json = "1.0.41" @@ -60,4 +60,4 @@ substrate-test-runtime-client = { version = "2.0.0", path = "../../test-utils/ru sp-consensus-babe = { version = "0.8", path = "../../primitives/consensus/babe" } grandpa = { version = "2.0.0", package = "sc-finality-grandpa", path = "../finality-grandpa" } grandpa-primitives = { version = "2.0.0", package = "sp-finality-grandpa", path = "../../primitives/finality-grandpa" } -tokio = "0.1" +tokio = { version = "0.2", features = ["rt-core"] } diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 0160da9bbed8d..85642b3e41102 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -27,12 +27,10 @@ use sc_client_api::{ use sc_client::Client; use sc_chain_spec::{RuntimeGenesis, Extension}; use sp_consensus::import_queue::ImportQueue; -use futures::{prelude::*, sync::mpsc}; -use futures03::{ - compat::Compat, - FutureExt as _, TryFutureExt as _, - StreamExt as _, TryStreamExt as _, - future::{select, Either} +use futures::{ + Future, FutureExt, StreamExt, + channel::mpsc, + future::{select, ready} }; use sc_keystore::{Store as Keystore}; use log::{info, warn, error}; @@ -47,7 +45,7 @@ use sp_api::ProvideRuntimeApi; use sc_executor::{NativeExecutor, NativeExecutionDispatch}; use std::{ io::{Read, Write, Seek}, - marker::PhantomData, sync::Arc, time::SystemTime + marker::PhantomData, sync::Arc, time::SystemTime, pin::Pin }; use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use sc_telemetry::{telemetry, SUBSTRATE_INFO}; @@ -682,7 +680,7 @@ pub trait ServiceBuilderCommand { self, input: impl Read + Seek + Send + 'static, force: bool, - ) -> Box + Send>; + ) -> Pin> + Send>>; /// Performs the blocks export. fn export_blocks( @@ -691,7 +689,7 @@ pub trait ServiceBuilderCommand { from: NumberFor, to: Option>, json: bool - ) -> Box>; + ) -> Pin>>>; /// Performs a revert of `blocks` blocks. fn revert_chain( @@ -703,7 +701,7 @@ pub trait ServiceBuilderCommand { fn check_block( self, block: BlockId - ) -> Box + Send>; + ) -> Pin> + Send>>; } impl @@ -795,7 +793,7 @@ ServiceBuilder< // List of asynchronous tasks to spawn. We collect them, then spawn them all at once. let (to_spawn_tx, to_spawn_rx) = - mpsc::unbounded:: + Send>>(); + mpsc::unbounded:: + Send>>>(); // A side-channel for essential tasks to communicate shutdown. let (essential_failed_tx, essential_failed_rx) = mpsc::unbounded(); @@ -879,7 +877,6 @@ ServiceBuilder< let is_validator = config.roles.is_authority(); let events = client.import_notification_stream() - .map(|v| Ok::<_, ()>(v)).compat() .for_each(move |notification| { let txpool = txpool.upgrade(); @@ -887,8 +884,8 @@ ServiceBuilder< let future = txpool.maintain( &BlockId::hash(notification.hash), ¬ification.retracted, - ).map(|_| Ok(())).compat(); - let _ = to_spawn_tx_.unbounded_send(Box::new(future)); + ); + let _ = to_spawn_tx_.unbounded_send(Box::pin(future)); } let offchain = offchain.as_ref().and_then(|o| o.upgrade()); @@ -897,15 +894,13 @@ ServiceBuilder< ¬ification.header, network_state_info.clone(), is_validator - ).map(|()| Ok(())); - let _ = to_spawn_tx_.unbounded_send(Box::new(Compat::new(future))); + ); + let _ = to_spawn_tx_.unbounded_send(Box::pin(future)); } - Ok(()) - }) - .select(exit.clone().map(Ok).compat()) - .then(|_| Ok(())); - let _ = to_spawn_tx.unbounded_send(Box::new(events)); + ready(()) + }); + let _ = to_spawn_tx.unbounded_send(Box::pin(select(events, exit.clone()).map(drop))); } { @@ -913,7 +908,6 @@ ServiceBuilder< let network = Arc::downgrade(&network); let transaction_pool_ = transaction_pool.clone(); let events = transaction_pool.import_notification_stream() - .map(|v| Ok::<_, ()>(v)).compat() .for_each(move |_| { if let Some(network) = network.upgrade() { network.trigger_repropagate(); @@ -923,12 +917,10 @@ ServiceBuilder< "ready" => status.ready, "future" => status.future ); - Ok(()) - }) - .select(exit.clone().map(Ok).compat()) - .then(|_| Ok(())); + ready(()) + }); - let _ = to_spawn_tx.unbounded_send(Box::new(events)); + let _ = to_spawn_tx.unbounded_send(Box::pin(select(events, exit.clone()).map(drop))); } // Periodically notify the telemetry. @@ -990,9 +982,9 @@ ServiceBuilder< "disk_write_per_sec" => info.usage.as_ref().map(|usage| usage.io.bytes_written).unwrap_or(0), ); - Ok(()) - }).select(exit.clone().map(Ok).compat()).then(|_| Ok(())); - let _ = to_spawn_tx.unbounded_send(Box::new(tel_task)); + ready(()) + }); + let _ = to_spawn_tx.unbounded_send(Box::pin(select(tel_task, exit.clone()).map(drop))); // Periodically send the network state to the telemetry. let (netstat_tx, netstat_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>(); @@ -1003,12 +995,12 @@ ServiceBuilder< "system.network_state"; "state" => network_state, ); - Ok(()) - }).select(exit.clone().map(Ok).compat()).then(|_| Ok(())); - let _ = to_spawn_tx.unbounded_send(Box::new(tel_task_2)); + ready(()) + }); + let _ = to_spawn_tx.unbounded_send(Box::pin(select(tel_task_2, exit.clone()).map(drop))); // RPC - let (system_rpc_tx, system_rpc_rx) = futures03::channel::mpsc::unbounded(); + let (system_rpc_tx, system_rpc_rx) = mpsc::unbounded(); let gen_handler = || { use sc_rpc::{chain, state, author, system}; @@ -1068,17 +1060,14 @@ ServiceBuilder< let rpc = start_rpc_servers(&config, gen_handler)?; - let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future( + let _ = to_spawn_tx.unbounded_send(Box::pin(select(build_network_future( config.roles, network_mut, client.clone(), network_status_sinks.clone(), system_rpc_rx, has_bootnodes, - ) - .map_err(|_| ()) - .select(exit.clone().map(Ok).compat()) - .then(|_| Ok(())))); + ), exit.clone()).map(drop))); let telemetry_connection_sinks: Arc>>> = Default::default(); @@ -1099,8 +1088,6 @@ ServiceBuilder< .map(|dur| dur.as_millis()) .unwrap_or(0); let future = telemetry.clone() - .map(|ev| Ok::<_, ()>(ev)) - .compat() .for_each(move |event| { // Safe-guard in case we add more events in the future. let sc_telemetry::TelemetryEvent::Connected = event; @@ -1119,11 +1106,11 @@ ServiceBuilder< telemetry_connection_sinks_.lock().retain(|sink| { sink.unbounded_send(()).is_ok() }); - Ok(()) + ready(()) }); - let _ = to_spawn_tx.unbounded_send(Box::new(future - .select(exit.clone().map(Ok).compat()) - .then(|_| Ok(())))); + let _ = to_spawn_tx.unbounded_send(Box::pin(select( + future, exit.clone() + ).map(drop))); telemetry }); @@ -1132,13 +1119,10 @@ ServiceBuilder< let future = select( grafana_data_source::run_server(port).boxed(), exit.clone() - ).map(|either| match either { - Either::Left((result, _)) => result.map_err(|_| ()), - Either::Right(_) => Ok(()) - }).compat(); + ).map(drop); - let _ = to_spawn_tx.unbounded_send(Box::new(future)); - } + let _ = to_spawn_tx.unbounded_send(Box::pin(future)); + } // Instrumentation if let Some(tracing_targets) = config.tracing_targets.as_ref() { diff --git a/client/service/src/chain_ops.rs b/client/service/src/chain_ops.rs index 0b86fb366f0b6..0c2fe79718a72 100644 --- a/client/service/src/chain_ops.rs +++ b/client/service/src/chain_ops.rs @@ -22,9 +22,6 @@ use crate::error::Error; use sc_chain_spec::{ChainSpec, RuntimeGenesis, Extension}; use log::{warn, info}; use futures::{future, prelude::*}; -use futures03::{ - TryFutureExt as _, -}; use sp_runtime::traits::{ Block as BlockT, NumberFor, One, Zero, Header, SaturatedConversion }; @@ -34,9 +31,7 @@ use sc_client::Client; use sp_consensus::import_queue::{IncomingBlock, Link, BlockImportError, BlockImportResult, ImportQueue}; use sp_consensus::BlockOrigin; -use std::{ - io::{Read, Write, Seek}, -}; +use std::{io::{Read, Write, Seek}, pin::Pin}; use sc_network::message; @@ -68,7 +63,7 @@ impl< self, input: impl Read + Seek + Send + 'static, force: bool, - ) -> Box + Send> { + ) -> Pin> + Send>> { struct WaitLink { imported_blocks: u64, has_error: bool, @@ -117,7 +112,7 @@ impl< // queue, the `Future` re-schedules itself and returns `Poll::Pending`. // This makes it possible either to interleave other operations in-between the block imports, // or to stop the operation completely. - let import = futures03::future::poll_fn(move |cx| { + let import = future::poll_fn(move |cx| { // Start by reading the number of blocks if not done so already. let count = match count { Some(c) => c, @@ -205,7 +200,7 @@ impl< return std::task::Poll::Pending; } }); - Box::new(import.compat()) + Box::pin(import) } fn export_blocks( @@ -214,7 +209,7 @@ impl< from: NumberFor, to: Option>, json: bool - ) -> Box> { + ) -> Pin>>> { let client = self.client; let mut block = from; @@ -233,7 +228,7 @@ impl< // `Poll::Pending`. // This makes it possible either to interleave other operations in-between the block exports, // or to stop the operation completely. - let export = futures03::future::poll_fn(move |cx| { + let export = future::poll_fn(move |cx| { if last < block { return std::task::Poll::Ready(Err("Invalid block range specified".into())); } @@ -274,7 +269,7 @@ impl< std::task::Poll::Pending }); - Box::new(export.compat()) + Box::pin(export) } fn revert_chain( @@ -295,7 +290,7 @@ impl< fn check_block( self, block_id: BlockId - ) -> Box + Send> { + ) -> Pin> + Send>> { match self.client.block(&block_id) { Ok(Some(block)) => { let mut buf = Vec::new(); @@ -304,8 +299,8 @@ impl< let reader = std::io::Cursor::new(buf); self.import_blocks(reader, true) } - Ok(None) => Box::new(future::err("Unknown block".into())), - Err(e) => Box::new(future::err(format!("Error reading block: {:?}", e).into())), + Ok(None) => Box::pin(future::err("Unknown block".into())), + Err(e) => Box::pin(future::err(format!("Error reading block: {:?}", e).into())), } } } diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 7a3c6fc9eaab3..87327d0967583 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -32,15 +32,17 @@ use std::marker::PhantomData; use std::net::SocketAddr; use std::collections::HashMap; use std::time::{Duration, Instant}; -use futures::sync::mpsc; +use std::task::{Poll, Context}; use parking_lot::Mutex; use sc_client::Client; use exit_future::Signal; -use futures::prelude::*; -use futures03::{ - future::{ready, FutureExt as _, TryFutureExt as _}, - stream::{StreamExt as _, TryStreamExt as _}, +use futures::{ + Future, FutureExt, Stream, StreamExt, TryFutureExt, + future::select, channel::mpsc, + compat::*, + sink::SinkExt, + task::{Spawn, SpawnExt, FutureObj, SpawnError}, }; use sc_network::{ NetworkService, NetworkState, specialization::NetworkSpecialization, @@ -67,8 +69,6 @@ pub use sc_rpc::Metadata as RpcMetadata; pub use std::{ops::Deref, result::Result, sync::Arc}; #[doc(hidden)] pub use sc_network::{FinalityProofProvider, OnDemand, config::BoxFinalityProofRequestBuilder}; -#[doc(hidden)] -pub use futures::future::Executor; const DEFAULT_PROTOCOL_ID: &str = "sup"; @@ -92,13 +92,13 @@ pub struct Service { /// A receiver for spawned essential-tasks concluding. essential_failed_rx: mpsc::UnboundedReceiver<()>, /// Sender for futures that must be spawned as background tasks. - to_spawn_tx: mpsc::UnboundedSender + Send>>, + to_spawn_tx: mpsc::UnboundedSender + Send>>>, /// Receiver for futures that must be spawned as background tasks. - to_spawn_rx: mpsc::UnboundedReceiver + Send>>, + to_spawn_rx: mpsc::UnboundedReceiver + Send>>>, /// List of futures to poll from `poll`. /// If spawning a background task is not possible, we instead push the task into this `Vec`. /// The elements must then be polled manually. - to_poll: Vec + Send>>, + to_poll: Vec + Send>>>, rpc_handlers: sc_rpc_server::RpcHandler, _rpc: Box, _telemetry: Option, @@ -109,42 +109,36 @@ pub struct Service { } /// Alias for a an implementation of `futures::future::Executor`. -pub type TaskExecutor = Arc + Send>> + Send + Sync>; +pub type TaskExecutor = Arc; /// An handle for spawning tasks in the service. #[derive(Clone)] pub struct SpawnTaskHandle { - sender: mpsc::UnboundedSender + Send>>, + sender: mpsc::UnboundedSender + Send>>>, on_exit: exit_future::Exit, } -impl Executor + Send>> for SpawnTaskHandle { - fn execute( - &self, - future: Box + Send>, - ) -> Result<(), futures::future::ExecuteError + Send>>> { - let exit = self.on_exit.clone().map(Ok).compat(); - let future = Box::new(future.select(exit).then(|_| Ok(()))); - if let Err(err) = self.sender.unbounded_send(future) { - let kind = futures::future::ExecuteErrorKind::Shutdown; - Err(futures::future::ExecuteError::new(kind, err.into_inner())) - } else { - Ok(()) - } +impl Spawn for SpawnTaskHandle { + fn spawn_obj(&self, future: FutureObj<'static, ()>) + -> Result<(), SpawnError> { + let future = select(self.on_exit.clone(), future).map(drop); + self.sender.unbounded_send(Box::pin(future)) + .map_err(|_| SpawnError::shutdown()) } } -impl futures03::task::Spawn for SpawnTaskHandle { - fn spawn_obj(&self, future: futures03::task::FutureObj<'static, ()>) - -> Result<(), futures03::task::SpawnError> { - self.execute(Box::new(futures03::compat::Compat::new(future.unit_error()))) - .map_err(|_| futures03::task::SpawnError::shutdown()) +type Boxed01Future01 = Box + Send + 'static>; + +impl futures01::future::Executor for SpawnTaskHandle { + fn execute(&self, future: Boxed01Future01) -> Result<(), futures01::future::ExecuteError>{ + self.spawn(future.compat().map(drop)); + Ok(()) } } /// Abstraction over a Substrate service. -pub trait AbstractService: 'static + Future + - Executor + Send>> + Send { +pub trait AbstractService: 'static + Future> + + Spawn + Send + Unpin { /// Type of block of this chain. type Block: BlockT; /// Backend storage for the client. @@ -168,12 +162,12 @@ pub trait AbstractService: 'static + Future + fn telemetry(&self) -> Option; /// Spawns a task in the background that runs the future passed as parameter. - fn spawn_task(&self, task: impl Future + Send + 'static); + fn spawn_task(&self, task: impl Future + Send + Unpin + 'static); /// Spawns a task in the background that runs the future passed as /// parameter. The given task is considered essential, i.e. if it errors we /// trigger a service exit. - fn spawn_essential_task(&self, task: impl Future + Send + 'static); + fn spawn_essential_task(&self, task: impl Future + Send + Unpin + 'static); /// Returns a handle for spawning tasks. fn spawn_task_handle(&self) -> SpawnTaskHandle; @@ -190,7 +184,7 @@ pub trait AbstractService: 'static + Future + /// /// If the request subscribes you to events, the `Sender` in the `RpcSession` object is used to /// send back spontaneous events. - fn rpc_query(&self, mem: &RpcSession, request: &str) -> Box, Error = ()> + Send>; + fn rpc_query(&self, mem: &RpcSession, request: &str) -> Pin> + Send>>; /// Get shared client instance. fn client(&self) -> Arc>; @@ -216,11 +210,11 @@ impl AbstractService Service, TSc, NetworkStatus, NetworkService, TExPool, TOc> where - TBl: BlockT, + TBl: BlockT + Unpin, TBackend: 'static + sc_client_api::backend::Backend, TExec: 'static + sc_client::CallExecutor + Send + Sync + Clone, TRtApi: 'static + Send + Sync, - TSc: sp_consensus::SelectChain + 'static + Clone + Send, + TSc: sp_consensus::SelectChain + 'static + Clone + Send + Unpin, TExPool: 'static + TransactionPool + TransactionPoolMaintainer, TOc: 'static + Send + Sync, @@ -248,25 +242,22 @@ where self.keystore.clone() } - fn spawn_task(&self, task: impl Future + Send + 'static) { - let exit = self.on_exit().map(Ok).compat(); - let task = task.select(exit).then(|_| Ok(())); - let _ = self.to_spawn_tx.unbounded_send(Box::new(task)); + fn spawn_task(&self, task: impl Future + Send + Unpin + 'static) { + let task = select(self.on_exit(), task).map(drop); + let _ = self.to_spawn_tx.unbounded_send(Box::pin(task)); } - fn spawn_essential_task(&self, task: impl Future + Send + 'static) { - let essential_failed = self.essential_failed_tx.clone(); + fn spawn_essential_task(&self, task: impl Future + Send + Unpin + 'static) { + let mut essential_failed = self.essential_failed_tx.clone(); let essential_task = std::panic::AssertUnwindSafe(task) .catch_unwind() - .then(move |_| { + .map(move |_| { error!("Essential task failed. Shutting down service."); let _ = essential_failed.send(()); - Ok(()) }); - let exit = self.on_exit().map(Ok::<_, ()>).compat(); - let task = essential_task.select(exit).then(|_| Ok(())); + let task = select(self.on_exit(), essential_task).map(drop); - let _ = self.to_spawn_tx.unbounded_send(Box::new(task)); + let _ = self.to_spawn_tx.unbounded_send(Box::pin(task)); } fn spawn_task_handle(&self) -> SpawnTaskHandle { @@ -276,8 +267,12 @@ where } } - fn rpc_query(&self, mem: &RpcSession, request: &str) -> Box, Error = ()> + Send> { - Box::new(self.rpc_handlers.handle_request(request, mem.metadata.clone())) + fn rpc_query(&self, mem: &RpcSession, request: &str) -> Pin> + Send>> { + Box::pin( + self.rpc_handlers.handle_request(request, mem.metadata.clone()) + .compat() + .map(|res| res.expect("this should never fail")) + ) } fn client(&self) -> Arc> { @@ -309,57 +304,56 @@ where } } -impl Future for +impl Future for Service { - type Item = (); - type Error = Error; + type Output = Result<(), Error>; - fn poll(&mut self) -> Poll { - match self.essential_failed_rx.poll() { - Ok(Async::NotReady) => {}, - Ok(Async::Ready(_)) | Err(_) => { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = Pin::into_inner(self); + + match Pin::new(&mut this.essential_failed_rx).poll_next(cx) { + Poll::Pending => {}, + Poll::Ready(_) => { // Ready(None) should not be possible since we hold a live // sender. - return Err(Error::Other("Essential task failed.".into())); + return Poll::Ready(Err(Error::Other("Essential task failed.".into()))); } } - while let Ok(Async::Ready(Some(task_to_spawn))) = self.to_spawn_rx.poll() { + while let Poll::Ready(Some(task_to_spawn)) = Pin::new(&mut this.to_spawn_rx).poll_next(cx) { + // TODO: Update to tokio 0.2 when libp2p get switched to std futures (#4383) let executor = tokio_executor::DefaultExecutor::current(); - if let Err(err) = executor.execute(task_to_spawn) { + use futures01::future::Executor; + if let Err(err) = executor.execute(task_to_spawn.unit_error().compat()) { debug!( target: "service", "Failed to spawn background task: {:?}; falling back to manual polling", err ); - self.to_poll.push(err.into_future()); + this.to_poll.push(Box::pin(err.into_future().compat().map(drop))); } } // Polling all the `to_poll` futures. - while let Some(pos) = self.to_poll.iter_mut().position(|t| t.poll().map(|t| t.is_ready()).unwrap_or(true)) { - let _ = self.to_poll.remove(pos); + while let Some(pos) = this.to_poll.iter_mut().position(|t| Pin::new(t).poll(cx).is_ready()) { + let _ = this.to_poll.remove(pos); } // The service future never ends. - Ok(Async::NotReady) + Poll::Pending } } -impl Executor + Send>> for +impl Spawn for Service { - fn execute( + fn spawn_obj( &self, - future: Box + Send> - ) -> Result<(), futures::future::ExecuteError + Send>>> { - if let Err(err) = self.to_spawn_tx.unbounded_send(future) { - let kind = futures::future::ExecuteErrorKind::Shutdown; - Err(futures::future::ExecuteError::new(kind, err.into_inner())) - } else { - Ok(()) - } + future: FutureObj<'static, ()> + ) -> Result<(), SpawnError> { + self.to_spawn_tx.unbounded_send(Box::pin(future)) + .map_err(|_| SpawnError::shutdown()) } } @@ -376,29 +370,23 @@ fn build_network_future< mut network: sc_network::NetworkWorker, client: Arc, status_sinks: Arc, NetworkState)>>>, - rpc_rx: futures03::channel::mpsc::UnboundedReceiver>, + mut rpc_rx: mpsc::UnboundedReceiver>, should_have_peers: bool, -) -> impl Future { - // Compatibility shim while we're transitioning to stable Futures. - // See https://github.com/paritytech/substrate/issues/3099 - let mut rpc_rx = futures03::compat::Compat::new(rpc_rx.map(|v| Ok::<_, ()>(v))); +) -> impl Future { + let mut imported_blocks_stream = client.import_notification_stream().fuse(); + let mut finality_notification_stream = client.finality_notification_stream().fuse(); - let mut imported_blocks_stream = client.import_notification_stream().fuse() - .map(|v| Ok::<_, ()>(v)).compat(); - let mut finality_notification_stream = client.finality_notification_stream().fuse() - .map(|v| Ok::<_, ()>(v)).compat(); - - futures::future::poll_fn(move || { + futures::future::poll_fn(move |cx| { let before_polling = Instant::now(); // We poll `imported_blocks_stream`. - while let Ok(Async::Ready(Some(notification))) = imported_blocks_stream.poll() { + while let Poll::Ready(Some(notification)) = Pin::new(&mut imported_blocks_stream).poll_next(cx) { network.on_block_imported(notification.hash, notification.header, Vec::new(), notification.is_new_best); } // We poll `finality_notification_stream`, but we only take the last event. let mut last = None; - while let Ok(Async::Ready(Some(item))) = finality_notification_stream.poll() { + while let Poll::Ready(Some(item)) = Pin::new(&mut finality_notification_stream).poll_next(cx) { last = Some(item); } if let Some(notification) = last { @@ -406,7 +394,7 @@ fn build_network_future< } // Poll the RPC requests and answer them. - while let Ok(Async::Ready(Some(request))) = rpc_rx.poll() { + while let Poll::Ready(Some(request)) = Pin::new(&mut rpc_rx).poll_next(cx) { match request { sc_rpc::system::Request::Health(sender) => { let _ = sender.send(sc_rpc::system::Health { @@ -466,7 +454,7 @@ fn build_network_future< } // Interval report for the external API. - status_sinks.lock().poll(|| { + status_sinks.lock().poll(cx, || { let status = NetworkStatus { sync_state: network.sync_state(), best_seen_block: network.best_seen_block(), @@ -481,12 +469,10 @@ fn build_network_future< }); // Main network polling. - let mut net_poll = futures03::future::poll_fn(|cx| futures03::future::Future::poll(Pin::new(&mut network), cx)) - .compat(); - if let Ok(Async::Ready(())) = net_poll.poll().map_err(|err| { + if let Poll::Ready(Ok(())) = Pin::new(&mut network).poll(cx).map_err(|err| { warn!(target: "service", "Error in network: {:?}", err); }) { - return Ok(Async::Ready(())); + return Poll::Ready(()); } // Now some diagnostic for performances. @@ -498,7 +484,7 @@ fn build_network_future< polling_dur ); - Ok(Async::NotReady) + Poll::Pending }) } @@ -596,7 +582,7 @@ impl RpcSession { /// messages. /// /// The `RpcSession` must be kept alive in order to receive messages on the sender. - pub fn new(sender: mpsc::Sender) -> RpcSession { + pub fn new(sender: futures01::sync::mpsc::Sender) -> RpcSession { RpcSession { metadata: sender.into(), } @@ -668,7 +654,7 @@ where let best_block_id = BlockId::hash(self.client.info().best_hash); let import_future = self.pool.submit_one(&best_block_id, uxt); let import_future = import_future - .then(move |import_result| { + .map(move |import_result| { match import_result { Ok(_) => report_handle.report_peer(who, reputation_change_good), Err(e) => match e.into_pool_error() { @@ -680,11 +666,9 @@ where Err(e) => debug!("Error converting pool error: {:?}", e), } } - ready(Ok(())) - }) - .compat(); + }); - if let Err(e) = self.executor.execute(Box::new(import_future)) { + if let Err(e) = self.executor.spawn(Box::new(import_future)) { warn!("Error scheduling extrinsic import: {:?}", e); } } @@ -700,7 +684,7 @@ where #[cfg(test)] mod tests { use super::*; - use futures03::executor::block_on; + use futures::executor::block_on; use sp_consensus::SelectChain; use sp_runtime::traits::BlindCheckable; use substrate_test_runtime_client::{prelude::*, runtime::{Extrinsic, Transfer}}; diff --git a/client/service/src/status_sinks.rs b/client/service/src/status_sinks.rs index 205a22d70f9f4..de5fe865736af 100644 --- a/client/service/src/status_sinks.rs +++ b/client/service/src/status_sinks.rs @@ -14,11 +14,11 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use futures::prelude::*; -use futures::sync::mpsc; -use futures::stream::futures_unordered::FuturesUnordered; -use std::time::{Duration, Instant}; -use tokio_timer::Delay; +use futures::{Stream, stream::futures_unordered::FuturesUnordered, channel::mpsc}; +use std::time::Duration; +use std::pin::Pin; +use std::task::{Poll, Context}; +use futures_timer::Delay; /// Holds a list of `UnboundedSender`s, each associated with a certain time period. Every time the /// period elapses, we push an element on the sender. @@ -29,7 +29,7 @@ pub struct StatusSinks { } struct YieldAfter { - delay: tokio_timer::Delay, + delay: Delay, interval: Duration, sender: Option>, } @@ -47,7 +47,7 @@ impl StatusSinks { /// The `interval` is the time period between two pushes on the sender. pub fn push(&mut self, interval: Duration, sender: mpsc::UnboundedSender) { self.entries.push(YieldAfter { - delay: Delay::new(Instant::now() + interval), + delay: Delay::new(interval), interval, sender: Some(sender), }) @@ -57,16 +57,16 @@ impl StatusSinks { /// pushes what it returns to the sender. /// /// This function doesn't return anything, but it should be treated as if it implicitly - /// returns `Ok(Async::NotReady)`. In particular, it should be called again when the task + /// returns `Poll::Pending`. In particular, it should be called again when the task /// is waken up. /// /// # Panic /// /// Panics if not called within the context of a task. - pub fn poll(&mut self, mut status_grab: impl FnMut() -> T) { + pub fn poll(&mut self, cx: &mut Context, mut status_grab: impl FnMut() -> T) { loop { - match self.entries.poll() { - Ok(Async::Ready(Some((sender, interval)))) => { + match Pin::new(&mut self.entries).poll_next(cx) { + Poll::Ready(Some((sender, interval))) => { let status = status_grab(); if sender.unbounded_send(status).is_ok() { self.entries.push(YieldAfter { @@ -74,33 +74,32 @@ impl StatusSinks { // waken up and the moment it is polled, the period is actually not // `interval` but `interval + `. We ignore this problem in // practice. - delay: Delay::new(Instant::now() + interval), + delay: Delay::new(interval), interval, sender: Some(sender), }); } } - Err(()) | - Ok(Async::Ready(None)) | - Ok(Async::NotReady) => break, + Poll::Ready(None) | + Poll::Pending => break, } } } } -impl Future for YieldAfter { - type Item = (mpsc::UnboundedSender, Duration); - type Error = (); +impl futures::Future for YieldAfter { + type Output = (mpsc::UnboundedSender, Duration); - fn poll(&mut self) -> Poll { - match self.delay.poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(())) => { - let sender = self.sender.take() + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = Pin::into_inner(self); + + match Pin::new(&mut this.delay).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(()) => { + let sender = this.sender.take() .expect("sender is always Some unless the future is finished; qed"); - Ok(Async::Ready((sender, self.interval))) - }, - Err(_) => Err(()), + Poll::Ready((sender, this.interval)) + } } } } @@ -109,8 +108,9 @@ impl Future for YieldAfter { mod tests { use super::StatusSinks; use futures::prelude::*; - use futures::sync::mpsc; + use futures::channel::mpsc; use std::time::Duration; + use std::task::Poll; #[test] fn works() { @@ -125,18 +125,18 @@ mod tests { let mut runtime = tokio::runtime::Runtime::new().unwrap(); let mut val_order = 5; - runtime.spawn(futures::future::poll_fn(move || { - status_sinks.poll(|| { val_order += 1; val_order }); - Ok(Async::NotReady) + runtime.spawn(futures::future::poll_fn(move |cx| { + status_sinks.poll(cx, || { val_order += 1; val_order }); + Poll::<()>::Pending })); let done = rx .into_future() - .and_then(|(item, rest)| { + .then(|(item, rest)| { assert_eq!(item, Some(6)); rest.into_future() }) - .and_then(|(item, rest)| { + .then(|(item, rest)| { assert_eq!(item, Some(7)); rest.into_future() }) @@ -144,6 +144,6 @@ mod tests { assert_eq!(item, Some(8)); }); - runtime.block_on(done).unwrap(); + runtime.block_on(done); } } diff --git a/client/service/test/Cargo.toml b/client/service/test/Cargo.toml index 2789bfda0fe19..6fa6e145cfd5e 100644 --- a/client/service/test/Cargo.toml +++ b/client/service/test/Cargo.toml @@ -7,11 +7,11 @@ edition = "2018" [dependencies] tempfile = "3.1.0" tokio = "0.1.22" -futures = "0.1.29" +futures01 = { package = "futures", version = "0.1.29" } log = "0.4.8" env_logger = "0.7.0" fdlimit = "0.1.1" -futures03 = { package = "futures", version = "0.3.1", features = ["compat"] } +futures = { version = "0.3.1", features = ["compat"] } sc-service = { version = "2.0.0", default-features = false, path = "../../service" } sc-network = { version = "0.8", path = "../../network" } sp-consensus = { version = "0.8", path = "../../../primitives/consensus/common" } diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index 06a1edd189a29..dd6395e9c6228 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -21,7 +21,7 @@ use std::sync::{Arc, Mutex, MutexGuard}; use std::net::Ipv4Addr; use std::time::Duration; use log::info; -use futures::{Future, Stream, Poll}; +use futures01::{Future, Stream, Poll}; use tempfile::TempDir; use tokio::{runtime::Runtime, prelude::FutureExt}; use tokio::timer::Interval; @@ -72,12 +72,13 @@ impl From for SyncService { } } -impl> Future for SyncService { +impl> + Unpin> Future for SyncService { type Item = (); type Error = sc_service::Error; fn poll(&mut self) -> Poll { - self.0.lock().unwrap().poll() + let mut f = self.0.lock().unwrap(); + futures::compat::Compat::new(&mut *f).poll() } } @@ -458,7 +459,7 @@ pub fn sync( let first_user_data = &network.full_nodes[0].2; let best_block = BlockId::number(first_service.get().client().chain_info().best_number); let extrinsic = extrinsic_factory(&first_service.get(), first_user_data); - futures03::executor::block_on(first_service.get().transaction_pool().submit_one(&best_block, extrinsic)).unwrap(); + futures::executor::block_on(first_service.get().transaction_pool().submit_one(&best_block, extrinsic)).unwrap(); network.run_until_all_full( |_index, service| service.get().transaction_pool().ready().count() == 1, |_index, _service| true, diff --git a/utils/browser/src/lib.rs b/utils/browser/src/lib.rs index e404c6612906c..0dbde57182766 100644 --- a/utils/browser/src/lib.rs +++ b/utils/browser/src/lib.rs @@ -22,10 +22,7 @@ use service::{ ChainSpec, RuntimeGenesis }; use wasm_bindgen::prelude::*; -use futures::{ - TryFutureExt as _, FutureExt as _, Stream as _, Future as _, TryStreamExt as _, - channel::{oneshot, mpsc}, future::{poll_fn, ok}, compat::*, -}; +use futures::{prelude::*, channel::{oneshot, mpsc}, future::{poll_fn, ok}, compat::*}; use std::task::Poll; use std::pin::Pin; use chain_spec::Extension; @@ -82,8 +79,7 @@ struct RpcMessage { } /// Create a Client object that connects to a service. -pub fn start_client(service: impl AbstractService) -> Client { - let mut service = service.compat(); +pub fn start_client(mut service: impl AbstractService) -> Client { // We dispatch a background task responsible for processing the service. // // The main action performed by the code below consists in polling the service with @@ -94,10 +90,8 @@ pub fn start_client(service: impl AbstractService) -> Client { loop { match Pin::new(&mut rpc_send_rx).poll_next(cx) { Poll::Ready(Some(message)) => { - let fut = service.get_ref() + let fut = service .rpc_query(&message.session, &message.rpc_json) - .compat() - .unwrap_or_else(|_| None) .boxed(); let _ = message.send_back.send(fut); },