From ee3425334de2e042e0671cf523effae226150763 Mon Sep 17 00:00:00 2001 From: Valentin Kettner Date: Mon, 7 Jun 2021 13:48:27 +0200 Subject: [PATCH 1/2] Redo http transport The existing implementation has accumulated a lot of crust from initial async support and use of hyper. Now that is using reqwest we can do many things simpler. Removed things that were needed for hyper but are handled already by reqwest: - setting the proxy - setting the auth header - setting the content type header - setting content length header Other changes: - Move from struct with Future impl to BoxFuture. We were already using BoxFuture internally anyawy. - Log request and response as debug strings. This makes sure no control characters like newlines end up in the output. - Use from_utf8_lossy for the response log so that we can still partially log non utf8 responses. - Fix not handling batch responses according to the jsonrpc specification which does not guarantee the ordering of the list. - Add more context to error logs. - Allow giving a custom client to the transport. This is useful for example to set a timeout. --- Cargo.toml | 2 +- src/helpers.rs | 16 +- src/transports/http.rs | 368 ++++++++++++++++------------------------- src/transports/mod.rs | 2 +- 4 files changed, 160 insertions(+), 228 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a88d019e..3432b9ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ pin-project = "1.0" ## HTTP base64 = { version = "0.13", optional = true } bytes = { version = "1.0", optional = true } -reqwest = { version = "0.11", optional = true, default-features = false } +reqwest = { version = "0.11", optional = true, default-features = false, features = ["json"] } headers = { version = "0.3", optional = true } ## WS # async-native-tls = { git = "https://github.com/async-email/async-native-tls.git", rev = "b5b5562d6cea77f913d4cbe448058c031833bf17", optional = true, default-features = false } diff --git a/src/helpers.rs b/src/helpers.rs index bdd02477..b71f2e2f 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -6,6 +6,7 @@ use futures::{ Future, }; use pin_project::pin_project; +use serde::de::DeserializeOwned; use std::{marker::PhantomData, pin::Pin}; /// Takes any type which is deserializable from rpc::Value and such a value and @@ -70,12 +71,19 @@ pub fn build_request(id: usize, method: &str, params: Vec) -> rpc::C /// Parse bytes slice into JSON-RPC response. /// It looks for arbitrary_precision feature as a temporary workaround for https://github.com/tomusdrw/rust-web3/issues/460. pub fn to_response_from_slice(response: &[u8]) -> error::Result { + arbitrary_precision_deserialize_workaround(response).map_err(|e| Error::InvalidResponse(format!("{:?}", e))) +} + +/// Deserialize bytes into T. +/// It looks for arbitrary_precision feature as a temporary workaround for https://github.com/tomusdrw/rust-web3/issues/460. +pub fn arbitrary_precision_deserialize_workaround(bytes: &[u8]) -> Result +where + T: DeserializeOwned, +{ if cfg!(feature = "arbitrary_precision") { - let val: serde_json::Value = - serde_json::from_slice(response).map_err(|e| Error::InvalidResponse(format!("{:?}", e)))?; - serde_json::from_value(val).map_err(|e| Error::InvalidResponse(format!("{:?}", e))) + serde_json::from_value(serde_json::from_slice(bytes)?) } else { - serde_json::from_slice(response).map_err(|e| Error::InvalidResponse(format!("{:?}", e))) + serde_json::from_slice(bytes) } } diff --git a/src/transports/http.rs b/src/transports/http.rs index 4c81c1b1..fccdf219 100644 --- a/src/transports/http.rs +++ b/src/transports/http.rs @@ -1,247 +1,172 @@ //! HTTP Transport -use crate::{error, helpers, rpc, BatchTransport, Error, RequestId, Transport}; +use crate::{ + error::{Error, Result}, + helpers, BatchTransport, RequestId, Transport, +}; #[cfg(not(feature = "wasm"))] use futures::future::BoxFuture; #[cfg(feature = "wasm")] use futures::future::LocalBoxFuture as BoxFuture; -use futures::{ - self, - task::{Context, Poll}, - Future, FutureExt, -}; -use reqwest::header::HeaderValue; +use headers::HeaderValue; +use jsonrpc_core::types::{Call, Output, Request, Value}; +use reqwest::{Client, Url}; +use serde::de::DeserializeOwned; use std::{ - fmt, - ops::Deref, - pin::Pin, + collections::HashMap, sync::{ - atomic::{self, AtomicUsize}, + atomic::{AtomicUsize, Ordering}, Arc, }, }; -use url::Url; - -impl From for Error { - fn from(err: reqwest::Error) -> Self { - Error::Transport(format!("{:?}", err)) - } -} -impl From for Error { - fn from(err: reqwest::header::InvalidHeaderValue) -> Self { - Error::Transport(format!("{}", err)) - } +/// HTTP Transport +#[derive(Clone, Debug)] +pub struct Http { + // Client is already an Arc so doesn't need to be part of inner. + client: Client, + inner: Arc, } -// The max string length of a request without transfer-encoding: chunked. -const MAX_SINGLE_CHUNK: usize = 256; - -/// HTTP Transport (synchronous) -#[derive(Debug, Clone)] -pub struct Http { - id: Arc, - url: reqwest::Url, - basic_auth: Option, - client: reqwest::Client, +#[derive(Debug)] +struct Inner { + url: Url, + id: AtomicUsize, } impl Http { /// Create new HTTP transport connecting to given URL. - pub fn new(url: &str) -> error::Result { + /// + /// Note that the http [Client] automatically enables some features like setting the basic auth + /// header or enabling a proxy from the environment. You can customize it with + /// [Http::with_client]. + pub fn new(url: &str) -> Result { #[allow(unused_mut)] - let mut client_builder = reqwest::Client::builder(); - - #[cfg(feature = "http-native-tls")] - { - client_builder = client_builder.use_native_tls(); - } - - #[cfg(feature = "http-rustls-tls")] - { - client_builder = client_builder.use_rustls_tls(); - } - + let mut builder = Client::builder(); #[cfg(not(feature = "wasm"))] { - let proxy_env = std::env::var("HTTPS_PROXY"); - client_builder = match proxy_env { - Ok(proxy_scheme) => { - let proxy = reqwest::Proxy::all(proxy_scheme.as_str())?; - client_builder.proxy(proxy) - } - Err(_) => client_builder.no_proxy(), - }; + builder = builder.user_agent(HeaderValue::from_static("web3.rs")); } - - let client = client_builder.build()?; - - let basic_auth = { - let url = Url::parse(url)?; - let user = url.username(); - let auth = format!("{}:{}", user, url.password().unwrap_or_default()); - if &auth == ":" { - None - } else { - Some(HeaderValue::from_str(&format!("Basic {}", base64::encode(&auth)))?) - } - }; - - Ok(Http { - id: Arc::new(AtomicUsize::new(1)), - url: url.parse()?, - basic_auth, - client, - }) + let client = builder + .build() + .map_err(|err| Error::Transport(format!("failed to build client: {}", err)))?; + Ok(Self::with_client(client, url.parse()?)) } - fn send_request(&self, id: RequestId, request: rpc::Request, extract: F) -> Response - where - F: Fn(Vec) -> O, - { - let request = helpers::to_string(&request); - log::debug!("[{}] Sending: {} to {}", id, request, self.url); - let len = request.len(); - - let mut request_builder = self - .client - .post(self.url.clone()) - .header( - reqwest::header::CONTENT_TYPE, - HeaderValue::from_static("application/json"), - ) - .header(reqwest::header::USER_AGENT, HeaderValue::from_static("web3.rs")) - .body(request); - - // Don't send chunked request - if len < MAX_SINGLE_CHUNK { - request_builder = request_builder.header(reqwest::header::CONTENT_LENGTH, len.to_string()); + /// Like `new` but with a user provided client instance. + pub fn with_client(client: Client, url: Url) -> Self { + Self { + client, + inner: Arc::new(Inner { + url, + id: AtomicUsize::new(0), + }), } + } - // Send basic auth header - if let Some(ref basic_auth) = self.basic_auth { - request_builder = request_builder.header(reqwest::header::AUTHORIZATION, basic_auth.clone()); - } + fn next_id(&self) -> RequestId { + self.inner.id.fetch_add(1, Ordering::AcqRel) + } - let result = request_builder.send(); - Response::new(id, Box::pin(result), extract) + fn new_request(&self) -> (Client, Url) { + (self.client.clone(), self.inner.url.clone()) } } +// Id is only used for logging. +async fn execute_rpc(client: &Client, url: Url, request: &Request, id: RequestId) -> Result { + log::debug!("[id:{}] sending request: {:?}", id, serde_json::to_string(&request)?); + let response = client + .post(url) + .json(request) + .send() + .await + .map_err(|err| Error::Transport(format!("failed to send request: {}", err)))?; + let status = response.status(); + let response = response + .bytes() + .await + .map_err(|err| Error::Transport(format!("failed to read response bytes: {}", err)))?; + log::debug!( + "[id:{}] received response: {:?}", + id, + String::from_utf8_lossy(&response).as_ref() + ); + if !status.is_success() { + return Err(Error::Transport(format!( + "response status code is not success: {}", + status + ))); + } + helpers::arbitrary_precision_deserialize_workaround(&response) + .map_err(|err| Error::Transport(format!("failed to deserialize response: {}", err))) +} + +type RpcResult = Result; + impl Transport for Http { - type Out = Response) -> error::Result>; + type Out = BoxFuture<'static, Result>; - fn prepare(&self, method: &str, params: Vec) -> (RequestId, rpc::Call) { - let id = self.id.fetch_add(1, atomic::Ordering::AcqRel); + fn prepare(&self, method: &str, params: Vec) -> (RequestId, Call) { + let id = self.next_id(); let request = helpers::build_request(id, method, params); - (id, request) } - fn send(&self, id: RequestId, request: rpc::Call) -> Self::Out { - self.send_request(id, rpc::Request::Single(request), single_response) + fn send(&self, id: RequestId, call: Call) -> Self::Out { + let (client, url) = self.new_request(); + Box::pin(async move { + let output: Output = execute_rpc(&client, url, &Request::Single(call), id).await?; + helpers::to_result_from_output(output) + }) } } impl BatchTransport for Http { - type Batch = Response) -> error::Result>>>; + type Batch = BoxFuture<'static, Result>>; fn send_batch(&self, requests: T) -> Self::Batch where - T: IntoIterator, + T: IntoIterator, { - let mut it = requests.into_iter(); - let (id, first) = it.next().map(|x| (x.0, Some(x.1))).unwrap_or_else(|| (0, None)); - let requests = first.into_iter().chain(it.map(|x| x.1)).collect(); - - self.send_request(id, rpc::Request::Batch(requests), batch_response) - } -} - -/// Parse bytes RPC response into `Result`. -fn single_response>(response: T) -> error::Result { - let response = - helpers::to_response_from_slice(&*response).map_err(|e| Error::InvalidResponse(format!("{:?}", e)))?; - match response { - rpc::Response::Single(output) => helpers::to_result_from_output(output), - _ => Err(Error::InvalidResponse("Expected single, got batch.".into())), - } -} - -/// Parse bytes RPC batch response into `Result`. -fn batch_response>(response: T) -> error::Result>> { - let response = - helpers::to_response_from_slice(&*response).map_err(|e| Error::InvalidResponse(format!("{:?}", e)))?; - match response { - rpc::Response::Batch(outputs) => Ok(outputs.into_iter().map(helpers::to_result_from_output).collect()), - _ => Err(Error::InvalidResponse("Expected batch, got single.".into())), + // Batch calls don't need an id but it helps associate the response log with the request log. + let id = self.next_id(); + let (client, url) = self.new_request(); + let (ids, calls): (Vec<_>, Vec<_>) = requests.into_iter().unzip(); + Box::pin(async move { + let outputs: Vec = execute_rpc(&client, url, &Request::Batch(calls), id).await?; + handle_batch_response(&ids, outputs) + }) } } -type ResponseFuture = BoxFuture<'static, reqwest::Result>; -type BodyFuture = BoxFuture<'static, reqwest::Result>; -enum ResponseState { - Waiting(ResponseFuture), - Reading(BodyFuture), -} - -/// A future representing a response to a pending request. -pub struct Response { - id: RequestId, - extract: T, - state: ResponseState, -} - -impl Response { - /// Creates a new `Response` - pub fn new(id: RequestId, response: ResponseFuture, extract: T) -> Self { - log::trace!("[{}] Request pending.", id); - Response { - id, - extract, - state: ResponseState::Waiting(response), - } - } +// According to the jsonrpc specification batch responses can be returned in any order so we need to +// restore the intended order. +fn handle_batch_response(ids: &[RequestId], outputs: Vec) -> Result> { + if ids.len() != outputs.len() { + return Err(Error::InvalidResponse("unexpected number of responses".to_string())); + } + let mut outputs = outputs + .into_iter() + .map(|output| Ok((id_of_output(&output)?, helpers::to_result_from_output(output)))) + .collect::>>()?; + ids.iter() + .map(|id| { + outputs + .remove(id) + .ok_or_else(|| Error::InvalidResponse(format!("batch response is missing id {}", id))) + }) + .collect() } -// We can do this because `hyper::client::ResponseFuture: Unpin`. -impl Unpin for Response {} - -impl Future for Response -where - T: Fn(Vec) -> error::Result, - Out: fmt::Debug, -{ - type Output = error::Result; - - fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll { - let id = self.id; - loop { - match self.state { - ResponseState::Waiting(ref mut waiting) => { - log::trace!("[{}] Checking response.", id); - let response = ready!(waiting.poll_unpin(ctx))?; - if !response.status().is_success() { - return Poll::Ready(Err(Error::Transport(format!( - "Unexpected response status code: {}", - response.status() - )))); - } - self.state = ResponseState::Reading(Box::pin(response.bytes())); - } - ResponseState::Reading(ref mut body) => { - log::trace!("[{}] Reading body.", id); - let chunk = ready!(body.poll_unpin(ctx))?; - let response = chunk.to_vec(); - log::trace!( - "[{}] Extracting result from:\n{}", - self.id, - std::str::from_utf8(&response).unwrap_or("") - ); - return Poll::Ready((self.extract)(response)); - } - } - } +fn id_of_output(output: &Output) -> Result { + let id = match output { + Output::Success(success) => &success.id, + Output::Failure(failure) => &failure.id, + }; + match id { + jsonrpc_core::Id::Num(num) => Ok(*num as RequestId), + _ => Err(Error::InvalidResponse("response id is not u64".to_string())), } } @@ -249,35 +174,11 @@ where mod tests { use super::*; - #[test] - fn http_supports_basic_auth_with_user_and_password() { - let http = Http::new("https://user:password@127.0.0.1:8545").unwrap(); - assert!(http.basic_auth.is_some()); - assert_eq!( - http.basic_auth, - Some(HeaderValue::from_static("Basic dXNlcjpwYXNzd29yZA==")) - ) - } - - #[test] - fn http_supports_basic_auth_with_user_no_password() { - let http = Http::new("https://username:@127.0.0.1:8545").unwrap(); - assert!(http.basic_auth.is_some()); - assert_eq!(http.basic_auth, Some(HeaderValue::from_static("Basic dXNlcm5hbWU6"))) - } - - #[test] - fn http_supports_basic_auth_with_only_password() { - let http = Http::new("https://:password@127.0.0.1:8545").unwrap(); - assert!(http.basic_auth.is_some()); - assert_eq!(http.basic_auth, Some(HeaderValue::from_static("Basic OnBhc3N3b3Jk"))) - } - async fn server(req: hyper::Request) -> hyper::Result> { use hyper::body::HttpBody; - let expected = r#"{"jsonrpc":"2.0","method":"eth_getAccounts","params":[],"id":1}"#; - let response = r#"{"jsonrpc":"2.0","id":1,"result":"x"}"#; + let expected = r#"{"jsonrpc":"2.0","method":"eth_getAccounts","params":[],"id":0}"#; + let response = r#"{"jsonrpc":"2.0","id":0,"result":"x"}"#; assert_eq!(req.method(), &hyper::Method::POST); assert_eq!(req.uri().path(), "/"); @@ -312,6 +213,29 @@ mod tests { println!("Got response"); // then - assert_eq!(response, Ok(rpc::Value::String("x".into()))); + assert_eq!(response, Ok(Value::String("x".into()))); + } + + #[test] + fn handles_batch_response_being_in_different_order_than_input() { + let ids = vec![0, 1, 2]; + // This order is different from the ids. + let outputs = [1u64, 0, 2] + .iter() + .map(|&id| { + Output::Success(jsonrpc_core::Success { + jsonrpc: None, + result: id.into(), + id: jsonrpc_core::Id::Num(id), + }) + }) + .collect(); + let results = handle_batch_response(&ids, outputs) + .unwrap() + .into_iter() + .map(|result| result.unwrap().as_u64().unwrap() as usize) + .collect::>(); + // The order of the ids should have been restored. + assert_eq!(ids, results); } } diff --git a/src/transports/mod.rs b/src/transports/mod.rs index 13cb0d49..df762993 100644 --- a/src/transports/mod.rs +++ b/src/transports/mod.rs @@ -26,7 +26,7 @@ pub mod test; #[cfg(feature = "url")] impl From for crate::Error { fn from(err: url::ParseError) -> Self { - crate::Error::Transport(format!("{:?}", err)) + crate::Error::Transport(format!("failed to parse url: {}", err)) } } From d6e1f528caa0c4aa3a4de54fc0b9b0b42edc4e56 Mon Sep 17 00:00:00 2001 From: Valentin Date: Mon, 7 Jun 2021 18:36:16 +0200 Subject: [PATCH 2/2] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Tomasz Drwięga --- src/transports/http.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/transports/http.rs b/src/transports/http.rs index fccdf219..680c110f 100644 --- a/src/transports/http.rs +++ b/src/transports/http.rs @@ -8,7 +8,6 @@ use crate::{ use futures::future::BoxFuture; #[cfg(feature = "wasm")] use futures::future::LocalBoxFuture as BoxFuture; -use headers::HeaderValue; use jsonrpc_core::types::{Call, Output, Request, Value}; use reqwest::{Client, Url}; use serde::de::DeserializeOwned; @@ -45,7 +44,7 @@ impl Http { let mut builder = Client::builder(); #[cfg(not(feature = "wasm"))] { - builder = builder.user_agent(HeaderValue::from_static("web3.rs")); + builder = builder.user_agent(headers::HeaderValue::from_static("web3.rs")); } let client = builder .build()