Skip to content

Commit

Permalink
Merge branch 'main' into query_sytem_local
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Aug 15, 2022
2 parents 4a9be03 + f4d4878 commit 9427869
Show file tree
Hide file tree
Showing 23 changed files with 639 additions and 522 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.62"
channel = "1.63"
components = [ "rustfmt", "clippy" ]
2 changes: 1 addition & 1 deletion shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ nix = "0.24.0"
reqwest = "0.11.6"
metrics-util = "0.14.0"
cdrs-tokio = { git = "https://github.com/krojew/cdrs-tokio", rev = "7d6d8fee723a232f17b4173de61e6fd9eb4d4505" }
scylla = "0.4.7"
scylla = { version = "0.4.7", features = ["ssl"] }

[[bench]]
name = "redis_benches"
Expand Down
19 changes: 14 additions & 5 deletions shotover-proxy/benches/cassandra_benches.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use cassandra_cpp::{stmt, Session, Statement};
use cassandra_cpp::{stmt, Cluster, Session, Statement};
use criterion::{criterion_group, criterion_main, Criterion};
use test_helpers::cert::generate_cassandra_test_certs;
use test_helpers::docker_compose::DockerCompose;
Expand Down Expand Up @@ -260,8 +260,15 @@ impl BenchResources {
let compose = DockerCompose::new(compose_file);
let shotover_manager = ShotoverManager::from_topology_file(shotover_topology);

let CassandraConnection::Datastax(connection) =
shotover_manager.cassandra_connection("127.0.0.1", 9042);
let mut cluster = Cluster::default();
cluster.set_contact_points("127.0.0.1").unwrap();
cluster.set_credentials("cassandra", "cassandra").unwrap();
cluster.set_port(9042).unwrap();
cluster.set_load_balance_round_robin();

// By default unwrap uses the Debug formatter `{:?}` which is extremely noisy for the error type returned by `connect()`.
// So we instead force the Display formatter `{}` on the error.
let connection = cluster.connect().map_err(|err| format!("{err}")).unwrap();

let bench_resources = Self {
_compose: compose,
Expand All @@ -279,8 +286,10 @@ impl BenchResources {

let ca_cert = "example-configs/cassandra-tls/certs/localhost_CA.crt";

let CassandraConnection::Datastax(connection) =
shotover_manager.cassandra_connection_tls("127.0.0.1", 9042, ca_cert);
let CassandraConnection::Datastax {
session: connection,
..
} = shotover_manager.cassandra_connection_tls("127.0.0.1", 9042, ca_cert);

let bench_resources = Self {
_compose: compose,
Expand Down
2 changes: 2 additions & 0 deletions shotover-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
//! * [`transforms::Transforms`], the enum to register with (add a variant) for enabling your own transform.
//! * [`transforms::TransformsConfig`], the enum to register with (add a variant) for configuring your own transform.

#![allow(clippy::derive_partial_eq_without_eq)]

pub mod codec;
mod concurrency;
pub mod config;
Expand Down
6 changes: 3 additions & 3 deletions shotover-proxy/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,9 @@ impl Message {
bytes,
message_type,
} => match message_type {
MessageType::Cassandra => Ok(Metadata::Cassandra(cassandra::raw_frame::metadata(
&*bytes,
)?)),
MessageType::Cassandra => {
Ok(Metadata::Cassandra(cassandra::raw_frame::metadata(bytes)?))
}
MessageType::Redis => Ok(Metadata::Redis),
MessageType::None => Ok(Metadata::None),
},
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/transforms/protect/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub async fn encrypt(
key_management: &KeyManager,
key_id: &str,
) -> Result<Operand> {
let value = MessageValue::from(&*value);
let value = MessageValue::from(value);

let sym_key = key_management.cached_get_key(key_id, None, None).await?;

Expand Down
24 changes: 14 additions & 10 deletions shotover-proxy/tests/cassandra_int_tests/batch_statements.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::helpers::cassandra::{assert_query_result, run_query, CassandraConnection, ResultValue};
use cassandra_cpp::{stmt, Batch, BatchType};

pub fn test(connection: &CassandraConnection) {
pub async fn test(connection: &CassandraConnection) {
// setup keyspace and table for the batch statement tests
{
run_query(connection, "CREATE KEYSPACE batch_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };");
run_query(connection, "CREATE TABLE batch_keyspace.batch_table (id int PRIMARY KEY, lastname text, firstname text);");
run_query(connection, "CREATE KEYSPACE batch_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").await;
run_query(connection, "CREATE TABLE batch_keyspace.batch_table (id int PRIMARY KEY, lastname text, firstname text);").await;
}

{
Expand All @@ -31,7 +31,8 @@ pub fn test(connection: &CassandraConnection) {
ResultValue::Varchar("text2".into()),
],
],
);
)
.await;
}

{
Expand Down Expand Up @@ -60,7 +61,8 @@ pub fn test(connection: &CassandraConnection) {
ResultValue::Varchar("text2".into()),
],
],
);
)
.await;
}

{
Expand All @@ -70,7 +72,7 @@ pub fn test(connection: &CassandraConnection) {
batch.add_statement(&stmt!(statement.as_str())).unwrap();
}
connection.execute_batch(&batch);
assert_query_result(connection, "SELECT * FROM batch_keyspace.batch_table;", &[]);
assert_query_result(connection, "SELECT * FROM batch_keyspace.batch_table;", &[]).await;
}

{
Expand All @@ -84,7 +86,7 @@ pub fn test(connection: &CassandraConnection) {
INSERT INTO batch_keyspace.batch_table (id, lastname, firstname) VALUES (2, 'text1', 'text2');
INSERT INTO batch_keyspace.batch_table (id, lastname, firstname) VALUES (3, 'text1', 'text2');
APPLY BATCH;";
run_query(connection, insert_statement);
run_query(connection, insert_statement).await;

assert_query_result(
connection,
Expand All @@ -101,10 +103,11 @@ APPLY BATCH;";
ResultValue::Varchar("text2".into()),
],
],
);
)
.await;

