Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

outbound: Preserve opaqueness on unknown endpoints #1617

Merged
merged 2 commits into from
Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions linkerd/app/outbound/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,17 @@ pub struct FromMetadata {
// === impl Endpoint ===

impl Endpoint<()> {
pub(crate) fn forward(addr: OrigDstAddr, reason: tls::NoClientTls) -> Self {
pub(crate) fn forward(
addr: OrigDstAddr,
reason: tls::NoClientTls,
opaque_protocol: bool,
) -> Self {
Self {
addr: Remote(ServerAddr(addr.into())),
metadata: Metadata::default(),
tls: Conditional::None(reason),
logical_addr: None,
opaque_protocol: false,
opaque_protocol,
protocol: (),
}
}
Expand Down Expand Up @@ -263,6 +267,7 @@ pub mod tests {
.new_service(tcp::Endpoint::forward(
OrigDstAddr(addr),
tls::NoClientTls::Disabled,
false,
));

let (client_io, server_io) = support::io::duplex(4096);
Expand Down
61 changes: 54 additions & 7 deletions linkerd/app/outbound/src/switch_logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ impl<S> Outbound<S> {
.push_switch(
move |(profile, target): (Option<profiles::Receiver>, T)| -> Result<_, Infallible> {
if let Some(rx) = profile {
// If the profile provides an endpoint, then the target is single endpoint and
// not a logical/load-balanced service.
let is_opaque = rx.is_opaque_protocol();

// If the profile provides an endpoint, then the target is single
// endpoint and not a logical/load-balanced service.
if let Some((addr, metadata)) = rx.endpoint() {
let is_opaque = rx.is_opaque_protocol();
tracing::debug!(%is_opaque, "Profile describes an endpoint");
return Ok(svc::Either::A(Endpoint::from_metadata(
addr,
Expand All @@ -46,20 +47,33 @@ impl<S> Outbound<S> {
)));
}

// Otherwise, if the profile provides a (named) logical address, then we build a
// If the profile provides a (named) logical address, then we build a
// logical stack so we apply routes, traffic splits, and load balancing.
if let Some(logical_addr) = rx.logical_addr() {
tracing::debug!("Profile describes a logical service");
return Ok(svc::Either::B(Logical::new(logical_addr, rx)));
}

// Otherwise, if there was a profile but it didn't include an endpoint or logical
// address, create a bare endpoint from the original destination address
// using the profile-provided opaqueness. This applies for targets that
// aren't known by the destination controller that may target ports
// included in the cluster-wide default opaque list.
tracing::debug!("Unknown endpoint");
return Ok(svc::Either::A(Endpoint::forward(
target.param(),
no_tls_reason,
is_opaque,
)));
}

// If there was no profile or it didn't include any useful metadata, create a bare
// endpoint from the original destination address.
tracing::debug!("No profile; forwarding to the original destination");
// If there was no profile, create a bare endpoint from the original
// destination address.
tracing::debug!("No profile");
Ok(svc::Either::A(Endpoint::forward(
target.param(),
no_tls_reason,
false,
)))
},
logical,
Expand Down Expand Up @@ -175,4 +189,37 @@ mod tests {
let (server_io, _client_io) = io::duplex(1);
svc.oneshot(server_io).await.expect("service must succeed");
}

#[tokio::test(flavor = "current_thread")]
async fn profile_neither() {
let _trace = linkerd_tracing::test::trace_init();

let endpoint_addr = SocketAddr::new([192, 0, 2, 20].into(), 2020);
let endpoint = {
let endpoint_addr = endpoint_addr.clone();
move |ep: tcp::Endpoint| {
assert_eq!(ep.addr.as_ref(), &endpoint_addr);
assert!(ep.opaque_protocol, "protocol must be marked opaque");
svc::mk(|_: io::DuplexStream| future::ok::<(), Error>(()))
}
};

let (rt, _shutdown) = runtime();
let stack = Outbound::new(default_config(), rt)
.with_stack(endpoint)
.push_switch_logical(svc::Fail::<_, WrongStack>::default())
.into_inner();

let (_tx, profile) = tokio::sync::watch::channel(profiles::Profile {
endpoint: None,
opaque_protocol: true,
addr: None,
..Default::default()
});

let orig_dst = OrigDstAddr(endpoint_addr);
let svc = stack.new_service((Some(profile.into()), orig_dst));
let (server_io, _client_io) = io::duplex(1);
svc.oneshot(server_io).await.expect("service must succeed");
}
}
4 changes: 3 additions & 1 deletion linkerd/service-profiles/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
task::{Context, Poll},
};
use tonic::{body::BoxBody, client::GrpcService};
use tracing::debug;
use tracing::{debug, trace};

/// Creates watches on service profiles.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -73,7 +73,9 @@ where
Box::pin(async move {
match w.spawn_watch(addr).await {
Ok(rsp) => {
debug!("Resolved profile");
let rx = rsp.into_inner();
trace!(profile = ?rx.borrow());
Ok(Some(rx.into()))
}
Err(status) => {
Expand Down