Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to Tokio 1.0 #628

Merged
merged 8 commits into from
Jul 7, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -14,3 +14,6 @@ members = [
"test",
"ws",
]

[patch.crates-io]
parity-tokio-ipc = { git = "https://github.com/Xanewok/parity-tokio-ipc", branch = "mio-07" }
6 changes: 3 additions & 3 deletions core-client/transports/Cargo.toml
Original file line number Diff line number Diff line change
@@ -44,11 +44,11 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
url = "1.7"

hyper = { version = "0.13", optional = true }
hyper-tls = { version = "0.4", optional = true }
hyper = { version = "0.14", features = ["client", "http1", "server"], optional = true }
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
hyper-tls = { version = "0.5", optional = true }
jsonrpc-server-utils = { version = "17.1", path = "../../server-utils", optional = true }
parity-tokio-ipc = { version = "0.8", optional = true }
tokio = { version = "0.2", optional = true }
tokio = { version = "1", optional = true }
websocket = { version = "0.24", optional = true }

[dev-dependencies]
2 changes: 1 addition & 1 deletion http/Cargo.toml
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ version = "17.1.0"

[dependencies]
futures = "0.3"
hyper = "0.13"
hyper = { version = "0.14", features = ["client", "server", "stream"] }
jsonrpc-core = { version = "17.1", path = "../core" }
jsonrpc-server-utils = { version = "17.1", path = "../server-utils" }
log = "0.4"
2 changes: 1 addition & 1 deletion http/src/lib.rs
Original file line number Diff line number Diff line change
@@ -566,7 +566,7 @@ fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
let raw_socket = ();

let server_builder =
hyper::Server::from_tcp(listener).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
hyper::server::Server::from_tcp(listener).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
// Add current host to allowed headers.
// NOTE: we need to use `l.local_addr()` instead of `addr`
// it might be different!
2 changes: 1 addition & 1 deletion ipc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ env_logger = "0.7"
lazy_static = "1.0"

[target.'cfg(not(windows))'.dev-dependencies]
tokio = { version = "0.2", default-features = false, features = ["uds", "time", "rt-threaded", "io-driver"] }
tokio = { version = "1", default-features = false, features = ["net", "time", "rt-multi-thread"] }

[badges]
travis-ci = { repository = "paritytech/jsonrpc", branch = "master"}
7 changes: 4 additions & 3 deletions server-utils/Cargo.toml
Original file line number Diff line number Diff line change
@@ -11,14 +11,15 @@ repository = "https://github.com/paritytech/jsonrpc"
version = "17.1.0"

[dependencies]
bytes = "0.5"
bytes = "1.0"
futures = "0.3"
globset = "0.4"
jsonrpc-core = { version = "17.1", path = "../core" }
lazy_static = "1.1.0"
log = "0.4"
tokio = { version = "0.2", features = ["rt-threaded", "io-driver", "io-util", "time", "tcp"] }
tokio-util = { version = "0.3", features = ["codec"] }
tokio = { version = "1", features = ["rt-multi-thread", "io-util", "time", "net"] }
tokio-util = { version = "0.6", features = ["codec"] }
tokio-stream = { version = "0.1", features = ["net"] }

unicase = "2.0"

1 change: 1 addition & 0 deletions server-utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ extern crate log;
extern crate lazy_static;

pub use tokio;
pub use tokio_stream;
pub use tokio_util;

