From 5b04f38b0c0bf5a9b207161f25ec4f52eaac8aa9 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Thu, 6 Oct 2022 18:39:49 +1100 Subject: [PATCH] Improve Cassandra new connection errors --- .../src/transforms/cassandra/connection.rs | 11 ++++++-- .../transforms/cassandra/sink_cluster/node.rs | 25 +++++++++++++++---- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/shotover-proxy/src/transforms/cassandra/connection.rs b/shotover-proxy/src/transforms/cassandra/connection.rs index 6be60700d..96bbb72da 100644 --- a/shotover-proxy/src/transforms/cassandra/connection.rs +++ b/shotover-proxy/src/transforms/cassandra/connection.rs @@ -41,9 +41,16 @@ impl CassandraConnection { pushed_messages_tx: Option>, ) -> Result { let tcp_stream = timeout(Duration::from_secs(3), TcpStream::connect(&host)) - .await? + .await + .map_err(|_| { + anyhow!( + "Cassandra node at {:?} did not respond to connection attempt within 3 seconds", + host + ) + })? .map_err(|e| { - anyhow::Error::new(e).context(format!("Failed to connect to upstream: {:?}", host)) + anyhow::Error::new(e) + .context(format!("Failed to connect to cassandra node: {:?}", host)) })?; let (out_tx, out_rx) = mpsc::unbounded_channel::(); diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs index 5046c5ed7..04f040c13 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs @@ -101,18 +101,33 @@ impl ConnectionFactory { self.tls.clone(), self.pushed_messages_tx.clone(), ) - .await?; + .await + .map_err(|e| e.context("Failed to create new connection"))?; for handshake_message in &self.init_handshake { let (return_chan_tx, return_chan_rx) = oneshot::channel(); - outbound.send(handshake_message.clone(), return_chan_tx)?; - return_chan_rx.await?; + outbound + .send(handshake_message.clone(), return_chan_tx) + .map_err(|e| { + anyhow!(e) + .context("Failed to initialize new connection with handshake, rx failed") + })?; + return_chan_rx.await.map_err(|e| { + anyhow!(e).context("Failed to initialize new connection with handshake, rx failed") + })?; } if let Some(use_message) = &self.use_message { let (return_chan_tx, return_chan_rx) = oneshot::channel(); - outbound.send(use_message.clone(), return_chan_tx)?; - return_chan_rx.await?; + outbound + .send(use_message.clone(), return_chan_tx) + .map_err(|e| { + anyhow!(e) + .context("Failed to initialize new connection with use message, rx failed") + })?; + return_chan_rx.await.map_err(|e| { + anyhow!(e).context("Failed to initialize new connection with handshake, rx failed") + })?; } Ok(outbound)