Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switch to reqwest #491

Merged
merged 4 commits into from
May 17, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ pin-project = "1.0"
# Optional deps
## HTTP
base64 = { version = "0.13", optional = true }
hyper = { version = "0.14", optional = true, default-features = false, features = ["client", "http1", "stream", "tcp"] }
hyper-tls = { version = "0.5", optional = true }
hyper-proxy = {version = "0.9.0", default-features = false, optional = true }
hyper-rustls = { version = "0.22.1", default-features = false, features = ["webpki-tokio"], optional = true }
bytes = { version = "1.0", optional = true }
reqwest = { version = "0.11", optional = true, default-features = false }
headers = { version = "0.3", optional = true }
## WS
async-native-tls = { git = "https://github.com/async-email/async-native-tls.git", rev = "b5b5562d6cea77f913d4cbe448058c031833bf17", optional = true, default-features = false }
Expand Down Expand Up @@ -66,11 +64,13 @@ tokio-stream = { version = "0.1", features = ["net"] }

[features]
default = ["http-tls", "signing", "ws-tls-tokio", "ipc-tokio"]
eip-1193 = ["js-sys", "wasm-bindgen", "wasm-bindgen-futures", "futures-timer/wasm-bindgen", "rand", "getrandom"]
_http_base = ["hyper", "url", "base64", "headers"]
http = ["_http_base", "hyper-proxy/tls"]
http-tls = ["hyper-tls", "http"]
http-rustls = ["hyper-rustls", "_http_base", "hyper-proxy/rustls-webpki"]
wasm = ["js-sys", "wasm-bindgen", "wasm-bindgen-futures", "futures-timer/wasm-bindgen", "rand", "getrandom"]
eip-1193 = ["wasm"]
_http_base = ["reqwest", "bytes", "url", "base64", "headers"]
http = ["_http_base"]
http-tls = ["http", "reqwest/default-tls"]
http-native-tls = ["http", "reqwest/native-tls"]
http-rustls-tls = ["http", "reqwest/rustls-tls"]
clouds56 marked this conversation as resolved.
Show resolved Hide resolved
signing = ["secp256k1"]
ws-tokio = ["soketto", "url", "tokio", "tokio-util"]
ws-async-std = ["soketto", "url", "async-std"]
Expand Down
172 changes: 63 additions & 109 deletions src/transports/http.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
//! HTTP Transport

