Skip to content

Commit

Permalink
Fix another intermittent cluster failure
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Aug 9, 2022
1 parent ee70553 commit 0d5ec99
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 40 deletions.
8 changes: 4 additions & 4 deletions shotover-proxy/tests/cassandra_int_tests/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@ fn drop_function(session: &CassandraConnection) {
);
}

async fn create_function(session: &CassandraConnection, direct_connections: &SchemaAwaiter) {
async fn create_function(session: &CassandraConnection, schema_awaiter: &SchemaAwaiter) {
run_query(
session,
"CREATE FUNCTION test_function_keyspace.my_function (a int, b int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE javascript AS 'a * b';",
);
direct_connections.await_schema_agreement().await;
schema_awaiter.await_schema_agreement().await;
assert_query_result(
session,
"SELECT test_function_keyspace.my_function(x, y) FROM test_function_keyspace.test_function_table;",
&[&[ResultValue::Int(4)], &[ResultValue::Int(9)], &[ResultValue::Int(16)]]
);
}

pub async fn test(session: &CassandraConnection, direct_connections: &SchemaAwaiter) {
pub async fn test(session: &CassandraConnection, schema_awaiter: &SchemaAwaiter) {
run_query(
session,
"CREATE KEYSPACE test_function_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };"
Expand All @@ -49,6 +49,6 @@ INSERT INTO test_function_keyspace.test_function_table (id, x, y) VALUES (3, 4,
APPLY BATCH;"#,
);

create_function(session, direct_connections).await;
create_function(session, schema_awaiter).await;
drop_function(session);
}
52 changes: 30 additions & 22 deletions shotover-proxy/tests/cassandra_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,18 @@ async fn test_passthrough() {
let schema_awaiter = SchemaAwaiter::new("127.0.0.1:9043").await;

keyspace::test(&connection);
table::test(&connection);
udt::test(&connection);
table::test(&connection, &schema_awaiter).await;
udt::test(&connection, &schema_awaiter).await;
native_types::test(&connection);
collections::test(&connection);
functions::test(&connection, &schema_awaiter).await;
prepared_statements::test(&connection);
batch_statements::test(&connection);
}

#[test]
#[tokio::test(flavor = "multi_thread")]
#[serial]
fn test_source_tls_and_single_tls() {
async fn test_source_tls_and_single_tls() {
test_helpers::cert::generate_cassandra_test_certs();
let _compose = DockerCompose::new("example-configs/cassandra-tls/docker-compose.yml");

Expand All @@ -75,12 +75,14 @@ fn test_source_tls_and_single_tls() {
}

let connection = shotover_manager.cassandra_connection_tls("127.0.0.1", 9043, ca_cert);
let schema_awaiter = SchemaAwaiter::new_noop();

keyspace::test(&connection);
table::test(&connection);
udt::test(&connection);
table::test(&connection, &schema_awaiter).await;
udt::test(&connection, &schema_awaiter).await;
native_types::test(&connection);
collections::test(&connection);
functions::test(&connection, &schema_awaiter).await;
prepared_statements::test(&connection);
batch_statements::test(&connection);
}
Expand All @@ -99,8 +101,8 @@ async fn test_cluster() {
let schema_awaiter = SchemaAwaiter::new("172.16.1.2:9042").await;
// TODO: uncomment once we implement `USE` routing
//keyspace::test(&connection1);
table::test(&connection1);
udt::test(&connection1);
table::test(&connection1, &schema_awaiter).await;
udt::test(&connection1, &schema_awaiter).await;
native_types::test(&connection1);
collections::test(&connection1);
functions::test(&connection1, &schema_awaiter).await;
Expand All @@ -115,10 +117,10 @@ async fn test_cluster() {
cluster::test().await;
}

#[test]
#[tokio::test(flavor = "multi_thread")]
#[serial]
#[cfg(feature = "alpha-transforms")]
fn test_source_tls_and_cluster_tls() {
async fn test_source_tls_and_cluster_tls() {
test_helpers::cert::generate_cassandra_test_certs();
let _compose = DockerCompose::new("example-configs/cassandra-cluster-tls/docker-compose.yml");

Expand All @@ -139,11 +141,12 @@ fn test_source_tls_and_cluster_tls() {
}

let connection = shotover_manager.cassandra_connection_tls("127.0.0.1", 9042, ca_cert);
let schema_awaiter = SchemaAwaiter::new_noop();

// TODO: uncomment once we implement `USE` routing
//keyspace::test(&connection);
table::test(&connection);
udt::test(&connection);
table::test(&connection, &schema_awaiter).await;
udt::test(&connection, &schema_awaiter).await;
native_types::test(&connection);
collections::test(&connection);
prepared_statements::test(&connection);
Expand All @@ -167,8 +170,8 @@ async fn test_cassandra_redis_cache() {
let schema_awaiter = SchemaAwaiter::new("127.0.0.1:9043").await;

keyspace::test(&connection);
table::test(&connection);
udt::test(&connection);
table::test(&connection, &schema_awaiter).await;
udt::test(&connection, &schema_awaiter).await;
functions::test(&connection, &schema_awaiter).await;
prepared_statements::test(&connection);
batch_statements::test(&connection);
Expand All @@ -190,8 +193,8 @@ async fn test_cassandra_protect_transform_local() {
let schema_awaiter = SchemaAwaiter::new("127.0.0.1:9043").await;

keyspace::test(&shotover_connection);
table::test(&shotover_connection);
udt::test(&shotover_connection);
table::test(&shotover_connection, &schema_awaiter).await;
udt::test(&shotover_connection, &schema_awaiter).await;
native_types::test(&shotover_connection);
collections::test(&shotover_connection);
functions::test(&shotover_connection, &schema_awaiter).await;
Expand All @@ -214,18 +217,18 @@ async fn test_cassandra_protect_transform_aws() {
let schema_awaiter = SchemaAwaiter::new("127.0.0.1:9043").await;

keyspace::test(&shotover_connection);
table::test(&shotover_connection);
udt::test(&shotover_connection);
table::test(&shotover_connection, &schema_awaiter).await;
udt::test(&shotover_connection, &schema_awaiter).await;
native_types::test(&shotover_connection);
collections::test(&shotover_connection);
functions::test(&shotover_connection, &schema_awaiter).await;
batch_statements::test(&shotover_connection);
protect::test(&shotover_connection, &direct_connection);
}

#[test]
#[tokio::test(flavor = "multi_thread")]
#[serial]
fn test_cassandra_peers_rewrite_cassandra4() {
async fn test_cassandra_peers_rewrite_cassandra4() {
let _docker_compose = DockerCompose::new(
"tests/test-configs/cassandra-peers-rewrite/docker-compose-4.0-cassandra.yaml",
);
Expand All @@ -237,7 +240,10 @@ fn test_cassandra_peers_rewrite_cassandra4() {
let normal_connection = shotover_manager.cassandra_connection("127.0.0.1", 9043);

let rewrite_port_connection = shotover_manager.cassandra_connection("127.0.0.1", 9044);
table::test(&rewrite_port_connection); // run some basic tests to confirm it works as normal

// run some basic tests to confirm it works as normal
let schema_awaiter = SchemaAwaiter::new_noop();
table::test(&normal_connection, &schema_awaiter).await;

{
assert_query_result(
Expand Down Expand Up @@ -314,7 +320,9 @@ async fn test_cassandra_peers_rewrite_cassandra3() {
);

let connection = shotover_manager.cassandra_connection("127.0.0.1", 9044);
table::test(&connection); // run some basic tests to confirm it works as normal
// run some basic tests to confirm it works as normal
let schema_awaiter = SchemaAwaiter::new_noop();
table::test(&connection, &schema_awaiter).await;

// Assert that the error cassandra gives because system.peers_v2 does not exist on cassandra v3
// is passed through shotover unchanged.
Expand Down
24 changes: 16 additions & 8 deletions shotover-proxy/tests/cassandra_int_tests/schema_awaiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,30 @@ use scylla::{Session, SessionBuilder};
// But for cases like adding a new function we hit issues where the function is not yet propagated to all nodes.
// So we make use of the scylla drivers await_schema_agreement logic to wait until all nodes are on the same schema.
pub struct SchemaAwaiter {
session: Session,
session: Option<Session>,
}

impl SchemaAwaiter {
pub async fn new(node: &str) -> Self {
SchemaAwaiter {
session: SessionBuilder::new()
.known_node(node)
.user("cassandra", "cassandra")
.build()
.await
.unwrap(),
session: Some(
SessionBuilder::new()
.known_node(node)
.user("cassandra", "cassandra")
.build()
.await
.unwrap(),
),
}
}

pub fn new_noop() -> Self {
SchemaAwaiter { session: None }
}

pub async fn await_schema_agreement(&self) {
self.session.await_schema_agreement().await.unwrap();
if let Some(session) = &self.session {
session.await_schema_agreement().await.unwrap();
}
}
}
9 changes: 6 additions & 3 deletions shotover-proxy/tests/cassandra_int_tests/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use crate::helpers::cassandra::{
run_query, CassandraConnection, ResultValue,
};

use super::schema_awaiter::SchemaAwaiter;

fn test_create_table(session: &CassandraConnection) {
run_query(
session,
Expand Down Expand Up @@ -34,23 +36,24 @@ fn test_drop_table(session: &CassandraConnection) {
);
}

fn test_alter_table(session: &CassandraConnection) {
async fn test_alter_table(session: &CassandraConnection, schema_awaiter: &SchemaAwaiter) {
run_query(
session,
"CREATE TABLE test_table_keyspace.alter_me (id UUID PRIMARY KEY, name text, age int);",
);

assert_query_result(session, "SELECT column_name FROM system_schema.columns WHERE keyspace_name = 'test_table_keyspace' AND table_name = 'alter_me' AND column_name = 'age';", &[&[ResultValue::Varchar("age".into())]]);
schema_awaiter.await_schema_agreement().await;
run_query(
session,
"ALTER TABLE test_table_keyspace.alter_me RENAME id TO new_id",
);
assert_query_result(session, "SELECT column_name FROM system_schema.columns WHERE keyspace_name = 'test_table_keyspace' AND table_name = 'alter_me' AND column_name = 'new_id';", &[&[ResultValue::Varchar("new_id".into())]]);
}

pub fn test(session: &CassandraConnection) {
pub async fn test(session: &CassandraConnection, schema_awaiter: &SchemaAwaiter) {
run_query(session, "CREATE KEYSPACE test_table_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };");
test_create_table(session);
test_drop_table(session);
test_alter_table(session);
test_alter_table(session, schema_awaiter).await;
}
9 changes: 6 additions & 3 deletions shotover-proxy/tests/cassandra_int_tests/udt.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::cassandra_int_tests::schema_awaiter::SchemaAwaiter;
use crate::helpers::cassandra::{run_query, CassandraConnection};

fn test_create_udt(session: &CassandraConnection) {
Expand All @@ -15,19 +16,21 @@ fn test_create_udt(session: &CassandraConnection) {
);
}

fn test_drop_udt(session: &CassandraConnection) {
async fn test_drop_udt(session: &CassandraConnection, schema_awaiter: &SchemaAwaiter) {
run_query(
session,
"CREATE TYPE test_udt_keyspace.test_type_drop_me (foo text, bar int)",
);
schema_awaiter.await_schema_agreement().await;
run_query(session, "DROP TYPE test_udt_keyspace.test_type_drop_me;");
schema_awaiter.await_schema_agreement().await;
let statement = "CREATE TABLE test_udt_keyspace.test_delete_table (id int PRIMARY KEY, foo test_type_drop_me);";
let result = session.execute_expect_err(statement).to_string();
assert_eq!(result, "Cassandra detailed error SERVER_INVALID_QUERY: Unknown type test_udt_keyspace.test_type_drop_me");
}

pub fn test(session: &CassandraConnection) {
pub async fn test(session: &CassandraConnection, schema_awaiter: &SchemaAwaiter) {
run_query(session, "CREATE KEYSPACE test_udt_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };");
test_create_udt(session);
test_drop_udt(session);
test_drop_udt(session, schema_awaiter).await;
}

0 comments on commit 0d5ec99

Please sign in to comment.