From 1c261fea09e59792f2979d036504fba4d83ef632 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Tue, 25 Oct 2022 11:30:03 -0700 Subject: [PATCH] Starting automatic sharding --- pgcat.toml | 3 + src/client.rs | 2 +- src/config.rs | 9 +++ src/pool.rs | 5 ++ src/query_router.rs | 167 +++++++++++++++++++++++++++++++++++++++----- 5 files changed, 169 insertions(+), 17 deletions(-) diff --git a/pgcat.toml b/pgcat.toml index 9125afd5..8d588043 100644 --- a/pgcat.toml +++ b/pgcat.toml @@ -83,6 +83,9 @@ primary_reads_enabled = true # sharding_function = "pg_bigint_hash" +# Automatically parse this from queries and route queries to the right shard! +automatic_sharding_key = "id" + # Credentials for users that may connect to this cluster [pools.sharded_db.users.0] username = "sharding_user" diff --git a/src/client.rs b/src/client.rs index e72dbf79..7db92052 100644 --- a/src/client.rs +++ b/src/client.rs @@ -672,7 +672,7 @@ where // Normal query, not a custom command. None => { if query_router.query_parser_enabled() { - query_router.infer_role(message.clone()); + query_router.infer(message.clone()); } } diff --git a/src/config.rs b/src/config.rs index 1cb37595..647b0166 100644 --- a/src/config.rs +++ b/src/config.rs @@ -267,6 +267,10 @@ pub struct Pool { pub connect_timeout: Option, pub sharding_function: ShardingFunction, + + #[serde(default = "Pool::default_automatic_sharding_key")] + pub automatic_sharding_key: Option, + pub shards: BTreeMap, pub users: BTreeMap, } @@ -276,6 +280,10 @@ impl Pool { PoolMode::Transaction } + pub fn default_automatic_sharding_key() -> Option { + None + } + pub fn validate(&self) -> Result<(), Error> { match self.default_role.as_ref() { "any" => (), @@ -318,6 +326,7 @@ impl Default for Pool { query_parser_enabled: false, primary_reads_enabled: false, sharding_function: ShardingFunction::PgBigintHash, + automatic_sharding_key: None, connect_timeout: None, } } diff --git a/src/pool.rs b/src/pool.rs index 24505fa7..2b80f7ba 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -79,6 +79,9 @@ pub struct PoolSettings { // Sharding function. pub sharding_function: ShardingFunction, + + // Sharding key + pub automatic_sharding_key: Option, } impl Default for PoolSettings { @@ -91,6 +94,7 @@ impl Default for PoolSettings { query_parser_enabled: false, primary_reads_enabled: true, sharding_function: ShardingFunction::PgBigintHash, + automatic_sharding_key: None, } } } @@ -254,6 +258,7 @@ impl ConnectionPool { query_parser_enabled: pool_config.query_parser_enabled.clone(), primary_reads_enabled: pool_config.primary_reads_enabled, sharding_function: pool_config.sharding_function, + automatic_sharding_key: pool_config.automatic_sharding_key.clone(), }, }; diff --git a/src/query_router.rs b/src/query_router.rs index 269645b2..f65a7734 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -5,6 +5,7 @@ use log::{debug, error}; use once_cell::sync::OnceCell; use regex::{Regex, RegexSet}; use sqlparser::ast::Statement::{Query, StartTransaction}; +use sqlparser::ast::{BinaryOperator, Expr, SetExpr, Value}; use sqlparser::dialect::PostgreSqlDialect; use sqlparser::parser::Parser; @@ -12,6 +13,8 @@ use crate::config::Role; use crate::pool::PoolSettings; use crate::sharding::Sharder; +use std::collections::BTreeSet; + /// Regexes used to parse custom commands. const CUSTOM_SQL_REGEXES: [&str; 7] = [ r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$", @@ -256,7 +259,7 @@ impl QueryRouter { } /// Try to infer which server to connect to based on the contents of the query. - pub fn infer_role(&mut self, mut buf: BytesMut) -> bool { + pub fn infer(&mut self, mut buf: BytesMut) -> bool { debug!("Inferring role"); let code = buf.get_u8() as char; @@ -324,7 +327,21 @@ impl QueryRouter { } // Likely a read-only query - Query { .. } => { + Query(query) => { + match &self.pool_settings.automatic_sharding_key { + Some(_) => { + // TODO: if we have multiple queries in the same message, + // we can either split them and execute them individually + // or discard shard selection. If they point to the same shard though, + // we can let them through as-is. + // This is basically building a database now :) + self.active_shard = self.infer_shard(query); + debug!("Automatically using shard: {:?}", self.active_shard); + } + + None => (), + }; + self.active_role = match self.primary_reads_enabled() { false => Some(Role::Replica), // If primary should not be receiving reads, use a replica. true => None, // Any server role is fine in this case. @@ -342,6 +359,118 @@ impl QueryRouter { true } + /// A `selection` is the `WHERE` clause. This parses + /// the clause and extracts the sharding key, if present. + fn selection_parser(&self, expr: &Expr) -> Vec { + let mut result = Vec::new(); + let mut found = false; + + match expr { + // This parses `sharding_key = 5`. But it's technically + // legal to write `5 = sharding_key`. I don't judge the people + // who do that, but I think ORMs will still use the first variant, + // so we can leave the second as a TODO. + Expr::BinaryOp { left, op, right } => { + match &**left { + Expr::BinaryOp { .. } => result.extend(self.selection_parser(&left)), + Expr::Identifier(ident) => { + found = ident.value + == *self.pool_settings.automatic_sharding_key.as_ref().unwrap(); + } + _ => (), + }; + + match op { + BinaryOperator::Eq => (), + BinaryOperator::Or => (), + BinaryOperator::And => (), + _ => { + // TODO: support other operators than equality. + debug!("Unsupported operation: {:?}", op); + return Vec::new(); + } + }; + + match &**right { + Expr::BinaryOp { .. } => result.extend(self.selection_parser(&right)), + Expr::Value(Value::Number(value, ..)) => { + if found { + match value.parse::() { + Ok(value) => result.push(value), + Err(_) => { + debug!("Sharding key was not an integer: {}", value); + } + }; + } + } + _ => (), + }; + } + + _ => (), + }; + + debug!("Sharding keys found: {:?}", result); + + result + } + + /// Try to figure out which shard the query should go to. + fn infer_shard(&self, query: &sqlparser::ast::Query) -> Option { + let mut shards = BTreeSet::new(); + + match &*query.body { + SetExpr::Query(query) => { + match self.infer_shard(&*query) { + Some(shard) => { + shards.insert(shard); + } + None => (), + }; + } + + SetExpr::Select(select) => { + match &select.selection { + Some(selection) => { + let sharding_keys = self.selection_parser(&selection); + + // TODO: Add support for prepared statements here. + // This should just give us the position of the value in the `B` message. + + let sharder = Sharder::new( + self.pool_settings.shards, + self.pool_settings.sharding_function, + ); + + for value in sharding_keys { + let shard = sharder.shard(value); + shards.insert(shard); + } + } + + None => (), + }; + } + _ => (), + }; + + match shards.len() { + // Didn't find a sharding key, you're on your own. + 0 => { + debug!("No sharding keys found"); + None + } + + 1 => Some(shards.into_iter().last().unwrap()), + + // TODO: support querying multiple shards (some day...) + _ => { + debug!("More than one sharding key found"); + None + } + } + } + /// Get the current desired server role we should be talking to. pub fn role(&self) -> Option { self.active_role @@ -392,7 +521,7 @@ mod test { } #[test] - fn test_infer_role_replica() { + fn test_infer_replica() { QueryRouter::setup(); let mut qr = QueryRouter::new(); assert!(qr.try_execute_command(simple_query("SET SERVER ROLE TO 'auto'")) != None); @@ -410,13 +539,13 @@ mod test { for query in queries { // It's a recognized query - assert!(qr.infer_role(query)); + assert!(qr.infer(query)); assert_eq!(qr.role(), Some(Role::Replica)); } } #[test] - fn test_infer_role_primary() { + fn test_infer_primary() { QueryRouter::setup(); let mut qr = QueryRouter::new(); @@ -429,24 +558,24 @@ mod test { for query in queries { // It's a recognized query - assert!(qr.infer_role(query)); + assert!(qr.infer(query)); assert_eq!(qr.role(), Some(Role::Primary)); } } #[test] - fn test_infer_role_primary_reads_enabled() { + fn test_infer_primary_reads_enabled() { QueryRouter::setup(); let mut qr = QueryRouter::new(); let query = simple_query("SELECT * FROM items WHERE id = 5"); assert!(qr.try_execute_command(simple_query("SET PRIMARY READS TO on")) != None); - assert!(qr.infer_role(query)); + assert!(qr.infer(query)); assert_eq!(qr.role(), None); } #[test] - fn test_infer_role_parse_prepared() { + fn test_infer_parse_prepared() { QueryRouter::setup(); let mut qr = QueryRouter::new(); qr.try_execute_command(simple_query("SET SERVER ROLE TO 'auto'")); @@ -461,7 +590,7 @@ mod test { res.put(prepared_stmt); res.put_i16(0); - assert!(qr.infer_role(res)); + assert!(qr.infer(res)); assert_eq!(qr.role(), Some(Role::Replica)); } @@ -625,11 +754,11 @@ mod test { assert_eq!(qr.role(), None); let query = simple_query("INSERT INTO test_table VALUES (1)"); - assert_eq!(qr.infer_role(query), true); + assert_eq!(qr.infer(query), true); assert_eq!(qr.role(), Some(Role::Primary)); let query = simple_query("SELECT * FROM test_table"); - assert_eq!(qr.infer_role(query), true); + assert_eq!(qr.infer(query), true); assert_eq!(qr.role(), Some(Role::Replica)); assert!(qr.query_parser_enabled()); @@ -644,12 +773,13 @@ mod test { let pool_settings = PoolSettings { pool_mode: PoolMode::Transaction, - shards: 0, + shards: 2, user: crate::config::User::default(), default_role: Some(Role::Replica), query_parser_enabled: true, primary_reads_enabled: false, sharding_function: ShardingFunction::PgBigintHash, + automatic_sharding_key: Some(String::from("id")), }; let mut qr = QueryRouter::new(); assert_eq!(qr.active_role, None); @@ -672,6 +802,11 @@ mod test { let q2 = simple_query("SET SERVER ROLE TO 'default'"); assert!(qr.try_execute_command(q2) != None); assert_eq!(qr.active_role.unwrap(), pool_settings.clone().default_role); + + // Here we go :) + let q3 = simple_query("SELECT * FROM test WHERE id = 5 AND values IN (1, 2, 3)"); + assert!(qr.infer(q3)); + assert_eq!(qr.shard(), 1); } #[test] @@ -679,13 +814,13 @@ mod test { QueryRouter::setup(); let mut qr = QueryRouter::new(); - assert!(qr.infer_role(simple_query("BEGIN; SELECT 1; COMMIT;"))); + assert!(qr.infer(simple_query("BEGIN; SELECT 1; COMMIT;"))); assert_eq!(qr.role(), Role::Primary); - assert!(qr.infer_role(simple_query("SELECT 1; SELECT 2;"))); + assert!(qr.infer(simple_query("SELECT 1; SELECT 2;"))); assert_eq!(qr.role(), Role::Replica); - assert!(qr.infer_role(simple_query( + assert!(qr.infer(simple_query( "SELECT 123; INSERT INTO t VALUES (5); SELECT 1;" ))); assert_eq!(qr.role(), Role::Primary);