Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Update the service to std futures #4447

Merged
merged 40 commits into from
Jan 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
21e9162
Switch service to futures03
expenses Dec 18, 2019
3e705f6
Fix tests
expenses Dec 18, 2019
566f190
Fix service test and cli
expenses Dec 18, 2019
3dd8ff4
Re-add Executor trait to SpawnTaskHandle
expenses Dec 19, 2019
be72319
Fix node-service
expenses Dec 19, 2019
b260cfe
Update babe
expenses Dec 19, 2019
07ba97d
Fix browser node
expenses Dec 19, 2019
84032dd
Update aura
expenses Dec 19, 2019
ae7dc83
Revert back to tokio-executor to fix runtime panic
expenses Dec 19, 2019
b0308aa
Add todo item
expenses Dec 19, 2019
ca1f5f2
Fix service tests again
expenses Dec 19, 2019
b455854
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Dec 20, 2019
f650750
Timeout test futures
expenses Dec 20, 2019
a3edd86
Fix tests
expenses Dec 20, 2019
846224b
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Dec 21, 2019
a087651
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 2, 2020
326da2a
nits
expenses Jan 3, 2020
dd538b6
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 3, 2020
62e45b2
Fix service test
expenses Jan 3, 2020
b4e8d03
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 6, 2020
f4ba4fa
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 6, 2020
5d03719
Remove zstd patch
expenses Jan 6, 2020
9f57013
Re-add futures01 to aura and babe tests as a dev-dep
expenses Jan 6, 2020
5cf66ca
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 7, 2020
49ab8a1
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 7, 2020
ab73e60
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 8, 2020
1390491
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 9, 2020
4a57419
Change failing test to tee
expenses Jan 9, 2020
2f99610
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 10, 2020
cd25fcc
Fix node
expenses Jan 10, 2020
078992f
Upgrade tokio
expenses Jan 10, 2020
208291a
fix society
gui1117 Jan 10, 2020
4e29d08
Merge remote-tracking branch 'parity/gui-fix-society' into ashley-ser…
expenses Jan 10, 2020
9c19763
Start switching grandpa to stable futures
expenses Jan 10, 2020
51e0ead
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 13, 2020
ebb26a2
Revert "Start switching grandpa to stable futures"
expenses Jan 13, 2020
5a51c55
Fix utils
expenses Jan 13, 2020
01e2073
Revert substrate service test
expenses Jan 14, 2020
6ad061b
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 14, 2020
87c740e
Revert gitlab
expenses Jan 14, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions bin/node-template/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ path = "src/main.rs"

[dependencies]
futures = "0.3.1"
futures01 = { package = "futures", version = "0.1.29" }
ctrlc = { version = "3.1.3", features = ["termination"] }
log = "0.4.8"
tokio = "0.1.22"
tokio = { version = "0.2", features = ["rt-threaded"] }
parking_lot = "0.9.0"
codec = { package = "parity-scale-codec", version = "1.0.0" }
trie-root = "0.15.2"
Expand Down
31 changes: 9 additions & 22 deletions bin/node-template/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::service;
use futures::{future::{select, Map}, FutureExt, TryFutureExt, channel::oneshot, compat::Future01CompatExt};
use futures::{future::{select, Map, Either}, FutureExt, channel::oneshot};
use std::cell::RefCell;
use tokio::runtime::Runtime;
pub use sc_cli::{VersionInfo, IntoExit, error};
Expand Down Expand Up @@ -75,36 +75,23 @@ where

let informant = informant::build(&service);

let future = select(exit, informant)
.map(|_| Ok(()))
.compat();

runtime.executor().spawn(future);
let handle = runtime.spawn(select(exit, informant));

// we eagerly drop the service so that the internal exit future is fired,
// but we need to keep holding a reference to the global telemetry guard
let _telemetry = service.telemetry();

let service_res = {
let exit = e.into_exit();
let service = service
.map_err(|err| error::Error::Service(err))
.compat();
let select = select(service, exit)
.map(|_| Ok(()))
.compat();
runtime.block_on(select)
};
let exit = e.into_exit();
let service_res = runtime.block_on(select(service, exit));

let _ = exit_send.send(());

// TODO [andre]: timeout this future #1318

use futures01::Future;
runtime.block_on(handle);

let _ = runtime.shutdown_on_idle().wait();

service_res
match service_res {
Either::Left((res, _)) => res.map_err(error::Error::Service),
Either::Right((_, _)) => Ok(())
}
}

// handles ctrl-c
Expand Down
5 changes: 3 additions & 2 deletions bin/node-template/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub use sc_executor::NativeExecutor;
use sp_consensus_aura::sr25519::{AuthorityPair as AuraPair};
use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
use sc_basic_authority;
use futures::{FutureExt, compat::Future01CompatExt};

