Skip to content

Commit

Permalink
Annotate socket-level errors with a scope (#852)
Browse files Browse the repository at this point in the history
We frequently encounter socket errors like:

    Connection closed error=Transport endpoint is not connected (os error 107)

While the handler is generally scoped with tracing context to tell us
information about the server-side connection, we lack information about
which socket actually failed. For every "logical" TCP stream, we
generally have two concrete connections: One on the server-side of the
proxy and another on the client-side.

This change introduces a new `io::ScopedIo` wrapper type that annotates
all I/O error messages with a `client` or `server` prefix. Then, the
proxy's clients and servers are wrapped with this type so that all
socket-level error messages are disambiguated.
  • Loading branch information
olix0r authored Jan 19, 2021
1 parent adaa67e commit 1e98a5b
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 16 deletions.
9 changes: 7 additions & 2 deletions linkerd/app/core/src/serve.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::io;
use crate::svc;
use futures::prelude::*;
use linkerd_error::Error;
Expand All @@ -18,7 +19,7 @@ pub async fn serve<M, A, I>(
where
I: Send + 'static,
M: svc::NewService<Addrs, Service = A>,
A: tower::Service<I, Response = ()> + Send + 'static,
A: tower::Service<io::ScopedIo<I>, Response = ()> + Send + 'static,
A::Error: Into<Error>,
A::Future: Send + 'static,
{
Expand All @@ -45,7 +46,11 @@ where
async move {
match accept.ready_oneshot().err_into::<Error>().await {
Ok(mut accept) => {
match accept.call(io).err_into::<Error>().await {
match accept
.call(io::ScopedIo::server(io))
.err_into::<Error>()
.await
{
Ok(()) => debug!("Connection closed"),
Err(error) => info!(%error, "Connection closed"),
}
Expand Down
14 changes: 7 additions & 7 deletions linkerd/app/outbound/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub fn accept_stack<P, C, T, TSvc, H, HSvc, I>(
drain: drain::Watch,
) -> impl svc::NewService<
tcp::Accept,
Service = impl svc::Service<SensorIo<I>, Response = (), Error = Error, Future = impl Send>,
Service = impl svc::Service<I, Response = (), Error = Error, Future = impl Send>,
> + Clone
where
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Send + Unpin + 'static,
Expand All @@ -120,14 +120,14 @@ where
C::Error: Into<Error>,
C::Future: Send,
T: svc::NewService<tcp::Concrete, Service = TSvc> + Clone + Send + 'static,
TSvc: svc::Service<io::PrefixedIo<SensorIo<I>>, Response = ()>
+ svc::Service<SensorIo<I>, Response = ()>
TSvc: svc::Service<io::PrefixedIo<I>, Response = ()>
+ svc::Service<I, Response = ()>
+ Send
+ 'static,
<TSvc as svc::Service<SensorIo<I>>>::Error: Into<Error>,
<TSvc as svc::Service<SensorIo<I>>>::Future: Send,
<TSvc as svc::Service<io::PrefixedIo<SensorIo<I>>>>::Error: Into<Error>,
<TSvc as svc::Service<io::PrefixedIo<SensorIo<I>>>>::Future: Send,
<TSvc as svc::Service<I>>::Error: Into<Error>,
<TSvc as svc::Service<I>>::Future: Send,
<TSvc as svc::Service<io::PrefixedIo<I>>>::Error: Into<Error>,
<TSvc as svc::Service<io::PrefixedIo<I>>>::Future: Send,
H: svc::NewService<http::Logical, Service = HSvc> + Clone + Send + 'static,
HSvc: svc::Service<http::Request<http::BoxBody>, Response = http::Response<http::BoxBody>>
+ Send
Expand Down
2 changes: 2 additions & 0 deletions linkerd/io/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
mod boxed;
mod either;
mod prefixed;
mod scoped;
mod sensor;

pub use self::{
boxed::BoxedIo,
either::EitherIo,
prefixed::PrefixedIo,
scoped::ScopedIo,
sensor::{Sensor, SensorIo},
};
pub use std::io::*;
Expand Down
132 changes: 132 additions & 0 deletions linkerd/io/src/scoped.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use crate as io;
use pin_project::pin_project;
use std::{pin::Pin, task::Context};

/// An I/O stream where errors are annotated a scope.
#[pin_project]
#[derive(Debug)]
pub struct ScopedIo<I> {
scope: Scope,

#[pin]
io: I,
}

#[derive(Copy, Clone, Debug)]
enum Scope {
Client,
Server,
}

// === impl Scope ===

impl Scope {
#[inline]
fn err(&self) -> impl Fn(io::Error) -> io::Error {
let scope = *self;
move |err| io::Error::new(err.kind(), format!("{}: {}", scope, err))
}
}

impl std::fmt::Display for Scope {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Client => write!(f, "client"),
Self::Server => write!(f, "server"),
}
}
}

// === impl ScopedIo ===

impl<I> ScopedIo<I> {
pub fn client(io: I) -> Self {
Self {
scope: Scope::Client,
io,
}
}

pub fn server(io: I) -> Self {
Self {
scope: Scope::Server,
io,
}
}
}

#[async_trait::async_trait]
impl<I: io::Peek + Send + Sync> io::Peek for ScopedIo<I> {
async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.io.peek(buf).await.map_err(self.scope.err())
}
}

impl<I: io::PeerAddr> io::PeerAddr for ScopedIo<I> {
#[inline]
fn peer_addr(&self) -> io::Result<std::net::SocketAddr> {
self.io.peer_addr().map_err(self.scope.err())
}
}

impl<I: io::AsyncRead> io::AsyncRead for ScopedIo<I> {
#[inline]
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut io::ReadBuf<'_>,
) -> io::Poll<()> {
let this = self.project();
this.io.poll_read(cx, buf).map_err(this.scope.err())
}
}

impl<I: io::Write> io::Write for ScopedIo<I> {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.io.write(buf).map_err(self.scope.err())
}

#[inline]
fn flush(&mut self) -> io::Result<()> {
self.io.flush().map_err(self.scope.err())
}
}

impl<I: io::AsyncWrite> io::AsyncWrite for ScopedIo<I> {
#[inline]
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
let this = self.project();
this.io.poll_shutdown(cx).map_err(this.scope.err())
}

#[inline]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
let this = self.project();
this.io.poll_flush(cx).map_err(this.scope.err())
}

#[inline]
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> io::Poll<usize> {
let this = self.project();
let scope = this.scope;
this.io.poll_write(cx, buf).map_err(scope.err())
}

#[inline]
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> io::Poll<usize> {
let this = self.project();
this.io
.poll_write_vectored(cx, bufs)
.map_err(this.scope.err())
}

#[inline]
fn is_write_vectored(&self) -> bool {
self.io.is_write_vectored()
}
}
4 changes: 2 additions & 2 deletions linkerd/proxy/tap/src/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl AcceptPermittedClients {
}
}

