Skip to content

Commit

Permalink
chore(http): allow http client configuration to vary by endpoint (#2935)
Browse files Browse the repository at this point in the history
Currently all HTTP client configuration is derived from the configuration loaded
at startup.

We'd like to allow this configuration to vary by endpoint so that, for example,
we can enable keepalives when communicating with other proxies. These keepalives
allow load balancers to eagerly detect connections that are in a bad state (i.e.
before attempting to write a request to that connection).

To setup this kind of per-endpoint configuration, this change modifies the HTTP
client to extract its configuration from the per-endpoint target so that calling
stacks can provide this configuration.

The inbound and outbound stacks continue to set these configurations based on
process-wide defaults, but we now have obvious places where we can set these
parameters based on discovery results.

There are no user-facing functional changes in this commit.
  • Loading branch information
olix0r authored Apr 30, 2024
1 parent 0b175c3 commit b3abab1
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 78 deletions.
36 changes: 17 additions & 19 deletions linkerd/app/inbound/src/http/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tracing::{debug, debug_span};
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Http {
addr: Remote<ServerAddr>,
settings: http::client::Settings,
params: http::client::Params,
permit: policy::HttpRoutePermit,
}

Expand Down Expand Up @@ -83,6 +83,8 @@ impl<C> Inbound<C> {
{
self.map_stack(|config, rt, connect| {
let allow_profile = config.allow_discovery.clone();
let h1_params = config.proxy.connect.http1;
let h2_params = config.proxy.connect.http2.clone();

// Creates HTTP clients for each inbound port & HTTP settings.
let http = connect
Expand All @@ -93,15 +95,21 @@ impl<C> Inbound<C> {
.push(transport::metrics::Client::layer(rt.metrics.proxy.transport.clone()))
.check_service::<Http>()
.push_map_target(|(_version, target)| target)
.push(http::client::layer(
config.proxy.connect.http1,
config.proxy.connect.http2.clone(),
))
.push(http::client::layer())
.check_service::<Http>()
.push_on_service(svc::MapErr::layer_boxed())
.into_new_service()
.push_new_reconnect(config.proxy.connect.backoff)
.push_map_target(Http::from)
.push_map_target(move |t: Logical| {
Http {
addr: t.addr,
permit: t.permit,
params: match t.http {
http::Version::Http1 => http::client::Params::Http1(h1_params),
http::Version::H2 => http::client::Params::H2(h2_params.clone())
},
}
})
// Handle connection-level errors eagerly so that we can report 5XX failures in tap
// and metrics. HTTP error metrics are not incremented here so that errors are not
// double-counted--i.e., endpoint metrics track these responses and error metrics
Expand Down Expand Up @@ -434,19 +442,9 @@ impl Param<Remote<ServerAddr>> for Http {
}
}

impl Param<http::client::Settings> for Http {
fn param(&self) -> http::client::Settings {
self.settings
}
}

impl From<Logical> for Http {
fn from(l: Logical) -> Self {
Self {
addr: l.addr,
settings: l.http.into(),
permit: l.permit,
}
impl Param<http::client::Params> for Http {
fn param(&self) -> http::client::Params {
self.params.clone()
}
}

Expand Down
26 changes: 19 additions & 7 deletions linkerd/app/outbound/src/http/concrete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use super::{balance::EwmaConfig, client, handle_proxy_error_headers};
use crate::{http, stack_labels, BackendRef, Outbound, ParentRef};
use linkerd_app_core::{
config::QueueConfig,
config::{ConnectConfig, QueueConfig},
metrics::{prefix_labels, EndpointLabels, OutboundEndpointLabels},
profiles,
proxy::{
Expand Down Expand Up @@ -46,6 +46,9 @@ pub struct Endpoint<T> {
parent: T,
queue: QueueConfig,
close_server_connection_on_remote_proxy_error: bool,

http1: http::h1::PoolSettings,
http2: http::h2::ClientParams,
}

// === impl Outbound ===
Expand Down Expand Up @@ -100,6 +103,8 @@ impl<N> Outbound<N> {
svc::mk(move |_| futures::future::ready(Err(DispatcherFailed(message.clone()))))
});

let ConnectConfig { http1, http2, .. } = config.proxy.connect.clone();

inner
.push(balance::Balance::layer(config, rt, resolve))
.check_new_clone()
Expand All @@ -124,6 +129,9 @@ impl<N> Outbound<N> {
parent,
queue,
close_server_connection_on_remote_proxy_error: true,
// TODO(ver) read these from metadata.
http1,
http2: http2.clone(),
}
})),
Dispatch::Fail { message } => svc::Either::B(message),
Expand Down Expand Up @@ -258,25 +266,29 @@ where
}
}

impl<T> svc::Param<client::Settings> for Endpoint<T>
impl<T> svc::Param<client::Params> for Endpoint<T>
where
T: svc::Param<http::Version>,
{
fn param(&self) -> client::Settings {
fn param(&self) -> client::Params {
match self.param() {
http::Version::H2 => client::Settings::H2,
http::Version::H2 => client::Params::H2(self.http2.clone()),
http::Version::Http1 => {
// When the target is local (i.e. same as source of traffic)
// then do not perform a protocol upgrade to HTTP/2
if self.is_local {
return client::Settings::Http1;
return client::Params::Http1(self.http1);
}
match self.metadata.protocol_hint() {
// If the protocol hint is unknown or indicates that the
// endpoint's proxy will treat connections as opaque, do not
// perform a protocol upgrade to HTTP/2.
ProtocolHint::Unknown | ProtocolHint::Opaque => client::Settings::Http1,
ProtocolHint::Http2 => client::Settings::OrigProtoUpgrade,
ProtocolHint::Unknown | ProtocolHint::Opaque => {
client::Params::Http1(self.http1)
}
ProtocolHint::Http2 => {
client::Params::OrigProtoUpgrade(self.http2.clone(), self.http1)
}
}
}
}
Expand Down
10 changes: 9 additions & 1 deletion linkerd/app/outbound/src/http/concrete/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
};
use linkerd_app_core::{
classify,
config::QueueConfig,
config::{ConnectConfig, QueueConfig},
proxy::{
api_resolve::{ConcreteAddr, Metadata},
core::Resolve,
Expand Down Expand Up @@ -98,6 +98,10 @@ where
{
// TODO(ver) Configure queues from the target (i.e. from discovery).
let http_queue = config.http_request_queue;

// TODO(ver) Configure from discovery.
let ConnectConfig { http1, http2, .. } = config.proxy.connect.clone();

let inbound_ips = config.inbound_ips.clone();
let stack_metrics = rt.metrics.proxy.stack.clone();
let balance_metrics = rt.metrics.prom.http.balancer.clone();
Expand All @@ -109,6 +113,7 @@ where
svc::layer::mk(move |inner: N| {
let endpoint = svc::stack(inner)
.push_map_target({
let http2 = http2.clone();
let inbound_ips = inbound_ips.clone();
move |((addr, metadata), target): ((SocketAddr, Metadata), Self)| {
tracing::trace!(%addr, ?metadata, ?target, "Resolved endpoint");
Expand All @@ -123,6 +128,9 @@ where
// get `l5d-proxy-connection: close` response headers
// going through the balancer.
close_server_connection_on_remote_proxy_error: false,
// TODO(ver) Configure from metadata.
http1,
http2: http2.clone(),
}
}
})
Expand Down
12 changes: 4 additions & 8 deletions linkerd/app/outbound/src/http/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl<C> Outbound<C> {
pub fn push_http_tcp_client<T, B>(self) -> Outbound<svc::ArcNewHttp<T, B>>
where
// Http endpoint target.
T: svc::Param<http::client::Settings>,
T: svc::Param<http::client::Params>,
T: Clone + Send + Sync + 'static,
// Http endpoint body.
B: http::HttpBody<Error = Error> + std::fmt::Debug + Default + Send + 'static,
Expand All @@ -52,18 +52,14 @@ impl<C> Outbound<C> {
C::Metadata: Send + Unpin,
C::Future: Send + Unpin + 'static,
{
self.map_stack(|config, _, inner| {
let config::ConnectConfig {
http1, ref http2, ..
} = config.proxy.connect;

self.map_stack(|_, _, inner| {
// Initiates an HTTP client on the underlying transport. Prior-knowledge HTTP/2
// is typically used (i.e. when communicating with other proxies); though
// HTTP/1.x fallback is supported as needed.
svc::stack(inner.into_inner().into_service())
.check_service::<Connect<T>>()
.push_map_target(|(version, inner)| Connect { version, inner })
.push(http::client::layer(http1, http2.clone()))
.push(http::client::layer())
.push_on_service(svc::MapErr::layer_boxed())
.check_service::<T>()
.into_new_service()
Expand All @@ -77,7 +73,7 @@ impl<T> Outbound<svc::ArcNewHttp<T, http::BoxBody>> {
where
// Http endpoint target.
T: svc::Param<Remote<ServerAddr>>,
T: svc::Param<http::client::Settings>,
T: svc::Param<http::client::Params>,
T: svc::Param<Option<http::AuthorityOverride>>,
T: svc::Param<handle_proxy_error_headers::CloseServerConnection>,
T: svc::Param<metrics::EndpointLabels>,
Expand Down
21 changes: 16 additions & 5 deletions linkerd/app/outbound/src/http/endpoint/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,13 +318,24 @@ impl svc::Param<http::Version> for Endpoint {
}
}

impl svc::Param<http::client::Settings> for Endpoint {
fn param(&self) -> http::client::Settings {
impl svc::Param<http::client::Params> for Endpoint {
fn param(&self) -> http::client::Params {
match self.version {
http::Version::H2 => http::client::Settings::H2,
http::Version::H2 => http::client::Params::H2(Default::default()),
http::Version::Http1 => match self.hint {
ProtocolHint::Unknown | ProtocolHint::Opaque => http::client::Settings::Http1,
ProtocolHint::Http2 => http::client::Settings::OrigProtoUpgrade,
ProtocolHint::Unknown | ProtocolHint::Opaque => {
http::client::Params::Http1(http::h1::PoolSettings {
max_idle: 1,
idle_timeout: std::time::Duration::from_secs(1),
})
}
ProtocolHint::Http2 => http::client::Params::OrigProtoUpgrade(
http::h2::ClientParams::default(),
http::h1::PoolSettings {
max_idle: 1,
idle_timeout: std::time::Duration::from_secs(1),
},
),
},
}
}
Expand Down
62 changes: 25 additions & 37 deletions linkerd/proxy/http/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{h1, h2, orig_proto};
use futures::prelude::*;
use linkerd_error::{Error, Result};
use linkerd_http_box::BoxBody;
use linkerd_stack::{layer, MakeConnection, Param, Service, ServiceExt};
use linkerd_stack::{layer, ExtractParam, MakeConnection, Service, ServiceExt};
use std::{
marker::PhantomData,
pin::Pin,
Expand All @@ -18,17 +18,16 @@ use std::{
use tracing::instrument::{Instrument, Instrumented};
use tracing::{debug, debug_span};

#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub enum Settings {
Http1,
H2,
OrigProtoUpgrade,
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub enum Params {
Http1(h1::PoolSettings),
H2(h2::ClientParams),
OrigProtoUpgrade(h2::ClientParams, h1::PoolSettings),
}

pub struct MakeClient<C, B> {
pub struct MakeClient<X, C, B> {
connect: C,
h1_pool: h1::PoolSettings,
h2_params: h2::ClientParams,
params: X,
_marker: PhantomData<fn(B)>,
}

Expand All @@ -38,35 +37,28 @@ pub enum Client<C, T, B> {
OrigProtoUpgrade(orig_proto::Upgrade<C, T, B>),
}

pub fn layer<C, B>(
h1_pool: h1::PoolSettings,
h2: h2::ClientParams,
) -> impl layer::Layer<C, Service = MakeClient<C, B>> + Clone {
pub fn layer_via<X: Clone, C, B>(
params: X,
) -> impl layer::Layer<C, Service = MakeClient<X, C, B>> + Clone {
layer::mk(move |connect: C| MakeClient {
connect,
h1_pool,
h2_params: h2.clone(),
params: params.clone(),
_marker: PhantomData,
})
}

impl From<crate::Version> for Settings {
fn from(v: crate::Version) -> Self {
match v {
crate::Version::Http1 => Self::Http1,
crate::Version::H2 => Self::H2,
}
}
pub fn layer<C, B>() -> impl layer::Layer<C, Service = MakeClient<(), C, B>> + Clone {
layer_via(())
}

// === impl MakeClient ===

type MakeFuture<C, T, B> = Pin<Box<dyn Future<Output = Result<Client<C, T, B>>> + Send + 'static>>;

impl<C, T, B> tower::Service<T> for MakeClient<C, B>
impl<X, C, T, B> tower::Service<T> for MakeClient<X, C, B>
where
T: Clone + Send + Sync + 'static,
T: Param<Settings>,
X: ExtractParam<Params, T>,
C: MakeConnection<(crate::Version, T)> + Clone + Unpin + Send + Sync + 'static,
C::Connection: Unpin + Send,
C::Metadata: Send,
Expand All @@ -86,24 +78,21 @@ where

fn call(&mut self, target: T) -> Self::Future {
let connect = self.connect.clone();
let h1_pool = self.h1_pool;
let h2_params = self.h2_params.clone();
let settings = self.params.extract_param(&target);

Box::pin(async move {
let settings = target.param();
debug!(?settings, "Building HTTP client");

let client = match settings {
Settings::H2 => {
let h2 = h2::Connect::new(connect, h2_params).oneshot(target).await?;
Params::H2(params) => {
let h2 = h2::Connect::new(connect, params).oneshot(target).await?;
Client::H2(h2)
}
Settings::Http1 => Client::Http1(h1::Client::new(connect, target, h1_pool)),
Settings::OrigProtoUpgrade => {
let h2 = h2::Connect::new(connect.clone(), h2_params)
Params::Http1(params) => Client::Http1(h1::Client::new(connect, target, params)),
Params::OrigProtoUpgrade(h2params, h1params) => {
let h2 = h2::Connect::new(connect.clone(), h2params)
.oneshot(target.clone())
.await?;
let http1 = h1::Client::new(connect, target, h1_pool);
let http1 = h1::Client::new(connect, target, h1params);
Client::OrigProtoUpgrade(orig_proto::Upgrade::new(http1, h2))
}
};
Expand All @@ -113,12 +102,11 @@ where
}
}

impl<C: Clone, B> Clone for MakeClient<C, B> {
impl<X: Clone, C: Clone, B> Clone for MakeClient<X, C, B> {
fn clone(&self) -> Self {
Self {
connect: self.connect.clone(),
h1_pool: self.h1_pool,
h2_params: self.h2_params.clone(),
params: self.params.clone(),
_marker: self._marker,
}
}
Expand Down
2 changes: 1 addition & 1 deletion linkerd/proxy/http/src/h1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tracing::{debug, trace};
#[derive(Copy, Clone, Debug)]
pub struct WasAbsoluteForm(pub(crate) ());

#[derive(Copy, Clone, Debug)]
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct PoolSettings {
pub max_idle: usize,
pub idle_timeout: Duration,
Expand Down

0 comments on commit b3abab1

Please sign in to comment.