diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs index 7a990c628..d5ab79259 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs @@ -1,4 +1,6 @@ -use self::node_pool::{get_accessible_owned_connection, NodePoolBuilder, PreparedMetadata}; +use self::node_pool::{ + get_accessible_owned_connection, AddressError, NodePoolBuilder, PreparedMetadata, +}; use crate::error::ChainResponse; use crate::frame::cassandra::{parse_statement_single, CassandraMetadata, Tracing}; use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame}; @@ -6,6 +8,7 @@ use crate::message::{Message, Messages, Metadata}; use crate::message_value::{IntSize, MessageValue}; use crate::tls::{TlsConnector, TlsConnectorConfig}; use crate::transforms::cassandra::connection::{CassandraConnection, Response, ResponseError}; +use crate::transforms::cassandra::sink_cluster::node_pool::bullet_list_of_node_failures; use crate::transforms::{Transform, TransformBuilder, TransformConfig, Transforms, Wrapper}; use anyhow::{anyhow, Result}; use async_trait::async_trait; @@ -348,15 +351,33 @@ impl CassandraSinkCluster { // This is purely an optimization: // To avoid opening these connections sequentially later on, we open them concurrently now. - // We ignore errors, the code that actually uses the connection is responsible for reporting the errors - let _ignore_errors = join_all( + let results = join_all( self.pool .nodes() .iter_mut() .filter(|x| destination_nodes.contains(&x.host_id)) - .map(|node| node.get_connection(&self.connection_factory)), + .map(|node| async { + match node.get_connection(&self.connection_factory).await { + Ok(_) => Ok(()), + Err(error) => { + node.report_issue(); + Err(AddressError { + error, + address: node.address, + }) + } + } + }), ) .await; + if results.iter().any(|x| x.is_err()) { + tracing::warn!( + "Failed to open a connection to the following nodes:{}", + bullet_list_of_node_failures( + results.iter().filter_map(|x| x.as_ref().err()) + ) + ) + } } } } diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs index b92d5de0b..716185e9f 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs @@ -259,31 +259,32 @@ impl NodePool { } } +pub struct AddressError { + pub address: SocketAddr, + pub error: anyhow::Error, +} + +pub fn bullet_list_of_node_failures<'a, I: Iterator>(errors: I) -> String { + let mut node_errors = String::new(); + for AddressError { error, address } in errors { + node_errors.push_str(&format!("\n* {address:?}:")); + for sub_error in error.chain() { + node_errors.push_str(&format!("\n - {sub_error}")); + } + } + node_errors +} + pub async fn get_accessible_node<'a>( connection_factory: &ConnectionFactory, nodes: Vec<&'a mut CassandraNode>, ) -> Result<&'a mut CassandraNode> { - struct AddressError { - address: SocketAddr, - error: anyhow::Error, - } - fn bullet_list_of_node_failures(errors: &[AddressError]) -> String { - let mut node_errors = String::new(); - for AddressError { error, address } in errors { - node_errors.push_str(&format!("\n* {address:?}:")); - for sub_error in error.chain() { - node_errors.push_str(&format!("\n - {sub_error}")); - } - } - node_errors - } - let mut errors = vec![]; for node in nodes { match node.get_connection(connection_factory).await { Ok(_) => { if !errors.is_empty() { - let node_errors = bullet_list_of_node_failures(&errors); + let node_errors = bullet_list_of_node_failures(errors.iter()); tracing::warn!("A successful connection to a node was made but attempts to connect to these nodes failed first:{node_errors}"); } return Ok(node); @@ -298,7 +299,7 @@ pub async fn get_accessible_node<'a>( } } - let node_errors = bullet_list_of_node_failures(&errors); + let node_errors = bullet_list_of_node_failures(errors.iter()); Err(anyhow!( "Attempted to open a connection to one of multiple nodes but all attempts failed:{node_errors}" ))