Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Tokio to 1.0, Hyper to 0.14, Prost to 0.7 and Bytes to 1.0 #783

Merged
merged 19 commits into from
Jan 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
## Unreleased

## IMPROVEMENTS:

* `[all]` Update all crates to use the latest version of the following dependencies: ([#764])
- `tokio` (`1.0`)
- `hyper` (`0.14`)
- `prost` (`0.7`)
- `bytes` (`1.0`)
- `async-tungstenite` (`0.12`)

### BUG FIXES

* `[light-client]` The `sled`-backed lightstore is now feature-guarded under
the `lightstore-sled` feature, which is enabled by default for now. ([#772])

[#764]: https://github.com/informalsystems/tendermint-rs/issues/764
[#772]: https://github.com/informalsystems/tendermint-rs/pull/772

## v0.17.1
Expand Down
1 change: 1 addition & 0 deletions TODO
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
rpc: use rand crate instead of getrandom
2 changes: 1 addition & 1 deletion light-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ serde_derive = "1.0.106"
sled = { version = "0.34.3", optional = true }
static_assertions = "1.1.0"
thiserror = "1.0.15"
tokio = { version = "0.2", optional = true }
tokio = { version = "1.0", features = ["rt"], optional = true }

[dev-dependencies]
tendermint-testgen = { path = "../testgen"}
Expand Down
3 changes: 1 addition & 2 deletions light-client/src/utils/block_on.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ where
F::Output: Send,
{
std::thread::spawn(move || {
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|_| IoError::Runtime)?;
Expand Down
2 changes: 1 addition & 1 deletion light-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ tendermint = { version = "0.17.1", path = "../tendermint" }
tendermint-light-client = { version = "0.17.1", path = "../light-client", features = ["lightstore-sled"] }
tendermint-rpc = { version = "0.17.1", path = "../rpc", features = ["http-client"] }
thiserror = "1.0"
tokio = { version = "0.2", features = ["full"] }

[dependencies.abscissa_core]
version = "0.5.0"
Expand All @@ -53,3 +52,4 @@ abscissa_core = { version = "0.5.0", features = ["testing"] }
futures = { version = "0.3", features = [ "compat" ] }
once_cell = "1.2"
pretty_assertions = "0.6"
tokio = { version = "1.0", features = ["rt", "macros"] }
2 changes: 1 addition & 1 deletion p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ ed25519-dalek = "1"
eyre = "0.6"
hkdf = "0.10.0"
merlin = "2"
prost = "0.6"
prost = "0.7"
rand_core = { version = "0.5", features = ["std"] }
sha2 = "0.9"
subtle = "2"
Expand Down
6 changes: 3 additions & 3 deletions proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ description = """
all-features = true

[dependencies]
prost = { version = "0.6" }
prost-types = { version = "0.6" }
bytes = "0.5"
prost = "0.7"
prost-types = "0.7"
bytes = "1.0"
anomaly = "0.2"
thiserror = "1.0"
serde = { version = "1.0", features = ["derive"] }
Expand Down
10 changes: 5 additions & 5 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ websocket-client = [
"async-trait",
"async-tungstenite",
"futures",
"tokio/rt-multi-thread",
"tokio/fs",
"tokio/macros",
"tokio/stream",
"tokio/sync",
"tokio/time",
"tracing"
]

[dependencies]
bytes = "0.5"
bytes = "1.0"
chrono = "0.4"
getrandom = "0.1"
serde = { version = "1", features = [ "derive" ] }
Expand All @@ -62,10 +62,10 @@ subtle-encoding = { version = "0.5", features = ["bech32-preview"] }
walkdir = "2.3"

async-trait = { version = "0.1", optional = true }
async-tungstenite = { version = "0.9", features = ["tokio-runtime"], optional = true }
async-tungstenite = { version = "0.12", features = ["tokio-runtime"], optional = true }
futures = { version = "0.3", optional = true }
http = { version = "0.2", optional = true }
hyper = { version = "0.13", optional = true }
tokio = { version = "0.2", optional = true }
hyper = { version = "0.14", optional = true, features = ["client", "http1", "tcp"] }
tokio = { version = "1.0", optional = true }
tracing = { version = "0.1", optional = true }
pin-project = "1.0.1"
2 changes: 1 addition & 1 deletion rpc/src/client/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ impl<T> Stream for ChannelRx<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().0.poll_next(cx)
self.project().0.poll_recv(cx)
}
}
11 changes: 7 additions & 4 deletions rpc/src/client/transport/http.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
//! HTTP-based transport for Tendermint RPC Client.

use crate::client::transport::utils::get_tcp_host_port;
use crate::{Client, Response, Result, SimpleRequest};
use async_trait::async_trait;
use bytes::buf::ext::BufExt;
use hyper::body::Buf;
use hyper::header;

use tendermint::net;

use crate::client::transport::utils::get_tcp_host_port;
use crate::{Client, Response, Result, SimpleRequest};

/// A JSON-RPC/HTTP Tendermint RPC client (implements [`Client`]).
///
/// Does not provide [`Event`] subscription facilities (see [`WebSocketClient`]
Expand Down Expand Up @@ -62,7 +64,8 @@ impl Client for HttpClient {
.unwrap(),
);
}
let http_client = hyper::Client::builder().build_http();

let http_client = hyper::Client::new();
let response = http_client.request(request).await?;
let response_body = hyper::body::aggregate(response.into_body()).await?;
R::Response::from_reader(response_body.reader())
Expand Down
8 changes: 4 additions & 4 deletions rpc/src/client/transport/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ mod test {
}

async fn must_recv<T>(ch: &mut ChannelRx<T>, timeout_ms: u64) -> T {
let mut delay = time::delay_for(Duration::from_millis(timeout_ms));
let delay = time::sleep(Duration::from_millis(timeout_ms));
tokio::select! {
_ = &mut delay, if !delay.is_elapsed() => panic!("timed out waiting for recv"),
_ = delay, if !delay.is_elapsed() => panic!("timed out waiting for recv"),
Some(v) = ch.recv() => v,
}
}
Expand All @@ -136,9 +136,9 @@ mod test {
where
T: std::fmt::Debug,
{
let mut delay = time::delay_for(Duration::from_millis(timeout_ms));
let delay = time::sleep(Duration::from_millis(timeout_ms));
tokio::select! {
_ = &mut delay, if !delay.is_elapsed() => (),
_ = delay, if !delay.is_elapsed() => (),
Some(v) = ch.recv() => panic!("got unexpected result from channel: {:?}", v),
}
}
Expand Down
18 changes: 12 additions & 6 deletions rpc/src/client/transport/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,17 @@ impl WebSocketClientDriver {
pub async fn run(mut self) -> Result<()> {
let mut ping_interval =
tokio::time::interval_at(Instant::now().add(PING_INTERVAL), PING_INTERVAL);
let mut recv_timeout = tokio::time::delay_for(PING_INTERVAL);

let recv_timeout = tokio::time::sleep(RECV_TIMEOUT);
tokio::pin!(recv_timeout);

loop {
tokio::select! {
Some(res) = self.stream.next() => match res {
Ok(msg) => {
// Reset the receive timeout every time we successfully
// receive a message from the remote endpoint.
recv_timeout.reset(Instant::now().add(PING_INTERVAL));
recv_timeout.as_mut().reset(Instant::now().add(RECV_TIMEOUT));
self.handle_incoming_msg(msg).await?
},
Err(e) => return Err(
Expand All @@ -323,7 +326,7 @@ impl WebSocketClientDriver {
DriverCommand::SimpleRequest(req_cmd) => self.simple_request(req_cmd).await?,
DriverCommand::Terminate => return self.close().await,
},
_ = ping_interval.next() => self.ping().await?,
_ = ping_interval.tick() => self.ping().await?,
_ = &mut recv_timeout => {
return Err(Error::websocket_error(format!(
"reading from WebSocket connection timed out after {} seconds",
Expand Down Expand Up @@ -586,7 +589,10 @@ mod test {
loop {
tokio::select! {
Some(ev) = self.event_rx.recv() => self.publish_event(ev),
Some(res) = self.listener.next() => self.handle_incoming(res.unwrap()).await,
res = self.listener.accept() => {
let (stream, _) = res.unwrap();
self.handle_incoming(stream).await
}
Some(res) = self.terminate_rx.recv() => {
self.terminate().await;
return res;
Expand Down Expand Up @@ -678,8 +684,8 @@ mod test {
async fn run(mut self) -> Result<()> {
loop {
tokio::select! {
Some(res) = self.conn.next() => {
if let Some(ret) = self.handle_incoming_msg(res.unwrap()).await {
Some(msg) = self.conn.next() => {
if let Some(ret) = self.handle_incoming_msg(msg.unwrap()).await {
return ret;
}
}
Expand Down
6 changes: 3 additions & 3 deletions tendermint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ crate-type = ["cdylib", "rlib"]
[dependencies]
anomaly = "0.2"
async-trait = "0.1"
bytes = "0.5"
bytes = "1.0"
chrono = { version = "0.4", features = ["serde"] }
ed25519 = "1"
ed25519-dalek = { version = "1", features = ["serde"] }
futures = "0.3"
num-traits = "0.2"
once_cell = "1.3"
prost = "0.6"
prost-types = "0.6"
prost = "0.7"
prost-types = "0.7"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_bytes = "0.11"
Expand Down
10 changes: 3 additions & 7 deletions tendermint/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::error::{Error, Kind};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

use std::convert::TryFrom;
use std::convert::{Infallible, TryFrom};
use std::fmt;
use std::ops::{Add, Sub};
use std::str::FromStr;
Expand All @@ -23,7 +23,7 @@ pub struct Time(DateTime<Utc>);
impl Protobuf<Timestamp> for Time {}

impl TryFrom<Timestamp> for Time {
type Error = anomaly::BoxError;
type Error = Infallible;

fn try_from(value: Timestamp) -> Result<Self, Self::Error> {
// prost_types::Timestamp has a SystemTime converter but
Expand All @@ -33,11 +33,7 @@ impl TryFrom<Timestamp> for Time {
nanos: value.nanos,
};

Ok(SystemTime::try_from(prost_value)
.map_err(|e| {
Kind::OutOfRange.context(format!("time before EPOCH by {} seconds", e.as_secs()))
})?
.into())
Ok(SystemTime::from(prost_value).into())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The conversion from a prost Timestamp to SystemTime is now infallible because prost_types::Timestamp now provides a Into<SystemTime>.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

}
}

Expand Down
2 changes: 1 addition & 1 deletion tools/kvstore-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ futures = "0.3"
tendermint = { version = "0.17.1", path = "../../tendermint" }
tendermint-light-client = { version = "0.17.1", path = "../../light-client" }
tendermint-rpc = { version = "0.17.1", path = "../../rpc", features = [ "http-client", "websocket-client" ] }
tokio = { version = "0.2", features = [ "macros" ] }
tokio = { version = "1.0", features = [ "rt-multi-thread", "macros" ] }
contracts = "0.4.0"
15 changes: 11 additions & 4 deletions tools/kvstore-test/tests/tendermint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ mod rpc {
let mut cur_tx_id = 0_u32;

while !expected_tx_values.is_empty() {
let mut delay = tokio::time::delay_for(Duration::from_secs(5));
let delay = tokio::time::sleep(Duration::from_secs(5));
tokio::pin!(delay);

tokio::select! {
Some(res) = subs.next() => {
let ev = res.unwrap();
Expand Down Expand Up @@ -309,7 +311,7 @@ mod rpc {
.broadcast_tx_async(Transaction::from(tx.into_bytes()))
.await
.unwrap();
tokio::time::delay_for(Duration::from_millis(100)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
});

Expand All @@ -322,7 +324,9 @@ mod rpc {
);

while expected_new_blocks > 0 && !expected_tx_values.is_empty() {
let mut timeout = tokio::time::delay_for(Duration::from_secs(5));
let timeout = tokio::time::sleep(Duration::from_secs(5));
tokio::pin!(timeout);

tokio::select! {
Some(res) = combined_subs.next() => {
let ev: Event = res.unwrap();
Expand Down Expand Up @@ -398,7 +402,10 @@ mod rpc {
) -> Result<TxInfo, tendermint_rpc::Error> {
let mut subs = websocket_client.subscribe(EventType::Tx.into()).await?;
let _ = http_client.broadcast_tx_async(tx.clone()).await?;
let mut timeout = tokio::time::delay_for(Duration::from_secs(3));

let timeout = tokio::time::sleep(Duration::from_secs(3));
tokio::pin!(timeout);

tokio::select! {
Some(res) = subs.next() => {
let ev = res?;
Expand Down
2 changes: 1 addition & 1 deletion tools/proto-compiler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ publish = false

[dependencies]
walkdir = { version = "2.3" }
prost-build = { version = "0.6" }
prost-build = { version = "0.7" }
git2 = { version = "0.13" }
tempdir = { version = "0.3" }
subtle-encoding = { version = "0.5" }
4 changes: 2 additions & 2 deletions tools/rpc-probe/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ description = """
all-features = true

[dependencies]
async-tungstenite = { version = "0.9", features = [ "tokio-runtime" ] }
async-tungstenite = { version = "0.12", features = [ "tokio-runtime" ] }
futures = "0.3"
getrandom = "0.1"
log = "0.4"
Expand All @@ -27,5 +27,5 @@ simple_logger = "1.11"
structopt = "0.3"
subtle-encoding = "0.5.1"
thiserror = "1.0"
tokio = { version = "0.2", features = [ "full" ] }
tokio = { version = "1.0", features = [ "full" ] }
uuid = "0.8"
2 changes: 1 addition & 1 deletion tools/rpc-probe/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl Client {
/// the given target height.
pub async fn wait_for_height(&mut self, h: u64) -> Result<()> {
let (mut subs, _) = self.subscribe(&uuid_v4(), "tm.event = 'NewBlock'").await?;
while let Some(result) = subs.next().await {
while let Some(result) = subs.recv().await {
let resp = result?;
// TODO(thane): Find a more readable way of getting this value.
let height = resp
Expand Down
4 changes: 2 additions & 2 deletions tools/rpc-probe/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ impl From<async_tungstenite::tungstenite::Error> for Error {
}
}

impl From<tokio::time::Elapsed> for Error {
fn from(e: tokio::time::Elapsed) -> Self {
impl From<tokio::time::error::Elapsed> for Error {
fn from(e: tokio::time::error::Elapsed) -> Self {
Self::Timeout(e.to_string())
}
}
Expand Down
6 changes: 4 additions & 2 deletions tools/rpc-probe/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ async fn execute_interaction(
info!("Executing interaction \"{}\"", inner_interaction.name);
if let Some(wait) = inner_interaction.pre_wait {
debug!("Sleeping for {} seconds", wait.as_secs_f64());
tokio::time::delay_for(wait).await;
tokio::time::sleep(wait).await;
}
if let Some(h) = inner_interaction.min_height {
debug!("Waiting for height {}", h);
Expand Down Expand Up @@ -419,7 +419,9 @@ async fn execute_subscription(
};
write_json(&config.in_path, name, &response_json).await?;

let mut timeout = tokio::time::delay_for(subs.max_time);
let timeout = tokio::time::sleep(subs.max_time);
tokio::pin!(timeout);

let mut event_count = 0_usize;
loop {
tokio::select! {
Expand Down