use crate::{error, helpers, rpc, BatchTransport, Error, RequestId, Transport};
#[cfg(not(feature = "wasm"))]
use futures::future::BoxFuture;
#[cfg(feature = "wasm")]
use futures::future::LocalBoxFuture as BoxFuture;
use futures::{
self,
task::{Context, Poll},
Future, FutureExt, StreamExt,
Future, FutureExt,
};
use hyper::header::HeaderValue;
use reqwest::header::HeaderValue;
use std::{
env, fmt,
fmt,
ops::Deref,
pin::Pin,
sync::{
Expand All @@ -18,105 +22,59 @@ use std::{
};
use url::Url;

impl From<hyper::Error> for Error {
fn from(err: hyper::Error) -> Self {
impl From<reqwest::Error> for Error {
fn from(err: reqwest::Error) -> Self {
Error::Transport(format!("{:?}", err))
}
}

impl From<hyper::http::uri::InvalidUri> for Error {
fn from(err: hyper::http::uri::InvalidUri) -> Self {
Error::Transport(format!("{:?}", err))
}
}

impl From<hyper::header::InvalidHeaderValue> for Error {
fn from(err: hyper::header::InvalidHeaderValue) -> Self {
impl From<reqwest::header::InvalidHeaderValue> for Error {
fn from(err: reqwest::header::InvalidHeaderValue) -> Self {
Error::Transport(format!("{}", err))
}
}

// The max string length of a request without transfer-encoding: chunked.
const MAX_SINGLE_CHUNK: usize = 256;

#[cfg(feature = "http-tls")]
#[derive(Debug, Clone)]
enum Client {
Proxy(hyper::Client<hyper_proxy::ProxyConnector<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>>),
NoProxy(hyper::Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>>),
}

#[cfg(feature = "http-rustls")]
#[derive(Debug, Clone)]
enum Client {
Proxy(hyper::Client<hyper_proxy::ProxyConnector<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>>>),
NoProxy(hyper::Client<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>>),
}

#[cfg(not(any(feature = "http-tls", feature = "http-rustls")))]
#[derive(Debug, Clone)]
enum Client {
Proxy(hyper::Client<hyper_proxy::ProxyConnector<hyper::client::HttpConnector>>),
NoProxy(hyper::Client<hyper::client::HttpConnector>),
}

impl Client {
pub fn request(&self, req: hyper::Request<hyper::Body>) -> hyper::client::ResponseFuture {
match self {
Client::Proxy(client) => client.request(req),
Client::NoProxy(client) => client.request(req),
}
}
}

/// HTTP Transport (synchronous)
#[derive(Debug, Clone)]
pub struct Http {
id: Arc<AtomicUsize>,
url: hyper::Uri,
url: reqwest::Url,
basic_auth: Option<HeaderValue>,
client: Client,
client: reqwest::Client,
}

impl Http {
/// Create new HTTP transport connecting to given URL.
#[allow(unused_mut)]
clouds56 marked this conversation as resolved.
Show resolved Hide resolved
pub fn new(url: &str) -> error::Result<Self> {
#[cfg(feature = "http-tls")]
let (proxy_env, connector) = { (env::var("HTTPS_PROXY"), hyper_tls::HttpsConnector::new()) };
#[cfg(feature = "http-rustls")]
let (proxy_env, connector) = {
(
env::var("HTTPS_PROXY"),
hyper_rustls::HttpsConnector::with_webpki_roots(),
)
};
#[cfg(not(any(feature = "http-tls", feature = "http-rustls")))]
let (proxy_env, connector) = { (env::var("HTTP_PROXY"), hyper::client::HttpConnector::new()) };
let mut client_builder = reqwest::Client::builder();

let client = match proxy_env {
Ok(proxy) => {
let mut url = url::Url::parse(&proxy)?;
let username = String::from(url.username());
let password = String::from(url.password().unwrap_or_default());

url.set_username("").map_err(|_| Error::Internal)?;
url.set_password(None).map_err(|_| Error::Internal)?;

let uri = url.to_string().parse()?;
#[cfg(feature = "http-native-tls")]
{
client_builder = client_builder.use_native_tls();
}

let mut proxy = hyper_proxy::Proxy::new(hyper_proxy::Intercept::All, uri);
#[cfg(feature = "http-rustls-tls")]
{
client_builder = client_builder.use_rustls_tls();
}

if username != "" {
let credentials = headers::Authorization::basic(&username, &password);
proxy.set_authorization(credentials);
#[cfg(not(feature = "wasm"))]
{
let proxy_env = std::env::var("HTTPS_PROXY");
client_builder = match proxy_env {
Ok(proxy_scheme) => {
let proxy = reqwest::Proxy::all(proxy_scheme.as_str())?;
client_builder.proxy(proxy)
}
Err(_) => client_builder.no_proxy(),
};
}

let proxy_connector = hyper_proxy::ProxyConnector::from_proxy(connector, proxy)?;

Client::Proxy(hyper::Client::builder().build(proxy_connector))
}
Err(_) => Client::NoProxy(hyper::Client::builder().build(connector)),
};
let client = client_builder.build()?;

let basic_auth = {
let url = Url::parse(url)?;
Expand Down Expand Up @@ -144,29 +102,29 @@ impl Http {
let request = helpers::to_string(&request);
log::debug!("[{}] Sending: {} to {}", id, request, self.url);
let len = request.len();
let mut req = hyper::Request::new(hyper::Body::from(request));
*req.method_mut() = hyper::Method::POST;
*req.uri_mut() = self.url.clone();
req.headers_mut().insert(
hyper::header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
);
req.headers_mut()
.insert(hyper::header::USER_AGENT, HeaderValue::from_static("web3.rs"));

let mut request_builder = self
.client
.post(self.url.clone())
.header(
reqwest::header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
)
.header(reqwest::header::USER_AGENT, HeaderValue::from_static("web3.rs"))
.body(request);

// Don't send chunked request
if len < MAX_SINGLE_CHUNK {
req.headers_mut().insert(hyper::header::CONTENT_LENGTH, len.into());
request_builder = request_builder.header(reqwest::header::CONTENT_LENGTH, len.to_string());
}

// Send basic auth header
if let Some(ref basic_auth) = self.basic_auth {
req.headers_mut()
.insert(hyper::header::AUTHORIZATION, basic_auth.clone());
request_builder = request_builder.header(reqwest::header::AUTHORIZATION, basic_auth.clone());
}
let result = self.client.request(req);

Response::new(id, result, extract)
let result = request_builder.send();
Response::new(id, Box::pin(result), extract)
}
}

Expand Down Expand Up @@ -220,9 +178,11 @@ fn batch_response<T: Deref<Target = [u8]>>(response: T) -> error::Result<Vec<err
}
}

type ResponseFuture = BoxFuture<'static, reqwest::Result<reqwest::Response>>;
type BodyFuture = BoxFuture<'static, reqwest::Result<bytes::Bytes>>;
enum ResponseState {
Waiting(hyper::client::ResponseFuture),
Reading(Vec<u8>, hyper::Body),
Waiting(ResponseFuture),
Reading(BodyFuture),
}

/// A future representing a response to a pending request.
Expand All @@ -234,7 +194,7 @@ pub struct Response<T> {

impl<T> Response<T> {
/// Creates a new `Response`
pub fn new(id: RequestId, response: hyper::client::ResponseFuture, extract: T) -> Self {
pub fn new(id: RequestId, response: ResponseFuture, extract: T) -> Self {
log::trace!("[{}] Request pending.", id);
Response {
id,
Expand Down Expand Up @@ -267,24 +227,18 @@ where
response.status()
))));
}
self.state = ResponseState::Reading(Default::default(), response.into_body());
self.state = ResponseState::Reading(Box::pin(response.bytes()));
}
ResponseState::Reading(ref mut content, ref mut body) => {
ResponseState::Reading(ref mut body) => {
log::trace!("[{}] Reading body.", id);
match ready!(body.poll_next_unpin(ctx)) {
Some(chunk) => {
content.extend(&*chunk?);
}
None => {
let response = std::mem::take(content);
log::trace!(
"[{}] Extracting result from:\n{}",
self.id,
std::str::from_utf8(&response).unwrap_or("<invalid utf8>")
);
return Poll::Ready((self.extract)(response));
}
}
let chunk = ready!(body.poll_unpin(ctx))?;
let response = chunk.to_vec();
log::trace!(
"[{}] Extracting result from:\n{}",
self.id,
std::str::from_utf8(&response).unwrap_or("<invalid utf8>")
);
return Poll::Ready((self.extract)(response));
}
}
}
Expand Down