diff --git a/linkerd/app/core/src/svc.rs b/linkerd/app/core/src/svc.rs index 96e1068b7f..8bbf0ed80c 100644 --- a/linkerd/app/core/src/svc.rs +++ b/linkerd/app/core/src/svc.rs @@ -18,7 +18,11 @@ use tower::{ make::MakeService, }; pub use tower::{ - layer::Layer, service_fn as mk, spawn_ready::SpawnReady, util::Either, Service, ServiceExt, + layer::Layer, + service_fn as mk, + spawn_ready::SpawnReady, + util::{Either, MapErrLayer}, + Service, ServiceExt, }; #[derive(Clone, Debug)] diff --git a/linkerd/app/outbound/src/http/logical.rs b/linkerd/app/outbound/src/http/logical.rs index b9af4d1eb6..754cf3928a 100644 --- a/linkerd/app/outbound/src/http/logical.rs +++ b/linkerd/app/outbound/src/http/logical.rs @@ -73,6 +73,7 @@ where .push(svc::FailFast::layer("HTTP Balancer", dispatch_timeout)) .push(metrics.stack.layer(stack_labels("http", "concrete"))), ) + .push(svc::MapErrLayer::new(Into::into)) .into_new_service() .check_new_service::>() .instrument(|c: &Concrete| match c.resolve.as_ref() { diff --git a/linkerd/app/outbound/src/ingress.rs b/linkerd/app/outbound/src/ingress.rs index 02bd5e5c13..b3a344dbaf 100644 --- a/linkerd/app/outbound/src/ingress.rs +++ b/linkerd/app/outbound/src/ingress.rs @@ -73,6 +73,7 @@ where .into_inner(); svc::stack(http) + .push_on_response(svc::MapErrLayer::new(Into::into)) .check_new_service::>() .push_map_target(http::Logical::from) .push(profiles::discover::layer( diff --git a/linkerd/app/outbound/src/server.rs b/linkerd/app/outbound/src/server.rs index f3ff48b7ed..4b01b68981 100644 --- a/linkerd/app/outbound/src/server.rs +++ b/linkerd/app/outbound/src/server.rs @@ -146,6 +146,7 @@ where let tcp_forward = svc::stack(tcp_connect) .push_make_thunk() .push_on_response(tcp::Forward::layer()) + .push(svc::MapErrLayer::new(Into::into)) .into_new_service() .push_on_response(metrics.stack.layer(stack_labels("tcp", "forward"))) .push_map_target(tcp::Endpoint::from_logical( diff --git a/linkerd/stack/src/future_service.rs b/linkerd/stack/src/future_service.rs deleted file mode 100644 index 639fcc2c6f..0000000000 --- a/linkerd/stack/src/future_service.rs +++ /dev/null @@ -1,59 +0,0 @@ -use futures::{ready, TryFuture, TryFutureExt}; -use linkerd_error::Error; -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// Implements a `Service` from a `Future` that produces a `Service`. -#[derive(Debug)] -pub struct FutureService { - inner: Inner, -} - -#[derive(Debug)] -enum Inner { - Future(F), - Service(S), -} - -// === impl FutureService === - -impl FutureService { - pub fn new(fut: F) -> Self { - Self { - inner: Inner::Future(fut), - } - } -} - -impl tower::Service for FutureService -where - F: TryFuture + Unpin, - F::Error: Into, - S: tower::Service, - S::Error: Into, -{ - type Response = S::Response; - type Error = Error; - type Future = futures::future::MapErr Error>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - loop { - self.inner = match self.inner { - Inner::Future(ref mut fut) => { - let fut = Pin::new(fut); - let svc = ready!(fut.try_poll(cx).map_err(Into::into)?); - Inner::Service(svc) - } - Inner::Service(ref mut svc) => return svc.poll_ready(cx).map_err(Into::into), - }; - } - } - - fn call(&mut self, req: Req) -> Self::Future { - if let Inner::Service(ref mut svc) = self.inner { - return svc.call(req).map_err(Into::into); - } - - panic!("Called before ready"); - } -} diff --git a/linkerd/stack/src/lib.rs b/linkerd/stack/src/lib.rs index 907ab98099..c6a7591477 100644 --- a/linkerd/stack/src/lib.rs +++ b/linkerd/stack/src/lib.rs @@ -6,7 +6,6 @@ mod box_future; mod box_new_service; mod fail; mod fail_on_error; -mod future_service; pub mod layer; pub mod make_thunk; pub mod map_target; @@ -25,7 +24,6 @@ pub use self::{ box_new_service::BoxNewService, fail::Fail, fail_on_error::FailOnError, - future_service::FutureService, make_thunk::MakeThunk, map_target::{MapTarget, MapTargetLayer, MapTargetService}, new_service::NewService, @@ -39,3 +37,4 @@ pub use self::{ unwrap_or::NewUnwrapOr, }; pub use tower::util::Either; +pub use tower::util::{future_service, FutureService};