Skip to content

Commit

Permalink
cdrs-tokio integration
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros committed Aug 26, 2022
1 parent 97d30e7 commit ad8f732
Show file tree
Hide file tree
Showing 14 changed files with 681 additions and 412 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ license = "Apache-2.0"
[features]
# Include WIP alpha transforms in the public API
alpha-transforms = []
cpp-driver-tests = []

[dependencies]
pretty-hex = "0.3.0"
Expand Down Expand Up @@ -92,6 +93,7 @@ reqwest = "0.11.6"
metrics-util = "0.14.0"
cdrs-tokio = { git = "https://github.com/krojew/cdrs-tokio" }
scylla = { version = "0.4.7", features = ["ssl"] }
rstest = "0.15.0"

[[bench]]
name = "redis_benches"
Expand All @@ -104,3 +106,4 @@ harness = false
[[bench]]
name = "cassandra_benches"
harness = false
required-features = ["cpp-driver-tests"]
74 changes: 28 additions & 46 deletions shotover-proxy/benches/cassandra_benches.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
use cassandra_cpp::{stmt, Cluster, Session, Statement};
use cassandra_cpp::{stmt, Session, Statement};
use criterion::{criterion_group, criterion_main, Criterion};
use test_helpers::cert::generate_cassandra_test_certs;
use test_helpers::docker_compose::DockerCompose;
use test_helpers::lazy::new_lazy_shared;

#[path = "../tests/helpers/mod.rs"]
mod helpers;
use helpers::cassandra::CassandraConnection;
use helpers::cassandra::{CassandraConnection, CassandraDriver};
use helpers::ShotoverManager;

struct Query {
name: &'static str,
statement: Statement,
}

const DRIVER: CassandraDriver = CassandraDriver::Datastax;

