Skip to content

Commit c186d88

Browse files
authored
inbound: connections wait for ServerPolicy discovery (#2186)
Currently, when the inbound proxy gets a connection to a port it has not previously configured, it starts doing ServerPolicy discovery for that port but then uses the default policy until the configuration is synced from the control plane. This behavior is fairly complex to reason about and surprising for users. Therefore, we would like to remove this defaulting behavior from the inbound proxy. This branch changes how the inbound proxy performs policy discovery. Rather than spawning the lookup in a background task and using a default policy until a policy is discovered, the inbound proxy will now wait until the policy for a port is discovered before continuing to process a connection on that port. In terms of the inbound stack, this means that policy discovery is now a `MakeService` rather than a `NewService`. The cache is still used when a policy discovery watch has already been started. This branch does not *completely* remove all default policy configuration from the proxy. The default policy environment variables are still used when the proxy is configured without a control plane policy controller address. However, this mode will be removed in a subsequent PR.
1 parent f21228b commit c186d88

File tree

12 files changed

+259
-96
lines changed

12 files changed

+259
-96
lines changed

Cargo.lock

+1
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,7 @@ dependencies = [
891891
"linkerd2-proxy-api",
892892
"once_cell",
893893
"parking_lot",
894+
"pin-project",
894895
"thiserror",
895896
"tokio",
896897
"tokio-test",

linkerd/app/admin/src/stack.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,15 @@ struct Rescue;
6969
// === impl Config ===
7070

7171
impl Config {
72+
/// Builds the admin endpoint server.
73+
///
74+
/// This method is asynchronous, as it must discover a `ServerPolicy` for
75+
/// the admin port.
7276
#[allow(clippy::too_many_arguments)]
73-
pub fn build<B, R>(
77+
pub async fn build<B, R>(
7478
self,
7579
bind: B,
76-
policy: impl inbound::policy::GetPolicy,
80+
policy: &impl inbound::policy::GetPolicy,
7781
identity: identity::Server,
7882
report: R,
7983
metrics: inbound::Metrics,
@@ -89,7 +93,7 @@ impl Config {
8993
let (listen_addr, listen) = bind.bind(&self.server)?;
9094

9195
// Get the policy for the admin server.
92-
let policy = policy.get_policy(OrigDstAddr(listen_addr.into()));
96+
let policy = policy.get_policy(OrigDstAddr(listen_addr.into())).await?;
9397

9498
let (ready, latch) = crate::server::Readiness::new();
9599
let admin = crate::server::Admin::new(report, ready, shutdown, trace);

linkerd/app/inbound/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ linkerd-tonic-watch = { path = "../../tonic-watch" }
3131
linkerd2-proxy-api = { version = "0.8", features = ["inbound"] }
3232
once_cell = "1"
3333
parking_lot = "0.12"
34+
pin-project = "1"
3435
thiserror = "1"
3536
tokio = { version = "1", features = ["sync"] }
3637
tonic = { version = "0.8", default-features = false }

linkerd/app/inbound/src/accept.rs

+16-9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::{
2-
policy::{AllowPolicy, GetPolicy},
2+
policy::{self, AllowPolicy, GetPolicy},
33
Inbound,
44
};
55
use linkerd_app_core::{
@@ -26,7 +26,7 @@ impl<N> Inbound<N> {
2626
pub(crate) fn push_accept<T, I, NSvc, D, DSvc>(
2727
self,
2828
proxy_port: u16,
29-
policies: impl GetPolicy + Clone + Send + Sync + 'static,
29+
policies: impl GetPolicy,
3030
direct: D,
3131
) -> Inbound<svc::ArcNewTcp<T, I>>
3232
where
@@ -46,6 +46,19 @@ impl<N> Inbound<N> {
4646
{
4747
self.map_stack(|cfg, rt, accept| {
4848
accept
49+
.push_on_service(svc::MapErr::layer_boxed())
50+
.push_map_target(|(policy, t): (AllowPolicy, T)| {
51+
tracing::debug!(policy = ?&*policy.borrow(), "Accepted");
52+
Accept {
53+
client_addr: t.param(),
54+
orig_dst_addr: t.param(),
55+
policy,
56+
}
57+
})
58+
.lift_new_with_target()
59+
.push(policy::Discover::layer(policies))
60+
.into_new_service()
61+
.check_new_service::<T, I>()
4962
.push_switch(
5063
// Switch to the `direct` stack when a connection's original destination is the
5164
// proxy's inbound port. Otherwise, check that connections are allowed on the
@@ -56,13 +69,7 @@ impl<N> Inbound<N> {
5669
return Ok(svc::Either::B(t));
5770
}
5871

59-
let policy = policies.get_policy(addr);
60-
tracing::debug!(policy = ?&*policy.borrow(), "Accepted");
61-
Ok(svc::Either::A(Accept {
62-
client_addr: t.param(),
63-
orig_dst_addr: addr,
64-
policy,
65-
}))
72+
Ok(svc::Either::A(t))
6673
},
6774
direct,
6875
)

linkerd/app/inbound/src/direct.rs

+58-50
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ impl<N> Inbound<N> {
9191
/// gateways may need to accept HTTP requests from older proxy versions
9292
pub(crate) fn push_direct<T, I, NSvc, G, GSvc, H, HSvc>(
9393
self,
94-
policies: impl policy::GetPolicy + Clone + Send + Sync + 'static,
94+
policies: impl policy::GetPolicy,
9595
gateway: G,
9696
http: H,
9797
) -> Inbound<svc::ArcNewTcp<T, I>>
@@ -146,58 +146,50 @@ impl<N> Inbound<N> {
146146
// the header.
147147
.push_switch(Ok::<Local, Infallible>, http)
148148
.push_switch(
149-
{
150-
let policies = policies.clone();
151-
move |(h, client): (TransportHeader, ClientInfo)| -> Result<_, Infallible> {
152-
match h {
153-
TransportHeader {
154-
port,
155-
name: None,
156-
protocol,
157-
} => Ok(svc::Either::A({
158-
// When the transport header targets an alternate port (but does
159-
// not identify an alternate target name), we check the new
160-
// target's policy (rather than the inbound proxy's address).
161-
let addr = (client.local_addr.ip(), port).into();
162-
let policy = policies.get_policy(OrigDstAddr(addr));
163-
match protocol {
164-
None => svc::Either::A(LocalTcp {
165-
server_addr: Remote(ServerAddr(addr)),
166-
client_addr: client.client_addr,
167-
client_id: client.client_id,
149+
|(policy, (h, client)): (AllowPolicy, (TransportHeader, ClientInfo))| -> Result<_, Infallible> {
150+
match h {
151+
TransportHeader {
152+
port,
153+
name: None,
154+
protocol,
155+
} => {
156+
// The transport header targets an alternate port (but does
157+
// not identify an alternate target name).
158+
let addr = (client.local_addr.ip(), port).into();
159+
Ok(svc::Either::A(match protocol {
160+
None => svc::Either::A(LocalTcp {
161+
server_addr: Remote(ServerAddr(addr)),
162+
client_addr: client.client_addr,
163+
client_id: client.client_id,
164+
policy,
165+
}),
166+
Some(protocol) => {
167+
// When TransportHeader includes the protocol, but does not
168+
// include an alternate name we go through the Inbound HTTP
169+
// stack.
170+
svc::Either::B(LocalHttp {
171+
addr: Remote(ServerAddr(addr)),
168172
policy,
169-
}),
170-
Some(protocol) => {
171-
// When TransportHeader includes the protocol, but does not
172-
// include an alternate name we go through the Inbound HTTP
173-
// stack.
174-
svc::Either::B(LocalHttp {
175-
addr: Remote(ServerAddr(addr)),
176-
policy,
177-
protocol,
178-
client,
179-
})
180-
}
173+
protocol,
174+
client,
175+
})
181176
}
182-
})),
183-
184-
TransportHeader {
185-
port,
186-
name: Some(name),
177+
}))
178+
},
179+
TransportHeader {
180+
port,
181+
name: Some(name),
182+
protocol,
183+
} => Ok(svc::Either::B(
184+
// When the transport header provides an alternate target, the
185+
// connection is a gateway connection.
186+
GatewayTransportHeader {
187+
target: NameAddr::from((name, port)),
187188
protocol,
188-
} => Ok(svc::Either::B({
189-
// When the transport header provides an alternate target, the
190-
// connection is a gateway connection. We check the _gateway
191-
// address's_ policy (rather than the target address).
192-
let policy = policies.get_policy(client.local_addr);
193-
GatewayTransportHeader {
194-
target: NameAddr::from((name, port)),
195-
protocol,
196-
client,
197-
policy,
198-
}
199-
})),
200-
}
189+
client,
190+
policy,
191+
}
192+
))
201193
}
202194
},
203195
// HTTP detection is not necessary in this case, since the transport
@@ -211,6 +203,22 @@ impl<N> Inbound<N> {
211203
)
212204
.into_inner(),
213205
)
206+
.lift_new_with_target()
207+
.push(policy::Discover::layer_via(
208+
policies,
209+
|(header, client): &(TransportHeader, ClientInfo)| {
210+
if header.name.is_some() {
211+
// When the transport header provides an alternate target, the
212+
// connection is a gateway connection. . We use the `OrigDstAddr`--the
213+
// inbound proxy server's address--to lookup policies.
214+
return client.local_addr;
215+
}
216+
217+
// Otherwise, use the port override from the transport header.
218+
OrigDstAddr((client.local_addr.ip(), header.port).into())
219+
},
220+
))
221+
.into_new_service()
214222
.check_new_service::<(TransportHeader, ClientInfo), _>()
215223
// Use ALPN to determine whether a transport header should be read.
216224
.push(NewTransportHeaderServer::layer(detect_timeout))

linkerd/app/inbound/src/policy.rs

+27-4
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
mod api;
22
mod config;
33
pub mod defaults;
4+
mod discover;
45
mod http;
56
mod store;
67
mod tcp;
78

89
pub(crate) use self::store::Store;
910
pub use self::{
1011
config::Config,
12+
discover::Discover,
1113
http::{
1214
HttpInvalidPolicy, HttpRouteInvalidRedirect, HttpRouteNotFound, HttpRouteRedirect,
1315
HttpRouteUnauthorized, NewHttpPolicy,
@@ -20,6 +22,7 @@ use linkerd_app_core::{
2022
metrics::{RouteAuthzLabels, ServerAuthzLabels},
2123
tls,
2224
transport::{ClientAddr, OrigDstAddr, Remote},
25+
Error,
2326
};
2427
use linkerd_idle_cache::Cached;
2528
pub use linkerd_proxy_server_policy::{
@@ -28,7 +31,7 @@ pub use linkerd_proxy_server_policy::{
2831
http::{filter::Redirection, Route as HttpRoute},
2932
route, Authentication, Authorization, Meta, Protocol, RoutePolicy, ServerPolicy,
3033
};
31-
use std::sync::Arc;
34+
use std::{future::Future, sync::Arc};
3235
use thiserror::Error;
3336
use tokio::sync::watch;
3437

@@ -38,9 +41,11 @@ pub struct ServerUnauthorized {
3841
server: Arc<Meta>,
3942
}
4043

41-
pub trait GetPolicy {
42-
// Returns the traffic policy configured for the destination address.
43-
fn get_policy(&self, dst: OrigDstAddr) -> AllowPolicy;
44+
/// Returns the traffic policy configured for the destination address.
45+
pub trait GetPolicy: Clone + Send + Sync + 'static {
46+
type Future: Future<Output = Result<AllowPolicy, Error>> + Unpin + Send;
47+
48+
fn get_policy(&self, target: OrigDstAddr) -> Self::Future;
4449
}
4550

4651
#[derive(Clone, Debug, PartialEq, Eq)]
@@ -75,6 +80,24 @@ pub enum Routes {
7580
Grpc(Arc<[GrpcRoute]>),
7681
}
7782

83+
// === impl GetPolicy ===
84+
85+
impl<S> GetPolicy for S
86+
where
87+
S: tower::Service<OrigDstAddr, Response = AllowPolicy, Error = Error>,
88+
S: Clone + Send + Sync + Unpin + 'static,
89+
S::Future: Send + Unpin,
90+
{
91+
type Future = tower::util::Oneshot<S, OrigDstAddr>;
92+
93+
#[inline]
94+
fn get_policy(&self, target: OrigDstAddr) -> Self::Future {
95+
use tower::util::ServiceExt;
96+
97+
self.clone().oneshot(target)
98+
}
99+
}
100+
78101
// === impl DefaultPolicy ===
79102

80103
impl From<ServerPolicy> for DefaultPolicy {

linkerd/app/inbound/src/policy/config.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ impl Config {
3636
dns: dns::Resolver,
3737
metrics: metrics::ControlHttp,
3838
identity: identity::NewClient,
39-
) -> impl GetPolicy + Clone + Send + Sync + 'static {
39+
) -> impl GetPolicy {
4040
match self {
4141
Self::Fixed {
4242
default,
+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
use super::{AllowPolicy, GetPolicy};
2+
use futures::ready;
3+
use linkerd_app_core::{svc, transport::OrigDstAddr, Error};
4+
use std::{
5+
future::Future,
6+
pin::Pin,
7+
task::{Context, Poll},
8+
};
9+
10+
#[derive(Debug, Clone)]
11+
pub struct Discover<X, G, N> {
12+
extract_addr: X,
13+
get_policy: G,
14+
new_svc: N,
15+
}
16+
17+
#[pin_project::pin_project]
18+
pub struct DiscoverFuture<F, N> {
19+
#[pin]
20+
inner: F,
21+
new_svc: N,
22+
}
23+
24+
impl<G, N> Discover<(), G, N>
25+
where
26+
G: GetPolicy + Clone,
27+
{
28+
pub fn layer(get_policy: G) -> impl svc::layer::Layer<N, Service = Self> + Clone {
29+
Self::layer_via(get_policy, ())
30+
}
31+
}
32+
33+
impl<X, G, N> Discover<X, G, N>
34+
where
35+
G: GetPolicy + Clone,
36+
X: Clone,
37+
{
38+
pub fn layer_via(
39+
get_policy: G,
40+
extract_addr: X,
41+
) -> impl svc::layer::Layer<N, Service = Self> + Clone {
42+
svc::layer::mk(move |new_svc| Self {
43+
extract_addr: extract_addr.clone(),
44+
get_policy: get_policy.clone(),
45+
new_svc,
46+
})
47+
}
48+
}
49+
50+
impl<X, G, N, NSvc, T> svc::Service<T> for Discover<X, G, N>
51+
where
52+
G: GetPolicy,
53+
X: svc::ExtractParam<OrigDstAddr, T>,
54+
N: svc::NewService<T, Service = NSvc> + Clone,
55+
NSvc: svc::NewService<AllowPolicy>,
56+
{
57+
type Error = Error;
58+
type Response = NSvc::Service;
59+
type Future = DiscoverFuture<G::Future, NSvc>;
60+
61+
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
62+
Poll::Ready(Ok(()))
63+
}
64+
65+
fn call(&mut self, target: T) -> Self::Future {
66+
let dst = self.extract_addr.extract_param(&target);
67+
DiscoverFuture {
68+
inner: self.get_policy.get_policy(dst),
69+
new_svc: self.new_svc.new_service(target),
70+
}
71+
}
72+
}
73+
74+
// === impl DiscoverFuture ===
75+
76+
impl<F, N, E> Future for DiscoverFuture<F, N>
77+
where
78+
F: Future<Output = Result<AllowPolicy, E>>,
79+
N: svc::NewService<AllowPolicy>,
80+
{
81+
type Output = Result<N::Service, E>;
82+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83+
let this = self.project();
84+
let policy = ready!(this.inner.poll(cx))?;
85+
let svc = this.new_svc.new_service(policy);
86+
Poll::Ready(Ok(svc))
87+
}
88+
}

0 commit comments

Comments
 (0)