diff --git a/common/ast/src/ast/statement.rs b/common/ast/src/ast/statement.rs
index deafa43a61d4..e200ab6aef91 100644
--- a/common/ast/src/ast/statement.rs
+++ b/common/ast/src/ast/statement.rs
@@ -294,6 +294,7 @@ pub enum AlterDatabaseAction<'a> {
 #[derive(Debug, Clone, PartialEq)]
 pub enum AlterTableAction<'a> {
     RenameTable { new_table: Identifier<'a> },
+    AlterClusterKey { cluster_by: Vec<Expr<'a>> },
     // TODO(wuzhiguo): AddColumn etc
 }

@@ -579,6 +580,10 @@ impl<'a> Display for Statement<'a> {
                 AlterTableAction::RenameTable { new_table } => {
                     write!(f, " RENAME TO {new_table}")?;
                 }
+                AlterTableAction::AlterClusterKey { cluster_by } => {
+                    write!(f, " CLUSTER BY ")?;
+                    write_comma_separated_list(f, cluster_by)?;
+                }
             }
         }
         Statement::RenameTable {
diff --git a/common/ast/src/parser/statement.rs b/common/ast/src/parser/statement.rs
index ec133d1ebb80..0c12496e02f5 100644
--- a/common/ast/src/parser/statement.rs
+++ b/common/ast/src/parser/statement.rs
@@ -567,15 +567,23 @@ pub fn alter_database_action(i: Input) -> IResult<AlterDatabaseAction> {
 }

 pub fn alter_table_action(i: Input) -> IResult<AlterTableAction> {
-    let mut rename_table = map(
+    let rename_table = map(
         rule! { RENAME ~ TO ~ #ident },
         |(_, _, new_table)| AlterTableAction::RenameTable { new_table },
     );

+    let cluster_by = map(
+        rule! {
+            CLUSTER ~ ^BY ~ ^"(" ~ ^#comma_separated_list1(expr) ~ ^")"
+        },
+        |(_, _, _, cluster_by, _)| AlterTableAction::AlterClusterKey { cluster_by },
+    );
+
     rule!(
         #rename_table
+        | #cluster_by
     )(i)
 }
diff --git a/common/meta/app/src/schema/table.rs b/common/meta/app/src/schema/table.rs
index 3da4d3aef502..77afea85464c 100644
--- a/common/meta/app/src/schema/table.rs
+++ b/common/meta/app/src/schema/table.rs
@@ -174,7 +174,11 @@ pub struct TableMeta {
     pub engine: String,
     pub engine_options: BTreeMap<String, String>,
     pub options: BTreeMap<String, String>,
-    pub cluster_keys: Option<String>,
+    pub cluster_key: Option<String>,
+    // The vector of cluster keys.
+    pub cluster_keys: Vec<String>,
+    // The default cluster keys id.
+    pub default_cluster_key_id: Option<u32>,
     pub created_on: DateTime<Utc>,
     pub updated_on: DateTime<Utc>,
     pub comment: String,
@@ -237,7 +241,9 @@ impl Default for TableMeta {
             engine: "".to_string(),
             engine_options: BTreeMap::new(),
             options: BTreeMap::new(),
-            cluster_keys: None,
+            cluster_key: None,
+            cluster_keys: vec![],
+            default_cluster_key_id: None,
             created_on: Default::default(),
             updated_on: Default::default(),
             comment: "".to_string(),
@@ -247,6 +253,19 @@
     }
 }

+impl TableMeta {
+    pub fn push_cluster_key(mut self, cluster_key: String) -> Self {
+        self.cluster_keys.push(cluster_key.clone());
+        self.cluster_key = Some(cluster_key);
+        self.default_cluster_key_id = Some(self.cluster_keys.len() as u32 - 1);
+        self
+    }
+
+    pub fn cluster_key(&self) -> Option<(u32, String)> {
+        self.default_cluster_key_id.zip(self.cluster_key.clone())
+    }
+}
+
 impl Display for TableMeta {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         write!(
diff --git a/common/planners/src/lib.rs b/common/planners/src/lib.rs
index 37ea7dd3b590..488165649083 100644
--- a/common/planners/src/lib.rs
+++ b/common/planners/src/lib.rs
@@ -14,6 +14,7 @@
 mod plan_aggregator_final;
 mod plan_aggregator_partial;
+mod plan_alter_cluster_key;
 mod plan_broadcast;
 mod plan_call;
 mod plan_copy;
@@ -105,6 +106,7 @@ mod plan_view_drop;
 pub use plan_aggregator_final::AggregatorFinalPlan;
 pub use plan_aggregator_partial::AggregatorPartialPlan;
+pub use plan_alter_cluster_key::AlterClusterKeyPlan;
 pub use plan_broadcast::BroadcastPlan;
 pub use plan_call::CallPlan;
 pub use plan_copy::CopyMode;
diff --git a/common/planners/src/plan_alter_cluster_key.rs b/common/planners/src/plan_alter_cluster_key.rs
new file mode 100644
index 000000000000..7a2e4730d012
--- /dev/null
+++ b/common/planners/src/plan_alter_cluster_key.rs
@@ -0,0 +1,35 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_datavalues::DataSchema;
+use common_datavalues::DataSchemaRef;
+
+use crate::Expression;
+
+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
+pub struct AlterClusterKeyPlan {
+    pub tenant: String,
+    pub catalog_name: String,
+    pub database_name: String,
+    pub table_name: String,
+    pub cluster_keys: Vec<Expression>,
+}
+
+impl AlterClusterKeyPlan {
+    pub fn schema(&self) -> DataSchemaRef {
+        Arc::new(DataSchema::empty())
+    }
+}
diff --git a/common/planners/src/plan_node.rs b/common/planners/src/plan_node.rs
index d0dddc170a94..07050d1e8c69 100644
--- a/common/planners/src/plan_node.rs
+++ b/common/planners/src/plan_node.rs
@@ -19,6 +19,7 @@ use common_datavalues::DataSchemaRef;
 use crate::plan_table_undrop::UnDropTablePlan;
 use crate::AggregatorFinalPlan;
 use crate::AggregatorPartialPlan;
+use crate::AlterClusterKeyPlan;
 use crate::AlterUserPlan;
 use crate::AlterUserUDFPlan;
 use crate::AlterViewPlan;
@@ -113,6 +114,9 @@ pub enum PlanNode {
     // List
     List(ListPlan),

+    // Alter.
+    AlterClusterKey(AlterClusterKeyPlan),
+
     // Show.
Show(ShowPlan), @@ -275,6 +279,9 @@ impl PlanNode { // Kill. PlanNode::Kill(v) => v.schema(), + + // Alter + PlanNode::AlterClusterKey(v) => v.schema(), } } @@ -376,6 +383,9 @@ impl PlanNode { // Kill. PlanNode::Kill(_) => "KillQuery", + + // Alter. + PlanNode::AlterClusterKey(_) => "AlterClusterKeyPlan", } } diff --git a/common/planners/src/plan_node_display_indent.rs b/common/planners/src/plan_node_display_indent.rs index 857b868cef1c..8d0ff04c4b5d 100644 --- a/common/planners/src/plan_node_display_indent.rs +++ b/common/planners/src/plan_node_display_indent.rs @@ -19,6 +19,7 @@ use common_datavalues::DataType; use crate::AggregatorFinalPlan; use crate::AggregatorPartialPlan; +use crate::AlterClusterKeyPlan; use crate::BroadcastPlan; use crate::CallPlan; use crate::CopyPlan; @@ -85,6 +86,7 @@ impl<'a> fmt::Display for PlanNodeIndentFormatDisplay<'a> { PlanNode::DropRole(plan) => Self::format_drop_role(f, plan), PlanNode::Copy(plan) => Self::format_copy(f, plan), PlanNode::Call(plan) => Self::format_call(f, plan), + PlanNode::AlterClusterKey(plan) => Self::format_alter_cluster_key(f, plan), _ => { let mut printed = true; @@ -356,4 +358,13 @@ impl<'a> PlanNodeIndentFormatDisplay<'a> { write!(f, "Call {:}", plan.name)?; write!(f, " args: {:?}", plan.args) } + + fn format_alter_cluster_key(f: &mut Formatter, plan: &AlterClusterKeyPlan) -> fmt::Result { + write!( + f, + "Alter table {:}.{:}", + plan.database_name, plan.table_name + )?; + write!(f, " cluster by {:?}", plan.cluster_keys) + } } diff --git a/common/planners/src/plan_node_rewriter.rs b/common/planners/src/plan_node_rewriter.rs index e9a16ee387e1..02fd80612323 100644 --- a/common/planners/src/plan_node_rewriter.rs +++ b/common/planners/src/plan_node_rewriter.rs @@ -26,6 +26,7 @@ use crate::plan_subqueries_set::SubQueriesSetPlan; use crate::plan_table_undrop::UnDropTablePlan; use crate::AggregatorFinalPlan; use crate::AggregatorPartialPlan; +use crate::AlterClusterKeyPlan; use crate::AlterUserPlan; use crate::AlterUserUDFPlan; use crate::AlterViewPlan; @@ -199,6 +200,9 @@ pub trait PlanRewriter: Sized { // Kill. PlanNode::Kill(plan) => self.rewrite_kill(plan), + + // Alter. + PlanNode::AlterClusterKey(plan) => self.rewrite_alter_cluster_key(plan), } } @@ -511,6 +515,10 @@ pub trait PlanRewriter: Sized { fn rewrite_alter_user_udf(&mut self, plan: &AlterUserUDFPlan) -> Result { Ok(PlanNode::AlterUserUDF(plan.clone())) } + + fn rewrite_alter_cluster_key(&mut self, plan: &AlterClusterKeyPlan) -> Result { + Ok(PlanNode::AlterClusterKey(plan.clone())) + } } pub struct RewriteHelper {} diff --git a/common/planners/src/plan_node_visitor.rs b/common/planners/src/plan_node_visitor.rs index 98506b4aaaee..c875af334e86 100644 --- a/common/planners/src/plan_node_visitor.rs +++ b/common/planners/src/plan_node_visitor.rs @@ -19,6 +19,7 @@ use crate::plan_subqueries_set::SubQueriesSetPlan; use crate::plan_table_undrop::UnDropTablePlan; use crate::AggregatorFinalPlan; use crate::AggregatorPartialPlan; +use crate::AlterClusterKeyPlan; use crate::AlterUserPlan; use crate::AlterUserUDFPlan; use crate::AlterViewPlan; @@ -211,6 +212,9 @@ pub trait PlanVisitor { // Kill. PlanNode::Kill(plan) => self.visit_kill_query(plan), + + // Alter. 
+ PlanNode::AlterClusterKey(plan) => self.visit_alter_cluster_key(plan), } } @@ -476,4 +480,8 @@ pub trait PlanVisitor { fn visit_alter_user_udf(&mut self, _: &AlterUserUDFPlan) -> Result<()> { Ok(()) } + + fn visit_alter_cluster_key(&mut self, _: &AlterClusterKeyPlan) -> Result<()> { + Ok(()) + } } diff --git a/common/proto-conv/src/table_from_to_protobuf_impl.rs b/common/proto-conv/src/table_from_to_protobuf_impl.rs index 7b9d457346b3..f244ec4dab3a 100644 --- a/common/proto-conv/src/table_from_to_protobuf_impl.rs +++ b/common/proto-conv/src/table_from_to_protobuf_impl.rs @@ -130,7 +130,9 @@ impl FromToProto for mt::TableMeta { engine: p.engine, engine_options: p.engine_options, options: p.options, + cluster_key: p.cluster_key, cluster_keys: p.cluster_keys, + default_cluster_key_id: p.default_cluster_key_id, created_on: DateTime::::from_pb(p.created_on)?, updated_on: DateTime::::from_pb(p.updated_on)?, drop_on: match p.drop_on { @@ -155,7 +157,9 @@ impl FromToProto for mt::TableMeta { engine: self.engine.clone(), engine_options: self.engine_options.clone(), options: self.options.clone(), + cluster_key: self.cluster_key.clone(), cluster_keys: self.cluster_keys.clone(), + default_cluster_key_id: self.default_cluster_key_id, created_on: self.created_on.to_pb()?, updated_on: self.updated_on.to_pb()?, drop_on: match self.drop_on { diff --git a/common/proto-conv/tests/it/proto_conv.rs b/common/proto-conv/tests/it/proto_conv.rs index 91968241e5a4..161fa4eebcc5 100644 --- a/common/proto-conv/tests/it/proto_conv.rs +++ b/common/proto-conv/tests/it/proto_conv.rs @@ -108,7 +108,9 @@ fn new_table_info() -> mt::TableInfo { engine: "44".to_string(), engine_options: btreemap! {s("abc") => s("def")}, options: btreemap! {s("xyz") => s("foo")}, - cluster_keys: Some("(a + 2, b)".to_string()), + cluster_key: Some("(a + 2, b)".to_string()), + cluster_keys: vec!["(a + 2, b)".to_string()], + default_cluster_key_id: Some(0), created_on: Utc.ymd(2014, 11, 28).and_hms(12, 0, 9), updated_on: Utc.ymd(2014, 11, 29).and_hms(12, 0, 10), comment: s("table_comment"), @@ -329,7 +331,9 @@ fn test_load_old() -> anyhow::Result<()> { engine: "44".to_string(), engine_options: btreemap! {s("abc") => s("def")}, options: btreemap! {s("xyz") => s("foo")}, - cluster_keys: Some("(a + 2, b)".to_string()), + cluster_key: Some("(a + 2, b)".to_string()), + cluster_keys: vec![], + default_cluster_key_id: None, created_on: Utc.ymd(2014, 11, 28).and_hms(12, 0, 9), updated_on: Utc.ymd(2014, 11, 29).and_hms(12, 0, 10), comment: s("table_comment"), diff --git a/common/protos/proto/metadata.proto b/common/protos/proto/metadata.proto index aa5faf055a66..9be1a5ea1ce4 100644 --- a/common/protos/proto/metadata.proto +++ b/common/protos/proto/metadata.proto @@ -164,7 +164,13 @@ message TableMeta { map options = 5; // Keys to sort rows in table. - optional string cluster_keys = 9; + optional string cluster_key = 9; + + // The vector of cluster keys. + repeated string cluster_keys = 4; + + // The default cluster keys id. + optional uint32 default_cluster_key_id = 8; // The time table created. string created_on = 20; diff --git a/query/src/interpreters/interpreter_alter_cluster_key.rs b/query/src/interpreters/interpreter_alter_cluster_key.rs new file mode 100644 index 000000000000..ce26ffd9dae6 --- /dev/null +++ b/query/src/interpreters/interpreter_alter_cluster_key.rs @@ -0,0 +1,90 @@ +// Copyright 2022 Datafuse Labs. 
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_exception::Result;
+use common_meta_types::GrantObject;
+use common_meta_types::UserPrivilegeType;
+use common_planners::validate_expression;
+use common_planners::AlterClusterKeyPlan;
+use common_streams::DataBlockStream;
+use common_streams::SendableDataBlockStream;
+
+use super::Interpreter;
+use super::InterpreterPtr;
+use crate::sessions::QueryContext;
+
+pub struct AlterClusterKeyInterpreter {
+    ctx: Arc<QueryContext>,
+    plan: AlterClusterKeyPlan,
+}
+
+impl AlterClusterKeyInterpreter {
+    pub fn try_create(ctx: Arc<QueryContext>, plan: AlterClusterKeyPlan) -> Result<InterpreterPtr> {
+        Ok(Arc::new(AlterClusterKeyInterpreter { ctx, plan }))
+    }
+}
+
+#[async_trait::async_trait]
+impl Interpreter for AlterClusterKeyInterpreter {
+    fn name(&self) -> &str {
+        "AlterClusterKeyInterpreter"
+    }
+
+    async fn execute(
+        &self,
+        _input_stream: Option<SendableDataBlockStream>,
+    ) -> Result<SendableDataBlockStream> {
+        let plan = &self.plan;
+        self.ctx
+            .get_current_session()
+            .validate_privilege(
+                &GrantObject::Table(
+                    plan.catalog_name.clone(),
+                    plan.database_name.clone(),
+                    plan.table_name.clone(),
+                ),
+                UserPrivilegeType::Alter,
+            )
+            .await?;
+
+        let tenant = self.ctx.get_tenant();
+        let catalog = self.ctx.get_catalog(&plan.catalog_name)?;
+
+        let table = catalog
+            .get_table(tenant.as_str(), &plan.database_name, &plan.table_name)
+            .await?;
+
+        let schema = table.schema();
+        let cluster_keys = plan.cluster_keys.clone();
+        // Let's validate the expressions firstly.
+        // TODO(zhyass): Not all expressions are valid for cluster key.
+        for expr in cluster_keys.iter() {
+            validate_expression(expr, &schema)?;
+        }
+
+        let cluster_key_vec: Vec<String> = cluster_keys.iter().map(|e| e.column_name()).collect();
+        let cluster_key_str = format!("({})", cluster_key_vec.join(", "));
+
+        table
+            .alter_cluster_keys(self.ctx.clone(), &self.plan.catalog_name, cluster_key_str)
+            .await?;
+        Ok(Box::pin(DataBlockStream::create(
+            self.plan.schema(),
+            None,
+            vec![],
+        )))
+    }
+}
diff --git a/query/src/interpreters/interpreter_factory.rs b/query/src/interpreters/interpreter_factory.rs
index 34b48241be73..217f3dc7ffc1 100644
--- a/query/src/interpreters/interpreter_factory.rs
+++ b/query/src/interpreters/interpreter_factory.rs
@@ -27,6 +27,7 @@ use super::ListInterpreter;
 use super::ShowStagesInterpreter;
 use crate::interpreters::interpreter_show_engines::ShowEnginesInterpreter;
 use crate::interpreters::interpreter_table_rename::RenameTableInterpreter;
+use crate::interpreters::AlterClusterKeyInterpreter;
 use crate::interpreters::AlterUserInterpreter;
 use crate::interpreters::AlterUserUDFInterpreter;
 use crate::interpreters::CallInterpreter;
@@ -178,6 +179,9 @@ impl InterpreterFactory {
                 DescribeUserStageInterpreter::try_create(ctx_clone, v)
             }

+            // alter.
+ PlanNode::AlterClusterKey(v) => AlterClusterKeyInterpreter::try_create(ctx_clone, v), + // others PlanNode::List(v) => ListInterpreter::try_create(ctx_clone, v), PlanNode::UseDatabase(v) => UseDatabaseInterpreter::try_create(ctx_clone, v), diff --git a/query/src/interpreters/interpreter_factory_v2.rs b/query/src/interpreters/interpreter_factory_v2.rs index 716c0c328c81..8255fbbf852d 100644 --- a/query/src/interpreters/interpreter_factory_v2.rs +++ b/query/src/interpreters/interpreter_factory_v2.rs @@ -54,7 +54,7 @@ impl InterpreterFactoryV2 { ExplainInterpreterV2::try_create(ctx, *plan.clone(), kind.clone()) } Plan::CreateTable(create_table) => { - CreateTableInterpreter::try_create(ctx, create_table.clone()) + CreateTableInterpreter::try_create(ctx, *create_table.clone()) } }?; Ok(inner) diff --git a/query/src/interpreters/interpreter_table_create.rs b/query/src/interpreters/interpreter_table_create.rs index 15ca8370a05a..081435d68adc 100644 --- a/query/src/interpreters/interpreter_table_create.rs +++ b/query/src/interpreters/interpreter_table_create.rs @@ -95,7 +95,7 @@ impl Interpreter for CreateTableInterpreter { match engine_desc { Some(engine) => { - if !self.plan.cluster_keys.is_empty() && !engine.support_order_key { + if !self.plan.cluster_keys.is_empty() && !engine.support_cluster_key { return Err(ErrorCode::UnsupportedEngineParams(format!( "Unsupported cluster key for engine: {}", engine.engine_name diff --git a/query/src/interpreters/interpreter_table_show_create.rs b/query/src/interpreters/interpreter_table_show_create.rs index a1888b12ad92..8f4fd9816d01 100644 --- a/query/src/interpreters/interpreter_table_show_create.rs +++ b/query/src/interpreters/interpreter_table_show_create.rs @@ -94,8 +94,8 @@ impl Interpreter for ShowCreateTableInterpreter { table_create_sql.push_str(table_engine.as_str()); let table_info = table.get_table_info(); - if let Some(order_keys_str) = &table_info.meta.cluster_keys { - table_create_sql.push_str(format!(" CLUSTER BY {}", order_keys_str).as_str()); + if let Some((_, cluster_keys_str)) = table_info.meta.cluster_key() { + table_create_sql.push_str(format!(" CLUSTER BY {}", cluster_keys_str).as_str()); } table_create_sql.push_str({ diff --git a/query/src/interpreters/mod.rs b/query/src/interpreters/mod.rs index 08c6c2ca70b8..a70e485b8888 100644 --- a/query/src/interpreters/mod.rs +++ b/query/src/interpreters/mod.rs @@ -14,6 +14,7 @@ mod access; mod interpreter; +mod interpreter_alter_cluster_key; mod interpreter_call; mod interpreter_common; mod interpreter_copy; @@ -81,6 +82,7 @@ mod stream; pub use access::ManagementModeAccess; pub use interpreter::Interpreter; pub use interpreter::InterpreterPtr; +pub use interpreter_alter_cluster_key::AlterClusterKeyInterpreter; pub use interpreter_call::CallInterpreter; pub use interpreter_copy::CopyInterpreter; pub use interpreter_database_create::CreateDatabaseInterpreter; diff --git a/query/src/sql/parsers/parser_table.rs b/query/src/sql/parsers/parser_table.rs index d1bf06d3555a..73f4b2b9ddfd 100644 --- a/query/src/sql/parsers/parser_table.rs +++ b/query/src/sql/parsers/parser_table.rs @@ -129,20 +129,40 @@ impl<'a> DfParser<'a> { let if_exists = self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]); let table_name = self.parser.parse_object_name()?; - if self.parser.parse_keywords(&[Keyword::RENAME, Keyword::TO]) { - let new_table_name = self.parser.parse_object_name()?; + match self.parser.next_token() { + Token::Word(w) => match w.keyword { + Keyword::RENAME => { + 
self.parser.expect_keyword(Keyword::TO)?; + let new_table_name = self.parser.parse_object_name()?; + + let rename = DfAlterTable { + if_exists, + table_name, + action: AlterTableAction::RenameTable(new_table_name), + }; + + Ok(DfStatement::AlterTable(rename)) + } + Keyword::CLUSTER => { + self.parser.expect_keyword(Keyword::BY)?; - let rename = DfAlterTable { - if_exists, - table_name, - action: AlterTableAction::RenameTable(new_table_name), - }; + self.parser.expect_token(&Token::LParen)?; + let cluster_keys = self.parser.parse_comma_separated(Parser::parse_expr)?; + self.parser.expect_token(&Token::RParen)?; - Ok(DfStatement::AlterTable(rename)) - } else { - Err(ParserError::ParserError(String::from( - "Alter table only support rename for now!", - ))) + let cluster_by = DfAlterTable { + if_exists, + table_name, + action: AlterTableAction::AlterClusterKey(cluster_keys), + }; + + Ok(DfStatement::AlterTable(cluster_by)) + } + _ => Err(ParserError::ParserError(String::from( + "Unsupported alter table statement!", + ))), + }, + unexpected => self.expected("alter table statement", unexpected), } } diff --git a/query/src/sql/planner/binder/ddl/table.rs b/query/src/sql/planner/binder/ddl/table.rs index cab9352fa256..eb44ae48388f 100644 --- a/query/src/sql/planner/binder/ddl/table.rs +++ b/query/src/sql/planner/binder/ddl/table.rs @@ -212,8 +212,8 @@ impl<'a> Binder { cluster_keys.push(cluster_key.to_string()); } if !cluster_keys.is_empty() { - let order_keys_sql = format!("({})", cluster_keys.join(", ")); - meta.cluster_keys = Some(order_keys_sql); + let cluster_keys_sql = format!("({})", cluster_keys.join(", ")); + meta = meta.push_cluster_key(cluster_keys_sql); } let plan = CreateTablePlan { @@ -232,6 +232,6 @@ impl<'a> Binder { None }, }; - Ok(Plan::CreateTable(plan)) + Ok(Plan::CreateTable(Box::new(plan))) } } diff --git a/query/src/sql/planner/plans/mod.rs b/query/src/sql/planner/plans/mod.rs index fd367389dbe6..81d0eefc5294 100644 --- a/query/src/sql/planner/plans/mod.rs +++ b/query/src/sql/planner/plans/mod.rs @@ -69,5 +69,5 @@ pub enum Plan { }, // DDL - CreateTable(CreateTablePlan), + CreateTable(Box), } diff --git a/query/src/sql/statements/statement_alter_table.rs b/query/src/sql/statements/statement_alter_table.rs index bdb20aacf1d3..eddf5d8243cc 100644 --- a/query/src/sql/statements/statement_alter_table.rs +++ b/query/src/sql/statements/statement_alter_table.rs @@ -14,13 +14,17 @@ use std::sync::Arc; +use common_exception::ErrorCode; use common_exception::Result; +use common_planners::AlterClusterKeyPlan; use common_planners::PlanNode; use common_planners::RenameTableEntity; use common_planners::RenameTablePlan; use common_tracing::tracing; +use sqlparser::ast::Expr; use sqlparser::ast::ObjectName; +use super::analyzer_expr::ExpressionAnalyzer; use crate::sessions::QueryContext; use crate::sql::statements::AnalyzableStatement; use crate::sql::statements::AnalyzedResult; @@ -35,6 +39,7 @@ pub struct DfAlterTable { #[derive(Clone, Debug, PartialEq)] pub enum AlterTableAction { RenameTable(ObjectName), + AlterClusterKey(Vec), // TODO AddColumn etc. 
} @@ -65,6 +70,24 @@ impl AnalyzableStatement for DfAlterTable { PlanNode::RenameTable(RenameTablePlan { tenant, entities }), ))) } + AlterTableAction::AlterClusterKey(exprs) => { + let expression_analyzer = ExpressionAnalyzer::create(ctx); + let cluster_keys = exprs.iter().try_fold(vec![], |mut acc, k| { + let expr = expression_analyzer.analyze_sync(k)?; + acc.push(expr); + Ok::<_, ErrorCode>(acc) + })?; + + Ok(AnalyzedResult::SimpleQuery(Box::new( + PlanNode::AlterClusterKey(AlterClusterKeyPlan { + tenant, + catalog_name, + database_name, + table_name, + cluster_keys, + }), + ))) + } } } } diff --git a/query/src/sql/statements/statement_create_table.rs b/query/src/sql/statements/statement_create_table.rs index f51cd0e3b64d..f48da17705cc 100644 --- a/query/src/sql/statements/statement_create_table.rs +++ b/query/src/sql/statements/statement_create_table.rs @@ -99,14 +99,15 @@ impl AnalyzableStatement for DfCreateTable { let mut cluster_keys = vec![]; for k in self.cluster_keys.iter() { let expr = expression_analyzer.analyze_sync(k)?; + // TODO(zhyass): Not all expressions are valid for cluster key. validate_expression(&expr, &table_meta.schema)?; cluster_keys.push(expr); } if !cluster_keys.is_empty() { let cluster_keys: Vec = cluster_keys.iter().map(|e| e.column_name()).collect(); - let order_keys_sql = format!("({})", cluster_keys.join(", ")); - table_meta.cluster_keys = Some(order_keys_sql); + let cluster_keys_sql = format!("({})", cluster_keys.join(", ")); + table_meta = table_meta.push_cluster_key(cluster_keys_sql); } Ok(AnalyzedResult::SimpleQuery(Box::new( diff --git a/query/src/storages/fuse/fuse_table.rs b/query/src/storages/fuse/fuse_table.rs index 4608906f9a6f..604fa747bf1a 100644 --- a/query/src/storages/fuse/fuse_table.rs +++ b/query/src/storages/fuse/fuse_table.rs @@ -17,10 +17,13 @@ use std::any::Any; use std::convert::TryFrom; use std::sync::Arc; +use common_cache::Cache; use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; use common_meta_app::schema::TableInfo; +use common_meta_app::schema::UpdateTableMetaReq; +use common_meta_types::MatchSeq; use common_planners::Expression; use common_planners::Extras; use common_planners::Partitions; @@ -30,6 +33,7 @@ use common_planners::TruncateTablePlan; use common_streams::SendableDataBlockStream; use common_tracing::tracing; use futures::StreamExt; +use uuid::Uuid; use crate::pipelines::new::NewPipeline; use crate::sessions::QueryContext; @@ -39,6 +43,8 @@ use crate::sql::OPT_KEY_LEGACY_SNAPSHOT_LOC; use crate::sql::OPT_KEY_SNAPSHOT_LOCATION; use crate::storages::fuse::io::MetaReaders; use crate::storages::fuse::io::TableMetaLocationGenerator; +use crate::storages::fuse::meta::ClusterKey; +use crate::storages::fuse::meta::Statistics as FuseStatistics; use crate::storages::fuse::meta::TableSnapshot; use crate::storages::fuse::meta::Versioned; use crate::storages::fuse::operations::AppendOperationLogEntry; @@ -54,6 +60,7 @@ pub struct FuseTable { pub(crate) meta_location_generator: TableMetaLocationGenerator, pub(crate) cluster_keys: Vec, + pub(crate) cluster_key_meta: Option, pub(crate) read_only: bool, } @@ -65,14 +72,16 @@ impl FuseTable { pub fn do_create(table_info: TableInfo, read_only: bool) -> Result> { let storage_prefix = Self::parse_storage_prefix(&table_info)?; + let cluster_key_meta = table_info.meta.cluster_key(); let mut cluster_keys = Vec::new(); - if let Some(order) = &table_info.meta.cluster_keys { + if let Some((_, order)) = &cluster_key_meta { cluster_keys = 
PlanParser::parse_exprs(order)?; } Ok(Box::new(FuseTable { table_info, cluster_keys, + cluster_key_meta, meta_location_generator: TableMetaLocationGenerator::with_prefix(storage_prefix), read_only, })) @@ -82,7 +91,7 @@ impl FuseTable { StorageDescription { engine_name: "FUSE".to_string(), comment: "FUSE Storage Engine".to_string(), - support_order_key: true, + support_cluster_key: true, } } @@ -186,6 +195,81 @@ impl Table for FuseTable { self.cluster_keys.clone() } + async fn alter_cluster_keys( + &self, + ctx: Arc, + catalog_name: &str, + cluster_key_str: String, + ) -> Result<()> { + let mut new_table_meta = self.get_table_info().meta.clone(); + new_table_meta = new_table_meta.push_cluster_key(cluster_key_str); + let cluster_key_meta = new_table_meta.cluster_key(); + let schema = self.schema().as_ref().clone(); + + let prev = self.read_table_snapshot(ctx.as_ref()).await?; + let prev_version = self.snapshot_format_version(); + let prev_timestamp = prev.as_ref().and_then(|v| v.timestamp); + let prev_snapshot_id = prev.as_ref().map(|v| (v.snapshot_id, prev_version)); + let (summary, segments) = if let Some(v) = prev { + (v.summary.clone(), v.segments.clone()) + } else { + (FuseStatistics::default(), vec![]) + }; + + let new_snapshot = TableSnapshot::new( + Uuid::new_v4(), + &prev_timestamp, + prev_snapshot_id, + schema, + summary, + segments, + cluster_key_meta, + ); + + let uuid = new_snapshot.snapshot_id; + let snapshot_loc = self + .meta_location_generator() + .snapshot_location_from_uuid(&uuid, TableSnapshot::VERSION)?; + let bytes = serde_json::to_vec(&new_snapshot)?; + let operator = ctx.get_storage_operator()?; + operator.object(&snapshot_loc).write(bytes).await?; + + // set new snapshot location + new_table_meta + .options + .insert(OPT_KEY_SNAPSHOT_LOCATION.to_owned(), snapshot_loc.clone()); + // remove legacy options + new_table_meta.options.remove(OPT_KEY_LEGACY_SNAPSHOT_LOC); + + let table_id = self.table_info.ident.table_id; + let table_version = self.table_info.ident.seq; + let req = UpdateTableMetaReq { + table_id, + seq: MatchSeq::Exact(table_version), + new_table_meta, + }; + + let catalog = ctx.get_catalog(catalog_name)?; + let result = catalog.update_table_meta(req).await; + match result { + Ok(_) => { + if let Some(snapshot_cache) = + ctx.get_storage_cache_manager().get_table_snapshot_cache() + { + let cache = &mut snapshot_cache.write().await; + cache.put(snapshot_loc, Arc::new(new_snapshot)); + } + Ok(()) + } + Err(e) => { + // commit snapshot to meta server failed, try to delete it. 
+ // "major GC" will collect this, if deletion failure (even after DAL retried) + let _ = operator.object(&snapshot_loc).delete().await; + Err(e) + } + } + } + #[tracing::instrument(level = "debug", name = "fuse_table_read_partitions", skip(self, ctx), fields(ctx.id = ctx.get_id().as_str()))] async fn read_partitions( &self, diff --git a/query/src/storages/fuse/io/write/block_stream_writer.rs b/query/src/storages/fuse/io/write/block_stream_writer.rs index e2caa2426d7e..fa3ffdb77b62 100644 --- a/query/src/storages/fuse/io/write/block_stream_writer.rs +++ b/query/src/storages/fuse/io/write/block_stream_writer.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use common_arrow::parquet::FileMetaData; use common_datablocks::DataBlock; -use common_datavalues::DataSchemaRef; use common_exception::ErrorCode; use common_exception::Result; use common_planners::Expression; @@ -38,6 +37,7 @@ use crate::storages::fuse::meta::SegmentInfo; use crate::storages::fuse::meta::Statistics; use crate::storages::fuse::statistics::accumulator::BlockStatistics; use crate::storages::fuse::statistics::StatisticsAccumulator; +use crate::storages::index::ClusterKeyInfo; pub type SegmentInfoStream = std::pin::Pin> + Send>>; @@ -48,12 +48,7 @@ pub struct BlockStreamWriter { number_of_blocks_accumulated: usize, statistics_accumulator: Option, meta_locations: TableMetaLocationGenerator, - - data_schema: DataSchemaRef, - cluster_keys: Vec, - cluster_keys_index: Option>, - expression_executor: Option, - + cluster_key_info: Option, ctx: Arc, } @@ -64,8 +59,7 @@ impl BlockStreamWriter { row_per_block: usize, block_per_segment: usize, meta_locations: TableMetaLocationGenerator, - data_schema: DataSchemaRef, - cluster_keys: Vec, + cluster_key_info: Option, ) -> SegmentInfoStream { // filter out empty blocks let block_stream = @@ -81,13 +75,8 @@ impl BlockStreamWriter { // Write out the blocks. // And transform the stream of DataBlocks into Stream of SegmentInfo at the same time. 
- let block_writer = BlockStreamWriter::new( - block_per_segment, - meta_locations, - ctx, - data_schema, - cluster_keys, - ); + let block_writer = + BlockStreamWriter::new(block_per_segment, meta_locations, ctx, cluster_key_info); let segments = Self::transform(Box::pin(block_stream), block_writer); Box::pin(segments) @@ -97,8 +86,7 @@ impl BlockStreamWriter { num_block_threshold: usize, meta_locations: TableMetaLocationGenerator, ctx: Arc, - data_schema: DataSchemaRef, - cluster_keys: Vec, + cluster_key_info: Option, ) -> Self { let data_accessor = ctx.get_storage_operator().unwrap(); Self { @@ -107,10 +95,7 @@ impl BlockStreamWriter { number_of_blocks_accumulated: 0, statistics_accumulator: None, meta_locations, - data_schema, - cluster_keys, - cluster_keys_index: None, - expression_executor: None, + cluster_key_info, ctx, } } @@ -147,53 +132,59 @@ impl BlockStreamWriter { } async fn write_block(&mut self, data_block: DataBlock) -> Result> { - let input_schema = data_block.schema().clone(); - let cluster_keys_index = if let Some(cluster_keys_index) = &self.cluster_keys_index { - cluster_keys_index.clone() - } else { - let fields = input_schema.fields().clone(); - let index = self - .cluster_keys - .iter() - .map(|e| { - let cname = e.column_name(); - fields.iter().position(|f| f.name() == &cname).unwrap() - }) - .collect::>(); - self.cluster_keys_index = Some(index.clone()); - index - }; - - let cluster_stats = - BlockStatistics::clusters_statistics(cluster_keys_index, data_block.clone())?; - // Remove unused columns before serialize - let block = if self.data_schema != input_schema { - let executor = if let Some(executor) = &self.expression_executor { - executor.clone() - } else { - let exprs: Vec = input_schema - .fields() + let mut cluster_stats = None; + let mut block = data_block; + if let Some(v) = self.cluster_key_info.as_mut() { + let input_schema = block.schema().clone(); + let cluster_key_index = if v.cluster_key_index.is_empty() { + let fields = input_schema.fields().clone(); + let index = v + .exprs .iter() - .map(|f| Expression::Column(f.name().to_owned())) - .collect(); - - let executor = ExpressionExecutor::try_create( - self.ctx.clone(), - "remove unused columns", - input_schema, - self.data_schema.clone(), - exprs, - true, - )?; - executor.validate()?; - self.expression_executor = Some(executor.clone()); - executor + .map(|e| { + let cname = e.column_name(); + fields.iter().position(|f| f.name() == &cname).unwrap() + }) + .collect::>(); + v.cluster_key_index = index.clone(); + index + } else { + v.cluster_key_index.clone() }; - executor.execute(&data_block)? 
- } else { - data_block - }; + cluster_stats = BlockStatistics::clusters_statistics( + v.cluster_key_id, + cluster_key_index, + block.clone(), + )?; + + // Remove unused columns before serialize + if v.data_schema != input_schema { + let executor = if let Some(executor) = &v.expression_executor { + executor.clone() + } else { + let exprs: Vec = input_schema + .fields() + .iter() + .map(|f| Expression::Column(f.name().to_owned())) + .collect(); + + let executor = ExpressionExecutor::try_create( + self.ctx.clone(), + "remove unused columns", + input_schema, + v.data_schema.clone(), + exprs, + true, + )?; + executor.validate()?; + v.expression_executor = Some(executor.clone()); + executor + }; + + block = executor.execute(&block)?; + } + } let mut acc = self.statistics_accumulator.take().unwrap_or_default(); let partial_acc = acc.begin(&block, cluster_stats)?; @@ -207,14 +198,12 @@ impl BlockStreamWriter { self.number_of_blocks_accumulated += 1; if self.number_of_blocks_accumulated >= self.num_block_threshold { let summary = acc.summary()?; - let cluster_stats = acc.summary_clusters(); let seg = SegmentInfo::new(acc.blocks_metas, Statistics { row_count: acc.summary_row_count, block_count: acc.summary_block_count, uncompressed_byte_size: acc.in_memory_size, compressed_byte_size: acc.file_size, col_stats: summary, - cluster_stats, }); // Reset state @@ -306,14 +295,12 @@ impl Compactor for BlockStreamWriter { None => Ok(None), Some(acc) => { let summary = acc.summary()?; - let cluster_stats = acc.summary_clusters(); let seg = SegmentInfo::new(acc.blocks_metas, Statistics { row_count: acc.summary_row_count, block_count: acc.summary_block_count, uncompressed_byte_size: acc.in_memory_size, compressed_byte_size: acc.file_size, col_stats: summary, - cluster_stats, }); Ok(Some(seg)) } diff --git a/query/src/storages/fuse/meta/common.rs b/query/src/storages/fuse/meta/common.rs index b8a404e86b02..3a14e15b34b6 100644 --- a/query/src/storages/fuse/meta/common.rs +++ b/query/src/storages/fuse/meta/common.rs @@ -18,13 +18,13 @@ use serde::Deserialize; use serde::Serialize; use uuid::Uuid; -use crate::storages::index::ClusterStatistics; use crate::storages::index::ColumnStatistics; pub type ColumnId = u32; pub type FormatVersion = u64; pub type SnapshotId = Uuid; pub type Location = (String, FormatVersion); +pub type ClusterKey = (u32, String); #[derive(Serialize, Deserialize, Clone, Debug, Default)] pub struct Statistics { @@ -35,7 +35,6 @@ pub struct Statistics { pub compressed_byte_size: u64, pub col_stats: HashMap, - pub cluster_stats: Option, } /// Thing has a u64 version nubmer diff --git a/query/src/storages/fuse/meta/mod.rs b/query/src/storages/fuse/meta/mod.rs index 526f212c358a..3b5caab30075 100644 --- a/query/src/storages/fuse/meta/mod.rs +++ b/query/src/storages/fuse/meta/mod.rs @@ -21,6 +21,7 @@ mod v0; mod v1; mod versions; +pub use common::ClusterKey; pub use common::ColumnId; pub use common::Compression; pub use common::Location; diff --git a/query/src/storages/fuse/meta/v1/snapshot.rs b/query/src/storages/fuse/meta/v1/snapshot.rs index b65669242d0d..b2f1bd9acda4 100644 --- a/query/src/storages/fuse/meta/v1/snapshot.rs +++ b/query/src/storages/fuse/meta/v1/snapshot.rs @@ -20,6 +20,7 @@ use common_datavalues::DataSchema; use serde::Deserialize; use serde::Serialize; +use crate::storages::fuse::meta::common::ClusterKey; use crate::storages::fuse::meta::common::FormatVersion; use crate::storages::fuse::meta::common::Location; use crate::storages::fuse::meta::common::SnapshotId; @@ -51,6 +52,9 
@@ pub struct TableSnapshot { /// We rely on background merge tasks to keep merging segments, so that /// this the size of this vector could be kept reasonable pub segments: Vec, + + // The metadata of the cluster keys. + pub cluster_key_meta: Option, } impl TableSnapshot { @@ -61,6 +65,7 @@ impl TableSnapshot { schema: DataSchema, summary: Statistics, segments: Vec, + cluster_key_meta: Option, ) -> Self { // timestamp of the snapshot should always larger than the previous one's let now = Utc::now(); @@ -80,6 +85,7 @@ impl TableSnapshot { schema, summary, segments, + cluster_key_meta, } } @@ -100,6 +106,7 @@ impl From for TableSnapshot { schema: s.schema, summary: s.summary, segments: s.segments.into_iter().map(|l| (l, 0)).collect(), + cluster_key_meta: None, } } } diff --git a/query/src/storages/fuse/operations/append.rs b/query/src/storages/fuse/operations/append.rs index 6b371ffbf53e..0e918a8d677b 100644 --- a/query/src/storages/fuse/operations/append.rs +++ b/query/src/storages/fuse/operations/append.rs @@ -42,6 +42,7 @@ use crate::storages::fuse::DEFAULT_BLOCK_PER_SEGMENT; use crate::storages::fuse::DEFAULT_ROW_PER_BLOCK; use crate::storages::fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT; use crate::storages::fuse::FUSE_OPT_KEY_ROW_PER_BLOCK; +use crate::storages::index::ClusterKeyInfo; pub type AppendOperationLogEntryStream = std::pin::Pin> + Send>>; @@ -60,14 +61,21 @@ impl FuseTable { let da = ctx.get_storage_operator()?; + let cluster_key_info = self.cluster_key_meta.clone().map(|(id, _)| ClusterKeyInfo { + cluster_key_id: id, + cluster_key_index: vec![], + exprs: self.cluster_keys.clone(), + expression_executor: None, + data_schema: self.table_info.schema(), + }); + let mut segment_stream = BlockStreamWriter::write_block_stream( ctx.clone(), stream, rows_per_block, block_per_seg, self.meta_location_generator().clone(), - self.table_info.schema().clone(), - self.cluster_keys.clone(), + cluster_key_info, ) .await; @@ -117,12 +125,12 @@ impl FuseTable { ) })?; - let mut cluster_keys_index = Vec::with_capacity(self.cluster_keys.len()); - let mut expression_executor = None; + let mut cluster_key_info = None; if !self.cluster_keys.is_empty() { let input_schema = self.table_info.schema(); let mut merged = input_schema.fields().clone(); + let mut cluster_key_index = Vec::with_capacity(self.cluster_keys.len()); for expr in &self.cluster_keys { let cname = expr.column_name(); let index = match merged.iter().position(|x| x.name() == &cname) { @@ -132,11 +140,12 @@ impl FuseTable { } Some(idx) => idx, }; - cluster_keys_index.push(index); + cluster_key_index.push(index); } let output_schema = DataSchemaRefExt::create(merged); + let mut expression_executor = None; if output_schema != input_schema { pipeline.add_transform(|transform_input_port, transform_output_port| { ExpressionTransform::try_create( @@ -186,6 +195,14 @@ impl FuseTable { sort_descs.clone(), ) })?; + + cluster_key_info = Some(ClusterKeyInfo { + cluster_key_id: self.cluster_key_meta.as_ref().unwrap().0, + cluster_key_index, + exprs: self.cluster_keys.clone(), + expression_executor, + data_schema: input_schema.clone(), + }); } let mut sink_pipeline_builder = SinkPipeBuilder::create(); @@ -199,8 +216,7 @@ impl FuseTable { block_per_seg, da.clone(), self.meta_location_generator().clone(), - cluster_keys_index.clone(), - expression_executor.clone(), + cluster_key_info.clone(), )?, ); } diff --git a/query/src/storages/fuse/operations/commit.rs b/query/src/storages/fuse/operations/commit.rs index bca0afbc2319..a646ee0b06f6 100644 --- 
a/query/src/storages/fuse/operations/commit.rs +++ b/query/src/storages/fuse/operations/commit.rs @@ -35,6 +35,7 @@ use uuid::Uuid; use crate::sessions::QueryContext; use crate::sql::OPT_KEY_LEGACY_SNAPSHOT_LOC; use crate::sql::OPT_KEY_SNAPSHOT_LOCATION; +use crate::storages::fuse::meta::ClusterKey; use crate::storages::fuse::meta::Location; use crate::storages::fuse::meta::SegmentInfo; use crate::storages::fuse::meta::Statistics; @@ -168,6 +169,7 @@ impl FuseTable { schema, summary, segments, + self.cluster_key_meta.clone(), ) } else { Self::merge_table_operations( @@ -176,6 +178,7 @@ impl FuseTable { prev_version, segments, summary, + self.cluster_key_meta.clone(), )? }; @@ -221,6 +224,7 @@ impl FuseTable { prev_version: u64, mut new_segments: Vec, statistics: Statistics, + cluster_key_meta: Option, ) -> Result { // 1. merge stats with previous snapshot, if any let stats = if let Some(snapshot) = &previous { @@ -245,6 +249,7 @@ impl FuseTable { schema.clone(), stats, new_segments, + cluster_key_meta, ); Ok(new_snapshot) } @@ -303,16 +308,10 @@ impl FuseTable { acc.block_count += stats.block_count; acc.uncompressed_byte_size += stats.uncompressed_byte_size; acc.compressed_byte_size += stats.compressed_byte_size; - (acc.col_stats, acc.cluster_stats) = if acc.col_stats.is_empty() { - (stats.col_stats.clone(), stats.cluster_stats.clone()) + acc.col_stats = if acc.col_stats.is_empty() { + stats.col_stats.clone() } else { - ( - statistics::reduce_block_stats(&[&acc.col_stats, &stats.col_stats])?, - statistics::reduce_cluster_stats(&[ - &acc.cluster_stats, - &stats.cluster_stats, - ]), - ) + statistics::reduce_block_stats(&[&acc.col_stats, &stats.col_stats])? }; seg_acc.push(loc.clone()); Ok::<_, ErrorCode>((acc, seg_acc)) diff --git a/query/src/storages/fuse/operations/fuse_sink.rs b/query/src/storages/fuse/operations/fuse_sink.rs index 675ddaa30e13..840d26e4de3a 100644 --- a/query/src/storages/fuse/operations/fuse_sink.rs +++ b/query/src/storages/fuse/operations/fuse_sink.rs @@ -27,7 +27,6 @@ use crate::pipelines::new::processors::port::InputPort; use crate::pipelines::new::processors::processor::Event; use crate::pipelines::new::processors::processor::ProcessorPtr; use crate::pipelines::new::processors::Processor; -use crate::pipelines::transforms::ExpressionExecutor; use crate::sessions::QueryContext; use crate::storages::fuse::io::serialize_data_blocks; use crate::storages::fuse::io::TableMetaLocationGenerator; @@ -35,6 +34,7 @@ use crate::storages::fuse::meta::SegmentInfo; use crate::storages::fuse::meta::Statistics; use crate::storages::fuse::statistics::accumulator::BlockStatistics; use crate::storages::fuse::statistics::StatisticsAccumulator; +use crate::storages::index::ClusterKeyInfo; enum State { None, @@ -62,8 +62,7 @@ pub struct FuseTableSink { num_block_threshold: u64, meta_locations: TableMetaLocationGenerator, accumulator: StatisticsAccumulator, - cluster_keys_index: Vec, - expression_executor: Option, + cluster_key_info: Option, } impl FuseTableSink { @@ -73,8 +72,7 @@ impl FuseTableSink { num_block_threshold: usize, data_accessor: Operator, meta_locations: TableMetaLocationGenerator, - cluster_keys_index: Vec, - expression_executor: Option, + cluster_key_info: Option, ) -> Result { Ok(ProcessorPtr::create(Box::new(FuseTableSink { ctx, @@ -84,8 +82,7 @@ impl FuseTableSink { state: State::None, accumulator: Default::default(), num_block_threshold: num_block_threshold as u64, - cluster_keys_index, - expression_executor, + cluster_key_info, }))) } } @@ -133,17 +130,20 @@ impl 
Processor for FuseTableSink { fn process(&mut self) -> Result<()> { match std::mem::replace(&mut self.state, State::None) { State::NeedSerialize(data_block) => { - let cluster_stats = BlockStatistics::clusters_statistics( - self.cluster_keys_index.clone(), - data_block.clone(), - )?; - - // Remove unused columns before serialize - let block = if let Some(executor) = &self.expression_executor { - executor.execute(&data_block)? - } else { - data_block - }; + let mut cluster_stats = None; + let mut block = data_block; + if let Some(v) = &self.cluster_key_info { + cluster_stats = BlockStatistics::clusters_statistics( + v.cluster_key_id, + v.cluster_key_index.clone(), + block.clone(), + )?; + + // Remove unused columns before serialize + if let Some(executor) = &v.expression_executor { + block = executor.execute(&block)?; + } + } let location = self.meta_locations.gen_block_location(); let block_statistics = BlockStatistics::from(&block, location, cluster_stats)?; @@ -162,7 +162,6 @@ impl Processor for FuseTableSink { State::GenerateSegment => { let acc = std::mem::take(&mut self.accumulator); let col_stats = acc.summary()?; - let cluster_stats = acc.summary_clusters(); let segment_info = SegmentInfo::new(acc.blocks_metas, Statistics { row_count: acc.summary_row_count, @@ -170,7 +169,6 @@ impl Processor for FuseTableSink { uncompressed_byte_size: acc.in_memory_size, compressed_byte_size: acc.file_size, col_stats, - cluster_stats, }); self.state = State::SerializedSegment { diff --git a/query/src/storages/fuse/operations/truncate.rs b/query/src/storages/fuse/operations/truncate.rs index 2b686d2f7345..8604f3480f77 100644 --- a/query/src/storages/fuse/operations/truncate.rs +++ b/query/src/storages/fuse/operations/truncate.rs @@ -41,6 +41,7 @@ impl FuseTable { prev_snapshot.schema.clone(), Default::default(), vec![], + self.cluster_key_meta.clone(), ); let loc = self.meta_location_generator(); let new_snapshot_loc = diff --git a/query/src/storages/fuse/statistics/accumulator.rs b/query/src/storages/fuse/statistics/accumulator.rs index 57f12ab073f6..2c177cd50461 100644 --- a/query/src/storages/fuse/statistics/accumulator.rs +++ b/query/src/storages/fuse/statistics/accumulator.rs @@ -35,7 +35,6 @@ use crate::storages::index::ColumnsStatistics; pub struct StatisticsAccumulator { pub blocks_metas: Vec, pub blocks_statistics: Vec, - pub cluster_statistics: Vec>, pub summary_row_count: u64, pub summary_block_count: u64, pub in_memory_size: u64, @@ -60,7 +59,6 @@ impl StatisticsAccumulator { self.in_memory_size += block_in_memory_size; let block_stats = Self::acc_columns(block)?; self.blocks_statistics.push(block_stats.clone()); - self.cluster_statistics.push(cluster_stats.clone()); Ok(PartiallyAccumulated { accumulator: self, block_row_count: block.num_rows() as u64, @@ -82,8 +80,6 @@ impl StatisticsAccumulator { self.summary_row_count += statistics.block_rows_size; self.blocks_statistics .push(statistics.block_column_statistics.clone()); - self.cluster_statistics - .push(statistics.block_cluster_statistics.clone()); self.blocks_metas.push(BlockMeta { file_size, @@ -147,10 +143,6 @@ impl StatisticsAccumulator { super::reduce_block_stats(&self.blocks_statistics) } - pub fn summary_clusters(&self) -> Option { - super::reduce_cluster_stats(&self.cluster_statistics) - } - pub fn acc_columns(data_block: &DataBlock) -> common_exception::Result { let mut statistics = ColumnsStatistics::new(); @@ -290,22 +282,27 @@ impl BlockStatistics { } pub fn clusters_statistics( - cluster_keys: Vec, + cluster_key_id: u32, 
+ cluster_key_index: Vec, block: DataBlock, ) -> Result> { - if cluster_keys.is_empty() { + if cluster_key_index.is_empty() { return Ok(None); } - let mut min = Vec::with_capacity(cluster_keys.len()); - let mut max = Vec::with_capacity(cluster_keys.len()); + let mut min = Vec::with_capacity(cluster_key_index.len()); + let mut max = Vec::with_capacity(cluster_key_index.len()); - for key in cluster_keys.iter() { + for key in cluster_key_index.iter() { let col = block.column(*key); min.push(col.get_checked(0)?); max.push(col.get_checked(col.len() - 1)?); } - Ok(Some(ClusterStatistics { min, max })) + Ok(Some(ClusterStatistics { + cluster_key_id, + min, + max, + })) } } diff --git a/query/src/storages/fuse/statistics/mod.rs b/query/src/storages/fuse/statistics/mod.rs index 66b60399c1e7..7b54746bf2cf 100644 --- a/query/src/storages/fuse/statistics/mod.rs +++ b/query/src/storages/fuse/statistics/mod.rs @@ -19,4 +19,3 @@ pub use accumulator::PartiallyAccumulated; pub use accumulator::StatisticsAccumulator; pub use reducers::merge_statistics; pub use reducers::reduce_block_stats; -pub use reducers::reduce_cluster_stats; diff --git a/query/src/storages/fuse/statistics/reducers.rs b/query/src/storages/fuse/statistics/reducers.rs index fe58c7baacdd..e3a6e6e8c47f 100644 --- a/query/src/storages/fuse/statistics/reducers.rs +++ b/query/src/storages/fuse/statistics/reducers.rs @@ -14,7 +14,6 @@ // use std::borrow::Borrow; -use std::cmp::Ordering; use std::collections::hash_map::Entry; use std::collections::HashMap; @@ -23,7 +22,6 @@ use common_exception::Result; use crate::storages::fuse::meta::ColumnId; use crate::storages::fuse::meta::Statistics; -use crate::storages::index::ClusterStatistics; use crate::storages::index::ColumnStatistics; use crate::storages::index::ColumnsStatistics; @@ -94,43 +92,6 @@ pub fn reduce_block_stats>(stats: &[T]) -> Result>>( - stats: &[T], -) -> Option { - if stats.iter().any(|s| s.borrow().is_none()) { - return None; - } - - let stat = stats[0].borrow().clone().unwrap(); - let mut min = stat.min.clone(); - let mut max = stat.max; - for stat in stats.iter().skip(1) { - let stat = stat.borrow().clone().unwrap(); - for (l, r) in min.iter().zip(stat.min.iter()) { - match l.cmp(r) { - Ordering::Equal => continue, - Ordering::Less => break, - Ordering::Greater => { - min = stat.min.clone(); - break; - } - } - } - - for (l, r) in max.iter().zip(stat.max.iter()) { - match l.cmp(r) { - Ordering::Equal => continue, - Ordering::Less => { - max = stat.max.clone(); - break; - } - Ordering::Greater => break, - } - } - } - Some(ClusterStatistics { min, max }) -} - pub fn merge_statistics(l: &Statistics, r: &Statistics) -> Result { let s = Statistics { row_count: l.row_count + r.row_count, @@ -138,7 +99,6 @@ pub fn merge_statistics(l: &Statistics, r: &Statistics) -> Result { uncompressed_byte_size: l.uncompressed_byte_size + r.uncompressed_byte_size, compressed_byte_size: l.compressed_byte_size + r.compressed_byte_size, col_stats: reduce_block_stats(&[&l.col_stats, &r.col_stats])?, - cluster_stats: reduce_cluster_stats(&[&l.cluster_stats, &r.cluster_stats]), }; Ok(s) } diff --git a/query/src/storages/fuse/table_functions/clustering_informations/clustering_information.rs b/query/src/storages/fuse/table_functions/clustering_informations/clustering_information.rs index 4173150b31cc..8db78b42f042 100644 --- a/query/src/storages/fuse/table_functions/clustering_informations/clustering_information.rs +++ 
b/query/src/storages/fuse/table_functions/clustering_informations/clustering_information.rs @@ -96,6 +96,12 @@ impl<'a> ClusteringInformation<'a> { return Err(ErrorCode::UnImplement("Unimplement error")); } + let cluster_key_id = block.cluster_stats.clone().unwrap().cluster_key_id; + let default_cluster_key_id = self.table.cluster_key_meta.clone().unwrap().0; + if cluster_key_id != default_cluster_key_id { + return Err(ErrorCode::UnImplement("Unimplement error")); + } + let cluster_stats = block.cluster_stats.clone().unwrap(); Ok((cluster_stats.min, cluster_stats.max)) } diff --git a/query/src/storages/index/cluster_key.rs b/query/src/storages/index/cluster_key.rs deleted file mode 100644 index cc86ad85a9eb..000000000000 --- a/query/src/storages/index/cluster_key.rs +++ /dev/null @@ -1,166 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::cmp; -use std::collections::BTreeMap; -use std::collections::HashMap; -use std::collections::HashSet; - -use common_arrow::arrow::compute::sort as arrow_sort; -use common_datablocks::DataBlock; -use common_datavalues::prelude::*; -use common_exception::ErrorCode; -use common_exception::Result; -use common_planners::Expression; -use common_planners::ExpressionMonotonicityVisitor; -use common_planners::RequireColumnsVisitor; -use itertools::Itertools; -use serde_json::json; - -use crate::storages::fuse::meta::BlockMeta; -use crate::storages::index::range_filter::check_maybe_monotonic; - -#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] -pub struct ClusterStatistics { - pub min: Vec, - pub max: Vec, -} - -#[derive(Clone)] -pub struct ClusteringInformationExecutor { - blocks: Vec, - // (start, end). - points_map: BTreeMap, (Vec, Vec)>, - const_block_count: usize, -} - -pub struct ClusteringInformation { - pub total_block_count: u64, - pub total_constant_block_count: u64, - pub average_overlaps: f64, - pub average_depth: f64, - pub block_depth_histogram: VariantValue, -} - -impl ClusteringInformationExecutor { - pub fn create_by_cluster(blocks: Vec) -> Result { - let mut points_map: BTreeMap, (Vec, Vec)> = BTreeMap::new(); - let mut const_block_count = 0; - for (i, block) in blocks.iter().enumerate() { - // Todo(zhyass): if cluster_stats is none. 
- let min = block.cluster_stats.clone().unwrap().min; - let max = block.cluster_stats.clone().unwrap().max; - if min.eq(&max) { - const_block_count += 1; - } - - points_map - .entry(min.clone()) - .and_modify(|v| v.0.push(i)) - .or_insert((vec![i], vec![])); - - points_map - .entry(max.clone()) - .and_modify(|v| v.1.push(i)) - .or_insert((vec![], vec![i])); - } - - Ok(ClusteringInformationExecutor { - blocks, - points_map, - const_block_count, - }) - } - - pub fn execute(&self) -> Result { - if self.blocks.is_empty() { - return Ok(ClusteringInformation { - total_block_count: 0, - total_constant_block_count: 0, - average_overlaps: 0.0, - average_depth: 0.0, - block_depth_histogram: VariantValue::from(json!(null)), - }); - } - - let mut statis = Vec::new(); - let mut unfinished_parts: HashMap = HashMap::new(); - for (key, (start, end)) in &self.points_map { - let point_depth = unfinished_parts.len() + start.len(); - - for (_, val) in unfinished_parts.iter_mut() { - val.0 += start.len(); - val.1 = cmp::max(val.1, point_depth); - } - - start.iter().for_each(|&idx| { - unfinished_parts.insert(idx, (point_depth - 1, point_depth)); - }); - - end.iter().for_each(|&idx| { - let stat = unfinished_parts.remove(&idx).unwrap(); - statis.push(stat); - }); - } - assert_eq!(unfinished_parts.len(), 0); - - let mut sum_overlap = 0; - let mut sum_depth = 0; - let length = statis.len(); - let mp = statis - .into_iter() - .fold(BTreeMap::new(), |mut acc, (overlap, depth)| { - sum_overlap += overlap; - sum_depth += depth; - - let bucket = get_buckets(depth); - acc.entry(bucket).and_modify(|v| *v += 1).or_insert(1u32); - acc - }); - // round the float to 4 decimal places. - let average_depth = (10000.0 * sum_depth as f64 / length as f64).round() / 10000.0; - let average_overlaps = (10000.0 * sum_overlap as f64 / length as f64).round() / 10000.0; - - let objects = mp.iter().fold( - serde_json::Map::with_capacity(mp.len()), - |mut acc, (bucket, count)| { - acc.insert(format!("{:05}", bucket), json!(count)); - acc - }, - ); - let block_depth_histogram = VariantValue::from(serde_json::Value::Object(objects)); - - Ok(ClusteringInformation { - total_block_count: self.blocks.len() as u64, - total_constant_block_count: self.const_block_count as u64, - average_overlaps, - average_depth, - block_depth_histogram, - }) - } -} - -fn get_buckets(val: usize) -> u32 { - let mut val = val as u32; - if val <= 16 || val & (val - 1) == 0 { - return val; - } - - val |= val >> 1; - val |= val >> 2; - val |= val >> 4; - val |= val >> 8; - val |= val >> 16; - val + 1 -} diff --git a/query/src/storages/index/mod.rs b/query/src/storages/index/mod.rs index c645b6c98b3e..024d36ca3375 100644 --- a/query/src/storages/index/mod.rs +++ b/query/src/storages/index/mod.rs @@ -23,6 +23,7 @@ pub use bloom_filter::BloomFilterIndexer; pub use index_min_max::MinMaxIndex; pub use index_sparse::SparseIndex; pub use index_sparse::SparseIndexValue; +pub use range_filter::ClusterKeyInfo; pub use range_filter::ClusterStatistics; pub use range_filter::ColumnStatistics; pub use range_filter::ColumnsStatistics; diff --git a/query/src/storages/index/range_filter.rs b/query/src/storages/index/range_filter.rs index b39b7dafdf46..67b0fe717d09 100644 --- a/query/src/storages/index/range_filter.rs +++ b/query/src/storages/index/range_filter.rs @@ -45,10 +45,25 @@ pub struct ColumnStatistics { #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] pub struct ClusterStatistics { + #[serde(default = "default_cluster_key_id")] + pub cluster_key_id: u32, pub min: 
Vec, pub max: Vec, } +fn default_cluster_key_id() -> u32 { + 0 +} + +#[derive(Clone)] +pub struct ClusterKeyInfo { + pub cluster_key_id: u32, + pub cluster_key_index: Vec, + pub exprs: Vec, + pub expression_executor: Option, + pub data_schema: DataSchemaRef, +} + #[derive(Debug, Clone)] pub struct RangeFilter { origin: DataSchemaRef, diff --git a/query/src/storages/result/result_table_sink.rs b/query/src/storages/result/result_table_sink.rs index 6091f0391aad..14d8df72e008 100644 --- a/query/src/storages/result/result_table_sink.rs +++ b/query/src/storages/result/result_table_sink.rs @@ -125,7 +125,6 @@ impl ResultTableSink { uncompressed_byte_size: acc.in_memory_size, compressed_byte_size: acc.file_size, col_stats, - cluster_stats: None, }); let meta = ResultTableMeta { diff --git a/query/src/storages/result/writer.rs b/query/src/storages/result/writer.rs index c90eb746e7a0..9e80dbdd5916 100644 --- a/query/src/storages/result/writer.rs +++ b/query/src/storages/result/writer.rs @@ -77,7 +77,6 @@ impl ResultTableWriter { uncompressed_byte_size: acc.in_memory_size, compressed_byte_size: acc.file_size, col_stats, - cluster_stats: None, }); let meta = ResultTableMeta { diff --git a/query/src/storages/storage_factory.rs b/query/src/storages/storage_factory.rs index 8048d65771f6..2023ad0080a5 100644 --- a/query/src/storages/storage_factory.rs +++ b/query/src/storages/storage_factory.rs @@ -47,7 +47,7 @@ where pub struct StorageDescription { pub engine_name: String, pub comment: String, - pub support_order_key: bool, + pub support_cluster_key: bool, } pub trait StorageDescriptor: Send + Sync { diff --git a/query/src/storages/storage_table.rs b/query/src/storages/storage_table.rs index f248a6374f9e..0fcf77ae9d50 100644 --- a/query/src/storages/storage_table.rs +++ b/query/src/storages/storage_table.rs @@ -77,6 +77,18 @@ pub trait Table: Sync + Send { vec![] } + async fn alter_cluster_keys( + &self, + _ctx: Arc, + _catalog_name: &str, + _cluster_key_str: String, + ) -> Result<()> { + Err(ErrorCode::UnsupportedEngineParams(format!( + "Unsupported clustering keys for engine: {}", + self.engine() + ))) + } + // defaults to generate one single part and empty statistics async fn read_partitions( &self, diff --git a/query/tests/it/interpreters/interpreter_alter_cluster_key.rs b/query/tests/it/interpreters/interpreter_alter_cluster_key.rs new file mode 100644 index 000000000000..316a6eacc45b --- /dev/null +++ b/query/tests/it/interpreters/interpreter_alter_cluster_key.rs @@ -0,0 +1,51 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_base::base::tokio; +use common_exception::Result; +use databend_query::interpreters::*; +use databend_query::sql::PlanParser; +use futures::TryStreamExt; +use pretty_assertions::assert_eq; + +#[tokio::test] +async fn test_alter_cluster_key_interpreter() -> Result<()> { + let ctx = crate::tests::create_query_context().await?; + + // Create table. 
+    {
+        let query = "\
+            CREATE TABLE default.a(\
+                a bigint, b int, c varchar(255), d smallint, e Date\
+            ) Engine = Fuse\
+        ";
+
+        let plan = PlanParser::parse(ctx.clone(), query).await?;
+        let executor = InterpreterFactory::get(ctx.clone(), plan.clone())?;
+        let _ = executor.execute(None).await?;
+    }
+
+    // Add cluster key.
+    {
+        let plan = PlanParser::parse(ctx.clone(), "Alter TABLE a CLUSTER BY(a, b)").await?;
+        let executor = InterpreterFactory::get(ctx.clone(), plan.clone())?;
+        assert_eq!(executor.name(), "AlterClusterKeyInterpreter");
+        let stream = executor.execute(None).await?;
+        let result = stream.try_collect::<Vec<_>>().await?;
+        let expected = vec!["++", "++"];
+        common_datablocks::assert_blocks_sorted_eq(expected, result.as_slice());
+    }
+
+    Ok(())
+}
diff --git a/query/tests/it/interpreters/mod.rs b/query/tests/it/interpreters/mod.rs
index b41f1513cd80..5df1af18e53e 100644
--- a/query/tests/it/interpreters/mod.rs
+++ b/query/tests/it/interpreters/mod.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 mod access;
+mod interpreter_alter_cluster_key;
 mod interpreter_call;
 mod interpreter_database_create;
 mod interpreter_database_drop;
diff --git a/query/tests/it/sql/parsers/parser_table.rs b/query/tests/it/sql/parsers/parser_table.rs
index df51239fe026..f82b4ad05d58 100644
--- a/query/tests/it/sql/parsers/parser_table.rs
+++ b/query/tests/it/sql/parsers/parser_table.rs
@@ -304,3 +304,20 @@ fn truncate_table() -> Result<()> {
 
     Ok(())
 }
+
+#[test]
+fn alter_cluster_key() -> Result<()> {
+    {
+        let sql = "ALTER TABLE t1 CLUSTER BY (a, b)";
+        let expected = DfStatement::AlterTable(DfAlterTable {
+            if_exists: false,
+            table_name: ObjectName(vec![Ident::new("t1")]),
+            action: AlterTableAction::AlterClusterKey(vec![
+                Expr::Identifier(Ident::new("a")),
+                Expr::Identifier(Ident::new("b")),
+            ]),
+        });
+        expect_parse_ok(sql, expected)?;
+    }
+    Ok(())
+}
diff --git a/query/tests/it/storages/fuse/io.rs b/query/tests/it/storages/fuse/io.rs
index 8c3bf8776db4..a34aeb900db2 100644
--- a/query/tests/it/storages/fuse/io.rs
+++ b/query/tests/it/storages/fuse/io.rs
@@ -47,8 +47,7 @@ async fn test_fuse_table_block_appender() -> Result<()> {
         DEFAULT_BLOCK_PER_SEGMENT,
         0,
         locs.clone(),
-        schema.clone(),
-        vec![],
+        None,
     )
     .await
     .collect::<Vec<_>>()
@@ -65,7 +64,7 @@ async fn test_fuse_table_block_appender() -> Result<()> {
     let number_of_blocks = 30;
     let max_rows_per_block = 3;
     let max_blocks_per_segment = 1;
-    let block = DataBlock::create(schema.clone(), vec![Series::from_data(vec![1, 2, 3])]);
+    let block = DataBlock::create(schema, vec![Series::from_data(vec![1, 2, 3])]);
     let blocks = std::iter::repeat(Ok(block)).take(number_of_blocks);
     let block_stream = futures::stream::iter(blocks);
 
@@ -75,8 +74,7 @@ async fn test_fuse_table_block_appender() -> Result<()> {
         max_rows_per_block,
         max_blocks_per_segment,
         locs.clone(),
-        schema.clone(),
-        vec![],
+        None,
     )
     .await
     .collect::<Vec<_>>()
@@ -96,8 +94,7 @@ async fn test_fuse_table_block_appender() -> Result<()> {
         DEFAULT_BLOCK_PER_SEGMENT,
         0,
         locs,
-        schema,
-        vec![],
+        None,
     )
     .await
     .collect::<Vec<_>>()
@@ -266,8 +263,7 @@ async fn test_block_stream_writer() -> Result<()> {
         max_rows_per_block,
         max_blocks_per_segment,
         locs,
-        DataSchemaRefExt::create(vec![DataField::new("a", i32::to_data_type())]),
-        vec![],
+        None,
     )
     .await;
     let segs = stream.try_collect::<Vec<_>>().await?;
diff --git a/query/tests/it/storages/fuse/meta/snapshot.rs b/query/tests/it/storages/fuse/meta/snapshot.rs
index bb61b4aa3fdf..dcdb5a190d28 100644
--- a/query/tests/it/storages/fuse/meta/snapshot.rs
+++ b/query/tests/it/storages/fuse/meta/snapshot.rs
@@ -22,7 +22,7 @@ fn default_snapshot() -> TableSnapshot {
     let uuid = Uuid::new_v4();
     let schema = DataSchema::empty();
     let stats = Default::default();
-    TableSnapshot::new(uuid, &None, None, schema, stats, vec![])
+    TableSnapshot::new(uuid, &None, None, schema, stats, vec![], None)
 }
 
 #[test]
@@ -43,6 +43,7 @@ fn snapshot_timestamp_monotonic_increase() {
         schema,
         Default::default(),
         vec![],
+        None,
     );
     let current_ts = current.timestamp.unwrap();
     let prev_ts = prev.timestamp.unwrap();
@@ -65,6 +66,7 @@ fn snapshot_timestamp_time_skew_tolerance() {
         schema,
         Default::default(),
         vec![],
+        None,
     );
     let current_ts = current.timestamp.unwrap();
     let prev_ts = prev.timestamp.unwrap();
diff --git a/query/tests/it/storages/fuse/table.rs b/query/tests/it/storages/fuse/table.rs
index ce5eb1fb7eb1..f6eae8318569 100644
--- a/query/tests/it/storages/fuse/table.rs
+++ b/query/tests/it/storages/fuse/table.rs
@@ -18,14 +18,21 @@ use std::default::Default;
 
 use common_base::base::tokio;
 use common_exception::Result;
 use common_meta_app::schema::TableInfo;
+use common_meta_app::schema::TableMeta;
+use common_planners::col;
+use common_planners::AlterClusterKeyPlan;
+use common_planners::CreateTablePlan;
 use common_planners::ReadDataSourcePlan;
 use common_planners::SourceInfo;
 use common_planners::TruncateTablePlan;
 use databend_query::catalogs::CATALOG_DEFAULT;
+use databend_query::interpreters::AlterClusterKeyInterpreter;
 use databend_query::interpreters::CreateTableInterpreter;
 use databend_query::interpreters::InterpreterFactory;
 use databend_query::sql::PlanParser;
 use databend_query::sql::OPT_KEY_DATABASE_ID;
+use databend_query::sql::OPT_KEY_SNAPSHOT_LOCATION;
+use databend_query::storages::fuse::io::MetaReaders;
 use databend_query::storages::fuse::FuseTable;
 use databend_query::storages::ToReadDataSourcePlan;
 use futures::TryStreamExt;
@@ -282,6 +289,66 @@ async fn test_fuse_table_optimize() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn test_fuse_alter_cluster_key() -> Result<()> {
+    let fixture = TestFixture::new().await;
+    let ctx = fixture.ctx();
+
+    let create_table_plan = CreateTablePlan {
+        if_not_exists: false,
+        tenant: fixture.default_tenant(),
+        catalog: fixture.default_catalog_name(),
+        db: fixture.default_db_name(),
+        table: fixture.default_table_name(),
+        table_meta: TableMeta {
+            schema: TestFixture::default_schema(),
+            engine: "FUSE".to_string(),
+            options: [
+                // database id is required for FUSE
+                (OPT_KEY_DATABASE_ID.to_owned(), "1".to_owned()),
+            ]
+            .into(),
+            cluster_keys: vec![],
+            default_cluster_key_id: None,
+            ..Default::default()
+        },
+        as_select: None,
+        cluster_keys: vec![],
+    };
+
+    // create test table
+    let interpreter = CreateTableInterpreter::try_create(ctx.clone(), create_table_plan)?;
+    interpreter.execute(None).await?;
+
+    // add cluster key
+    let alter_cluster_key_plan = AlterClusterKeyPlan {
+        tenant: fixture.default_tenant(),
+        catalog_name: fixture.default_catalog_name(),
+        database_name: fixture.default_db_name(),
+        table_name: fixture.default_table_name(),
+        cluster_keys: vec![col("id")],
+    };
+    let interpreter = AlterClusterKeyInterpreter::try_create(ctx.clone(), alter_cluster_key_plan)?;
+    interpreter.execute(None).await?;
+
+    let table = fixture.latest_default_table().await?;
+    let table_info = table.get_table_info();
+    assert_eq!(table_info.meta.cluster_keys, vec!["(id)".to_string()]);
+    assert_eq!(table_info.meta.default_cluster_key_id, Some(0));
+
+    let snapshot_loc = table
+        .get_table_info()
+        .options()
+        .get(OPT_KEY_SNAPSHOT_LOCATION)
+        .unwrap();
+    let reader = MetaReaders::table_snapshot_reader(ctx.as_ref());
+    let snapshot = reader.read(snapshot_loc.as_str(), None, 1).await?;
+    let expected = Some((0, "(id)".to_string()));
+    assert_eq!(snapshot.cluster_key_meta, expected);
+
+    Ok(())
+}
+
 #[test]
 fn test_parse_storage_prefix() -> Result<()> {
     let mut tbl_info = TableInfo::default();
diff --git a/query/tests/it/storages/fuse/table_test_fixture.rs b/query/tests/it/storages/fuse/table_test_fixture.rs
index a3ce85b0e0ef..0d206183c4a6 100644
--- a/query/tests/it/storages/fuse/table_test_fixture.rs
+++ b/query/tests/it/storages/fuse/table_test_fixture.rs
@@ -134,7 +134,9 @@ impl TestFixture {
                     (OPT_KEY_DATABASE_ID.to_owned(), "1".to_owned()),
                 ]
                 .into(),
-                cluster_keys: Some("(id)".to_string()),
+                cluster_key: Some("(id)".to_string()),
+                cluster_keys: vec!["(id)".to_string()],
+                default_cluster_key_id: Some(0),
                 ..Default::default()
             },
             as_select: None,
diff --git a/tests/suites/0_stateless/09_fuse_engine/09_0015_remote_alter_cluster_key.result b/tests/suites/0_stateless/09_fuse_engine/09_0015_remote_alter_cluster_key.result
new file mode 100644
index 000000000000..f0d1baf04585
--- /dev/null
+++ b/tests/suites/0_stateless/09_fuse_engine/09_0015_remote_alter_cluster_key.result
@@ -0,0 +1,6 @@
+1 1
+2 1
+0 3
+1 3
+4 4
+(b, a) 3 1 0.6667 1.6667 {"00001":1,"00002":2}
diff --git a/tests/suites/0_stateless/09_fuse_engine/09_0015_remote_alter_cluster_key.sql b/tests/suites/0_stateless/09_fuse_engine/09_0015_remote_alter_cluster_key.sql
new file mode 100644
index 000000000000..2c79a74d5faf
--- /dev/null
+++ b/tests/suites/0_stateless/09_fuse_engine/09_0015_remote_alter_cluster_key.sql
@@ -0,0 +1,21 @@
+DROP DATABASE IF EXISTS db1;
+CREATE DATABASE db1;
+USE db1;
+
+-- Create table t09_0015
+CREATE TABLE IF NOT EXISTS t09_0015(a int, b int);
+
+-- Alter table add cluster key.
+ALTER TABLE t09_0015 CLUSTER BY(b,a);
+
+INSERT INTO t09_0015 VALUES(0,3),(1,1);
+INSERT INTO t09_0015 VALUES(1,3),(2,1);
+INSERT INTO t09_0015 VALUES(4,4);
+
+SELECT * FROM t09_0015 ORDER BY b,a;
+
+select * from clustering_information('db1','t09_0015');
+
+-- Drop table.
+DROP TABLE t09_0015;
+DROP DATABASE db1;
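
Reviewer note (not part of the patch): the last row of the new 09_0015 expected result packs the derived clustering figures into one line. Below is a minimal, standalone Rust sketch of the overlap/depth sweep that clustering_information performs, checked by hand against those figures. The three block ranges are worked out manually from the INSERT statements under the (b, a) cluster key, and the helper names (bucket, open, stats) are illustrative assumptions, not names from the codebase.

// Standalone sketch: reproduces the figures expected in
// 09_0015_remote_alter_cluster_key.result for three blocks clustered by (b, a).
use std::collections::{BTreeMap, HashMap};

// Histogram bucket: keep small depths as-is, round larger ones up to a power of two.
fn bucket(depth: usize) -> u32 {
    let v = depth as u32;
    if v <= 16 || v.is_power_of_two() {
        v
    } else {
        v.next_power_of_two()
    }
}

fn main() {
    // (min, max) of the (b, a) cluster key per block:
    //   block 0: rows (a,b) = (0,3),(1,1)  ->  keys (1,1)..(3,0)
    //   block 1: rows (a,b) = (1,3),(2,1)  ->  keys (1,2)..(3,1)
    //   block 2: row  (a,b) = (4,4)        ->  keys (4,4)..(4,4)  (constant block)
    let blocks = [((1, 1), (3, 0)), ((1, 2), (3, 1)), ((4, 4), (4, 4))];

    // point -> (blocks whose range starts here, blocks whose range ends here)
    let mut points_map: BTreeMap<(i32, i32), (Vec<usize>, Vec<usize>)> = BTreeMap::new();
    for (i, (min, max)) in blocks.iter().enumerate() {
        points_map.entry(*min).or_default().0.push(i);
        points_map.entry(*max).or_default().1.push(i);
    }

    // Sweep the points in order, tracking (overlaps, depth) for every open block.
    let mut open: HashMap<usize, (usize, usize)> = HashMap::new();
    let mut stats = Vec::new();
    for (start, end) in points_map.values() {
        let depth = open.len() + start.len();
        for v in open.values_mut() {
            v.0 += start.len();
            v.1 = v.1.max(depth);
        }
        for &i in start {
            open.insert(i, (depth - 1, depth));
        }
        for &i in end {
            stats.push(open.remove(&i).unwrap());
        }
    }

    let n = stats.len() as f64;
    let avg_overlaps = stats.iter().map(|s| s.0).sum::<usize>() as f64 / n;
    let avg_depth = stats.iter().map(|s| s.1).sum::<usize>() as f64 / n;
    let mut histogram: BTreeMap<u32, u32> = BTreeMap::new();
    for s in &stats {
        *histogram.entry(bucket(s.1)).or_insert(0) += 1;
    }

    // Matches `(b, a) 3 1 0.6667 1.6667 {"00001":1,"00002":2}` after rounding.
    assert_eq!((avg_overlaps * 10000.0).round() / 10000.0, 0.6667);
    assert_eq!((avg_depth * 10000.0).round() / 10000.0, 1.6667);
    assert_eq!(histogram, BTreeMap::from([(1, 1), (2, 2)]));
    println!("average_overlaps={avg_overlaps:.4} average_depth={avg_depth:.4}");
}

The histogram keys correspond to depth buckets (depths above 16 are rounded up to the next power of two), zero-padded to five digits in the real output, which is how {"00001":1,"00002":2} arises from depths 1, 2, 2.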