Skip to content

Commit

Permalink
Add test_cluster_multi_rack (#757)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Aug 25, 2022
1 parent 6c81054 commit 97d30e7
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
version: "3.3"

# Dedicated bridge network so each cassandra node gets a fixed address that
# the shotover topology configs can reference as a contact point.
networks:
  cluster_subnet:
    name: cluster_subnet
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: 172.16.1.0/24
          gateway: 172.16.1.1

# Three node cassandra cluster: all nodes in dc1, one node per rack
# (rack1/rack2/rack3), which is what the multi-rack integration test needs.
services:
  cassandra-one:
    image: bitnami/cassandra:3.11
    networks:
      cluster_subnet:
        ipv4_address: 172.16.1.2
    # Anchored so the other two services can reuse the identical healthcheck.
    healthcheck: &healthcheck
      test: [ "CMD", "cqlsh", "-e", "describe keyspaces" ]
      interval: 5s
      timeout: 5s
      retries: 60
    environment:
      CASSANDRA_SEEDS: "cassandra-one,cassandra-two"
      CASSANDRA_CLUSTER_NAME: TestCluster
      CASSANDRA_DC: dc1
      CASSANDRA_RACK: rack1
      CASSANDRA_ENDPOINT_SNITCH: GossipingPropertyFileSnitch
      CASSANDRA_NUM_TOKENS: 128
      # Small heap sizes keep the three-node cluster runnable in CI.
      MAX_HEAP_SIZE: "400M"
      MIN_HEAP_SIZE: "400M"
      HEAP_NEWSIZE: "48M"
      CASSANDRA_ENABLE_SCRIPTED_USER_DEFINED_FUNCTIONS: "true"
      CASSANDRA_ENABLE_USER_DEFINED_FUNCTIONS: "true"

  cassandra-two:
    image: bitnami/cassandra:3.11
    networks:
      cluster_subnet:
        ipv4_address: 172.16.1.3
    healthcheck: *healthcheck
    environment:
      CASSANDRA_SEEDS: "cassandra-one,cassandra-two"
      CASSANDRA_CLUSTER_NAME: TestCluster
      CASSANDRA_DC: dc1
      CASSANDRA_RACK: rack2
      CASSANDRA_ENDPOINT_SNITCH: GossipingPropertyFileSnitch
      CASSANDRA_NUM_TOKENS: 128
      MAX_HEAP_SIZE: "400M"
      MIN_HEAP_SIZE: "400M"
      HEAP_NEWSIZE: "48M"
      CASSANDRA_ENABLE_SCRIPTED_USER_DEFINED_FUNCTIONS: "true"
      CASSANDRA_ENABLE_USER_DEFINED_FUNCTIONS: "true"

  cassandra-three:
    image: bitnami/cassandra:3.11
    networks:
      cluster_subnet:
        ipv4_address: 172.16.1.4
    healthcheck: *healthcheck
    environment:
      CASSANDRA_SEEDS: "cassandra-one,cassandra-two"
      CASSANDRA_CLUSTER_NAME: TestCluster
      CASSANDRA_DC: dc1
      CASSANDRA_RACK: rack3
      CASSANDRA_ENDPOINT_SNITCH: GossipingPropertyFileSnitch
      CASSANDRA_NUM_TOKENS: 128
      MAX_HEAP_SIZE: "400M"
      MIN_HEAP_SIZE: "400M"
      HEAP_NEWSIZE: "48M"
      CASSANDRA_ENABLE_SCRIPTED_USER_DEFINED_FUNCTIONS: "true"
      CASSANDRA_ENABLE_USER_DEFINED_FUNCTIONS: "true"
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
# Shotover instance for rack1: listens on 127.0.0.1 and presents itself to
# clients as a dc1/rack1 node of the backing cluster.
sources:
  cassandra_prod:
    Cassandra:
      listen_addr: "127.0.0.1:9042"
chain_config:
  main_chain:
    - CassandraSinkCluster:
        first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
        data_center: "dc1"
        rack: "rack1"
        host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a"
source_to_chain_mapping:
  cassandra_prod: main_chain
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
# Shotover instance for rack2: listens on 127.0.0.2 and presents itself to
# clients as a dc1/rack2 node of the backing cluster.
sources:
  cassandra_prod:
    Cassandra:
      listen_addr: "127.0.0.2:9042"
chain_config:
  main_chain:
    - CassandraSinkCluster:
        first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
        data_center: "dc1"
        rack: "rack2"
        host_id: "3c3c4e2d-ba74-4f76-b52e-fb5bcee6a9f4"
source_to_chain_mapping:
  cassandra_prod: main_chain
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
# Shotover instance for rack3: listens on 127.0.0.3 and presents itself to
# clients as a dc1/rack3 node of the backing cluster.
sources:
  cassandra_prod:
    Cassandra:
      listen_addr: "127.0.0.3:9042"
chain_config:
  main_chain:
    - CassandraSinkCluster:
        first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
        data_center: "dc1"
        rack: "rack3"
        host_id: "fa74d7ec-1223-472b-97de-04a32ccdb70b"
source_to_chain_mapping:
  cassandra_prod: main_chain
55 changes: 40 additions & 15 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tokio::sync::{mpsc, oneshot, RwLock};
use uuid::Uuid;
use version_compare::Cmp;

mod node;
pub mod node;

#[derive(Deserialize, Debug, Clone)]
pub struct CassandraSinkClusterConfig {
Expand Down Expand Up @@ -458,10 +458,41 @@ impl CassandraSinkCluster {
metadata,
}) = &mut frame.operation
{
// TODO: if rack and data_center not in query then we cant perform this filtering,
// we will need to do an additional system.local query to get that information...
let mut is_in_data_center = true;
let mut is_in_rack = true;
for row in rows.iter_mut() {
for (col, col_meta) in row.iter_mut().zip(metadata.col_specs.iter()) {
if col_meta.name == rack_alias {
if let MessageValue::Varchar(rack) = col {
is_in_rack = rack == &self.rack;
if !is_in_rack {
*rack = self.rack.clone();
tracing::warn!("A contact point node is not in the configured rack, this node will receive traffic from outside of its rack");
}
}
} else if col_meta.name == data_center_alias {
if let MessageValue::Varchar(data_center) = col {
is_in_data_center = data_center == &self.data_center;
if !is_in_data_center {
*data_center = self.data_center.clone();
tracing::warn!("A contact point node is not in the configured data_center, this node will receive traffic from outside of its data_center");
}
}
}
}
}

for row in rows {
for (col, col_meta) in row.iter_mut().zip(metadata.col_specs.iter()) {
if col_meta.name == release_version_alias {
if let MessageValue::Varchar(release_version) = col {
if !is_in_data_center || !is_in_rack {
if let Some(peer) = peers.first() {
*release_version = peer.release_version.clone();
}
}
for peer in &peers {
if let Ok(Cmp::Lt) = version_compare::compare(
&peer.release_version,
Expand All @@ -473,13 +504,21 @@ impl CassandraSinkCluster {
}
} else if col_meta.name == tokens_alias {
if let MessageValue::List(tokens) = col {
if !is_in_data_center || !is_in_rack {
tokens.clear();
}
for peer in &peers {
tokens.extend(peer.tokens.iter().cloned());
}
tokens.sort();
}
} else if col_meta.name == schema_version_alias {
if let MessageValue::Uuid(schema_version) = col {
if !is_in_data_center || !is_in_rack {
if let Some(peer) = peers.first() {
*schema_version = peer.schema_version;
}
}
for peer in &peers {
if schema_version != &peer.schema_version {
*schema_version = Uuid::new_v4();
Expand All @@ -497,20 +536,6 @@ impl CassandraSinkCluster {
if let MessageValue::Uuid(host_id) = col {
*host_id = self.host_id;
}
} else if col_meta.name == rack_alias {
if let MessageValue::Varchar(rack) = col {
if rack != &self.rack {
*rack = self.rack.clone();
tracing::warn!("A contact point node is not in the configured rack, this node will receive traffic from outside of its rack");
}
}
} else if col_meta.name == data_center_alias {
if let MessageValue::Varchar(data_center) = col {
if data_center != &self.data_center {
*data_center = self.data_center.clone();
tracing::warn!("A contact point node is not in the configured data_center, this node will receive traffic from outside of its data_center");
}
}
}
}
}
Expand Down
41 changes: 23 additions & 18 deletions shotover-proxy/tests/cassandra_int_tests/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use cassandra_protocol::frame::Version;
use shotover_proxy::frame::{CassandraFrame, CassandraOperation, Frame};
use shotover_proxy::message::Message;
use shotover_proxy::tls::{TlsConnector, TlsConnectorConfig};
use shotover_proxy::transforms::cassandra::sink_cluster::node::CassandraNode;
use shotover_proxy::transforms::cassandra::sink_cluster::{create_topology_task, TaskHandshake};
use std::net::IpAddr;
use std::sync::Arc;
Expand Down Expand Up @@ -64,6 +65,27 @@ pub async fn test(connection: &CassandraConnection) {
}

/// Runs the topology discovery task against the single-rack test cluster and
/// checks the node list it produces: exactly three nodes, each claiming a
/// distinct expected address, all in rack1 with 128 tokens each.
pub async fn test_topology_task(ca_path: Option<&str>) {
    let nodes = run_topology_task(ca_path).await;

    assert_eq!(nodes.len(), 3);

    // Each address may be claimed by at most one node, so remove it once matched.
    let mut unclaimed_addresses: Vec<IpAddr> = vec![
        "172.16.1.2".parse().unwrap(),
        "172.16.1.3".parse().unwrap(),
        "172.16.1.4".parse().unwrap(),
    ];
    for node in &nodes {
        let index = unclaimed_addresses
            .iter()
            .position(|address| *address == node.address)
            .expect("Node did not contain a unique expected address");
        unclaimed_addresses.remove(index);

        assert_eq!(node._rack, "rack1");
        assert_eq!(node._tokens.len(), 128);
    }
}

pub async fn run_topology_task(ca_path: Option<&str>) -> Vec<CassandraNode> {
// Directly test the internal topology task
let nodes_shared = Arc::new(RwLock::new(vec![]));
let (task_handshake_tx, task_handshake_rx) = mpsc::channel(1);
Expand Down Expand Up @@ -104,24 +126,7 @@ pub async fn test_topology_task(ca_path: Option<&str>) {
}
tries += 1;
}

// make assertions on the nodes list
assert_eq!(nodes.len(), 3);
let mut possible_addresses: Vec<IpAddr> = vec![
"172.16.1.2".parse().unwrap(),
"172.16.1.3".parse().unwrap(),
"172.16.1.4".parse().unwrap(),
];
for node in &nodes {
let address_index = possible_addresses
.iter()
.position(|x| *x == node.address)
.expect("Node did not contain a unique expected address");
possible_addresses.remove(address_index);

assert_eq!(node._rack, "rack1");
assert_eq!(node._tokens.len(), 128);
}
nodes
}

fn create_handshake() -> Vec<Message> {
Expand Down
102 changes: 102 additions & 0 deletions shotover-proxy/tests/cassandra_int_tests/cluster_multi_rack.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use crate::cassandra_int_tests::cluster::run_topology_task;
use crate::helpers::cassandra::{assert_query_result, CassandraConnection, ResultValue};
use std::net::IpAddr;

/// Shotover presents itself to the client as the only node in the cluster, so
/// every query against system.peers must come back empty — regardless of
/// whether the projection is `*`, the full column list, or duplicated columns.
async fn test_rewrite_system_peers(connection: &CassandraConnection) {
    let all_columns = "peer, data_center, host_id, preferred_ip, rack, release_version, rpc_address, schema_version, tokens";

    let queries = [
        "SELECT * FROM system.peers;".to_string(),
        format!("SELECT {all_columns} FROM system.peers;"),
        format!("SELECT {all_columns}, {all_columns} FROM system.peers;"),
    ];
    for query in &queries {
        assert_query_result(connection, query, &[]).await;
    }
}

/// system.local must be rewritten so shotover appears as the local node.
/// Columns that depend on which backing cassandra node actually served the
/// query (broadcast_address, host_id, rack, ...) cannot be pinned to a single
/// value, so they are matched with `ResultValue::Any`.
async fn test_rewrite_system_local(connection: &CassandraConnection) {
    let expected_row = [
        ResultValue::Varchar("local".into()),
        ResultValue::Varchar("COMPLETED".into()),
        // broadcast address depends on which node served the query
        ResultValue::Any,
        ResultValue::Varchar("TestCluster".into()),
        ResultValue::Varchar("3.4.4".into()),
        ResultValue::Varchar("dc1".into()),
        // gossip_generation is non deterministic so we cant assert on it
        ResultValue::Any,
        // host_id depends on which node served the query
        ResultValue::Any,
        ResultValue::Inet("127.0.0.1".parse().unwrap()),
        ResultValue::Varchar("4".into()),
        ResultValue::Varchar("org.apache.cassandra.dht.Murmur3Partitioner".into()),
        // rack depends on which node served the query
        ResultValue::Any,
        ResultValue::Varchar("3.11.13".into()),
        ResultValue::Inet("0.0.0.0".parse().unwrap()),
        // schema_version is non deterministic so we cant assert on it
        ResultValue::Any,
        // thrift_version isnt used anymore so its exact value does not matter
        ResultValue::Any,
        // token generation is non-deterministic, but there must be exactly
        // 128 tokens per node
        ResultValue::Set((0..128).map(|_| ResultValue::Any).collect()),
        ResultValue::Map(vec![]),
    ];

    let all_columns =
        "key, bootstrapped, broadcast_address, cluster_name, cql_version, data_center,
        gossip_generation, host_id, listen_address, native_protocol_version, partitioner, rack,
        release_version, rpc_address, schema_version, thrift_version, tokens, truncated_at";

    assert_query_result(connection, "SELECT * FROM system.local;", &[&expected_row]).await;
    assert_query_result(
        connection,
        &format!("SELECT {all_columns} FROM system.local;"),
        &[&expected_row],
    )
    .await;

    // Selecting the column list twice must yield the row doubled.
    let doubled_row = [expected_row.as_slice(), expected_row.as_slice()].concat();
    assert_query_result(
        connection,
        &format!("SELECT {all_columns}, {all_columns} FROM system.local;"),
        &[&doubled_row],
    )
    .await;
}

/// Entry point for the multi-rack rewrite tests: verifies that shotover
/// rewrites both system.local and system.peers responses over the given
/// client connection.
pub async fn test(connection: &CassandraConnection) {
    test_rewrite_system_local(connection).await;
    test_rewrite_system_peers(connection).await;
}

/// Runs the topology discovery task against the multi-rack cluster and checks
/// the node list it produces: exactly three nodes, each claiming a distinct
/// expected address and a distinct rack, with 128 tokens each.
pub async fn test_topology_task(ca_path: Option<&str>) {
    let nodes = run_topology_task(ca_path).await;

    assert_eq!(nodes.len(), 3);

    // Addresses and racks may each be claimed by at most one node, so remove
    // an entry once a node has matched it.
    let mut unclaimed_addresses: Vec<IpAddr> = vec![
        "172.16.1.2".parse().unwrap(),
        "172.16.1.3".parse().unwrap(),
        "172.16.1.4".parse().unwrap(),
    ];
    let mut unclaimed_racks: Vec<&str> = vec!["rack1", "rack2", "rack3"];

    for node in &nodes {
        let index = unclaimed_addresses
            .iter()
            .position(|address| *address == node.address)
            .expect("Node did not contain a unique expected address");
        unclaimed_addresses.remove(index);

        let index = unclaimed_racks
            .iter()
            .position(|rack| *rack == node._rack)
            .expect("Node did not contain a unique expected rack");
        unclaimed_racks.remove(index);

        assert_eq!(node._tokens.len(), 128);
    }
}
Loading

0 comments on commit 97d30e7

Please sign in to comment.