Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Remove more instances of futures01 #4633

Merged
merged 30 commits into from
Feb 28, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
8da9a8a
Start removing last few instances of futures01
expenses Jan 15, 2020
1c61728
Use to_poll on wasm
expenses Jan 15, 2020
8f42407
Revert "Use to_poll on wasm"
expenses Jan 16, 2020
a80cc31
Fix fg test
expenses Jan 16, 2020
66f837a
Upgrade network test futures
expenses Jan 20, 2020
4e9e5c6
Update offchain hyper version
expenses Jan 20, 2020
7a429c6
Update service test
expenses Jan 21, 2020
738fff9
Merge remote-tracking branch 'parity/master' into ashley-more-futures
expenses Jan 21, 2020
19c8b38
Merge remote-tracking branch 'parity/master' into ashley-more-futures
expenses Jan 21, 2020
5d36bef
Merge remote-tracking branch 'parity/master' into ashley-more-futures
expenses Jan 24, 2020
9a196df
bump tokio to 0.2.10
expenses Jan 24, 2020
29b70b8
Removed some unneeded tokios
expenses Jan 24, 2020
6aeecf7
Merge remote-tracking branch 'parity/master' into ashley-more-futures
expenses Jan 27, 2020
c4fd68b
fixes
expenses Jan 27, 2020
553f9c1
Merge remote-tracking branch 'parity/master' into ashley-more-futures
expenses Jan 27, 2020
8b28b97
fix run_until_all_full
expenses Jan 27, 2020
b692425
Merge remote-tracking branch 'parity/master' into ashley-more-futures
expenses Jan 29, 2020
7c55e50
Make service test debuggable
expenses Jan 29, 2020
837aa0b
Update client/offchain/src/api/http.rs
expenses Jan 29, 2020
06ca442
Add service_test to test-int output
expenses Jan 29, 2020
25f1579
Merge branch 'ashley-more-futures' of github.com:paritytech/substrate…
expenses Jan 29, 2020
a0d4b37
Merge remote-tracking branch 'parity/master' into ashley-more-futures
expenses Jan 30, 2020
4270f7f
nitpicking
expenses Jan 30, 2020
b5e8f13
Merge remote-tracking branch 'parity/master' into HEAD
expenses Feb 6, 2020
64856fe
Finally fix test
expenses Feb 10, 2020
18fe65a
Merge remote-tracking branch 'parity/master' into ashley-more-futures
expenses Feb 10, 2020
6eb50f6
Merge remote-tracking branch 'parity/master' into ashley-more-futures
expenses Feb 27, 2020
bcc5b8b
Give up and revert client/serviec/test
expenses Feb 28, 2020
53c4ccf
Revert gitlab ci too
expenses Feb 28, 2020
143d62e
Merge remote-tracking branch 'parity/master' into ashley-more-futures
expenses Feb 28, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion client/network-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ edition = "2018"

[dependencies]
log = "0.4.8"
futures01 = { package = "futures", version = "0.1.29" }
futures = { version = "0.3.1", features = ["compat"] }
expenses marked this conversation as resolved.
Show resolved Hide resolved
futures-timer = "0.4.0"
lru = "0.1.2"
Expand Down
7 changes: 3 additions & 4 deletions client/network-gossip/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::state_machine::{ConsensusGossip, TopicNotification};
use sc_network::message::generic::ConsensusMessage;
use sc_network::{Event, ReputationChange};

