diff --git a/.github/workflows/build_and_test.yaml b/.github/workflows/build_and_test.yaml index 5a0df9795..4ea0eb3b4 100644 --- a/.github/workflows/build_and_test.yaml +++ b/.github/workflows/build_and_test.yaml @@ -75,6 +75,7 @@ jobs: cargo run --release --example cassandra_bench -- --config-dir example-configs/cassandra-passthrough --rate 1000 cargo run --release --example cassandra_cluster_bench cargo run --release --example cassandra_cluster_flamegraph + cargo run --release --example cassandra_bench_mocked if: ${{ matrix.name == 'Ubuntu 20.04 - Release' }} - name: Ensure that tests did not create or modify any files that arent .gitignore'd run: | diff --git a/shotover-proxy/examples/cassandra_bench.rs b/shotover-proxy/examples/cassandra_bench.rs index 7034a3fed..34784e33f 100644 --- a/shotover-proxy/examples/cassandra_bench.rs +++ b/shotover-proxy/examples/cassandra_bench.rs @@ -22,7 +22,7 @@ async fn main() { test_helpers::bench::init(); let args = Args::parse(); - let latte = Latte::new(args.rate); + let latte = Latte::new(args.rate, 1); let bench = "read"; { let _compose = DockerCompose::new(&format!("{}/docker-compose.yaml", args.config_dir)); @@ -37,11 +37,11 @@ async fn main() { println!("Benching Shotover ..."); latte.init(bench, "localhost:9043"); latte.bench(bench, "localhost:9042"); + shotover.shutdown_and_then_consume_events(&[]).await; + println!("Benching Direct Cassandra ..."); latte.init(bench, "localhost:9043"); latte.bench(bench, "localhost:9043"); - - shotover.shutdown_and_then_consume_events(&[]).await; } println!("Direct Cassandra (A) vs Shotover (B)"); diff --git a/shotover-proxy/examples/cassandra_bench_mocked.rs b/shotover-proxy/examples/cassandra_bench_mocked.rs new file mode 100644 index 000000000..f8e488d02 --- /dev/null +++ b/shotover-proxy/examples/cassandra_bench_mocked.rs @@ -0,0 +1,47 @@ +use std::num::NonZeroUsize; + +use test_helpers::latte::Latte; +use test_helpers::shotover_process::ShotoverProcessBuilder; + +// This 
benchmark does not reflect realistic use as we are using a mocked out version of cassandra. +// The purpose of this bench is to create a scenario where shotover is the bottleneck between the client and cassandra. +// This will allow us to observe optimizations that might be too small to reliably show up in a realistic bench. e.g. improvements to parsing speed +// On the other hand, for optimizations that could be dependent on cassandra having such unrealistically low latency, they need to be confirmed in more realistic scenarios. e.g. changes to the way we batch messages in tcp + +#[tokio::main] +async fn main() { + test_helpers::bench::init(); + + let (shotover_cores, client_cores, db_cores) = + // Assigning multiple cores to the db and client will help ensure that shotover acts as the bottleneck. + // So take advantage of that when we have the cores available to do so. + if std::thread::available_parallelism().unwrap() >= NonZeroUsize::new(8).unwrap() { + (1, 3, 3) + } else { + (1, 1, 1) + }; + + let latte = Latte::new(10000000, client_cores); + let config_dir = "example-configs/cassandra-passthrough"; + let bench = "read"; + { + test_helpers::mock_cassandra::start(db_cores, 9043); + let shotover = + ShotoverProcessBuilder::new_with_topology(&format!("{}/topology.yaml", config_dir)) + .with_cores(shotover_cores) + .start() + .await; + + println!("Benching Shotover ..."); + // no need to initialize as cassandra is completely mocked out, just directly start benching. 
+ latte.bench(bench, "localhost:9042"); + + shotover.shutdown_and_then_consume_events(&[]).await; + + println!("Benching Direct Mocked Cassandra ..."); + latte.bench(bench, "localhost:9043"); + } + + println!("Direct Mocked Cassandra (A) vs Shotover (B)"); + latte.compare("read-localhost:9043.json", "read-localhost:9042.json"); +} diff --git a/shotover-proxy/examples/cassandra_cluster_bench.rs b/shotover-proxy/examples/cassandra_cluster_bench.rs index fb2c72c67..dbdd8cdbc 100644 --- a/shotover-proxy/examples/cassandra_cluster_bench.rs +++ b/shotover-proxy/examples/cassandra_cluster_bench.rs @@ -6,7 +6,7 @@ use test_helpers::shotover_process::ShotoverProcessBuilder; async fn main() { test_helpers::bench::init(); - let latte = Latte::new(10000000); + let latte = Latte::new(10000000, 1); let config_dir = "example-configs/cassandra-cluster-v4"; let bench = "read"; { @@ -20,11 +20,11 @@ async fn main() { latte.init(bench, "172.16.1.2:9044"); latte.bench(bench, "localhost:9042"); + shotover.shutdown_and_then_consume_events(&[]).await; + println!("Benching Direct Cassandra ..."); latte.init(bench, "172.16.1.2:9044"); latte.bench(bench, "172.16.1.2:9044"); - - shotover.shutdown_and_then_consume_events(&[]).await; } println!("Direct Cassandra (A) vs Shotover (B)"); diff --git a/shotover-proxy/examples/cassandra_cluster_flamegraph.rs b/shotover-proxy/examples/cassandra_cluster_flamegraph.rs index 0d814cf90..d7cead24b 100644 --- a/shotover-proxy/examples/cassandra_cluster_flamegraph.rs +++ b/shotover-proxy/examples/cassandra_cluster_flamegraph.rs @@ -13,7 +13,7 @@ use test_helpers::shotover_process::ShotoverProcessBuilder; async fn main() { test_helpers::bench::init(); - let latte = Latte::new(10000000); + let latte = Latte::new(10000000, 1); let config_dir = "example-configs/cassandra-cluster-v4"; let bench = "read"; { diff --git a/test-helpers/src/latte.rs b/test-helpers/src/latte.rs index 4815b376e..c6179724f 100644 --- a/test-helpers/src/latte.rs +++ 
b/test-helpers/src/latte.rs @@ -4,10 +4,11 @@ // * write our own benchmark logic pub struct Latte { rate: u64, + threads: u64, } impl Latte { - pub fn new(rate: u64) -> Latte { + pub fn new(rate: u64, threads: u64) -> Latte { crate::docker_compose::run_command( "cargo", &[ @@ -19,7 +20,7 @@ impl Latte { ], ) .unwrap(); - Latte { rate } + Latte { rate, threads } } pub fn init(&self, name: &str, address_load: &str) { @@ -68,6 +69,8 @@ impl Latte { "15s", // default is 60s but 15 seems fine "--connections", "128", // Shotover performs extremely poorly with 1 connection and this is not currently an intended usecase + "--threads", + &self.threads.to_string(), "--output", &format!("{name}-{address_bench}.json"), &format!("examples/{name}.rn"), diff --git a/test-helpers/src/lib.rs b/test-helpers/src/lib.rs index 89344ea93..393412fe2 100644 --- a/test-helpers/src/lib.rs +++ b/test-helpers/src/lib.rs @@ -8,6 +8,7 @@ pub mod flamegraph; pub mod latte; pub mod lazy; pub mod metrics; +pub mod mock_cassandra; pub mod shotover_process; use anyhow::{bail, Result}; diff --git a/test-helpers/src/mock_cassandra.rs b/test-helpers/src/mock_cassandra.rs new file mode 100644 index 000000000..8d951cb13 --- /dev/null +++ b/test-helpers/src/mock_cassandra.rs @@ -0,0 +1,193 @@ +use bytes::{Bytes, BytesMut}; +use cassandra_protocol::frame::{CheckEnvelopeSizeError, Opcode}; +use std::io::{Read, Write}; +use std::net::{TcpListener, TcpStream}; +use std::sync::mpsc; +use std::thread::JoinHandle; + +struct Worker { + handle: JoinHandle<()>, + connection_tx: mpsc::Sender, +} + +// Spawns a single thread which will reply to cassandra messages with dummy responses. 
No attempt at correctness is made here, it's purely whatever is enough to get the benchmark to complete +// What we want to test is the speed of a basic message moving through shotover +pub fn start(cores: usize, port: u16) -> JoinHandle<()> { + std::thread::spawn(move || { + let mut workers = vec![]; + + for _ in 0..cores { + workers.push(Worker::spawn()); + } + + let listener = TcpListener::bind(("127.0.0.1", port)).unwrap(); + + let mut worker_index = 0; + for stream in listener.incoming() { + let stream = stream.unwrap(); + stream.set_nonblocking(true).unwrap(); + stream.set_nodelay(true).unwrap(); + + // This allocation algorithm relies on the fact that the bencher will equally distribute + // load across all connections without terminating any for the length of the bench + workers[worker_index] + .connection_tx + .send(Connection { + stream, + buffer: BytesMut::with_capacity(10000), + }) + .unwrap(); + worker_index = (worker_index + 1) % workers.len(); + } + + for worker in workers { + worker.handle.join().unwrap(); + } + }) +} + +impl Worker { + fn spawn() -> Worker { + let (connection_tx, connection_rx) = mpsc::channel(); + let handle = std::thread::spawn(move || { + let mut connections: Vec<Connection> = vec![];
0x4d, 0x50, 0x52, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x00, + 0x02, 0x00, 0x06, 0x73, 0x6e, 0x61, 0x70, 0x70, 0x79, 0x00, 0x03, + 0x6c, 0x7a, 0x34, 0x00, 0x0b, 0x43, 0x51, 0x4c, 0x5f, 0x56, 0x45, + 0x52, 0x53, 0x49, 0x4f, 0x4e, 0x00, 0x01, 0x00, 0x05, 0x33, 0x2e, + 0x34, 0x2e, 0x35, + ]), + Opcode::Startup | Opcode::Register => connection + .send_message(&[0x84, 0, stream_id1, stream_id2, 2, 0, 0, 0, 0]), + Opcode::Prepare => connection.send_message(&[ + 0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, 0x54, + 0x00, 0x00, 0x00, 0x04, 0x00, 0x10, 0x73, 0x8d, 0x0d, 0x1d, 0x8d, + 0xcd, 0xf4, 0x7a, 0xbe, 0x14, 0xb6, 0x16, 0xb6, 0x29, 0xaa, 0xce, + 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x00, 0x00, 0x05, 0x6c, 0x61, 0x74, 0x74, 0x65, 0x00, + 0x05, 0x62, 0x61, 0x73, 0x69, 0x63, 0x00, 0x02, 0x69, 0x64, 0x00, + 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x05, + 0x6c, 0x61, 0x74, 0x74, 0x65, 0x00, 0x05, 0x62, 0x61, 0x73, 0x69, + 0x63, 0x00, 0x02, 0x69, 0x64, 0x00, 0x02, + ]), + // Just assume this is the query that is being benchmarked + Opcode::Execute => connection.send_message(&[ + 0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, 0x04, + 0x00, 0x00, 0x00, 0x01, + ]), + Opcode::Query => { + let query_len = + u32::from_be_bytes(message[9..13].try_into().unwrap()) as usize; + let query = + std::str::from_utf8(&message[13..13 + query_len]).unwrap(); + match query { + "select peer, data_center, rack, tokens from system.peers" => { + connection.send_message(&[ + 0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, + 0x4a, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, + 0x00, 0x00, 0x04, 0x00, 0x06, 0x73, 0x79, 0x73, 0x74, 0x65, + 0x6d, 0x00, 0x05, 0x70, 0x65, 0x65, 0x72, 0x73, 0x00, 0x04, + 0x70, 0x65, 0x65, 0x72, 0x00, 0x10, 0x00, 0x0b, 0x64, 0x61, + 0x74, 0x61, 0x5f, 0x63, 0x65, 0x6e, 0x74, 0x65, 0x72, 0x00, + 0x0d, 0x00, 0x04, 0x72, 0x61, 0x63, 0x6b, 0x00, 0x0d, 0x00, + 0x06, 
0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x73, 0x00, 0x22, 0x00, + 0x0d, 0x00, 0x00, 0x00, 0x00, + ]) + } + "select rpc_address, data_center, rack, tokens from system.local" => { + connection.send_message(&[ + 0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, + 0x7e, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, + 0x00, 0x00, 0x04, 0x00, 0x06, 0x73, 0x79, 0x73, 0x74, 0x65, + 0x6d, 0x00, 0x05, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x00, 0x0b, + 0x72, 0x70, 0x63, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, + 0x73, 0x00, 0x10, 0x00, 0x0b, 0x64, 0x61, 0x74, 0x61, 0x5f, + 0x63, 0x65, 0x6e, 0x74, 0x65, 0x72, 0x00, 0x0d, 0x00, 0x04, + 0x72, 0x61, 0x63, 0x6b, 0x00, 0x0d, 0x00, 0x06, 0x74, 0x6f, + 0x6b, 0x65, 0x6e, 0x73, 0x00, 0x22, 0x00, 0x0d, 0x00, 0x00, + 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0xac, 0x13, 0x00, 0x02, + 0x00, 0x00, 0x00, 0x0b, 0x64, 0x61, 0x74, 0x61, 0x63, 0x65, + 0x6e, 0x74, 0x65, 0x72, 0x31, 0x00, 0x00, 0x00, 0x05, 0x72, + 0x61, 0x63, 0x6b, 0x31, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, + 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x30, + ]); + } + "select keyspace_name, replication from system_schema.keyspaces" => { + connection.send_message(&[ + 0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, 0x04, + 0x00, 0x00, 0x00, 0x01 + ]); + } + "SELECT cluster_name, release_version FROM system.local" => { + connection.send_message(&[ + 0x84, 0x0, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, 0x04, + 0x00, 0x00, 0x00, 0x01 + ]); + } + query => todo!("Unhandled query {query:?}"), + } + } + + op => todo!("Unhandled opcode {op} in message: {message:?}"), + } + } + } + } + }); + Worker { + handle, + connection_tx, + } + } +} + +struct Connection { + stream: TcpStream, + buffer: BytesMut, +} + +impl Connection { + fn get_message(&mut self) -> Option { + if let Some(message) = self.message_from_buffer() { + return Some(message); + } + + let mut bytes = [0u8; 2048]; + match self.stream.read(&mut bytes).map_err(|e| e.kind()) { + Ok(size) => 
self.buffer.extend(&bytes[..size]), + Err(std::io::ErrorKind::WouldBlock) => {} + Err(err) => panic!("unexpected error when reading {err}"), + } + + self.message_from_buffer() + } + + fn message_from_buffer(&mut self) -> Option { + match cassandra_protocol::frame::Envelope::check_envelope_size(&self.buffer) { + Ok(message_size) => Some(self.buffer.split_to(message_size).freeze()), + Err(CheckEnvelopeSizeError::NotEnoughBytes) => None, + Err(err) => panic!("envelope error: {err:?}"), + } + } + + fn send_message(&mut self, message: &[u8]) { + self.stream.write_all(message).unwrap(); + } +} diff --git a/test-helpers/src/shotover_process.rs b/test-helpers/src/shotover_process.rs index 367b0cacd..c88c400af 100644 --- a/test-helpers/src/shotover_process.rs +++ b/test-helpers/src/shotover_process.rs @@ -7,6 +7,7 @@ pub use tokio_bin_process::BinProcess; pub struct ShotoverProcessBuilder { topology_path: String, log_name: Option, + cores: Option, } impl ShotoverProcessBuilder { @@ -14,6 +15,7 @@ impl ShotoverProcessBuilder { Self { topology_path: topology_path.to_owned(), log_name: None, + cores: None, } } @@ -22,10 +24,19 @@ impl ShotoverProcessBuilder { self } + pub fn with_cores(mut self, cores: u32) -> Self { + self.cores = Some(cores.to_string()); + self + } + pub async fn start(&self) -> BinProcess { + let mut args = vec!["-t", &self.topology_path, "--log-format", "json"]; + if let Some(cores) = &self.cores { + args.extend(["--core-threads", cores]); + } let mut shotover = BinProcess::start_with_args( "shotover-proxy", - &["-t", &self.topology_path, "--log-format", "json"], + &args, self.log_name.as_deref().unwrap_or("shotover"), ) .await;