Nodes list is filtered by rack
rukai committed Aug 22, 2022
1 parent 79c181f commit b71f45f
Showing 5 changed files with 24 additions and 13 deletions.
@@ -12,6 +12,7 @@ chain_config:
     - CassandraSinkCluster:
         first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
         data_center: "dc1"
+        rack: "rack1"
         tls:
           certificate_authority_path: "example-configs/cassandra-tls/certs/localhost_CA.crt"
           certificate_path: "example-configs/cassandra-tls/certs/localhost.crt"

@@ -8,5 +8,6 @@ chain_config:
     - CassandraSinkCluster:
         first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
         data_center: "dc1"
+        rack: "rack1"
 source_to_chain_mapping:
   cassandra_prod: main_chain

32 changes: 21 additions & 11 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
@@ -30,6 +30,7 @@ mod node;
 pub struct CassandraSinkClusterConfig {
     pub first_contact_points: Vec<String>,
     pub data_center: String,
+    pub rack: String,
     pub tls: Option<TlsConnectorConfig>,
     pub read_timeout: Option<u64>,
 }
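
For context, the sketch below (not shotover's actual types: the struct name, the serde derive, and the use of serde_yaml are assumptions for illustration) shows that a config like the YAML hunks above now needs a rack value before it will deserialize at all:

// Standalone sketch: rack behaves as a required field alongside data_center.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct SinkClusterConfigSketch {
    first_contact_points: Vec<String>,
    data_center: String,
    rack: String,
}

fn main() {
    let with_rack = r#"
first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
data_center: "dc1"
rack: "rack1"
"#;
    let config: SinkClusterConfigSketch = serde_yaml::from_str(with_rack).unwrap();
    assert_eq!(config.rack, "rack1");

    // Omitting rack now fails to deserialize.
    let without_rack = "data_center: \"dc1\"\nfirst_contact_points: []\n";
    assert!(serde_yaml::from_str::<SinkClusterConfigSketch>(without_rack).is_err());
}
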
@@ -41,6 +42,7 @@ impl CassandraSinkClusterConfig {
             self.first_contact_points.clone(),
             chain_name,
             self.data_center.clone(),
+            self.rack.clone(),
             tls,
             self.read_timeout,
         )))
@@ -61,6 +63,7 @@ pub struct CassandraSinkCluster {
     read_timeout: Option<Duration>,
     peer_table: FQName,
     data_center: String,
+    rack: String,
     /// A local clone of topology_task_nodes
     /// Internally stores connections to the nodes
     local_nodes: Vec<CassandraNode>,
@@ -87,6 +90,7 @@ impl Clone for CassandraSinkCluster {
             read_timeout: self.read_timeout,
             peer_table: self.peer_table.clone(),
             data_center: self.data_center.clone(),
+            rack: self.rack.clone(),
             local_nodes: vec![],
             topology_task_nodes: self.topology_task_nodes.clone(),
             rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
@@ -100,6 +104,7 @@ impl CassandraSinkCluster {
         contact_points: Vec<String>,
         chain_name: String,
         data_center: String,
+        rack: String,
         tls: Option<TlsConnector>,
         timeout: Option<u64>,
     ) -> CassandraSinkCluster {
@@ -114,6 +119,7 @@ impl CassandraSinkCluster {
             nodes_shared.clone(),
             task_handshake_rx,
             data_center.clone(),
+            rack.clone(),
         );

         CassandraSinkCluster {
@@ -130,6 +136,7 @@ impl CassandraSinkCluster {
             read_timeout: receive_timeout,
             peer_table: FQName::new("system", "peers"),
             data_center,
+            rack,
             local_nodes: vec![],
             topology_task_nodes: nodes_shared,
             rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
@@ -275,11 +282,13 @@ pub fn create_topology_task(
     nodes: Arc<RwLock<Vec<CassandraNode>>>,
     mut handshake_rx: mpsc::Receiver<TaskHandshake>,
     data_center: String,
+    rack: String,
 ) {
     tokio::spawn(async move {
         while let Some(handshake) = handshake_rx.recv().await {
             let mut attempts = 0;
-            while let Err(err) = topology_task_process(&tls, &nodes, &handshake, &data_center).await
+            while let Err(err) =
+                topology_task_process(&tls, &nodes, &handshake, &data_center, &rack).await
             {
                 tracing::error!("topology task failed, retrying, error was: {err:?}");
                 attempts += 1;
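
The hunk above is cut off right after the attempts counter. As a rough, standalone illustration of the retry shape only (the fixed delay and the give-up threshold below are assumptions for illustration, not necessarily what shotover does):

// Sketch of a retry loop around a fallible topology refresh.
use std::time::Duration;

async fn refresh_topology() -> Result<(), String> {
    // Stand-in for topology_task_process(&tls, &nodes, &handshake, &data_center, &rack).
    Err("connection refused".to_string())
}

#[tokio::main]
async fn main() {
    let mut attempts = 0;
    while let Err(err) = refresh_topology().await {
        eprintln!("topology task failed, retrying, error was: {err:?}");
        attempts += 1;
        if attempts >= 3 {
            eprintln!("giving up after {attempts} attempts");
            break;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
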
@@ -304,6 +313,7 @@ async fn topology_task_process(
     nodes: &Arc<RwLock<Vec<CassandraNode>>>,
     handshake: &TaskHandshake,
     data_center: &str,
+    rack: &str,
 ) -> Result<()> {
     let outbound =
         node::new_connection(&handshake.address, &handshake.handshake, tls, &None).await?;
@@ -343,8 +353,8 @@
     )?;

     let (new_nodes, more_nodes) = tokio::join!(
-        async { system_peers_into_nodes(peers_rx.await?.response?, data_center) },
-        async { system_peers_into_nodes(local_rx.await?.response?, data_center) }
+        async { system_peers_into_nodes(peers_rx.await?.response?, data_center, rack) },
+        async { system_peers_into_nodes(local_rx.await?.response?, data_center, rack) }
     );
     let mut new_nodes = new_nodes?;
     new_nodes.extend(more_nodes?);
@@ -362,6 +372,7 @@
 fn system_peers_into_nodes(
     mut response: Message,
     config_data_center: &str,
+    config_rack: &str,
 ) -> Result<Vec<CassandraNode>> {
     if let Some(Frame::Cassandra(frame)) = response.frame() {
         match &mut frame.operation {
@@ -371,8 +382,12 @@ fn system_peers_into_nodes(
             }) => rows
                 .iter_mut()
                 .filter(|row| {
-                    if let Some(MessageValue::Varchar(data_center)) = row.get(2) {
-                        data_center == config_data_center
+                    if let (
+                        Some(MessageValue::Varchar(data_center)),
+                        Some(MessageValue::Varchar(rack)),
+                    ) = (row.get(2), row.get(1))
+                    {
+                        data_center == config_data_center && rack == config_rack
                     } else {
                         false
                     }
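
That predicate is the core of the change: a row from the system.peers or system.local results is kept only when both its data_center (column 2) and rack (column 1) match the configured values. A self-contained sketch of the same rule, using a simplified stand-in for shotover's MessageValue and row layout (both are assumptions for illustration):

// Keep a row only when data_center and rack both match the config.
#[derive(Debug)]
enum Value {
    Varchar(String),
    Other,
}

fn row_matches(row: &[Value], config_data_center: &str, config_rack: &str) -> bool {
    if let (Some(Value::Varchar(data_center)), Some(Value::Varchar(rack))) =
        (row.get(2), row.get(1))
    {
        data_center == config_data_center && rack == config_rack
    } else {
        false
    }
}

fn main() {
    // Column 0 stands in for the node's address, column 1 is rack, column 2 is data_center.
    let same_rack = [
        Value::Other,
        Value::Varchar("rack1".into()),
        Value::Varchar("dc1".into()),
    ];
    let other_rack = [
        Value::Other,
        Value::Varchar("rack2".into()),
        Value::Varchar("dc1".into()),
    ];
    assert!(row_matches(&same_rack, "dc1", "rack1"));
    assert!(!row_matches(&other_rack, "dc1", "rack1"));
}
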
@@ -393,11 +408,7 @@
                         return Err(anyhow!("tokens not a list"));
                     };
                     let _data_center = row.pop();
-                    let rack = if let Some(MessageValue::Varchar(value)) = row.pop() {
-                        value
-                    } else {
-                        return Err(anyhow!("rack not a varchar"));
-                    };
+                    let _rack = row.pop();
                     let address = if let Some(MessageValue::Inet(value)) = row.pop() {
                         value
                     } else {
@@ -406,7 +417,6 @@

                     Ok(CassandraNode {
                         address,
-                        _rack: rack,
                         _tokens: tokens,
                         outbound: None,
                     })

@@ -10,7 +10,6 @@ use tokio::sync::{mpsc, oneshot};
 #[derive(Debug, Clone)]
 pub struct CassandraNode {
     pub address: IpAddr,
-    pub _rack: String,
     pub _tokens: Vec<String>,
     pub outbound: Option<CassandraConnection>,
 }

2 changes: 1 addition & 1 deletion shotover-proxy/tests/cassandra_int_tests/cluster.rs
@@ -15,6 +15,7 @@ pub async fn test() {
         nodes_shared.clone(),
         task_handshake_rx,
         "dc1".to_string(),
+        "rack1".to_string(),
     );

     // Give the handshake task a hardcoded handshake.
@@ -54,7 +55,6 @@ pub async fn test() {
             .expect("Node did not contain a unique expected address");
         possible_addresses.remove(address_index);

-        assert_eq!(node._rack, "rack1");
         assert_eq!(node._tokens.len(), 128);
     }
 }
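
The per-node rack assertion is dropped because the topology task now only ever returns nodes in the configured rack; the test instead checks that each discovered node maps to exactly one expected address by draining the expected set as nodes are matched. A minimal sketch of that check (the concrete addresses here are an assumption taken from the contact points in the example configs, not necessarily the test environment's full node list):

// Every node must consume exactly one entry from the expected-address set.
use std::net::IpAddr;

fn main() {
    let nodes: Vec<IpAddr> = vec!["172.16.1.2".parse().unwrap(), "172.16.1.3".parse().unwrap()];
    let mut possible_addresses: Vec<IpAddr> =
        vec!["172.16.1.2".parse().unwrap(), "172.16.1.3".parse().unwrap()];

    for address in &nodes {
        let address_index = possible_addresses
            .iter()
            .position(|a| a == address)
            .expect("Node did not contain a unique expected address");
        possible_addresses.remove(address_index);
    }
    assert!(possible_addresses.is_empty());
}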