let update_statement = "BEGIN BATCH UPDATE batch_keyspace.batch_table SET lastname = 'text3' WHERE id = 2; UPDATE batch_keyspace.batch_table SET lastname = 'text3' WHERE id = 3; APPLY BATCH;";
run_query(connection, update_statement);
run_query(connection, update_statement).await;

assert_query_result(
connection,
Expand All @@ -121,6 +124,7 @@ APPLY BATCH;";
ResultValue::Varchar("text2".into()),
],
],
);
)
.await;
}
}
20 changes: 10 additions & 10 deletions shotover-proxy/tests/cassandra_int_tests/cache/assert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ fn get_cache_miss_value(snapshotter: &Snapshotter) -> u64 {
result
}

fn assert_increment(
async fn assert_increment(
snapshotter: &Snapshotter,
session: &CassandraConnection,
query: &str,
expected_rows: &[&[ResultValue]],
) {
let before = get_cache_miss_value(snapshotter);
assert_query_result(session, query, expected_rows);
assert_query_result(session, query, expected_rows).await;
let after = get_cache_miss_value(snapshotter);
assert_eq!(
before + 1,
Expand All @@ -32,14 +32,14 @@ fn assert_increment(
);
}

fn assert_unchanged(
async fn assert_unchanged(
snapshotter: &Snapshotter,
session: &CassandraConnection,
query: &str,
expected_rows: &[&[ResultValue]],
) {
let before = get_cache_miss_value(snapshotter);
assert_query_result(session, query, expected_rows);
assert_query_result(session, query, expected_rows).await;
let after = get_cache_miss_value(snapshotter);
assert_eq!(
before,
Expand All @@ -48,26 +48,26 @@ fn assert_unchanged(
);
}

pub fn assert_query_is_cached(
pub async fn assert_query_is_cached(
snapshotter: &Snapshotter,
session: &CassandraConnection,
query: &str,
expected_rows: &[&[ResultValue]],
) {
// A query can be demonstrated as being cached if it is first recorded as a cache miss and then not recorded as a cache miss
assert_increment(snapshotter, session, query, expected_rows);
assert_unchanged(snapshotter, session, query, expected_rows);
assert_increment(snapshotter, session, query, expected_rows).await;
assert_unchanged(snapshotter, session, query, expected_rows).await;
}

pub fn assert_query_is_uncacheable(
pub async fn assert_query_is_uncacheable(
snapshotter: &Snapshotter,
session: &CassandraConnection,
query: &str,
expected_rows: &[&[ResultValue]],
) {
// A query can be demonstrated as not being cached if it never shows up as a cache miss
assert_unchanged(snapshotter, session, query, expected_rows);
assert_unchanged(snapshotter, session, query, expected_rows);
assert_unchanged(snapshotter, session, query, expected_rows).await;
assert_unchanged(snapshotter, session, query, expected_rows).await;
}

pub fn assert_sorted_set_equals(
Expand Down
34 changes: 21 additions & 13 deletions shotover-proxy/tests/cassandra_int_tests/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,34 @@ use metrics_util::debugging::Snapshotter;
use redis::Commands;
use std::collections::HashSet;

pub fn test(
pub async fn test(
cassandra_session: &CassandraConnection,
redis_connection: &mut redis::Connection,
snapshotter: &Snapshotter,
) {
redis::cmd("FLUSHDB").execute(redis_connection);

run_query(cassandra_session, "CREATE KEYSPACE test_cache_keyspace_simple WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };");
run_query(cassandra_session, "CREATE KEYSPACE test_cache_keyspace_simple WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").await;
run_query(
cassandra_session,
"CREATE TABLE test_cache_keyspace_simple.test_table (id int PRIMARY KEY, x int, name varchar);",
);
cassandra_session,
"CREATE TABLE test_cache_keyspace_simple.test_table (id int PRIMARY KEY, x int, name varchar);",
).await;

run_query(
cassandra_session,
"INSERT INTO test_cache_keyspace_simple.test_table (id, x, name) VALUES (1, 11, 'foo');",
);
)
.await;
run_query(
cassandra_session,
"INSERT INTO test_cache_keyspace_simple.test_table (id, x, name) VALUES (2, 12, 'bar');",
);
)
.await;
run_query(
cassandra_session,
"INSERT INTO test_cache_keyspace_simple.test_table (id, x, name) VALUES (3, 13, 'baz');",
);
)
.await;

// selects without where clauses do not hit the cache
assert::assert_query_is_uncacheable(
Expand All @@ -53,7 +56,8 @@ pub fn test(
ResultValue::Varchar("baz".into()),
],
],
);
)
.await;

// query against the primary key
assert::assert_query_is_cached(
Expand All @@ -65,7 +69,8 @@ pub fn test(
ResultValue::Int(11),
ResultValue::Varchar("foo".into()),
]],
);
)
.await;

// ensure key 2 and 3 are also loaded
assert::assert_query_is_cached(
Expand All @@ -77,7 +82,8 @@ pub fn test(
ResultValue::Int(12),
ResultValue::Varchar("bar".into()),
]],
);
)
.await;

assert::assert_query_is_cached(
snapshotter,
Expand All @@ -88,7 +94,8 @@ pub fn test(
ResultValue::Int(13),
ResultValue::Varchar("baz".into()),
]],
);
)
.await;

// query without primary key does not hit the cache
assert::assert_query_is_uncacheable(
Expand All @@ -100,7 +107,8 @@ pub fn test(
ResultValue::Int(11),
ResultValue::Varchar("foo".into()),
]],
);
)
.await;

let result: HashSet<String> = redis_connection.keys("*").unwrap();
let expected = HashSet::from([
Expand Down
Loading

0 comments on commit 9427869

Please sign in to comment.