From e9bd66ba052c3e158deee83421dc39fb01c575f3 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Sat, 18 Feb 2023 20:45:45 +0000 Subject: [PATCH 01/13] outbound: Decouple outbound HTTP server from logical target --- linkerd/app/outbound/src/http/server.rs | 20 +++++--------------- linkerd/app/outbound/src/ingress.rs | 9 +++++++++ linkerd/app/outbound/src/protocol.rs | 9 +++++++++ linkerd/app/outbound/src/sidecar.rs | 9 +++++++++ 4 files changed, 32 insertions(+), 15 deletions(-) diff --git a/linkerd/app/outbound/src/http/server.rs b/linkerd/app/outbound/src/http/server.rs index 2ba08695ab..32221e0d92 100644 --- a/linkerd/app/outbound/src/http/server.rs +++ b/linkerd/app/outbound/src/http/server.rs @@ -1,10 +1,9 @@ -use super::{IdentityRequired, Logical, ProxyConnectionClose}; +use super::{IdentityRequired, ProxyConnectionClose}; use crate::{http, trace_labels, Outbound}; use linkerd_app_core::{ errors, http_tracing, io, svc::{self, ExtractParam}, - transport::addrs::*, - Addr, Error, Result, + Error, Result, }; #[derive(Copy, Clone, Debug)] @@ -12,9 +11,6 @@ pub(crate) struct ServerRescue { emit_headers: bool, } -#[derive(Copy, Clone, Debug)] -struct Target(T); - impl Outbound { /// Builds a [`svc::NewService`] stack that prepares HTTP requests to be /// proxied. @@ -37,7 +33,8 @@ impl Outbound { >, > where - T: svc::Param + 'static, + // Target + T: svc::Param, // HTTP outbound stack N: svc::NewService + Clone + Send + Sync + 'static, NSvc: svc::Service, Response = http::Response>, @@ -80,14 +77,7 @@ impl Outbound { ) // Convert origin form HTTP/1 URIs to absolute form for Hyper's // `Client`. - .push(http::NewNormalizeUri::layer_via(|target: &T| { - let addr = match target.param() { - Logical::Route(addr, _) => Addr::from(addr), - Logical::Forward(Remote(ServerAddr(addr)), _) => Addr::from(addr), - }; - - http::normalize_uri::DefaultAuthority(Some(addr.to_http_authority())) - })) + .push(http::NewNormalizeUri::layer()) // Record when a HTTP/1 URI originated in absolute form .push_on_service(http::normalize_uri::MarkAbsoluteForm::layer()) .push(svc::ArcNewService::layer()) diff --git a/linkerd/app/outbound/src/ingress.rs b/linkerd/app/outbound/src/ingress.rs index c24679fc5c..b66f7ae806 100644 --- a/linkerd/app/outbound/src/ingress.rs +++ b/linkerd/app/outbound/src/ingress.rs @@ -258,6 +258,15 @@ impl svc::Param for Http { } } +impl svc::Param for Http { + fn param(&self) -> http::normalize_uri::DefaultAuthority { + http::normalize_uri::DefaultAuthority(match &self.parent { + http::Logical::Route(addr, _) => Some(addr.as_http_authority()), + http::Logical::Forward(..) => None, + }) + } +} + impl TryFrom>> for Http { type Error = ProfileRequired; diff --git a/linkerd/app/outbound/src/protocol.rs b/linkerd/app/outbound/src/protocol.rs index 2915a0b0d7..79146776a0 100644 --- a/linkerd/app/outbound/src/protocol.rs +++ b/linkerd/app/outbound/src/protocol.rs @@ -132,3 +132,12 @@ where self.parent.param() } } + +impl svc::Param for Http +where + T: svc::Param, +{ + fn param(&self) -> http::normalize_uri::DefaultAuthority { + self.parent.param() + } +} diff --git a/linkerd/app/outbound/src/sidecar.rs b/linkerd/app/outbound/src/sidecar.rs index 3cbdb6ceec..72a6d72b95 100644 --- a/linkerd/app/outbound/src/sidecar.rs +++ b/linkerd/app/outbound/src/sidecar.rs @@ -105,6 +105,15 @@ impl svc::Param for Sidecar { } } +impl svc::Param for Sidecar { + fn param(&self) -> http::normalize_uri::DefaultAuthority { + http::normalize_uri::DefaultAuthority(self.profile.as_ref().and_then(|p| { + p.logical_addr() + .map(|profiles::LogicalAddr(a)| a.as_http_authority()) + })) + } +} + impl svc::Param for Sidecar { fn param(&self) -> http::Logical { if let Some(profile) = self.profile.clone() { From 3b8dae0379cfc97681dbb5abf736659e7b7574fc Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Sun, 19 Feb 2023 20:59:06 +0000 Subject: [PATCH 02/13] outbound: Introduce a new outbound-route watch type To support new outbound route types, this change adds an `outbound::http::Routes` type that is an enum of available logical router configurations. Accordingly, the Profile-specific routing stack is moved to a new `http::logical::profile` sub-module. This sets the stage to further adapt routing configurations. --- linkerd/app/gateway/src/http.rs | 82 +++- linkerd/app/gateway/src/http/tests.rs | 14 + linkerd/app/gateway/src/server.rs | 7 + linkerd/app/outbound/src/discover.rs | 8 + linkerd/app/outbound/src/http.rs | 104 +++-- linkerd/app/outbound/src/http/concrete.rs | 16 +- linkerd/app/outbound/src/http/logical.rs | 425 +++++------------- .../app/outbound/src/http/logical/profile.rs | 302 +++++++++++++ linkerd/app/outbound/src/ingress.rs | 144 ++++-- linkerd/app/outbound/src/protocol.rs | 17 +- linkerd/app/outbound/src/sidecar.rs | 125 ++++-- linkerd/proxy/http/src/header_from_target.rs | 22 +- linkerd/service-profiles/src/lib.rs | 12 +- 13 files changed, 805 insertions(+), 473 deletions(-) create mode 100644 linkerd/app/outbound/src/http/logical/profile.rs diff --git a/linkerd/app/gateway/src/http.rs b/linkerd/app/gateway/src/http.rs index 2445ff2e48..e23254d846 100644 --- a/linkerd/app/gateway/src/http.rs +++ b/linkerd/app/gateway/src/http.rs @@ -18,8 +18,8 @@ use linkerd_app_outbound as outbound; use std::{ cmp::{Eq, PartialEq}, fmt::Debug, - hash::Hash, }; +use tokio::sync::watch; mod gateway; #[cfg(test)] @@ -28,10 +28,10 @@ mod tests; pub(crate) use self::gateway::NewHttpGateway; /// Target for outbound HTTP gateway stacks. -#[derive(Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Clone, Debug)] pub struct Target { addr: GatewayAddr, - target: outbound::http::Logical, + routes: watch::Receiver, version: http::Version, parent: T, } @@ -73,7 +73,7 @@ impl Gateway { T: svc::Param, T: svc::Param, T: svc::Param, - T: svc::Param>, + T: svc::Param>>, T: svc::Param, T: svc::Param, T: Clone + Send + Sync + Unpin + 'static, @@ -82,7 +82,7 @@ impl Gateway { // HTTP outbound stack. N: svc::NewService< outbound::http::concrete::Endpoint< - outbound::http::logical::Concrete, + outbound::http::logical::Concrete>, >, Service = NSvc, >, @@ -117,21 +117,22 @@ impl Gateway { // Only permit gateway traffic to endpoints for which we have // discovery information. .push_filter(|(_, parent): (_, T)| -> Result<_, GatewayDomainInvalid> { - let target = { - let profile = svc::Param::>::param(&parent) - .ok_or(GatewayDomainInvalid)?; - - if let Some(profiles::LogicalAddr(addr)) = profile.logical_addr() { - outbound::http::Logical::Route(addr, profile) - } else if let Some((addr, metadata)) = profile.endpoint() { - outbound::http::Logical::Forward(Remote(ServerAddr(addr)), metadata) - } else { - return Err(GatewayDomainInvalid); - } + let routes = { + let profile = + svc::Param::>>::param(&parent) + .ok_or(GatewayDomainInvalid)?; + + let mut route = mk_route(&*profile.borrow()).ok_or(GatewayDomainInvalid)?; + outbound::http::spawn_routes(profile, move |p: &profiles::Profile| { + if let Some(r) = mk_route(p) { + route = r; + } + route.clone() + }) }; Ok(Target { - target, + routes, addr: parent.param(), version: parent.param(), parent, @@ -152,6 +153,27 @@ impl Gateway { } } +fn mk_route(profile: &profiles::Profile) -> Option { + if let Some(addr) = profile.addr.clone() { + return Some(outbound::http::Routes::Profile( + outbound::http::ProfileRoutes { + addr, + routes: profile.http_routes.clone(), + targets: profile.targets.clone(), + }, + )); + } + + if let Some((addr, metadata)) = profile.endpoint.clone() { + return Some(outbound::http::Routes::Endpoint( + Remote(ServerAddr(addr)), + metadata, + )); + } + + None +} + // === impl ByRequestVersion === impl svc::router::SelectRoute> for ByRequestVersion { @@ -171,7 +193,7 @@ impl Target { fn discard_parent(self) -> Target { Target { addr: self.addr, - target: self.target, + routes: self.routes, version: self.version, parent: (), } @@ -199,8 +221,26 @@ where } } -impl svc::Param for Target { - fn param(&self) -> outbound::http::Logical { - self.target.clone() +impl svc::Param> for Target { + fn param(&self) -> watch::Receiver { + self.routes.clone() + } +} + +// Implement PartialEq, Eq, and Hash for Target, ignoring the watch. + +impl PartialEq for Target { + fn eq(&self, other: &Self) -> bool { + self.addr == other.addr && self.version == other.version + } +} + +impl Eq for Target {} + +impl std::hash::Hash for Target { + fn hash(&self, state: &mut H) { + self.addr.hash(state); + self.version.hash(state); + self.parent.hash(state); } } diff --git a/linkerd/app/gateway/src/http/tests.rs b/linkerd/app/gateway/src/http/tests.rs index d65dd3818f..f70014e7cd 100644 --- a/linkerd/app/gateway/src/http/tests.rs +++ b/linkerd/app/gateway/src/http/tests.rs @@ -94,6 +94,20 @@ async fn upgraded_request_remains_relative_form() { } } + impl svc::Param>> for Target { + fn param(&self) -> Option> { + Some( + linkerd_app_test::profile::only(profiles::Profile { + addr: Some(profiles::LogicalAddr( + "web.test.example.com:80".parse().unwrap(), + )), + ..profiles::Profile::default() + }) + .into(), + ) + } + } + impl svc::Param for Target { fn param(&self) -> http::Version { http::Version::H2 diff --git a/linkerd/app/gateway/src/server.rs b/linkerd/app/gateway/src/server.rs index 4d26fef28e..af7f3bba75 100644 --- a/linkerd/app/gateway/src/server.rs +++ b/linkerd/app/gateway/src/server.rs @@ -6,6 +6,7 @@ use linkerd_app_core::{ use linkerd_app_inbound::{self as inbound, GatewayAddr, GatewayDomainInvalid}; use linkerd_app_outbound::{self as outbound}; use std::fmt::Debug; +use tokio::sync::watch; /// Target for HTTP stacks. #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -173,6 +174,12 @@ impl svc::Param> for Http { } } +impl svc::Param>> for Http { + fn param(&self) -> Option> { + self.parent.param() + } +} + impl svc::Param for Http where T: svc::Param, diff --git a/linkerd/app/outbound/src/discover.rs b/linkerd/app/outbound/src/discover.rs index 28b6aba9fd..097b547417 100644 --- a/linkerd/app/outbound/src/discover.rs +++ b/linkerd/app/outbound/src/discover.rs @@ -8,6 +8,7 @@ use std::{ hash::{Hash, Hasher}, ops::Deref, }; +use tokio::sync::watch; use tracing::debug; #[cfg(test)] @@ -87,6 +88,13 @@ impl svc::Param> for Discovery { } } +impl svc::Param>> for Discovery { + fn param(&self) -> Option> { + let p = self.profile.clone()?; + Some(p.into()) + } +} + impl svc::Param> for Discovery { fn param(&self) -> Option { self.profile.as_ref().and_then(|p| p.logical_addr()) diff --git a/linkerd/app/outbound/src/http.rs b/linkerd/app/outbound/src/http.rs index fd90f35363..a476396723 100644 --- a/linkerd/app/outbound/src/http.rs +++ b/linkerd/app/outbound/src/http.rs @@ -9,9 +9,12 @@ use linkerd_app_core::{ api_resolve::{ConcreteAddr, Metadata}, core::Resolve, }, - svc, Error, + svc, + transport::addrs::*, + Error, }; use std::{fmt::Debug, hash::Hash}; +use tokio::sync::watch; pub mod concrete; mod endpoint; @@ -22,14 +25,46 @@ mod retry; mod server; mod strip_proxy_error; -pub use self::logical::Logical; +pub use self::logical::{LogicalAddr, ProfileRoutes, Routes}; pub(crate) use self::require_id_header::IdentityRequired; pub use linkerd_app_core::proxy::http::{self as http, *}; #[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub struct Http { - target: Logical, - version: http::Version, +pub struct Http(T); + +pub fn spawn_routes( + mut profile_rx: watch::Receiver, + mut mk: impl FnMut(&profiles::Profile) -> Routes + Send + Sync + 'static, +) -> watch::Receiver { + let (tx, rx) = { + let routes = (mk)(&*profile_rx.borrow()); + watch::channel(routes) + }; + + tokio::spawn(async move { + loop { + tokio::select! { + biased; + _ = tx.closed() => return, + _ = profile_rx.changed() => { + let routes = (mk)(&*profile_rx.borrow()); + if tx.send(routes).is_err() { + return; + } + } + } + } + }); + + rx +} + +pub fn spawn_routes_default(addr: Remote) -> watch::Receiver { + let (tx, rx) = watch::channel(Routes::Endpoint(addr, Default::default())); + tokio::spawn(async move { + tx.closed().await; + }); + rx } // === impl Outbound === @@ -55,12 +90,12 @@ impl Outbound { where // Logical HTTP target. T: svc::Param, - T: svc::Param, - T: Clone + Send + Sync + 'static, + T: svc::Param>, + T: Clone + Debug + PartialEq + Eq + Hash + Send + Sync + 'static, // Endpoint resolution. R: Resolve, // HTTP client stack - N: svc::NewService>, Service = NSvc>, + N: svc::NewService>>, Service = NSvc>, N: Clone + Send + Sync + Unpin + 'static, NSvc: svc::Service< http::Request, @@ -75,11 +110,7 @@ impl Outbound { .push_http_logical() .map_stack(move |config, _, stk| { stk.push_new_idle_cached(config.discovery_idle_timeout) - // Use a dedicated target type to configure parameters for the - // HTTP stack. It also helps narrow the cache key to just the - // logical target and HTTP version, discarding any other target - // information. - .push_map_target(Http::new) + .push_map_target(Http) .push(svc::ArcNewService::layer()) }) } @@ -87,45 +118,20 @@ impl Outbound { // === impl Http === -impl Http { - pub fn new(parent: T) -> Self - where - T: svc::Param, - T: svc::Param, - { - Self { - target: parent.param(), - version: parent.param(), - } - } -} - -impl svc::Param for Http { +impl svc::Param for Http +where + T: svc::Param, +{ fn param(&self) -> http::Version { - self.version - } -} - -impl svc::Param for Http { - fn param(&self) -> Logical { - self.target.clone() + self.0.param() } } -impl svc::Param> for Http { - fn param(&self) -> Option { - match self.target { - Logical::Route(ref addr, _) => Some(profiles::LogicalAddr(addr.clone())), - Logical::Forward(_, _) => None, - } - } -} - -impl svc::Param> for Http { - fn param(&self) -> Option { - match self.target { - Logical::Route(_, ref rx) => Some(rx.clone()), - Logical::Forward(_, _) => None, - } +impl svc::Param> for Http +where + T: svc::Param>, +{ + fn param(&self) -> watch::Receiver { + self.0.param() } } diff --git a/linkerd/app/outbound/src/http/concrete.rs b/linkerd/app/outbound/src/http/concrete.rs index 838d556ca2..2bae3aaccb 100644 --- a/linkerd/app/outbound/src/http/concrete.rs +++ b/linkerd/app/outbound/src/http/concrete.rs @@ -222,7 +222,7 @@ impl svc::Param> for Endpoint { impl svc::Param for Endpoint where - T: svc::Param>, + T: svc::Param>, { fn param(&self) -> transport::labels::Key { transport::labels::Key::OutboundClient(self.param()) @@ -231,16 +231,11 @@ where impl svc::Param for Endpoint where - T: svc::Param>, + T: svc::Param>, { fn param(&self) -> metrics::OutboundEndpointLabels { - let authority = self - .parent - .param() - .as_ref() - .map(|profiles::LogicalAddr(a)| a.as_http_authority()); metrics::OutboundEndpointLabels { - authority, + authority: self.parent.param(), labels: metrics::prefix_labels("dst", self.metadata.labels().iter()), server_id: self.param(), target_addr: self.addr.into(), @@ -250,10 +245,10 @@ where impl svc::Param for Endpoint where - T: svc::Param>, + T: svc::Param>, { fn param(&self) -> metrics::EndpointLabels { - metrics::EndpointLabels::from(svc::Param::::param(self)) + metrics::EndpointLabels::Outbound(self.param()) } } @@ -335,6 +330,7 @@ impl tap::Inspect for Endpoint { } fn route_labels(&self, req: &http::Request) -> Option { + // FIXME(ver) create a dedicated extension type for route labels. req.extensions() .get::() .map(|r| r.labels().clone()) diff --git a/linkerd/app/outbound/src/http/logical.rs b/linkerd/app/outbound/src/http/logical.rs index ce1fe5cffe..470a9f09db 100644 --- a/linkerd/app/outbound/src/http/logical.rs +++ b/linkerd/app/outbound/src/http/logical.rs @@ -1,31 +1,36 @@ //! A stack that routes HTTP requests to concrete backends. -use super::{concrete, retry}; +use super::concrete; use crate::Outbound; use linkerd_app_core::{ - classify, metrics, - profiles::{self, Profile}, - proxy::{ - api_resolve::Metadata, - http::{self, balance}, - }, + metrics, + proxy::{api_resolve::Metadata, http}, svc, transport::addrs::*, - Error, Infallible, NameAddr, CANONICAL_DST_HEADER, + Addr, Error, Infallible, NameAddr, CANONICAL_DST_HEADER, }; use linkerd_distribute as distribute; -use std::{fmt::Debug, hash::Hash, sync::Arc, time}; +use std::{fmt::Debug, hash::Hash}; use tokio::sync::watch; -#[derive(Clone, Debug)] -pub enum Logical { - Route(NameAddr, profiles::Receiver), - Forward(Remote, Metadata), +mod profile; + +pub use self::profile::Routes as ProfileRoutes; + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct LogicalAddr(pub Addr); + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum Routes { + Profile(profile::Routes), + // XXX Remove this variant when policy routes are added. + Endpoint(Remote, Metadata), } #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct Concrete { target: concrete::Dispatch, + authority: Option, parent: T, } @@ -36,39 +41,32 @@ pub struct NoRoute; #[derive(Debug, thiserror::Error)] #[error("logical service {addr}: {source}")] pub struct LogicalError { - addr: NameAddr, + addr: Addr, #[source] source: Error, } #[derive(Clone, Debug, PartialEq, Eq)] -struct Params { - parent: T, - addr: NameAddr, - routes: Arc<[(profiles::http::RequestMatch, RouteParams)]>, - backends: distribute::Backends>, -} - -#[derive(Clone, Debug, PartialEq, Eq, Hash)] -struct RouteParams { - parent: T, - addr: NameAddr, - profile: profiles::http::Route, - distribution: Distribution, +enum RouterParams { + Profile(profile::Params), + // XXX Remove this variant when policy routes are added. + Endpoint(Remote, Metadata, T), } type BackendCache = distribute::BackendCache, N, S>; type Distribution = distribute::Distribution>; +// Only applies to requests with profiles. +// +// TODO Add l5d-dst-canonical header to requests. +// +// TODO(ver) move this into the endpoint stack so that we can only +// set this on meshed connections. #[derive(Clone, Debug)] -struct Routable { - parent: T, - addr: NameAddr, - profile: profiles::Receiver, -} +struct CanonicalDstHeader(NameAddr); #[derive(Clone, Debug)] -struct CanonicalDstHeader(NameAddr); +struct Router {} // === impl Outbound === @@ -95,283 +93,123 @@ impl Outbound { > where // Logical target. - T: svc::Param, + T: svc::Param>, T: Eq + Hash + Clone + Debug + Send + Sync + 'static, // Concrete stack. N: svc::NewService, Service = NSvc> + Clone + Send + Sync + 'static, - NSvc: svc::Service, Response = http::Response> - + Clone - + Send - + Sync - + 'static, - NSvc::Error: Into, + NSvc: svc::Service< + http::Request, + Response = http::Response, + Error = Error, + >, + NSvc: Clone + Send + Sync + 'static, NSvc::Future: Send, { - self.map_stack(|_, rt, concrete| { - let route = svc::layers() - .push_on_service( - svc::layers() - .push(http::BoxRequest::layer()) - // The router does not take the backend's availability into - // consideration, so we must eagerly fail requests to prevent - // leaking tasks onto the runtime. - .push(svc::LoadShed::layer()), - ) - .push(http::insert::NewInsert::, _>::layer()) - .push( - rt.metrics - .proxy - .http_profile_route_actual - .to_layer::>(), - ) - // Depending on whether or not the request can be - // retried, it may have one of two `Body` types. This - // layer unifies any `Body` type into `BoxBody`. - .push_on_service(http::BoxRequest::erased()) - // Sets an optional retry policy. - .push(retry::layer( - rt.metrics.proxy.http_profile_route_retry.clone(), - )) - // Sets an optional request timeout. - .push(http::NewTimeout::layer()) - // Records per-route metrics. - .push( - rt.metrics - .proxy - .http_profile_route - .to_layer::>(), - ) - // Sets the per-route response classifier as a request - // extension. - .push(classify::NewClassify::layer()) - // TODO(ver) .push(svc::NewMapErr::layer_from_target::()) - .push_on_service(http::BoxResponse::layer()); - - // A `NewService`--instantiated once per logical target--that caches - // a set of concrete services so that, as the watch provides new - // `Params`, we can reuse inner services. - let router = svc::layers() - // Each `RouteParams` provides a `Distribution` that is used to - // choose a concrete service for a given route. - .push(BackendCache::layer()) - // Lazily cache a service for each `RouteParams` - // returned from the `SelectRoute` impl. - .push_on_service(route) - .push(svc::NewOneshotRoute::, _, _>::layer_cached()); - + self.map_stack(|_config, rt, concrete| { // For each `T` target, watch its `Profile`, rebuilding a // router stack. let watch = concrete - .clone() // Share the concrete stack with each router stack. .lift_new() - // Rebuild this router stack every time the profile changes. - .push_on_service(router) - .push(svc::NewSpawnWatch::::layer_into::>()); + .push_on_service(router_layer(rt.metrics.proxy.clone())) + // Rebuild the inner router stack every time the watch changes. + .push(svc::NewSpawnWatch::::layer_into::>()); watch - // Add l5d-dst-canonical header to requests. - // - // TODO(ver) move this into the endpoint stack so that we can only - // set this on meshed connections. - // - // TODO(ver) do we need to strip headers here? - .push(http::NewHeaderFromTarget::::layer()) - .push(svc::NewMapErr::layer_from_target::()) - .push_switch( - |parent: T| -> Result<_, Infallible> { - Ok(match parent.param() { - Logical::Route(addr, profile) => svc::Either::A(Routable { - addr, - parent, - profile, - }), - Logical::Forward(addr, meta) => svc::Either::B(Concrete { - target: concrete::Dispatch::Forward(addr, meta), - parent, - }), - }) - }, - concrete.into_inner(), - ) + .push_on_service(svc::MapErr::layer_boxed()) .push(svc::ArcNewService::layer()) }) } } -// === impl Routable === - -impl svc::Param> for Routable { - fn param(&self) -> watch::Receiver { - self.profile.clone().into() - } -} - -impl svc::Param for Routable { - fn param(&self) -> CanonicalDstHeader { - CanonicalDstHeader(self.addr.clone()) - } -} - -// === impl Params === - -impl From<(Profile, Routable)> for Params +fn router_layer( + metrics: metrics::Proxy, +) -> impl svc::Layer< + N, + Service = svc::ArcNewService< + RouterParams, + impl svc::Service< + http::Request, + Response = http::Response, + Error = Error, + Future = impl Send, + > + Clone, + >, +> + Clone where - T: Eq + Hash + Clone + Debug, + T: Clone + Debug + Eq + Hash + Send + Sync + 'static, + N: svc::NewService, Service = S>, + N: Clone + Send + Sync + 'static, + S: svc::Service< + http::Request, + Response = http::Response, + Error = Error, + >, + S: Clone + Send + Sync + 'static, + S::Future: Send, { - fn from((profile, routable): (Profile, Routable)) -> Self { - const EWMA: balance::EwmaConfig = balance::EwmaConfig { - default_rtt: time::Duration::from_millis(30), - decay: time::Duration::from_secs(10), - }; - - // Create concrete targets for all of the profile's routes. - let (backends, distribution) = if profile.targets.is_empty() { - let concrete = Concrete { - target: concrete::Dispatch::Balance(routable.addr.clone(), EWMA), - parent: routable.parent.clone(), - }; - let backends = std::iter::once(concrete.clone()).collect(); - let distribution = Distribution::first_available(std::iter::once(concrete)); - (backends, distribution) - } else { - let backends = profile - .targets - .iter() - .map(|t| Concrete { - target: concrete::Dispatch::Balance(t.addr.clone(), EWMA), - parent: routable.parent.clone(), - }) - .collect(); - let distribution = Distribution::random_available(profile.targets.iter().cloned().map( - |profiles::Target { addr, weight }| { - let concrete = Concrete { - target: concrete::Dispatch::Balance(addr, EWMA), - parent: routable.parent.clone(), - }; - (concrete, weight) + svc::layer::mk(move |concrete: N| { + svc::stack(concrete.clone()) + .push_switch( + |prms: RouterParams| { + Ok::<_, Infallible>(match prms { + RouterParams::Endpoint(remote, meta, parent) => svc::Either::A(Concrete { + target: concrete::Dispatch::Forward(remote, meta), + authority: None, + parent, + }), + RouterParams::Profile(ps) => svc::Either::B(ps), + }) }, - )) - .expect("distribution must be valid"); - - (backends, distribution) - }; - - let routes = profile - .http_routes - .iter() - .cloned() - .map(|(req_match, profile)| { - let params = RouteParams { - addr: routable.addr.clone(), - profile, - parent: routable.parent.clone(), - distribution: distribution.clone(), - }; - (req_match, params) - }) - // Add a default route. - .chain(std::iter::once(( - profiles::http::RequestMatch::default(), - RouteParams { - addr: routable.addr.clone(), - profile: Default::default(), - parent: routable.parent.clone(), - distribution: distribution.clone(), - }, - ))) - .collect::>(); - - Self { - addr: routable.addr, - parent: routable.parent, - backends, - routes, - } - } + svc::stack(concrete) + .push(profile::layer(metrics.clone())) + .into_inner(), + ) + .push(svc::NewMapErr::layer_from_target::()) + .push_on_service(svc::MapErr::layer_boxed()) + .push(svc::ArcNewService::layer()) + .into_inner() + }) } -impl svc::Param>> for Params -where - T: Eq + Hash + Clone + Debug, -{ - fn param(&self) -> distribute::Backends> { - self.backends.clone() - } -} +// === impl RouterParams === -impl svc::Param for Params +impl From<(Routes, T)> for RouterParams where T: Eq + Hash + Clone + Debug, { - fn param(&self) -> profiles::LogicalAddr { - profiles::LogicalAddr(self.addr.clone()) + fn from((routes, parent): (Routes, T)) -> Self { + match routes { + Routes::Profile(routes) => Self::Profile((routes, parent).into()), + Routes::Endpoint(addr, metadata) => Self::Endpoint(addr, metadata, parent), + } } } -impl svc::router::SelectRoute> for Params +impl svc::Param for RouterParams where - T: Eq + Hash + Clone + Debug, + T: Clone + Debug + Eq + Hash, { - type Key = RouteParams; - type Error = NoRoute; - - fn select(&self, req: &http::Request) -> Result { - profiles::http::route_for_request(&*self.routes, req) - .ok_or(NoRoute) - .cloned() - } -} - -// === impl RouteParams === - -impl svc::Param for RouteParams { - fn param(&self) -> profiles::LogicalAddr { - profiles::LogicalAddr(self.addr.clone()) - } -} - -impl svc::Param> for RouteParams { - fn param(&self) -> Distribution { - self.distribution.clone() - } -} - -impl svc::Param for RouteParams { - fn param(&self) -> profiles::http::Route { - self.profile.clone() - } -} - -impl svc::Param for RouteParams { - fn param(&self) -> metrics::ProfileRouteLabels { - metrics::ProfileRouteLabels::outbound( - profiles::LogicalAddr(self.addr.clone()), - &self.profile, - ) - } -} - -impl svc::Param for RouteParams { - fn param(&self) -> http::ResponseTimeout { - http::ResponseTimeout(self.profile.timeout()) - } -} - -impl svc::Param for RouteParams { - fn param(&self) -> classify::Request { - self.profile.response_classes().clone().into() + fn param(&self) -> LogicalAddr { + LogicalAddr(match self { + Self::Profile(ref p) => { + let profile::LogicalAddr(addr) = p.param(); + addr.into() + } + Self::Endpoint(Remote(ServerAddr(ref addr)), ..) => (*addr).into(), + }) } } // === impl LogicalError === -impl From<(&Routable, Error)> for LogicalError { - fn from((target, source): (&Routable, Error)) -> Self { - Self { - addr: target.addr.clone(), - source, - } +impl From<(&RouterParams, Error)> for LogicalError +where + T: Eq + Hash + Clone + Debug, +{ + fn from((target, source): (&RouterParams, Error)) -> Self { + let LogicalAddr(addr) = svc::Param::param(target); + Self { addr, source } } } @@ -386,12 +224,9 @@ where } } -impl svc::Param> for Concrete -where - T: svc::Param>, -{ - fn param(&self) -> Option { - self.parent.param() +impl svc::Param> for Concrete { + fn param(&self) -> Option { + self.authority.clone() } } @@ -401,36 +236,6 @@ impl svc::Param for Concrete { } } -// === impl Logical === - -impl std::cmp::PartialEq for Logical { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - (Self::Route(laddr, _), Self::Route(raddr, _)) => laddr == raddr, - (Self::Forward(laddr, lmeta), Self::Forward(raddr, rmeta)) => { - laddr == raddr && lmeta == rmeta - } - _ => false, - } - } -} - -impl std::cmp::Eq for Logical {} - -impl std::hash::Hash for Logical { - fn hash(&self, state: &mut H) { - match self { - Self::Route(addr, _) => { - addr.hash(state); - } - Self::Forward(addr, meta) => { - addr.hash(state); - meta.hash(state); - } - } - } -} - // === impl CanonicalDstHeader === impl From for http::HeaderPair { diff --git a/linkerd/app/outbound/src/http/logical/profile.rs b/linkerd/app/outbound/src/http/logical/profile.rs new file mode 100644 index 0000000000..ca974d8fa2 --- /dev/null +++ b/linkerd/app/outbound/src/http/logical/profile.rs @@ -0,0 +1,302 @@ +use super::{ + super::{concrete, retry}, + BackendCache, Concrete, Distribution, NoRoute, +}; +use linkerd_app_core::{ + classify, metrics, + proxy::http::{self, balance}, + svc, Error, +}; +use linkerd_distribute as distribute; +use std::{fmt::Debug, hash::Hash, sync::Arc, time}; + +pub use linkerd_app_core::profiles::{ + http::{route_for_request, RequestMatch, Route}, + LogicalAddr, Profile, Target, +}; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct Routes { + pub addr: LogicalAddr, + pub routes: Arc<[(RequestMatch, Route)]>, + pub targets: Arc<[Target]>, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(super) struct Params { + parent: T, + addr: LogicalAddr, + profile_routes: Arc<[(RequestMatch, RouteParams)]>, + backends: distribute::Backends>, +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub(super) struct RouteParams { + parent: T, + addr: LogicalAddr, + profile: Route, + distribution: Distribution, +} + +// Wraps a `NewService`--instantiated once per logical target--that caches a set +// of concrete services so that, as the watch provides new `Params`, we can +// reuse inner services. +pub(super) fn layer( + metrics: metrics::Proxy, +) -> impl svc::Layer< + N, + Service = svc::ArcNewService< + Params, + impl svc::Service< + http::Request, + Response = http::Response, + Error = Error, + Future = impl Send, + > + Clone, + >, +> + Clone +where + T: Clone + Debug + Eq + Hash + Send + Sync + 'static, + N: svc::NewService, Service = S> + Clone + Send + Sync + 'static, + S: svc::Service< + http::Request, + Response = http::Response, + Error = Error, + >, + S: Clone + Send + Sync + 'static, + S::Future: Send, +{ + svc::layer::mk(move |inner| { + svc::stack(inner) + // Each `RouteParams` provides a `Distribution` that is used to + // choose a concrete service for a given route. + .push(BackendCache::layer()) + // Lazily cache a service for each `RouteParams` + // returned from the `SelectRoute` impl. + .push_on_service(route_layer(metrics.clone())) + .push(svc::NewOneshotRoute::, _, _>::layer_cached()) + .push(svc::ArcNewService::layer()) + .into_inner() + }) +} + +fn route_layer( + metrics: metrics::Proxy, +) -> impl svc::Layer< + N, + Service = svc::ArcNewService< + RouteParams, + impl svc::Service< + http::Request, + Response = http::Response, + Error = Error, + Future = impl Send, + > + Clone, + >, +> + Clone +where + T: Clone + Debug + Eq + Hash + Send + Sync + 'static, + N: svc::NewService, Service = S> + Clone + Send + Sync + 'static, + S: svc::Service< + http::Request, + Response = http::Response, + Error = Error, + >, + S: Clone + Send + Sync + 'static, + S::Future: Send, +{ + svc::layer::mk(move |inner| { + svc::stack(inner) + .check_new_service::, http::Request>() + .push_on_service( + svc::layers() + .push(http::BoxRequest::layer()) + // The router does not take the backend's availability into + // consideration, so we must eagerly fail requests to prevent + // leaking tasks onto the runtime. + .push(svc::LoadShed::layer()), + ) + .push(http::insert::NewInsert::::layer()) + .push( + metrics + .http_profile_route_actual + .to_layer::>(), + ) + // Depending on whether or not the request can be + // retried, it may have one of two `Body` types. This + // layer unifies any `Body` type into `BoxBody`. + .push_on_service(http::BoxRequest::erased()) + // Sets an optional retry policy. + .push(retry::layer(metrics.http_profile_route_retry.clone())) + // Sets an optional request timeout. + .push(http::NewTimeout::layer()) + // Records per-route metrics. + .push( + metrics + .http_profile_route + .to_layer::>(), + ) + // Sets the per-route response classifier as a request + // extension. + .push(classify::NewClassify::layer()) + // TODO(ver) .push(svc::NewMapErr::layer_from_target::()) + .push_on_service(http::BoxResponse::layer()) + .push(svc::ArcNewService::layer()) + .into_inner() + }) +} + +// === impl Params === + +impl From<(Routes, T)> for Params +where + T: Eq + Hash + Clone + Debug, +{ + fn from((routes, parent): (Routes, T)) -> Self { + let Routes { + addr: LogicalAddr(addr), + routes, + targets, + } = routes; + + const EWMA: balance::EwmaConfig = balance::EwmaConfig { + default_rtt: time::Duration::from_millis(30), + decay: time::Duration::from_secs(10), + }; + + // Create concrete targets for all of the profile's routes. + let (backends, distribution) = if targets.is_empty() { + let concrete = Concrete { + target: concrete::Dispatch::Balance(addr.clone(), EWMA), + authority: Some(addr.as_http_authority()), + parent: parent.clone(), + }; + let backends = std::iter::once(concrete.clone()).collect(); + let distribution = Distribution::first_available(std::iter::once(concrete)); + (backends, distribution) + } else { + let backends = targets + .iter() + .map(|t| Concrete { + target: concrete::Dispatch::Balance(t.addr.clone(), EWMA), + authority: Some(t.addr.as_http_authority()), + parent: parent.clone(), + }) + .collect(); + let distribution = Distribution::random_available(targets.iter().cloned().map( + |Target { addr, weight }| { + let concrete = Concrete { + authority: Some(addr.as_http_authority()), + target: concrete::Dispatch::Balance(addr, EWMA), + parent: parent.clone(), + }; + (concrete, weight) + }, + )) + .expect("distribution must be valid"); + + (backends, distribution) + }; + + let profile_routes = routes + .iter() + .cloned() + .map(|(req_match, profile)| { + let params = RouteParams { + addr: LogicalAddr(addr.clone()), + profile, + parent: parent.clone(), + distribution: distribution.clone(), + }; + (req_match, params) + }) + // Add a default route. + .chain(std::iter::once(( + RequestMatch::default(), + RouteParams { + addr: LogicalAddr(addr.clone()), + profile: Default::default(), + parent: parent.clone(), + distribution: distribution.clone(), + }, + ))) + .collect::>(); + + Self { + addr: LogicalAddr(addr), + parent, + backends, + profile_routes, + } + } +} + +impl svc::Param>> for Params +where + T: Eq + Hash + Clone + Debug, +{ + fn param(&self) -> distribute::Backends> { + self.backends.clone() + } +} + +impl svc::Param for Params +where + T: Eq + Hash + Clone + Debug, +{ + fn param(&self) -> LogicalAddr { + self.addr.clone() + } +} + +impl svc::router::SelectRoute> for Params +where + T: Eq + Hash + Clone + Debug, +{ + type Key = RouteParams; + type Error = NoRoute; + + fn select(&self, req: &http::Request) -> Result { + route_for_request(&*self.profile_routes, req) + .ok_or(NoRoute) + .cloned() + } +} + +// === impl RouteParams === + +impl svc::Param for RouteParams { + fn param(&self) -> LogicalAddr { + self.addr.clone() + } +} + +impl svc::Param> for RouteParams { + fn param(&self) -> Distribution { + self.distribution.clone() + } +} + +impl svc::Param for RouteParams { + fn param(&self) -> Route { + self.profile.clone() + } +} + +impl svc::Param for RouteParams { + fn param(&self) -> metrics::ProfileRouteLabels { + metrics::ProfileRouteLabels::outbound(self.addr.clone(), &self.profile) + } +} + +impl svc::Param for RouteParams { + fn param(&self) -> http::ResponseTimeout { + http::ResponseTimeout(self.profile.timeout()) + } +} + +impl svc::Param for RouteParams { + fn param(&self) -> classify::Request { + self.profile.response_classes().clone().into() + } +} diff --git a/linkerd/app/outbound/src/ingress.rs b/linkerd/app/outbound/src/ingress.rs index b66f7ae806..5dd8c6a764 100644 --- a/linkerd/app/outbound/src/ingress.rs +++ b/linkerd/app/outbound/src/ingress.rs @@ -8,10 +8,11 @@ use linkerd_app_core::{ }, svc::{self, stack::Param}, transport::addrs::*, - Error, Infallible, NameAddr, Result, + Addr, Error, Infallible, NameAddr, Result, }; use std::fmt::Debug; use thiserror::Error; +use tokio::sync::watch; #[derive(Clone, Debug, PartialEq, Eq, Hash)] struct Http { @@ -31,6 +32,12 @@ enum RequestTarget { Orig(OrigDstAddr), } +#[derive(Clone, Debug)] +struct Logical { + addr: Addr, + routes: watch::Receiver, +} + #[derive(Debug, Error)] #[error("ingress-mode routing requires a service profile: {0}")] struct ProfileRequired(NameAddr); @@ -79,7 +86,7 @@ impl Outbound<()> { .push_http_cached(resolve) .push_http_server() .map_stack(|_, _, stk| { - stk.check_new_service::, _>() + stk.check_new_service::, _>() .push_filter(Http::try_from) }) .push_discover(profiles); @@ -252,69 +259,132 @@ impl Param for Http { } } -impl svc::Param for Http { - fn param(&self) -> http::Logical { - self.parent.clone() +impl svc::Param for Http { + fn param(&self) -> http::LogicalAddr { + http::LogicalAddr(self.parent.addr.clone()) } } -impl svc::Param for Http { +impl svc::Param for Http { fn param(&self) -> http::normalize_uri::DefaultAuthority { - http::normalize_uri::DefaultAuthority(match &self.parent { - http::Logical::Route(addr, _) => Some(addr.as_http_authority()), - http::Logical::Forward(..) => None, - }) + http::normalize_uri::DefaultAuthority(Some(self.parent.addr.to_http_authority())) } } -impl TryFrom>> for Http { +impl svc::Param> for Http { + fn param(&self) -> watch::Receiver { + self.parent.routes.clone() + } +} + +impl TryFrom>> for Http { type Error = ProfileRequired; fn try_from( parent: discover::Discovery>, ) -> std::result::Result { match ( - &**parent, - svc::Param::>::param(&parent), + (**parent).clone(), + svc::Param::>::param(&parent).map(watch::Receiver::from), ) { - (RequestTarget::Named(addr), Some(profile)) => { - if let Some(profiles::LogicalAddr(addr)) = profile.logical_addr() { - return Ok(Http { - version: (*parent).param(), - parent: http::Logical::Route(addr, profile), - }); - } - - let (addr, metadata) = profile - .endpoint() - .ok_or_else(|| ProfileRequired(addr.clone()))?; + (RequestTarget::Named(addr), profile) => { + let profile = profile.ok_or_else(|| ProfileRequired(addr.clone()))?; + let mut route = + mk_route(&*profile.borrow()).ok_or_else(|| ProfileRequired(addr.clone()))?; + let routes = http::spawn_routes(profile, move |p: &profiles::Profile| { + if let Some(r) = mk_route(p) { + route = r; + } + route.clone() + }); + Ok(Http { version: (*parent).param(), - parent: http::Logical::Forward(Remote(ServerAddr(addr)), metadata), + parent: Logical { + addr: addr.into(), + routes, + }, }) } - (RequestTarget::Named(addr), None) => Err(ProfileRequired(addr.clone())), - - (RequestTarget::Orig(OrigDstAddr(addr)), profile) => { - if let Some(profile) = profile { - if let Some((addr, metadata)) = profile.endpoint() { - return Ok(Http { - version: (*parent).param(), - parent: http::Logical::Forward(Remote(ServerAddr(addr)), metadata), - }); - } - } + (RequestTarget::Orig(OrigDstAddr(addr)), Some(profile)) => { + let route = mk_route(&*profile.borrow()); + let routes = if let Some(route) = route { + let mut route = route; + http::spawn_routes(profile, move |p: &profiles::Profile| { + if let Some(r) = mk_route(p) { + route = r; + } + route.clone() + }) + } else { + http::spawn_routes_default(Remote(ServerAddr(addr))) + }; + Ok(Http { + version: (*parent).param(), + parent: Logical { + addr: addr.into(), + routes, + }, + }) + } + (RequestTarget::Orig(OrigDstAddr(addr)), None) => { + let routes = http::spawn_routes_default(Remote(ServerAddr(addr))); Ok(Http { version: (*parent).param(), - parent: http::Logical::Forward(Remote(ServerAddr(*addr)), Default::default()), + parent: Logical { + addr: addr.into(), + routes, + }, }) } } } } +fn mk_route( + profiles::Profile { + addr, + endpoint, + http_routes, + targets, + .. + }: &profiles::Profile, +) -> Option { + if let Some(addr) = addr.clone() { + return Some(http::Routes::Profile(http::ProfileRoutes { + addr, + routes: http_routes.clone(), + targets: targets.clone(), + })); + } + + if let Some((addr, metadata)) = endpoint.clone() { + return Some(http::Routes::Endpoint(Remote(ServerAddr(addr)), metadata)); + } + + None +} + +mod foo {} + +// === impl Logical === + +impl std::cmp::PartialEq for Logical { + fn eq(&self, other: &Self) -> bool { + self.addr == other.addr + } +} + +impl std::cmp::Eq for Logical {} + +impl std::hash::Hash for Logical { + fn hash(&self, state: &mut H) { + self.addr.hash(state); + } +} + // === impl Opaq === impl std::ops::Deref for Opaq { diff --git a/linkerd/app/outbound/src/protocol.rs b/linkerd/app/outbound/src/protocol.rs index 79146776a0..bc83276bdf 100644 --- a/linkerd/app/outbound/src/protocol.rs +++ b/linkerd/app/outbound/src/protocol.rs @@ -124,15 +124,6 @@ impl svc::Param for Http { } } -impl svc::Param for Http -where - T: svc::Param, -{ - fn param(&self) -> http::Logical { - self.parent.param() - } -} - impl svc::Param for Http where T: svc::Param, @@ -141,3 +132,11 @@ where self.parent.param() } } + +impl std::ops::Deref for Http { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.parent + } +} diff --git a/linkerd/app/outbound/src/sidecar.rs b/linkerd/app/outbound/src/sidecar.rs index 72a6d72b95..da8fe88d38 100644 --- a/linkerd/app/outbound/src/sidecar.rs +++ b/linkerd/app/outbound/src/sidecar.rs @@ -1,4 +1,8 @@ -use crate::{discover, http, opaq, protocol::Protocol, Outbound}; +use crate::{ + discover, http, opaq, + protocol::{self, Protocol}, + Outbound, +}; use linkerd_app_core::{ io, profiles, proxy::{ @@ -10,6 +14,7 @@ use linkerd_app_core::{ Error, }; use std::fmt::Debug; +use tokio::sync::watch; use tracing::info_span; /// A target type holding discovery information for a sidecar proxy. @@ -19,6 +24,15 @@ struct Sidecar { profile: Option, } +#[derive(Clone, Debug)] +struct HttpSidecar { + orig_dst: OrigDstAddr, + version: http::Version, + routes: watch::Receiver, +} + +// === impl Outbound === + impl Outbound<()> { pub fn mk_sidecar(&self, profiles: P, resolve: R) -> svc::ArcNewTcp where @@ -34,12 +48,46 @@ impl Outbound<()> { R: Resolve, { let opaq = self.to_tcp_connect().push_opaq_cached(resolve.clone()); + let http = self .to_tcp_connect() .push_tcp_endpoint() .push_http_tcp_client() .push_http_cached(resolve) - .push_http_server(); + .push_http_server() + .into_stack() + .push_map_target(|parent: protocol::Http| { + let version = svc::Param::::param(&parent); + let orig_dst = (*parent).orig_dst; + let routes = match (*parent).profile.clone() { + Some(profile) => { + http::spawn_routes(profile.into(), move |profile: &profiles::Profile| { + if let Some(addr) = profile.addr.clone() { + return http::Routes::Profile(http::ProfileRoutes { + addr, + routes: profile.http_routes.clone(), + targets: profile.targets.clone(), + }); + } + + if let Some((addr, metadata)) = profile.endpoint.clone() { + return http::Routes::Endpoint(Remote(ServerAddr(addr)), metadata); + } + + http::Routes::Endpoint( + Remote(ServerAddr(*orig_dst)), + Default::default(), + ) + }) + } + None => http::spawn_routes_default(Remote(ServerAddr(*orig_dst))), + }; + HttpSidecar { + orig_dst, + version, + routes, + } + }); opaq.push_protocol(http.into_inner()) // Use a dedicated target type to bind discovery results to the @@ -105,32 +153,6 @@ impl svc::Param for Sidecar { } } -impl svc::Param for Sidecar { - fn param(&self) -> http::normalize_uri::DefaultAuthority { - http::normalize_uri::DefaultAuthority(self.profile.as_ref().and_then(|p| { - p.logical_addr() - .map(|profiles::LogicalAddr(a)| a.as_http_authority()) - })) - } -} - -impl svc::Param for Sidecar { - fn param(&self) -> http::Logical { - if let Some(profile) = self.profile.clone() { - if let Some(profiles::LogicalAddr(addr)) = profile.logical_addr() { - return http::Logical::Route(addr, profile); - } - - if let Some((addr, metadata)) = profile.endpoint() { - return http::Logical::Forward(Remote(ServerAddr(addr)), metadata); - } - } - - let OrigDstAddr(addr) = self.orig_dst; - http::Logical::Forward(Remote(ServerAddr(addr)), Default::default()) - } -} - impl svc::Param for Sidecar { fn param(&self) -> opaq::Logical { if let Some(profile) = self.profile.clone() { @@ -161,3 +183,50 @@ impl std::hash::Hash for Sidecar { self.orig_dst.hash(state); } } + +// === impl HttpSidecar === + +impl svc::Param for HttpSidecar { + fn param(&self) -> http::Version { + self.version + } +} + +impl svc::Param for HttpSidecar { + fn param(&self) -> http::LogicalAddr { + http::LogicalAddr(match *self.routes.borrow() { + http::Routes::Endpoint(Remote(ServerAddr(addr)), ..) => addr.into(), + http::Routes::Profile(ref routes) => routes.addr.0.clone().into(), + }) + } +} + +impl svc::Param> for HttpSidecar { + fn param(&self) -> watch::Receiver { + self.routes.clone() + } +} + +impl svc::Param for HttpSidecar { + fn param(&self) -> http::normalize_uri::DefaultAuthority { + http::normalize_uri::DefaultAuthority(match *self.routes.borrow() { + http::Routes::Profile(ref routes) => Some((*routes.addr).as_http_authority()), + http::Routes::Endpoint(..) => None, + }) + } +} + +impl std::cmp::PartialEq for HttpSidecar { + fn eq(&self, other: &Self) -> bool { + self.orig_dst == other.orig_dst && self.version == other.version + } +} + +impl std::cmp::Eq for HttpSidecar {} + +impl std::hash::Hash for HttpSidecar { + fn hash(&self, state: &mut H) { + self.orig_dst.hash(state); + self.version.hash(state); + } +} diff --git a/linkerd/proxy/http/src/header_from_target.rs b/linkerd/proxy/http/src/header_from_target.rs index bc811508cd..08c245bf4f 100644 --- a/linkerd/proxy/http/src/header_from_target.rs +++ b/linkerd/proxy/http/src/header_from_target.rs @@ -1,13 +1,14 @@ use crate::HeaderPair; use http::header::{HeaderName, HeaderValue}; -use linkerd_stack::{layer, NewService, Param}; +use linkerd_stack::{layer, ExtractParam, NewService}; use std::task::{Context, Poll}; /// Wraps an HTTP `Service` so that the Stack's `T -typed target` is cloned into /// each request's headers. #[derive(Clone, Debug)] -pub struct NewHeaderFromTarget { +pub struct NewHeaderFromTarget { inner: N, + extract: X, _marker: std::marker::PhantomData H>, } @@ -20,25 +21,32 @@ pub struct HeaderFromTarget { // === impl NewHeaderFromTarget === -impl NewHeaderFromTarget { - pub fn layer() -> impl layer::Layer + Clone { +impl NewHeaderFromTarget { + pub fn layer_via(extract: X) -> impl layer::Layer + Clone { layer::mk(move |inner| Self { inner, + extract: extract.clone(), _marker: std::marker::PhantomData, }) } } -impl NewService for NewHeaderFromTarget +impl NewHeaderFromTarget { + pub fn layer() -> impl layer::Layer + Clone { + Self::layer_via(()) + } +} + +impl NewService for NewHeaderFromTarget where H: Into, - T: Param, + X: ExtractParam, N: NewService, { type Service = HeaderFromTarget; fn new_service(&self, t: T) -> Self::Service { - let HeaderPair(name, value) = t.param().into(); + let HeaderPair(name, value) = self.extract.extract_param(&t).into(); let inner = self.inner.new_service(t); HeaderFromTarget { name, value, inner } } diff --git a/linkerd/service-profiles/src/lib.rs b/linkerd/service-profiles/src/lib.rs index 14ca3166af..ea8209cea9 100644 --- a/linkerd/service-profiles/src/lib.rs +++ b/linkerd/service-profiles/src/lib.rs @@ -48,10 +48,10 @@ pub struct Profile { pub struct LookupAddr(pub Addr); /// A bound logical service address -#[derive(Clone, Hash, Eq, PartialEq)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct LogicalAddr(pub NameAddr); -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct Target { pub addr: NameAddr, pub weight: u32, @@ -254,6 +254,14 @@ impl From for NameAddr { } } +impl std::ops::Deref for LogicalAddr { + type Target = NameAddr; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + // === impl Target === impl fmt::Debug for Target { From f0759efee3f8dc035dfe70acd688f1b0d5706c84 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 20 Feb 2023 04:46:36 +0000 Subject: [PATCH 03/13] fixup --- linkerd/app/gateway/src/http.rs | 2 -- linkerd/app/gateway/src/http/tests.rs | 10 +--------- linkerd/app/outbound/src/discover.rs | 3 +-- 3 files changed, 2 insertions(+), 13 deletions(-) diff --git a/linkerd/app/gateway/src/http.rs b/linkerd/app/gateway/src/http.rs index e23254d846..07af177e8c 100644 --- a/linkerd/app/gateway/src/http.rs +++ b/linkerd/app/gateway/src/http.rs @@ -227,8 +227,6 @@ impl svc::Param> for Target { } } -// Implement PartialEq, Eq, and Hash for Target, ignoring the watch. - impl PartialEq for Target { fn eq(&self, other: &Self) -> bool { self.addr == other.addr && self.version == other.version diff --git a/linkerd/app/gateway/src/http/tests.rs b/linkerd/app/gateway/src/http/tests.rs index f70014e7cd..5c90917b20 100644 --- a/linkerd/app/gateway/src/http/tests.rs +++ b/linkerd/app/gateway/src/http/tests.rs @@ -96,15 +96,7 @@ async fn upgraded_request_remains_relative_form() { impl svc::Param>> for Target { fn param(&self) -> Option> { - Some( - linkerd_app_test::profile::only(profiles::Profile { - addr: Some(profiles::LogicalAddr( - "web.test.example.com:80".parse().unwrap(), - )), - ..profiles::Profile::default() - }) - .into(), - ) + svc::Param::>::param(self).map(Into::into) } } diff --git a/linkerd/app/outbound/src/discover.rs b/linkerd/app/outbound/src/discover.rs index 097b547417..aeca8a8ccb 100644 --- a/linkerd/app/outbound/src/discover.rs +++ b/linkerd/app/outbound/src/discover.rs @@ -90,8 +90,7 @@ impl svc::Param> for Discovery { impl svc::Param>> for Discovery { fn param(&self) -> Option> { - let p = self.profile.clone()?; - Some(p.into()) + self.profile.clone().map(Into::into) } } From 67c9b698a9975716af9c7ab68e333f72fb7511cb Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 20 Feb 2023 17:38:43 +0000 Subject: [PATCH 04/13] comments/consistency --- linkerd/app/outbound/src/http/logical.rs | 11 ++++++----- linkerd/app/outbound/src/sidecar.rs | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/linkerd/app/outbound/src/http/logical.rs b/linkerd/app/outbound/src/http/logical.rs index 470a9f09db..6d1b826ab2 100644 --- a/linkerd/app/outbound/src/http/logical.rs +++ b/linkerd/app/outbound/src/http/logical.rs @@ -17,13 +17,16 @@ mod profile; pub use self::profile::Routes as ProfileRoutes; +/// Indicates the address used for logical routing. #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct LogicalAddr(pub Addr); +/// Configures the flavor of HTTP routing. #[derive(Clone, Debug, PartialEq, Eq)] pub enum Routes { Profile(profile::Routes), - // XXX Remove this variant when policy routes are added. + + // TODO(ver) Remove this variant when policy routes are fully wired up. Endpoint(Remote, Metadata), } @@ -49,7 +52,8 @@ pub struct LogicalError { #[derive(Clone, Debug, PartialEq, Eq)] enum RouterParams { Profile(profile::Params), - // XXX Remove this variant when policy routes are added. + + // TODO(ver) Remove this variant when policy routes are fully wired up. Endpoint(Remote, Metadata, T), } @@ -65,9 +69,6 @@ type Distribution = distribute::Distribution>; #[derive(Clone, Debug)] struct CanonicalDstHeader(NameAddr); -#[derive(Clone, Debug)] -struct Router {} - // === impl Outbound === impl Outbound { diff --git a/linkerd/app/outbound/src/sidecar.rs b/linkerd/app/outbound/src/sidecar.rs index da8fe88d38..bd8d065f06 100644 --- a/linkerd/app/outbound/src/sidecar.rs +++ b/linkerd/app/outbound/src/sidecar.rs @@ -195,8 +195,8 @@ impl svc::Param for HttpSidecar { impl svc::Param for HttpSidecar { fn param(&self) -> http::LogicalAddr { http::LogicalAddr(match *self.routes.borrow() { - http::Routes::Endpoint(Remote(ServerAddr(addr)), ..) => addr.into(), http::Routes::Profile(ref routes) => routes.addr.0.clone().into(), + http::Routes::Endpoint(Remote(ServerAddr(addr)), ..) => addr.into(), }) } } From fd04961b20f14487abccd178938b5564384caa2b Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 20 Feb 2023 22:08:36 +0000 Subject: [PATCH 05/13] Simplify type names/reexports --- linkerd/app/gateway/src/http.rs | 2 +- linkerd/app/outbound/src/http.rs | 2 +- linkerd/app/outbound/src/http/logical.rs | 4 +--- linkerd/app/outbound/src/ingress.rs | 2 +- linkerd/app/outbound/src/sidecar.rs | 2 +- 5 files changed, 5 insertions(+), 7 deletions(-) diff --git a/linkerd/app/gateway/src/http.rs b/linkerd/app/gateway/src/http.rs index 07af177e8c..0db5a4cf7d 100644 --- a/linkerd/app/gateway/src/http.rs +++ b/linkerd/app/gateway/src/http.rs @@ -156,7 +156,7 @@ impl Gateway { fn mk_route(profile: &profiles::Profile) -> Option { if let Some(addr) = profile.addr.clone() { return Some(outbound::http::Routes::Profile( - outbound::http::ProfileRoutes { + outbound::http::profile::Routes { addr, routes: profile.http_routes.clone(), targets: profile.targets.clone(), diff --git a/linkerd/app/outbound/src/http.rs b/linkerd/app/outbound/src/http.rs index a476396723..10575b8345 100644 --- a/linkerd/app/outbound/src/http.rs +++ b/linkerd/app/outbound/src/http.rs @@ -25,7 +25,7 @@ mod retry; mod server; mod strip_proxy_error; -pub use self::logical::{LogicalAddr, ProfileRoutes, Routes}; +pub use self::logical::{profile, LogicalAddr, Routes}; pub(crate) use self::require_id_header::IdentityRequired; pub use linkerd_app_core::proxy::http::{self as http, *}; diff --git a/linkerd/app/outbound/src/http/logical.rs b/linkerd/app/outbound/src/http/logical.rs index 6d1b826ab2..e48877b1d5 100644 --- a/linkerd/app/outbound/src/http/logical.rs +++ b/linkerd/app/outbound/src/http/logical.rs @@ -13,9 +13,7 @@ use linkerd_distribute as distribute; use std::{fmt::Debug, hash::Hash}; use tokio::sync::watch; -mod profile; - -pub use self::profile::Routes as ProfileRoutes; +pub mod profile; /// Indicates the address used for logical routing. #[derive(Clone, Debug, PartialEq, Eq, Hash)] diff --git a/linkerd/app/outbound/src/ingress.rs b/linkerd/app/outbound/src/ingress.rs index 5dd8c6a764..8c59af7f6d 100644 --- a/linkerd/app/outbound/src/ingress.rs +++ b/linkerd/app/outbound/src/ingress.rs @@ -353,7 +353,7 @@ fn mk_route( }: &profiles::Profile, ) -> Option { if let Some(addr) = addr.clone() { - return Some(http::Routes::Profile(http::ProfileRoutes { + return Some(http::Routes::Profile(http::profile::Routes { addr, routes: http_routes.clone(), targets: targets.clone(), diff --git a/linkerd/app/outbound/src/sidecar.rs b/linkerd/app/outbound/src/sidecar.rs index bd8d065f06..9f600be9cd 100644 --- a/linkerd/app/outbound/src/sidecar.rs +++ b/linkerd/app/outbound/src/sidecar.rs @@ -63,7 +63,7 @@ impl Outbound<()> { Some(profile) => { http::spawn_routes(profile.into(), move |profile: &profiles::Profile| { if let Some(addr) = profile.addr.clone() { - return http::Routes::Profile(http::ProfileRoutes { + return http::Routes::Profile(http::profile::Routes { addr, routes: profile.http_routes.clone(), targets: profile.targets.clone(), From e753d4202090662f6b6e57c5cbc4d38ef6708df6 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 20 Feb 2023 23:02:22 +0000 Subject: [PATCH 06/13] more setup --- linkerd/app/outbound/src/http/logical.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/linkerd/app/outbound/src/http/logical.rs b/linkerd/app/outbound/src/http/logical.rs index e48877b1d5..d9fd042331 100644 --- a/linkerd/app/outbound/src/http/logical.rs +++ b/linkerd/app/outbound/src/http/logical.rs @@ -22,8 +22,10 @@ pub struct LogicalAddr(pub Addr); /// Configures the flavor of HTTP routing. #[derive(Clone, Debug, PartialEq, Eq)] pub enum Routes { + /// Service profile routes. Profile(profile::Routes), + /// Fallback endpoint forwarding. // TODO(ver) Remove this variant when policy routes are fully wired up. Endpoint(Remote, Metadata), } @@ -148,7 +150,8 @@ where S::Future: Send, { svc::layer::mk(move |concrete: N| { - svc::stack(concrete.clone()) + let profile = svc::stack(concrete.clone()).push(profile::layer(metrics.clone())); + svc::stack(concrete) .push_switch( |prms: RouterParams| { Ok::<_, Infallible>(match prms { @@ -160,9 +163,7 @@ where RouterParams::Profile(ps) => svc::Either::B(ps), }) }, - svc::stack(concrete) - .push(profile::layer(metrics.clone())) - .into_inner(), + profile.into_inner(), ) .push(svc::NewMapErr::layer_from_target::()) .push_on_service(svc::MapErr::layer_boxed()) From 9f9b42fa3e08618043da6fa38f58c71e47b20372 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 21 Feb 2023 01:22:37 +0000 Subject: [PATCH 07/13] forward-port cleanup --- linkerd/app/outbound/src/http/logical.rs | 108 ++++++++++++----------- 1 file changed, 57 insertions(+), 51 deletions(-) diff --git a/linkerd/app/outbound/src/http/logical.rs b/linkerd/app/outbound/src/http/logical.rs index d9fd042331..7e374efa98 100644 --- a/linkerd/app/outbound/src/http/logical.rs +++ b/linkerd/app/outbound/src/http/logical.rs @@ -112,7 +112,7 @@ impl Outbound { let watch = concrete // Share the concrete stack with each router stack. .lift_new() - .push_on_service(router_layer(rt.metrics.proxy.clone())) + .push_on_service(RouterParams::layer(rt.metrics.proxy.clone())) // Rebuild the inner router stack every time the watch changes. .push(svc::NewSpawnWatch::::layer_into::>()); @@ -123,57 +123,63 @@ impl Outbound { } } -fn router_layer( - metrics: metrics::Proxy, -) -> impl svc::Layer< - N, - Service = svc::ArcNewService< - RouterParams, - impl svc::Service< - http::Request, - Response = http::Response, - Error = Error, - Future = impl Send, - > + Clone, - >, -> + Clone +// === impl RouterParams === + +impl RouterParams where T: Clone + Debug + Eq + Hash + Send + Sync + 'static, - N: svc::NewService, Service = S>, - N: Clone + Send + Sync + 'static, - S: svc::Service< - http::Request, - Response = http::Response, - Error = Error, - >, - S: Clone + Send + Sync + 'static, - S::Future: Send, { - svc::layer::mk(move |concrete: N| { - let profile = svc::stack(concrete.clone()).push(profile::layer(metrics.clone())); - svc::stack(concrete) - .push_switch( - |prms: RouterParams| { - Ok::<_, Infallible>(match prms { - RouterParams::Endpoint(remote, meta, parent) => svc::Either::A(Concrete { - target: concrete::Dispatch::Forward(remote, meta), - authority: None, - parent, - }), - RouterParams::Profile(ps) => svc::Either::B(ps), - }) - }, - profile.into_inner(), - ) - .push(svc::NewMapErr::layer_from_target::()) - .push_on_service(svc::MapErr::layer_boxed()) - .push(svc::ArcNewService::layer()) - .into_inner() - }) + fn layer( + metrics: metrics::Proxy, + ) -> impl svc::Layer< + N, + Service = svc::ArcNewService< + RouterParams, + impl svc::Service< + http::Request, + Response = http::Response, + Error = Error, + Future = impl Send, + > + Clone, + >, + > + Clone + where + N: svc::NewService, Service = S>, + N: Clone + Send + Sync + 'static, + S: svc::Service< + http::Request, + Response = http::Response, + Error = Error, + >, + S: Clone + Send + Sync + 'static, + S::Future: Send, + { + svc::layer::mk(move |concrete: N| { + let profile = svc::stack(concrete.clone()).push(profile::layer(metrics.clone())); + svc::stack(concrete) + .push_switch( + |prms: RouterParams| { + Ok::<_, Infallible>(match prms { + RouterParams::Endpoint(remote, meta, parent) => { + svc::Either::A(Concrete { + target: concrete::Dispatch::Forward(remote, meta), + authority: None, + parent, + }) + } + RouterParams::Profile(profile) => svc::Either::B(profile), + }) + }, + profile.into_inner(), + ) + .push(svc::NewMapErr::layer_from_target::()) + .push_on_service(svc::MapErr::layer_boxed()) + .push(svc::ArcNewService::layer()) + .into_inner() + }) + } } -// === impl RouterParams === - impl From<(Routes, T)> for RouterParams where T: Eq + Hash + Clone + Debug, @@ -191,13 +197,13 @@ where T: Clone + Debug + Eq + Hash, { fn param(&self) -> LogicalAddr { - LogicalAddr(match self { + match self { Self::Profile(ref p) => { let profile::LogicalAddr(addr) = p.param(); - addr.into() + LogicalAddr(addr.into()) } - Self::Endpoint(Remote(ServerAddr(ref addr)), ..) => (*addr).into(), - }) + Self::Endpoint(Remote(ServerAddr(ref addr)), ..) => LogicalAddr((*addr).into()), + } } } From 339208c0f09bba4d97fb8391a7faf29c6a15f492 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 21 Feb 2023 01:46:29 +0000 Subject: [PATCH 08/13] hang profile layers off param types --- linkerd/app/outbound/src/http/logical.rs | 3 +- .../app/outbound/src/http/logical/profile.rs | 210 +++++++++--------- 2 files changed, 110 insertions(+), 103 deletions(-) diff --git a/linkerd/app/outbound/src/http/logical.rs b/linkerd/app/outbound/src/http/logical.rs index 7e374efa98..c3f5d03d93 100644 --- a/linkerd/app/outbound/src/http/logical.rs +++ b/linkerd/app/outbound/src/http/logical.rs @@ -155,7 +155,8 @@ where S::Future: Send, { svc::layer::mk(move |concrete: N| { - let profile = svc::stack(concrete.clone()).push(profile::layer(metrics.clone())); + let profile = + svc::stack(concrete.clone()).push(profile::Params::layer(metrics.clone())); svc::stack(concrete) .push_switch( |prms: RouterParams| { diff --git a/linkerd/app/outbound/src/http/logical/profile.rs b/linkerd/app/outbound/src/http/logical/profile.rs index ca974d8fa2..17bce6801e 100644 --- a/linkerd/app/outbound/src/http/logical/profile.rs +++ b/linkerd/app/outbound/src/http/logical/profile.rs @@ -38,116 +38,54 @@ pub(super) struct RouteParams { distribution: Distribution, } +// === impl Params === + // Wraps a `NewService`--instantiated once per logical target--that caches a set // of concrete services so that, as the watch provides new `Params`, we can // reuse inner services. -pub(super) fn layer( - metrics: metrics::Proxy, -) -> impl svc::Layer< - N, - Service = svc::ArcNewService< - Params, - impl svc::Service< - http::Request, - Response = http::Response, - Error = Error, - Future = impl Send, - > + Clone, - >, -> + Clone -where - T: Clone + Debug + Eq + Hash + Send + Sync + 'static, - N: svc::NewService, Service = S> + Clone + Send + Sync + 'static, - S: svc::Service< - http::Request, - Response = http::Response, - Error = Error, - >, - S: Clone + Send + Sync + 'static, - S::Future: Send, -{ - svc::layer::mk(move |inner| { - svc::stack(inner) - // Each `RouteParams` provides a `Distribution` that is used to - // choose a concrete service for a given route. - .push(BackendCache::layer()) - // Lazily cache a service for each `RouteParams` - // returned from the `SelectRoute` impl. - .push_on_service(route_layer(metrics.clone())) - .push(svc::NewOneshotRoute::, _, _>::layer_cached()) - .push(svc::ArcNewService::layer()) - .into_inner() - }) -} - -fn route_layer( - metrics: metrics::Proxy, -) -> impl svc::Layer< - N, - Service = svc::ArcNewService< - RouteParams, - impl svc::Service< - http::Request, - Response = http::Response, - Error = Error, - Future = impl Send, - > + Clone, - >, -> + Clone +impl Params where T: Clone + Debug + Eq + Hash + Send + Sync + 'static, - N: svc::NewService, Service = S> + Clone + Send + Sync + 'static, - S: svc::Service< - http::Request, - Response = http::Response, - Error = Error, - >, - S: Clone + Send + Sync + 'static, - S::Future: Send, { - svc::layer::mk(move |inner| { - svc::stack(inner) - .check_new_service::, http::Request>() - .push_on_service( - svc::layers() - .push(http::BoxRequest::layer()) - // The router does not take the backend's availability into - // consideration, so we must eagerly fail requests to prevent - // leaking tasks onto the runtime. - .push(svc::LoadShed::layer()), - ) - .push(http::insert::NewInsert::::layer()) - .push( - metrics - .http_profile_route_actual - .to_layer::>(), - ) - // Depending on whether or not the request can be - // retried, it may have one of two `Body` types. This - // layer unifies any `Body` type into `BoxBody`. - .push_on_service(http::BoxRequest::erased()) - // Sets an optional retry policy. - .push(retry::layer(metrics.http_profile_route_retry.clone())) - // Sets an optional request timeout. - .push(http::NewTimeout::layer()) - // Records per-route metrics. - .push( - metrics - .http_profile_route - .to_layer::>(), - ) - // Sets the per-route response classifier as a request - // extension. - .push(classify::NewClassify::layer()) - // TODO(ver) .push(svc::NewMapErr::layer_from_target::()) - .push_on_service(http::BoxResponse::layer()) - .push(svc::ArcNewService::layer()) - .into_inner() - }) + pub(super) fn layer( + metrics: metrics::Proxy, + ) -> impl svc::Layer< + N, + Service = svc::ArcNewService< + Self, + impl svc::Service< + http::Request, + Response = http::Response, + Error = Error, + Future = impl Send, + > + Clone, + >, + > + Clone + where + N: svc::NewService, Service = S> + Clone + Send + Sync + 'static, + S: svc::Service< + http::Request, + Response = http::Response, + Error = Error, + >, + S: Clone + Send + Sync + 'static, + S::Future: Send, + { + svc::layer::mk(move |inner| { + svc::stack(inner) + // Each `RouteParams` provides a `Distribution` that is used to + // choose a concrete service for a given route. + .push(BackendCache::layer()) + // Lazily cache a service for each `RouteParams` + // returned from the `SelectRoute` impl. + .push_on_service(RouteParams::layer(metrics.clone())) + .push(svc::NewOneshotRoute::, _, _>::layer_cached()) + .push(svc::ArcNewService::layer()) + .into_inner() + }) + } } -// === impl Params === - impl From<(Routes, T)> for Params where T: Eq + Hash + Clone + Debug, @@ -265,6 +203,74 @@ where // === impl RouteParams === +impl RouteParams { + fn layer( + metrics: metrics::Proxy, + ) -> impl svc::Layer< + N, + Service = svc::ArcNewService< + Self, + impl svc::Service< + http::Request, + Response = http::Response, + Error = Error, + Future = impl Send, + > + Clone, + >, + > + Clone + where + T: Clone + Debug + Eq + Hash + Send + Sync + 'static, + N: svc::NewService, Service = S> + Clone + Send + Sync + 'static, + S: svc::Service< + http::Request, + Response = http::Response, + Error = Error, + >, + S: Clone + Send + Sync + 'static, + S::Future: Send, + { + svc::layer::mk(move |inner| { + svc::stack(inner) + .check_new_service::, http::Request>() + .push_on_service( + svc::layers() + .push(http::BoxRequest::layer()) + // The router does not take the backend's availability into + // consideration, so we must eagerly fail requests to prevent + // leaking tasks onto the runtime. + .push(svc::LoadShed::layer()), + ) + .push(http::insert::NewInsert::::layer()) + .push( + metrics + .http_profile_route_actual + .to_layer::>(), + ) + // Depending on whether or not the request can be + // retried, it may have one of two `Body` types. This + // layer unifies any `Body` type into `BoxBody`. + .push_on_service(http::BoxRequest::erased()) + // Sets an optional retry policy. + .push(retry::layer(metrics.http_profile_route_retry.clone())) + // Sets an optional request timeout. + .push(http::NewTimeout::layer()) + // Records per-route metrics. + .push( + metrics + .http_profile_route + .to_layer::>(), + ) + // Sets the per-route response classifier as a request + // extension. + .push(classify::NewClassify::layer()) + // TODO(ver) .push(svc::NewMapErr::layer_from_target::()) + .push_on_service(http::BoxResponse::layer()) + .push(svc::ArcNewService::layer()) + .into_inner() + }) + } +} + impl svc::Param for RouteParams { fn param(&self) -> LogicalAddr { self.addr.clone() From efa16da7b5c7df8cdebfec629f31de46310c2309 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 21 Feb 2023 23:29:29 +0000 Subject: [PATCH 09/13] fiuxp --- linkerd/app/outbound/src/ingress.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/linkerd/app/outbound/src/ingress.rs b/linkerd/app/outbound/src/ingress.rs index 8c59af7f6d..9c3762ff96 100644 --- a/linkerd/app/outbound/src/ingress.rs +++ b/linkerd/app/outbound/src/ingress.rs @@ -367,8 +367,6 @@ fn mk_route( None } -mod foo {} - // === impl Logical === impl std::cmp::PartialEq for Logical { From 7843709e4f2d08627f71b8d3a5c51a522034d380 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 21 Feb 2023 23:29:55 +0000 Subject: [PATCH 10/13] -todo --- linkerd/app/outbound/src/http/logical/profile.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/linkerd/app/outbound/src/http/logical/profile.rs b/linkerd/app/outbound/src/http/logical/profile.rs index 17bce6801e..74de0d398c 100644 --- a/linkerd/app/outbound/src/http/logical/profile.rs +++ b/linkerd/app/outbound/src/http/logical/profile.rs @@ -263,7 +263,6 @@ impl RouteParams { // Sets the per-route response classifier as a request // extension. .push(classify::NewClassify::layer()) - // TODO(ver) .push(svc::NewMapErr::layer_from_target::()) .push_on_service(http::BoxResponse::layer()) .push(svc::ArcNewService::layer()) .into_inner() From 42c256b975c78772fcb8db96caa0863eab03ce5d Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 21 Feb 2023 23:30:24 +0000 Subject: [PATCH 11/13] move comment --- linkerd/app/outbound/src/http/logical/profile.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/linkerd/app/outbound/src/http/logical/profile.rs b/linkerd/app/outbound/src/http/logical/profile.rs index 74de0d398c..334362ce32 100644 --- a/linkerd/app/outbound/src/http/logical/profile.rs +++ b/linkerd/app/outbound/src/http/logical/profile.rs @@ -40,13 +40,13 @@ pub(super) struct RouteParams { // === impl Params === -// Wraps a `NewService`--instantiated once per logical target--that caches a set -// of concrete services so that, as the watch provides new `Params`, we can -// reuse inner services. impl Params where T: Clone + Debug + Eq + Hash + Send + Sync + 'static, { + /// Wraps a `NewService`--instantiated once per logical target--that caches + /// a set of concrete services so that, as the watch provides new `Params`, + /// we can reuse inner services. pub(super) fn layer( metrics: metrics::Proxy, ) -> impl svc::Layer< From 66c33b18b0e3eeb3e9e094113a2a5b9bb64f552d Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Wed, 22 Feb 2023 00:51:50 +0000 Subject: [PATCH 12/13] fix unhandled cancelation in spawned route watch --- linkerd/app/outbound/src/http.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/linkerd/app/outbound/src/http.rs b/linkerd/app/outbound/src/http.rs index 10575b8345..5954646886 100644 --- a/linkerd/app/outbound/src/http.rs +++ b/linkerd/app/outbound/src/http.rs @@ -46,9 +46,16 @@ pub fn spawn_routes( tokio::select! { biased; _ = tx.closed() => return, - _ = profile_rx.changed() => { + res = profile_rx.changed() => { + if res.is_err() { + // Drop the `tx` sender when the profile sender is + // dropped. + return; + } + let routes = (mk)(&*profile_rx.borrow()); if tx.send(routes).is_err() { + // Drop the `tx` sender when all of its receivers are dropped. return; } } From f2254c9c9b85a93971b5a1539cda43f4833474e1 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Wed, 22 Feb 2023 01:08:26 +0000 Subject: [PATCH 13/13] simplify spawn_routes --- linkerd/app/gateway/src/http.rs | 15 ++++-------- linkerd/app/outbound/src/http.rs | 33 +++++++++++++------------- linkerd/app/outbound/src/ingress.rs | 29 ++++++++--------------- linkerd/app/outbound/src/sidecar.rs | 36 +++++++++++++++-------------- 4 files changed, 50 insertions(+), 63 deletions(-) diff --git a/linkerd/app/gateway/src/http.rs b/linkerd/app/gateway/src/http.rs index 0db5a4cf7d..10ae4a66cf 100644 --- a/linkerd/app/gateway/src/http.rs +++ b/linkerd/app/gateway/src/http.rs @@ -118,17 +118,12 @@ impl Gateway { // discovery information. .push_filter(|(_, parent): (_, T)| -> Result<_, GatewayDomainInvalid> { let routes = { - let profile = + let mut profile = svc::Param::>>::param(&parent) .ok_or(GatewayDomainInvalid)?; - - let mut route = mk_route(&*profile.borrow()).ok_or(GatewayDomainInvalid)?; - outbound::http::spawn_routes(profile, move |p: &profiles::Profile| { - if let Some(r) = mk_route(p) { - route = r; - } - route.clone() - }) + let init = + mk_routes(&*profile.borrow_and_update()).ok_or(GatewayDomainInvalid)?; + outbound::http::spawn_routes(profile, init, mk_routes) }; Ok(Target { @@ -153,7 +148,7 @@ impl Gateway { } } -fn mk_route(profile: &profiles::Profile) -> Option { +fn mk_routes(profile: &profiles::Profile) -> Option { if let Some(addr) = profile.addr.clone() { return Some(outbound::http::Routes::Profile( outbound::http::profile::Routes { diff --git a/linkerd/app/outbound/src/http.rs b/linkerd/app/outbound/src/http.rs index 5954646886..e223a9ff2a 100644 --- a/linkerd/app/outbound/src/http.rs +++ b/linkerd/app/outbound/src/http.rs @@ -34,30 +34,29 @@ pub struct Http(T); pub fn spawn_routes( mut profile_rx: watch::Receiver, - mut mk: impl FnMut(&profiles::Profile) -> Routes + Send + Sync + 'static, + init: Routes, + mut mk: impl FnMut(&profiles::Profile) -> Option + Send + Sync + 'static, ) -> watch::Receiver { - let (tx, rx) = { - let routes = (mk)(&*profile_rx.borrow()); - watch::channel(routes) - }; + let (tx, rx) = watch::channel(init); tokio::spawn(async move { loop { - tokio::select! { + let res = tokio::select! { biased; _ = tx.closed() => return, - res = profile_rx.changed() => { - if res.is_err() { - // Drop the `tx` sender when the profile sender is - // dropped. - return; - } + res = profile_rx.changed() => res, + }; - let routes = (mk)(&*profile_rx.borrow()); - if tx.send(routes).is_err() { - // Drop the `tx` sender when all of its receivers are dropped. - return; - } + if res.is_err() { + // Drop the `tx` sender when the profile sender is + // dropped. + return; + } + + if let Some(routes) = (mk)(&*profile_rx.borrow_and_update()) { + if tx.send(routes).is_err() { + // Drop the `tx` sender when all of its receivers are dropped. + return; } } } diff --git a/linkerd/app/outbound/src/ingress.rs b/linkerd/app/outbound/src/ingress.rs index 9c3762ff96..6de5e3bbda 100644 --- a/linkerd/app/outbound/src/ingress.rs +++ b/linkerd/app/outbound/src/ingress.rs @@ -288,15 +288,12 @@ impl TryFrom>> for Http { svc::Param::>::param(&parent).map(watch::Receiver::from), ) { (RequestTarget::Named(addr), profile) => { - let profile = profile.ok_or_else(|| ProfileRequired(addr.clone()))?; - let mut route = - mk_route(&*profile.borrow()).ok_or_else(|| ProfileRequired(addr.clone()))?; - let routes = http::spawn_routes(profile, move |p: &profiles::Profile| { - if let Some(r) = mk_route(p) { - route = r; - } - route.clone() - }); + let routes = { + let mut profile = profile.ok_or_else(|| ProfileRequired(addr.clone()))?; + let init = mk_routes(&*profile.borrow_and_update()) + .ok_or_else(|| ProfileRequired(addr.clone()))?; + http::spawn_routes(profile, init, mk_routes) + }; Ok(Http { version: (*parent).param(), @@ -307,16 +304,10 @@ impl TryFrom>> for Http { }) } - (RequestTarget::Orig(OrigDstAddr(addr)), Some(profile)) => { - let route = mk_route(&*profile.borrow()); + (RequestTarget::Orig(OrigDstAddr(addr)), Some(mut profile)) => { + let route = mk_routes(&*profile.borrow_and_update()); let routes = if let Some(route) = route { - let mut route = route; - http::spawn_routes(profile, move |p: &profiles::Profile| { - if let Some(r) = mk_route(p) { - route = r; - } - route.clone() - }) + http::spawn_routes(profile, route, mk_routes) } else { http::spawn_routes_default(Remote(ServerAddr(addr))) }; @@ -343,7 +334,7 @@ impl TryFrom>> for Http { } } -fn mk_route( +fn mk_routes( profiles::Profile { addr, endpoint, diff --git a/linkerd/app/outbound/src/sidecar.rs b/linkerd/app/outbound/src/sidecar.rs index 9f600be9cd..1e6cac4c83 100644 --- a/linkerd/app/outbound/src/sidecar.rs +++ b/linkerd/app/outbound/src/sidecar.rs @@ -59,25 +59,27 @@ impl Outbound<()> { .push_map_target(|parent: protocol::Http| { let version = svc::Param::::param(&parent); let orig_dst = (*parent).orig_dst; + let mk_routes = move |profile: &profiles::Profile| { + if let Some(addr) = profile.addr.clone() { + return http::Routes::Profile(http::profile::Routes { + addr, + routes: profile.http_routes.clone(), + targets: profile.targets.clone(), + }); + } + + if let Some((addr, metadata)) = profile.endpoint.clone() { + return http::Routes::Endpoint(Remote(ServerAddr(addr)), metadata); + } + + http::Routes::Endpoint(Remote(ServerAddr(*orig_dst)), Default::default()) + }; let routes = match (*parent).profile.clone() { Some(profile) => { - http::spawn_routes(profile.into(), move |profile: &profiles::Profile| { - if let Some(addr) = profile.addr.clone() { - return http::Routes::Profile(http::profile::Routes { - addr, - routes: profile.http_routes.clone(), - targets: profile.targets.clone(), - }); - } - - if let Some((addr, metadata)) = profile.endpoint.clone() { - return http::Routes::Endpoint(Remote(ServerAddr(addr)), metadata); - } - - http::Routes::Endpoint( - Remote(ServerAddr(*orig_dst)), - Default::default(), - ) + let mut rx = watch::Receiver::from(profile); + let init = mk_routes(&*rx.borrow_and_update()); + http::spawn_routes(rx, init, move |profile: &profiles::Profile| { + Some(mk_routes(profile)) }) } None => http::spawn_routes_default(Remote(ServerAddr(*orig_dst))),