Skip to content

Commit

Permalink
CassandraSinkCluster: rewrite rpc_address (#859)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Oct 18, 2022
1 parent 39ac5f8 commit 8fb07c2
Showing 1 changed file with 10 additions and 1 deletion.
11 changes: 10 additions & 1 deletion shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use node::{CassandraNode, ConnectionFactory};
use node_pool::{GetReplicaErr, NodePool};
use rand::prelude::*;
use serde::Deserialize;
use std::net::SocketAddr;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
use tokio::sync::{mpsc, oneshot, watch};
use topology::{create_topology_task, TaskConnectionInfo};
Expand Down Expand Up @@ -744,6 +744,7 @@ impl CassandraSinkCluster {
let mut broadcast_address_alias = "broadcast_address";
let mut listen_address_alias = "listen_address";
let mut host_id_alias = "host_id";
let mut rpc_address_alias = "rpc_address";
let mut rpc_port_alias = "rpc_port";
for select in &table.selects {
if let SelectElement::Column(column) = select {
Expand All @@ -764,6 +765,8 @@ impl CassandraSinkCluster {
listen_address_alias = alias;
} else if column.name == Identifier::Unquoted("host_id".to_string()) {
host_id_alias = alias;
} else if column.name == Identifier::Unquoted("rpc_address".to_string()) {
rpc_address_alias = alias
} else if column.name == Identifier::Unquoted("rpc_port".to_string()) {
rpc_port_alias = alias
}
Expand Down Expand Up @@ -816,6 +819,12 @@ impl CassandraSinkCluster {
if let MessageValue::Uuid(host_id) = col {
*host_id = self.local_shotover_node.host_id;
}
} else if col_meta.name == rpc_address_alias {
if let MessageValue::Inet(address) = col {
if address != &IpAddr::V4(Ipv4Addr::UNSPECIFIED) {
*address = self.local_shotover_node.address.ip()
}
}
} else if col_meta.name == rpc_port_alias {
if let MessageValue::Integer(rpc_port, _) = col {
*rpc_port = self.local_shotover_node.address.port() as i64;
Expand Down

0 comments on commit 8fb07c2

Please sign in to comment.