Skip to content

Commit

Permalink
Review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Mar 28, 2023
1 parent 6a56b1e commit 9d7c4cb
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 21 deletions.
29 changes: 25 additions & 4 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use self::node_pool::{get_accessible_owned_connection, NodePoolBuilder, PreparedMetadata};
use self::node_pool::{
get_accessible_owned_connection, AddressError, NodePoolBuilder, PreparedMetadata,
};
use crate::error::ChainResponse;
use crate::frame::cassandra::{parse_statement_single, CassandraMetadata, Tracing};
use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame};
use crate::message::{Message, Messages, Metadata};
use crate::message_value::{IntSize, MessageValue};
use crate::tls::{TlsConnector, TlsConnectorConfig};
use crate::transforms::cassandra::connection::{CassandraConnection, Response, ResponseError};
use crate::transforms::cassandra::sink_cluster::node_pool::bullet_list_of_node_failures;
use crate::transforms::{Transform, TransformBuilder, TransformConfig, Transforms, Wrapper};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
Expand Down Expand Up @@ -348,15 +351,33 @@ impl CassandraSinkCluster {

// This is purely an optimization:
// To avoid opening these connections sequentially later on, we open them concurrently now.
// We ignore errors, the code that actually uses the connection is responsible for reporting the errors
let _ignore_errors = join_all(
let results = join_all(
self.pool
.nodes()
.iter_mut()
.filter(|x| destination_nodes.contains(&x.host_id))
.map(|node| node.get_connection(&self.connection_factory)),
.map(|node| async {
match node.get_connection(&self.connection_factory).await {
Ok(_) => Ok(()),
Err(error) => {
node.report_issue();
Err(AddressError {
error,
address: node.address,
})
}
}
}),
)
.await;
if results.iter().any(|x| x.is_err()) {
tracing::warn!(
"Failed to open a connection to the following nodes:{}",
bullet_list_of_node_failures(
results.iter().filter_map(|x| x.as_ref().err())
)
)
}
}
}
}
Expand Down
35 changes: 18 additions & 17 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,31 +259,32 @@ impl NodePool {
}
}

pub struct AddressError {
pub address: SocketAddr,
pub error: anyhow::Error,
}

pub fn bullet_list_of_node_failures<'a, I: Iterator<Item = &'a AddressError>>(errors: I) -> String {
let mut node_errors = String::new();
for AddressError { error, address } in errors {
node_errors.push_str(&format!("\n* {address:?}:"));
for sub_error in error.chain() {
node_errors.push_str(&format!("\n - {sub_error}"));
}
}
node_errors
}

pub async fn get_accessible_node<'a>(
connection_factory: &ConnectionFactory,
nodes: Vec<&'a mut CassandraNode>,
) -> Result<&'a mut CassandraNode> {
struct AddressError {
address: SocketAddr,
error: anyhow::Error,
}
fn bullet_list_of_node_failures(errors: &[AddressError]) -> String {
let mut node_errors = String::new();
for AddressError { error, address } in errors {
node_errors.push_str(&format!("\n* {address:?}:"));
for sub_error in error.chain() {
node_errors.push_str(&format!("\n - {sub_error}"));
}
}
node_errors
}

let mut errors = vec![];
for node in nodes {
match node.get_connection(connection_factory).await {
Ok(_) => {
if !errors.is_empty() {
let node_errors = bullet_list_of_node_failures(&errors);
let node_errors = bullet_list_of_node_failures(errors.iter());
tracing::warn!("A successful connection to a node was made but attempts to connect to these nodes failed first:{node_errors}");
}
return Ok(node);
Expand All @@ -298,7 +299,7 @@ pub async fn get_accessible_node<'a>(
}
}

let node_errors = bullet_list_of_node_failures(&errors);
let node_errors = bullet_list_of_node_failures(errors.iter());
Err(anyhow!(
"Attempted to open a connection to one of multiple nodes but all attempts failed:{node_errors}"
))
Expand Down

0 comments on commit 9d7c4cb

Please sign in to comment.