Skip to content

Commit

Permalink
Merge pull request #22 from dignifiedquire/h1
Browse files Browse the repository at this point in the history
feat: implement client for async-h1
  • Loading branch information
yoshuawuyts authored Apr 11, 2020
2 parents 98359bf + fe36283 commit 27987ed
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 105 deletions.
18 changes: 13 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ documentation = "https://docs.rs/http-client"
description = "Types and traits for http clients."
keywords = ["http", "service", "client", "futures", "async"]
categories = ["asynchronous", "web-programming", "web-programming::http-client", "web-programming::websocket"]
authors = ["Yoshua Wuyts <yoshuawuyts@gmail.com>"]
authors = ["Yoshua Wuyts <yoshuawuyts@gmail.com>", "dignifiedquire <me@dignifiedquire.com>"]
readme = "README.md"
edition = "2018"

Expand All @@ -16,18 +16,26 @@ features = ["docs"]
rustdoc-args = ["--cfg", "feature=\"docs\""]

[features]
docs = ["native_client"]
default = ["h1_client"]
docs = ["h1_client"]
h1_client = ["async-h1", "async-std", "async-native-tls"]
native_client = ["curl_client", "wasm_client"]
curl_client = ["isahc"]
curl_client = ["isahc", "async-std"]
wasm_client = ["js-sys", "web-sys", "wasm-bindgen", "wasm-bindgen-futures"]

[dependencies]
futures = { version = "0.3.1", features = ["compat", "io-compat"] }
http = "0.1.19"
http-types = { version = "1.0.1", features = ["hyperium_http"] }
log = "0.4.7"

# h1-client
async-h1 = { version = "1.0.0", optional = true }
async-std = { version = "1.4.0", default-features = false, optional = true }
async-native-tls = { version = "0.3.1", optional = true }

# isahc-client
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
isahc = { version = "0.8", optional = true, default-features = false, features = ["http2"] }
isahc = { version = "0.9", optional = true, default-features = false, features = ["http2"] }

# wasm-client
[target.'cfg(target_arch = "wasm32")'.dependencies]
Expand Down
80 changes: 80 additions & 0 deletions src/h1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//! http-client implementation for async-h1.
use super::{HttpClient, Request, Response};

use async_h1::client;
use futures::future::BoxFuture;
use http_types::{Error, StatusCode};

/// Async-h1 based HTTP Client.
#[derive(Debug)]
pub struct H1Client {}

impl Default for H1Client {
fn default() -> Self {
Self::new()
}
}

impl H1Client {
/// Create a new instance.
pub fn new() -> Self {
Self {}
}
}

impl Clone for H1Client {
fn clone(&self) -> Self {
Self {}
}
}

impl HttpClient for H1Client {
type Error = Error;

fn send(&self, req: Request) -> BoxFuture<'static, Result<Response, Self::Error>> {
Box::pin(async move {
// Insert host
let host = req
.url()
.host_str()
.ok_or_else(|| Error::from_str(StatusCode::BadRequest, "missing hostname"))?;

let scheme = req.url().scheme();
if scheme != "http" && scheme != "https" {
return Err(Error::from_str(
StatusCode::BadRequest,
format!("invalid url scheme '{}'", scheme),
));
}

let addr = req
.url()
.socket_addrs(|| match req.url().scheme() {
"http" => Some(80),
"https" => Some(443),
_ => None,
})?
.into_iter()
.next()
.ok_or_else(|| Error::from_str(StatusCode::BadRequest, "missing valid address"))?;

log::trace!("> Scheme: {}", scheme);

match scheme {
"http" => {
let stream = async_std::net::TcpStream::connect(addr).await?;
client::connect(stream, req).await
}
"https" => {
let raw_stream = async_std::net::TcpStream::connect(addr).await?;

let stream = async_native_tls::connect(host, raw_stream).await?;

client::connect(stream, req).await
}
_ => unreachable!(),
}
})
}
}
23 changes: 11 additions & 12 deletions src/isahc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
use super::{Body, HttpClient, Request, Response};

use async_std::io::BufReader;
use futures::future::BoxFuture;
use isahc::http;

use std::sync::Arc;