impl<T> Service<Connection<T, TcpStream>> for AcceptPermittedClients {
impl<T> Service<Connection<T, io::ScopedIo<TcpStream>>> for AcceptPermittedClients {
type Response = ServeFuture;
type Error = Error;
type Future = future::Ready<Result<Self::Response, Self::Error>>;
Expand All @@ -73,7 +73,7 @@ impl<T> Service<Connection<T, TcpStream>> for AcceptPermittedClients {
Poll::Ready(Ok(()))
}

fn call(&mut self, ((peer_id, _), io): Connection<T, TcpStream>) -> Self::Future {
fn call(&mut self, ((peer_id, _), io): Connection<T, io::ScopedIo<TcpStream>>) -> Self::Future {
future::ok(match peer_id {
Conditional::Some(ref peer) => {
if self.permitted_client_ids.contains(peer) {
Expand Down
9 changes: 5 additions & 4 deletions linkerd/proxy/transport/src/connect.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use linkerd_io as io;
use std::task::{Context, Poll};
use std::{future::Future, io, net::SocketAddr, pin::Pin, time::Duration};
use std::{future::Future, net::SocketAddr, pin::Pin, time::Duration};
use tokio::net::TcpStream;
use tracing::debug;

Expand All @@ -15,10 +16,10 @@ impl ConnectTcp {
}

impl<T: Into<SocketAddr>> tower::Service<T> for ConnectTcp {
type Response = TcpStream;
type Response = io::ScopedIo<TcpStream>;
type Error = io::Error;
type Future =
Pin<Box<dyn Future<Output = Result<TcpStream, io::Error>> + Send + Sync + 'static>>;
Pin<Box<dyn Future<Output = io::Result<io::ScopedIo<TcpStream>>> + Send + Sync + 'static>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
Expand All @@ -37,7 +38,7 @@ impl<T: Into<SocketAddr>> tower::Service<T> for ConnectTcp {
?keepalive,
"Connected",
);
Ok(io)
Ok(io::ScopedIo::client(io))
})
}
}
2 changes: 1 addition & 1 deletion linkerd/tls/tests/tls_accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ fn run_test<C, CF, CR, S, SF, SR>(
) -> (Transported<CR>, Transported<SR>)
where
// Client
C: FnOnce(tls::client::Io<TcpStream>) -> CF + Clone + Send + 'static,
C: FnOnce(tls::client::Io<io::ScopedIo<TcpStream>>) -> CF + Clone + Send + 'static,
CF: Future<Output = Result<CR, io::Error>> + Send + 'static,
CR: Send + 'static,
// Server
Expand Down

0 comments on commit 1e98a5b

Please sign in to comment.