diff --git a/Cargo.lock b/Cargo.lock index dc7ea8ded..d80330919 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -155,13 +155,24 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +[[package]] +name = "bigdecimal" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1e50562e37200edf7c6c43e54a08e64a5553bfb59d9c297d5572512aa517256" +dependencies = [ + "num-bigint 0.3.3", + "num-integer", + "num-traits 0.2.15", +] + [[package]] name = "bigdecimal" version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6aaf33151a6429fe9211d1b276eafdf70cdff28b071e76c0b0e1503221ea3744" dependencies = [ - "num-bigint", + "num-bigint 0.4.3", "num-integer", "num-traits 0.2.15", "serde", @@ -436,7 +447,7 @@ version = "3.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "759bf187376e1afa7b85b959e6a664a3e7a95203415dba952ad19139e798f902" dependencies = [ - "heck", + "heck 0.4.0", "proc-macro-error", "proc-macro2", "quote", @@ -509,7 +520,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c9662338ec83ade685bf4e5e70d11a505033a3715dc6b1b7b0650a950857778" dependencies = [ - "bigdecimal", + "bigdecimal 0.3.0", "bytes", "hex", "itertools", @@ -684,6 +695,18 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "5.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3495912c9c1ccf2e18976439f4443f3fee0fd61f424ff99fde6a66b15ecb448f" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "parking_lot_core", +] + [[package]] name = "derivative" version = "2.2.0" @@ -1055,6 +1078,15 @@ dependencies = [ "ahash", ] +[[package]] +name = "heck" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "heck" version = "0.4.0" @@ -1082,6 +1114,12 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ebdb29d2ea9ed0083cd8cece49bbd968021bd99b0849edb4a9a7ee0fdf6a4e0" +[[package]] +name = "histogram" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12cb882ccb290b8646e554b157ab0b71e64e8d5bef775cd66b6531e52d302669" + [[package]] name = "hmac" version = "0.11.0" @@ -1546,7 +1584,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43db66d1170d347f9a065114077f7dccb00c1b9478c89384490a3425279a4606" dependencies = [ - "num-bigint", + "num-bigint 0.4.3", "num-complex", "num-integer", "num-iter", @@ -1554,6 +1592,17 @@ dependencies = [ "num-traits 0.2.15", ] +[[package]] +name = "num-bigint" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f6f7833f2cbf2360a6cfd58cd41a53aa7a90bd4c202f5b1c7dd2ed73c57b2c3" +dependencies = [ + "autocfg", + "num-integer", + "num-traits 0.2.15", +] + [[package]] name = "num-bigint" version = "0.4.3" @@ -1604,7 +1653,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0" dependencies = [ "autocfg", - "num-bigint", + "num-bigint 0.4.3", "num-integer", "num-traits 0.2.15", "serde", @@ -1639,6 +1688,27 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf5395665662ef45796a4ff5486c5d41d29e0c09640af4c5f17fd94ee2c119c9" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0498641e53dd6ac1a4f22547548caa6864cc4933784319cd1775271c5a46ce" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "num_threads" version = "0.1.6" @@ -1930,6 +2000,17 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c6fa0831dd7cc608c38a5e323422a0077678fa5744aa2be4ad91c4ece8eec8d5" +[[package]] +name = "proc-macro-crate" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d50bfb8c23f23915855a00d98b5a35ef2e0b871bb52937bacadb798fbb66c8" +dependencies = [ + "once_cell", + "thiserror", + "toml", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -2392,6 +2473,46 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "scylla" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e16bd82cb3eb8961f45759695eee56f162e73c1e2e30042d9450dfa98d600ac1" +dependencies = [ + "arc-swap", + "bigdecimal 0.2.2", + "byteorder", + "bytes", + "chrono", + "dashmap", + "futures", + "histogram", + "itertools", + "lz4_flex", + "num-bigint 0.3.3", + "num_enum", + "rand", + "scylla-macros", + "smallvec", + "snap", + "strum", + "strum_macros 0.23.1", + "thiserror", + "tokio", + "tracing", + "uuid 1.1.2", +] + +[[package]] +name = "scylla-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13dc0caffb1274feb3df615e3260cb71a5a7a5d579adc49ba5544c87950a701c" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "security-framework" version = "2.6.1" @@ -2563,7 +2684,7 @@ dependencies = [ "async-recursion", "async-trait", "base64", - "bigdecimal", + "bigdecimal 0.3.0", "bincode", "bytes", "bytes-utils", @@ -2607,11 +2728,12 @@ dependencies = [ "reqwest", "rusoto_kms", "rusoto_signature", + "scylla", "serde", "serde_json", "serde_yaml", "serial_test", - "strum_macros", + "strum_macros 0.24.2", "test-helpers", "thiserror", "tls-parser", @@ -2700,13 +2822,32 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strum" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cae14b91c7d11c9a851d3fbc80a963198998c2a64eec840477fa92d8ce9b70bb" + +[[package]] +name = "strum_macros" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bb0dc7ee9c15cea6199cde9a127fa16a4c5819af85395457ad72d68edc85a38" +dependencies = [ + "heck 0.3.3", + "proc-macro2", + "quote", + "rustversion", + "syn", +] + [[package]] name = "strum_macros" version = "0.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4faebde00e8ff94316c01800f9054fd2ba77d30d9e922541913051d1d978918b" dependencies = [ - "heck", + "heck 0.4.0", "proc-macro2", "quote", "rustversion", @@ -2993,6 +3134,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "toml" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d82e1a7758622a465f8cee077614c73484dac5b836c02ff6a40d5d1010324d7" +dependencies = [ + "serde", +] + [[package]] name = "tower-service" version = "0.3.2" @@ -3137,6 +3287,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99" + [[package]] name = "unicode-width" version = "0.1.9" diff --git a/shotover-proxy/Cargo.toml b/shotover-proxy/Cargo.toml index 13d01150e..f2952d0cb 100644 --- a/shotover-proxy/Cargo.toml +++ b/shotover-proxy/Cargo.toml @@ -91,6 +91,7 @@ nix = "0.24.0" reqwest = "0.11.6" metrics-util = "0.14.0" cdrs-tokio = { git = "https://github.com/krojew/cdrs-tokio" } +scylla = "0.4.7" [[bench]] name = "redis_benches" diff --git a/shotover-proxy/tests/cassandra_int_tests/functions.rs b/shotover-proxy/tests/cassandra_int_tests/functions.rs index faa051f68..13943d4d4 100644 --- a/shotover-proxy/tests/cassandra_int_tests/functions.rs +++ b/shotover-proxy/tests/cassandra_int_tests/functions.rs @@ -1,29 +1,45 @@ +use crate::cassandra_int_tests::schema_awaiter::SchemaAwaiter; use crate::helpers::cassandra::{assert_query_result, run_query, CassandraConnection, ResultValue}; fn drop_function(session: &CassandraConnection) { - assert_query_result(session, "SELECT test_function_keyspace.my_function(x, y) FROM test_function_keyspace.test_function_table WHERE id=1;", &[&[ResultValue::Int(4)]]); + assert_query_result( + session, + "SELECT test_function_keyspace.my_function(x, y) FROM test_function_keyspace.test_function_table WHERE id=1;", + &[&[ResultValue::Int(4)]] + ); run_query(session, "DROP FUNCTION test_function_keyspace.my_function"); let statement = "SELECT test_function_keyspace.my_function(x) FROM test_function_keyspace.test_function_table WHERE id=1;"; let result = session.execute_expect_err(statement).to_string(); - assert_eq!(result, "Cassandra detailed error SERVER_INVALID_QUERY: Unknown function 'test_function_keyspace.my_function'"); + assert_eq!( + result, + "Cassandra detailed error SERVER_INVALID_QUERY: Unknown function 'test_function_keyspace.my_function'" + ); } -fn create_function(session: &CassandraConnection) { +async fn create_function(session: &CassandraConnection, direct_connections: &SchemaAwaiter) { run_query( - session, - "CREATE FUNCTION test_function_keyspace.my_function (a int, b int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE javascript AS 'a * b';", - ); - assert_query_result(session, "SELECT test_function_keyspace.my_function(x, y) FROM test_function_keyspace.test_function_table;",&[&[ResultValue::Int(4)], &[ResultValue::Int(9)], &[ResultValue::Int(16)]]); + session, + "CREATE FUNCTION test_function_keyspace.my_function (a int, b int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE javascript AS 'a * b';", + ); + direct_connections.await_schema_agreement().await; + assert_query_result( + session, + "SELECT test_function_keyspace.my_function(x, y) FROM test_function_keyspace.test_function_table;", + &[&[ResultValue::Int(4)], &[ResultValue::Int(9)], &[ResultValue::Int(16)]] + ); } -pub fn test(session: &CassandraConnection) { - run_query(session, "CREATE KEYSPACE test_function_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };"); +pub async fn test(session: &CassandraConnection, direct_connections: &SchemaAwaiter) { + run_query( + session, + "CREATE KEYSPACE test_function_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };" + ); run_query( - session, - "CREATE TABLE test_function_keyspace.test_function_table (id int PRIMARY KEY, x int, y int);", - ); + session, + "CREATE TABLE test_function_keyspace.test_function_table (id int PRIMARY KEY, x int, y int);", + ); run_query( session, r#"BEGIN BATCH @@ -33,6 +49,6 @@ INSERT INTO test_function_keyspace.test_function_table (id, x, y) VALUES (3, 4, APPLY BATCH;"#, ); - create_function(session); + create_function(session, direct_connections).await; drop_function(session); } diff --git a/shotover-proxy/tests/cassandra_int_tests/mod.rs b/shotover-proxy/tests/cassandra_int_tests/mod.rs index 1fe4d38d5..d63ad138e 100644 --- a/shotover-proxy/tests/cassandra_int_tests/mod.rs +++ b/shotover-proxy/tests/cassandra_int_tests/mod.rs @@ -1,3 +1,4 @@ +use crate::cassandra_int_tests::schema_awaiter::SchemaAwaiter; use crate::helpers::cassandra::{assert_query_result, run_query, ResultValue}; use crate::helpers::ShotoverManager; use cassandra_cpp::{stmt, Batch, BatchType, Error, ErrorKind}; @@ -24,25 +25,27 @@ mod native_types; mod prepared_statements; #[cfg(feature = "alpha-transforms")] mod protect; +mod schema_awaiter; mod table; mod udt; -#[test] +#[tokio::test(flavor = "multi_thread")] #[serial] -fn test_passthrough() { +async fn test_passthrough() { let _compose = DockerCompose::new("example-configs/cassandra-passthrough/docker-compose.yml"); let shotover_manager = ShotoverManager::from_topology_file("example-configs/cassandra-passthrough/topology.yaml"); let connection = shotover_manager.cassandra_connection("127.0.0.1", 9042); + let schema_awaiter = SchemaAwaiter::new("127.0.0.1:9043").await; keyspace::test(&connection); table::test(&connection); udt::test(&connection); native_types::test(&connection); collections::test(&connection); - functions::test(&connection); + functions::test(&connection, &schema_awaiter).await; prepared_statements::test(&connection); batch_statements::test(&connection); } @@ -76,28 +79,28 @@ fn test_source_tls_and_single_tls() { udt::test(&connection); native_types::test(&connection); collections::test(&connection); - functions::test(&connection); prepared_statements::test(&connection); batch_statements::test(&connection); } -#[test] +#[tokio::test(flavor = "multi_thread")] #[serial] #[cfg(feature = "alpha-transforms")] -fn test_cluster() { +async fn test_cluster() { let _compose = DockerCompose::new("example-configs/cassandra-cluster/docker-compose.yml"); let shotover_manager = ShotoverManager::from_topology_file("example-configs/cassandra-cluster/topology.yaml"); let connection = shotover_manager.cassandra_connection("127.0.0.1", 9042); + let schema_awaiter = SchemaAwaiter::new("172.16.1.2:9042").await; keyspace::test(&connection); table::test(&connection); udt::test(&connection); native_types::test(&connection); collections::test(&connection); - functions::test(&connection); + functions::test(&connection, &schema_awaiter).await; prepared_statements::test(&connection); batch_statements::test(&connection); } @@ -132,14 +135,13 @@ fn test_source_tls_and_cluster_tls() { udt::test(&connection); native_types::test(&connection); collections::test(&connection); - functions::test(&connection); prepared_statements::test(&connection); batch_statements::test(&connection); } -#[test] +#[tokio::test(flavor = "multi_thread")] #[serial] -fn test_cassandra_redis_cache() { +async fn test_cassandra_redis_cache() { let recorder = DebuggingRecorder::new(); let snapshotter = recorder.snapshotter(); recorder.install().unwrap(); @@ -151,20 +153,21 @@ fn test_cassandra_redis_cache() { let mut redis_connection = shotover_manager.redis_connection(6379); let connection = shotover_manager.cassandra_connection("127.0.0.1", 9042); + let schema_awaiter = SchemaAwaiter::new("127.0.0.1:9043").await; keyspace::test(&connection); table::test(&connection); udt::test(&connection); - functions::test(&connection); + functions::test(&connection, &schema_awaiter).await; prepared_statements::test(&connection); batch_statements::test(&connection); cache::test(&connection, &mut redis_connection, &snapshotter); } -#[test] +#[tokio::test(flavor = "multi_thread")] #[serial] #[cfg(feature = "alpha-transforms")] -fn test_cassandra_protect_transform_local() { +async fn test_cassandra_protect_transform_local() { let _compose = DockerCompose::new("example-configs/cassandra-protect-local/docker-compose.yml"); let shotover_manager = ShotoverManager::from_topology_file( @@ -173,21 +176,22 @@ fn test_cassandra_protect_transform_local() { let shotover_connection = shotover_manager.cassandra_connection("127.0.0.1", 9042); let direct_connection = shotover_manager.cassandra_connection("127.0.0.1", 9043); + let schema_awaiter = SchemaAwaiter::new("127.0.0.1:9043").await; keyspace::test(&shotover_connection); table::test(&shotover_connection); udt::test(&shotover_connection); native_types::test(&shotover_connection); collections::test(&shotover_connection); - functions::test(&shotover_connection); + functions::test(&shotover_connection, &schema_awaiter).await; batch_statements::test(&shotover_connection); protect::test(&shotover_connection, &direct_connection); } -#[test] +#[tokio::test(flavor = "multi_thread")] #[serial] #[cfg(feature = "alpha-transforms")] -fn test_cassandra_protect_transform_aws() { +async fn test_cassandra_protect_transform_aws() { let _compose = DockerCompose::new("example-configs/cassandra-protect-aws/docker-compose.yml"); let _compose_aws = DockerCompose::new_moto(); @@ -196,13 +200,14 @@ fn test_cassandra_protect_transform_aws() { let shotover_connection = shotover_manager.cassandra_connection("127.0.0.1", 9042); let direct_connection = shotover_manager.cassandra_connection("127.0.0.1", 9043); + let schema_awaiter = SchemaAwaiter::new("127.0.0.1:9043").await; keyspace::test(&shotover_connection); table::test(&shotover_connection); udt::test(&shotover_connection); native_types::test(&shotover_connection); collections::test(&shotover_connection); - functions::test(&shotover_connection); + functions::test(&shotover_connection, &schema_awaiter).await; batch_statements::test(&shotover_connection); protect::test(&shotover_connection, &direct_connection); } diff --git a/shotover-proxy/tests/cassandra_int_tests/schema_awaiter.rs b/shotover-proxy/tests/cassandra_int_tests/schema_awaiter.rs new file mode 100644 index 000000000..36447d6c4 --- /dev/null +++ b/shotover-proxy/tests/cassandra_int_tests/schema_awaiter.rs @@ -0,0 +1,26 @@ +use scylla::{Session, SessionBuilder}; + +// Modifying the schema will take a while to propagate to all nodes. +// It seems adding a table doesnt cause any problems, maybe cassandra is just routing to a node that has the table. +// But for cases like adding a new function we hit issues where the function is not yet propagated to all nodes. +// So we make use of the scylla drivers await_schema_agreement logic to wait until all nodes are on the same schema. +pub struct SchemaAwaiter { + session: Session, +} + +impl SchemaAwaiter { + pub async fn new(node: &str) -> Self { + SchemaAwaiter { + session: SessionBuilder::new() + .known_node(node) + .user("cassandra", "cassandra") + .build() + .await + .unwrap(), + } + } + + pub async fn await_schema_agreement(&self) { + self.session.await_schema_agreement().await.unwrap(); + } +}