Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: statement delete from... #5691

Merged
merged 43 commits into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
2c34920
skeleton: parser, statement, plan & table method
dantengsky May 30, 2022
64c8cdb
minor refactoring
dantengsky May 31, 2022
50967e7
deletion collector
dantengsky May 31, 2022
c62a2ce
merge updated segment
dantengsky May 31, 2022
bbdf592
refact: extract code from collector into reducer
dantengsky Jun 1, 2022
dc4bb7d
basic delete operation
dantengsky Jun 1, 2022
93040cd
seems work
dantengsky Jun 1, 2022
74beea8
refact: remove duplicated code
dantengsky Jun 1, 2022
5612c4b
fix: nullary expression of where clause
dantengsky Jun 1, 2022
bffa780
stateless test cases
dantengsky Jun 2, 2022
adb621b
disable 'delete' in test_statements_in_legacy_suites
dantengsky Jun 2, 2022
acab2b1
remove empty segments
dantengsky Jun 2, 2022
2757981
optimize filter
dantengsky Jun 6, 2022
8c9daa9
refine block replacement
dantengsky Jun 6, 2022
e11ecbb
enable segment cache
dantengsky Jun 6, 2022
e3f368b
refact
dantengsky Jun 6, 2022
3a8c5e6
Merge remote-tracking branch 'origin/main' into feat-delete
dantengsky Jun 7, 2022
9a408ee
make lint
dantengsky Jun 7, 2022
c3c511c
refine block writer
dantengsky Jun 8, 2022
b5483d9
refine segment writer
dantengsky Jun 8, 2022
8cdabea
code minor gc
dantengsky Jun 8, 2022
fa1ae09
Merge remote-tracking branch 'origin/main' into feat-delete
dantengsky Jun 9, 2022
a47a79a
make lint
dantengsky Jun 9, 2022
0f78cac
Merge remote-tracking branch 'origin/main' into feat-delete
dantengsky Jun 13, 2022
e3ea643
simple delete statement of new parser
dantengsky Jun 13, 2022
06cebd2
Merge remote-tracking branch 'origin/main' into feat-delete
dantengsky Jun 23, 2022
7fd89ec
fix ut test & merge conflicts
dantengsky Jun 23, 2022
735fabf
tweak stateless test
dantengsky Jun 23, 2022
45527d4
revert .devcontainer
dantengsky Jun 23, 2022
76ceb3e
fix compile error of hive feature
dantengsky Jun 23, 2022
b951573
Merge remote-tracking branch 'origin/main' into feat-delete
dantengsky Jun 23, 2022
9615fd0
tidy up
dantengsky Jun 23, 2022
894dcc0
tweak tests
dantengsky Jun 23, 2022
3dd2fda
minor refactoring
dantengsky Jun 24, 2022
888a2b5
adapt to new planner
dantengsky Jun 26, 2022
a15827e
Merge remote-tracking branch 'origin/main' into feat-delete
dantengsky Jun 26, 2022
76cc1ae
resolve merge conflicts
dantengsky Jun 26, 2022
65f9375
make lint
dantengsky Jun 27, 2022
35244ed
tweak stateless test case
dantengsky Jun 27, 2022
f53a540
cleanup unnecessary logs
dantengsky Jun 27, 2022
ca255ae
Update query/src/sql/planner/plans/mod.rs
dantengsky Jun 27, 2022
d9fcbfb
add tracing instruments
dantengsky Jun 27, 2022
d9420fc
fix compile error introduced by instrument
dantengsky Jun 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fmt:
cargo fmt

lint:
cargo fmt
cargo fmt --all
cargo clippy --workspace --all-targets -- -D warnings
# Cargo.toml file formatter(make setup to install)
taplo fmt
Expand Down
18 changes: 18 additions & 0 deletions common/ast/src/ast/statements/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ pub enum Statement<'a> {

