Skip to content

Commit

Permalink
Implement outbound drain (#898)
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Leggett <benjamin.leggett@solo.io>
  • Loading branch information
bleggett authored Apr 9, 2024
1 parent 625b6e1 commit fa7b1aa
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 24 deletions.
8 changes: 7 additions & 1 deletion src/proxy/inbound_passthrough.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ impl InboundPassthrough {
}

pub(super) async fn run(self) {
let inner_drain = self.drain.clone();
let accept = async move {
loop {
// Asynchronously wait for an inbound socket.
let socket = self.listener.accept().await;
let pi = self.pi.clone();
let stream_drain = inner_drain.clone();

let connection_manager = self.pi.connection_manager.clone();
match socket {
Expand All @@ -77,6 +79,7 @@ impl InboundPassthrough {
socket::to_canonical(remote),
stream,
connection_manager,
stream_drain,
)
.await
{
Expand Down Expand Up @@ -109,6 +112,7 @@ impl InboundPassthrough {
source: SocketAddr,
mut inbound: TcpStream,
connection_manager: ConnectionManager,
outbound_conn_drain: Watch,
) -> Result<(), Error> {
let orig = socket::orig_dst_addr_or_default(&inbound);
// Check if it is a recursive call when proxy mode is Node.
Expand Down Expand Up @@ -138,7 +142,9 @@ impl InboundPassthrough {
// Spoofing the source IP only works when the destination or the source are on our node.
// In this case, the source and the destination might both be remote, so we need to disable it.
oc.pi.cfg.enable_original_source = Some(false);
return oc.proxy_to(inbound, source, orig, false).await;
return oc
.proxy_to_cancellable(inbound, source, orig, false, Some(outbound_conn_drain))
.await;
}

// We enforce RBAC only for non-hairpin cases. This is because we may not be able to properly
Expand Down
113 changes: 99 additions & 14 deletions src/proxy/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::time::Instant;
use bytes::Bytes;
use drain::Watch;
use http_body_util::Empty;
use hyper::client::conn::http2;
use hyper::header::FORWARDED;
use hyper::StatusCode;

Expand Down Expand Up @@ -74,11 +75,19 @@ impl Outbound {
}

pub(super) async fn run(self) {
// Since we are spawning autonomous tasks to handle outbound connections for a single workload,
// we can have situations where the workload is deleted, but a task is still "stuck"
// waiting for a server response stream on a HTTP/2 connection or whatnot.
//
// So use a drain to nuke tasks that might be stuck sending.
let (sub_drain_signal, sub_drain) = drain::channel();
let accept = async move {
loop {
// Asynchronously wait for an inbound socket.
let socket = self.listener.accept().await;
let start_outbound_instant = Instant::now();
let outbound_drain = sub_drain.clone();
let outer_conn_drain = sub_drain.clone();
match socket {
Ok((stream, _remote)) => {
let mut oc = OutboundConnection {
Expand All @@ -88,11 +97,20 @@ impl Outbound {
let span = info_span!("outbound", id=%oc.id);
tokio::spawn(
(async move {
let res = oc.proxy(stream).await;
match res {
Ok(_) => info!(dur=?start_outbound_instant.elapsed(), "complete"),
Err(e) => warn!(dur=?start_outbound_instant.elapsed(), err=%e, "failed")
};
debug!(dur=?start_outbound_instant.elapsed(), id=%oc.id, "outbound spawn START");
// Since this task is spawned, make sure we are guaranteed to terminate
tokio::select! {
_ = outbound_drain.signaled() => {
debug!("outbound drain signaled");
}
res = oc.proxy(stream, outer_conn_drain.clone()) => {
match res {
Ok(_) => info!(dur=?start_outbound_instant.elapsed(), "complete"),
Err(e) => warn!(dur=?start_outbound_instant.elapsed(), err=%e, "failed")
};
}
}
debug!(dur=?start_outbound_instant.elapsed(), id=%oc.id, "outbound spawn DONE");
})
.instrument(span),
);
Expand All @@ -113,6 +131,8 @@ impl Outbound {
tokio::select! {
res = accept => { res }
_ = self.drain.signaled() => {
debug!("outbound drained, dropping any outbound connections");
sub_drain_signal.drain().await;
info!("outbound drained");
}
}
Expand All @@ -125,18 +145,53 @@ pub(super) struct OutboundConnection {
}

impl OutboundConnection {
async fn proxy(&mut self, stream: TcpStream) -> Result<(), Error> {
async fn proxy(&mut self, stream: TcpStream, outer_conn_drain: Watch) -> Result<(), Error> {
let peer = socket::to_canonical(stream.peer_addr().expect("must receive peer addr"));
let orig_dst_addr = socket::orig_dst_addr_or_default(&stream);
self.proxy_to(stream, peer, orig_dst_addr, false).await
self.proxy_to(stream, peer, orig_dst_addr, false, Some(outer_conn_drain))
.await
}

pub async fn proxy_to(
// this is a cancellable outbound proxy. If `out_drain` is a Watch drain, will resolve
// when the drain is signaled, or the outbound stream is completed, no matter what.
//
// If `out_drain` is none, will only resolve when the outbound stream is terminated.
//
// If using `proxy_to` in `tokio::spawn` tasks, it is recommended to use a drain, to guarantee termination
// and prevent "zombie" outbound tasks.
pub async fn proxy_to_cancellable(
&mut self,
stream: TcpStream,
remote_addr: SocketAddr,
orig_dst_addr: SocketAddr,
block_passthrough: bool,
out_drain: Option<Watch>,
) -> Result<(), Error> {
match out_drain {
Some(drain) => {
let outer_conn_drain = drain.clone();
tokio::select! {
_ = drain.signaled() => {
info!("socks drain signaled");
Ok(())
}
res = self.proxy_to(stream, remote_addr, orig_dst_addr, block_passthrough, Some(outer_conn_drain)) => res
}
}
None => {
self.proxy_to(stream, remote_addr, orig_dst_addr, block_passthrough, None)
.await
}
}
}

async fn proxy_to(
&mut self,
mut stream: TcpStream,
remote_addr: SocketAddr,
orig_dst_addr: SocketAddr,
block_passthrough: bool,
outer_conn_drain: Option<Watch>,
) -> Result<(), Error> {
if self.pi.cfg.proxy_mode == ProxyMode::Shared
&& Some(orig_dst_addr.ip()) == self.pi.cfg.local_ip
Expand Down Expand Up @@ -272,8 +327,7 @@ impl OutboundConnection {
// Setup our connection future. This won't always run if we have an existing connection
// in the pool.
let connect = async {
let mut builder =
hyper::client::conn::http2::Builder::new(hyper_util::TokioExecutor);
let mut builder = http2::Builder::new(hyper_util::TokioExecutor);
let builder = builder
.initial_stream_window_size(self.pi.cfg.window_size)
.max_frame_size(self.pi.cfg.frame_size)
Expand All @@ -300,12 +354,38 @@ impl OutboundConnection {
.handshake(::hyper_util::rt::TokioIo::new(tls_stream))
.await
.map_err(Error::HttpHandshake)?;

// spawn a task to poll the connection and drive the HTTP state
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("Error in HBONE connection handshake: {:?}", e);
// if we got a drain for that connection, respect it in a race
match outer_conn_drain {
Some(conn_drain) => {
tokio::spawn(async move {
tokio::select! {
_ = conn_drain.signaled() => {
debug!("draining outer HBONE connection");
}
res = connection=> {
match res {
Err(e) => {
error!("Error in HBONE connection handshake: {:?}", e);
}
Ok(_) => {
debug!("done with HBONE connection handshake: {:?}", res);
}
}
}
}
});
}
});
None => {
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("Error in HBONE connection handshake: {:?}", e);
}
});
}
}

Ok(request_sender)
};
let mut connection = self.pi.pool.connect(pool_key.clone(), connect).await?;
Expand All @@ -326,7 +406,12 @@ impl OutboundConnection {
.body(Empty::<Bytes>::new())
.expect("builder with known status code should not fail");

debug!("outbound - connection send START");
// There are scenarios (upstream hangup, etc) where this "send" will simply get stuck.
// As in, stream processing deadlocks, and `send_request` never resolves to anything.
// Probably related to https://github.com/hyperium/hyper/issues/3623
let response = connection.send_request(request).await?;
debug!("outbound - connection send END");

let code = response.status();
if code != 200 {
Expand Down
18 changes: 12 additions & 6 deletions src/proxy/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,18 @@ impl Pool {
request_sender
}
// Connect won, checkout can just be dropped.
Either::Right((Err(err), checkout)) => match err {
// Connect won but we already had an in-flight connection, so use that.
Error::PoolAlreadyConnecting => checkout.await?,
// Some other connection error
err => return Err(err),
},
Either::Right((Err(err), checkout)) => {
debug!(
?key,
"connect won, but wait for existing pooled connection to establish"
);
match err {
// Connect won but we already had an in-flight connection, so use that.
Error::PoolAlreadyConnecting => checkout.await?,
// Some other connection error
err => return Err(err),
}
}
};

Ok(Connection(request_sender))
Expand Down
24 changes: 21 additions & 3 deletions src/proxy/socks5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,13 @@ impl Socks5 {
}

pub async fn run(self) {
let inner_drain = self.drain.clone();
let accept = async move {
loop {
// Asynchronously wait for an inbound socket.
let socket = self.listener.accept().await;
let inpod = self.pi.cfg.inpod_enabled;
let stream_drain = inner_drain.clone();
match socket {
Ok((stream, remote)) => {
info!("accepted outbound connection from {}", remote);
Expand All @@ -69,7 +72,7 @@ impl Socks5 {
id: TraceParent::new(),
};
tokio::spawn(async move {
if let Err(err) = handle(oc, stream).await {
if let Err(err) = handle(oc, stream, stream_drain, inpod).await {
log::error!("handshake error: {}", err);
}
});
Expand All @@ -87,6 +90,7 @@ impl Socks5 {
tokio::select! {
res = accept => { res }
_ = self.drain.signaled() => {
// out_drain_signal.drain().await;
info!("socks5 drained");
}
}
Expand All @@ -97,7 +101,12 @@ impl Socks5 {
// sufficient to integrate with common clients:
// - only unauthenticated requests
// - only CONNECT, with IPv4 or IPv6
async fn handle(mut oc: OutboundConnection, mut stream: TcpStream) -> Result<(), anyhow::Error> {
async fn handle(
mut oc: OutboundConnection,
mut stream: TcpStream,
out_drain: Watch,
is_inpod: bool,
) -> Result<(), anyhow::Error> {
// Version(5), Number of auth methods
let mut version = [0u8; 2];
stream.read_exact(&mut version).await?;
Expand Down Expand Up @@ -189,8 +198,17 @@ async fn handle(mut oc: OutboundConnection, mut stream: TcpStream) -> Result<(),
stream.write_all(&buf).await?;

info!("accepted connection from {remote_addr} to {host}");
// For inpod, we want this `spawn` to guaranteed-terminate when we drain - the workload is gone.
// For non-inpod (shared instance for all workloads), let the spawned task run until the proxy process
// itself is killed, or the connection terminates normally.
tokio::spawn(async move {
let res = oc.proxy_to(stream, remote_addr, host, true).await;
let drain = match is_inpod {
true => Some(out_drain),
false => None,
};
let res = oc
.proxy_to_cancellable(stream, remote_addr, host, true, drain)
.await;
match res {
Ok(_) => {}
Err(ref e) => warn!("outbound proxy failed: {}", e),
Expand Down

0 comments on commit fa7b1aa

Please sign in to comment.