From ecdaf44c95de0c6132b9ba6a19f67a406f6750bb Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sat, 20 Aug 2022 20:36:44 -0500 Subject: [PATCH 01/17] wip --- Cargo.lock | 227 +++++++++++++++++++++-- Cargo.toml | 6 +- src/admin.rs | 70 ++++--- src/client.rs | 6 +- src/config.rs | 32 +--- src/main.rs | 28 ++- src/pool.rs | 45 ++--- tests/config.rs | 1 + tests/errors.rs | 1 + tests/integration_test.rs | 372 ++++++++++++++++++++++++++++++++++++++ tests/tls.rs | 1 + 11 files changed, 683 insertions(+), 106 deletions(-) create mode 120000 tests/config.rs create mode 120000 tests/errors.rs create mode 100644 tests/integration_test.rs create mode 120000 tests/tls.rs diff --git a/Cargo.lock b/Cargo.lock index 2e20f709..521db986 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -60,7 +60,7 @@ dependencies = [ "async-trait", "futures-channel", "futures-util", - "parking_lot", + "parking_lot 0.11.2", "tokio", ] @@ -85,6 +85,12 @@ version = "3.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37ccbd214614c6783386c1af30caf03192f17891059cecc394b4fb119e363de3" +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + [[package]] name = "bytes" version = "1.1.0" @@ -159,48 +165,102 @@ dependencies = [ "termcolor", ] +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + [[package]] name = "fnv" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "futures" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab30e97ab6aacfe635fad58f22c2bb06c8b685f7421eb1e064a729e2a5f481fa" +dependencies = 
[ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" -version = "0.3.19" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba3dda0b6588335f360afc675d0564c17a77a2bda81ca178a4b6081bd86c7f0b" +checksum = "2bfc52cbddcfd745bf1740338492bb0bd83d76c67b445f91c5fb29fae29ecaa1" dependencies = [ "futures-core", + "futures-sink", ] [[package]] name = "futures-core" -version = "0.3.19" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7" +checksum = "d2acedae88d38235936c3922476b10fced7b2b68136f5e3c03c2d5be348a1115" + +[[package]] +name = "futures-executor" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d11aa21b5b587a64682c0094c2bdd4df0076c5324961a40cc3abd7f37930528" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93a66fc6d035a26a3ae255a6d2bca35eda63ae4c5512bef54449113f7a1228e5" + +[[package]] +name = "futures-macro" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0db9cce532b0eae2ccf2766ab246f114b56b9cf6d445e00c2549fbc100ca045d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "futures-sink" -version = "0.3.21" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" +checksum = "ca0bae1fe9752cf7fd9b0064c674ae63f97b37bc714d745cbde0afb7ec4e6765" [[package]] name = "futures-task" -version = "0.3.19" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"6ee7c6485c30167ce4dfb83ac568a849fe53274c831081476ee13e0dce1aad72" +checksum = "842fc63b931f4056a24d59de13fb1272134ce261816e063e634ad0c15cdc5306" [[package]] name = "futures-util" -version = "0.3.19" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164" +checksum = "f0828a5471e340229c11c77ca80017937ce3c58cb788a17e5f1c2d5c485a9577" dependencies = [ "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -487,7 +547,17 @@ checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" dependencies = [ "instant", "lock_api", - "parking_lot_core", + "parking_lot_core 0.8.5", +] + +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core 0.9.3", ] [[package]] @@ -504,6 +574,25 @@ dependencies = [ "winapi", ] +[[package]] +name = "parking_lot_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-sys", +] + +[[package]] +name = "percent-encoding" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" + [[package]] name = "pgcat" version = "0.6.0-alpha1" @@ -521,8 +610,9 @@ dependencies = [ "md-5", "num_cpus", "once_cell", - "parking_lot", + "parking_lot 0.11.2", "phf", + "postgres", "rand", "regex", "rustls-pemfile", @@ -593,6 +683,49 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "postgres" +version = "0.19.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8bbcd5f6deb39585a0d9f4ef34c4a41c25b7ad26d23c75d837d78c8e7adc85f" +dependencies = [ + "bytes", + "fallible-iterator", + "futures", + "log", + "tokio", + "tokio-postgres", +] + +[[package]] +name = "postgres-protocol" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "878c6cbf956e03af9aa8204b407b9cbf47c072164800aa918c516cd4b056c50c" +dependencies = [ + "base64", + "byteorder", + "bytes", + "fallible-iterator", + "hmac", + "md-5", + "memchr", + "rand", + "sha2", + "stringprep", +] + +[[package]] +name = "postgres-types" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebd6e8b7189a73169290e89bd24c771071f1012d8fe6f738f5226531f0b03d89" +dependencies = [ + "bytes", + "fallible-iterator", + "postgres-protocol", +] + [[package]] name = "ppv-lite86" version = "0.2.16" @@ -906,7 +1039,7 @@ dependencies = [ "mio", "num_cpus", "once_cell", - "parking_lot", + "parking_lot 0.11.2", "pin-project-lite", "signal-hook-registry", "tokio-macros", @@ -924,6 +1057,29 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-postgres" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19c88a47a23c5d2dc9ecd28fb38fba5fc7e5ddc1fe64488ec145076b0c71c8ae" +dependencies = [ + "async-trait", + "byteorder", + "bytes", + "fallible-iterator", + "futures", + "log", + "parking_lot 0.12.1", + "percent-encoding", + "phf", + "pin-project-lite", + "postgres-protocol", + "postgres-types", + "socket2", + "tokio", + "tokio-util", +] + [[package]] name = "tokio-rustls" version = "0.23.4" @@ -1161,3 +1317,46 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" +dependencies = [ + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" + +[[package]] +name = "windows_i686_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" + +[[package]] +name = "windows_i686_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" diff --git a/Cargo.toml b/Cargo.toml index 37370244..537cdac3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,8 +3,6 @@ name = "pgcat" version = "0.6.0-alpha1" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] tokio = { version = "1", features = ["full"] } bytes = "1" @@ -33,3 +31,7 @@ tokio-rustls = "0.23" rustls-pemfile = "1" hyper = { version = "0.14", features = ["full"] } phf = { version = "0.10", features = ["macros"] } + +[dev-dependencies] +postgres = 
"0.19.3" + diff --git a/src/admin.rs b/src/admin.rs index 5c820ee7..4718169d 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -3,12 +3,12 @@ use bytes::{Buf, BufMut, BytesMut}; use log::{info, trace}; use std::collections::HashMap; -use crate::config::{get_config, reload_config, VERSION}; +use crate::config::{get_config, VERSION}; use crate::errors::Error; -use crate::messages::*; use crate::pool::get_all_pools; use crate::stats::get_stats; use crate::ClientServerMap; +use crate::{messages::*, reload_config}; pub fn generate_server_info_for_admin() -> BytesMut { let mut server_info = BytesMut::new(); @@ -44,32 +44,45 @@ where trace!("Admin query: {}", query); - if query.starts_with("SHOW STATS") { - trace!("SHOW STATS"); - show_stats(stream).await - } else if query.starts_with("RELOAD") { - trace!("RELOAD"); - reload(stream, client_server_map).await - } else if query.starts_with("SHOW CONFIG") { - trace!("SHOW CONFIG"); - show_config(stream).await - } else if query.starts_with("SHOW DATABASES") { - trace!("SHOW DATABASES"); - show_databases(stream).await - } else if query.starts_with("SHOW POOLS") { - trace!("SHOW POOLS"); - show_pools(stream).await - } else if query.starts_with("SHOW LISTS") { - trace!("SHOW LISTS"); - show_lists(stream).await - } else if query.starts_with("SHOW VERSION") { - trace!("SHOW VERSION"); - show_version(stream).await - } else if query.starts_with("SET ") { - trace!("SET"); - ignore_set(stream).await - } else { - error_response(stream, "Unsupported query against the admin database").await + let query_parts: Vec<&str> = query.trim_end_matches(';').split_whitespace().collect(); + + match query_parts[0] { + "RELOAD" => { + trace!("RELOAD"); + reload(stream, client_server_map).await + } + "SET" => { + trace!("SET"); + ignore_set(stream).await + } + "SHOW" => match query_parts[1] { + "CONFIG" => { + trace!("SHOW CONFIG"); + show_config(stream).await + } + "DATABASES" => { + trace!("SHOW DATABASES"); + show_databases(stream).await + } + "LISTS" 
=> { + trace!("SHOW LISTS"); + show_lists(stream).await + } + "POOLS" => { + trace!("SHOW POOLS"); + show_pools(stream).await + } + "STATS" => { + trace!("SHOW STATS"); + show_stats(stream).await + } + "VERSION" => { + trace!("SHOW VERSION"); + show_version(stream).await + } + _ => error_response(stream, "Unsupported SHOW query against the admin database").await, + }, + _ => error_response(stream, "Unsupported query against the admin database").await, } } @@ -174,6 +187,7 @@ where let columns = vec![ ("database", DataType::Text), ("user", DataType::Text), + ("cl_idle", DataType::Numeric), ("cl_active", DataType::Numeric), ("cl_waiting", DataType::Numeric), ("cl_cancel_req", DataType::Numeric), diff --git a/src/client.rs b/src/client.rs index b36eae09..24a2e5e2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -499,7 +499,6 @@ where // The query router determines where the query is going to go, // e.g. primary, replica, which shard. let mut query_router = QueryRouter::new(); - let mut round_robin = rand::random(); // Our custom protocol loop. // We expect the client to either start a transaction with regular queries @@ -634,8 +633,7 @@ where .get( query_router.shard(), query_router.role(), - self.process_id, - round_robin, + self.process_id ) .await { @@ -655,8 +653,6 @@ where let address = connection.1; let server = &mut *reference; - round_robin += 1; - // Server is assigned to the client in case the client wants to // cancel a query later. 
server.claim(self.process_id, self.secret_key); diff --git a/src/config.rs b/src/config.rs index ae006b39..6dd2cec1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -13,7 +13,6 @@ use toml; use crate::errors::Error; use crate::tls::{load_certs, load_keys}; -use crate::{ClientServerMap, ConnectionPool}; pub const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -64,6 +63,8 @@ pub struct Address { pub database: String, pub role: Role, pub replica_number: usize, + pub username: String, + pub poolname: String, } impl Default for Address { @@ -76,6 +77,8 @@ impl Default for Address { replica_number: 0, database: String::from("database"), role: Role::Replica, + username: String::from("username"), + poolname: String::from("poolname"), } } } @@ -84,11 +87,11 @@ impl Address { /// Address name (aka database) used in `SHOW STATS`, `SHOW DATABASES`, and `SHOW POOLS`. pub fn name(&self) -> String { match self.role { - Role::Primary => format!("{}_shard_{}_primary", self.database, self.shard), + Role::Primary => format!("{}_shard_{}_primary", self.poolname, self.shard), Role::Replica => format!( "{}_shard_{}_replica_{}", - self.database, self.shard, self.replica_number + self.poolname, self.shard, self.replica_number ), } } @@ -521,28 +524,6 @@ pub async fn parse(path: &str) -> Result<(), Error> { Ok(()) } -pub async fn reload_config(client_server_map: ClientServerMap) -> Result { - let old_config = get_config(); - match parse(&old_config.path).await { - Ok(()) => (), - Err(err) => { - error!("Config reload error: {:?}", err); - return Err(Error::BadConfig); - } - }; - let new_config = get_config(); - - if old_config.pools != new_config.pools { - info!("Pool configuration changed, re-creating server pools"); - ConnectionPool::from_config(client_server_map).await?; - Ok(true) - } else if old_config != new_config { - Ok(true) - } else { - Ok(false) - } -} - #[cfg(test)] mod test { use super::*; @@ -552,7 +533,6 @@ mod test { parse("pgcat.toml").await.unwrap(); 
assert_eq!(get_config().path, "pgcat.toml".to_string()); - assert_eq!(get_config().general.ban_time, 60); assert_eq!(get_config().pools.len(), 2); assert_eq!(get_config().pools["sharded_db"].shards.len(), 3); diff --git a/src/main.rs b/src/main.rs index db934747..d49e16bf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -36,6 +36,7 @@ extern crate tokio; extern crate tokio_rustls; extern crate toml; +use config::parse; use log::{debug, error, info}; use parking_lot::Mutex; use tokio::net::TcpListener; @@ -65,7 +66,8 @@ mod sharding; mod stats; mod tls; -use crate::config::{get_config, reload_config, VERSION}; +use crate::config::{get_config, VERSION}; +use crate::errors::Error; use crate::pool::{ClientServerMap, ConnectionPool}; use crate::prometheus::start_metric_server; use crate::stats::{Collector, Reporter, REPORTER}; @@ -133,7 +135,7 @@ async fn main() { let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new())); // Statistics reporting. - let (tx, rx) = mpsc::channel(100); + let (tx, rx) = mpsc::channel(100_000); REPORTER.store(Arc::new(Reporter::new(tx.clone()))); // Connection pool that allows to query all shards and replicas. 
@@ -340,3 +342,25 @@ fn format_duration(duration: &chrono::Duration) -> String { format!("{}d {}:{}:{}", days, hours, minutes, seconds) } + +async fn reload_config(client_server_map: ClientServerMap) -> Result { + let old_config = get_config(); + match parse(&old_config.path).await { + Ok(()) => (), + Err(err) => { + error!("Config reload error: {:?}", err); + return Err(Error::BadConfig); + } + }; + let new_config = get_config(); + + if old_config.pools != new_config.pools { + info!("Pool configuration changed, re-creating server pools"); + ConnectionPool::from_config(client_server_map).await?; + Ok(true) + } else if old_config != new_config { + Ok(true) + } else { + Ok(false) + } +} diff --git a/src/pool.rs b/src/pool.rs index 76445c8f..f76f5b6f 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -114,12 +114,14 @@ impl ConnectionPool { let address = Address { id: address_id, - database: pool_name.clone(), + database: shard.database.clone(), host: server.0.clone(), port: server.1.to_string(), role: role, replica_number, shard: shard_idx.parse::().unwrap(), + username: user_info.username.clone(), + poolname: pool_name.clone(), }; address_id += 1; @@ -199,16 +201,9 @@ impl ConnectionPool { /// the pooler starts up. async fn validate(&mut self) -> Result<(), Error> { let mut server_infos = Vec::new(); - let stats = self.stats.clone(); - for shard in 0..self.shards() { - let mut round_robin = 0; - - for _ in 0..self.servers(shard) { - // To keep stats consistent. 
- let fake_process_id = 0; - - let connection = match self.get(shard, None, fake_process_id, round_robin).await { + for index in 0..self.servers(shard) { + let connection = match self.databases[shard][index].get().await { Ok(conn) => conn, Err(err) => { error!("Shard {} down or misconfigured: {:?}", shard, err); @@ -216,25 +211,20 @@ impl ConnectionPool { } }; - let proxy = connection.0; - let address = connection.1; + let proxy = connection; let server = &*proxy; let server_info = server.server_info(); - stats.client_disconnecting(fake_process_id, address.id); - if server_infos.len() > 0 { // Compare against the last server checked. if server_info != server_infos[server_infos.len() - 1] { warn!( "{:?} has different server configuration than the last server", - address + proxy.address() ); } } - server_infos.push(server_info); - round_robin += 1; } } @@ -252,14 +242,14 @@ impl ConnectionPool { /// Get a connection from the pool. pub async fn get( &self, - shard: usize, // shard number - role: Option, // primary or replica - process_id: i32, // client id - mut round_robin: usize, // round robin offset + shard: usize, // shard number + role: Option, // primary or replica + process_id: i32, // client id ) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> { let now = Instant::now(); let addresses = &self.addresses[shard]; + let mut allowed_attempts = match role { // Primary-specific queries get one attempt, if the primary is down, // nothing we should do about it I think. It's dangerous to retry @@ -287,10 +277,7 @@ impl ConnectionPool { let healthcheck_delay = get_config().general.healthcheck_delay as u128; while allowed_attempts > 0 { - // Round-robin replicas. 
- round_robin += 1; - - let index = round_robin % addresses.len(); + let index = rand::random::() % addresses.len(); let address = &addresses[index]; // Make sure you're getting a primary or a replica @@ -300,13 +287,13 @@ impl ConnectionPool { continue; } - allowed_attempts -= 1; - // Don't attempt to connect to banned servers. if self.is_banned(address, shard, role) { continue; } + allowed_attempts -= 1; + // Indicate we're waiting on a server connection from a pool. self.stats.client_waiting(process_id, address.id); @@ -333,7 +320,7 @@ impl ConnectionPool { if !require_healthcheck { self.stats .checkout_time(now.elapsed().as_micros(), process_id, address.id); - self.stats.server_idle(conn.process_id(), address.id); + self.stats.server_active(conn.process_id(), address.id); return Ok((conn, address.clone())); } @@ -352,7 +339,7 @@ impl ConnectionPool { Ok(_) => { self.stats .checkout_time(now.elapsed().as_micros(), process_id, address.id); - self.stats.server_idle(conn.process_id(), address.id); + self.stats.server_active(conn.process_id(), address.id); return Ok((conn, address.clone())); } diff --git a/tests/config.rs b/tests/config.rs new file mode 120000 index 00000000..f16cf01b --- /dev/null +++ b/tests/config.rs @@ -0,0 +1 @@ +../src/config.rs \ No newline at end of file diff --git a/tests/errors.rs b/tests/errors.rs new file mode 120000 index 00000000..e1746501 --- /dev/null +++ b/tests/errors.rs @@ -0,0 +1 @@ +../src/errors.rs \ No newline at end of file diff --git a/tests/integration_test.rs b/tests/integration_test.rs new file mode 100644 index 00000000..5d5b679b --- /dev/null +++ b/tests/integration_test.rs @@ -0,0 +1,372 @@ +mod config; +mod errors; +mod tls; + +use config::Config; +use core::time; +use postgres::{Client, NoTls}; +use rand::{distributions::Alphanumeric, Rng}; +use std::{ + collections::HashMap, + fmt, fs, + io::Write, + process::{Child, Command, Stdio}, + thread::sleep, +}; + +pub const BASE_CONFIG: &str = "./pgcat.toml"; + +pub fn 
get_base_config() -> Config { + return toml::from_str(fs::read_to_string(BASE_CONFIG).unwrap().as_str()).unwrap(); +} + +pub struct PgcatInstance { + process: Child, + port: i16, + log_file: String, + config_filename: String, + last_query_count: usize, +} +impl PgcatInstance { + pub fn start(log_level: log::Level, config: Option) -> PgcatInstance { + let base_filename: String = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(7) + .map(char::from) + .collect(); + let config_filename = format!("/tmp/pgcat_{}.temp.toml", base_filename); + let log_filename = format!("/tmp/pgcat_{}.out", base_filename); + + let file = fs::File::create(log_filename.clone()).unwrap(); + let stdio = Stdio::from(file); + + let port = match config { + Some(mut cfg) => { + let mut file = fs::File::create(config_filename.clone()).unwrap(); + cfg.path = config_filename.clone(); + file.write_all(toml::to_string(&cfg).unwrap().as_bytes()) + .unwrap(); + cfg.general.port + } + None => { + fs::copy(BASE_CONFIG, config_filename.clone()).unwrap(); + let mut cfg = + toml::from_str::(&fs::read_to_string(config_filename.clone()).unwrap()) + .unwrap(); + cfg.path = config_filename.clone(); + let mut file = fs::File::create(config_filename.clone()).unwrap(); + file.write_all(toml::to_string(&cfg).unwrap().as_bytes()) + .unwrap(); + cfg.general.port + } + }; + + let child = Command::new("./target/debug/pgcat") + .env("RUST_LOG", log_level.as_str()) + .arg(config_filename.clone()) + .stderr(stdio) + .spawn() + .unwrap(); + println!( + "Started pgcat instance PID({:?}), LogFile({:?}), ConfigFile({:?})", + child.id(), + log_filename, + config_filename + ); + + let mut instance = PgcatInstance { + process: child, + port: port, + log_file: log_filename, + config_filename: config_filename.clone(), + last_query_count: 0, + }; + instance.wait_until_ready(); + return instance; + } + + pub fn current_config(&self) -> Config { + return toml::from_str( + fs::read_to_string(self.config_filename.clone()) + 
.unwrap() + .as_str(), + ) + .unwrap(); + } + + pub fn get_total_query_count(&self) -> usize { + self.get_logs().matches("talking to server Address").count() + } + + pub fn begin_recording_query_count(&mut self) { + self.last_query_count = self.get_logs().matches("Sending query").count(); + } + + pub fn end_recording_query_count(&mut self) -> usize { + return self.get_logs().matches("Sending query").count() - self.last_query_count; + } + + pub fn get_logs(&self) -> String { + fs::read_to_string(self.log_file.clone()).unwrap() + } + + pub fn update_config(&self, new_config: Config) { + let mut file = fs::File::create(self.config_filename.clone()).unwrap(); + file.write_all(toml::to_string(&new_config).unwrap().as_bytes()) + .unwrap(); + } + + pub fn reload_config(&self) { + Command::new("kill") + .args(["-s", "HUP", self.process.id().to_string().as_str()]) + .spawn() + .unwrap() + .wait() + .unwrap(); + } + + pub fn stop(&mut self) { + self.process.kill(); + } + + pub fn wait_until_ready(&self) { + let conn_str = format!( + "postgres://sharding_user:sharding_user@localhost:{}/shard0", + self.port + ); + for _ in 0..10 { + if Client::connect(&conn_str, NoTls).is_ok() { + return; + } + sleep(time::Duration::from_millis(500)); + } + panic!("Server was never ready!"); + } +} +impl Drop for PgcatInstance { + fn drop(&mut self) { + self.stop(); + fs::remove_file(self.config_filename.clone()).unwrap(); + fs::remove_file(self.log_file.clone()).unwrap(); + } +} + +pub fn single_shard_setup( + num_replica: u8, + db: String, + base_config: Option, +) -> (PgcatInstance, Vec) { + let mut pgclowder = vec![]; + let mut main_server_entries = vec![]; + let primary_pg_address = ( + String::from("localhost"), + 5432 as u16, + String::from("primary"), + ); + let replica_pg_address = ( + String::from("localhost"), + 5432 as u16, + String::from("replica"), + ); + + let mut base_cfg = match base_config { + Some(cfg) => cfg, + None => Config::default(), + }; + let user = config::User { + 
username: String::from("sharding_user"), + password: String::from("sharding_user"), + pool_size: 5, + statement_timeout: 100000, + }; + base_cfg.pools.insert(db.clone(), config::Pool::default()); + base_cfg + .pools + .get_mut(&db.clone()) + .unwrap() + .users + .insert(String::from("sharding_user"), user); + + for i in 0..num_replica + 1 { + let mut cfg = base_cfg.clone(); + cfg.general.port = 10000 + i as i16; + let (local_address, main_address) = match i { + 0 => ( + primary_pg_address.clone(), + ( + String::from("localhost"), + cfg.general.port as u16, + String::from("primary"), + ), + ), + _ => ( + replica_pg_address.clone(), + ( + String::from("localhost"), + cfg.general.port as u16, + String::from("replica"), + ), + ), + }; + + cfg.pools.get_mut(&db.clone()).unwrap().shards = HashMap::from([( + String::from("0"), + config::Shard { + database: db.clone(), + servers: vec![local_address], + }, + )]); + main_server_entries.push(main_address); + pgclowder.push(PgcatInstance::start(log::Level::Debug, Some(cfg))) + } + let mut main_cfg = base_cfg.clone(); + main_cfg.general.port = 6432; + main_cfg.pools.get_mut(&db.clone()).unwrap().shards = HashMap::from([( + String::from("0"), + config::Shard { + database: String::from("shard0"), + // Server entries point to other Pgcat processes + servers: main_server_entries, + }, + )]); + return ( + PgcatInstance::start(log::Level::Trace, Some(main_cfg)), + pgclowder, + ); +} + +fn value_within_range(target_value: i32, value: i32) { + if value == target_value { + return; + } + // Allow a buffer of 15% around the target value + let buffer = ((target_value as f32) * 0.15) as i32; + let start = target_value - buffer; + let end = target_value + buffer; + let result = if (start..end).contains(&value) { + "Pass" + } else { + "Fail" + }; + + println!( + "Expecting {} to fall between {} and {} ... 
{}", + value, start, end, result + ); + assert!((start..end).contains(&value)); +} + +#[test] +fn test_basic_load_balancing() { + let (_main_pgcat, mut proxy_instances) = single_shard_setup(3, String::from("shard0"), None); + let mut client = Client::connect( + "postgres://sharding_user:sharding_user@localhost:6432/shard0", + NoTls, + ) + .unwrap(); + + for proxy in proxy_instances.iter_mut() { + proxy.begin_recording_query_count(); + } + let total_count = 5000; + for _ in 0..total_count { + if client.simple_query("SELECT 1").is_err() { + client = Client::connect( + "postgres://sharding_user:sharding_user@localhost:6432/shard0", + NoTls, + ) + .unwrap(); + continue; + } + } + + let expected_share = total_count / 4; + for proxy in proxy_instances.iter_mut() { + value_within_range(expected_share, proxy.end_recording_query_count() as i32); + } +} + +#[test] +fn test_failover_load_balancing() { + let (_main_pgcat, mut proxy_instances) = single_shard_setup(3, String::from("shard0"), None); + + let mut replica2 = proxy_instances.pop().unwrap(); + let mut replica1 = proxy_instances.pop().unwrap(); + let mut replica0 = proxy_instances.pop().unwrap(); + let mut primary = proxy_instances.pop().unwrap(); + + primary.begin_recording_query_count(); + replica0.begin_recording_query_count(); + replica1.begin_recording_query_count(); + replica2.begin_recording_query_count(); + + replica1.stop(); + + let total_count = 2000; + let mut client = Client::connect( + "postgres://sharding_user:sharding_user@localhost:6432/shard0", + NoTls, + ) + .unwrap(); + for _ in 0..total_count { + if client.simple_query("SELECT 1").is_err() { + client = Client::connect( + "postgres://sharding_user:sharding_user@localhost:6432/shard0", + NoTls, + ) + .unwrap(); + continue; + } + } + let expected_share = total_count / 3; + + value_within_range(expected_share, primary.end_recording_query_count() as i32); + value_within_range(expected_share, replica0.end_recording_query_count() as i32); + 
value_within_range(0, replica1.end_recording_query_count() as i32); + value_within_range(expected_share, replica2.end_recording_query_count() as i32); +} + +#[test] +fn test_load_balancing_with_query_routing() { + let (main_pgcat, mut proxy_instances) = single_shard_setup(3, String::from("shard0"), None); + let mut cfg = main_pgcat.current_config(); + cfg.pools.get_mut("shard0").unwrap().primary_reads_enabled = false; + cfg.pools.get_mut("shard0").unwrap().query_parser_enabled = true; + cfg.pools.get_mut("shard0").unwrap().default_role = String::from("auto"); + main_pgcat.update_config(cfg); + main_pgcat.reload_config(); + + let mut replica2 = proxy_instances.pop().unwrap(); + let mut replica1 = proxy_instances.pop().unwrap(); + let mut replica0 = proxy_instances.pop().unwrap(); + let mut primary = proxy_instances.pop().unwrap(); + + primary.begin_recording_query_count(); + replica0.begin_recording_query_count(); + replica1.begin_recording_query_count(); + replica2.begin_recording_query_count(); + + let mut client = Client::connect( + "postgres://sharding_user:sharding_user@localhost:6432/shard0", + NoTls, + ) + .unwrap(); + client.simple_query("SET SERVER ROLE TO 'auto'").unwrap(); + + let total_count = 2000; + for _ in 0..total_count { + if client.simple_query("SELECT 1").is_err() { + client = Client::connect( + "postgres://sharding_user:sharding_user@localhost:6432/shard0", + NoTls, + ) + .unwrap(); + continue; + } + } + let expected_share = total_count / 3; + value_within_range(expected_share, replica0.end_recording_query_count() as i32); + value_within_range(expected_share, replica1.end_recording_query_count() as i32); + value_within_range(expected_share, replica2.end_recording_query_count() as i32); + value_within_range(0, primary.end_recording_query_count() as i32); +} diff --git a/tests/tls.rs b/tests/tls.rs new file mode 120000 index 00000000..0af4b183 --- /dev/null +++ b/tests/tls.rs @@ -0,0 +1 @@ +../src/tls.rs \ No newline at end of file From 
6c93deaac40c6ef0180bd9e2b2135705c9a11cac Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 21 Aug 2022 06:37:41 -0500 Subject: [PATCH 02/17] revert some' --- src/client.rs | 6 +- src/config.rs | 24 +++ src/main.rs | 26 +-- src/pool.rs | 380 +++++++++++++++++++++++++++++++++++++- tests/config.rs | 1 - tests/errors.rs | 1 - tests/integration_test.rs | 372 ------------------------------------- tests/tls.rs | 1 - 8 files changed, 402 insertions(+), 409 deletions(-) delete mode 120000 tests/config.rs delete mode 120000 tests/errors.rs delete mode 100644 tests/integration_test.rs delete mode 120000 tests/tls.rs diff --git a/src/client.rs b/src/client.rs index 77b1cb6c..0c553f83 100644 --- a/src/client.rs +++ b/src/client.rs @@ -630,11 +630,7 @@ where // Grab a server from the pool. let connection = match pool - .get( - query_router.shard(), - query_router.role(), - self.process_id - ) + .get(query_router.shard(), query_router.role(), self.process_id) .await { Ok(conn) => { diff --git a/src/config.rs b/src/config.rs index 6dd2cec1..ed338104 100644 --- a/src/config.rs +++ b/src/config.rs @@ -13,6 +13,7 @@ use toml; use crate::errors::Error; use crate::tls::{load_certs, load_keys}; +use crate::{ClientServerMap, ConnectionPool}; pub const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -524,6 +525,28 @@ pub async fn parse(path: &str) -> Result<(), Error> { Ok(()) } +pub async fn reload_config(client_server_map: ClientServerMap) -> Result { + let old_config = get_config(); + match parse(&old_config.path).await { + Ok(()) => (), + Err(err) => { + error!("Config reload error: {:?}", err); + return Err(Error::BadConfig); + } + }; + let new_config = get_config(); + + if old_config.pools != new_config.pools { + info!("Pool configuration changed, re-creating server pools"); + ConnectionPool::from_config(client_server_map).await?; + Ok(true) + } else if old_config != new_config { + Ok(true) + } else { + Ok(false) + } +} + #[cfg(test)] mod test { use super::*; @@ -533,6 
+556,7 @@ mod test { parse("pgcat.toml").await.unwrap(); assert_eq!(get_config().path, "pgcat.toml".to_string()); + assert_eq!(get_config().general.ban_time, 60); assert_eq!(get_config().pools.len(), 2); assert_eq!(get_config().pools["sharded_db"].shards.len(), 3); diff --git a/src/main.rs b/src/main.rs index d49e16bf..0b2e1d59 100644 --- a/src/main.rs +++ b/src/main.rs @@ -36,7 +36,6 @@ extern crate tokio; extern crate tokio_rustls; extern crate toml; -use config::parse; use log::{debug, error, info}; use parking_lot::Mutex; use tokio::net::TcpListener; @@ -66,8 +65,7 @@ mod sharding; mod stats; mod tls; -use crate::config::{get_config, VERSION}; -use crate::errors::Error; +use crate::config::{get_config, reload_config, VERSION}; use crate::pool::{ClientServerMap, ConnectionPool}; use crate::prometheus::start_metric_server; use crate::stats::{Collector, Reporter, REPORTER}; @@ -342,25 +340,3 @@ fn format_duration(duration: &chrono::Duration) -> String { format!("{}d {}:{}:{}", days, hours, minutes, seconds) } - -async fn reload_config(client_server_map: ClientServerMap) -> Result { - let old_config = get_config(); - match parse(&old_config.path).await { - Ok(()) => (), - Err(err) => { - error!("Config reload error: {:?}", err); - return Err(Error::BadConfig); - } - }; - let new_config = get_config(); - - if old_config.pools != new_config.pools { - info!("Pool configuration changed, re-creating server pools"); - ConnectionPool::from_config(client_server_map).await?; - Ok(true) - } else if old_config != new_config { - Ok(true) - } else { - Ok(false) - } -} diff --git a/src/pool.rs b/src/pool.rs index f76f5b6f..3db546ce 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -242,14 +242,13 @@ impl ConnectionPool { /// Get a connection from the pool. 
pub async fn get( &self, - shard: usize, // shard number - role: Option, // primary or replica - process_id: i32, // client id + shard: usize, // shard number + role: Option, // primary or replica + process_id: i32, // client id ) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> { let now = Instant::now(); let addresses = &self.addresses[shard]; - let mut allowed_attempts = match role { // Primary-specific queries get one attempt, if the primary is down, // nothing we should do about it I think. It's dangerous to retry @@ -575,3 +574,376 @@ pub fn get_number_of_addresses() -> usize { pub fn get_all_pools() -> HashMap<(String, String), ConnectionPool> { return (*(*POOLS.load())).clone(); } + +mod test { + extern crate postgres; + + use super::super::config::Config; + use super::super::config::Pool; + use super::super::config::Shard; + use super::super::config::User; + + use core::time; + use log::error; + use postgres::{Client, NoTls}; + use rand::{distributions::Alphanumeric, Rng}; + use std::{ + collections::HashMap, + fs, + io::Write, + process::{Child, Command, Stdio}, + thread::sleep, + }; + + pub const BASE_CONFIG: &str = "./pgcat.toml"; + + pub struct PgcatInstance { + process: Child, + port: i16, + log_file: String, + config_filename: String, + last_query_count: usize, + } + impl PgcatInstance { + pub fn start(log_level: log::Level, config: Option) -> PgcatInstance { + let base_filename: String = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(7) + .map(char::from) + .collect(); + let config_filename = format!("/tmp/pgcat_{}.temp.toml", base_filename); + let log_filename = format!("/tmp/pgcat_{}.out", base_filename); + + let file = fs::File::create(log_filename.clone()).unwrap(); + let stdio = Stdio::from(file); + + let port = match config { + Some(mut cfg) => { + let mut file = fs::File::create(config_filename.clone()).unwrap(); + cfg.path = config_filename.clone(); + file.write_all(toml::to_string(&cfg).unwrap().as_bytes()) + 
.unwrap(); + cfg.general.port + } + None => { + fs::copy(BASE_CONFIG, config_filename.clone()).unwrap(); + let mut cfg = toml::from_str::( + &fs::read_to_string(config_filename.clone()).unwrap(), + ) + .unwrap(); + cfg.path = config_filename.clone(); + let mut file = fs::File::create(config_filename.clone()).unwrap(); + file.write_all(toml::to_string(&cfg).unwrap().as_bytes()) + .unwrap(); + cfg.general.port + } + }; + + let child = Command::new("./target/debug/pgcat") + .env("RUST_LOG", log_level.as_str()) + .arg(config_filename.clone()) + .stderr(stdio) + .spawn() + .unwrap(); + println!( + "Started pgcat instance PID({:?}), LogFile({:?}), ConfigFile({:?})", + child.id(), + log_filename, + config_filename + ); + + let instance = PgcatInstance { + process: child, + port: port, + log_file: log_filename, + config_filename: config_filename.clone(), + last_query_count: 0, + }; + instance.wait_until_ready(); + return instance; + } + + pub fn current_config(&self) -> Config { + return toml::from_str( + fs::read_to_string(self.config_filename.clone()) + .unwrap() + .as_str(), + ) + .unwrap(); + } + + pub fn begin_recording_query_count(&mut self) { + self.last_query_count = self.get_logs().matches("Sending query").count(); + } + + pub fn end_recording_query_count(&mut self) -> usize { + return self.get_logs().matches("Sending query").count() - self.last_query_count; + } + + pub fn get_logs(&self) -> String { + fs::read_to_string(self.log_file.clone()).unwrap() + } + + pub fn update_config(&self, new_config: Config) { + let mut file = fs::File::create(self.config_filename.clone()).unwrap(); + file.write_all(toml::to_string(&new_config).unwrap().as_bytes()) + .unwrap(); + } + + pub fn reload_config(&self) { + Command::new("kill") + .args(["-s", "HUP", self.process.id().to_string().as_str()]) + .spawn() + .unwrap() + .wait() + .unwrap(); + } + + pub fn stop(&mut self) { + match self.process.kill() { + Ok(_) => { + self.process.try_wait().unwrap(); + () + } + Err(err) => { + 
error!("Failed to kill process {:?}, {:?}", self.process.id(), err); + () + } + } + } + + pub fn wait_until_ready(&self) { + let conn_str = format!( + "postgres://sharding_user:sharding_user@localhost:{}/shard0", + self.port + ); + for _ in 0..10 { + if Client::connect(&conn_str, NoTls).is_ok() { + return; + } + sleep(time::Duration::from_millis(500)); + } + panic!("Server was never ready!"); + } + } + impl Drop for PgcatInstance { + fn drop(&mut self) { + self.stop(); + //fs::remove_file(self.config_filename.clone()).unwrap(); + //fs::remove_file(self.log_file.clone()).unwrap(); + } + } + + pub fn single_shard_setup( + num_replica: u8, + db: String, + base_config: Option, + ) -> (PgcatInstance, Vec) { + let mut rng = rand::thread_rng(); + let mut pgclowder = vec![]; + let mut main_server_entries = vec![]; + let primary_pg_address = ( + String::from("localhost"), + 5432 as u16, + String::from("primary"), + ); + let replica_pg_address = ( + String::from("localhost"), + 5432 as u16, + String::from("replica"), + ); + + let mut base_cfg = match base_config { + Some(cfg) => cfg, + None => Config::default(), + }; + let user = User { + username: String::from("sharding_user"), + password: String::from("sharding_user"), + pool_size: 5, + statement_timeout: 100000, + }; + base_cfg.pools.insert(db.clone(), Pool::default()); + base_cfg + .pools + .get_mut(&db.clone()) + .unwrap() + .users + .insert(String::from("sharding_user"), user); + + for i in 0..num_replica + 1 { + let mut cfg = base_cfg.clone(); + let port = rng.gen_range(10000..32000); + + cfg.general.port = port; + let (local_address, main_address) = match i { + 0 => ( + primary_pg_address.clone(), + ( + String::from("localhost"), + cfg.general.port as u16, + String::from("primary"), + ), + ), + _ => ( + replica_pg_address.clone(), + ( + String::from("localhost"), + cfg.general.port as u16, + String::from("replica"), + ), + ), + }; + + cfg.pools.get_mut(&db.clone()).unwrap().shards = HashMap::from([( + 
String::from("0"), + Shard { + database: db.clone(), + servers: vec![local_address], + }, + )]); + main_server_entries.push(main_address); + pgclowder.push(PgcatInstance::start(log::Level::Debug, Some(cfg))) + } + let mut main_cfg = base_cfg.clone(); + main_cfg.general.port = rng.gen_range(10000..32000); + main_cfg.pools.get_mut(&db.clone()).unwrap().shards = HashMap::from([( + String::from("0"), + Shard { + database: String::from("shard0"), + // Server entries point to other Pgcat processes + servers: main_server_entries, + }, + )]); + return ( + PgcatInstance::start(log::Level::Trace, Some(main_cfg)), + pgclowder, + ); + } + + fn value_within_range(target_value: i32, value: i32) { + if value == target_value { + return; + } + // Allow a buffer of 15% around the target value + let buffer = ((target_value as f32) * 0.15) as i32; + let start = target_value - buffer; + let end = target_value + buffer; + let result = if (start..end).contains(&value) { + "Pass" + } else { + "Fail" + }; + + println!( + "Expecting {} to fall between {} and {} ... 
{}", + value, start, end, result + ); + assert!((start..end).contains(&value)); + } + + #[test] + fn test_basic_load_balancing() { + let (main_pgcat, mut proxy_instances) = single_shard_setup(3, String::from("shard0"), None); + + let conn_str = format!( + "postgres://sharding_user:sharding_user@localhost:{}/shard0", + main_pgcat.port + ); + let mut client = Client::connect(&conn_str, NoTls).unwrap(); + + for proxy in proxy_instances.iter_mut() { + proxy.begin_recording_query_count(); + } + let total_count = 5000; + for _ in 0..total_count { + if client.simple_query("SELECT 1").is_err() { + client = Client::connect(&conn_str, NoTls).unwrap(); + continue; + } + } + + let expected_share = total_count / 4; + for proxy in proxy_instances.iter_mut() { + value_within_range(expected_share, proxy.end_recording_query_count() as i32); + } + } + + #[test] + fn test_failover_load_balancing() { + let (main_pgcat, mut proxy_instances) = + single_shard_setup(3, String::from("shard0"), None); + + let mut replica2 = proxy_instances.pop().unwrap(); + let mut replica1 = proxy_instances.pop().unwrap(); + let mut replica0 = proxy_instances.pop().unwrap(); + let mut primary = proxy_instances.pop().unwrap(); + + primary.begin_recording_query_count(); + replica0.begin_recording_query_count(); + replica1.begin_recording_query_count(); + replica2.begin_recording_query_count(); + + replica1.stop(); + + let conn_str = format!( + "postgres://sharding_user:sharding_user@localhost:{}/shard0", + main_pgcat.port + ); + let total_count = 2000; + let mut client = Client::connect(&conn_str, NoTls).unwrap(); + for _ in 0..total_count { + if client.simple_query("SELECT 1").is_err() { + client = Client::connect(&conn_str, NoTls).unwrap(); + continue; + } + } + let expected_share = total_count / 3; + + value_within_range(expected_share, primary.end_recording_query_count() as i32); + value_within_range(expected_share, replica0.end_recording_query_count() as i32); + value_within_range(0, 
replica1.end_recording_query_count() as i32); + value_within_range(expected_share, replica2.end_recording_query_count() as i32); + } + + #[test] + fn test_load_balancing_with_query_routing() { + let (main_pgcat, mut proxy_instances) = single_shard_setup(3, String::from("shard0"), None); + let mut cfg = main_pgcat.current_config(); + cfg.pools.get_mut("shard0").unwrap().primary_reads_enabled = false; + cfg.pools.get_mut("shard0").unwrap().query_parser_enabled = true; + cfg.pools.get_mut("shard0").unwrap().default_role = String::from("auto"); + main_pgcat.update_config(cfg); + main_pgcat.reload_config(); + + let mut replica2 = proxy_instances.pop().unwrap(); + let mut replica1 = proxy_instances.pop().unwrap(); + let mut replica0 = proxy_instances.pop().unwrap(); + let mut primary = proxy_instances.pop().unwrap(); + + primary.begin_recording_query_count(); + replica0.begin_recording_query_count(); + replica1.begin_recording_query_count(); + replica2.begin_recording_query_count(); + + let conn_str = format!( + "postgres://sharding_user:sharding_user@localhost:{}/shard0", + main_pgcat.port + ); + let mut client = Client::connect(&conn_str, NoTls).unwrap(); + client.simple_query("SET SERVER ROLE TO 'auto'").unwrap(); + + let total_count = 2000; + for _ in 0..total_count { + if client.simple_query("SELECT 1").is_err() { + client = Client::connect(&conn_str, NoTls).unwrap(); + continue; + } + } + let expected_share = total_count / 3; + value_within_range(expected_share, replica0.end_recording_query_count() as i32); + value_within_range(expected_share, replica1.end_recording_query_count() as i32); + value_within_range(expected_share, replica2.end_recording_query_count() as i32); + value_within_range(0, primary.end_recording_query_count() as i32); + } +} diff --git a/tests/config.rs b/tests/config.rs deleted file mode 120000 index f16cf01b..00000000 --- a/tests/config.rs +++ /dev/null @@ -1 +0,0 @@ -../src/config.rs \ No newline at end of file diff --git a/tests/errors.rs 
b/tests/errors.rs deleted file mode 120000 index e1746501..00000000 --- a/tests/errors.rs +++ /dev/null @@ -1 +0,0 @@ -../src/errors.rs \ No newline at end of file diff --git a/tests/integration_test.rs b/tests/integration_test.rs deleted file mode 100644 index 5d5b679b..00000000 --- a/tests/integration_test.rs +++ /dev/null @@ -1,372 +0,0 @@ -mod config; -mod errors; -mod tls; - -use config::Config; -use core::time; -use postgres::{Client, NoTls}; -use rand::{distributions::Alphanumeric, Rng}; -use std::{ - collections::HashMap, - fmt, fs, - io::Write, - process::{Child, Command, Stdio}, - thread::sleep, -}; - -pub const BASE_CONFIG: &str = "./pgcat.toml"; - -pub fn get_base_config() -> Config { - return toml::from_str(fs::read_to_string(BASE_CONFIG).unwrap().as_str()).unwrap(); -} - -pub struct PgcatInstance { - process: Child, - port: i16, - log_file: String, - config_filename: String, - last_query_count: usize, -} -impl PgcatInstance { - pub fn start(log_level: log::Level, config: Option) -> PgcatInstance { - let base_filename: String = rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(7) - .map(char::from) - .collect(); - let config_filename = format!("/tmp/pgcat_{}.temp.toml", base_filename); - let log_filename = format!("/tmp/pgcat_{}.out", base_filename); - - let file = fs::File::create(log_filename.clone()).unwrap(); - let stdio = Stdio::from(file); - - let port = match config { - Some(mut cfg) => { - let mut file = fs::File::create(config_filename.clone()).unwrap(); - cfg.path = config_filename.clone(); - file.write_all(toml::to_string(&cfg).unwrap().as_bytes()) - .unwrap(); - cfg.general.port - } - None => { - fs::copy(BASE_CONFIG, config_filename.clone()).unwrap(); - let mut cfg = - toml::from_str::(&fs::read_to_string(config_filename.clone()).unwrap()) - .unwrap(); - cfg.path = config_filename.clone(); - let mut file = fs::File::create(config_filename.clone()).unwrap(); - file.write_all(toml::to_string(&cfg).unwrap().as_bytes()) - .unwrap(); - 
cfg.general.port - } - }; - - let child = Command::new("./target/debug/pgcat") - .env("RUST_LOG", log_level.as_str()) - .arg(config_filename.clone()) - .stderr(stdio) - .spawn() - .unwrap(); - println!( - "Started pgcat instance PID({:?}), LogFile({:?}), ConfigFile({:?})", - child.id(), - log_filename, - config_filename - ); - - let mut instance = PgcatInstance { - process: child, - port: port, - log_file: log_filename, - config_filename: config_filename.clone(), - last_query_count: 0, - }; - instance.wait_until_ready(); - return instance; - } - - pub fn current_config(&self) -> Config { - return toml::from_str( - fs::read_to_string(self.config_filename.clone()) - .unwrap() - .as_str(), - ) - .unwrap(); - } - - pub fn get_total_query_count(&self) -> usize { - self.get_logs().matches("talking to server Address").count() - } - - pub fn begin_recording_query_count(&mut self) { - self.last_query_count = self.get_logs().matches("Sending query").count(); - } - - pub fn end_recording_query_count(&mut self) -> usize { - return self.get_logs().matches("Sending query").count() - self.last_query_count; - } - - pub fn get_logs(&self) -> String { - fs::read_to_string(self.log_file.clone()).unwrap() - } - - pub fn update_config(&self, new_config: Config) { - let mut file = fs::File::create(self.config_filename.clone()).unwrap(); - file.write_all(toml::to_string(&new_config).unwrap().as_bytes()) - .unwrap(); - } - - pub fn reload_config(&self) { - Command::new("kill") - .args(["-s", "HUP", self.process.id().to_string().as_str()]) - .spawn() - .unwrap() - .wait() - .unwrap(); - } - - pub fn stop(&mut self) { - self.process.kill(); - } - - pub fn wait_until_ready(&self) { - let conn_str = format!( - "postgres://sharding_user:sharding_user@localhost:{}/shard0", - self.port - ); - for _ in 0..10 { - if Client::connect(&conn_str, NoTls).is_ok() { - return; - } - sleep(time::Duration::from_millis(500)); - } - panic!("Server was never ready!"); - } -} -impl Drop for PgcatInstance { - fn 
drop(&mut self) { - self.stop(); - fs::remove_file(self.config_filename.clone()).unwrap(); - fs::remove_file(self.log_file.clone()).unwrap(); - } -} - -pub fn single_shard_setup( - num_replica: u8, - db: String, - base_config: Option, -) -> (PgcatInstance, Vec) { - let mut pgclowder = vec![]; - let mut main_server_entries = vec![]; - let primary_pg_address = ( - String::from("localhost"), - 5432 as u16, - String::from("primary"), - ); - let replica_pg_address = ( - String::from("localhost"), - 5432 as u16, - String::from("replica"), - ); - - let mut base_cfg = match base_config { - Some(cfg) => cfg, - None => Config::default(), - }; - let user = config::User { - username: String::from("sharding_user"), - password: String::from("sharding_user"), - pool_size: 5, - statement_timeout: 100000, - }; - base_cfg.pools.insert(db.clone(), config::Pool::default()); - base_cfg - .pools - .get_mut(&db.clone()) - .unwrap() - .users - .insert(String::from("sharding_user"), user); - - for i in 0..num_replica + 1 { - let mut cfg = base_cfg.clone(); - cfg.general.port = 10000 + i as i16; - let (local_address, main_address) = match i { - 0 => ( - primary_pg_address.clone(), - ( - String::from("localhost"), - cfg.general.port as u16, - String::from("primary"), - ), - ), - _ => ( - replica_pg_address.clone(), - ( - String::from("localhost"), - cfg.general.port as u16, - String::from("replica"), - ), - ), - }; - - cfg.pools.get_mut(&db.clone()).unwrap().shards = HashMap::from([( - String::from("0"), - config::Shard { - database: db.clone(), - servers: vec![local_address], - }, - )]); - main_server_entries.push(main_address); - pgclowder.push(PgcatInstance::start(log::Level::Debug, Some(cfg))) - } - let mut main_cfg = base_cfg.clone(); - main_cfg.general.port = 6432; - main_cfg.pools.get_mut(&db.clone()).unwrap().shards = HashMap::from([( - String::from("0"), - config::Shard { - database: String::from("shard0"), - // Server entries point to other Pgcat processes - servers: 
main_server_entries, - }, - )]); - return ( - PgcatInstance::start(log::Level::Trace, Some(main_cfg)), - pgclowder, - ); -} - -fn value_within_range(target_value: i32, value: i32) { - if value == target_value { - return; - } - // Allow a buffer of 15% around the target value - let buffer = ((target_value as f32) * 0.15) as i32; - let start = target_value - buffer; - let end = target_value + buffer; - let result = if (start..end).contains(&value) { - "Pass" - } else { - "Fail" - }; - - println!( - "Expecting {} to fall between {} and {} ... {}", - value, start, end, result - ); - assert!((start..end).contains(&value)); -} - -#[test] -fn test_basic_load_balancing() { - let (_main_pgcat, mut proxy_instances) = single_shard_setup(3, String::from("shard0"), None); - let mut client = Client::connect( - "postgres://sharding_user:sharding_user@localhost:6432/shard0", - NoTls, - ) - .unwrap(); - - for proxy in proxy_instances.iter_mut() { - proxy.begin_recording_query_count(); - } - let total_count = 5000; - for _ in 0..total_count { - if client.simple_query("SELECT 1").is_err() { - client = Client::connect( - "postgres://sharding_user:sharding_user@localhost:6432/shard0", - NoTls, - ) - .unwrap(); - continue; - } - } - - let expected_share = total_count / 4; - for proxy in proxy_instances.iter_mut() { - value_within_range(expected_share, proxy.end_recording_query_count() as i32); - } -} - -#[test] -fn test_failover_load_balancing() { - let (_main_pgcat, mut proxy_instances) = single_shard_setup(3, String::from("shard0"), None); - - let mut replica2 = proxy_instances.pop().unwrap(); - let mut replica1 = proxy_instances.pop().unwrap(); - let mut replica0 = proxy_instances.pop().unwrap(); - let mut primary = proxy_instances.pop().unwrap(); - - primary.begin_recording_query_count(); - replica0.begin_recording_query_count(); - replica1.begin_recording_query_count(); - replica2.begin_recording_query_count(); - - replica1.stop(); - - let total_count = 2000; - let mut client = 
Client::connect( - "postgres://sharding_user:sharding_user@localhost:6432/shard0", - NoTls, - ) - .unwrap(); - for _ in 0..total_count { - if client.simple_query("SELECT 1").is_err() { - client = Client::connect( - "postgres://sharding_user:sharding_user@localhost:6432/shard0", - NoTls, - ) - .unwrap(); - continue; - } - } - let expected_share = total_count / 3; - - value_within_range(expected_share, primary.end_recording_query_count() as i32); - value_within_range(expected_share, replica0.end_recording_query_count() as i32); - value_within_range(0, replica1.end_recording_query_count() as i32); - value_within_range(expected_share, replica2.end_recording_query_count() as i32); -} - -#[test] -fn test_load_balancing_with_query_routing() { - let (main_pgcat, mut proxy_instances) = single_shard_setup(3, String::from("shard0"), None); - let mut cfg = main_pgcat.current_config(); - cfg.pools.get_mut("shard0").unwrap().primary_reads_enabled = false; - cfg.pools.get_mut("shard0").unwrap().query_parser_enabled = true; - cfg.pools.get_mut("shard0").unwrap().default_role = String::from("auto"); - main_pgcat.update_config(cfg); - main_pgcat.reload_config(); - - let mut replica2 = proxy_instances.pop().unwrap(); - let mut replica1 = proxy_instances.pop().unwrap(); - let mut replica0 = proxy_instances.pop().unwrap(); - let mut primary = proxy_instances.pop().unwrap(); - - primary.begin_recording_query_count(); - replica0.begin_recording_query_count(); - replica1.begin_recording_query_count(); - replica2.begin_recording_query_count(); - - let mut client = Client::connect( - "postgres://sharding_user:sharding_user@localhost:6432/shard0", - NoTls, - ) - .unwrap(); - client.simple_query("SET SERVER ROLE TO 'auto'").unwrap(); - - let total_count = 2000; - for _ in 0..total_count { - if client.simple_query("SELECT 1").is_err() { - client = Client::connect( - "postgres://sharding_user:sharding_user@localhost:6432/shard0", - NoTls, - ) - .unwrap(); - continue; - } - } - let 
expected_share = total_count / 3; - value_within_range(expected_share, replica0.end_recording_query_count() as i32); - value_within_range(expected_share, replica1.end_recording_query_count() as i32); - value_within_range(expected_share, replica2.end_recording_query_count() as i32); - value_within_range(0, primary.end_recording_query_count() as i32); -} diff --git a/tests/tls.rs b/tests/tls.rs deleted file mode 120000 index 0af4b183..00000000 --- a/tests/tls.rs +++ /dev/null @@ -1 +0,0 @@ -../src/tls.rs \ No newline at end of file From c9be1592a3c8699b1ccd697da26cc8764e872f53 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 21 Aug 2022 06:38:27 -0500 Subject: [PATCH 03/17] revert more --- src/admin.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index 4718169d..6a79e49e 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -3,12 +3,12 @@ use bytes::{Buf, BufMut, BytesMut}; use log::{info, trace}; use std::collections::HashMap; -use crate::config::{get_config, VERSION}; +use crate::config::{get_config, reload_config, VERSION}; use crate::errors::Error; +use crate::messages::*; use crate::pool::get_all_pools; use crate::stats::get_stats; use crate::ClientServerMap; -use crate::{messages::*, reload_config}; pub fn generate_server_info_for_admin() -> BytesMut { let mut server_info = BytesMut::new(); From f4b3bf4c011eacf95a2d4222a331a98643512a8c Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 21 Aug 2022 06:41:34 -0500 Subject: [PATCH 04/17] poor-man's integration test --- src/main.rs | 374 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 374 insertions(+) diff --git a/src/main.rs b/src/main.rs index 0b2e1d59..9e5c9b10 100644 --- a/src/main.rs +++ b/src/main.rs @@ -340,3 +340,377 @@ fn format_duration(duration: &chrono::Duration) -> String { format!("{}d {}:{}:{}", days, hours, minutes, seconds) } + + +mod test { + extern crate postgres; + + use super::config::Config; + use 
super::config::Pool; + use super::config::Shard; + use super::config::User; + + use core::time; + use log::error; + use postgres::{Client, NoTls}; + use rand::{distributions::Alphanumeric, Rng}; + use std::{ + collections::HashMap, + fs, + io::Write, + process::{Child, Command, Stdio}, + thread::sleep, + }; + + pub const BASE_CONFIG: &str = "./pgcat.toml"; + + pub struct PgcatInstance { + process: Child, + port: i16, + log_file: String, + config_filename: String, + last_query_count: usize, + } + impl PgcatInstance { + pub fn start(log_level: log::Level, config: Option) -> PgcatInstance { + let base_filename: String = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(7) + .map(char::from) + .collect(); + let config_filename = format!("/tmp/pgcat_{}.temp.toml", base_filename); + let log_filename = format!("/tmp/pgcat_{}.out", base_filename); + + let file = fs::File::create(log_filename.clone()).unwrap(); + let stdio = Stdio::from(file); + + let port = match config { + Some(mut cfg) => { + let mut file = fs::File::create(config_filename.clone()).unwrap(); + cfg.path = config_filename.clone(); + file.write_all(toml::to_string(&cfg).unwrap().as_bytes()) + .unwrap(); + cfg.general.port + } + None => { + fs::copy(BASE_CONFIG, config_filename.clone()).unwrap(); + let mut cfg = toml::from_str::( + &fs::read_to_string(config_filename.clone()).unwrap(), + ) + .unwrap(); + cfg.path = config_filename.clone(); + let mut file = fs::File::create(config_filename.clone()).unwrap(); + file.write_all(toml::to_string(&cfg).unwrap().as_bytes()) + .unwrap(); + cfg.general.port + } + }; + + let child = Command::new("./target/debug/pgcat") + .env("RUST_LOG", log_level.as_str()) + .arg(config_filename.clone()) + .stderr(stdio) + .spawn() + .unwrap(); + println!( + "Started pgcat instance PID({:?}), LogFile({:?}), ConfigFile({:?})", + child.id(), + log_filename, + config_filename + ); + + let instance = PgcatInstance { + process: child, + port: port, + log_file: log_filename, + 
config_filename: config_filename.clone(), + last_query_count: 0, + }; + instance.wait_until_ready(); + return instance; + } + + pub fn current_config(&self) -> Config { + return toml::from_str( + fs::read_to_string(self.config_filename.clone()) + .unwrap() + .as_str(), + ) + .unwrap(); + } + + pub fn begin_recording_query_count(&mut self) { + self.last_query_count = self.get_logs().matches("Sending query").count(); + } + + pub fn end_recording_query_count(&mut self) -> usize { + return self.get_logs().matches("Sending query").count() - self.last_query_count; + } + + pub fn get_logs(&self) -> String { + fs::read_to_string(self.log_file.clone()).unwrap() + } + + pub fn update_config(&self, new_config: Config) { + let mut file = fs::File::create(self.config_filename.clone()).unwrap(); + file.write_all(toml::to_string(&new_config).unwrap().as_bytes()) + .unwrap(); + } + + pub fn reload_config(&self) { + Command::new("kill") + .args(["-s", "HUP", self.process.id().to_string().as_str()]) + .spawn() + .unwrap() + .wait() + .unwrap(); + } + + pub fn stop(&mut self) { + match self.process.kill() { + Ok(_) => { + self.process.try_wait().unwrap(); + () + } + Err(err) => { + error!("Failed to kill process {:?}, {:?}", self.process.id(), err); + () + } + } + } + + pub fn wait_until_ready(&self) { + let conn_str = format!( + "postgres://sharding_user:sharding_user@localhost:{}/shard0", + self.port + ); + for _ in 0..10 { + if Client::connect(&conn_str, NoTls).is_ok() { + return; + } + sleep(time::Duration::from_millis(500)); + } + panic!("Server was never ready!"); + } + } + impl Drop for PgcatInstance { + fn drop(&mut self) { + self.stop(); + //fs::remove_file(self.config_filename.clone()).unwrap(); + //fs::remove_file(self.log_file.clone()).unwrap(); + } + } + + pub fn single_shard_setup( + num_replica: u8, + db: String, + base_config: Option, + ) -> (PgcatInstance, Vec) { + let mut rng = rand::thread_rng(); + let mut pgclowder = vec![]; + let mut main_server_entries = vec![]; 
+ let primary_pg_address = ( + String::from("localhost"), + 5432 as u16, + String::from("primary"), + ); + let replica_pg_address = ( + String::from("localhost"), + 5432 as u16, + String::from("replica"), + ); + + let mut base_cfg = match base_config { + Some(cfg) => cfg, + None => Config::default(), + }; + let user = User { + username: String::from("sharding_user"), + password: String::from("sharding_user"), + pool_size: 5, + statement_timeout: 100000, + }; + base_cfg.pools.insert(db.clone(), Pool::default()); + base_cfg + .pools + .get_mut(&db.clone()) + .unwrap() + .users + .insert(String::from("sharding_user"), user); + + for i in 0..num_replica + 1 { + let mut cfg = base_cfg.clone(); + let port = rng.gen_range(10000..32000); + + cfg.general.port = port; + let (local_address, main_address) = match i { + 0 => ( + primary_pg_address.clone(), + ( + String::from("localhost"), + cfg.general.port as u16, + String::from("primary"), + ), + ), + _ => ( + replica_pg_address.clone(), + ( + String::from("localhost"), + cfg.general.port as u16, + String::from("replica"), + ), + ), + }; + + cfg.pools.get_mut(&db.clone()).unwrap().shards = HashMap::from([( + String::from("0"), + Shard { + database: db.clone(), + servers: vec![local_address], + }, + )]); + main_server_entries.push(main_address); + pgclowder.push(PgcatInstance::start(log::Level::Debug, Some(cfg))) + } + let mut main_cfg = base_cfg.clone(); + main_cfg.general.port = rng.gen_range(10000..32000); + main_cfg.pools.get_mut(&db.clone()).unwrap().shards = HashMap::from([( + String::from("0"), + Shard { + database: String::from("shard0"), + // Server entries point to other Pgcat processes + servers: main_server_entries, + }, + )]); + return ( + PgcatInstance::start(log::Level::Trace, Some(main_cfg)), + pgclowder, + ); + } + + fn value_within_range(target_value: i32, value: i32) { + if value == target_value { + return; + } + // Allow a buffer of 15% around the target value + let buffer = ((target_value as f32) * 0.15) 
as i32; + let start = target_value - buffer; + let end = target_value + buffer; + let result = if (start..end).contains(&value) { + "Pass" + } else { + "Fail" + }; + + println!( + "Expecting {} to fall between {} and {} ... {}", + value, start, end, result + ); + assert!((start..end).contains(&value)); + } + + #[test] + fn test_basic_load_balancing() { + let (main_pgcat, mut proxy_instances) = single_shard_setup(3, String::from("shard0"), None); + + let conn_str = format!( + "postgres://sharding_user:sharding_user@localhost:{}/shard0", + main_pgcat.port + ); + let mut client = Client::connect(&conn_str, NoTls).unwrap(); + + for proxy in proxy_instances.iter_mut() { + proxy.begin_recording_query_count(); + } + let total_count = 5000; + for _ in 0..total_count { + if client.simple_query("SELECT 1").is_err() { + client = Client::connect(&conn_str, NoTls).unwrap(); + continue; + } + } + + let expected_share = total_count / 4; + for proxy in proxy_instances.iter_mut() { + value_within_range(expected_share, proxy.end_recording_query_count() as i32); + } + } + + #[test] + fn test_failover_load_balancing() { + let (main_pgcat, mut proxy_instances) = + single_shard_setup(3, String::from("shard0"), None); + + let mut replica2 = proxy_instances.pop().unwrap(); + let mut replica1 = proxy_instances.pop().unwrap(); + let mut replica0 = proxy_instances.pop().unwrap(); + let mut primary = proxy_instances.pop().unwrap(); + + primary.begin_recording_query_count(); + replica0.begin_recording_query_count(); + replica1.begin_recording_query_count(); + replica2.begin_recording_query_count(); + + replica1.stop(); + + let conn_str = format!( + "postgres://sharding_user:sharding_user@localhost:{}/shard0", + main_pgcat.port + ); + let total_count = 2000; + let mut client = Client::connect(&conn_str, NoTls).unwrap(); + for _ in 0..total_count { + if client.simple_query("SELECT 1").is_err() { + client = Client::connect(&conn_str, NoTls).unwrap(); + continue; + } + } + let expected_share = 
total_count / 3; + + value_within_range(expected_share, primary.end_recording_query_count() as i32); + value_within_range(expected_share, replica0.end_recording_query_count() as i32); + value_within_range(0, replica1.end_recording_query_count() as i32); + value_within_range(expected_share, replica2.end_recording_query_count() as i32); + } + + #[test] + fn test_load_balancing_with_query_routing() { + let (main_pgcat, mut proxy_instances) = single_shard_setup(3, String::from("shard0"), None); + let mut cfg = main_pgcat.current_config(); + cfg.pools.get_mut("shard0").unwrap().primary_reads_enabled = false; + cfg.pools.get_mut("shard0").unwrap().query_parser_enabled = true; + cfg.pools.get_mut("shard0").unwrap().default_role = String::from("auto"); + main_pgcat.update_config(cfg); + main_pgcat.reload_config(); + + let mut replica2 = proxy_instances.pop().unwrap(); + let mut replica1 = proxy_instances.pop().unwrap(); + let mut replica0 = proxy_instances.pop().unwrap(); + let mut primary = proxy_instances.pop().unwrap(); + + primary.begin_recording_query_count(); + replica0.begin_recording_query_count(); + replica1.begin_recording_query_count(); + replica2.begin_recording_query_count(); + + let conn_str = format!( + "postgres://sharding_user:sharding_user@localhost:{}/shard0", + main_pgcat.port + ); + let mut client = Client::connect(&conn_str, NoTls).unwrap(); + client.simple_query("SET SERVER ROLE TO 'auto'").unwrap(); + + let total_count = 2000; + for _ in 0..total_count { + if client.simple_query("SELECT 1").is_err() { + client = Client::connect(&conn_str, NoTls).unwrap(); + continue; + } + } + let expected_share = total_count / 3; + value_within_range(expected_share, replica0.end_recording_query_count() as i32); + value_within_range(expected_share, replica1.end_recording_query_count() as i32); + value_within_range(expected_share, replica2.end_recording_query_count() as i32); + value_within_range(0, primary.end_recording_query_count() as i32); + } +} From 
1a07a7e576609cddcc368badd77f16915c39cea0 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 21 Aug 2022 06:48:49 -0500 Subject: [PATCH 05/17] remove test --- src/pool.rs | 373 ---------------------------------------------------- 1 file changed, 373 deletions(-) diff --git a/src/pool.rs b/src/pool.rs index 3db546ce..4596b873 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -574,376 +574,3 @@ pub fn get_number_of_addresses() -> usize { pub fn get_all_pools() -> HashMap<(String, String), ConnectionPool> { return (*(*POOLS.load())).clone(); } - -mod test { - extern crate postgres; - - use super::super::config::Config; - use super::super::config::Pool; - use super::super::config::Shard; - use super::super::config::User; - - use core::time; - use log::error; - use postgres::{Client, NoTls}; - use rand::{distributions::Alphanumeric, Rng}; - use std::{ - collections::HashMap, - fs, - io::Write, - process::{Child, Command, Stdio}, - thread::sleep, - }; - - pub const BASE_CONFIG: &str = "./pgcat.toml"; - - pub struct PgcatInstance { - process: Child, - port: i16, - log_file: String, - config_filename: String, - last_query_count: usize, - } - impl PgcatInstance { - pub fn start(log_level: log::Level, config: Option) -> PgcatInstance { - let base_filename: String = rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(7) - .map(char::from) - .collect(); - let config_filename = format!("/tmp/pgcat_{}.temp.toml", base_filename); - let log_filename = format!("/tmp/pgcat_{}.out", base_filename); - - let file = fs::File::create(log_filename.clone()).unwrap(); - let stdio = Stdio::from(file); - - let port = match config { - Some(mut cfg) => { - let mut file = fs::File::create(config_filename.clone()).unwrap(); - cfg.path = config_filename.clone(); - file.write_all(toml::to_string(&cfg).unwrap().as_bytes()) - .unwrap(); - cfg.general.port - } - None => { - fs::copy(BASE_CONFIG, config_filename.clone()).unwrap(); - let mut cfg = toml::from_str::( - 
&fs::read_to_string(config_filename.clone()).unwrap(), - ) - .unwrap(); - cfg.path = config_filename.clone(); - let mut file = fs::File::create(config_filename.clone()).unwrap(); - file.write_all(toml::to_string(&cfg).unwrap().as_bytes()) - .unwrap(); - cfg.general.port - } - }; - - let child = Command::new("./target/debug/pgcat") - .env("RUST_LOG", log_level.as_str()) - .arg(config_filename.clone()) - .stderr(stdio) - .spawn() - .unwrap(); - println!( - "Started pgcat instance PID({:?}), LogFile({:?}), ConfigFile({:?})", - child.id(), - log_filename, - config_filename - ); - - let instance = PgcatInstance { - process: child, - port: port, - log_file: log_filename, - config_filename: config_filename.clone(), - last_query_count: 0, - }; - instance.wait_until_ready(); - return instance; - } - - pub fn current_config(&self) -> Config { - return toml::from_str( - fs::read_to_string(self.config_filename.clone()) - .unwrap() - .as_str(), - ) - .unwrap(); - } - - pub fn begin_recording_query_count(&mut self) { - self.last_query_count = self.get_logs().matches("Sending query").count(); - } - - pub fn end_recording_query_count(&mut self) -> usize { - return self.get_logs().matches("Sending query").count() - self.last_query_count; - } - - pub fn get_logs(&self) -> String { - fs::read_to_string(self.log_file.clone()).unwrap() - } - - pub fn update_config(&self, new_config: Config) { - let mut file = fs::File::create(self.config_filename.clone()).unwrap(); - file.write_all(toml::to_string(&new_config).unwrap().as_bytes()) - .unwrap(); - } - - pub fn reload_config(&self) { - Command::new("kill") - .args(["-s", "HUP", self.process.id().to_string().as_str()]) - .spawn() - .unwrap() - .wait() - .unwrap(); - } - - pub fn stop(&mut self) { - match self.process.kill() { - Ok(_) => { - self.process.try_wait().unwrap(); - () - } - Err(err) => { - error!("Failed to kill process {:?}, {:?}", self.process.id(), err); - () - } - } - } - - pub fn wait_until_ready(&self) { - let conn_str = 
format!( - "postgres://sharding_user:sharding_user@localhost:{}/shard0", - self.port - ); - for _ in 0..10 { - if Client::connect(&conn_str, NoTls).is_ok() { - return; - } - sleep(time::Duration::from_millis(500)); - } - panic!("Server was never ready!"); - } - } - impl Drop for PgcatInstance { - fn drop(&mut self) { - self.stop(); - //fs::remove_file(self.config_filename.clone()).unwrap(); - //fs::remove_file(self.log_file.clone()).unwrap(); - } - } - - pub fn single_shard_setup( - num_replica: u8, - db: String, - base_config: Option, - ) -> (PgcatInstance, Vec) { - let mut rng = rand::thread_rng(); - let mut pgclowder = vec![]; - let mut main_server_entries = vec![]; - let primary_pg_address = ( - String::from("localhost"), - 5432 as u16, - String::from("primary"), - ); - let replica_pg_address = ( - String::from("localhost"), - 5432 as u16, - String::from("replica"), - ); - - let mut base_cfg = match base_config { - Some(cfg) => cfg, - None => Config::default(), - }; - let user = User { - username: String::from("sharding_user"), - password: String::from("sharding_user"), - pool_size: 5, - statement_timeout: 100000, - }; - base_cfg.pools.insert(db.clone(), Pool::default()); - base_cfg - .pools - .get_mut(&db.clone()) - .unwrap() - .users - .insert(String::from("sharding_user"), user); - - for i in 0..num_replica + 1 { - let mut cfg = base_cfg.clone(); - let port = rng.gen_range(10000..32000); - - cfg.general.port = port; - let (local_address, main_address) = match i { - 0 => ( - primary_pg_address.clone(), - ( - String::from("localhost"), - cfg.general.port as u16, - String::from("primary"), - ), - ), - _ => ( - replica_pg_address.clone(), - ( - String::from("localhost"), - cfg.general.port as u16, - String::from("replica"), - ), - ), - }; - - cfg.pools.get_mut(&db.clone()).unwrap().shards = HashMap::from([( - String::from("0"), - Shard { - database: db.clone(), - servers: vec![local_address], - }, - )]); - main_server_entries.push(main_address); - 
pgclowder.push(PgcatInstance::start(log::Level::Debug, Some(cfg))) - } - let mut main_cfg = base_cfg.clone(); - main_cfg.general.port = rng.gen_range(10000..32000); - main_cfg.pools.get_mut(&db.clone()).unwrap().shards = HashMap::from([( - String::from("0"), - Shard { - database: String::from("shard0"), - // Server entries point to other Pgcat processes - servers: main_server_entries, - }, - )]); - return ( - PgcatInstance::start(log::Level::Trace, Some(main_cfg)), - pgclowder, - ); - } - - fn value_within_range(target_value: i32, value: i32) { - if value == target_value { - return; - } - // Allow a buffer of 15% around the target value - let buffer = ((target_value as f32) * 0.15) as i32; - let start = target_value - buffer; - let end = target_value + buffer; - let result = if (start..end).contains(&value) { - "Pass" - } else { - "Fail" - }; - - println!( - "Expecting {} to fall between {} and {} ... {}", - value, start, end, result - ); - assert!((start..end).contains(&value)); - } - - #[test] - fn test_basic_load_balancing() { - let (main_pgcat, mut proxy_instances) = single_shard_setup(3, String::from("shard0"), None); - - let conn_str = format!( - "postgres://sharding_user:sharding_user@localhost:{}/shard0", - main_pgcat.port - ); - let mut client = Client::connect(&conn_str, NoTls).unwrap(); - - for proxy in proxy_instances.iter_mut() { - proxy.begin_recording_query_count(); - } - let total_count = 5000; - for _ in 0..total_count { - if client.simple_query("SELECT 1").is_err() { - client = Client::connect(&conn_str, NoTls).unwrap(); - continue; - } - } - - let expected_share = total_count / 4; - for proxy in proxy_instances.iter_mut() { - value_within_range(expected_share, proxy.end_recording_query_count() as i32); - } - } - - #[test] - fn test_failover_load_balancing() { - let (main_pgcat, mut proxy_instances) = - single_shard_setup(3, String::from("shard0"), None); - - let mut replica2 = proxy_instances.pop().unwrap(); - let mut replica1 = 
proxy_instances.pop().unwrap(); - let mut replica0 = proxy_instances.pop().unwrap(); - let mut primary = proxy_instances.pop().unwrap(); - - primary.begin_recording_query_count(); - replica0.begin_recording_query_count(); - replica1.begin_recording_query_count(); - replica2.begin_recording_query_count(); - - replica1.stop(); - - let conn_str = format!( - "postgres://sharding_user:sharding_user@localhost:{}/shard0", - main_pgcat.port - ); - let total_count = 2000; - let mut client = Client::connect(&conn_str, NoTls).unwrap(); - for _ in 0..total_count { - if client.simple_query("SELECT 1").is_err() { - client = Client::connect(&conn_str, NoTls).unwrap(); - continue; - } - } - let expected_share = total_count / 3; - - value_within_range(expected_share, primary.end_recording_query_count() as i32); - value_within_range(expected_share, replica0.end_recording_query_count() as i32); - value_within_range(0, replica1.end_recording_query_count() as i32); - value_within_range(expected_share, replica2.end_recording_query_count() as i32); - } - - #[test] - fn test_load_balancing_with_query_routing() { - let (main_pgcat, mut proxy_instances) = single_shard_setup(3, String::from("shard0"), None); - let mut cfg = main_pgcat.current_config(); - cfg.pools.get_mut("shard0").unwrap().primary_reads_enabled = false; - cfg.pools.get_mut("shard0").unwrap().query_parser_enabled = true; - cfg.pools.get_mut("shard0").unwrap().default_role = String::from("auto"); - main_pgcat.update_config(cfg); - main_pgcat.reload_config(); - - let mut replica2 = proxy_instances.pop().unwrap(); - let mut replica1 = proxy_instances.pop().unwrap(); - let mut replica0 = proxy_instances.pop().unwrap(); - let mut primary = proxy_instances.pop().unwrap(); - - primary.begin_recording_query_count(); - replica0.begin_recording_query_count(); - replica1.begin_recording_query_count(); - replica2.begin_recording_query_count(); - - let conn_str = format!( - "postgres://sharding_user:sharding_user@localhost:{}/shard0", - 
main_pgcat.port - ); - let mut client = Client::connect(&conn_str, NoTls).unwrap(); - client.simple_query("SET SERVER ROLE TO 'auto'").unwrap(); - - let total_count = 2000; - for _ in 0..total_count { - if client.simple_query("SELECT 1").is_err() { - client = Client::connect(&conn_str, NoTls).unwrap(); - continue; - } - } - let expected_share = total_count / 3; - value_within_range(expected_share, replica0.end_recording_query_count() as i32); - value_within_range(expected_share, replica1.end_recording_query_count() as i32); - value_within_range(expected_share, replica2.end_recording_query_count() as i32); - value_within_range(0, primary.end_recording_query_count() as i32); - } -} From 3159a47524fe543ed35b3ce4247053d3da0be3df Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 21 Aug 2022 06:50:55 -0500 Subject: [PATCH 06/17] fmt --- src/main.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9e5c9b10..e113d9a3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -341,7 +341,6 @@ fn format_duration(duration: &chrono::Duration) -> String { format!("{}d {}:{}:{}", days, hours, minutes, seconds) } - mod test { extern crate postgres; @@ -638,8 +637,7 @@ mod test { #[test] fn test_failover_load_balancing() { - let (main_pgcat, mut proxy_instances) = - single_shard_setup(3, String::from("shard0"), None); + let (main_pgcat, mut proxy_instances) = single_shard_setup(3, String::from("shard0"), None); let mut replica2 = proxy_instances.pop().unwrap(); let mut replica1 = proxy_instances.pop().unwrap(); From 6f9577cf5625a8e61adf7d5bcd557ad647f5ffa9 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 21 Aug 2022 07:51:34 -0500 Subject: [PATCH 07/17] --workspace --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 1d3449a1..f7a74bc1 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -40,7 +40,7 @@ jobs: command: 
"cargo install cargo-binutils rustfilt && rustup component add llvm-tools-preview" - run: name: "Build" - command: "cargo build" + command: "cargo build --workspace" - run: name: "Tests" command: "cargo test && bash .circleci/run_tests.sh && .circleci/generate_coverage.sh" From 86355f05fa92ec95d20b87e620d3f01b126b2862 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 21 Aug 2022 08:05:22 -0500 Subject: [PATCH 08/17] fix build --- .circleci/config.yml | 2 +- src/main.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index f7a74bc1..1d3449a1 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -40,7 +40,7 @@ jobs: command: "cargo install cargo-binutils rustfilt && rustup component add llvm-tools-preview" - run: name: "Build" - command: "cargo build --workspace" + command: "cargo build" - run: name: "Tests" command: "cargo test && bash .circleci/run_tests.sh && .circleci/generate_coverage.sh" diff --git a/src/main.rs b/src/main.rs index e113d9a3..9194e53a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -341,6 +341,7 @@ fn format_duration(duration: &chrono::Duration) -> String { format!("{}d {}:{}:{}", days, hours, minutes, seconds) } +#[cfg(test)] mod test { extern crate postgres; From acd259afed08fc11e63ce8d00edc19b9d0b8318f Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 21 Aug 2022 08:16:04 -0500 Subject: [PATCH 09/17] fix integration test --- .circleci/config.yml | 2 +- src/main.rs | 21 +++++++++++++-------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 1d3449a1..5f6537e4 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -43,7 +43,7 @@ jobs: command: "cargo build" - run: name: "Tests" - command: "cargo test && bash .circleci/run_tests.sh && .circleci/generate_coverage.sh" + command: "cargo build && cargo test && bash .circleci/run_tests.sh && .circleci/generate_coverage.sh" - 
store_artifacts: path: /tmp/cov destination: coverage-data diff --git a/src/main.rs b/src/main.rs index 9194e53a..5d818001 100644 --- a/src/main.rs +++ b/src/main.rs @@ -484,7 +484,7 @@ mod test { "postgres://sharding_user:sharding_user@localhost:{}/shard0", self.port ); - for _ in 0..10 { + for _ in 0..30 { if Client::connect(&conn_str, NoTls).is_ok() { return; } @@ -496,8 +496,8 @@ mod test { impl Drop for PgcatInstance { fn drop(&mut self) { self.stop(); - //fs::remove_file(self.config_filename.clone()).unwrap(); - //fs::remove_file(self.log_file.clone()).unwrap(); + fs::remove_file(self.config_filename.clone()).unwrap(); + fs::remove_file(self.log_file.clone()).unwrap(); } } @@ -611,7 +611,9 @@ mod test { #[test] fn test_basic_load_balancing() { - let (main_pgcat, mut proxy_instances) = single_shard_setup(3, String::from("shard0"), None); + let num_replicas: i32 = 2; + let (main_pgcat, mut proxy_instances) = + single_shard_setup(num_replicas, String::from("shard0"), None); let conn_str = format!( "postgres://sharding_user:sharding_user@localhost:{}/shard0", @@ -630,7 +632,7 @@ mod test { } } - let expected_share = total_count / 4; + let expected_share = total_count / (num_replicas + 1); for proxy in proxy_instances.iter_mut() { value_within_range(expected_share, proxy.end_recording_query_count() as i32); } @@ -638,7 +640,9 @@ mod test { #[test] fn test_failover_load_balancing() { - let (main_pgcat, mut proxy_instances) = single_shard_setup(3, String::from("shard0"), None); + let num_replicas: i32 = 2; + let (main_pgcat, mut proxy_instances) = + single_shard_setup(num_replicas, String::from("shard0"), None); let mut replica2 = proxy_instances.pop().unwrap(); let mut replica1 = proxy_instances.pop().unwrap(); @@ -664,7 +668,7 @@ mod test { continue; } } - let expected_share = total_count / 3; + let expected_share = total_count / num_replicas; value_within_range(expected_share, primary.end_recording_query_count() as i32); value_within_range(expected_share, 
replica0.end_recording_query_count() as i32); @@ -674,6 +678,7 @@ mod test { #[test] fn test_load_balancing_with_query_routing() { + let num_replicas: i32 = 2; let (main_pgcat, mut proxy_instances) = single_shard_setup(3, String::from("shard0"), None); let mut cfg = main_pgcat.current_config(); cfg.pools.get_mut("shard0").unwrap().primary_reads_enabled = false; @@ -706,7 +711,7 @@ mod test { continue; } } - let expected_share = total_count / 3; + let expected_share = total_count / num_replicas; value_within_range(expected_share, replica0.end_recording_query_count() as i32); value_within_range(expected_share, replica1.end_recording_query_count() as i32); value_within_range(expected_share, replica2.end_recording_query_count() as i32); From c8ea88197ceff06c3713631c072b1526e87bbb87 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 21 Aug 2022 08:22:23 -0500 Subject: [PATCH 10/17] another stab --- src/main.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main.rs b/src/main.rs index 5d818001..fda0f75b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -611,9 +611,9 @@ mod test { #[test] fn test_basic_load_balancing() { - let num_replicas: i32 = 2; + let num_replicas: i32 = 3; let (main_pgcat, mut proxy_instances) = - single_shard_setup(num_replicas, String::from("shard0"), None); + single_shard_setup(num_replicas as u8, String::from("shard0"), None); let conn_str = format!( "postgres://sharding_user:sharding_user@localhost:{}/shard0", @@ -640,9 +640,9 @@ mod test { #[test] fn test_failover_load_balancing() { - let num_replicas: i32 = 2; + let num_replicas: i32 = 3; let (main_pgcat, mut proxy_instances) = - single_shard_setup(num_replicas, String::from("shard0"), None); + single_shard_setup(num_replicas as u8, String::from("shard0"), None); let mut replica2 = proxy_instances.pop().unwrap(); let mut replica1 = proxy_instances.pop().unwrap(); @@ -678,7 +678,7 @@ mod test { #[test] fn test_load_balancing_with_query_routing() { - let 
num_replicas: i32 = 2; + let num_replicas: i32 = 3; let (main_pgcat, mut proxy_instances) = single_shard_setup(3, String::from("shard0"), None); let mut cfg = main_pgcat.current_config(); cfg.pools.get_mut("shard0").unwrap().primary_reads_enabled = false; From d12b4260ddaa9d5dd608b0c63a105b9e043ef98c Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 21 Aug 2022 08:28:42 -0500 Subject: [PATCH 11/17] log --- src/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main.rs b/src/main.rs index fda0f75b..94837de5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -490,6 +490,7 @@ mod test { } sleep(time::Duration::from_millis(500)); } + println!("Logs from failed process {}", self.get_logs()); panic!("Server was never ready!"); } } From 160b34763f9bbabad1c5cba09fd822e7f9826240 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 21 Aug 2022 08:44:35 -0500 Subject: [PATCH 12/17] run after integration --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 5f6537e4..cf0917e5 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -43,7 +43,7 @@ jobs: command: "cargo build" - run: name: "Tests" - command: "cargo build && cargo test && bash .circleci/run_tests.sh && .circleci/generate_coverage.sh" + command: "cargo build && bash .circleci/run_tests.sh && cargo test && .circleci/generate_coverage.sh" - store_artifacts: path: /tmp/cov destination: coverage-data From 38f325c4228aa36cc3418644b0e07dcabcc46cce Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 21 Aug 2022 08:48:55 -0500 Subject: [PATCH 13/17] cargo test after integration --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index cf0917e5..69d43f9a 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -43,7 +43,7 @@ jobs: command: "cargo build" - run: name: "Tests" - command: "cargo build && bash 
.circleci/run_tests.sh && cargo test && .circleci/generate_coverage.sh" + command: "cargo build && bash .circleci/run_tests.sh && cargo test && .circleci/generate_coverage.sh" - store_artifacts: path: /tmp/cov destination: coverage-data From 87d7bbffa5fa8c0c5d114a45a68ad7789418912e Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 21 Aug 2022 10:59:29 -0500 Subject: [PATCH 14/17] revert --- .circleci/config.yml | 2 +- Cargo.lock | 207 +---------------------- Cargo.toml | 6 +- src/main.rs | 379 ------------------------------------------- 4 files changed, 7 insertions(+), 587 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 69d43f9a..1d3449a1 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -43,7 +43,7 @@ jobs: command: "cargo build" - run: name: "Tests" - command: "cargo build && bash .circleci/run_tests.sh && cargo test && .circleci/generate_coverage.sh" + command: "cargo test && bash .circleci/run_tests.sh && .circleci/generate_coverage.sh" - store_artifacts: path: /tmp/cov destination: coverage-data diff --git a/Cargo.lock b/Cargo.lock index 521db986..6579260a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -60,7 +60,7 @@ dependencies = [ "async-trait", "futures-channel", "futures-util", - "parking_lot 0.11.2", + "parking_lot", "tokio", ] @@ -85,12 +85,6 @@ version = "3.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37ccbd214614c6783386c1af30caf03192f17891059cecc394b4fb119e363de3" -[[package]] -name = "byteorder" -version = "1.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" - [[package]] name = "bytes" version = "1.1.0" @@ -165,33 +159,12 @@ dependencies = [ "termcolor", ] -[[package]] -name = "fallible-iterator" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" - 
[[package]] name = "fnv" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "futures" -version = "0.3.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab30e97ab6aacfe635fad58f22c2bb06c8b685f7421eb1e064a729e2a5f481fa" -dependencies = [ - "futures-channel", - "futures-core", - "futures-executor", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - [[package]] name = "futures-channel" version = "0.3.23" @@ -199,7 +172,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2bfc52cbddcfd745bf1740338492bb0bd83d76c67b445f91c5fb29fae29ecaa1" dependencies = [ "futures-core", - "futures-sink", ] [[package]] @@ -208,34 +180,6 @@ version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2acedae88d38235936c3922476b10fced7b2b68136f5e3c03c2d5be348a1115" -[[package]] -name = "futures-executor" -version = "0.3.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d11aa21b5b587a64682c0094c2bdd4df0076c5324961a40cc3abd7f37930528" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-io" -version = "0.3.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93a66fc6d035a26a3ae255a6d2bca35eda63ae4c5512bef54449113f7a1228e5" - -[[package]] -name = "futures-macro" -version = "0.3.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0db9cce532b0eae2ccf2766ab246f114b56b9cf6d445e00c2549fbc100ca045d" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "futures-sink" version = "0.3.23" @@ -256,11 +200,7 @@ checksum = "f0828a5471e340229c11c77ca80017937ce3c58cb788a17e5f1c2d5c485a9577" dependencies = [ "futures-channel", "futures-core", - "futures-io", - "futures-macro", - 
"futures-sink", "futures-task", - "memchr", "pin-project-lite", "pin-utils", "slab", @@ -547,17 +487,7 @@ checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" dependencies = [ "instant", "lock_api", - "parking_lot_core 0.8.5", -] - -[[package]] -name = "parking_lot" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" -dependencies = [ - "lock_api", - "parking_lot_core 0.9.3", + "parking_lot_core", ] [[package]] @@ -574,25 +504,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "parking_lot_core" -version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929" -dependencies = [ - "cfg-if", - "libc", - "redox_syscall", - "smallvec", - "windows-sys", -] - -[[package]] -name = "percent-encoding" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" - [[package]] name = "pgcat" version = "0.6.0-alpha1" @@ -610,9 +521,8 @@ dependencies = [ "md-5", "num_cpus", "once_cell", - "parking_lot 0.11.2", + "parking_lot", "phf", - "postgres", "rand", "regex", "rustls-pemfile", @@ -683,49 +593,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" -[[package]] -name = "postgres" -version = "0.19.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8bbcd5f6deb39585a0d9f4ef34c4a41c25b7ad26d23c75d837d78c8e7adc85f" -dependencies = [ - "bytes", - "fallible-iterator", - "futures", - "log", - "tokio", - "tokio-postgres", -] - -[[package]] -name = "postgres-protocol" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"878c6cbf956e03af9aa8204b407b9cbf47c072164800aa918c516cd4b056c50c" -dependencies = [ - "base64", - "byteorder", - "bytes", - "fallible-iterator", - "hmac", - "md-5", - "memchr", - "rand", - "sha2", - "stringprep", -] - -[[package]] -name = "postgres-types" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebd6e8b7189a73169290e89bd24c771071f1012d8fe6f738f5226531f0b03d89" -dependencies = [ - "bytes", - "fallible-iterator", - "postgres-protocol", -] - [[package]] name = "ppv-lite86" version = "0.2.16" @@ -1039,7 +906,7 @@ dependencies = [ "mio", "num_cpus", "once_cell", - "parking_lot 0.11.2", + "parking_lot", "pin-project-lite", "signal-hook-registry", "tokio-macros", @@ -1057,29 +924,6 @@ dependencies = [ "syn", ] -[[package]] -name = "tokio-postgres" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19c88a47a23c5d2dc9ecd28fb38fba5fc7e5ddc1fe64488ec145076b0c71c8ae" -dependencies = [ - "async-trait", - "byteorder", - "bytes", - "fallible-iterator", - "futures", - "log", - "parking_lot 0.12.1", - "percent-encoding", - "phf", - "pin-project-lite", - "postgres-protocol", - "postgres-types", - "socket2", - "tokio", - "tokio-util", -] - [[package]] name = "tokio-rustls" version = "0.23.4" @@ -1317,46 +1161,3 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" - -[[package]] -name = "windows-sys" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" -dependencies = [ - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_msvc", -] - -[[package]] -name = "windows_aarch64_msvc" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" - -[[package]] -name = "windows_i686_gnu" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" - -[[package]] -name = "windows_i686_msvc" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" - -[[package]] -name = "windows_x86_64_gnu" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" - -[[package]] -name = "windows_x86_64_msvc" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" diff --git a/Cargo.toml b/Cargo.toml index 537cdac3..37370244 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,8 @@ name = "pgcat" version = "0.6.0-alpha1" edition = "2021" +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + [dependencies] tokio = { version = "1", features = ["full"] } bytes = "1" @@ -31,7 +33,3 @@ tokio-rustls = "0.23" rustls-pemfile = "1" hyper = { version = "0.14", features = ["full"] } phf = { version = "0.10", features = ["macros"] } - -[dev-dependencies] -postgres = "0.19.3" - diff --git a/src/main.rs b/src/main.rs index 94837de5..0b2e1d59 100644 --- a/src/main.rs +++ b/src/main.rs @@ -340,382 +340,3 @@ fn format_duration(duration: &chrono::Duration) -> String { format!("{}d {}:{}:{}", days, hours, minutes, seconds) } - -#[cfg(test)] -mod test { - extern crate postgres; - - use super::config::Config; - use super::config::Pool; - use super::config::Shard; - use super::config::User; - - use core::time; - use log::error; - use postgres::{Client, NoTls}; - use rand::{distributions::Alphanumeric, Rng}; 
- use std::{ - collections::HashMap, - fs, - io::Write, - process::{Child, Command, Stdio}, - thread::sleep, - }; - - pub const BASE_CONFIG: &str = "./pgcat.toml"; - - pub struct PgcatInstance { - process: Child, - port: i16, - log_file: String, - config_filename: String, - last_query_count: usize, - } - impl PgcatInstance { - pub fn start(log_level: log::Level, config: Option) -> PgcatInstance { - let base_filename: String = rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(7) - .map(char::from) - .collect(); - let config_filename = format!("/tmp/pgcat_{}.temp.toml", base_filename); - let log_filename = format!("/tmp/pgcat_{}.out", base_filename); - - let file = fs::File::create(log_filename.clone()).unwrap(); - let stdio = Stdio::from(file); - - let port = match config { - Some(mut cfg) => { - let mut file = fs::File::create(config_filename.clone()).unwrap(); - cfg.path = config_filename.clone(); - file.write_all(toml::to_string(&cfg).unwrap().as_bytes()) - .unwrap(); - cfg.general.port - } - None => { - fs::copy(BASE_CONFIG, config_filename.clone()).unwrap(); - let mut cfg = toml::from_str::( - &fs::read_to_string(config_filename.clone()).unwrap(), - ) - .unwrap(); - cfg.path = config_filename.clone(); - let mut file = fs::File::create(config_filename.clone()).unwrap(); - file.write_all(toml::to_string(&cfg).unwrap().as_bytes()) - .unwrap(); - cfg.general.port - } - }; - - let child = Command::new("./target/debug/pgcat") - .env("RUST_LOG", log_level.as_str()) - .arg(config_filename.clone()) - .stderr(stdio) - .spawn() - .unwrap(); - println!( - "Started pgcat instance PID({:?}), LogFile({:?}), ConfigFile({:?})", - child.id(), - log_filename, - config_filename - ); - - let instance = PgcatInstance { - process: child, - port: port, - log_file: log_filename, - config_filename: config_filename.clone(), - last_query_count: 0, - }; - instance.wait_until_ready(); - return instance; - } - - pub fn current_config(&self) -> Config { - return toml::from_str( - 
fs::read_to_string(self.config_filename.clone()) - .unwrap() - .as_str(), - ) - .unwrap(); - } - - pub fn begin_recording_query_count(&mut self) { - self.last_query_count = self.get_logs().matches("Sending query").count(); - } - - pub fn end_recording_query_count(&mut self) -> usize { - return self.get_logs().matches("Sending query").count() - self.last_query_count; - } - - pub fn get_logs(&self) -> String { - fs::read_to_string(self.log_file.clone()).unwrap() - } - - pub fn update_config(&self, new_config: Config) { - let mut file = fs::File::create(self.config_filename.clone()).unwrap(); - file.write_all(toml::to_string(&new_config).unwrap().as_bytes()) - .unwrap(); - } - - pub fn reload_config(&self) { - Command::new("kill") - .args(["-s", "HUP", self.process.id().to_string().as_str()]) - .spawn() - .unwrap() - .wait() - .unwrap(); - } - - pub fn stop(&mut self) { - match self.process.kill() { - Ok(_) => { - self.process.try_wait().unwrap(); - () - } - Err(err) => { - error!("Failed to kill process {:?}, {:?}", self.process.id(), err); - () - } - } - } - - pub fn wait_until_ready(&self) { - let conn_str = format!( - "postgres://sharding_user:sharding_user@localhost:{}/shard0", - self.port - ); - for _ in 0..30 { - if Client::connect(&conn_str, NoTls).is_ok() { - return; - } - sleep(time::Duration::from_millis(500)); - } - println!("Logs from failed process {}", self.get_logs()); - panic!("Server was never ready!"); - } - } - impl Drop for PgcatInstance { - fn drop(&mut self) { - self.stop(); - fs::remove_file(self.config_filename.clone()).unwrap(); - fs::remove_file(self.log_file.clone()).unwrap(); - } - } - - pub fn single_shard_setup( - num_replica: u8, - db: String, - base_config: Option<Config>, - ) -> (PgcatInstance, Vec<PgcatInstance>) { - let mut rng = rand::thread_rng(); - let mut pgclowder = vec![]; - let mut main_server_entries = vec![]; - let primary_pg_address = ( - String::from("localhost"), - 5432 as u16, - String::from("primary"), - ); - let replica_pg_address = ( - 
String::from("localhost"), - 5432 as u16, - String::from("replica"), - ); - - let mut base_cfg = match base_config { - Some(cfg) => cfg, - None => Config::default(), - }; - let user = User { - username: String::from("sharding_user"), - password: String::from("sharding_user"), - pool_size: 5, - statement_timeout: 100000, - }; - base_cfg.pools.insert(db.clone(), Pool::default()); - base_cfg - .pools - .get_mut(&db.clone()) - .unwrap() - .users - .insert(String::from("sharding_user"), user); - - for i in 0..num_replica + 1 { - let mut cfg = base_cfg.clone(); - let port = rng.gen_range(10000..32000); - - cfg.general.port = port; - let (local_address, main_address) = match i { - 0 => ( - primary_pg_address.clone(), - ( - String::from("localhost"), - cfg.general.port as u16, - String::from("primary"), - ), - ), - _ => ( - replica_pg_address.clone(), - ( - String::from("localhost"), - cfg.general.port as u16, - String::from("replica"), - ), - ), - }; - - cfg.pools.get_mut(&db.clone()).unwrap().shards = HashMap::from([( - String::from("0"), - Shard { - database: db.clone(), - servers: vec![local_address], - }, - )]); - main_server_entries.push(main_address); - pgclowder.push(PgcatInstance::start(log::Level::Debug, Some(cfg))) - } - let mut main_cfg = base_cfg.clone(); - main_cfg.general.port = rng.gen_range(10000..32000); - main_cfg.pools.get_mut(&db.clone()).unwrap().shards = HashMap::from([( - String::from("0"), - Shard { - database: String::from("shard0"), - // Server entries point to other Pgcat processes - servers: main_server_entries, - }, - )]); - return ( - PgcatInstance::start(log::Level::Trace, Some(main_cfg)), - pgclowder, - ); - } - - fn value_within_range(target_value: i32, value: i32) { - if value == target_value { - return; - } - // Allow a buffer of 15% around the target value - let buffer = ((target_value as f32) * 0.15) as i32; - let start = target_value - buffer; - let end = target_value + buffer; - let result = if (start..end).contains(&value) { - 
"Pass" - } else { - "Fail" - }; - - println!( - "Expecting {} to fall between {} and {} ... {}", - value, start, end, result - ); - assert!((start..end).contains(&value)); - } - - #[test] - fn test_basic_load_balancing() { - let num_replicas: i32 = 3; - let (main_pgcat, mut proxy_instances) = - single_shard_setup(num_replicas as u8, String::from("shard0"), None); - - let conn_str = format!( - "postgres://sharding_user:sharding_user@localhost:{}/shard0", - main_pgcat.port - ); - let mut client = Client::connect(&conn_str, NoTls).unwrap(); - - for proxy in proxy_instances.iter_mut() { - proxy.begin_recording_query_count(); - } - let total_count = 5000; - for _ in 0..total_count { - if client.simple_query("SELECT 1").is_err() { - client = Client::connect(&conn_str, NoTls).unwrap(); - continue; - } - } - - let expected_share = total_count / (num_replicas + 1); - for proxy in proxy_instances.iter_mut() { - value_within_range(expected_share, proxy.end_recording_query_count() as i32); - } - } - - #[test] - fn test_failover_load_balancing() { - let num_replicas: i32 = 3; - let (main_pgcat, mut proxy_instances) = - single_shard_setup(num_replicas as u8, String::from("shard0"), None); - - let mut replica2 = proxy_instances.pop().unwrap(); - let mut replica1 = proxy_instances.pop().unwrap(); - let mut replica0 = proxy_instances.pop().unwrap(); - let mut primary = proxy_instances.pop().unwrap(); - - primary.begin_recording_query_count(); - replica0.begin_recording_query_count(); - replica1.begin_recording_query_count(); - replica2.begin_recording_query_count(); - - replica1.stop(); - - let conn_str = format!( - "postgres://sharding_user:sharding_user@localhost:{}/shard0", - main_pgcat.port - ); - let total_count = 2000; - let mut client = Client::connect(&conn_str, NoTls).unwrap(); - for _ in 0..total_count { - if client.simple_query("SELECT 1").is_err() { - client = Client::connect(&conn_str, NoTls).unwrap(); - continue; - } - } - let expected_share = total_count / 
num_replicas; - - value_within_range(expected_share, primary.end_recording_query_count() as i32); - value_within_range(expected_share, replica0.end_recording_query_count() as i32); - value_within_range(0, replica1.end_recording_query_count() as i32); - value_within_range(expected_share, replica2.end_recording_query_count() as i32); - } - - #[test] - fn test_load_balancing_with_query_routing() { - let num_replicas: i32 = 3; - let (main_pgcat, mut proxy_instances) = single_shard_setup(3, String::from("shard0"), None); - let mut cfg = main_pgcat.current_config(); - cfg.pools.get_mut("shard0").unwrap().primary_reads_enabled = false; - cfg.pools.get_mut("shard0").unwrap().query_parser_enabled = true; - cfg.pools.get_mut("shard0").unwrap().default_role = String::from("auto"); - main_pgcat.update_config(cfg); - main_pgcat.reload_config(); - - let mut replica2 = proxy_instances.pop().unwrap(); - let mut replica1 = proxy_instances.pop().unwrap(); - let mut replica0 = proxy_instances.pop().unwrap(); - let mut primary = proxy_instances.pop().unwrap(); - - primary.begin_recording_query_count(); - replica0.begin_recording_query_count(); - replica1.begin_recording_query_count(); - replica2.begin_recording_query_count(); - - let conn_str = format!( - "postgres://sharding_user:sharding_user@localhost:{}/shard0", - main_pgcat.port - ); - let mut client = Client::connect(&conn_str, NoTls).unwrap(); - client.simple_query("SET SERVER ROLE TO 'auto'").unwrap(); - - let total_count = 2000; - for _ in 0..total_count { - if client.simple_query("SELECT 1").is_err() { - client = Client::connect(&conn_str, NoTls).unwrap(); - continue; - } - } - let expected_share = total_count / num_replicas; - value_within_range(expected_share, replica0.end_recording_query_count() as i32); - value_within_range(expected_share, replica1.end_recording_query_count() as i32); - value_within_range(expected_share, replica2.end_recording_query_count() as i32); - value_within_range(0, 
primary.end_recording_query_count() as i32); - } -} From 6608e1484ade9ad1fc48c28285c3724902e134b9 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 21 Aug 2022 10:59:56 -0500 Subject: [PATCH 15/17] revert more --- Cargo.lock | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6579260a..2e20f709 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -167,36 +167,36 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" [[package]] name = "futures-channel" -version = "0.3.23" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bfc52cbddcfd745bf1740338492bb0bd83d76c67b445f91c5fb29fae29ecaa1" +checksum = "ba3dda0b6588335f360afc675d0564c17a77a2bda81ca178a4b6081bd86c7f0b" dependencies = [ "futures-core", ] [[package]] name = "futures-core" -version = "0.3.23" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2acedae88d38235936c3922476b10fced7b2b68136f5e3c03c2d5be348a1115" +checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7" [[package]] name = "futures-sink" -version = "0.3.23" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca0bae1fe9752cf7fd9b0064c674ae63f97b37bc714d745cbde0afb7ec4e6765" +checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" [[package]] name = "futures-task" -version = "0.3.23" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "842fc63b931f4056a24d59de13fb1272134ce261816e063e634ad0c15cdc5306" +checksum = "6ee7c6485c30167ce4dfb83ac568a849fe53274c831081476ee13e0dce1aad72" [[package]] name = "futures-util" -version = "0.3.23" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0828a5471e340229c11c77ca80017937ce3c58cb788a17e5f1c2d5c485a9577" +checksum = 
"d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164" dependencies = [ "futures-channel", "futures-core", From 13b30543a6e0f71332aa9ed5acc189123f9c73ea Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 21 Aug 2022 18:00:03 -0500 Subject: [PATCH 16/17] Refactor + clean up --- src/config.rs | 6 ++-- src/pool.rs | 81 +++++++++++++++++++++------------------------------ 2 files changed, 37 insertions(+), 50 deletions(-) diff --git a/src/config.rs b/src/config.rs index ed338104..9d1658ff 100644 --- a/src/config.rs +++ b/src/config.rs @@ -63,7 +63,7 @@ pub struct Address { pub shard: usize, pub database: String, pub role: Role, - pub replica_number: usize, + pub instance_index: usize, pub username: String, pub poolname: String, } @@ -75,7 +75,7 @@ impl Default for Address { host: String::from("127.0.0.1"), port: String::from("5432"), shard: 0, - replica_number: 0, + instance_index: 0, database: String::from("database"), role: Role::Replica, username: String::from("username"), @@ -92,7 +92,7 @@ impl Address { Role::Replica => format!( "{}_shard_{}_replica_{}", - self.poolname, self.shard, self.replica_number + self.poolname, self.shard, self.instance_index ), } } diff --git a/src/pool.rs b/src/pool.rs index 4596b873..f49ede7f 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -6,6 +6,8 @@ use chrono::naive::NaiveDateTime; use log::{debug, error, info, warn}; use once_cell::sync::Lazy; use parking_lot::{Mutex, RwLock}; +use rand::seq::SliceRandom; +use rand::thread_rng; use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; @@ -118,7 +120,7 @@ impl ConnectionPool { host: server.0.clone(), port: server.1.to_string(), role: role, - replica_number, + instance_index: replica_number, shard: shard_idx.parse::<usize>().unwrap(), username: user_info.username.clone(), poolname: pool_name.clone(), @@ -247,61 +249,41 @@ impl ConnectionPool { process_id: i32, // client id ) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> { let now = 
Instant::now(); - let addresses = &self.addresses[shard]; + let mut candidates: Vec<Address> = self.addresses[shard] + .clone() + .into_iter() + .filter(|address| address.role == role) + .collect(); - let mut allowed_attempts = match role { - // Primary-specific queries get one attempt, if the primary is down, - // nothing we should do about it I think. It's dangerous to retry - // write queries. - Some(Role::Primary) => 1, - - // Replicas get to try as many times as there are replicas - // and connections in the pool. - _ => addresses.len(), - }; - - debug!("Allowed attempts for {:?}: {}", role, allowed_attempts); - - let exists = match role { - Some(role) => addresses.iter().filter(|addr| addr.role == role).count() > 0, - None => true, - }; - - if !exists { - error!("Requested role {:?}, but none are configured", role); - return Err(Error::BadConfig); - } + // Random load balancing + candidates.shuffle(&mut thread_rng()); let healthcheck_timeout = get_config().general.healthcheck_timeout; let healthcheck_delay = get_config().general.healthcheck_delay as u128; - while allowed_attempts > 0 { - let index = rand::random::<usize>() % addresses.len(); - let address = &addresses[index]; - - // Make sure you're getting a primary or a replica - // as per request. If no specific role is requested, the first - // available will be chosen. - if address.role != role { - continue; - } + while !candidates.is_empty() { + // Get the next candidate + let address = match candidates.pop() { + Some(address) => address, + None => break, + }; - // Don't attempt to connect to banned servers. - if self.is_banned(address, shard, role) { + if self.is_banned(&address, shard, role) { continue; } - allowed_attempts -= 1; - // Indicate we're waiting on a server connection from a pool. 
self.stats.client_waiting(process_id, address.id); // Check if we can connect - let mut conn = match self.databases[shard][index].get().await { + let mut conn = match self.databases[address.shard][address.instance_index] + .get() + .await + { Ok(conn) => conn, Err(err) => { - error!("Banning replica {}, error: {:?}", index, err); - self.ban(address, shard, process_id); + error!("Banning instance {:?}, error: {:?}", address, err); + self.ban(&address, address.shard, process_id); self.stats.client_disconnecting(process_id, address.id); self.stats .checkout_time(now.elapsed().as_micros(), process_id, address.id); @@ -343,29 +325,34 @@ impl ConnectionPool { } // Health check failed. - Err(_) => { - error!("Banning replica {} because of failed health check", index); + Err(err) => { + error!( + "Banning instance {:?} because of failed health check, {:?}", + address, err + ); // Don't leave a bad connection in the pool. server.mark_bad(); - self.ban(address, shard, process_id); + self.ban(&address, address.shard, process_id); continue; } }, // Health check timed out. Err(_) => { - error!("Banning replica {} because of health check timeout", index); + error!( + "Banning instance {:?} because of health check timeout", + address + ); // Don't leave a bad connection in the pool. 
server.mark_bad(); - self.ban(address, shard, process_id); + self.ban(&address, address.shard, process_id); continue; } } } - return Err(Error::AllServersDown); } From 18bc61c73f9c4219d8511361f294d53defae6517 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 21 Aug 2022 18:09:26 -0500 Subject: [PATCH 17/17] more clean up --- src/pool.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/pool.rs b/src/pool.rs index f49ede7f..cbb9b43f 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -268,7 +268,7 @@ impl ConnectionPool { None => break, }; - if self.is_banned(&address, shard, role) { + if self.is_banned(&address, address.shard, role) { continue; } @@ -340,10 +340,10 @@ impl ConnectionPool { }, // Health check timed out. - Err(_) => { + Err(err) => { error!( - "Banning instance {:?} because of health check timeout", - address + "Banning instance {:?} because of health check timeout, {:?}", + address, err ); // Don't leave a bad connection in the pool. server.mark_bad();