Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Update the service to std futures #4447

Merged
merged 40 commits into from
Jan 14, 2020
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
21e9162
Switch service to futures03
expenses Dec 18, 2019
3e705f6
Fix tests
expenses Dec 18, 2019
566f190
Fix service test and cli
expenses Dec 18, 2019
3dd8ff4
Re-add Executor trait to SpawnTaskHandle
expenses Dec 19, 2019
be72319
Fix node-service
expenses Dec 19, 2019
b260cfe
Update babe
expenses Dec 19, 2019
07ba97d
Fix browser node
expenses Dec 19, 2019
84032dd
Update aura
expenses Dec 19, 2019
ae7dc83
Revert back to tokio-executor to fix runtime panic
expenses Dec 19, 2019
b0308aa
Add todo item
expenses Dec 19, 2019
ca1f5f2
Fix service tests again
expenses Dec 19, 2019
b455854
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Dec 20, 2019
f650750
Timeout test futures
expenses Dec 20, 2019
a3edd86
Fix tests
expenses Dec 20, 2019
846224b
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Dec 21, 2019
a087651
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 2, 2020
326da2a
nits
expenses Jan 3, 2020
dd538b6
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 3, 2020
62e45b2
Fix service test
expenses Jan 3, 2020
b4e8d03
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 6, 2020
f4ba4fa
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 6, 2020
5d03719
Remove zstd patch
expenses Jan 6, 2020
9f57013
Re-add futures01 to aura and babe tests as a dev-dep
expenses Jan 6, 2020
5cf66ca
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 7, 2020
49ab8a1
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 7, 2020
ab73e60
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 8, 2020
1390491
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 9, 2020
4a57419
Change failing test to tee
expenses Jan 9, 2020
2f99610
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 10, 2020
cd25fcc
Fix node
expenses Jan 10, 2020
078992f
Upgrade tokio
expenses Jan 10, 2020
208291a
fix society
gui1117 Jan 10, 2020
4e29d08
Merge remote-tracking branch 'parity/gui-fix-society' into ashley-ser…
expenses Jan 10, 2020
9c19763
Start switching grandpa to stable futures
expenses Jan 10, 2020
51e0ead
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 13, 2020
ebb26a2
Revert "Start switching grandpa to stable futures"
expenses Jan 13, 2020
5a51c55
Fix utils
expenses Jan 13, 2020
01e2073
Revert substrate service test
expenses Jan 14, 2020
6ad061b
Merge remote-tracking branch 'parity/master' into ashley-service-futures
expenses Jan 14, 2020
87c740e
Revert gitlab
expenses Jan 14, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions bin/node-template/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::service;
use futures::{future::{select, Map}, FutureExt, TryFutureExt, channel::oneshot, compat::Future01CompatExt};
use futures::{future::{select, Map}, FutureExt, TryFutureExt, channel::oneshot};
use std::cell::RefCell;
use tokio::runtime::Runtime;
pub use sc_cli::{VersionInfo, IntoExit, error};
Expand Down Expand Up @@ -87,9 +87,6 @@ where