Expand Down Expand Up @@ -46,25 +48,22 @@ impl HttpClient for IsahcClient {
fn send(&self, req: Request) -> BoxFuture<'static, Result<Response, Self::Error>> {
let client = self.client.clone();
Box::pin(async move {
let (parts, body) = req.into_parts();

let body = if body.is_empty() {
isahc::Body::empty()
} else {
match body.len {
Some(len) => isahc::Body::reader_sized(body, len),
None => isahc::Body::reader(body),
}
let req_hyperium: http::Request<http_types::Body> = req.into();
let (parts, body) = req_hyperium.into_parts();
let body = match body.len() {
Some(len) => isahc::Body::from_reader_sized(body, len as u64),
None => isahc::Body::from_reader(body),
};

let req: http::Request<isahc::Body> = http::Request::from_parts(parts, body);

let res = client.send_async(req).await?;

let (parts, body) = res.into_parts();
let body = Body::from_reader(body);

let len = body.len().map(|len| len as usize);
let body = Body::from_reader(BufReader::new(body), len);
let res = http::Response::from_parts(parts, body);
Ok(res)
Ok(res.into())
})
}
}
99 changes: 11 additions & 88 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,6 @@
)]

use futures::future::BoxFuture;
use futures::io::{AsyncRead, Cursor};

use std::error::Error;
use std::fmt::{self, Debug};
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

#[cfg_attr(feature = "docs", doc(cfg(curl_client)))]
#[cfg(all(feature = "curl_client", not(target_arch = "wasm32")))]
Expand All @@ -35,11 +28,15 @@ pub mod wasm;
#[cfg(feature = "native_client")]
pub mod native;

#[cfg_attr(feature = "docs", doc(cfg(h1_client)))]
#[cfg(feature = "h1_client")]
pub mod h1;

/// An HTTP Request type with a streaming body.
pub type Request = http::Request<Body>;
pub type Request = http_types::Request;

/// An HTTP Response type with a streaming body.
pub type Response = http::Response<Body>;
pub type Response = http_types::Response;

/// An abstract HTTP client.
///
Expand All @@ -55,90 +52,16 @@ pub type Response = http::Response<Body>;
///
/// How `Clone` is implemented is up to the implementors, but in an ideal scenario combining this
/// with the `Client` builder will allow for high connection reuse, improving latency.
pub trait HttpClient: Debug + Unpin + Send + Sync + Clone + 'static {
pub trait HttpClient: std::fmt::Debug + Unpin + Send + Sync + Clone + 'static {
/// The associated error type.
type Error: Error + Send + Sync;
type Error: Send + Sync + Into<Error>;

/// Perform a request.
fn send(&self, req: Request) -> BoxFuture<'static, Result<Response, Self::Error>>;
}

/// The raw body of an http request or response.
///
/// A body is a stream of `Bytes` values, which are shared handles to byte buffers.
/// Both `Body` and `Bytes` values can be easily created from standard owned byte buffer types
/// like `Vec<u8>` or `String`, using the `From` trait.
pub struct Body {
reader: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
/// Intentionally use `u64` over `usize` here.
/// `usize` won't work if you try to send 10GB file from 32bit host.
#[allow(dead_code)] // not all backends make use of this
len: Option<u64>,
}

impl Body {
/// Create a new empty body.
pub fn empty() -> Self {
Self {
reader: None,
len: Some(0),
}
}

/// Create a new instance from a reader.
pub fn from_reader(reader: impl AsyncRead + Unpin + Send + 'static) -> Self {
Self {
reader: Some(Box::new(reader)),
len: None,
}
}

/// Validate that the body was created with `Body::empty()`.
pub fn is_empty(&self) -> bool {
self.reader.is_none()
}
}
pub type Body = http_types::Body;

impl AsyncRead for Body {
#[allow(missing_doc_code_examples)]
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
match self.reader.as_mut() {
Some(reader) => Pin::new(reader).poll_read(cx, buf),
None => Poll::Ready(Ok(0)),
}
}
}

impl fmt::Debug for Body {
#[allow(missing_doc_code_examples)]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Body").field("reader", &"<hidden>").finish()
}
}

impl From<Vec<u8>> for Body {
#[allow(missing_doc_code_examples)]
#[inline]
fn from(vec: Vec<u8>) -> Body {
let len = vec.len() as u64;
Self {
reader: Some(Box::new(Cursor::new(vec))),
len: Some(len),
}
}
}

impl<R: AsyncRead + Unpin + Send + 'static> From<Box<R>> for Body {
/// Converts an `AsyncRead` into a Body.
#[allow(missing_doc_code_examples)]
fn from(reader: Box<R>) -> Self {
Self {
reader: Some(reader),
len: None,
}
}
}
/// Error type.
pub type Error = http_types::Error;

0 comments on commit 27987ed

Please sign in to comment.