Skip to content

Commit

Permalink
http: Parameterize normalize_ur::DefaultAuthority (#886)
Browse files Browse the repository at this point in the history
The NormalizeUri module needs is configured with a fallback authority in
case we're dealing with poorly-formed HTTP requests (i.e., HTTP/1.0).
Currently, the module is parameterized on a `SocketAddr`, because on the
outbound side this makes sense as a default value. But on the inbound
side, especially in a gateway configuration, it doesn't make sense to
support a default value. It's better to just fail the request early.

This change updates the NormalizeUri type to require its targets to be
parameterized on a DefaultAuthority type. This type provides an
_optional_ `http::uri::Authority` so the target may provide a named
value (i.e., from a service profile response) when appropriate. It will
also enable us to omit a default when appropriate.
  • Loading branch information
olix0r authored Jan 29, 2021
1 parent 9b61f65 commit 1f70655
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 52 deletions.
3 changes: 1 addition & 2 deletions linkerd/app/inbound/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use linkerd_app_core::{
spans::SpanConverter,
svc, Error, NameAddr, TraceContext, DST_OVERRIDE_HEADER,
};
use std::net::SocketAddr;
use tokio::sync::mpsc;
use tracing::debug_span;

Expand All @@ -35,7 +34,7 @@ pub fn server<T, I, H, HSvc>(
Service = impl svc::Service<I, Response = (), Error = Error, Future = impl Send> + Clone,
> + Clone
where
T: svc::stack::Param<SocketAddr>,
T: svc::stack::Param<http::normalize_uri::DefaultAuthority>,
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + Send + Unpin + 'static,
H: svc::NewService<T, Service = HSvc> + Clone + Send + Sync + Unpin + 'static,
HSvc: svc::Service<http::Request<http::BoxBody>, Response = http::Response<http::BoxBody>>
Expand Down
9 changes: 9 additions & 0 deletions linkerd/app/inbound/src/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ impl Param<SocketAddr> for TcpAccept {
}
}

impl Param<http::normalize_uri::DefaultAuthority> for TcpAccept {
fn param(&self) -> http::normalize_uri::DefaultAuthority {
http::normalize_uri::DefaultAuthority(Some(
http::uri::Authority::from_str(&self.target_addr.to_string())
.expect("Address must be a valid authority"),
))
}
}

impl Param<transport::labels::Key> for TcpAccept {
fn param(&self) -> transport::labels::Key {
transport::labels::Key::accept(
Expand Down
85 changes: 54 additions & 31 deletions linkerd/app/outbound/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,30 @@ use indexmap::IndexMap;
pub use linkerd_app_core::proxy::http::*;
use linkerd_app_core::{
dst, profiles,
proxy::{
api_resolve::ProtocolHint,
http::{self, CanOverrideAuthority, ClientHandle},
tap,
},
proxy::{api_resolve::ProtocolHint, tap},
svc::stack::Param,
tls,
transport_header::SessionProtocol,
Conditional,
};
use std::{net::SocketAddr, sync::Arc};

pub type Accept = crate::target::Accept<http::Version>;
pub type Logical = crate::target::Logical<http::Version>;
pub type Concrete = crate::target::Concrete<http::Version>;
pub type Endpoint = crate::target::Endpoint<http::Version>;
use std::{net::SocketAddr, str::FromStr, sync::Arc};

pub type Accept = crate::target::Accept<Version>;
pub type Logical = crate::target::Logical<Version>;
pub type Concrete = crate::target::Concrete<Version>;
pub type Endpoint = crate::target::Endpoint<Version>;

impl Param<normalize_uri::DefaultAuthority> for Accept {
fn param(&self) -> normalize_uri::DefaultAuthority {
normalize_uri::DefaultAuthority(Some(
uri::Authority::from_str(&self.orig_dst.to_string())
.expect("Address must be a valid authority"),
))
}
}

impl From<(http::Version, tcp::Logical)> for Logical {
fn from((protocol, logical): (http::Version, tcp::Logical)) -> Self {
impl From<(Version, tcp::Logical)> for Logical {
fn from((protocol, logical): (Version, tcp::Logical)) -> Self {
Self {
protocol,
orig_dst: logical.orig_dst,
Expand All @@ -48,13 +53,31 @@ impl Logical {
}
}

impl Param<http::client::Settings> for Endpoint {
fn param(&self) -> http::client::Settings {
impl Param<normalize_uri::DefaultAuthority> for Logical {
fn param(&self) -> normalize_uri::DefaultAuthority {
if let Some(p) = self.profile.as_ref() {
if let Some(n) = p.borrow().name.as_ref() {
return normalize_uri::DefaultAuthority(Some(
uri::Authority::from_str(&format!("{}:{}", n, self.orig_dst.port()))
.expect("Address must be a valid authority"),
));
}
}

normalize_uri::DefaultAuthority(Some(
uri::Authority::from_str(&self.orig_dst.to_string())
.expect("Address must be a valid authority"),
))
}
}

impl Param<client::Settings> for Endpoint {
fn param(&self) -> client::Settings {
match self.concrete.logical.protocol {
http::Version::H2 => http::client::Settings::H2,
http::Version::Http1 => match self.metadata.protocol_hint() {
ProtocolHint::Unknown => http::client::Settings::Http1,
ProtocolHint::Http2 => http::client::Settings::OrigProtoUpgrade,
Version::H2 => client::Settings::H2,
Version::Http1 => match self.metadata.protocol_hint() {
ProtocolHint::Unknown => client::Settings::Http1,
ProtocolHint::Http2 => client::Settings::OrigProtoUpgrade,
},
}
}
Expand All @@ -63,8 +86,8 @@ impl Param<http::client::Settings> for Endpoint {
impl Param<Option<SessionProtocol>> for Endpoint {
fn param(&self) -> Option<SessionProtocol> {
match self.concrete.logical.protocol {
http::Version::H2 => Some(SessionProtocol::Http2),
http::Version::Http1 => match self.metadata.protocol_hint() {
Version::H2 => Some(SessionProtocol::Http2),
Version::Http1 => match self.metadata.protocol_hint() {
ProtocolHint::Http2 => Some(SessionProtocol::Http2),
ProtocolHint::Unknown => Some(SessionProtocol::Http1),
},
Expand All @@ -73,47 +96,47 @@ impl Param<Option<SessionProtocol>> for Endpoint {
}

// Used to set the l5d-canonical-dst header.
impl From<&'_ Logical> for http::header::HeaderValue {
impl From<&'_ Logical> for header::HeaderValue {
fn from(target: &'_ Logical) -> Self {
http::header::HeaderValue::from_str(&target.addr().to_string())
header::HeaderValue::from_str(&target.addr().to_string())
.expect("addr must be a valid header")
}
}

impl CanOverrideAuthority for Endpoint {
fn override_authority(&self) -> Option<http::uri::Authority> {
fn override_authority(&self) -> Option<uri::Authority> {
self.metadata.authority_override().cloned()
}
}

impl tap::Inspect for Endpoint {
fn src_addr<B>(&self, req: &http::Request<B>) -> Option<SocketAddr> {
fn src_addr<B>(&self, req: &Request<B>) -> Option<SocketAddr> {
req.extensions().get::<ClientHandle>().map(|c| c.addr)
}

fn src_tls<B>(&self, _: &http::Request<B>) -> tls::ConditionalServerTls {
fn src_tls<B>(&self, _: &Request<B>) -> tls::ConditionalServerTls {
Conditional::None(tls::NoServerTls::Loopback)
}

fn dst_addr<B>(&self, _: &http::Request<B>) -> Option<SocketAddr> {
fn dst_addr<B>(&self, _: &Request<B>) -> Option<SocketAddr> {
Some(self.addr)
}

fn dst_labels<B>(&self, _: &http::Request<B>) -> Option<&IndexMap<String, String>> {
fn dst_labels<B>(&self, _: &Request<B>) -> Option<&IndexMap<String, String>> {
Some(self.metadata.labels())
}

fn dst_tls<B>(&self, _: &http::Request<B>) -> tls::ConditionalClientTls {
fn dst_tls<B>(&self, _: &Request<B>) -> tls::ConditionalClientTls {
self.tls.clone()
}

fn route_labels<B>(&self, req: &http::Request<B>) -> Option<Arc<IndexMap<String, String>>> {
fn route_labels<B>(&self, req: &Request<B>) -> Option<Arc<IndexMap<String, String>>> {
req.extensions()
.get::<dst::Route>()
.map(|r| r.route.labels().clone())
}

fn is_outbound<B>(&self, _: &http::Request<B>) -> bool {
fn is_outbound<B>(&self, _: &Request<B>) -> bool {
true
}
}
63 changes: 44 additions & 19 deletions linkerd/proxy/http/src/normalize_uri.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
//! * Otherwise, the target's address is used (as provided by the target).
use super::h1;
use futures::{future, TryFutureExt};
use http::uri::Authority;
use linkerd_error::Error;
use linkerd_stack::{layer, NewService, Param};
use std::{
net::SocketAddr,
str::FromStr,
task::{Context, Poll},
};
use std::task::{Context, Poll};
use tracing::trace;

#[derive(Clone, Debug)]
Expand All @@ -31,9 +30,16 @@ pub struct NewNormalizeUri<N> {
#[derive(Clone, Debug)]
pub struct NormalizeUri<S> {
inner: S,
default: http::uri::Authority,
default: Option<http::uri::Authority>,
}

/// Parameterizes a stack target to produce an optional default authority.
#[derive(Clone, Debug)]
pub struct DefaultAuthority(pub Option<Authority>);

#[derive(Debug)]
pub struct NoAuthority(());

/// Detects the original form of a request URI and inserts a `WasAbsoluteForm`
/// extension.
#[derive(Clone, Debug)]
Expand All @@ -55,57 +61,76 @@ impl<N> NewNormalizeUri<N> {

impl<T, N> NewService<T> for NewNormalizeUri<N>
where
T: Param<SocketAddr>,
T: Param<DefaultAuthority>,
N: NewService<T>,
{
type Service = NormalizeUri<N::Service>;

fn new_service(&mut self, target: T) -> Self::Service {
let target_addr = target.param();
let DefaultAuthority(default) = target.param();
let inner = self.inner.new_service(target);
NormalizeUri::new(inner, target_addr)
NormalizeUri::new(inner, default)
}
}

// === impl NormalizeUri ===

impl<S> NormalizeUri<S> {
fn new(inner: S, target_addr: SocketAddr) -> Self {
let default = http::uri::Authority::from_str(&target_addr.to_string())
.expect("SocketAddr must be a valid Authority");
fn new(inner: S, default: Option<Authority>) -> Self {
Self { inner, default }
}
}

impl<S, B> tower::Service<http::Request<B>> for NormalizeUri<S>
where
S: tower::Service<http::Request<B>>,
S::Error: Into<Error>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
type Error = Error;
type Future = future::Either<
future::ErrInto<S::Future, Error>,
future::Ready<Result<S::Response, Error>>,
>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
self.inner.poll_ready(cx)
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
self.inner.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, mut req: http::Request<B>) -> Self::Future {
if let http::Version::HTTP_10 | http::Version::HTTP_11 = req.version() {
if req.extensions().get::<h1::WasAbsoluteForm>().is_none()
&& req.uri().authority().is_none()
{
let authority =
h1::authority_from_host(&req).unwrap_or_else(|| self.default.clone());
let authority = match h1::authority_from_host(&req).or_else(|| self.default.clone())
{
Some(a) => a,
None => return future::Either::Right(future::err(NoAuthority(()).into())),
};

trace!(%authority, "Normalizing URI");
h1::set_authority(req.uri_mut(), authority);
}
}

self.inner.call(req)
future::Either::Left(self.inner.call(req).err_into())
}
}

// === impl NoAuthority ===

impl std::fmt::Display for NoAuthority {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"failed to normalize URI because no authority could be determined"
)
}
}

impl std::error::Error for NoAuthority {}

// === impl MarkAbsoluteForm ===

impl<S> MarkAbsoluteForm<S> {
Expand Down

0 comments on commit 1f70655

Please sign in to comment.