Skip to content

Commit

Permalink
feat: support parse key partition (#506)
Browse files Browse the repository at this point in the history
* add key partition parser

* refactor the code

* add unit test

* fix show create table for partition table

* address CR issues

* supplement the error message when failing to parse the partition key

* fix unit tests

Co-authored-by: CooooolFrog <zuliangwanghust@gmail.com>
  • Loading branch information
ShiKaiWi and ZuLiangWang authored Dec 23, 2022
1 parent 4b58bd7 commit 72d3542
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 48 deletions.
47 changes: 45 additions & 2 deletions interpreters/src/show_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,24 @@ impl ShowCreateInterpreter {
.as_str()
}
}
PartitionInfo::Key(v) => {
let rendered_partition_key = v.partition_key.join(",");
if v.linear {
res += format!(
" PARTITION BY LINEAR KEY({}) PARTITIONS {}",
rendered_partition_key,
v.definitions.len()
)
.as_str()
} else {
res += format!(
" PARTITION BY KEY({}) PARTITIONS {}",
rendered_partition_key,
v.definitions.len()
)
.as_str()
}
}
}

// TODO: update datafusion to remove `#`.
Expand Down Expand Up @@ -168,12 +186,12 @@ mod test {

use datafusion_expr::col;
use datafusion_proto::bytes::Serializeable;
use table_engine::partition::{Definition, HashPartitionInfo, PartitionInfo};
use table_engine::partition::{Definition, HashPartitionInfo, KeyPartitionInfo, PartitionInfo};

use super::ShowCreateInterpreter;

#[test]
fn test_render_partition_info() {
fn test_render_hash_partition_info() {
let expr = col("col1").add(col("col2"));
let partition_info = PartitionInfo::Hash(HashPartitionInfo {
definitions: vec![
Expand All @@ -196,4 +214,29 @@ mod test {
ShowCreateInterpreter::render_partition_info(Some(partition_info))
);
}

/// Checks the `PARTITION BY KEY(...)` clause rendered for a non-linear key
/// partition made of two partitions and a single key column.
#[test]
fn test_render_key_partition_info() {
    // The rendered clause reports the number of definitions after `PARTITIONS`.
    let definitions = ["p0", "p1"]
        .iter()
        .map(|name| Definition {
            name: name.to_string(),
            origin_name: None,
        })
        .collect();
    let key_partition = KeyPartitionInfo {
        definitions,
        partition_key: vec!["col1".to_string()],
        linear: false,
    };

    let rendered =
        ShowCreateInterpreter::render_partition_info(Some(PartitionInfo::Key(key_partition)));

    assert_eq!(" PARTITION BY KEY(col1) PARTITIONS 2".to_string(), rendered);
}
}
9 changes: 8 additions & 1 deletion proto/protos/meta_update.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ message AddSpaceMeta {

message Definition {
string name = 1;
oneof origin_name{
oneof origin_name {
string origin = 2;
}
}
Expand All @@ -26,6 +26,12 @@ message HashPartitionInfo {
bool linear = 3;
}

// Partition info of a table partitioned by key.
// NOTE(review): presumably the persisted counterpart of
// `table_engine::partition::KeyPartitionInfo` - confirm against the converter.
message KeyPartitionInfo {
  // Definitions of the partitions.
  repeated Definition definitions = 1;
  // Partition key; presumably the names of the columns composing it - confirm.
  repeated string partition_key = 2;
  // Presumably set when the table was declared with `LINEAR KEY` - confirm.
  bool linear = 3;
}

// Meta update for a new table
message AddTableMeta {
uint32 space_id = 1;
Expand All @@ -37,6 +43,7 @@ message AddTableMeta {
analytic_common.TableOptions options = 5;
oneof partition_info {
HashPartitionInfo hash = 6;
KeyPartitionInfo key_partition = 7;
}
}

Expand Down
9 changes: 9 additions & 0 deletions sql/src/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct CreateTable {
#[derive(Debug, PartialEq, Eq)]
pub enum Partition {
Hash(HashPartition),
Key(KeyPartition),
}

#[derive(Debug, PartialEq, Eq)]
Expand All @@ -87,6 +88,14 @@ pub struct HashPartition {
pub expr: sqlparser::ast::Expr,
}

/// Key partition parsed from the `PARTITION BY [LINEAR] KEY(...)` clause.
///
/// Key partition description: https://dev.mysql.com/doc/refman/5.7/en/partitioning-key.html
#[derive(Debug, PartialEq, Eq)]
pub struct KeyPartition {
    /// Whether the clause was written as `LINEAR KEY` rather than `KEY`.
    pub linear: bool,
    /// Number given after `PARTITIONS`.
    pub partition_num: u64,
    /// Names of the columns listed inside `KEY(...)`.
    pub partition_key: Vec<String>,
}

#[derive(Debug, PartialEq, Eq)]
pub struct DropTable {
/// Table name
Expand Down
185 changes: 145 additions & 40 deletions sql/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ use table_engine::ANALYTIC_ENGINE_TYPE;

use crate::ast::{
AlterAddColumn, AlterModifySetting, CreateTable, DescribeTable, DropTable, ExistsTable,
HashPartition, Partition, ShowCreate, ShowCreateObject, ShowTables, Statement,
HashPartition, KeyPartition, Partition, ShowCreate, ShowCreateObject, ShowTables, Statement,
};

define_result!(ParserError);

// Use `Parser::expected` instead, if possible
macro_rules! parser_err {
($MSG:expr) => {
Err(ParserError::ParserError($MSG.to_string()))
Err(ParserError::ParserError($MSG))
};
}

Expand Down Expand Up @@ -537,7 +537,9 @@ impl<'a> Parser<'a> {
&mut self,
columns: &[ColumnDef],
) -> Result<Option<Partition>> {
// TODO: only hash type is supported now, we should support other types.
if let Some(key) = self.maybe_parse_and_check_key_partition(columns)? {
return Ok(Some(Partition::Key(key)));
}
if let Some(hash) = self.maybe_parse_and_check_hash_partition(columns)? {
return Ok(Some(Partition::Hash(hash)));
}
Expand All @@ -550,49 +552,102 @@ impl<'a> Parser<'a> {
columns: &[ColumnDef],
) -> Result<Option<HashPartition>> {
// Parse first part: "PARTITION BY HASH(expr)".
let (is_hash_partition, is_linear) = {
if self.consume_token("HASH") {
(true, false)
} else if self.consume_tokens(&["LINEAR", "HASH"]) {
(true, true)
} else {
(false, false)
}
};

if !is_hash_partition {
let linear = if self.consume_token("HASH") {
false
} else if self.consume_tokens(&["LINEAR", "HASH"]) {
true
} else {
return Ok(None);
}
};

// TODO: support all valid exprs not only column expr.
let expr = self.parse_and_check_expr_in_hash(columns)?;

// Parse second part: "PARTITIONS num" (if not set, num will use 1 as default).
let partition_num = if self.parser.parse_keyword(Keyword::PARTITIONS) {
match self.parser.parse_number_value()? {
sqlparser::ast::Value::Number(v, _) => match v.parse::<u64>() {
Ok(v) => v,
Err(e) => {
return parser_err!(format!(
"valid partition num after PARTITIONS, err:{}",
e
))
}
},
_ => return parser_err!("expect partition num after PARTITIONS"),
}
} else {
1
};
let partition_num = self.parse_partition_num()?;

// Parse successfully.
Ok(Some(HashPartition {
linear: is_linear,
linear,
partition_num,
expr,
}))
}

/// Tries to parse a `[LINEAR] KEY(col, ...) [PARTITIONS num]` partition clause.
///
/// Returns `Ok(None)` when the upcoming tokens are neither `KEY` nor
/// `LINEAR KEY`; otherwise returns the parsed and validated [`KeyPartition`].
///
/// # Errors
///
/// Returns a parser error when:
/// - the parenthesized column list cannot be parsed;
/// - a listed column is not found in `columns`;
/// - a listed column is not a tag column;
/// - the number after `PARTITIONS` is invalid.
fn maybe_parse_and_check_key_partition(
    &mut self,
    columns: &[ColumnDef],
) -> Result<Option<KeyPartition>> {
    let linear = if self.consume_token("KEY") {
        false
    } else if self.consume_tokens(&["LINEAR", "KEY"]) {
        true
    } else {
        return Ok(None);
    };

    let key_columns = self
        .parser
        .parse_parenthesized_column_list(Mandatory)
        .map_err(|e| {
            ParserError::ParserError(format!("Fail to parse partition key, err:{}", e))
        })?;

    // Ensure at least one column for partition key.
    if key_columns.is_empty() {
        return parser_err!(
            "expect at least one partition key, default partition key is unsupported now"
                .to_string()
        );
    }

    // Validate all columns composing partition key:
    // - The column must exist;
    // - The column must be a tag;
    for key_col in &key_columns {
        let col_def = columns
            .iter()
            .find(|c| c.name.value == key_col.value)
            .ok_or_else(|| {
                ParserError::ParserError(format!(
                    "partition key contains non-existent column:{}",
                    key_col.value,
                ))
            })?;

        let is_tag = col_def.options.iter().any(|opt| is_tag_column(&opt.option));
        if !is_tag {
            return parser_err!(format!(
                "partition key must be tag, key name:{:?}",
                key_col.value
            ));
        }
    }

    // The partition number is only parsed once the key columns are validated.
    let partition_num = self.parse_partition_num()?;
    let partition_key = key_columns.into_iter().map(|v| v.value).collect();

    // Parse successfully.
    Ok(Some(KeyPartition {
        linear,
        partition_num,
        partition_key,
    }))
}

// Parse second part: "PARTITIONS num" (if not set, num will use 1 as default).
fn parse_partition_num(&mut self) -> Result<u64> {
if self.parser.parse_keyword(Keyword::PARTITIONS) {
match self.parser.parse_number_value()? {
sqlparser::ast::Value::Number(v, _) => match v.parse::<u64>() {
Ok(v) => Ok(v),
Err(e) => parser_err!(format!("invalid partition num, raw:{}, err:{}", v, e)),
},
v => parser_err!(format!("expect partition number, found:{}", v)),
}
} else {
Ok(1)
}
}

fn parse_and_check_expr_in_hash(&mut self, columns: &[ColumnDef]) -> Result<Expr> {
let expr = self.parser.parse_expr()?;
if let Expr::Nested(inner) = expr {
Expand Down Expand Up @@ -655,12 +710,8 @@ fn check_column_expr_validity_in_hash(column: &Ident, columns: &[ColumnDef]) ->
| DataType::UnsignedSmallInt(_)
| DataType::UnsignedBigInt(_)
);

let tag_option = col.options.iter().find(|opt| {
opt.option == ColumnOption::DialectSpecific(vec![Token::make_keyword(TAG)])
});

is_integer && tag_option.is_some()
let tag_column = col.options.iter().any(|opt| is_tag_column(&opt.option));
is_integer && tag_column
} else {
false
}
Expand Down Expand Up @@ -731,7 +782,10 @@ fn maybe_convert_table_name(object_name: &mut ObjectName) {

#[cfg(test)]
mod tests {
use sqlparser::ast::{ColumnOptionDef, DataType, Ident, ObjectName, Value};
use sqlparser::{
ast::{ColumnOptionDef, DataType, Ident, ObjectName, Value},
parser::ParserError::ParserError,
};

use super::*;
use crate::ast::TableName;
Expand Down Expand Up @@ -1190,6 +1244,7 @@ mod tests {
}

struct HashPartitionTableCases;

impl HashPartitionTableCases {
// Basic
fn basic() {
Expand Down Expand Up @@ -1289,4 +1344,54 @@ mod tests {
);
}
}

/// Runs every key partition parsing case, in declaration order.
#[test]
fn test_key_partition() {
    let cases: [fn(); 3] = [
        KeyPartitionTableCases::basic,
        KeyPartitionTableCases::default_key_partition,
        KeyPartitionTableCases::invalid_column_type,
    ];
    for case in cases.iter() {
        case();
    }
}

/// Namespace for the key partition parsing cases run by `test_key_partition`.
struct KeyPartitionTableCases;

impl KeyPartitionTableCases {
    /// A valid `PARTITION BY KEY(name) PARTITIONS 2` clause on a tag column.
    fn basic() {
        let sql = r#"CREATE TABLE `demo` (`name` string TAG, `value` double NOT NULL, `t` timestamp NOT NULL, TIMESTAMP KEY(t)) PARTITION BY KEY(name) PARTITIONS 2 ENGINE=Analytic with (enable_ttl="false")"#;
        let statements = Parser::parse_sql(sql).unwrap();
        assert_eq!(statements.len(), 1);

        let create = match &statements[0] {
            Statement::Create(create) => create,
            _ => panic!("failed"),
        };
        let key_partition = match &create.partition {
            Some(Partition::Key(key_partition)) => key_partition,
            _ => panic!("failed"),
        };

        assert!(!key_partition.linear);
        assert_eq!(&key_partition.partition_key[0], "name");
        assert_eq!(key_partition.partition_num, 2);
    }

    /// An empty column list, `KEY()`, is rejected while parsing the key.
    fn default_key_partition() {
        let sql = r#"CREATE TABLE `demo` (`name` string TAG, `value` double NOT NULL, `t` timestamp NOT NULL, TIMESTAMP KEY(t)) PARTITION BY KEY() PARTITIONS 2 ENGINE=Analytic with (enable_ttl="false")"#;
        let actual = Parser::parse_sql(sql).err().unwrap();
        let expected = ParserError(
            "Fail to parse partition key, err:sql parser error: Expected identifier, found: )"
                .to_string(),
        );
        assert_eq!(actual, expected);
    }

    /// A partition key naming a non-tag column (`value`) is rejected.
    fn invalid_column_type() {
        let sql = r#"CREATE TABLE `demo` (`name` string TAG, `value` double NOT NULL, `t` timestamp NOT NULL, TIMESTAMP KEY(t)) PARTITION BY KEY(value) PARTITIONS 2 ENGINE=Analytic with (enable_ttl="false")"#;
        let actual = Parser::parse_sql(sql).err().unwrap();
        let expected = ParserError(r#"partition key must be tag, key name:"value""#.to_string());
        assert_eq!(actual, expected)
    }
}
}
Loading

0 comments on commit 72d3542

Please sign in to comment.