diff --git a/Cargo.lock b/Cargo.lock index 746cf32d..6b254832 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -875,9 +875,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" [[package]] name = "sqlparser" -version = "0.23.0" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0beb13adabbdda01b63d595f38c8bfd19a361e697fd94ce0098a634077bc5b25" +checksum = "86be66ea0b2b22749cfa157d16e2e84bf793e626a3375f4d378dc289fa03affb" dependencies = [ "log", ] diff --git a/Cargo.toml b/Cargo.toml index 8284d2df..e06b6930 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ serde_derive = "1" regex = "1" num_cpus = "1" once_cell = "1" -sqlparser = "0.23.0" +sqlparser = "0.26.0" log = "0.4" arc-swap = "1" env_logger = "0.9" diff --git a/src/query_router.rs b/src/query_router.rs index 34745edd..269645b2 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -50,10 +50,10 @@ pub struct QueryRouter { active_role: Option, /// Should we try to parse queries to route them to replicas or primary automatically - query_parser_enabled: bool, + query_parser_enabled: Option, /// Include the primary into the replica pool for reads. - primary_reads_enabled: bool, + primary_reads_enabled: Option, /// Pool configuration. pool_settings: PoolSettings, @@ -95,8 +95,8 @@ impl QueryRouter { QueryRouter { active_shard: None, active_role: None, - query_parser_enabled: false, - primary_reads_enabled: false, + query_parser_enabled: None, + primary_reads_enabled: None, pool_settings: PoolSettings::default(), } } @@ -172,7 +172,7 @@ impl QueryRouter { Some(Role::Primary) => Role::Primary.to_string(), Some(Role::Replica) => Role::Replica.to_string(), None => { - if self.query_parser_enabled { + if self.query_parser_enabled() { String::from("auto") } else { String::from("any") @@ -180,7 +180,7 @@ impl QueryRouter { } }, - Command::ShowPrimaryReads => match self.primary_reads_enabled { + Command::ShowPrimaryReads => match self.primary_reads_enabled() { true => String::from("on"), false => String::from("off"), }, @@ -207,28 +207,28 @@ impl QueryRouter { Command::SetServerRole => { self.active_role = match value.to_ascii_lowercase().as_ref() { "primary" => { - self.query_parser_enabled = false; + self.query_parser_enabled = Some(false); Some(Role::Primary) } "replica" => { - self.query_parser_enabled = false; + self.query_parser_enabled = Some(false); Some(Role::Replica) } "any" => { - self.query_parser_enabled = false; + self.query_parser_enabled = Some(false); None } "auto" => { - self.query_parser_enabled = true; + self.query_parser_enabled = Some(true); None } "default" => { self.active_role = self.pool_settings.default_role; - self.query_parser_enabled = self.query_parser_enabled; + self.query_parser_enabled = None; self.active_role } @@ -239,13 +239,13 @@ impl QueryRouter { Command::SetPrimaryReads => { if value == "on" { debug!("Setting primary reads to on"); - self.primary_reads_enabled = true; + self.primary_reads_enabled = Some(true); } else if value == "off" { debug!("Setting primary reads to off"); - self.primary_reads_enabled = false; + self.primary_reads_enabled = Some(false); } else if value == "default" { debug!("Setting primary reads to default"); - self.primary_reads_enabled = self.pool_settings.primary_reads_enabled; + self.primary_reads_enabled = None; } } @@ -300,34 +300,44 @@ impl QueryRouter { let ast = match Parser::parse_sql(&PostgreSqlDialect {}, &query) { Ok(ast) => ast, Err(err) => { - debug!("{}", err.to_string()); + // SELECT ... FOR UPDATE won't get parsed correctly. + error!("{}: {}", err, query); + self.active_role = Some(Role::Primary); return false; } }; + debug!("AST: {:?}", ast); + if ast.len() == 0 { + // That's weird, no idea, let's go to primary + self.active_role = Some(Role::Primary); return false; } - match ast[0] { - // All transactions go to the primary, probably a write. - StartTransaction { .. } => { - self.active_role = Some(Role::Primary); - } + for q in &ast { + match q { + // All transactions go to the primary, probably a write. + StartTransaction { .. } => { + self.active_role = Some(Role::Primary); + break; + } - // Likely a read-only query - Query { .. } => { - self.active_role = match self.primary_reads_enabled { - false => Some(Role::Replica), // If primary should not be receiving reads, use a replica. - true => None, // Any server role is fine in this case. + // Likely a read-only query + Query { .. } => { + self.active_role = match self.primary_reads_enabled() { + false => Some(Role::Replica), // If primary should not be receiving reads, use a replica. + true => None, // Any server role is fine in this case. + } } - } - // Likely a write - _ => { - self.active_role = Some(Role::Primary); - } - }; + // Likely a write + _ => { + self.active_role = Some(Role::Primary); + break; + } + }; + } true } @@ -350,9 +360,18 @@ impl QueryRouter { } /// Should we attempt to parse queries? - #[allow(dead_code)] pub fn query_parser_enabled(&self) -> bool { - self.query_parser_enabled + match self.query_parser_enabled { + None => self.pool_settings.query_parser_enabled, + Some(value) => value, + } + } + + pub fn primary_reads_enabled(&self) -> bool { + match self.primary_reads_enabled { + None => self.pool_settings.primary_reads_enabled, + Some(value) => value, + } } } @@ -616,7 +635,7 @@ mod test { assert!(qr.query_parser_enabled()); let query = simple_query("SET SERVER ROLE TO 'default'"); assert!(qr.try_execute_command(query) != None); - assert!(qr.query_parser_enabled()); + assert!(!qr.query_parser_enabled()); } #[test] @@ -635,16 +654,16 @@ mod test { let mut qr = QueryRouter::new(); assert_eq!(qr.active_role, None); assert_eq!(qr.active_shard, None); - assert_eq!(qr.query_parser_enabled, false); - assert_eq!(qr.primary_reads_enabled, false); + assert_eq!(qr.query_parser_enabled, None); + assert_eq!(qr.primary_reads_enabled, None); // Internal state must not be changed due to this, only defaults qr.update_pool_settings(pool_settings.clone()); assert_eq!(qr.active_role, None); assert_eq!(qr.active_shard, None); - assert_eq!(qr.query_parser_enabled, false); - assert_eq!(qr.primary_reads_enabled, false); + assert_eq!(qr.query_parser_enabled(), true); + assert_eq!(qr.primary_reads_enabled(), false); let q1 = simple_query("SET SERVER ROLE TO 'primary'"); assert!(qr.try_execute_command(q1) != None); @@ -654,4 +673,21 @@ mod test { assert!(qr.try_execute_command(q2) != None); assert_eq!(qr.active_role.unwrap(), pool_settings.clone().default_role); } + + #[test] + fn test_parse_multiple_queries() { + QueryRouter::setup(); + + let mut qr = QueryRouter::new(); + assert!(qr.infer_role(simple_query("BEGIN; SELECT 1; COMMIT;"))); + assert_eq!(qr.role(), Role::Primary); + + assert!(qr.infer_role(simple_query("SELECT 1; SELECT 2;"))); + assert_eq!(qr.role(), Role::Replica); + + assert!(qr.infer_role(simple_query( + "SELECT 123; INSERT INTO t VALUES (5); SELECT 1;" + ))); + assert_eq!(qr.role(), Role::Primary); + } }