Skip to content

Commit

Permalink
Merge pull request #5691 from dantengsky/feat-delete
Browse files Browse the repository at this point in the history
Feat: statement `delete from... `
  • Loading branch information
BohuTANG authored Jun 27, 2022
2 parents edd4bad + d9420fc commit 43ffa8b
Show file tree
Hide file tree
Showing 75 changed files with 1,563 additions and 355 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fmt:
cargo fmt

lint:
cargo fmt
cargo fmt --all
cargo clippy --workspace --all-targets -- -D warnings
# Cargo.toml file formatter(make setup to install)
taplo fmt
Expand Down
18 changes: 18 additions & 0 deletions common/ast/src/ast/statements/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ pub enum Statement<'a> {

Insert(InsertStmt<'a>),

Delete {
catalog: Option<Identifier<'a>>,
database: Option<Identifier<'a>>,
table: Identifier<'a>,
selection: Option<Expr<'a>>,
},
// Databases
ShowDatabases(ShowDatabasesStmt<'a>),
ShowCreateDatabase(ShowCreateDatabaseStmt<'a>),
Expand Down Expand Up @@ -160,6 +166,18 @@ impl<'a> Display for Statement<'a> {
}
Statement::Query(query) => write!(f, "{query}")?,
Statement::Insert(insert) => write!(f, "{insert}")?,
Statement::Delete {
catalog,
database,
table,
selection,
} => {
write!(f, "DELETE FROM ")?;
write_comma_separated_list(f, catalog.iter().chain(database).chain(Some(table)))?;
if let Some(conditions) = selection {
write!(f, "WHERE {conditions} ")?;
}
}
Statement::Copy(stmt) => write!(f, "{stmt}")?,
Statement::ShowSettings => {}
Statement::ShowProcessList => write!(f, "SHOW PROCESSLIST")?,
Expand Down
14 changes: 14 additions & 0 deletions common/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@ pub fn statement(i: Input) -> IResult<Statement> {
})
},
);

