Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

outbound: Avoid building balancers when no concrete name #890

Merged
merged 4 commits into from
Feb 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions linkerd/app/outbound/src/http/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ where
ESvc::Future: Send,
R: Resolve<Addr, Endpoint = Metadata, Error = Error> + Clone + Send + 'static,
R::Resolution: Send,
R::Future: Send,
R::Future: Send + Unpin,
{
let ProxyConfig {
buffer_capacity,
Expand All @@ -57,7 +57,7 @@ where
// the balancer need not drive them all directly.
.push(svc::layer::mk(svc::SpawnReady::new)),
)
// Resolve the service to its endponts and balance requests over them.
// Resolve the service to its endpoints and balance requests over them.
//
// If the balancer has been empty/unavailable, eagerly fail requests.
// When the balancer is in failfast, spawn the service in a background
Expand All @@ -71,19 +71,30 @@ where
))
.push(metrics.stack.layer(stack_labels("http", "balancer")))
.push(svc::layer::mk(svc::SpawnReady::new))
.push(svc::FailFast::layer("HTTP Balancer", dispatch_timeout)),
.push(svc::FailFast::layer("HTTP Balancer", dispatch_timeout))
.push(http::BoxResponse::layer()),
)
.push(svc::MapErrLayer::new(Into::into))
// Drives the initial resolution via the service's readiness.
.into_new_service()
// The concrete address is only set when the profile could be
// resolved. Endpoint resolution is skipped when there is no
// concrete address.
.instrument(|c: &Concrete| match c.resolve.as_ref() {
None => debug_span!("concrete"),
Some(addr) => debug_span!("concrete", %addr),
})
.instrument(|c: &Concrete| debug_span!("concrete", addr = %c.resolve))
.push_map_target(Concrete::from)
// If there's no resolveable address, bypass the load balancer.
.push(svc::UnwrapOr::layer(
svc::stack(endpoint.clone())
.push_on_response(
svc::layers()
.push(http::BoxRequest::layer())
.push(http::BoxResponse::layer()),
)
.push_map_target(Endpoint::from_logical(
tls::NoClientTls::NotProvidedByServiceDiscovery,
))
.into_inner(),
Comment on lines +87 to +96
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIOLI: should this stack have its own debug span? no_concrete or something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The endpoint stack is annotated, so it should be pretty obvious if the spans are just logical>endpoint without a concrete span

))
// Distribute requests over a distribution of balancers via a traffic
// split.
//
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/outbound/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl Param<normalize_uri::DefaultAuthority> for Logical {

impl Param<client::Settings> for Endpoint {
fn param(&self) -> client::Settings {
match self.concrete.logical.protocol {
match self.logical.protocol {
Version::H2 => client::Settings::H2,
Version::Http1 => match self.metadata.protocol_hint() {
ProtocolHint::Unknown => client::Settings::Http1,
Expand All @@ -97,7 +97,7 @@ impl Param<client::Settings> for Endpoint {

impl Param<Option<SessionProtocol>> for Endpoint {
fn param(&self) -> Option<SessionProtocol> {
match self.concrete.logical.protocol {
match self.logical.protocol {
Version::H2 => Some(SessionProtocol::Http2),
Version::Http1 => match self.metadata.protocol_hint() {
ProtocolHint::Http2 => Some(SessionProtocol::Http2),
Expand Down
5 changes: 2 additions & 3 deletions linkerd/app/outbound/src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ where
let accept = crate::server::accept_stack(
&cfg,
profiles,
support::connect::NoRawTcp,
NoTcpBalancer,
router,
metrics.outbound.clone(),
Expand All @@ -111,9 +110,9 @@ where
#[derive(Clone, Debug)]
struct NoTcpBalancer;

impl svc::NewService<crate::tcp::Concrete> for NoTcpBalancer {
impl<T: std::fmt::Debug> svc::NewService<T> for NoTcpBalancer {
type Service = Self;
fn new_service(&mut self, target: crate::tcp::Concrete) -> Self::Service {
fn new_service(&mut self, target: T) -> Self::Service {
panic!(
"no TCP load balancer should be created in this test!\n\ttarget = {:?}",
target
Expand Down
63 changes: 7 additions & 56 deletions linkerd/app/outbound/src/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,19 @@ use std::{
time::Duration,
};

type ResolveStack<R> = map_endpoint::Resolve<
EndpointFromMetadata,
RecoverDefault<Filter<ResolveService<R>, AllowResolve>>,
>;
type ResolveStack<R> =
map_endpoint::Resolve<EndpointFromMetadata, Filter<ResolveService<R>, ToAddr>>;

fn new_resolve<T, R>(resolve: R) -> ResolveStack<R>
where
T: Clone + Param<std::net::SocketAddr>,
EndpointFromMetadata: map_endpoint::MapEndpoint<T, Metadata>,
R: Resolve<Addr, Endpoint = Metadata>,
R::Future: Send,
R::Resolution: Send,
{
map_endpoint::Resolve::new(
EndpointFromMetadata,
RecoverDefault(Filter::new(resolve.into_service(), AllowResolve)),
Filter::new(resolve.into_service(), ToAddr),
)
}

Expand All @@ -50,7 +47,6 @@ pub fn layer<T, E, R, N>(
watchdog: Duration,
) -> impl layer::Layer<N, Service = Stack<E, R, N>> + Clone
where
T: Clone + Param<std::net::SocketAddr>,
R: Resolve<Addr, Endpoint = Metadata> + Clone,
R::Resolution: Send,
R::Future: Send,
Expand All @@ -68,59 +64,14 @@ where
}

#[derive(Clone, Debug)]
pub struct AllowResolve;
pub struct ToAddr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe worth a comment explaining what this does?


/// Wraps a `Resolve` to produce a default resolution when the resolution is
/// rejected.
#[derive(Clone, Debug)]
pub struct RecoverDefault<S>(S);

// === impl AllowResolve ===
// === impl ToAddr ===

impl<P> Predicate<Concrete<P>> for AllowResolve {
impl<P> Predicate<Concrete<P>> for ToAddr {
type Request = Addr;

fn check(&mut self, target: Concrete<P>) -> Result<Addr, Error> {
target.resolve.ok_or_else(|| discovery_rejected().into())
}
}

// === impl RecoverDefault ===

type Resolution<R> =
future::Either<R, stream::Once<future::Ready<Result<Update<Metadata>, Error>>>>;

impl<T, S> tower::Service<T> for RecoverDefault<S>
where
T: Param<std::net::SocketAddr>,
S: Resolve<T, Endpoint = Metadata, Error = Error>,
S::Future: Send + 'static,
S::Resolution: Send + 'static,
{
type Response = Resolution<S::Resolution>;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
Poll::Ready(futures::ready!(self.0.poll_ready(cx)).map_err(Into::into))
}

fn call(&mut self, t: T) -> Self::Future {
let addr = t.param();
Box::pin(
self.0
.resolve(t)
.map_ok(future::Either::Left)
.or_else(move |error| {
if is_discovery_rejected(&*error) {
tracing::debug!(%error, %addr, "Synthesizing endpoint");
let endpoint = (addr, S::Endpoint::default());
let res = stream::once(future::ok(Update::Reset(vec![endpoint])));
return future::ok(future::Either::Right(res));
}

future::err(error)
}),
)
Ok(target.resolve)
}
}
59 changes: 20 additions & 39 deletions linkerd/app/outbound/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use linkerd_app_core::{
profiles,
proxy::{api_resolve::Metadata, core::resolve::Resolve},
spans::SpanConverter,
svc, tls,
svc,
transport::{listen, metrics::SensorIo},
Addr, Error, IpMatch, TraceContext,
};
Expand All @@ -33,7 +33,7 @@ where
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Send + Unpin + 'static,
R: Resolve<Addr, Endpoint = Metadata, Error = Error> + Clone + Send + 'static,
R::Resolution: Send,
R::Future: Send,
R::Future: Send + Unpin,
C: svc::Service<tcp::Endpoint> + Clone + Send + 'static,
C::Response: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin,
C::Error: Into<Error>,
Expand All @@ -48,18 +48,11 @@ where
P::Future: Send,
P::Error: Send,
{
let tcp_balance = tcp::balance::stack(
&config.proxy,
tcp_connect.clone(),
resolve,
&metrics,
drain.clone(),
);
let tcp = tcp::balance::stack(&config.proxy, tcp_connect, resolve, &metrics, drain.clone());
let accept = accept_stack(
config,
profiles,
tcp_connect,
tcp_balance,
tcp,
http_router,
metrics.clone(),
span_sink,
Expand Down Expand Up @@ -106,11 +99,10 @@ where
.into_inner()
}

pub fn accept_stack<P, C, T, TSvc, H, HSvc, I>(
pub fn accept_stack<P, T, TSvc, H, HSvc, I>(
config: &Config,
profiles: P,
tcp_connect: C,
tcp_balance: T,
tcp: T,
http_router: H,
metrics: metrics::Proxy,
span_sink: Option<mpsc::Sender<oc::Span>>,
Expand All @@ -121,19 +113,10 @@ pub fn accept_stack<P, C, T, TSvc, H, HSvc, I>(
> + Clone
where
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Send + Unpin + 'static,
C: svc::Service<tcp::Endpoint> + Clone + Send + 'static,
C::Response: io::AsyncRead + io::AsyncWrite + Send + Unpin,
C::Error: Into<Error>,
C::Future: Send,
T: svc::NewService<tcp::Concrete, Service = TSvc> + Clone + Send + 'static,
TSvc: svc::Service<io::PrefixedIo<I>, Response = ()>
+ svc::Service<I, Response = ()>
+ Send
+ 'static,
<TSvc as svc::Service<I>>::Error: Into<Error>,
<TSvc as svc::Service<I>>::Future: Send,
<TSvc as svc::Service<io::PrefixedIo<I>>>::Error: Into<Error>,
<TSvc as svc::Service<io::PrefixedIo<I>>>::Future: Send,
T: svc::NewService<(Option<Addr>, tcp::Logical), Service = TSvc> + Clone + Send + 'static,
TSvc: svc::Service<io::EitherIo<I, io::PrefixedIo<I>>, Response = ()> + Send + 'static,
TSvc::Error: Into<Error>,
TSvc::Future: Send,
H: svc::NewService<http::Logical, Service = HSvc> + Clone + Send + 'static,
HSvc: svc::Service<http::Request<http::BoxBody>, Response = http::Response<http::BoxBody>>
+ Send
Expand All @@ -154,14 +137,8 @@ where
..
} = config.proxy.clone();

let tcp_forward = svc::stack(tcp_connect)
.push_make_thunk()
.push_on_response(tcp::Forward::layer())
.push(svc::MapErrLayer::new(Into::into))
.into_new_service()
.push_map_target(tcp::Endpoint::from_logical(
tls::NoClientTls::NotProvidedByServiceDiscovery,
))
let tcp_forward = svc::stack(tcp.clone())
.push_map_target(|l| (None, l))
.into_inner();

svc::stack(http_router)
Expand Down Expand Up @@ -199,12 +176,12 @@ where
// When an HTTP version cannot be detected, we fallback to a logical
// TCP stack. This service needs to be buffered so that it can be
// cached and cloned per connection.
svc::stack(tcp_balance.clone())
.push_map_target(tcp::Concrete::from)
svc::stack(tcp.clone())
.push(profiles::split::layer())
.push_switch(ShouldResolve, tcp_forward.clone())
.push_on_response(
svc::layers()
.push_map_target(io::EitherIo::Right)
.push(metrics.stack.layer(stack_labels("tcp", "logical")))
.push(svc::layer::mk(svc::SpawnReady::new))
.push(svc::FailFast::layer("TCP Logical", dispatch_timeout))
Expand All @@ -226,10 +203,14 @@ where
// detection and just use the TCP logical stack directly. Unlike the
// above case, this stack need not be buffered, since `fn cache`
// applies its own buffer on the returned service.
svc::stack(tcp_balance)
.push_map_target(tcp::Concrete::from)
svc::stack(tcp)
.push(profiles::split::layer())
.push_switch(ShouldResolve, tcp_forward)
.push_on_response(
svc::layers()
.push_map_target(io::EitherIo::Left)
.push(metrics.stack.layer(stack_labels("tcp", "passthru"))),
)
.instrument(|_: &_| debug_span!("tcp.opaque"))
.into_inner(),
)
Expand Down
Loading