Skip to content

Commit

Permalink
transport: Improve TCP server error messages (#1372)
Browse files Browse the repository at this point in the history
The TCP server can fail connections in a few situations:

* `accept(2)` can fail
* setting thte TCP keepalive can fail
* getting the client peer address can fail

It's not easy to differentiate these errors when inspecting proxy logs.

This change adds wrapper error types with custom error messages that
disambiguate the error cause and updates the `Bind` trait to use boxed
error types (instead of `io::Error`).

Additionally, this change includes the client address when logging
information/warnings about connection errors in general. This is
redundant when logging is set at debug, but will be helpful when logging
is set at the normal level.

(cherry picked from commit 6dd16e9)
Signed-off-by: Oliver Gould <ver@buoyant.io>
  • Loading branch information
olix0r committed Mar 30, 2022
1 parent 38d9aed commit ac79745
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 26 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1221,9 +1221,11 @@ version = "0.1.0"
dependencies = [
"futures",
"libc",
"linkerd-error",
"linkerd-io",
"linkerd-stack",
"socket2 0.4.2",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
Expand Down
12 changes: 8 additions & 4 deletions linkerd/app/core/src/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{
io,
svc::{self, Param},
transport::{ClientAddr, Remote},
Result,
};
use futures::prelude::*;
use linkerd_error::Error;
Expand All @@ -12,7 +13,7 @@ use tracing::{debug, debug_span, info, instrument::Instrument, warn};
///
/// The task is driven until shutdown is signaled.
pub async fn serve<M, S, I, A>(
listen: impl Stream<Item = std::io::Result<(A, I)>>,
listen: impl Stream<Item = Result<(A, I)>>,
new_accept: M,
shutdown: impl Future,
) where
Expand All @@ -39,7 +40,8 @@ pub async fn serve<M, S, I, A>(
};

// The local addr should be instrumented from the listener's context.
let span = debug_span!("accept", client.addr = %addrs.param()).entered();
let Remote(ClientAddr(client_addr)) = addrs.param();
let span = debug_span!("accept", client.addr = %client_addr).entered();
let accept = new_accept.new_service(addrs);

// Dispatch all of the work for a given connection onto a
Expand All @@ -57,15 +59,17 @@ pub async fn serve<M, S, I, A>(
Err(reason) if is_io(&*reason) => {
debug!(%reason, "Connection closed")
}
Err(error) => info!(%error, "Connection closed"),
Err(error) => {
info!(%error, client.addr = %client_addr, "Connection closed")
}
}
// Hold the service until the connection is complete. This
// helps tie any inner cache lifetimes to the services they
// return.
drop(accept);
}
Err(error) => {
warn!(%error, "Server failed to become ready");
warn!(%error, client.addr = %client_addr, "Server failed to become ready");
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/inbound/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use futures::Stream;
use linkerd_app_core::{
dns, io, metrics, profiles, serve, svc,
transport::{self, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr},
Error,
Error, Result,
};
use std::fmt::Debug;
use tracing::debug_span;
Expand All @@ -30,7 +30,7 @@ impl Inbound<()> {
pub async fn serve<A, I, G, GSvc, P>(
self,
addr: Local<ServerAddr>,
listen: impl Stream<Item = io::Result<(A, I)>> + Send + Sync + 'static,
listen: impl Stream<Item = Result<(A, I)>> + Send + Sync + 'static,
policies: impl policy::CheckPolicy + Clone + Send + Sync + 'static,
profiles: P,
gateway: G,
Expand Down
15 changes: 6 additions & 9 deletions linkerd/app/integration/src/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use super::*;
use app_core::transport::OrigDstAddr;
use linkerd_app_core::{
svc::Param,
transport::OrigDstAddr,
transport::{listen, orig_dst, Keepalive, ListenAddr},
Result,
};
use std::{fmt, future::Future, net::SocketAddr, pin::Pin, task::Poll, thread};
use tokio::net::TcpStream;
Expand Down Expand Up @@ -65,22 +66,18 @@ where
{
type Addrs = orig_dst::Addrs;
type Io = tokio::net::TcpStream;
type Incoming = Pin<
Box<dyn Stream<Item = io::Result<(orig_dst::Addrs, TcpStream)>> + Send + Sync + 'static>,
>;
type Incoming =
Pin<Box<dyn Stream<Item = Result<(orig_dst::Addrs, TcpStream)>> + Send + Sync + 'static>>;

fn bind(self, params: &T) -> io::Result<listen::Bound<Self::Incoming>> {
fn bind(self, params: &T) -> Result<listen::Bound<Self::Incoming>> {
let (bound, incoming) = listen::BindTcp::default().bind(params)?;
let incoming = Box::pin(incoming.map(move |res| {
let (inner, tcp) = res?;
let orig_dst = match self {
Self::Addr(addr) => OrigDstAddr(addr),
Self::Direct => OrigDstAddr(inner.server.into()),
Self::None => {
return Err(io::Error::new(
io::ErrorKind::Other,
"No mocked SO_ORIG_DST",
))
return Err("No mocked SO_ORIG_DST".into());
}
};
let addrs = orig_dst::Addrs { inner, orig_dst };
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use linkerd_app_core::{
svc::{self, stack::Param},
tls,
transport::{self, addrs::*},
AddrMatch, Error, ProxyRuntime,
AddrMatch, Error, ProxyRuntime, Result,
};
use std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -154,7 +154,7 @@ impl<S> Outbound<S> {
impl Outbound<()> {
pub async fn serve<A, I, P, R>(
self,
listen: impl Stream<Item = io::Result<(A, I)>> + Send + Sync + 'static,
listen: impl Stream<Item = Result<(A, I)>> + Send + Sync + 'static,
profiles: P,
resolve: R,
) where
Expand Down
2 changes: 2 additions & 0 deletions linkerd/proxy/transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ Transport-level implementations that rely on core proxy infrastructure

[dependencies]
futures = { version = "0.3", default-features = false }
linkerd-error = { path = "../../error" }
linkerd-io = { path = "../../io" }
linkerd-stack = { path = "../../stack" }
socket2 = "0.4"
thiserror = "1"
tokio = { version = "1", features = ["macros", "net"] }
tokio-stream = { version = "0.1", features = ["net"] }
tracing = "0.1.29"
Expand Down
28 changes: 21 additions & 7 deletions linkerd/proxy/transport/src/listen.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::{addrs::*, Keepalive};
use futures::prelude::*;
use linkerd_error::Result;
use linkerd_io as io;
use linkerd_stack::Param;
use std::{fmt, pin::Pin};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_stream::wrappers::TcpListenerStream;

Expand All @@ -21,9 +23,9 @@ pub trait Bind<T> {
+ Sync
+ 'static;
type Addrs: Clone + Send + Sync + 'static;
type Incoming: Stream<Item = io::Result<(Self::Addrs, Self::Io)>> + Send + Sync + 'static;
type Incoming: Stream<Item = Result<(Self::Addrs, Self::Io)>> + Send + Sync + 'static;

fn bind(self, params: &T) -> io::Result<Bound<Self::Incoming>>;
fn bind(self, params: &T) -> Result<Bound<Self::Incoming>>;
}

pub type Bound<I> = (Local<ServerAddr>, I);
Expand All @@ -37,6 +39,18 @@ pub struct Addrs {
pub client: Remote<ClientAddr>,
}

#[derive(Debug, Error)]
#[error("failed to accept socket: {0}")]
struct AcceptError(#[source] io::Error);

#[derive(Debug, Error)]
#[error("failed to set TCP keepalive: {0}")]
struct KeepaliveError(#[source] io::Error);

#[derive(Debug, Error)]
#[error("failed to obtain peer address: {0}")]
struct PeerAddrError(#[source] io::Error);

// === impl BindTcp ===

impl BindTcp {
Expand All @@ -50,10 +64,10 @@ where
T: Param<ListenAddr> + Param<Keepalive>,
{
type Addrs = Addrs;
type Incoming = Pin<Box<dyn Stream<Item = io::Result<(Self::Addrs, Self::Io)>> + Send + Sync>>;
type Incoming = Pin<Box<dyn Stream<Item = Result<(Self::Addrs, Self::Io)>> + Send + Sync>>;
type Io = TcpStream;

fn bind(self, params: &T) -> io::Result<Bound<Self::Incoming>> {
fn bind(self, params: &T) -> Result<Bound<Self::Incoming>> {
let listen = {
let ListenAddr(addr) = params.param();
let l = std::net::TcpListener::bind(addr)?;
Expand All @@ -64,10 +78,10 @@ where
let server = Local(ServerAddr(listen.local_addr()?));
let Keepalive(keepalive) = params.param();
let accept = TcpListenerStream::new(listen).map(move |res| {
let tcp = res?;
let tcp = res.map_err(AcceptError)?;
super::set_nodelay_or_warn(&tcp);
let tcp = super::set_keepalive_or_warn(tcp, keepalive)?;
let client = Remote(ClientAddr(tcp.peer_addr()?));
let tcp = super::set_keepalive_or_warn(tcp, keepalive).map_err(KeepaliveError)?;
let client = Remote(ClientAddr(tcp.peer_addr().map_err(PeerAddrError)?));
Ok((Addrs { server, client }, tcp))
});

Expand Down
5 changes: 3 additions & 2 deletions linkerd/proxy/transport/src/orig_dst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{
listen::{self, Bind, Bound},
};
use futures::prelude::*;
use linkerd_error::Result;
use linkerd_io as io;
use linkerd_stack::Param;
use std::pin::Pin;
Expand Down Expand Up @@ -60,9 +61,9 @@ where
type Addrs = Addrs<B::Addrs>;
type Io = TcpStream;
type Incoming =
Pin<Box<dyn Stream<Item = io::Result<(Self::Addrs, TcpStream)>> + Send + Sync + 'static>>;
Pin<Box<dyn Stream<Item = Result<(Self::Addrs, TcpStream)>> + Send + Sync + 'static>>;

fn bind(self, t: &T) -> io::Result<Bound<Self::Incoming>> {
fn bind(self, t: &T) -> Result<Bound<Self::Incoming>> {
let (addr, incoming) = self.inner.bind(t)?;

let incoming = incoming.map(|res| {
Expand Down

0 comments on commit ac79745

Please sign in to comment.