Skip to content

Commit

Permalink
store response as single Message instead of Vec<Message> (#660)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored May 27, 2022
1 parent 6893fbf commit b7f1dca
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 46 deletions.
4 changes: 2 additions & 2 deletions shotover-proxy/src/transforms/cassandra/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ async fn rx_process<C: CodecReadHalf, T: AsyncRead>(
return_message_map.insert(stream_id, m);
},
Some((return_tx, original)) => {
return_tx.send(Response {original, response: Ok(vec![m]) })
return_tx.send(Response {original, response: Ok(m) })
.map_err(|_| anyhow!("couldn't send message"))?;
}
}
Expand All @@ -134,7 +134,7 @@ async fn rx_process<C: CodecReadHalf, T: AsyncRead>(
return_channel_map.insert(message_id, (return_chan, message));
}
Some(m) => {
return_chan.send(Response { original: message, response: Ok(vec![m]) })
return_chan.send(Response { original: message, response: Ok(m) })
.map_err(|_| anyhow!("couldn't send message"))?;
}
};
Expand Down
16 changes: 7 additions & 9 deletions shotover-proxy/src/transforms/cassandra/sink_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,17 @@ impl CassandraSinkSingle {
Ok(Some(prelim)) => {
match prelim? {
Response {
response: Ok(mut resp),
response: Ok(message),
..
} => {
for message in &resp {
if let Some(raw_bytes) = message.as_raw_bytes() {
if let Ok(Opcode::Error) =
cassandra::raw_frame::get_opcode(raw_bytes)
{
self.failed_requests.increment(1);
}
if let Some(raw_bytes) = message.as_raw_bytes() {
if let Ok(Opcode::Error) =
cassandra::raw_frame::get_opcode(raw_bytes)
{
self.failed_requests.increment(1);
}
}
responses.append(&mut resp);
responses.push(message);
}
Response {
mut original,
Expand Down
50 changes: 20 additions & 30 deletions shotover-proxy/src/transforms/redis/sink_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,23 +172,20 @@ impl RedisSinkCluster {
} else {
match response {
Ok(Response {
response: Ok(mut messages),
response: Ok(mut message),
..
}) => Some(messages.pop().map_or(
RedisFrame::Null,
|mut message| match message.frame().unwrap() {
Frame::Redis(frame) => {
let new_frame = frame.take();
match acc {
Some(prev_frame) => routing_info
.response_join()
.join(prev_frame, new_frame),
None => new_frame,
}
}) => Some(match message.frame().unwrap() {
Frame::Redis(frame) => {
let new_frame = frame.take();
match acc {
Some(prev_frame) => routing_info
.response_join()
.join(prev_frame, new_frame),
None => new_frame,
}
_ => unreachable!("direct response from a redis sink"),
},
)),
}
_ => unreachable!("direct response from a redis sink"),
}),
Ok(Response {
response: Err(e), ..
}) => Some(RedisFrame::Error(e.to_string().into())),
Expand All @@ -200,9 +197,7 @@ impl RedisSinkCluster {

Ok(Response {
original: message,
response: ChainResponse::Ok(vec![Message::from_frame(Frame::Redis(
response.unwrap(),
))]),
response: Ok(Message::from_frame(Frame::Redis(response.unwrap()))),
})
})
}
Expand Down Expand Up @@ -850,10 +845,7 @@ fn send_frame_request(
async fn receive_frame_response(receiver: oneshot::Receiver<Response>) -> Result<RedisFrame> {
let Response { response, .. } = receiver.await?;

// Exactly one Redis response is guaranteed by the codec on success.
let mut message = response?.pop().unwrap();

match message.frame() {
match response?.frame() {
Some(Frame::Redis(frame)) => Ok(frame.take()),
None => Err(anyhow!("Failed to parse redis frame")),
response => Err(anyhow!("Unexpected redis response: {response:?}")),
Expand All @@ -867,7 +859,7 @@ fn send_frame_response(
) -> Result<(), Response> {
one_tx.send(Response {
original: Message::from_frame(Frame::None),
response: Ok(vec![Message::from_frame(Frame::Redis(frame))]),
response: Ok(Message::from_frame(Frame::Redis(frame))),
})
}

Expand Down Expand Up @@ -915,17 +907,15 @@ impl Transform for RedisSinkCluster {
let Response { original, response } = s.or_else(|_| -> Result<Response> {
Ok(Response {
original: Message::from_frame(Frame::None),
response: Ok(vec![Message::from_frame(Frame::Redis(RedisFrame::Error(
response: Ok(Message::from_frame(Frame::Redis(RedisFrame::Error(
Str::from_inner(Bytes::from_static(b"ERR Could not route request"))
.unwrap(),
)))]),
)))),
})
})?;

let mut response = response?;
assert_eq!(response.len(), 1);
let mut response_m = response.remove(0);
match response_m.frame() {
match response.frame() {
Some(Frame::Redis(frame)) => {
match frame.to_redirection() {
Some(Redirection::Moved { slot, server }) => {
Expand All @@ -951,10 +941,10 @@ impl Transform for RedisSinkCluster {
one_rx.map_err(|e| anyhow!("Error while retrying ASK - {}", e)),
));
}
None => response_buffer.push(response_m),
None => response_buffer.push(response),
}
}
_ => response_buffer.push(response_m),
_ => response_buffer.push(response),
}
}
Ok(response_buffer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ async fn rx_process<C: CodecReadHalf, R: AsyncRead + Unpin + Send + 'static>(
// If the receiver hangs up, just silently ignore
let _ = ret.send(Response {
original: message,
response: Ok(vec![m]),
response: Ok(m),
});
}
}
Expand Down
7 changes: 3 additions & 4 deletions shotover-proxy/src/transforms/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use anyhow::Error;
use anyhow::{Error, Result};
use std::fmt;
use std::io;

use crate::error::ChainResponse;
use crate::message::Message;

pub mod cluster_connection_pool;
Expand All @@ -17,8 +16,8 @@ pub struct Request {
/// Represents a `Response` to a `Request`
#[derive(Debug)]
pub struct Response {
pub original: Message, // Original `Message` that this `Response` is to
pub response: ChainResponse, // Response to the original `Message`
pub original: Message, // Original `Message` that this `Response` is to
pub response: Result<Message>, // Response to the original `Message`
}

#[derive(thiserror::Error, Debug)]
Expand Down

0 comments on commit b7f1dca

Please sign in to comment.