Skip to content

Commit

Permalink
feat(outbound): Configure HTTP/2 client overrides from discovery (#2937)
Browse files Browse the repository at this point in the history
This commit updates the outbound proxy to read HTTP/2 client parameters from
Destination responses and to use these values to override the process-wide
defaults.
  • Loading branch information
olix0r authored Apr 30, 2024
1 parent e278b75 commit b45b6cb
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1730,6 +1730,7 @@ dependencies = [
"http-body",
"linkerd-addr",
"linkerd-error",
"linkerd-http-h2",
"linkerd-identity",
"linkerd-proxy-core",
"linkerd-stack",
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/outbound/src/http/concrete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,16 @@ impl<N> Outbound<N> {
}
Dispatch::Forward(addr, metadata) => svc::Either::A(svc::Either::B({
let is_local = inbound_ips.contains(&addr.ip());
let http2 = http2.override_from(metadata.http2_client_params());
Endpoint {
is_local,
addr,
metadata,
parent,
queue,
close_server_connection_on_remote_proxy_error: true,
// TODO(ver) read these from metadata.
http1,
http2: http2.clone(),
http2,
}
})),
Dispatch::Fail { message } => svc::Either::B(message),
Expand Down
3 changes: 2 additions & 1 deletion linkerd/app/outbound/src/http/concrete/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ where
move |((addr, metadata), target): ((SocketAddr, Metadata), Self)| {
tracing::trace!(%addr, ?metadata, ?target, "Resolved endpoint");
let is_local = inbound_ips.contains(&addr.ip());
let http2 = http2.override_from(metadata.http2_client_params());
Endpoint {
addr: Remote(ServerAddr(addr)),
metadata,
Expand All @@ -130,7 +131,7 @@ where
close_server_connection_on_remote_proxy_error: false,
// TODO(ver) Configure from metadata.
http1,
http2: http2.clone(),
http2,
}
}
})
Expand Down
16 changes: 16 additions & 0 deletions linkerd/http/h2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,19 @@ pub enum FlowControl {
initial_connection_window_size: u32,
},
}

// === impl ClientParams ===

impl ClientParams {
pub fn override_from(&self, overrides: &Self) -> Self {
Self {
flow_control: overrides.flow_control.or(self.flow_control),
keep_alive: overrides.keep_alive.or(self.keep_alive),
max_concurrent_reset_streams: overrides
.max_concurrent_reset_streams
.or(self.max_concurrent_reset_streams),
max_frame_size: overrides.max_frame_size.or(self.max_frame_size),
max_send_buf_size: overrides.max_send_buf_size.or(self.max_send_buf_size),
}
}
}
3 changes: 2 additions & 1 deletion linkerd/proxy/api-resolve/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ Implements the Resolve trait using the proxy's gRPC API

[dependencies]
futures = { version = "0.3", default-features = false }
linkerd2-proxy-api = { version = "0.13", features = ["destination"] }
linkerd-addr = { path = "../../addr" }
linkerd-error = { path = "../../error" }
linkerd2-proxy-api = { version = "0.13", features = ["destination"] }
linkerd-proxy-core = { path = "../core" }
linkerd-http-h2 = { path = "../../http/h2" }
linkerd-stack = { path = "../../stack" }
linkerd-tonic-stream = { path = "../../tonic-stream" }
linkerd-tls = { path = "../../tls" }
Expand Down
10 changes: 10 additions & 0 deletions linkerd/proxy/api-resolve/src/metadata.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use http::uri::Authority;
use linkerd_http_h2::ClientParams as HTTP2ClientParams;
use linkerd_tls::client::ClientTls;
use std::collections::BTreeMap;

Expand All @@ -24,6 +25,8 @@ pub struct Metadata {

/// Used to override the the authority if needed
authority_override: Option<Authority>,

http2: HTTP2ClientParams,
}

#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
Expand All @@ -49,6 +52,7 @@ impl Default for Metadata {
authority_override: None,
tagged_transport_port: None,
protocol_hint: ProtocolHint::Unknown,
http2: HTTP2ClientParams::default(),
}
}
}
Expand All @@ -61,6 +65,7 @@ impl Metadata {
identity: Option<ClientTls>,
authority_override: Option<Authority>,
weight: u32,
http2: HTTP2ClientParams,
) -> Self {
Self {
labels: labels.into_iter().collect::<BTreeMap<_, _>>().into(),
Expand All @@ -69,6 +74,7 @@ impl Metadata {
identity,
authority_override,
weight,
http2,
}
}

Expand Down Expand Up @@ -96,4 +102,8 @@ impl Metadata {
pub fn authority_override(&self) -> Option<&Authority> {
self.authority_override.as_ref()
}

pub fn http2_client_params(&self) -> &HTTP2ClientParams {
&self.http2
}
}
171 changes: 169 additions & 2 deletions linkerd/proxy/api-resolve/src/pb.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use crate::{
api::destination::{
protocol_hint::{OpaqueTransport, Protocol},
AuthorityOverride, TlsIdentity, WeightedAddr,
AuthorityOverride, Http2ClientParams, TlsIdentity, WeightedAddr,
},
api::net::TcpAddress,
metadata::{Metadata, ProtocolHint},
};
use http::uri::Authority;
use linkerd_identity::Id;
use linkerd_tls::{client::ServerId, ClientTls, ServerName};
use std::{collections::HashMap, net::SocketAddr, str::FromStr};
use std::{collections::HashMap, net::SocketAddr, str::FromStr, time::Duration};

/// Construct a new labeled `SocketAddr `from a protobuf `WeightedAddr`.
pub fn to_addr_meta(
Expand Down Expand Up @@ -41,13 +41,16 @@ pub fn to_addr_meta(
}

let tls_id = pb.tls_identity.and_then(to_identity);
let http2 = pb.http2.map(to_http2_client_params).unwrap_or_default();

let meta = Metadata::new(
labels,
proto_hint,
tagged_transport_port,
tls_id,
authority_override,
pb.weight,
http2,
);
Some((addr, meta))
}
Expand Down Expand Up @@ -146,6 +149,60 @@ pub(crate) fn to_sock_addr(pb: TcpAddress) -> Option<SocketAddr> {
}
}

