diff --git a/shotover-proxy/example-configs/cassandra-cluster-tls/topology.yaml b/shotover-proxy/example-configs/cassandra-cluster-tls/topology.yaml
index 99b81e82f..840ed45b7 100644
--- a/shotover-proxy/example-configs/cassandra-cluster-tls/topology.yaml
+++ b/shotover-proxy/example-configs/cassandra-cluster-tls/topology.yaml
@@ -12,6 +12,7 @@ chain_config:
     - CassandraSinkCluster:
         first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
         data_center: "dc1"
+        rack: "rack1"
         tls:
           certificate_authority_path: "example-configs/cassandra-tls/certs/localhost_CA.crt"
           certificate_path: "example-configs/cassandra-tls/certs/localhost.crt"
diff --git a/shotover-proxy/example-configs/cassandra-cluster/topology.yaml b/shotover-proxy/example-configs/cassandra-cluster/topology.yaml
index 285798f9c..bab9bfc85 100644
--- a/shotover-proxy/example-configs/cassandra-cluster/topology.yaml
+++ b/shotover-proxy/example-configs/cassandra-cluster/topology.yaml
@@ -8,5 +8,6 @@ chain_config:
     - CassandraSinkCluster:
         first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
         data_center: "dc1"
+        rack: "rack1"
 source_to_chain_mapping:
   cassandra_prod: main_chain
diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
index f05fc20ef..26aa445e2 100644
--- a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
+++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
@@ -30,6 +30,7 @@ mod node;
 pub struct CassandraSinkClusterConfig {
     pub first_contact_points: Vec<String>,
     pub data_center: String,
+    pub rack: String,
     pub tls: Option<TlsConfig>,
     pub read_timeout: Option<u64>,
 }
@@ -41,6 +42,7 @@ impl CassandraSinkClusterConfig {
             self.first_contact_points.clone(),
             chain_name,
             self.data_center.clone(),
+            self.rack.clone(),
             tls,
             self.read_timeout,
         )))
