Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

inbound: connections wait for ServerPolicy discovery #2186

Merged
merged 28 commits into from
Feb 23, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
8934ce0
make server policy store async
hawkw Jan 25, 2023
91a8f40
worky (direct stack is gross)
hawkw Jan 25, 2023
31d7622
Update linkerd/app/inbound/src/policy/store.rs
hawkw Jan 26, 2023
c561eed
always use `Error` as the error type
hawkw Jan 26, 2023
f60833f
clean up direct stack a bit
hawkw Jan 26, 2023
228b1f7
misc cleanup
hawkw Jan 26, 2023
bf4b3de
Merge branch 'main' into eliza/no-default-policy
hawkw Jan 31, 2023
48375bf
Merge branch 'main' into eliza/no-default-policy
hawkw Feb 1, 2023
f5564e7
put ready back
hawkw Feb 22, 2023
5752a5b
pull GetPolicy bounds to trait-level
hawkw Feb 22, 2023
cdeeaba
Merge branch 'main' into eliza/no-default-policy
hawkw Feb 22, 2023
2c34acf
GetPolicy can be passed as `impl GetPolicy`
hawkw Feb 22, 2023
3a2e376
fixups
olix0r Feb 22, 2023
1252ca7
separately newtype `policy::LookupAddr`s
hawkw Feb 22, 2023
c910ca9
disembreak gateway tests
hawkw Feb 23, 2023
635afad
Revert "disembreak gateway tests"
hawkw Feb 23, 2023
2e82c61
Revert "separately newtype `policy::LookupAddr`s"
hawkw Feb 23, 2023
9a54acc
review feedback
hawkw Feb 23, 2023
2476c77
blanket impl GetPolicy for Service
hawkw Feb 23, 2023
f481cd5
Update linkerd/app/inbound/src/policy.rs
olix0r Feb 23, 2023
bc5d93e
Update linkerd/app/inbound/src/policy/store.rs
olix0r Feb 23, 2023
973b9e6
cascade through `lift_new_with_target`
hawkw Feb 23, 2023
0006083
diff ensmalleration
hawkw Feb 23, 2023
35b08e5
-check
hawkw Feb 23, 2023
01f56be
further diff ensmalleration
hawkw Feb 23, 2023
744feff
ensmallerate direct stack diff a bit more
hawkw Feb 23, 2023
604bbf7
implement Service for Store (instead of GetProfile)
olix0r Feb 23, 2023
c3deef4
simplify idlecache api change
olix0r Feb 23, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,7 @@ dependencies = [
"linkerd2-proxy-api",
"once_cell",
"parking_lot",
"pin-project",
"thiserror",
"tokio",
"tokio-test",
Expand Down
10 changes: 7 additions & 3 deletions linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,15 @@ struct Rescue;
// === impl Config ===

impl Config {
/// Builds the admin endpoint server.
///
/// This method is asynchronous, as it must discover a `ServerPolicy` for
/// the admin port.
#[allow(clippy::too_many_arguments)]
pub fn build<B, R>(
pub async fn build<B, R>(
self,
bind: B,
policy: impl inbound::policy::GetPolicy,
policy: &impl inbound::policy::GetPolicy,
identity: identity::Server,
report: R,
metrics: inbound::Metrics,
Expand All @@ -89,7 +93,7 @@ impl Config {
let (listen_addr, listen) = bind.bind(&self.server)?;

// Get the policy for the admin server.
let policy = policy.get_policy(OrigDstAddr(listen_addr.into()));
let policy = policy.get_policy(OrigDstAddr(listen_addr.into())).await?;

let (ready, latch) = crate::server::Readiness::new();
let admin = crate::server::Admin::new(report, ready, shutdown, trace);
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/inbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ linkerd-tonic-watch = { path = "../../tonic-watch" }
linkerd2-proxy-api = { version = "0.8", features = ["inbound"] }
once_cell = "1"
parking_lot = "0.12"
pin-project = "1"
thiserror = "1"
tokio = { version = "1", features = ["sync"] }
tonic = { version = "0.8", default-features = false }
Expand Down
24 changes: 15 additions & 9 deletions linkerd/app/inbound/src/accept.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
policy::{AllowPolicy, GetPolicy},
policy::{self, AllowPolicy, GetPolicy},
Inbound,
};
use linkerd_app_core::{
Expand All @@ -26,7 +26,7 @@ impl<N> Inbound<N> {
pub(crate) fn push_accept<T, I, NSvc, D, DSvc>(
self,
proxy_port: u16,
policies: impl GetPolicy + Clone + Send + Sync + 'static,
policies: impl GetPolicy,
direct: D,
) -> Inbound<svc::ArcNewTcp<T, I>>
where
Expand All @@ -46,6 +46,18 @@ impl<N> Inbound<N> {
{
self.map_stack(|cfg, rt, accept| {
accept
.push_on_service(svc::MapErr::layer_boxed())
.push_map_target(|(policy, t): (AllowPolicy, T)| {
tracing::debug!(policy = ?&*policy.borrow(), "Accepted");
Accept {
client_addr: t.param(),
orig_dst_addr: t.param(),
policy,
}
})
.push(policy::Discover::layer(policies))
.into_new_service()
.check_new_service::<T, I>()
.push_switch(
// Switch to the `direct` stack when a connection's original destination is the
// proxy's inbound port. Otherwise, check that connections are allowed on the
Expand All @@ -56,13 +68,7 @@ impl<N> Inbound<N> {
return Ok(svc::Either::B(t));
}

let policy = policies.get_policy(addr);
tracing::debug!(policy = ?&*policy.borrow(), "Accepted");
Ok(svc::Either::A(Accept {
client_addr: t.param(),
orig_dst_addr: addr,
policy,
}))
Ok(svc::Either::A(t))
},
direct,
)
Expand Down
115 changes: 67 additions & 48 deletions linkerd/app/inbound/src/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ struct TlsParams {
identity: identity::Server,
}

/// A policy discovery target with a transport header and client info.
#[derive(Debug, Clone)]
struct Discover {
header: TransportHeader,
client: ClientInfo,
}

impl<N> Inbound<N> {
/// Builds a stack that handles connections that target the proxy's inbound port
/// (i.e. without an SO_ORIGINAL_DST setting). This port behaves differently from
Expand All @@ -91,7 +98,7 @@ impl<N> Inbound<N> {
/// gateways may need to accept HTTP requests from older proxy versions
pub(crate) fn push_direct<T, I, NSvc, G, GSvc, H, HSvc>(
self,
policies: impl policy::GetPolicy + Clone + Send + Sync + 'static,
policies: impl policy::GetPolicy,
gateway: G,
http: H,
) -> Inbound<svc::ArcNewTcp<T, I>>
Expand Down Expand Up @@ -146,59 +153,50 @@ impl<N> Inbound<N> {
// the header.
.push_switch(Ok::<Local, Infallible>, http)
.push_switch(
{
let policies = policies.clone();
move |(h, client): (TransportHeader, ClientInfo)| -> Result<_, Infallible> {
match h {
|(policy, target): (AllowPolicy, Discover)| -> Result<_, Infallible> {
let Discover {
header:
TransportHeader {
name,
port,
name: None,
protocol,
} => Ok(svc::Either::A({
// When the transport header targets an alternate port (but does
// not identify an alternate target name), we check the new
// target's policy (rather than the inbound proxy's address).
let addr = (client.local_addr.ip(), port).into();
let policy = policies.get_policy(OrigDstAddr(addr));
match protocol {
None => svc::Either::A(LocalTcp {
server_addr: Remote(ServerAddr(addr)),
client_addr: client.client_addr,
client_id: client.client_id,
policy,
}),
Some(protocol) => {
// When TransportHeader includes the protocol, but does not
// include an alternate name we go through the Inbound HTTP
// stack.
svc::Either::B(LocalHttp {
addr: Remote(ServerAddr(addr)),
policy,
protocol,
client,
})
}
}
})),
},
client,
} = target;

if let Some(name) = name {
// When the transport header provides an alternate target, the
// connection is a gateway connection.
return Ok(svc::Either::B(GatewayTransportHeader {
target: NameAddr::from((name, port)),
protocol,
client,
policy,
}));
}

TransportHeader {
port,
name: Some(name),
// The transport header targets an alternate port (but does
// not identify an alternate target name).
let addr = (client.local_addr.ip(), port).into();
Ok(svc::Either::A(match protocol {
None => svc::Either::A(LocalTcp {
server_addr: Remote(ServerAddr(addr)),
client_addr: client.client_addr,
client_id: client.client_id,
policy,
}),
Some(protocol) => {
// When TransportHeader includes the protocol, but does not
// include an alternate name we go through the Inbound HTTP
// stack.
svc::Either::B(LocalHttp {
addr: Remote(ServerAddr(addr)),
policy,
protocol,
} => Ok(svc::Either::B({
// When the transport header provides an alternate target, the
// connection is a gateway connection. We check the _gateway
// address's_ policy (rather than the target address).
let policy = policies.get_policy(client.local_addr);
GatewayTransportHeader {
target: NameAddr::from((name, port)),
protocol,
client,
policy,
}
})),
client,
})
}
}
}))
},
// HTTP detection is not necessary in this case, since the transport
// header indicates the connection's HTTP version.
Expand All @@ -211,6 +209,9 @@ impl<N> Inbound<N> {
)
.into_inner(),
)
.push(policy::Discover::layer(policies))
.into_new_service()
.push_map_target(|(header, client)| Discover { header, client })
.check_new_service::<(TransportHeader, ClientInfo), _>()
// Use ALPN to determine whether a transport header should be read.
.push(NewTransportHeaderServer::layer(detect_timeout))
Expand Down Expand Up @@ -484,3 +485,21 @@ impl<T> InsertParam<tls::ConditionalServerTls, T> for TlsParams {
(tls, target)
}
}

// === impl Discover ===

impl svc::Param<OrigDstAddr> for Discover {
fn param(&self) -> OrigDstAddr {
if self.header.name.is_none() {
// When the transport header targets an alternate port (but does
// not identify an alternate target name), we check the new
// target's policy (rather than the inbound proxy's address).
return OrigDstAddr((self.client.local_addr.ip(), self.header.port).into());
}

// When the transport header provides an alternate target, the
// connection is a gateway connection. We check the _gateway
// address's_ policy (rather than the target address).
self.client.local_addr
}
}
13 changes: 9 additions & 4 deletions linkerd/app/inbound/src/policy.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
mod api;
mod config;
pub mod defaults;
mod discover;
mod http;
mod store;
mod tcp;

pub(crate) use self::store::Store;
pub use self::{
config::Config,
discover::Discover,
http::{
HttpInvalidPolicy, HttpRouteInvalidRedirect, HttpRouteNotFound, HttpRouteRedirect,
HttpRouteUnauthorized, NewHttpPolicy,
Expand All @@ -20,6 +22,7 @@ use linkerd_app_core::{
metrics::{RouteAuthzLabels, ServerAuthzLabels},
tls,
transport::{ClientAddr, OrigDstAddr, Remote},
Error,
};
use linkerd_idle_cache::Cached;
pub use linkerd_proxy_server_policy::{
Expand All @@ -28,7 +31,7 @@ pub use linkerd_proxy_server_policy::{
http::{filter::Redirection, Route as HttpRoute},
route, Authentication, Authorization, Meta, Protocol, RoutePolicy, ServerPolicy,
};
use std::sync::Arc;
use std::{future::Future, sync::Arc};
use thiserror::Error;
use tokio::sync::watch;

Expand All @@ -38,9 +41,11 @@ pub struct ServerUnauthorized {
server: Arc<Meta>,
}

pub trait GetPolicy {
// Returns the traffic policy configured for the destination address.
fn get_policy(&self, dst: OrigDstAddr) -> AllowPolicy;
/// Returns the traffic policy configured for the destination address.
pub trait GetPolicy: Clone + Send + Sync + 'static {
type Future: Future<Output = Result<AllowPolicy, Error>> + Unpin + Send;

fn get_policy(&self, target: OrigDstAddr) -> Self::Future;
}

#[derive(Clone, Debug, PartialEq, Eq)]
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/inbound/src/policy/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl Config {
dns: dns::Resolver,
metrics: metrics::ControlHttp,
identity: identity::NewClient,
) -> impl GetPolicy + Clone + Send + Sync + 'static {
) -> impl GetPolicy {
match self {
Self::Fixed {
default,
Expand Down
73 changes: 73 additions & 0 deletions linkerd/app/inbound/src/policy/discover.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use super::{AllowPolicy, GetPolicy};
use futures::ready;
use linkerd_app_core::{svc, transport::OrigDstAddr, Error};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

#[derive(Debug, Clone)]
pub struct Discover<G, N> {
get_policy: G,
new_svc: N,
}

#[pin_project::pin_project]
pub struct DiscoverFuture<T, F, N> {
target: Option<T>,
#[pin]
inner: F,
new_svc: N,
}

impl<G: GetPolicy + Clone, N> Discover<G, N> {
pub fn layer(get_policy: G) -> impl svc::layer::Layer<N, Service = Self> + Clone {
svc::layer::mk(move |new_svc| Self {
get_policy: get_policy.clone(),
new_svc,
})
}
}

impl<G: GetPolicy, N, T> svc::Service<T> for Discover<G, N>
where
G: GetPolicy,
N: svc::NewService<(AllowPolicy, T)> + Clone,
T: svc::Param<OrigDstAddr>,
{
type Error = Error;
type Response = N::Service;
type Future = DiscoverFuture<T, G::Future, N>;

fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, target: T) -> Self::Future {
let dst = target.param();
DiscoverFuture {
target: Some(target),
inner: self.get_policy.get_policy(dst),
new_svc: self.new_svc.clone(),
}
}
}

// === impl DiscoverFuture ===

impl<T, F, N, E> Future for DiscoverFuture<T, F, N>
where
F: Future<Output = Result<AllowPolicy, E>>,
N: svc::NewService<(AllowPolicy, T)>,
{
type Output = Result<N::Service, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let policy = ready!(this.inner.poll(cx))?;
let svc = this
.new_svc
.new_service((policy, this.target.take().expect("polled after ready")));
Poll::Ready(Ok(svc))
}
}
Loading