Skip to content

Commit

Permalink
Return error message to client when chain fails to send/receive
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Mar 15, 2023
1 parent 0d03cec commit 97c0ca8
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 4 deletions.
8 changes: 7 additions & 1 deletion shotover-proxy/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,13 @@ impl Message {
/// If self is a response: the returned `Message` is a valid replacement of self
pub fn to_error_response(&self, error: String) -> Message {
Message::from_frame(match self.metadata().unwrap() {
Metadata::Redis => Frame::Redis(RedisFrame::Error(format!("ERR {error}").into())),
Metadata::Redis => {
// Redis errors can not contain newlines at the protocol level
let message = format!("ERR {error}")
.replace("\r\n", " ")
.replace('\n', " ");
Frame::Redis(RedisFrame::Error(message.into()))
}
Metadata::Cassandra(frame) => Frame::Cassandra(CassandraFrame {
version: frame.version,
stream_id: frame.stream_id,
Expand Down
33 changes: 30 additions & 3 deletions shotover-proxy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,14 @@ impl<C: CodecBuilder + 'static> Handler<C> {
debug!("Received raw message {:?}", messages);
debug!("client details: {:?}", &self.client_details);

let mut error_report_messages = if reverse_chain {
// Avoid allocating for reverse chains as we dont make use of this value in that case
vec![]
} else {
// This clone should be cheap as cloning a Message that has never had `.frame()` called should result in no new allocations.
messages.clone()
};

let wrapper = Wrapper::new_with_client_details(
messages,
self.client_details.clone(),
Expand All @@ -549,12 +557,31 @@ impl<C: CodecBuilder + 'static> Handler<C> {
self.chain
.process_request_rev(wrapper, self.client_details.clone())
.await
.context("chain failed to receive pushed messages/events")?
} else {
self.chain
match self
.chain
.process_request(wrapper, self.client_details.clone())
.await
}
.context("chain failed to send and/or receive messages")?;
.context("chain failed to send and/or receive messages")
{
Ok(x) => x,
Err(err) => {
// An internal error occured and we need to terminate the connection because we can no longer make any gaurantees about the state its in.
// However before we do that we need to return errors for all the messages in this batch for two reasons:
// * Poorly programmed clients may hang forever waiting for a response
// * We want to give the user a hint as to what went wrong
// + they might not know to check the shotover logs
// + they may not be able to correlate which error in the shotover logs corresponds to their failed message
let err = err.context("The connection will now be closed due to an internal shotover error that occured while processing this batch of messages. This is a bug in either shotover or a shotover custom transform");
for m in &mut error_report_messages {
*m = m.to_error_response(format!("{err:?}"));
}
out_tx.send(error_report_messages)?;
return Err(err);
}
}
};

debug!("sending message: {:?}", modified_messages);
// send the result of the process up stream
Expand Down

0 comments on commit 97c0ca8

Please sign in to comment.