@@ -61,6 +63,7 @@ pub struct CassandraSinkCluster {
     read_timeout: Option<Duration>,
     peer_table: FQName,
     data_center: String,
+    rack: String,
     /// A local clone of topology_task_nodes
     /// Internally stores connections to the nodes
     local_nodes: Vec<CassandraNode>,
@@ -87,6 +90,7 @@ impl Clone for CassandraSinkCluster {
             read_timeout: self.read_timeout,
             peer_table: self.peer_table.clone(),
             data_center: self.data_center.clone(),
+            rack: self.rack.clone(),
             local_nodes: vec![],
             topology_task_nodes: self.topology_task_nodes.clone(),
             rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
@@ -100,6 +104,7 @@ impl CassandraSinkCluster {
         contact_points: Vec<String>,
         chain_name: String,
         data_center: String,
+        rack: String,
         tls: Option<TlsConnector>,
         timeout: Option<u64>,
     ) -> CassandraSinkCluster {
@@ -114,6 +119,7 @@ impl CassandraSinkCluster {
             nodes_shared.clone(),
             task_handshake_rx,
             data_center.clone(),
+            rack.clone(),
         );
 
         CassandraSinkCluster {
@@ -130,6 +136,7 @@ impl CassandraSinkCluster {
             read_timeout: receive_timeout,
             peer_table: FQName::new("system", "peers"),
             data_center,
+            rack,
             local_nodes: vec![],
             topology_task_nodes: nodes_shared,
             rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
@@ -275,11 +282,13 @@ pub fn create_topology_task(
     nodes: Arc<RwLock<Vec<CassandraNode>>>,
     mut handshake_rx: mpsc::Receiver<TaskHandshake>,
     data_center: String,
+    rack: String,
 ) {
     tokio::spawn(async move {
         while let Some(handshake) = handshake_rx.recv().await {
             let mut attempts = 0;
-            while let Err(err) = topology_task_process(&tls, &nodes, &handshake, &data_center).await
+            while let Err(err) =
+                topology_task_process(&tls, &nodes, &handshake, &data_center, &rack).await
             {
                 tracing::error!("topology task failed, retrying, error was: {err:?}");
                 attempts += 1;
@@ -304,6 +313,7 @@ async fn topology_task_process(
     nodes: &Arc<RwLock<Vec<CassandraNode>>>,
     handshake: &TaskHandshake,
     data_center: &str,
+    rack: &str,
 ) -> Result<()> {
     let outbound =
         node::new_connection(&handshake.address, &handshake.handshake, tls, &None).await?;
@@ -343,8 +353,8 @@ async fn topology_task_process(
     )?;
 
     let (new_nodes, more_nodes) = tokio::join!(
-        async { system_peers_into_nodes(peers_rx.await?.response?, data_center) },
-        async { system_peers_into_nodes(local_rx.await?.response?, data_center) }
+        async { system_peers_into_nodes(peers_rx.await?.response?, data_center, rack) },
+        async { system_peers_into_nodes(local_rx.await?.response?, data_center, rack) }
     );
     let mut new_nodes = new_nodes?;
     new_nodes.extend(more_nodes?);
@@ -362,6 +372,7 @@ async fn topology_task_process(
 fn system_peers_into_nodes(
     mut response: Message,
     config_data_center: &str,
+    config_rack: &str,
 ) -> Result<Vec<CassandraNode>> {
     if let Some(Frame::Cassandra(frame)) = response.frame() {
         match &mut frame.operation {
@@ -371,8 +382,12 @@ fn system_peers_into_nodes(
                 }) => rows
                     .iter_mut()
                     .filter(|row| {
-                        if let Some(MessageValue::Varchar(data_center)) = row.get(2) {
-                            data_center == config_data_center
+                        if let (
+                            Some(MessageValue::Varchar(data_center)),
+                            Some(MessageValue::Varchar(rack)),
+                        ) = (row.get(2), row.get(1))
+                        {
+                            data_center == config_data_center && rack == config_rack
                         } else {
                             false
                         }
@@ -393,11 +408,7 @@ fn system_peers_into_nodes(
                         return Err(anyhow!("tokens not a list"));
                     };
                     let _data_center = row.pop();
-                    let rack = if let Some(MessageValue::Varchar(value)) = row.pop() {
-                        value
-                    } else {
-                        return Err(anyhow!("rack not a varchar"));
-                    };
+                    let _rack = row.pop();
                     let address = if let Some(MessageValue::Inet(value)) = row.pop() {
                         value
                     } else {
@@ -406,7 +417,6 @@ fn system_peers_into_nodes(
 
                     Ok(CassandraNode {
                         address,
-                        _rack: rack,
                         _tokens: tokens,
                         outbound: None,
                     })
diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs
index 5c51a7639..46a85d54d 100644
--- a/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs
+++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs
@@ -10,7 +10,6 @@ use tokio::sync::{mpsc, oneshot};
 #[derive(Debug, Clone)]
 pub struct CassandraNode {
     pub address: IpAddr,
-    pub _rack: String,
     pub _tokens: Vec<MessageValue>,
     pub outbound: Option<CassandraConnection>,
 }
diff --git a/shotover-proxy/tests/cassandra_int_tests/cluster.rs b/shotover-proxy/tests/cassandra_int_tests/cluster.rs
index 6fe0aac75..24b4506f4 100644
--- a/shotover-proxy/tests/cassandra_int_tests/cluster.rs
+++ b/shotover-proxy/tests/cassandra_int_tests/cluster.rs
@@ -15,6 +15,7 @@ pub async fn test() {
         nodes_shared.clone(),
         task_handshake_rx,
         "dc1".to_string(),
+        "rack1".to_string(),
     );
 
     // Give the handshake task a hardcoded handshake.
@@ -54,7 +55,6 @@ pub async fn test() {
             .expect("Node did not contain a unique expected address");
         possible_addresses.remove(address_index);
 
-        assert_eq!(node._rack, "rack1");
         assert_eq!(node._tokens.len(), 128);
     }
 }