Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to Tokio 1.0 #628

Merged
merged 8 commits into from
Jul 7, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions core-client/transports/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
url = "1.7"

hyper = { version = "0.13", optional = true }
hyper-tls = { version = "0.4", optional = true }
hyper = { version = "0.14", features = ["client", "http1"], optional = true }
hyper-tls = { version = "0.5", optional = true }
jsonrpc-server-utils = { version = "17.1", path = "../../server-utils", optional = true }
parity-tokio-ipc = { version = "0.8", optional = true }
tokio = { version = "0.2", optional = true }
parity-tokio-ipc = { version = "0.9", optional = true }
tokio = { version = "1", optional = true }
websocket = { version = "0.24", optional = true }

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ version = "17.1.0"

[dependencies]
futures = "0.3"
hyper = "0.13"
hyper = { version = "0.14", features = ["http1", "tcp", "server", "stream"] }
jsonrpc-core = { version = "17.1", path = "../core" }
jsonrpc-server-utils = { version = "17.1", path = "../server-utils" }
log = "0.4"
Expand Down
4 changes: 2 additions & 2 deletions ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ log = "0.4"
tower-service = "0.3"
jsonrpc-core = { version = "17.1", path = "../core" }
jsonrpc-server-utils = { version = "17.1", path = "../server-utils", default-features = false }
parity-tokio-ipc = "0.8"
parity-tokio-ipc = "0.9"
parking_lot = "0.11.0"

[dev-dependencies]
env_logger = "0.7"
lazy_static = "1.0"

[target.'cfg(not(windows))'.dev-dependencies]
tokio = { version = "0.2", default-features = false, features = ["uds", "time", "rt-threaded", "io-driver"] }
tokio = { version = "1", default-features = false, features = ["net", "time", "rt-multi-thread"] }

[badges]
travis-ci = { repository = "paritytech/jsonrpc", branch = "master"}
7 changes: 4 additions & 3 deletions ipc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ mod tests {
reply.expect("there should be one reply")
};

let mut rt = tokio::runtime::Runtime::new().unwrap();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(reply).expect("wait for reply")
}

Expand Down Expand Up @@ -609,9 +609,10 @@ mod tests {
tx.send(true).expect("failed to report that the server has stopped");
});

let mut rt = tokio::runtime::Runtime::new().unwrap();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let timeout = tokio::time::delay_for(Duration::from_millis(500));
let timeout = tokio::time::sleep(Duration::from_millis(500));
futures::pin_mut!(timeout);

