Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple client connection metadata from the I/O type #1426

Merged
merged 14 commits into from
Jan 3, 2022
4 changes: 2 additions & 2 deletions linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use linkerd_exp_backoff::{ExponentialBackoff, ExponentialBackoffStream};
pub use linkerd_reconnect::NewReconnect;
pub use linkerd_stack::{
self as stack, layer, ArcNewService, BoxCloneService, BoxService, BoxServiceLayer, Either,
ExtractParam, Fail, FailFast, Filter, InsertParam, MapErr, MapTargetLayer, NewRouter,
NewService, Param, Predicate, UnwrapOr,
ExtractParam, Fail, FailFast, Filter, InsertParam, MakeConnection, MapErr, MapTargetLayer,
NewRouter, NewService, Param, Predicate, UnwrapOr,
};
pub use linkerd_stack_tracing::{NewInstrument, NewInstrumentLayer};
use std::{
Expand Down
7 changes: 3 additions & 4 deletions linkerd/app/gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use linkerd_app_core::{
},
svc::{self, Param},
tls,
transport::{ClientAddr, OrigDstAddr, Remote},
transport::{ClientAddr, Local, OrigDstAddr, Remote},
transport_header::SessionProtocol,
Error, Infallible, NameAddr, NameMatch,
};
Expand Down Expand Up @@ -70,9 +70,8 @@ pub fn stack<I, O, P, R>(
where
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + fmt::Debug + Send + Sync + Unpin + 'static,
O: Clone + Send + Sync + Unpin + 'static,
O: svc::Service<outbound::tcp::Connect, Error = io::Error>,
O::Response:
io::AsyncRead + io::AsyncWrite + tls::HasNegotiatedProtocol + Send + Unpin + 'static,
O: svc::MakeConnection<outbound::tcp::Connect, Metadata = Local<ClientAddr>, Error = io::Error>,
O::Connection: Send + Unpin,
O::Future: Send + Unpin + 'static,
P: profiles::GetProfile<profiles::LookupAddr> + Clone + Send + Sync + Unpin + 'static,
P::Future: Send + 'static,
Expand Down
13 changes: 9 additions & 4 deletions linkerd/app/inbound/src/http/router.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{policy, stack_labels, Inbound};
use linkerd_app_core::{
classify, errors, http_tracing, io, metrics,
classify, errors, http_tracing, metrics,
profiles::{self, DiscoveryRejected},
proxy::{http, tap},
svc::{self, ExtractParam, Param},
Expand Down Expand Up @@ -84,22 +84,27 @@ impl<C> Inbound<C> {
P: profiles::GetProfile<profiles::LookupAddr> + Clone + Send + Sync + 'static,
P::Future: Send,
P::Error: Send,
C: svc::Service<Http> + Clone + Send + Sync + Unpin + 'static,
C::Response: io::AsyncRead + io::AsyncWrite + Send + Unpin + 'static,
C::Error: Into<Error>,
C: svc::MakeConnection<Http> + Clone + Send + Sync + Unpin + 'static,
C::Connection: Send + Unpin,
C::Metadata: Send,
C::Future: Send,
{
self.map_stack(|config, rt, connect| {
let allow_profile = config.allow_discovery.clone();

// Creates HTTP clients for each inbound port & HTTP settings.
let http = connect
.push(svc::layer::mk(|inner: C| inner.into_service()))
.check_service::<Http>()
.push(svc::stack::BoxFuture::layer())
.check_service::<Http>()
.push(transport::metrics::Client::layer(rt.metrics.proxy.transport.clone()))
.check_service::<Http>()
.push(http::client::layer(
config.proxy.connect.h1_settings,
config.proxy.connect.h2_settings,
))
.check_service::<Http>()
.push_on_service(svc::MapErr::layer(Into::into))
.into_new_service()
.push_new_reconnect(config.proxy.connect.backoff)
Expand Down
12 changes: 7 additions & 5 deletions linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,10 @@ impl Inbound<()> {
self,
proxy_port: u16,
) -> Inbound<
impl svc::Service<
impl svc::MakeConnection<
T,
Response = impl io::AsyncRead + io::AsyncWrite + Send,
Connection = impl Send + Unpin,
Metadata = impl Send + Unpin,
Error = Error,
Future = impl Send,
> + Clone,
Expand Down Expand Up @@ -214,16 +215,17 @@ impl<S> Inbound<S> {
T: svc::Param<transport::labels::Key> + Clone + Send + 'static,
I: io::AsyncRead + io::AsyncWrite,
I: Debug + Send + Unpin + 'static,
S: svc::Service<T> + Clone + Send + Sync + Unpin + 'static,
S::Response: io::AsyncRead + io::AsyncWrite + Send + Unpin + 'static,
S::Error: Into<Error>,
S: svc::MakeConnection<T> + Clone + Send + Sync + Unpin + 'static,
S::Connection: Send + Unpin,
S::Metadata: Send + Unpin,
S::Future: Send,
{
self.map_stack(|_, rt, connect| {
connect
.push(transport::metrics::Client::layer(
rt.metrics.proxy.transport.clone(),
))
.push(svc::stack::WithoutConnectionMetadata::layer())
.push_make_thunk()
.push_on_service(
svc::layers()
Expand Down
12 changes: 8 additions & 4 deletions linkerd/app/outbound/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,14 @@ impl<S> Outbound<S> {
pub fn push_endpoint<I>(self) -> Outbound<svc::ArcNewTcp<tcp::Endpoint, I>>
where
Self: Clone + 'static,
S: svc::Service<tcp::Connect, Error = io::Error> + Clone + Send + Sync + Unpin + 'static,
S::Response:
tls::HasNegotiatedProtocol + io::AsyncRead + io::AsyncWrite + Send + Unpin + 'static,
S::Future: Send + Unpin,
S: svc::MakeConnection<tcp::Connect, Metadata = Local<ClientAddr>, Error = io::Error>
+ Clone
+ Send
+ Sync
+ Unpin
+ 'static,
S::Connection: Send + Unpin + 'static,
S::Future: Send,
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr,
I: fmt::Debug + Send + Sync + Unpin + 'static,
{
Expand Down
7 changes: 3 additions & 4 deletions linkerd/app/outbound/src/http/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use linkerd_app_core::{
svc::{self, ExtractParam},
tls, Error, Result, CANONICAL_DST_HEADER,
};
use tokio::io;

#[derive(Copy, Clone, Debug)]
struct ClientRescue {
Expand All @@ -24,9 +23,9 @@ impl<C> Outbound<C> {
+ tap::Inspect,
B: http::HttpBody<Error = Error> + std::fmt::Debug + Default + Send + 'static,
B::Data: Send + 'static,
C: svc::Service<T> + Clone + Send + Sync + Unpin + 'static,
C::Response: io::AsyncRead + io::AsyncWrite + Send + Unpin,
C::Error: Into<Error>,
C: svc::MakeConnection<T> + Clone + Send + Sync + Unpin + 'static,
C::Connection: Send + Unpin,
C::Metadata: Send + Unpin,
C::Future: Send + Unpin + 'static,
{
self.map_stack(|config, rt, connect| {
Expand Down
2 changes: 2 additions & 0 deletions linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ pub struct Accept<P> {
pub protocol: P,
}

pub type ConnectMeta = tls::ConnectMeta<Local<ClientAddr>>;

// === impl Outbound ===

impl Outbound<()> {
Expand Down
9 changes: 5 additions & 4 deletions linkerd/app/outbound/src/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ pub use linkerd_app_core::proxy::api_resolve::ConcreteAddr;
use linkerd_app_core::{
io, profiles,
proxy::{api_resolve::Metadata, core::Resolve},
svc, tls, Addr, Error,
svc,
transport::{ClientAddr, Local},
Addr, Error,
};
pub use profiles::LogicalAddr;
use std::fmt;
Expand Down Expand Up @@ -118,9 +120,8 @@ impl<C> Outbound<C> {
where
Self: Clone + 'static,
C: Clone + Send + Sync + Unpin + 'static,
C: svc::Service<tcp::Connect, Error = io::Error>,
C::Response:
tls::HasNegotiatedProtocol + io::AsyncRead + io::AsyncWrite + Send + Unpin + 'static,
C: svc::MakeConnection<tcp::Connect, Metadata = Local<ClientAddr>, Error = io::Error>,
C::Connection: Send + Unpin,
C::Future: Send + Unpin,
R: Clone + Send + 'static,
R: Resolve<ConcreteAddr, Endpoint = Metadata, Error = Error> + Sync,
Expand Down
33 changes: 21 additions & 12 deletions linkerd/app/outbound/src/tcp/connect.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use super::opaque_transport::{self, OpaqueTransport};
use crate::Outbound;
use crate::{ConnectMeta, Outbound};
use futures::future;
use linkerd_app_core::{
io,
proxy::http,
svc, tls,
transport::{self, ConnectTcp, Remote, ServerAddr},
transport::{self, ClientAddr, ConnectTcp, Local, Remote, ServerAddr},
transport_header::SessionProtocol,
Error,
};
Expand Down Expand Up @@ -36,9 +36,10 @@ impl<C> Outbound<C> {
pub fn push_tcp_endpoint<T>(
self,
) -> Outbound<
impl svc::Service<
impl svc::MakeConnection<
T,
Response = impl io::AsyncRead + io::AsyncWrite + Send + Unpin,
Connection = impl Send + Unpin,
Metadata = ConnectMeta,
Error = Error,
Future = impl Send,
> + Clone,
Expand All @@ -50,9 +51,12 @@ impl<C> Outbound<C> {
+ svc::Param<Option<http::AuthorityOverride>>
+ svc::Param<Option<SessionProtocol>>
+ svc::Param<transport::labels::Key>,
C: svc::Service<Connect, Error = io::Error> + Clone + Send + 'static,
C::Response: tls::HasNegotiatedProtocol,
C::Response: io::AsyncRead + io::AsyncWrite + Send + Unpin + 'static,
C: svc::MakeConnection<Connect, Metadata = Local<ClientAddr>, Error = io::Error>
+ Clone
+ Send
+ 'static,
C::Connection: Send + Unpin,
C::Metadata: Send + Unpin,
C::Future: Send + 'static,
{
self.map_stack(|config, rt, connect| {
Expand Down Expand Up @@ -85,13 +89,14 @@ impl<C> Outbound<C> {
where
T: Clone + Send + 'static,
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Send + Unpin + 'static,
C: svc::Service<T> + Clone + Send + Sync + 'static,
C::Response: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin,
C::Error: Into<Error>,
C: svc::MakeConnection<T> + Clone + Send + Sync + 'static,
C::Connection: Send + Unpin,
C::Metadata: Send + Unpin,
C::Future: Send,
{
self.map_stack(|_, _, conn| {
conn.push_make_thunk()
conn.push(svc::stack::WithoutConnectionMetadata::layer())
.push_make_thunk()
.push_on_service(super::Forward::layer())
.instrument(|_: &_| debug_span!("tcp.forward"))
.push(svc::ArcNewService::layer())
Expand Down Expand Up @@ -166,6 +171,7 @@ mod tests {
use crate::{
svc::{self, NewService, ServiceExt},
test_util::*,
transport::{ClientAddr, Local},
};
use std::net::SocketAddr;

Expand All @@ -180,7 +186,10 @@ mod tests {
assert_eq!(a, addr);
let mut io = support::io();
io.write(b"hello").read(b"world");
future::ok::<_, support::io::Error>(io.build())
future::ok::<_, support::io::Error>((
io.build(),
Local(ClientAddr(([0, 0, 0, 0], 0).into())),
))
}))
.push_tcp_forward()
.into_inner();
Expand Down
24 changes: 13 additions & 11 deletions linkerd/app/outbound/src/tcp/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,7 @@ use linkerd_app_core::{
};
use tracing::debug_span;

impl<C> Outbound<C>
where
C: svc::Service<Endpoint> + Clone + Send + 'static,
C::Response: io::AsyncRead + io::AsyncWrite + Send + Unpin,
C::Error: Into<Error>,
C::Future: Send,
{
impl<C> Outbound<C> {
/// Constructs a TCP load balancer.
pub fn push_tcp_logical<I, R>(
self,
Expand All @@ -30,6 +24,11 @@ where
>,
>
where
C: svc::MakeConnection<Endpoint> + Clone + Send + 'static,
C::Connection: Send + Unpin,
C::Metadata: Send + Unpin,
C::Future: Send,
C: Send + Sync + 'static,
I: io::AsyncRead + io::AsyncWrite + std::fmt::Debug + Send + Unpin + 'static,
R: Resolve<ConcreteAddr, Endpoint = Metadata, Error = Error>
+ Clone
Expand All @@ -38,7 +37,6 @@ where
+ 'static,
R::Resolution: Send,
R::Future: Send + Unpin,
C: Send + Sync + 'static,
{
self.map_stack(|config, rt, connect| {
let config::ProxyConfig {
Expand All @@ -63,6 +61,7 @@ where
.into_inner();

connect
.push(svc::stack::WithoutConnectionMetadata::layer())
.push_make_thunk()
.instrument(|t: &Endpoint| {
debug_span!(
Expand Down Expand Up @@ -159,7 +158,8 @@ mod tests {
assert_eq!(*ep.addr.as_ref(), ep_addr);
let mut io = support::io();
io.write(b"hola").read(b"mundo");
future::ok::<_, support::io::Error>(io.build())
let local = Local(ClientAddr(([0, 0, 0, 0], 4444).into()));
future::ok::<_, support::io::Error>((io.build(), local))
}))
.push_tcp_logical(resolve)
.into_inner();
Expand Down Expand Up @@ -225,13 +225,15 @@ mod tests {
tracing::debug!(%addr, "writing ep0");
let mut io = support::io();
io.write(b"who r u?").read(b"ep0");
future::ok::<_, support::io::Error>(io.build())
let local = Local(ClientAddr(([0, 0, 0, 0], 4444).into()));
future::ok::<_, support::io::Error>((io.build(), local))
}
Remote(ServerAddr(addr)) if addr == ep1_addr => {
tracing::debug!(%addr, "writing ep1");
let mut io = support::io();
io.write(b"who r u?").read(b"ep1");
future::ok::<_, support::io::Error>(io.build())
let local = Local(ClientAddr(([0, 0, 0, 0], 4444).into()));
future::ok::<_, support::io::Error>((io.build(), local))
}
addr => unreachable!("unexpected endpoint: {}", addr),
}))
Expand Down
Loading