Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to hyper v0.13; use async/await #85

Merged
merged 3 commits into from
Dec 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion tendermint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ circle-ci = { repository = "interchainio/tendermint-rs" }

[dependencies]
bytes = "0.4"
bytes_0_5 = { version = "0.5", package = "bytes" }
chrono = { version = "0.4", features = ["serde"] }
failure = "0.1"
hyper = { version = "0.10" }
http = "0.2"
hyper = "0.13"
prost-amino = { version = "0.4.0" }
prost-amino-derive = { version = "0.4.0" }
rand_os = { version = "0.1" }
Expand All @@ -53,3 +55,4 @@ ed25519-dalek = {version = "1.0.0-pre.3", features = ["rand"]}

[dev-dependencies]
serde_json = "1"
tokio = "0.2"
132 changes: 63 additions & 69 deletions tendermint/src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crate::{
rpc::{self, endpoint::*, Error, Response},
Genesis,
};
use bytes_0_5::buf::ext::BufExt;
use hyper::header;
use std::io::Read;

/// Tendermint RPC client.
///
Expand All @@ -20,139 +20,134 @@ pub struct Client {

impl Client {
/// Create a new Tendermint RPC client, connecting to the given address
pub fn new(address: &net::Address) -> Result<Self, Error> {
pub async fn new(address: &net::Address) -> Result<Self, Error> {
let client = Client {
address: address.clone(),
};
client.health()?;
client.health().await?;
Ok(client)
}

/// `/abci_info`: get information about the ABCI application.
pub fn abci_info(&self) -> Result<abci_info::AbciInfo, Error> {
Ok(self.perform(abci_info::Request)?.response)
pub async fn abci_info(&self) -> Result<abci_info::AbciInfo, Error> {
Ok(self.perform(abci_info::Request).await?.response)
}

/// `/abci_query`: query the ABCI application
pub fn abci_query<D>(
pub async fn abci_query(
&self,
path: Option<abci::Path>,
data: D,
data: impl Into<Vec<u8>>,
height: Option<Height>,
prove: bool,
) -> Result<abci_query::AbciQuery, Error>
where
D: Into<Vec<u8>>,
{
) -> Result<abci_query::AbciQuery, Error> {
Ok(self
.perform(abci_query::Request::new(path, data, height, prove))?
.perform(abci_query::Request::new(path, data, height, prove))
.await?
.response)
}

/// `/block`: get block at a given height.
pub fn block<H>(&self, height: H) -> Result<block::Response, Error>
where
H: Into<Height>,
{
self.perform(block::Request::new(height.into()))
pub async fn block(&self, height: impl Into<Height>) -> Result<block::Response, Error> {
self.perform(block::Request::new(height.into())).await
}

/// `/block`: get the latest block.
pub fn latest_block(&self) -> Result<block::Response, Error> {
self.perform(block::Request::default())
pub async fn latest_block(&self) -> Result<block::Response, Error> {
self.perform(block::Request::default()).await
}

/// `/block_results`: get ABCI results for a block at a particular height.
pub fn block_results<H>(&self, height: H) -> Result<block_results::Response, Error>
pub async fn block_results<H>(&self, height: H) -> Result<block_results::Response, Error>
where
H: Into<Height>,
{
self.perform(block_results::Request::new(height.into()))
.await
}

/// `/block_results`: get ABCI results for the latest block.
pub fn latest_block_results(&self) -> Result<block_results::Response, Error> {
self.perform(block_results::Request::default())
pub async fn latest_block_results(&self) -> Result<block_results::Response, Error> {
self.perform(block_results::Request::default()).await
}

/// `/blockchain`: get block headers for `min` <= `height` <= `max`.
///
/// Block headers are returned in descending order (highest first).
///
/// Returns at most 20 items.
pub fn blockchain<H>(&self, min: H, max: H) -> Result<blockchain::Response, Error>
where
H: Into<Height>,
{
pub async fn blockchain(
&self,
min: impl Into<Height>,
max: impl Into<Height>,
) -> Result<blockchain::Response, Error> {
// TODO(tarcieri): return errors for invalid params before making request?
self.perform(blockchain::Request::new(min.into(), max.into()))
.await
}

/// `/broadcast_tx_async`: broadcast a transaction, returning immediately.
pub fn broadcast_tx_async(
pub async fn broadcast_tx_async(
&self,
tx: Transaction,
) -> Result<broadcast::tx_async::Response, Error> {
self.perform(broadcast::tx_async::Request::new(tx))
self.perform(broadcast::tx_async::Request::new(tx)).await
}

/// `/broadcast_tx_sync`: broadcast a transaction, returning the response
/// from `CheckTx`.
pub fn broadcast_tx_sync(
pub async fn broadcast_tx_sync(
&self,
tx: Transaction,
) -> Result<broadcast::tx_sync::Response, Error> {
self.perform(broadcast::tx_sync::Request::new(tx))
self.perform(broadcast::tx_sync::Request::new(tx)).await
}

/// `/broadcast_tx_sync`: broadcast a transaction, returning the response
/// from `CheckTx`.
pub fn broadcast_tx_commit(
pub async fn broadcast_tx_commit(
&self,
tx: Transaction,
) -> Result<broadcast::tx_commit::Response, Error> {
self.perform(broadcast::tx_commit::Request::new(tx))
self.perform(broadcast::tx_commit::Request::new(tx)).await
}

/// `/commit`: get block commit at a given height.
pub fn commit<H>(&self, height: H) -> Result<commit::Response, Error>
where
H: Into<Height>,
{
self.perform(commit::Request::new(height.into()))
pub async fn commit(&self, height: impl Into<Height>) -> Result<commit::Response, Error> {
self.perform(commit::Request::new(height.into())).await
}

/// `/commit`: get the latest block commit
pub fn latest_commit(&self) -> Result<commit::Response, Error> {
self.perform(commit::Request::default())
pub async fn latest_commit(&self) -> Result<commit::Response, Error> {
self.perform(commit::Request::default()).await
}

/// `/health`: get node health.
///
/// Returns empty result (200 OK) on success, no response in case of an error.
pub fn health(&self) -> Result<(), Error> {
self.perform(health::Request)?;
pub async fn health(&self) -> Result<(), Error> {
self.perform(health::Request).await?;
Ok(())
}

/// `/genesis`: get genesis file.
pub fn genesis(&self) -> Result<Genesis, Error> {
Ok(self.perform(genesis::Request)?.genesis)
pub async fn genesis(&self) -> Result<Genesis, Error> {
Ok(self.perform(genesis::Request).await?.genesis)
}

/// `/net_info`: obtain information about P2P and other network connections.
pub fn net_info(&self) -> Result<net_info::Response, Error> {
self.perform(net_info::Request)
pub async fn net_info(&self) -> Result<net_info::Response, Error> {
self.perform(net_info::Request).await
}

/// `/status`: get Tendermint status including node info, pubkey, latest
/// block hash, app hash, block height and time.
pub fn status(&self) -> Result<status::Response, Error> {
self.perform(status::Request)
pub async fn status(&self) -> Result<status::Response, Error> {
self.perform(status::Request).await
}

/// Perform a request against the RPC endpoint
pub fn perform<R>(&self, request: R) -> Result<R::Response, Error>
pub async fn perform<R>(&self, request: R) -> Result<R::Response, Error>
where
R: rpc::Request,
{
Expand All @@ -168,26 +163,25 @@ impl Client {
}
};

let mut headers = hyper::header::Headers::new();

// TODO(tarcieri): persistent connections
headers.set(header::Connection::close());
headers.set(header::ContentType::json());
headers.set(header::UserAgent("tendermint.rs RPC client".to_owned()));

let http_client = hyper::Client::new();

let mut res = http_client
.request(hyper::Post, &format!("http://{}:{}/", host, port))
.headers(headers)
.body(&request_body[..])
.send()
.map_err(Error::server_error)?;

let mut response_body = Vec::new();
res.read_to_end(&mut response_body)
.map_err(Error::server_error)?;

R::Response::from_json(&response_body)
let mut request = hyper::Request::builder()
.method("POST")
.uri(&format!("http://{}:{}/", host, port))
.body(hyper::Body::from(request_body.into_bytes()))?;

{
let headers = request.headers_mut();
headers.insert(header::CONTENT_TYPE, "application/json".parse().unwrap());
headers.insert(
header::USER_AGENT,
format!("tendermint.rs/{}", env!("CARGO_PKG_VERSION"))
.parse()
.unwrap(),
);
}

let http_client = hyper::Client::builder().keep_alive(true).build_http();
let response = http_client.request(request).await?;
let response_body = hyper::body::aggregate(response.into_body()).await?;
R::Response::from_reader(response_body.reader())
}
}
24 changes: 23 additions & 1 deletion tendermint/src/rpc/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,23 @@ impl Error {
/// Create a new RPC error
pub fn new(code: Code, data: Option<String>) -> Error {
let message = code.to_string();

Error {
code,
message,
data,
}
}

/// Create a low-level HTTP error
pub fn http_error(message: impl Into<String>) -> Error {
Error {
code: Code::HttpError,
message: message.into(),
data: None,
}
}

/// Create a new invalid parameter error
pub fn invalid_params(data: &str) -> Error {
Error::new(Code::InvalidParams, Some(data.to_string()))
Expand Down Expand Up @@ -91,9 +101,15 @@ impl Fail for Error {
}
}

impl From<http::Error> for Error {
fn from(http_error: http::Error) -> Error {
Error::http_error(http_error.to_string())
}
}

impl From<hyper::Error> for Error {
fn from(hyper_error: hyper::Error) -> Error {
panic!("what am I supposed to do with this? {:?}", hyper_error);
Error::http_error(hyper_error.to_string())
}
}

Expand All @@ -103,6 +119,10 @@ impl From<hyper::Error> for Error {
/// <https://github.com/tendermint/tendermint/blob/master/rpc/lib/types/types.go>
#[derive(Copy, Clone, Debug, Eq, Fail, Hash, PartialEq, PartialOrd, Ord)]
pub enum Code {
/// Low-level HTTP error
#[fail(display = "HTTP error")]
HttpError,

/// Parse error i.e. invalid JSON (-32700)
#[fail(display = "Parse error. Invalid JSON")]
ParseError,
Expand Down Expand Up @@ -142,6 +162,7 @@ impl Code {
impl From<i32> for Code {
fn from(value: i32) -> Code {
match value {
0 => Code::HttpError,
-32700 => Code::ParseError,
-32600 => Code::InvalidRequest,
-32601 => Code::MethodNotFound,
Expand All @@ -156,6 +177,7 @@ impl From<i32> for Code {
impl From<Code> for i32 {
fn from(code: Code) -> i32 {
match code {
Code::HttpError => 0,
Code::ParseError => -32700,
Code::InvalidRequest => -32600,
Code::MethodNotFound => -32601,
Expand Down
11 changes: 7 additions & 4 deletions tendermint/src/rpc/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@

use super::{Error, Id, Version};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::io::Read;

/// JSONRPC responses
pub trait Response: Serialize + DeserializeOwned + Sized {
/// Parse a JSONRPC response from a JSON string
fn from_json<T>(response: T) -> Result<Self, Error>
where
T: AsRef<[u8]>,
{
fn from_string(response: impl AsRef<[u8]>) -> Result<Self, Error> {
let wrapper: Wrapper<Self> =
serde_json::from_slice(response.as_ref()).map_err(Error::parse_error)?;
wrapper.into_result()
}

/// Parse a JSONRPC response from an `io::Reader`
fn from_reader(reader: impl Read) -> Result<Self, Error> {
let wrapper: Wrapper<Self> = serde_json::from_reader(reader).map_err(Error::parse_error)?;
wrapper.into_result()
}
}
Expand Down
Loading