Skip to content

Commit

Permalink
[fuse] add insert overwrite stmt
Browse files Browse the repository at this point in the history
  • Loading branch information
Veeupup committed Nov 30, 2021
1 parent 4249445 commit eb25dd9
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 21 deletions.
7 changes: 7 additions & 0 deletions common/planners/src/plan_insert_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct InsertIntoPlan {
pub tbl_name: String,
pub tbl_id: MetaId,
pub schema: DataSchemaRef,
pub overwrite: bool,

pub select_plan: Option<Box<PlanNode>>,
pub value_exprs_opt: Option<Vec<Vec<Expression>>>,
Expand All @@ -48,13 +49,15 @@ impl InsertIntoPlan {
table: String,
table_meta_id: MetaId,
schema: DataSchemaRef,
overwrite: bool,
select_plan: PlanNode,
) -> InsertIntoPlan {
InsertIntoPlan {
db_name: db,
tbl_name: table,
tbl_id: table_meta_id,
schema,
overwrite,
select_plan: Some(Box::new(select_plan)),
value_exprs_opt: None,
format: None,
Expand All @@ -66,13 +69,15 @@ impl InsertIntoPlan {
table: String,
table_meta_id: MetaId,
schema: DataSchemaRef,
overwrite: bool,
values: Vec<Vec<Expression>>,
) -> InsertIntoPlan {
InsertIntoPlan {
db_name: db,
tbl_name: table,
tbl_id: table_meta_id,
schema,
overwrite,
select_plan: None,
value_exprs_opt: Some(values),
format: None,
Expand All @@ -84,13 +89,15 @@ impl InsertIntoPlan {
table: String,
table_meta_id: MetaId,
schema: DataSchemaRef,
overwrite: bool,
format: Option<String>,
) -> InsertIntoPlan {
InsertIntoPlan {
db_name: db,
tbl_name: table,
tbl_id: table_meta_id,
schema,
overwrite,
select_plan: None,
value_exprs_opt: None,
format,
Expand Down
7 changes: 6 additions & 1 deletion query/src/catalogs/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ pub trait Table: Sync + Send {
)))
}

async fn commit(&self, _ctx: Arc<QueryContext>, _operations: Vec<DataBlock>) -> Result<()> {
async fn commit(
&self,
_ctx: Arc<QueryContext>,
_operations: Vec<DataBlock>,
_overwrite: bool,
) -> Result<()> {
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl Interpreter for CopyInterpreter {
.await?
.try_collect()
.await?;
table.commit(self.ctx.clone(), r).await?;
table.commit(self.ctx.clone(), r, false).await?;

Ok(Box::pin(DataBlockStream::create(
self.plan.schema(),
Expand Down
6 changes: 5 additions & 1 deletion query/src/interpreters/interpreter_insert_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ impl Interpreter for InsertIntoInterpreter {

// feed back the append operation logs to table
table
.commit(self.ctx.clone(), append_op_logs.try_collect().await?)
.commit(
self.ctx.clone(),
append_op_logs.try_collect().await?,
self.plan.overwrite,
)
.await?;

Ok(Box::pin(DataBlockStream::create(
Expand Down
25 changes: 17 additions & 8 deletions query/src/sql/statements/statement_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,6 @@ impl DfInsertStatement {
}

fn is_supported(&self) -> Result<()> {
if self.overwrite {
return Err(ErrorCode::SyntaxException(
"Unsupport insert overwrite statement.",
));
}

if self.partitioned.is_some() {
return Err(ErrorCode::SyntaxException(
"Unsupport insert ... partition statement.",
Expand Down Expand Up @@ -155,7 +149,14 @@ impl DfInsertStatement {
}

Ok(AnalyzedResult::SimpleQuery(PlanNode::InsertInto(
InsertIntoPlan::insert_values(db, table, table_meta_id, schema, value_exprs),
InsertIntoPlan::insert_values(
db,
table,
table_meta_id,
schema,
self.overwrite,
value_exprs,
),
)))
}

Expand All @@ -174,6 +175,7 @@ impl DfInsertStatement {
table,
table_meta_id,
table_schema,
self.overwrite,
self.format.clone(),
),
)))
Expand All @@ -193,7 +195,14 @@ impl DfInsertStatement {
let select_plan =
PlanParser::build_plan(vec![DfStatement::Query(statement)], ctx.clone()).await?;
Ok(AnalyzedResult::SimpleQuery(PlanNode::InsertInto(
InsertIntoPlan::insert_select(db, table, table_meta_id, table_schema, select_plan),
InsertIntoPlan::insert_select(
db,
table,
table_meta_id,
table_schema,
self.overwrite,
select_plan,
),
)))
}

Expand Down
4 changes: 3 additions & 1 deletion query/src/storages/fuse/index/min_max_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ async fn test_min_max_index() -> Result<()> {
let da = ctx.get_data_accessor()?;
let stream = Box::pin(futures::stream::iter(blocks));
let r = table.append_data(ctx.clone(), stream).await?;
table.commit(ctx.clone(), r.try_collect().await?).await?;
table
.commit(ctx.clone(), r.try_collect().await?, false)
.await?;

// get the latest tbl
let table = catalog
Expand Down
19 changes: 14 additions & 5 deletions query/src/storages/fuse/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,24 @@ impl FuseTable {
&self,
ctx: Arc<QueryContext>,
operation_log: TableOperationLog,
overwrite: bool,
) -> Result<()> {
// TODO OCC retry & resolves conflicts if applicable

let prev = self.table_snapshot(ctx.clone()).await?;
let new_snapshot = Self::merge_table_operations(
self.table_info.meta.schema.as_ref(),
prev,
operation_log,
)?;
let new_snapshot = if overwrite {
let schema = self.table_info.meta.schema.as_ref().clone();
let (segments, summary) = statistics::merge_append_operations(&schema, operation_log)?;
TableSnapshot {
snapshot_id: Uuid::new_v4(),
prev_snapshot_id: prev.as_ref().map(|v| v.snapshot_id),
schema,
summary,
segments,
}
} else {
Self::merge_table_operations(self.table_info.meta.schema.as_ref(), prev, operation_log)?
};

let uuid = new_snapshot.snapshot_id;
let snapshot_loc = io::snapshot_location(uuid.to_simple().to_string().as_str());
Expand Down
9 changes: 7 additions & 2 deletions query/src/storages/fuse/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,18 @@ impl Table for FuseTable {
)))
}

async fn commit(&self, _ctx: Arc<QueryContext>, operations: Vec<DataBlock>) -> Result<()> {
async fn commit(
&self,
_ctx: Arc<QueryContext>,
operations: Vec<DataBlock>,
overwrite: bool,
) -> Result<()> {
// only append operation supported currently
let append_log_entries = operations
.iter()
.map(AppendOperationLogEntry::try_from)
.collect::<Result<Vec<AppendOperationLogEntry>>>()?;
self.do_commit(_ctx, append_log_entries).await
self.do_commit(_ctx, append_log_entries, overwrite).await
}

async fn truncate(
Expand Down
8 changes: 6 additions & 2 deletions query/src/storages/fuse/table_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ async fn test_fuse_table_simple_case() -> Result<()> {
)));

let r = table.append_data(ctx.clone(), stream).await?;
table.commit(ctx.clone(), r.try_collect().await?).await?;
table
.commit(ctx.clone(), r.try_collect().await?, false)
.await?;

// get the latest tbl
let prev_version = table.get_table_info().ident.version;
Expand Down Expand Up @@ -149,7 +151,9 @@ async fn test_fuse_table_truncate() -> Result<()> {
)));

let r = table.append_data(ctx.clone(), stream).await?;
table.commit(ctx.clone(), r.try_collect().await?).await?;
table
.commit(ctx.clone(), r.try_collect().await?, false)
.await?;
let source_plan = table.read_plan(ctx.clone(), None).await?;

// get the latest tbl
Expand Down

0 comments on commit eb25dd9

Please sign in to comment.