fn cassandra(c: &mut Criterion) {
let mut group = c.benchmark_group("cassandra");
group.throughput(criterion::Throughput::Elements(1));
Expand Down Expand Up @@ -48,7 +50,7 @@ fn cassandra(c: &mut Criterion) {
|b, resources| {
b.iter(|| {
let mut resources = resources.borrow_mut();
let connection = &mut resources.as_mut().unwrap().connection;
let connection = &mut resources.as_mut().unwrap().get_connection();
connection.execute(&query.statement).wait().unwrap();
})
},
Expand All @@ -71,7 +73,7 @@ fn cassandra(c: &mut Criterion) {
|b, resources| {
b.iter(|| {
let mut resources = resources.borrow_mut();
let connection = &mut resources.as_mut().unwrap().connection;
let connection = &mut resources.as_mut().unwrap().get_connection();
connection.execute(&query.statement).wait().unwrap();
})
},
Expand All @@ -93,7 +95,7 @@ fn cassandra(c: &mut Criterion) {
|b, resources| {
b.iter(|| {
let mut resources = resources.borrow_mut();
let connection = &mut resources.as_mut().unwrap().connection;
let connection = &mut resources.as_mut().unwrap().get_connection();
connection.execute(&query.statement).wait().unwrap();
})
},
Expand All @@ -116,7 +118,7 @@ fn cassandra(c: &mut Criterion) {
|b, resources| {
b.iter(|| {
let mut resources = resources.borrow_mut();
let connection = &mut resources.as_mut().unwrap().connection;
let connection = &mut resources.as_mut().unwrap().get_connection();
connection.execute(&query.statement).wait().unwrap();
})
},
Expand All @@ -139,7 +141,7 @@ fn cassandra(c: &mut Criterion) {
|b, resources| {
b.iter(|| {
let mut resources = resources.borrow_mut();
let connection = &mut resources.as_mut().unwrap().connection;
let connection = &mut resources.as_mut().unwrap().get_connection();
connection.execute(&query.statement).wait().unwrap();
})
},
Expand All @@ -158,7 +160,7 @@ fn cassandra(c: &mut Criterion) {
group.bench_with_input(format!("tls_{}", query.name), &resources, |b, resources| {
b.iter(|| {
let mut resources = resources.borrow_mut();
let connection = &mut resources.as_mut().unwrap().connection;
let connection = &mut resources.as_mut().unwrap().get_connection();
connection.execute(&query.statement).wait().unwrap();
})
});
Expand All @@ -184,21 +186,21 @@ fn cassandra(c: &mut Criterion) {
);

resources
.connection
.get_connection()
.execute(&stmt!(
"CREATE KEYSPACE test_protect_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };"
))
.wait()
.unwrap();
resources
.connection
.get_connection()
.execute(&stmt!(
"CREATE TABLE test_protect_keyspace.test_table (pk varchar PRIMARY KEY, cluster varchar, col1 blob, col2 int, col3 boolean);"
))
.wait()
.unwrap();
resources
.connection
.get_connection()
.execute(&stmt!(
"INSERT INTO test_protect_keyspace.test_table (pk, cluster, col1, col2, col3) VALUES ('pk1', 'cluster', 'Initial value', 42, true);"
))
Expand All @@ -215,7 +217,7 @@ fn cassandra(c: &mut Criterion) {
|b, resources| {
b.iter(|| {
let mut resources = resources.borrow_mut();
let connection = &mut resources.as_mut().unwrap().connection;
let connection = &mut resources.as_mut().unwrap().get_connection();
connection.execute(&query.statement).wait().unwrap();
})
},
Expand All @@ -237,7 +239,7 @@ fn cassandra(c: &mut Criterion) {
|b, resources| {
b.iter(|| {
let mut resources = resources.borrow_mut();
let connection = &mut resources.as_mut().unwrap().connection;
let connection = &mut resources.as_mut().unwrap().get_connection();
connection.execute(&query.statement).wait().unwrap();
})
},
Expand All @@ -252,23 +254,15 @@ criterion_main!(benches);
pub struct BenchResources {
_compose: DockerCompose,
_shotover_manager: ShotoverManager,
connection: Session,
connection: CassandraConnection,
}

impl BenchResources {
fn new(shotover_topology: &str, compose_file: &str) -> Self {
let compose = DockerCompose::new(compose_file);
let shotover_manager = ShotoverManager::from_topology_file(shotover_topology);

let mut cluster = Cluster::default();
cluster.set_contact_points("127.0.0.1").unwrap();
cluster.set_credentials("cassandra", "cassandra").unwrap();
cluster.set_port(9042).unwrap();
cluster.set_load_balance_round_robin();

// By default unwrap uses the Debug formatter `{:?}` which is extremely noisy for the error type returned by `connect()`.
// So we instead force the Display formatter `{}` on the error.
let connection = cluster.connect().map_err(|err| format!("{err}")).unwrap();
let connection = shotover_manager.cassandra_connection("127.0.0.1", 9042, DRIVER);

let bench_resources = Self {
_compose: compose,
Expand All @@ -279,17 +273,19 @@ impl BenchResources {
bench_resources
}

pub fn get_connection(&self) -> &Session {
self.connection.as_datastax()
}

fn new_tls(shotover_topology: &str, compose_file: &str) -> Self {
generate_cassandra_test_certs();
let compose = DockerCompose::new(compose_file);
let shotover_manager = ShotoverManager::from_topology_file(shotover_topology);

let ca_cert = "example-configs/cassandra-tls/certs/localhost_CA.crt";

let CassandraConnection::Datastax {
session: connection,
..
} = shotover_manager.cassandra_connection_tls("127.0.0.1", 9042, ca_cert);
let connection =
shotover_manager.cassandra_connection_tls("127.0.0.1", 9042, ca_cert, DRIVER);

let bench_resources = Self {
_compose: compose,
Expand All @@ -301,24 +297,10 @@ impl BenchResources {
}

fn setup(&self) {
self.connection
.execute(&stmt!(
"CREATE KEYSPACE benchmark_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };"
))
.wait().unwrap();

self.connection
.execute(&stmt!(
"CREATE TABLE benchmark_keyspace.table_1 (id int PRIMARY KEY, x int, name varchar);"
))
.wait()
.unwrap();

self.connection
.execute(&stmt!(
"INSERT INTO benchmark_keyspace.table_1 (id, x, name) VALUES (0, 10, 'initial value');"
))
.wait()
.unwrap();
self.connection.as_datastax().execute(&stmt!("CREATE KEYSPACE benchmark_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };")).wait().unwrap();
self.connection.as_datastax().execute(&stmt!("CREATE TABLE benchmark_keyspace.table_1 (id int PRIMARY KEY, x int, name varchar);")).wait().unwrap();
self.connection.as_datastax().execute(
&stmt!("INSERT INTO benchmark_keyspace.table_1 (id, x, name) VALUES (0, 10, 'initial value');"),
).wait().unwrap();
}
}
2 changes: 1 addition & 1 deletion shotover-proxy/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ impl Message {
Metadata::Cassandra(metadata) => {
let body = CassandraOperation::Error(ErrorBody {
error_code: 0x1001,
message: "".into(),
message: "Server overloaded".into(),
additional_info: AdditionalErrorInfo::Overloaded,
});

Expand Down
45 changes: 24 additions & 21 deletions shotover-proxy/tests/cassandra_int_tests/batch_statements.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::helpers::cassandra::{assert_query_result, run_query, CassandraConnection, ResultValue};
use cassandra_cpp::{stmt, Batch, BatchType};

pub async fn test(connection: &CassandraConnection) {
// setup keyspace and table for the batch statement tests
Expand All @@ -9,12 +8,15 @@ pub async fn test(connection: &CassandraConnection) {
}

{
let mut batch = Batch::new(BatchType::LOGGED);
let mut queries: Vec<(String, i32)> = vec![];
for i in 0..2 {
let statement = format!("INSERT INTO batch_keyspace.batch_table (id, lastname, firstname) VALUES ({}, 'text1', 'text2')", i);
batch.add_statement(&stmt!(statement.as_str())).unwrap();
queries.push((
"INSERT INTO batch_keyspace.batch_table (id, lastname, firstname) VALUES (?, 'text1', 'text2')".into(),
i
));
}
connection.execute_batch(&batch);

connection.execute_batch(queries);

assert_query_result(
connection,
Expand All @@ -36,15 +38,15 @@ pub async fn test(connection: &CassandraConnection) {
}

{
let mut batch = Batch::new(BatchType::LOGGED);
let mut queries: Vec<(String, i32)> = vec![];
for i in 0..2 {
let statement = format!(
"UPDATE batch_keyspace.batch_table SET lastname = 'text3' WHERE id = {};",
i
);
batch.add_statement(&stmt!(statement.as_str())).unwrap();
queries.push((
"UPDATE batch_keyspace.batch_table SET lastname = 'text3' WHERE id = ?;".into(),
i,
));
}
connection.execute_batch(&batch);

connection.execute_batch(queries);

assert_query_result(
connection,
Expand All @@ -66,26 +68,27 @@ pub async fn test(connection: &CassandraConnection) {
}

{
let mut batch = Batch::new(BatchType::LOGGED);
let mut queries: Vec<(String, i32)> = vec![];
for i in 0..2 {
let statement = format!("DELETE FROM batch_keyspace.batch_table WHERE id = {};", i);
batch.add_statement(&stmt!(statement.as_str())).unwrap();
queries.push((
"DELETE FROM batch_keyspace.batch_table WHERE id = ?;".into(),
i,
));
}
connection.execute_batch(&batch);
connection.execute_batch(queries);
assert_query_result(connection, "SELECT * FROM batch_keyspace.batch_table;", &[]).await;
}

{
let batch = Batch::new(BatchType::LOGGED);
connection.execute_batch(&batch);
connection.execute_batch(vec![]);
}

// test batch statements over QUERY PROTOCOL
{
let insert_statement = "BEGIN BATCH
INSERT INTO batch_keyspace.batch_table (id, lastname, firstname) VALUES (2, 'text1', 'text2');
INSERT INTO batch_keyspace.batch_table (id, lastname, firstname) VALUES (3, 'text1', 'text2');
APPLY BATCH;";
INSERT INTO batch_keyspace.batch_table (id, lastname, firstname) VALUES (2, 'text1', 'text2');
INSERT INTO batch_keyspace.batch_table (id, lastname, firstname) VALUES (3, 'text1', 'text2');
APPLY BATCH;";
run_query(connection, insert_statement).await;

assert_query_result(
Expand Down
Loading

0 comments on commit ad8f732

Please sign in to comment.