Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Annotate socket-level errors with a scope #852

Merged
merged 10 commits into from
Jan 19, 2021
9 changes: 7 additions & 2 deletions linkerd/app/core/src/serve.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::io;
use crate::svc;
use futures::prelude::*;
use linkerd_error::Error;
Expand All @@ -18,7 +19,7 @@ pub async fn serve<M, A, I>(
where
I: Send + 'static,
M: svc::NewService<Addrs, Service = A>,
A: tower::Service<I, Response = ()> + Send + 'static,
A: tower::Service<io::ScopedIo<I>, Response = ()> + Send + 'static,
A::Error: Into<Error>,
A::Future: Send + 'static,
{
Expand All @@ -45,7 +46,11 @@ where
async move {
match accept.ready_oneshot().err_into::<Error>().await {
Ok(mut accept) => {
match accept.call(io).err_into::<Error>().await {
match accept
.call(io::ScopedIo::server(io))
.err_into::<Error>()
.await
{
Ok(()) => debug!("Connection closed"),
Err(error) => info!(%error, "Connection closed"),
}
Expand Down
14 changes: 7 additions & 7 deletions linkerd/app/outbound/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub fn accept_stack<P, C, T, TSvc, H, HSvc, I>(
drain: drain::Watch,
) -> impl svc::NewService<
tcp::Accept,
Service = impl svc::Service<SensorIo<I>, Response = (), Error = Error, Future = impl Send>,
Service = impl svc::Service<I, Response = (), Error = Error, Future = impl Send>,
> + Clone
where
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Send + Unpin + 'static,
Expand All @@ -120,14 +120,14 @@ where
C::Error: Into<Error>,
C::Future: Send,
T: svc::NewService<tcp::Concrete, Service = TSvc> + Clone + Send + 'static,
TSvc: svc::Service<io::PrefixedIo<SensorIo<I>>, Response = ()>
+ svc::Service<SensorIo<I>, Response = ()>
TSvc: svc::Service<io::PrefixedIo<I>, Response = ()>
+ svc::Service<I, Response = ()>
+ Send
+ 'static,
<TSvc as svc::Service<SensorIo<I>>>::Error: Into<Error>,
<TSvc as svc::Service<SensorIo<I>>>::Future: Send,
<TSvc as svc::Service<io::PrefixedIo<SensorIo<I>>>>::Error: Into<Error>,
<TSvc as svc::Service<io::PrefixedIo<SensorIo<I>>>>::Future: Send,
<TSvc as svc::Service<I>>::Error: Into<Error>,
<TSvc as svc::Service<I>>::Future: Send,
<TSvc as svc::Service<io::PrefixedIo<I>>>::Error: Into<Error>,
<TSvc as svc::Service<io::PrefixedIo<I>>>::Future: Send,
H: svc::NewService<http::Logical, Service = HSvc> + Clone + Send + 'static,
HSvc: svc::Service<http::Request<http::BoxBody>, Response = http::Response<http::BoxBody>>
+ Send
Expand Down
2 changes: 2 additions & 0 deletions linkerd/io/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
mod boxed;
mod either;
mod prefixed;
mod scoped;
mod sensor;

pub use self::{
boxed::BoxedIo,
either::EitherIo,
prefixed::PrefixedIo,
scoped::ScopedIo,
sensor::{Sensor, SensorIo},
};
pub use std::io::*;
Expand Down
132 changes: 132 additions & 0 deletions linkerd/io/src/scoped.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use crate as io;
use pin_project::pin_project;
use std::{pin::Pin, task::Context};

/// An I/O stream where errors are annotated a scope.
#[pin_project]
#[derive(Debug)]
pub struct ScopedIo<I> {
scope: Scope,

#[pin]
io: I,
}

#[derive(Copy, Clone, Debug)]
enum Scope {
Client,
Server,
}

// === impl Scope ===

impl Scope {
#[inline]
fn err(&self) -> impl Fn(io::Error) -> io::Error {
let scope = *self;
move |err| io::Error::new(err.kind(), format!("{}: {}", scope, err))
}
}

impl std::fmt::Display for Scope {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Client => write!(f, "client"),
Self::Server => write!(f, "server"),
}
}
}

// === impl ScopedIo ===

impl<I> ScopedIo<I> {
pub fn client(io: I) -> Self {
Self {
scope: Scope::Client,
io,
}
}

pub fn server(io: I) -> Self {
Self {
scope: Scope::Server,
io,
}
}
}

#[async_trait::async_trait]
impl<I: io::Peek + Send + Sync> io::Peek for ScopedIo<I> {
async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.io.peek(buf).await.map_err(self.scope.err())
}
}

impl<I: io::PeerAddr> io::PeerAddr for ScopedIo<I> {
#[inline]
fn peer_addr(&self) -> io::Result<std::net::SocketAddr> {
self.io.peer_addr().map_err(self.scope.err())
}
}

impl<I: io::AsyncRead> io::AsyncRead for ScopedIo<I> {
#[inline]
fn poll_read(
olix0r marked this conversation as resolved.
Show resolved Hide resolved
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut io::ReadBuf<'_>,
) -> io::Poll<()> {
let this = self.project();
this.io.poll_read(cx, buf).map_err(this.scope.err())
}
}

impl<I: io::Write> io::Write for ScopedIo<I> {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.io.write(buf).map_err(self.scope.err())
}

#[inline]
fn flush(&mut self) -> io::Result<()> {
self.io.flush().map_err(self.scope.err())
}
}

impl<I: io::AsyncWrite> io::AsyncWrite for ScopedIo<I> {
#[inline]
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
let this = self.project();
this.io.poll_shutdown(cx).map_err(this.scope.err())
}

#[inline]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
let this = self.project();
this.io.poll_flush(cx).map_err(this.scope.err())
}

#[inline]
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> io::Poll<usize> {
let this = self.project();
let scope = this.scope;
this.io.poll_write(cx, buf).map_err(scope.err())
}

#[inline]
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> io::Poll<usize> {
let this = self.project();
this.io
.poll_write_vectored(cx, bufs)
.map_err(this.scope.err())
}

#[inline]
fn is_write_vectored(&self) -> bool {
self.io.is_write_vectored()
}
}
4 changes: 2 additions & 2 deletions linkerd/proxy/tap/src/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl AcceptPermittedClients {
}
}

