Skip to content

Commit

Permalink
Make infer role configurable and fix double parse bug (#533)
Browse files Browse the repository at this point in the history
* Make infer role configurable and fix double parse bug

* Fix tests

* Enable infer_role_from query in toml for tests

* Fix test

* Add max length config, add logging for which application is failing to parse, and change config name

* fmt

* Update src/config.rs

---------

Co-authored-by: Lev Kokotov <levkk@users.noreply.github.com>
  • Loading branch information
zainkabani and levkk authored Aug 8, 2023
1 parent 7c3c90c commit e14b283
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 60 deletions.
5 changes: 5 additions & 0 deletions .circleci/pgcat.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ default_role = "any"
# we'll direct it to the primary.
query_parser_enabled = true

# If the query parser is enabled and this setting is enabled, we'll attempt to
# infer the role from the query itself.
query_parser_read_write_splitting = true

# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitely selected with our custom protocol.
Expand Down Expand Up @@ -134,6 +138,7 @@ database = "shard2"
pool_mode = "session"
default_role = "primary"
query_parser_enabled = true
query_parser_read_write_splitting = true
primary_reads_enabled = true
sharding_function = "pg_bigint_hash"

Expand Down
4 changes: 4 additions & 0 deletions examples/docker/pgcat.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ default_role = "any"
# we'll direct it to the primary.
query_parser_enabled = true

# If the query parser is enabled and this setting is enabled, we'll attempt to
# infer the role from the query itself.
query_parser_read_write_splitting = true

# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitly selected with our custom protocol.
Expand Down
4 changes: 4 additions & 0 deletions pgcat.toml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ default_role = "any"
# we'll direct it to the primary.
query_parser_enabled = true

# If the query parser is enabled and this setting is enabled, we'll attempt to
# infer the role from the query itself.
query_parser_read_write_splitting = true

# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitly selected with our custom protocol.
Expand Down
112 changes: 76 additions & 36 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,9 @@ where
let mut prepared_statement = None;
let mut will_prepare = false;

let client_identifier =
ClientIdentifier::new(&self.application_name, &self.username, &self.pool_name);

// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
// or issue commands for our sharding and server selection protocol.
Expand Down Expand Up @@ -812,6 +815,21 @@ where
message_result = read_message(&mut self.read) => message_result?
};

// Handle admin database queries.
if self.admin {
debug!("Handling admin command");
handle_admin(&mut self.write, message, self.client_server_map.clone()).await?;
continue;
}

// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let mut pool = self.get_pool().await?;
query_router.update_pool_settings(pool.settings.clone());

let mut initial_parsed_ast = None;

