From f7baaa09bcc74413d36787b046771eccadcf0da2 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 24 Jan 2023 18:58:46 +1100 Subject: [PATCH 1/2] Add cassandra bench with mocked out cassandra server --- shotover-proxy/examples/cassandra_bench.rs | 4 +- .../examples/cassandra_bench_mocked.rs | 27 +++ .../examples/cassandra_cluster_bench.rs | 4 +- test-helpers/src/lib.rs | 1 + test-helpers/src/mock_cassandra.rs | 161 ++++++++++++++++++ 5 files changed, 193 insertions(+), 4 deletions(-) create mode 100644 shotover-proxy/examples/cassandra_bench_mocked.rs create mode 100644 test-helpers/src/mock_cassandra.rs diff --git a/shotover-proxy/examples/cassandra_bench.rs b/shotover-proxy/examples/cassandra_bench.rs index 7034a3fed..a7326c2bf 100644 --- a/shotover-proxy/examples/cassandra_bench.rs +++ b/shotover-proxy/examples/cassandra_bench.rs @@ -37,11 +37,11 @@ async fn main() { println!("Benching Shotover ..."); latte.init(bench, "localhost:9043"); latte.bench(bench, "localhost:9042"); + shotover.shutdown_and_then_consume_events(&[]).await; + println!("Benching Direct Cassandra ..."); latte.init(bench, "localhost:9043"); latte.bench(bench, "localhost:9043"); - - shotover.shutdown_and_then_consume_events(&[]).await; } println!("Direct Cassandra (A) vs Shotover (B)"); diff --git a/shotover-proxy/examples/cassandra_bench_mocked.rs b/shotover-proxy/examples/cassandra_bench_mocked.rs new file mode 100644 index 000000000..1725e99eb --- /dev/null +++ b/shotover-proxy/examples/cassandra_bench_mocked.rs @@ -0,0 +1,27 @@ +use test_helpers::latte::Latte; +use test_helpers::shotover_process::shotover_from_topology_file; + +#[tokio::main] +async fn main() { + test_helpers::bench::init(); + + let latte = Latte::new(10000000); + let config_dir = "example-configs/cassandra-passthrough"; + let bench = "read"; + { + test_helpers::mock_cassandra::start(9043); + let shotover = shotover_from_topology_file(&format!("{}/topology.yaml", config_dir)).await; + + println!("Benching Shotover ..."); + // no need to initialize as cassandra is completely mocked out, just directly start benching. + latte.bench(bench, "localhost:9042"); + + shotover.shutdown_and_then_consume_events(&[]).await; + + println!("Benching Direct Mocked Cassandra ..."); + latte.bench(bench, "localhost:9043"); + } + + println!("Direct Mocked Cassandra (A) vs Shotover (B)"); + latte.compare("read-localhost:9043.json", "read-localhost:9042.json"); +} diff --git a/shotover-proxy/examples/cassandra_cluster_bench.rs b/shotover-proxy/examples/cassandra_cluster_bench.rs index fb2c72c67..52599c47f 100644 --- a/shotover-proxy/examples/cassandra_cluster_bench.rs +++ b/shotover-proxy/examples/cassandra_cluster_bench.rs @@ -20,11 +20,11 @@ async fn main() { latte.init(bench, "172.16.1.2:9044"); latte.bench(bench, "localhost:9042"); + shotover.shutdown_and_then_consume_events(&[]).await; + println!("Benching Direct Cassandra ..."); latte.init(bench, "172.16.1.2:9044"); latte.bench(bench, "172.16.1.2:9044"); - - shotover.shutdown_and_then_consume_events(&[]).await; } println!("Direct Cassandra (A) vs Shotover (B)"); diff --git a/test-helpers/src/lib.rs b/test-helpers/src/lib.rs index 89344ea93..393412fe2 100644 --- a/test-helpers/src/lib.rs +++ b/test-helpers/src/lib.rs @@ -8,6 +8,7 @@ pub mod flamegraph; pub mod latte; pub mod lazy; pub mod metrics; +pub mod mock_cassandra; pub mod shotover_process; use anyhow::{bail, Result}; diff --git a/test-helpers/src/mock_cassandra.rs b/test-helpers/src/mock_cassandra.rs new file mode 100644 index 000000000..88b22a7a7 --- /dev/null +++ b/test-helpers/src/mock_cassandra.rs @@ -0,0 +1,161 @@ +use bytes::{Bytes, BytesMut}; +use cassandra_protocol::frame::{CheckEnvelopeSizeError, Opcode}; +use std::io::{Read, Write}; +use std::net::{TcpListener, TcpStream}; +use std::sync::mpsc; +use std::thread::JoinHandle; + +// Spawns a single thread which will reply to cassandra messages with dummy responses. +// No attempt at correctness is made here, its purely just whatever is enough to get the benchmark to complete +// What we want to test is the speed of a basic message moving through shotover +pub fn start(port: u16) -> JoinHandle<()> { + std::thread::spawn(move || { + let mut connections = vec![]; + let (connection_tx, connection_rx) = mpsc::channel(); + let _listener = std::thread::spawn(move || { + let listener = TcpListener::bind(("127.0.0.1", port)).unwrap(); + + for stream in listener.incoming() { + let stream = stream.unwrap(); + stream.set_nonblocking(true).unwrap(); + stream.set_nodelay(true).unwrap(); + connection_tx + .send(Connection { + stream, + buffer: BytesMut::with_capacity(10000), + }) + .unwrap(); + } + }); + + loop { + for connection in connection_rx.try_iter() { + connections.push(connection); + } + for connection in &mut connections { + while let Some(message) = connection.get_message() { + let stream_id1 = message[2]; + let stream_id2 = message[3]; + match Opcode::try_from(message[4]).unwrap() { + Opcode::Options => connection.send_message(&[ + 0x84, 0x00, stream_id1, stream_id2, 0x06, 0x00, 0x00, 0x00, 0x66, 0x00, + 0x03, 0x00, 0x11, 0x50, 0x52, 0x4f, 0x54, 0x4f, 0x43, 0x4f, 0x4c, 0x5f, + 0x56, 0x45, 0x52, 0x53, 0x49, 0x4f, 0x4e, 0x53, 0x00, 0x04, 0x00, 0x04, + 0x33, 0x2f, 0x76, 0x33, 0x00, 0x04, 0x34, 0x2f, 0x76, 0x34, 0x00, 0x04, + 0x35, 0x2f, 0x76, 0x35, 0x00, 0x09, 0x36, 0x2f, 0x76, 0x36, 0x2d, 0x62, + 0x65, 0x74, 0x61, 0x00, 0x0b, 0x43, 0x4f, 0x4d, 0x50, 0x52, 0x45, 0x53, + 0x53, 0x49, 0x4f, 0x4e, 0x00, 0x02, 0x00, 0x06, 0x73, 0x6e, 0x61, 0x70, + 0x70, 0x79, 0x00, 0x03, 0x6c, 0x7a, 0x34, 0x00, 0x0b, 0x43, 0x51, 0x4c, + 0x5f, 0x56, 0x45, 0x52, 0x53, 0x49, 0x4f, 0x4e, 0x00, 0x01, 0x00, 0x05, + 0x33, 0x2e, 0x34, 0x2e, 0x35, + ]), + Opcode::Startup | Opcode::Register => connection + .send_message(&[0x84, 0, stream_id1, stream_id2, 2, 0, 0, 0, 0]), + Opcode::Prepare => connection.send_message(&[ + 0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, 0x54, 0x00, + 0x00, 0x00, 0x04, 0x00, 0x10, 0x73, 0x8d, 0x0d, 0x1d, 0x8d, 0xcd, 0xf4, + 0x7a, 0xbe, 0x14, 0xb6, 0x16, 0xb6, 0x29, 0xaa, 0xce, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, + 0x05, 0x6c, 0x61, 0x74, 0x74, 0x65, 0x00, 0x05, 0x62, 0x61, 0x73, 0x69, + 0x63, 0x00, 0x02, 0x69, 0x64, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, + 0x00, 0x00, 0x01, 0x00, 0x05, 0x6c, 0x61, 0x74, 0x74, 0x65, 0x00, 0x05, + 0x62, 0x61, 0x73, 0x69, 0x63, 0x00, 0x02, 0x69, 0x64, 0x00, 0x02, + ]), + // Just assume this is the query that is being benchmarked + Opcode::Execute => connection.send_message(&[ + 0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, 0x04, 0x00, + 0x00, 0x00, 0x01, + ]), + Opcode::Query => { + let query_len = + u32::from_be_bytes(message[9..13].try_into().unwrap()) as usize; + let query = std::str::from_utf8(&message[13..13 + query_len]).unwrap(); + match query { + "select peer, data_center, rack, tokens from system.peers" => { + connection.send_message(&[ + 0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, + 0x4a, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, + 0x00, 0x00, 0x04, 0x00, 0x06, 0x73, 0x79, 0x73, 0x74, 0x65, + 0x6d, 0x00, 0x05, 0x70, 0x65, 0x65, 0x72, 0x73, 0x00, 0x04, + 0x70, 0x65, 0x65, 0x72, 0x00, 0x10, 0x00, 0x0b, 0x64, 0x61, + 0x74, 0x61, 0x5f, 0x63, 0x65, 0x6e, 0x74, 0x65, 0x72, 0x00, + 0x0d, 0x00, 0x04, 0x72, 0x61, 0x63, 0x6b, 0x00, 0x0d, 0x00, + 0x06, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x73, 0x00, 0x22, 0x00, + 0x0d, 0x00, 0x00, 0x00, 0x00, + ]) + } + "select rpc_address, data_center, rack, tokens from system.local" => { + connection.send_message(&[ + 0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, + 0x7e, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, + 0x00, 0x00, 0x04, 0x00, 0x06, 0x73, 0x79, 0x73, 0x74, 0x65, + 0x6d, 0x00, 0x05, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x00, 0x0b, + 0x72, 0x70, 0x63, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, + 0x73, 0x00, 0x10, 0x00, 0x0b, 0x64, 0x61, 0x74, 0x61, 0x5f, + 0x63, 0x65, 0x6e, 0x74, 0x65, 0x72, 0x00, 0x0d, 0x00, 0x04, + 0x72, 0x61, 0x63, 0x6b, 0x00, 0x0d, 0x00, 0x06, 0x74, 0x6f, + 0x6b, 0x65, 0x6e, 0x73, 0x00, 0x22, 0x00, 0x0d, 0x00, 0x00, + 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0xac, 0x13, 0x00, 0x02, + 0x00, 0x00, 0x00, 0x0b, 0x64, 0x61, 0x74, 0x61, 0x63, 0x65, + 0x6e, 0x74, 0x65, 0x72, 0x31, 0x00, 0x00, 0x00, 0x05, 0x72, + 0x61, 0x63, 0x6b, 0x31, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, + 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x30, + ]); + } + "select keyspace_name, replication from system_schema.keyspaces" => { + connection.send_message(&[ + 0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, 0x04, + 0x00, 0x00, 0x00, 0x01 + ]); + } + "SELECT cluster_name, release_version FROM system.local" => { + connection.send_message(&[ + 0x84, 0x0, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, 0x04, + 0x00, 0x00, 0x00, 0x01 + ]); + } + query => todo!("Unhandled query {query:?}"), + } + } + + op => todo!("Unhandled opcode {op} in message: {message:?}"), + } + } + } + } + }) +} + +struct Connection { + stream: TcpStream, + buffer: BytesMut, +} + +impl Connection { + fn get_message(&mut self) -> Option { + if let Some(message) = self.message_from_buffer() { + return Some(message); + } + + let mut bytes = [0u8; 2048]; + match self.stream.read(&mut bytes).map_err(|e| e.kind()) { + Ok(size) => self.buffer.extend(&bytes[..size]), + Err(std::io::ErrorKind::WouldBlock) => {} + Err(err) => panic!("unexpected error when reading {err}"), + } + + self.message_from_buffer() + } + + fn message_from_buffer(&mut self) -> Option { + match cassandra_protocol::frame::Envelope::check_envelope_size(&self.buffer) { + Ok(message_size) => Some(self.buffer.split_to(message_size).freeze()), + Err(CheckEnvelopeSizeError::NotEnoughBytes) => None, + Err(err) => panic!("envelope error: {err:?}"), + } + } + + fn send_message(&mut self, message: &[u8]) { + self.stream.write_all(message).unwrap(); + } +} From 874998ee8fd90e56e6590fe8d55490cf6749f0c0 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Wed, 25 Jan 2023 17:22:56 +1100 Subject: [PATCH 2/2] Ensure appropriate thread counts are used --- .github/workflows/build_and_test.yaml | 1 + shotover-proxy/examples/cassandra_bench.rs | 2 +- .../examples/cassandra_bench_mocked.rs | 28 +++- .../examples/cassandra_cluster_bench.rs | 2 +- .../examples/cassandra_cluster_flamegraph.rs | 2 +- test-helpers/src/latte.rs | 7 +- test-helpers/src/mock_cassandra.rs | 158 +++++++++++------- test-helpers/src/shotover_process.rs | 13 +- 8 files changed, 140 insertions(+), 73 deletions(-) diff --git a/.github/workflows/build_and_test.yaml b/.github/workflows/build_and_test.yaml index 5a0df9795..4ea0eb3b4 100644 --- a/.github/workflows/build_and_test.yaml +++ b/.github/workflows/build_and_test.yaml @@ -75,6 +75,7 @@ jobs: cargo run --release --example cassandra_bench -- --config-dir example-configs/cassandra-passthrough --rate 1000 cargo run --release --example cassandra_cluster_bench cargo run --release --example cassandra_cluster_flamegraph + cargo run --release --example cassandra_bench_mocked if: ${{ matrix.name == 'Ubuntu 20.04 - Release' }} - name: Ensure that tests did not create or modify any files that arent .gitignore'd run: | diff --git a/shotover-proxy/examples/cassandra_bench.rs b/shotover-proxy/examples/cassandra_bench.rs index a7326c2bf..34784e33f 100644 --- a/shotover-proxy/examples/cassandra_bench.rs +++ b/shotover-proxy/examples/cassandra_bench.rs @@ -22,7 +22,7 @@ async fn main() { test_helpers::bench::init(); let args = Args::parse(); - let latte = Latte::new(args.rate); + let latte = Latte::new(args.rate, 1); let bench = "read"; { let _compose = DockerCompose::new(&format!("{}/docker-compose.yaml", args.config_dir)); diff --git a/shotover-proxy/examples/cassandra_bench_mocked.rs b/shotover-proxy/examples/cassandra_bench_mocked.rs index 1725e99eb..f8e488d02 100644 --- a/shotover-proxy/examples/cassandra_bench_mocked.rs +++ b/shotover-proxy/examples/cassandra_bench_mocked.rs @@ -1,16 +1,36 @@ +use std::num::NonZeroUsize; + use test_helpers::latte::Latte; -use test_helpers::shotover_process::shotover_from_topology_file; +use test_helpers::shotover_process::ShotoverProcessBuilder; + +// This benchmark does not reflect realistic use as we are using a mocked out version of cassandra. +// The purpose of this bench is to create a scenario where shotover is the bottleneck between the client and cassandra. +// This will allow us to observe optimizations that might be too small to reliably show up in a realistic bench. e.g. improvements to parsing speed +// On the other hand, for optimizations that could be dependent on cassandra having such unrealistically low latency, they need to be confirmed in more realistic scenarios. e.g. changes to the way we batch messages in tcp #[tokio::main] async fn main() { test_helpers::bench::init(); - let latte = Latte::new(10000000); + let (shotover_cores, client_cores, db_cores) = + // Assigning multiple cores to the db and client will help ensure that shotover acts as the bottleneck. + // So take advantage of that when we have the cores available to do so. + if std::thread::available_parallelism().unwrap() >= NonZeroUsize::new(8).unwrap() { + (1, 3, 3) + } else { + (1, 1, 1) + }; + + let latte = Latte::new(10000000, client_cores); let config_dir = "example-configs/cassandra-passthrough"; let bench = "read"; { - test_helpers::mock_cassandra::start(9043); - let shotover = shotover_from_topology_file(&format!("{}/topology.yaml", config_dir)).await; + test_helpers::mock_cassandra::start(db_cores, 9043); + let shotover = + ShotoverProcessBuilder::new_with_topology(&format!("{}/topology.yaml", config_dir)) + .with_cores(shotover_cores) + .start() + .await; println!("Benching Shotover ..."); // no need to initialize as cassandra is completely mocked out, just directly start benching. diff --git a/shotover-proxy/examples/cassandra_cluster_bench.rs b/shotover-proxy/examples/cassandra_cluster_bench.rs index 52599c47f..dbdd8cdbc 100644 --- a/shotover-proxy/examples/cassandra_cluster_bench.rs +++ b/shotover-proxy/examples/cassandra_cluster_bench.rs @@ -6,7 +6,7 @@ use test_helpers::shotover_process::ShotoverProcessBuilder; async fn main() { test_helpers::bench::init(); - let latte = Latte::new(10000000); + let latte = Latte::new(10000000, 1); let config_dir = "example-configs/cassandra-cluster-v4"; let bench = "read"; { diff --git a/shotover-proxy/examples/cassandra_cluster_flamegraph.rs b/shotover-proxy/examples/cassandra_cluster_flamegraph.rs index 0d814cf90..d7cead24b 100644 --- a/shotover-proxy/examples/cassandra_cluster_flamegraph.rs +++ b/shotover-proxy/examples/cassandra_cluster_flamegraph.rs @@ -13,7 +13,7 @@ use test_helpers::shotover_process::ShotoverProcessBuilder; async fn main() { test_helpers::bench::init(); - let latte = Latte::new(10000000); + let latte = Latte::new(10000000, 1); let config_dir = "example-configs/cassandra-cluster-v4"; let bench = "read"; { diff --git a/test-helpers/src/latte.rs b/test-helpers/src/latte.rs index 4815b376e..c6179724f 100644 --- a/test-helpers/src/latte.rs +++ b/test-helpers/src/latte.rs @@ -4,10 +4,11 @@ // * write our own benchmark logic pub struct Latte { rate: u64, + threads: u64, } impl Latte { - pub fn new(rate: u64) -> Latte { + pub fn new(rate: u64, threads: u64) -> Latte { crate::docker_compose::run_command( "cargo", &[ @@ -19,7 +20,7 @@ impl Latte { ], ) .unwrap(); - Latte { rate } + Latte { rate, threads } } pub fn init(&self, name: &str, address_load: &str) { @@ -68,6 +69,8 @@ impl Latte { "15s", // default is 60s but 15 seems fine "--connections", "128", // Shotover performs extremely poorly with 1 connection and this is not currently an intended usecase + "--threads", + &self.threads.to_string(), "--output", &format!("{name}-{address_bench}.json"), &format!("examples/{name}.rn"), diff --git a/test-helpers/src/mock_cassandra.rs b/test-helpers/src/mock_cassandra.rs index 88b22a7a7..8d951cb13 100644 --- a/test-helpers/src/mock_cassandra.rs +++ b/test-helpers/src/mock_cassandra.rs @@ -5,72 +5,99 @@ use std::net::{TcpListener, TcpStream}; use std::sync::mpsc; use std::thread::JoinHandle; +struct Worker { + handle: JoinHandle<()>, + connection_tx: mpsc::Sender, +} + // Spawns a single thread which will reply to cassandra messages with dummy responses. // No attempt at correctness is made here, its purely just whatever is enough to get the benchmark to complete // What we want to test is the speed of a basic message moving through shotover -pub fn start(port: u16) -> JoinHandle<()> { +pub fn start(cores: usize, port: u16) -> JoinHandle<()> { std::thread::spawn(move || { - let mut connections = vec![]; - let (connection_tx, connection_rx) = mpsc::channel(); - let _listener = std::thread::spawn(move || { - let listener = TcpListener::bind(("127.0.0.1", port)).unwrap(); + let mut workers = vec![]; - for stream in listener.incoming() { - let stream = stream.unwrap(); - stream.set_nonblocking(true).unwrap(); - stream.set_nodelay(true).unwrap(); - connection_tx - .send(Connection { - stream, - buffer: BytesMut::with_capacity(10000), - }) - .unwrap(); - } - }); + for _ in 0..cores { + workers.push(Worker::spawn()); + } - loop { - for connection in connection_rx.try_iter() { - connections.push(connection); - } - for connection in &mut connections { - while let Some(message) = connection.get_message() { - let stream_id1 = message[2]; - let stream_id2 = message[3]; - match Opcode::try_from(message[4]).unwrap() { - Opcode::Options => connection.send_message(&[ - 0x84, 0x00, stream_id1, stream_id2, 0x06, 0x00, 0x00, 0x00, 0x66, 0x00, - 0x03, 0x00, 0x11, 0x50, 0x52, 0x4f, 0x54, 0x4f, 0x43, 0x4f, 0x4c, 0x5f, - 0x56, 0x45, 0x52, 0x53, 0x49, 0x4f, 0x4e, 0x53, 0x00, 0x04, 0x00, 0x04, - 0x33, 0x2f, 0x76, 0x33, 0x00, 0x04, 0x34, 0x2f, 0x76, 0x34, 0x00, 0x04, - 0x35, 0x2f, 0x76, 0x35, 0x00, 0x09, 0x36, 0x2f, 0x76, 0x36, 0x2d, 0x62, - 0x65, 0x74, 0x61, 0x00, 0x0b, 0x43, 0x4f, 0x4d, 0x50, 0x52, 0x45, 0x53, - 0x53, 0x49, 0x4f, 0x4e, 0x00, 0x02, 0x00, 0x06, 0x73, 0x6e, 0x61, 0x70, - 0x70, 0x79, 0x00, 0x03, 0x6c, 0x7a, 0x34, 0x00, 0x0b, 0x43, 0x51, 0x4c, - 0x5f, 0x56, 0x45, 0x52, 0x53, 0x49, 0x4f, 0x4e, 0x00, 0x01, 0x00, 0x05, - 0x33, 0x2e, 0x34, 0x2e, 0x35, - ]), - Opcode::Startup | Opcode::Register => connection - .send_message(&[0x84, 0, stream_id1, stream_id2, 2, 0, 0, 0, 0]), - Opcode::Prepare => connection.send_message(&[ - 0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, 0x54, 0x00, - 0x00, 0x00, 0x04, 0x00, 0x10, 0x73, 0x8d, 0x0d, 0x1d, 0x8d, 0xcd, 0xf4, - 0x7a, 0xbe, 0x14, 0xb6, 0x16, 0xb6, 0x29, 0xaa, 0xce, 0x00, 0x00, 0x00, - 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, - 0x05, 0x6c, 0x61, 0x74, 0x74, 0x65, 0x00, 0x05, 0x62, 0x61, 0x73, 0x69, - 0x63, 0x00, 0x02, 0x69, 0x64, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, - 0x00, 0x00, 0x01, 0x00, 0x05, 0x6c, 0x61, 0x74, 0x74, 0x65, 0x00, 0x05, - 0x62, 0x61, 0x73, 0x69, 0x63, 0x00, 0x02, 0x69, 0x64, 0x00, 0x02, - ]), - // Just assume this is the query that is being benchmarked - Opcode::Execute => connection.send_message(&[ - 0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, 0x04, 0x00, - 0x00, 0x00, 0x01, - ]), - Opcode::Query => { - let query_len = - u32::from_be_bytes(message[9..13].try_into().unwrap()) as usize; - let query = std::str::from_utf8(&message[13..13 + query_len]).unwrap(); - match query { + let listener = TcpListener::bind(("127.0.0.1", port)).unwrap(); + + let mut worker_index = 0; + for stream in listener.incoming() { + let stream = stream.unwrap(); + stream.set_nonblocking(true).unwrap(); + stream.set_nodelay(true).unwrap(); + + // This allocation algorithm relies on the fact that the bencher will equally distribute + // load accross all connections without terminating any for the length of the bench + workers[worker_index] + .connection_tx + .send(Connection { + stream, + buffer: BytesMut::with_capacity(10000), + }) + .unwrap(); + worker_index = (worker_index + 1) % workers.len(); + } + + for worker in workers { + worker.handle.join().unwrap(); + } + }) +} + +impl Worker { + fn spawn() -> Worker { + let (connection_tx, connection_rx) = mpsc::channel(); + let handle = std::thread::spawn(move || { + let mut connections: Vec = vec![]; + loop { + for connection in connection_rx.try_iter() { + connections.push(connection); + } + for connection in &mut connections { + while let Some(message) = connection.get_message() { + let stream_id1 = message[2]; + let stream_id2 = message[3]; + match Opcode::try_from(message[4]).unwrap() { + Opcode::Options => connection.send_message(&[ + 0x84, 0x00, stream_id1, stream_id2, 0x06, 0x00, 0x00, 0x00, 0x66, + 0x00, 0x03, 0x00, 0x11, 0x50, 0x52, 0x4f, 0x54, 0x4f, 0x43, 0x4f, + 0x4c, 0x5f, 0x56, 0x45, 0x52, 0x53, 0x49, 0x4f, 0x4e, 0x53, 0x00, + 0x04, 0x00, 0x04, 0x33, 0x2f, 0x76, 0x33, 0x00, 0x04, 0x34, 0x2f, + 0x76, 0x34, 0x00, 0x04, 0x35, 0x2f, 0x76, 0x35, 0x00, 0x09, 0x36, + 0x2f, 0x76, 0x36, 0x2d, 0x62, 0x65, 0x74, 0x61, 0x00, 0x0b, 0x43, + 0x4f, 0x4d, 0x50, 0x52, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x00, + 0x02, 0x00, 0x06, 0x73, 0x6e, 0x61, 0x70, 0x70, 0x79, 0x00, 0x03, + 0x6c, 0x7a, 0x34, 0x00, 0x0b, 0x43, 0x51, 0x4c, 0x5f, 0x56, 0x45, + 0x52, 0x53, 0x49, 0x4f, 0x4e, 0x00, 0x01, 0x00, 0x05, 0x33, 0x2e, + 0x34, 0x2e, 0x35, + ]), + Opcode::Startup | Opcode::Register => connection + .send_message(&[0x84, 0, stream_id1, stream_id2, 2, 0, 0, 0, 0]), + Opcode::Prepare => connection.send_message(&[ + 0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, 0x54, + 0x00, 0x00, 0x00, 0x04, 0x00, 0x10, 0x73, 0x8d, 0x0d, 0x1d, 0x8d, + 0xcd, 0xf4, 0x7a, 0xbe, 0x14, 0xb6, 0x16, 0xb6, 0x29, 0xaa, 0xce, + 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x00, 0x00, 0x05, 0x6c, 0x61, 0x74, 0x74, 0x65, 0x00, + 0x05, 0x62, 0x61, 0x73, 0x69, 0x63, 0x00, 0x02, 0x69, 0x64, 0x00, + 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x05, + 0x6c, 0x61, 0x74, 0x74, 0x65, 0x00, 0x05, 0x62, 0x61, 0x73, 0x69, + 0x63, 0x00, 0x02, 0x69, 0x64, 0x00, 0x02, + ]), + // Just assume this is the query that is being benchmarked + Opcode::Execute => connection.send_message(&[ + 0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, 0x04, + 0x00, 0x00, 0x00, 0x01, + ]), + Opcode::Query => { + let query_len = + u32::from_be_bytes(message[9..13].try_into().unwrap()) as usize; + let query = + std::str::from_utf8(&message[13..13 + query_len]).unwrap(); + match query { "select peer, data_center, rack, tokens from system.peers" => { connection.send_message(&[ 0x84, 0x00, stream_id1, stream_id2, 0x08, 0x00, 0x00, 0x00, @@ -116,14 +143,19 @@ pub fn start(port: u16) -> JoinHandle<()> { } query => todo!("Unhandled query {query:?}"), } - } + } - op => todo!("Unhandled opcode {op} in message: {message:?}"), + op => todo!("Unhandled opcode {op} in message: {message:?}"), + } } } } + }); + Worker { + handle, + connection_tx, } - }) + } } struct Connection { diff --git a/test-helpers/src/shotover_process.rs b/test-helpers/src/shotover_process.rs index 367b0cacd..c88c400af 100644 --- a/test-helpers/src/shotover_process.rs +++ b/test-helpers/src/shotover_process.rs @@ -7,6 +7,7 @@ pub use tokio_bin_process::BinProcess; pub struct ShotoverProcessBuilder { topology_path: String, log_name: Option, + cores: Option, } impl ShotoverProcessBuilder { @@ -14,6 +15,7 @@ impl ShotoverProcessBuilder { Self { topology_path: topology_path.to_owned(), log_name: None, + cores: None, } } @@ -22,10 +24,19 @@ impl ShotoverProcessBuilder { self } + pub fn with_cores(mut self, cores: u32) -> Self { + self.cores = Some(cores.to_string()); + self + } + pub async fn start(&self) -> BinProcess { + let mut args = vec!["-t", &self.topology_path, "--log-format", "json"]; + if let Some(cores) = &self.cores { + args.extend(["--core-threads", cores]); + } let mut shotover = BinProcess::start_with_args( "shotover-proxy", - &["-t", &self.topology_path, "--log-format", "json"], + &args, self.log_name.as_deref().unwrap_or("shotover"), ) .await;