pub mod cors;
5 changes: 2 additions & 3 deletions server-utils/src/reactor.rs
Original file line number Diff line number Diff line change
@@ -96,9 +96,8 @@ impl RpcEventLoop {
pub fn with_name(name: Option<String>) -> io::Result<Self> {
let (stop, stopped) = futures::channel::oneshot::channel();

let mut tb = runtime::Builder::new();
tb.core_threads(1);
tb.threaded_scheduler();
let mut tb = runtime::Builder::new_multi_thread();
tb.worker_threads(1);
tb.enable_all();

if let Some(name) = name {
15 changes: 8 additions & 7 deletions server-utils/src/suspendable_stream.rs
Original file line number Diff line number Diff line change
@@ -2,9 +2,7 @@ use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;

use tokio::time::Delay;
use std::time::{Duration, Instant};

/// `Incoming` is a stream of incoming sockets
/// Polling the stream may return a temporary io::Error (for instance if we can't open the connection because of "too many open files" limit)
@@ -19,7 +17,7 @@ pub struct SuspendableStream<S> {
next_delay: Duration,
initial_delay: Duration,
max_delay: Duration,
timeout: Option<Delay>,
timeout: Option<Instant>,
Xanewok marked this conversation as resolved.
Show resolved Hide resolved
}

impl<S> SuspendableStream<S> {
@@ -44,8 +42,11 @@ where

fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
loop {
if let Some(timeout) = self.timeout.as_mut() {
match Pin::new(timeout).poll(cx) {
if let Some(timeout) = &self.timeout {
let timeout = tokio::time::Instant::from_std(*timeout);
let sleep = tokio::time::sleep_until(timeout);
futures::pin_mut!(sleep);
match sleep.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(()) => {}
}
@@ -78,7 +79,7 @@ where
};
debug!("Error accepting connection: {}", err);
debug!("The server will stop accepting connections for {:?}", self.next_delay);
self.timeout = Some(tokio::time::delay_for(self.next_delay));
self.timeout = Some(Instant::now() + self.next_delay);
}
}
}
6 changes: 3 additions & 3 deletions stdio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -13,11 +13,11 @@ version = "17.1.0"
futures = "0.3"
jsonrpc-core = { version = "17.1", path = "../core" }
log = "0.4"
tokio = { version = "0.2", features = ["io-std", "io-driver", "io-util"] }
tokio-util = { version = "0.3", features = ["codec"] }
tokio = { version = "1", features = ["io-std", "io-util"] }
tokio-util = { version = "0.6", features = ["codec"] }

[dev-dependencies]
tokio = { version = "0.2", features = ["rt-core", "macros"] }
tokio = { version = "1", features = ["rt", "macros"] }
lazy_static = "1.0"
env_logger = "0.7"

2 changes: 2 additions & 0 deletions tcp/src/server.rs
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ use tower_service::Service as _;

use crate::futures::{self, future};
use crate::jsonrpc::{middleware, MetaIoHandler, Metadata, Middleware};
use crate::server_utils::tokio_stream::wrappers::TcpListenerStream;
use crate::server_utils::{codecs, reactor, tokio, tokio_util::codec::Framed, SuspendableStream};

use crate::dispatch::{Dispatcher, PeerMessageQueue, SenderChannels};
@@ -94,6 +95,7 @@ where
executor.executor().spawn(async move {
let start = async {
let listener = tokio::net::TcpListener::bind(&address).await?;
let listener = TcpListenerStream::new(listener);
let connections = SuspendableStream::new(listener);

let server = connections.map(|socket| {
14 changes: 7 additions & 7 deletions tcp/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::net::{Shutdown, SocketAddr};
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
@@ -23,7 +23,7 @@ fn casual_server() -> ServerBuilder {
}

fn run_future<O>(fut: impl std::future::Future<Output = O> + Send) -> O {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(fut)
}

@@ -60,9 +60,9 @@ fn disconnect() {
let _server = server.start(&addr).expect("Server must run with no issues");

run_future(async move {
let stream = TcpStream::connect(&addr).await.unwrap();
let mut stream = TcpStream::connect(&addr).await.unwrap();
assert_eq!(stream.peer_addr().unwrap(), addr);
stream.shutdown(::std::net::Shutdown::Both).unwrap();
stream.shutdown().await.unwrap();
});

::std::thread::sleep(::std::time::Duration::from_millis(50));
@@ -76,7 +76,7 @@ fn dummy_request(addr: &SocketAddr, data: Vec<u8>) -> Vec<u8> {
let stream = async move {
let mut stream = TcpStream::connect(addr).await?;
stream.write_all(&data).await?;
stream.shutdown(Shutdown::Write)?;
stream.shutdown().await?;
let mut read_buf = vec![];
let _ = stream.read_to_end(&mut read_buf).await;

@@ -243,7 +243,7 @@ fn message() {

let client = async move {
let stream = TcpStream::connect(&addr);
let delay = tokio::time::delay_for(Duration::from_millis(500));
let delay = tokio::time::sleep(Duration::from_millis(500));
let (stream, _) = futures::join!(stream, delay);
let mut stream = stream?;

@@ -272,7 +272,7 @@ fn message() {
let data = b"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}\n";
stream.write_all(&data[..]).await?;

stream.shutdown(Shutdown::Write).unwrap();
stream.shutdown().await.unwrap();
let mut read_buf = vec![];
let _ = stream.read_to_end(&mut read_buf).await?;