From e4f7e6fbe27b6d482d98fcc832c1a600e37eb428 Mon Sep 17 00:00:00 2001 From: conorbros Date: Thu, 25 Aug 2022 17:47:30 +1000 Subject: [PATCH] cdrs-tokio integration --- Cargo.lock | 26 + shotover-proxy/Cargo.toml | 3 + shotover-proxy/benches/cassandra_benches.rs | 74 ++- shotover-proxy/src/message/mod.rs | 2 +- .../cassandra_int_tests/batch_statements.rs | 45 +- .../tests/cassandra_int_tests/functions.rs | 37 +- .../tests/cassandra_int_tests/mod.rs | 289 ++++++----- .../tests/cassandra_int_tests/native_types.rs | 2 +- .../prepared_statements.rs | 52 +- .../tests/cassandra_int_tests/protect.rs | 42 +- shotover-proxy/tests/examples/mod.rs | 7 +- shotover-proxy/tests/helpers/cassandra.rs | 472 +++++++++++++++--- shotover-proxy/tests/helpers/mod.rs | 30 +- test-helpers/src/docker_compose.rs | 12 +- 14 files changed, 681 insertions(+), 412 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ecc884a25..08a628835 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2376,6 +2376,31 @@ dependencies = [ "winapi", ] +[[package]] +name = "rstest" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9c9dc66cc29792b663ffb5269be669f1613664e69ad56441fdb895c2347b930" +dependencies = [ + "futures", + "futures-timer", + "rstest_macros", + "rustc_version", +] + +[[package]] +name = "rstest_macros" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5015e68a0685a95ade3eee617ff7101ab6a3fc689203101ca16ebc16f2b89c66" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "rustc_version", + "syn", +] + [[package]] name = "rusoto_core" version = "0.48.0" @@ -2774,6 +2799,7 @@ dependencies = [ "redis", "redis-protocol", "reqwest", + "rstest", "rusoto_kms", "rusoto_signature", "scylla", diff --git a/shotover-proxy/Cargo.toml b/shotover-proxy/Cargo.toml index 3f1011009..3a7a51162 100644 --- a/shotover-proxy/Cargo.toml +++ b/shotover-proxy/Cargo.toml @@ -11,6 +11,7 @@ license = "Apache-2.0" [features] # Include WIP alpha transforms in the public API alpha-transforms = [] +cpp-driver-tests = [] [dependencies] pretty-hex = "0.3.0" @@ -92,6 +93,7 @@ reqwest = "0.11.6" metrics-util = "0.14.0" cdrs-tokio = { git = "https://github.com/krojew/cdrs-tokio" } scylla = { version = "0.4.7", features = ["ssl"] } +rstest = "0.15.0" [[bench]] name = "redis_benches" @@ -104,3 +106,4 @@ harness = false [[bench]] name = "cassandra_benches" harness = false +required-features = ["cpp-driver-tests"] diff --git a/shotover-proxy/benches/cassandra_benches.rs b/shotover-proxy/benches/cassandra_benches.rs index 1a07bc6cd..105add3f3 100644 --- a/shotover-proxy/benches/cassandra_benches.rs +++ b/shotover-proxy/benches/cassandra_benches.rs @@ -1,4 +1,4 @@ -use cassandra_cpp::{stmt, Cluster, Session, Statement}; +use cassandra_cpp::{stmt, Session, Statement}; use criterion::{criterion_group, criterion_main, Criterion}; use test_helpers::cert::generate_cassandra_test_certs; use test_helpers::docker_compose::DockerCompose; @@ -6,7 +6,7 @@ use test_helpers::lazy::new_lazy_shared; #[path = "../tests/helpers/mod.rs"] mod helpers; -use helpers::cassandra::CassandraConnection; +use helpers::cassandra::{CassandraConnection, CassandraDriver}; use helpers::ShotoverManager; struct Query { @@ -14,6 +14,8 @@ struct Query { statement: Statement, } +const DRIVER: CassandraDriver = CassandraDriver::Datastax; + fn cassandra(c: &mut Criterion) { let mut group = c.benchmark_group("cassandra"); group.throughput(criterion::Throughput::Elements(1)); @@ -48,7 +50,7 @@ fn cassandra(c: &mut Criterion) { |b, resources| { b.iter(|| { let mut resources = resources.borrow_mut(); - let connection = &mut resources.as_mut().unwrap().connection; + let connection = &mut resources.as_mut().unwrap().get_connection(); connection.execute(&query.statement).wait().unwrap(); }) }, @@ -71,7 +73,7 @@ fn cassandra(c: &mut Criterion) { |b, resources| { b.iter(|| { let mut resources = resources.borrow_mut(); - let connection = &mut resources.as_mut().unwrap().connection; + let connection = &mut resources.as_mut().unwrap().get_connection(); connection.execute(&query.statement).wait().unwrap(); }) }, @@ -93,7 +95,7 @@ fn cassandra(c: &mut Criterion) { |b, resources| { b.iter(|| { let mut resources = resources.borrow_mut(); - let connection = &mut resources.as_mut().unwrap().connection; + let connection = &mut resources.as_mut().unwrap().get_connection(); connection.execute(&query.statement).wait().unwrap(); }) }, @@ -116,7 +118,7 @@ fn cassandra(c: &mut Criterion) { |b, resources| { b.iter(|| { let mut resources = resources.borrow_mut(); - let connection = &mut resources.as_mut().unwrap().connection; + let connection = &mut resources.as_mut().unwrap().get_connection(); connection.execute(&query.statement).wait().unwrap(); }) }, @@ -139,7 +141,7 @@ fn cassandra(c: &mut Criterion) { |b, resources| { b.iter(|| { let mut resources = resources.borrow_mut(); - let connection = &mut resources.as_mut().unwrap().connection; + let connection = &mut resources.as_mut().unwrap().get_connection(); connection.execute(&query.statement).wait().unwrap(); }) }, @@ -158,7 +160,7 @@ fn cassandra(c: &mut Criterion) { group.bench_with_input(format!("tls_{}", query.name), &resources, |b, resources| { b.iter(|| { let mut resources = resources.borrow_mut(); - let connection = &mut resources.as_mut().unwrap().connection; + let connection = &mut resources.as_mut().unwrap().get_connection(); connection.execute(&query.statement).wait().unwrap(); }) }); @@ -184,21 +186,21 @@ fn cassandra(c: &mut Criterion) { ); resources - .connection + .get_connection() .execute(&stmt!( "CREATE KEYSPACE test_protect_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };" )) .wait() .unwrap(); resources - .connection + .get_connection() .execute(&stmt!( "CREATE TABLE test_protect_keyspace.test_table (pk varchar PRIMARY KEY, cluster varchar, col1 blob, col2 int, col3 boolean);" )) .wait() .unwrap(); resources - .connection + .get_connection() .execute(&stmt!( "INSERT INTO test_protect_keyspace.test_table (pk, cluster, col1, col2, col3) VALUES ('pk1', 'cluster', 'Initial value', 42, true);" )) @@ -215,7 +217,7 @@ fn cassandra(c: &mut Criterion) { |b, resources| { b.iter(|| { let mut resources = resources.borrow_mut(); - let connection = &mut resources.as_mut().unwrap().connection; + let connection = &mut resources.as_mut().unwrap().get_connection(); connection.execute(&query.statement).wait().unwrap(); }) }, @@ -237,7 +239,7 @@ fn cassandra(c: &mut Criterion) { |b, resources| { b.iter(|| { let mut resources = resources.borrow_mut(); - let connection = &mut resources.as_mut().unwrap().connection; + let connection = &mut resources.as_mut().unwrap().get_connection(); connection.execute(&query.statement).wait().unwrap(); }) }, @@ -252,7 +254,7 @@ criterion_main!(benches); pub struct BenchResources { _compose: DockerCompose, _shotover_manager: ShotoverManager, - connection: Session, + connection: CassandraConnection, } impl BenchResources { @@ -260,15 +262,7 @@ impl BenchResources { let compose = DockerCompose::new(compose_file); let shotover_manager = ShotoverManager::from_topology_file(shotover_topology); - let mut cluster = Cluster::default(); - cluster.set_contact_points("127.0.0.1").unwrap(); - cluster.set_credentials("cassandra", "cassandra").unwrap(); - cluster.set_port(9042).unwrap(); - cluster.set_load_balance_round_robin(); - - // By default unwrap uses the Debug formatter `{:?}` which is extremely noisy for the error type returned by `connect()`. - // So we instead force the Display formatter `{}` on the error. - let connection = cluster.connect().map_err(|err| format!("{err}")).unwrap(); + let connection = shotover_manager.cassandra_connection("127.0.0.1", 9042, DRIVER); let bench_resources = Self { _compose: compose, @@ -279,6 +273,10 @@ impl BenchResources { bench_resources } + pub fn get_connection(&self) -> &Session { + self.connection.as_datastax() + } + fn new_tls(shotover_topology: &str, compose_file: &str) -> Self { generate_cassandra_test_certs(); let compose = DockerCompose::new(compose_file); @@ -286,10 +284,8 @@ impl BenchResources { let ca_cert = "example-configs/cassandra-tls/certs/localhost_CA.crt"; - let CassandraConnection::Datastax { - session: connection, - .. - } = shotover_manager.cassandra_connection_tls("127.0.0.1", 9042, ca_cert); + let connection = + shotover_manager.cassandra_connection_tls("127.0.0.1", 9042, ca_cert, DRIVER); let bench_resources = Self { _compose: compose, @@ -301,24 +297,10 @@ impl BenchResources { } fn setup(&self) { - self.connection - .execute(&stmt!( - "CREATE KEYSPACE benchmark_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };" - )) - .wait().unwrap(); - - self.connection - .execute(&stmt!( - "CREATE TABLE benchmark_keyspace.table_1 (id int PRIMARY KEY, x int, name varchar);" - )) - .wait() - .unwrap(); - - self.connection - .execute(&stmt!( - "INSERT INTO benchmark_keyspace.table_1 (id, x, name) VALUES (0, 10, 'initial value');" - )) - .wait() - .unwrap(); + self.connection.as_datastax().execute(&stmt!("CREATE KEYSPACE benchmark_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };")).wait().unwrap(); + self.connection.as_datastax().execute(&stmt!("CREATE TABLE benchmark_keyspace.table_1 (id int PRIMARY KEY, x int, name varchar);")).wait().unwrap(); + self.connection.as_datastax().execute( + &stmt!("INSERT INTO benchmark_keyspace.table_1 (id, x, name) VALUES (0, 10, 'initial value');"), + ).wait().unwrap(); } } diff --git a/shotover-proxy/src/message/mod.rs b/shotover-proxy/src/message/mod.rs index d1fef5fc1..8d42ebc10 100644 --- a/shotover-proxy/src/message/mod.rs +++ b/shotover-proxy/src/message/mod.rs @@ -300,7 +300,7 @@ impl Message { Metadata::Cassandra(metadata) => { let body = CassandraOperation::Error(ErrorBody { error_code: 0x1001, - message: "".into(), + message: "Server overloaded".into(), additional_info: AdditionalErrorInfo::Overloaded, }); diff --git a/shotover-proxy/tests/cassandra_int_tests/batch_statements.rs b/shotover-proxy/tests/cassandra_int_tests/batch_statements.rs index 111d1740e..ee33b9cab 100644 --- a/shotover-proxy/tests/cassandra_int_tests/batch_statements.rs +++ b/shotover-proxy/tests/cassandra_int_tests/batch_statements.rs @@ -1,5 +1,4 @@ use crate::helpers::cassandra::{assert_query_result, run_query, CassandraConnection, ResultValue}; -use cassandra_cpp::{stmt, Batch, BatchType}; pub async fn test(connection: &CassandraConnection) { // setup keyspace and table for the batch statement tests @@ -9,12 +8,15 @@ pub async fn test(connection: &CassandraConnection) { } { - let mut batch = Batch::new(BatchType::LOGGED); + let mut queries: Vec<(String, i32)> = vec![]; for i in 0..2 { - let statement = format!("INSERT INTO batch_keyspace.batch_table (id, lastname, firstname) VALUES ({}, 'text1', 'text2')", i); - batch.add_statement(&stmt!(statement.as_str())).unwrap(); + queries.push(( + "INSERT INTO batch_keyspace.batch_table (id, lastname, firstname) VALUES (?, 'text1', 'text2')".into(), + i + )); } - connection.execute_batch(&batch); + + connection.execute_batch(queries); assert_query_result( connection, @@ -36,15 +38,15 @@ pub async fn test(connection: &CassandraConnection) { } { - let mut batch = Batch::new(BatchType::LOGGED); + let mut queries: Vec<(String, i32)> = vec![]; for i in 0..2 { - let statement = format!( - "UPDATE batch_keyspace.batch_table SET lastname = 'text3' WHERE id = {};", - i - ); - batch.add_statement(&stmt!(statement.as_str())).unwrap(); + queries.push(( + "UPDATE batch_keyspace.batch_table SET lastname = 'text3' WHERE id = ?;".into(), + i, + )); } - connection.execute_batch(&batch); + + connection.execute_batch(queries); assert_query_result( connection, @@ -66,26 +68,27 @@ pub async fn test(connection: &CassandraConnection) { } { - let mut batch = Batch::new(BatchType::LOGGED); + let mut queries: Vec<(String, i32)> = vec![]; for i in 0..2 { - let statement = format!("DELETE FROM batch_keyspace.batch_table WHERE id = {};", i); - batch.add_statement(&stmt!(statement.as_str())).unwrap(); + queries.push(( + "DELETE FROM batch_keyspace.batch_table WHERE id = ?;".into(), + i, + )); } - connection.execute_batch(&batch); + connection.execute_batch(queries); assert_query_result(connection, "SELECT * FROM batch_keyspace.batch_table;", &[]).await; } { - let batch = Batch::new(BatchType::LOGGED); - connection.execute_batch(&batch); + connection.execute_batch(vec![]); } // test batch statements over QUERY PROTOCOL { let insert_statement = "BEGIN BATCH -INSERT INTO batch_keyspace.batch_table (id, lastname, firstname) VALUES (2, 'text1', 'text2'); -INSERT INTO batch_keyspace.batch_table (id, lastname, firstname) VALUES (3, 'text1', 'text2'); -APPLY BATCH;"; + INSERT INTO batch_keyspace.batch_table (id, lastname, firstname) VALUES (2, 'text1', 'text2'); + INSERT INTO batch_keyspace.batch_table (id, lastname, firstname) VALUES (3, 'text1', 'text2'); + APPLY BATCH;"; run_query(connection, insert_statement).await; assert_query_result( diff --git a/shotover-proxy/tests/cassandra_int_tests/functions.rs b/shotover-proxy/tests/cassandra_int_tests/functions.rs index bad04dffb..599a76a8f 100644 --- a/shotover-proxy/tests/cassandra_int_tests/functions.rs +++ b/shotover-proxy/tests/cassandra_int_tests/functions.rs @@ -1,40 +1,29 @@ use crate::helpers::cassandra::{assert_query_result, run_query, CassandraConnection, ResultValue}; async fn drop_function(session: &CassandraConnection) { - assert_query_result( - session, - "SELECT test_function_keyspace.my_function(x, y) FROM test_function_keyspace.test_function_table WHERE id=1;", - &[&[ResultValue::Int(4)]] - ).await; + assert_query_result(session, "SELECT test_function_keyspace.my_function(x, y) FROM test_function_keyspace.test_function_table WHERE id=1;", &[&[ResultValue::Int(4)]]).await; run_query(session, "DROP FUNCTION test_function_keyspace.my_function").await; - session.execute_expect_err_contains( - "SELECT test_function_keyspace.my_function(x) FROM test_function_keyspace.test_function_table WHERE id=1;", - "Unknown function 'test_function_keyspace.my_function'", + let statement = "SELECT test_function_keyspace.my_function(x) FROM test_function_keyspace.test_function_table WHERE id=1;"; + let result = session.execute_expect_err(statement); + + assert_eq!( + result, + "Unknown function 'test_function_keyspace.my_function'" ); } async fn create_function(session: &CassandraConnection) { - run_query( - session, - "CREATE FUNCTION test_function_keyspace.my_function (a int, b int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE javascript AS 'a * b';", - ).await; - assert_query_result( - session, - "SELECT test_function_keyspace.my_function(x, y) FROM test_function_keyspace.test_function_table;", - &[&[ResultValue::Int(4)], &[ResultValue::Int(9)], &[ResultValue::Int(16)]] - ).await; + run_query(session,"CREATE FUNCTION test_function_keyspace.my_function (a int, b int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE javascript AS 'a * b';").await; + assert_query_result(session, "SELECT test_function_keyspace.my_function(x, y) FROM test_function_keyspace.test_function_table;",&[&[ResultValue::Int(4)], &[ResultValue::Int(9)], &[ResultValue::Int(16)]]).await; } pub async fn test(session: &CassandraConnection) { + run_query(session, "CREATE KEYSPACE test_function_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").await; run_query( - session, - "CREATE KEYSPACE test_function_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };" - ).await; - run_query( - session, - "CREATE TABLE test_function_keyspace.test_function_table (id int PRIMARY KEY, x int, y int);", - ).await; + session, + "CREATE TABLE test_function_keyspace.test_function_table (id int PRIMARY KEY, x int, y int);", + ).await; run_query( session, r#"BEGIN BATCH diff --git a/shotover-proxy/tests/cassandra_int_tests/mod.rs b/shotover-proxy/tests/cassandra_int_tests/mod.rs index e3e9bbb56..943017d04 100644 --- a/shotover-proxy/tests/cassandra_int_tests/mod.rs +++ b/shotover-proxy/tests/cassandra_int_tests/mod.rs @@ -1,61 +1,71 @@ -use crate::helpers::cassandra::{assert_query_result, run_query, ResultValue}; +#[cfg(feature = "cpp-driver-tests")] +use crate::helpers::cassandra::{ + assert_query_result, run_query, CassandraDriver::Datastax, ResultValue, +}; +use crate::helpers::cassandra::{CassandraDriver, CassandraDriver::CdrsTokio}; use crate::helpers::ShotoverManager; -use cassandra_cpp::{stmt, Batch, BatchType, Error, ErrorKind}; -use cdrs_tokio::authenticators::StaticPasswordAuthenticatorProvider; -use cdrs_tokio::cluster::session::{SessionBuilder, TcpSessionBuilder}; -use cdrs_tokio::cluster::NodeTcpConfigBuilder; +#[cfg(feature = "cpp-driver-tests")] +use cassandra_cpp::{Error, ErrorKind}; use cdrs_tokio::frame::events::{ SchemaChange, SchemaChangeOptions, SchemaChangeTarget, SchemaChangeType, ServerEvent, }; -use cdrs_tokio::load_balancing::RoundRobinLoadBalancingStrategy; +#[cfg(feature = "cpp-driver-tests")] use futures::future::{join_all, try_join_all}; use metrics_util::debugging::DebuggingRecorder; +use rstest::rstest; use serial_test::serial; -use std::sync::Arc; use test_helpers::docker_compose::DockerCompose; use tokio::time::{sleep, timeout, Duration}; mod batch_statements; mod cache; +#[cfg(feature = "cpp-driver-tests")] #[cfg(feature = "alpha-transforms")] mod cluster; +#[cfg(feature = "cpp-driver-tests")] #[cfg(feature = "alpha-transforms")] mod cluster_multi_rack; -mod collections; +//mod collections; mod functions; mod keyspace; mod native_types; mod prepared_statements; +#[cfg(feature = "cpp-driver-tests")] #[cfg(feature = "alpha-transforms")] mod protect; mod table; mod udt; +#[rstest] +#[case(CdrsTokio)] +#[cfg_attr(feature = "cpp-driver-tests", case(Datastax))] #[tokio::test(flavor = "multi_thread")] #[serial] -async fn test_passthrough() { +async fn test_passthrough(#[case] driver: CassandraDriver) { let _compose = DockerCompose::new("example-configs/cassandra-passthrough/docker-compose.yml"); let shotover_manager = ShotoverManager::from_topology_file("example-configs/cassandra-passthrough/topology.yaml"); - let connection = shotover_manager - .cassandra_connection("127.0.0.1", 9042) - .await; + let connection = shotover_manager.cassandra_connection("127.0.0.1", 9042, driver); keyspace::test(&connection).await; table::test(&connection).await; udt::test(&connection).await; native_types::test(&connection).await; - collections::test(&connection).await; + //collections::test(&connection).await; functions::test(&connection).await; prepared_statements::test(&connection).await; batch_statements::test(&connection).await; } +#[cfg(feature = "cpp-driver-tests")] +#[rstest] +//#[case(CdrsTokio)] // TODO +#[cfg_attr(feature = "cpp-driver-tests", case(Datastax))] #[tokio::test(flavor = "multi_thread")] #[serial] -async fn test_source_tls_and_single_tls() { +async fn test_source_tls_and_single_tls(#[case] driver: CassandraDriver) { test_helpers::cert::generate_cassandra_test_certs(); let _compose = DockerCompose::new("example-configs/cassandra-tls/docker-compose.yml"); @@ -67,7 +77,7 @@ async fn test_source_tls_and_single_tls() { { // Run a quick test straight to Cassandra to check our assumptions that Shotover and Cassandra TLS are behaving exactly the same let direct_connection = - shotover_manager.cassandra_connection_tls("127.0.0.1", 9042, ca_cert); + shotover_manager.cassandra_connection_tls("127.0.0.1", 9042, ca_cert, driver); assert_query_result( &direct_connection, "SELECT bootstrapped FROM system.local", @@ -76,48 +86,49 @@ async fn test_source_tls_and_single_tls() { .await; } - let connection = shotover_manager.cassandra_connection_tls("127.0.0.1", 9043, ca_cert); + let connection = shotover_manager.cassandra_connection_tls("127.0.0.1", 9043, ca_cert, driver); keyspace::test(&connection).await; table::test(&connection).await; udt::test(&connection).await; native_types::test(&connection).await; - collections::test(&connection).await; + // collections::test(&connection); functions::test(&connection).await; prepared_statements::test(&connection).await; batch_statements::test(&connection).await; } +#[cfg(feature = "cpp-driver-tests")] +#[cfg(feature = "alpha-transforms")] +#[rstest] +//#[case(CdrsTokio)] // TODO +#[cfg_attr(feature = "cpp-driver-tests", case(Datastax))] #[tokio::test(flavor = "multi_thread")] #[serial] -#[cfg(feature = "alpha-transforms")] -async fn test_cluster_single_rack() { +async fn test_cluster_single_rack(#[case] driver: CassandraDriver) { let _compose = DockerCompose::new("example-configs/cassandra-cluster/docker-compose.yml"); { let shotover_manager = ShotoverManager::from_topology_file("example-configs/cassandra-cluster/topology.yaml"); - let mut connection1 = shotover_manager - .cassandra_connection("127.0.0.1", 9042) - .await; + let mut connection1 = shotover_manager.cassandra_connection("127.0.0.1", 9042, driver); connection1 .enable_schema_awaiter("172.16.1.2:9042", None) .await; + keyspace::test(&connection1).await; table::test(&connection1).await; udt::test(&connection1).await; native_types::test(&connection1).await; - collections::test(&connection1).await; + // collections::test(&connection); functions::test(&connection1).await; prepared_statements::test(&connection1).await; batch_statements::test(&connection1).await; cluster::test(&connection1).await; //Check for bugs in cross connection state - let mut connection2 = shotover_manager - .cassandra_connection("127.0.0.1", 9042) - .await; + let mut connection2 = shotover_manager.cassandra_connection("127.0.0.1", 9042, driver); connection2 .enable_schema_awaiter("172.16.1.2:9042", None) .await; @@ -127,10 +138,14 @@ async fn test_cluster_single_rack() { cluster::test_topology_task(None).await; } +#[cfg(feature = "cpp-driver-tests")] +#[rstest] +//#[case(CdrsTokio)] // TODO +#[cfg_attr(feature = "cpp-driver-tests", case(Datastax))] #[tokio::test(flavor = "multi_thread")] #[serial] #[cfg(feature = "alpha-transforms")] -async fn test_cluster_multi_rack() { +async fn test_cluster_multi_rack(#[case] driver: CassandraDriver) { let _compose = DockerCompose::new("example-configs/cassandra-cluster-multi-rack/docker-compose.yml"); @@ -145,9 +160,8 @@ async fn test_cluster_multi_rack() { "example-configs/cassandra-cluster-multi-rack/topology_rack3.yaml", ); - let mut connection1 = shotover_manager_rack1 - .cassandra_connection("127.0.0.1", 9042) - .await; + let mut connection1 = + shotover_manager_rack1.cassandra_connection("127.0.0.1", 9042, driver); connection1 .enable_schema_awaiter("172.16.1.2:9042", None) .await; @@ -155,16 +169,15 @@ async fn test_cluster_multi_rack() { table::test(&connection1).await; udt::test(&connection1).await; native_types::test(&connection1).await; - collections::test(&connection1).await; + // collections::test(&connection1).await; functions::test(&connection1).await; prepared_statements::test(&connection1).await; batch_statements::test(&connection1).await; cluster_multi_rack::test(&connection1).await; //Check for bugs in cross connection state - let mut connection2 = shotover_manager_rack1 - .cassandra_connection("127.0.0.1", 9042) - .await; + let mut connection2 = + shotover_manager_rack1.cassandra_connection("127.0.0.1", 9042, driver); connection2 .enable_schema_awaiter("172.16.1.2:9042", None) .await; @@ -174,52 +187,55 @@ async fn test_cluster_multi_rack() { cluster_multi_rack::test_topology_task(None).await; } +#[cfg(feature = "cpp-driver-tests")] +#[rstest] +//#[case(CdrsTokio)] // TODO +#[cfg_attr(feature = "cpp-driver-tests", case(Datastax))] #[tokio::test(flavor = "multi_thread")] #[serial] -#[cfg(feature = "alpha-transforms")] -async fn test_source_tls_and_cluster_tls() { +async fn test_source_tls_and_cluster_tls(#[case] driver: CassandraDriver) { test_helpers::cert::generate_cassandra_test_certs(); - let ca_cert = "example-configs/cassandra-tls/certs/localhost_CA.crt"; let _compose = DockerCompose::new("example-configs/cassandra-cluster-tls/docker-compose.yml"); - { - let shotover_manager = ShotoverManager::from_topology_file( - "example-configs/cassandra-cluster-tls/topology.yaml", - ); - { - // Run a quick test straight to Cassandra to check our assumptions that Shotover and Cassandra TLS are behaving exactly the same - let direct_connection = - shotover_manager.cassandra_connection_tls("172.16.1.2", 9042, ca_cert); - assert_query_result( - &direct_connection, - "SELECT bootstrapped FROM system.local", - &[&[ResultValue::Varchar("COMPLETED".into())]], - ) - .await; - } + let shotover_manager = + ShotoverManager::from_topology_file("example-configs/cassandra-cluster-tls/topology.yaml"); - let mut connection = shotover_manager.cassandra_connection_tls("127.0.0.1", 9042, ca_cert); - connection - .enable_schema_awaiter("172.16.1.2:9042", Some(ca_cert)) - .await; + let ca_cert = "example-configs/cassandra-tls/certs/localhost_CA.crt"; - keyspace::test(&connection).await; - table::test(&connection).await; - udt::test(&connection).await; - native_types::test(&connection).await; - functions::test(&connection).await; - collections::test(&connection).await; - prepared_statements::test(&connection).await; - batch_statements::test(&connection).await; - cluster::test(&connection).await; + { + // Run a quick test straight to Cassandra to check our assumptions that Shotover and Cassandra TLS are behaving exactly the same + let direct_connection = + shotover_manager.cassandra_connection_tls("172.16.1.2", 9042, ca_cert, driver); + assert_query_result( + &direct_connection, + "SELECT bootstrapped FROM system.local", + &[&[ResultValue::Varchar("COMPLETED".into())]], + ) + .await; } - cluster::test_topology_task(Some(ca_cert)).await; + let mut connection = + shotover_manager.cassandra_connection_tls("127.0.0.1", 9042, ca_cert, driver); + connection + .enable_schema_awaiter("172.16.1.2:9042", Some(ca_cert)) + .await; + + keyspace::test(&connection).await; + table::test(&connection).await; + udt::test(&connection).await; + native_types::test(&connection).await; + // collections::test(&connection); + functions::test(&connection).await; + prepared_statements::test(&connection).await; + batch_statements::test(&connection).await; } +#[rstest] +#[case(CdrsTokio)] +#[cfg_attr(feature = "cpp-driver-tests", case(Datastax))] #[tokio::test(flavor = "multi_thread")] #[serial] -async fn test_cassandra_redis_cache() { +async fn test_cassandra_redis_cache(#[case] driver: CassandraDriver) { let recorder = DebuggingRecorder::new(); let snapshotter = recorder.snapshotter(); recorder.install().unwrap(); @@ -230,9 +246,7 @@ async fn test_cassandra_redis_cache() { ); let mut redis_connection = shotover_manager.redis_connection(6379); - let connection = shotover_manager - .cassandra_connection("127.0.0.1", 9042) - .await; + let connection = shotover_manager.cassandra_connection("127.0.0.1", 9042, driver); keyspace::test(&connection).await; table::test(&connection).await; @@ -243,63 +257,67 @@ async fn test_cassandra_redis_cache() { cache::test(&connection, &mut redis_connection, &snapshotter).await; } +#[cfg(feature = "cpp-driver-tests")] +#[cfg(feature = "alpha-transforms")] +#[rstest] +// #[case(CdrsTokio)] // TODO +#[cfg_attr(feature = "cpp-driver-tests", case(Datastax))] #[tokio::test(flavor = "multi_thread")] #[serial] -#[cfg(feature = "alpha-transforms")] -async fn test_cassandra_protect_transform_local() { +async fn test_cassandra_protect_transform_local(#[case] driver: CassandraDriver) { let _compose = DockerCompose::new("example-configs/cassandra-protect-local/docker-compose.yml"); let shotover_manager = ShotoverManager::from_topology_file( "example-configs/cassandra-protect-local/topology.yaml", ); - let shotover_connection = shotover_manager - .cassandra_connection("127.0.0.1", 9042) - .await; - let direct_connection = shotover_manager - .cassandra_connection("127.0.0.1", 9043) - .await; + let shotover_connection = shotover_manager.cassandra_connection("127.0.0.1", 9042, driver); + let direct_connection = shotover_manager.cassandra_connection("127.0.0.1", 9043, driver); keyspace::test(&shotover_connection).await; table::test(&shotover_connection).await; udt::test(&shotover_connection).await; native_types::test(&shotover_connection).await; - collections::test(&shotover_connection).await; + // collections::test(&shotover_connection); functions::test(&shotover_connection).await; batch_statements::test(&shotover_connection).await; protect::test(&shotover_connection, &direct_connection).await; } +#[cfg(feature = "cpp-driver-tests")] +#[cfg(feature = "alpha-transforms")] +#[rstest] +//#[case(CdrsTokio)] // TODO +#[cfg_attr(feature = "cpp-driver-tests", case(Datastax))] #[tokio::test(flavor = "multi_thread")] #[serial] -#[cfg(feature = "alpha-transforms")] -async fn test_cassandra_protect_transform_aws() { +async fn test_cassandra_protect_transform_aws(#[case] driver: CassandraDriver) { let _compose = DockerCompose::new("example-configs/cassandra-protect-aws/docker-compose.yml"); let _compose_aws = DockerCompose::new_moto(); let shotover_manager = ShotoverManager::from_topology_file("example-configs/cassandra-protect-aws/topology.yaml"); - let shotover_connection = shotover_manager - .cassandra_connection("127.0.0.1", 9042) - .await; - let direct_connection = shotover_manager - .cassandra_connection("127.0.0.1", 9043) - .await; + let shotover_connection = shotover_manager.cassandra_connection("127.0.0.1", 9042, driver); + let direct_connection = shotover_manager.cassandra_connection("127.0.0.1", 9043, driver); keyspace::test(&shotover_connection).await; table::test(&shotover_connection).await; udt::test(&shotover_connection).await; native_types::test(&shotover_connection).await; - collections::test(&shotover_connection).await; + // collections::test(&shotover_connection); functions::test(&shotover_connection).await; batch_statements::test(&shotover_connection).await; protect::test(&shotover_connection, &direct_connection).await; } +#[cfg(feature = "cpp-driver-tests")] +#[rstest] +//#[case(CdrsTokio)] // TODO +#[cfg_attr(feature = "cpp-driver-tests", case(Datastax))] #[tokio::test(flavor = "multi_thread")] #[serial] -async fn test_cassandra_peers_rewrite_cassandra4() { +async fn test_cassandra_peers_rewrite_cassandra4(#[case] driver: CassandraDriver) { let _docker_compose = DockerCompose::new( "tests/test-configs/cassandra-peers-rewrite/docker-compose-4.0-cassandra.yaml", ); @@ -308,16 +326,10 @@ async fn test_cassandra_peers_rewrite_cassandra4() { "tests/test-configs/cassandra-peers-rewrite/topology.yaml", ); - let normal_connection = shotover_manager - .cassandra_connection("127.0.0.1", 9043) - .await; - - let rewrite_port_connection = shotover_manager - .cassandra_connection("127.0.0.1", 9044) - .await; + let normal_connection = shotover_manager.cassandra_connection("127.0.0.1", 9043, driver); - // run some basic tests to confirm it works as normal - table::test(&normal_connection).await; + let rewrite_port_connection = shotover_manager.cassandra_connection("127.0.0.1", 9044, driver); + table::test(&rewrite_port_connection).await; // run some basic tests to confirm it works as normal { assert_query_result( @@ -392,9 +404,13 @@ async fn test_cassandra_peers_rewrite_cassandra4() { } } +#[cfg(feature = "cpp-driver-tests")] +#[rstest] +//#[case(CdrsTokio)] // TODO +#[cfg_attr(feature = "cpp-driver-tests", case(Datastax))] #[tokio::test(flavor = "multi_thread")] #[serial] -async fn test_cassandra_peers_rewrite_cassandra3() { +async fn test_cassandra_peers_rewrite_cassandra3(#[case] driver: CassandraDriver) { let _docker_compose = DockerCompose::new( "tests/test-configs/cassandra-peers-rewrite/docker-compose-3.11-cassandra.yaml", ); @@ -403,41 +419,32 @@ async fn test_cassandra_peers_rewrite_cassandra3() { "tests/test-configs/cassandra-peers-rewrite/topology.yaml", ); - let connection = shotover_manager - .cassandra_connection("127.0.0.1", 9044) - .await; - // run some basic tests to confirm it works as normal - table::test(&connection).await; + let connection = shotover_manager.cassandra_connection("127.0.0.1", 9044, driver); + table::test(&connection).await; // run some basic tests to confirm it works as normal // Assert that the error cassandra gives because system.peers_v2 does not exist on cassandra v3 // is passed through shotover unchanged. let statement = "SELECT data_center, native_port, rack FROM system.peers_v2;"; let result = connection.execute_expect_err(statement); - assert!(matches!( - result, - Error( - ErrorKind::CassErrorResult(cassandra_cpp::CassErrorCode::SERVER_INVALID_QUERY, ..), - _ - ) - )); + assert_eq!(result, "unconfigured table peers_v2"); } +#[cfg(feature = "cpp-driver-tests")] +#[rstest] +//#[case(CdrsTokio)] // TODO +#[cfg_attr(feature = "cpp-driver-tests", case(Datastax))] #[tokio::test(flavor = "multi_thread")] #[serial] -async fn test_cassandra_request_throttling() { +async fn test_cassandra_request_throttling(#[case] driver: CassandraDriver) { let _docker_compose = DockerCompose::new("example-configs/cassandra-passthrough/docker-compose.yml"); let shotover_manager = ShotoverManager::from_topology_file("tests/test-configs/cassandra-request-throttling.yaml"); - let connection = shotover_manager - .cassandra_connection("127.0.0.1", 9042) - .await; + let connection = shotover_manager.cassandra_connection("127.0.0.1", 9042, driver); std::thread::sleep(std::time::Duration::from_secs(1)); // sleep to reset the window and not trigger the rate limiter with client's startup reqeusts - let connection_2 = shotover_manager - .cassandra_connection("127.0.0.1", 9042) - .await; + let connection_2 = shotover_manager.cassandra_connection("127.0.0.1", 9042, driver); std::thread::sleep(std::time::Duration::from_secs(1)); // sleep to reset the window again let statement = "SELECT * FROM system.peers"; @@ -489,31 +496,23 @@ async fn test_cassandra_request_throttling() { // this batch set should be allowed through { - let mut batch = Batch::new(BatchType::LOGGED); + let mut queries: Vec<(String, i32)> = vec![]; for i in 0..25 { - let statement = format!("INSERT INTO test_keyspace.my_table (id, lastname, firstname) VALUES ({}, 'text', 'text')", i); - batch.add_statement(&stmt!(statement.as_str())).unwrap(); + queries.push(("INSERT INTO test_keyspace.my_table (id, lastname, firstname) VALUES (?, 'text', 'text')".into(), i)); } - connection.execute_batch(&batch); + connection.execute_batch(queries); } std::thread::sleep(std::time::Duration::from_secs(1)); // sleep to reset the window // this batch set should not be allowed through { - let mut batch = Batch::new(BatchType::LOGGED); + let mut queries: Vec<(String, i32)> = vec![]; for i in 0..60 { - let statement = format!("INSERT INTO test_keyspace.my_table (id, lastname, firstname) VALUES ({}, 'text', 'text')", i); - batch.add_statement(&stmt!(statement.as_str())).unwrap(); + queries.push(("INSERT INTO test_keyspace.my_table (id, lastname, firstname) VALUES (?, 'text', 'text')".into(), i)); } - let result = connection.execute_batch_expect_err(&batch); - assert!(matches!( - result, - Error( - ErrorKind::CassErrorResult(cassandra_cpp::CassErrorCode::SERVER_OVERLOADED, ..), - .. - ) - )); + let result = connection.execute_batch_expect_err(queries); + assert_eq!(result, "Server overloaded".to_string()); } std::thread::sleep(std::time::Duration::from_secs(1)); // sleep to reset the window @@ -521,35 +520,25 @@ async fn test_cassandra_request_throttling() { batch_statements::test(&connection).await; } +#[rstest] +#[case(CdrsTokio)] #[tokio::test(flavor = "multi_thread")] #[serial] -async fn test_events_keyspace() { +async fn test_events_keyspace(#[case] driver: CassandraDriver) { let _docker_compose = DockerCompose::new("example-configs/cassandra-passthrough/docker-compose.yml"); - let _shotover_manager = + let shotover_manager = ShotoverManager::from_topology_file("example-configs/cassandra-passthrough/topology.yaml"); - let user = "cassandra"; - let password = "cassandra"; - let auth = StaticPasswordAuthenticatorProvider::new(&user, &password); - let config = NodeTcpConfigBuilder::new() - .with_contact_point("127.0.0.1:9042".into()) - .with_authenticator_provider(Arc::new(auth)) - .build() - .await - .unwrap(); - - let session = TcpSessionBuilder::new(RoundRobinLoadBalancingStrategy::new(), config) - .build() - .unwrap(); + let connection = shotover_manager.cassandra_connection("127.0.0.1", 9042, driver); - let mut event_recv = session.create_event_receiver(); + let mut event_recv = connection.as_cdrs().create_event_receiver(); sleep(Duration::from_secs(10)).await; // let the driver finish connecting to the cluster and registering for the events let create_ks = "CREATE KEYSPACE IF NOT EXISTS test_events_ks WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };"; - session.query(create_ks).await.unwrap(); + connection.execute(create_ks).await; let event = timeout(Duration::from_secs(10), event_recv.recv()) .await diff --git a/shotover-proxy/tests/cassandra_int_tests/native_types.rs b/shotover-proxy/tests/cassandra_int_tests/native_types.rs index b16e28234..aecbffc7f 100644 --- a/shotover-proxy/tests/cassandra_int_tests/native_types.rs +++ b/shotover-proxy/tests/cassandra_int_tests/native_types.rs @@ -30,7 +30,7 @@ async fn select(session: &CassandraConnection) { ResultValue::VarInt(vec![3, 5, 233]), ]], ) - .await + .await; } async fn insert(session: &CassandraConnection) { diff --git a/shotover-proxy/tests/cassandra_int_tests/prepared_statements.rs b/shotover-proxy/tests/cassandra_int_tests/prepared_statements.rs index d1ab4c920..44e300138 100644 --- a/shotover-proxy/tests/cassandra_int_tests/prepared_statements.rs +++ b/shotover-proxy/tests/cassandra_int_tests/prepared_statements.rs @@ -5,10 +5,8 @@ use crate::helpers::cassandra::{ async fn delete(session: &CassandraConnection) { let prepared = session.prepare("DELETE FROM test_prepare_statements.table_1 WHERE id = ?;"); - let mut statement = prepared.bind(); - statement.bind_int32(0, 1).unwrap(); assert_eq!( - session.execute_prepared(&statement), + session.execute_prepared(&prepared, 1), Vec::>::new() ); @@ -20,66 +18,42 @@ async fn delete(session: &CassandraConnection) { .await; } -async fn insert(session: &CassandraConnection) { - let prepared = session - .prepare("INSERT INTO test_prepare_statements.table_1 (id, x, name) VALUES (?, ?, ?);"); +fn insert(session: &CassandraConnection) { + let prepared = session.prepare("INSERT INTO test_prepare_statements.table_1 (id) VALUES (?);"); - let mut statement = prepared.bind(); - statement.bind_int32(0, 1).unwrap(); - statement.bind_int32(1, 11).unwrap(); - statement.bind_string(2, "foo").unwrap(); assert_eq!( - session.execute_prepared(&statement), + session.execute_prepared(&prepared, 1), Vec::>::new() ); - statement = prepared.bind(); - statement.bind_int32(0, 2).unwrap(); - statement.bind_int32(1, 12).unwrap(); - statement.bind_string(2, "bar").unwrap(); assert_eq!( - session.execute_prepared(&statement), + session.execute_prepared(&prepared, 2), Vec::>::new() ); - statement = prepared.bind(); - statement.bind_int32(0, 2).unwrap(); - statement.bind_int32(1, 13).unwrap(); - statement.bind_string(2, "baz").unwrap(); assert_eq!( - session.execute_prepared(&statement), + session.execute_prepared(&prepared, 2), Vec::>::new() ); } -async fn select(session: &CassandraConnection) { - let prepared = - session.prepare("SELECT id, x, name FROM test_prepare_statements.table_1 WHERE id = ?"); +fn select(session: &CassandraConnection) { + let prepared = session.prepare("SELECT id FROM test_prepare_statements.table_1 WHERE id = ?"); - let mut statement = prepared.bind(); - statement.bind_int32(0, 1).unwrap(); + let result_rows = session.execute_prepared(&prepared, 1); - let result_rows = session.execute_prepared(&statement); - - assert_rows( - result_rows, - &[&[ - ResultValue::Int(1), - ResultValue::Int(11), - ResultValue::Varchar("foo".into()), - ]], - ); + assert_rows(result_rows, &[&[ResultValue::Int(1)]]); } pub async fn test(session: &CassandraConnection) { run_query(session, "CREATE KEYSPACE test_prepare_statements WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").await; run_query( session, - "CREATE TABLE test_prepare_statements.table_1 (id int PRIMARY KEY, x int, name varchar);", + "CREATE TABLE test_prepare_statements.table_1 (id int PRIMARY KEY);", ) .await; - insert(session).await; - select(session).await; + insert(session); + select(session); delete(session).await; } diff --git a/shotover-proxy/tests/cassandra_int_tests/protect.rs b/shotover-proxy/tests/cassandra_int_tests/protect.rs index d6dcf4a8d..36257f97f 100644 --- a/shotover-proxy/tests/cassandra_int_tests/protect.rs +++ b/shotover-proxy/tests/cassandra_int_tests/protect.rs @@ -1,5 +1,4 @@ use crate::helpers::cassandra::{assert_query_result, run_query, CassandraConnection, ResultValue}; -use cassandra_cpp::{stmt, Batch, BatchType}; use chacha20poly1305::Nonce; use serde::Deserialize; @@ -13,29 +12,19 @@ pub struct Protected { pub async fn test(shotover_session: &CassandraConnection, direct_session: &CassandraConnection) { run_query(shotover_session, "CREATE KEYSPACE test_protect_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").await; - run_query( - shotover_session, - "CREATE TABLE test_protect_keyspace.test_table (pk varchar PRIMARY KEY, cluster varchar, col1 blob, col2 int, col3 boolean);", - ).await; + run_query(shotover_session, "CREATE TABLE test_protect_keyspace.test_table (pk varchar PRIMARY KEY, cluster varchar, col1 blob, col2 int, col3 boolean);").await; - run_query( - shotover_session, - "INSERT INTO test_protect_keyspace.test_table (pk, cluster, col1, col2, col3) VALUES ('pk1', 'cluster', 'I am gonna get encrypted!!', 42, true);" - ).await; + run_query(shotover_session,"INSERT INTO test_protect_keyspace.test_table (pk, cluster, col1, col2, col3) VALUES ('pk1', 'cluster', 'I am gonna get encrypted!!', 0, true);").await; - let mut batch = Batch::new(BatchType::LOGGED); - batch.add_statement(&stmt!( - "INSERT INTO test_protect_keyspace.test_table (pk, cluster, col1, col2, col3) VALUES ('pk2', 'cluster', 'encrypted2', 422, true)" - )).unwrap(); - batch.add_statement(&stmt!( - "INSERT INTO test_protect_keyspace.test_table (pk, cluster, col1, col2, col3) VALUES ('pk3', 'cluster', 'encrypted3', 423, false)" - )).unwrap(); - shotover_session.execute_batch(&batch); + shotover_session.execute_batch(vec![ + ("INSERT INTO test_protect_keyspace.test_table (pk, cluster, col1, col2, col3) VALUES ('pk2', 'cluster', 'encrypted2', ?, true)".into(), 1), + ("INSERT INTO test_protect_keyspace.test_table (pk, cluster, col1, col2, col3) VALUES ('pk3', 'cluster', 'encrypted3', ?, false)".into(), 2) + ]); let insert_statement = "BEGIN BATCH -INSERT INTO test_protect_keyspace.test_table (pk, cluster, col1, col2, col3) VALUES ('pk4', 'cluster', 'encrypted4', 424, true); -INSERT INTO test_protect_keyspace.test_table (pk, cluster, col1, col2, col3) VALUES ('pk5', 'cluster', 'encrypted5', 425, false); -APPLY BATCH;"; + INSERT INTO test_protect_keyspace.test_table (pk, cluster, col1, col2, col3) VALUES ('pk4', 'cluster', 'encrypted4', 3, true); + INSERT INTO test_protect_keyspace.test_table (pk, cluster, col1, col2, col3) VALUES ('pk5', 'cluster', 'encrypted5', 4, false); + APPLY BATCH;"; run_query(shotover_session, insert_statement).await; // assert that data is decrypted by shotover @@ -47,35 +36,35 @@ APPLY BATCH;"; ResultValue::Varchar("pk1".into()), ResultValue::Varchar("cluster".into()), ResultValue::Blob("I am gonna get encrypted!!".into()), - ResultValue::Int(42), + ResultValue::Int(0), ResultValue::Boolean(true), ], &[ ResultValue::Varchar("pk2".into()), ResultValue::Varchar("cluster".into()), ResultValue::Blob("encrypted2".into()), - ResultValue::Int(422), + ResultValue::Int(1), ResultValue::Boolean(true), ], &[ ResultValue::Varchar("pk3".into()), ResultValue::Varchar("cluster".into()), ResultValue::Blob("encrypted3".into()), - ResultValue::Int(423), + ResultValue::Int(2), ResultValue::Boolean(false), ], &[ ResultValue::Varchar("pk4".into()), ResultValue::Varchar("cluster".into()), ResultValue::Blob("encrypted4".into()), - ResultValue::Int(424), + ResultValue::Int(3), ResultValue::Boolean(true), ], &[ ResultValue::Varchar("pk5".into()), ResultValue::Varchar("cluster".into()), ResultValue::Blob("encrypted5".into()), - ResultValue::Int(425), + ResultValue::Int(4), ResultValue::Boolean(false), ], ], @@ -87,6 +76,7 @@ APPLY BATCH;"; .execute("SELECT pk, cluster, col1, col2, col3 FROM test_protect_keyspace.test_table") .await; assert_eq!(result.len(), 5); + for row in result { assert_eq!(row.len(), 5); @@ -97,7 +87,7 @@ APPLY BATCH;"; if let ResultValue::Blob(value) = &row[2] { let _: Protected = bincode::deserialize(value).unwrap(); } else { - panic!("expected 3rd column to be ResultValue::Varchar in {row:?}"); + panic!("expected 3rd column to be ResultValue::Blob in {row:?}"); } } } diff --git a/shotover-proxy/tests/examples/mod.rs b/shotover-proxy/tests/examples/mod.rs index eca2dc0a6..c5952f4f6 100644 --- a/shotover-proxy/tests/examples/mod.rs +++ b/shotover-proxy/tests/examples/mod.rs @@ -1,14 +1,17 @@ -use crate::helpers::cassandra::{assert_query_result, CassandraConnection, ResultValue}; +use crate::helpers::cassandra::{ + assert_query_result, CassandraConnection, CassandraDriver, ResultValue, +}; use serial_test::serial; use test_helpers::docker_compose::DockerCompose; +#[cfg(feature = "cpp-driver-tests")] #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_cassandra_rewrite_peers_example() { let _docker_compose = DockerCompose::new("example-configs-docker/cassandra-peers-rewrite/docker-compose.yml"); - let connection = CassandraConnection::new("172.16.1.2", 9043).await; + let connection = CassandraConnection::new("172.16.1.2", 9043, CassandraDriver::Datastax); assert_query_result( &connection, diff --git a/shotover-proxy/tests/helpers/cassandra.rs b/shotover-proxy/tests/helpers/cassandra.rs index 089a1f930..530ae2531 100644 --- a/shotover-proxy/tests/helpers/cassandra.rs +++ b/shotover-proxy/tests/helpers/cassandra.rs @@ -1,36 +1,195 @@ -use cassandra_cpp::Error as CassandraError; +#[cfg(feature = "cpp-driver-tests")] use cassandra_cpp::{ - stmt, Batch, CassFuture, CassResult, Cluster, Error, PreparedStatement, Session, Statement, - Value, ValueType, + stmt, Batch, BatchType, CassFuture, CassResult, Cluster, Error, ErrorKind, PreparedStatement, + Session as DatastaxSession, Ssl, Value, ValueType, +}; +use cassandra_protocol::types::cassandra_type::{wrapper_fn, CassandraType}; +use cdrs_tokio::{ + authenticators::StaticPasswordAuthenticatorProvider, + cluster::session::{Session as CdrsTokioSession, SessionBuilder, TcpSessionBuilder}, + cluster::{NodeTcpConfigBuilder, TcpConnectionManager}, + frame::{ + message_response::ResponseBody, message_result::ResResultBody, Envelope, Serialize, Version, + }, + load_balancing::RoundRobinLoadBalancingStrategy, + query::{BatchQueryBuilder, PreparedQuery as CdrsTokioPreparedQuery}, + query_values, + transport::TransportTcp, + types::prelude::Error as CdrsError, }; use openssl::ssl::{SslContext, SslMethod}; use ordered_float::OrderedFloat; use scylla::{Session as SessionScylla, SessionBuilder as SessionBuilderScylla}; +#[cfg(feature = "cpp-driver-tests")] +use std::fs::read_to_string; +use std::sync::Arc; + +#[derive(Debug)] +pub enum PreparedQuery { + #[cfg(feature = "cpp-driver-tests")] + Datastax(PreparedStatement), + CdrsTokio(CdrsTokioPreparedQuery), +} + +impl PreparedQuery { + #[cfg(feature = "cpp-driver-tests")] + fn as_datastax(&self) -> &PreparedStatement { + match self { + PreparedQuery::Datastax(p) => p, + _ => panic!("Not PreparedQuery::Datastax"), + } + } + + fn as_cdrs(&self) -> &CdrsTokioPreparedQuery { + match self { + PreparedQuery::CdrsTokio(p) => p, + #[cfg(feature = "cpp-driver-tests")] + _ => panic!("Not PreparedQuery::CdrsTokio"), + } + } +} + +#[allow(unused)] +#[derive(Copy, Clone)] +pub enum CassandraDriver { + #[cfg(feature = "cpp-driver-tests")] + Datastax, + CdrsTokio, +} + +type CdrsTokioSessionInstance = CdrsTokioSession< + TransportTcp, + TcpConnectionManager, + RoundRobinLoadBalancingStrategy, +>; pub enum CassandraConnection { + #[cfg(feature = "cpp-driver-tests")] Datastax { - session: Session, + session: DatastaxSession, + schema_awaiter: Option, + }, + CdrsTokio { + session: CdrsTokioSessionInstance, schema_awaiter: Option, }, } impl CassandraConnection { #[allow(unused)] - pub async fn new(contact_points: &str, port: u16) -> CassandraConnection { + pub fn new(contact_points: &str, port: u16, driver: CassandraDriver) -> Self { for contact_point in contact_points.split(',') { test_helpers::wait_for_socket_to_open(contact_point, port); } - let mut cluster = Cluster::default(); - cluster.set_contact_points(contact_points).unwrap(); - cluster.set_credentials("cassandra", "cassandra").unwrap(); - cluster.set_port(port).unwrap(); - cluster.set_load_balance_round_robin(); - - CassandraConnection::Datastax { - // By default unwrap uses the Debug formatter `{:?}` which is extremely noisy for the error type returned by `connect()`. - // So we instead force the Display formatter `{}` on the error. - session: cluster.connect().map_err(|err| format!("{err}")).unwrap(), - schema_awaiter: None, + + match driver { + #[cfg(feature = "cpp-driver-tests")] + CassandraDriver::Datastax => { + let mut cluster = Cluster::default(); + cluster.set_contact_points(contact_points).unwrap(); + cluster.set_credentials("cassandra", "cassandra").unwrap(); + cluster.set_port(port).unwrap(); + cluster.set_load_balance_round_robin(); + + CassandraConnection::Datastax { + // By default unwrap uses the Debug formatter `{:?}` which is extremely noisy for the error type returned by `connect()`. + // So we instead force the Display formatter `{}` on the error. + session: cluster.connect().map_err(|err| format!("{err}")).unwrap(), + schema_awaiter: None, + } + } + CassandraDriver::CdrsTokio => { + let user = "cassandra"; + let password = "cassandra"; + let auth = StaticPasswordAuthenticatorProvider::new(&user, &password); + let config = futures::executor::block_on( + NodeTcpConfigBuilder::new() + .with_contact_point("127.0.0.1:9042".into()) + .with_authenticator_provider(Arc::new(auth)) + .build(), + ) + .unwrap(); + + let session = + TcpSessionBuilder::new(RoundRobinLoadBalancingStrategy::new(), config) + .build() + .unwrap(); + CassandraConnection::CdrsTokio { + session, + schema_awaiter: None, + } + } + } + } + + #[allow(unused)] + pub fn as_cdrs(&self) -> &CdrsTokioSessionInstance { + match self { + Self::CdrsTokio { session, .. } => session, + _ => panic!("Not CdrsTokio"), + } + } + + #[cfg(feature = "cpp-driver-tests")] + #[allow(unused)] + pub fn as_datastax(&self) -> &DatastaxSession { + match self { + Self::Datastax { session, .. } => session, + _ => panic!("Not Datastax"), + } + } + + #[allow(unused)] + pub fn new_tls( + contact_points: &str, + port: u16, + ca_cert_path: &str, + driver: CassandraDriver, + ) -> Self { + match driver { + #[cfg(feature = "cpp-driver-tests")] + CassandraDriver::Datastax => { + let ca_cert = read_to_string(ca_cert_path).unwrap(); + let mut ssl = Ssl::default(); + Ssl::add_trusted_cert(&mut ssl, &ca_cert).unwrap(); + + for contact_point in contact_points.split(',') { + test_helpers::wait_for_socket_to_open(contact_point, port); + } + + let mut cluster = Cluster::default(); + cluster.set_credentials("cassandra", "cassandra").unwrap(); + cluster.set_contact_points(contact_points).unwrap(); + cluster.set_port(port).ok(); + cluster.set_load_balance_round_robin(); + cluster.set_ssl(&mut ssl); + + CassandraConnection::Datastax { + session: cluster.connect().unwrap(), + schema_awaiter: None, + } + } + CassandraDriver::CdrsTokio => { + let user = "cassandra"; + let password = "cassandra"; + let auth = StaticPasswordAuthenticatorProvider::new(&user, &password); + let config = futures::executor::block_on( + NodeTcpConfigBuilder::new() + .with_contact_point("127.0.0.1:9042".into()) + .with_authenticator_provider(Arc::new(auth)) + .build(), + ) + .unwrap(); + + let session = + TcpSessionBuilder::new(RoundRobinLoadBalancingStrategy::new(), config) + .build() + .unwrap(); + CassandraConnection::CdrsTokio { + session, + schema_awaiter: None, + } + } } } @@ -42,7 +201,19 @@ impl CassandraConnection { context.build() }); match self { - CassandraConnection::Datastax { schema_awaiter, .. } => { + #[cfg(feature = "cpp-driver-tests")] + Self::Datastax { schema_awaiter, .. } => { + *schema_awaiter = Some( + SessionBuilderScylla::new() + .known_node(direct_node) + .user("cassandra", "cassandra") + .ssl_context(context) + .build() + .await + .unwrap(), + ); + } + Self::CdrsTokio { schema_awaiter, .. } => { *schema_awaiter = Some( SessionBuilderScylla::new() .known_node(direct_node) @@ -56,62 +227,92 @@ impl CassandraConnection { } } + async fn await_schema_agreement(&self) { + match self { + #[cfg(feature = "cpp-driver-tests")] + Self::Datastax { schema_awaiter, .. } => { + if let Some(schema_awaiter) = schema_awaiter { + schema_awaiter.await_schema_agreement().await.unwrap(); + } + } + Self::CdrsTokio { schema_awaiter, .. } => { + if let Some(schema_awaiter) = schema_awaiter { + futures::executor::block_on(schema_awaiter.await_schema_agreement()).unwrap(); + } + } + } + } + #[allow(unused)] pub async fn execute(&self, query: &str) -> Vec> { let result = match self { - CassandraConnection::Datastax { session, .. } => { + #[cfg(feature = "cpp-driver-tests")] + Self::Datastax { session, .. } => { let statement = stmt!(query); match session.execute(&statement).wait() { Ok(result) => result .into_iter() - .map(|x| x.into_iter().map(ResultValue::new).collect()) + .map(|x| x.into_iter().map(ResultValue::new_from_cpp).collect()) .collect(), Err(Error(err, _)) => panic!("The CQL query: {query}\nFailed with: {err}"), } } + Self::CdrsTokio { session, .. } => { + let response = session.query(query).await.unwrap(); + Self::process_cdrs_response(response) + } }; let query = query.to_uppercase(); let query = query.trim(); - if query.starts_with("CREATE") || query.starts_with("ALTER") { - match self { - CassandraConnection::Datastax { - session, - schema_awaiter, - } => { - if let Some(schema_awaiter) = schema_awaiter { - schema_awaiter.await_schema_agreement().await; - } - } - } + if query.starts_with("CREATE") || query.starts_with("ALTER") || query.starts_with("DROP") { + self.await_schema_agreement().await; } result } #[allow(unused)] + #[cfg(feature = "cpp-driver-tests")] pub fn execute_async(&self, query: &str) -> CassFuture { match self { - CassandraConnection::Datastax { session, .. } => { + #[cfg(feature = "cpp-driver-tests")] + Self::Datastax { session, .. } => { let statement = stmt!(query); session.execute(&statement) } + Self::CdrsTokio { .. } => todo!(), } } #[allow(unused)] - pub fn execute_expect_err(&self, query: &str) -> CassandraError { + pub fn execute_expect_err(&self, query: &str) -> String { match self { - CassandraConnection::Datastax { session, .. } => { + #[cfg(feature = "cpp-driver-tests")] + Self::Datastax { session, .. } => { let statement = stmt!(query); - session.execute(&statement).wait().unwrap_err() + let error = session.execute(&statement).wait().unwrap_err(); + + if let ErrorKind::CassErrorResult(_, msg, ..) = error.0 { + return msg; + } + + panic!("Did not get an error result for {query}"); + } + Self::CdrsTokio { session, .. } => { + let error = futures::executor::block_on(session.query(query)).unwrap_err(); + + match error { + CdrsError::Server { body, .. } => body.message, + _ => todo!(), + } } } } #[allow(unused)] pub fn execute_expect_err_contains(&self, query: &str, contains: &str) { - let result = self.execute_expect_err(query).to_string(); + let result = self.execute_expect_err(query); assert!( result.contains(contains), "Expected the error to contain '{contains}' but it did not and was instead '{result}'" @@ -119,36 +320,64 @@ impl CassandraConnection { } #[allow(unused)] - pub fn prepare(&self, query: &str) -> PreparedStatement { + pub fn prepare(&self, query: &str) -> PreparedQuery { match self { - CassandraConnection::Datastax { session, .. } => { - session.prepare(query).unwrap().wait().unwrap() + #[cfg(feature = "cpp-driver-tests")] + Self::Datastax { session, .. } => { + PreparedQuery::Datastax(session.prepare(query).unwrap().wait().unwrap()) + } + Self::CdrsTokio { session, .. } => { + let query = futures::executor::block_on(session.prepare(query)).unwrap(); + PreparedQuery::CdrsTokio(query) } } } #[allow(unused)] - pub fn execute_prepared(&self, statement: &Statement) -> Vec> { + pub fn execute_prepared( + &self, + prepared_query: &PreparedQuery, + value: i32, + ) -> Vec> { match self { - CassandraConnection::Datastax { session, .. } => { - match session.execute(statement).wait() { + #[cfg(feature = "cpp-driver-tests")] + Self::Datastax { session, .. } => { + let mut statement = prepared_query.as_datastax().bind(); + statement.bind_int32(0, value); + match session.execute(&statement).wait() { Ok(result) => result .into_iter() - .map(|x| x.into_iter().map(ResultValue::new).collect()) + .map(|x| x.into_iter().map(ResultValue::new_from_cpp).collect()) .collect(), Err(Error(err, _)) => { panic!("The statement: {statement:?}\nFailed with: {err}") } } } + Self::CdrsTokio { session, .. } => { + let statement = prepared_query.as_cdrs(); + let response = futures::executor::block_on( + session.exec_with_values(statement, query_values!(value)), + ) + .unwrap(); + + Self::process_cdrs_response(response) + } } } #[allow(unused)] - pub fn execute_batch(&self, batch: &Batch) { + pub fn execute_batch(&self, queries: Vec<(String, i32)>) { match self { - CassandraConnection::Datastax { session, .. } => { - match session.execute_batch(batch).wait() { + #[cfg(feature = "cpp-driver-tests")] + Self::Datastax { session, .. } => { + let mut batch = Batch::new(BatchType::LOGGED); + + for (query, value) in queries { + batch.add_statement(stmt!(query.as_str()).bind_int32(0, value).unwrap()); + } + + match session.execute_batch(&batch).wait() { Ok(result) => assert_eq!( result.into_iter().count(), 0, @@ -157,21 +386,81 @@ impl CassandraConnection { Err(Error(err, _)) => panic!("The batch: {batch:?}\nFailed with: {err}"), } } + Self::CdrsTokio { session, .. } => { + let mut builder = BatchQueryBuilder::new(); + + for (query, value) in queries { + builder = builder.add_query(query, query_values!(value)); + } + + let batch = builder.build().unwrap(); + + futures::executor::block_on(session.batch(batch)).unwrap(); + } } } #[allow(unused)] - pub fn execute_batch_expect_err(&self, batch: &Batch) -> CassandraError { + pub fn execute_batch_expect_err(&self, queries: Vec<(String, i32)>) -> String { match self { - CassandraConnection::Datastax { session, .. } => { - session.execute_batch(batch).wait().unwrap_err() + #[cfg(feature = "cpp-driver-tests")] + Self::Datastax { session, .. } => { + let mut batch = Batch::new(BatchType::LOGGED); + for (query, value) in queries { + batch.add_statement(stmt!(query.as_str()).bind_int32(0, value).unwrap()); + } + let error = session.execute_batch(&batch).wait().unwrap_err(); + if let ErrorKind::CassErrorResult(_, msg, ..) = error.0 { + return msg; + } + + panic!("Did not get an error result for {batch:?}"); } + Self::CdrsTokio { .. } => todo!(), + } + } + + fn process_cdrs_response(response: Envelope) -> Vec> { + let version = response.version; + let response_body = response.response_body().unwrap(); + + match response_body { + ResponseBody::Error(err) => { + panic!("CQL query Failed with: {err:?}") + } + ResponseBody::Result(res_result_body) => match res_result_body { + ResResultBody::Rows(rows) => { + let mut result_values = vec![]; + + for row in &rows.rows_content { + let mut row_result_values = vec![]; + for (i, col_spec) in rows.metadata.col_specs.iter().enumerate() { + let wrapper = wrapper_fn(&col_spec.col_type.id); + let value = ResultValue::new_from_cdrs( + wrapper(&row[i], &col_spec.col_type, version).unwrap(), + version, + ); + + row_result_values.push(value); + } + result_values.push(row_result_values); + } + + result_values + } + ResResultBody::Prepared(_) => todo!(), + ResResultBody::SchemaChange(_) => vec![], + ResResultBody::SetKeyspace(_) => vec![], + ResResultBody::Void => vec![], + }, + _ => todo!(), } } } #[derive(Debug, Clone, PartialOrd, Eq, Ord)] pub enum ResultValue { + #[cfg(feature = "cpp-driver-tests")] Text(String), Varchar(String), Int(i32), @@ -186,7 +475,7 @@ pub enum ResultValue { Float(OrderedFloat), Inet(String), SmallInt(i16), - Time(Vec), // TODO should be String + Time(Vec), // TODO shoulbe be String Timestamp(i64), TimeUuid(uuid::Uuid), Counter(i64), @@ -205,6 +494,7 @@ pub enum ResultValue { impl PartialEq for ResultValue { fn eq(&self, other: &Self) -> bool { match (self, other) { + #[cfg(feature = "cpp-driver-tests")] (Self::Text(l0), Self::Text(r0)) => l0 == r0, (Self::Varchar(l0), Self::Varchar(r0)) => l0 == r0, (Self::Int(l0), Self::Int(r0)) => l0 == r0, @@ -238,7 +528,8 @@ impl PartialEq for ResultValue { impl ResultValue { #[allow(unused)] - pub fn new(value: Value) -> ResultValue { + #[cfg(feature = "cpp-driver-tests")] + pub fn new_from_cpp(value: Value) -> Self { match value.get_type() { ValueType::TEXT => ResultValue::Text(value.get_string().unwrap()), ValueType::VARCHAR => ResultValue::Varchar(value.get_string().unwrap()), @@ -255,10 +546,7 @@ impl ResultValue { ValueType::DOUBLE => ResultValue::Double(value.get_f64().unwrap().into()), ValueType::DURATION => ResultValue::Duration(value.get_bytes().unwrap().to_vec()), ValueType::FLOAT => ResultValue::Float(value.get_f32().unwrap().into()), - ValueType::INET => value - .get_inet() - .map(|x| ResultValue::Inet(x.to_string())) - .unwrap_or_else(|_| ResultValue::Inet("NULL address".to_string())), + ValueType::INET => ResultValue::Inet(value.get_inet().unwrap().to_string()), ValueType::SMALL_INT => ResultValue::SmallInt(value.get_i16().unwrap()), ValueType::TIME => ResultValue::Time(value.get_bytes().unwrap().to_vec()), ValueType::TIMESTAMP => ResultValue::Timestamp(value.get_i64().unwrap()), @@ -273,24 +561,21 @@ impl ResultValue { ValueType::LIST => { let mut list = Vec::new(); for i in value.get_set().unwrap() { - list.push(ResultValue::new(i)); + list.push(ResultValue::new_from_cpp(i)); } ResultValue::List(list) } ValueType::MAP => { let mut map = Vec::new(); - // null value results in empty map - if let Ok(kv) = value.get_map() { - for (k, v) in kv { - map.push((ResultValue::new(k), ResultValue::new(v))); - } + for (k, v) in value.get_map().unwrap() { + map.push((ResultValue::new_from_cpp(k), ResultValue::new_from_cpp(v))); } ResultValue::Map(map) } ValueType::SET => { let mut set = Vec::new(); for i in value.get_set().unwrap() { - set.push(ResultValue::new(i)); + set.push(ResultValue::new_from_cpp(i)); } ResultValue::Set(set) } @@ -298,18 +583,61 @@ impl ResultValue { ValueType::TUPLE => todo!(), } } -} -/// Execute a `query` against the `session` and return result rows -#[allow(unused)] -pub fn execute_query(session: &Session, query: &str) -> Vec> { - let statement = stmt!(query); - match session.execute(&statement).wait() { - Ok(result) => result - .into_iter() - .map(|x| x.into_iter().map(ResultValue::new).collect()) - .collect(), - Err(Error(err, _)) => panic!("The CSQL query: {query}\nFailed with: {err}"), + pub fn new_from_cdrs(value: CassandraType, version: Version) -> Self { + match value { + CassandraType::Ascii(ascii) => ResultValue::Ascii(ascii), + CassandraType::Bigint(big_int) => ResultValue::BigInt(big_int), + CassandraType::Blob(blob) => ResultValue::Blob(blob.into_vec()), + CassandraType::Boolean(b) => ResultValue::Boolean(b), + CassandraType::Counter(counter) => ResultValue::Counter(counter), + CassandraType::Decimal(decimal) => { + ResultValue::Decimal(decimal.serialize_to_vec(version)) + } + CassandraType::Double(double) => ResultValue::Double(double.into()), + CassandraType::Float(float) => ResultValue::Float(float.into()), + CassandraType::Int(int) => ResultValue::Int(int), + CassandraType::Timestamp(timestamp) => ResultValue::Timestamp(timestamp), + CassandraType::Uuid(uuid) => ResultValue::Uuid(uuid), + CassandraType::Varchar(varchar) => ResultValue::Varchar(varchar), + CassandraType::Varint(var_int) => ResultValue::VarInt(var_int.to_signed_bytes_be()), + CassandraType::Timeuuid(uuid) => ResultValue::TimeUuid(uuid), + CassandraType::Inet(ip_addr) => ResultValue::Inet(ip_addr.to_string()), + CassandraType::Date(date) => ResultValue::Date(date.serialize_to_vec(version)), + CassandraType::Time(time) => ResultValue::Time(time.serialize_to_vec(version)), + CassandraType::Smallint(small_int) => ResultValue::SmallInt(small_int), + CassandraType::Tinyint(tiny_int) => ResultValue::TinyInt(tiny_int), + CassandraType::Duration(duration) => { + ResultValue::Duration(duration.serialize_to_vec(version)) + } + CassandraType::List(list) => { + let mut elements = Vec::new(); + for element in list { + elements.push(ResultValue::new_from_cdrs(element, version)); + } + ResultValue::List(elements) + } + CassandraType::Map(map) => { + let mut elements = Vec::new(); + for (k, v) in map { + elements.push(( + ResultValue::new_from_cdrs(k, version), + ResultValue::new_from_cdrs(v, version), + )); + } + ResultValue::Map(elements) + } + CassandraType::Set(set) => { + let mut elements = Vec::new(); + for element in set { + elements.push(ResultValue::new_from_cdrs(element, version)); + } + ResultValue::Set(elements) + } + CassandraType::Udt(_) => todo!(), + CassandraType::Tuple(_) => todo!(), + CassandraType::Null => todo!(), + } } } diff --git a/shotover-proxy/tests/helpers/mod.rs b/shotover-proxy/tests/helpers/mod.rs index 587ed44c3..451822621 100644 --- a/shotover-proxy/tests/helpers/mod.rs +++ b/shotover-proxy/tests/helpers/mod.rs @@ -1,10 +1,9 @@ use anyhow::Result; -use cassandra_cpp::{Cluster, Ssl}; +use cassandra::{CassandraConnection, CassandraDriver}; use redis::aio::AsyncStream; use redis::Client; use shotover_proxy::runner::{ConfigOpts, Runner}; use shotover_proxy::tls::{TlsConnector, TlsConnectorConfig}; -use std::fs::read_to_string; use std::pin::Pin; use std::sync::mpsc; use std::time::Duration; @@ -14,7 +13,6 @@ use tokio::task::JoinHandle; use tokio_io_timeout::TimeoutStream; pub mod cassandra; -use cassandra::CassandraConnection; #[must_use] pub struct ShotoverManager { @@ -148,12 +146,13 @@ impl ShotoverManager { } #[allow(unused)] - pub async fn cassandra_connection( + pub fn cassandra_connection( &self, contact_points: &str, port: u16, + driver: CassandraDriver, ) -> CassandraConnection { - CassandraConnection::new(contact_points, port).await + CassandraConnection::new(contact_points, port, driver) } #[allow(unused)] @@ -162,26 +161,9 @@ impl ShotoverManager { contact_points: &str, port: u16, ca_cert_path: &str, + driver: CassandraDriver, ) -> CassandraConnection { - let ca_cert = read_to_string(ca_cert_path).unwrap(); - let mut ssl = Ssl::default(); - Ssl::add_trusted_cert(&mut ssl, &ca_cert).unwrap(); - - for contact_point in contact_points.split(',') { - test_helpers::wait_for_socket_to_open(contact_point, port); - } - - let mut cluster = Cluster::default(); - cluster.set_credentials("cassandra", "cassandra").unwrap(); - cluster.set_contact_points(contact_points).unwrap(); - cluster.set_port(port).ok(); - cluster.set_load_balance_round_robin(); - cluster.set_ssl(&mut ssl); - - CassandraConnection::Datastax { - session: cluster.connect().unwrap(), - schema_awaiter: None, - } + CassandraConnection::new_tls(contact_points, port, ca_cert_path, driver) } fn shutdown_shotover(&mut self) -> Result<()> { diff --git a/test-helpers/src/docker_compose.rs b/test-helpers/src/docker_compose.rs index 86de54207..8fdbcc25a 100644 --- a/test-helpers/src/docker_compose.rs +++ b/test-helpers/src/docker_compose.rs @@ -6,7 +6,7 @@ use std::process::Command; use std::thread; use std::time; use subprocess::{Exec, Redirection}; -use tracing::{debug, info}; +use tracing::trace; /// Runs a command and returns the output as a string. /// @@ -17,7 +17,7 @@ use tracing::{debug, info}; /// * `args` - An array of command line arguments for the command /// pub fn run_command(command: &str, args: &[&str]) -> Result { - debug!("executing {}", command); + trace!("executing {}", command); let data = Exec::cmd(command) .args(args) .stdout(Redirection::Pipe) @@ -150,7 +150,7 @@ impl DockerCompose { /// * If `count` occurrences of `log_text` is not found in the log within 90 seconds. /// fn wait_for_log(&self, log_text: &str, count: usize, timeout_seconds: u64) { - info!("wait_for_log: '{log_text}' {count}"); + trace!("wait_for_log: '{log_text}' {count}"); let args = ["-f", &self.file_path, "logs"]; let re = Regex::new(log_text).unwrap(); let sys_time = time::Instant::now(); @@ -166,11 +166,11 @@ impl DockerCompose { result ); } - debug!("wait_for_log: {log_text:?} looping {my_count}/{count}"); + trace!("wait_for_log: {log_text:?} looping {my_count}/{count}"); result = run_command("docker-compose", &args).unwrap(); my_count = re.find_iter(&result).count(); } - debug!( + trace!( "wait_for_log: found '{}' {} times in {:?} seconds", log_text, count, @@ -183,7 +183,7 @@ impl DockerCompose { /// # Arguments /// * `file_path` - The path to the docker-compose yaml file that was used to start docker. fn clean_up(file_path: &str) -> Result<()> { - debug!("bringing down docker compose {}", file_path); + trace!("bringing down docker compose {}", file_path); run_command("docker-compose", &["-f", file_path, "down", "-v"])?; run_command("docker-compose", &["-f", file_path, "rm", "-f", "-s", "-v"])?;