Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[fuse table] insert overwrite stmt implementation #3150

Merged
merged 4 commits into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions common/planners/src/plan_insert_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct InsertIntoPlan {
pub tbl_name: String,
pub tbl_id: MetaId,
pub schema: DataSchemaRef,
pub overwrite: bool,

pub select_plan: Option<Box<PlanNode>>,
pub value_exprs_opt: Option<Vec<Vec<Expression>>>,
Expand All @@ -48,13 +49,15 @@ impl InsertIntoPlan {
table: String,
table_meta_id: MetaId,
schema: DataSchemaRef,
overwrite: bool,
select_plan: PlanNode,
) -> InsertIntoPlan {
InsertIntoPlan {
db_name: db,
tbl_name: table,
tbl_id: table_meta_id,
schema,
overwrite,
select_plan: Some(Box::new(select_plan)),
value_exprs_opt: None,
format: None,
Expand All @@ -66,13 +69,15 @@ impl InsertIntoPlan {
table: String,
table_meta_id: MetaId,
schema: DataSchemaRef,
overwrite: bool,
values: Vec<Vec<Expression>>,
) -> InsertIntoPlan {
InsertIntoPlan {
db_name: db,
tbl_name: table,
tbl_id: table_meta_id,
schema,
overwrite,
select_plan: None,
value_exprs_opt: Some(values),
format: None,
Expand All @@ -84,13 +89,15 @@ impl InsertIntoPlan {
table: String,
table_meta_id: MetaId,
schema: DataSchemaRef,
overwrite: bool,
format: Option<String>,
) -> InsertIntoPlan {
InsertIntoPlan {
db_name: db,
tbl_name: table,
tbl_id: table_meta_id,
schema,
overwrite,
select_plan: None,
value_exprs_opt: None,
format,
Expand Down
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl Interpreter for CopyInterpreter {
.await?
.try_collect()
.await?;
table.commit(self.ctx.clone(), r).await?;
table.commit(self.ctx.clone(), r, false).await?;

Ok(Box::pin(DataBlockStream::create(
self.plan.schema(),
Expand Down
6 changes: 5 additions & 1 deletion query/src/interpreters/interpreter_insert_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ impl Interpreter for InsertIntoInterpreter {

// feed back the append operation logs to table
table
.commit(self.ctx.clone(), append_op_logs.try_collect().await?)
.commit(
self.ctx.clone(),
append_op_logs.try_collect().await?,
self.plan.overwrite,
)
.await?;

Ok(Box::pin(DataBlockStream::create(
Expand Down
25 changes: 17 additions & 8 deletions query/src/sql/statements/statement_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,6 @@ impl DfInsertStatement {
}

fn is_supported(&self) -> Result<()> {
if self.overwrite {
return Err(ErrorCode::SyntaxException(
"Unsupport insert overwrite statement.",
));
}

if self.partitioned.is_some() {
return Err(ErrorCode::SyntaxException(
"Unsupport insert ... partition statement.",
Expand Down Expand Up @@ -155,7 +149,14 @@ impl DfInsertStatement {
}

Ok(AnalyzedResult::SimpleQuery(Box::new(PlanNode::InsertInto(
InsertIntoPlan::insert_values(db, table, table_meta_id, schema, value_exprs),
InsertIntoPlan::insert_values(
db,
table,
table_meta_id,
schema,
self.overwrite,
value_exprs,
),
))))
}

Expand All @@ -174,6 +175,7 @@ impl DfInsertStatement {
table,
table_meta_id,
table_schema,
self.overwrite,
self.format.clone(),
),
))))
Expand All @@ -194,7 +196,14 @@ impl DfInsertStatement {
PlanParser::build_plan(vec![DfStatement::Query(Box::new(statement))], ctx.clone())
.await?;
Ok(AnalyzedResult::SimpleQuery(Box::new(PlanNode::InsertInto(
InsertIntoPlan::insert_select(db, table, table_meta_id, table_schema, select_plan),
InsertIntoPlan::insert_select(
db,
table,
table_meta_id,
table_schema,
self.overwrite,
select_plan,
),
))))
}

Expand Down
4 changes: 3 additions & 1 deletion query/src/storages/fuse/index/min_max_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ async fn test_min_max_index() -> Result<()> {
let da = ctx.get_data_accessor()?;
let stream = Box::pin(futures::stream::iter(blocks));
let r = table.append_data(ctx.clone(), stream).await?;
table.commit(ctx.clone(), r.try_collect().await?).await?;
table
.commit(ctx.clone(), r.try_collect().await?, false)
.await?;

// get the latest tbl
let table = catalog
Expand Down
19 changes: 14 additions & 5 deletions query/src/storages/fuse/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,24 @@ impl FuseTable {
&self,
ctx: Arc<QueryContext>,
operation_log: TableOperationLog,
overwrite: bool,
) -> Result<()> {
// TODO OCC retry & resolves conflicts if applicable

let prev = self.table_snapshot(ctx.clone()).await?;
let new_snapshot = Self::merge_table_operations(
self.table_info.meta.schema.as_ref(),
prev,
operation_log,
)?;
let new_snapshot = if overwrite {
let schema = self.table_info.meta.schema.as_ref().clone();
let (segments, summary) = statistics::merge_append_operations(&schema, operation_log)?;
TableSnapshot {
snapshot_id: Uuid::new_v4(),
prev_snapshot_id: prev.as_ref().map(|v| v.snapshot_id),
schema,
summary,
segments,
}
} else {
Self::merge_table_operations(self.table_info.meta.schema.as_ref(), prev, operation_log)?
};

let uuid = new_snapshot.snapshot_id;
let snapshot_loc = io::snapshot_location(uuid.to_simple().to_string().as_str());
Expand Down
4 changes: 2 additions & 2 deletions query/src/storages/fuse/statistics/statistics_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fn test_ft_stats_block_stats() -> common_exception::Result<()> {

#[test]
fn test_ft_stats_col_stats_reduce() -> common_exception::Result<()> {
let blocks = TestFixture::gen_block_stream(10);
let blocks = TestFixture::gen_block_stream(10, 1);
let schema = DataSchemaRefExt::create(vec![DataField::new("a", DataType::Int32, false)]);
let col_stats = blocks
.iter()
Expand All @@ -59,7 +59,7 @@ fn test_ft_stats_col_stats_reduce() -> common_exception::Result<()> {

#[test]
fn test_ft_stats_accumulator() -> common_exception::Result<()> {
let blocks = TestFixture::gen_block_stream(10);
let blocks = TestFixture::gen_block_stream(10, 1);
let mut stats_acc = accumulator::StatisticsAccumulator::new();
let mut meta_acc = block_meta_acc::BlockMetaAccumulator::new();
blocks.iter().try_for_each(|item| {
Expand Down
9 changes: 7 additions & 2 deletions query/src/storages/fuse/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,18 @@ impl Table for FuseTable {
)))
}

async fn commit(&self, _ctx: Arc<QueryContext>, operations: Vec<DataBlock>) -> Result<()> {
/// Commits the append operations produced by `append_data` to the table,
/// either appending to (`overwrite == false`) or replacing
/// (`overwrite == true`) the current table snapshot.
///
/// Fix: the context parameter was named `_ctx`, whose leading underscore
/// signals "intentionally unused" — but it IS used (forwarded to
/// `do_commit`). Renamed to `ctx` so the name matches reality.
async fn commit(
    &self,
    ctx: Arc<QueryContext>,
    operations: Vec<DataBlock>,
    overwrite: bool,
) -> Result<()> {
    // Only append operations are supported currently: each DataBlock in
    // `operations` must decode into an AppendOperationLogEntry, otherwise
    // the conversion error is propagated to the caller.
    let append_log_entries = operations
        .iter()
        .map(AppendOperationLogEntry::try_from)
        .collect::<Result<Vec<AppendOperationLogEntry>>>()?;
    self.do_commit(ctx, append_log_entries, overwrite).await
}

async fn truncate(
Expand Down
84 changes: 78 additions & 6 deletions query/src/storages/fuse/table_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@ async fn test_fuse_table_simple_case() -> Result<()> {
)
.await?;

// insert 10 blocks
// insert 5 blocks
let num_blocks = 5;
let stream = Box::pin(futures::stream::iter(TestFixture::gen_block_stream(
num_blocks,
num_blocks, 1,
)));

let r = table.append_data(ctx.clone(), stream).await?;
table.commit(ctx.clone(), r.try_collect().await?).await?;
table
.commit(ctx.clone(), r.try_collect().await?, false)
.await?;

// get the latest tbl
let prev_version = table.get_table_info().ident.version;
Expand All @@ -68,7 +70,7 @@ async fn test_fuse_table_simple_case() -> Result<()> {
ctx.try_set_partitions(parts)?;

let stream = table
.read(ctx, &ReadDataSourcePlan {
.read(ctx.clone(), &ReadDataSourcePlan {
table_info: Default::default(),
scan_fields: None,
parts: Default::default(),
Expand Down Expand Up @@ -105,6 +107,74 @@ async fn test_fuse_table_simple_case() -> Result<()> {
];
common_datablocks::assert_blocks_sorted_eq(expected, blocks.as_slice());

// test commit with overwrite

// insert overwrite 5 blocks
let num_blocks = 5;
let stream = Box::pin(futures::stream::iter(TestFixture::gen_block_stream(
num_blocks, 4,
)));

let r = table.append_data(ctx.clone(), stream).await?;
table
.commit(ctx.clone(), r.try_collect().await?, true)
.await?;

// get the latest tbl
let prev_version = table.get_table_info().ident.version;
let table = catalog
.get_table(
fixture.default_db().as_str(),
fixture.default_table().as_str(),
)
.await?;
assert_ne!(prev_version, table.get_table_info().ident.version);

let (stats, parts) = table.read_partitions(ctx.clone(), None).await?;
assert_eq!(parts.len(), num_blocks as usize);
assert_eq!(stats.read_rows, num_blocks as usize * 3);

// inject partitions to current ctx
ctx.try_set_partitions(parts)?;

let stream = table
.read(ctx.clone(), &ReadDataSourcePlan {
table_info: Default::default(),
scan_fields: None,
parts: Default::default(),
statistics: Default::default(),
description: "".to_string(),
tbl_args: None,
push_downs: None,
})
.await?;
let blocks = stream.try_collect::<Vec<_>>().await?;
let rows: usize = blocks.iter().map(|block| block.num_rows()).sum();
assert_eq!(rows, num_blocks as usize * 3);

let expected = vec![
"+----+", //
"| id |", //
"+----+", //
"| 4 |", //
"| 4 |", //
"| 4 |", //
"| 4 |", //
"| 4 |", //
"| 5 |", //
"| 5 |", //
"| 5 |", //
"| 5 |", //
"| 5 |", //
"| 6 |", //
"| 6 |", //
"| 6 |", //
"| 6 |", //
"| 6 |", //
"+----+", //
];
common_datablocks::assert_blocks_sorted_eq(expected, blocks.as_slice());

Ok(())
}

Expand Down Expand Up @@ -145,11 +215,13 @@ async fn test_fuse_table_truncate() -> Result<()> {
// 2. truncate table which has data
let num_blocks = 10;
let stream = Box::pin(futures::stream::iter(TestFixture::gen_block_stream(
num_blocks,
num_blocks, 1,
)));

let r = table.append_data(ctx.clone(), stream).await?;
table.commit(ctx.clone(), r.try_collect().await?).await?;
table
.commit(ctx.clone(), r.try_collect().await?, false)
.await?;
let source_plan = table.read_plan(ctx.clone(), None).await?;

// get the latest tbl
Expand Down
6 changes: 4 additions & 2 deletions query/src/storages/fuse/table_test_fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,16 @@ impl TestFixture {
}
}

pub fn gen_block_stream(num: u32) -> Vec<Result<DataBlock>> {
pub fn gen_block_stream(num: u32, start: i32) -> Vec<Result<DataBlock>> {
(0..num)
.into_iter()
.map(|_v| {
let schema =
DataSchemaRefExt::create(vec![DataField::new("a", DataType::Int32, false)]);
Ok(DataBlock::create_by_array(schema, vec![Series::new(vec![
1, 2, 3,
start,
start + 1,
start + 2,
])]))
})
.collect()
Expand Down
7 changes: 6 additions & 1 deletion query/src/storages/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,12 @@ pub trait Table: Sync + Send {
)))
}

async fn commit(&self, _ctx: Arc<QueryContext>, _operations: Vec<DataBlock>) -> Result<()> {
/// Commits a set of append operations to the table.
///
/// `_operations` carries the blocks produced by a prior `append_data`
/// call; `_overwrite` requests replace-all semantics instead of append
/// (as implemented by the fuse storage — see its `commit` override).
/// This default implementation is a no-op so storages that do not
/// persist data via operation logs need not override it.
async fn commit(
    &self,
    _ctx: Arc<QueryContext>,
    _operations: Vec<DataBlock>,
    _overwrite: bool,
) -> Result<()> {
    Ok(())
}

Expand Down
6 changes: 6 additions & 0 deletions tests/suites/0_stateless/13_0000_insert_overwrite.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
1 1 1 origin
2 2 2 origin
3 3 3 change
4 4 4 change
5 5 5 change2
6 6 6 change2
17 changes: 17 additions & 0 deletions tests/suites/0_stateless/13_0000_insert_overwrite.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- Stateless test for INSERT OVERWRITE on fuse tables:
-- overwrite must replace the whole table contents, both when the
-- source is a SELECT and when it is a VALUES list.
-- Fix: normalized keyword casing (lowercase `select * from` mixed with
-- uppercase `INSERT INTO`/`CREATE TABLE` elsewhere in the script).
DROP DATABASE IF EXISTS db1;
CREATE DATABASE db1;
USE db1;

CREATE TABLE IF NOT EXISTS t1(a Int8, b UInt32, c UInt64, d String) Engine = FUSE;
CREATE TABLE IF NOT EXISTS t2(a Int8, b UInt32, c UInt64, d String) Engine = FUSE;

INSERT INTO t1 (a,b,c,d) VALUES(1, 1, 1, 'origin'), (2, 2, 2, 'origin');
INSERT INTO t2 (a,b,c,d) VALUES(3, 3, 3, 'change'), (4, 4, 4, 'change');

-- baseline: t1 holds the two 'origin' rows
SELECT * FROM t1;
-- overwrite from a SELECT: t1 must now hold exactly t2's rows
INSERT OVERWRITE t1 SELECT * FROM t2;
SELECT * FROM t1;
-- overwrite from a VALUES list: t1 must hold only the 'change2' rows
INSERT OVERWRITE t1 VALUES (5, 5, 5, 'change2'), (6, 6, 6, 'change2');
SELECT * FROM t1;

DROP DATABASE db1;