Skip to content

Commit

Permalink
control connection creation error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Oct 31, 2022
1 parent bc3ce73 commit ce6ddb0
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 20 deletions.
92 changes: 72 additions & 20 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,18 @@ impl CassandraSinkCluster {
.iter()
.any(|x| x.address == address && x.is_up)
{
let address = self
let addresses: Vec<_> = self
.pool
.get_round_robin_node_in_dc_rack(&self.local_shotover_node.rack)
.address;
self.create_control_connection(address).await?;
.get_shuffled_nodes_in_dc_rack(
&self.local_shotover_node.rack,
&mut self.rng,
)
.iter()
.map(|node| node.address)
.collect();
self.create_control_connection(&addresses).await.map_err(|e|
e.context("Failed to recreate control connection after control connection node went down")
)?;
}
}
}
Expand Down Expand Up @@ -256,18 +263,23 @@ impl CassandraSinkCluster {
// Create the initial connection.
// Messages will be sent through this connection until we have extracted the handshake.
if self.control_connection.is_none() {
let random_point = if self.pool.nodes().iter().all(|x| !x.is_up) {
tokio::net::lookup_host(self.contact_points.choose(&mut self.rng).unwrap())
.await?
.next()
.unwrap()
let points = if self.pool.nodes().iter().all(|x| !x.is_up) {
let mut points = Vec::with_capacity(self.contact_points.len());
for point in &self.contact_points {
points.push(tokio::net::lookup_host(point).await?.next().unwrap());
}
points
} else {
self.pool
.get_round_robin_node_in_dc_rack(&self.local_shotover_node.rack)
.address
.get_shuffled_nodes_in_dc_rack(&self.local_shotover_node.rack, &mut self.rng)
.iter()
.map(|node| node.address)
.collect()
};

self.create_control_connection(random_point).await?;
self.create_control_connection(&points)
.await
.map_err(|e| e.context("Failed to create initial control connection"))?;
}

if !self.init_handshake_complete {
Expand Down Expand Up @@ -514,11 +526,15 @@ impl CassandraSinkCluster {
// If we have to populate the local_nodes at this point then that means the control connection
// may not have been made against a node in the configured data_center/rack.
// Therefore we need to recreate the control connection to ensure that it is in the configured data_center/rack.
let address = self
let addresses: Vec<_> = self
.pool
.get_round_robin_node_in_dc_rack(&self.local_shotover_node.rack)
.address;
self.create_control_connection(address).await?;
.get_shuffled_nodes_in_dc_rack(&self.local_shotover_node.rack, &mut self.rng)
.iter()
.map(|node| node.address)
.collect();
self.create_control_connection(&addresses)
.await
.map_err(|e| e.context("Failed to recreate control connection when initial connection was possibly against the wrong node"))?;
}
tracing::info!(
"Control connection finalized against node at: {:?}",
Expand All @@ -528,11 +544,47 @@ impl CassandraSinkCluster {
Ok(())
}

// NOTE(review): this span comes from a rendered diff with no +/- markers. The three lines
// directly below (single `address` signature and body) and the lone `Ok(())` further down
// appear to be the removed version; the rest appears to be the added version. Confirm against
// the commit before treating this as compilable code.
async fn create_control_connection(&mut self, address: SocketAddr) -> Result<()> {
self.control_connection = Some(self.connection_factory.new_connection(address).await?);
self.control_connection_address = Some(address);
/// Tries each entry of `addresses` in order and keeps the first connection that succeeds.
///
/// On success, `self.control_connection` and `self.control_connection_address` are set to
/// the new connection and the address it was made against. If earlier addresses failed
/// first, those failures are logged as a warning.
///
/// # Errors
/// Returns an error listing every attempted address and its error chain when no attempt
/// succeeds. With an empty `addresses` slice the loop body never runs, so the same error is
/// returned with an empty list.
async fn create_control_connection(&mut self, addresses: &[SocketAddr]) -> Result<()> {
// Pairs an address that could not be connected to with the error that attempt produced.
struct AddressError {
address: SocketAddr,
error: anyhow::Error,
}
// Renders the failures as text: one `* <address>:` line per failed address, followed by one
// ` - <error>` line for each entry yielded by that error's `chain()`.
fn bullet_list_of_node_failures(errors: &[AddressError]) -> String {
let mut node_errors = String::new();
for AddressError { error, address } in errors {
node_errors.push_str(&format!("\n* {address:?}:"));
for sub_error in error.chain() {
node_errors.push_str(&format!("\n - {sub_error}"));
}
}
node_errors
}

Ok(())
// Failures collected so far, in the order the addresses were attempted.
let mut errors = vec![];
for address in addresses {
match self.connection_factory.new_connection(address).await {
Ok(connection) => {
self.control_connection = Some(connection);
self.control_connection_address = Some(*address);
// The connection succeeded, so earlier failures are only reported as a warning.
if !errors.is_empty() {
let node_errors = bullet_list_of_node_failures(&errors);
tracing::warn!("A successful connection to a control node was made but attempts to connect to these nodes failed first:{node_errors}");
}
return Ok(());
}
Err(error) => {
// Record the failure and move on to the next address.
errors.push(AddressError {
error,
address: *address,
});
}
}
}

// Reaching this point means no address produced a connection.
let node_errors = bullet_list_of_node_failures(&errors);
Err(anyhow!(
"Attempted to create a control connection against every node in the rack and all attempts failed:{node_errors}"
))
}

fn get_rewrite_table(&self, request: &mut Message, index: usize) -> Option<TableToRewrite> {
Expand Down
15 changes: 15 additions & 0 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,21 @@ impl NodePool {
write_lock.insert(id, metadata);
}

/// Returns mutable references to every node that is marked up and whose `rack` equals
/// the given `rack`, in an order randomized by `rng`.
///
/// The returned vector is empty when no node matches.
pub fn get_shuffled_nodes_in_dc_rack(
    &mut self,
    rack: &str,
    rng: &mut SmallRng,
) -> Vec<&mut CassandraNode> {
    let mut candidates = Vec::new();
    for node in self.nodes.iter_mut() {
        // Only nodes that are up and in the requested rack are eligible.
        if node.is_up && node.rack == *rack {
            candidates.push(node);
        }
    }

    // Randomize the order so callers do not always try the same node first.
    candidates.shuffle(rng);
    candidates
}

pub fn get_round_robin_node_in_dc_rack(&mut self, rack: &str) -> &mut CassandraNode {
let up_indexes: Vec<usize> = self
.nodes
Expand Down

0 comments on commit ce6ddb0

Please sign in to comment.