Skip to content

Commit 89ee318

Browse files
authored
inbound: Introduce a policy::LookupAddr type (#2264)
Currently, the inbound crate's `GetPolicy` trait performs lookups on `OrigDstAddr`s. This is not strictly correct: the `OrigDstAddr` newtype is intended to specifically represent an address that was returned by a `getsockopt(..., SO_ORIGINAL_DST, ...)` call. Various uses of `OrigDstAddr`, especially in the inbound policy stack, overload `OrigDstAddr` for other uses so that other addresses must be coerced into an `OrigDstAddr`. To simplify the inbound proxy's use of address types: - Policy types (e.g. `ServerPolicy`, `AllowPermit`, etc) now represent addresses as `ServerAddr` rather than `OrigDstAddr`. - The `inbound::policy` module now defines a `LookupAddr` newtype, which is passed as an argument to `GetPolicy`. - The inbound accept and direct stacks provide `ExtractParam` impls to the `policy::Discover` layer indicating how `LookupAddr`s are produced from targets in that stack. This makes the different behaviors between the normal accept stack and the direct stack more explicit at the callsite.
1 parent 93b06e6 commit 89ee318

File tree

14 files changed

+78
-55
lines changed

14 files changed

+78
-55
lines changed

linkerd/app/admin/src/stack.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,9 @@ impl Config {
9393
let (listen_addr, listen) = bind.bind(&self.server)?;
9494

9595
// Get the policy for the admin server.
96-
let policy = policy.get_policy(OrigDstAddr(listen_addr.into())).await?;
96+
let policy = policy
97+
.get_policy(inbound::policy::LookupAddr(listen_addr.into()))
98+
.await?;
9799

98100
let (ready, latch) = crate::server::Readiness::new();
99101
let admin = crate::server::Admin::new(report, ready, shutdown, trace);

linkerd/app/gateway/src/http/tests.rs

+10-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use linkerd_app_core::{
44
svc::{NewService, ServiceExt},
55
tls,
66
trace::test::trace_init,
7+
transport::ServerAddr,
78
Error, NameAddr,
89
};
910
use linkerd_app_inbound::GatewayLoop;
@@ -52,7 +53,7 @@ async fn upgraded_request_remains_relative_form() {
5253

5354
impl svc::Param<OrigDstAddr> for Target {
5455
fn param(&self) -> OrigDstAddr {
55-
OrigDstAddr(([10, 10, 10, 10], 4143).into())
56+
OrigDstAddr(Self::dst_addr())
5657
}
5758
}
5859

@@ -132,14 +133,21 @@ async fn upgraded_request_remains_relative_form() {
132133
}]))]),
133134
},
134135
};
135-
let (policy, tx) = inbound::policy::AllowPolicy::for_test(self.param(), policy);
136+
let (policy, tx) =
137+
inbound::policy::AllowPolicy::for_test(ServerAddr(Self::dst_addr()), policy);
136138
tokio::spawn(async move {
137139
tx.closed().await;
138140
});
139141
policy
140142
}
141143
}
142144

145+
impl Target {
146+
fn dst_addr() -> std::net::SocketAddr {
147+
([10, 10, 10, 10], 4143).into()
148+
}
149+
}
150+
143151
let (inner, mut handle) =
144152
mock::pair::<http::Request<http::BoxBody>, http::Response<http::BoxBody>>();
145153
handle.allow(1);

