From eb25dd95f18806a92e818100106178f42487a52b Mon Sep 17 00:00:00 2001
From: Veeupup <931418134@qq.com>
Date: Tue, 30 Nov 2021 16:33:50 +0800
Subject: [PATCH 1/3] [fuse] add insert overwrite stmt

---
 common/planners/src/plan_insert_into.rs       |  7 ++++++
 query/src/catalogs/table.rs                   |  7 +++++-
 query/src/interpreters/interpreter_copy.rs    |  2 +-
 .../interpreters/interpreter_insert_into.rs   |  6 ++++-
 query/src/sql/statements/statement_insert.rs  | 25 +++++++++++++------
 query/src/storages/fuse/index/min_max_test.rs |  4 ++-
 query/src/storages/fuse/operations/commit.rs  | 19 ++++++++++----
 query/src/storages/fuse/table.rs              |  9 +++++--
 query/src/storages/fuse/table_test.rs         |  8 ++++--
 9 files changed, 66 insertions(+), 21 deletions(-)

diff --git a/common/planners/src/plan_insert_into.rs b/common/planners/src/plan_insert_into.rs
index d8d44408eece..f7ab0794f8b4 100644
--- a/common/planners/src/plan_insert_into.rs
+++ b/common/planners/src/plan_insert_into.rs
@@ -24,6 +24,7 @@ pub struct InsertIntoPlan {
     pub tbl_name: String,
     pub tbl_id: MetaId,
     pub schema: DataSchemaRef,
+    pub overwrite: bool,
 
     pub select_plan: Option<Box<PlanNode>>,
     pub value_exprs_opt: Option<Vec<Vec<Expression>>>,
@@ -48,6 +49,7 @@ impl InsertIntoPlan {
         table: String,
         table_meta_id: MetaId,
         schema: DataSchemaRef,
+        overwrite: bool,
         select_plan: PlanNode,
     ) -> InsertIntoPlan {
         InsertIntoPlan {
@@ -55,6 +57,7 @@ impl InsertIntoPlan {
             tbl_name: table,
             tbl_id: table_meta_id,
             schema,
+            overwrite,
             select_plan: Some(Box::new(select_plan)),
             value_exprs_opt: None,
             format: None,
@@ -66,6 +69,7 @@ impl InsertIntoPlan {
         table: String,
         table_meta_id: MetaId,
         schema: DataSchemaRef,
+        overwrite: bool,
         values: Vec<Vec<Expression>>,
     ) -> InsertIntoPlan {
         InsertIntoPlan {
@@ -73,6 +77,7 @@ impl InsertIntoPlan {
             tbl_name: table,
             tbl_id: table_meta_id,
             schema,
+            overwrite,
             select_plan: None,
             value_exprs_opt: Some(values),
             format: None,
@@ -84,6 +89,7 @@ impl InsertIntoPlan {
         table: String,
         table_meta_id: MetaId,
         schema: DataSchemaRef,
+        overwrite: bool,
         format: Option<String>,
     ) -> InsertIntoPlan {
         InsertIntoPlan {
@@ -91,6 +97,7 @@ impl InsertIntoPlan {
             tbl_name: table,
             tbl_id: table_meta_id,
             schema,
+            overwrite,
             select_plan: None,
             value_exprs_opt: None,
             format,
diff --git a/query/src/catalogs/table.rs b/query/src/catalogs/table.rs
index 355860c9e769..970b83b8765c 100644
--- a/query/src/catalogs/table.rs
+++ b/query/src/catalogs/table.rs
@@ -99,7 +99,12 @@ pub trait Table: Sync + Send {
         )))
     }
 
-    async fn commit(&self, _ctx: Arc<QueryContext>, _operations: Vec<DataBlock>) -> Result<()> {
+    async fn commit(
+        &self,
+        _ctx: Arc<QueryContext>,
+        _operations: Vec<DataBlock>,
+        _overwrite: bool,
+    ) -> Result<()> {
         Ok(())
     }
 
diff --git a/query/src/interpreters/interpreter_copy.rs b/query/src/interpreters/interpreter_copy.rs
index dd63223d0a1b..03534c5b71f0 100644
--- a/query/src/interpreters/interpreter_copy.rs
+++ b/query/src/interpreters/interpreter_copy.rs
@@ -92,7 +92,7 @@ impl Interpreter for CopyInterpreter {
             .await?
             .try_collect()
             .await?;
-        table.commit(self.ctx.clone(), r).await?;
+        table.commit(self.ctx.clone(), r, false).await?;
 
         Ok(Box::pin(DataBlockStream::create(
             self.plan.schema(),
diff --git a/query/src/interpreters/interpreter_insert_into.rs b/query/src/interpreters/interpreter_insert_into.rs
index 0a6d068d2014..2a9c748501c0 100644
--- a/query/src/interpreters/interpreter_insert_into.rs
+++ b/query/src/interpreters/interpreter_insert_into.rs
@@ -103,7 +103,11 @@ impl Interpreter for InsertIntoInterpreter {
 
         // feed back the append operation logs to table
         table
-            .commit(self.ctx.clone(), append_op_logs.try_collect().await?)
+            .commit(
+                self.ctx.clone(),
+                append_op_logs.try_collect().await?,
+                self.plan.overwrite,
+            )
             .await?;
 
         Ok(Box::pin(DataBlockStream::create(
diff --git a/query/src/sql/statements/statement_insert.rs b/query/src/sql/statements/statement_insert.rs
index 6e7ba5515ca7..f5944f7c3e20 100644
--- a/query/src/sql/statements/statement_insert.rs
+++ b/query/src/sql/statements/statement_insert.rs
@@ -98,12 +98,6 @@ impl DfInsertStatement {
     }
 
     fn is_supported(&self) -> Result<()> {
-        if self.overwrite {
-            return Err(ErrorCode::SyntaxException(
-                "Unsupport insert overwrite statement.",
-            ));
-        }
-
         if self.partitioned.is_some() {
             return Err(ErrorCode::SyntaxException(
                 "Unsupport insert ... partition statement.",
@@ -155,7 +149,14 @@ impl DfInsertStatement {
         }
 
         Ok(AnalyzedResult::SimpleQuery(PlanNode::InsertInto(
-            InsertIntoPlan::insert_values(db, table, table_meta_id, schema, value_exprs),
+            InsertIntoPlan::insert_values(
+                db,
+                table,
+                table_meta_id,
+                schema,
+                self.overwrite,
+                value_exprs,
+            ),
         )))
     }
 
@@ -174,6 +175,7 @@ impl DfInsertStatement {
                 table,
                 table_meta_id,
                 table_schema,
+                self.overwrite,
                 self.format.clone(),
             ),
         )))
@@ -193,7 +195,14 @@ impl DfInsertStatement {
         let select_plan =
             PlanParser::build_plan(vec![DfStatement::Query(statement)], ctx.clone()).await?;
         Ok(AnalyzedResult::SimpleQuery(PlanNode::InsertInto(
-            InsertIntoPlan::insert_select(db, table, table_meta_id, table_schema, select_plan),
+            InsertIntoPlan::insert_select(
+                db,
+                table,
+                table_meta_id,
+                table_schema,
+                self.overwrite,
+                select_plan,
+            ),
         )))
     }
 
diff --git a/query/src/storages/fuse/index/min_max_test.rs b/query/src/storages/fuse/index/min_max_test.rs
index be540ecb3ea6..76d64ecb3c96 100644
--- a/query/src/storages/fuse/index/min_max_test.rs
+++ b/query/src/storages/fuse/index/min_max_test.rs
@@ -80,7 +80,9 @@ async fn test_min_max_index() -> Result<()> {
     let da = ctx.get_data_accessor()?;
     let stream = Box::pin(futures::stream::iter(blocks));
     let r = table.append_data(ctx.clone(), stream).await?;
-    table.commit(ctx.clone(), r.try_collect().await?).await?;
+    table
+        .commit(ctx.clone(), r.try_collect().await?, false)
+        .await?;
 
     // get the latest tbl
     let table = catalog
diff --git a/query/src/storages/fuse/operations/commit.rs b/query/src/storages/fuse/operations/commit.rs
index 14b9bbc8caa7..9f64f295ad19 100644
--- a/query/src/storages/fuse/operations/commit.rs
+++ b/query/src/storages/fuse/operations/commit.rs
@@ -37,15 +37,24 @@ impl FuseTable {
         &self,
         ctx: Arc<QueryContext>,
         operation_log: TableOperationLog,
+        overwrite: bool,
     ) -> Result<()> {
         // TODO OCC retry & resolves conflicts if applicable
         let prev = self.table_snapshot(ctx.clone()).await?;
 
-        let new_snapshot = Self::merge_table_operations(
-            self.table_info.meta.schema.as_ref(),
-            prev,
-            operation_log,
-        )?;
+        let new_snapshot = if overwrite {
+            let schema = self.table_info.meta.schema.as_ref().clone();
+            let (segments, summary) = statistics::merge_append_operations(&schema, operation_log)?;
+            TableSnapshot {
+                snapshot_id: Uuid::new_v4(),
+                prev_snapshot_id: prev.as_ref().map(|v| v.snapshot_id),
+                schema,
+                summary,
+                segments,
+            }
+        } else {
+            Self::merge_table_operations(self.table_info.meta.schema.as_ref(), prev, operation_log)?
+        };
 
         let uuid = new_snapshot.snapshot_id;
         let snapshot_loc = io::snapshot_location(uuid.to_simple().to_string().as_str());
diff --git a/query/src/storages/fuse/table.rs b/query/src/storages/fuse/table.rs
index b01d3c53001a..831908f0baf1 100644
--- a/query/src/storages/fuse/table.rs
+++ b/query/src/storages/fuse/table.rs
@@ -97,13 +97,18 @@ impl Table for FuseTable {
         )))
     }
 
-    async fn commit(&self, _ctx: Arc<QueryContext>, operations: Vec<DataBlock>) -> Result<()> {
+    async fn commit(
+        &self,
+        _ctx: Arc<QueryContext>,
+        operations: Vec<DataBlock>,
+        overwrite: bool,
+    ) -> Result<()> {
         // only append operation supported currently
         let append_log_entries = operations
             .iter()
             .map(AppendOperationLogEntry::try_from)
             .collect::<Result<Vec<AppendOperationLogEntry>>>()?;
-        self.do_commit(_ctx, append_log_entries).await
+        self.do_commit(_ctx, append_log_entries, overwrite).await
     }
 
     async fn truncate(
diff --git a/query/src/storages/fuse/table_test.rs b/query/src/storages/fuse/table_test.rs
index 6a6bc854673f..dc7ec41d07d4 100644
--- a/query/src/storages/fuse/table_test.rs
+++ b/query/src/storages/fuse/table_test.rs
@@ -48,7 +48,9 @@ async fn test_fuse_table_simple_case() -> Result<()> {
     )));
 
     let r = table.append_data(ctx.clone(), stream).await?;
-    table.commit(ctx.clone(), r.try_collect().await?).await?;
+    table
+        .commit(ctx.clone(), r.try_collect().await?, false)
+        .await?;
 
     // get the latest tbl
     let prev_version = table.get_table_info().ident.version;
@@ -149,7 +151,9 @@ async fn test_fuse_table_truncate() -> Result<()> {
     )));
 
     let r = table.append_data(ctx.clone(), stream).await?;
-    table.commit(ctx.clone(), r.try_collect().await?).await?;
+    table
+        .commit(ctx.clone(), r.try_collect().await?, false)
+        .await?;
 
     let source_plan = table.read_plan(ctx.clone(), None).await?;
     // get the latest tbl

From c5cba8f4acc5f6ab117af7857b38a7b44c8e4f65 Mon Sep 17 00:00:00 2001
From: Veeupup <931418134@qq.com>
Date: Tue, 30 Nov 2021 19:10:13 +0800
Subject: [PATCH 2/3] fix lint

---
 query/src/sql/statements/statement_insert.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/query/src/sql/statements/statement_insert.rs b/query/src/sql/statements/statement_insert.rs
index 58d84ce467de..201f6e263fdb 100644
--- a/query/src/sql/statements/statement_insert.rs
+++ b/query/src/sql/statements/statement_insert.rs
@@ -193,7 +193,8 @@ impl DfInsertStatement {
         let statement = DfQueryStatement::try_from(source.clone())?;
 
         let select_plan =
-            PlanParser::build_plan(vec![DfStatement::Query(Box::new(statement))], ctx.clone()).await?;
+            PlanParser::build_plan(vec![DfStatement::Query(Box::new(statement))], ctx.clone())
+                .await?;
         Ok(AnalyzedResult::SimpleQuery(Box::new(PlanNode::InsertInto(
             InsertIntoPlan::insert_select(
                 db,

From 5c43f99ebb1eb2c16d8f3a48771158bdef1ae4f9 Mon Sep 17 00:00:00 2001
From: Veeupup <931418134@qq.com>
Date: Tue, 30 Nov 2021 20:57:04 +0800
Subject: [PATCH 3/3] add insert overwrite test

---
 .../fuse/statistics/statistics_test.rs        |  4 +-
 query/src/storages/fuse/table_test.rs         | 76 ++++++++++++++++++-
 query/src/storages/fuse/table_test_fixture.rs |  6 +-
 .../13_0000_insert_overwrite.result           |  6 ++
 .../0_stateless/13_0000_insert_overwrite.sql  | 17 +++++
 5 files changed, 101 insertions(+), 8 deletions(-)
 create mode 100644 tests/suites/0_stateless/13_0000_insert_overwrite.result
 create mode 100644 tests/suites/0_stateless/13_0000_insert_overwrite.sql

diff --git a/query/src/storages/fuse/statistics/statistics_test.rs b/query/src/storages/fuse/statistics/statistics_test.rs
index 841fa621df1f..22ce3413490b 100644
--- a/query/src/storages/fuse/statistics/statistics_test.rs
+++ b/query/src/storages/fuse/statistics/statistics_test.rs
@@ -41,7 +41,7 @@ fn test_ft_stats_block_stats() -> common_exception::Result<()> {
 
 #[test]
 fn test_ft_stats_col_stats_reduce() -> common_exception::Result<()> {
-    let blocks = TestFixture::gen_block_stream(10);
+    let blocks = TestFixture::gen_block_stream(10, 1);
     let schema = DataSchemaRefExt::create(vec![DataField::new("a", DataType::Int32, false)]);
     let col_stats = blocks
         .iter()
@@ -59,7 +59,7 @@ fn test_ft_stats_col_stats_reduce() -> common_exception::Result<()> {
 
 #[test]
 fn test_ft_stats_accumulator() -> common_exception::Result<()> {
-    let blocks = TestFixture::gen_block_stream(10);
+    let blocks = TestFixture::gen_block_stream(10, 1);
     let mut stats_acc = accumulator::StatisticsAccumulator::new();
     let mut meta_acc = block_meta_acc::BlockMetaAccumulator::new();
     blocks.iter().try_for_each(|item| {
diff --git a/query/src/storages/fuse/table_test.rs b/query/src/storages/fuse/table_test.rs
index 1daaffc7e227..01b592d94a3b 100644
--- a/query/src/storages/fuse/table_test.rs
+++ b/query/src/storages/fuse/table_test.rs
@@ -41,10 +41,10 @@ async fn test_fuse_table_simple_case() -> Result<()> {
         )
         .await?;
 
-    // insert 10 blocks
+    // insert 5 blocks
     let num_blocks = 5;
     let stream = Box::pin(futures::stream::iter(TestFixture::gen_block_stream(
-        num_blocks,
+        num_blocks, 1,
     )));
 
     let r = table.append_data(ctx.clone(), stream).await?;
@@ -70,7 +70,7 @@ async fn test_fuse_table_simple_case() -> Result<()> {
     ctx.try_set_partitions(parts)?;
 
     let stream = table
-        .read(ctx, &ReadDataSourcePlan {
+        .read(ctx.clone(), &ReadDataSourcePlan {
             table_info: Default::default(),
             scan_fields: None,
             parts: Default::default(),
@@ -107,6 +107,74 @@ async fn test_fuse_table_simple_case() -> Result<()> {
     ];
     common_datablocks::assert_blocks_sorted_eq(expected, blocks.as_slice());
 
+    // test commit with overwrite
+
+    // insert overwrite 5 blocks
+    let num_blocks = 5;
+    let stream = Box::pin(futures::stream::iter(TestFixture::gen_block_stream(
+        num_blocks, 4,
+    )));
+
+    let r = table.append_data(ctx.clone(), stream).await?;
+    table
+        .commit(ctx.clone(), r.try_collect().await?, true)
+        .await?;
+
+    // get the latest tbl
+    let prev_version = table.get_table_info().ident.version;
+    let table = catalog
+        .get_table(
+            fixture.default_db().as_str(),
+            fixture.default_table().as_str(),
+        )
+        .await?;
+    assert_ne!(prev_version, table.get_table_info().ident.version);
+
+    let (stats, parts) = table.read_partitions(ctx.clone(), None).await?;
+    assert_eq!(parts.len(), num_blocks as usize);
+    assert_eq!(stats.read_rows, num_blocks as usize * 3);
+
+    // inject partitions to current ctx
+    ctx.try_set_partitions(parts)?;
+
+    let stream = table
+        .read(ctx.clone(), &ReadDataSourcePlan {
+            table_info: Default::default(),
+            scan_fields: None,
+            parts: Default::default(),
+            statistics: Default::default(),
+            description: "".to_string(),
+            tbl_args: None,
+            push_downs: None,
+        })
+        .await?;
+    let blocks = stream.try_collect::<Vec<_>>().await?;
+    let rows: usize = blocks.iter().map(|block| block.num_rows()).sum();
+    assert_eq!(rows, num_blocks as usize * 3);
+
+    let expected = vec![
+        "+----+", //
+        "| id |", //
+        "+----+", //
+        "| 4  |", //
+        "| 4  |", //
+        "| 4  |", //
+        "| 4  |", //
+        "| 4  |", //
+        "| 5  |", //
+        "| 5  |", //
+        "| 5  |", //
+        "| 5  |", //
+        "| 5  |", //
+        "| 6  |", //
+        "| 6  |", //
+        "| 6  |", //
+        "| 6  |", //
+        "| 6  |", //
+        "+----+", //
+    ];
+    common_datablocks::assert_blocks_sorted_eq(expected, blocks.as_slice());
+
     Ok(())
 }
 
@@ -147,7 +215,7 @@ async fn test_fuse_table_truncate() -> Result<()> {
     // 2. truncate table which has data
     let num_blocks = 10;
     let stream = Box::pin(futures::stream::iter(TestFixture::gen_block_stream(
-        num_blocks,
+        num_blocks, 1,
     )));
 
     let r = table.append_data(ctx.clone(), stream).await?;
diff --git a/query/src/storages/fuse/table_test_fixture.rs b/query/src/storages/fuse/table_test_fixture.rs
index ca4cd784ae50..bc6e77dd0f73 100644
--- a/query/src/storages/fuse/table_test_fixture.rs
+++ b/query/src/storages/fuse/table_test_fixture.rs
@@ -100,14 +100,16 @@ impl TestFixture {
         }
     }
 
-    pub fn gen_block_stream(num: u32) -> Vec<Result<DataBlock>> {
+    pub fn gen_block_stream(num: u32, start: i32) -> Vec<Result<DataBlock>> {
         (0..num)
             .into_iter()
             .map(|_v| {
                 let schema =
                     DataSchemaRefExt::create(vec![DataField::new("a", DataType::Int32, false)]);
                 Ok(DataBlock::create_by_array(schema, vec![Series::new(vec![
-                    1, 2, 3,
+                    start,
+                    start + 1,
+                    start + 2,
                 ])]))
             })
             .collect()
diff --git a/tests/suites/0_stateless/13_0000_insert_overwrite.result b/tests/suites/0_stateless/13_0000_insert_overwrite.result
new file mode 100644
index 000000000000..ee070b75edb7
--- /dev/null
+++ b/tests/suites/0_stateless/13_0000_insert_overwrite.result
@@ -0,0 +1,6 @@
+1 1 1 origin
+2 2 2 origin
+3 3 3 change
+4 4 4 change
+5 5 5 change2
+6 6 6 change2
diff --git a/tests/suites/0_stateless/13_0000_insert_overwrite.sql b/tests/suites/0_stateless/13_0000_insert_overwrite.sql
new file mode 100644
index 000000000000..99078b84a6a2
--- /dev/null
+++ b/tests/suites/0_stateless/13_0000_insert_overwrite.sql
@@ -0,0 +1,17 @@
+DROP DATABASE IF EXISTS db1;
+CREATE DATABASE db1;
+USE db1;
+
+CREATE TABLE IF NOT EXISTS t1(a Int8, b UInt32, c UInt64, d String) Engine = FUSE;
+CREATE TABLE IF NOT EXISTS t2(a Int8, b UInt32, c UInt64, d String) Engine = FUSE;
+
+INSERT INTO t1 (a,b,c,d) VALUES(1, 1, 1, 'origin'), (2, 2, 2, 'origin');
+INSERT INTO t2 (a,b,c,d) VALUES(3, 3, 3, 'change'), (4, 4, 4, 'change');
+
+select * from t1;
+INSERT OVERWRITE t1 select * from t2;
+select * from t1;
+INSERT OVERWRITE t1 VALUES (5, 5, 5, 'change2'), (6, 6, 6, 'change2');
+select * from t1;
+
+DROP DATABASE db1;