diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs index d8d858697..632be6133 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs @@ -231,7 +231,7 @@ impl CassandraSinkCluster { .next() .unwrap() } else { - SocketAddr::new(self.get_random_node_in_dc_rack().address, 9042) + self.get_random_node_in_dc_rack().address }; self.init_handshake_connection = Some(self.connection_factory.new_connection(random_point).await?); @@ -353,7 +353,7 @@ impl CassandraSinkCluster { let random_address = self.get_random_node_in_dc_rack().address; self.init_handshake_connection = Some( self.connection_factory - .new_connection((random_address, 9042)) + .new_connection(random_address) .await?, ); } diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs index 6ee624141..1ad3c827c 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs @@ -3,13 +3,13 @@ use crate::message::{Message, Messages}; use crate::tls::TlsConnector; use crate::transforms::cassandra::connection::CassandraConnection; use anyhow::Result; -use std::net::IpAddr; +use std::net::SocketAddr; use tokio::net::ToSocketAddrs; use tokio::sync::{mpsc, oneshot}; #[derive(Debug, Clone)] pub struct CassandraNode { - pub address: IpAddr, + pub address: SocketAddr, pub rack: String, pub _tokens: Vec, pub outbound: Option, @@ -21,11 +21,7 @@ impl CassandraNode { connection_factory: &ConnectionFactory, ) -> Result<&mut CassandraConnection> { if self.outbound.is_none() { - self.outbound = Some( - connection_factory - .new_connection((self.address, 9042)) - .await?, - ) + self.outbound = Some(connection_factory.new_connection(self.address).await?) } Ok(self.outbound.as_mut().unwrap()) diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs index 0ccb0e144..13253990c 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs @@ -142,14 +142,14 @@ fn system_peers_into_nodes( } else { return Err(anyhow!("rack not a varchar")); }; - let address = if let Some(MessageValue::Inet(value)) = row.pop() { + let ip = if let Some(MessageValue::Inet(value)) = row.pop() { value } else { return Err(anyhow!("address not an inet")); }; Ok(CassandraNode { - address, + address: SocketAddr::new(ip, 9042), rack, _tokens: tokens, outbound: None, diff --git a/shotover-proxy/tests/cassandra_int_tests/cluster_multi_rack.rs b/shotover-proxy/tests/cassandra_int_tests/cluster_multi_rack.rs index 568a340bd..eeb340786 100644 --- a/shotover-proxy/tests/cassandra_int_tests/cluster_multi_rack.rs +++ b/shotover-proxy/tests/cassandra_int_tests/cluster_multi_rack.rs @@ -1,6 +1,6 @@ use crate::cassandra_int_tests::cluster::run_topology_task; use crate::helpers::cassandra::{assert_query_result, CassandraConnection, ResultValue}; -use std::net::IpAddr; +use std::net::SocketAddr; async fn test_rewrite_system_peers(connection: &CassandraConnection) { let star_results1 = [ @@ -118,10 +118,10 @@ pub async fn test_topology_task(ca_path: Option<&str>) { let nodes = run_topology_task(ca_path).await; assert_eq!(nodes.len(), 3); - let mut possible_addresses: Vec = vec![ - "172.16.1.2".parse().unwrap(), - "172.16.1.3".parse().unwrap(), - "172.16.1.4".parse().unwrap(), + let mut possible_addresses: Vec = vec![ + "172.16.1.2:9042".parse().unwrap(), + "172.16.1.3:9042".parse().unwrap(), + "172.16.1.4:9042".parse().unwrap(), ]; let mut possible_racks: Vec<&str> = vec!["rack1", "rack2", "rack3"]; for node in &nodes { diff --git a/shotover-proxy/tests/cassandra_int_tests/cluster_single_rack_v4.rs b/shotover-proxy/tests/cassandra_int_tests/cluster_single_rack_v4.rs index 2801b4d56..ca7b4aa6a 100644 --- a/shotover-proxy/tests/cassandra_int_tests/cluster_single_rack_v4.rs +++ b/shotover-proxy/tests/cassandra_int_tests/cluster_single_rack_v4.rs @@ -1,6 +1,6 @@ use crate::cassandra_int_tests::cluster::run_topology_task; use crate::helpers::cassandra::{assert_query_result, CassandraConnection, ResultValue}; -use std::net::IpAddr; +use std::net::SocketAddr; async fn test_rewrite_system_peers(connection: &CassandraConnection) { let all_columns = "peer, data_center, host_id, preferred_ip, rack, release_version, rpc_address, schema_version, tokens"; @@ -215,10 +215,10 @@ pub async fn test_topology_task(ca_path: Option<&str>) { let nodes = run_topology_task(ca_path).await; assert_eq!(nodes.len(), 3); - let mut possible_addresses: Vec = vec![ - "172.16.1.2".parse().unwrap(), - "172.16.1.3".parse().unwrap(), - "172.16.1.4".parse().unwrap(), + let mut possible_addresses: Vec = vec![ + "172.16.1.2:9042".parse().unwrap(), + "172.16.1.3:9042".parse().unwrap(), + "172.16.1.4:9042".parse().unwrap(), ]; for node in &nodes { let address_index = possible_addresses