diff --git a/Cargo.lock b/Cargo.lock index 16cd497953..a416da3aa3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1730,6 +1730,7 @@ dependencies = [ "http-body", "linkerd-addr", "linkerd-error", + "linkerd-http-h2", "linkerd-identity", "linkerd-proxy-core", "linkerd-stack", diff --git a/linkerd/app/outbound/src/http/concrete.rs b/linkerd/app/outbound/src/http/concrete.rs index edb61d2bd9..1dcc9468fb 100644 --- a/linkerd/app/outbound/src/http/concrete.rs +++ b/linkerd/app/outbound/src/http/concrete.rs @@ -122,6 +122,7 @@ impl Outbound { } Dispatch::Forward(addr, metadata) => svc::Either::A(svc::Either::B({ let is_local = inbound_ips.contains(&addr.ip()); + let http2 = http2.override_from(metadata.http2_client_params()); Endpoint { is_local, addr, @@ -129,9 +130,8 @@ impl Outbound { parent, queue, close_server_connection_on_remote_proxy_error: true, - // TODO(ver) read these from metadata. http1, - http2: http2.clone(), + http2, } })), Dispatch::Fail { message } => svc::Either::B(message), diff --git a/linkerd/app/outbound/src/http/concrete/balance.rs b/linkerd/app/outbound/src/http/concrete/balance.rs index 9b9e44f551..27d6b5d127 100644 --- a/linkerd/app/outbound/src/http/concrete/balance.rs +++ b/linkerd/app/outbound/src/http/concrete/balance.rs @@ -118,6 +118,7 @@ where move |((addr, metadata), target): ((SocketAddr, Metadata), Self)| { tracing::trace!(%addr, ?metadata, ?target, "Resolved endpoint"); let is_local = inbound_ips.contains(&addr.ip()); + let http2 = http2.override_from(metadata.http2_client_params()); Endpoint { addr: Remote(ServerAddr(addr)), metadata, @@ -130,7 +131,7 @@ where close_server_connection_on_remote_proxy_error: false, // TODO(ver) Configure from metadata. http1, - http2: http2.clone(), + http2, } } }) diff --git a/linkerd/http/h2/src/lib.rs b/linkerd/http/h2/src/lib.rs index 9aea08f001..6bbc4244ff 100644 --- a/linkerd/http/h2/src/lib.rs +++ b/linkerd/http/h2/src/lib.rs @@ -45,3 +45,19 @@ pub enum FlowControl { initial_connection_window_size: u32, }, } + +// === impl ClientParams === + +impl ClientParams { + pub fn override_from(&self, overrides: &Self) -> Self { + Self { + flow_control: overrides.flow_control.or(self.flow_control), + keep_alive: overrides.keep_alive.or(self.keep_alive), + max_concurrent_reset_streams: overrides + .max_concurrent_reset_streams + .or(self.max_concurrent_reset_streams), + max_frame_size: overrides.max_frame_size.or(self.max_frame_size), + max_send_buf_size: overrides.max_send_buf_size.or(self.max_send_buf_size), + } + } +} diff --git a/linkerd/proxy/api-resolve/Cargo.toml b/linkerd/proxy/api-resolve/Cargo.toml index 999f3d99e9..449a6b12f1 100644 --- a/linkerd/proxy/api-resolve/Cargo.toml +++ b/linkerd/proxy/api-resolve/Cargo.toml @@ -11,10 +11,11 @@ Implements the Resolve trait using the proxy's gRPC API [dependencies] futures = { version = "0.3", default-features = false } +linkerd2-proxy-api = { version = "0.13", features = ["destination"] } linkerd-addr = { path = "../../addr" } linkerd-error = { path = "../../error" } -linkerd2-proxy-api = { version = "0.13", features = ["destination"] } linkerd-proxy-core = { path = "../core" } +linkerd-http-h2 = { path = "../../http/h2" } linkerd-stack = { path = "../../stack" } linkerd-tonic-stream = { path = "../../tonic-stream" } linkerd-tls = { path = "../../tls" } diff --git a/linkerd/proxy/api-resolve/src/metadata.rs b/linkerd/proxy/api-resolve/src/metadata.rs index cd9ce1d6a8..a1cc2b0e3f 100644 --- a/linkerd/proxy/api-resolve/src/metadata.rs +++ b/linkerd/proxy/api-resolve/src/metadata.rs @@ -1,4 +1,5 @@ use http::uri::Authority; +use linkerd_http_h2::ClientParams as HTTP2ClientParams; use linkerd_tls::client::ClientTls; use std::collections::BTreeMap; @@ -24,6 +25,8 @@ pub struct Metadata { /// Used to override the the authority if needed authority_override: Option, + + http2: HTTP2ClientParams, } #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] @@ -49,6 +52,7 @@ impl Default for Metadata { authority_override: None, tagged_transport_port: None, protocol_hint: ProtocolHint::Unknown, + http2: HTTP2ClientParams::default(), } } } @@ -61,6 +65,7 @@ impl Metadata { identity: Option, authority_override: Option, weight: u32, + http2: HTTP2ClientParams, ) -> Self { Self { labels: labels.into_iter().collect::>().into(), @@ -69,6 +74,7 @@ impl Metadata { identity, authority_override, weight, + http2, } } @@ -96,4 +102,8 @@ impl Metadata { pub fn authority_override(&self) -> Option<&Authority> { self.authority_override.as_ref() } + + pub fn http2_client_params(&self) -> &HTTP2ClientParams { + &self.http2 + } } diff --git a/linkerd/proxy/api-resolve/src/pb.rs b/linkerd/proxy/api-resolve/src/pb.rs index 021caef1b3..7bef87afe3 100644 --- a/linkerd/proxy/api-resolve/src/pb.rs +++ b/linkerd/proxy/api-resolve/src/pb.rs @@ -1,7 +1,7 @@ use crate::{ api::destination::{ protocol_hint::{OpaqueTransport, Protocol}, - AuthorityOverride, TlsIdentity, WeightedAddr, + AuthorityOverride, Http2ClientParams, TlsIdentity, WeightedAddr, }, api::net::TcpAddress, metadata::{Metadata, ProtocolHint}, @@ -9,7 +9,7 @@ use crate::{ use http::uri::Authority; use linkerd_identity::Id; use linkerd_tls::{client::ServerId, ClientTls, ServerName}; -use std::{collections::HashMap, net::SocketAddr, str::FromStr}; +use std::{collections::HashMap, net::SocketAddr, str::FromStr, time::Duration}; /// Construct a new labeled `SocketAddr `from a protobuf `WeightedAddr`. pub fn to_addr_meta( @@ -41,6 +41,8 @@ pub fn to_addr_meta( } let tls_id = pb.tls_identity.and_then(to_identity); + let http2 = pb.http2.map(to_http2_client_params).unwrap_or_default(); + let meta = Metadata::new( labels, proto_hint, @@ -48,6 +50,7 @@ pub fn to_addr_meta( tls_id, authority_override, pb.weight, + http2, ); Some((addr, meta)) } @@ -146,6 +149,60 @@ pub(crate) fn to_sock_addr(pb: TcpAddress) -> Option { } } +fn to_http2_client_params(pb: Http2ClientParams) -> linkerd_http_h2::ClientParams { + use linkerd_http_h2 as h2; + + h2::ClientParams { + flow_control: pb.flow_control.and_then(|pb| { + if pb.adaptive_flow_control { + return Some(h2::FlowControl::Adaptive); + } + let initial_connection_window_size = if pb.initial_connection_window_size > 0 { + pb.initial_connection_window_size + } else { + return None; + }; + let initial_stream_window_size = if pb.initial_stream_window_size > 0 { + pb.initial_stream_window_size + } else { + return None; + }; + Some(h2::FlowControl::Fixed { + initial_connection_window_size, + initial_stream_window_size, + }) + }), + keep_alive: pb.keep_alive.and_then(|pb| { + let Some(interval) = pb.interval.and_then(|pb| Duration::try_from(pb).ok()) else { + return None; + }; + let Some(timeout) = pb.timeout.and_then(|pb| Duration::try_from(pb).ok()) else { + return None; + }; + Some(h2::ClientKeepAlive { + interval, + timeout, + while_idle: pb.while_idle, + }) + }), + max_concurrent_reset_streams: pb + .internals + .as_ref() + .map(|i| i.max_concurrent_reset_streams as usize) + .filter(|n| *n > 0), + max_frame_size: pb + .internals + .as_ref() + .map(|i| i.max_frame_size) + .filter(|n| *n > 0), + max_send_buf_size: pb + .internals + .as_ref() + .map(|i| i.max_send_buf_size as usize) + .filter(|n| *n > 0), + } +} + #[cfg(test)] mod tests { use super::*; @@ -226,4 +283,114 @@ mod tests { assert_eq!(expected_identity, to_identity(pb_id)); } + + #[test] + fn http2_client_params() { + use linkerd2_proxy_api::destination::http2_client_params as pb; + use linkerd_http_h2 as h2; + + assert_eq!( + h2::ClientParams { + flow_control: Some(h2::FlowControl::Adaptive), + ..Default::default() + }, + to_http2_client_params(Http2ClientParams { + flow_control: Some(pb::FlowControl { + adaptive_flow_control: true, + initial_connection_window_size: 0, + initial_stream_window_size: 0, + }), + ..Default::default() + }), + ); + + assert_eq!( + h2::ClientParams { + flow_control: Some(h2::FlowControl::Fixed { + initial_stream_window_size: 10, + initial_connection_window_size: 100, + }), + ..Default::default() + }, + to_http2_client_params(Http2ClientParams { + flow_control: Some(pb::FlowControl { + initial_connection_window_size: 100, + initial_stream_window_size: 10, + ..Default::default() + }), + ..Default::default() + }), + ); + + assert_eq!( + h2::ClientParams::default(), + to_http2_client_params(Http2ClientParams { + flow_control: Some(pb::FlowControl { + initial_stream_window_size: 10, + ..Default::default() + }), + ..Default::default() + }), + ); + assert_eq!( + h2::ClientParams { + keep_alive: Some(h2::ClientKeepAlive { + interval: Duration::from_secs(10), + timeout: Duration::from_secs(20), + while_idle: true, + }), + ..Default::default() + }, + to_http2_client_params(Http2ClientParams { + keep_alive: Some(pb::KeepAlive { + interval: Some(Duration::from_secs(10).try_into().unwrap()), + timeout: Some(Duration::from_secs(20).try_into().unwrap()), + while_idle: true, + }), + ..Default::default() + }), + ); + + assert_eq!( + h2::ClientParams { + max_frame_size: Some(10), + ..Default::default() + }, + to_http2_client_params(Http2ClientParams { + internals: Some(pb::Internals { + max_frame_size: 10, + ..Default::default() + }), + ..Default::default() + }), + ); + + assert_eq!( + h2::ClientParams { + max_send_buf_size: Some(10), + ..Default::default() + }, + to_http2_client_params(Http2ClientParams { + internals: Some(pb::Internals { + max_send_buf_size: 10, + ..Default::default() + }), + ..Default::default() + }), + ); + + assert_eq!( + h2::ClientParams { + max_concurrent_reset_streams: Some(10), + ..Default::default() + }, + to_http2_client_params(Http2ClientParams { + internals: Some(pb::Internals { + max_concurrent_reset_streams: 10, + ..Default::default() + }), + ..Default::default() + }), + ); + } }