diff --git a/Cargo.toml b/Cargo.toml index 525b15ee..0f5d2d72 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ futures-util = { version = "0.3.16", default-features = false, features = ["allo http-body-util = "0.1.0" tokio = { version = "1", features = ["macros", "test-util", "signal"] } tokio-test = "0.4" +tower = { version = "0.5", features = ["util"] } tower-test = "0.4" pretty_env_logger = "0.5" @@ -77,7 +78,7 @@ full = [ client = ["hyper/client", "tokio/net", "dep:tracing", "dep:futures-channel", "dep:tower-service"] client-legacy = ["client", "dep:socket2", "tokio/sync", "dep:libc", "dep:futures-util"] -client-pool = ["dep:futures-util", "dep:tower-layer"] +client-pool = ["client", "dep:futures-util", "dep:tower-layer"] client-proxy = ["client", "dep:base64", "dep:ipnet", "dep:percent-encoding"] client-proxy-system = ["dep:system-configuration", "dep:windows-registry"] @@ -99,6 +100,10 @@ __internal_happy_eyeballs_tests = [] [[example]] name = "client" +required-features = ["client-legacy", "client-pool", "http1", "tokio"] + +[[example]] +name = "client_legacy" required-features = ["client-legacy", "http1", "tokio"] [[example]] diff --git a/examples/client.rs b/examples/client.rs index 04defac0..d40b357b 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,37 +1,157 @@ -use std::env; +use tower::ServiceExt; +use tower_service::Service; -use http_body_util::Empty; -use hyper::Request; -use hyper_util::client::legacy::{connect::HttpConnector, Client}; +use hyper_util::client::pool; #[tokio::main(flavor = "current_thread")] -async fn main() -> Result<(), Box> { - let url = match env::args().nth(1) { - Some(url) => url, - None => { - eprintln!("Usage: client "); - return Ok(()); - } - }; - - // HTTPS requires picking a TLS implementation, so give a better - // warning if the user tries to request an 'https' URL. - let url = url.parse::()?; - if url.scheme_str() != Some("http") { - eprintln!("This example only works with 'http' URLs."); - return Ok(()); +async fn main() -> Result<(), Box> { + send_nego().await +} + +async fn send_h1() -> Result<(), Box> { + let tcp = hyper_util::client::legacy::connect::HttpConnector::new(); + + let http1 = tcp.and_then(|conn| { + Box::pin(async move { + let (mut tx, c) = hyper::client::conn::http1::handshake::< + _, + http_body_util::Empty, + >(conn) + .await?; + tokio::spawn(async move { + if let Err(e) = c.await { + eprintln!("connection error: {:?}", e); + } + }); + let svc = tower::service_fn(move |req| tx.send_request(req)); + Ok::<_, Box>(svc) + }) + }); + + let mut p = pool::Cache::new(http1).build(); + + let mut c = p.call(http::Uri::from_static("http://hyper.rs")).await?; + eprintln!("{:?}", c); + + let req = http::Request::builder() + .header("host", "hyper.rs") + .body(http_body_util::Empty::new()) + .unwrap(); + + c.ready().await?; + let resp = c.call(req).await?; + eprintln!("{:?}", resp); + + Ok(()) +} + +async fn send_h2() -> Result<(), Box> { + let tcp = hyper_util::client::legacy::connect::HttpConnector::new(); + + let http2 = tcp.and_then(|conn| { + Box::pin(async move { + let (mut tx, c) = hyper::client::conn::http2::handshake::< + _, + _, + http_body_util::Empty, + >(hyper_util::rt::TokioExecutor::new(), conn) + .await?; + println!("connected"); + tokio::spawn(async move { + if let Err(e) = c.await { + eprintln!("connection error: {:?}", e); + } + }); + let svc = tower::service_fn(move |req| tx.send_request(req)); + Ok::<_, Box>(svc) + }) + }); + + let mut p = pool::Singleton::new(http2); + + for _ in 0..5 { + let mut c = p + .call(http::Uri::from_static("http://localhost:3000")) + .await?; + eprintln!("{:?}", c); + + let req = http::Request::builder() + .header("host", "hyper.rs") + .body(http_body_util::Empty::new()) + .unwrap(); + + c.ready().await?; + let resp = c.call(req).await?; + eprintln!("{:?}", resp); } - let client = Client::builder(hyper_util::rt::TokioExecutor::new()).build(HttpConnector::new()); + Ok(()) +} - let req = Request::builder() - .uri(url) - .body(Empty::::new())?; +async fn send_nego() -> Result<(), Box> { + let tcp = hyper_util::client::legacy::connect::HttpConnector::new(); - let resp = client.request(req).await?; + let http1 = tower::layer::layer_fn(|tcp| { + tower::service_fn(move |dst| { + let inner = tcp.call(dst); + async move { + let conn = inner.await?; + let (mut tx, c) = hyper::client::conn::http1::handshake::< + _, + http_body_util::Empty, + >(conn) + .await?; + tokio::spawn(async move { + if let Err(e) = c.await { + eprintln!("connection error: {:?}", e); + } + }); + let svc = tower::service_fn(move |req| tx.send_request(req)); + Ok::<_, Box>(svc) + } + }) + }); - eprintln!("{:?} {:?}", resp.version(), resp.status()); - eprintln!("{:#?}", resp.headers()); + let http2 = tower::layer::layer_fn(|tcp| { + tower::service_fn(move |dst| { + let inner = tcp.call(dst); + async move { + let conn = inner.await?; + let (mut tx, c) = hyper::client::conn::http2::handshake::< + _, + _, + http_body_util::Empty, + >(hyper_util::rt::TokioExecutor::new(), conn) + .await?; + println!("connected"); + tokio::spawn(async move { + if let Err(e) = c.await { + eprintln!("connection error: {:?}", e); + } + }); + let svc = tower::service_fn(move |req| tx.send_request(req)); + Ok::<_, Box>(svc) + } + }) + }); + + let mut svc = pool::negotiate(tcp, |_| false, http1, http2); + + for _ in 0..5 { + let mut c = svc + .call(http::Uri::from_static("http://localhost:3000")) + .await?; + eprintln!("{:?}", c); + + let req = http::Request::builder() + .header("host", "hyper.rs") + .body(http_body_util::Empty::new()) + .unwrap(); + + c.ready().await?; + let resp = c.call(req).await?; + eprintln!("{:?}", resp); + } Ok(()) } diff --git a/examples/client_legacy.rs b/examples/client_legacy.rs new file mode 100644 index 00000000..04defac0 --- /dev/null +++ b/examples/client_legacy.rs @@ -0,0 +1,37 @@ +use std::env; + +use http_body_util::Empty; +use hyper::Request; +use hyper_util::client::legacy::{connect::HttpConnector, Client}; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box> { + let url = match env::args().nth(1) { + Some(url) => url, + None => { + eprintln!("Usage: client "); + return Ok(()); + } + }; + + // HTTPS requires picking a TLS implementation, so give a better + // warning if the user tries to request an 'https' URL. + let url = url.parse::()?; + if url.scheme_str() != Some("http") { + eprintln!("This example only works with 'http' URLs."); + return Ok(()); + } + + let client = Client::builder(hyper_util::rt::TokioExecutor::new()).build(HttpConnector::new()); + + let req = Request::builder() + .uri(url) + .body(Empty::::new())?; + + let resp = client.request(req).await?; + + eprintln!("{:?} {:?}", resp.version(), resp.status()); + eprintln!("{:#?}", resp.headers()); + + Ok(()) +} diff --git a/src/client/pool/mod.rs b/src/client/pool/mod.rs index 9f1a8fce..fd505416 100644 --- a/src/client/pool/mod.rs +++ b/src/client/pool/mod.rs @@ -1,4 +1,8 @@ //! Composable pool services +//! +//! This module contains various concepts of a connection pool separated into +//! their own concerns. This allows for users to compose the layers, along with +//! any other layers, when constructing custom connection pools. pub mod cache; pub mod map; diff --git a/src/lib.rs b/src/lib.rs index 65bbe465..ef133f6c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,11 @@ mod common; pub mod rt; #[cfg(feature = "server")] pub mod server; -#[cfg(any(feature = "service", feature = "client-legacy"))] +#[cfg(any( + feature = "service", + feature = "client-legacy", + feature = "client-pool", +))] pub mod service; mod error;