Nodes list is filtered by rack #749

Closed
wants to merge 1 commit
@@ -12,6 +12,7 @@ chain_config:
     - CassandraSinkCluster:
         first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
         data_center: "dc1"
+        rack: "rack1"
         tls:
           certificate_authority_path: "example-configs/cassandra-tls/certs/localhost_CA.crt"
           certificate_path: "example-configs/cassandra-tls/certs/localhost.crt"
@@ -8,5 +8,6 @@ chain_config:
     - CassandraSinkCluster:
         first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
         data_center: "dc1"
+        rack: "rack1"
 source_to_chain_mapping:
   cassandra_prod: main_chain
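For reference, a standalone sketch of how the new rack key maps onto the sink config. The SinkClusterConfig struct below is a hypothetical stand-in that mirrors only the fields touched here, not the real CassandraSinkClusterConfig, and the serde_yaml usage is an assumption for illustration.

use serde::Deserialize;

// Hypothetical mirror of the fields used by this PR; the real config type
// is CassandraSinkClusterConfig in sink_cluster/mod.rs.
#[derive(Debug, Deserialize)]
struct SinkClusterConfig {
    first_contact_points: Vec<String>,
    data_center: String,
    rack: String,
}

fn main() -> Result<(), serde_yaml::Error> {
    // Same keys as the example-config hunks above.
    let yaml = r#"
first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
data_center: "dc1"
rack: "rack1"
"#;
    let config: SinkClusterConfig = serde_yaml::from_str(yaml)?;
    assert_eq!(config.rack, "rack1");
    Ok(())
}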
shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs (21 additions, 11 deletions)

@@ -30,6 +30,7 @@ mod node;
 pub struct CassandraSinkClusterConfig {
     pub first_contact_points: Vec<String>,
     pub data_center: String,
+    pub rack: String,
     pub tls: Option<TlsConnectorConfig>,
     pub read_timeout: Option<u64>,
 }
@@ -41,6 +42,7 @@ impl CassandraSinkClusterConfig {
             self.first_contact_points.clone(),
             chain_name,
             self.data_center.clone(),
+            self.rack.clone(),
             tls,
             self.read_timeout,
         )))
@@ -61,6 +63,7 @@ pub struct CassandraSinkCluster {
     read_timeout: Option<Duration>,
     peer_table: FQName,
     data_center: String,
+    rack: String,
     /// A local clone of topology_task_nodes
     /// Internally stores connections to the nodes
     local_nodes: Vec<CassandraNode>,
@@ -87,6 +90,7 @@ impl Clone for CassandraSinkCluster {
             read_timeout: self.read_timeout,
             peer_table: self.peer_table.clone(),
             data_center: self.data_center.clone(),
+            rack: self.rack.clone(),
             local_nodes: vec![],
             topology_task_nodes: self.topology_task_nodes.clone(),
             rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
@@ -100,6 +104,7 @@ impl CassandraSinkCluster {
         contact_points: Vec<String>,
         chain_name: String,
         data_center: String,
+        rack: String,
         tls: Option<TlsConnector>,
         timeout: Option<u64>,
     ) -> CassandraSinkCluster {
@@ -114,6 +119,7 @@
             nodes_shared.clone(),
             task_handshake_rx,
             data_center.clone(),
+            rack.clone(),
         );

         CassandraSinkCluster {
@@ -130,6 +136,7 @@
             read_timeout: receive_timeout,
             peer_table: FQName::new("system", "peers"),
             data_center,
+            rack,
             local_nodes: vec![],
             topology_task_nodes: nodes_shared,
             rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
@@ -275,11 +282,13 @@ pub fn create_topology_task(
     nodes: Arc<RwLock<Vec<CassandraNode>>>,
     mut handshake_rx: mpsc::Receiver<TaskHandshake>,
     data_center: String,
+    rack: String,
 ) {
     tokio::spawn(async move {
         while let Some(handshake) = handshake_rx.recv().await {
             let mut attempts = 0;
-            while let Err(err) = topology_task_process(&tls, &nodes, &handshake, &data_center).await
+            while let Err(err) =
+                topology_task_process(&tls, &nodes, &handshake, &data_center, &rack).await
             {
                 tracing::error!("topology task failed, retrying, error was: {err:?}");
                 attempts += 1;
@@ -304,6 +313,7 @@ async fn topology_task_process(
     nodes: &Arc<RwLock<Vec<CassandraNode>>>,
     handshake: &TaskHandshake,
     data_center: &str,
+    rack: &str,
 ) -> Result<()> {
     let outbound =
         node::new_connection(&handshake.address, &handshake.handshake, tls, &None).await?;
@@ -343,8 +353,8 @@
     )?;

     let (new_nodes, more_nodes) = tokio::join!(
-        async { system_peers_into_nodes(peers_rx.await?.response?, data_center) },
-        async { system_peers_into_nodes(local_rx.await?.response?, data_center) }
+        async { system_peers_into_nodes(peers_rx.await?.response?, data_center, rack) },
+        async { system_peers_into_nodes(local_rx.await?.response?, data_center, rack) }
     );
     let mut new_nodes = new_nodes?;
     new_nodes.extend(more_nodes?);
@@ -362,6 +372,7 @@
 fn system_peers_into_nodes(
     mut response: Message,
     config_data_center: &str,
+    config_rack: &str,
 ) -> Result<Vec<CassandraNode>> {
     if let Some(Frame::Cassandra(frame)) = response.frame() {
         match &mut frame.operation {
@@ -371,8 +382,12 @@
             }) => rows
                 .iter_mut()
                 .filter(|row| {
-                    if let Some(MessageValue::Varchar(data_center)) = row.get(2) {
-                        data_center == config_data_center
+                    if let (
+                        Some(MessageValue::Varchar(data_center)),
+                        Some(MessageValue::Varchar(rack)),
+                    ) = (row.get(2), row.get(1))
+                    {
+                        data_center == config_data_center && rack == config_rack
                     } else {
                         false
                     }
@@ -393,11 +408,7 @@
                         return Err(anyhow!("tokens not a list"));
                     };
                     let _data_center = row.pop();
-                    let rack = if let Some(MessageValue::Varchar(value)) = row.pop() {
-                        value
-                    } else {
-                        return Err(anyhow!("rack not a varchar"));
-                    };
+                    let _rack = row.pop();
                     let address = if let Some(MessageValue::Inet(value)) = row.pop() {
                         value
                     } else {
@@ -406,7 +417,6 @@

                     Ok(CassandraNode {
                         address,
-                        _rack: rack,
                         _tokens: tokens,
                         outbound: None,
                     })
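To make the new behaviour concrete, here is a small self-contained sketch of the predicate that system_peers_into_nodes now applies: a row survives only when both its data_center (column index 2) and its rack (column index 1) match the configured values. The Value enum and row layout below are simplified stand-ins for shotover's MessageValue rows, assumed purely for illustration.

// Simplified stand-in for shotover's MessageValue; only the variant the
// predicate needs is modelled.
#[derive(Debug, Clone)]
enum Value {
    Varchar(String),
}

// Keep a peers row only when data_center (index 2) and rack (index 1) both
// match the configured values, mirroring the filter added in this PR.
fn matches_dc_and_rack(row: &[Value], config_data_center: &str, config_rack: &str) -> bool {
    if let (Some(Value::Varchar(data_center)), Some(Value::Varchar(rack))) =
        (row.get(2), row.get(1))
    {
        data_center == config_data_center && rack == config_rack
    } else {
        false
    }
}

fn main() {
    // Hypothetical rows laid out as [address, rack, data_center].
    let rows = vec![
        vec![
            Value::Varchar("172.16.1.2".into()),
            Value::Varchar("rack1".into()),
            Value::Varchar("dc1".into()),
        ],
        vec![
            Value::Varchar("172.16.1.3".into()),
            Value::Varchar("rack2".into()),
            Value::Varchar("dc1".into()),
        ],
    ];

    let kept: Vec<_> = rows
        .iter()
        .filter(|row| matches_dc_and_rack(row.as_slice(), "dc1", "rack1"))
        .collect();

    // Only the node in dc1/rack1 survives.
    assert_eq!(kept.len(), 1);
}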
@@ -10,7 +10,6 @@ use tokio::sync::{mpsc, oneshot};
 #[derive(Debug, Clone)]
 pub struct CassandraNode {
     pub address: IpAddr,
-    pub _rack: String,
     pub _tokens: Vec<String>,
     pub outbound: Option<CassandraConnection>,
 }
shotover-proxy/tests/cassandra_int_tests/cluster.rs (1 addition, 1 deletion)

@@ -15,6 +15,7 @@ pub async fn test() {
         nodes_shared.clone(),
         task_handshake_rx,
         "dc1".to_string(),
+        "rack1".to_string(),
     );

     // Give the handshake task a hardcoded handshake.
@@ -54,7 +55,6 @@ pub async fn test() {
             .expect("Node did not contain a unique expected address");
         possible_addresses.remove(address_index);

-        assert_eq!(node._rack, "rack1");
         assert_eq!(node._tokens.len(), 128);
     }
 }