linkerd/app/inbound/src/accept.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,12 @@ impl<N> Inbound<N> {
5959
}
6060
})
6161
.lift_new_with_target()
62-
.push(policy::Discover::layer(policies))
62+
.push(policy::Discover::layer_via(policies, |t: &T| {
63+
// For non-direct inbound connections, policies are always
64+
// looked up for the original destination address.
65+
let OrigDstAddr(addr) = t.param();
66+
policy::LookupAddr(addr)
67+
}))
6368
.into_new_service()
6469
.check_new_service::<T, I>()
6570
.push_switch(

linkerd/app/inbound/src/detect/tests.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ fn authzs() -> Arc<[Authorization]> {
2727

2828
fn allow(protocol: Protocol) -> AllowPolicy {
2929
let (allow, _tx) = AllowPolicy::for_test(
30-
orig_dst_addr(),
30+
ServerAddr(orig_dst_addr().into()),
3131
ServerPolicy {
3232
protocol,
3333
meta: Arc::new(Meta::Resource {

linkerd/app/inbound/src/direct.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -209,13 +209,13 @@ impl<N> Inbound<N> {
209209
|(header, client): &(TransportHeader, ClientInfo)| {
210210
if header.name.is_some() {
211211
// When the transport header provides an alternate target, the
212-
// connection is a gateway connection. . We use the `OrigDstAddr`--the
212+
// connection is a gateway connection. We use the `OrigDstAddr`--the
213213
// inbound proxy server's address--to lookup policies.
214-
return client.local_addr;
214+
return policy::LookupAddr(client.local_addr.into());
215215
}
216216

217217
// Otherwise, use the port override from the transport header.
218-
OrigDstAddr((client.local_addr.ip(), header.port).into())
218+
policy::LookupAddr((client.local_addr.ip(), header.port).into())
219219
},
220220
))
221221
.into_new_service()

linkerd/app/inbound/src/http/tests.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -578,11 +578,15 @@ impl Target {
578578
fn addr() -> SocketAddr {
579579
([127, 0, 0, 1], 80).into()
580580
}
581+
582+
fn dst_addr() -> SocketAddr {
583+
([192, 0, 2, 2], 80).into()
584+
}
581585
}
582586

583587
impl svc::Param<OrigDstAddr> for Target {
584588
fn param(&self) -> OrigDstAddr {
585-
OrigDstAddr(([192, 0, 2, 2], 80).into())
589+
OrigDstAddr(Self::dst_addr())
586590
}
587591
}
588592

@@ -622,7 +626,7 @@ impl svc::Param<policy::AllowPolicy> for Target {
622626
}),
623627
}]);
624628
let (policy, _) = policy::AllowPolicy::for_test(
625-
self.param(),
629+
ServerAddr(Self::dst_addr()),
626630
policy::ServerPolicy {
627631
protocol: policy::Protocol::Http1(Arc::new([
628632
linkerd_proxy_server_policy::http::default(authorizations),

linkerd/app/inbound/src/metrics/authz.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use linkerd_app_core::{
55
ServerLabel, TargetAddr, TlsAccept,
66
},
77
tls,
8-
transport::OrigDstAddr,
8+
transport::ServerAddr,
99
};
1010
use parking_lot::Mutex;
1111
use std::{collections::HashMap, sync::Arc};
@@ -79,7 +79,7 @@ impl HttpAuthzMetrics {
7979
pub fn route_not_found(
8080
&self,
8181
labels: ServerLabel,
82-
dst: OrigDstAddr,
82+
dst: ServerAddr,
8383
tls: tls::ConditionalServerTls,
8484
) {
8585
self.0
@@ -90,7 +90,7 @@ impl HttpAuthzMetrics {
9090
.incr();
9191
}
9292

93-
pub fn deny(&self, labels: RouteLabels, dst: OrigDstAddr, tls: tls::ConditionalServerTls) {
93+
pub fn deny(&self, labels: RouteLabels, dst: ServerAddr, tls: tls::ConditionalServerTls) {
9494
self.0
9595
.deny
9696
.lock()
@@ -205,7 +205,7 @@ impl FmtMetrics for TcpAuthzMetrics {
205205
// === impl Key ===
206206

207207
impl<L> Key<L> {
208-
fn new(labels: L, dst: OrigDstAddr, tls: tls::ConditionalServerTls) -> Self {
208+
fn new(labels: L, dst: ServerAddr, tls: tls::ConditionalServerTls) -> Self {
209209
Self {
210210
tls,
211211
target: TargetAddr(dst.into()),

linkerd/app/inbound/src/policy.rs

+16-13
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub use linkerd_app_core::metrics::ServerLabel;
2323
use linkerd_app_core::{
2424
metrics::{RouteAuthzLabels, ServerAuthzLabels},
2525
tls,
26-
transport::{ClientAddr, OrigDstAddr, Remote},
26+
transport::{ClientAddr, Remote, ServerAddr},
2727
Error,
2828
};
2929
use linkerd_idle_cache::Cached;
@@ -33,7 +33,7 @@ pub use linkerd_proxy_server_policy::{
3333
http::{filter::Redirection, Route as HttpRoute},
3434
route, Authentication, Authorization, Meta, Protocol, RoutePolicy, ServerPolicy,
3535
};
36-
use std::{future::Future, sync::Arc};
36+
use std::{future::Future, net::SocketAddr, sync::Arc};
3737
use thiserror::Error;
3838
use tokio::sync::watch;
3939

@@ -47,27 +47,30 @@ pub struct ServerUnauthorized {
4747
pub trait GetPolicy: Clone + Send + Sync + 'static {
4848
type Future: Future<Output = Result<AllowPolicy, Error>> + Unpin + Send;
4949

50-
fn get_policy(&self, target: OrigDstAddr) -> Self::Future;
50+
fn get_policy(&self, addr: LookupAddr) -> Self::Future;
5151
}
5252

53+
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
54+
pub struct LookupAddr(pub SocketAddr);
55+
5356
#[derive(Clone, Debug)]
5457
pub struct AllowPolicy {
55-
dst: OrigDstAddr,
58+
dst: ServerAddr,
5659
server: Cached<watch::Receiver<ServerPolicy>>,
5760
}
5861

5962
// Describes an authorized non-HTTP connection.
6063
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
6164
pub struct ServerPermit {
62-
pub dst: OrigDstAddr,
65+
pub dst: ServerAddr,
6366
pub protocol: Protocol,
6467
pub labels: ServerAuthzLabels,
6568
}
6669

6770
// Describes an authorized HTTP request.
6871
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
6972
pub struct HttpRoutePermit {
70-
pub dst: OrigDstAddr,
73+
pub dst: ServerAddr,
7174
pub labels: RouteAuthzLabels,
7275
}
7376

@@ -80,25 +83,25 @@ pub enum Routes {
8083

8184
impl<S> GetPolicy for S
8285
where
83-
S: tower::Service<OrigDstAddr, Response = AllowPolicy, Error = Error>,
86+
S: tower::Service<LookupAddr, Response = AllowPolicy, Error = Error>,
8487
S: Clone + Send + Sync + Unpin + 'static,
8588
S::Future: Send + Unpin,
8689
{
87-
type Future = tower::util::Oneshot<S, OrigDstAddr>;
90+
type Future = tower::util::Oneshot<S, LookupAddr>;
8891

8992
#[inline]
90-
fn get_policy(&self, target: OrigDstAddr) -> Self::Future {
93+
fn get_policy(&self, addr: LookupAddr) -> Self::Future {
9194
use tower::util::ServiceExt;
9295

93-
self.clone().oneshot(target)
96+
self.clone().oneshot(addr)
9497
}
9598
}
9699

97100
// === impl AllowPolicy ===
98101

99102
impl AllowPolicy {
100103
#[cfg(any(test, fuzzing, feature = "test-util"))]
101-
pub fn for_test(dst: OrigDstAddr, server: ServerPolicy) -> (Self, watch::Sender<ServerPolicy>) {
104+
pub fn for_test(dst: ServerAddr, server: ServerPolicy) -> (Self, watch::Sender<ServerPolicy>) {
102105
let (tx, server) = watch::channel(server);
103106
let server = Cached::uncached(server);
104107
let p = Self { dst, server };
@@ -116,7 +119,7 @@ impl AllowPolicy {
116119
}
117120

118121
#[inline]
119-
pub fn dst_addr(&self) -> OrigDstAddr {
122+
pub fn dst_addr(&self) -> ServerAddr {
120123
self.dst
121124
}
122125

@@ -186,7 +189,7 @@ fn is_authorized(
186189
// === impl Permit ===
187190

188191
impl ServerPermit {
189-
fn new(dst: OrigDstAddr, server: &ServerPolicy, authz: &Authorization) -> Self {
192+
fn new(dst: ServerAddr, server: &ServerPolicy, authz: &Authorization) -> Self {
190193
Self {
191194
dst,
192195
protocol: server.protocol.clone(),

linkerd/app/inbound/src/policy/discover.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use super::{AllowPolicy, GetPolicy};
1+
use super::{AllowPolicy, GetPolicy, LookupAddr};
22
use futures::ready;
3-
use linkerd_app_core::{svc, transport::OrigDstAddr, Error};
3+
use linkerd_app_core::{svc, Error};
44
use std::{
55
future::Future,
66
pin::Pin,
@@ -50,7 +50,7 @@ where
5050
impl<X, G, N, NSvc, T> svc::Service<T> for Discover<X, G, N>
5151
where
5252
G: GetPolicy,
53-
X: svc::ExtractParam<OrigDstAddr, T>,
53+
X: svc::ExtractParam<LookupAddr, T>,
5454
N: svc::NewService<T, Service = NSvc> + Clone,
5555
NSvc: svc::NewService<AllowPolicy>,
5656
{

linkerd/app/inbound/src/policy/http.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use linkerd_app_core::{
88
metrics::{RouteAuthzLabels, RouteLabels},
99
svc::{self, ServiceExt},
1010
tls,
11-
transport::{ClientAddr, OrigDstAddr, Remote},
11+
transport::{ClientAddr, Remote, ServerAddr},
1212
Error, Result,
1313
};
1414
use linkerd_proxy_server_policy::{grpc, http, route::RouteMatch};
@@ -41,7 +41,7 @@ pub struct HttpPolicyService<T, N> {
4141

4242
#[derive(Clone, Debug)]
4343
struct ConnectionMeta {
44-
dst: OrigDstAddr,
44+
dst: ServerAddr,
4545
client: Remote<ClientAddr>,
4646
tls: tls::ConditionalServerTls,
4747
}

linkerd/app/inbound/src/policy/http/tests.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
use super::*;
22
use crate::policy::{Authentication, Authorization, Meta, Protocol, ServerPolicy};
3-
use linkerd_app_core::{svc::Service, Infallible};
3+
use linkerd_app_core::{svc::Service, transport::ServerAddr, Infallible};
44
use std::sync::Arc;
55

66
macro_rules! conn {
77
($client:expr, $dst:expr) => {{
88
ConnectionMeta {
9-
dst: OrigDstAddr(($dst, 8080).into()),
9+
dst: ServerAddr(($dst, 8080).into()),
1010
client: Remote(ClientAddr(($client, 30120).into())),
1111
tls: tls::ConditionalServerTls::Some(tls::ServerTls::Established {
1212
client_id: Some("foo.bar.bah".parse().unwrap()),

linkerd/app/inbound/src/policy/store.rs

+7-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use super::AllowPolicy;
1+
use super::{AllowPolicy, LookupAddr};
22
use futures::future;
3-
use linkerd_app_core::{svc, transport::OrigDstAddr, Error};
3+
use linkerd_app_core::{svc, transport::ServerAddr, Error};
44
use linkerd_idle_cache::IdleCache;
55
pub use linkerd_proxy_server_policy::{
66
authz::Suffix, Authentication, Authorization, Protocol, ServerPolicy,
@@ -96,7 +96,7 @@ where
9696
}
9797
}
9898

99-
impl<D> svc::Service<OrigDstAddr> for Store<D>
99+
impl<D> svc::Service<LookupAddr> for Store<D>
100100
where
101101
D: svc::Service<
102102
u16,
@@ -117,11 +117,12 @@ where
117117
task::Poll::Ready(Ok(()))
118118
}
119119

120-
fn call(&mut self, dst: OrigDstAddr) -> Self::Future {
121-
// Lookup the polcify for the target port in the cache. If it doesn't
120+
fn call(&mut self, LookupAddr(addr): LookupAddr) -> Self::Future {
121+
// Lookup the policy for the target port in the cache. If it doesn't
122122
// already exist, we spawn a watch on the API (if it is configured). If
123123
// no discovery API is configured we use the default policy.
124-
let port = dst.port();
124+
let port = addr.port();
125+
let dst = ServerAddr(addr);
125126
if let Some(server) = self.cache.get(&port) {
126127
return future::Either::Left(future::ready(Ok(AllowPolicy { dst, server })));
127128
}

linkerd/app/inbound/src/policy/tcp.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::{
55
use futures::future;
66
use linkerd_app_core::{
77
svc, tls,
8-
transport::{ClientAddr, OrigDstAddr, Remote},
8+
transport::{ClientAddr, Remote, ServerAddr},
99
Error, Result,
1010
};
1111
use linkerd_proxy_server_policy::{Protocol, ServerPolicy};
@@ -182,7 +182,7 @@ where
182182
/// accept connections given the provided TLS state.
183183
fn check_authorized(
184184
server: &ServerPolicy,
185-
dst: OrigDstAddr,
185+
dst: ServerAddr,
186186
client_addr: Remote<ClientAddr>,
187187
tls: &tls::ConditionalServerTls,
188188
) -> Result<ServerPermit, ServerUnauthorized> {

0 commit comments

Comments
 (0)