Insert(InsertStmt<'a>),

Delete {
catalog: Option<Identifier<'a>>,
database: Option<Identifier<'a>>,
table: Identifier<'a>,
selection: Option<Expr<'a>>,
},
// Databases
ShowDatabases(ShowDatabasesStmt<'a>),
ShowCreateDatabase(ShowCreateDatabaseStmt<'a>),
Expand Down Expand Up @@ -160,6 +166,18 @@ impl<'a> Display for Statement<'a> {
}
Statement::Query(query) => write!(f, "{query}")?,
Statement::Insert(insert) => write!(f, "{insert}")?,
Statement::Delete {
catalog,
database,
table,
selection,
} => {
write!(f, "DELETE FROM ")?;
write_comma_separated_list(f, catalog.iter().chain(database).chain(Some(table)))?;
if let Some(conditions) = selection {
write!(f, "WHERE {conditions} ")?;
}
}
Statement::Copy(stmt) => write!(f, "{stmt}")?,
Statement::ShowSettings => {}
Statement::ShowProcessList => write!(f, "SHOW PROCESSLIST")?,
Expand Down
14 changes: 14 additions & 0 deletions common/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@ pub fn statement(i: Input) -> IResult<Statement> {
})
},
);

let delete = map(
rule! {
DELETE ~ FROM ~ #peroid_separated_idents_1_to_3
~ ( WHERE ~ ^#expr )?
},
|(_, _, (catalog, database, table), opt_where_block)| Statement::Delete {
catalog,
database,
table,
selection: opt_where_block.map(|(_, selection)| selection),
},
);
let show_settings = value(Statement::ShowSettings, rule! { SHOW ~ SETTINGS });
let show_stages = value(Statement::ShowStages, rule! { SHOW ~ STAGES });
let show_process_list = value(Statement::ShowProcessList, rule! { SHOW ~ PROCESSLIST });
Expand Down Expand Up @@ -662,6 +675,7 @@ pub fn statement(i: Input) -> IResult<Statement> {
#map(query, |query| Statement::Query(Box::new(query)))
| #explain : "`EXPLAIN [PIPELINE | GRAPH] <statement>`"
| #insert : "`INSERT INTO [TABLE] <table> [(<column>, ...)] (FORMAT <format> | VALUES <values> | <query>)`"
| #delete : "`DELETE FROM <table> [WHERE ...]`"
| #show_settings : "`SHOW SETTINGS`"
| #show_stages : "`SHOW STAGES`"
| #show_process_list : "`SHOW PROCESSLIST`"
Expand Down
4 changes: 2 additions & 2 deletions common/ast/src/parser/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ pub enum TokenKind {
DECADE,
#[token("DEFAULT", ignore(ascii_case))]
DEFAULT,
#[token("DELETE", ignore(ascii_case))]
DELETE,
#[token("DESC", ignore(ascii_case))]
DESC,
#[token("DESCRIBE", ignore(ascii_case))]
Expand Down Expand Up @@ -577,8 +579,6 @@ pub enum TokenKind {
USAGE,
#[token("UPDATE", ignore(ascii_case))]
UPDATE,
#[token("DELETE", ignore(ascii_case))]
DELETE,
#[token("SUPER", ignore(ascii_case))]
SUPER,
#[token("STATUS", ignore(ascii_case))]
Expand Down
8 changes: 4 additions & 4 deletions common/datablocks/src/kernels/data_block_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ use common_exception::Result;
use crate::DataBlock;

impl DataBlock {
pub fn filter_block(block: &DataBlock, predicate: &ColumnRef) -> Result<DataBlock> {
pub fn filter_block(block: DataBlock, predicate: &ColumnRef) -> Result<DataBlock> {
if block.num_columns() == 0 || block.num_rows() == 0 {
return Ok(block.clone());
return Ok(block);
}

let predict_boolean_nonull = Self::cast_to_nonull_boolean(predicate)?;
// faster path for constant filter
if predict_boolean_nonull.is_const() {
let flag = predict_boolean_nonull.get_bool(0)?;
if flag {
return Ok(block.clone());
return Ok(block);
} else {
return Ok(DataBlock::empty_with_schema(block.schema().clone()));
}
Expand All @@ -42,7 +42,7 @@ impl DataBlock {
let rows = boolean_col.len();
let count_zeros = boolean_col.values().null_count();
match count_zeros {
0 => Ok(block.clone()),
0 => Ok(block),
_ => {
if count_zeros == rows {
return Ok(DataBlock::empty_with_schema(block.schema().clone()));
Expand Down
8 changes: 4 additions & 4 deletions common/datablocks/tests/it/kernels/data_block_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn test_filter_non_const_data_block() -> Result<()> {
]);

let predicate = Series::from_data(vec![true, false, true, true, false, false]);
let block = DataBlock::filter_block(&block, &predicate)?;
let block = DataBlock::filter_block(block, &predicate)?;

common_datablocks::assert_blocks_eq(
vec![
Expand Down Expand Up @@ -62,7 +62,7 @@ fn test_filter_all_false_data_block() -> Result<()> {
]);

let predicate = Series::from_data(vec![false, false, false, false, false, false]);
let block = DataBlock::filter_block(&block, &predicate)?;
let block = DataBlock::filter_block(block, &predicate)?;

common_datablocks::assert_blocks_eq(
vec!["+---+---+", "| a | b |", "+---+---+", "+---+---+"],
Expand All @@ -88,7 +88,7 @@ fn test_filter_const_data_block() -> Result<()> {
]);

let predicate = Series::from_data(vec![true, false, true, true, false, false]);
let block = DataBlock::filter_block(&block, &predicate)?;
let block = DataBlock::filter_block(block, &predicate)?;

common_datablocks::assert_blocks_eq(
vec![
Expand Down Expand Up @@ -122,7 +122,7 @@ fn test_filter_all_const_data_block() -> Result<()> {
]);

let predicate = Series::from_data(vec![true, false, true, true, false, false]);
let block = DataBlock::filter_block(&block, &predicate)?;
let block = DataBlock::filter_block(block, &predicate)?;

common_datablocks::assert_blocks_eq(
vec![
Expand Down
2 changes: 1 addition & 1 deletion common/datavalues/src/data_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl DataSchema {

/// project will do column pruning.
#[must_use]
pub fn project(&self, projection: Vec<usize>) -> Self {
pub fn project(&self, projection: &[usize]) -> Self {
let fields = projection
.iter()
.map(|idx| self.fields()[*idx].clone())
Expand Down
3 changes: 3 additions & 0 deletions common/planners/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod plan_database_drop;
mod plan_database_rename;
mod plan_database_show_create;
mod plan_database_undrop;
mod plan_delete;
mod plan_empty;
mod plan_explain;
mod plan_expression;
Expand Down Expand Up @@ -120,6 +121,7 @@ pub use plan_database_rename::RenameDatabaseEntity;
pub use plan_database_rename::RenameDatabasePlan;
pub use plan_database_show_create::ShowCreateDatabasePlan;
pub use plan_database_undrop::UndropDatabasePlan;
pub use plan_delete::DeletePlan;
pub use plan_empty::EmptyPlan;
pub use plan_explain::ExplainPlan;
pub use plan_explain::ExplainType;
Expand All @@ -135,6 +137,7 @@ pub use plan_expression_common::expr_as_column_expr;
pub use plan_expression_common::extract_aliases;
pub use plan_expression_common::find_aggregate_exprs;
pub use plan_expression_common::find_aggregate_exprs_in_expr;
pub use plan_expression_common::find_column_exprs;
pub use plan_expression_common::find_columns_not_satisfy_exprs;
pub use plan_expression_common::find_window_exprs;
pub use plan_expression_common::find_window_exprs_in_expr;
Expand Down
37 changes: 37 additions & 0 deletions common/planners/src/plan_delete.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_datavalues::DataSchema;
use common_datavalues::DataSchemaRef;
use common_meta_app::schema::TableIdent;

use crate::Expression;

/// Plan node payload for a `DELETE` statement; carried by `PlanNode::Delete`.
///
/// The plan is serializable (serde), cloneable, debuggable and comparable
/// via the derives below.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
pub struct DeletePlan {
/// Catalog part of the target table's name.
pub catalog_name: String,
/// Database part of the target table's name.
pub database_name: String,
/// Table part of the target table's name.
pub table_name: String,
/// Identity of the target table as a `TableIdent`.
/// NOTE(review): presumably resolved from the three names above at plan time; confirm against the planner.
pub table_id: TableIdent,
/// Optional filter expression; `None` when no filter is present.
/// NOTE(review): presumably built from the statement's `WHERE` clause; confirm against the planner.
pub selection: Option<Expression>,
/// Column positions associated with this plan.
/// NOTE(review): presumably the indices of the columns referenced by `selection`; confirm against the planner.
pub projection: Vec<usize>,
}

impl DeletePlan {
    /// Returns the schema of this plan.
    ///
    /// The result is always a freshly allocated, shared empty schema
    /// (`DataSchema::empty()` wrapped in an `Arc`).
    pub fn schema(&self) -> DataSchemaRef {
        let empty_schema = DataSchema::empty();
        Arc::new(empty_schema)
    }
}
10 changes: 10 additions & 0 deletions common/planners/src/plan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::CreateUserPlan;
use crate::CreateUserStagePlan;
use crate::CreateUserUDFPlan;
use crate::CreateViewPlan;
use crate::DeletePlan;
use crate::DescribeTablePlan;
use crate::DescribeUserStagePlan;
use crate::DropDatabasePlan;
Expand Down Expand Up @@ -110,6 +111,9 @@ pub enum PlanNode {
// Insert.
Insert(InsertPlan),

// Delete.
Delete(DeletePlan),

// Copy.
Copy(CopyPlan),

Expand Down Expand Up @@ -219,6 +223,9 @@ impl PlanNode {
// Insert.
PlanNode::Insert(v) => v.schema(),

// Delete.
PlanNode::Delete(v) => v.schema(),

// Copy.
PlanNode::Copy(v) => v.schema(),

Expand Down Expand Up @@ -327,6 +334,9 @@ impl PlanNode {
// Insert.
PlanNode::Insert(_) => "InsertPlan",

// Delete.
PlanNode::Delete(_) => "DeletePlan",

// Copy.
PlanNode::Copy(_) => "CopyPlan",

Expand Down
8 changes: 8 additions & 0 deletions common/planners/src/plan_node_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::CreateUserPlan;
use crate::CreateUserStagePlan;
use crate::CreateUserUDFPlan;
use crate::CreateViewPlan;
use crate::DeletePlan;
use crate::DescribeTablePlan;
use crate::DescribeUserStagePlan;
use crate::DropDatabasePlan;
Expand Down Expand Up @@ -139,6 +140,9 @@ pub trait PlanRewriter: Sized {
// Insert.
PlanNode::Insert(plan) => self.rewrite_insert_into(plan),

// Delete.
PlanNode::Delete(plan) => self.rewrite_delete_into(plan),

// Copy.
PlanNode::Copy(plan) => self.rewrite_copy(plan),

Expand Down Expand Up @@ -448,6 +452,10 @@ pub trait PlanRewriter: Sized {
Ok(PlanNode::Insert(plan.clone()))
}

fn rewrite_delete_into(&mut self, plan: &DeletePlan) -> Result<PlanNode> {
Ok(PlanNode::Delete(plan.clone()))
}

fn rewrite_copy(&mut self, plan: &CopyPlan) -> Result<PlanNode> {
Ok(PlanNode::Copy(plan.clone()))
}
Expand Down
8 changes: 8 additions & 0 deletions common/planners/src/plan_node_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::CreateUserPlan;
use crate::CreateUserStagePlan;
use crate::CreateUserUDFPlan;
use crate::CreateViewPlan;
use crate::DeletePlan;
use crate::DescribeTablePlan;
use crate::DescribeUserStagePlan;
use crate::DropDatabasePlan;
Expand Down Expand Up @@ -151,6 +152,9 @@ pub trait PlanVisitor {
// Insert.
PlanNode::Insert(plan) => self.visit_insert_into(plan),

// Delete.
PlanNode::Delete(plan) => self.visit_delete_into(plan),

// Copy.
PlanNode::Copy(plan) => self.visit_copy(plan),

Expand Down Expand Up @@ -435,6 +439,10 @@ pub trait PlanVisitor {
Ok(())
}

fn visit_delete_into(&mut self, _: &DeletePlan) -> Result<()> {
Ok(())
}

fn visit_copy(&mut self, _: &CopyPlan) -> Result<()> {
Ok(())
}
Expand Down
3 changes: 1 addition & 2 deletions common/streams/src/sources/source_parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ impl<R> ParquetSource<R>
where R: AsyncRead + AsyncSeek + Unpin + Send
{
fn create(builder: ParquetSourceBuilder, reader: R) -> Self {
let arrow_table_schema =
Arc::new(builder.schema.project(builder.projection.clone())).to_arrow();
let arrow_table_schema = Arc::new(builder.schema.project(&builder.projection)).to_arrow();

ParquetSource {
reader,
Expand Down
Loading