feat(storage): update interpreter and storage support #9261

Merged · 32 commits · Jan 16, 2023

Changes shown below are from 14 of the 32 commits.

Commits
d1422cd create file and dir (zhyass, Dec 15, 2022)
23ae3e0 update (zhyass, Dec 21, 2022)
7710609 rename deletepart to mutationpart (zhyass, Dec 22, 2022)
d5eed3a update source (zhyass, Dec 29, 2022)
d828040 make lint (zhyass, Dec 29, 2022)
5de1ef5 rename deletion_transform to mutation_transform (zhyass, Dec 29, 2022)
518fa74 add interpreter_update (zhyass, Dec 29, 2022)
aba9f90 enable update (zhyass, Dec 30, 2022)
5e17e8c fix test case (zhyass, Dec 30, 2022)
b1a4d21 Add unit test (zhyass, Jan 3, 2023)
d3e76c4 Merge remote-tracking branch 'upstream/main' into feature_update (zhyass, Jan 3, 2023)
bdbc7db fix conflict (zhyass, Jan 3, 2023)
4d25c4f Add sqllogic test (zhyass, Jan 3, 2023)
e99bea0 add scan progress (zhyass, Jan 3, 2023)
3a707a5 Add serialize data transform (zhyass, Jan 6, 2023)
e01cb7a Merge remote-tracking branch 'upstream/main' into feature_update (zhyass, Jan 6, 2023)
2e8e87e resolve conflict (zhyass, Jan 6, 2023)
9fe6f0e add mutation source (zhyass, Jan 10, 2023)
b49199d remove deletion source (zhyass, Jan 10, 2023)
7249fe9 Merge remote-tracking branch 'upstream/main' into feature_update (zhyass, Jan 10, 2023)
93322b6 resolve conflict (zhyass, Jan 10, 2023)
cbe97ac remove update source (zhyass, Jan 10, 2023)
09f50c8 remove unused codes (zhyass, Jan 10, 2023)
0dcc149 update (zhyass, Jan 12, 2023)
cc8f39a Merge remote-tracking branch 'upstream/main' into feature_update (zhyass, Jan 12, 2023)
17b9c0b fix bug (zhyass, Jan 12, 2023)
5696a6c format codes (zhyass, Jan 13, 2023)
1b53a03 add mutation block pruning (zhyass, Jan 13, 2023)
b4ecec3 fix tests (zhyass, Jan 13, 2023)
fa27986 Add sql logic tests (zhyass, Jan 15, 2023)
789af98 Merge remote-tracking branch 'upstream/main' into feature_update (zhyass, Jan 15, 2023)
030fc95 Merge branch 'main' into feature_update (mergify[bot], Jan 16, 2023)
17 changes: 17 additions & 0 deletions src/query/catalog/src/table.rs
@@ -240,6 +240,23 @@ pub trait Table: Sync + Send {
         )))
     }
 
+    async fn update(
+        &self,
+        ctx: Arc<dyn TableContext>,
+        filter: Option<Expression>,
+        col_indices: Vec<usize>,
+        update_list: Vec<(usize, Expression)>,
+        pipeline: &mut Pipeline,
+    ) -> Result<()> {
+        let (_, _, _, _, _) = (ctx, filter, col_indices, update_list, pipeline);
+
+        Err(ErrorCode::Unimplemented(format!(
+            "table {}, of engine type {}, does not support UPDATE",
+            self.name(),
+            self.get_table_info().engine(),
+        )))
+    }
+
     fn get_block_compact_thresholds(&self) -> BlockCompactThresholds {
         BlockCompactThresholds {
             max_rows_per_block: 1000 * 1000,
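The default body is the opt-in pattern the `Table` trait already uses for `delete`: bind the arguments so they are not flagged as unused, then return `ErrorCode::Unimplemented`, leaving engines such as fuse to override the hook. A minimal self-contained sketch of the same pattern; the `Engine` trait and its simplified signature below are illustrative stand-ins, not Databend's actual API:

```rust
// Sketch of the opt-in default-method pattern behind Table::update.
// `Engine` and its simplified `update` signature are hypothetical.
trait Engine {
    fn name(&self) -> &str;

    // Default: consume the argument, then report UPDATE as unsupported.
    fn update(&self, filter: Option<String>) -> Result<(), String> {
        let _ = filter;
        Err(format!("table {} does not support UPDATE", self.name()))
    }
}

struct ReadOnlyEngine;
impl Engine for ReadOnlyEngine {
    fn name(&self) -> &str {
        "read_only"
    }
}

struct MutableEngine;
impl Engine for MutableEngine {
    fn name(&self) -> &str {
        "mutable"
    }
    // An engine that supports UPDATE overrides the default.
    fn update(&self, _filter: Option<String>) -> Result<(), String> {
        Ok(()) // a real engine would build its mutation pipeline here
    }
}

fn main() {
    assert!(ReadOnlyEngine.update(None).is_err());
    assert!(MutableEngine.update(None).is_ok());
}
```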
9 changes: 5 additions & 4 deletions src/query/service/src/interpreters/interpreter_delete.rs
@@ -17,15 +17,15 @@ use std::sync::Arc;
 use common_datavalues::DataSchemaRef;
 use common_exception::Result;
 use common_pipeline_core::Pipeline;
+use common_sql::executor::ExpressionBuilderWithoutRenaming;
+use common_sql::plans::DeletePlan;
 
 use crate::interpreters::Interpreter;
 use crate::pipelines::executor::ExecutorSettings;
 use crate::pipelines::executor::PipelineCompleteExecutor;
 use crate::pipelines::PipelineBuildResult;
 use crate::sessions::QueryContext;
 use crate::sessions::TableContext;
-use crate::sql::executor::ExpressionBuilderWithoutRenaming;
-use crate::sql::plans::DeletePlan;
 use crate::sql::plans::ScalarExpr;
 
 /// interprets DeletePlan
@@ -35,7 +35,7 @@ pub struct DeleteInterpreter {
 }
 
 impl DeleteInterpreter {
-    /// Create the DelectInterpreter from DelectPlan
+    /// Create the DeleteInterpreter from DeletePlan
     pub fn try_create(ctx: Arc<QueryContext>, plan: DeletePlan) -> Result<Self> {
         Ok(DeleteInterpreter { ctx, plan })
     }
@@ -48,7 +48,7 @@ impl Interpreter for DeleteInterpreter {
         "DeleteInterpreter"
     }
 
-    /// Get the schema of SelectPlan
+    /// Get the schema of DeletePlan
     fn schema(&self) -> DataSchemaRef {
         self.plan.schema()
     }
@@ -60,6 +60,7 @@
         let db_name = self.plan.database_name.as_str();
         let tbl_name = self.plan.table_name.as_str();
         let tbl = self.ctx.get_table(catalog_name, db_name, tbl_name).await?;
+
         let (filter, col_indices) = if let Some(scalar) = &self.plan.selection {
             let eb = ExpressionBuilderWithoutRenaming::create(self.plan.metadata.clone());
             (
9 changes: 5 additions & 4 deletions src/query/service/src/interpreters/interpreter_factory.rs
@@ -15,7 +15,6 @@
 use std::sync::Arc;
 
 use common_ast::ast::ExplainKind;
-use common_exception::ErrorCode;
 use common_exception::Result;
 use tracing::error;
 
@@ -35,6 +34,7 @@ use crate::interpreters::CreateShareInterpreter;
 use crate::interpreters::DropShareInterpreter;
 use crate::interpreters::DropUserInterpreter;
 use crate::interpreters::SetRoleInterpreter;
+use crate::interpreters::UpdateInterpreter;
 use crate::sessions::QueryContext;
 use crate::sql::plans::Plan;
 
@@ -200,9 +200,10 @@ impl InterpreterFactory {
                 *delete.clone(),
             )?)),
 
-            Plan::Update(_update) => Err(ErrorCode::Unimplemented(
-                "Unimplement for update".to_string(),
-            )),
+            Plan::Update(update) => Ok(Arc::new(UpdateInterpreter::try_create(
+                ctx,
+                *update.clone(),
+            )?)),
 
             // Roles
             Plan::CreateRole(create_role) => Ok(Arc::new(CreateRoleInterpreter::try_create(
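With this hunk, `Plan::Update` gets a real arm in the factory match instead of an `Unimplemented` error, mirroring the `Plan::Delete` arm above it. A stripped-down sketch of that dispatch shape; the types here are illustrative, not the real `InterpreterFactory`:

```rust
// Illustrative plan-to-interpreter dispatch; not Databend's real types.
enum Plan {
    Delete,
    Update,
}

trait Interpreter {
    fn name(&self) -> &str;
}

struct DeleteInterpreter;
impl Interpreter for DeleteInterpreter {
    fn name(&self) -> &str {
        "DeleteInterpreter"
    }
}

struct UpdateInterpreter;
impl Interpreter for UpdateInterpreter {
    fn name(&self) -> &str {
        "UpdateInterpreter"
    }
}

fn get_interpreter(plan: &Plan) -> Box<dyn Interpreter> {
    match plan {
        Plan::Delete => Box::new(DeleteInterpreter),
        // Before this PR, the Update arm returned an Unimplemented error.
        Plan::Update => Box::new(UpdateInterpreter),
    }
}

fn main() {
    assert_eq!(get_interpreter(&Plan::Update).name(), "UpdateInterpreter");
}
```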
108 changes: 108 additions & 0 deletions src/query/service/src/interpreters/interpreter_update.rs
@@ -0,0 +1,108 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_datavalues::DataSchemaRef;
use common_exception::ErrorCode;
use common_exception::Result;
use common_pipeline_core::Pipeline;

use crate::interpreters::Interpreter;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::PipelineBuildResult;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;
use crate::sql::executor::ExpressionBuilderWithoutRenaming;
use crate::sql::plans::ScalarExpr;
use crate::sql::plans::UpdatePlan;

/// interprets UpdatePlan
pub struct UpdateInterpreter {
    ctx: Arc<QueryContext>,
    plan: UpdatePlan,
}

impl UpdateInterpreter {
    /// Create the UpdateInterpreter from UpdatePlan
    pub fn try_create(ctx: Arc<QueryContext>, plan: UpdatePlan) -> Result<Self> {
        Ok(UpdateInterpreter { ctx, plan })
    }
}

#[async_trait::async_trait]
impl Interpreter for UpdateInterpreter {
    /// Get the name of current interpreter
    fn name(&self) -> &str {
        "UpdateInterpreter"
    }

    /// Get the schema of UpdatePlan
    fn schema(&self) -> DataSchemaRef {
        self.plan.schema()
    }

    #[tracing::instrument(level = "debug", name = "update_interpreter_execute", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))]
    async fn execute2(&self) -> Result<PipelineBuildResult> {
        // TODO check privilege
        let catalog_name = self.plan.catalog.as_str();
        let db_name = self.plan.database.as_str();
        let tbl_name = self.plan.table.as_str();
        let tbl = self.ctx.get_table(catalog_name, db_name, tbl_name).await?;

        let eb = ExpressionBuilderWithoutRenaming::create(self.plan.metadata.clone());
        // TODO(zhyass): selection and update_list support subquery.
        let (filter, col_indices) = if let Some(scalar) = &self.plan.selection {
            (
                Some(eb.build(scalar)?),
                scalar.used_columns().into_iter().collect(),
            )
        } else {
            (None, vec![])
        };

        let update_list = self.plan.update_list.iter().try_fold(
            Vec::with_capacity(self.plan.update_list.len()),
            |mut acc, (id, scalar)| {
                let expr = eb.build(scalar)?;
                acc.push((*id, expr));
                Ok::<_, ErrorCode>(acc)
            },
        )?;

        let mut pipeline = Pipeline::create();
        tbl.update(
            self.ctx.clone(),
            filter,
            col_indices,
            update_list,
            &mut pipeline,
        )
        .await?;
        if !pipeline.pipes.is_empty() {
            let settings = self.ctx.get_settings();
            pipeline.set_max_threads(settings.get_max_threads()? as usize);
            let query_id = self.ctx.get_id();
            let executor_settings = ExecutorSettings::try_create(&settings, query_id)?;
            let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;

            self.ctx.set_executor(Arc::downgrade(&executor.get_inner()));
            executor.execute()?;
            drop(executor);
        }

        Ok(PipelineBuildResult::create())
    }
}
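Two details in `execute2` are worth calling out. First, `update_list` is built with `try_fold`, so the first expression that fails to build aborts the whole loop with an `ErrorCode`; the `Ok::<_, ErrorCode>` turbofish pins the closure's error type. A standalone sketch of the same pattern, with a toy `build` standing in for the expression builder:

```rust
// Standalone sketch of the try_fold pattern used to build update_list.
// `build` is a toy stand-in for ExpressionBuilderWithoutRenaming::build.
fn build(scalar: &str) -> Result<String, String> {
    if scalar.is_empty() {
        Err("cannot build empty expression".to_string())
    } else {
        Ok(scalar.to_uppercase())
    }
}

fn main() -> Result<(), String> {
    let update_list = [(0usize, "a + 1"), (2usize, "b * 2")];
    let built = update_list.iter().try_fold(
        Vec::with_capacity(update_list.len()),
        |mut acc, (id, scalar)| {
            // A single Err here short-circuits the entire fold.
            let expr = build(scalar)?;
            acc.push((*id, expr));
            Ok::<_, String>(acc)
        },
    )?;
    assert_eq!(built.len(), 2);
    println!("{:?}", built);
    Ok(())
}
```

Second, unlike a SELECT, the mutation pipeline is driven to completion inside `execute2` by `PipelineCompleteExecutor`, and an empty `PipelineBuildResult` is returned: an UPDATE produces no result rows.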
2 changes: 2 additions & 0 deletions src/query/service/src/interpreters/mod.rs
@@ -70,6 +70,7 @@ mod interpreter_table_show_create;
 mod interpreter_table_truncate;
 mod interpreter_table_undrop;
 mod interpreter_unsetting;
+mod interpreter_update;
 mod interpreter_use_database;
 mod interpreter_user_alter;
 mod interpreter_user_create;
@@ -135,6 +136,7 @@ pub use interpreter_table_show_create::ShowCreateTableInterpreter;
 pub use interpreter_table_truncate::TruncateTableInterpreter;
 pub use interpreter_table_undrop::UndropTableInterpreter;
 pub use interpreter_unsetting::UnSettingInterpreter;
+pub use interpreter_update::UpdateInterpreter;
 pub use interpreter_use_database::UseDatabaseInterpreter;
 pub use interpreter_user_alter::AlterUserInterpreter;
 pub use interpreter_user_create::CreateUserInterpreter;
@@ -13,8 +13,9 @@
 // limitations under the License.
 
 mod block_compact_mutator;
-mod deletion_mutator;
+mod deletion;
 mod recluster_mutator;
 mod segments_compact_mutator;
+mod update;
 
-pub use deletion_mutator::do_deletion;
+pub use deletion::do_deletion;
124 changes: 124 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/mutation/update.rs
@@ -0,0 +1,124 @@
// Copyright 2023 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_base::base::tokio;
use common_exception::ErrorCode;
use common_exception::Result;
use common_sql::executor::ExpressionBuilderWithoutRenaming;
use common_sql::plans::Plan;
use common_sql::plans::ScalarExpr;
use common_sql::plans::UpdatePlan;
use common_sql::Planner;
use common_storages_factory::Table;
use common_storages_fuse::FuseTable;
use databend_query::pipelines::executor::ExecutorSettings;
use databend_query::pipelines::executor::PipelineCompleteExecutor;
use databend_query::sessions::QueryContext;
use databend_query::sessions::TableContext;

use crate::storages::fuse::table_test_fixture::execute_command;
use crate::storages::fuse::table_test_fixture::execute_query;
use crate::storages::fuse::table_test_fixture::expects_ok;
use crate::storages::fuse::table_test_fixture::TestFixture;

#[tokio::test(flavor = "multi_thread")]
async fn test_update_mutator_multiple_empty_segments() -> Result<()> {
    let fixture = TestFixture::new().await;
    let ctx = fixture.ctx();
    let tbl_name = fixture.default_table_name();
    let db_name = fixture.default_db_name();

    fixture.create_normal_table().await?;

    // insert
    for i in 0..10 {
        let qry = format!("insert into {}.{}(id) values({})", db_name, tbl_name, i);
        execute_command(ctx.clone(), qry.as_str()).await?;
    }

    let catalog = ctx.get_catalog(fixture.default_catalog_name().as_str())?;
    let table = catalog
        .get_table(ctx.get_tenant().as_str(), &db_name, &tbl_name)
        .await?;
    // update
    let query = format!("update {}.{} set id=0 where id>0", db_name, tbl_name);
    let mut planner = Planner::new(ctx.clone());
    let (plan, _, _) = planner.plan_sql(&query).await?;
    if let Plan::Update(update) = plan {
        do_update(ctx.clone(), table.clone(), *update).await?;
    }

    // check count
    let expected = vec![
        "+-------+",
        "| count |",
        "+-------+",
        "| 10    |",
        "+-------+",
    ];
    let qry = format!(
        "select count(1) as count from {}.{} where id=0",
        db_name, tbl_name
    );
    expects_ok(
        "check count",
        execute_query(fixture.ctx(), qry.as_str()).await,
        expected,
    )
    .await?;
    Ok(())
}

pub async fn do_update(
    ctx: Arc<QueryContext>,
    table: Arc<dyn Table>,
    plan: UpdatePlan,
) -> Result<()> {
    let eb = ExpressionBuilderWithoutRenaming::create(plan.metadata.clone());
    let (filter, col_indices) = if let Some(scalar) = &plan.selection {
        (
            Some(eb.build(scalar)?),
            scalar.used_columns().into_iter().collect(),
        )
    } else {
        (None, vec![])
    };
    let update_list = plan.update_list.iter().try_fold(
        Vec::with_capacity(plan.update_list.len()),
        |mut acc, (id, scalar)| {
            let expr = eb.build(scalar)?;
            acc.push((*id, expr));
            Ok::<_, ErrorCode>(acc)
        },
    )?;

    let fuse_table = FuseTable::try_from_table(table.as_ref())?;
    let settings = ctx.get_settings();
    let mut pipeline = common_pipeline_core::Pipeline::create();
    fuse_table
        .update(ctx.clone(), filter, col_indices, update_list, &mut pipeline)
        .await?;
    if !pipeline.pipes.is_empty() {
        pipeline.set_max_threads(settings.get_max_threads()? as usize);
        let query_id = ctx.get_id();
        let executor_settings = ExecutorSettings::try_create(&settings, query_id)?;
        let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;
        ctx.set_executor(Arc::downgrade(&executor.get_inner()));
        executor.execute()?;
        drop(executor);
    }
    Ok(())
}
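Note that `do_update` deliberately mirrors `UpdateInterpreter::execute2`, so the mutation path can be driven directly against a `FuseTable` without going through the interpreter factory. The final assertion compares the query's pretty-printed output against an expected ASCII table, the style `expects_ok` consumes. A self-contained sketch of that expected-table idea; the renderer below is a toy, not the fixture's actual formatter:

```rust
// Toy renderer mimicking the expected-output tables asserted above.
fn render_count(count: u64) -> Vec<String> {
    let border = "+-------+".to_string();
    vec![
        border.clone(),
        "| count |".to_string(),
        border.clone(),
        format!("| {:<5} |", count), // left-align in a 5-char cell
        border,
    ]
}

fn main() {
    let expected = vec![
        "+-------+",
        "| count |",
        "+-------+",
        "| 10    |",
        "+-------+",
    ];
    assert_eq!(render_count(10), expected);
}
```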
1 change: 1 addition & 0 deletions src/query/sql/src/planner/binder/update.rs
@@ -104,6 +104,7 @@ impl<'a> Binder {
             database: database_name,
             table: table_name,
             table_id,
+            metadata: self.metadata.clone(),
             update_list: update_columns,
             selection: push_downs,
         };
11 changes: 11 additions & 0 deletions src/query/sql/src/planner/plans/update.rs
@@ -13,17 +13,28 @@
 // limitations under the License.
 
 use std::collections::HashMap;
+use std::sync::Arc;
 
+use common_datavalues::DataSchema;
+use common_datavalues::DataSchemaRef;
 use common_meta_types::MetaId;
 
 use crate::plans::Scalar;
+use crate::MetadataRef;
 
 #[derive(Clone, Debug)]
 pub struct UpdatePlan {
     pub catalog: String,
     pub database: String,
     pub table: String,
     pub table_id: MetaId,
+    pub metadata: MetadataRef,
     pub update_list: HashMap<usize, Scalar>,
     pub selection: Option<Scalar>,
 }
+
+impl UpdatePlan {
+    pub fn schema(&self) -> DataSchemaRef {
+        Arc::new(DataSchema::empty())
+    }
+}