Skip to content

Commit

Permalink
Merge branch 'main' into handle_use_statements_when_routing
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Sep 15, 2022
2 parents 0dd3abe + a46fefa commit b0a4918
Show file tree
Hide file tree
Showing 13 changed files with 757 additions and 326 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ license = "Apache-2.0"
[features]
# Include WIP alpha transforms in the public API
alpha-transforms = []
cassandra-cpp-driver-tests = []

[dependencies]
pretty-hex = "0.3.0"
Expand Down Expand Up @@ -92,7 +93,9 @@ reqwest = "0.11.6"
metrics-util = "0.14.0"
cdrs-tokio = { git = "https://github.com/krojew/cdrs-tokio" }
scylla = { version = "0.5.0", features = ["ssl"] }
rstest = "0.15.0"

[[bench]]
name = "benches"
harness = false
required-features = ["cassandra-cpp-driver-tests"]
76 changes: 44 additions & 32 deletions shotover-proxy/benches/benches/cassandra.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::helpers::cassandra::CassandraConnection;
use crate::helpers::cassandra::{CassandraConnection, CassandraDriver};
use crate::helpers::ShotoverManager;
use cassandra_cpp::{stmt, Session, Statement};
use criterion::{criterion_group, Criterion};
use criterion::{criterion_group, criterion_main, Criterion};
use test_helpers::cert::generate_cassandra_test_certs;
use test_helpers::docker_compose::DockerCompose;
use test_helpers::lazy::new_lazy_shared;
Expand All @@ -11,6 +11,8 @@ struct Query {
statement: Statement,
}

const DRIVER: CassandraDriver = CassandraDriver::Datastax;