let delete = map(
rule! {
DELETE ~ FROM ~ #peroid_separated_idents_1_to_3
~ ( WHERE ~ ^#expr )?
},
|(_, _, (catalog, database, table), opt_where_block)| Statement::Delete {
catalog,
database,
table,
selection: opt_where_block.map(|(_, selection)| selection),
},
);
let show_settings = value(Statement::ShowSettings, rule! { SHOW ~ SETTINGS });
let show_stages = value(Statement::ShowStages, rule! { SHOW ~ STAGES });
let show_process_list = value(Statement::ShowProcessList, rule! { SHOW ~ PROCESSLIST });
Expand Down Expand Up @@ -662,6 +675,7 @@ pub fn statement(i: Input) -> IResult<Statement> {
#map(query, |query| Statement::Query(Box::new(query)))
| #explain : "`EXPLAIN [PIPELINE | GRAPH] <statement>`"
| #insert : "`INSERT INTO [TABLE] <table> [(<column>, ...)] (FORMAT <format> | VALUES <values> | <query>)`"
| #delete : "`DELETE FROM <table> [WHERE ...]`"
| #show_settings : "`SHOW SETTINGS`"
| #show_stages : "`SHOW STAGES`"
| #show_process_list : "`SHOW PROCESSLIST`"
Expand Down
4 changes: 2 additions & 2 deletions common/ast/src/parser/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ pub enum TokenKind {
DECADE,
#[token("DEFAULT", ignore(ascii_case))]
DEFAULT,
#[token("DELETE", ignore(ascii_case))]
DELETE,
#[token("DESC", ignore(ascii_case))]
DESC,
#[token("DESCRIBE", ignore(ascii_case))]
Expand Down Expand Up @@ -577,8 +579,6 @@ pub enum TokenKind {
USAGE,
#[token("UPDATE", ignore(ascii_case))]
UPDATE,
#[token("DELETE", ignore(ascii_case))]
DELETE,
#[token("SUPER", ignore(ascii_case))]
SUPER,
#[token("STATUS", ignore(ascii_case))]
Expand Down
8 changes: 4 additions & 4 deletions common/datablocks/src/kernels/data_block_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ use common_exception::Result;
use crate::DataBlock;

impl DataBlock {
pub fn filter_block(block: &DataBlock, predicate: &ColumnRef) -> Result<DataBlock> {
pub fn filter_block(block: DataBlock, predicate: &ColumnRef) -> Result<DataBlock> {
if block.num_columns() == 0 || block.num_rows() == 0 {
return Ok(block.clone());
return Ok(block);
}

let predict_boolean_nonull = Self::cast_to_nonull_boolean(predicate)?;
// faster path for constant filter
if predict_boolean_nonull.is_const() {
let flag = predict_boolean_nonull.get_bool(0)?;
if flag {
return Ok(block.clone());
return Ok(block);
} else {
return Ok(DataBlock::empty_with_schema(block.schema().clone()));
}
Expand All @@ -42,7 +42,7 @@ impl DataBlock {
let rows = boolean_col.len();
let count_zeros = boolean_col.values().null_count();
match count_zeros {
0 => Ok(block.clone()),
0 => Ok(block),
_ => {
if count_zeros == rows {
return Ok(DataBlock::empty_with_schema(block.schema().clone()));
Expand Down
8 changes: 4 additions & 4 deletions common/datablocks/tests/it/kernels/data_block_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn test_filter_non_const_data_block() -> Result<()> {
]);

let predicate = Series::from_data(vec![true, false, true, true, false, false]);
let block = DataBlock::filter_block(&block, &predicate)?;
let block = DataBlock::filter_block(block, &predicate)?;

common_datablocks::assert_blocks_eq(
vec![
Expand Down Expand Up @@ -62,7 +62,7 @@ fn test_filter_all_false_data_block() -> Result<()> {
]);

let predicate = Series::from_data(vec![false, false, false, false, false, false]);
let block = DataBlock::filter_block(&block, &predicate)?;
let block = DataBlock::filter_block(block, &predicate)?;

common_datablocks::assert_blocks_eq(
vec!["+---+---+", "| a | b |", "+---+---+", "+---+---+"],
Expand All @@ -88,7 +88,7 @@ fn test_filter_const_data_block() -> Result<()> {
]);

let predicate = Series::from_data(vec![true, false, true, true, false, false]);
let block = DataBlock::filter_block(&block, &predicate)?;
let block = DataBlock::filter_block(block, &predicate)?;

common_datablocks::assert_blocks_eq(
vec![
Expand Down Expand Up @@ -122,7 +122,7 @@ fn test_filter_all_const_data_block() -> Result<()> {
]);

let predicate = Series::from_data(vec![true, false, true, true, false, false]);
let block = DataBlock::filter_block(&block, &predicate)?;
let block = DataBlock::filter_block(block, &predicate)?;

common_datablocks::assert_blocks_eq(
vec![
Expand Down
2 changes: 1 addition & 1 deletion common/datavalues/src/data_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl DataSchema {

/// project will do column pruning.
#[must_use]
pub fn project(&self, projection: Vec<usize>) -> Self {
pub fn project(&self, projection: &[usize]) -> Self {
let fields = projection
.iter()
.map(|idx| self.fields()[*idx].clone())
Expand Down
3 changes: 3 additions & 0 deletions common/planners/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod plan_database_drop;
mod plan_database_rename;
mod plan_database_show_create;
mod plan_database_undrop;
mod plan_delete;
mod plan_empty;
mod plan_explain;
mod plan_expression;
Expand Down Expand Up @@ -120,6 +121,7 @@ pub use plan_database_rename::RenameDatabaseEntity;
pub use plan_database_rename::RenameDatabasePlan;
pub use plan_database_show_create::ShowCreateDatabasePlan;
pub use plan_database_undrop::UndropDatabasePlan;
pub use plan_delete::DeletePlan;
pub use plan_empty::EmptyPlan;
pub use plan_explain::ExplainPlan;
pub use plan_explain::ExplainType;
Expand All @@ -135,6 +137,7 @@ pub use plan_expression_common::expr_as_column_expr;
pub use plan_expression_common::extract_aliases;
pub use plan_expression_common::find_aggregate_exprs;
pub use plan_expression_common::find_aggregate_exprs_in_expr;
pub use plan_expression_common::find_column_exprs;
pub use plan_expression_common::find_columns_not_satisfy_exprs;
pub use plan_expression_common::find_window_exprs;
pub use plan_expression_common::find_window_exprs_in_expr;
Expand Down
37 changes: 37 additions & 0 deletions common/planners/src/plan_delete.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_datavalues::DataSchema;
use common_datavalues::DataSchemaRef;
use common_meta_app::schema::TableIdent;

use crate::Expression;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
pub struct DeletePlan {
pub catalog_name: String,
pub database_name: String,
pub table_name: String,
pub table_id: TableIdent,
pub selection: Option<Expression>,
pub projection: Vec<usize>,
}

impl DeletePlan {
pub fn schema(&self) -> DataSchemaRef {
Arc::new(DataSchema::empty())
}
}
10 changes: 10 additions & 0 deletions common/planners/src/plan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::CreateUserPlan;
use crate::CreateUserStagePlan;
use crate::CreateUserUDFPlan;
use crate::CreateViewPlan;
use crate::DeletePlan;
use crate::DescribeTablePlan;
use crate::DescribeUserStagePlan;
use crate::DropDatabasePlan;
Expand Down Expand Up @@ -110,6 +111,9 @@ pub enum PlanNode {
// Insert.
Insert(InsertPlan),

// Delete.
Delete(DeletePlan),

// Copy.
Copy(CopyPlan),

Expand Down Expand Up @@ -219,6 +223,9 @@ impl PlanNode {
// Insert.
PlanNode::Insert(v) => v.schema(),

// Delete.
PlanNode::Delete(v) => v.schema(),

// Copy.
PlanNode::Copy(v) => v.schema(),

Expand Down Expand Up @@ -327,6 +334,9 @@ impl PlanNode {
// Insert.
PlanNode::Insert(_) => "InsertPlan",

// Delete.
PlanNode::Delete(_) => "DeletePlan",

// Copy.
PlanNode::Copy(_) => "CopyPlan",

Expand Down
8 changes: 8 additions & 0 deletions common/planners/src/plan_node_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::CreateUserPlan;
use crate::CreateUserStagePlan;
use crate::CreateUserUDFPlan;
use crate::CreateViewPlan;
use crate::DeletePlan;
use crate::DescribeTablePlan;
use crate::DescribeUserStagePlan;
use crate::DropDatabasePlan;
Expand Down Expand Up @@ -139,6 +140,9 @@ pub trait PlanRewriter: Sized {
// Insert.
PlanNode::Insert(plan) => self.rewrite_insert_into(plan),

// Delete.
PlanNode::Delete(plan) => self.rewrite_delete_into(plan),

// Copy.
PlanNode::Copy(plan) => self.rewrite_copy(plan),

Expand Down Expand Up @@ -448,6 +452,10 @@ pub trait PlanRewriter: Sized {
Ok(PlanNode::Insert(plan.clone()))
}

fn rewrite_delete_into(&mut self, plan: &DeletePlan) -> Result<PlanNode> {
Ok(PlanNode::Delete(plan.clone()))
}

fn rewrite_copy(&mut self, plan: &CopyPlan) -> Result<PlanNode> {
Ok(PlanNode::Copy(plan.clone()))
}
Expand Down
8 changes: 8 additions & 0 deletions common/planners/src/plan_node_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::CreateUserPlan;
use crate::CreateUserStagePlan;
use crate::CreateUserUDFPlan;
use crate::CreateViewPlan;
use crate::DeletePlan;
use crate::DescribeTablePlan;
use crate::DescribeUserStagePlan;
use crate::DropDatabasePlan;
Expand Down Expand Up @@ -151,6 +152,9 @@ pub trait PlanVisitor {
// Insert.
PlanNode::Insert(plan) => self.visit_insert_into(plan),

// Insert.
PlanNode::Delete(plan) => self.visit_delete_into(plan),

// Copy.
PlanNode::Copy(plan) => self.visit_copy(plan),

Expand Down Expand Up @@ -435,6 +439,10 @@ pub trait PlanVisitor {
Ok(())
}

fn visit_delete_into(&mut self, _: &DeletePlan) -> Result<()> {
Ok(())
}

fn visit_copy(&mut self, _: &CopyPlan) -> Result<()> {
Ok(())
}
Expand Down
3 changes: 1 addition & 2 deletions common/streams/src/sources/source_parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ impl<R> ParquetSource<R>
where R: AsyncRead + AsyncSeek + Unpin + Send
{
fn create(builder: ParquetSourceBuilder, reader: R) -> Self {
let arrow_table_schema =
Arc::new(builder.schema.project(builder.projection.clone())).to_arrow();
let arrow_table_schema = Arc::new(builder.schema.project(&builder.projection)).to_arrow();

ParquetSource {
reader,
Expand Down
Loading

0 comments on commit 43ffa8b

Please sign in to comment.