diff --git a/shotover-proxy/Cargo.toml b/shotover-proxy/Cargo.toml index 004a8ed43..df233537c 100644 --- a/shotover-proxy/Cargo.toml +++ b/shotover-proxy/Cargo.toml @@ -12,7 +12,6 @@ tokio-util = { version = "0.6.1", features = ["full"]} tokio-stream = "0.1.2" bytes = "1.0.0" futures = "0.3.12" -#futures = { path = "/home/ben/git/futures-rs/futures" } futures-core = "0.3.1" chrono = {version = "0.4.10", features = ["serde"]} async-trait = "0.1.30" @@ -65,7 +64,6 @@ mlua_serde = { version = "0.5", features = ["lua53"] } #wasm wasmer-runtime = "0.17.1" -#wasmer-runtime = "0.13.1" #Crypto sodiumoxide = "0.2.5" diff --git a/shotover-proxy/src/server.rs b/shotover-proxy/src/server.rs index 639307e69..dcd6c5b0e 100644 --- a/shotover-proxy/src/server.rs +++ b/shotover-proxy/src/server.rs @@ -111,8 +111,6 @@ impl TcpCodecListener { // "forget" the permit, which drops the permit value **without** // incrementing the semaphore's permits. Then, in the handler task // we manually add a new permit when processing completes. - // self.limit_connections.acquire().await.forget(); - if self.hard_connection_limit { match self.limit_connections.try_acquire() { Ok(p) => { @@ -170,7 +168,6 @@ impl TcpCodecListener { let mut handler = Handler { // Get a handle to the shared database. Internally, this is an // `Arc`, so a clone only increments the ref count. - // chain: self.chain.clone(), chain: self.chain.clone(), client_details: peer, conn_details: conn_string, @@ -193,7 +190,6 @@ impl TcpCodecListener { // dropped. _shutdown_complete: self.shutdown_complete_tx.clone(), }; - // let chain = self.chain.clone(); // Spawn a new task to process the connections. Tokio tasks are like // asynchronous green threads and are executed concurrently. @@ -300,7 +296,7 @@ impl Handler { pub async fn run(&mut self, stream: TcpStream) -> Result<()> { // As long as the shutdown signal has not been received, try to read a // new request frame. - let mut idle_time: u64 = 1; + let mut idle_time_seconds: u64 = 1; let (in_tx, mut in_rx) = tokio::sync::mpsc::unbounded_channel::(); let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::(); @@ -331,30 +327,26 @@ impl Handler { }); while !self.shutdown.is_shutdown() { - // While reading a request frame, also listen for the shutdown - // signal - + // While reading a request frame, also listen for the shutdown signal trace!("Waiting for message"); let frame = tokio::select! { - res = timeout(Duration::from_secs(idle_time) , in_rx.recv()) => { + res = timeout(Duration::from_secs(idle_time_seconds) , in_rx.recv()) => { match res { Ok(maybe_message) => { - idle_time = 1; + idle_time_seconds = 1; match maybe_message { Some(m) => m, None => return Ok(()) } }, Err(_) => { - match idle_time { - 0..=10 => trace!("Connection Idle for more than {} seconds {}", idle_time, self.conn_details), - 11..=35 => trace!("Connection Idle for more than {} seconds {}", idle_time, self.conn_details), - _ => { - debug!("Dropping Connection Idle for more than {} seconds {}", idle_time, self.conn_details); - return Ok(()) - } + if idle_time_seconds < 35 { + trace!("Connection Idle for more than {} seconds {}", idle_time_seconds, self.conn_details); + } else { + debug!("Dropping. Connection Idle for more than {} seconds {}", idle_time_seconds, self.conn_details); + return Ok(()); } - idle_time *= 2; + idle_time_seconds *= 2; continue } } @@ -389,16 +381,6 @@ impl Handler { return Ok(()); } } - - // match frame { - // Ok(message) => { - // - // } - // Err(e) => { - // trace!("Error handling message in TcpStream source: {:?}", e); - // return Ok(()); - // } - // } } Ok(()) @@ -462,7 +444,7 @@ impl Shutdown { } // Cannot receive a "lag error" as only one value is ever sent. - let _ = self.notify.recv().await; + self.notify.recv().await.unwrap(); // Remember that the signal has been received. self.shutdown = true; diff --git a/shotover-proxy/src/sources/cassandra_source.rs b/shotover-proxy/src/sources/cassandra_source.rs index 7b666602d..e54388cff 100644 --- a/shotover-proxy/src/sources/cassandra_source.rs +++ b/shotover-proxy/src/sources/cassandra_source.rs @@ -57,7 +57,6 @@ pub struct CassandraSource { } impl CassandraSource { - //"127.0.0.1:9043 pub async fn new( chain: &TransformChain, listen_addr: String, @@ -68,7 +67,6 @@ impl CassandraSource { connection_limit: Option, hard_connection_limit: Option, ) -> CassandraSource { - // let listener = TcpListener::bind(listen_addr.clone()).await.unwrap(); let name = "Cassandra Source"; info!("Starting Cassandra source on [{}]", listen_addr); @@ -97,8 +95,6 @@ impl CassandraSource { } } - // listener.run().await?; - let TcpCodecListener { notify_shutdown, shutdown_complete_tx, diff --git a/shotover-proxy/src/sources/redis_source.rs b/shotover-proxy/src/sources/redis_source.rs index 75821a20a..8ac4938d3 100644 --- a/shotover-proxy/src/sources/redis_source.rs +++ b/shotover-proxy/src/sources/redis_source.rs @@ -54,7 +54,6 @@ pub struct RedisSource { } impl RedisSource { - //"127.0.0.1:9043 pub async fn new( chain: &TransformChain, listen_addr: String, @@ -64,15 +63,6 @@ impl RedisSource { connection_limit: Option, hard_connection_limit: Option, ) -> RedisSource { - // let mut socket = - // socket2::Socket::new(Domain::ipv4(), Type::stream(), Some(Protocol::tcp())).unwrap(); - // let addr = listen_addr.clone().parse::().unwrap(); - // socket.bind(&addr.into()).unwrap(); - // socket.listen(10).unwrap(); - // - // // let listener = TcpListener::bind(listen_addr.clone()).await.unwrap(); - // let listener = TcpListener::from_std(socket.into_tcp_listener()).unwrap(); - info!("Starting Redis source on [{}]", listen_addr); let name = "Redis Source"; @@ -88,7 +78,7 @@ impl RedisSource { shutdown_complete_tx, }; - let jh = Handle::current().spawn(async move { + let join_handle = Handle::current().spawn(async move { tokio::select! { res = listener.run() => { if let Err(err) = res { @@ -109,14 +99,12 @@ impl RedisSource { drop(shutdown_complete_tx); drop(notify_shutdown); - // let _ shutd - Ok(()) }); RedisSource { name, - join_handle: jh, + join_handle, listen_addr: listen_addr.clone(), } }