Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cassandra mock bench #1016

Merged
merged 5 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build_and_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ jobs:
cargo run --release --example cassandra_bench -- --config-dir example-configs/cassandra-passthrough --rate 1000
cargo run --release --example cassandra_cluster_bench
cargo run --release --example cassandra_cluster_flamegraph
cargo run --release --example cassandra_bench_mocked
if: ${{ matrix.name == 'Ubuntu 20.04 - Release' }}
- name: Ensure that tests did not create or modify any files that arent .gitignore'd
run: |
Expand Down
6 changes: 3 additions & 3 deletions shotover-proxy/examples/cassandra_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn main() {
test_helpers::bench::init();
let args = Args::parse();

let latte = Latte::new(args.rate);
let latte = Latte::new(args.rate, 1);
let bench = "read";
{
let _compose = DockerCompose::new(&format!("{}/docker-compose.yaml", args.config_dir));
Expand All @@ -37,11 +37,11 @@ async fn main() {
println!("Benching Shotover ...");
latte.init(bench, "localhost:9043");
latte.bench(bench, "localhost:9042");
shotover.shutdown_and_then_consume_events(&[]).await;

println!("Benching Direct Cassandra ...");
latte.init(bench, "localhost:9043");
latte.bench(bench, "localhost:9043");

shotover.shutdown_and_then_consume_events(&[]).await;
}

println!("Direct Cassandra (A) vs Shotover (B)");
Expand Down
47 changes: 47 additions & 0 deletions shotover-proxy/examples/cassandra_bench_mocked.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::num::NonZeroUsize;

use test_helpers::latte::Latte;
use test_helpers::shotover_process::ShotoverProcessBuilder;

// This benchmark does not reflect realistic use as we are using a mocked out version of cassandra.
// The purpose of this bench is to create a scenario where shotover is the bottleneck between the client and cassandra.
// This will allow us to observe optimizations that might be too small to reliably show up in a realistic bench. e.g. improvements to parsing speed
// On the other hand, for optimizations that could be dependent on cassandra having such unrealistically low latency, they need to be confirmed in more realistic scenarios. e.g. changes to the way we batch messages in tcp

#[tokio::main]
async fn main() {
    test_helpers::bench::init();

    // Give the client and db several cores each when the machine has enough parallelism,
    // to help keep shotover (pinned to one core) as the bottleneck under test.
    // On smaller machines everything falls back to a single core.
    let enough_cores =
        std::thread::available_parallelism().unwrap() >= NonZeroUsize::new(8).unwrap();
    let (shotover_cores, client_cores, db_cores) =
        if enough_cores { (1, 3, 3) } else { (1, 1, 1) };

    let config_dir = "example-configs/cassandra-passthrough";
    let bench = "read";
    let latte = Latte::new(10000000, client_cores);
    {
        // The mocked cassandra runs in-process on port 9043; no docker-compose needed.
        test_helpers::mock_cassandra::start(db_cores, 9043);

        let shotover =
            ShotoverProcessBuilder::new_with_topology(&format!("{}/topology.yaml", config_dir))
                .with_cores(shotover_cores)
                .start()
                .await;

        println!("Benching Shotover ...");
        // no need to initialize as cassandra is completely mocked out, just directly start benching.
        latte.bench(bench, "localhost:9042");

        shotover.shutdown_and_then_consume_events(&[]).await;

        println!("Benching Direct Mocked Cassandra ...");
        latte.bench(bench, "localhost:9043");
    }

    println!("Direct Mocked Cassandra (A) vs Shotover (B)");
    latte.compare("read-localhost:9043.json", "read-localhost:9042.json");
}
6 changes: 3 additions & 3 deletions shotover-proxy/examples/cassandra_cluster_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use test_helpers::shotover_process::ShotoverProcessBuilder;
async fn main() {
test_helpers::bench::init();

let latte = Latte::new(10000000);
let latte = Latte::new(10000000, 1);
let config_dir = "example-configs/cassandra-cluster-v4";
let bench = "read";
{
Expand All @@ -20,11 +20,11 @@ async fn main() {
latte.init(bench, "172.16.1.2:9044");
latte.bench(bench, "localhost:9042");

shotover.shutdown_and_then_consume_events(&[]).await;

println!("Benching Direct Cassandra ...");
latte.init(bench, "172.16.1.2:9044");
latte.bench(bench, "172.16.1.2:9044");

shotover.shutdown_and_then_consume_events(&[]).await;
}

println!("Direct Cassandra (A) vs Shotover (B)");
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/examples/cassandra_cluster_flamegraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use test_helpers::shotover_process::ShotoverProcessBuilder;
async fn main() {
test_helpers::bench::init();

let latte = Latte::new(10000000);
let latte = Latte::new(10000000, 1);
let config_dir = "example-configs/cassandra-cluster-v4";
let bench = "read";
{
Expand Down
7 changes: 5 additions & 2 deletions test-helpers/src/latte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
// * write our own benchmark logic
// Handle to the `latte` cassandra benchmarking tool (installed via cargo on construction).
pub struct Latte {
    // Requests per second passed to latte's `--rate` flag.
    rate: u64,
    // Worker thread count passed to latte's `--threads` flag.
    threads: u64,
}

impl Latte {
pub fn new(rate: u64) -> Latte {
pub fn new(rate: u64, threads: u64) -> Latte {
crate::docker_compose::run_command(
"cargo",
&[
Expand All @@ -19,7 +20,7 @@ impl Latte {
],
)
.unwrap();
Latte { rate }
Latte { rate, threads }
}

pub fn init(&self, name: &str, address_load: &str) {
Expand Down Expand Up @@ -68,6 +69,8 @@ impl Latte {
"15s", // default is 60s but 15 seems fine
"--connections",
"128", // Shotover performs extremely poorly with 1 connection and this is not currently an intended usecase
"--threads",
&self.threads.to_string(),
"--output",
&format!("{name}-{address_bench}.json"),
&format!("examples/{name}.rn"),
Expand Down
1 change: 1 addition & 0 deletions test-helpers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod flamegraph;
pub mod latte;
pub mod lazy;
pub mod metrics;
pub mod mock_cassandra;
pub mod shotover_process;

use anyhow::{bail, Result};
Expand Down
193 changes: 193 additions & 0 deletions test-helpers/src/mock_cassandra.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
use bytes::{Bytes, BytesMut};
use cassandra_protocol::frame::{CheckEnvelopeSizeError, Opcode};
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc;
use std::thread::JoinHandle;

struct Worker {
    // The worker's thread; replies to cassandra envelopes on the connections it has been handed.
    handle: JoinHandle<()>,
    // Used by the acceptor to hand newly accepted client connections to this worker.
    connection_tx: mpsc::Sender<Connection>,
}

// Spawns an acceptor thread plus `cores` worker threads which will reply to cassandra messages with dummy responses.
// No attempt at correctness is made here, its purely just whatever is enough to get the benchmark to complete
// What we want to test is the speed of a basic message moving through shotover
pub fn start(cores: usize, port: u16) -> JoinHandle<()> {
    std::thread::spawn(move || {
        let mut workers = vec![];

        for _ in 0..cores {
            workers.push(Worker::spawn());
        }

        let listener = TcpListener::bind(("127.0.0.1", port)).unwrap();

        // Round-robin incoming connections over the workers.
        let mut worker_index = 0;
        for stream in listener.incoming() {
            let stream = stream.unwrap();
            // Nonblocking so a single worker thread can poll many connections.
            stream.set_nonblocking(true).unwrap();
            stream.set_nodelay(true).unwrap();

            // This allocation algorithm relies on the fact that the bencher will equally distribute
            // load across all connections without terminating any for the length of the bench
            workers[worker_index]
                .connection_tx
                .send(Connection {
                    stream,
                    buffer: BytesMut::with_capacity(10000),
                })
                .unwrap();
            worker_index = (worker_index + 1) % workers.len();
        }

        // Effectively unreachable: `TcpListener::incoming` never returns `None`,
        // so the mock runs until the process exits.
        for worker in workers {
            worker.handle.join().unwrap();
        }
    })
}

impl Worker {
    // Spawns a worker thread that busy-polls its connections and answers each cassandra
    // envelope with a canned response (presumably captured from a real node — TODO confirm).
    // Responses echo back the request's 2-byte stream id (header bytes 2-3) so the client
    // can correlate them; the opcode is read from header byte 4.
    fn spawn() -> Worker {
        let (connection_tx, connection_rx) = mpsc::channel();
        let handle = std::thread::spawn(move || {
            let mut connections: Vec<Connection> = vec![];
            loop {
                // Pick up any connections the acceptor has handed us since the last pass.
                for connection in connection_rx.try_iter() {
                    connections.push(connection);
                }
                // Drain every complete envelope from every connection.
                for connection in &mut connections {
                    while let Some(message) = connection.get_message() {
                        let stream_id1 = message[2];
                        let stream_id2 = message[3];
                        match Opcode::try_from(message[4]).unwrap() {
                            // Canned SUPPORTED response advertising protocol versions,
                            // compression algorithms and CQL version.
                            Opcode::Options => connection.send_message(&[
                                0x84, 0x00, stream_id1, stream_id2, 0x06, 0x00, 0x00, 0x00, 0x66,
                                0x00, 0x03, 0x00, 0x11, 0x50, 0x52, 0x4f, 0x54, 0x4f, 0x43, 0x4f,
                                0x4c, 0x5f, 0x56, 0x45, 0x52, 0x53, 0x49, 0x4f, 0x4e, 0x53, 0x00,
                                0x04, 0x00, 0x04, 0x33, 0x2f, 0x76, 0x33, 0x00, 0x04, 0x34, 0x2f,
                                0x76, 0x34, 0x00, 0x04, 0x35, 0x2f, 0x76, 0x35, 0x00, 0x09, 0x36,
                                0x2f, 0x76, 0x36, 0x2d, 0x62, 0x65, 0x74, 0x61, 0x00, 0x0b, 0x43,
                                0x4f, 0x4d, 0x50, 0x52, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x00,
                                0x02, 0x00, 0x06, 0x73, 0x6e, 0x61, 0x70, 0x70, 0x79, 0x00, 0x03,
                                0x6c, 0x7a, 0x34, 0x00, 0x0b, 0x43, 0x51, 0x4c, 0x5f, 0x56, 0x45,
                                0x52, 0x53, 0x49, 0x4f, 0x4e, 0x00, 0x01, 0x00, 0x05, 0x33, 0x2e,
                                0x34, 0x2e, 0x35,
                            ]),
                            // Minimal READY-style response (opcode 2, empty body).
                            Opcode::Startup | Opcode::Register => connection
                                .send_message(&[0x84, 0, stream_id1, stream_id2, 2, 0, 0, 0, 0]),
                            // Canned prepared-statement result for latte's `basic` table.
                            Opcode::Prepare => connection.send_message(&[
                                0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, 0x54,
                                0x00, 0x00, 0x00, 0x04, 0x00, 0x10, 0x73, 0x8d, 0x0d, 0x1d, 0x8d,
                                0xcd, 0xf4, 0x7a, 0xbe, 0x14, 0xb6, 0x16, 0xb6, 0x29, 0xaa, 0xce,
                                0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
                                0x01, 0x00, 0x00, 0x00, 0x05, 0x6c, 0x61, 0x74, 0x74, 0x65, 0x00,
                                0x05, 0x62, 0x61, 0x73, 0x69, 0x63, 0x00, 0x02, 0x69, 0x64, 0x00,
                                0x02, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x05,
                                0x6c, 0x61, 0x74, 0x74, 0x65, 0x00, 0x05, 0x62, 0x61, 0x73, 0x69,
                                0x63, 0x00, 0x02, 0x69, 0x64, 0x00, 0x02,
                            ]),
                            // Just assume this is the query that is being benchmarked
                            Opcode::Execute => connection.send_message(&[
                                0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, 0x04,
                                0x00, 0x00, 0x00, 0x01,
                            ]),
                            // Only the handful of literal queries the driver/bencher issues at
                            // startup are recognised; anything else aborts the mock.
                            Opcode::Query => {
                                // Query body layout: 4-byte big-endian length at offset 9,
                                // then the UTF-8 query string.
                                let query_len =
                                    u32::from_be_bytes(message[9..13].try_into().unwrap()) as usize;
                                let query =
                                    std::str::from_utf8(&message[13..13 + query_len]).unwrap();
                                match query {
                                    "select peer, data_center, rack, tokens from system.peers" => {
                                        // Empty system.peers rows: the mock is a single node.
                                        connection.send_message(&[
                                            0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00,
                                            0x4a, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00,
                                            0x00, 0x00, 0x04, 0x00, 0x06, 0x73, 0x79, 0x73, 0x74, 0x65,
                                            0x6d, 0x00, 0x05, 0x70, 0x65, 0x65, 0x72, 0x73, 0x00, 0x04,
                                            0x70, 0x65, 0x65, 0x72, 0x00, 0x10, 0x00, 0x0b, 0x64, 0x61,
                                            0x74, 0x61, 0x5f, 0x63, 0x65, 0x6e, 0x74, 0x65, 0x72, 0x00,
                                            0x0d, 0x00, 0x04, 0x72, 0x61, 0x63, 0x6b, 0x00, 0x0d, 0x00,
                                            0x06, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x73, 0x00, 0x22, 0x00,
                                            0x0d, 0x00, 0x00, 0x00, 0x00,
                                        ])
                                    }
                                    "select rpc_address, data_center, rack, tokens from system.local" => {
                                        // Single system.local row describing this mock node.
                                        connection.send_message(&[
                                            0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00,
                                            0x7e, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00,
                                            0x00, 0x00, 0x04, 0x00, 0x06, 0x73, 0x79, 0x73, 0x74, 0x65,
                                            0x6d, 0x00, 0x05, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x00, 0x0b,
                                            0x72, 0x70, 0x63, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73,
                                            0x73, 0x00, 0x10, 0x00, 0x0b, 0x64, 0x61, 0x74, 0x61, 0x5f,
                                            0x63, 0x65, 0x6e, 0x74, 0x65, 0x72, 0x00, 0x0d, 0x00, 0x04,
                                            0x72, 0x61, 0x63, 0x6b, 0x00, 0x0d, 0x00, 0x06, 0x74, 0x6f,
                                            0x6b, 0x65, 0x6e, 0x73, 0x00, 0x22, 0x00, 0x0d, 0x00, 0x00,
                                            0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0xac, 0x13, 0x00, 0x02,
                                            0x00, 0x00, 0x00, 0x0b, 0x64, 0x61, 0x74, 0x61, 0x63, 0x65,
                                            0x6e, 0x74, 0x65, 0x72, 0x31, 0x00, 0x00, 0x00, 0x05, 0x72,
                                            0x61, 0x63, 0x6b, 0x31, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00,
                                            0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x30,
                                        ]);
                                    }
                                    "select keyspace_name, replication from system_schema.keyspaces" => {
                                        connection.send_message(&[
                                            0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, 0x04,
                                            0x00, 0x00, 0x00, 0x01
                                        ]);
                                    }
                                    "SELECT cluster_name, release_version FROM system.local" => {
                                        connection.send_message(&[
                                            0x84, 0x0, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, 0x04,
                                            0x00, 0x00, 0x00, 0x01
                                        ]);
                                    }
                                    query => todo!("Unhandled query {query:?}"),
                                }
                            }

                            op => todo!("Unhandled opcode {op} in message: {message:?}"),
                        }
                    }
                }
            }
        });
        Worker {
            handle,
            connection_tx,
        }
    }
}

// A client TCP stream plus any bytes read from it that do not yet form a complete envelope.
struct Connection {
    stream: TcpStream,
    buffer: BytesMut,
}

impl Connection {
    // Returns the next complete cassandra envelope from this connection, if one is available.
    // Performs at most one nonblocking read per call; returns None when no full envelope
    // has arrived yet.
    fn get_message(&mut self) -> Option<Bytes> {
        self.message_from_buffer().or_else(|| {
            // Nothing buffered yet: pull whatever the socket has and try again.
            let mut read_buf = [0u8; 2048];
            match self.stream.read(&mut read_buf).map_err(|e| e.kind()) {
                Ok(len) => self.buffer.extend(&read_buf[..len]),
                // Nonblocking socket with no data ready — not an error.
                Err(std::io::ErrorKind::WouldBlock) => {}
                Err(err) => panic!("unexpected error when reading {err}"),
            }
            self.message_from_buffer()
        })
    }

    // Splits one complete envelope off the front of the buffer, or returns None
    // if the buffered bytes are still shorter than a full envelope.
    fn message_from_buffer(&mut self) -> Option<Bytes> {
        use cassandra_protocol::frame::Envelope;
        match Envelope::check_envelope_size(&self.buffer) {
            Ok(envelope_len) => Some(self.buffer.split_to(envelope_len).freeze()),
            Err(CheckEnvelopeSizeError::NotEnoughBytes) => None,
            Err(err) => panic!("envelope error: {err:?}"),
        }
    }

    // Writes a raw response; the mock has no recovery path, so any write failure aborts.
    fn send_message(&mut self, message: &[u8]) {
        self.stream.write_all(message).unwrap();
    }
}
13 changes: 12 additions & 1 deletion test-helpers/src/shotover_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ pub use tokio_bin_process::BinProcess;
pub struct ShotoverProcessBuilder {
    // Path to the topology yaml passed to shotover via `-t`.
    topology_path: String,
    // Optional log name for the spawned process; defaults to "shotover" when unset.
    log_name: Option<String>,
    // Optional core-thread count, pre-stringified for the `--core-threads` CLI flag.
    cores: Option<String>,
}

impl ShotoverProcessBuilder {
pub fn new_with_topology(topology_path: &str) -> Self {
Self {
topology_path: topology_path.to_owned(),
log_name: None,
cores: None,
}
}

Expand All @@ -22,10 +24,19 @@ impl ShotoverProcessBuilder {
self
}

pub fn with_cores(mut self, cores: u32) -> Self {
self.cores = Some(cores.to_string());
self
}

pub async fn start(&self) -> BinProcess {
let mut args = vec!["-t", &self.topology_path, "--log-format", "json"];
if let Some(cores) = &self.cores {
args.extend(["--core-threads", cores]);
}
let mut shotover = BinProcess::start_with_args(
"shotover-proxy",
&["-t", &self.topology_path, "--log-format", "json"],
&args,
self.log_name.as_deref().unwrap_or("shotover"),
)
.await;
Expand Down