diff --git a/Cargo.toml b/Cargo.toml index 09bffa8..f80c21d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,3 +18,4 @@ hyper = "0.12.27" tokio = "0.1.19" tower = { git = "https://github.com/tower-rs/tower" } tower-hyper = { git = "https://github.com/tower-rs/tower-hyper" } +tower-reconnect = { git = "https://github.com/tower-rs/tower" } diff --git a/src/http-client.rs b/src/http-client.rs index 52f4f64..6153f3a 100644 --- a/src/http-client.rs +++ b/src/http-client.rs @@ -4,12 +4,9 @@ use hyper::{ Request, Response, Uri, }; use std::time::Duration; -use tower::{builder::ServiceBuilder, reconnect::Reconnect, Service, ServiceExt}; -use tower_hyper::{ - client::{Builder, Connect}, - retry::{Body, RetryPolicy}, - util::Connector, -}; +use tower::{builder::ServiceBuilder, Service, ServiceExt}; +use tower_hyper::{client::Connect, util::Connector, Body}; +use tower_reconnect::Reconnect; fn main() { let fut = futures::lazy(|| { @@ -22,29 +19,24 @@ fn main() { fn request() -> impl Future, Error = ()> { let connector = Connector::new(HttpConnector::new(1)); - let hyper = Connect::new(connector, Builder::new()); + let hyper = Connect::new(connector); - // RetryPolicy is a very simple policy that retries `n` times - // if the response has a 500 status code. Here, `n` is 5. - let policy = RetryPolicy::new(5); // We're calling the tower/examples/server.rs. let dst = Destination::try_from_uri(Uri::from_static("http://127.0.0.1:3000")).unwrap(); // Now, to build the service! We use two BufferLayers in order to: // - provide backpressure for the RateLimitLayer, and ConcurrencyLimitLayer - // - meet `RetryLayer`'s requirement that our service implement `Service + Clone` // - ..and to provide cheap clones on the service. - let maker = ServiceBuilder::new() + let service = ServiceBuilder::new() .buffer(5) .rate_limit(5, Duration::from_secs(1)) .concurrency_limit(5) - .retry(policy) .buffer(5) - .make_service(hyper); + .service(hyper); - // `Reconnect` accepts a destination and a MakeService, creating a new service + // `Reconnect` accepts a destination and a Service, creating a new service // any time the connection encounters an error. - let client = Reconnect::new(maker, dst); + let client = Reconnect::new(service, dst); let request = Request::builder() .method("GET") @@ -55,9 +47,5 @@ fn request() -> impl Future, Error = ()> { client .ready() .map_err(|e| panic!("Service is not ready: {:?}", e)) - .and_then(|mut c| { - c.call(request) - .map(|res| res.map(|b| b.into_inner())) - .map_err(|e| panic!("{:?}", e)) - }) + .and_then(|mut c| c.call(request).map_err(|e| panic!("{:?}", e))) } diff --git a/src/http-server.rs b/src/http-server.rs index c6e0f41..c34695f 100644 --- a/src/http-server.rs +++ b/src/http-server.rs @@ -2,7 +2,7 @@ use futures::{future, Future, Poll, Stream}; use hyper::{self, Body, Request, Response}; use tokio::net::TcpListener; use tower::{builder::ServiceBuilder, Service}; -use tower_hyper::{body::LiftBody, server::Server}; +use tower_hyper::server::Server; fn main() { hyper::rt::run(future::lazy(|| { @@ -11,10 +11,7 @@ fn main() { println!("Listening on http://{}", addr); - let maker = ServiceBuilder::new() - .concurrency_limit(5) - .make_service(MakeSvc); - + let maker = ServiceBuilder::new().concurrency_limit(5).service(MakeSvc); let server = Server::new(maker); bind.incoming() @@ -37,8 +34,8 @@ fn main() { } struct Svc; -impl Service>> for Svc { - type Response = Response<&'static str>; +impl Service> for Svc { + type Response = Response; type Error = hyper::Error; type Future = future::FutureResult; @@ -46,8 +43,8 @@ impl Service>> for Svc { Ok(().into()) } - fn call(&mut self, _req: Request>) -> Self::Future { - let res = Response::new("Hello World!"); + fn call(&mut self, _req: Request) -> Self::Future { + let res = Response::new(Body::from("Hello World!")); future::ok(res) } }