From ccc58ce1ca6ef82ee82e92b3fc18856e8367048e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karol=20Bary=C5=82a?= Date: Thu, 21 Nov 2024 14:19:51 +0100 Subject: [PATCH] Merge pull request #1127 from Lorak-mmk/fix-concurrent-ddl Fix concurrent DDL queries (cherry picked from commit 64647f957c93e3d70a7f60c5a18e875bc347116f) --- CONTRIBUTING.md | 12 + scylla/src/transport/caching_session.rs | 29 +- scylla/src/transport/connection.rs | 20 +- scylla/src/transport/session_test.rs | 360 +++++++----------- scylla/src/utils/test_utils.rs | 86 ++++- scylla/tests/integration/authenticate.rs | 16 +- scylla/tests/integration/consistency.rs | 8 +- scylla/tests/integration/cql_collections.rs | 14 +- scylla/tests/integration/cql_types.rs | 131 +++---- scylla/tests/integration/cql_value.rs | 59 ++- .../tests/integration/execution_profiles.rs | 7 +- scylla/tests/integration/history.rs | 6 +- .../integration/large_batch_statements.rs | 16 +- scylla/tests/integration/lwt_optimisation.rs | 5 +- scylla/tests/integration/retries.rs | 14 +- scylla/tests/integration/shards.rs | 8 +- .../tests/integration/silent_prepare_batch.rs | 9 +- .../tests/integration/silent_prepare_query.rs | 10 +- .../integration/skip_metadata_optimization.rs | 11 +- scylla/tests/integration/tablets.rs | 30 +- scylla/tests/integration/utils.rs | 67 +++- 21 files changed, 467 insertions(+), 451 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index f152583b2e..05d68cbfd8 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -49,6 +49,18 @@ The above commands will leave a running ScyllaDB cluster in the background. To stop it, use `make down`.\ Starting a cluster without running any test is possible with `make up`. +### Writing tests that need to connect to Scylla + +If you test requires connecting to Scylla, there are a few things you should consider. + +1. Such tests are considered integration tests and should be placed in `scylla/tests/integration`. +2. To avoid name conflicts while creating a keyspace use `unique_keyspace_name` function from `utils` module. +3. This `utils` module (`scylla/tests/integration/utils.rs`) contains other functions that may be helpful for writing tests. + For example `create_new_session_builder` or `test_with_3_node_cluster`. +4. To perform DDL queries (creating / altering / dropping a keyspace / table /type) use `ddl` method from the utils module. + To do this, import the `PerformDDL` trait (`use crate::utils::PerformDDL;`). Then you can call `ddl` method on a + `Session`. + ### Tracing in tests By default cargo captures `print!` macro's output from tests and prints them for failed tests. diff --git a/scylla/src/transport/caching_session.rs b/scylla/src/transport/caching_session.rs index 79d2c25388..77668b28cc 100644 --- a/scylla/src/transport/caching_session.rs +++ b/scylla/src/transport/caching_session.rs @@ -329,7 +329,9 @@ where mod tests { use crate::query::Query; use crate::statement::PagingState; - use crate::test_utils::{create_new_session_builder, scylla_supports_tablets, setup_tracing}; + use crate::test_utils::{ + create_new_session_builder, scylla_supports_tablets, setup_tracing, PerformDDL, + }; use crate::transport::partitioner::PartitionerName; use crate::transport::session::Session; use crate::utils::test_utils::unique_keyspace_name; @@ -358,18 +360,15 @@ mod tests { } session - .query_unpaged(create_ks, &[]) + .ddl(create_ks) .await .expect("Could not create keyspace"); session - .query_unpaged( - format!( - "CREATE TABLE IF NOT EXISTS {}.test_table (a int primary key, b int)", - ks - ), - &[], - ) + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {}.test_table (a int primary key, b int)", + ks + )) .await .expect("Could not create table"); @@ -566,10 +565,7 @@ mod tests { let session: CachingSession = create_caching_session().await; session - .execute_unpaged( - "CREATE TABLE IF NOT EXISTS test_batch_table (a int, b int, primary key (a, b))", - (), - ) + .ddl("CREATE TABLE IF NOT EXISTS test_batch_table (a int, b int, primary key (a, b))") .await .unwrap(); @@ -689,7 +685,7 @@ mod tests { let session: CachingSession = CachingSession::from(new_for_test(true).await, 100); session - .execute_unpaged("CREATE TABLE tbl (a int PRIMARY KEY, b int)", ()) + .ddl("CREATE TABLE tbl (a int PRIMARY KEY, b int)") .await .unwrap(); @@ -745,10 +741,7 @@ mod tests { let session: CachingSession = CachingSession::from(new_for_test(false).await, 100); session - .execute_unpaged( - "CREATE TABLE tbl (a int PRIMARY KEY) with cdc = {'enabled': true}", - &(), - ) + .ddl("CREATE TABLE tbl (a int PRIMARY KEY) with cdc = {'enabled': true}") .await .unwrap(); diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 26e41ae723..576c2acecc 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -2397,7 +2397,7 @@ mod tests { use crate::transport::connection::open_connection; use crate::transport::node::ResolvedContactPoint; use crate::transport::topology::UntranslatedEndpoint; - use crate::utils::test_utils::unique_keyspace_name; + use crate::utils::test_utils::{unique_keyspace_name, PerformDDL}; use crate::SessionBuilder; use futures::{StreamExt, TryStreamExt}; use std::collections::HashMap; @@ -2452,17 +2452,14 @@ mod tests { .build() .await .unwrap(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks.clone()), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks.clone())).await.unwrap(); session.use_keyspace(ks.clone(), false).await.unwrap(); session - .query_unpaged("DROP TABLE IF EXISTS connection_query_iter_tab", &[]) + .ddl("DROP TABLE IF EXISTS connection_query_iter_tab") .await .unwrap(); session - .query_unpaged( - "CREATE TABLE IF NOT EXISTS connection_query_iter_tab (p int primary key)", - &[], - ) + .ddl("CREATE TABLE IF NOT EXISTS connection_query_iter_tab (p int primary key)") .await .unwrap(); } @@ -2548,13 +2545,10 @@ mod tests { .build() .await .unwrap(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks.clone()), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks.clone())).await.unwrap(); session.use_keyspace(ks.clone(), false).await.unwrap(); session - .query_unpaged( - "CREATE TABLE IF NOT EXISTS t (p int primary key, v blob)", - &[], - ) + .ddl("CREATE TABLE IF NOT EXISTS t (p int primary key, v blob)") .await .unwrap(); } @@ -2580,7 +2574,7 @@ mod tests { .await .unwrap(); - connection.query_unpaged("TRUNCATE t").await.unwrap(); + connection.ddl("TRUNCATE t").await.unwrap(); let mut futs = Vec::new(); diff --git a/scylla/src/transport/session_test.rs b/scylla/src/transport/session_test.rs index 3e3552c230..6844a5ef78 100644 --- a/scylla/src/transport/session_test.rs +++ b/scylla/src/transport/session_test.rs @@ -5,7 +5,6 @@ use crate::query::Query; use crate::retry_policy::{QueryInfo, RetryDecision, RetryPolicy, RetrySession}; use crate::routing::Token; use crate::statement::Consistency; -use crate::test_utils::{scylla_supports_tablets, setup_tracing}; use crate::tracing::TracingInfo; use crate::transport::errors::{BadKeyspaceName, BadQuery, DbError, QueryError}; use crate::transport::partitioner::{ @@ -17,7 +16,8 @@ use crate::transport::topology::{ CollectionType, ColumnKind, CqlType, NativeType, UserDefinedType, }; use crate::utils::test_utils::{ - create_new_session_builder, supports_feature, unique_keyspace_name, + create_new_session_builder, scylla_supports_tablets, setup_tracing, supports_feature, + unique_keyspace_name, PerformDDL, }; use crate::ExecutionProfile; use crate::{self as scylla, QueryResult}; @@ -70,15 +70,12 @@ async fn test_unprepared_statement() { let session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session - .query_unpaged( - format!( - "CREATE TABLE IF NOT EXISTS {}.t (a int, b int, c text, primary key (a, b))", - ks - ), - &[], - ) + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {}.t (a int, b int, c text, primary key (a, b))", + ks + )) .await .unwrap(); @@ -177,19 +174,16 @@ async fn test_prepared_statement() { let session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session - .query_unpaged( - format!( - "CREATE TABLE IF NOT EXISTS {}.t2 (a int, b int, c text, primary key (a, b))", - ks - ), - &[], - ) + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {}.t2 (a int, b int, c text, primary key (a, b))", + ks + )) .await .unwrap(); session - .query_unpaged(format!("CREATE TABLE IF NOT EXISTS {}.complex_pk (a int, b int, c text, d int, e int, primary key ((a,b,c),d))", ks), &[]) + .ddl(format!("CREATE TABLE IF NOT EXISTS {}.complex_pk (a int, b int, c text, d int, e int, primary key ((a,b,c),d))", ks)) .await .unwrap(); @@ -401,15 +395,12 @@ async fn test_counter_batch() { create_ks += " AND TABLETS = {'enabled': false}" } - session.query_unpaged(create_ks, &[]).await.unwrap(); + session.ddl(create_ks).await.unwrap(); session - .query_unpaged( - format!( - "CREATE TABLE IF NOT EXISTS {}.t_batch (key int PRIMARY KEY, value counter)", - ks - ), - &[], - ) + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {}.t_batch (key int PRIMARY KEY, value counter)", + ks + )) .await .unwrap(); @@ -449,15 +440,12 @@ async fn test_batch() { let session = Arc::new(create_new_session_builder().build().await.unwrap()); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session - .query_unpaged( - format!( - "CREATE TABLE IF NOT EXISTS {}.t_batch (a int, b int, c text, primary key (a, b))", - ks - ), - &[], - ) + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {}.t_batch (a int, b int, c text, primary key (a, b))", + ks + )) .await .unwrap(); @@ -524,10 +512,10 @@ async fn test_batch() { // This statement flushes the prepared statement cache session - .query_unpaged( - format!("ALTER TABLE {}.t_batch WITH gc_grace_seconds = 42", ks), - &[], - ) + .ddl(format!( + "ALTER TABLE {}.t_batch WITH gc_grace_seconds = 42", + ks + )) .await .unwrap(); session.batch(&batch, values).await.unwrap(); @@ -555,12 +543,12 @@ async fn test_token_calculation() { let session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session - .query_unpaged( - format!("CREATE TABLE IF NOT EXISTS {}.t3 (a text primary key)", ks), - &[], - ) + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {}.t3 (a text primary key)", + ks + )) .await .unwrap(); @@ -626,12 +614,12 @@ async fn test_token_awareness() { create_ks += " AND TABLETS = {'enabled': false}" } - session.query_unpaged(create_ks, &[]).await.unwrap(); + session.ddl(create_ks).await.unwrap(); session - .query_unpaged( - format!("CREATE TABLE IF NOT EXISTS {}.t (a text primary key)", ks), - &[], - ) + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {}.t (a text primary key)", + ks + )) .await .unwrap(); @@ -678,13 +666,13 @@ async fn test_use_keyspace() { let session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session - .query_unpaged( - format!("CREATE TABLE IF NOT EXISTS {}.tab (a text primary key)", ks), - &[], - ) + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {}.tab (a text primary key)", + ks + )) .await .unwrap(); @@ -774,22 +762,22 @@ async fn test_use_keyspace_case_sensitivity() { let ks_lower = unique_keyspace_name().to_lowercase(); let ks_upper = ks_lower.to_uppercase(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS \"{}\" WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks_lower), &[]).await.unwrap(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS \"{}\" WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks_upper), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS \"{}\" WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks_lower)).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS \"{}\" WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks_upper)).await.unwrap(); session - .query_unpaged( - format!("CREATE TABLE {}.tab (a text primary key)", ks_lower), - &[], - ) + .ddl(format!( + "CREATE TABLE {}.tab (a text primary key)", + ks_lower + )) .await .unwrap(); session - .query_unpaged( - format!("CREATE TABLE \"{}\".tab (a text primary key)", ks_upper), - &[], - ) + .ddl(format!( + "CREATE TABLE \"{}\".tab (a text primary key)", + ks_upper + )) .await .unwrap(); @@ -850,13 +838,13 @@ async fn test_raw_use_keyspace() { let session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session - .query_unpaged( - format!("CREATE TABLE IF NOT EXISTS {}.tab (a text primary key)", ks), - &[], - ) + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {}.tab (a text primary key)", + ks + )) .await .unwrap(); @@ -928,9 +916,9 @@ async fn test_db_errors() { )); // AlreadyExists when creating a keyspace for the second time - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); - let create_keyspace_res = session.query_unpaged(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await; + let create_keyspace_res = session.ddl(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await; let keyspace_exists_error: DbError = match create_keyspace_res { Err(QueryError::DbError(e, _)) => e, _ => panic!("Second CREATE KEYSPACE didn't return an error!"), @@ -946,15 +934,15 @@ async fn test_db_errors() { // AlreadyExists when creating a table for the second time session - .query_unpaged( - format!("CREATE TABLE IF NOT EXISTS {}.tab (a text primary key)", ks), - &[], - ) + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {}.tab (a text primary key)", + ks + )) .await .unwrap(); let create_table_res = session - .query_unpaged(format!("CREATE TABLE {}.tab (a text primary key)", ks), &[]) + .ddl(format!("CREATE TABLE {}.tab (a text primary key)", ks)) .await; let create_tab_error: DbError = match create_table_res { Err(QueryError::DbError(e, _)) => e, @@ -976,13 +964,13 @@ async fn test_tracing() { let session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session - .query_unpaged( - format!("CREATE TABLE IF NOT EXISTS {}.tab (a text primary key)", ks), - &[], - ) + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {}.tab (a text primary key)", + ks + )) .await .unwrap(); @@ -1200,15 +1188,12 @@ async fn test_timestamp() { let session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session - .query_unpaged( - format!( - "CREATE TABLE IF NOT EXISTS {}.t_timestamp (a text, b text, primary key (a))", - ks - ), - &[], - ) + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {}.t_timestamp (a text, b text, primary key (a))", + ks + )) .await .unwrap(); @@ -1469,7 +1454,7 @@ async fn test_schema_types_in_metadata() { let ks = unique_keyspace_name(); session - .query_unpaged(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]) + .ddl(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)) .await .unwrap(); @@ -1479,31 +1464,27 @@ async fn test_schema_types_in_metadata() { .unwrap(); session - .query_unpaged( + .ddl( "CREATE TYPE IF NOT EXISTS type_a ( a map>, text>, b frozen>, frozen>>> )", - &[], ) .await .unwrap(); session - .query_unpaged("CREATE TYPE IF NOT EXISTS type_b (a int, b text)", &[]) + .ddl("CREATE TYPE IF NOT EXISTS type_b (a int, b text)") .await .unwrap(); session - .query_unpaged( - "CREATE TYPE IF NOT EXISTS type_c (a map>, frozen>)", - &[], - ) + .ddl("CREATE TYPE IF NOT EXISTS type_c (a map>, frozen>)") .await .unwrap(); session - .query_unpaged( + .ddl( "CREATE TABLE IF NOT EXISTS table_a ( a frozen PRIMARY KEY, b type_b, @@ -1511,18 +1492,16 @@ async fn test_schema_types_in_metadata() { d map>>, e tuple )", - &[], ) .await .unwrap(); session - .query_unpaged( + .ddl( "CREATE TABLE IF NOT EXISTS table_b ( a text PRIMARY KEY, b frozen> )", - &[], ) .await .unwrap(); @@ -1628,7 +1607,7 @@ async fn test_user_defined_types_in_metadata() { let ks = unique_keyspace_name(); session - .query_unpaged(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]) + .ddl(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)) .await .unwrap(); @@ -1638,26 +1617,22 @@ async fn test_user_defined_types_in_metadata() { .unwrap(); session - .query_unpaged( + .ddl( "CREATE TYPE IF NOT EXISTS type_a ( a map>, text>, b frozen>, frozen>>> )", - &[], ) .await .unwrap(); session - .query_unpaged("CREATE TYPE IF NOT EXISTS type_b (a int, b text)", &[]) + .ddl("CREATE TYPE IF NOT EXISTS type_b (a int, b text)") .await .unwrap(); session - .query_unpaged( - "CREATE TYPE IF NOT EXISTS type_c (a map>, frozen>)", - &[], - ) + .ddl("CREATE TYPE IF NOT EXISTS type_c (a map>, frozen>)") .await .unwrap(); @@ -1692,7 +1667,7 @@ async fn test_column_kinds_in_metadata() { let ks = unique_keyspace_name(); session - .query_unpaged(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]) + .ddl(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)) .await .unwrap(); @@ -1702,7 +1677,7 @@ async fn test_column_kinds_in_metadata() { .unwrap(); session - .query_unpaged( + .ddl( "CREATE TABLE IF NOT EXISTS t ( a int, b int, @@ -1712,7 +1687,6 @@ async fn test_column_kinds_in_metadata() { f int, PRIMARY KEY ((c, e), b, a) )", - &[], ) .await .unwrap(); @@ -1738,7 +1712,7 @@ async fn test_primary_key_ordering_in_metadata() { let ks = unique_keyspace_name(); session - .query_unpaged(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]) + .ddl(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)) .await .unwrap(); @@ -1748,7 +1722,7 @@ async fn test_primary_key_ordering_in_metadata() { .unwrap(); session - .query_unpaged( + .ddl( "CREATE TABLE IF NOT EXISTS t ( a int, b int, @@ -1761,7 +1735,6 @@ async fn test_primary_key_ordering_in_metadata() { i int STATIC, PRIMARY KEY ((c, e), b, a) )", - &[], ) .await .unwrap(); @@ -1794,7 +1767,7 @@ async fn test_table_partitioner_in_metadata() { create_ks += " AND TABLETS = {'enabled': false}"; } - session.query_unpaged(create_ks, &[]).await.unwrap(); + session.ddl(create_ks).await.unwrap(); session .query_unpaged(format!("USE {}", ks), &[]) @@ -1802,9 +1775,8 @@ async fn test_table_partitioner_in_metadata() { .unwrap(); session - .query_unpaged( + .ddl( "CREATE TABLE t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v))WITH cdc = {'enabled':true}", - &[], ) .await .unwrap(); @@ -1835,7 +1807,7 @@ async fn test_turning_off_schema_fetching() { let ks = unique_keyspace_name(); session - .query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]) + .ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)) .await .unwrap(); @@ -1845,31 +1817,27 @@ async fn test_turning_off_schema_fetching() { .unwrap(); session - .query_unpaged( + .ddl( "CREATE TYPE IF NOT EXISTS type_a ( a map>, text>, b frozen>, frozen>>> )", - &[], ) .await .unwrap(); session - .query_unpaged("CREATE TYPE IF NOT EXISTS type_b (a int, b text)", &[]) + .ddl("CREATE TYPE IF NOT EXISTS type_b (a int, b text)") .await .unwrap(); session - .query_unpaged( - "CREATE TYPE IF NOT EXISTS type_c (a map>, frozen>)", - &[], - ) + .ddl("CREATE TYPE IF NOT EXISTS type_c (a map>, frozen>)") .await .unwrap(); session - .query_unpaged( + .ddl( "CREATE TABLE IF NOT EXISTS table_a ( a frozen PRIMARY KEY, b type_b, @@ -1877,7 +1845,6 @@ async fn test_turning_off_schema_fetching() { d map>>, e tuple )", - &[], ) .await .unwrap(); @@ -1909,7 +1876,7 @@ async fn test_named_bind_markers() { let ks = unique_keyspace_name(); session - .query_unpaged(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]) + .ddl(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)) .await .unwrap(); @@ -1919,10 +1886,7 @@ async fn test_named_bind_markers() { .unwrap(); session - .query_unpaged( - "CREATE TABLE t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v))", - &[], - ) + .ddl("CREATE TABLE t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v))") .await .unwrap(); @@ -1974,11 +1938,11 @@ async fn test_prepared_partitioner() { create_ks += " AND TABLETS = {'enabled': false}" } - session.query_unpaged(create_ks, &[]).await.unwrap(); + session.ddl(create_ks).await.unwrap(); session.use_keyspace(ks, false).await.unwrap(); session - .query_unpaged("CREATE TABLE IF NOT EXISTS t1 (a int primary key)", &[]) + .ddl("CREATE TABLE IF NOT EXISTS t1 (a int primary key)") .await .unwrap(); @@ -2000,10 +1964,7 @@ async fn test_prepared_partitioner() { } session - .query_unpaged( - "CREATE TABLE IF NOT EXISTS t2 (a int primary key) WITH cdc = {'enabled':true}", - &[], - ) + .ddl("CREATE TABLE IF NOT EXISTS t2 (a int primary key) WITH cdc = {'enabled':true}") .await .unwrap(); @@ -2023,14 +1984,14 @@ async fn test_prepared_partitioner() { async fn rename(session: &Session, rename_str: &str) { session - .query_unpaged(format!("ALTER TABLE tab RENAME {}", rename_str), ()) + .ddl(format!("ALTER TABLE tab RENAME {}", rename_str)) .await .unwrap(); } async fn rename_caching(session: &CachingSession, rename_str: &str) { session - .execute_unpaged(format!("ALTER TABLE tab RENAME {}", rename_str), &()) + .ddl(format!("ALTER TABLE tab RENAME {}", rename_str)) .await .unwrap(); } @@ -2049,14 +2010,11 @@ async fn test_unprepared_reprepare_in_execute() { let session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session.use_keyspace(ks, false).await.unwrap(); session - .query_unpaged( - "CREATE TABLE IF NOT EXISTS tab (a int, b int, c int, primary key (a, b, c))", - &[], - ) + .ddl("CREATE TABLE IF NOT EXISTS tab (a int, b int, c int, primary key (a, b, c))") .await .unwrap(); @@ -2112,14 +2070,11 @@ async fn test_unusual_valuelists() { let session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session.use_keyspace(ks, false).await.unwrap(); session - .query_unpaged( - "CREATE TABLE IF NOT EXISTS tab (a int, b int, c varchar, primary key (a, b, c))", - &[], - ) + .ddl("CREATE TABLE IF NOT EXISTS tab (a int, b int, c varchar, primary key (a, b, c))") .await .unwrap(); @@ -2182,14 +2137,11 @@ async fn test_unprepared_reprepare_in_batch() { let session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session.use_keyspace(ks, false).await.unwrap(); session - .query_unpaged( - "CREATE TABLE IF NOT EXISTS tab (a int, b int, c int, primary key (a, b, c))", - &[], - ) + .ddl("CREATE TABLE IF NOT EXISTS tab (a int, b int, c int, primary key (a, b, c))") .await .unwrap(); @@ -2249,16 +2201,13 @@ async fn test_unprepared_reprepare_in_caching_session_execute() { let session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session.use_keyspace(ks, false).await.unwrap(); let caching_session: CachingSession = CachingSession::from(session, 64); caching_session - .execute_unpaged( - "CREATE TABLE IF NOT EXISTS tab (a int, b int, c int, primary key (a, b, c))", - &[], - ) + .ddl("CREATE TABLE IF NOT EXISTS tab (a int, b int, c int, primary key (a, b, c))") .await .unwrap(); @@ -2311,16 +2260,16 @@ async fn test_views_in_schema_info() { let session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session.use_keyspace(ks.clone(), false).await.unwrap(); session - .query_unpaged("CREATE TABLE t(id int PRIMARY KEY, v int)", &[]) + .ddl("CREATE TABLE t(id int PRIMARY KEY, v int)") .await .unwrap(); - session.query_unpaged("CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM t WHERE v IS NOT NULL PRIMARY KEY (v, id)", &[]).await.unwrap(); - session.query_unpaged("CREATE MATERIALIZED VIEW mv2 AS SELECT id, v FROM t WHERE v IS NOT NULL PRIMARY KEY (v, id)", &[]).await.unwrap(); + session.ddl("CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM t WHERE v IS NOT NULL PRIMARY KEY (v, id)").await.unwrap(); + session.ddl("CREATE MATERIALIZED VIEW mv2 AS SELECT id, v FROM t WHERE v IS NOT NULL PRIMARY KEY (v, id)").await.unwrap(); session.await_schema_agreement().await.unwrap(); session.refresh_metadata().await.unwrap(); @@ -2384,14 +2333,11 @@ async fn test_prepare_batch() { let session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session.use_keyspace(ks.clone(), false).await.unwrap(); session - .query_unpaged( - "CREATE TABLE test_batch_table (a int, b int, primary key (a, b))", - (), - ) + .ddl("CREATE TABLE test_batch_table (a int, b int, primary key (a, b))") .await .unwrap(); @@ -2481,14 +2427,11 @@ async fn test_refresh_metadata_after_schema_agreement() { let session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session.use_keyspace(ks.clone(), false).await.unwrap(); session - .query_unpaged( - "CREATE TYPE udt (field1 int, field2 uuid, field3 text)", - &[], - ) + .ddl("CREATE TYPE udt (field1 int, field2 uuid, field3 text)") .await .unwrap(); @@ -2527,9 +2470,9 @@ async fn test_rate_limit_exceeded_exception() { } let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session.use_keyspace(ks.clone(), false).await.unwrap(); - session.query_unpaged("CREATE TABLE tbl (pk int PRIMARY KEY, v int) WITH per_partition_rate_limit = {'max_writes_per_second': 1}", ()).await.unwrap(); + session.ddl("CREATE TABLE tbl (pk int PRIMARY KEY, v int) WITH per_partition_rate_limit = {'max_writes_per_second': 1}").await.unwrap(); let stmt = session .prepare("INSERT INTO tbl (pk, v) VALUES (?, ?)") @@ -2571,14 +2514,11 @@ async fn test_batch_lwts() { if scylla_supports_tablets(&session).await { create_ks += " and TABLETS = { 'enabled': false}"; } - session.query_unpaged(create_ks, &[]).await.unwrap(); + session.ddl(create_ks).await.unwrap(); session.use_keyspace(ks.clone(), false).await.unwrap(); session - .query_unpaged( - "CREATE TABLE tab (p1 int, c1 int, r1 int, r2 int, primary key (p1, c1))", - (), - ) + .ddl("CREATE TABLE tab (p1 int, c1 int, r1 int, r2 int, primary key (p1, c1))") .await .unwrap(); @@ -2699,7 +2639,7 @@ async fn test_keyspaces_to_fetch() { let session_default = create_new_session_builder().build().await.unwrap(); for ks in [&ks1, &ks2] { session_default - .query_unpaged(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]) + .ddl(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)) .await .unwrap(); } @@ -2776,10 +2716,10 @@ async fn test_iter_works_when_retry_policy_returns_ignore_write_error() { if scylla_supports_tablets(&session).await { create_ks += " and TABLETS = { 'enabled': false}"; } - session.query_unpaged(create_ks, ()).await.unwrap(); + session.ddl(create_ks).await.unwrap(); session.use_keyspace(ks, true).await.unwrap(); session - .query_unpaged("CREATE TABLE t (pk int PRIMARY KEY, v int)", ()) + .ddl("CREATE TABLE t (pk int PRIMARY KEY, v int)") .await .unwrap(); @@ -2819,15 +2759,12 @@ async fn test_iter_methods_with_modification_statements() { let session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session - .query_unpaged( - format!( - "CREATE TABLE IF NOT EXISTS {}.t (a int, b int, c text, primary key (a, b))", - ks - ), - &[], - ) + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {}.t (a int, b int, c text, primary key (a, b))", + ks + )) .await .unwrap(); @@ -2870,7 +2807,7 @@ async fn test_get_keyspace_name() { // No keyspace is set in config, so get_keyspace() should return None. let session = create_new_session_builder().build().await.unwrap(); assert_eq!(session.get_keyspace(), None); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); assert_eq!(session.get_keyspace(), None); // Call use_keyspace(), get_keyspace now should return the new keyspace name @@ -2896,25 +2833,19 @@ async fn simple_strategy_test() { let session = create_new_session_builder().build().await.unwrap(); session - .query_unpaged( - format!( - "CREATE KEYSPACE {} WITH REPLICATION = \ + .ddl(format!( + "CREATE KEYSPACE {} WITH REPLICATION = \ {{'class': 'SimpleStrategy', 'replication_factor': 1}}", - ks - ), - (), - ) + ks + )) .await .unwrap(); session - .query_unpaged( - format!( - "CREATE TABLE {}.tab (p int, c int, r int, PRIMARY KEY (p, c, r))", - ks - ), - (), - ) + .ddl(format!( + "CREATE TABLE {}.tab (p int, c int, r int, PRIMARY KEY (p, c, r))", + ks + )) .await .unwrap(); @@ -2961,7 +2892,7 @@ async fn test_manual_primary_key_computation() { // Setup session let ks = unique_keyspace_name(); let session = create_new_session_builder().build().await.unwrap(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session.use_keyspace(&ks, true).await.unwrap(); async fn assert_tokens_equal( @@ -2996,10 +2927,7 @@ async fn test_manual_primary_key_computation() { // Single-column partition key { session - .query_unpaged( - "CREATE TABLE IF NOT EXISTS t2 (a int, b int, c text, primary key (a, b))", - &[], - ) + .ddl("CREATE TABLE IF NOT EXISTS t2 (a int, b int, c text, primary key (a, b))") .await .unwrap(); @@ -3027,7 +2955,7 @@ async fn test_manual_primary_key_computation() { // Composite partition key { session - .query_unpaged("CREATE TABLE IF NOT EXISTS complex_pk (a int, b int, c text, d int, e int, primary key ((a,b,c),d))", &[]) + .ddl("CREATE TABLE IF NOT EXISTS complex_pk (a int, b int, c text, d int, e int, primary key ((a,b,c),d))") .await .unwrap(); @@ -3070,7 +2998,7 @@ async fn test_deserialize_empty_collections() { // Setup session. let ks = unique_keyspace_name(); let session = create_new_session_builder().build().await.unwrap(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session.use_keyspace(&ks, true).await.unwrap(); async fn deserialize_empty_collection< @@ -3086,7 +3014,7 @@ async fn test_deserialize_empty_collections() { "CREATE TABLE {} (n int primary key, c {}<{}>)", table_name, collection_name, collection_type_params ); - session.query_unpaged(query, ()).await.unwrap(); + session.ddl(query).await.unwrap(); // Populate the table with an empty collection, effectively inserting null as the collection. session diff --git a/scylla/src/utils/test_utils.rs b/scylla/src/utils/test_utils.rs index d745a31359..7e258c352c 100644 --- a/scylla/src/utils/test_utils.rs +++ b/scylla/src/utils/test_utils.rs @@ -1,5 +1,12 @@ +use crate::load_balancing::{FallbackPlan, LoadBalancingPolicy, RoutingInfo}; +use crate::query::Query; +use crate::routing::Shard; +use crate::transport::connection::Connection; +use crate::transport::errors::QueryError; use crate::transport::session_builder::{GenericSessionBuilder, SessionBuilderKind}; -use crate::Session; +use crate::transport::{ClusterData, NodeRef}; +use crate::{CachingSession, ExecutionProfile, Session}; +use std::sync::Arc; use std::{num::NonZeroU32, time::Duration}; use std::{ sync::atomic::{AtomicUsize, Ordering}, @@ -100,3 +107,80 @@ pub(crate) fn setup_tracing() { .with_writer(tracing_subscriber::fmt::TestWriter::new()) .try_init(); } + +// This LBP produces a predictable query plan - it order the nodes +// by position in the ring. +// This is to make sure that all DDL queries land on the same node, +// to prevent errors from concurrent DDL queries executed on different nodes. +#[derive(Debug)] +struct SchemaQueriesLBP; + +impl LoadBalancingPolicy for SchemaQueriesLBP { + fn pick<'a>( + &'a self, + _query: &'a RoutingInfo, + cluster: &'a ClusterData, + ) -> Option<(NodeRef<'a>, Option)> { + // I'm not sure if Scylla can handle concurrent DDL queries to different shard, + // in other words if its local lock is per-node or per shard. + // Just to be safe, let's use explicit shard. + cluster.get_nodes_info().first().map(|node| (node, Some(0))) + } + + fn fallback<'a>( + &'a self, + _query: &'a RoutingInfo, + cluster: &'a ClusterData, + ) -> FallbackPlan<'a> { + Box::new(cluster.get_nodes_info().iter().map(|node| (node, Some(0)))) + } + + fn name(&self) -> String { + "SchemaQueriesLBP".to_owned() + } +} + +fn apply_ddl_lbp(query: &mut Query) { + let policy = query + .get_execution_profile_handle() + .map(|profile| profile.pointee_to_builder()) + .unwrap_or(ExecutionProfile::builder()) + .load_balancing_policy(Arc::new(SchemaQueriesLBP)) + .build(); + query.set_execution_profile_handle(Some(policy.into_handle())); +} + +// This is just to make it easier to call the above function: +// we'll be able to do session.ddl(...) instead of perform_ddl(&session, ...) +// or something like that. +#[async_trait::async_trait] +pub(crate) trait PerformDDL { + async fn ddl(&self, query: impl Into + Send) -> Result<(), QueryError>; +} + +#[async_trait::async_trait] +impl PerformDDL for Session { + async fn ddl(&self, query: impl Into + Send) -> Result<(), QueryError> { + let mut query = query.into(); + apply_ddl_lbp(&mut query); + self.query_unpaged(query, &[]).await.map(|_| ()) + } +} + +#[async_trait::async_trait] +impl PerformDDL for CachingSession { + async fn ddl(&self, query: impl Into + Send) -> Result<(), QueryError> { + let mut query = query.into(); + apply_ddl_lbp(&mut query); + self.execute_unpaged(query, &[]).await.map(|_| ()) + } +} + +#[async_trait::async_trait] +impl PerformDDL for Connection { + async fn ddl(&self, query: impl Into + Send) -> Result<(), QueryError> { + let mut query = query.into(); + apply_ddl_lbp(&mut query); + self.query_unpaged(query).await.map(|_| ()) + } +} diff --git a/scylla/tests/integration/authenticate.rs b/scylla/tests/integration/authenticate.rs index babdb0d5c8..c2a6569cca 100644 --- a/scylla/tests/integration/authenticate.rs +++ b/scylla/tests/integration/authenticate.rs @@ -1,4 +1,4 @@ -use crate::utils::{setup_tracing, unique_keyspace_name}; +use crate::utils::{setup_tracing, unique_keyspace_name, PerformDDL}; use async_trait::async_trait; use bytes::{BufMut, BytesMut}; use scylla::authentication::{AuthError, AuthenticatorProvider, AuthenticatorSession}; @@ -20,12 +20,9 @@ async fn authenticate_superuser() { .unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session.use_keyspace(ks, false).await.unwrap(); - session - .query_unpaged("DROP TABLE IF EXISTS t;", &[]) - .await - .unwrap(); + session.ddl("DROP TABLE IF EXISTS t;").await.unwrap(); println!("Ok."); } @@ -79,12 +76,9 @@ async fn custom_authentication() { .unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session.use_keyspace(ks, false).await.unwrap(); - session - .query_unpaged("DROP TABLE IF EXISTS t;", &[]) - .await - .unwrap(); + session.ddl("DROP TABLE IF EXISTS t;").await.unwrap(); println!("Ok."); } diff --git a/scylla/tests/integration/consistency.rs b/scylla/tests/integration/consistency.rs index aee410f762..a503fb7d3b 100644 --- a/scylla/tests/integration/consistency.rs +++ b/scylla/tests/integration/consistency.rs @@ -1,6 +1,4 @@ -use crate::utils::{setup_tracing, test_with_3_node_cluster}; - -use crate::utils::unique_keyspace_name; +use crate::utils::{setup_tracing, test_with_3_node_cluster, unique_keyspace_name, PerformDDL}; use scylla::execution_profile::{ExecutionProfileBuilder, ExecutionProfileHandle}; use scylla::load_balancing::{DefaultPolicy, LoadBalancingPolicy, RoutingInfo}; use scylla::prepared_statement::PreparedStatement; @@ -60,10 +58,10 @@ const CREATE_TABLE_STR: &str = "CREATE TABLE consistency_tests (a int, b int, PR const QUERY_STR: &str = "INSERT INTO consistency_tests (a, b) VALUES (?, 1)"; async fn create_schema(session: &Session, ks: &str) { - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks)).await.unwrap(); session.use_keyspace(ks, false).await.unwrap(); - session.query_unpaged(CREATE_TABLE_STR, &[]).await.unwrap(); + session.ddl(CREATE_TABLE_STR).await.unwrap(); } // The following functions perform a request with consistencies set directly on a statement. diff --git a/scylla/tests/integration/cql_collections.rs b/scylla/tests/integration/cql_collections.rs index c8e783547f..a36d1bc16d 100644 --- a/scylla/tests/integration/cql_collections.rs +++ b/scylla/tests/integration/cql_collections.rs @@ -1,5 +1,6 @@ use crate::utils::{ create_new_session_builder, setup_tracing, unique_keyspace_name, DeserializeOwnedValue, + PerformDDL, }; use scylla::frame::response::result::CqlValue; use scylla::Session; @@ -9,7 +10,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; async fn connect() -> Session { let session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session.use_keyspace(ks, false).await.unwrap(); session @@ -17,13 +18,10 @@ async fn connect() -> Session { async fn create_table(session: &Session, table_name: &str, value_type: &str) { session - .query_unpaged( - format!( - "CREATE TABLE IF NOT EXISTS {} (p int PRIMARY KEY, val {})", - table_name, value_type - ), - (), - ) + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {} (p int PRIMARY KEY, val {})", + table_name, value_type + )) .await .unwrap(); } diff --git a/scylla/tests/integration/cql_types.rs b/scylla/tests/integration/cql_types.rs index 62c1b8f5ff..1125914283 100644 --- a/scylla/tests/integration/cql_types.rs +++ b/scylla/tests/integration/cql_types.rs @@ -11,7 +11,7 @@ use std::str::FromStr; use crate::utils::{ create_new_session_builder, scylla_supports_tablets, setup_tracing, unique_keyspace_name, - DeserializeOwnedValue, + DeserializeOwnedValue, PerformDDL, }; // Used to prepare a table for test @@ -35,22 +35,19 @@ async fn init_test_maybe_without_tablets( create_ks += " AND TABLETS = {'enabled': false}" } - session.query_unpaged(create_ks, &[]).await.unwrap(); + session.ddl(create_ks).await.unwrap(); session.use_keyspace(ks, false).await.unwrap(); session - .query_unpaged(format!("DROP TABLE IF EXISTS {}", table_name), &[]) + .ddl(format!("DROP TABLE IF EXISTS {}", table_name)) .await .unwrap(); session - .query_unpaged( - format!( - "CREATE TABLE IF NOT EXISTS {} (id int PRIMARY KEY, val {})", - table_name, type_name - ), - &[], - ) + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {} (id int PRIMARY KEY, val {})", + table_name, type_name + )) .await .unwrap(); @@ -173,26 +170,20 @@ async fn test_cql_varint() { let ks = unique_keyspace_name(); session - .query_unpaged( - format!( - "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = \ + .ddl(format!( + "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = \ {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", - ks - ), - &[], - ) + ks + )) .await .unwrap(); session.use_keyspace(ks, false).await.unwrap(); session - .query_unpaged( - format!( - "CREATE TABLE IF NOT EXISTS {} (id int PRIMARY KEY, val varint)", - table_name - ), - &[], - ) + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {} (id int PRIMARY KEY, val varint)", + table_name + )) .await .unwrap(); @@ -1285,23 +1276,17 @@ async fn test_timeuuid_ordering() { let ks = unique_keyspace_name(); session - .query_unpaged( - format!( - "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = \ + .ddl(format!( + "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = \ {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", - ks - ), - &[], - ) + ks + )) .await .unwrap(); session.use_keyspace(ks, false).await.unwrap(); session - .query_unpaged( - "CREATE TABLE tab (p int, t timeuuid, PRIMARY KEY (p, t))", - (), - ) + .ddl("CREATE TABLE tab (p int, t timeuuid, PRIMARY KEY (p, t))") .await .unwrap(); @@ -1527,47 +1512,38 @@ async fn test_udt_after_schema_update() { let ks = unique_keyspace_name(); session - .query_unpaged( - format!( - "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = \ + .ddl(format!( + "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = \ {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", - ks - ), - &[], - ) + ks + )) .await .unwrap(); session.use_keyspace(ks, false).await.unwrap(); session - .query_unpaged(format!("DROP TABLE IF EXISTS {}", table_name), &[]) + .ddl(format!("DROP TABLE IF EXISTS {}", table_name)) .await .unwrap(); session - .query_unpaged(format!("DROP TYPE IF EXISTS {}", type_name), &[]) + .ddl(format!("DROP TYPE IF EXISTS {}", type_name)) .await .unwrap(); session - .query_unpaged( - format!( - "CREATE TYPE IF NOT EXISTS {} (first int, second boolean)", - type_name - ), - &[], - ) + .ddl(format!( + "CREATE TYPE IF NOT EXISTS {} (first int, second boolean)", + type_name + )) .await .unwrap(); session - .query_unpaged( - format!( - "CREATE TABLE IF NOT EXISTS {} (id int PRIMARY KEY, val {})", - table_name, type_name - ), - &[], - ) + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {} (id int PRIMARY KEY, val {})", + table_name, type_name + )) .await .unwrap(); @@ -1624,7 +1600,7 @@ async fn test_udt_after_schema_update() { assert_eq!(read_udt, v1); session - .query_unpaged(format!("ALTER TYPE {} ADD third text;", type_name), &[]) + .ddl(format!("ALTER TYPE {} ADD third text;", type_name)) .await .unwrap(); @@ -1708,47 +1684,38 @@ async fn test_udt_with_missing_field() { let ks = unique_keyspace_name(); session - .query_unpaged( - format!( - "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = \ + .ddl(format!( + "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = \ {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", - ks - ), - &[], - ) + ks + )) .await .unwrap(); session.use_keyspace(ks, false).await.unwrap(); session - .query_unpaged(format!("DROP TABLE IF EXISTS {}", table_name), &[]) + .ddl(format!("DROP TABLE IF EXISTS {}", table_name)) .await .unwrap(); session - .query_unpaged(format!("DROP TYPE IF EXISTS {}", type_name), &[]) + .ddl(format!("DROP TYPE IF EXISTS {}", type_name)) .await .unwrap(); session - .query_unpaged( - format!( - "CREATE TYPE IF NOT EXISTS {} (first int, second boolean, third float, fourth blob)", - type_name - ), - &[], - ) + .ddl(format!( + "CREATE TYPE IF NOT EXISTS {} (first int, second boolean, third float, fourth blob)", + type_name + )) .await .unwrap(); session - .query_unpaged( - format!( - "CREATE TABLE IF NOT EXISTS {} (id int PRIMARY KEY, val {})", - table_name, type_name - ), - &[], - ) + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {} (id int PRIMARY KEY, val {})", + table_name, type_name + )) .await .unwrap(); diff --git a/scylla/tests/integration/cql_value.rs b/scylla/tests/integration/cql_value.rs index c0e55562d8..d0648b1472 100644 --- a/scylla/tests/integration/cql_value.rs +++ b/scylla/tests/integration/cql_value.rs @@ -4,7 +4,7 @@ use scylla::frame::response::result::CqlValue; use scylla::frame::value::CqlDuration; use scylla::Session; -use crate::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name}; +use crate::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name, PerformDDL}; #[tokio::test] async fn test_cqlvalue_udt() { @@ -12,32 +12,20 @@ async fn test_cqlvalue_udt() { let session: Session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); session - .query_unpaged( - format!( - "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = \ + .ddl(format!( + "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = \ {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", - ks - ), - &[], - ) + ks + )) .await .unwrap(); session.use_keyspace(&ks, false).await.unwrap(); session - .query_unpaged( - "CREATE TYPE IF NOT EXISTS cqlvalue_udt_type (int_val int, text_val text)", - &[], - ) - .await - .unwrap(); - session - .query_unpaged( - "CREATE TABLE IF NOT EXISTS cqlvalue_udt_test (k int, my cqlvalue_udt_type, primary key (k))", - &[], - ) + .ddl("CREATE TYPE IF NOT EXISTS cqlvalue_udt_type (int_val int, text_val text)") .await .unwrap(); + session.ddl("CREATE TABLE IF NOT EXISTS cqlvalue_udt_test (k int, my cqlvalue_udt_type, primary key (k))").await.unwrap(); let udt_cql_value = CqlValue::UserDefinedType { keyspace: ks, @@ -75,14 +63,11 @@ async fn test_cqlvalue_duration() { let ks = unique_keyspace_name(); session - .query_unpaged( - format!( - "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = \ + .ddl(format!( + "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = \ {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", - ks - ), - &[], - ) + ks + )) .await .unwrap(); session.use_keyspace(&ks, false).await.unwrap(); @@ -93,12 +78,24 @@ async fn test_cqlvalue_duration() { nanoseconds: 21372137, }); + session.ddl("CREATE TABLE IF NOT EXISTS cqlvalue_duration_test (pk int, ck int, v duration, primary key (pk, ck))").await.unwrap(); let fixture_queries = vec![ - ("CREATE TABLE IF NOT EXISTS cqlvalue_duration_test (pk int, ck int, v duration, primary key (pk, ck))", vec![],), - ("INSERT INTO cqlvalue_duration_test (pk, ck, v) VALUES (0, 0, ?)", vec![&duration_cql_value,],), - ("INSERT INTO cqlvalue_duration_test (pk, ck, v) VALUES (0, 1, 89h4m48s)", vec![],), - ("INSERT INTO cqlvalue_duration_test (pk, ck, v) VALUES (0, 2, PT89H8M53S)", vec![],), - ("INSERT INTO cqlvalue_duration_test (pk, ck, v) VALUES (0, 3, P0000-00-00T89:09:09)", vec![],), + ( + "INSERT INTO cqlvalue_duration_test (pk, ck, v) VALUES (0, 0, ?)", + vec![&duration_cql_value], + ), + ( + "INSERT INTO cqlvalue_duration_test (pk, ck, v) VALUES (0, 1, 89h4m48s)", + vec![], + ), + ( + "INSERT INTO cqlvalue_duration_test (pk, ck, v) VALUES (0, 2, PT89H8M53S)", + vec![], + ), + ( + "INSERT INTO cqlvalue_duration_test (pk, ck, v) VALUES (0, 3, P0000-00-00T89:09:09)", + vec![], + ), ]; for query in fixture_queries { diff --git a/scylla/tests/integration/execution_profiles.rs b/scylla/tests/integration/execution_profiles.rs index cdfde1c32c..dd58cdbae0 100644 --- a/scylla/tests/integration/execution_profiles.rs +++ b/scylla/tests/integration/execution_profiles.rs @@ -1,7 +1,7 @@ use std::ops::Deref; use std::sync::Arc; -use crate::utils::{setup_tracing, test_with_3_node_cluster, unique_keyspace_name}; +use crate::utils::{setup_tracing, test_with_3_node_cluster, unique_keyspace_name, PerformDDL}; use assert_matches::assert_matches; use scylla::batch::BatchStatement; use scylla::batch::{Batch, BatchType}; @@ -164,14 +164,13 @@ async fn test_execution_profiles() { let ks = unique_keyspace_name(); /* Prepare schema */ - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks)).await.unwrap(); session - .query_unpaged( + .ddl( format!( "CREATE TABLE IF NOT EXISTS {}.t (a int, b int, c text, primary key (a, b))", ks ), - &[], ) .await .unwrap(); diff --git a/scylla/tests/integration/history.rs b/scylla/tests/integration/history.rs index 1d4f89df8d..1bbd21e024 100644 --- a/scylla/tests/integration/history.rs +++ b/scylla/tests/integration/history.rs @@ -10,7 +10,7 @@ use scylla::history::{ use scylla::query::Query; use scylla::transport::errors::QueryError; -use crate::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name}; +use crate::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name, PerformDDL}; // Set a single time for all timestamps within StructuredHistory. // HistoryCollector sets the timestamp to current time which changes with each test. @@ -211,13 +211,13 @@ async fn iterator_query_history() { let session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); session - .query_unpaged(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]) + .ddl(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)) .await .unwrap(); session.use_keyspace(ks, true).await.unwrap(); session - .query_unpaged("CREATE TABLE t (p int primary key)", ()) + .ddl("CREATE TABLE t (p int primary key)") .await .unwrap(); for i in 0..32 { diff --git a/scylla/tests/integration/large_batch_statements.rs b/scylla/tests/integration/large_batch_statements.rs index 3d6757165b..724d8c9496 100644 --- a/scylla/tests/integration/large_batch_statements.rs +++ b/scylla/tests/integration/large_batch_statements.rs @@ -1,6 +1,6 @@ use assert_matches::assert_matches; -use crate::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name}; +use crate::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name, PerformDDL}; use scylla::batch::Batch; use scylla::batch::BatchType; use scylla::query::Query; @@ -30,19 +30,15 @@ async fn test_large_batch_statements() { async fn create_test_session(session: Session, ks: &String) -> Session { session - .query_unpaged( + .ddl( format!("CREATE KEYSPACE {} WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }}",ks), - &[], ) .await.unwrap(); session - .query_unpaged( - format!( - "CREATE TABLE {}.pairs (dummy int, k blob, v blob, primary key (dummy, k))", - ks - ), - &[], - ) + .ddl(format!( + "CREATE TABLE {}.pairs (dummy int, k blob, v blob, primary key (dummy, k))", + ks + )) .await .unwrap(); session diff --git a/scylla/tests/integration/lwt_optimisation.rs b/scylla/tests/integration/lwt_optimisation.rs index 9fd6f073f0..466120bce2 100644 --- a/scylla/tests/integration/lwt_optimisation.rs +++ b/scylla/tests/integration/lwt_optimisation.rs @@ -1,5 +1,6 @@ use crate::utils::{ scylla_supports_tablets, setup_tracing, test_with_3_node_cluster, unique_keyspace_name, + PerformDDL, }; use scylla::retry_policy::FallthroughRetryPolicy; use scylla::transport::session::Session; @@ -73,11 +74,11 @@ async fn if_lwt_optimisation_mark_offered_then_negotiatied_and_lwt_routed_optima if scylla_supports_tablets(&session).await { create_ks += " and TABLETS = { 'enabled': false}"; } - session.query_unpaged(create_ks, &[]).await.unwrap(); + session.ddl(create_ks).await.unwrap(); session.use_keyspace(ks, false).await.unwrap(); session - .query_unpaged("CREATE TABLE t (a int primary key, b int)", &[]) + .ddl("CREATE TABLE t (a int primary key, b int)") .await .unwrap(); diff --git a/scylla/tests/integration/retries.rs b/scylla/tests/integration/retries.rs index a21de6d619..7c7d35c4d4 100644 --- a/scylla/tests/integration/retries.rs +++ b/scylla/tests/integration/retries.rs @@ -1,4 +1,4 @@ -use crate::utils::{setup_tracing, test_with_3_node_cluster, unique_keyspace_name}; +use crate::utils::{setup_tracing, test_with_3_node_cluster, unique_keyspace_name, PerformDDL}; use scylla::query::Query; use scylla::retry_policy::FallthroughRetryPolicy; use scylla::speculative_execution::SimpleSpeculativeExecutionPolicy; @@ -36,10 +36,10 @@ async fn speculative_execution_is_fired() { .unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks)).await.unwrap(); session.use_keyspace(ks, false).await.unwrap(); session - .query_unpaged("CREATE TABLE t (a int primary key)", &[]) + .ddl("CREATE TABLE t (a int primary key)") .await .unwrap(); @@ -112,10 +112,10 @@ async fn retries_occur() { .unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks)).await.unwrap(); session.use_keyspace(ks, false).await.unwrap(); session - .query_unpaged("CREATE TABLE t (a int primary key)", &[]) + .ddl("CREATE TABLE t (a int primary key)") .await .unwrap(); @@ -192,10 +192,10 @@ async fn speculative_execution_panic_regression_test() { .unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks)).await.unwrap(); session.use_keyspace(ks, false).await.unwrap(); session - .query_unpaged("CREATE TABLE t (a int primary key)", &[]) + .ddl("CREATE TABLE t (a int primary key)") .await .unwrap(); diff --git a/scylla/tests/integration/shards.rs b/scylla/tests/integration/shards.rs index 17f3c1ceb0..6c1f8fdb8f 100644 --- a/scylla/tests/integration/shards.rs +++ b/scylla/tests/integration/shards.rs @@ -1,7 +1,9 @@ +use std::collections::HashSet; use std::sync::Arc; use crate::utils::{ scylla_supports_tablets, setup_tracing, test_with_3_node_cluster, unique_keyspace_name, + PerformDDL, }; use scylla::SessionBuilder; use tokio::sync::mpsc; @@ -17,7 +19,6 @@ use scylla_proxy::{ProxyError, RequestFrame, WorkerError}; #[cfg(not(scylla_cloud_tests))] async fn test_consistent_shard_awareness() { setup_tracing(); - use std::collections::HashSet; let res = test_with_3_node_cluster(ShardAwareness::QueryNode, |proxy_uris, translation_map, mut running_proxy| async move { @@ -43,14 +44,13 @@ async fn test_consistent_shard_awareness() { if scylla_supports_tablets(&session).await { create_ks += " and TABLETS = { 'enabled': false}"; } - session.query_unpaged(create_ks, &[]).await.unwrap(); + session.ddl(create_ks).await.unwrap(); session - .query_unpaged( + .ddl( format!( "CREATE TABLE IF NOT EXISTS {}.t (a int, b int, c text, primary key (a, b))", ks ), - &[], ) .await .unwrap(); diff --git a/scylla/tests/integration/silent_prepare_batch.rs b/scylla/tests/integration/silent_prepare_batch.rs index 3f86ae09f1..b510e04626 100644 --- a/scylla/tests/integration/silent_prepare_batch.rs +++ b/scylla/tests/integration/silent_prepare_batch.rs @@ -1,4 +1,4 @@ -use crate::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name}; +use crate::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name, PerformDDL}; use scylla::batch::Batch; use scylla::prepared_statement::PreparedStatement; use scylla::Session; @@ -11,14 +11,11 @@ async fn test_quietly_prepare_batch() { let session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session.use_keyspace(ks.clone(), false).await.unwrap(); session - .query_unpaged( - "CREATE TABLE test_batch_table (a int, b int, primary key (a, b))", - (), - ) + .ddl("CREATE TABLE test_batch_table (a int, b int, primary key (a, b))") .await .unwrap(); diff --git a/scylla/tests/integration/silent_prepare_query.rs b/scylla/tests/integration/silent_prepare_query.rs index ed259914ef..477b633862 100644 --- a/scylla/tests/integration/silent_prepare_query.rs +++ b/scylla/tests/integration/silent_prepare_query.rs @@ -1,4 +1,4 @@ -use crate::utils::{setup_tracing, test_with_3_node_cluster, unique_keyspace_name}; +use crate::utils::{setup_tracing, test_with_3_node_cluster, unique_keyspace_name, PerformDDL}; use scylla::query::Query; use scylla::Session; use scylla::SessionBuilder; @@ -27,10 +27,10 @@ async fn test_prepare_query_with_values() { .unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks)).await.unwrap(); session.use_keyspace(ks, false).await.unwrap(); session - .query_unpaged("CREATE TABLE t (a int primary key)", &[]) + .ddl("CREATE TABLE t (a int primary key)") .await .unwrap(); @@ -78,10 +78,10 @@ async fn test_query_with_no_values() { .unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks)).await.unwrap(); session.use_keyspace(ks, false).await.unwrap(); session - .query_unpaged("CREATE TABLE t (a int primary key)", &[]) + .ddl("CREATE TABLE t (a int primary key)") .await .unwrap(); diff --git a/scylla/tests/integration/skip_metadata_optimization.rs b/scylla/tests/integration/skip_metadata_optimization.rs index 1260eda326..eb8ff8520a 100644 --- a/scylla/tests/integration/skip_metadata_optimization.rs +++ b/scylla/tests/integration/skip_metadata_optimization.rs @@ -1,4 +1,4 @@ -use crate::utils::{setup_tracing, test_with_3_node_cluster, unique_keyspace_name}; +use crate::utils::{setup_tracing, test_with_3_node_cluster, unique_keyspace_name, PerformDDL}; use scylla::prepared_statement::PreparedStatement; use scylla::{Session, SessionBuilder}; use scylla_cql::frame::request::query::{PagingState, PagingStateResponse}; @@ -27,10 +27,10 @@ async fn test_skip_result_metadata() { .unwrap(); let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks)).await.unwrap(); session.use_keyspace(ks, false).await.unwrap(); session - .query_unpaged("CREATE TABLE t (a int primary key, b int, c text)", &[]) + .ddl("CREATE TABLE t (a int primary key, b int, c text)") .await .unwrap(); session.query_unpaged("INSERT INTO t (a, b, c) VALUES (1, 2, 'foo_filter_data')", &[]).await.unwrap(); @@ -82,14 +82,13 @@ async fn test_skip_result_metadata() { { let ks = unique_keyspace_name(); - session.query_unpaged(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap(); session.use_keyspace(ks, true).await.unwrap(); type RowT = (i32, i32, String); session - .query_unpaged( + .ddl( "CREATE TABLE IF NOT EXISTS t2 (a int, b int, c text, primary key (a, b))", - &[], ) .await .unwrap(); diff --git a/scylla/tests/integration/tablets.rs b/scylla/tests/integration/tablets.rs index 446ac170e1..fd56c7d939 100644 --- a/scylla/tests/integration/tablets.rs +++ b/scylla/tests/integration/tablets.rs @@ -1,9 +1,9 @@ use std::sync::Arc; -use crate::utils::scylla_supports_tablets; -use crate::utils::setup_tracing; -use crate::utils::test_with_3_node_cluster; -use crate::utils::unique_keyspace_name; +use crate::utils::{ + scylla_supports_tablets, setup_tracing, test_with_3_node_cluster, unique_keyspace_name, + PerformDDL, +}; use futures::future::try_join_all; use futures::TryStreamExt; @@ -252,25 +252,19 @@ fn count_tablet_feedbacks( async fn prepare_schema(session: &Session, ks: &str, table: &str, tablet_count: usize) { session - .query_unpaged( - format!( - "CREATE KEYSPACE IF NOT EXISTS {} + .ddl(format!( + "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 2}} AND tablets = {{ 'initial': {} }}", - ks, tablet_count - ), - &[], - ) + ks, tablet_count + )) .await .unwrap(); session - .query_unpaged( - format!( - "CREATE TABLE IF NOT EXISTS {}.{} (a int, b int, c text, primary key (a, b))", - ks, table - ), - &[], - ) + .ddl(format!( + "CREATE TABLE IF NOT EXISTS {}.{} (a int, b int, c text, primary key (a, b))", + ks, table + )) .await .unwrap(); } diff --git a/scylla/tests/integration/utils.rs b/scylla/tests/integration/utils.rs index 8e187cbe8e..07d2079745 100644 --- a/scylla/tests/integration/utils.rs +++ b/scylla/tests/integration/utils.rs @@ -1,13 +1,19 @@ use futures::Future; use scylla::deserialize::DeserializeValue; +use scylla::load_balancing::{FallbackPlan, LoadBalancingPolicy, RoutingInfo}; +use scylla::query::Query; +use scylla::routing::Shard; +use scylla::transport::errors::QueryError; use scylla::transport::session_builder::{GenericSessionBuilder, SessionBuilderKind}; -use scylla::Session; +use scylla::transport::{ClusterData, NodeRef}; +use scylla::{ExecutionProfile, Session}; use std::collections::HashMap; use std::env; use std::net::SocketAddr; use std::num::NonZeroU32; use std::str::FromStr; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use scylla_proxy::{Node, Proxy, ProxyError, RunningProxy, ShardAwareness}; @@ -167,3 +173,62 @@ impl DeserializeOwnedValue for T where T: for<'frame, 'metadata> DeserializeValue<'frame, 'metadata> { } + +// This LBP produces a predictable query plan - it order the nodes +// by position in the ring. +// This is to make sure that all DDL queries land on the same node, +// to prevent errors from concurrent DDL queries executed on different nodes. +#[derive(Debug)] +struct SchemaQueriesLBP; + +impl LoadBalancingPolicy for SchemaQueriesLBP { + fn pick<'a>( + &'a self, + _query: &'a RoutingInfo, + cluster: &'a ClusterData, + ) -> Option<(NodeRef<'a>, Option)> { + // I'm not sure if Scylla can handle concurrent DDL queries to different shard, + // in other words if its local lock is per-node or per shard. + // Just to be safe, let's use explicit shard. + cluster.get_nodes_info().first().map(|node| (node, Some(0))) + } + + fn fallback<'a>( + &'a self, + _query: &'a RoutingInfo, + cluster: &'a ClusterData, + ) -> FallbackPlan<'a> { + Box::new(cluster.get_nodes_info().iter().map(|node| (node, Some(0)))) + } + + fn name(&self) -> String { + "SchemaQueriesLBP".to_owned() + } +} + +fn apply_ddl_lbp(query: &mut Query) { + let policy = query + .get_execution_profile_handle() + .map(|profile| profile.pointee_to_builder()) + .unwrap_or(ExecutionProfile::builder()) + .load_balancing_policy(Arc::new(SchemaQueriesLBP)) + .build(); + query.set_execution_profile_handle(Some(policy.into_handle())); +} + +// This is just to make it easier to call the above function: +// we'll be able to do session.ddl(...) instead of perform_ddl(&session, ...) +// or something like that. +#[async_trait::async_trait] +pub(crate) trait PerformDDL { + async fn ddl(&self, query: impl Into + Send) -> Result<(), QueryError>; +} + +#[async_trait::async_trait] +impl PerformDDL for Session { + async fn ddl(&self, query: impl Into + Send) -> Result<(), QueryError> { + let mut query = query.into(); + apply_ddl_lbp(&mut query); + self.query_unpaged(query, &[]).await.map(|_| ()) + } +}