diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs index 8f83d8614..27b4db6d5 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs @@ -27,7 +27,7 @@ use node::{CassandraNode, ConnectionFactory}; use node_pool::{GetReplicaErr, NodePool}; use rand::prelude::*; use serde::Deserialize; -use std::net::SocketAddr; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::time::Duration; use tokio::sync::{mpsc, oneshot, watch}; use topology::{create_topology_task, TaskConnectionInfo}; @@ -745,6 +745,7 @@ impl CassandraSinkCluster { let mut broadcast_address_alias = "broadcast_address"; let mut listen_address_alias = "listen_address"; let mut host_id_alias = "host_id"; + let mut rpc_address_alias = "rpc_address"; let mut rpc_port_alias = "rpc_port"; for select in &table.selects { if let SelectElement::Column(column) = select { @@ -765,6 +766,8 @@ impl CassandraSinkCluster { listen_address_alias = alias; } else if column.name == Identifier::Unquoted("host_id".to_string()) { host_id_alias = alias; + } else if column.name == Identifier::Unquoted("rpc_address".to_string()) { + rpc_address_alias = alias } else if column.name == Identifier::Unquoted("rpc_port".to_string()) { rpc_port_alias = alias } @@ -817,6 +820,12 @@ impl CassandraSinkCluster { if let MessageValue::Uuid(host_id) = col { *host_id = self.local_shotover_node.host_id; } + } else if col_meta.name == rpc_address_alias { + if let MessageValue::Inet(address) = col { + if address != &IpAddr::V4(Ipv4Addr::UNSPECIFIED) { + *address = self.local_shotover_node.address.ip() + } + } } else if col_meta.name == rpc_port_alias { if let MessageValue::Integer(rpc_port, _) = col { *rpc_port = self.local_shotover_node.address.port() as i64;