Skip to content

Automatic sharding: part one of many #194

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pgcat.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ primary_reads_enabled = true
#
sharding_function = "pg_bigint_hash"

# Name of the column whose value is automatically parsed from queries, so that each query is routed to the right shard!
automatic_sharding_key = "id"

# Credentials for users that may connect to this cluster
[pools.sharded_db.users.0]
username = "sharding_user"
Expand Down
2 changes: 1 addition & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ where
// Normal query, not a custom command.
None => {
if query_router.query_parser_enabled() {
query_router.infer_role(message.clone());
query_router.infer(message.clone());
}
}

Expand Down
9 changes: 9 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ pub struct Pool {
pub connect_timeout: Option<u64>,

pub sharding_function: ShardingFunction,

#[serde(default = "Pool::default_automatic_sharding_key")]
pub automatic_sharding_key: Option<String>,

pub shards: BTreeMap<String, Shard>,
pub users: BTreeMap<String, User>,
}
Expand All @@ -276,6 +280,10 @@ impl Pool {
PoolMode::Transaction
}

/// Serde fallback for `automatic_sharding_key` when the option is absent
/// from the configuration: no sharding key is configured.
pub fn default_automatic_sharding_key() -> Option<String> {
    Option::default()
}

pub fn validate(&self) -> Result<(), Error> {
match self.default_role.as_ref() {
"any" => (),
Expand Down Expand Up @@ -318,6 +326,7 @@ impl Default for Pool {
query_parser_enabled: false,
primary_reads_enabled: false,
sharding_function: ShardingFunction::PgBigintHash,
automatic_sharding_key: None,
connect_timeout: None,
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ pub struct PoolSettings {

// Sharding function.
pub sharding_function: ShardingFunction,

// Sharding key
pub automatic_sharding_key: Option<String>,
}

impl Default for PoolSettings {
Expand All @@ -91,6 +94,7 @@ impl Default for PoolSettings {
query_parser_enabled: false,
primary_reads_enabled: true,
sharding_function: ShardingFunction::PgBigintHash,
automatic_sharding_key: None,
}
}
}
Expand Down Expand Up @@ -254,6 +258,7 @@ impl ConnectionPool {
query_parser_enabled: pool_config.query_parser_enabled.clone(),
primary_reads_enabled: pool_config.primary_reads_enabled,
sharding_function: pool_config.sharding_function,
automatic_sharding_key: pool_config.automatic_sharding_key.clone(),
},
};

Expand Down
167 changes: 151 additions & 16 deletions src/query_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ use log::{debug, error};
use once_cell::sync::OnceCell;
use regex::{Regex, RegexSet};
use sqlparser::ast::Statement::{Query, StartTransaction};
use sqlparser::ast::{BinaryOperator, Expr, SetExpr, Value};
use sqlparser::dialect::PostgreSqlDialect;
use sqlparser::parser::Parser;

use crate::config::Role;
use crate::pool::PoolSettings;
use crate::sharding::Sharder;

use std::collections::BTreeSet;

/// Regexes used to parse custom commands.
const CUSTOM_SQL_REGEXES: [&str; 7] = [
r"(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$",
Expand Down Expand Up @@ -256,7 +259,7 @@ impl QueryRouter {
}

/// Try to infer which server to connect to based on the contents of the query.
pub fn infer_role(&mut self, mut buf: BytesMut) -> bool {
pub fn infer(&mut self, mut buf: BytesMut) -> bool {
debug!("Inferring role");

let code = buf.get_u8() as char;
Expand Down Expand Up @@ -324,7 +327,21 @@ impl QueryRouter {
}

// Likely a read-only query
Query { .. } => {
Query(query) => {
match &self.pool_settings.automatic_sharding_key {
Some(_) => {
// TODO: if we have multiple queries in the same message,
// we can either split them and execute them individually
// or discard shard selection. If they point to the same shard though,
// we can let them through as-is.
// This is basically building a database now :)
self.active_shard = self.infer_shard(query);
debug!("Automatically using shard: {:?}", self.active_shard);
}

None => (),
};

self.active_role = match self.primary_reads_enabled() {
false => Some(Role::Replica), // If primary should not be receiving reads, use a replica.
true => None, // Any server role is fine in this case.
Expand All @@ -342,6 +359,118 @@ impl QueryRouter {
true
}

/// A `selection` is the `WHERE` clause. This parses
/// the clause and extracts the sharding key, if present.
///
/// Walks nested binary expressions joined by `=`, `AND` and `OR` and collects
/// every integer literal that is compared for equality against the column
/// named by `pool_settings.automatic_sharding_key`. Both `sharding_key = 5`
/// and the mirrored `5 = sharding_key` are recognized.
///
/// Returns an empty vector when no automatic sharding key is configured,
/// when the operator of `expr` is anything other than `=`, `AND` or `OR`,
/// or when no matching comparison is found.
fn selection_parser(&self, expr: &Expr) -> Vec<i64> {
    let mut result = Vec::new();

    // Without a configured key there is nothing to look for; return early
    // instead of panicking on an `unwrap()`.
    let sharding_key = match self.pool_settings.automatic_sharding_key.as_deref() {
        Some(key) => key,
        None => return result,
    };

    if let Expr::BinaryOp { left, op, right } = expr {
        // Reject unsupported operators before descending, so no work is
        // spent on sub-expressions whose result would be thrown away.
        match op {
            BinaryOperator::Eq | BinaryOperator::Or | BinaryOperator::And => (),
            _ => {
                // TODO: support other operators than equality.
                debug!("Unsupported operation: {:?}", op);
                return Vec::new();
            }
        };

        // Nested expressions on either side, e.g. `a = 1 AND id = 5`.
        if let Expr::BinaryOp { .. } = &**left {
            result.extend(self.selection_parser(left));
        }

        if let Expr::BinaryOp { .. } = &**right {
            result.extend(self.selection_parser(right));
        }

        // Only an equality comparison pins the sharding key to a value.
        if matches!(op, BinaryOperator::Eq) {
            let literal = match (&**left, &**right) {
                // `sharding_key = 5` and `5 = sharding_key`.
                (Expr::Identifier(ident), Expr::Value(Value::Number(value, ..)))
                | (Expr::Value(Value::Number(value, ..)), Expr::Identifier(ident))
                    if ident.value == sharding_key =>
                {
                    Some(value)
                }
                _ => None,
            };

            if let Some(value) = literal {
                match value.parse::<i64>() {
                    Ok(value) => result.push(value),
                    Err(_) => {
                        debug!("Sharding key was not an integer: {}", value);
                    }
                };
            }
        }
    }

    debug!("Sharding keys found: {:?}", result);

    result
}

/// Try to figure out which shard the query should go to.
///
/// Collects the shard of every sharding key found in the query's `WHERE`
/// clause (recursing into a nested query body) and returns `Some(shard)`
/// only when exactly one distinct shard was found; otherwise `None`.
fn infer_shard(&self, query: &sqlparser::ast::Query) -> Option<usize> {
    let mut candidates = BTreeSet::new();

    match &*query.body {
        // The body is itself a query: reuse whatever it resolves to.
        SetExpr::Query(inner) => {
            if let Some(shard) = self.infer_shard(inner) {
                candidates.insert(shard);
            }
        }

        SetExpr::Select(select) => {
            if let Some(selection) = &select.selection {
                let keys = self.selection_parser(selection);

                // TODO: Add support for prepared statements here.
                // This should just give us the position of the value in the `B` message.

                let sharder = Sharder::new(
                    self.pool_settings.shards,
                    self.pool_settings.sharding_function,
                );

                candidates.extend(keys.into_iter().map(|key| sharder.shard(key)));
            }
        }

        _ => (),
    };

    // Look at the first two distinct shards; that is enough to tell
    // "none", "exactly one" and "several" apart.
    let mut distinct = candidates.into_iter();

    match (distinct.next(), distinct.next()) {
        // Didn't find a sharding key, you're on your own.
        (None, _) => {
            debug!("No sharding keys found");
            None
        }

        (Some(shard), None) => Some(shard),

        // TODO: support querying multiple shards (some day...)
        (Some(_), Some(_)) => {
            debug!("More than one sharding key found");
            None
        }
    }
}

/// Get the current desired server role we should be talking to.
pub fn role(&self) -> Option<Role> {
self.active_role
Expand Down Expand Up @@ -392,7 +521,7 @@ mod test {
}

#[test]
fn test_infer_role_replica() {
fn test_infer_replica() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
assert!(qr.try_execute_command(simple_query("SET SERVER ROLE TO 'auto'")) != None);
Expand All @@ -410,13 +539,13 @@ mod test {

for query in queries {
// It's a recognized query
assert!(qr.infer_role(query));
assert!(qr.infer(query));
assert_eq!(qr.role(), Some(Role::Replica));
}
}

#[test]
fn test_infer_role_primary() {
fn test_infer_primary() {
QueryRouter::setup();
let mut qr = QueryRouter::new();

Expand All @@ -429,24 +558,24 @@ mod test {

for query in queries {
// It's a recognized query
assert!(qr.infer_role(query));
assert!(qr.infer(query));
assert_eq!(qr.role(), Some(Role::Primary));
}
}

#[test]
fn test_infer_role_primary_reads_enabled() {
fn test_infer_primary_reads_enabled() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
let query = simple_query("SELECT * FROM items WHERE id = 5");
assert!(qr.try_execute_command(simple_query("SET PRIMARY READS TO on")) != None);

assert!(qr.infer_role(query));
assert!(qr.infer(query));
assert_eq!(qr.role(), None);
}

#[test]
fn test_infer_role_parse_prepared() {
fn test_infer_parse_prepared() {
QueryRouter::setup();
let mut qr = QueryRouter::new();
qr.try_execute_command(simple_query("SET SERVER ROLE TO 'auto'"));
Expand All @@ -461,7 +590,7 @@ mod test {
res.put(prepared_stmt);
res.put_i16(0);

assert!(qr.infer_role(res));
assert!(qr.infer(res));
assert_eq!(qr.role(), Some(Role::Replica));
}

Expand Down Expand Up @@ -625,11 +754,11 @@ mod test {
assert_eq!(qr.role(), None);

let query = simple_query("INSERT INTO test_table VALUES (1)");
assert_eq!(qr.infer_role(query), true);
assert_eq!(qr.infer(query), true);
assert_eq!(qr.role(), Some(Role::Primary));

let query = simple_query("SELECT * FROM test_table");
assert_eq!(qr.infer_role(query), true);
assert_eq!(qr.infer(query), true);
assert_eq!(qr.role(), Some(Role::Replica));

assert!(qr.query_parser_enabled());
Expand All @@ -644,12 +773,13 @@ mod test {

let pool_settings = PoolSettings {
pool_mode: PoolMode::Transaction,
shards: 0,
shards: 2,
user: crate::config::User::default(),
default_role: Some(Role::Replica),
query_parser_enabled: true,
primary_reads_enabled: false,
sharding_function: ShardingFunction::PgBigintHash,
automatic_sharding_key: Some(String::from("id")),
};
let mut qr = QueryRouter::new();
assert_eq!(qr.active_role, None);
Expand All @@ -672,20 +802,25 @@ mod test {
let q2 = simple_query("SET SERVER ROLE TO 'default'");
assert!(qr.try_execute_command(q2) != None);
assert_eq!(qr.active_role.unwrap(), pool_settings.clone().default_role);

// Here we go :)
let q3 = simple_query("SELECT * FROM test WHERE id = 5 AND values IN (1, 2, 3)");
assert!(qr.infer(q3));
assert_eq!(qr.shard(), 1);
}

#[test]
fn test_parse_multiple_queries() {
QueryRouter::setup();

let mut qr = QueryRouter::new();
assert!(qr.infer_role(simple_query("BEGIN; SELECT 1; COMMIT;")));
assert!(qr.infer(simple_query("BEGIN; SELECT 1; COMMIT;")));
assert_eq!(qr.role(), Role::Primary);

assert!(qr.infer_role(simple_query("SELECT 1; SELECT 2;")));
assert!(qr.infer(simple_query("SELECT 1; SELECT 2;")));
assert_eq!(qr.role(), Role::Replica);

assert!(qr.infer_role(simple_query(
assert!(qr.infer(simple_query(
"SELECT 123; INSERT INTO t VALUES (5); SELECT 1;"
)));
assert_eq!(qr.role(), Role::Primary);
Expand Down