Skip to content

Commit

Permalink
Return error message to client when chain fails to send/receive (#1082)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Mar 17, 2023
1 parent 0d03cec commit 2ae9e5d
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 8 deletions.
8 changes: 7 additions & 1 deletion shotover-proxy/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,13 @@ impl Message {
/// If self is a response: the returned `Message` is a valid replacement of self
pub fn to_error_response(&self, error: String) -> Message {
Message::from_frame(match self.metadata().unwrap() {
Metadata::Redis => Frame::Redis(RedisFrame::Error(format!("ERR {error}").into())),
Metadata::Redis => {
// Redis errors can not contain newlines at the protocol level
let message = format!("ERR {error}")
.replace("\r\n", " ")
.replace('\n', " ");
Frame::Redis(RedisFrame::Error(message.into()))
}
Metadata::Cassandra(frame) => Frame::Cassandra(CassandraFrame {
version: frame.version,
stream_id: frame.stream_id,
Expand Down
32 changes: 29 additions & 3 deletions shotover-proxy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,14 @@ impl<C: CodecBuilder + 'static> Handler<C> {
debug!("Received raw message {:?}", messages);
debug!("client details: {:?}", &self.client_details);

let mut error_report_messages = if reverse_chain {
// Avoid allocating for reverse chains as we dont make use of this value in that case
vec![]
} else {
// This clone should be cheap as cloning a Message that has never had `.frame()` called should result in no new allocations.
messages.clone()
};

let wrapper = Wrapper::new_with_client_details(
messages,
self.client_details.clone(),
Expand All @@ -549,12 +557,30 @@ impl<C: CodecBuilder + 'static> Handler<C> {
self.chain
.process_request_rev(wrapper, self.client_details.clone())
.await
.context("Chain failed to receive pushed messages/events, the connection will now be closed.")?
} else {
self.chain
match self
.chain
.process_request(wrapper, self.client_details.clone())
.await
}
.context("chain failed to send and/or receive messages")?;
.context("Chain failed to send and/or receive messages, the connection will now be closed.")
{
Ok(x) => x,
Err(err) => {
// An internal error occured and we need to terminate the connection because we can no longer make any gaurantees about the state its in.
// However before we do that we need to return errors for all the messages in this batch for two reasons:
// * Poorly programmed clients may hang forever waiting for a response
// * We want to give the user a hint as to what went wrong
// + they might not know to check the shotover logs
// + they may not be able to correlate which error in the shotover logs corresponds to their failed message
for m in &mut error_report_messages {
*m = m.to_error_response(format!("Internal shotover (or custom transform) bug: {err:?}"));
}
out_tx.send(error_report_messages)?;
return Err(err);
}
}
};

debug!("sending message: {:?}", modified_messages);
// send the result of the process up stream
Expand Down
4 changes: 2 additions & 2 deletions shotover-proxy/tests/cassandra_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ async fn cluster_single_rack_v4(#[case] driver: CassandraDriver) {
r#"connection was unexpectedly terminated
Caused by:
0: chain failed to send and/or receive messages
0: Chain failed to send and/or receive messages, the connection will now be closed.
1: CassandraSinkCluster transform failed
2: Failed to create new connection
3: destination 172.16.1.3:9044 did not respond to connection attempt within 3s"#,
Expand All @@ -240,7 +240,7 @@ Caused by:
r#"connection was unexpectedly terminated
Caused by:
0: chain failed to send and/or receive messages
0: Chain failed to send and/or receive messages, the connection will now be closed.
1: CassandraSinkCluster transform failed
2: system.local returned unexpected cassandra operation: Error(ErrorBody { message: "Internal shotover error: Broken pipe (os error 32)", ty: Server })"#,
)
Expand Down
27 changes: 26 additions & 1 deletion shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use crate::redis_int_tests::assert::*;
use bytes::BytesMut;
use futures::StreamExt;
use rand::{thread_rng, Rng};
use rand_distr::Alphanumeric;
use redis::aio::Connection;
use redis::cluster::ClusterConnection;
use redis::{AsyncCommands, Commands, ErrorKind, RedisError, Value};
use shotover_proxy::frame::RedisFrame;
use shotover_proxy::tcp;
use std::collections::{HashMap, HashSet};
use std::thread::sleep;
use std::time::Duration;
use test_helpers::connection::redis_connection;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

use tokio::time::timeout;
use tracing::trace;
Expand Down Expand Up @@ -1269,7 +1272,7 @@ pub async fn test_trigger_transform_failure_driver(connection: &mut Connection)
.await
.unwrap_err()
.to_string(),
"unexpected end of file".to_string()
"An error was signalled by the server: Internal shotover (or custom transform) bug: Chain failed to send and/or receive messages, the connection will now be closed. Caused by: 0: RedisSinkSingle transform failed 1: Failed to connect to destination \"127.0.0.1:1111\" 2: Connection refused (os error 111)".to_string()
);
}

Expand All @@ -1286,6 +1289,11 @@ pub async fn test_trigger_transform_failure_raw() {

connection.write_all(b"*1\r\n$4\r\nping\r\n").await.unwrap();

assert_eq!(
read_redis_message(&mut connection).await,
RedisFrame::Error("ERR Internal shotover (or custom transform) bug: Chain failed to send and/or receive messages, the connection will now be closed. Caused by: 0: RedisSinkSingle transform failed 1: Failed to connect to destination \"127.0.0.1:1111\" 2: Connection refused (os error 111)".into())
);

// If the connection was closed by shotover then we will succesfully read 0 bytes.
// If the connection was not closed by shotover then read will block for 10 seconds until the time is hit and then the unwrap will panic.
let amount = timeout(Duration::from_secs(10), connection.read(&mut [0; 1]))
Expand All @@ -1296,6 +1304,23 @@ pub async fn test_trigger_transform_failure_raw() {
assert_eq!(amount, 0);
}

async fn read_redis_message(connection: &mut TcpStream) -> RedisFrame {
let mut buffer = BytesMut::new();
loop {
if let Ok(Some((result, len))) =
redis_protocol::resp2::decode::decode(&buffer.clone().freeze())
{
let _ = buffer.split_to(len);
return result;
}

let mut data = [0; 1024];
if let Ok(read_count) = connection.read(&mut data).await {
buffer.extend(&data[..read_count]);
}
}
}

/// CAREFUL: This lacks any kind of check that shotover is ready,
/// so make sure shotover_manager.redis_connection is run on 6379 before calling this.
pub async fn test_invalid_frame() {
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/tests/redis_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async fn passthrough_redis_down() {
r#"connection was unexpectedly terminated
Caused by:
0: chain failed to send and/or receive messages
0: Chain failed to send and/or receive messages, the connection will now be closed.
1: RedisSinkSingle transform failed
2: Failed to connect to destination "127.0.0.1:1111"
3: Connection refused (os error 111)"#,
Expand Down

0 comments on commit 2ae9e5d

Please sign in to comment.