From 06c15ff760a5ad67d27afd494403317a1aced20b Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 18 Jan 2021 12:17:24 +0100 Subject: [PATCH 01/19] tendermint-proto: Update bytes to 1.0 and prost to 0.7 --- proto/Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/proto/Cargo.toml b/proto/Cargo.toml index 37d2b84f7..462a238d6 100644 --- a/proto/Cargo.toml +++ b/proto/Cargo.toml @@ -17,9 +17,9 @@ description = """ all-features = true [dependencies] -prost = { version = "0.6" } -prost-types = { version = "0.6" } -bytes = "0.5" +prost = "0.7" +prost-types = "0.7" +bytes = "1.0" anomaly = "0.2" thiserror = "1.0" serde = { version = "1.0", features = ["derive"] } From d27e7f3fa4ece0044c074ac6ffd6820ad7def6c9 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 18 Jan 2021 12:22:37 +0100 Subject: [PATCH 02/19] tendermint: Update bytes to 1.0 and prost to 0.7 --- tendermint/Cargo.toml | 6 +++--- tendermint/src/time.rs | 10 +++------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/tendermint/Cargo.toml b/tendermint/Cargo.toml index 22b5be5e6..7ff56c6da 100644 --- a/tendermint/Cargo.toml +++ b/tendermint/Cargo.toml @@ -35,15 +35,15 @@ crate-type = ["cdylib", "rlib"] [dependencies] anomaly = "0.2" async-trait = "0.1" -bytes = "0.5" +bytes = "1.0" chrono = { version = "0.4", features = ["serde"] } ed25519 = "1" ed25519-dalek = { version = "1", features = ["serde"] } futures = "0.3" num-traits = "0.2" once_cell = "1.3" -prost = "0.6" -prost-types = "0.6" +prost = "0.7" +prost-types = "0.7" serde = { version = "1", features = ["derive"] } serde_json = "1" serde_bytes = "0.11" diff --git a/tendermint/src/time.rs b/tendermint/src/time.rs index 48897a325..06ec79e38 100644 --- a/tendermint/src/time.rs +++ b/tendermint/src/time.rs @@ -5,7 +5,7 @@ use crate::error::{Error, Kind}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use std::convert::TryFrom; +use std::convert::{Infallible, TryFrom}; use std::fmt; use std::ops::{Add, Sub}; use std::str::FromStr; @@ -23,7 +23,7 @@ pub struct Time(DateTime); impl Protobuf for Time {} impl TryFrom for Time { - type Error = anomaly::BoxError; + type Error = Infallible; fn try_from(value: Timestamp) -> Result { // prost_types::Timestamp has a SystemTime converter but @@ -33,11 +33,7 @@ impl TryFrom for Time { nanos: value.nanos, }; - Ok(SystemTime::try_from(prost_value) - .map_err(|e| { - Kind::OutOfRange.context(format!("time before EPOCH by {} seconds", e.as_secs())) - })? - .into()) + Ok(SystemTime::from(prost_value).into()) } } From d8a622335338f7df8117ddb86cb4418401b169ee Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 18 Jan 2021 13:16:22 +0100 Subject: [PATCH 03/19] tendermint-rpc: Update tokio to 1.0, hyper to 0.14, async-tungstenite to 0.12 and bytes to 1.0 --- TODO | 1 + rpc/Cargo.toml | 10 +++---- rpc/src/client/sync.rs | 2 +- rpc/src/client/transport/http.rs | 11 ++++--- rpc/src/client/transport/router.rs | 8 ++--- rpc/src/client/transport/websocket.rs | 42 +++++++++++++-------------- 6 files changed, 39 insertions(+), 35 deletions(-) create mode 100644 TODO diff --git a/TODO b/TODO new file mode 100644 index 000000000..a0e427fa0 --- /dev/null +++ b/TODO @@ -0,0 +1 @@ +rpc: use rand crate instead of getrandom diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index c07e9be98..5ce75c9c1 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -39,16 +39,16 @@ websocket-client = [ "async-trait", "async-tungstenite", "futures", + "tokio/rt", "tokio/fs", "tokio/macros", - "tokio/stream", "tokio/sync", "tokio/time", "tracing" ] [dependencies] -bytes = "0.5" +bytes = "1.0" chrono = "0.4" getrandom = "0.1" serde = { version = "1", features = [ "derive" ] } @@ -62,10 +62,10 @@ subtle-encoding = { version = "0.5", features = ["bech32-preview"] } walkdir = "2.3" async-trait = { version = "0.1", optional = true } -async-tungstenite = { version = "0.9", features = ["tokio-runtime"], optional = true } +async-tungstenite = { version = "0.12", features = ["tokio-runtime"], optional = true } futures = { version = "0.3", optional = true } http = { version = "0.2", optional = true } -hyper = { version = "0.13", optional = true } -tokio = { version = "0.2", optional = true } +hyper = { version = "0.14", optional = true, features = ["client", "http1", "tcp"] } +tokio = { version = "1.0", optional = true } tracing = { version = "0.1", optional = true } pin-project = "1.0.1" diff --git a/rpc/src/client/sync.rs b/rpc/src/client/sync.rs index 275a9b8b8..47900a32d 100644 --- a/rpc/src/client/sync.rs +++ b/rpc/src/client/sync.rs @@ -55,6 +55,6 @@ impl Stream for ChannelRx { type Item = T; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().0.poll_next(cx) + self.project().0.poll_recv(cx) } } diff --git a/rpc/src/client/transport/http.rs b/rpc/src/client/transport/http.rs index cfb52ae71..6007f04a2 100644 --- a/rpc/src/client/transport/http.rs +++ b/rpc/src/client/transport/http.rs @@ -1,12 +1,14 @@ //! HTTP-based transport for Tendermint RPC Client. -use crate::client::transport::utils::get_tcp_host_port; -use crate::{Client, Response, Result, SimpleRequest}; use async_trait::async_trait; -use bytes::buf::ext::BufExt; +use hyper::body::Buf; use hyper::header; + use tendermint::net; +use crate::client::transport::utils::get_tcp_host_port; +use crate::{Client, Response, Result, SimpleRequest}; + /// A JSON-RPC/HTTP Tendermint RPC client (implements [`Client`]). /// /// Does not provide [`Event`] subscription facilities (see [`WebSocketClient`] @@ -62,7 +64,8 @@ impl Client for HttpClient { .unwrap(), ); } - let http_client = hyper::Client::builder().build_http(); + + let http_client = hyper::Client::new(); let response = http_client.request(request).await?; let response_body = hyper::body::aggregate(response.into_body()).await?; R::Response::from_reader(response_body.reader()) diff --git a/rpc/src/client/transport/router.rs b/rpc/src/client/transport/router.rs index f13a029cb..93fe4d88f 100644 --- a/rpc/src/client/transport/router.rs +++ b/rpc/src/client/transport/router.rs @@ -125,9 +125,9 @@ mod test { } async fn must_recv(ch: &mut ChannelRx, timeout_ms: u64) -> T { - let mut delay = time::delay_for(Duration::from_millis(timeout_ms)); + let delay = time::sleep(Duration::from_millis(timeout_ms)); tokio::select! { - _ = &mut delay, if !delay.is_elapsed() => panic!("timed out waiting for recv"), + _ = delay, if !delay.is_elapsed() => panic!("timed out waiting for recv"), Some(v) = ch.recv() => v, } } @@ -136,9 +136,9 @@ mod test { where T: std::fmt::Debug, { - let mut delay = time::delay_for(Duration::from_millis(timeout_ms)); + let delay = time::sleep(Duration::from_millis(timeout_ms)); tokio::select! { - _ = &mut delay, if !delay.is_elapsed() => (), + _ = delay, if !delay.is_elapsed() => (), Some(v) = ch.recv() => panic!("got unexpected result from channel: {:?}", v), } } diff --git a/rpc/src/client/transport/websocket.rs b/rpc/src/client/transport/websocket.rs index e8e97e469..cf4e747b5 100644 --- a/rpc/src/client/transport/websocket.rs +++ b/rpc/src/client/transport/websocket.rs @@ -301,21 +301,27 @@ impl WebSocketClientDriver { pub async fn run(mut self) -> Result<()> { let mut ping_interval = tokio::time::interval_at(Instant::now().add(PING_INTERVAL), PING_INTERVAL); - let mut recv_timeout = tokio::time::delay_for(PING_INTERVAL); + loop { + let next_msg = tokio::time::timeout(RECV_TIMEOUT, self.stream.next()); + tokio::select! { - Some(res) = self.stream.next() => match res { - Ok(msg) => { - // Reset the receive timeout every time we successfully - // receive a message from the remote endpoint. - recv_timeout.reset(Instant::now().add(PING_INTERVAL)); + res = next_msg => match res { + Ok(Some(Ok(msg))) => { self.handle_incoming_msg(msg).await? }, - Err(e) => return Err( - Error::websocket_error( - format!("failed to read from WebSocket connection: {}", e), - ), - ), + Ok(Some(Err(e))) => return Err(Error::websocket_error( + format!("failed to read from WebSocket connection: {}", e), + )), + Ok(None) => { + // Websocket stream is over, let's continue in case + // we still receive commands . + continue; + }, + Err(_) => return Err(Error::websocket_error(format!( + "reading from WebSocket connection timed out after {} seconds", + RECV_TIMEOUT.as_secs() + ))), }, Some(cmd) = self.cmd_rx.recv() => match cmd { DriverCommand::Subscribe(subs_cmd) => self.subscribe(subs_cmd).await?, @@ -323,13 +329,7 @@ impl WebSocketClientDriver { DriverCommand::SimpleRequest(req_cmd) => self.simple_request(req_cmd).await?, DriverCommand::Terminate => return self.close().await, }, - _ = ping_interval.next() => self.ping().await?, - _ = &mut recv_timeout => { - return Err(Error::websocket_error(format!( - "reading from WebSocket connection timed out after {} seconds", - RECV_TIMEOUT.as_secs() - ))); - } + _ = ping_interval.tick() => self.ping().await?, } } } @@ -586,7 +586,7 @@ mod test { loop { tokio::select! { Some(ev) = self.event_rx.recv() => self.publish_event(ev), - Some(res) = self.listener.next() => self.handle_incoming(res.unwrap()).await, + Ok((stream, _)) = self.listener.accept() => self.handle_incoming(stream).await, Some(res) = self.terminate_rx.recv() => { self.terminate().await; return res; @@ -678,8 +678,8 @@ mod test { async fn run(mut self) -> Result<()> { loop { tokio::select! { - Some(res) = self.conn.next() => { - if let Some(ret) = self.handle_incoming_msg(res.unwrap()).await { + Some(Ok(msg)) = self.conn.next() => { + if let Some(ret) = self.handle_incoming_msg(msg).await { return ret; } } From 1ba6b31f25ff1d78aef0fc2d1b4b45035532c562 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 18 Jan 2021 13:21:20 +0100 Subject: [PATCH 04/19] tendermint-p2p: Update prost to 0.7 --- p2p/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 93998d28b..b5f8fae88 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -21,7 +21,7 @@ ed25519-dalek = "1" eyre = "0.6" hkdf = "0.10.0" merlin = "2" -prost = "0.6" +prost = "0.7" rand_core = { version = "0.5", features = ["std"] } sha2 = "0.9" subtle = "2" From 8048a91795b578074bdde89021752f4f7fcfb6e3 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 18 Jan 2021 13:51:42 +0100 Subject: [PATCH 05/19] tendermint-light-client: Update tokio to 1.0 --- light-client/Cargo.toml | 2 +- light-client/src/utils/block_on.rs | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/light-client/Cargo.toml b/light-client/Cargo.toml index abb316415..17dbec708 100644 --- a/light-client/Cargo.toml +++ b/light-client/Cargo.toml @@ -49,7 +49,7 @@ serde_derive = "1.0.106" sled = { version = "0.34.3", optional = true } static_assertions = "1.1.0" thiserror = "1.0.15" -tokio = { version = "0.2", optional = true } +tokio = { version = "1.0", features = ["rt"], optional = true } [dev-dependencies] tendermint-testgen = { path = "../testgen"} diff --git a/light-client/src/utils/block_on.rs b/light-client/src/utils/block_on.rs index 50003d970..17a3dc967 100644 --- a/light-client/src/utils/block_on.rs +++ b/light-client/src/utils/block_on.rs @@ -11,15 +11,14 @@ where F::Output: Send, { std::thread::spawn(move || { - let mut rt = tokio::runtime::Builder::new() - .basic_scheduler() + let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .map_err(|_| IoError::Runtime)?; if let Some(timeout) = timeout { - let task = async { tokio::time::timeout(timeout, f).await }; - rt.block_on(task).map_err(|_| IoError::Timeout(timeout)) + rt.block_on(tokio::time::timeout(timeout, f)) + .map_err(|_| IoError::Timeout(timeout)) } else { Ok(rt.block_on(f)) } From d019f10ede1ae7a0dc4c9fa9d395d2b55d3e9118 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 18 Jan 2021 14:03:09 +0100 Subject: [PATCH 06/19] tendermint-light-node: Update tokio to 1.0 --- light-node/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/light-node/Cargo.toml b/light-node/Cargo.toml index e232e063f..1d9dfc979 100644 --- a/light-node/Cargo.toml +++ b/light-node/Cargo.toml @@ -40,7 +40,6 @@ tendermint = { version = "0.17.1", path = "../tendermint" } tendermint-light-client = { version = "0.17.1", path = "../light-client", features = ["lightstore-sled"] } tendermint-rpc = { version = "0.17.1", path = "../rpc", features = ["http-client"] } thiserror = "1.0" -tokio = { version = "0.2", features = ["full"] } [dependencies.abscissa_core] version = "0.5.0" @@ -53,3 +52,4 @@ abscissa_core = { version = "0.5.0", features = ["testing"] } futures = { version = "0.3", features = [ "compat" ] } once_cell = "1.2" pretty_assertions = "0.6" +tokio = { version = "1.0", features = ["rt", "macros"] } From 60bf1d549c95b1550f92d46f6e661a1b194a5c10 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 18 Jan 2021 14:30:54 +0100 Subject: [PATCH 07/19] Update changelog --- CHANGELOG.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f3dc2415..61a7a144b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,20 @@ ## Unreleased +## IMPROVEMENTS: + +* `[all]` Update all crates to use the latest version of the following dependencies: ([#764]) + - `tokio` (`1.0`) + - `hyper` (`0.14`) + - `prost` (`0.7`) + - `bytes` (`1.0`) + - `async-tungstenite` (`0.12`) + ### BUG FIXES * `[light-client]` The `sled`-backed lightstore is now feature-guarded under the `lightstore-sled` feature, which is enabled by default for now. ([#772]) +[#764]: https://github.com/informalsystems/tendermint-rs/issues/764 [#772]: https://github.com/informalsystems/tendermint-rs/pull/772 ## v0.17.1 From e933fad981420c6f4478adb11f769f879f7074d5 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 18 Jan 2021 14:42:07 +0100 Subject: [PATCH 08/19] Fix comment --- rpc/src/client/transport/websocket.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rpc/src/client/transport/websocket.rs b/rpc/src/client/transport/websocket.rs index cf4e747b5..21e33baf5 100644 --- a/rpc/src/client/transport/websocket.rs +++ b/rpc/src/client/transport/websocket.rs @@ -315,7 +315,7 @@ impl WebSocketClientDriver { )), Ok(None) => { // Websocket stream is over, let's continue in case - // we still receive commands . + // we still receive commands via the `cmd_rx` channel. continue; }, Err(_) => return Err(Error::websocket_error(format!( @@ -859,4 +859,4 @@ mod test { ); } } -} +} \ No newline at end of file From 84b4c3e0a78028c475fa847bcde66b04f06b9dc8 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 18 Jan 2021 14:46:01 +0100 Subject: [PATCH 09/19] Formatting --- rpc/src/client/transport/websocket.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/src/client/transport/websocket.rs b/rpc/src/client/transport/websocket.rs index 21e33baf5..7db57a9f8 100644 --- a/rpc/src/client/transport/websocket.rs +++ b/rpc/src/client/transport/websocket.rs @@ -859,4 +859,4 @@ mod test { ); } } -} \ No newline at end of file +} From c02c0dba0c8a3b9db4b580258cb329f35c8c43d6 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 18 Jan 2021 14:48:14 +0100 Subject: [PATCH 10/19] Enable multi-threaded Tokio runtime --- rpc/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 5ce75c9c1..25c2451c5 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -39,7 +39,7 @@ websocket-client = [ "async-trait", "async-tungstenite", "futures", - "tokio/rt", + "tokio/rt-multi-thread", "tokio/fs", "tokio/macros", "tokio/sync", From 59f5f76b66055432627aa853d8b13b712b06ac02 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 18 Jan 2021 14:54:27 +0100 Subject: [PATCH 11/19] tools(proto-compiler): Update prost-build to 0.7 --- tools/proto-compiler/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/proto-compiler/Cargo.toml b/tools/proto-compiler/Cargo.toml index ff5e6b608..1c00dab7d 100644 --- a/tools/proto-compiler/Cargo.toml +++ b/tools/proto-compiler/Cargo.toml @@ -7,7 +7,7 @@ publish = false [dependencies] walkdir = { version = "2.3" } -prost-build = { version = "0.6" } +prost-build = { version = "0.7" } git2 = { version = "0.13" } tempdir = { version = "0.3" } subtle-encoding = { version = "0.5" } From 2f66ad96b199084cdca88845c25883b15755944d Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 18 Jan 2021 15:00:08 +0100 Subject: [PATCH 12/19] tools(rpc-probe): Update tokio to 1.0 and async-tungstenite to 0.12 --- tools/rpc-probe/Cargo.toml | 4 ++-- tools/rpc-probe/src/client.rs | 2 +- tools/rpc-probe/src/error.rs | 4 ++-- tools/rpc-probe/src/plan.rs | 6 ++++-- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/tools/rpc-probe/Cargo.toml b/tools/rpc-probe/Cargo.toml index b687db3bc..f6e31e351 100644 --- a/tools/rpc-probe/Cargo.toml +++ b/tools/rpc-probe/Cargo.toml @@ -17,7 +17,7 @@ description = """ all-features = true [dependencies] -async-tungstenite = { version = "0.9", features = [ "tokio-runtime" ] } +async-tungstenite = { version = "0.12", features = [ "tokio-runtime" ] } futures = "0.3" getrandom = "0.1" log = "0.4" @@ -27,5 +27,5 @@ simple_logger = "1.11" structopt = "0.3" subtle-encoding = "0.5.1" thiserror = "1.0" -tokio = { version = "0.2", features = [ "full" ] } +tokio = { version = "1.0", features = [ "full" ] } uuid = "0.8" diff --git a/tools/rpc-probe/src/client.rs b/tools/rpc-probe/src/client.rs index 23c41b695..96cb2282e 100644 --- a/tools/rpc-probe/src/client.rs +++ b/tools/rpc-probe/src/client.rs @@ -68,7 +68,7 @@ impl Client { /// the given target height. pub async fn wait_for_height(&mut self, h: u64) -> Result<()> { let (mut subs, _) = self.subscribe(&uuid_v4(), "tm.event = 'NewBlock'").await?; - while let Some(result) = subs.next().await { + while let Some(result) = subs.recv().await { let resp = result?; // TODO(thane): Find a more readable way of getting this value. let height = resp diff --git a/tools/rpc-probe/src/error.rs b/tools/rpc-probe/src/error.rs index ac16b7317..a1124f304 100644 --- a/tools/rpc-probe/src/error.rs +++ b/tools/rpc-probe/src/error.rs @@ -40,8 +40,8 @@ impl From for Error { } } -impl From for Error { - fn from(e: tokio::time::Elapsed) -> Self { +impl From for Error { + fn from(e: tokio::time::error::Elapsed) -> Self { Self::Timeout(e.to_string()) } } diff --git a/tools/rpc-probe/src/plan.rs b/tools/rpc-probe/src/plan.rs index 5a028ac28..74e7ceaa1 100644 --- a/tools/rpc-probe/src/plan.rs +++ b/tools/rpc-probe/src/plan.rs @@ -319,7 +319,7 @@ async fn execute_interaction( info!("Executing interaction \"{}\"", inner_interaction.name); if let Some(wait) = inner_interaction.pre_wait { debug!("Sleeping for {} seconds", wait.as_secs_f64()); - tokio::time::delay_for(wait).await; + tokio::time::sleep(wait).await; } if let Some(h) = inner_interaction.min_height { debug!("Waiting for height {}", h); @@ -419,7 +419,9 @@ async fn execute_subscription( }; write_json(&config.in_path, name, &response_json).await?; - let mut timeout = tokio::time::delay_for(subs.max_time); + let timeout = tokio::time::sleep(subs.max_time); + tokio::pin!(timeout); + let mut event_count = 0_usize; loop { tokio::select! { From 0c3ea29fe0d392e9a64515e9c20923d273437e4c Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 18 Jan 2021 15:00:28 +0100 Subject: [PATCH 13/19] tools(kvstore-test): Update tokio to 1.0 --- tools/kvstore-test/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/kvstore-test/Cargo.toml b/tools/kvstore-test/Cargo.toml index 9d706c23d..f2d12e1c7 100644 --- a/tools/kvstore-test/Cargo.toml +++ b/tools/kvstore-test/Cargo.toml @@ -13,5 +13,5 @@ futures = "0.3" tendermint = { version = "0.17.1", path = "../../tendermint" } tendermint-light-client = { version = "0.17.1", path = "../../light-client" } tendermint-rpc = { version = "0.17.1", path = "../../rpc", features = [ "http-client", "websocket-client" ] } -tokio = { version = "0.2", features = [ "macros" ] } +tokio = { version = "1.0", features = [ "rt-multi-thread", "macros" ] } contracts = "0.4.0" From 44ea51b113bf7cf70effb4531bb71c93226a60c8 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 18 Jan 2021 15:07:19 +0100 Subject: [PATCH 14/19] tools(kvstore-test): Fix tests --- tools/kvstore-test/tests/tendermint.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/tools/kvstore-test/tests/tendermint.rs b/tools/kvstore-test/tests/tendermint.rs index 3fa768f6e..a4f72455d 100644 --- a/tools/kvstore-test/tests/tendermint.rs +++ b/tools/kvstore-test/tests/tendermint.rs @@ -251,7 +251,9 @@ mod rpc { let mut cur_tx_id = 0_u32; while !expected_tx_values.is_empty() { - let mut delay = tokio::time::delay_for(Duration::from_secs(5)); + let delay = tokio::time::sleep(Duration::from_secs(5)); + tokio::pin!(delay); + tokio::select! { Some(res) = subs.next() => { let ev = res.unwrap(); @@ -309,7 +311,7 @@ mod rpc { .broadcast_tx_async(Transaction::from(tx.into_bytes())) .await .unwrap(); - tokio::time::delay_for(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; } }); @@ -322,7 +324,9 @@ mod rpc { ); while expected_new_blocks > 0 && !expected_tx_values.is_empty() { - let mut timeout = tokio::time::delay_for(Duration::from_secs(5)); + let timeout = tokio::time::sleep(Duration::from_secs(5)); + tokio::pin!(timeout); + tokio::select! { Some(res) = combined_subs.next() => { let ev: Event = res.unwrap(); @@ -398,7 +402,10 @@ mod rpc { ) -> Result { let mut subs = websocket_client.subscribe(EventType::Tx.into()).await?; let _ = http_client.broadcast_tx_async(tx.clone()).await?; - let mut timeout = tokio::time::delay_for(Duration::from_secs(3)); + + let timeout = tokio::time::sleep(Duration::from_secs(3)); + tokio::pin!(timeout); + tokio::select! { Some(res) = subs.next() => { let ev = res?; From ba9e998f26e4b67cb33a87b00aa2306738ca1508 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 18 Jan 2021 15:21:05 +0100 Subject: [PATCH 15/19] Need to use an async block to avoid `no timer running` error --- light-client/src/utils/block_on.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/light-client/src/utils/block_on.rs b/light-client/src/utils/block_on.rs index 17a3dc967..77a7f7fc9 100644 --- a/light-client/src/utils/block_on.rs +++ b/light-client/src/utils/block_on.rs @@ -17,8 +17,8 @@ where .map_err(|_| IoError::Runtime)?; if let Some(timeout) = timeout { - rt.block_on(tokio::time::timeout(timeout, f)) - .map_err(|_| IoError::Timeout(timeout)) + let task = async { tokio::time::timeout(timeout, f).await }; + rt.block_on(task).map_err(|_| IoError::Timeout(timeout)) } else { Ok(rt.block_on(f)) } From 348c246685b5e9969ae31dca24e8715353334324 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 18 Jan 2021 15:32:39 +0100 Subject: [PATCH 16/19] Introduce local enum to improve readability of websocket client loop --- rpc/src/client/transport/websocket.rs | 30 ++++++++++++++++++++------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/rpc/src/client/transport/websocket.rs b/rpc/src/client/transport/websocket.rs index 7db57a9f8..b343bdba0 100644 --- a/rpc/src/client/transport/websocket.rs +++ b/rpc/src/client/transport/websocket.rs @@ -17,9 +17,10 @@ use async_trait::async_trait; use async_tungstenite::tokio::{connect_async, TokioAdapter}; use async_tungstenite::tungstenite::protocol::frame::coding::CloseCode; use async_tungstenite::tungstenite::protocol::CloseFrame; +use async_tungstenite::tungstenite::Error as TungsteniteError; use async_tungstenite::tungstenite::Message; use async_tungstenite::WebSocketStream; -use futures::{SinkExt, StreamExt}; +use futures::{FutureExt, SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::collections::HashMap; @@ -302,26 +303,39 @@ impl WebSocketClientDriver { let mut ping_interval = tokio::time::interval_at(Instant::now().add(PING_INTERVAL), PING_INTERVAL); + enum NextMsg { + Ok(Message), + StreamEnded, + Timeout, + WebSocketError(TungsteniteError), + } + loop { - let next_msg = tokio::time::timeout(RECV_TIMEOUT, self.stream.next()); + let next_msg = + tokio::time::timeout(RECV_TIMEOUT, self.stream.next()).map(|res| match res { + Ok(Some(Ok(msg))) => NextMsg::Ok(msg), + Ok(Some(Err(e))) => NextMsg::WebSocketError(e), + Ok(None) => NextMsg::StreamEnded, + Err(_) => NextMsg::Timeout, + }); tokio::select! { res = next_msg => match res { - Ok(Some(Ok(msg))) => { + NextMsg::Ok(msg) => { self.handle_incoming_msg(msg).await? }, - Ok(Some(Err(e))) => return Err(Error::websocket_error( - format!("failed to read from WebSocket connection: {}", e), - )), - Ok(None) => { + NextMsg::StreamEnded => { // Websocket stream is over, let's continue in case // we still receive commands via the `cmd_rx` channel. continue; }, - Err(_) => return Err(Error::websocket_error(format!( + NextMsg::Timeout => return Err(Error::websocket_error(format!( "reading from WebSocket connection timed out after {} seconds", RECV_TIMEOUT.as_secs() ))), + NextMsg::WebSocketError(e) => return Err(Error::websocket_error( + format!("failed to read from WebSocket connection: {}", e), + )), }, Some(cmd) = self.cmd_rx.recv() => match cmd { DriverCommand::Subscribe(subs_cmd) => self.subscribe(subs_cmd).await?, From 477881026205da4d8911c5e59a9bccd9e5e55f15 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 18 Jan 2021 19:03:52 +0100 Subject: [PATCH 17/19] Restore original behavior of receive timeout in client loop --- rpc/src/client/transport/websocket.rs | 49 +++++++++++---------------- 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/rpc/src/client/transport/websocket.rs b/rpc/src/client/transport/websocket.rs index b343bdba0..f6533221a 100644 --- a/rpc/src/client/transport/websocket.rs +++ b/rpc/src/client/transport/websocket.rs @@ -17,10 +17,9 @@ use async_trait::async_trait; use async_tungstenite::tokio::{connect_async, TokioAdapter}; use async_tungstenite::tungstenite::protocol::frame::coding::CloseCode; use async_tungstenite::tungstenite::protocol::CloseFrame; -use async_tungstenite::tungstenite::Error as TungsteniteError; use async_tungstenite::tungstenite::Message; use async_tungstenite::WebSocketStream; -use futures::{FutureExt, SinkExt, StreamExt}; +use futures::{SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::collections::HashMap; @@ -303,39 +302,23 @@ impl WebSocketClientDriver { let mut ping_interval = tokio::time::interval_at(Instant::now().add(PING_INTERVAL), PING_INTERVAL); - enum NextMsg { - Ok(Message), - StreamEnded, - Timeout, - WebSocketError(TungsteniteError), - } + let recv_timeout = tokio::time::sleep(RECV_TIMEOUT); + tokio::pin!(recv_timeout); loop { - let next_msg = - tokio::time::timeout(RECV_TIMEOUT, self.stream.next()).map(|res| match res { - Ok(Some(Ok(msg))) => NextMsg::Ok(msg), - Ok(Some(Err(e))) => NextMsg::WebSocketError(e), - Ok(None) => NextMsg::StreamEnded, - Err(_) => NextMsg::Timeout, - }); - tokio::select! { - res = next_msg => match res { - NextMsg::Ok(msg) => { + Some(res) = self.stream.next() => match res { + Ok(msg) => { + // Reset the receive timeout every time we successfully + // receive a message from the remote endpoint. + recv_timeout.as_mut().reset(Instant::now().add(RECV_TIMEOUT)); self.handle_incoming_msg(msg).await? }, - NextMsg::StreamEnded => { - // Websocket stream is over, let's continue in case - // we still receive commands via the `cmd_rx` channel. - continue; - }, - NextMsg::Timeout => return Err(Error::websocket_error(format!( - "reading from WebSocket connection timed out after {} seconds", - RECV_TIMEOUT.as_secs() - ))), - NextMsg::WebSocketError(e) => return Err(Error::websocket_error( - format!("failed to read from WebSocket connection: {}", e), - )), + Err(e) => return Err( + Error::websocket_error( + format!("failed to read from WebSocket connection: {}", e), + ), + ), }, Some(cmd) = self.cmd_rx.recv() => match cmd { DriverCommand::Subscribe(subs_cmd) => self.subscribe(subs_cmd).await?, @@ -344,6 +327,12 @@ impl WebSocketClientDriver { DriverCommand::Terminate => return self.close().await, }, _ = ping_interval.tick() => self.ping().await?, + _ = &mut recv_timeout => { + return Err(Error::websocket_error(format!( + "reading from WebSocket connection timed out after {} seconds", + RECV_TIMEOUT.as_secs() + ))); + } } } } From f8a37a67034122dd8e11e522716b10b33c219b34 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 18 Jan 2021 19:10:06 +0100 Subject: [PATCH 18/19] Restore panicking behavior in test code --- rpc/src/client/transport/websocket.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rpc/src/client/transport/websocket.rs b/rpc/src/client/transport/websocket.rs index f6533221a..da4a6549a 100644 --- a/rpc/src/client/transport/websocket.rs +++ b/rpc/src/client/transport/websocket.rs @@ -589,7 +589,10 @@ mod test { loop { tokio::select! { Some(ev) = self.event_rx.recv() => self.publish_event(ev), - Ok((stream, _)) = self.listener.accept() => self.handle_incoming(stream).await, + res = self.listener.accept() => { + let (stream, _) = res.unwrap(); + self.handle_incoming(stream).await + } Some(res) = self.terminate_rx.recv() => { self.terminate().await; return res; From f84e6aa7bb5fdd79e2b5a3caf65088c033150d17 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 18 Jan 2021 19:11:23 +0100 Subject: [PATCH 19/19] Restore panicking behavior in test code, part two --- rpc/src/client/transport/websocket.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rpc/src/client/transport/websocket.rs b/rpc/src/client/transport/websocket.rs index da4a6549a..c95d6e5d2 100644 --- a/rpc/src/client/transport/websocket.rs +++ b/rpc/src/client/transport/websocket.rs @@ -684,8 +684,8 @@ mod test { async fn run(mut self) -> Result<()> { loop { tokio::select! { - Some(Ok(msg)) = self.conn.next() => { - if let Some(ret) = self.handle_incoming_msg(msg).await { + Some(msg) = self.conn.next() => { + if let Some(ret) = self.handle_incoming_msg(msg.unwrap()).await { return ret; } }