Skip to content

Commit

Permalink
Remove loop from CassandraSinkSingle (#716)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Aug 2, 2022
1 parent f10252d commit 2916f2f
Showing 1 changed file with 63 additions and 69 deletions.
132 changes: 63 additions & 69 deletions shotover-proxy/src/transforms/cassandra/sink_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,84 +80,78 @@ impl CassandraSinkSingle {

impl CassandraSinkSingle {
async fn send_message(&mut self, messages: Messages) -> ChainResponse {
loop {
match self.outbound {
None => {
trace!("creating outbound connection {:?}", self.address);
self.outbound = Some(
CassandraConnection::new(
self.address.clone(),
CassandraCodec::new(),
self.tls.clone(),
self.pushed_messages_tx.clone(),
)
.await?,
);
// we should either connect and set the value of outbound, or return an error... so we shouldn't loop more than 2 times
}
Some(ref mut outbound_framed_codec) => {
trace!("sending frame upstream");
if self.outbound.is_none() {
trace!("creating outbound connection {:?}", self.address);
self.outbound = Some(
CassandraConnection::new(
self.address.clone(),
CassandraCodec::new(),
self.tls.clone(),
self.pushed_messages_tx.clone(),
)
.await?,
);
}
trace!("sending frame upstream");

let expected_size = messages.len();
let results: Result<FuturesOrdered<oneshot::Receiver<Response>>> = messages
.into_iter()
.map(|m| {
let (return_chan_tx, return_chan_rx) = oneshot::channel();
outbound_framed_codec.send(m, return_chan_tx)?;
let outbound = self.outbound.as_mut().unwrap();
let expected_size = messages.len();
let results: Result<FuturesOrdered<oneshot::Receiver<Response>>> = messages
.into_iter()
.map(|m| {
let (return_chan_tx, return_chan_rx) = oneshot::channel();
outbound.send(m, return_chan_tx)?;

Ok(return_chan_rx)
})
.collect();
Ok(return_chan_rx)
})
.collect();

let mut responses = Vec::with_capacity(expected_size);
let mut results = results?;
let mut responses = Vec::with_capacity(expected_size);
let mut results = results?;

loop {
match timeout(Duration::from_secs(5), results.next()).await {
Ok(Some(prelim)) => {
match prelim? {
Response {
response: Ok(message),
..
} => {
if let Some(raw_bytes) = message.as_raw_bytes() {
if let Ok(Opcode::Error) =
cassandra::raw_frame::get_opcode(raw_bytes)
{
self.failed_requests.increment(1);
}
}
responses.push(message);
}
Response {
mut original,
response: Err(err),
} => {
// TODO: This is wrong: need to have a response for each incoming message
original.set_error(err.to_string());
responses.push(original);
}
};
}
Ok(None) => break,
Err(_) => {
info!(
"timed out waiting for results got - {:?} expected - {:?}",
responses.len(),
expected_size
);
info!(
"timed out waiting for results - {:?} - {:?}",
responses, results
);
loop {
match timeout(Duration::from_secs(5), results.next()).await {
Ok(Some(prelim)) => {
match prelim? {
Response {
response: Ok(message),
..
} => {
if let Some(raw_bytes) = message.as_raw_bytes() {
if let Ok(Opcode::Error) =
cassandra::raw_frame::get_opcode(raw_bytes)
{
self.failed_requests.increment(1);
}
}
responses.push(message);
}
}

return Ok(responses);
Response {
mut original,
response: Err(err),
} => {
// TODO: This is wrong: need to have a response for each incoming message
original.set_error(err.to_string());
responses.push(original);
}
};
}
Ok(None) => break,
Err(_) => {
info!(
"timed out waiting for results got - {:?} expected - {:?}",
responses.len(),
expected_size
);
info!(
"timed out waiting for results - {:?} - {:?}",
responses, results
);
}
}
}

Ok(responses)
}
}

Expand Down

0 comments on commit 2916f2f

Please sign in to comment.