match message[0] as char {
// Buffer extended protocol messages even if we do not have
// a server connection yet. Hopefully, when we get the S message
Expand Down Expand Up @@ -841,24 +859,34 @@ where

'Q' => {
if query_router.query_parser_enabled() {
if let Ok(ast) = QueryRouter::parse(&message) {
let plugin_result = query_router.execute_plugins(&ast).await;
match query_router.parse(&message) {
Ok(ast) => {
let plugin_result = query_router.execute_plugins(&ast).await;

match plugin_result {
Ok(PluginOutput::Deny(error)) => {
error_response(&mut self.write, &error).await?;
continue;
}
match plugin_result {
Ok(PluginOutput::Deny(error)) => {
error_response(&mut self.write, &error).await?;
continue;
}

Ok(PluginOutput::Intercept(result)) => {
write_all(&mut self.write, result).await?;
continue;
}
Ok(PluginOutput::Intercept(result)) => {
write_all(&mut self.write, result).await?;
continue;
}

_ => (),
};
_ => (),
};

let _ = query_router.infer(&ast);

let _ = query_router.infer(&ast);
initial_parsed_ast = Some(ast);
}
Err(error) => {
warn!(
"Query parsing error: {} (client: {})",
error, client_identifier
);
}
}
}
}
Expand All @@ -872,13 +900,21 @@ where
self.buffer.put(&message[..]);

if query_router.query_parser_enabled() {
if let Ok(ast) = QueryRouter::parse(&message) {
if let Ok(output) = query_router.execute_plugins(&ast).await {
plugin_output = Some(output);
}
match query_router.parse(&message) {
Ok(ast) => {
if let Ok(output) = query_router.execute_plugins(&ast).await {
plugin_output = Some(output);
}

let _ = query_router.infer(&ast);
}
let _ = query_router.infer(&ast);
}
Err(error) => {
warn!(
"Query parsing error: {} (client: {})",
error, client_identifier
);
}
};
}

continue;
Expand Down Expand Up @@ -922,13 +958,6 @@ where
_ => (),
}

// Handle admin database queries.
if self.admin {
debug!("Handling admin command");
handle_admin(&mut self.write, message, self.client_server_map.clone()).await?;
continue;
}

// Check on plugin results.
match plugin_output {
Some(PluginOutput::Deny(error)) => {
Expand All @@ -941,11 +970,6 @@ where
_ => (),
};

// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let mut pool = self.get_pool().await?;

// Check if the pool is paused and wait until it's resumed.
if pool.wait_paused().await {
// Refresh pool information, something might have changed.
Expand Down Expand Up @@ -1165,6 +1189,9 @@ where
None => {
trace!("Waiting for message inside transaction or in session mode");

// This is not an initial message so discard the initial_parsed_ast
initial_parsed_ast.take();

match tokio::time::timeout(
idle_client_timeout_duration,
read_message(&mut self.read),
Expand Down Expand Up @@ -1221,7 +1248,22 @@ where
// Query
'Q' => {
if query_router.query_parser_enabled() {
if let Ok(ast) = QueryRouter::parse(&message) {
// We don't want to parse again if we already parsed it as the initial message
let ast = match initial_parsed_ast {
Some(_) => Some(initial_parsed_ast.take().unwrap()),
None => match query_router.parse(&message) {
Ok(ast) => Some(ast),
Err(error) => {
warn!(
"Query parsing error: {} (client: {})",
error, client_identifier
);
None
}
},
};

if let Some(ast) = ast {
let plugin_result = query_router.execute_plugins(&ast).await;

match plugin_result {
Expand All @@ -1237,8 +1279,6 @@ where

_ => (),
};

let _ = query_router.infer(&ast);
}
}
debug!("Sending query to server");
Expand Down Expand Up @@ -1290,7 +1330,7 @@ where
}

if query_router.query_parser_enabled() {
if let Ok(ast) = QueryRouter::parse(&message) {
if let Ok(ast) = query_router.parse(&message) {
if let Ok(output) = query_router.execute_plugins(&ast).await {
plugin_output = Some(output);
}
Expand Down
39 changes: 39 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,11 @@ pub struct Pool {
#[serde(default)] // False
pub query_parser_enabled: bool,

pub query_parser_max_length: Option<usize>,

#[serde(default)] // False
pub query_parser_read_write_splitting: bool,

#[serde(default)] // False
pub primary_reads_enabled: bool,

Expand Down Expand Up @@ -627,6 +632,18 @@ impl Pool {
}
}

if self.query_parser_read_write_splitting && !self.query_parser_enabled {
error!(
"query_parser_read_write_splitting is only valid when query_parser_enabled is true"
);
return Err(Error::BadConfig);
}

if self.plugins.is_some() && !self.query_parser_enabled {
error!("plugins are only valid when query_parser_enabled is true");
return Err(Error::BadConfig);
}

self.automatic_sharding_key = match &self.automatic_sharding_key {
Some(key) => {
// No quotes in the key so we don't have to compare quoted
Expand Down Expand Up @@ -663,6 +680,8 @@ impl Default for Pool {
users: BTreeMap::default(),
default_role: String::from("any"),
query_parser_enabled: false,
query_parser_max_length: None,
query_parser_read_write_splitting: false,
primary_reads_enabled: false,
sharding_function: ShardingFunction::PgBigintHash,
automatic_sharding_key: None,
Expand Down Expand Up @@ -914,6 +933,17 @@ impl From<&Config> for std::collections::HashMap<String, String> {
format!("pools.{}.query_parser_enabled", pool_name),
pool.query_parser_enabled.to_string(),
),
(
format!("pools.{}.query_parser_max_length", pool_name),
match pool.query_parser_max_length {
Some(max_length) => max_length.to_string(),
None => String::from("unlimited"),
},
),
(
format!("pools.{}.query_parser_read_write_splitting", pool_name),
pool.query_parser_read_write_splitting.to_string(),
),
(
format!("pools.{}.default_role", pool_name),
pool.default_role.clone(),
Expand Down Expand Up @@ -1096,6 +1126,15 @@ impl Config {
"[pool: {}] Query router: {}",
pool_name, pool_config.query_parser_enabled
);

info!(
"[pool: {}] Query parser max length: {:?}",
pool_name, pool_config.query_parser_max_length
);
info!(
"[pool: {}] Infer role from query: {}",
pool_name, pool_config.query_parser_read_write_splitting
);
info!(
"[pool: {}] Number of shards: {}",
pool_name,
Expand Down
11 changes: 11 additions & 0 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ pub struct PoolSettings {
// Enable/disable query parser.
pub query_parser_enabled: bool,

// Max length of query the parser will parse.
pub query_parser_max_length: Option<usize>,

// Infer role
pub query_parser_read_write_splitting: bool,

// Read from the primary as well or not.
pub primary_reads_enabled: bool,

Expand Down Expand Up @@ -157,6 +163,8 @@ impl Default for PoolSettings {
db: String::default(),
default_role: None,
query_parser_enabled: false,
query_parser_max_length: None,
query_parser_read_write_splitting: false,
primary_reads_enabled: true,
sharding_function: ShardingFunction::PgBigintHash,
automatic_sharding_key: None,
Expand Down Expand Up @@ -456,6 +464,9 @@ impl ConnectionPool {
_ => unreachable!(),
},
query_parser_enabled: pool_config.query_parser_enabled,
query_parser_max_length: pool_config.query_parser_max_length,
query_parser_read_write_splitting: pool_config
.query_parser_read_write_splitting,
primary_reads_enabled: pool_config.primary_reads_enabled,
sharding_function: pool_config.sharding_function,
automatic_sharding_key: pool_config.automatic_sharding_key.clone(),
Expand Down
Loading

0 comments on commit e14b283

Please sign in to comment.