Skip to content

Commit

Permalink
outbound: Preserve opaqueness on unknown endpoints (#1617)
Browse files Browse the repository at this point in the history
The outbound stack only honors opaqueness when the profile response
clearly indicates that the target is a known endpoint or logical
service. This ignores the case when the target is unknown but the target
port is in the default opaque ports list, in which case the profile
response has no metadata except for the opaqueness setting.

This change handles this case explicitly and adds a test for the
`switch_logical` stack to ensure that these profile responses are
honored.

Fixes linkerd/linkerd2#8273

Signed-off-by: Oliver Gould <ver@buoyant.io>
  • Loading branch information
olix0r committed Apr 19, 2022
1 parent d4c9fb2 commit c6d79c9
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 10 deletions.
9 changes: 7 additions & 2 deletions linkerd/app/outbound/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@ pub struct FromMetadata {
// === impl Endpoint ===

impl Endpoint<()> {
pub(crate) fn forward(addr: OrigDstAddr, reason: tls::NoClientTls) -> Self {
pub(crate) fn forward(
addr: OrigDstAddr,
reason: tls::NoClientTls,
opaque_protocol: bool,
) -> Self {
Self {
addr: Remote(ServerAddr(addr.into())),
metadata: Metadata::default(),
tls: Conditional::None(reason),
logical_addr: None,
opaque_protocol: false,
opaque_protocol,
protocol: (),
}
}
Expand Down Expand Up @@ -269,6 +273,7 @@ pub mod tests {
.new_service(tcp::Endpoint::forward(
OrigDstAddr(addr),
tls::NoClientTls::Disabled,
false,
));

let (client_io, server_io) = support::io::duplex(4096);
Expand Down
61 changes: 54 additions & 7 deletions linkerd/app/outbound/src/switch_logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ impl<S> Outbound<S> {
.push_switch(
move |(profile, target): (Option<profiles::Receiver>, T)| -> Result<_, Infallible> {
if let Some(rx) = profile {
// If the profile provides an endpoint, then the target is single endpoint and
// not a logical/load-balanced service.
let is_opaque = rx.is_opaque_protocol();

// If the profile provides an endpoint, then the target is single
// endpoint and not a logical/load-balanced service.
if let Some((addr, metadata)) = rx.endpoint() {
let is_opaque = rx.is_opaque_protocol();
tracing::debug!(%is_opaque, "Profile describes an endpoint");
return Ok(svc::Either::A(Endpoint::from_metadata(
addr,
Expand All @@ -46,20 +47,33 @@ impl<S> Outbound<S> {
)));
}

// Otherwise, if the profile provides a (named) logical address, then we build a
// If the profile provides a (named) logical address, then we build a
// logical stack so we apply routes, traffic splits, and load balancing.
if let Some(logical_addr) = rx.logical_addr() {
tracing::debug!("Profile describes a logical service");
return Ok(svc::Either::B(Logical::new(logical_addr, rx)));
}

// Otherwise, if there was a profile but it didn't include an endpoint or logical
// address, create a bare endpoint from the original destination address
// using the profile-provided opaqueness. This applies for targets that
// aren't known by the destination controller that may target ports
// included in the cluster-wide default opaque list.
tracing::debug!("Unknown endpoint");
return Ok(svc::Either::A(Endpoint::forward(
target.param(),
no_tls_reason,
is_opaque,
)));
}

// If there was no profile or it didn't include any useful metadata, create a bare
// endpoint from the original destination address.
tracing::debug!("No profile; forwarding to the original destination");
// If there was no profile, create a bare endpoint from the original
// destination address.
tracing::debug!("No profile");
Ok(svc::Either::A(Endpoint::forward(
target.param(),
no_tls_reason,
false,
)))
},
logical,
Expand Down Expand Up @@ -175,4 +189,37 @@ mod tests {
let (server_io, _client_io) = io::duplex(1);
svc.oneshot(server_io).await.expect("service must succeed");
}

#[tokio::test(flavor = "current_thread")]
async fn profile_neither() {
let _trace = linkerd_tracing::test::trace_init();

let endpoint_addr = SocketAddr::new([192, 0, 2, 20].into(), 2020);
let endpoint = {
let endpoint_addr = endpoint_addr.clone();
move |ep: tcp::Endpoint| {
assert_eq!(ep.addr.as_ref(), &endpoint_addr);
assert!(ep.opaque_protocol, "protocol must be marked opaque");
svc::mk(|_: io::DuplexStream| future::ok::<(), Error>(()))
}
};

let (rt, _shutdown) = runtime();
let stack = Outbound::new(default_config(), rt)
.with_stack(endpoint)
.push_switch_logical(svc::Fail::<_, WrongStack>::default())
.into_inner();

let (_tx, profile) = tokio::sync::watch::channel(profiles::Profile {
endpoint: None,
opaque_protocol: true,
addr: None,
..Default::default()
});

let orig_dst = OrigDstAddr(endpoint_addr);
let svc = stack.new_service((Some(profile.into()), orig_dst));
let (server_io, _client_io) = io::duplex(1);
svc.oneshot(server_io).await.expect("service must succeed");
}
}
4 changes: 3 additions & 1 deletion linkerd/service-profiles/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
task::{Context, Poll},
};
use tonic::{body::BoxBody, client::GrpcService};
use tracing::debug;
use tracing::{debug, trace};

/// Creates watches on service profiles.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -73,7 +73,9 @@ where
Box::pin(async move {
match w.spawn_watch(addr).await {
Ok(rsp) => {
debug!("Resolved profile");
let rx = rsp.into_inner();
trace!(profile = ?rx.borrow());
Ok(Some(rx.into()))
}
Err(status) => {
Expand Down

0 comments on commit c6d79c9

Please sign in to comment.