Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test_cluster_single_rack: duplicate into v3 and v4 variants #780

Merged
merged 1 commit into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ networks:

services:
cassandra-one:
image: bitnami/cassandra:3.11
image: bitnami/cassandra:4.0.6
networks:
cluster_subnet:
ipv4_address: 172.16.1.2
Expand Down Expand Up @@ -43,7 +43,7 @@ services:
- ../cassandra-tls/certs/truststore.p12:/bitnami/cassandra/secrets/truststore

cassandra-two:
image: bitnami/cassandra:3.11
image: bitnami/cassandra:4.0.6
networks:
cluster_subnet:
ipv4_address: 172.16.1.3
Expand All @@ -52,7 +52,7 @@ services:
volumes: *volumes

cassandra-three:
image: bitnami/cassandra:3.11
image: bitnami/cassandra:4.0.6
networks:
cluster_subnet:
ipv4_address: 172.16.1.4
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
version: "3.3"
networks:
cluster_subnet:
name: cluster_subnet
driver: bridge
ipam:
driver: default
config:
- subnet: 172.16.1.0/24
gateway: 172.16.1.1

services:
cassandra-one:
image: bitnami/cassandra:4.0.6
networks:
cluster_subnet:
ipv4_address: 172.16.1.2
healthcheck:
&healthcheck
test: [ "CMD", "cqlsh", "-e", "describe keyspaces" ]
interval: 5s
timeout: 5s
retries: 60
environment:
&environment
CASSANDRA_SEEDS: "cassandra-one,cassandra-two"
CASSANDRA_CLUSTER_NAME: TestCluster
CASSANDRA_DC: dc1
CASSANDRA_RACK: rack1
CASSANDRA_ENDPOINT_SNITCH: GossipingPropertyFileSnitch
CASSANDRA_NUM_TOKENS: 128
MAX_HEAP_SIZE: "400M"
MIN_HEAP_SIZE: "400M"
HEAP_NEWSIZE: "48M"
CASSANDRA_ENABLE_SCRIPTED_USER_DEFINED_FUNCTIONS: "true"
CASSANDRA_ENABLE_USER_DEFINED_FUNCTIONS: "true"

cassandra-two:
image: bitnami/cassandra:4.0.6
networks:
cluster_subnet:
ipv4_address: 172.16.1.3
healthcheck: *healthcheck
environment: *environment

cassandra-three:
image: bitnami/cassandra:4.0.6
networks:
cluster_subnet:
ipv4_address: 172.16.1.4
healthcheck: *healthcheck
environment: *environment
149 changes: 0 additions & 149 deletions shotover-proxy/tests/cassandra_int_tests/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::helpers::cassandra::{assert_query_result, CassandraConnection, ResultValue};
use cassandra_protocol::frame::Version;
use shotover_proxy::frame::{CassandraFrame, CassandraOperation, Frame};
use shotover_proxy::message::Message;
Expand All @@ -8,157 +7,9 @@ use shotover_proxy::transforms::cassandra::sink_cluster::{
node::{CassandraNode, ConnectionFactory},
TaskConnectionInfo,
};
use std::net::IpAddr;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};

async fn test_rewrite_system_peers(connection: &CassandraConnection) {
let all_columns = "peer, data_center, host_id, preferred_ip, rack, release_version, rpc_address, schema_version, tokens";
assert_query_result(connection, "SELECT * FROM system.peers;", &[]).await;
assert_query_result(
connection,
&format!("SELECT {all_columns} FROM system.peers;"),
&[],
)
.await;
assert_query_result(
connection,
&format!("SELECT {all_columns}, {all_columns} FROM system.peers;"),
&[],
)
.await;
}
async fn test_rewrite_system_peers_dummy_peers(connection: &CassandraConnection) {
let star_results1 = [
ResultValue::Inet("127.0.0.1".parse().unwrap()),
ResultValue::Varchar("dc1".into()),
ResultValue::Uuid("3c3c4e2d-ba74-4f76-b52e-fb5bcee6a9f4".parse().unwrap()),
ResultValue::Inet("255.255.255.255".into()),
ResultValue::Varchar("rack1".into()),
ResultValue::Varchar("3.11.13".into()),
ResultValue::Inet("255.255.255.255".into()),
// schema_version is non deterministic so we cant assert on it.
ResultValue::Any,
// Unfortunately token generation appears to be non-deterministic but we can at least assert that
// there are 128 tokens per node
ResultValue::Set(std::iter::repeat(ResultValue::Any).take(3 * 128).collect()),
];
let star_results2 = [
ResultValue::Inet("127.0.0.1".parse().unwrap()),
ResultValue::Varchar("dc1".into()),
ResultValue::Uuid("fa74d7ec-1223-472b-97de-04a32ccdb70b".parse().unwrap()),
ResultValue::Inet("255.255.255.255".into()),
ResultValue::Varchar("rack1".into()),
ResultValue::Varchar("3.11.13".into()),
ResultValue::Inet("255.255.255.255".into()),
// schema_version is non deterministic so we cant assert on it.
ResultValue::Any,
// Unfortunately token generation appears to be non-deterministic but we can at least assert that
// there are 128 tokens per node
ResultValue::Set(std::iter::repeat(ResultValue::Any).take(3 * 128).collect()),
];

let all_columns = "peer, data_center, host_id, preferred_ip, rack, release_version, rpc_address, schema_version, tokens";
assert_query_result(
connection,
"SELECT * FROM system.peers;",
&[&star_results1, &star_results2],
)
.await;
assert_query_result(
connection,
&format!("SELECT {all_columns} FROM system.peers;"),
&[&star_results1, &star_results2],
)
.await;
assert_query_result(
connection,
&format!("SELECT {all_columns}, {all_columns} FROM system.peers;"),
&[
&[star_results1.as_slice(), star_results1.as_slice()].concat(),
&[star_results2.as_slice(), star_results2.as_slice()].concat(),
],
)
.await;
}

