Skip to content

Commit

Permalink
Fix connection shutdown logic
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Nov 23, 2022
1 parent 17f08ff commit 523aafe
Showing 1 changed file with 30 additions and 19 deletions.
49 changes: 30 additions & 19 deletions shotover-proxy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ use crate::tls::TlsAcceptor;
use crate::transforms::chain::TransformChain;
use crate::transforms::Wrapper;
use anyhow::{anyhow, Context, Result};
use futures::future::join_all;
use futures::{SinkExt, StreamExt};
use metrics::{register_gauge, Gauge};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, watch, Semaphore};
use tokio::task::JoinHandle;
use tokio::time;
use tokio::time::timeout;
use tokio::time::Duration;
Expand Down Expand Up @@ -94,6 +96,8 @@ pub struct TcpCodecListener<C: Codec> {

/// Timeout in seconds after which to kill an idle connection. No timeout means connections will never be timed out.
timeout: Option<u64>,

connection_handles: Vec<JoinHandle<()>>,
}

impl<C: Codec + 'static> TcpCodecListener<C> {
Expand Down Expand Up @@ -128,6 +132,7 @@ impl<C: Codec + 'static> TcpCodecListener<C> {
connection_count: 0,
available_connections_gauge,
timeout,
connection_handles: vec![],
})
}

Expand Down Expand Up @@ -234,7 +239,7 @@ impl<C: Codec + 'static> TcpCodecListener<C> {
self.connection_count = self.connection_count.wrapping_add(1);

// Spawn a new task to process the connections.
tokio::spawn(
self.connection_handles.push(tokio::spawn(
async move {
tracing::debug!("New connection from {}", handler.conn_details);

Expand All @@ -251,28 +256,15 @@ impl<C: Codec + 'static> TcpCodecListener<C> {
id = self.connection_count,
source = self.source_name.as_str()
)),
);
));
if self.connection_count % 1000 == 0 {
self.connection_handles.retain(|x| x.is_finished());
}
}
}

pub async fn shutdown(&mut self) {
match self
.chain
.process_request(
Wrapper::flush_with_chain_name(self.chain.name.clone()),
"".into(),
)
.await
{
Ok(_) => info!("source {} was shutdown", self.source_name),
Err(e) => error!(
"{:?}",
e.context(format!(
"source {} encountered an error when flushing the chain for shutdown",
self.source_name,
))
),
}
join_all(&mut self.connection_handles).await;
}

/// Accept an inbound connection.
Expand Down Expand Up @@ -573,6 +565,25 @@ impl<C: Codec + 'static> Handler<C> {
// send the result of the process up stream
out_tx.send(modified_messages)?;
}

match self
.chain
.process_request(
Wrapper::flush_with_chain_name(self.chain.name.clone()),
self.client_details.clone(),
)
.await
{
Ok(_) => {}
Err(e) => error!(
"{:?}",
e.context(format!(
"encountered an error when flushing the chain {} for shutdown",
self.chain.name,
))
),
}

Ok(())
}
}
Expand Down

0 comments on commit 523aafe

Please sign in to comment.