diff --git a/linkerd/app/outbound/src/http/logical.rs b/linkerd/app/outbound/src/http/logical.rs index ea5e151d9e..6be91fb890 100644 --- a/linkerd/app/outbound/src/http/logical.rs +++ b/linkerd/app/outbound/src/http/logical.rs @@ -34,7 +34,7 @@ where ESvc::Future: Send, R: Resolve + Clone + Send + 'static, R::Resolution: Send, - R::Future: Send, + R::Future: Send + Unpin, { let ProxyConfig { buffer_capacity, @@ -57,7 +57,7 @@ where // the balancer need not drive them all directly. .push(svc::layer::mk(svc::SpawnReady::new)), ) - // Resolve the service to its endponts and balance requests over them. + // Resolve the service to its endpoints and balance requests over them. // // If the balancer has been empty/unavailable, eagerly fail requests. // When the balancer is in failfast, spawn the service in a background @@ -71,7 +71,8 @@ where )) .push(metrics.stack.layer(stack_labels("http", "balancer"))) .push(svc::layer::mk(svc::SpawnReady::new)) - .push(svc::FailFast::layer("HTTP Balancer", dispatch_timeout)), + .push(svc::FailFast::layer("HTTP Balancer", dispatch_timeout)) + .push(http::BoxResponse::layer()), ) .push(svc::MapErrLayer::new(Into::into)) // Drives the initial resolution via the service's readiness. @@ -79,11 +80,21 @@ where // The concrete address is only set when the profile could be // resolved. Endpoint resolution is skipped when there is no // concrete address. - .instrument(|c: &Concrete| match c.resolve.as_ref() { - None => debug_span!("concrete"), - Some(addr) => debug_span!("concrete", %addr), - }) + .instrument(|c: &Concrete| debug_span!("concrete", addr = %c.resolve)) .push_map_target(Concrete::from) + // If there's no resolveable address, bypass the load balancer. + .push(svc::UnwrapOr::layer( + svc::stack(endpoint.clone()) + .push_on_response( + svc::layers() + .push(http::BoxRequest::layer()) + .push(http::BoxResponse::layer()), + ) + .push_map_target(Endpoint::from_logical( + tls::NoClientTls::NotProvidedByServiceDiscovery, + )) + .into_inner(), + )) // Distribute requests over a distribution of balancers via a traffic // split. // diff --git a/linkerd/app/outbound/src/http/mod.rs b/linkerd/app/outbound/src/http/mod.rs index f6bb806aef..e3f8294ed8 100644 --- a/linkerd/app/outbound/src/http/mod.rs +++ b/linkerd/app/outbound/src/http/mod.rs @@ -85,7 +85,7 @@ impl Param for Logical { impl Param for Endpoint { fn param(&self) -> client::Settings { - match self.concrete.logical.protocol { + match self.logical.protocol { Version::H2 => client::Settings::H2, Version::Http1 => match self.metadata.protocol_hint() { ProtocolHint::Unknown => client::Settings::Http1, @@ -97,7 +97,7 @@ impl Param for Endpoint { impl Param> for Endpoint { fn param(&self) -> Option { - match self.concrete.logical.protocol { + match self.logical.protocol { Version::H2 => Some(SessionProtocol::Http2), Version::Http1 => match self.metadata.protocol_hint() { ProtocolHint::Http2 => Some(SessionProtocol::Http2), diff --git a/linkerd/app/outbound/src/http/tests.rs b/linkerd/app/outbound/src/http/tests.rs index 419cd5d2d9..02db65515b 100644 --- a/linkerd/app/outbound/src/http/tests.rs +++ b/linkerd/app/outbound/src/http/tests.rs @@ -98,7 +98,6 @@ where let accept = crate::server::accept_stack( &cfg, profiles, - support::connect::NoRawTcp, NoTcpBalancer, router, metrics.outbound.clone(), @@ -111,9 +110,9 @@ where #[derive(Clone, Debug)] struct NoTcpBalancer; -impl svc::NewService for NoTcpBalancer { +impl svc::NewService for NoTcpBalancer { type Service = Self; - fn new_service(&mut self, target: crate::tcp::Concrete) -> Self::Service { + fn new_service(&mut self, target: T) -> Self::Service { panic!( "no TCP load balancer should be created in this test!\n\ttarget = {:?}", target diff --git a/linkerd/app/outbound/src/resolve.rs b/linkerd/app/outbound/src/resolve.rs index 6acb381cbd..59533dccba 100644 --- a/linkerd/app/outbound/src/resolve.rs +++ b/linkerd/app/outbound/src/resolve.rs @@ -24,14 +24,11 @@ use std::{ time::Duration, }; -type ResolveStack = map_endpoint::Resolve< - EndpointFromMetadata, - RecoverDefault, AllowResolve>>, ->; +type ResolveStack = + map_endpoint::Resolve, ToAddr>>; fn new_resolve(resolve: R) -> ResolveStack where - T: Clone + Param, EndpointFromMetadata: map_endpoint::MapEndpoint, R: Resolve, R::Future: Send, @@ -39,7 +36,7 @@ where { map_endpoint::Resolve::new( EndpointFromMetadata, - RecoverDefault(Filter::new(resolve.into_service(), AllowResolve)), + Filter::new(resolve.into_service(), ToAddr), ) } @@ -50,7 +47,6 @@ pub fn layer( watchdog: Duration, ) -> impl layer::Layer> + Clone where - T: Clone + Param, R: Resolve + Clone, R::Resolution: Send, R::Future: Send, @@ -68,59 +64,14 @@ where } #[derive(Clone, Debug)] -pub struct AllowResolve; +pub struct ToAddr; -/// Wraps a `Resolve` to produce a default resolution when the resolution is -/// rejected. -#[derive(Clone, Debug)] -pub struct RecoverDefault(S); - -// === impl AllowResolve === +// === impl ToAddr === -impl