// Our native executor instance.
native_executor_instance!(
Expand Down Expand Up @@ -163,7 +164,7 @@ pub fn new_full<C: Send + Default + 'static>(config: Configuration<C, GenesisCon
service.network(),
service.on_exit(),
service.spawn_task_handle(),
)?);
)?.compat().map(drop));
},
(true, false) => {
// start the full GRANDPA voter
Expand All @@ -180,7 +181,7 @@ pub fn new_full<C: Send + Default + 'static>(config: Configuration<C, GenesisCon

// the GRANDPA voter task is considered infallible, i.e.
// if it fails we take down the service with it.
service.spawn_essential_task(grandpa::run_grandpa_voter(voter_config)?);
service.spawn_essential_task(grandpa::run_grandpa_voter(voter_config)?.compat().map(drop));
},
(_, true) => {
grandpa::setup_disabled_grandpa(
Expand Down
3 changes: 1 addition & 2 deletions bin/node/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ crate-type = ["cdylib", "rlib"]
# third-party dependencies
codec = { package = "parity-scale-codec", version = "1.0.6" }
serde = { version = "1.0.102", features = ["derive"] }
futures01 = { package = "futures", version = "0.1.29" }
futures = { version = "0.3.1", features = ["compat"] }
hex-literal = "0.2.1"
jsonrpc-core = "14.0.3"
Expand Down Expand Up @@ -81,7 +80,7 @@ node-primitives = { version = "2.0.0", path = "../primitives" }
node-executor = { version = "2.0.0", path = "../executor" }

# CLI-specific dependencies
tokio = { version = "0.1.22", optional = true }
tokio = { version = "0.2", features = ["rt-threaded"], optional = true }
sc-cli = { version = "2.0.0", optional = true, path = "../../../client/cli" }
ctrlc = { version = "3.1.3", features = ["termination"], optional = true }
node-transaction-factory = { version = "2.0.0", optional = true, path = "../transaction-factory" }
Expand Down
35 changes: 13 additions & 22 deletions bin/node/cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

pub use sc_cli::VersionInfo;
use tokio::prelude::Future;
use tokio::runtime::{Builder as RuntimeBuilder, Runtime};
use sc_cli::{IntoExit, NoCustom, SharedParams, ImportParams, error};
use sc_service::{AbstractService, Roles as ServiceRoles, Configuration};
Expand All @@ -25,6 +24,7 @@ use sc_cli::{display_role, parse_and_prepare, GetSharedParams, ParseAndPrepare};
use crate::{service, ChainSpec, load_spec};
use crate::factory_impl::FactoryState;
use node_transaction_factory::RuntimeAdapter;
use futures::{channel::oneshot, future::{select, Either}};

/// Custom subcommands.
#[derive(Clone, Debug, StructOpt)]
Expand Down Expand Up @@ -105,7 +105,10 @@ pub fn run<I, T, E>(args: I, exit: E, version: sc_cli::VersionInfo) -> error::Re
info!("Chain specification: {}", config.chain_spec.name());
info!("Node name: {}", config.name);
info!("Roles: {}", display_role(&config));
let runtime = RuntimeBuilder::new().name_prefix("main-tokio-").build()
let runtime = RuntimeBuilder::new()
.thread_name("main-tokio-")
.threaded_scheduler()
.build()
.map_err(|e| format!("{:?}", e))?;
match config.roles {
ServiceRoles::LIGHT => run_until_exit(
Expand Down Expand Up @@ -172,37 +175,25 @@ where
T: AbstractService,
E: IntoExit,
{
use futures::{FutureExt, TryFutureExt, channel::oneshot, future::select, compat::Future01CompatExt};

let (exit_send, exit) = oneshot::channel();

let informant = sc_cli::informant::build(&service);

let future = select(informant, exit)
.map(|_| Ok(()))
.compat();

runtime.executor().spawn(future);
let handle = runtime.spawn(select(exit, informant));

// we eagerly drop the service so that the internal exit future is fired,
// but we need to keep holding a reference to the global telemetry guard
let _telemetry = service.telemetry();

let service_res = {
let exit = e.into_exit();
let service = service
.map_err(|err| error::Error::Service(err))
.compat();
let select = select(service, exit)
.map(|_| Ok(()))
.compat();
runtime.block_on(select)
};
let exit = e.into_exit();
let service_res = runtime.block_on(select(service, exit));

let _ = exit_send.send(());

// TODO [andre]: timeout this future #1318
let _ = runtime.shutdown_on_idle().wait();
runtime.block_on(handle);

service_res
match service_res {
Either::Left((res, _)) => res.map_err(error::Error::Service),
Either::Right((_, _)) => Ok(())
}
}
13 changes: 7 additions & 6 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ macro_rules! new_full_start {
macro_rules! new_full {
($config:expr, $with_startup_data: expr) => {{
use futures::{
stream::StreamExt,
future::{FutureExt, TryFutureExt},
prelude::*,
compat::Future01CompatExt
};
use sc_network::Event;

Expand Down Expand Up @@ -191,9 +191,8 @@ macro_rules! new_full {
service.keystore(),
dht_event_stream,
);
let future01_authority_discovery = authority_discovery.map(|x| Ok(x)).compat();

service.spawn_task(future01_authority_discovery);
service.spawn_task(authority_discovery);
}

// if the node isn't actively participating in consensus then it doesn't
Expand Down Expand Up @@ -223,7 +222,7 @@ macro_rules! new_full {
service.network(),
service.on_exit(),
service.spawn_task_handle(),
)?);
)?.compat().map(drop));
},
(true, false) => {
// start the full GRANDPA voter
Expand All @@ -239,7 +238,9 @@ macro_rules! new_full {
};
// the GRANDPA voter task is considered infallible, i.e.
// if it fails we take down the service with it.
service.spawn_essential_task(grandpa::run_grandpa_voter(grandpa_config)?);
service.spawn_essential_task(
grandpa::run_grandpa_voter(grandpa_config)?.compat().map(drop)
);
},
(_, true) => {
grandpa::setup_disabled_grandpa(
Expand Down
4 changes: 2 additions & 2 deletions client/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ time = "0.1.42"
ansi_term = "0.12.1"
lazy_static = "1.4.0"
app_dirs = "1.2.1"
tokio = "0.2.1"
futures = { version = "0.3.1", features = ["compat"] }
tokio = "0.2"
futures = "0.3.1"
fdlimit = "0.1.1"
serde_json = "1.0.41"
sp-panic-handler = { version = "2.0.0", path = "../../primitives/panic-handler" }
Expand Down
7 changes: 3 additions & 4 deletions client/cli/src/informant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! Console informant. Prints sync progress and block events. Runs on the calling thread.

use sc_client_api::BlockchainEvents;
use futures::{StreamExt, TryStreamExt, FutureExt, future, compat::Stream01CompatExt};
use futures::prelude::*;
use log::{info, warn, trace};
use sp_runtime::traits::Header;
use sc_service::AbstractService;
Expand All @@ -33,16 +33,15 @@ pub fn build(service: &impl AbstractService) -> impl futures::Future<Output = ()

let display_notifications = service
.network_status(Duration::from_millis(5000))
.compat()
.try_for_each(move |(net_status, _)| {
.for_each(move |(net_status, _)| {
let info = client.usage_info();
if let Some(ref usage) = info.usage {
trace!(target: "usage", "Usage statistics: {}", usage);
} else {
trace!(target: "usage", "Usage statistics not displayed as backend does not provide it")
}
display.display(&info, net_status);
future::ok(())
future::ready(())
});

let client = service.client();
Expand Down
11 changes: 4 additions & 7 deletions client/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub use traits::GetSharedParams;
use app_dirs::{AppInfo, AppDataType};
use log::info;
use lazy_static::lazy_static;
use futures::{Future, compat::Future01CompatExt, executor::block_on};
use futures::{Future, executor::block_on};
use sc_telemetry::TelemetryEndpoints;
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
Expand Down Expand Up @@ -426,8 +426,7 @@ impl<'a> ParseAndPrepareExport<'a> {
});

let mut export_fut = builder(config)?
.export_blocks(file, from.into(), to, json)
.compat();
.export_blocks(file, from.into(), to, json);
let fut = futures::future::poll_fn(|cx| {
if exit_recv.try_recv().is_ok() {
return Poll::Ready(Ok(()));
Expand Down Expand Up @@ -485,8 +484,7 @@ impl<'a> ParseAndPrepareImport<'a> {
});

let mut import_fut = builder(config)?
.import_blocks(file, false)
.compat();
.import_blocks(file, false);
let fut = futures::future::poll_fn(|cx| {
if exit_recv.try_recv().is_ok() {
return Poll::Ready(Ok(()));
Expand Down Expand Up @@ -537,8 +535,7 @@ impl<'a> CheckBlock<'a> {

let start = std::time::Instant::now();
let check = builder(config)?
.check_block(block_id)
.compat();
.check_block(block_id);
let mut runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(check)?;
println!("Completed in {} ms.", start.elapsed().as_millis());
Expand Down
4 changes: 2 additions & 2 deletions client/consensus/aura/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ sc-client-api = { version = "2.0.0", path = "../../api" }
codec = { package = "parity-scale-codec", version = "1.0.0" }
sp-consensus = { version = "0.8", path = "../../../primitives/consensus/common" }
derive_more = "0.99.2"
futures = { version = "0.3.1", features = ["compat"] }
futures01 = { package = "futures", version = "0.1" }
futures = "0.3.1"
futures-timer = "0.4.0"
sp-inherents = { version = "2.0.0", path = "../../../primitives/inherents" }
sc-keystore = { version = "2.0.0", path = "../../keystore" }
Expand All @@ -41,3 +40,4 @@ substrate-test-runtime-client = { version = "2.0.0", path = "../../../test-utils
tokio = "0.1.22"
env_logger = "0.7.0"
tempfile = "3.1.0"
futures01 = { package = "futures", version = "0.1" }
Loading