match futures::future::select(rx, timeout).await {
futures::future::Either::Left((result, _)) => {
Expand Down
7 changes: 4 additions & 3 deletions server-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ repository = "https://github.com/paritytech/jsonrpc"
version = "17.1.0"

[dependencies]
bytes = "0.5"
bytes = "1.0"
futures = "0.3"
globset = "0.4"
jsonrpc-core = { version = "17.1", path = "../core" }
lazy_static = "1.1.0"
log = "0.4"
tokio = { version = "0.2", features = ["rt-threaded", "io-driver", "io-util", "time", "tcp"] }
tokio-util = { version = "0.3", features = ["codec"] }
tokio = { version = "1", features = ["rt-multi-thread", "io-util", "time", "net"] }
tokio-util = { version = "0.6", features = ["codec"] }
tokio-stream = { version = "0.1", features = ["net"] }

unicase = "2.0"

Expand Down
1 change: 1 addition & 0 deletions server-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ extern crate log;
extern crate lazy_static;

pub use tokio;
pub use tokio_stream;
pub use tokio_util;

pub mod cors;
Expand Down
5 changes: 2 additions & 3 deletions server-utils/src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,8 @@ impl RpcEventLoop {
pub fn with_name(name: Option<String>) -> io::Result<Self> {
let (stop, stopped) = futures::channel::oneshot::channel();

let mut tb = runtime::Builder::new();
tb.core_threads(1);
tb.threaded_scheduler();
let mut tb = runtime::Builder::new_multi_thread();
tb.worker_threads(1);
tb.enable_all();

if let Some(name) = name {
Expand Down
15 changes: 8 additions & 7 deletions server-utils/src/suspendable_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;

use tokio::time::Delay;
use std::time::{Duration, Instant};

/// `Incoming` is a stream of incoming sockets
/// Polling the stream may return a temporary io::Error (for instance if we can't open the connection because of "too many open files" limit)
Expand All @@ -19,7 +17,7 @@ pub struct SuspendableStream<S> {
next_delay: Duration,
initial_delay: Duration,
max_delay: Duration,
timeout: Option<Delay>,
timeout: Option<Instant>,
Xanewok marked this conversation as resolved.
Show resolved Hide resolved
}

impl<S> SuspendableStream<S> {
Expand All @@ -44,8 +42,11 @@ where

fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
loop {
if let Some(timeout) = self.timeout.as_mut() {
match Pin::new(timeout).poll(cx) {
if let Some(timeout) = &self.timeout {
let timeout = tokio::time::Instant::from_std(*timeout);
let sleep = tokio::time::sleep_until(timeout);
futures::pin_mut!(sleep);
match sleep.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(()) => {}
}
Expand Down Expand Up @@ -78,7 +79,7 @@ where
};
debug!("Error accepting connection: {}", err);
debug!("The server will stop accepting connections for {:?}", self.next_delay);
self.timeout = Some(tokio::time::delay_for(self.next_delay));
self.timeout = Some(Instant::now() + self.next_delay);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions stdio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ version = "17.1.0"
futures = "0.3"
jsonrpc-core = { version = "17.1", path = "../core" }
log = "0.4"
tokio = { version = "0.2", features = ["io-std", "io-driver", "io-util"] }
tokio-util = { version = "0.3", features = ["codec"] }
tokio = { version = "1", features = ["io-std", "io-util"] }
tokio-util = { version = "0.6", features = ["codec"] }

[dev-dependencies]
tokio = { version = "0.2", features = ["rt-core", "macros"] }
tokio = { version = "1", features = ["rt", "macros"] }
lazy_static = "1.0"
env_logger = "0.7"

Expand Down
2 changes: 2 additions & 0 deletions tcp/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use tower_service::Service as _;

use crate::futures::{self, future};
use crate::jsonrpc::{middleware, MetaIoHandler, Metadata, Middleware};
use crate::server_utils::tokio_stream::wrappers::TcpListenerStream;
use crate::server_utils::{codecs, reactor, tokio, tokio_util::codec::Framed, SuspendableStream};

use crate::dispatch::{Dispatcher, PeerMessageQueue, SenderChannels};
Expand Down Expand Up @@ -94,6 +95,7 @@ where
executor.executor().spawn(async move {
let start = async {
let listener = tokio::net::TcpListener::bind(&address).await?;
let listener = TcpListenerStream::new(listener);
let connections = SuspendableStream::new(listener);

let server = connections.map(|socket| {
Expand Down
14 changes: 7 additions & 7 deletions tcp/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::net::{Shutdown, SocketAddr};
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -23,7 +23,7 @@ fn casual_server() -> ServerBuilder {
}

fn run_future<O>(fut: impl std::future::Future<Output = O> + Send) -> O {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(fut)
}

Expand Down Expand Up @@ -60,9 +60,9 @@ fn disconnect() {
let _server = server.start(&addr).expect("Server must run with no issues");

run_future(async move {
let stream = TcpStream::connect(&addr).await.unwrap();
let mut stream = TcpStream::connect(&addr).await.unwrap();
assert_eq!(stream.peer_addr().unwrap(), addr);
stream.shutdown(::std::net::Shutdown::Both).unwrap();
stream.shutdown().await.unwrap();
});

::std::thread::sleep(::std::time::Duration::from_millis(50));
Expand All @@ -76,7 +76,7 @@ fn dummy_request(addr: &SocketAddr, data: Vec<u8>) -> Vec<u8> {
let stream = async move {
let mut stream = TcpStream::connect(addr).await?;
stream.write_all(&data).await?;
stream.shutdown(Shutdown::Write)?;
stream.shutdown().await?;
let mut read_buf = vec![];
let _ = stream.read_to_end(&mut read_buf).await;

Expand Down Expand Up @@ -243,7 +243,7 @@ fn message() {

let client = async move {
let stream = TcpStream::connect(&addr);
let delay = tokio::time::delay_for(Duration::from_millis(500));
let delay = tokio::time::sleep(Duration::from_millis(500));
let (stream, _) = futures::join!(stream, delay);
let mut stream = stream?;

Expand Down Expand Up @@ -272,7 +272,7 @@ fn message() {
let data = b"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}\n";
stream.write_all(&data[..]).await?;

stream.shutdown(Shutdown::Write).unwrap();
stream.shutdown().await.unwrap();
let mut read_buf = vec![];
let _ = stream.read_to_end(&mut read_buf).await?;

Expand Down