diff --git a/Makefile b/Makefile index 25092ef1473c4..01f335fd1c95e 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,7 @@ fmt: cargo fmt lint: - cargo fmt + cargo fmt --all cargo clippy --workspace --all-targets -- -D warnings # Cargo.toml file formatter(make setup to install) taplo fmt diff --git a/common/ast/src/ast/statements/statement.rs b/common/ast/src/ast/statements/statement.rs index 98aa7cfb20cfc..aa01f9c34cc7d 100644 --- a/common/ast/src/ast/statements/statement.rs +++ b/common/ast/src/ast/statements/statement.rs @@ -55,6 +55,12 @@ pub enum Statement<'a> { Insert(InsertStmt<'a>), + Delete { + catalog: Option<Identifier<'a>>, + database: Option<Identifier<'a>>, + table: Identifier<'a>, + selection: Option<Expr<'a>>, + }, // Databases ShowDatabases(ShowDatabasesStmt<'a>), ShowCreateDatabase(ShowCreateDatabaseStmt<'a>), @@ -160,6 +166,18 @@ impl<'a> Display for Statement<'a> { } Statement::Query(query) => write!(f, "{query}")?, Statement::Insert(insert) => write!(f, "{insert}")?, + Statement::Delete { + catalog, + database, + table, + selection, + } => { + write!(f, "DELETE FROM ")?; + write_period_separated_list(f, catalog.iter().chain(database).chain(Some(table)))?; + if let Some(conditions) = selection { + write!(f, " WHERE {conditions}")?; + } + } Statement::Copy(stmt) => write!(f, "{stmt}")?, Statement::ShowSettings => {} Statement::ShowProcessList => write!(f, "SHOW PROCESSLIST")?, diff --git a/common/ast/src/parser/statement.rs b/common/ast/src/parser/statement.rs index 2c3d28ab63bde..18a08b25413de 100644 --- a/common/ast/src/parser/statement.rs +++ b/common/ast/src/parser/statement.rs @@ -67,6 +67,19 @@ pub fn statement(i: Input) -> IResult<Statement> { }) }, ); + + let delete = map( + rule! { + DELETE ~ FROM ~ #peroid_separated_idents_1_to_3 + ~ ( WHERE ~ ^#expr )? + }, + |(_, _, (catalog, database, table), opt_where_block)| Statement::Delete { + catalog, + database, + table, + selection: opt_where_block.map(|(_, selection)| selection), + }, + ); let show_settings = value(Statement::ShowSettings, rule! { SHOW ~ SETTINGS }); let show_stages = value(Statement::ShowStages, rule! { SHOW ~ STAGES }); let show_process_list = value(Statement::ShowProcessList, rule! { SHOW ~ PROCESSLIST }); @@ -662,6 +675,7 @@ pub fn statement(i: Input) -> IResult<Statement> { #map(query, |query| Statement::Query(Box::new(query))) | #explain : "`EXPLAIN [PIPELINE | GRAPH] <statement>`" | #insert : "`INSERT INTO [TABLE] <table> [(<column>, ...)] (FORMAT <format> | VALUES <values> | <query>)`" + | #delete : "`DELETE FROM <table>
[WHERE ...]`" | #show_settings : "`SHOW SETTINGS`" | #show_stages : "`SHOW STAGES`" | #show_process_list : "`SHOW PROCESSLIST`" diff --git a/common/ast/src/parser/token.rs b/common/ast/src/parser/token.rs index b5d351162e7d4..c7da877826c69 100644 --- a/common/ast/src/parser/token.rs +++ b/common/ast/src/parser/token.rs @@ -319,6 +319,8 @@ pub enum TokenKind { DECADE, #[token("DEFAULT", ignore(ascii_case))] DEFAULT, + #[token("DELETE", ignore(ascii_case))] + DELETE, #[token("DESC", ignore(ascii_case))] DESC, #[token("DESCRIBE", ignore(ascii_case))] @@ -577,8 +579,6 @@ pub enum TokenKind { USAGE, #[token("UPDATE", ignore(ascii_case))] UPDATE, - #[token("DELETE", ignore(ascii_case))] - DELETE, #[token("SUPER", ignore(ascii_case))] SUPER, #[token("STATUS", ignore(ascii_case))] diff --git a/common/datablocks/src/kernels/data_block_filter.rs b/common/datablocks/src/kernels/data_block_filter.rs index 2d48570974c07..a15404fa56865 100644 --- a/common/datablocks/src/kernels/data_block_filter.rs +++ b/common/datablocks/src/kernels/data_block_filter.rs @@ -22,9 +22,9 @@ use common_exception::Result; use crate::DataBlock; impl DataBlock { - pub fn filter_block(block: &DataBlock, predicate: &ColumnRef) -> Result { + pub fn filter_block(block: DataBlock, predicate: &ColumnRef) -> Result { if block.num_columns() == 0 || block.num_rows() == 0 { - return Ok(block.clone()); + return Ok(block); } let predict_boolean_nonull = Self::cast_to_nonull_boolean(predicate)?; @@ -32,7 +32,7 @@ impl DataBlock { if predict_boolean_nonull.is_const() { let flag = predict_boolean_nonull.get_bool(0)?; if flag { - return Ok(block.clone()); + return Ok(block); } else { return Ok(DataBlock::empty_with_schema(block.schema().clone())); } @@ -42,7 +42,7 @@ impl DataBlock { let rows = boolean_col.len(); let count_zeros = boolean_col.values().null_count(); match count_zeros { - 0 => Ok(block.clone()), + 0 => Ok(block), _ => { if count_zeros == rows { return Ok(DataBlock::empty_with_schema(block.schema().clone())); diff --git a/common/datablocks/tests/it/kernels/data_block_filter.rs b/common/datablocks/tests/it/kernels/data_block_filter.rs index 7a2cb1138deb1..82f88b52e7aa4 100644 --- a/common/datablocks/tests/it/kernels/data_block_filter.rs +++ b/common/datablocks/tests/it/kernels/data_block_filter.rs @@ -31,7 +31,7 @@ fn test_filter_non_const_data_block() -> Result<()> { ]); let predicate = Series::from_data(vec![true, false, true, true, false, false]); - let block = DataBlock::filter_block(&block, &predicate)?; + let block = DataBlock::filter_block(block, &predicate)?; common_datablocks::assert_blocks_eq( vec![ @@ -62,7 +62,7 @@ fn test_filter_all_false_data_block() -> Result<()> { ]); let predicate = Series::from_data(vec![false, false, false, false, false, false]); - let block = DataBlock::filter_block(&block, &predicate)?; + let block = DataBlock::filter_block(block, &predicate)?; common_datablocks::assert_blocks_eq( vec!["+---+---+", "| a | b |", "+---+---+", "+---+---+"], @@ -88,7 +88,7 @@ fn test_filter_const_data_block() -> Result<()> { ]); let predicate = Series::from_data(vec![true, false, true, true, false, false]); - let block = DataBlock::filter_block(&block, &predicate)?; + let block = DataBlock::filter_block(block, &predicate)?; common_datablocks::assert_blocks_eq( vec![ @@ -122,7 +122,7 @@ fn test_filter_all_const_data_block() -> Result<()> { ]); let predicate = Series::from_data(vec![true, false, true, true, false, false]); - let block = DataBlock::filter_block(&block, &predicate)?; + let block = 
DataBlock::filter_block(block, &predicate)?; common_datablocks::assert_blocks_eq( vec![ diff --git a/common/datavalues/src/data_schema.rs b/common/datavalues/src/data_schema.rs index b9374b752f2fd..9bee361e6811d 100644 --- a/common/datavalues/src/data_schema.rs +++ b/common/datavalues/src/data_schema.rs @@ -131,7 +131,7 @@ impl DataSchema { /// project will do column pruning. #[must_use] - pub fn project(&self, projection: Vec) -> Self { + pub fn project(&self, projection: &[usize]) -> Self { let fields = projection .iter() .map(|idx| self.fields()[*idx].clone()) diff --git a/common/planners/src/lib.rs b/common/planners/src/lib.rs index 5d89d12fd600a..1416765e903b7 100644 --- a/common/planners/src/lib.rs +++ b/common/planners/src/lib.rs @@ -22,6 +22,7 @@ mod plan_database_drop; mod plan_database_rename; mod plan_database_show_create; mod plan_database_undrop; +mod plan_delete; mod plan_empty; mod plan_explain; mod plan_expression; @@ -120,6 +121,7 @@ pub use plan_database_rename::RenameDatabaseEntity; pub use plan_database_rename::RenameDatabasePlan; pub use plan_database_show_create::ShowCreateDatabasePlan; pub use plan_database_undrop::UndropDatabasePlan; +pub use plan_delete::DeletePlan; pub use plan_empty::EmptyPlan; pub use plan_explain::ExplainPlan; pub use plan_explain::ExplainType; @@ -135,6 +137,7 @@ pub use plan_expression_common::expr_as_column_expr; pub use plan_expression_common::extract_aliases; pub use plan_expression_common::find_aggregate_exprs; pub use plan_expression_common::find_aggregate_exprs_in_expr; +pub use plan_expression_common::find_column_exprs; pub use plan_expression_common::find_columns_not_satisfy_exprs; pub use plan_expression_common::find_window_exprs; pub use plan_expression_common::find_window_exprs_in_expr; diff --git a/common/planners/src/plan_delete.rs b/common/planners/src/plan_delete.rs new file mode 100644 index 0000000000000..1b0c239f5177f --- /dev/null +++ b/common/planners/src/plan_delete.rs @@ -0,0 +1,37 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use common_datavalues::DataSchema; +use common_datavalues::DataSchemaRef; +use common_meta_app::schema::TableIdent; + +use crate::Expression; + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)] +pub struct DeletePlan { + pub catalog_name: String, + pub database_name: String, + pub table_name: String, + pub table_id: TableIdent, + pub selection: Option<Expression>, + pub projection: Vec<usize>, +} + +impl DeletePlan { + pub fn schema(&self) -> DataSchemaRef { + Arc::new(DataSchema::empty()) + } +} diff --git a/common/planners/src/plan_node.rs b/common/planners/src/plan_node.rs index 89e0e727b78ae..83ea9baa80986 100644 --- a/common/planners/src/plan_node.rs +++ b/common/planners/src/plan_node.rs @@ -34,6 +34,7 @@ use crate::CreateUserPlan; use crate::CreateUserStagePlan; use crate::CreateUserUDFPlan; use crate::CreateViewPlan; +use crate::DeletePlan; use crate::DescribeTablePlan; use crate::DescribeUserStagePlan; use crate::DropDatabasePlan; @@ -110,6 +111,9 @@ pub enum PlanNode { // Insert. Insert(InsertPlan), + // Delete. + Delete(DeletePlan), + // Copy. Copy(CopyPlan), @@ -219,6 +223,9 @@ impl PlanNode { // Insert. PlanNode::Insert(v) => v.schema(), + // Delete. + PlanNode::Delete(v) => v.schema(), + // Copy. PlanNode::Copy(v) => v.schema(), @@ -327,6 +334,9 @@ impl PlanNode { // Insert. PlanNode::Insert(_) => "InsertPlan", + // Delete. + PlanNode::Delete(_) => "DeletePlan", + // Copy. PlanNode::Copy(_) => "CopyPlan", diff --git a/common/planners/src/plan_node_rewriter.rs b/common/planners/src/plan_node_rewriter.rs index 2e9ba27f78145..012229a6b48ad 100644 --- a/common/planners/src/plan_node_rewriter.rs +++ b/common/planners/src/plan_node_rewriter.rs @@ -40,6 +40,7 @@ use crate::CreateUserPlan; use crate::CreateUserStagePlan; use crate::CreateUserUDFPlan; use crate::CreateViewPlan; +use crate::DeletePlan; use crate::DescribeTablePlan; use crate::DescribeUserStagePlan; use crate::DropDatabasePlan; @@ -139,6 +140,9 @@ pub trait PlanRewriter: Sized { // Insert. PlanNode::Insert(plan) => self.rewrite_insert_into(plan), + // Delete. + PlanNode::Delete(plan) => self.rewrite_delete_into(plan), + // Copy. PlanNode::Copy(plan) => self.rewrite_copy(plan), @@ -448,6 +452,10 @@ pub trait PlanRewriter: Sized { Ok(PlanNode::Insert(plan.clone())) } + fn rewrite_delete_into(&mut self, plan: &DeletePlan) -> Result<PlanNode> { + Ok(PlanNode::Delete(plan.clone())) + } + fn rewrite_copy(&mut self, plan: &CopyPlan) -> Result<PlanNode> { Ok(PlanNode::Copy(plan.clone())) } diff --git a/common/planners/src/plan_node_visitor.rs b/common/planners/src/plan_node_visitor.rs index 233942d42a570..49643c340b29b 100644 --- a/common/planners/src/plan_node_visitor.rs +++ b/common/planners/src/plan_node_visitor.rs @@ -33,6 +33,7 @@ use crate::CreateUserPlan; use crate::CreateUserStagePlan; use crate::CreateUserUDFPlan; use crate::CreateViewPlan; +use crate::DeletePlan; use crate::DescribeTablePlan; use crate::DescribeUserStagePlan; use crate::DropDatabasePlan; @@ -151,6 +152,9 @@ pub trait PlanVisitor { // Insert. PlanNode::Insert(plan) => self.visit_insert_into(plan), + // Delete. + PlanNode::Delete(plan) => self.visit_delete_into(plan), + // Copy. 
PlanNode::Copy(plan) => self.visit_copy(plan), @@ -435,6 +439,10 @@ pub trait PlanVisitor { Ok(()) } + fn visit_delete_into(&mut self, _: &DeletePlan) -> Result<()> { + Ok(()) + } + fn visit_copy(&mut self, _: &CopyPlan) -> Result<()> { Ok(()) } diff --git a/common/streams/src/sources/source_parquet.rs b/common/streams/src/sources/source_parquet.rs index 0c332f1444529..38d7313953d7a 100644 --- a/common/streams/src/sources/source_parquet.rs +++ b/common/streams/src/sources/source_parquet.rs @@ -80,8 +80,7 @@ impl<R> ParquetSource<R> where R: AsyncRead + AsyncSeek + Unpin + Send { fn create(builder: ParquetSourceBuilder, reader: R) -> Self { - let arrow_table_schema = - Arc::new(builder.schema.project(builder.projection.clone())).to_arrow(); + let arrow_table_schema = Arc::new(builder.schema.project(&builder.projection)).to_arrow(); ParquetSource { reader, diff --git a/query/src/interpreters/interpreter_delete.rs b/query/src/interpreters/interpreter_delete.rs new file mode 100644 index 0000000000000..87a2bbcce17c3 --- /dev/null +++ b/query/src/interpreters/interpreter_delete.rs @@ -0,0 +1,74 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_datavalues::DataSchema; +use common_datavalues::DataSchemaRef; +use common_exception::Result; +use common_planners::DeletePlan; +use common_streams::DataBlockStream; +use common_streams::SendableDataBlockStream; +use common_tracing::tracing; + +use crate::interpreters::Interpreter; +use crate::interpreters::InterpreterPtr; +use crate::sessions::QueryContext; + +/// Interprets a DeletePlan +pub struct DeleteInterpreter { + ctx: Arc<QueryContext>, + plan: DeletePlan, +} + +impl DeleteInterpreter { + /// Create a DeleteInterpreter from a DeletePlan + pub fn try_create(ctx: Arc<QueryContext>, plan: DeletePlan) -> Result<InterpreterPtr> { + Ok(Arc::new(DeleteInterpreter { ctx, plan })) + } +} + +#[async_trait::async_trait] +impl Interpreter for DeleteInterpreter { + /// Get the name of the current interpreter + fn name(&self) -> &str { + "DeleteInterpreter" + } + + /// Get the schema of the DeletePlan + fn schema(&self) -> DataSchemaRef { + self.plan.schema() + } + + #[tracing::instrument(level = "debug", name = "delete_interpreter_execute", skip(self, _input_stream), fields(ctx.id = self.ctx.get_id().as_str()))] + async fn execute( + &self, + _input_stream: Option<SendableDataBlockStream>, + ) -> Result<SendableDataBlockStream> { + // TODO + // 1. check privilege + // 2. optimize the plan, at least constant folding? 
+ let catalog_name = self.plan.catalog_name.as_str(); + let db_name = self.plan.database_name.as_str(); + let tbl_name = self.plan.table_name.as_str(); + let tbl = self.ctx.get_table(catalog_name, db_name, tbl_name).await?; + tbl.delete(self.ctx.clone(), self.plan.clone()).await?; + + Ok(Box::pin(DataBlockStream::create( + Arc::new(DataSchema::empty()), + None, + vec![], + ))) + } +} diff --git a/query/src/interpreters/interpreter_factory.rs b/query/src/interpreters/interpreter_factory.rs index d8acfc797e27b..b5cba8458796e 100644 --- a/query/src/interpreters/interpreter_factory.rs +++ b/query/src/interpreters/interpreter_factory.rs @@ -38,6 +38,7 @@ use crate::interpreters::CreateTableInterpreter; use crate::interpreters::CreateUserInterpreter; use crate::interpreters::CreateUserUDFInterpreter; use crate::interpreters::CreateViewInterpreter; +use crate::interpreters::DeleteInterpreter; use crate::interpreters::DescribeTableInterpreter; use crate::interpreters::DropDatabaseInterpreter; use crate::interpreters::DropRoleInterpreter; @@ -92,6 +93,7 @@ impl InterpreterFactory { PlanNode::Select(v) => SelectInterpreter::try_create(ctx_clone, v), PlanNode::Explain(v) => ExplainInterpreter::try_create(ctx_clone, v), PlanNode::Insert(v) => InsertInterpreter::try_create(ctx_clone, v, false), + PlanNode::Delete(v) => DeleteInterpreter::try_create(ctx_clone, v), PlanNode::Copy(v) => CopyInterpreter::try_create(ctx_clone, v), PlanNode::Call(v) => CallInterpreter::try_create(ctx_clone, v), PlanNode::Show(ShowPlan::ShowDatabases(v)) => { diff --git a/query/src/interpreters/interpreter_factory_v2.rs b/query/src/interpreters/interpreter_factory_v2.rs index bd044f1c7352a..903892f43233f 100644 --- a/query/src/interpreters/interpreter_factory_v2.rs +++ b/query/src/interpreters/interpreter_factory_v2.rs @@ -199,6 +199,8 @@ impl InterpreterFactoryV2 { InsertInterpreterV2::try_create(ctx.clone(), *insert.clone(), false) } + Plan::Delete(delete) => DeleteInterpreter::try_create(ctx.clone(), *delete.clone()), + // Roles Plan::ShowRoles => ShowRolesInterpreter::try_create(ctx.clone()), Plan::CreateRole(create_role) => { diff --git a/query/src/interpreters/mod.rs b/query/src/interpreters/mod.rs index 22cb3601c5bf5..d0e15111ecf13 100644 --- a/query/src/interpreters/mod.rs +++ b/query/src/interpreters/mod.rs @@ -28,6 +28,7 @@ mod interpreter_database_drop; mod interpreter_database_rename; mod interpreter_database_show_create; mod interpreter_database_undrop; +mod interpreter_delete; mod interpreter_empty; mod interpreter_explain; mod interpreter_explain_v2; @@ -102,6 +103,7 @@ pub use interpreter_database_drop::DropDatabaseInterpreter; pub use interpreter_database_rename::RenameDatabaseInterpreter; pub use interpreter_database_show_create::ShowCreateDatabaseInterpreter; pub use interpreter_database_undrop::UndropDatabaseInterpreter; +pub use interpreter_delete::DeleteInterpreter; pub use interpreter_empty::EmptyInterpreter; pub use interpreter_explain::ExplainInterpreter; pub use interpreter_explain_v2::ExplainInterpreterV2; diff --git a/query/src/pipelines/new/processors/transforms/hash_join/chaining_hash_table.rs b/query/src/pipelines/new/processors/transforms/hash_join/chaining_hash_table.rs index 3fc4b58848005..31539ab967729 100644 --- a/query/src/pipelines/new/processors/transforms/hash_join/chaining_hash_table.rs +++ b/query/src/pipelines/new/processors/transforms/hash_join/chaining_hash_table.rs @@ -322,7 +322,7 @@ impl ChainingHashTable { let func_ctx = self.ctx.try_get_function_context()?; let filter_vector = 
pred.eval(&func_ctx, &merged)?; let filtered_block = - DataBlock::filter_block(&merged, filter_vector.vector())?; + DataBlock::filter_block(merged, filter_vector.vector())?; if !filtered_block.is_empty() { results.push(DataBlock::block_take_by_indices(input, &[i as u32])?); } @@ -349,7 +349,7 @@ impl ChainingHashTable { let func_ctx = self.ctx.try_get_function_context()?; let filter_vector = pred.eval(&func_ctx, &merged)?; let filtered_block = - DataBlock::filter_block(&merged, filter_vector.vector())?; + DataBlock::filter_block(merged, filter_vector.vector())?; if filtered_block.is_empty() { results.push(DataBlock::block_take_by_indices(input, &[i as u32])?); } @@ -412,7 +412,7 @@ impl ChainingHashTable { // Here, we directly use `predicate_column` to filter the result block. // But pay attention to the fact that **reserved side** may also be filtered. // **reserved side** is the left side in left join, and the right side in right join. - result_block = DataBlock::filter_block(&result_block, filter_vector.vector())?; + result_block = DataBlock::filter_block(result_block, filter_vector.vector())?; } // If result_block is empty, we need to supply a NULL block for probe_block. if result_block.is_empty() { @@ -597,8 +597,8 @@ impl HashJoinState for ChainingHashTable { if let Some(filter) = &self.other_predicate { let func_ctx = self.ctx.try_get_function_context()?; let mut filtered_blocks = Vec::with_capacity(data_blocks.len()); - for block in data_blocks.iter() { - let filter_vector = filter.eval(&func_ctx, block)?; + for block in data_blocks.into_iter() { + let filter_vector = filter.eval(&func_ctx, &block)?; filtered_blocks.push(DataBlock::filter_block(block, filter_vector.vector())?); } data_blocks = filtered_blocks; diff --git a/query/src/pipelines/new/processors/transforms/transform_filter.rs b/query/src/pipelines/new/processors/transforms/transform_filter.rs index c8c3f7750b508..426e913230094 100644 --- a/query/src/pipelines/new/processors/transforms/transform_filter.rs +++ b/query/src/pipelines/new/processors/transforms/transform_filter.rs @@ -107,7 +107,7 @@ impl Transform for TransformFilterImpl { } let filter_block = self.executor.execute(&data)?; - self.correct_with_schema(DataBlock::filter_block(&data, filter_block.column(0))?) + self.correct_with_schema(DataBlock::filter_block(data, filter_block.column(0))?) } } @@ -122,6 +122,6 @@ impl Transform for TransformFilterImpl { } let filter_block = self.executor.execute(&data)?; - self.correct_with_schema(DataBlock::filter_block(&data, filter_block.column(0))?) + self.correct_with_schema(DataBlock::filter_block(data, filter_block.column(0))?) 
} } diff --git a/query/src/pipelines/new/processors/transforms/transform_filter_v2.rs b/query/src/pipelines/new/processors/transforms/transform_filter_v2.rs index 4e1249b872e5f..686db140389e9 100644 --- a/query/src/pipelines/new/processors/transforms/transform_filter_v2.rs +++ b/query/src/pipelines/new/processors/transforms/transform_filter_v2.rs @@ -51,6 +51,6 @@ impl Transform for TransformFilterV2 { fn transform(&mut self, data: DataBlock) -> Result { let typed_vector = self.predicate.eval(&self.func_ctx, &data)?; let column = typed_vector.vector(); - DataBlock::filter_block(&data, column) + DataBlock::filter_block(data, column) } } diff --git a/query/src/pipelines/transforms/transform_filter.rs b/query/src/pipelines/transforms/transform_filter.rs index a09f130f67f07..ae64a9996ad60 100644 --- a/query/src/pipelines/transforms/transform_filter.rs +++ b/query/src/pipelines/transforms/transform_filter.rs @@ -79,7 +79,7 @@ impl FilterTransform { } let filter_block = executor.execute(&data)?; - DataBlock::filter_block(&data, filter_block.column(0)) + DataBlock::filter_block(data, filter_block.column(0)) } fn filter_map(executor: Arc, data: DataBlock) -> Option> { diff --git a/query/src/sql/parsers/mod.rs b/query/src/sql/parsers/mod.rs index 7f75899813f7c..4689713a6294b 100644 --- a/query/src/sql/parsers/mod.rs +++ b/query/src/sql/parsers/mod.rs @@ -15,6 +15,7 @@ mod parser_call; mod parser_copy; mod parser_database; +mod parser_delete; mod parser_exists; mod parser_explain; mod parser_insert; diff --git a/query/src/sql/parsers/parser_delete.rs b/query/src/sql/parsers/parser_delete.rs new file mode 100644 index 0000000000000..d0961fed278e7 --- /dev/null +++ b/query/src/sql/parsers/parser_delete.rs @@ -0,0 +1,43 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Borrow from apache/arrow/rust/datafusion/src/sql/sql_parser +// See notice.md + +use sqlparser::keywords::Keyword; +use sqlparser::parser::ParserError; + +use crate::sql::statements::DfDeleteStatement; +use crate::sql::DfParser; +use crate::sql::DfStatement; + +impl<'a> DfParser<'a> { + // DELETE. + pub(crate) fn parse_delete(&mut self) -> Result, ParserError> { + self.parser.next_token(); + let parser = &mut self.parser; + parser.expect_keyword(Keyword::FROM)?; + let table_name = parser.parse_object_name()?; + let selection = if parser.parse_keyword(Keyword::WHERE) { + Some(parser.parse_expr()?) + } else { + None + }; + + Ok(DfStatement::Delete(Box::new(DfDeleteStatement { + name: table_name, + selection, + }))) + } +} diff --git a/query/src/sql/planner/binder/delete.rs b/query/src/sql/planner/binder/delete.rs new file mode 100644 index 0000000000000..6f2cc57766ac6 --- /dev/null +++ b/query/src/sql/planner/binder/delete.rs @@ -0,0 +1,109 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use common_ast::ast::Expr; +use common_ast::ast::Identifier; +use common_exception::ErrorCode; +use common_exception::Result; +use common_planners::DeletePlan; +use common_planners::Expression; + +use crate::sql::binder::Binder; +use crate::sql::binder::ScalarBinder; +use crate::sql::exec::ExpressionBuilder; +use crate::sql::plans::Plan; +use crate::sql::statements::query::QueryASTIRVisitor; +use crate::sql::BindContext; + +pub struct DeleteCollectPushDowns {} +impl QueryASTIRVisitor> for DeleteCollectPushDowns { + fn visit_expr(expr: &mut Expression, require_columns: &mut HashSet) -> Result<()> { + if let Expression::Column(name) = expr { + if !require_columns.contains(name) { + require_columns.insert(name.clone()); + } + } + + Ok(()) + } + + fn visit_filter(predicate: &mut Expression, data: &mut HashSet) -> Result<()> { + Self::visit_recursive_expr(predicate, data) + } +} +impl<'a> Binder { + pub(in crate::sql::planner::binder) async fn bind_delete( + &mut self, + bind_context: &BindContext, + catalog: &'a Option>, + database: &'a Option>, + table: &'a Identifier<'a>, + selection: &'a Option>, + ) -> Result { + let mut scalar_binder = + ScalarBinder::new(bind_context, self.ctx.clone(), self.metadata.clone()); + let mut expression = None; + + let mut require_columns: HashSet = HashSet::new(); + if let Some(expr) = selection { + let (scalar, _) = scalar_binder.bind(expr).await?; + let eb = ExpressionBuilder::create(self.metadata.clone()); + let mut pred_expr = eb.build(&scalar)?; + DeleteCollectPushDowns::visit_filter(&mut pred_expr, &mut require_columns)?; + expression = Some(pred_expr); + } + + let catalog_name = match catalog { + Some(catalog) => catalog.name.clone(), + None => self.ctx.get_current_catalog(), + }; + let database_name = match database { + Some(database) => database.name.clone(), + None => self.ctx.get_current_database(), + }; + let table_name = table.name.clone(); + let table = self + .ctx + .get_table(&catalog_name, &database_name, &table_name) + .await?; + + let tbl_info = table.get_table_info(); + let table_id = tbl_info.ident.clone(); + let mut projection = vec![]; + let schema = tbl_info.meta.schema.as_ref(); + for col_name in require_columns { + // TODO refine this, performance & error message + if let Some((idx, _)) = schema.column_with_name(col_name.as_str()) { + projection.push(idx); + } else { + return Err(ErrorCode::UnknownColumn(format!( + "Column [{}] not found", + col_name + ))); + } + } + + let plan = DeletePlan { + catalog_name, + database_name, + table_name, + table_id, + selection: expression, + projection, + }; + Ok(Plan::Delete(Box::new(plan))) + } +} diff --git a/query/src/sql/planner/binder/mod.rs b/query/src/sql/planner/binder/mod.rs index 866f7642bf658..6d1b788ba815b 100644 --- a/query/src/sql/planner/binder/mod.rs +++ b/query/src/sql/planner/binder/mod.rs @@ -39,6 +39,7 @@ mod aggregate; mod bind_context; mod copy; mod ddl; +mod delete; mod distinct; mod insert; mod join; @@ -188,6 +189,15 @@ impl<'a> Binder { self.bind_remove_stage(location, pattern).await? 
} Statement::Insert(stmt) => self.bind_insert(bind_context, stmt).await?, + Statement::Delete { + catalog, + database, + table, + selection, + } => { + self.bind_delete(bind_context, catalog, database, table, selection) + .await? + } Statement::Grant(stmt) => self.bind_grant(stmt).await?, diff --git a/query/src/sql/planner/format/display_plan.rs b/query/src/sql/planner/format/display_plan.rs index 34f2ca48b6f18..b32132f158afe 100644 --- a/query/src/sql/planner/format/display_plan.rs +++ b/query/src/sql/planner/format/display_plan.rs @@ -64,6 +64,7 @@ impl Plan { // Insert Plan::Insert(insert) => Ok(format!("{:?}", insert)), + Plan::Delete(delete) => Ok(format!("{:?}", delete)), // Stages Plan::ShowStages => Ok("SHOW STAGES".to_string()), diff --git a/query/src/sql/planner/plans/mod.rs b/query/src/sql/planner/plans/mod.rs index 40bde4e8c5a93..0161e4ca94223 100644 --- a/query/src/sql/planner/plans/mod.rs +++ b/query/src/sql/planner/plans/mod.rs @@ -51,6 +51,7 @@ use common_planners::CreateTablePlan; use common_planners::CreateUserPlan; use common_planners::CreateUserStagePlan; use common_planners::CreateViewPlan; +use common_planners::DeletePlan; use common_planners::DescribeTablePlan; use common_planners::DescribeUserStagePlan; use common_planners::DropDatabasePlan; @@ -150,6 +151,7 @@ pub enum Plan { // Insert Insert(Box), + Delete(Box), // Views CreateView(Box), @@ -228,6 +230,7 @@ impl Display for Plan { Plan::RevokePriv(_) => write!(f, "RevokePriv"), Plan::RevokeRole(_) => write!(f, "RevokeRole"), Plan::Insert(_) => write!(f, "Insert"), + Plan::Delete(_) => write!(f, "Delete"), } } } @@ -287,6 +290,7 @@ impl Plan { Plan::RevokePriv(_) => Arc::new(DataSchema::empty()), Plan::RevokeRole(_) => Arc::new(DataSchema::empty()), Plan::Insert(plan) => plan.schema(), + Plan::Delete(_) => Arc::new(DataSchema::empty()), } } } diff --git a/query/src/sql/sql_parser.rs b/query/src/sql/sql_parser.rs index edd8906c32ee6..bcc4b91f34ac0 100644 --- a/query/src/sql/sql_parser.rs +++ b/query/src/sql/sql_parser.rs @@ -198,6 +198,7 @@ impl<'a> DfParser<'a> { Keyword::SET => self.parse_set(), Keyword::INSERT => self.parse_insert(), Keyword::SELECT | Keyword::WITH | Keyword::VALUES => self.parse_query(), + Keyword::DELETE => self.parse_delete(), Keyword::GRANT => { self.parser.next_token(); self.parse_grant() diff --git a/query/src/sql/sql_statement.rs b/query/src/sql/sql_statement.rs index ce17d2fd5021a..ffd9d02330524 100644 --- a/query/src/sql/sql_statement.rs +++ b/query/src/sql/sql_statement.rs @@ -41,6 +41,7 @@ use crate::sql::statements::DfCreateTable; use crate::sql::statements::DfCreateUDF; use crate::sql::statements::DfCreateUser; use crate::sql::statements::DfCreateView; +use crate::sql::statements::DfDeleteStatement; use crate::sql::statements::DfDescribeTable; use crate::sql::statements::DfDropDatabase; use crate::sql::statements::DfDropRole; @@ -133,6 +134,9 @@ pub enum DfStatement<'a> { // Insert InsertQuery(DfInsertStatement<'a>), + // Delete + Delete(Box), + // User CreateUser(DfCreateUser), AlterUser(DfAlterUser), diff --git a/query/src/sql/statements/analyzer_statement.rs b/query/src/sql/statements/analyzer_statement.rs index c3d058ba43a39..7146788d3fd2c 100644 --- a/query/src/sql/statements/analyzer_statement.rs +++ b/query/src/sql/statements/analyzer_statement.rs @@ -173,6 +173,7 @@ impl<'a> AnalyzableStatement for DfStatement<'a> { DfStatement::ShowGrants(v) => v.analyze(ctx).await, DfStatement::KillStatement(v) => v.analyze(ctx).await, DfStatement::InsertQuery(v) => v.analyze(ctx).await, + 
DfStatement::Delete(v) => v.analyze(ctx).await, DfStatement::SetVariable(v) => v.analyze(ctx).await, DfStatement::CreateUser(v) => v.analyze(ctx).await, DfStatement::AlterUser(v) => v.analyze(ctx).await, diff --git a/query/src/sql/statements/mod.rs b/query/src/sql/statements/mod.rs index 8f6d7fe5d3c47..e8a87817ddc4a 100644 --- a/query/src/sql/statements/mod.rs +++ b/query/src/sql/statements/mod.rs @@ -33,6 +33,7 @@ mod statement_create_udf; mod statement_create_user; mod statement_create_user_stage; mod statement_create_view; +mod statement_delete; mod statement_describe_table; mod statement_describe_user_stage; mod statement_drop_database; @@ -102,6 +103,7 @@ pub use statement_create_user::DfCreateUser; pub use statement_create_user::DfUserWithOption; pub use statement_create_user_stage::DfCreateUserStage; pub use statement_create_view::DfCreateView; +pub use statement_delete::DfDeleteStatement; pub use statement_describe_table::DfDescribeTable; pub use statement_describe_user_stage::DfDescribeUserStage; pub use statement_drop_database::DfDropDatabase; diff --git a/query/src/sql/statements/statement_delete.rs b/query/src/sql/statements/statement_delete.rs new file mode 100644 index 0000000000000..e94b76afffe1c --- /dev/null +++ b/query/src/sql/statements/statement_delete.rs @@ -0,0 +1,123 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; +use std::sync::Arc; + +use common_datavalues::DataSchema; +use common_exception::ErrorCode; +use common_exception::Result; +use common_planners::DeletePlan; +use common_planners::Expression; +use common_planners::PlanNode; +use common_tracing::tracing; +use sqlparser::ast::Expr; +use sqlparser::ast::ObjectName; +use crate::sessions::QueryContext; +use crate::sql::statements::query::QueryASTIRVisitor; +use crate::sql::statements::resolve_table; +use crate::sql::statements::AnalyzableStatement; +use crate::sql::statements::AnalyzedResult; +use crate::sql::statements::ExpressionAnalyzer; +use crate::sql::statements::QueryRelation; +use crate::storages::view::view_table::VIEW_ENGINE; + +#[derive(Debug, Clone, PartialEq)] +pub struct DfDeleteStatement { + pub name: ObjectName, + pub selection: Option<Expr>, +} + +pub struct DeleteCollectPushDowns {} + +/// Collects the columns required by the push-down filter of the query. 
+impl QueryASTIRVisitor> for DeleteCollectPushDowns { + fn visit_expr(expr: &mut Expression, require_columns: &mut HashSet) -> Result<()> { + if let Expression::Column(name) = expr { + if !require_columns.contains(name) { + require_columns.insert(name.clone()); + } + } + + Ok(()) + } + + fn visit_filter(predicate: &mut Expression, data: &mut HashSet) -> Result<()> { + Self::visit_recursive_expr(predicate, data) + } +} + +#[async_trait::async_trait] +impl AnalyzableStatement for DfDeleteStatement { + #[tracing::instrument(level = "debug", skip(self, ctx), fields(ctx.id = ctx.get_id().as_str()))] + async fn analyze(&self, ctx: Arc) -> Result { + let (catalog_name, database_name, table_name) = + resolve_table(&ctx, &self.name, "DELETE from TABLE")?; + + let table = ctx + .get_table(&catalog_name, &database_name, &table_name) + .await?; + let tbl_info = table.get_table_info(); + if tbl_info.engine() == VIEW_ENGINE { + return Err(ErrorCode::SemanticError("Delete from view not allowed")); + } + + let tenant = ctx.get_tenant(); + let udfs = ctx.get_user_manager().get_udfs(&tenant).await?; + let analyzer = ExpressionAnalyzer::create_with_udfs_support(ctx, udfs); + let mut require_columns = HashSet::new(); + let selection = if let Some(predicate) = &self.selection { + let mut pred_expr = analyzer.analyze(predicate).await?; + DeleteCollectPushDowns::visit_filter(&mut pred_expr, &mut require_columns)?; + Some(pred_expr) + } else { + None + }; + + let table_id = tbl_info.ident.clone(); + let mut projection = vec![]; + let schema = tbl_info.meta.schema.as_ref(); + for col_name in require_columns { + // TODO refine this, performance & error message + if let Some((idx, _)) = schema.column_with_name(col_name.as_str()) { + projection.push(idx); + } else { + return Err(ErrorCode::UnknownColumn(format!( + "Column [{}] not found", + col_name + ))); + } + } + + // Parallel / Distributed execution of deletion not supported yet + Ok(AnalyzedResult::SimpleQuery(Box::new(PlanNode::Delete( + DeletePlan { + catalog_name, + database_name, + table_name, + table_id, + selection, + projection, + }, + )))) + } +} + +#[derive(Clone, Default)] +pub struct DeleteAnalyzeState { + pub filter: Option, + pub relation: QueryRelation, + pub schema: Arc, +} diff --git a/query/src/sql/statements/statement_select.rs b/query/src/sql/statements/statement_select.rs index e26ff7e519401..ce83f8886b413 100644 --- a/query/src/sql/statements/statement_select.rs +++ b/query/src/sql/statements/statement_select.rs @@ -301,9 +301,6 @@ impl DfQueryStatement { name_parts, .. } => { - // TODO - // shall we put the catalog name in the table_info? 
- // table already resolved here let catalog_name = Self::resolve_catalog(&ctx, &name_parts)?; let source_plan = table .read_plan_with_catalog(ctx.clone(), catalog_name, push_downs) diff --git a/query/src/storages/fuse/fuse_table.rs b/query/src/storages/fuse/fuse_table.rs index ed3d4ac8c11ef..a5d3a72a337c3 100644 --- a/query/src/storages/fuse/fuse_table.rs +++ b/query/src/storages/fuse/fuse_table.rs @@ -25,6 +25,7 @@ use common_meta_app::schema::TableInfo; use common_meta_app::schema::TableMeta; use common_meta_app::schema::UpdateTableMetaReq; use common_meta_types::MatchSeq; +use common_planners::DeletePlan; use common_planners::Expression; use common_planners::Extras; use common_planners::Partitions; @@ -383,6 +384,7 @@ impl Table for FuseTable { Ok(Box::pin(data_block_stream)) } + #[tracing::instrument(level = "debug", name = "fuse_table_commit_insertion", skip(self, ctx, operations), fields(ctx.id = ctx.get_id().as_str()))] async fn commit_insertion( &self, ctx: Arc, @@ -400,15 +402,18 @@ impl Table for FuseTable { .await } + #[tracing::instrument(level = "debug", name = "fuse_table_truncate", skip(self, ctx), fields(ctx.id = ctx.get_id().as_str()))] async fn truncate( &self, ctx: Arc, truncate_plan: TruncateTablePlan, ) -> Result<()> { self.check_mutable()?; - self.do_truncate(ctx, truncate_plan).await + self.do_truncate(ctx, truncate_plan.purge, truncate_plan.catalog.as_str()) + .await } + #[tracing::instrument(level = "debug", name = "fuse_table_optimize", skip(self, ctx), fields(ctx.id = ctx.get_id().as_str()))] async fn optimize(&self, ctx: Arc, keep_last_snapshot: bool) -> Result<()> { self.check_mutable()?; self.do_gc(&ctx, keep_last_snapshot).await @@ -424,6 +429,7 @@ impl Table for FuseTable { })) } + #[tracing::instrument(level = "debug", name = "fuse_table_navigate_to", skip(self, ctx), fields(ctx.id = ctx.get_id().as_str()))] async fn navigate_to( &self, ctx: Arc, @@ -438,4 +444,9 @@ impl Table for FuseTable { } } } + + #[tracing::instrument(level = "debug", name = "fuse_table_delete", skip(self, ctx), fields(ctx.id = ctx.get_id().as_str()))] + async fn delete(&self, ctx: Arc, delete_plan: DeletePlan) -> Result<()> { + self.do_delete(ctx, &delete_plan).await + } } diff --git a/query/src/storages/fuse/io/locations.rs b/query/src/storages/fuse/io/locations.rs index 08befa8cf199a..28ef165494337 100644 --- a/query/src/storages/fuse/io/locations.rs +++ b/query/src/storages/fuse/io/locations.rs @@ -27,7 +27,7 @@ use crate::storages::fuse::meta::SnapshotVersion; use crate::storages::fuse::meta::Versioned; static SNAPSHOT_V0: SnapshotVersion = SnapshotVersion::V0(PhantomData); -static SNAPHOST_V1: SnapshotVersion = SnapshotVersion::V1(PhantomData); +static SNAPSHOT_V1: SnapshotVersion = SnapshotVersion::V1(PhantomData); #[derive(Clone)] pub struct TableMetaLocationGenerator { @@ -71,8 +71,8 @@ impl TableMetaLocationGenerator { } pub fn snapshot_version(location: impl AsRef) -> u64 { - if location.as_ref().ends_with(SNAPHOST_V1.suffix()) { - SNAPHOST_V1.version() + if location.as_ref().ends_with(SNAPSHOT_V1.suffix()) { + SNAPSHOT_V1.version() } else { SNAPSHOT_V0.version() } diff --git a/query/src/storages/fuse/io/mod.rs b/query/src/storages/fuse/io/mod.rs index 5f83d3b48cffc..8364ea5239617 100644 --- a/query/src/storages/fuse/io/mod.rs +++ b/query/src/storages/fuse/io/mod.rs @@ -27,4 +27,6 @@ pub use write::write_block; pub use write::write_meta; pub use write::BlockCompactor; pub use write::BlockStreamWriter; +pub use write::BlockWriter; pub use write::SegmentInfoStream; +pub use 
write::SegmentWriter; diff --git a/query/src/storages/fuse/io/read/block_reader.rs b/query/src/storages/fuse/io/read/block_reader.rs index 152ef7b546377..6a661abdc6f85 100644 --- a/query/src/storages/fuse/io/read/block_reader.rs +++ b/query/src/storages/fuse/io/read/block_reader.rs @@ -45,6 +45,7 @@ use crate::storages::fuse::fuse_part::ColumnMeta; use crate::storages::fuse::fuse_part::FusePartInfo; use crate::storages::fuse::io::retry; use crate::storages::fuse::io::retry::Retryable; +use crate::storages::fuse::meta::BlockMeta; use crate::storages::fuse::meta::Compression; #[derive(Clone)] @@ -62,7 +63,7 @@ impl BlockReader { schema: DataSchemaRef, projection: Vec, ) -> Result> { - let projected_schema = DataSchemaRef::new(schema.project(projection.clone())); + let projected_schema = DataSchemaRef::new(schema.project(&projection)); let arrow_schema = schema.to_arrow(); let parquet_schema_descriptor = to_parquet_schema(&arrow_schema)?; @@ -75,7 +76,7 @@ impl BlockReader { })) } - fn to_deserialize( + fn to_array_iter( meta: &ColumnMeta, chunk: Vec, rows: usize, @@ -106,6 +107,71 @@ impl BlockReader { )?) } + // TODO refine these + + #[tracing::instrument(level = "debug", skip_all)] + pub async fn read_with_block_meta(&self, meta: &BlockMeta) -> Result { + let (num_rows, columns_array_iter) = self.read_columns_with_block_meta(meta).await?; + let mut deserializer = RowGroupDeserializer::new(columns_array_iter, num_rows, None); + self.try_next_block(&mut deserializer) + } + // TODO refine these + + pub async fn read_columns_with_block_meta( + &self, + meta: &BlockMeta, + ) -> Result<(usize, Vec>)> { + let rows = meta.row_count as usize; + let num_cols = self.projection.len(); + let mut column_chunk_futs = Vec::with_capacity(num_cols); + let mut col_idx = Vec::with_capacity(num_cols); + for index in &self.projection { + let column_meta = meta + .col_metas + .get(&(*index as u32)) + .ok_or_else(|| ErrorCode::LogicalError(format!("index out of bound {}", index)))?; + let column_reader = self.operator.object(&meta.location.0); + let fut = async move { + let column_chunk = column_reader + .range_read(column_meta.offset..column_meta.offset + column_meta.len) + .await?; + Ok::<_, ErrorCode>(column_chunk) + } + .instrument(debug_span!("read_col_chunk")); + column_chunk_futs.push(fut); + col_idx.push(index); + } + + let chunks = futures::stream::iter(column_chunk_futs) + .buffered(std::cmp::min(10, num_cols)) + .try_collect::>() + .await?; + + let mut columns_array_iter = Vec::with_capacity(num_cols); + for (i, column_chunk) in chunks.into_iter().enumerate() { + let idx = *col_idx[i]; + let field = self.arrow_schema.fields[idx].clone(); + let column_descriptor = &self.parquet_schema_descriptor.columns()[idx]; + //let column_meta = &part.columns_meta[&idx]; + let column_meta = &meta + .col_metas + .get(&(i as u32)) + .ok_or_else(|| ErrorCode::LogicalError(format!("index out of bound {}", i)))?; + let part_col_meta = + ColumnMeta::create(column_meta.offset, column_meta.len, column_meta.num_values); + columns_array_iter.push(Self::to_array_iter( + &part_col_meta, + column_chunk, + rows, + column_descriptor, + field, + &meta.compression(), + )?); + } + + Ok((rows, columns_array_iter)) + } + async fn read_columns(&self, part: PartInfoPtr) -> Result<(usize, Vec>)> { let part = FusePartInfo::from_part(&part)?; @@ -139,7 +205,7 @@ impl BlockReader { let field = self.arrow_schema.fields[idx].clone(); let column_descriptor = &self.parquet_schema_descriptor.columns()[idx]; let column_meta = 
&part.columns_meta[&idx]; - columns_array_iter.push(Self::to_deserialize( + columns_array_iter.push(Self::to_array_iter( column_meta, column_chunk, rows, @@ -168,7 +234,7 @@ impl BlockReader { let field = self.arrow_schema.fields[index].clone(); let column_descriptor = &self.parquet_schema_descriptor.columns()[index]; let column_meta = &part.columns_meta[&index]; - columns_array_iter.push(Self::to_deserialize( + columns_array_iter.push(Self::to_array_iter( column_meta, column_chunk, num_rows, diff --git a/query/src/storages/fuse/io/write/block_stream_writer.rs b/query/src/storages/fuse/io/write/block_stream_writer.rs index fa3ffdb77b623..fddd7828ebf37 100644 --- a/query/src/storages/fuse/io/write/block_stream_writer.rs +++ b/query/src/storages/fuse/io/write/block_stream_writer.rs @@ -12,10 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::sync::Arc; -use common_arrow::parquet::FileMetaData; use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; @@ -31,10 +29,9 @@ use super::block_writer; use crate::pipelines::transforms::ExpressionExecutor; use crate::sessions::QueryContext; use crate::storages::fuse::io::TableMetaLocationGenerator; -use crate::storages::fuse::meta::ColumnId; -use crate::storages::fuse::meta::ColumnMeta; use crate::storages::fuse::meta::SegmentInfo; use crate::storages::fuse::meta::Statistics; +use crate::storages::fuse::operations::column_metas; use crate::storages::fuse::statistics::accumulator::BlockStatistics; use crate::storages::fuse::statistics::StatisticsAccumulator; use crate::storages::index::ClusterKeyInfo; @@ -60,7 +57,7 @@ impl BlockStreamWriter { block_per_segment: usize, meta_locations: TableMetaLocationGenerator, cluster_key_info: Option, - ) -> SegmentInfoStream { + ) -> Result { // filter out empty blocks let block_stream = block_stream.try_filter(|block| std::future::ready(block.num_rows() > 0)); @@ -75,21 +72,25 @@ impl BlockStreamWriter { // Write out the blocks. // And transform the stream of DataBlocks into Stream of SegmentInfo at the same time. 
- let block_writer = - BlockStreamWriter::new(block_per_segment, meta_locations, ctx, cluster_key_info); + let block_writer = BlockStreamWriter::try_create( + block_per_segment, + meta_locations, + ctx, + cluster_key_info, + )?; let segments = Self::transform(Box::pin(block_stream), block_writer); - Box::pin(segments) + Ok(Box::pin(segments)) } - pub fn new( + pub fn try_create( num_block_threshold: usize, meta_locations: TableMetaLocationGenerator, ctx: Arc, cluster_key_info: Option, - ) -> Self { - let data_accessor = ctx.get_storage_operator().unwrap(); - Self { + ) -> Result { + let data_accessor = ctx.get_storage_operator()?; + Ok(Self { num_block_threshold, data_accessor, number_of_blocks_accumulated: 0, @@ -97,7 +98,7 @@ impl BlockStreamWriter { meta_locations, cluster_key_info, ctx, - } + }) } /// Transforms a stream of S to a stream of T @@ -136,7 +137,7 @@ impl BlockStreamWriter { let mut block = data_block; if let Some(v) = self.cluster_key_info.as_mut() { let input_schema = block.schema().clone(); - let cluster_key_index = if v.cluster_key_index.is_empty() { + if v.cluster_key_index.is_empty() { let fields = input_schema.fields().clone(); let index = v .exprs @@ -146,16 +147,13 @@ impl BlockStreamWriter { fields.iter().position(|f| f.name() == &cname).unwrap() }) .collect::>(); - v.cluster_key_index = index.clone(); - index - } else { - v.cluster_key_index.clone() + v.cluster_key_index = index; }; cluster_stats = BlockStatistics::clusters_statistics( v.cluster_key_id, - cluster_key_index, - block.clone(), + &v.cluster_key_index, + &block, )?; // Remove unused columns before serialize @@ -188,12 +186,10 @@ impl BlockStreamWriter { let mut acc = self.statistics_accumulator.take().unwrap_or_default(); let partial_acc = acc.begin(&block, cluster_stats)?; - let schema = block.schema().to_arrow(); let location = self.meta_locations.gen_block_location(); let (file_size, file_meta_data) = - block_writer::write_block(&schema, block, self.data_accessor.clone(), &location) - .await?; - let col_metas = Self::column_metas(&file_meta_data)?; + block_writer::write_block(block, &self.data_accessor, &location).await?; + let col_metas = column_metas(&file_meta_data)?; acc = partial_acc.end(file_size, location, col_metas); self.number_of_blocks_accumulated += 1; if self.number_of_blocks_accumulated >= self.num_block_threshold { @@ -218,50 +214,6 @@ impl BlockStreamWriter { Ok(None) } } - - fn column_metas(file_meta: &FileMetaData) -> Result> { - // currently we use one group only - let num_row_groups = file_meta.row_groups.len(); - if num_row_groups != 1 { - return Err(ErrorCode::ParquetError(format!( - "invalid parquet file, expects only one row group, but got {}", - num_row_groups - ))); - } - let row_group = &file_meta.row_groups[0]; - let mut col_metas = HashMap::with_capacity(row_group.columns.len()); - for (idx, col_chunk) in row_group.columns.iter().enumerate() { - match &col_chunk.meta_data { - Some(chunk_meta) => { - let col_start = - if let Some(dict_page_offset) = chunk_meta.dictionary_page_offset { - dict_page_offset - } else { - chunk_meta.data_page_offset - }; - let col_len = chunk_meta.total_compressed_size; - assert!( - col_start >= 0 && col_len >= 0, - "column start and length should not be negative" - ); - let num_values = chunk_meta.num_values as u64; - let res = ColumnMeta { - offset: col_start as u64, - len: col_len as u64, - num_values, - }; - col_metas.insert(idx as u32, res); - } - None => { - return Err(ErrorCode::ParquetError(format!( - "invalid parquet file, meta data 
of column idx {} is empty", - idx - ))) - } - } - } - Ok(col_metas) - } } /// Takes elements of type S in, and spills elements of type T. diff --git a/query/src/storages/fuse/io/write/block_writer.rs b/query/src/storages/fuse/io/write/block_writer.rs index cd2441d05d5ce..e32e247e49203 100644 --- a/query/src/storages/fuse/io/write/block_writer.rs +++ b/query/src/storages/fuse/io/write/block_writer.rs @@ -15,7 +15,6 @@ use common_arrow::arrow::chunk::Chunk; use common_arrow::arrow::datatypes::DataType as ArrowDataType; -use common_arrow::arrow::datatypes::Schema as ArrowSchema; use common_arrow::arrow::io::parquet::write::RowGroupIterator; use common_arrow::arrow::io::parquet::write::WriteOptions; use common_arrow::parquet::compression::CompressionOptions; @@ -32,11 +31,53 @@ use opendal::Operator; use crate::storages::fuse::io::retry; use crate::storages::fuse::io::retry::Retryable; +use crate::storages::fuse::io::TableMetaLocationGenerator; +use crate::storages::fuse::meta::BlockMeta; +use crate::storages::fuse::meta::Versioned; +use crate::storages::fuse::operations::util; +use crate::storages::fuse::statistics::accumulator; + +pub struct BlockWriter<'a> { + location_generator: &'a TableMetaLocationGenerator, + data_accessor: &'a Operator, +} + +impl<'a> BlockWriter<'a> { + pub fn new( + data_accessor: &'a Operator, + location_generator: &'a TableMetaLocationGenerator, + ) -> Self { + Self { + location_generator, + data_accessor, + } + } + pub async fn write(&self, block: DataBlock) -> Result { + let location = self.location_generator.gen_block_location(); + let data_accessor = &self.data_accessor; + let row_count = block.num_rows() as u64; + let block_size = block.memory_size() as u64; + let col_stats = accumulator::columns_statistics(&block)?; + let (file_size, file_meta_data) = write_block(block, data_accessor, &location).await?; + let col_metas = util::column_metas(&file_meta_data)?; + let cluster_stats = None; // TODO confirm this with zhyass + let location = (location, DataBlock::VERSION); + let block_meta = BlockMeta::new( + row_count, + block_size, + file_size, + col_stats, + col_metas, + cluster_stats, + location, + ); + Ok(block_meta) + } +} pub async fn write_block( - _arrow_schema: &ArrowSchema, block: DataBlock, - data_accessor: Operator, + data_accessor: &Operator, location: &str, ) -> Result<(u64, FileMetaData)> { // we need a configuration of block size threshold here diff --git a/query/src/storages/fuse/io/write/mod.rs b/query/src/storages/fuse/io/write/mod.rs index 8b38cfcfd585e..b3a3e3440120c 100644 --- a/query/src/storages/fuse/io/write/mod.rs +++ b/query/src/storages/fuse/io/write/mod.rs @@ -15,6 +15,7 @@ mod block_stream_writer; mod block_writer; mod meta_writer; +mod segment_writer; // for testing only pub use block_stream_writer::BlockCompactor; @@ -22,4 +23,6 @@ pub use block_stream_writer::BlockStreamWriter; pub use block_stream_writer::SegmentInfoStream; pub use block_writer::serialize_data_blocks; pub use block_writer::write_block; +pub use block_writer::BlockWriter; pub use meta_writer::write_meta; +pub use segment_writer::SegmentWriter; diff --git a/query/src/storages/fuse/io/write/segment_writer.rs b/query/src/storages/fuse/io/write/segment_writer.rs new file mode 100644 index 0000000000000..413b62dc685c0 --- /dev/null +++ b/query/src/storages/fuse/io/write/segment_writer.rs @@ -0,0 +1,58 @@ +// Copyright 2021 Datafuse Labs. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use std::sync::Arc; + +use common_cache::Cache; +use common_exception::Result; +use opendal::Operator; + +use crate::storages::fuse::cache::SegmentInfoCache; +use crate::storages::fuse::io::write_meta; +use crate::storages::fuse::io::TableMetaLocationGenerator; +use crate::storages::fuse::meta::Location; +use crate::storages::fuse::meta::SegmentInfo; +use crate::storages::fuse::meta::Versioned; + +pub struct SegmentWriter<'a> { + location_generator: &'a TableMetaLocationGenerator, + data_accessor: &'a Operator, + cache: &'a Option, +} + +impl<'a> SegmentWriter<'a> { + pub fn new( + data_accessor: &'a Operator, + location_generator: &'a TableMetaLocationGenerator, + cache: &'a Option, + ) -> Self { + Self { + location_generator, + data_accessor, + cache, + } + } + pub async fn write_segment(&self, segment: SegmentInfo) -> Result { + let segment_path = self.location_generator.gen_segment_info_location(); + let segment_location = (segment_path, SegmentInfo::VERSION); + write_meta(self.data_accessor, segment_location.0.as_str(), &segment).await?; + + if let Some(ref cache) = self.cache { + let cache = &mut cache.write().await; + cache.put(segment_location.0.clone(), Arc::new(segment)); + } + Ok(segment_location) + } +} diff --git a/query/src/storages/fuse/meta/v1/segment.rs b/query/src/storages/fuse/meta/v1/segment.rs index 2983f62f75075..23e3d0cc160a7 100644 --- a/query/src/storages/fuse/meta/v1/segment.rs +++ b/query/src/storages/fuse/meta/v1/segment.rs @@ -58,7 +58,34 @@ pub struct BlockMeta { /// `Lz4` is merely for backward compatibility, it will NO longer be /// used in the write path. 
#[serde(default = "Compression::legacy")] - pub compression: Compression, + compression: Compression, +} + +impl BlockMeta { + pub fn new( + row_count: u64, + block_size: u64, + file_size: u64, + col_stats: HashMap, + col_metas: HashMap, + cluster_stats: Option, + location: Location, + ) -> Self { + Self { + row_count, + block_size, + file_size, + col_stats, + col_metas, + cluster_stats, + location, + compression: Compression::Lz4Raw, + } + } + + pub fn compression(&self) -> Compression { + self.compression + } } impl SegmentInfo { diff --git a/query/src/storages/fuse/meta/v1/snapshot.rs b/query/src/storages/fuse/meta/v1/snapshot.rs index 22cb7e9664f3b..a453b8246e054 100644 --- a/query/src/storages/fuse/meta/v1/snapshot.rs +++ b/query/src/storages/fuse/meta/v1/snapshot.rs @@ -19,6 +19,7 @@ use chrono::Utc; use common_datavalues::DataSchema; use serde::Deserialize; use serde::Serialize; +use uuid::Uuid; use crate::storages::fuse::meta::common::ClusterKey; use crate::storages::fuse::meta::common::FormatVersion; @@ -88,6 +89,20 @@ impl TableSnapshot { } } + pub fn from_previous(previous: &TableSnapshot) -> Self { + let id = Uuid::new_v4(); + let clone = previous.clone(); + Self::new( + id, + &clone.timestamp, + Some((clone.snapshot_id, clone.format_version)), + clone.schema, + clone.summary, + clone.segments, + clone.cluster_key_meta, + ) + } + pub fn format_version(&self) -> u64 { self.format_version } diff --git a/query/src/storages/fuse/operations/append.rs b/query/src/storages/fuse/operations/append.rs index 1a21c65a091ba..8bb15ce100f8c 100644 --- a/query/src/storages/fuse/operations/append.rs +++ b/query/src/storages/fuse/operations/append.rs @@ -78,7 +78,7 @@ impl FuseTable { self.meta_location_generator().clone(), cluster_key_info, ) - .await; + .await?; let locs = self.meta_location_generator().clone(); let segment_info_cache = ctx.get_storage_cache_manager().get_table_segment_cache(); diff --git a/query/src/storages/fuse/operations/commit.rs b/query/src/storages/fuse/operations/commit.rs index b83b9dd04d2fa..ba5ebd3adb43b 100644 --- a/query/src/storages/fuse/operations/commit.rs +++ b/query/src/storages/fuse/operations/commit.rs @@ -270,7 +270,7 @@ impl FuseTable { Ok(new_snapshot) } - async fn commit_to_meta_server( + pub async fn commit_to_meta_server( ctx: &QueryContext, catalog_name: &str, table_info: &TableInfo, @@ -327,7 +327,7 @@ impl FuseTable { acc.col_stats = if acc.col_stats.is_empty() { stats.col_stats.clone() } else { - statistics::reduce_block_stats(&[&acc.col_stats, &stats.col_stats])? + statistics::reduce_block_statistics(&[&acc.col_stats, &stats.col_stats])? }; seg_acc.push(loc.clone()); Ok::<_, ErrorCode>((acc, seg_acc)) diff --git a/query/src/storages/fuse/operations/delete.rs b/query/src/storages/fuse/operations/delete.rs new file mode 100644 index 0000000000000..ffa1ef6b5c0df --- /dev/null +++ b/query/src/storages/fuse/operations/delete.rs @@ -0,0 +1,127 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use common_exception::Result; +use common_planners::DeletePlan; +use common_planners::Expression; +use common_planners::Extras; +use common_tracing::tracing::debug; + +use crate::sessions::QueryContext; +use crate::storages::fuse::meta::TableSnapshot; +use crate::storages::fuse::operations::mutation::delete_from_block; +use crate::storages::fuse::operations::mutation::mutations_collector::Deletion; +use crate::storages::fuse::operations::mutation::mutations_collector::DeletionCollector; +use crate::storages::fuse::pruning::BlockPruner; +use crate::storages::fuse::FuseTable; +use crate::storages::Table; + +impl FuseTable { + pub async fn do_delete(&self, ctx: Arc, plan: &DeletePlan) -> Result<()> { + let snapshot_opt = self.read_table_snapshot(ctx.as_ref()).await?; + + // check if table is empty + let snapshot = if let Some(val) = snapshot_opt { + val + } else { + // no snapshot, no deletion + return Ok(()); + }; + + if snapshot.summary.row_count == 0 { + // empty snapshot, no deletion + return Ok(()); + } + + // check if unconditional deletion + if let Some(filter) = &plan.selection { + self.delete_rows(ctx, &snapshot, filter, plan).await + } else { + // deleting the whole table... just a truncate + let purge = false; + debug!( + "unconditionally delete from table, {}.{}.{}", + plan.catalog_name, plan.database_name, plan.table_name + ); + return self + .do_truncate(ctx.clone(), purge, plan.catalog_name.as_str()) + .await; + } + } + + async fn delete_rows( + &self, + ctx: Arc, + snapshot: &Arc, + filter: &Expression, + plan: &DeletePlan, + ) -> Result<()> { + let mut deletion_collector = + DeletionCollector::try_create(ctx.as_ref(), &self.meta_location_generator, snapshot)?; + let schema = self.table_info.schema(); + // TODO refine pruner + let extras = Extras { + projection: Some(plan.projection.clone()), + filters: vec![filter.clone()], + limit: None, + order_by: vec![], + }; + let push_downs = Some(extras); + let block_metas = BlockPruner::new(snapshot.clone()) + .apply(ctx.as_ref(), schema, &push_downs) + .await?; + + // delete block one by one. + // this could be executed in a distributed manner (till new planner, pipeline settled down) + for (seg_idx, block_meta) in block_metas { + let proj = plan.projection.clone(); + match delete_from_block(self, &block_meta, &ctx, proj, filter).await? { + Deletion::NothingDeleted => { + // false positive, we should keep the whole block + continue; + } + Deletion::Remains(r) => { + // after deletion, the data block `r` remains, let keep it by replacing the block + // located at `block_meta.location`, of segment indexed by `seg_idx`, with a new block `r` + deletion_collector + .replace_with(seg_idx, block_meta.location.clone(), r) + .await? 
+ } + } + } + self.commit_deletion(ctx.as_ref(), deletion_collector, &plan.catalog_name) + .await + } + + async fn commit_deletion( + &self, + ctx: &QueryContext, + del_holder: DeletionCollector<'_>, + catalog_name: &str, + ) -> Result<()> { + let (new_snapshot, loc) = del_holder.into_new_snapshot().await?; + Self::commit_to_meta_server( + ctx, + catalog_name, + self.get_table_info(), + loc, + &new_snapshot.summary, + ) + .await?; + // TODO check if error is recoverable, and try to resolve the conflict + Ok(()) + } +} diff --git a/query/src/storages/fuse/operations/fuse_sink.rs b/query/src/storages/fuse/operations/fuse_sink.rs index 2ee6eb4287833..c1d45a044fac6 100644 --- a/query/src/storages/fuse/operations/fuse_sink.rs +++ b/query/src/storages/fuse/operations/fuse_sink.rs @@ -140,8 +140,8 @@ impl Processor for FuseTableSink { if let Some(v) = &self.cluster_key_info { cluster_stats = BlockStatistics::clusters_statistics( v.cluster_key_id, - v.cluster_key_index.clone(), - block.clone(), + &v.cluster_key_index, + &block, )?; // Remove unused columns before serialize diff --git a/query/src/storages/fuse/operations/mod.rs b/query/src/storages/fuse/operations/mod.rs index defd172143022..77a738ec1516d 100644 --- a/query/src/storages/fuse/operations/mod.rs +++ b/query/src/storages/fuse/operations/mod.rs @@ -14,14 +14,19 @@ mod append; mod commit; +mod delete; mod fuse_sink; mod gc; +mod mutation; mod navigate; mod operation_log; mod read; mod read_partitions; mod truncate; +pub mod util; + pub use fuse_sink::FuseTableSink; pub use operation_log::AppendOperationLogEntry; pub use operation_log::TableOperationLog; +pub use util::column_metas; diff --git a/query/src/storages/fuse/operations/mutation/block_filter.rs b/query/src/storages/fuse/operations/mutation/block_filter.rs new file mode 100644 index 0000000000000..a8a42cdc70d59 --- /dev/null +++ b/query/src/storages/fuse/operations/mutation/block_filter.rs @@ -0,0 +1,101 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
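Taken together, `do_delete` above splits into two paths: DELETE without a WHERE clause degenerates into a non-purging truncate, while a conditional delete prunes candidate blocks, pushes each one through `delete_from_block`, registers surviving data with the collector, and finally commits a new snapshot. The sketch below mirrors only that control flow; blocks are plain `Vec<i64>` and all pruning, storage and commit machinery is elided.

// Control-flow sketch of conditional vs. unconditional delete over toy blocks.
enum Deletion {
    NothingDeleted,
    Remains(Vec<i64>),
}

fn delete_from_block(block: &[i64], predicate: &dyn Fn(i64) -> bool) -> Deletion {
    // keep the rows for which the DELETE predicate does NOT hold
    let remains: Vec<i64> = block.iter().copied().filter(|v| !predicate(*v)).collect();
    if remains.len() == block.len() {
        Deletion::NothingDeleted // pruning false positive: keep the block untouched
    } else {
        Deletion::Remains(remains)
    }
}

fn do_delete(blocks: &mut Vec<Vec<i64>>, predicate: Option<&dyn Fn(i64) -> bool>) {
    match predicate {
        // no WHERE clause: deleting everything is just a truncate
        None => blocks.clear(),
        Some(pred) => {
            for block in blocks.iter_mut() {
                match delete_from_block(block, pred) {
                    Deletion::NothingDeleted => continue,
                    // replace the original block with whatever survives
                    Deletion::Remains(r) => *block = r,
                }
            }
            // blocks that became empty are dropped (and, in the real code,
            // an emptied segment is removed from the snapshot)
            blocks.retain(|b| !b.is_empty());
        }
    }
}

fn main() {
    let mut blocks = vec![vec![1, 2, 3], vec![10, 20]];
    let pred: &dyn Fn(i64) -> bool = &|v| v >= 10;
    do_delete(&mut blocks, Some(pred));
    assert_eq!(blocks, vec![vec![1, 2, 3]]);
}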
+ +use std::sync::Arc; + +use common_datablocks::DataBlock; +use common_datavalues::DataSchemaRefExt; +use common_exception::Result; +use common_planners::Expression; + +use crate::pipelines::transforms::ExpressionExecutor; +use crate::sessions::QueryContext; +use crate::storages::fuse::meta::BlockMeta; +use crate::storages::fuse::operations::mutation::mutations_collector::Deletion; +use crate::storages::fuse::FuseTable; + +pub async fn delete_from_block( + table: &FuseTable, + block_meta: &BlockMeta, + ctx: &Arc, + filter_column_ids: Vec, + filter_expr: &Expression, +) -> Result { + let mut filtering_whole_block = false; + + // extract the columns that are going to be filtered on + let col_ids = { + if filter_column_ids.is_empty() { + filtering_whole_block = true; + // To be optimized (in `interpreter_delete`, if we adhere the style of interpreter_select) + // In this case, the expr being evaluated is unrelated to the value of rows: + // - if the `filter_expr` is of "constant" function: + // for the whole block, whether all of the rows should be kept or dropped + // - but, the expr may NOT be deterministic, e.g. + // A nullary non-constant func, which returns true, false or NULL randomly + all_the_columns_ids(table) + } else { + filter_column_ids + } + }; + // read the cols that we are going to filtering on + let reader = table.create_block_reader(ctx, col_ids)?; + let data_block = reader.read_with_block_meta(block_meta).await?; + + // inverse the expr + let inverse_expr = Expression::UnaryExpression { + op: "not".to_string(), + expr: Box::new(filter_expr.clone()), + }; + + let schema = table.table_info.schema(); + let expr_field = inverse_expr.to_data_field(&schema)?; + let expr_schema = DataSchemaRefExt::create(vec![expr_field]); + + let expr_exec = ExpressionExecutor::try_create( + ctx.clone(), + "filter expression executor (delete) ", + schema.clone(), + expr_schema, + vec![inverse_expr], + false, + )?; + + // get the single col data block, which indicates the rows should be kept/removed + let filter_result = expr_exec.execute(&data_block)?; + + // read the whole block + let whole_block = if filtering_whole_block { + data_block + } else { + let whole_table_proj = all_the_columns_ids(table); + let whole_block_reader = table.create_block_reader(ctx, whole_table_proj)?; + whole_block_reader.read_with_block_meta(block_meta).await? + }; + + // returns the data remains after deletion + let data_block = DataBlock::filter_block(whole_block, filter_result.column(0))?; + let res = if data_block.num_rows() == block_meta.row_count as usize { + Deletion::NothingDeleted + } else { + Deletion::Remains(data_block) + }; + Ok(res) +} + +fn all_the_columns_ids(table: &FuseTable) -> Vec { + (0..table.table_info.schema().fields().len()) + .into_iter() + .collect::>() +} diff --git a/query/src/storages/fuse/operations/mutation/mod.rs b/query/src/storages/fuse/operations/mutation/mod.rs new file mode 100644 index 0000000000000..d36ac3e51d49a --- /dev/null +++ b/query/src/storages/fuse/operations/mutation/mod.rs @@ -0,0 +1,18 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
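`delete_from_block` above works column-wise: it evaluates `NOT (filter)` over just the columns the predicate needs, yielding a single boolean column, and only then materialises the full block and keeps the rows where that column is true (it reads everything up front only when the predicate references no columns at all). A stand-alone sketch of applying such a keep-mask to a columnar block, with plain `Vec`s instead of `DataBlock`/`ColumnRef`:

// Columnar keep-mask filtering: the mask comes from the predicate column(s) only,
// then is applied to every column of the block.
struct ColumnarBlock {
    columns: Vec<Vec<i64>>, // column-major: columns[col][row]
}

fn keep_mask(predicate_col: &[i64], delete_predicate: impl Fn(i64) -> bool) -> Vec<bool> {
    // rows to KEEP are the ones where the DELETE predicate does not hold
    predicate_col.iter().map(|v| !delete_predicate(*v)).collect()
}

fn apply_mask(block: &ColumnarBlock, mask: &[bool]) -> ColumnarBlock {
    let columns = block
        .columns
        .iter()
        .map(|col| {
            col.iter()
                .zip(mask)
                .filter_map(|(v, keep)| if *keep { Some(*v) } else { None })
                .collect()
        })
        .collect();
    ColumnarBlock { columns }
}

fn main() {
    let block = ColumnarBlock {
        columns: vec![vec![1, 2, 3], vec![10, 20, 30]],
    };
    // DELETE ... WHERE c0 = 2: the mask is computed from column 0 alone
    let mask = keep_mask(&block.columns[0], |v| v == 2);
    let remains = apply_mask(&block, &mask);
    assert_eq!(remains.columns, vec![vec![1, 3], vec![10, 30]]);
}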
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod block_filter; +pub mod mutations_collector; + +pub use block_filter::delete_from_block; diff --git a/query/src/storages/fuse/operations/mutation/mutations_collector.rs b/query/src/storages/fuse/operations/mutation/mutations_collector.rs new file mode 100644 index 0000000000000..b42b84d0d2f15 --- /dev/null +++ b/query/src/storages/fuse/operations/mutation/mutations_collector.rs @@ -0,0 +1,175 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use common_datablocks::DataBlock; +use common_exception::ErrorCode; +use common_exception::Result; +use opendal::Operator; + +use crate::sessions::QueryContext; +use crate::storages::fuse::io::BlockWriter; +use crate::storages::fuse::io::MetaReaders; +use crate::storages::fuse::io::SegmentWriter; +use crate::storages::fuse::io::TableMetaLocationGenerator; +use crate::storages::fuse::meta::BlockMeta; +use crate::storages::fuse::meta::Location; +use crate::storages::fuse::meta::SegmentInfo; +use crate::storages::fuse::meta::TableSnapshot; +use crate::storages::fuse::statistics::reducers::reduce_block_metas; +use crate::storages::fuse::statistics::reducers::reduce_statistics; + +pub enum Deletion { + NothingDeleted, + Remains(DataBlock), +} + +pub struct Replacement { + original_block_loc: Location, + new_block_meta: Option, +} + +pub type SegmentIndex = usize; + +pub struct DeletionCollector<'a> { + mutations: HashMap>, + ctx: &'a QueryContext, + location_generator: &'a TableMetaLocationGenerator, + base_snapshot: &'a TableSnapshot, + data_accessor: Operator, +} + +impl<'a> DeletionCollector<'a> { + pub fn try_create( + ctx: &'a QueryContext, + location_generator: &'a TableMetaLocationGenerator, + base_snapshot: &'a TableSnapshot, + ) -> Result { + let data_accessor = ctx.get_storage_operator()?; + Ok(Self { + mutations: HashMap::new(), + ctx, + location_generator, + base_snapshot, + data_accessor, + }) + } + + pub async fn into_new_snapshot(self) -> Result<(TableSnapshot, String)> { + let snapshot = self.base_snapshot; + let mut new_snapshot = TableSnapshot::from_previous(snapshot); + let segment_reader = MetaReaders::segment_info_reader(self.ctx); + + let segment_info_cache = self + .ctx + .get_storage_cache_manager() + .get_table_segment_cache(); + let seg_writer = SegmentWriter::new( + &self.data_accessor, + self.location_generator, + &segment_info_cache, + ); + + for (seg_idx, replacements) in self.mutations { + let seg_loc = &snapshot.segments[seg_idx]; + let segment = segment_reader.read(&seg_loc.0, None, seg_loc.1).await?; + + let 
block_positions = segment + .blocks + .iter() + .enumerate() + .map(|(idx, meta)| (&meta.location, idx)) + .collect::>(); + + let mut new_segment = SegmentInfo::new(segment.blocks.clone(), segment.summary.clone()); + + for replacement in replacements { + let position = block_positions + .get(&replacement.original_block_loc) + .ok_or_else(|| { + ErrorCode::LogicalError(format!( + "block location not found {:?}", + &replacement.original_block_loc + )) + })?; + if let Some(block_meta) = replacement.new_block_meta { + new_segment.blocks[*position] = block_meta; + } else { + new_segment.blocks.remove(*position); + } + } + + if new_segment.blocks.is_empty() { + // remove the segment if no blocks there + new_snapshot.segments.remove(seg_idx); + } else { + let new_summary = reduce_block_metas(&new_segment.blocks)?; + new_segment.summary = new_summary; + let new_segment_location = seg_writer.write_segment(new_segment).await?; + new_snapshot.segments[seg_idx] = new_segment_location; + } + } + + let mut new_segment_summaries = vec![]; + for (loc, ver) in &new_snapshot.segments { + let seg = segment_reader.read(loc, None, *ver).await?; + // only need the summary, drop the reference to segment ASAP + new_segment_summaries.push(seg.summary.clone()) + } + + // update the summary of new snapshot + let new_summary = reduce_statistics(&new_segment_summaries)?; + new_snapshot.summary = new_summary; + + // write the new segment out (and keep it in undo log) + let snapshot_loc = self.location_generator.snapshot_location_from_uuid( + &new_snapshot.snapshot_id, + new_snapshot.format_version(), + )?; + let bytes = serde_json::to_vec(&new_snapshot)?; + self.data_accessor + .object(&snapshot_loc) + .write(bytes) + .await?; + Ok((new_snapshot, snapshot_loc)) + } + + /// Replaces + /// the block located at `block_location` of segment indexed by `seg_idx` + /// With a new block `r` + pub async fn replace_with( + &mut self, + seg_idx: usize, + location_of_block_to_be_replaced: Location, + replace_with: DataBlock, + ) -> Result<()> { + // write new block, and keep the mutations + let new_block_meta = if replace_with.num_rows() == 0 { + None + } else { + let block_writer = BlockWriter::new(&self.data_accessor, self.location_generator); + Some(block_writer.write(replace_with).await?) 
+ }; + let original_block_loc = location_of_block_to_be_replaced; + self.mutations + .entry(seg_idx) + .or_default() + .push(Replacement { + original_block_loc, + new_block_meta, + }); + Ok(()) + } +} diff --git a/query/src/storages/fuse/operations/read.rs b/query/src/storages/fuse/operations/read.rs index 9617f63bd8597..c8bdba343ddc4 100644 --- a/query/src/storages/fuse/operations/read.rs +++ b/query/src/storages/fuse/operations/read.rs @@ -46,7 +46,8 @@ impl FuseTable { ctx: Arc, push_downs: &Option, ) -> Result { - let block_reader = self.create_block_reader(&ctx, push_downs)?; + let block_reader = + self.create_block_reader(&ctx, self.projection_of_push_downs(push_downs))?; let iter = std::iter::from_fn(move || match ctx.clone().try_get_partitions(1) { Err(_) => None, @@ -67,12 +68,18 @@ impl FuseTable { Ok(Box::pin(stream)) } - fn create_block_reader( + pub fn create_block_reader( &self, ctx: &Arc, - push_downs: &Option, + projection: Vec, ) -> Result> { - let projection = if let Some(Extras { + let operator = ctx.get_storage_operator()?; + let table_schema = self.table_info.schema(); + BlockReader::create(operator, table_schema, projection) + } + + pub fn projection_of_push_downs(&self, push_downs: &Option) -> Vec { + if let Some(Extras { projection: Some(prj), .. }) = push_downs @@ -82,11 +89,7 @@ impl FuseTable { (0..self.table_info.schema().fields().len()) .into_iter() .collect::>() - }; - - let operator = ctx.get_storage_operator()?; - let table_schema = self.table_info.schema(); - BlockReader::create(operator, table_schema, projection) + } } #[inline] @@ -96,7 +99,8 @@ impl FuseTable { plan: &ReadDataSourcePlan, pipeline: &mut NewPipeline, ) -> Result<()> { - let block_reader = self.create_block_reader(&ctx, &plan.push_downs)?; + let block_reader = + self.create_block_reader(&ctx, self.projection_of_push_downs(&plan.push_downs))?; let parts_len = plan.parts.len(); let max_threads = ctx.get_settings().get_max_threads()? as usize; diff --git a/query/src/storages/fuse/operations/read_partitions.rs b/query/src/storages/fuse/operations/read_partitions.rs index 901ec8664b86f..bb62dc1231836 100644 --- a/query/src/storages/fuse/operations/read_partitions.rs +++ b/query/src/storages/fuse/operations/read_partitions.rs @@ -46,7 +46,10 @@ impl FuseTable { let schema = self.table_info.schema(); let block_metas = BlockPruner::new(snapshot.clone()) .apply(ctx.as_ref(), schema, &push_downs) - .await?; + .await? 
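The DeletionCollector above buffers one `Replacement` list per segment index and defers all segment rewriting to `into_new_snapshot`: mutated segments get their blocks swapped or dropped, segments that end up empty disappear from the snapshot, and the per-segment and table summaries are re-reduced before the new snapshot is written. The sketch below condenses just the rebuild step; `BlockMetaSketch` and `ReplacementSketch` are simplified stand-ins and statistics handling is omitted.

use std::collections::HashMap;

struct BlockMetaSketch {
    location: String,
    row_count: u64,
}

struct ReplacementSketch {
    original_block_loc: String,
    // None means the rewritten block is empty: drop it instead of replacing it
    new_block_meta: Option<BlockMetaSketch>,
}

fn rebuild_segments(
    mut segments: Vec<Vec<BlockMetaSketch>>,
    mutations: HashMap<usize, Vec<ReplacementSketch>>,
) -> Vec<Vec<BlockMetaSketch>> {
    for (seg_idx, replacements) in mutations {
        let segment = &mut segments[seg_idx];
        for r in replacements {
            // locate the block being replaced by its (old) location
            let pos = segment
                .iter()
                .position(|b| b.location == r.original_block_loc)
                .expect("block location not found");
            match r.new_block_meta {
                Some(meta) => segment[pos] = meta,
                None => {
                    segment.remove(pos);
                }
            }
        }
    }
    // segments that lost all their blocks disappear from the snapshot
    segments.retain(|seg| !seg.is_empty());
    segments
}

fn main() {
    let segments = vec![vec![
        BlockMetaSketch { location: "b0".into(), row_count: 3 },
        BlockMetaSketch { location: "b1".into(), row_count: 2 },
    ]];
    let mut mutations = HashMap::new();
    mutations.insert(0, vec![ReplacementSketch {
        original_block_loc: "b1".into(),
        new_block_meta: Some(BlockMetaSketch { location: "b1-new".into(), row_count: 1 }),
    }]);
    let rebuilt = rebuild_segments(segments, mutations);
    assert_eq!(rebuilt[0][1].location, "b1-new");
    assert_eq!(rebuilt[0].iter().map(|b| b.row_count).sum::<u64>(), 4);
}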
+ .into_iter() + .map(|(_, v)| v) + .collect::>(); let partitions_scanned = block_metas.len(); let partitions_total = snapshot.summary.block_count as usize; @@ -96,7 +99,6 @@ impl FuseTable { fn is_exact(push_downs: &Option) -> bool { match push_downs { None => true, - // We don't have limit push down in parquet reader Some(extra) => extra.filters.is_empty(), } } @@ -191,7 +193,7 @@ impl FuseTable { format_version, rows_count, columns_meta, - meta.compression, + meta.compression(), ) } @@ -210,12 +212,15 @@ impl FuseTable { let rows_count = meta.row_count; let location = meta.location.0.clone(); let format_version = meta.location.1; + // TODO + // row_count should be a hint value of LIMIT, + // not the count the rows in this partition FusePartInfo::create( location, format_version, rows_count, columns_meta, - meta.compression, + meta.compression(), ) } diff --git a/query/src/storages/fuse/operations/truncate.rs b/query/src/storages/fuse/operations/truncate.rs index 8604f3480f77e..af6fbb6bc0623 100644 --- a/query/src/storages/fuse/operations/truncate.rs +++ b/query/src/storages/fuse/operations/truncate.rs @@ -19,7 +19,6 @@ use common_exception::Result; use common_meta_app::schema::TableStatistics; use common_meta_app::schema::UpdateTableMetaReq; use common_meta_types::MatchSeq; -use common_planners::TruncateTablePlan; use uuid::Uuid; use crate::sessions::QueryContext; @@ -30,7 +29,12 @@ use crate::storages::fuse::FuseTable; impl FuseTable { #[inline] - pub async fn do_truncate(&self, ctx: Arc, plan: TruncateTablePlan) -> Result<()> { + pub async fn do_truncate( + &self, + ctx: Arc, + purge: bool, + catalog_name: &str, + ) -> Result<()> { if let Some(prev_snapshot) = self.read_table_snapshot(ctx.as_ref()).await? { let prev_id = prev_snapshot.snapshot_id; @@ -50,7 +54,7 @@ impl FuseTable { let bytes = serde_json::to_vec(&new_snapshot)?; operator.object(&new_snapshot_loc).write(bytes).await?; - if plan.purge { + if purge { let keep_last_snapshot = false; self.do_gc(&ctx, keep_last_snapshot).await? } @@ -66,7 +70,7 @@ impl FuseTable { let table_id = self.table_info.ident.table_id; let table_version = self.table_info.ident.seq; - ctx.get_catalog(&plan.catalog)? + ctx.get_catalog(catalog_name)? .update_table_meta(UpdateTableMetaReq { table_id, seq: MatchSeq::Exact(table_version), diff --git a/query/src/storages/fuse/operations/util.rs b/query/src/storages/fuse/operations/util.rs new file mode 100644 index 0000000000000..90f55d122328e --- /dev/null +++ b/query/src/storages/fuse/operations/util.rs @@ -0,0 +1,65 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
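`do_truncate` no longer takes a `TruncateTablePlan`; it receives the two values it actually uses, `purge` and the catalog name, which is what allows the unconditional DELETE path to call it directly. A trivial sketch of that calling convention, with placeholder bodies standing in for snapshot writing and garbage collection:

// Sketch of the reworked truncate entry point; the caller decides whether the
// truncation should also purge historical snapshots.
fn do_truncate(purge: bool, catalog_name: &str) {
    // write an empty snapshot and update table meta through the catalog (elided)
    println!("truncated via catalog `{catalog_name}`");
    if purge {
        // purging also garbage-collects historical snapshots and their blocks
        let keep_last_snapshot = false;
        do_gc(keep_last_snapshot);
    }
}

fn do_gc(keep_last_snapshot: bool) {
    println!("gc, keep_last_snapshot = {keep_last_snapshot}");
}

fn main() {
    // TRUNCATE ... PURGE maps onto purge = true; DELETE without WHERE uses purge = false
    do_truncate(true, "default");
    do_truncate(false, "default");
}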
+ +use std::collections::HashMap; + +use common_arrow::parquet::FileMetaData; +use common_exception::ErrorCode; +use common_exception::Result; + +use crate::storages::fuse::meta::ColumnId; +use crate::storages::fuse::meta::ColumnMeta; + +pub fn column_metas(file_meta: &FileMetaData) -> Result> { + // currently we use one group only + let num_row_groups = file_meta.row_groups.len(); + if num_row_groups != 1 { + return Err(ErrorCode::ParquetError(format!( + "invalid parquet file, expects only one row group, but got {}", + num_row_groups + ))); + } + let row_group = &file_meta.row_groups[0]; + let mut col_metas = HashMap::with_capacity(row_group.columns.len()); + for (idx, col_chunk) in row_group.columns.iter().enumerate() { + match &col_chunk.meta_data { + Some(chunk_meta) => { + let col_start = if let Some(dict_page_offset) = chunk_meta.dictionary_page_offset { + dict_page_offset + } else { + chunk_meta.data_page_offset + }; + let col_len = chunk_meta.total_compressed_size; + assert!( + col_start >= 0 && col_len >= 0, + "column start and length should not be negative" + ); + let num_values = chunk_meta.num_values as u64; + let res = ColumnMeta { + offset: col_start as u64, + len: col_len as u64, + num_values, + }; + col_metas.insert(idx as u32, res); + } + None => { + return Err(ErrorCode::ParquetError(format!( + "invalid parquet file, meta data of column idx {} is empty", + idx + ))) + } + } + } + Ok(col_metas) +} diff --git a/query/src/storages/fuse/pruning/block_pruner.rs b/query/src/storages/fuse/pruning/block_pruner.rs index 1c6a3b72fd626..aa7fc8b369e61 100644 --- a/query/src/storages/fuse/pruning/block_pruner.rs +++ b/query/src/storages/fuse/pruning/block_pruner.rs @@ -18,6 +18,7 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use common_datavalues::DataSchemaRef; +use common_exception::ErrorCode; use common_exception::Result; use common_planners::Extras; use common_tracing::tracing; @@ -29,14 +30,14 @@ use crate::storages::fuse::io::MetaReaders; use crate::storages::fuse::meta::BlockMeta; use crate::storages::fuse::meta::SegmentInfo; use crate::storages::fuse::meta::TableSnapshot; -use crate::storages::index::ColumnsStatistics; use crate::storages::index::RangeFilter; +use crate::storages::index::StatisticsOfColumns; pub struct BlockPruner { table_snapshot: Arc, } -type Pred = Box Result + Send + Sync + Unpin>; +type Pred = Box Result + Send + Sync + Unpin>; impl BlockPruner { pub fn new(table_snapshot: Arc) -> Self { Self { table_snapshot } @@ -48,15 +49,15 @@ impl BlockPruner { ctx: &QueryContext, schema: DataSchemaRef, push_down: &Option, - ) -> Result> { + ) -> Result> { let block_pred: Pred = match push_down { Some(exprs) if !exprs.filters.is_empty() => { // for the time being, we only handle the first expr - let verifiable_expression = + let range_filter = RangeFilter::try_create(Arc::new(ctx.clone()), &exprs.filters[0], schema)?; - Box::new(move |v: &ColumnsStatistics| verifiable_expression.eval(v)) + Box::new(move |v: &StatisticsOfColumns| range_filter.eval(v)) } - _ => Box::new(|_: &ColumnsStatistics| Ok(true)), + _ => Box::new(|_: &StatisticsOfColumns| Ok(true)), }; let segment_locs = self.table_snapshot.segments.clone(); @@ -79,24 +80,33 @@ impl BlockPruner { let accumulated_rows = AtomicUsize::new(0); // A !Copy Wrapper of u64 - struct NonCopy(u64); + struct NonCopy(T); // convert u64 (which is Copy) into NonCopy( struct which is !Copy) // so that "async move" can be avoided in the latter async block // See https://github.com/rust-lang/rust/issues/81653 - let 
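The new `column_metas` helper flattens a parquet footer into the fuse engine's `ColumnMeta` map: it insists on exactly one row group per block file, and for every column chunk records where the chunk starts (its dictionary page if present, otherwise its first data page), its compressed length, and its value count. A self-contained sketch with stand-in structs (not the real `common_arrow` parquet metadata types):

use std::collections::HashMap;

// Stand-ins for the parquet footer types used above.
struct ChunkMetaSketch {
    dictionary_page_offset: Option<i64>,
    data_page_offset: i64,
    total_compressed_size: i64,
    num_values: i64,
}

struct ColumnMetaSketch {
    offset: u64,
    len: u64,
    num_values: u64,
}

fn column_metas(row_groups: &[Vec<ChunkMetaSketch>]) -> Result<HashMap<u32, ColumnMetaSketch>, String> {
    // the fuse engine writes exactly one row group per block file
    if row_groups.len() != 1 {
        return Err(format!("expected one row group, got {}", row_groups.len()));
    }
    let mut metas = HashMap::with_capacity(row_groups[0].len());
    for (idx, chunk) in row_groups[0].iter().enumerate() {
        // a column chunk starts at its dictionary page when one exists,
        // otherwise at its first data page
        let start = chunk.dictionary_page_offset.unwrap_or(chunk.data_page_offset);
        assert!(start >= 0 && chunk.total_compressed_size >= 0);
        metas.insert(idx as u32, ColumnMetaSketch {
            offset: start as u64,
            len: chunk.total_compressed_size as u64,
            num_values: chunk.num_values as u64,
        });
    }
    Ok(metas)
}

fn main() {
    let rg = vec![vec![ChunkMetaSketch {
        dictionary_page_offset: None,
        data_page_offset: 4,
        total_compressed_size: 128,
        num_values: 100,
    }]];
    let metas = column_metas(&rg).unwrap();
    assert_eq!(metas[&0].offset, 4);
    assert_eq!(metas[&0].len, 128);
    assert_eq!(metas[&0].num_values, 100);
}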
segment_locs = segment_locs.into_iter().map(|(s, v)| (s, NonCopy(v))); + let segment_locs = segment_locs + .into_iter() + .enumerate() + .map(|(idx, (loc, ver))| (NonCopy(idx), (loc, NonCopy(ver)))); let stream = futures::stream::iter(segment_locs) - .map(|(seg_loc, u)| async { + .map(|(idx, (seg_loc, u))| async { let version = { u }.0; // use block expression to force moving + let idx = { idx }.0; if accumulated_rows.load(Ordering::Acquire) < limit { let reader = MetaReaders::segment_info_reader(ctx); let segment_info = reader.read(seg_loc, None, version).await?; - Self::filter_segment( - segment_info.as_ref(), - &block_pred, - &accumulated_rows, - limit, + Ok::<_, ErrorCode>( + Self::filter_segment( + segment_info.as_ref(), + &block_pred, + &accumulated_rows, + limit, + )? + .into_iter() + .map(|v| (idx, v)) + .collect::>(), ) } else { Ok(vec![]) diff --git a/query/src/storages/fuse/statistics/accumulator.rs b/query/src/storages/fuse/statistics/accumulator.rs index beb3acf0ec15b..83a2194d4e8aa 100644 --- a/query/src/storages/fuse/statistics/accumulator.rs +++ b/query/src/storages/fuse/statistics/accumulator.rs @@ -18,23 +18,22 @@ use std::collections::HashMap; use common_arrow::parquet::FileMetaData; use common_datablocks::DataBlock; use common_datavalues::prelude::*; -use common_exception::ErrorCode; use common_exception::Result; use common_functions::aggregates::eval_aggr; use crate::storages::fuse::meta::BlockMeta; use crate::storages::fuse::meta::ColumnId; use crate::storages::fuse::meta::ColumnMeta; -use crate::storages::fuse::meta::Compression; use crate::storages::fuse::meta::Versioned; +use crate::storages::fuse::operations::column_metas; use crate::storages::index::ClusterStatistics; use crate::storages::index::ColumnStatistics; -use crate::storages::index::ColumnsStatistics; +use crate::storages::index::StatisticsOfColumns; #[derive(Default)] pub struct StatisticsAccumulator { pub blocks_metas: Vec, - pub blocks_statistics: Vec, + pub blocks_statistics: Vec, pub summary_row_count: u64, pub summary_block_count: u64, pub in_memory_size: u64, @@ -57,7 +56,7 @@ impl StatisticsAccumulator { self.summary_block_count += 1; self.summary_row_count += row_count; self.in_memory_size += block_in_memory_size; - let block_stats = Self::acc_columns(block)?; + let block_stats = columns_statistics(block)?; self.blocks_statistics.push(block_stats.clone()); Ok(PartiallyAccumulated { accumulator: self, @@ -81,107 +80,28 @@ impl StatisticsAccumulator { self.blocks_statistics .push(statistics.block_column_statistics.clone()); - self.blocks_metas.push(BlockMeta { + let row_count = statistics.block_rows_size; + let block_size = statistics.block_bytes_size; + let col_stats = statistics.block_column_statistics.clone(); + let location = (statistics.block_file_location, DataBlock::VERSION); + let col_metas = column_metas(&meta)?; + let cluster_stats = statistics.block_cluster_statistics; + + self.blocks_metas.push(BlockMeta::new( + row_count, + block_size, file_size, - compression: Compression::Lz4Raw, - row_count: statistics.block_rows_size, - block_size: statistics.block_bytes_size, - col_stats: statistics.block_column_statistics.clone(), - location: (statistics.block_file_location, DataBlock::VERSION), - col_metas: Self::column_metas(&meta)?, - cluster_stats: statistics.block_cluster_statistics, - }); + col_stats, + col_metas, + cluster_stats, + location, + )); Ok(()) } - fn column_metas(file_meta: &FileMetaData) -> Result> { - // currently we use one group only - let num_row_groups = 
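The `NonCopy` wrapper above (now generic, so it can hold both the segment index and the version) works around rust-lang/rust#81653: a `Copy` value mentioned inside a plain `async` block is captured by reference, so wrapping it in a `!Copy` newtype and unwrapping it with a block expression (`let v = { wrapped }.0;`) forces the capture to be a move instead. A compile-level sketch of the same trick; the futures are only constructed here, never awaited:

// A !Copy wrapper around a Copy value; `{ wrapped }.0` moves it into the async block.
struct NonCopy<T>(T);

fn make_futures(versions: Vec<u64>) -> Vec<impl std::future::Future<Output = u64>> {
    versions
        .into_iter()
        .map(NonCopy) // wrap each Copy u64 so the async block must take ownership of it
        .map(|v| async {
            let version = { v }.0; // block expression forces the move, no `async move` needed
            version + 1
        })
        .collect()
}

fn main() {
    let futures = make_futures(vec![1, 2, 3]);
    // not awaited here; the point is that each future owns its wrapped `version`
    println!("built {} futures", futures.len());
}

As the original comment notes, the alternative would be `async move`, which would move every captured variable; the wrapper keeps the move scoped to the one value that needs it.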
file_meta.row_groups.len(); - if num_row_groups != 1 { - return Err(ErrorCode::ParquetError(format!( - "invalid parquet file, expects only one row group, but got {}", - num_row_groups - ))); - } - let row_group = &file_meta.row_groups[0]; - let mut col_metas = HashMap::with_capacity(row_group.columns.len()); - for (idx, col_chunk) in row_group.columns.iter().enumerate() { - match &col_chunk.meta_data { - Some(chunk_meta) => { - let col_start = - if let Some(dict_page_offset) = chunk_meta.dictionary_page_offset { - dict_page_offset - } else { - chunk_meta.data_page_offset - }; - let col_len = chunk_meta.total_compressed_size; - assert!( - col_start >= 0 && col_len >= 0, - "column start and length should not be negative" - ); - let num_values = chunk_meta.num_values as u64; - let res = ColumnMeta { - offset: col_start as u64, - len: col_len as u64, - num_values, - }; - col_metas.insert(idx as u32, res); - } - None => { - return Err(ErrorCode::ParquetError(format!( - "invalid parquet file, meta data of column idx {} is empty", - idx - ))); - } - } - } - Ok(col_metas) - } - - pub fn summary(&self) -> Result { - super::reduce_block_stats(&self.blocks_statistics) - } - - pub fn acc_columns(data_block: &DataBlock) -> common_exception::Result { - let mut statistics = ColumnsStatistics::new(); - - let rows = data_block.num_rows(); - for idx in 0..data_block.num_columns() { - let col = data_block.column(idx); - let field = data_block.schema().field(idx); - let column_field = ColumnWithField::new(col.clone(), field.clone()); - - let mut min = DataValue::Null; - let mut max = DataValue::Null; - - let mins = eval_aggr("min", vec![], &[column_field.clone()], rows)?; - let maxs = eval_aggr("max", vec![], &[column_field], rows)?; - - if mins.len() > 0 { - min = mins.get(0); - } - if maxs.len() > 0 { - max = maxs.get(0); - } - let (is_all_null, bitmap) = col.validity(); - let null_count = match (is_all_null, bitmap) { - (true, _) => rows, - (false, Some(bitmap)) => bitmap.null_count(), - (false, None) => 0, - }; - - let in_memory_size = col.memory_size() as u64; - let col_stats = ColumnStatistics { - min, - max, - null_count: null_count as u64, - in_memory_size, - }; - - statistics.insert(idx as u32, col_stats); - } - Ok(statistics) + pub fn summary(&self) -> Result { + super::reduce_block_statistics(&self.blocks_statistics) } } @@ -202,16 +122,22 @@ impl PartiallyAccumulated { ) -> StatisticsAccumulator { let mut stats = &mut self.accumulator; stats.file_size += file_size; - let block_meta = BlockMeta { - row_count: self.block_row_count, - block_size: self.block_size, + + let row_count = self.block_row_count; + let block_size = self.block_size; + let col_stats = self.block_columns_statistics; + let cluster_stats = self.block_cluster_statistics; + let location = (location, DataBlock::VERSION); + + let block_meta = BlockMeta::new( + row_count, + block_size, file_size, - col_stats: self.block_columns_statistics, + col_stats, col_metas, - cluster_stats: self.block_cluster_statistics, - location: (location, DataBlock::VERSION), - compression: Compression::Lz4Raw, - }; + cluster_stats, + location, + ); stats.blocks_metas.push(block_meta); self.accumulator } @@ -235,56 +161,15 @@ impl BlockStatistics { block_file_location: location, block_rows_size: data_block.num_rows() as u64, block_bytes_size: data_block.memory_size() as u64, - block_column_statistics: Self::columns_statistics(data_block)?, + block_column_statistics: columns_statistics(data_block)?, block_cluster_statistics: cluster_stats, }) } - pub fn 
columns_statistics(data_block: &DataBlock) -> Result { - let mut statistics = ColumnsStatistics::new(); - - let rows = data_block.num_rows(); - for idx in 0..data_block.num_columns() { - let col = data_block.column(idx); - let field = data_block.schema().field(idx); - let column_field = ColumnWithField::new(col.clone(), field.clone()); - - let mut min = DataValue::Null; - let mut max = DataValue::Null; - - let mins = eval_aggr("min", vec![], &[column_field.clone()], rows)?; - let maxs = eval_aggr("max", vec![], &[column_field], rows)?; - - if mins.len() > 0 { - min = mins.get(0); - } - if maxs.len() > 0 { - max = maxs.get(0); - } - let (is_all_null, bitmap) = col.validity(); - let null_count = match (is_all_null, bitmap) { - (true, _) => rows, - (false, Some(bitmap)) => bitmap.null_count(), - (false, None) => 0, - }; - - let in_memory_size = col.memory_size() as u64; - let col_stats = ColumnStatistics { - min, - max, - null_count: null_count as u64, - in_memory_size, - }; - - statistics.insert(idx as u32, col_stats); - } - Ok(statistics) - } - pub fn clusters_statistics( cluster_key_id: u32, - cluster_key_index: Vec, - block: DataBlock, + cluster_key_index: &[usize], + block: &DataBlock, ) -> Result> { if cluster_key_index.is_empty() { return Ok(None); @@ -322,3 +207,44 @@ impl BlockStatistics { })) } } + +pub fn columns_statistics(data_block: &DataBlock) -> Result { + let mut statistics = StatisticsOfColumns::new(); + + let rows = data_block.num_rows(); + for idx in 0..data_block.num_columns() { + let col = data_block.column(idx); + let field = data_block.schema().field(idx); + let column_field = ColumnWithField::new(col.clone(), field.clone()); + + let mut min = DataValue::Null; + let mut max = DataValue::Null; + + let mins = eval_aggr("min", vec![], &[column_field.clone()], rows)?; + let maxs = eval_aggr("max", vec![], &[column_field], rows)?; + + if mins.len() > 0 { + min = mins.get(0); + } + if maxs.len() > 0 { + max = maxs.get(0); + } + let (is_all_null, bitmap) = col.validity(); + let null_count = match (is_all_null, bitmap) { + (true, _) => rows, + (false, Some(bitmap)) => bitmap.null_count(), + (false, None) => 0, + }; + + let in_memory_size = col.memory_size() as u64; + let col_stats = ColumnStatistics { + min, + max, + null_count: null_count as u64, + in_memory_size, + }; + + statistics.insert(idx as u32, col_stats); + } + Ok(statistics) +} diff --git a/query/src/storages/fuse/statistics/mod.rs b/query/src/storages/fuse/statistics/mod.rs index 7b54746bf2cf9..348db253fee8c 100644 --- a/query/src/storages/fuse/statistics/mod.rs +++ b/query/src/storages/fuse/statistics/mod.rs @@ -18,4 +18,4 @@ pub mod reducers; pub use accumulator::PartiallyAccumulated; pub use accumulator::StatisticsAccumulator; pub use reducers::merge_statistics; -pub use reducers::reduce_block_stats; +pub use reducers::reduce_block_statistics; diff --git a/query/src/storages/fuse/statistics/reducers.rs b/query/src/storages/fuse/statistics/reducers.rs index e3a6e6e8c47f5..397badd8b394d 100644 --- a/query/src/storages/fuse/statistics/reducers.rs +++ b/query/src/storages/fuse/statistics/reducers.rs @@ -14,39 +14,39 @@ // use std::borrow::Borrow; -use std::collections::hash_map::Entry; use std::collections::HashMap; use common_datavalues::DataValue; use common_exception::Result; +use crate::storages::fuse::meta::BlockMeta; use crate::storages::fuse::meta::ColumnId; use crate::storages::fuse::meta::Statistics; use crate::storages::index::ColumnStatistics; -use crate::storages::index::ColumnsStatistics; +use 
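`columns_statistics` (previously duplicated as `acc_columns` and a `BlockStatistics` method) walks each column once and records four things per column id: min, max, null count, and in-memory size. The sketch below reproduces that bookkeeping over nullable integer columns with plain iterator folds standing in for `eval_aggr` and the validity bitmap:

use std::collections::HashMap;
use std::mem::size_of_val;

#[derive(Debug)]
struct ColumnStatsSketch {
    min: Option<i64>,
    max: Option<i64>,
    null_count: u64,
    in_memory_size: u64,
}

fn columns_statistics(columns: &[Vec<Option<i64>>]) -> HashMap<u32, ColumnStatsSketch> {
    let mut stats = HashMap::new();
    for (idx, col) in columns.iter().enumerate() {
        // nulls are skipped for min/max but counted separately
        let non_null = col.iter().flatten();
        stats.insert(idx as u32, ColumnStatsSketch {
            min: non_null.clone().min().copied(),
            max: non_null.max().copied(),
            null_count: col.iter().filter(|v| v.is_none()).count() as u64,
            // rough footprint of the column buffer, standing in for col.memory_size()
            in_memory_size: size_of_val(col.as_slice()) as u64,
        });
    }
    stats
}

fn main() {
    let stats = columns_statistics(&[vec![Some(3), None, Some(1)]]);
    assert_eq!(stats[&0].min, Some(1));
    assert_eq!(stats[&0].max, Some(3));
    assert_eq!(stats[&0].null_count, 1);
    assert!(stats[&0].in_memory_size > 0);
}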
crate::storages::index::StatisticsOfColumns; -pub fn reduce_block_stats>(stats: &[T]) -> Result { - let len = stats.len(); - - // transpose Vec> to HashMap<_, (_, Vec<_>)> +pub fn reduce_block_statistics>( + stats: &[T], +) -> Result { + // Combine statistics of a column into `Vec`, that is: + // from : `&[HashMap]` + // to : `HashMap)>` let col_stat_list = stats.iter().fold(HashMap::new(), |acc, item| { item.borrow().iter().fold( acc, |mut acc: HashMap>, (col_id, stats)| { - let entry = acc.entry(*col_id); - match entry { - Entry::Occupied(_) => { - entry.and_modify(|v| v.push(stats)); - } - Entry::Vacant(_) => { - entry.or_insert_with(|| vec![stats]); - } - } + acc.entry(*col_id) + .or_insert_with(|| vec![stats]) + .push(stats); acc }, ) }); + // Reduce the `Vec<&ColumnStatistics` into ColumnStatistics`, i.e.: + // from : `HashMap)>` + // to : `type BlockStatistics = HashMap` + let len = stats.len(); col_stat_list .iter() .try_fold(HashMap::with_capacity(len), |mut acc, (id, stats)| { @@ -56,8 +56,6 @@ pub fn reduce_block_stats>(stats: &[T]) -> Result>(stats: &[T]) -> Result Result { block_count: l.block_count + r.block_count, uncompressed_byte_size: l.uncompressed_byte_size + r.uncompressed_byte_size, compressed_byte_size: l.compressed_byte_size + r.compressed_byte_size, - col_stats: reduce_block_stats(&[&l.col_stats, &r.col_stats])?, + col_stats: reduce_block_statistics(&[&l.col_stats, &r.col_stats])?, }; Ok(s) } + +pub fn reduce_statistics>(stats: &[T]) -> Result { + let mut statistics = Statistics::default(); + for item in stats { + statistics = merge_statistics(&statistics, item.borrow())? + } + Ok(statistics) +} + +pub fn reduce_block_metas>(block_metas: &[T]) -> Result { + let mut row_count: u64 = 0; + let mut block_count: u64 = 0; + let mut uncompressed_byte_size: u64 = 0; + let mut compressed_byte_size: u64 = 0; + + block_metas.iter().for_each(|b| { + let b = b.borrow(); + row_count += b.row_count; + block_count += 1; + uncompressed_byte_size += b.block_size; + compressed_byte_size += b.file_size; + }); + + let stats = block_metas + .iter() + .map(|v| &v.borrow().col_stats) + .collect::>(); + let merged_col_stats = reduce_block_statistics(&stats)?; + + Ok(Statistics { + row_count, + block_count, + uncompressed_byte_size, + compressed_byte_size, + col_stats: merged_col_stats, + }) +} diff --git a/query/src/storages/hive/hive_parquet_block_reader.rs b/query/src/storages/hive/hive_parquet_block_reader.rs index 23b583eb25f8a..40f862af9bdb6 100644 --- a/query/src/storages/hive/hive_parquet_block_reader.rs +++ b/query/src/storages/hive/hive_parquet_block_reader.rs @@ -56,7 +56,7 @@ impl HiveParquetBlockReader { schema: DataSchemaRef, projection: Vec, ) -> Result> { - let projected_schema = DataSchemaRef::new(schema.project(projection.clone())); + let projected_schema = DataSchemaRef::new(schema.project(&projection)); let arrow_schema = schema.to_arrow(); let parquet_schema_descriptor = to_parquet_schema(&arrow_schema)?; diff --git a/query/src/storages/index/mod.rs b/query/src/storages/index/mod.rs index 024d36ca33755..6c7241a5ff2e8 100644 --- a/query/src/storages/index/mod.rs +++ b/query/src/storages/index/mod.rs @@ -26,8 +26,8 @@ pub use index_sparse::SparseIndexValue; pub use range_filter::ClusterKeyInfo; pub use range_filter::ClusterStatistics; pub use range_filter::ColumnStatistics; -pub use range_filter::ColumnsStatistics; pub use range_filter::RangeFilter; +pub use range_filter::StatisticsOfColumns; #[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] pub 
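The reducers share one transpose-then-fold shape: the per-block `HashMap<ColumnId, ColumnStatistics>` maps are regrouped into one list of statistics per column id, and each group is then collapsed into a single entry (min of mins, max of maxes, summed null counts); `merge_statistics`, `reduce_statistics` and `reduce_block_metas` build segment- and table-level summaries on top of the same reduction. A sketch of that shape over a simplified stats type; it uses `entry(..).or_default().push(..)` so each block contributes exactly one entry per column:

use std::collections::HashMap;

#[derive(Clone, Debug)]
struct ColStats {
    min: i64,
    max: i64,
    null_count: u64,
}

fn reduce_block_statistics(blocks: &[HashMap<u32, ColStats>]) -> HashMap<u32, ColStats> {
    // 1. transpose: group every block's entry for a column id into one Vec
    let mut per_column: HashMap<u32, Vec<&ColStats>> = HashMap::new();
    for block in blocks {
        for (col_id, stats) in block {
            per_column.entry(*col_id).or_default().push(stats);
        }
    }
    // 2. reduce each group: min of mins, max of maxes, summed null counts
    per_column
        .into_iter()
        .map(|(col_id, group)| {
            let reduced = ColStats {
                min: group.iter().map(|s| s.min).min().unwrap(),
                max: group.iter().map(|s| s.max).max().unwrap(),
                null_count: group.iter().map(|s| s.null_count).sum(),
            };
            (col_id, reduced)
        })
        .collect()
}

fn main() {
    let b1 = HashMap::from([(0, ColStats { min: 1, max: 5, null_count: 0 })]);
    let b2 = HashMap::from([(0, ColStats { min: -2, max: 3, null_count: 4 })]);
    let summary = reduce_block_statistics(&[b1, b2]);
    assert_eq!(summary[&0].min, -2);
    assert_eq!(summary[&0].max, 5);
    assert_eq!(summary[&0].null_count, 4);
}

The real reducer compares `DataValue`s and also carries in-memory sizes, but the shape of the fold is the same.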
enum IndexSchemaVersion { diff --git a/query/src/storages/index/range_filter.rs b/query/src/storages/index/range_filter.rs index 67b0fe717d09d..ab9dd8ba72d51 100644 --- a/query/src/storages/index/range_filter.rs +++ b/query/src/storages/index/range_filter.rs @@ -33,7 +33,7 @@ use common_planners::RequireColumnsVisitor; use crate::pipelines::transforms::ExpressionExecutor; use crate::sessions::QueryContext; -pub type ColumnsStatistics = HashMap; +pub type StatisticsOfColumns = HashMap; #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] pub struct ColumnStatistics { @@ -105,7 +105,7 @@ impl RangeFilter { }) } - pub fn eval(&self, stats: &ColumnsStatistics) -> Result { + pub fn eval(&self, stats: &StatisticsOfColumns) -> Result { let mut columns = Vec::with_capacity(self.stat_columns.len()); for col in self.stat_columns.iter() { let val_opt = col.apply_stat_value(stats, self.origin.clone())?; @@ -226,7 +226,7 @@ impl StatColumn { fn apply_stat_value( &self, - stats: &ColumnsStatistics, + stats: &StatisticsOfColumns, schema: DataSchemaRef, ) -> Result> { if self.stat_type == StatType::Nulls { diff --git a/query/src/storages/memory/memory_table.rs b/query/src/storages/memory/memory_table.rs index 821fa8b6ea575..1c87d04bdc579 100644 --- a/query/src/storages/memory/memory_table.rs +++ b/query/src/storages/memory/memory_table.rs @@ -191,7 +191,7 @@ impl Table for MemoryTable { let blocks = match push_downs { Some(push_downs) => match &push_downs.projection { Some(prj) => { - let pruned_schema = Arc::new(self.table_info.schema().project(prj.clone())); + let pruned_schema = Arc::new(self.table_info.schema().project(prj)); let mut pruned_blocks = Vec::with_capacity(raw_blocks.len()); for raw_block in raw_blocks { @@ -317,7 +317,7 @@ impl MemoryTableSource { fn projection(&self, data_block: DataBlock) -> Result> { if let Some(extras) = &self.extras { if let Some(projection) = &extras.projection { - let pruned_schema = data_block.schema().project(projection.clone()); + let pruned_schema = data_block.schema().project(projection); let raw_columns = data_block.columns(); let columns = projection .iter() diff --git a/query/src/storages/random/random_table.rs b/query/src/storages/random/random_table.rs index aa29ebf69d5f1..2798d302c2ceb 100644 --- a/query/src/storages/random/random_table.rs +++ b/query/src/storages/random/random_table.rs @@ -100,7 +100,7 @@ impl Table for RandomTable { let mut schema = self.schema(); if let Some(projection) = push_downs.projection { // do projection on schema - schema = Arc::new(schema.project(projection)); + schema = Arc::new(schema.project(&projection)); } let limit = match push_downs.limit { Some(limit) => limit, @@ -149,7 +149,7 @@ impl Table for RandomTable { if let Some(extras) = push_downs { if let Some(projection) = extras.projection { // do projection on schema - output_schema = Arc::new(output_schema.project(projection)); + output_schema = Arc::new(output_schema.project(&projection)); } } diff --git a/query/src/storages/storage_table.rs b/query/src/storages/storage_table.rs index dc1f4df13fc2d..94d10f48eca0a 100644 --- a/query/src/storages/storage_table.rs +++ b/query/src/storages/storage_table.rs @@ -24,6 +24,7 @@ use common_exception::ErrorCode; use common_exception::Result; use common_meta_app::schema::TableInfo; use common_meta_types::MetaId; +use common_planners::DeletePlan; use common_planners::Expression; use common_planners::Extras; use common_planners::Partitions; @@ -188,13 +189,22 @@ pub trait Table: Sync + Send { _instant: &NavigationPoint, 
) -> Result> { Err(ErrorCode::UnImplement(format!( - "table {}, of engine type {}, do not support time travel", + "table {}, of engine type {}, does not support time travel", + self.name(), + self.get_table_info().engine(), + ))) + } + + async fn delete(&self, _ctx: Arc, _delete_plan: DeletePlan) -> Result<()> { + Err(ErrorCode::UnImplement(format!( + "table {}, of engine type {}, does not support DELETE FROM", self.name(), self.get_table_info().engine(), ))) } } +#[derive(Debug)] pub enum NavigationPoint { SnapshotID(String), TimePoint(DateTime), diff --git a/query/tests/it/storages/fuse/io.rs b/query/tests/it/storages/fuse/io.rs index 60a4d2cc6c2c5..7765fd51a6b99 100644 --- a/query/tests/it/storages/fuse/io.rs +++ b/query/tests/it/storages/fuse/io.rs @@ -61,7 +61,7 @@ async fn test_fuse_table_block_appender() -> Result<()> { locs.clone(), None, ) - .await + .await? .collect::>() .await; @@ -88,7 +88,7 @@ async fn test_fuse_table_block_appender() -> Result<()> { locs.clone(), None, ) - .await + .await? .collect::>() .await; @@ -108,7 +108,7 @@ async fn test_fuse_table_block_appender() -> Result<()> { locs, None, ) - .await + .await? .collect::>() .await; @@ -277,7 +277,7 @@ async fn test_block_stream_writer() -> Result<()> { locs, None, ) - .await; + .await?; let segs = stream.try_collect::>().await?; // verify the number of blocks @@ -442,9 +442,8 @@ async fn test_block_writer_retry() -> Result<()> { }); let mock = Arc::new(Mock::with_exception(errors)); let op = Operator::new(mock.clone()); - let schema = common_arrow::arrow::datatypes::Schema::default(); let block = DataBlock::empty(); - let r = write_block(&schema, block, op, "loc").await; + let r = write_block(block, &op, "loc").await; assert!(r.is_err()); let e = r.unwrap_err(); assert_eq!(ErrorCode::storage_other_code(), e.code()); diff --git a/query/tests/it/storages/fuse/operations/navigate.rs b/query/tests/it/storages/fuse/operations/navigate.rs index 2aa8c2b07f0dd..68caaf329e819 100644 --- a/query/tests/it/storages/fuse/operations/navigate.rs +++ b/query/tests/it/storages/fuse/operations/navigate.rs @@ -91,7 +91,6 @@ async fn test_fuse_navigate() -> Result<()> { // 3. there should be two snapshots assert_eq!(2, snapshots.len()); - eprintln!("snapshots {:?}", &snapshots); // 4. 
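`Table::delete` is added as a defaulted trait method that returns an `UnImplement` error, so only engines that actually support DELETE (here, the fuse engine via `do_delete`) override it; every other table engine keeps compiling and working unchanged. A minimal sketch of that default-deny pattern with illustrative types:

// Default-deny trait method: engines opt in by overriding `delete`.
trait TableSketch {
    fn name(&self) -> &str;

    fn delete(&self, _predicate: Option<&str>) -> Result<(), String> {
        Err(format!("table {} does not support DELETE FROM", self.name()))
    }
}

struct MemoryTableSketch;
impl TableSketch for MemoryTableSketch {
    fn name(&self) -> &str { "memory" }
    // no `delete` override: the default error is returned
}

struct FuseTableSketch;
impl TableSketch for FuseTableSketch {
    fn name(&self) -> &str { "fuse" }
    fn delete(&self, predicate: Option<&str>) -> Result<(), String> {
        // the real implementation dispatches to `do_delete` shown earlier
        println!("fuse delete, predicate = {predicate:?}");
        Ok(())
    }
}

fn main() {
    assert!(MemoryTableSketch.delete(Some("c1 = 1")).is_err());
    assert!(FuseTableSketch.delete(None).is_ok());
}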
navigate to the first snapshot // history is order by timestamp DESC diff --git a/query/tests/it/storages/fuse/operations/read_plan.rs b/query/tests/it/storages/fuse/operations/read_plan.rs index 80c45dbc71de1..712995d7db4ab 100644 --- a/query/tests/it/storages/fuse/operations/read_plan.rs +++ b/query/tests/it/storages/fuse/operations/read_plan.rs @@ -24,7 +24,6 @@ use databend_query::catalogs::CATALOG_DEFAULT; use databend_query::interpreters::CreateTableInterpreter; use databend_query::storages::fuse::meta::BlockMeta; use databend_query::storages::fuse::meta::ColumnMeta; -use databend_query::storages::fuse::meta::Compression; use databend_query::storages::fuse::FuseTable; use databend_query::storages::index::ColumnStatistics; use futures::TryStreamExt; @@ -60,19 +59,21 @@ fn test_to_partitions() -> Result<()> { .map(|col_id| (col_id as u32, col_metas_gen())) .collect::>(); - let block_meta = BlockMeta { - row_count: 0, - block_size: cols_stats - .iter() - .map(|(_, col_stats)| col_stats.in_memory_size) - .sum(), - file_size: 0, - col_stats: cols_stats.clone(), - col_metas: cols_metas, - cluster_stats: None, - location: ("".to_owned(), 0), - compression: Compression::Lz4Raw, - }; + let cluster_stats = None; + let location = ("".to_owned(), 0); + let block_size = cols_stats + .iter() + .map(|(_, col_stats)| col_stats.in_memory_size) + .sum(); + let block_meta = BlockMeta::new( + 0, + block_size, + 0, + cols_stats.clone(), + cols_metas, + cluster_stats, + location, + ); let blocks_metas = (0..num_of_block) .into_iter() diff --git a/query/tests/it/storages/fuse/pruning.rs b/query/tests/it/storages/fuse/pruning.rs index e8fc62818cff7..77ccf9d4f50ff 100644 --- a/query/tests/it/storages/fuse/pruning.rs +++ b/query/tests/it/storages/fuse/pruning.rs @@ -50,6 +50,7 @@ async fn apply_block_pruning( BlockPruner::new(table_snapshot) .apply(ctx.as_ref(), schema, push_down) .await + .map(|v| v.into_iter().map(|(_, v)| v).collect()) } #[tokio::test] diff --git a/query/tests/it/storages/fuse/statistics.rs b/query/tests/it/storages/fuse/statistics.rs index 56d4f5b80f22b..6eab184b7c5ec 100644 --- a/query/tests/it/storages/fuse/statistics.rs +++ b/query/tests/it/storages/fuse/statistics.rs @@ -19,7 +19,6 @@ use common_datablocks::DataBlock; use common_datavalues::prelude::*; use databend_query::storages::fuse::statistics::accumulator; use databend_query::storages::fuse::statistics::reducers; -use databend_query::storages::fuse::statistics::StatisticsAccumulator; use crate::storages::fuse::statistics::accumulator::BlockStatistics; use crate::storages::fuse::table_test_fixture::TestFixture; @@ -28,7 +27,7 @@ use crate::storages::fuse::table_test_fixture::TestFixture; fn test_ft_stats_block_stats() -> common_exception::Result<()> { let schema = DataSchemaRefExt::create(vec![DataField::new("a", i32::to_data_type())]); let block = DataBlock::create(schema, vec![Series::from_data(vec![1, 2, 3])]); - let r = StatisticsAccumulator::acc_columns(&block)?; + let r = accumulator::columns_statistics(&block)?; assert_eq!(1, r.len()); let col_stats = r.get(&0).unwrap(); assert_eq!(col_stats.min, DataValue::Int64(1)); @@ -45,9 +44,9 @@ fn test_ft_stats_col_stats_reduce() -> common_exception::Result<()> { let blocks = TestFixture::gen_sample_blocks_ex(num_of_blocks, rows_per_block, val_start_with); let col_stats = blocks .iter() - .map(|b| StatisticsAccumulator::acc_columns(&b.clone().unwrap())) + .map(|b| accumulator::columns_statistics(&b.clone().unwrap())) .collect::>>()?; - let r = 
reducers::reduce_block_stats(&col_stats); + let r = reducers::reduce_block_statistics(&col_stats); assert!(r.is_ok()); let r = r.unwrap(); assert_eq!(1, r.len()); @@ -81,13 +80,13 @@ fn test_ft_stats_cluster_stats() -> common_exception::Result<()> { Series::from_data(vec![1i32, 2, 3]), Series::from_data(vec!["123456", "234567", "345678"]), ]); - let stats = BlockStatistics::clusters_statistics(0, vec![0], blocks.clone())?; + let stats = BlockStatistics::clusters_statistics(0, &[0], &blocks)?; assert!(stats.is_some()); let stats = stats.unwrap(); assert_eq!(vec![DataValue::Int64(1)], stats.min); assert_eq!(vec![DataValue::Int64(3)], stats.max); - let stats = BlockStatistics::clusters_statistics(1, vec![1], blocks)?; + let stats = BlockStatistics::clusters_statistics(1, &[1], &blocks)?; assert!(stats.is_some()); let stats = stats.unwrap(); assert_eq!(vec![DataValue::String(b"12345".to_vec())], stats.min); diff --git a/query/tests/it/storages/index/range_filter.rs b/query/tests/it/storages/index/range_filter.rs index a45b9dc0ae1dc..dd040a2a9b9d0 100644 --- a/query/tests/it/storages/index/range_filter.rs +++ b/query/tests/it/storages/index/range_filter.rs @@ -21,8 +21,8 @@ use common_planners::*; use databend_query::storages::index::range_filter::build_verifiable_expr; use databend_query::storages::index::range_filter::left_bound_for_like_pattern; use databend_query::storages::index::range_filter::right_bound_for_like_pattern; -use databend_query::storages::index::range_filter::ColumnsStatistics; use databend_query::storages::index::range_filter::StatColumns; +use databend_query::storages::index::range_filter::StatisticsOfColumns; use databend_query::storages::index::ColumnStatistics; use databend_query::storages::index::RangeFilter; @@ -36,7 +36,7 @@ async fn test_range_filter() -> Result<()> { DataField::new("c", Vu8::to_data_type()), ]); - let mut stats: ColumnsStatistics = HashMap::new(); + let mut stats: StatisticsOfColumns = HashMap::new(); stats.insert(0u32, ColumnStatistics { min: DataValue::Int64(1), max: DataValue::Int64(20), diff --git a/tests/suites/0_stateless/03_dml/03_0025_delete_from.result b/tests/suites/0_stateless/03_dml/03_0025_delete_from.result new file mode 100644 index 0000000000000..23850c27d8b5e --- /dev/null +++ b/tests/suites/0_stateless/03_dml/03_0025_delete_from.result @@ -0,0 +1,9 @@ +selection not match, nothing deleted +1 +delete one rows +the row should be deleted +1 +other rows should be kept +1 +deleted unconditionally +1 diff --git a/tests/suites/0_stateless/03_dml/03_0025_delete_from.sql b/tests/suites/0_stateless/03_dml/03_0025_delete_from.sql new file mode 100644 index 0000000000000..9ea625c03b05c --- /dev/null +++ b/tests/suites/0_stateless/03_dml/03_0025_delete_from.sql @@ -0,0 +1,25 @@ +DROP DATABASE IF EXISTS db1; +CREATE DATABASE db1; +USE db1; + +-- setup +CREATE TABLE IF NOT EXISTS t(c1 Int, c2 Int ); +INSERT INTO t VALUES(1,2); +INSERT INTO t VALUES(3,4); + +select 'selection not match, nothing deleted'; +delete from t where c1 > 3; +select count(*) = 2 from t; + +select 'delete one rows'; +delete from t where c1 = 1; +select 'the row should be deleted'; +select count(*) = 0 from t where c1 = 1; +select 'other rows should be kept'; +select count(*) = 1 from t where c1 <> 1; + +select 'deleted unconditionally'; +delete from t; +select count(*) = 0 from t; + +DROP DATABASE db1; diff --git a/tests/suites/0_stateless/03_dml/03_0025_delete_from_v2.result b/tests/suites/0_stateless/03_dml/03_0025_delete_from_v2.result new file mode 100644 index 
0000000000000..23850c27d8b5e --- /dev/null +++ b/tests/suites/0_stateless/03_dml/03_0025_delete_from_v2.result @@ -0,0 +1,9 @@ +selection not match, nothing deleted +1 +delete one rows +the row should be deleted +1 +other rows should be kept +1 +deleted unconditionally +1 diff --git a/tests/suites/0_stateless/03_dml/03_0025_delete_from_v2.sql b/tests/suites/0_stateless/03_dml/03_0025_delete_from_v2.sql new file mode 100644 index 0000000000000..ed8cc34b6c2f2 --- /dev/null +++ b/tests/suites/0_stateless/03_dml/03_0025_delete_from_v2.sql @@ -0,0 +1,27 @@ +set enable_planner_v2 = 1; + +DROP DATABASE IF EXISTS db1; +CREATE DATABASE db1; +USE db1; + +-- setup +CREATE TABLE IF NOT EXISTS t(c1 Int, c2 Int ); +INSERT INTO t VALUES(1,2); +INSERT INTO t VALUES(3,4); + +select 'selection not match, nothing deleted'; +delete from t where c1 > 3; +select count(*) = 2 from t; + +select 'delete one rows'; +delete from t where c1 = 1; +select 'the row should be deleted'; +select count(*) = 0 from t where c1 = 1; +select 'other rows should be kept'; +select count(*) = 1 from t where c1 <> 1; + +select 'deleted unconditionally'; +delete from t; +select count(*) = 0 from t; + +DROP DATABASE db1;