impl<T> Service<Connection<T, TcpStream>> for AcceptPermittedClients {
impl<T> Service<Connection<T, io::ScopedIo<TcpStream>>> for AcceptPermittedClients {
type Response = ServeFuture;
type Error = Error;
type Future = future::Ready<Result<Self::Response, Self::Error>>;
Expand All @@ -73,7 +73,7 @@ impl<T> Service<Connection<T, TcpStream>> for AcceptPermittedClients {
Poll::Ready(Ok(()))
}

fn call(&mut self, ((peer_id, _), io): Connection<T, TcpStream>) -> Self::Future {
fn call(&mut self, ((peer_id, _), io): Connection<T, io::ScopedIo<TcpStream>>) -> Self::Future {
future::ok(match peer_id {
Conditional::Some(ref peer) => {
if self.permitted_client_ids.contains(peer) {
Expand Down
9 changes: 5 additions & 4 deletions linkerd/proxy/transport/src/connect.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use linkerd_io as io;
use std::task::{Context, Poll};
use std::{future::Future, io, net::SocketAddr, pin::Pin, time::Duration};
use std::{future::Future, net::SocketAddr, pin::Pin, time::Duration};
use tokio::net::TcpStream;
use tracing::debug;

Expand All @@ -15,10 +16,10 @@ impl ConnectTcp {
}

impl<T: Into<SocketAddr>> tower::Service<T> for ConnectTcp {
type Response = TcpStream;
type Response = io::ScopedIo<TcpStream>;
type Error = io::Error;
type Future =
Pin<Box<dyn Future<Output = Result<TcpStream, io::Error>> + Send + Sync + 'static>>;
Pin<Box<dyn Future<Output = io::Result<io::ScopedIo<TcpStream>>> + Send + Sync + 'static>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
Expand All @@ -37,7 +38,7 @@ impl<T: Into<SocketAddr>> tower::Service<T> for ConnectTcp {
?keepalive,
"Connected",
);
Ok(io)
Ok(io::ScopedIo::client(io))
})
}
}
2 changes: 1 addition & 1 deletion linkerd/tls/tests/tls_accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ fn run_test<C, CF, CR, S, SF, SR>(
) -> (Transported<CR>, Transported<SR>)
where
// Client
C: FnOnce(tls::client::Io<TcpStream>) -> CF + Clone + Send + 'static,
C: FnOnce(tls::client::Io<io::ScopedIo<TcpStream>>) -> CF + Clone + Send + 'static,
CF: Future<Output = Result<CR, io::Error>> + Send + 'static,
CR: Send + 'static,
// Server
Expand Down