Skip to content

Commit

Permalink
Fix and test token aware routing (#854)
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros authored Oct 24, 2022
1 parent 5b7ef5e commit fccf27c
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ fn serialize_routing_key_with_indexes(
0 => None,
1 => values
.get(pk_indexes[0] as usize)
.map(|value| value.serialize_to_vec(version)),
.and_then(|value| match value {
Value::Some(value) => Some(value.serialize_to_vec(version)),
_ => None,
}),
_ => {
let mut buf = vec![];
if pk_indexes
Expand Down
4 changes: 4 additions & 0 deletions shotover-proxy/tests/cassandra_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ mod prepared_statements;
#[cfg(feature = "cassandra-cpp-driver-tests")]
#[cfg(feature = "alpha-transforms")]
mod protect;
#[cfg(feature = "cassandra-cpp-driver-tests")]
mod routing;
mod table;
mod udt;

Expand Down Expand Up @@ -148,6 +150,7 @@ async fn test_cluster_single_rack_v3(#[case] driver: CassandraDriver) {
};
standard_test_suite(&connection, driver).await;
cluster_single_rack_v3::test_dummy_peers(&connection().await).await;
routing::test("127.0.0.1", 9042, "172.16.1.2", 9042).await;

//Check for bugs in cross connection state
native_types::test(&connection().await).await;
Expand Down Expand Up @@ -181,6 +184,7 @@ async fn test_cluster_single_rack_v4(#[case] driver: CassandraDriver) {
standard_test_suite(&connection, driver).await;
cluster_single_rack_v4::test(&connection().await).await;

routing::test("127.0.0.1", 9042, "172.16.1.2", 9044).await;
//Check for bugs in cross connection state
let mut connection2 = CassandraConnection::new("127.0.0.1", 9042, driver).await;
connection2
Expand Down
93 changes: 93 additions & 0 deletions shotover-proxy/tests/cassandra_int_tests/routing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use crate::helpers::cassandra::{run_query, CassandraConnection, CassandraDriver};

pub async fn create_keyspace(connection: &mut CassandraConnection) {
let create_ks: &'static str = "CREATE KEYSPACE IF NOT EXISTS test_routing_ks WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };";
run_query(connection, create_ks).await;
}

pub async fn create_table(connection: &mut CassandraConnection) {
let create_table_cql =
"CREATE TABLE IF NOT EXISTS test_routing_ks.my_test_table (key int PRIMARY KEY, name text);";
run_query(connection, create_table_cql).await;
}

pub async fn test(
shotover_contact_point: &str,
shotover_port: u16,
cassandra_contact_point: &str,
cassandra_port: u16,
) {
let mut shotover = CassandraConnection::new(
shotover_contact_point,
shotover_port,
CassandraDriver::Scylla,
)
.await;
shotover
.enable_schema_awaiter(
&format!("{}:{}", cassandra_contact_point, cassandra_port),
None,
)
.await;
let cassandra = CassandraConnection::new(
cassandra_contact_point,
cassandra_port,
CassandraDriver::Scylla,
)
.await;

create_keyspace(&mut shotover).await;
create_table(&mut shotover).await;

let insert_cql = "INSERT INTO test_routing_ks.my_test_table (key, name) VALUES (?, 'my_name')";
let prepared_insert = shotover.prepare(insert_cql).await;

let select_cql = "SELECT name FROM test_routing_ks.my_test_table WHERE key = ?;";
let prepared_select = shotover.prepare(select_cql).await;

let update_cql = "UPDATE test_routing_ks.my_test_table SET name = 'not_my_name' WHERE key = ?";
let prepared_update = cassandra.prepare(update_cql).await;

let delete_cql = "DELETE FROM test_routing_ks.my_test_table WHERE key = ?;";
let prepared_delete = cassandra.prepare(delete_cql).await;

for key in 0..10 {
let shotover_hit = shotover
.execute_prepared_coordinator_node(&prepared_insert, key)
.await;
let cassandra_hit = cassandra
.execute_prepared_coordinator_node(&prepared_insert, key)
.await;
assert_eq!(shotover_hit, cassandra_hit);
}

for key in 0..10 {
let shotover_hit = shotover
.execute_prepared_coordinator_node(&prepared_select, key)
.await;
let cassandra_hit = cassandra
.execute_prepared_coordinator_node(&prepared_select, key)
.await;
assert_eq!(shotover_hit, cassandra_hit);
}

for key in 0..10 {
let shotover_hit = shotover
.execute_prepared_coordinator_node(&prepared_update, key)
.await;
let cassandra_hit = cassandra
.execute_prepared_coordinator_node(&prepared_update, key)
.await;
assert_eq!(shotover_hit, cassandra_hit);
}

for key in 0..10 {
let shotover_hit = shotover
.execute_prepared_coordinator_node(&prepared_delete, key)
.await;
let cassandra_hit = cassandra
.execute_prepared_coordinator_node(&prepared_delete, key)
.await;
assert_eq!(shotover_hit, cassandra_hit);
}
}
Loading

0 comments on commit fccf27c

Please sign in to comment.