fn to_http2_client_params(pb: Http2ClientParams) -> linkerd_http_h2::ClientParams {
use linkerd_http_h2 as h2;

h2::ClientParams {
flow_control: pb.flow_control.and_then(|pb| {
if pb.adaptive_flow_control {
return Some(h2::FlowControl::Adaptive);
}
let initial_connection_window_size = if pb.initial_connection_window_size > 0 {
pb.initial_connection_window_size
} else {
return None;
};
let initial_stream_window_size = if pb.initial_stream_window_size > 0 {
pb.initial_stream_window_size
} else {
return None;
};
Some(h2::FlowControl::Fixed {
initial_connection_window_size,
initial_stream_window_size,
})
}),
keep_alive: pb.keep_alive.and_then(|pb| {
let Some(interval) = pb.interval.and_then(|pb| Duration::try_from(pb).ok()) else {
return None;
};
let Some(timeout) = pb.timeout.and_then(|pb| Duration::try_from(pb).ok()) else {
return None;
};
Some(h2::ClientKeepAlive {
interval,
timeout,
while_idle: pb.while_idle,
})
}),
max_concurrent_reset_streams: pb
.internals
.as_ref()
.map(|i| i.max_concurrent_reset_streams as usize)
.filter(|n| *n > 0),
max_frame_size: pb
.internals
.as_ref()
.map(|i| i.max_frame_size)
.filter(|n| *n > 0),
max_send_buf_size: pb
.internals
.as_ref()
.map(|i| i.max_send_buf_size as usize)
.filter(|n| *n > 0),
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -226,4 +283,114 @@ mod tests {

assert_eq!(expected_identity, to_identity(pb_id));
}

#[test]
fn http2_client_params() {
use linkerd2_proxy_api::destination::http2_client_params as pb;
use linkerd_http_h2 as h2;

assert_eq!(
h2::ClientParams {
flow_control: Some(h2::FlowControl::Adaptive),
..Default::default()
},
to_http2_client_params(Http2ClientParams {
flow_control: Some(pb::FlowControl {
adaptive_flow_control: true,
initial_connection_window_size: 0,
initial_stream_window_size: 0,
}),
..Default::default()
}),
);

assert_eq!(
h2::ClientParams {
flow_control: Some(h2::FlowControl::Fixed {
initial_stream_window_size: 10,
initial_connection_window_size: 100,
}),
..Default::default()
},
to_http2_client_params(Http2ClientParams {
flow_control: Some(pb::FlowControl {
initial_connection_window_size: 100,
initial_stream_window_size: 10,
..Default::default()
}),
..Default::default()
}),
);

assert_eq!(
h2::ClientParams::default(),
to_http2_client_params(Http2ClientParams {
flow_control: Some(pb::FlowControl {
initial_stream_window_size: 10,
..Default::default()
}),
..Default::default()
}),
);
assert_eq!(
h2::ClientParams {
keep_alive: Some(h2::ClientKeepAlive {
interval: Duration::from_secs(10),
timeout: Duration::from_secs(20),
while_idle: true,
}),
..Default::default()
},
to_http2_client_params(Http2ClientParams {
keep_alive: Some(pb::KeepAlive {
interval: Some(Duration::from_secs(10).try_into().unwrap()),
timeout: Some(Duration::from_secs(20).try_into().unwrap()),
while_idle: true,
}),
..Default::default()
}),
);

assert_eq!(
h2::ClientParams {
max_frame_size: Some(10),
..Default::default()
},
to_http2_client_params(Http2ClientParams {
internals: Some(pb::Internals {
max_frame_size: 10,
..Default::default()
}),
..Default::default()
}),
);

assert_eq!(
h2::ClientParams {
max_send_buf_size: Some(10),
..Default::default()
},
to_http2_client_params(Http2ClientParams {
internals: Some(pb::Internals {
max_send_buf_size: 10,
..Default::default()
}),
..Default::default()
}),
);

assert_eq!(
h2::ClientParams {
max_concurrent_reset_streams: Some(10),
..Default::default()
},
to_http2_client_params(Http2ClientParams {
internals: Some(pb::Internals {
max_concurrent_reset_streams: 10,
..Default::default()
}),
..Default::default()
}),
);
}
}

0 comments on commit b45b6cb

Please sign in to comment.