async fn test_rewrite_system_local(connection: &CassandraConnection) {
let star_results = [
ResultValue::Varchar("local".into()),
ResultValue::Varchar("COMPLETED".into()),
ResultValue::Inet("127.0.0.1".parse().unwrap()),
ResultValue::Varchar("TestCluster".into()),
ResultValue::Varchar("3.4.4".into()),
ResultValue::Varchar("dc1".into()),
// gossip_generation is non deterministic cant assert on it
ResultValue::Any,
ResultValue::Uuid("2dd022d6-2937-4754-89d6-02d2933a8f7a".parse().unwrap()),
ResultValue::Inet("127.0.0.1".parse().unwrap()),
ResultValue::Varchar("4".into()),
ResultValue::Varchar("org.apache.cassandra.dht.Murmur3Partitioner".into()),
ResultValue::Varchar("rack1".into()),
ResultValue::Varchar("3.11.13".into()),
ResultValue::Inet("0.0.0.0".parse().unwrap()),
// schema_version is non deterministic so we cant assert on it.
ResultValue::Any,
// thrift_version isnt used anymore so I dont really care what it maps to
ResultValue::Any,
// Unfortunately token generation appears to be non-deterministic but we can at least assert that
// there are 128 tokens per node
ResultValue::Set(std::iter::repeat(ResultValue::Any).take(3 * 128).collect()),
ResultValue::Map(vec![]),
];

let all_columns =
"key, bootstrapped, broadcast_address, cluster_name, cql_version, data_center,
gossip_generation, host_id, listen_address, native_protocol_version, partitioner, rack,
release_version, rpc_address, schema_version, thrift_version, tokens, truncated_at";

assert_query_result(connection, "SELECT * FROM system.local;", &[&star_results]).await;
assert_query_result(
connection,
&format!("SELECT {all_columns} FROM system.local;"),
&[&star_results],
)
.await;
assert_query_result(
connection,
&format!("SELECT {all_columns}, {all_columns} FROM system.local;"),
&[&[star_results.as_slice(), star_results.as_slice()].concat()],
)
.await;
}

pub async fn test(connection: &CassandraConnection) {
test_rewrite_system_local(connection).await;
test_rewrite_system_peers(connection).await;
}

pub async fn test_dummy_peers(connection: &CassandraConnection) {
test_rewrite_system_local(connection).await;
test_rewrite_system_peers_dummy_peers(connection).await;
}