use futures::{prelude::*, channel::mpsc, compat::Compat01As03, task::SpawnExt as _};
use futures::{prelude::*, channel::mpsc, task::SpawnExt as _};
use libp2p::PeerId;
use parking_lot::Mutex;
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
Expand Down Expand Up @@ -50,7 +50,7 @@ impl<B: BlockT> GossipEngine<B> {

// We grab the event stream before registering the notifications protocol, otherwise we
// might miss events.
let event_stream = network.event_stream();
let mut event_stream = network.event_stream();

network.register_notifications_protocol(engine_id);
state_machine.register_validator(&mut network, engine_id, validator);
Expand Down Expand Up @@ -89,8 +89,7 @@ impl<B: BlockT> GossipEngine<B> {
}

let res = executor.spawn(async move {
let mut stream = Compat01As03::new(event_stream);
while let Some(Ok(event)) = stream.next().await {
while let Some(event) = event_stream.next().await {
match event {
Event::NotificationStreamOpened { remote, engine_id: msg_engine_id, roles } => {
if msg_engine_id != engine_id {
Expand Down
7 changes: 4 additions & 3 deletions client/network-gossip/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use futures::prelude::*;
use sc_network::{specialization::NetworkSpecialization, Event, ExHashT, NetworkService, PeerId, ReputationChange};
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use std::sync::Arc;
use std::pin::Pin;

mod bridge;
mod state_machine;
Expand All @@ -70,7 +71,7 @@ mod validator;
/// Abstraction over a network.
pub trait Network<B: BlockT> {
/// Returns a stream of events representing what happens on the network.
fn event_stream(&self) -> Box<dyn futures01::Stream<Item = Event, Error = ()> + Send>;
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>>;

/// Adjust the reputation of a node.
fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange);
Expand All @@ -97,8 +98,8 @@ pub trait Network<B: BlockT> {
}

impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Network<B> for Arc<NetworkService<B, S, H>> {
fn event_stream(&self) -> Box<dyn futures01::Stream<Item = Event, Error = ()> + Send> {
Box::new(NetworkService::event_stream(self).map(|v| Ok::<_, ()>(v)).compat())
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
Box::pin(NetworkService::event_stream(self))
}

fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange) {
Expand Down
2 changes: 1 addition & 1 deletion client/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ parking_lot = "0.9.0"
lazy_static = "1.4.0"
log = "0.4.8"
slog = { version = "2.5.2", features = ["nested-values"] }
tokio-executor = "0.1.8"
tokio = { version = "0.2", features = ["rt-core"] }
futures-timer = "2"
exit-future = "0.2.0"
serde = "1.0.101"
Expand Down
22 changes: 7 additions & 15 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use parking_lot::Mutex;
use sc_client::Client;
use exit_future::Signal;
use futures::{
Future, FutureExt, Stream, StreamExt, TryFutureExt,
Future, FutureExt, Stream, StreamExt,
future::select, channel::mpsc,
compat::*,
sink::SinkExt,
Expand Down Expand Up @@ -108,6 +108,8 @@ pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {
marker: PhantomData<TBl>,
}

impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Unpin for Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {}

/// Alias for a an implementation of `futures::future::Executor`.
pub type TaskExecutor = Arc<dyn Spawn + Send + Sync>;

Expand Down Expand Up @@ -210,11 +212,11 @@ impl<TBl, TBackend, TExec, TRtApi, TSc, TNetSpec, TExPool, TOc> AbstractService
Service<TBl, Client<TBackend, TExec, TBl, TRtApi>, TSc, NetworkStatus<TBl>,
NetworkService<TBl, TNetSpec, TBl::Hash>, TExPool, TOc>
where
TBl: BlockT + Unpin,
TBl: BlockT,
TBackend: 'static + sc_client_api::backend::Backend<TBl>,
TExec: 'static + sc_client::CallExecutor<TBl> + Send + Sync + Clone,
TRtApi: 'static + Send + Sync,
TSc: sp_consensus::SelectChain<TBl> + 'static + Clone + Send + Unpin,
TSc: sp_consensus::SelectChain<TBl> + 'static + Clone + Send,
TExPool: 'static + TransactionPool<Block = TBl>
+ TransactionPoolMaintainer<Block = TBl>,
TOc: 'static + Send + Sync,
Expand Down Expand Up @@ -304,7 +306,7 @@ where
}
}

impl<TBl: Unpin, TCl, TSc: Unpin, TNetStatus, TNet, TTxPool, TOc> Future for
impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Future for
Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc>
{
type Output = Result<(), Error>;
Expand All @@ -322,17 +324,7 @@ impl<TBl: Unpin, TCl, TSc: Unpin, TNetStatus, TNet, TTxPool, TOc> Future for
}

while let Poll::Ready(Some(task_to_spawn)) = Pin::new(&mut this.to_spawn_rx).poll_next(cx) {
// TODO: Update to tokio 0.2 when libp2p get switched to std futures (#4383)
let executor = tokio_executor::DefaultExecutor::current();
use futures01::future::Executor;
if let Err(err) = executor.execute(task_to_spawn.unit_error().compat()) {
debug!(
target: "service",
"Failed to spawn background task: {:?}; falling back to manual polling",
err
);
this.to_poll.push(Box::pin(err.into_future().compat().map(drop)));
}
tokio::spawn(task_to_spawn);
expenses marked this conversation as resolved.
Show resolved Hide resolved
}

// Polling all the `to_poll` futures.
Expand Down