From 2916f2f52993f6bc9513894705fce1d994990939 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 2 Aug 2022 15:59:20 +1000 Subject: [PATCH] Remove loop from CassandraSinkSingle (#716) --- .../src/transforms/cassandra/sink_single.rs | 132 +++++++++--------- 1 file changed, 63 insertions(+), 69 deletions(-) diff --git a/shotover-proxy/src/transforms/cassandra/sink_single.rs b/shotover-proxy/src/transforms/cassandra/sink_single.rs index ad9494f87..9285bf362 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_single.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_single.rs @@ -80,84 +80,78 @@ impl CassandraSinkSingle { impl CassandraSinkSingle { async fn send_message(&mut self, messages: Messages) -> ChainResponse { - loop { - match self.outbound { - None => { - trace!("creating outbound connection {:?}", self.address); - self.outbound = Some( - CassandraConnection::new( - self.address.clone(), - CassandraCodec::new(), - self.tls.clone(), - self.pushed_messages_tx.clone(), - ) - .await?, - ); - // we should either connect and set the value of outbound, or return an error... so we shouldn't loop more than 2 times - } - Some(ref mut outbound_framed_codec) => { - trace!("sending frame upstream"); + if self.outbound.is_none() { + trace!("creating outbound connection {:?}", self.address); + self.outbound = Some( + CassandraConnection::new( + self.address.clone(), + CassandraCodec::new(), + self.tls.clone(), + self.pushed_messages_tx.clone(), + ) + .await?, + ); + } + trace!("sending frame upstream"); - let expected_size = messages.len(); - let results: Result>> = messages - .into_iter() - .map(|m| { - let (return_chan_tx, return_chan_rx) = oneshot::channel(); - outbound_framed_codec.send(m, return_chan_tx)?; + let outbound = self.outbound.as_mut().unwrap(); + let expected_size = messages.len(); + let results: Result>> = messages + .into_iter() + .map(|m| { + let (return_chan_tx, return_chan_rx) = oneshot::channel(); + outbound.send(m, return_chan_tx)?; - Ok(return_chan_rx) - }) - .collect(); + Ok(return_chan_rx) + }) + .collect(); - let mut responses = Vec::with_capacity(expected_size); - let mut results = results?; + let mut responses = Vec::with_capacity(expected_size); + let mut results = results?; - loop { - match timeout(Duration::from_secs(5), results.next()).await { - Ok(Some(prelim)) => { - match prelim? { - Response { - response: Ok(message), - .. - } => { - if let Some(raw_bytes) = message.as_raw_bytes() { - if let Ok(Opcode::Error) = - cassandra::raw_frame::get_opcode(raw_bytes) - { - self.failed_requests.increment(1); - } - } - responses.push(message); - } - Response { - mut original, - response: Err(err), - } => { - // TODO: This is wrong: need to have a response for each incoming message - original.set_error(err.to_string()); - responses.push(original); - } - }; - } - Ok(None) => break, - Err(_) => { - info!( - "timed out waiting for results got - {:?} expected - {:?}", - responses.len(), - expected_size - ); - info!( - "timed out waiting for results - {:?} - {:?}", - responses, results - ); + loop { + match timeout(Duration::from_secs(5), results.next()).await { + Ok(Some(prelim)) => { + match prelim? { + Response { + response: Ok(message), + .. + } => { + if let Some(raw_bytes) = message.as_raw_bytes() { + if let Ok(Opcode::Error) = + cassandra::raw_frame::get_opcode(raw_bytes) + { + self.failed_requests.increment(1); + } } + responses.push(message); } - } - - return Ok(responses); + Response { + mut original, + response: Err(err), + } => { + // TODO: This is wrong: need to have a response for each incoming message + original.set_error(err.to_string()); + responses.push(original); + } + }; + } + Ok(None) => break, + Err(_) => { + info!( + "timed out waiting for results got - {:?} expected - {:?}", + responses.len(), + expected_size + ); + info!( + "timed out waiting for results - {:?} - {:?}", + responses, results + ); } } } + + Ok(responses) } }