Predicate> for AllowResolve { +impl

Predicate> for ToAddr { type Request = Addr; fn check(&mut self, target: Concrete

) -> Result { - target.resolve.ok_or_else(|| discovery_rejected().into()) - } -} - -// === impl RecoverDefault === - -type Resolution = - future::Either, Error>>>>; - -impl tower::Service for RecoverDefault -where - T: Param, - S: Resolve, - S::Future: Send + 'static, - S::Resolution: Send + 'static, -{ - type Response = Resolution; - type Error = Error; - type Future = Pin> + Send + 'static>>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - Poll::Ready(futures::ready!(self.0.poll_ready(cx)).map_err(Into::into)) - } - - fn call(&mut self, t: T) -> Self::Future { - let addr = t.param(); - Box::pin( - self.0 - .resolve(t) - .map_ok(future::Either::Left) - .or_else(move |error| { - if is_discovery_rejected(&*error) { - tracing::debug!(%error, %addr, "Synthesizing endpoint"); - let endpoint = (addr, S::Endpoint::default()); - let res = stream::once(future::ok(Update::Reset(vec![endpoint]))); - return future::ok(future::Either::Right(res)); - } - - future::err(error) - }), - ) + Ok(target.resolve) } } diff --git a/linkerd/app/outbound/src/server.rs b/linkerd/app/outbound/src/server.rs index 1a29f3d5a1..dd8bd7dd94 100644 --- a/linkerd/app/outbound/src/server.rs +++ b/linkerd/app/outbound/src/server.rs @@ -8,7 +8,7 @@ use linkerd_app_core::{ profiles, proxy::{api_resolve::Metadata, core::resolve::Resolve}, spans::SpanConverter, - svc, tls, + svc, transport::{listen, metrics::SensorIo}, Addr, Error, IpMatch, TraceContext, }; @@ -33,7 +33,7 @@ where I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Send + Unpin + 'static, R: Resolve + Clone + Send + 'static, R::Resolution: Send, - R::Future: Send, + R::Future: Send + Unpin, C: svc::Service + Clone + Send + 'static, C::Response: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin, C::Error: Into, @@ -48,18 +48,11 @@ where P::Future: Send, P::Error: Send, { - let tcp_balance = tcp::balance::stack( - &config.proxy, - tcp_connect.clone(), - resolve, - &metrics, - drain.clone(), - ); + let tcp = tcp::balance::stack(&config.proxy, tcp_connect, resolve, &metrics, drain.clone()); let accept = accept_stack( config, profiles, - tcp_connect, - tcp_balance, + tcp, http_router, metrics.clone(), span_sink, @@ -106,11 +99,10 @@ where .into_inner() } -pub fn accept_stack( +pub fn accept_stack( config: &Config, profiles: P, - tcp_connect: C, - tcp_balance: T, + tcp: T, http_router: H, metrics: metrics::Proxy, span_sink: Option>, @@ -121,19 +113,10 @@ pub fn accept_stack( > + Clone where I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Send + Unpin + 'static, - C: svc::Service + Clone + Send + 'static, - C::Response: io::AsyncRead + io::AsyncWrite + Send + Unpin, - C::Error: Into, - C::Future: Send, - T: svc::NewService + Clone + Send + 'static, - TSvc: svc::Service, Response = ()> - + svc::Service - + Send - + 'static, - >::Error: Into, - >::Future: Send, - >>::Error: Into, - >>::Future: Send, + T: svc::NewService<(Option, tcp::Logical), Service = TSvc> + Clone + Send + 'static, + TSvc: svc::Service>, Response = ()> + Send + 'static, + TSvc::Error: Into, + TSvc::Future: Send, H: svc::NewService + Clone + Send + 'static, HSvc: svc::Service, Response = http::Response> + Send @@ -154,14 +137,8 @@ where .. } = config.proxy.clone(); - let tcp_forward = svc::stack(tcp_connect) - .push_make_thunk() - .push_on_response(tcp::Forward::layer()) - .push(svc::MapErrLayer::new(Into::into)) - .into_new_service() - .push_map_target(tcp::Endpoint::from_logical( - tls::NoClientTls::NotProvidedByServiceDiscovery, - )) + let tcp_forward = svc::stack(tcp.clone()) + .push_map_target(|l| (None, l)) .into_inner(); svc::stack(http_router) @@ -199,12 +176,12 @@ where // When an HTTP version cannot be detected, we fallback to a logical // TCP stack. This service needs to be buffered so that it can be // cached and cloned per connection. - svc::stack(tcp_balance.clone()) - .push_map_target(tcp::Concrete::from) + svc::stack(tcp.clone()) .push(profiles::split::layer()) .push_switch(ShouldResolve, tcp_forward.clone()) .push_on_response( svc::layers() + .push_map_target(io::EitherIo::Right) .push(metrics.stack.layer(stack_labels("tcp", "logical"))) .push(svc::layer::mk(svc::SpawnReady::new)) .push(svc::FailFast::layer("TCP Logical", dispatch_timeout)) @@ -226,10 +203,14 @@ where // detection and just use the TCP logical stack directly. Unlike the // above case, this stack need not be buffered, since `fn cache` // applies its own buffer on the returned service. - svc::stack(tcp_balance) - .push_map_target(tcp::Concrete::from) + svc::stack(tcp) .push(profiles::split::layer()) .push_switch(ShouldResolve, tcp_forward) + .push_on_response( + svc::layers() + .push_map_target(io::EitherIo::Left) + .push(metrics.stack.layer(stack_labels("tcp", "passthru"))), + ) .instrument(|_: &_| debug_span!("tcp.opaque")) .into_inner(), ) diff --git a/linkerd/app/outbound/src/target.rs b/linkerd/app/outbound/src/target.rs index d9d67e9830..1e1bad69b8 100644 --- a/linkerd/app/outbound/src/target.rs +++ b/linkerd/app/outbound/src/target.rs @@ -24,7 +24,7 @@ pub struct Logical

{ #[derive(Clone, Debug)] pub struct Concrete

{ - pub resolve: Option, + pub resolve: Addr, pub logical: Logical

, } @@ -34,7 +34,7 @@ pub struct Endpoint

{ pub target_addr: SocketAddr, pub tls: tls::ConditionalClientTls, pub metadata: Metadata, - pub concrete: Concrete

, + pub logical: Logical

, } #[derive(Clone, Copy, Debug)] @@ -148,22 +148,12 @@ impl std::fmt::Debug for Logical

{ // === impl Concrete === -impl

From<(Option, Logical

)> for Concrete

{ - fn from((resolve, logical): (Option, Logical

)) -> Self { +impl

From<(Addr, Logical

)> for Concrete

{ + fn from((resolve, logical): (Addr, Logical

)) -> Self { Self { resolve, logical } } } -/// Produces an address to be used if resolution is rejected. -impl

Param for Concrete

{ - fn param(&self) -> SocketAddr { - self.resolve - .as_ref() - .and_then(|a| a.socket_addr()) - .unwrap_or(self.logical.orig_dst) - } -} - // === impl Endpoint === impl

Endpoint

{ @@ -179,20 +169,14 @@ impl

Endpoint

{ addr: logical.param(), metadata: Metadata::default(), tls: Conditional::None(reason), - concrete: Concrete { - logical, - resolve: None, - }, + logical, target_addr, }, Some((addr, metadata)) => Self { addr, tls: EndpointFromMetadata::client_tls(&metadata), metadata, - concrete: Concrete { - logical, - resolve: None, - }, + logical, target_addr, }, } @@ -231,7 +215,7 @@ impl

Param for Endpoint

{ impl

Param for Endpoint

{ fn param(&self) -> metrics::OutboundEndpointLabels { metrics::OutboundEndpointLabels { - authority: Some(self.concrete.logical.addr().to_http_authority()), + authority: Some(self.logical.addr().to_http_authority()), labels: metrics::prefix_labels("dst", self.metadata.labels().iter()), server_id: self.tls.clone(), target_addr: self.target_addr, @@ -249,9 +233,8 @@ impl std::hash::Hash for Endpoint

{ fn hash(&self, state: &mut H) { self.addr.hash(state); self.tls.hash(state); - self.concrete.resolve.hash(state); - self.concrete.logical.orig_dst.hash(state); - self.concrete.logical.protocol.hash(state); + self.logical.orig_dst.hash(state); + self.logical.protocol.hash(state); } } @@ -298,7 +281,7 @@ impl MapEndpoint, Metadata> for Endpoint addr, tls: Self::client_tls(&metadata), metadata, - concrete: concrete.clone(), + logical: concrete.logical.clone(), target_addr: concrete.logical.orig_dst, } } diff --git a/linkerd/app/outbound/src/tcp/balance.rs b/linkerd/app/outbound/src/tcp/balance.rs index f39a952a86..d56e98ce84 100644 --- a/linkerd/app/outbound/src/tcp/balance.rs +++ b/linkerd/app/outbound/src/tcp/balance.rs @@ -1,10 +1,10 @@ -use super::{Concrete, Endpoint}; +use super::{Concrete, Endpoint, Logical}; use crate::resolve; use linkerd_app_core::{ config::ProxyConfig, drain, io, metrics, proxy::{api_resolve::Metadata, core::Resolve, tcp}, - svc, Addr, Conditional, Error, + svc, tls, Addr, Conditional, Error, }; use tracing::debug_span; @@ -16,9 +16,8 @@ pub fn stack( metrics: &metrics::Proxy, drain: drain::Watch, ) -> impl svc::NewService< - Concrete, - Service = impl svc::Service - + svc::Service, Response = (), Error = Error, Future = impl Send>, + (Option, Logical), + Service = impl svc::Service, > + Clone where I: io::AsyncRead + io::AsyncWrite + std::fmt::Debug + Send + Unpin + 'static, @@ -26,11 +25,11 @@ where C::Response: io::AsyncRead + io::AsyncWrite + Send + Unpin, C::Error: Into, C::Future: Send, - R: Resolve + Clone + 'static, + R: Resolve + Clone + Send + 'static, R::Resolution: Send, - R::Future: Send, + R::Future: Send + Unpin, { - svc::stack(connect) + svc::stack(connect.clone()) .push_make_thunk() .instrument(|t: &Endpoint| match t.tls.as_ref() { Conditional::Some(tls) => { @@ -49,7 +48,27 @@ where )) .push(metrics.stack.layer(crate::stack_labels("tcp", "balancer"))) .push(tcp::Forward::layer()) - .push(drain::Retain::layer(drain)), + .push(drain::Retain::layer(drain.clone())), ) .into_new_service() + .push_map_target(Concrete::from) + // If there's no resolveable address, bypass the load balancer. + .push(svc::UnwrapOr::layer( + svc::stack(connect) + .push_make_thunk() + .push(svc::MapErrLayer::new(Into::into)) + .instrument(|t: &Endpoint| debug_span!("tcp.forward", server.addr = %t.addr)) + .push_on_response( + svc::layers() + .push(tcp::Forward::layer()) + .push(drain::Retain::layer(drain)), + ) + .into_new_service() + .push_map_target(Endpoint::from_logical( + tls::NoClientTls::NotProvidedByServiceDiscovery, + )) + .into_inner(), + )) + .check_new_service::<(Option, Logical), I>() + .into_inner() } diff --git a/linkerd/app/outbound/src/tcp/opaque_transport.rs b/linkerd/app/outbound/src/tcp/opaque_transport.rs index cfb2e3abf8..95fa8cb514 100644 --- a/linkerd/app/outbound/src/tcp/opaque_transport.rs +++ b/linkerd/app/outbound/src/tcp/opaque_transport.rs @@ -112,7 +112,7 @@ where #[cfg(test)] mod test { use super::*; - use crate::target::{Concrete, Endpoint, Logical}; + use crate::target::{Endpoint, Logical}; use futures::future; use linkerd_app_core::{ io::{self, AsyncWriteExt}, @@ -131,13 +131,10 @@ mod test { target_addr: ([127, 0, 0, 2], 4321).into(), tls: Conditional::None(tls::NoClientTls::NotProvidedByServiceDiscovery), metadata, - concrete: Concrete { - resolve: None, - logical: Logical { - orig_dst: ([127, 0, 0, 2], 4321).into(), - profile: None, - protocol: (), - }, + logical: Logical { + orig_dst: ([127, 0, 0, 2], 4321).into(), + profile: None, + protocol: (), }, } } @@ -172,7 +169,7 @@ mod test { inner: service_fn(|ep: Endpoint<()>| { assert_eq!(ep.addr.port(), 4143); let hdr = TransportHeader { - port: ep.concrete.logical.orig_dst.port(), + port: ep.logical.orig_dst.port(), name: None, protocol: None, }; @@ -242,7 +239,7 @@ mod test { inner: service_fn(|ep: Endpoint<()>| { assert_eq!(ep.addr.port(), 4143); let hdr = TransportHeader { - port: ep.concrete.logical.orig_dst.port(), + port: ep.logical.orig_dst.port(), name: None, protocol: None, }; diff --git a/linkerd/app/outbound/src/tcp/tests.rs b/linkerd/app/outbound/src/tcp/tests.rs index dbbf5d6589..bca6d66e47 100644 --- a/linkerd/app/outbound/src/tcp/tests.rs +++ b/linkerd/app/outbound/src/tcp/tests.rs @@ -1,4 +1,4 @@ -use super::{Concrete, Endpoint, Logical}; +use super::{Endpoint, Logical}; use crate::test_util::{ support::{ connect::{Connect, ConnectFuture}, @@ -33,13 +33,10 @@ async fn plaintext_tcp() { // ports or anything. These will just be used so that the proxy has a socket // address to resolve, etc. let target_addr = SocketAddr::new([0, 0, 0, 0].into(), 666); - let concrete = Concrete { - logical: Logical { - orig_dst: target_addr, - profile: Some(profile::only_default()), - protocol: (), - }, - resolve: Some(target_addr.into()), + let logical = Logical { + orig_dst: target_addr, + profile: Some(profile::only_default()), + protocol: (), }; let cfg = default_config(target_addr); @@ -59,17 +56,14 @@ async fn plaintext_tcp() { // Configure the mock destination resolver to just give us a single endpoint // for the target, which always exists and has no metadata. - let resolver = support::resolver().endpoint_exists( - concrete.resolve.clone().unwrap(), - target_addr, - support::resolver::Metadata::default(), - ); + let resolver = + support::resolver().endpoint_exists(target_addr, target_addr, Default::default()); // Build the outbound TCP balancer stack. let (_, drain) = drain::channel(); let (metrics, _) = metrics::Metrics::new(Duration::from_secs(10)); let forward = super::balance::stack(&cfg.proxy, connect, resolver, &metrics.outbound, drain) - .new_service(concrete); + .new_service((Some(target_addr.into()), logical)); forward .oneshot(client_io) @@ -83,23 +77,17 @@ async fn tls_when_hinted() { let _trace = support::trace_init(); let tls_addr = SocketAddr::new([0, 0, 0, 0].into(), 5550); - let tls_concrete = Concrete { - logical: Logical { - orig_dst: tls_addr, - profile: Some(profile::only_default()), - protocol: (), - }, - resolve: Some(tls_addr.into()), + let tls_logical = Logical { + orig_dst: tls_addr, + profile: Some(profile::only_default()), + protocol: (), }; let plain_addr = SocketAddr::new([0, 0, 0, 0].into(), 5551); - let plain_concrete = Concrete { - logical: Logical { - orig_dst: tls_addr, - profile: Some(profile::only_default()), - protocol: (), - }, - resolve: Some(plain_addr.into()), + let plain_logical = Logical { + orig_dst: tls_addr, + profile: Some(profile::only_default()), + protocol: (), }; let cfg = default_config(plain_addr); @@ -135,12 +123,8 @@ async fn tls_when_hinted() { // Configure the mock destination resolver to just give us a single endpoint // for the target, which always exists and has no metadata. let resolver = support::resolver() - .endpoint_exists( - plain_concrete.resolve.clone().unwrap(), - plain_addr, - support::resolver::Metadata::default(), - ) - .endpoint_exists(tls_concrete.resolve.clone().unwrap(), tls_addr, tls_meta); + .endpoint_exists(plain_addr, plain_addr, Default::default()) + .endpoint_exists(tls_addr, tls_addr, tls_meta); // Configure mock IO for the "client". let mut client_io = support::io(); @@ -153,11 +137,11 @@ async fn tls_when_hinted() { super::balance::stack(&cfg.proxy, connect, resolver, &metrics.outbound, drain); let plain = balance - .new_service(plain_concrete) + .new_service((Some(plain_addr.into()), plain_logical)) .oneshot(client_io.build()) .err_into::(); let tls = balance - .new_service(tls_concrete) + .new_service((Some(tls_addr.into()), tls_logical)) .oneshot(client_io.build()) .err_into::();