pub async fn test_topology_task(ca_path: Option<&str>) {
let nodes = run_topology_task(ca_path).await;

assert_eq!(nodes.len(), 3);
let mut possible_addresses: Vec<IpAddr> = vec![
"172.16.1.2".parse().unwrap(),
"172.16.1.3".parse().unwrap(),
"172.16.1.4".parse().unwrap(),
];
for node in &nodes {
let address_index = possible_addresses
.iter()
.position(|x| *x == node.address)
.expect("Node did not contain a unique expected address");
possible_addresses.remove(address_index);

assert_eq!(node.rack, "rack1");
assert_eq!(node._tokens.len(), 128);
}
}
pub async fn run_topology_task(ca_path: Option<&str>) -> Vec<CassandraNode> {
let nodes_shared = Arc::new(RwLock::new(vec![]));
let (task_handshake_tx, task_handshake_rx) = mpsc::channel(1);
Expand Down
107 changes: 107 additions & 0 deletions shotover-proxy/tests/cassandra_int_tests/cluster_single_rack_v3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
use crate::helpers::cassandra::{assert_query_result, CassandraConnection, ResultValue};

async fn test_rewrite_system_peers_dummy_peers(connection: &CassandraConnection) {
let star_results1 = [
ResultValue::Inet("127.0.0.1".parse().unwrap()),
ResultValue::Varchar("dc1".into()),
ResultValue::Uuid("3c3c4e2d-ba74-4f76-b52e-fb5bcee6a9f4".parse().unwrap()),
ResultValue::Inet("255.255.255.255".into()),
ResultValue::Varchar("rack1".into()),
ResultValue::Varchar("3.11.13".into()),
ResultValue::Inet("255.255.255.255".into()),
// schema_version is non deterministic so we cant assert on it.
ResultValue::Any,
// Unfortunately token generation appears to be non-deterministic but we can at least assert that
// there are 128 tokens per node
ResultValue::Set(std::iter::repeat(ResultValue::Any).take(3 * 128).collect()),
];
let star_results2 = [
ResultValue::Inet("127.0.0.1".parse().unwrap()),
ResultValue::Varchar("dc1".into()),
ResultValue::Uuid("fa74d7ec-1223-472b-97de-04a32ccdb70b".parse().unwrap()),
ResultValue::Inet("255.255.255.255".into()),
ResultValue::Varchar("rack1".into()),
ResultValue::Varchar("3.11.13".into()),
ResultValue::Inet("255.255.255.255".into()),
// schema_version is non deterministic so we cant assert on it.
ResultValue::Any,
// Unfortunately token generation appears to be non-deterministic but we can at least assert that
// there are 128 tokens per node
ResultValue::Set(std::iter::repeat(ResultValue::Any).take(3 * 128).collect()),
];

let all_columns = "peer, data_center, host_id, preferred_ip, rack, release_version, rpc_address, schema_version, tokens";
assert_query_result(
connection,
"SELECT * FROM system.peers;",
&[&star_results1, &star_results2],
)
.await;
assert_query_result(
connection,
&format!("SELECT {all_columns} FROM system.peers;"),
&[&star_results1, &star_results2],
)
.await;
assert_query_result(
connection,
&format!("SELECT {all_columns}, {all_columns} FROM system.peers;"),
&[
&[star_results1.as_slice(), star_results1.as_slice()].concat(),
&[star_results2.as_slice(), star_results2.as_slice()].concat(),
],
)
.await;
}

async fn test_rewrite_system_local(connection: &CassandraConnection) {
let star_results = [
ResultValue::Varchar("local".into()),
ResultValue::Varchar("COMPLETED".into()),
ResultValue::Inet("127.0.0.1".parse().unwrap()),
ResultValue::Varchar("TestCluster".into()),
ResultValue::Varchar("3.4.4".into()),
ResultValue::Varchar("dc1".into()),
// gossip_generation is non deterministic cant assert on it
ResultValue::Any,
ResultValue::Uuid("2dd022d6-2937-4754-89d6-02d2933a8f7a".parse().unwrap()),
ResultValue::Inet("127.0.0.1".parse().unwrap()),
ResultValue::Varchar("4".into()),
ResultValue::Varchar("org.apache.cassandra.dht.Murmur3Partitioner".into()),
ResultValue::Varchar("rack1".into()),
ResultValue::Varchar("3.11.13".into()),
ResultValue::Inet("0.0.0.0".parse().unwrap()),
// schema_version is non deterministic so we cant assert on it.
ResultValue::Any,
// thrift_version isnt used anymore so I dont really care what it maps to
ResultValue::Any,
// Unfortunately token generation appears to be non-deterministic but we can at least assert that
// there are 128 tokens per node
ResultValue::Set(std::iter::repeat(ResultValue::Any).take(3 * 128).collect()),
ResultValue::Map(vec![]),
];

let all_columns =
"key, bootstrapped, broadcast_address, cluster_name, cql_version, data_center,
gossip_generation, host_id, listen_address, native_protocol_version, partitioner, rack,
release_version, rpc_address, schema_version, thrift_version, tokens, truncated_at";

assert_query_result(connection, "SELECT * FROM system.local;", &[&star_results]).await;
assert_query_result(
connection,
&format!("SELECT {all_columns} FROM system.local;"),
&[&star_results],
)
.await;
assert_query_result(
connection,
&format!("SELECT {all_columns}, {all_columns} FROM system.local;"),
&[&[star_results.as_slice(), star_results.as_slice()].concat()],
)
.await;
}

pub async fn test_dummy_peers(connection: &CassandraConnection) {
test_rewrite_system_local(connection).await;
test_rewrite_system_peers_dummy_peers(connection).await;
}
Loading