From 97c0ca8224f50c351a524a2c5c61db81dad5c18d Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Wed, 15 Mar 2023 14:42:30 +1100 Subject: [PATCH] Return error message to client when chain fails to send/receive --- shotover-proxy/src/message/mod.rs | 8 +++++++- shotover-proxy/src/server.rs | 33 ++++++++++++++++++++++++++++--- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/shotover-proxy/src/message/mod.rs b/shotover-proxy/src/message/mod.rs index 4d184d7eb..c194ccdaa 100644 --- a/shotover-proxy/src/message/mod.rs +++ b/shotover-proxy/src/message/mod.rs @@ -235,7 +235,13 @@ impl Message { /// If self is a response: the returned `Message` is a valid replacement of self pub fn to_error_response(&self, error: String) -> Message { Message::from_frame(match self.metadata().unwrap() { - Metadata::Redis => Frame::Redis(RedisFrame::Error(format!("ERR {error}").into())), + Metadata::Redis => { + // Redis errors can not contain newlines at the protocol level + let message = format!("ERR {error}") + .replace("\r\n", " ") + .replace('\n', " "); + Frame::Redis(RedisFrame::Error(message.into())) + } Metadata::Cassandra(frame) => Frame::Cassandra(CassandraFrame { version: frame.version, stream_id: frame.stream_id, diff --git a/shotover-proxy/src/server.rs b/shotover-proxy/src/server.rs index 09a17a30c..881bc4f28 100644 --- a/shotover-proxy/src/server.rs +++ b/shotover-proxy/src/server.rs @@ -538,6 +538,14 @@ impl Handler { debug!("Received raw message {:?}", messages); debug!("client details: {:?}", &self.client_details); + let mut error_report_messages = if reverse_chain { + // Avoid allocating for reverse chains as we dont make use of this value in that case + vec![] + } else { + // This clone should be cheap as cloning a Message that has never had `.frame()` called should result in no new allocations. + messages.clone() + }; + let wrapper = Wrapper::new_with_client_details( messages, self.client_details.clone(), @@ -549,12 +557,31 @@ impl Handler { self.chain .process_request_rev(wrapper, self.client_details.clone()) .await + .context("chain failed to receive pushed messages/events")? } else { - self.chain + match self + .chain .process_request(wrapper, self.client_details.clone()) .await - } - .context("chain failed to send and/or receive messages")?; + .context("chain failed to send and/or receive messages") + { + Ok(x) => x, + Err(err) => { + // An internal error occured and we need to terminate the connection because we can no longer make any gaurantees about the state its in. + // However before we do that we need to return errors for all the messages in this batch for two reasons: + // * Poorly programmed clients may hang forever waiting for a response + // * We want to give the user a hint as to what went wrong + // + they might not know to check the shotover logs + // + they may not be able to correlate which error in the shotover logs corresponds to their failed message + let err = err.context("The connection will now be closed due to an internal shotover error that occured while processing this batch of messages. This is a bug in either shotover or a shotover custom transform"); + for m in &mut error_report_messages { + *m = m.to_error_response(format!("{err:?}")); + } + out_tx.send(error_report_messages)?; + return Err(err); + } + } + }; debug!("sending message: {:?}", modified_messages); // send the result of the process up stream