diff --git a/Cargo.lock b/Cargo.lock index 62969a0efa4dc..2e285d35f7cb9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4717,7 +4717,7 @@ name = "substrate-offchain" version = "2.0.0" dependencies = [ "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4730,7 +4730,6 @@ dependencies = [ "substrate-primitives 2.0.0", "substrate-test-runtime-client 2.0.0", "substrate-transaction-pool 2.0.0", - "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/core/offchain/Cargo.toml b/core/offchain/Cargo.toml index ed201c9316ad5..3ee8e8580dfb0 100644 --- a/core/offchain/Cargo.toml +++ b/core/offchain/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" [dependencies] client = { package = "substrate-client", path = "../../core/client" } -futures = "0.1.25" +futures-preview = "0.3.0-alpha.17" log = "0.4" offchain-primitives = { package = "substrate-offchain-primitives", path = "./primitives" } codec = { package = "parity-scale-codec", version = "1.0.0", features = ["derive"] } @@ -23,7 +23,6 @@ keystore = { package = "substrate-keystore", path = "../keystore" } env_logger = "0.6" client-db = { package = "substrate-client-db", path = "../../core/client/db/", default-features = true } test-client = { package = "substrate-test-runtime-client", path = "../../core/test-runtime/client" } -tokio = "0.1.7" [features] default = [] diff --git a/core/offchain/src/api.rs b/core/offchain/src/api.rs index c13b79131bcb3..468e98a566d6d 100644 --- a/core/offchain/src/api.rs +++ b/core/offchain/src/api.rs @@ -23,7 +23,7 @@ use std::{ }; use client::backend::OffchainStorage; -use futures::{Stream, Future, sync::mpsc}; +use futures::{StreamExt as _, Future, future, channel::mpsc}; use log::{info, debug, warn, error}; use network::{PeerId, Multiaddr, NetworkStateInfo}; use codec::{Encode, Decode}; @@ -297,14 +297,14 @@ impl AsyncApi { } /// Run a processing task for the API - pub fn process(mut self) -> impl Future { + pub fn process(mut self) -> impl Future { let receiver = self.receiver.take().expect("Take invoked only once."); receiver.for_each(move |msg| { match msg { ExtMessage::SubmitExtrinsic(ext) => self.submit_extrinsic(ext), } - Ok(()) + future::ready(()) }) } diff --git a/core/offchain/src/lib.rs b/core/offchain/src/lib.rs index 5525546f2558a..075a2bd8375f6 100644 --- a/core/offchain/src/lib.rs +++ b/core/offchain/src/lib.rs @@ -98,7 +98,7 @@ impl OffchainWorkers< number: &::Number, pool: &Arc>, network_state: Arc, - ) -> impl Future where + ) -> impl Future where A: ChainApi + 'static, { let runtime = self.client.runtime_api(); @@ -129,9 +129,9 @@ impl OffchainWorkers< log::error!("Error running offchain workers at {:?}: {:?}", at, e); } }); - futures::future::Either::A(runner.process()) + futures::future::Either::Left(runner.process()) } else { - futures::future::Either::B(futures::future::ok(())) + futures::future::Either::Right(futures::future::ready(())) } } } @@ -152,7 +152,6 @@ fn spawn_worker(f: impl FnOnce() -> () + Send + 'static) { #[cfg(test)] mod tests { use super::*; - use futures::Future; use network::{Multiaddr, PeerId}; struct MockNetworkStateInfo(); @@ -171,7 +170,6 @@ mod tests { fn should_call_into_runtime_and_produce_extrinsic() { // given let _ = env_logger::try_init(); - let runtime = tokio::runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); let pool = Arc::new(Pool::new(Default::default(), transaction_pool::ChainApi::new(client.clone()))); let db = client_db::offchain::LocalStorage::new_test(); @@ -179,10 +177,9 @@ mod tests { // when let offchain = OffchainWorkers::new(client, db); - runtime.executor().spawn(offchain.on_block_imported(&0u64, &pool, network_state.clone())); + futures::executor::block_on(offchain.on_block_imported(&0u64, &pool, network_state)); // then - runtime.shutdown_on_idle().wait().unwrap(); assert_eq!(pool.status().ready, 1); assert_eq!(pool.ready().next().unwrap().is_propagateable(), false); } diff --git a/core/service/src/components.rs b/core/service/src/components.rs index 47781aa79d810..d2b6131ed0164 100644 --- a/core/service/src/components.rs +++ b/core/service/src/components.rs @@ -36,7 +36,7 @@ use crate::config::Configuration; use primitives::{Blake2Hasher, H256, traits::BareCryptoStorePtr}; use rpc::{self, apis::system::SystemInfo}; use futures::{prelude::*, future::Executor}; -use futures03::channel::mpsc; +use futures03::{FutureExt as _, channel::mpsc, compat::Compat}; // Type aliases. // These exist mainly to avoid typing `::Foo` all over the code. @@ -279,7 +279,9 @@ impl OffchainWorker for C where pool: &Arc>, network_state: &Arc, ) -> error::Result + Send>> { - Ok(Box::new(offchain.on_block_imported(number, pool, network_state.clone()))) + let future = offchain.on_block_imported(number, pool, network_state.clone()) + .map(|()| Ok(())); + Ok(Box::new(Compat::new(future))) } }