diff --git a/shotover-proxy/src/message/mod.rs b/shotover-proxy/src/message/mod.rs index 4d184d7eb..c194ccdaa 100644 --- a/shotover-proxy/src/message/mod.rs +++ b/shotover-proxy/src/message/mod.rs @@ -235,7 +235,13 @@ impl Message { /// If self is a response: the returned `Message` is a valid replacement of self pub fn to_error_response(&self, error: String) -> Message { Message::from_frame(match self.metadata().unwrap() { - Metadata::Redis => Frame::Redis(RedisFrame::Error(format!("ERR {error}").into())), + Metadata::Redis => { + // Redis errors cannot contain newlines at the protocol level + let message = format!("ERR {error}") + .replace("\r\n", " ") + .replace('\n', " "); + Frame::Redis(RedisFrame::Error(message.into())) + } Metadata::Cassandra(frame) => Frame::Cassandra(CassandraFrame { version: frame.version, stream_id: frame.stream_id, diff --git a/shotover-proxy/src/server.rs b/shotover-proxy/src/server.rs index 09a17a30c..17339c39b 100644 --- a/shotover-proxy/src/server.rs +++ b/shotover-proxy/src/server.rs @@ -538,6 +538,14 @@ impl Handler { debug!("Received raw message {:?}", messages); debug!("client details: {:?}", &self.client_details); + let mut error_report_messages = if reverse_chain { + // Avoid allocating for reverse chains as we don't make use of this value in that case + vec![] + } else { + // This clone should be cheap as cloning a Message that has never had `.frame()` called should result in no new allocations. + messages.clone() + }; + let wrapper = Wrapper::new_with_client_details( messages, self.client_details.clone(), @@ -549,12 +557,30 @@ impl Handler { self.chain .process_request_rev(wrapper, self.client_details.clone()) .await + .context("Chain failed to receive pushed messages/events, the connection will now be closed.")? 
} else { - self.chain + match self + .chain .process_request(wrapper, self.client_details.clone()) .await - } - .context("chain failed to send and/or receive messages")?; + .context("Chain failed to send and/or receive messages, the connection will now be closed.") + { + Ok(x) => x, + Err(err) => { + // An internal error occurred and we need to terminate the connection because we can no longer make any guarantees about the state it's in. + // However, before we do that we need to return errors for all the messages in this batch for two reasons: + // * Poorly programmed clients may hang forever waiting for a response + // * We want to give the user a hint as to what went wrong + // + they might not know to check the shotover logs + // + they may not be able to correlate which error in the shotover logs corresponds to their failed message + for m in &mut error_report_messages { + *m = m.to_error_response(format!("Internal shotover (or custom transform) bug: {err:?}")); + } + out_tx.send(error_report_messages)?; + return Err(err); + } + } + }; debug!("sending message: {:?}", modified_messages); // send the result of the process up stream diff --git a/shotover-proxy/tests/cassandra_int_tests/mod.rs b/shotover-proxy/tests/cassandra_int_tests/mod.rs index e0ac259fb..57405aa3b 100644 --- a/shotover-proxy/tests/cassandra_int_tests/mod.rs +++ b/shotover-proxy/tests/cassandra_int_tests/mod.rs @@ -227,7 +227,7 @@ async fn cluster_single_rack_v4(#[case] driver: CassandraDriver) { r#"connection was unexpectedly terminated Caused by: - 0: chain failed to send and/or receive messages + 0: Chain failed to send and/or receive messages, the connection will now be closed. 
1: CassandraSinkCluster transform failed 2: Failed to create new connection 3: destination 172.16.1.3:9044 did not respond to connection attempt within 3s"#, @@ -240,7 +240,7 @@ Caused by: r#"connection was unexpectedly terminated Caused by: - 0: chain failed to send and/or receive messages + 0: Chain failed to send and/or receive messages, the connection will now be closed. 1: CassandraSinkCluster transform failed 2: system.local returned unexpected cassandra operation: Error(ErrorBody { message: "Internal shotover error: Broken pipe (os error 32)", ty: Server })"#, ) diff --git a/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs b/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs index 3a59b4626..73785e1db 100644 --- a/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs +++ b/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs @@ -1,16 +1,19 @@ use crate::redis_int_tests::assert::*; +use bytes::BytesMut; use futures::StreamExt; use rand::{thread_rng, Rng}; use rand_distr::Alphanumeric; use redis::aio::Connection; use redis::cluster::ClusterConnection; use redis::{AsyncCommands, Commands, ErrorKind, RedisError, Value}; +use shotover_proxy::frame::RedisFrame; use shotover_proxy::tcp; use std::collections::{HashMap, HashSet}; use std::thread::sleep; use std::time::Duration; use test_helpers::connection::redis_connection; use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; use tokio::time::timeout; use tracing::trace; @@ -1269,7 +1272,7 @@ pub async fn test_trigger_transform_failure_driver(connection: &mut Connection) .await .unwrap_err() .to_string(), - "unexpected end of file".to_string() + "An error was signalled by the server: Internal shotover (or custom transform) bug: Chain failed to send and/or receive messages, the connection will now be closed. 
Caused by: 0: RedisSinkSingle transform failed 1: Failed to connect to destination \"127.0.0.1:1111\" 2: Connection refused (os error 111)".to_string() ); } @@ -1286,6 +1289,11 @@ pub async fn test_trigger_transform_failure_raw() { connection.write_all(b"*1\r\n$4\r\nping\r\n").await.unwrap(); + assert_eq!( + read_redis_message(&mut connection).await, + RedisFrame::Error("ERR Internal shotover (or custom transform) bug: Chain failed to send and/or receive messages, the connection will now be closed. Caused by: 0: RedisSinkSingle transform failed 1: Failed to connect to destination \"127.0.0.1:1111\" 2: Connection refused (os error 111)".into()) + ); + // If the connection was closed by shotover then we will succesfully read 0 bytes. // If the connection was not closed by shotover then read will block for 10 seconds until the time is hit and then the unwrap will panic. let amount = timeout(Duration::from_secs(10), connection.read(&mut [0; 1])) @@ -1296,6 +1304,23 @@ pub async fn test_trigger_transform_failure_raw() { assert_eq!(amount, 0); } +async fn read_redis_message(connection: &mut TcpStream) -> RedisFrame { + let mut buffer = BytesMut::new(); + loop { + if let Ok(Some((result, len))) = + redis_protocol::resp2::decode::decode(&buffer.clone().freeze()) + { + let _ = buffer.split_to(len); + return result; + } + + let mut data = [0; 1024]; + if let Ok(read_count) = connection.read(&mut data).await { + buffer.extend(&data[..read_count]); + } + } +} + /// CAREFUL: This lacks any kind of check that shotover is ready, /// so make sure shotover_manager.redis_connection is run on 6379 before calling this. 
pub async fn test_invalid_frame() { diff --git a/shotover-proxy/tests/redis_int_tests/mod.rs b/shotover-proxy/tests/redis_int_tests/mod.rs index 9ea56e94a..969742840 100644 --- a/shotover-proxy/tests/redis_int_tests/mod.rs +++ b/shotover-proxy/tests/redis_int_tests/mod.rs @@ -68,7 +68,7 @@ async fn passthrough_redis_down() { r#"connection was unexpectedly terminated Caused by: - 0: chain failed to send and/or receive messages + 0: Chain failed to send and/or receive messages, the connection will now be closed. 1: RedisSinkSingle transform failed 2: Failed to connect to destination "127.0.0.1:1111" 3: Connection refused (os error 111)"#,