let service_res = {
let exit = e.into_exit();
let service = service
.map_err(|err| error::Error::Service(err))
.compat();
let select = select(service, exit)
.map(|_| Ok(()))
.compat();
Expand Down
5 changes: 3 additions & 2 deletions bin/node-template/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub use sc_executor::NativeExecutor;
use sp_consensus_aura::sr25519::{AuthorityPair as AuraPair};
use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
use sc_basic_authority;
use futures::{FutureExt, compat::Future01CompatExt};

// Our native executor instance.
native_executor_instance!(
Expand Down Expand Up @@ -159,7 +160,7 @@ pub fn new_full<C: Send + Default + 'static>(config: Configuration<C, GenesisCon
service.network(),
service.on_exit(),
service.spawn_task_handle(),
)?);
)?.compat().map(drop));
},
(true, false) => {
// start the full GRANDPA voter
Expand All @@ -176,7 +177,7 @@ pub fn new_full<C: Send + Default + 'static>(config: Configuration<C, GenesisCon

// the GRANDPA voter task is considered infallible, i.e.
// if it fails we take down the service with it.
service.spawn_essential_task(grandpa::run_grandpa_voter(voter_config)?);
service.spawn_essential_task(grandpa::run_grandpa_voter(voter_config)?.compat().map(drop));
},
(_, true) => {
grandpa::setup_disabled_grandpa(
Expand Down
7 changes: 5 additions & 2 deletions bin/node/cli/src/browser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use log::{debug, info};
use std::sync::Arc;
use sc_service::{AbstractService, RpcSession, Roles as ServiceRoles, Configuration, config::DatabaseConfig};
use wasm_bindgen::prelude::*;
use futures::{FutureExt, TryFutureExt};

/// Starts the client.
///
Expand Down Expand Up @@ -75,7 +76,9 @@ fn start_inner(wasm_ext: wasm_ext::ffi::Transport) -> Result<Client, Box<dyn std
loop {
match rpc_send_rx.poll() {
Ok(Async::Ready(Some(message))) => {
let fut = service.rpc_query(&message.session, &message.rpc_json);
let fut = service.rpc_query(&message.session, &message.rpc_json)
.unit_error()
.compat();
let _ = message.send_back.send(Box::new(fut));
},
Ok(Async::NotReady) => break,
Expand All @@ -84,7 +87,7 @@ fn start_inner(wasm_ext: wasm_ext::ffi::Transport) -> Result<Client, Box<dyn std
}

loop {
match service.poll().map_err(|_| ())? {
match (&mut service).compat().poll().map_err(|_| ())? {
Async::Ready(()) => return Ok(Async::Ready(())),
Async::NotReady => break
}
Expand Down
5 changes: 1 addition & 4 deletions bin/node/cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ where
T: AbstractService,
E: IntoExit,
{
use futures::{FutureExt, TryFutureExt, channel::oneshot, future::select, compat::Future01CompatExt};
use futures::{FutureExt, TryFutureExt, channel::oneshot, future::select};

let (exit_send, exit) = oneshot::channel();

Expand All @@ -196,9 +196,6 @@ where

let service_res = {
let exit = e.into_exit();
let service = service
.map_err(|err| error::Error::Service(err))
.compat();
let select = select(service, exit)
.map(|_| Ok(()))
.compat();
Expand Down
21 changes: 9 additions & 12 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,11 @@ macro_rules! new_full_start {
/// concrete types instead.
macro_rules! new_full {
($config:expr, $with_startup_data: expr) => {{
use futures01::sync::mpsc;
use sc_network::DhtEvent;
use futures::{
compat::Stream01CompatExt,
compat::Future01CompatExt,
stream::StreamExt,
future::{FutureExt, TryFutureExt},
future::FutureExt,
};

let (
Expand Down Expand Up @@ -147,7 +146,7 @@ macro_rules! new_full {
// This estimates the authority set size to be somewhere below 10 000 thereby setting the channel buffer size to
// 10 000.
let (dht_event_tx, dht_event_rx) =
mpsc::channel::<DhtEvent>(10_000);
futures::channel::mpsc::channel::<DhtEvent>(10_000);

let service = builder.with_network_protocol(|_| Ok(crate::service::NodeProtocol::new()))?
.with_finality_proof_provider(|client, backend|
Expand Down Expand Up @@ -190,19 +189,15 @@ macro_rules! new_full {
let babe = sc_consensus_babe::start_babe(babe_config)?;
service.spawn_essential_task(babe);

let future03_dht_event_rx = dht_event_rx.compat()
.map(|x| x.expect("<mpsc::channel::Receiver as Stream> never returns an error; qed"))
.boxed();
let authority_discovery = sc_authority_discovery::AuthorityDiscovery::new(
service.client(),
service.network(),
sentry_nodes,
service.keystore(),
future03_dht_event_rx,
dht_event_rx.boxed(),
);
let future01_authority_discovery = authority_discovery.map(|x| Ok(x)).compat();

service.spawn_task(future01_authority_discovery);
service.spawn_task(authority_discovery);
}

// if the node isn't actively participating in consensus then it doesn't
Expand Down Expand Up @@ -232,7 +227,7 @@ macro_rules! new_full {
service.network(),
service.on_exit(),
service.spawn_task_handle(),
)?);
)?.compat().map(drop));
},
(true, false) => {
// start the full GRANDPA voter
Expand All @@ -248,7 +243,9 @@ macro_rules! new_full {
};
// the GRANDPA voter task is considered infallible, i.e.
// if it fails we take down the service with it.
service.spawn_essential_task(grandpa::run_grandpa_voter(grandpa_config)?);
service.spawn_essential_task(
grandpa::run_grandpa_voter(grandpa_config)?.compat().map(drop)
);
},
(_, true) => {
grandpa::setup_disabled_grandpa(
Expand Down
2 changes: 1 addition & 1 deletion client/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ ansi_term = "0.12.1"
lazy_static = "1.4.0"
app_dirs = "1.2.1"
tokio = "0.2.1"
futures = { version = "0.3.1", features = ["compat"] }
futures = "0.3.1"
fdlimit = "0.1.1"
serde_json = "1.0.41"
sp-panic-handler = { version = "2.0.0", path = "../../primitives/panic-handler" }
Expand Down
7 changes: 3 additions & 4 deletions client/cli/src/informant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! Console informant. Prints sync progress and block events. Runs on the calling thread.

use sc_client_api::BlockchainEvents;
use futures::{StreamExt, TryStreamExt, FutureExt, future, compat::Stream01CompatExt};
use futures::{StreamExt, FutureExt, future};
expenses marked this conversation as resolved.
Show resolved Hide resolved
use log::{info, warn};
use sp_runtime::traits::Header;
use sc_service::AbstractService;
Expand All @@ -33,11 +33,10 @@ pub fn build(service: &impl AbstractService) -> impl futures::Future<Output = ()

let display_notifications = service
.network_status(Duration::from_millis(5000))
.compat()
.try_for_each(move |(net_status, _)| {
.for_each(move |(net_status, _)| {
let info = client.info();
display.display(&info, net_status);
future::ok(())
future::ready(())
});

let client = service.client();
Expand Down
11 changes: 4 additions & 7 deletions client/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub use traits::{GetSharedParams, AugmentClap};
use app_dirs::{AppInfo, AppDataType};
use log::info;
use lazy_static::lazy_static;
use futures::{Future, compat::Future01CompatExt, executor::block_on};
use futures::{Future, executor::block_on};
use sc_telemetry::TelemetryEndpoints;
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
Expand Down Expand Up @@ -422,8 +422,7 @@ impl<'a> ParseAndPrepareExport<'a> {
});

let mut export_fut = builder(config)?
.export_blocks(file, from.into(), to, json)
.compat();
.export_blocks(file, from.into(), to, json);
let fut = futures::future::poll_fn(|cx| {
if exit_recv.try_recv().is_ok() {
return Poll::Ready(Ok(()));
Expand Down Expand Up @@ -481,8 +480,7 @@ impl<'a> ParseAndPrepareImport<'a> {
});

let mut import_fut = builder(config)?
.import_blocks(file, false)
.compat();
.import_blocks(file, false);
let fut = futures::future::poll_fn(|cx| {
if exit_recv.try_recv().is_ok() {
return Poll::Ready(Ok(()));
Expand Down Expand Up @@ -533,8 +531,7 @@ impl<'a> CheckBlock<'a> {

let start = std::time::Instant::now();
let check = builder(config)?
.check_block(block_id)
.compat();
.check_block(block_id);
let mut runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(check)?;
println!("Completed in {} ms.", start.elapsed().as_millis());
Expand Down
3 changes: 1 addition & 2 deletions client/consensus/aura/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ sc-client-api = { version = "2.0.0", path = "../../api" }
codec = { package = "parity-scale-codec", version = "1.0.0" }
sp-consensus = { version = "0.8", path = "../../../primitives/consensus/common" }
derive_more = "0.99.2"
futures = { version = "0.3.1", features = ["compat"] }
futures01 = { package = "futures", version = "0.1" }
futures = "0.3.1"
futures-timer = "0.4.0"
sp-inherents = { version = "2.0.0", path = "../../../primitives/inherents" }
sc-keystore = { version = "2.0.0", path = "../../keystore" }
Expand Down
17 changes: 10 additions & 7 deletions client/consensus/aura/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ pub fn start_aura<B, C, SC, E, I, P, SO, CAW, Error, H>(
force_authoring: bool,
keystore: KeyStorePtr,
can_author_with: CAW,
) -> Result<impl futures01::Future<Item = (), Error = ()>, sp_consensus::Error> where
) -> Result<impl futures::Future<Output = ()>, sp_consensus::Error> where
expenses marked this conversation as resolved.
Show resolved Hide resolved
B: BlockT<Header=H>,
C: ProvideRuntimeApi + BlockOf + ProvideCache<B> + AuxStore + Send + Sync,
C::Api: AuraApi<B, AuthorityId<P>>,
Expand Down Expand Up @@ -192,7 +192,7 @@ pub fn start_aura<B, C, SC, E, I, P, SO, CAW, Error, H>(
inherent_data_providers,
AuraSlotCompatible,
can_author_with,
).map(|()| Ok::<(), ()>(())).compat())
))
}

struct AuraWorker<C, E, I, P, SO> {
Expand Down Expand Up @@ -906,18 +906,21 @@ mod tests {
false,
keystore,
sp_consensus::AlwaysCanAuthor,
).expect("Starts aura");
)
.expect("Starts aura")
.unit_error()
.compat();

runtime.spawn(aura);
}

runtime.spawn(futures01::future::poll_fn(move || {
runtime.spawn(futures::future::poll_fn(move |_| {
net.lock().poll();
expenses marked this conversation as resolved.
Show resolved Hide resolved
Ok::<_, ()>(futures01::Async::NotReady::<()>)
}));
std::task::Poll::<()>::Pending
}).unit_error().compat());

runtime.block_on(future::join_all(import_notifications)
.map(|_| Ok::<(), ()>(())).compat()).unwrap();
.unit_error().compat()).unwrap();
}

#[test]
Expand Down
3 changes: 1 addition & 2 deletions client/consensus/babe/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ sc-consensus-uncles = { version = "0.8", path = "../uncles" }
sc-consensus-slots = { version = "0.8", path = "../slots" }
sp-runtime = { version = "2.0.0", path = "../../../primitives/runtime" }
fork-tree = { version = "2.0.0", path = "../../../utils/fork-tree" }
futures = { version = "0.3.1", features = ["compat"] }
futures01 = { package = "futures", version = "0.1" }
futures = "0.3.1"
futures-timer = "0.4.0"
parking_lot = "0.9.0"
log = "0.4.8"
Expand Down
8 changes: 3 additions & 5 deletions client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, Error>(BabeParams {
babe_link,
can_author_with,
}: BabeParams<B, C, E, I, SO, SC, CAW>) -> Result<
impl futures01::Future<Item=(), Error=()>,
impl futures::Future<Output=()>,
sp_consensus::Error,
> where
B: BlockT<Hash=H256>,
Expand Down Expand Up @@ -324,17 +324,15 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, Error>(BabeParams {
)?;

babe_info!("Starting BABE Authorship worker");
let slot_worker = sc_consensus_slots::start_slot_worker(
Ok(sc_consensus_slots::start_slot_worker(
config.0,
select_chain,
worker,
sync_oracle,
inherent_data_providers,
babe_link.time_source,
can_author_with,
);

Ok(slot_worker.map(|_| Ok::<(), ()>(())).compat())
))
}

struct BabeWorker<B: BlockT, C, E, I, SO> {
Expand Down
10 changes: 5 additions & 5 deletions client/consensus/babe/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,16 +403,16 @@ fn run_one_test(
babe_link: data.link.clone(),
keystore,
can_author_with: sp_consensus::AlwaysCanAuthor,
}).expect("Starts babe"));
}).expect("Starts babe").unit_error().compat());
}

runtime.spawn(futures01::future::poll_fn(move || {
runtime.spawn(futures::future::poll_fn(move |_| {
net.lock().poll();
expenses marked this conversation as resolved.
Show resolved Hide resolved
Ok::<_, ()>(futures01::Async::NotReady::<()>)
}));
std::task::Poll::<()>::Pending
}).unit_error().compat());

runtime.block_on(future::join_all(import_notifications)
.map(|_| Ok::<(), ()>(())).compat()).unwrap();
.unit_error().compat()).unwrap();
}

#[test]
Expand Down
Loading