diff --git a/tendermint/Cargo.toml b/tendermint/Cargo.toml index 13be9a9b2..81108cc01 100644 --- a/tendermint/Cargo.toml +++ b/tendermint/Cargo.toml @@ -32,9 +32,11 @@ circle-ci = { repository = "interchainio/tendermint-rs" } [dependencies] bytes = "0.4" +bytes_0_5 = { version = "0.5", package = "bytes" } chrono = { version = "0.4", features = ["serde"] } failure = "0.1" -hyper = { version = "0.10" } +http = "0.2" +hyper = "0.13" prost-amino = { version = "0.4.0" } prost-amino-derive = { version = "0.4.0" } rand_os = { version = "0.1" } @@ -53,3 +55,4 @@ ed25519-dalek = {version = "1.0.0-pre.3", features = ["rand"]} [dev-dependencies] serde_json = "1" +tokio = "0.2" diff --git a/tendermint/src/rpc/client.rs b/tendermint/src/rpc/client.rs index b4db14bec..563567408 100644 --- a/tendermint/src/rpc/client.rs +++ b/tendermint/src/rpc/client.rs @@ -7,8 +7,8 @@ use crate::{ rpc::{self, endpoint::*, Error, Response}, Genesis, }; +use bytes_0_5::buf::ext::BufExt; use hyper::header; -use std::io::Read; /// Tendermint RPC client. /// @@ -20,59 +20,55 @@ pub struct Client { impl Client { /// Create a new Tendermint RPC client, connecting to the given address - pub fn new(address: &net::Address) -> Result { + pub async fn new(address: &net::Address) -> Result { let client = Client { address: address.clone(), }; - client.health()?; + client.health().await?; Ok(client) } /// `/abci_info`: get information about the ABCI application. - pub fn abci_info(&self) -> Result { - Ok(self.perform(abci_info::Request)?.response) + pub async fn abci_info(&self) -> Result { + Ok(self.perform(abci_info::Request).await?.response) } /// `/abci_query`: query the ABCI application - pub fn abci_query( + pub async fn abci_query( &self, path: Option, - data: D, + data: impl Into>, height: Option, prove: bool, - ) -> Result - where - D: Into>, - { + ) -> Result { Ok(self - .perform(abci_query::Request::new(path, data, height, prove))? + .perform(abci_query::Request::new(path, data, height, prove)) + .await? .response) } /// `/block`: get block at a given height. - pub fn block(&self, height: H) -> Result - where - H: Into, - { - self.perform(block::Request::new(height.into())) + pub async fn block(&self, height: impl Into) -> Result { + self.perform(block::Request::new(height.into())).await } /// `/block`: get the latest block. - pub fn latest_block(&self) -> Result { - self.perform(block::Request::default()) + pub async fn latest_block(&self) -> Result { + self.perform(block::Request::default()).await } /// `/block_results`: get ABCI results for a block at a particular height. - pub fn block_results(&self, height: H) -> Result + pub async fn block_results(&self, height: H) -> Result where H: Into, { self.perform(block_results::Request::new(height.into())) + .await } /// `/block_results`: get ABCI results for the latest block. - pub fn latest_block_results(&self) -> Result { - self.perform(block_results::Request::default()) + pub async fn latest_block_results(&self) -> Result { + self.perform(block_results::Request::default()).await } /// `/blockchain`: get block headers for `min` <= `height` <= `max`. @@ -80,79 +76,78 @@ impl Client { /// Block headers are returned in descending order (highest first). /// /// Returns at most 20 items. - pub fn blockchain(&self, min: H, max: H) -> Result - where - H: Into, - { + pub async fn blockchain( + &self, + min: impl Into, + max: impl Into, + ) -> Result { // TODO(tarcieri): return errors for invalid params before making request? self.perform(blockchain::Request::new(min.into(), max.into())) + .await } /// `/broadcast_tx_async`: broadcast a transaction, returning immediately. - pub fn broadcast_tx_async( + pub async fn broadcast_tx_async( &self, tx: Transaction, ) -> Result { - self.perform(broadcast::tx_async::Request::new(tx)) + self.perform(broadcast::tx_async::Request::new(tx)).await } /// `/broadcast_tx_sync`: broadcast a transaction, returning the response /// from `CheckTx`. - pub fn broadcast_tx_sync( + pub async fn broadcast_tx_sync( &self, tx: Transaction, ) -> Result { - self.perform(broadcast::tx_sync::Request::new(tx)) + self.perform(broadcast::tx_sync::Request::new(tx)).await } /// `/broadcast_tx_sync`: broadcast a transaction, returning the response /// from `CheckTx`. - pub fn broadcast_tx_commit( + pub async fn broadcast_tx_commit( &self, tx: Transaction, ) -> Result { - self.perform(broadcast::tx_commit::Request::new(tx)) + self.perform(broadcast::tx_commit::Request::new(tx)).await } /// `/commit`: get block commit at a given height. - pub fn commit(&self, height: H) -> Result - where - H: Into, - { - self.perform(commit::Request::new(height.into())) + pub async fn commit(&self, height: impl Into) -> Result { + self.perform(commit::Request::new(height.into())).await } /// `/commit`: get the latest block commit - pub fn latest_commit(&self) -> Result { - self.perform(commit::Request::default()) + pub async fn latest_commit(&self) -> Result { + self.perform(commit::Request::default()).await } /// `/health`: get node health. /// /// Returns empty result (200 OK) on success, no response in case of an error. - pub fn health(&self) -> Result<(), Error> { - self.perform(health::Request)?; + pub async fn health(&self) -> Result<(), Error> { + self.perform(health::Request).await?; Ok(()) } /// `/genesis`: get genesis file. - pub fn genesis(&self) -> Result { - Ok(self.perform(genesis::Request)?.genesis) + pub async fn genesis(&self) -> Result { + Ok(self.perform(genesis::Request).await?.genesis) } /// `/net_info`: obtain information about P2P and other network connections. - pub fn net_info(&self) -> Result { - self.perform(net_info::Request) + pub async fn net_info(&self) -> Result { + self.perform(net_info::Request).await } /// `/status`: get Tendermint status including node info, pubkey, latest /// block hash, app hash, block height and time. - pub fn status(&self) -> Result { - self.perform(status::Request) + pub async fn status(&self) -> Result { + self.perform(status::Request).await } /// Perform a request against the RPC endpoint - pub fn perform(&self, request: R) -> Result + pub async fn perform(&self, request: R) -> Result where R: rpc::Request, { @@ -168,26 +163,25 @@ impl Client { } }; - let mut headers = hyper::header::Headers::new(); - - // TODO(tarcieri): persistent connections - headers.set(header::Connection::close()); - headers.set(header::ContentType::json()); - headers.set(header::UserAgent("tendermint.rs RPC client".to_owned())); - - let http_client = hyper::Client::new(); - - let mut res = http_client - .request(hyper::Post, &format!("http://{}:{}/", host, port)) - .headers(headers) - .body(&request_body[..]) - .send() - .map_err(Error::server_error)?; - - let mut response_body = Vec::new(); - res.read_to_end(&mut response_body) - .map_err(Error::server_error)?; - - R::Response::from_json(&response_body) + let mut request = hyper::Request::builder() + .method("POST") + .uri(&format!("http://{}:{}/", host, port)) + .body(hyper::Body::from(request_body.into_bytes()))?; + + { + let headers = request.headers_mut(); + headers.insert(header::CONTENT_TYPE, "application/json".parse().unwrap()); + headers.insert( + header::USER_AGENT, + format!("tendermint.rs/{}", env!("CARGO_PKG_VERSION")) + .parse() + .unwrap(), + ); + } + + let http_client = hyper::Client::builder().keep_alive(true).build_http(); + let response = http_client.request(request).await?; + let response_body = hyper::body::aggregate(response.into_body()).await?; + R::Response::from_reader(response_body.reader()) } } diff --git a/tendermint/src/rpc/error.rs b/tendermint/src/rpc/error.rs index 728e82a34..974a734cb 100644 --- a/tendermint/src/rpc/error.rs +++ b/tendermint/src/rpc/error.rs @@ -21,6 +21,7 @@ impl Error { /// Create a new RPC error pub fn new(code: Code, data: Option) -> Error { let message = code.to_string(); + Error { code, message, @@ -28,6 +29,15 @@ impl Error { } } + /// Create a low-level HTTP error + pub fn http_error(message: impl Into) -> Error { + Error { + code: Code::HttpError, + message: message.into(), + data: None, + } + } + /// Create a new invalid parameter error pub fn invalid_params(data: &str) -> Error { Error::new(Code::InvalidParams, Some(data.to_string())) @@ -91,9 +101,15 @@ impl Fail for Error { } } +impl From for Error { + fn from(http_error: http::Error) -> Error { + Error::http_error(http_error.to_string()) + } +} + impl From for Error { fn from(hyper_error: hyper::Error) -> Error { - panic!("what am I supposed to do with this? {:?}", hyper_error); + Error::http_error(hyper_error.to_string()) } } @@ -103,6 +119,10 @@ impl From for Error { /// #[derive(Copy, Clone, Debug, Eq, Fail, Hash, PartialEq, PartialOrd, Ord)] pub enum Code { + /// Low-level HTTP error + #[fail(display = "HTTP error")] + HttpError, + /// Parse error i.e. invalid JSON (-32700) #[fail(display = "Parse error. Invalid JSON")] ParseError, @@ -142,6 +162,7 @@ impl Code { impl From for Code { fn from(value: i32) -> Code { match value { + 0 => Code::HttpError, -32700 => Code::ParseError, -32600 => Code::InvalidRequest, -32601 => Code::MethodNotFound, @@ -156,6 +177,7 @@ impl From for Code { impl From for i32 { fn from(code: Code) -> i32 { match code { + Code::HttpError => 0, Code::ParseError => -32700, Code::InvalidRequest => -32600, Code::MethodNotFound => -32601, diff --git a/tendermint/src/rpc/response.rs b/tendermint/src/rpc/response.rs index ef0ada330..fc47a94db 100644 --- a/tendermint/src/rpc/response.rs +++ b/tendermint/src/rpc/response.rs @@ -2,17 +2,20 @@ use super::{Error, Id, Version}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use std::io::Read; /// JSONRPC responses pub trait Response: Serialize + DeserializeOwned + Sized { /// Parse a JSONRPC response from a JSON string - fn from_json(response: T) -> Result - where - T: AsRef<[u8]>, - { + fn from_string(response: impl AsRef<[u8]>) -> Result { let wrapper: Wrapper = serde_json::from_slice(response.as_ref()).map_err(Error::parse_error)?; + wrapper.into_result() + } + /// Parse a JSONRPC response from an `io::Reader` + fn from_reader(reader: impl Read) -> Result { + let wrapper: Wrapper = serde_json::from_reader(reader).map_err(Error::parse_error)?; wrapper.into_result() } } diff --git a/tendermint/tests/integration.rs b/tendermint/tests/integration.rs index 4e8fb206a..1092458f2 100644 --- a/tendermint/tests/integration.rs +++ b/tendermint/tests/integration.rs @@ -9,18 +9,29 @@ /// cargo test -- --ignored /// ``` mod rpc { + use core::future::Future; use tendermint::rpc::Client; + use tokio::runtime::Builder; + + fn block_on(future: F) -> F::Output { + Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap() + .block_on(future) + } /// Get the address of the local node pub fn localhost_rpc_client() -> Client { - Client::new(&"tcp://127.0.0.1:26657".parse().unwrap()).unwrap() + block_on(Client::new(&"tcp://127.0.0.1:26657".parse().unwrap())).unwrap() } /// `/abci_info` endpoint #[test] #[ignore] fn abci_info() { - let abci_info = localhost_rpc_client().abci_info().unwrap(); + let abci_info = block_on(localhost_rpc_client().abci_info()).unwrap(); assert_eq!(&abci_info.data, "GaiaApp"); } @@ -29,9 +40,8 @@ mod rpc { #[ignore] fn abci_query() { let key = "unpopulated_key".parse().unwrap(); - let abci_query = localhost_rpc_client() - .abci_query(Some(key), vec![], None, false) - .unwrap(); + let abci_query = + block_on(localhost_rpc_client().abci_query(Some(key), vec![], None, false)).unwrap(); assert_eq!(abci_query.key.as_ref().unwrap(), &Vec::::new()); assert_eq!(abci_query.value.as_ref(), None); } @@ -41,7 +51,7 @@ mod rpc { #[ignore] fn block() { let height = 1u64; - let block_info = localhost_rpc_client().block(height).unwrap(); + let block_info = block_on(localhost_rpc_client().block(height)).unwrap(); assert_eq!(block_info.block_meta.header.height.value(), height); } @@ -50,7 +60,7 @@ mod rpc { #[ignore] fn block_results() { let height = 1u64; - let block_results = localhost_rpc_client().block_results(height).unwrap(); + let block_results = block_on(localhost_rpc_client().block_results(height)).unwrap(); assert_eq!(block_results.height.value(), height); } @@ -58,7 +68,7 @@ mod rpc { #[test] #[ignore] fn blockchain() { - let blockchain_info = localhost_rpc_client().blockchain(1u64, 10u64).unwrap(); + let blockchain_info = block_on(localhost_rpc_client().blockchain(1u64, 10u64)).unwrap(); assert_eq!(blockchain_info.block_metas.len(), 10); } @@ -67,7 +77,7 @@ mod rpc { #[ignore] fn commit() { let height = 1u64; - let commit_info = localhost_rpc_client().block(height).unwrap(); + let commit_info = block_on(localhost_rpc_client().block(height)).unwrap(); assert_eq!(commit_info.block_meta.header.height.value(), height); } @@ -75,7 +85,7 @@ mod rpc { #[test] #[ignore] fn genesis() { - let genesis = localhost_rpc_client().genesis().unwrap(); + let genesis = block_on(localhost_rpc_client().genesis()).unwrap(); assert_eq!( genesis.consensus_params.validator.pub_key_types[0].to_string(), "ed25519" @@ -86,7 +96,7 @@ mod rpc { #[test] #[ignore] fn net_info() { - let net_info = localhost_rpc_client().net_info().unwrap(); + let net_info = block_on(localhost_rpc_client().net_info()).unwrap(); assert!(net_info.listening); } @@ -94,7 +104,7 @@ mod rpc { #[test] #[ignore] fn status_integration() { - let status = localhost_rpc_client().status().unwrap(); + let status = block_on(localhost_rpc_client().status()).unwrap(); // For lack of better things to test assert_eq!(status.validator_info.voting_power.value(), 10); diff --git a/tendermint/tests/rpc.rs b/tendermint/tests/rpc.rs index 6b533d5b2..78909ecc3 100644 --- a/tendermint/tests/rpc.rs +++ b/tendermint/tests/rpc.rs @@ -16,7 +16,7 @@ mod endpoints { #[test] fn abci_info() { - let response = endpoint::abci_info::Response::from_json(&read_json_fixture("abci_info")) + let response = endpoint::abci_info::Response::from_string(&read_json_fixture("abci_info")) .unwrap() .response; @@ -26,16 +26,17 @@ mod endpoints { #[test] fn abci_query() { - let response = endpoint::abci_query::Response::from_json(&read_json_fixture("abci_query")) - .unwrap() - .response; + let response = + endpoint::abci_query::Response::from_string(&read_json_fixture("abci_query")) + .unwrap() + .response; assert_eq!(response.height.value(), 1); } #[test] fn block() { - let response = endpoint::block::Response::from_json(&read_json_fixture("block")).unwrap(); + let response = endpoint::block::Response::from_string(&read_json_fixture("block")).unwrap(); let tendermint::Block { header, @@ -57,7 +58,7 @@ mod endpoints { #[test] fn block_results() { let response = - endpoint::block_results::Response::from_json(&read_json_fixture("block_results")) + endpoint::block_results::Response::from_string(&read_json_fixture("block_results")) .unwrap(); assert_eq!(response.height.value(), 1814); @@ -94,7 +95,7 @@ mod endpoints { #[test] fn blockchain() { let response = - endpoint::blockchain::Response::from_json(&read_json_fixture("blockchain")).unwrap(); + endpoint::blockchain::Response::from_string(&read_json_fixture("blockchain")).unwrap(); assert_eq!(response.last_height.value(), 488_556); assert_eq!(response.block_metas.len(), 10); @@ -105,7 +106,7 @@ mod endpoints { #[test] fn broadcast_tx_async() { - let response = endpoint::broadcast::tx_async::Response::from_json(&read_json_fixture( + let response = endpoint::broadcast::tx_async::Response::from_string(&read_json_fixture( "broadcast_tx_async", )) .unwrap(); @@ -118,7 +119,7 @@ mod endpoints { #[test] fn broadcast_tx_sync() { - let response = endpoint::broadcast::tx_sync::Response::from_json(&read_json_fixture( + let response = endpoint::broadcast::tx_sync::Response::from_string(&read_json_fixture( "broadcast_tx_sync", )) .unwrap(); @@ -133,7 +134,7 @@ mod endpoints { #[test] fn broadcast_tx_sync_int() { - let response = endpoint::broadcast::tx_sync::Response::from_json(&read_json_fixture( + let response = endpoint::broadcast::tx_sync::Response::from_string(&read_json_fixture( "broadcast_tx_sync_int", )) .unwrap(); @@ -148,7 +149,7 @@ mod endpoints { #[test] fn broadcast_tx_commit() { - let response = endpoint::broadcast::tx_commit::Response::from_json(&read_json_fixture( + let response = endpoint::broadcast::tx_commit::Response::from_string(&read_json_fixture( "broadcast_tx_commit", )) .unwrap(); @@ -161,7 +162,8 @@ mod endpoints { #[test] fn commit() { - let response = endpoint::commit::Response::from_json(&read_json_fixture("commit")).unwrap(); + let response = + endpoint::commit::Response::from_string(&read_json_fixture("commit")).unwrap(); let header = response.signed_header.header; assert_eq!(header.chain_id.as_ref(), EXAMPLE_CHAIN); // For now we just want to make sure the commit including precommits and a block_id exist @@ -176,7 +178,7 @@ mod endpoints { #[test] fn genesis() { let response = - endpoint::genesis::Response::from_json(&read_json_fixture("genesis")).unwrap(); + endpoint::genesis::Response::from_string(&read_json_fixture("genesis")).unwrap(); let tendermint::Genesis { chain_id, @@ -190,13 +192,13 @@ mod endpoints { #[test] fn health() { - endpoint::health::Response::from_json(&read_json_fixture("health")).unwrap(); + endpoint::health::Response::from_string(&read_json_fixture("health")).unwrap(); } #[test] fn net_info() { let response = - endpoint::net_info::Response::from_json(&read_json_fixture("net_info")).unwrap(); + endpoint::net_info::Response::from_string(&read_json_fixture("net_info")).unwrap(); assert_eq!(response.n_peers, 2); assert_eq!(response.peers[0].node_info.network.as_str(), EXAMPLE_CHAIN); @@ -204,7 +206,8 @@ mod endpoints { #[test] fn status() { - let response = endpoint::status::Response::from_json(&read_json_fixture("status")).unwrap(); + let response = + endpoint::status::Response::from_string(&read_json_fixture("status")).unwrap(); assert_eq!(response.node_info.network.as_str(), EXAMPLE_CHAIN); assert_eq!(response.sync_info.latest_block_height.value(), 410_744); @@ -214,7 +217,7 @@ mod endpoints { #[test] fn validators() { let response = - endpoint::validators::Response::from_json(&read_json_fixture("validators")).unwrap(); + endpoint::validators::Response::from_string(&read_json_fixture("validators")).unwrap(); assert_eq!(response.block_height.value(), 42); @@ -224,7 +227,7 @@ mod endpoints { #[test] fn jsonrpc_error() { - let result = endpoint::blockchain::Response::from_json(&read_json_fixture("error")); + let result = endpoint::blockchain::Response::from_string(&read_json_fixture("error")); if let Err(err) = result { assert_eq!(err.code(), rpc::error::Code::InternalError);