Skip to content

Commit

Permalink
outbound: Avoid building balancers when no concrete name (#890)
Browse files Browse the repository at this point in the history
The outbound balancers are always built, even when there is no
resolveable concrete address.

This change modifies the outbound `Concrete` target to require a
resolveable address. When no address is present, the balancer stack is
skipped in favor of an endpoint stack.
  • Loading branch information
olix0r authored Feb 1, 2021
1 parent 7c9e160 commit a093de7
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 189 deletions.
25 changes: 18 additions & 7 deletions linkerd/app/outbound/src/http/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ where
ESvc::Future: Send,
R: Resolve<Addr, Endpoint = Metadata, Error = Error> + Clone + Send + 'static,
R::Resolution: Send,
R::Future: Send,
R::Future: Send + Unpin,
{
let ProxyConfig {
buffer_capacity,
Expand All @@ -57,7 +57,7 @@ where
// the balancer need not drive them all directly.
.push(svc::layer::mk(svc::SpawnReady::new)),
)
// Resolve the service to its endponts and balance requests over them.
// Resolve the service to its endpoints and balance requests over them.
//
// If the balancer has been empty/unavailable, eagerly fail requests.
// When the balancer is in failfast, spawn the service in a background
Expand All @@ -71,19 +71,30 @@ where
))
.push(metrics.stack.layer(stack_labels("http", "balancer")))
.push(svc::layer::mk(svc::SpawnReady::new))
.push(svc::FailFast::layer("HTTP Balancer", dispatch_timeout)),
.push(svc::FailFast::layer("HTTP Balancer", dispatch_timeout))
.push(http::BoxResponse::layer()),
)
.push(svc::MapErrLayer::new(Into::into))
// Drives the initial resolution via the service's readiness.
.into_new_service()
// The concrete address is only set when the profile could be
// resolved. Endpoint resolution is skipped when there is no
// concrete address.
.instrument(|c: &Concrete| match c.resolve.as_ref() {
None => debug_span!("concrete"),
Some(addr) => debug_span!("concrete", %addr),
})
.instrument(|c: &Concrete| debug_span!("concrete", addr = %c.resolve))
.push_map_target(Concrete::from)
// If there's no resolveable address, bypass the load balancer.
.push(svc::UnwrapOr::layer(
svc::stack(endpoint.clone())
.push_on_response(
svc::layers()
.push(http::BoxRequest::layer())
.push(http::BoxResponse::layer()),
)
.push_map_target(Endpoint::from_logical(
tls::NoClientTls::NotProvidedByServiceDiscovery,
))
.into_inner(),
))
// Distribute requests over a distribution of balancers via a traffic
// split.
//
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/outbound/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl Param<normalize_uri::DefaultAuthority> for Logical {

impl Param<client::Settings> for Endpoint {
fn param(&self) -> client::Settings {
match self.concrete.logical.protocol {
match self.logical.protocol {
Version::H2 => client::Settings::H2,
Version::Http1 => match self.metadata.protocol_hint() {
ProtocolHint::Unknown => client::Settings::Http1,
Expand All @@ -97,7 +97,7 @@ impl Param<client::Settings> for Endpoint {

impl Param<Option<SessionProtocol>> for Endpoint {
fn param(&self) -> Option<SessionProtocol> {
match self.concrete.logical.protocol {
match self.logical.protocol {
Version::H2 => Some(SessionProtocol::Http2),
Version::Http1 => match self.metadata.protocol_hint() {
ProtocolHint::Http2 => Some(SessionProtocol::Http2),
Expand Down
5 changes: 2 additions & 3 deletions linkerd/app/outbound/src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ where
let accept = crate::server::accept_stack(
&cfg,
profiles,
support::connect::NoRawTcp,
NoTcpBalancer,
router,
metrics.outbound.clone(),
Expand All @@ -111,9 +110,9 @@ where
#[derive(Clone, Debug)]
struct NoTcpBalancer;

impl svc::NewService<crate::tcp::Concrete> for NoTcpBalancer {
impl<T: std::fmt::Debug> svc::NewService<T> for NoTcpBalancer {
type Service = Self;
fn new_service(&mut self, target: crate::tcp::Concrete) -> Self::Service {
fn new_service(&mut self, target: T) -> Self::Service {
panic!(
"no TCP load balancer should be created in this test!\n\ttarget = {:?}",
target
Expand Down
63 changes: 7 additions & 56 deletions linkerd/app/outbound/src/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,19 @@ use std::{
time::Duration,
};

type ResolveStack<R> = map_endpoint::Resolve<
EndpointFromMetadata,
RecoverDefault<Filter<ResolveService<R>, AllowResolve>>,
>;
type ResolveStack<R> =
map_endpoint::Resolve<EndpointFromMetadata, Filter<ResolveService<R>, ToAddr>>;

fn new_resolve<T, R>(resolve: R) -> ResolveStack<R>
where
T: Clone + Param<std::net::SocketAddr>,
EndpointFromMetadata: map_endpoint::MapEndpoint<T, Metadata>,
R: Resolve<Addr, Endpoint = Metadata>,
R::Future: Send,
R::Resolution: Send,
{
map_endpoint::Resolve::new(
EndpointFromMetadata,
RecoverDefault(Filter::new(resolve.into_service(), AllowResolve)),
Filter::new(resolve.into_service(), ToAddr),
)
}

Expand All @@ -50,7 +47,6 @@ pub fn layer<T, E, R, N>(
watchdog: Duration,
) -> impl layer::Layer<N, Service = Stack<E, R, N>> + Clone
where
T: Clone + Param<std::net::SocketAddr>,
R: Resolve<Addr, Endpoint = Metadata> + Clone,
R::Resolution: Send,
R::Future: Send,
Expand All @@ -68,59 +64,14 @@ where
}

#[derive(Clone, Debug)]
pub struct AllowResolve;
pub struct ToAddr;

/// Wraps a `Resolve` to produce a default resolution when the resolution is
/// rejected.
#[derive(Clone, Debug)]
pub struct RecoverDefault<S>(S);

// === impl AllowResolve ===
// === impl ToAddr ===

impl<P> Predicate<Concrete<P>> for AllowResolve {
impl<P> Predicate<Concrete<P>> for ToAddr {
type Request = Addr;

fn check(&mut self, target: Concrete<P>) -> Result<Addr, Error> {
target.resolve.ok_or_else(|| discovery_rejected().into())
}
}

// === impl RecoverDefault ===

type Resolution<R> =
future::Either<R, stream::Once<future::Ready<Result<Update<Metadata>, Error>>>>;

impl<T, S> tower::Service<T> for RecoverDefault<S>
where
T: Param<std::net::SocketAddr>,
S: Resolve<T, Endpoint = Metadata, Error = Error>,
S::Future: Send + 'static,
S::Resolution: Send + 'static,
{
type Response = Resolution<S::Resolution>;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
Poll::Ready(futures::ready!(self.0.poll_ready(cx)).map_err(Into::into))
}

fn call(&mut self, t: T) -> Self::Future {
let addr = t.param();
Box::pin(
self.0
.resolve(t)
.map_ok(future::Either::Left)
.or_else(move |error| {
if is_discovery_rejected(&*error) {
tracing::debug!(%error, %addr, "Synthesizing endpoint");
let endpoint = (addr, S::Endpoint::default());
let res = stream::once(future::ok(Update::Reset(vec![endpoint])));
return future::ok(future::Either::Right(res));
}

future::err(error)
}),
)
Ok(target.resolve)
}
}
59 changes: 20 additions & 39 deletions linkerd/app/outbound/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use linkerd_app_core::{
profiles,
proxy::{api_resolve::Metadata, core::resolve::Resolve},
spans::SpanConverter,
svc, tls,
svc,
transport::{listen, metrics::SensorIo},
Addr, Error, IpMatch, TraceContext,
};
Expand All @@ -33,7 +33,7 @@ where
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Send + Unpin + 'static,
R: Resolve<Addr, Endpoint = Metadata, Error = Error> + Clone + Send + 'static,
R::Resolution: Send,
R::Future: Send,
R::Future: Send + Unpin,
C: svc::Service<tcp::Endpoint> + Clone + Send + 'static,
C::Response: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin,
C::Error: Into<Error>,
Expand All @@ -48,18 +48,11 @@ where
P::Future: Send,
P::Error: Send,
{
let tcp_balance = tcp::balance::stack(
&config.proxy,
tcp_connect.clone(),
resolve,
&metrics,
drain.clone(),
);
let tcp = tcp::balance::stack(&config.proxy, tcp_connect, resolve, &metrics, drain.clone());
let accept = accept_stack(
config,
profiles,
tcp_connect,
tcp_balance,
tcp,
http_router,
metrics.clone(),
span_sink,
Expand Down Expand Up @@ -106,11 +99,10 @@ where
.into_inner()
}

pub fn accept_stack<P, C, T, TSvc, H, HSvc, I>(
pub fn accept_stack<P, T, TSvc, H, HSvc, I>(
config: &Config,
profiles: P,
tcp_connect: C,
tcp_balance: T,
tcp: T,
http_router: H,
metrics: metrics::Proxy,
span_sink: Option<mpsc::Sender<oc::Span>>,
Expand All @@ -121,19 +113,10 @@ pub fn accept_stack<P, C, T, TSvc, H, HSvc, I>(
> + Clone
where
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Send + Unpin + 'static,
C: svc::Service<tcp::Endpoint> + Clone + Send + 'static,
C::Response: io::AsyncRead + io::AsyncWrite + Send + Unpin,
C::Error: Into<Error>,
C::Future: Send,
T: svc::NewService<tcp::Concrete, Service = TSvc> + Clone + Send + 'static,
TSvc: svc::Service<io::PrefixedIo<I>, Response = ()>
+ svc::Service<I, Response = ()>
+ Send
+ 'static,
<TSvc as svc::Service<I>>::Error: Into<Error>,
<TSvc as svc::Service<I>>::Future: Send,
<TSvc as svc::Service<io::PrefixedIo<I>>>::Error: Into<Error>,
<TSvc as svc::Service<io::PrefixedIo<I>>>::Future: Send,
T: svc::NewService<(Option<Addr>, tcp::Logical), Service = TSvc> + Clone + Send + 'static,
TSvc: svc::Service<io::EitherIo<I, io::PrefixedIo<I>>, Response = ()> + Send + 'static,
TSvc::Error: Into<Error>,
TSvc::Future: Send,
H: svc::NewService<http::Logical, Service = HSvc> + Clone + Send + 'static,
HSvc: svc::Service<http::Request<http::BoxBody>, Response = http::Response<http::BoxBody>>
+ Send
Expand All @@ -154,14 +137,8 @@ where
..
} = config.proxy.clone();

let tcp_forward = svc::stack(tcp_connect)
.push_make_thunk()
.push_on_response(tcp::Forward::layer())
.push(svc::MapErrLayer::new(Into::into))
.into_new_service()
.push_map_target(tcp::Endpoint::from_logical(
tls::NoClientTls::NotProvidedByServiceDiscovery,
))
let tcp_forward = svc::stack(tcp.clone())
.push_map_target(|l| (None, l))
.into_inner();

svc::stack(http_router)
Expand Down Expand Up @@ -199,12 +176,12 @@ where
// When an HTTP version cannot be detected, we fallback to a logical
// TCP stack. This service needs to be buffered so that it can be
// cached and cloned per connection.
svc::stack(tcp_balance.clone())
.push_map_target(tcp::Concrete::from)
svc::stack(tcp.clone())
.push(profiles::split::layer())
.push_switch(ShouldResolve, tcp_forward.clone())
.push_on_response(
svc::layers()
.push_map_target(io::EitherIo::Right)
.push(metrics.stack.layer(stack_labels("tcp", "logical")))
.push(svc::layer::mk(svc::SpawnReady::new))
.push(svc::FailFast::layer("TCP Logical", dispatch_timeout))
Expand All @@ -226,10 +203,14 @@ where
// detection and just use the TCP logical stack directly. Unlike the
// above case, this stack need not be buffered, since `fn cache`
// applies its own buffer on the returned service.
svc::stack(tcp_balance)
.push_map_target(tcp::Concrete::from)
svc::stack(tcp)
.push(profiles::split::layer())
.push_switch(ShouldResolve, tcp_forward)
.push_on_response(
svc::layers()
.push_map_target(io::EitherIo::Left)
.push(metrics.stack.layer(stack_labels("tcp", "passthru"))),
)
.instrument(|_: &_| debug_span!("tcp.opaque"))
.into_inner(),
)
Expand Down
Loading

0 comments on commit a093de7

Please sign in to comment.