Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return error message to client when chain fails to send/receive #1082

Merged
merged 1 commit into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion shotover-proxy/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,13 @@ impl Message {
/// If self is a response: the returned `Message` is a valid replacement of self
pub fn to_error_response(&self, error: String) -> Message {
Message::from_frame(match self.metadata().unwrap() {
Metadata::Redis => Frame::Redis(RedisFrame::Error(format!("ERR {error}").into())),
Metadata::Redis => {
// Redis errors can not contain newlines at the protocol level
let message = format!("ERR {error}")
.replace("\r\n", " ")
.replace('\n', " ");
Frame::Redis(RedisFrame::Error(message.into()))
}
Metadata::Cassandra(frame) => Frame::Cassandra(CassandraFrame {
version: frame.version,
stream_id: frame.stream_id,
Expand Down
32 changes: 29 additions & 3 deletions shotover-proxy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,14 @@ impl<C: CodecBuilder + 'static> Handler<C> {
debug!("Received raw message {:?}", messages);
debug!("client details: {:?}", &self.client_details);

let mut error_report_messages = if reverse_chain {
// Avoid allocating for reverse chains as we don't make use of this value in that case
vec![]
} else {
// This clone should be cheap as cloning a Message that has never had `.frame()` called should result in no new allocations.
messages.clone()
};

let wrapper = Wrapper::new_with_client_details(
messages,
self.client_details.clone(),
Expand All @@ -549,12 +557,30 @@ impl<C: CodecBuilder + 'static> Handler<C> {
self.chain
.process_request_rev(wrapper, self.client_details.clone())
.await
.context("Chain failed to receive pushed messages/events, the connection will now be closed.")?
} else {
self.chain
match self
.chain
.process_request(wrapper, self.client_details.clone())
.await
}
.context("chain failed to send and/or receive messages")?;
.context("Chain failed to send and/or receive messages, the connection will now be closed.")
{
Ok(x) => x,
Err(err) => {
// An internal error occurred and we need to terminate the connection because we can no longer make any guarantees about the state it's in.
// However before we do that we need to return errors for all the messages in this batch for two reasons:
// * Poorly programmed clients may hang forever waiting for a response
// * We want to give the user a hint as to what went wrong
// + they might not know to check the shotover logs
// + they may not be able to correlate which error in the shotover logs corresponds to their failed message
for m in &mut error_report_messages {
*m = m.to_error_response(format!("Internal shotover (or custom transform) bug: {err:?}"));
}
out_tx.send(error_report_messages)?;
return Err(err);
}
}
};

debug!("sending message: {:?}", modified_messages);
// send the result of the process up stream
Expand Down
4 changes: 2 additions & 2 deletions shotover-proxy/tests/cassandra_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ async fn cluster_single_rack_v4(#[case] driver: CassandraDriver) {
r#"connection was unexpectedly terminated
Caused by:
0: chain failed to send and/or receive messages
0: Chain failed to send and/or receive messages, the connection will now be closed.
1: CassandraSinkCluster transform failed
2: Failed to create new connection
3: destination 172.16.1.3:9044 did not respond to connection attempt within 3s"#,
Expand All @@ -240,7 +240,7 @@ Caused by:
r#"connection was unexpectedly terminated
Caused by:
0: chain failed to send and/or receive messages
0: Chain failed to send and/or receive messages, the connection will now be closed.
1: CassandraSinkCluster transform failed
2: system.local returned unexpected cassandra operation: Error(ErrorBody { message: "Internal shotover error: Broken pipe (os error 32)", ty: Server })"#,
)
Expand Down
27 changes: 26 additions & 1 deletion shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use crate::redis_int_tests::assert::*;
use bytes::BytesMut;
use futures::StreamExt;
use rand::{thread_rng, Rng};
use rand_distr::Alphanumeric;
use redis::aio::Connection;
use redis::cluster::ClusterConnection;
use redis::{AsyncCommands, Commands, ErrorKind, RedisError, Value};
use shotover_proxy::frame::RedisFrame;
use shotover_proxy::tcp;
use std::collections::{HashMap, HashSet};
use std::thread::sleep;
use std::time::Duration;
use test_helpers::connection::redis_connection;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

use tokio::time::timeout;
use tracing::trace;
Expand Down Expand Up @@ -1269,7 +1272,7 @@ pub async fn test_trigger_transform_failure_driver(connection: &mut Connection)
.await
.unwrap_err()
.to_string(),
"unexpected end of file".to_string()
"An error was signalled by the server: Internal shotover (or custom transform) bug: Chain failed to send and/or receive messages, the connection will now be closed. Caused by: 0: RedisSinkSingle transform failed 1: Failed to connect to destination \"127.0.0.1:1111\" 2: Connection refused (os error 111)".to_string()
);
}

Expand All @@ -1286,6 +1289,11 @@ pub async fn test_trigger_transform_failure_raw() {

connection.write_all(b"*1\r\n$4\r\nping\r\n").await.unwrap();

assert_eq!(
read_redis_message(&mut connection).await,
RedisFrame::Error("ERR Internal shotover (or custom transform) bug: Chain failed to send and/or receive messages, the connection will now be closed. Caused by: 0: RedisSinkSingle transform failed 1: Failed to connect to destination \"127.0.0.1:1111\" 2: Connection refused (os error 111)".into())
);

// If the connection was closed by shotover then we will successfully read 0 bytes.
// If the connection was not closed by shotover then read will block for 10 seconds until the timeout is hit and then the unwrap will panic.
let amount = timeout(Duration::from_secs(10), connection.read(&mut [0; 1]))
Expand All @@ -1296,6 +1304,23 @@ pub async fn test_trigger_transform_failure_raw() {
assert_eq!(amount, 0);
}

/// Reads from `connection` until one complete RESP2 frame can be decoded, then returns that frame.
///
/// Bytes received after the end of the first frame are discarded together with the local buffer.
///
/// # Panics
/// Panics if reading from the socket fails, or if the peer closes the connection before a
/// complete frame has arrived. Without these checks the loop would spin forever, because a
/// closed socket keeps returning `Ok(0)` immediately and no new data would ever be appended.
async fn read_redis_message(connection: &mut TcpStream) -> RedisFrame {
    let mut buffer = BytesMut::new();
    loop {
        // Try to decode whatever has been accumulated so far. Any other decode outcome is
        // treated as "need more data"; the EOF check below still guarantees termination.
        if let Ok(Some((frame, _))) =
            redis_protocol::resp2::decode::decode(&buffer.clone().freeze())
        {
            return frame;
        }

        let mut data = [0; 1024];
        let read_count = connection
            .read(&mut data)
            .await
            .expect("Failed to read from connection while waiting for a redis message");
        // A zero-length read into a non-empty buffer means the peer closed the connection.
        assert_ne!(
            read_count, 0,
            "Connection was closed before a complete redis message was received"
        );
        buffer.extend(&data[..read_count]);
    }
}

/// CAREFUL: This lacks any kind of check that shotover is ready,
/// so make sure shotover_manager.redis_connection is run on 6379 before calling this.
pub async fn test_invalid_frame() {
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/tests/redis_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async fn passthrough_redis_down() {
r#"connection was unexpectedly terminated
Caused by:
0: chain failed to send and/or receive messages
0: Chain failed to send and/or receive messages, the connection will now be closed.
1: RedisSinkSingle transform failed
2: Failed to connect to destination "127.0.0.1:1111"
3: Connection refused (os error 111)"#,
Expand Down