fn cassandra(c: &mut Criterion) {
let mut group = c.benchmark_group("cassandra");
group.throughput(criterion::Throughput::Elements(1));
Expand Down Expand Up @@ -45,7 +47,7 @@ fn cassandra(c: &mut Criterion) {
|b, resources| {
b.iter(|| {
let mut resources = resources.borrow_mut();
let connection = &mut resources.as_mut().unwrap().connection;
let connection = &mut resources.as_mut().unwrap().get_connection();
connection.execute(&query.statement).wait().unwrap();
})
},
Expand All @@ -68,7 +70,7 @@ fn cassandra(c: &mut Criterion) {
|b, resources| {
b.iter(|| {
let mut resources = resources.borrow_mut();
let connection = &mut resources.as_mut().unwrap().connection;
let connection = &mut resources.as_mut().unwrap().get_connection();
connection.execute(&query.statement).wait().unwrap();
})
},
Expand All @@ -90,7 +92,7 @@ fn cassandra(c: &mut Criterion) {
|b, resources| {
b.iter(|| {
let mut resources = resources.borrow_mut();
let connection = &mut resources.as_mut().unwrap().connection;
let connection = &mut resources.as_mut().unwrap().get_connection();
connection.execute(&query.statement).wait().unwrap();
})
},
Expand All @@ -113,7 +115,7 @@ fn cassandra(c: &mut Criterion) {
|b, resources| {
b.iter(|| {
let mut resources = resources.borrow_mut();
let connection = &mut resources.as_mut().unwrap().connection;
let connection = &mut resources.as_mut().unwrap().get_connection();
connection.execute(&query.statement).wait().unwrap();
})
},
Expand All @@ -136,7 +138,7 @@ fn cassandra(c: &mut Criterion) {
|b, resources| {
b.iter(|| {
let mut resources = resources.borrow_mut();
let connection = &mut resources.as_mut().unwrap().connection;
let connection = &mut resources.as_mut().unwrap().get_connection();
connection.execute(&query.statement).wait().unwrap();
})
},
Expand All @@ -155,7 +157,7 @@ fn cassandra(c: &mut Criterion) {
group.bench_with_input(format!("tls_{}", query.name), &resources, |b, resources| {
b.iter(|| {
let mut resources = resources.borrow_mut();
let connection = &mut resources.as_mut().unwrap().connection;
let connection = &mut resources.as_mut().unwrap().get_connection();
connection.execute(&query.statement).wait().unwrap();
})
});
Expand All @@ -181,21 +183,21 @@ fn cassandra(c: &mut Criterion) {
);

resources
.connection
.get_connection()
.execute(&stmt!(
"CREATE KEYSPACE test_protect_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };"
))
.wait()
.unwrap();
resources
.connection
.get_connection()
.execute(&stmt!(
"CREATE TABLE test_protect_keyspace.test_table (pk varchar PRIMARY KEY, cluster varchar, col1 blob, col2 int, col3 boolean);"
))
.wait()
.unwrap();
resources
.connection
.get_connection()
.execute(&stmt!(
"INSERT INTO test_protect_keyspace.test_table (pk, cluster, col1, col2, col3) VALUES ('pk1', 'cluster', 'Initial value', 42, true);"
))
Expand All @@ -212,7 +214,7 @@ fn cassandra(c: &mut Criterion) {
|b, resources| {
b.iter(|| {
let mut resources = resources.borrow_mut();
let connection = &mut resources.as_mut().unwrap().connection;
let connection = &mut resources.as_mut().unwrap().get_connection();
connection.execute(&query.statement).wait().unwrap();
})
},
Expand All @@ -234,7 +236,7 @@ fn cassandra(c: &mut Criterion) {
|b, resources| {
b.iter(|| {
let mut resources = resources.borrow_mut();
let connection = &mut resources.as_mut().unwrap().connection;
let connection = &mut resources.as_mut().unwrap().get_connection();
connection.execute(&query.statement).wait().unwrap();
})
},
Expand All @@ -244,22 +246,21 @@ fn cassandra(c: &mut Criterion) {
}

criterion_group!(benches, cassandra);
criterion_main!(benches);

pub struct BenchResources {
_compose: DockerCompose,
_shotover_manager: ShotoverManager,
connection: Session,
connection: CassandraConnection,
}

impl BenchResources {
fn new(shotover_topology: &str, compose_file: &str) -> Self {
let compose = DockerCompose::new(compose_file);
let shotover_manager = ShotoverManager::from_topology_file(shotover_topology);

let CassandraConnection::Datastax {
session: connection,
..
} = futures::executor::block_on(CassandraConnection::new("127.0.0.1", 9042));
let connection =
futures::executor::block_on(CassandraConnection::new("127.0.0.1", 9042, DRIVER));

let bench_resources = Self {
_compose: compose,
Expand All @@ -270,17 +271,18 @@ impl BenchResources {
bench_resources
}

pub fn get_connection(&self) -> &Session {
self.connection.as_datastax()
}

fn new_tls(shotover_topology: &str, compose_file: &str) -> Self {
generate_cassandra_test_certs();
let compose = DockerCompose::new(compose_file);
let shotover_manager = ShotoverManager::from_topology_file(shotover_topology);

let ca_cert = "example-configs/cassandra-tls/certs/localhost_CA.crt";

let CassandraConnection::Datastax {
session: connection,
..
} = futures::executor::block_on(CassandraConnection::new_tls("127.0.0.1", 9042, ca_cert));
let connection = CassandraConnection::new_tls("127.0.0.1", 9042, ca_cert, DRIVER);

let bench_resources = Self {
_compose: compose,
Expand All @@ -292,23 +294,33 @@ impl BenchResources {
}

fn setup(&self) {
let create_keyspace = stmt!(
"CREATE KEYSPACE benchmark_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };"
);

let create_table = stmt!(
"CREATE TABLE benchmark_keyspace.table_1 (id int PRIMARY KEY, x int, name varchar);"
);

let insert = stmt!(
"INSERT INTO benchmark_keyspace.table_1 (id, x, name) VALUES (0, 10, 'initial value');"
);

self.connection
.execute(&stmt!(
"CREATE KEYSPACE benchmark_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };"
))
.wait().unwrap();
.as_datastax()
.execute(&create_keyspace)
.wait()
.unwrap();

self.connection
.execute(&stmt!(
"CREATE TABLE benchmark_keyspace.table_1 (id int PRIMARY KEY, x int, name varchar);"
))
.as_datastax()
.execute(&create_table)
.wait()
.unwrap();

self.connection
.execute(&stmt!(
"INSERT INTO benchmark_keyspace.table_1 (id, x, name) VALUES (0, 10, 'initial value');"
))
.as_datastax()
.execute(&insert)
.wait()
.unwrap();
}
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl Message {
Metadata::Cassandra(metadata) => {
let body = CassandraOperation::Error(ErrorBody {
error_code: 0x1001,
message: "".into(),
message: "Server overloaded".into(),
additional_info: AdditionalErrorInfo::Overloaded,
});

Expand Down
31 changes: 15 additions & 16 deletions shotover-proxy/tests/cassandra_int_tests/batch_statements.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::helpers::cassandra::{assert_query_result, run_query, CassandraConnection, ResultValue};
use cassandra_cpp::{stmt, Batch, BatchType};

pub async fn test(connection: &CassandraConnection) {
// setup keyspace and table for the batch statement tests
Expand All @@ -9,12 +8,11 @@ pub async fn test(connection: &CassandraConnection) {
}

{
let mut batch = Batch::new(BatchType::LOGGED);
let mut batch = vec![];
for i in 0..2 {
let statement = format!("INSERT INTO batch_keyspace.batch_table (id, lastname, firstname) VALUES ({}, 'text1', 'text2')", i);
batch.add_statement(&stmt!(statement.as_str())).unwrap();
batch.push(format!("INSERT INTO batch_keyspace.batch_table (id, lastname, firstname) VALUES ({}, 'text1', 'text2')", i));
}
connection.execute_batch(&batch);
connection.execute_batch(batch);

assert_query_result(
connection,
Expand All @@ -36,15 +34,14 @@ pub async fn test(connection: &CassandraConnection) {
}

{
let mut batch = Batch::new(BatchType::LOGGED);
let mut batch = vec![];
for i in 0..2 {
let statement = format!(
batch.push(format!(
"UPDATE batch_keyspace.batch_table SET lastname = 'text3' WHERE id = {};",
i
);
batch.add_statement(&stmt!(statement.as_str())).unwrap();
));
}
connection.execute_batch(&batch);
connection.execute_batch(batch);

assert_query_result(
connection,
Expand All @@ -66,18 +63,20 @@ pub async fn test(connection: &CassandraConnection) {
}

{
let mut batch = Batch::new(BatchType::LOGGED);
let mut batch = vec![];
for i in 0..2 {
let statement = format!("DELETE FROM batch_keyspace.batch_table WHERE id = {};", i);
batch.add_statement(&stmt!(statement.as_str())).unwrap();
batch.push(format!(
"DELETE FROM batch_keyspace.batch_table WHERE id = {};",
i
));
}
connection.execute_batch(&batch);
connection.execute_batch(batch);
assert_query_result(connection, "SELECT * FROM batch_keyspace.batch_table;", &[]).await;
}

{
let batch = Batch::new(BatchType::LOGGED);
connection.execute_batch(&batch);
let batch = vec![];
connection.execute_batch(batch);
}

// test batch statements over QUERY PROTOCOL
Expand Down
Loading

0 comments on commit b0a4918

Please sign in to comment.