
feat: distributed execution of compact statement (#12750)
* distribute compact execution

* remove unused code

* update test case

* add sqllogic test

* fix test

---------

Co-authored-by: dantengsky <dantengsky@gmail.com>
zhyass and dantengsky authored Sep 18, 2023
1 parent 8ca74fe commit 7495817
Showing 46 changed files with 1,434 additions and 1,105 deletions.
23 changes: 18 additions & 5 deletions src/query/catalog/src/table.rs
@@ -36,6 +36,7 @@ use common_meta_types::MetaId;
use common_pipeline_core::Pipeline;
use common_storage::StorageMetrics;
use storages_common_table_meta::meta::SnapshotId;
use storages_common_table_meta::meta::TableSnapshot;

use crate::plan::DataSourceInfo;
use crate::plan::DataSourcePlan;
@@ -300,16 +301,28 @@ pub trait Table: Sync + Send {
unimplemented!()
}

// return false if the table does not need to be compacted.
#[async_backtrace::framed]
async fn compact(
async fn compact_segments(
&self,
ctx: Arc<dyn TableContext>,
target: CompactTarget,
limit: Option<usize>,
pipeline: &mut Pipeline,
) -> Result<()> {
let (_, _, _, _) = (ctx, target, limit, pipeline);
let (_, _) = (ctx, limit);

Err(ErrorCode::Unimplemented(format!(
"table {}, of engine type {}, does not support compact segments",
self.name(),
self.get_table_info().engine(),
)))
}

#[async_backtrace::framed]
async fn compact_blocks(
&self,
ctx: Arc<dyn TableContext>,
limit: Option<usize>,
) -> Result<Option<(Partitions, Arc<TableSnapshot>)>> {
let (_, _) = (ctx, limit);

Err(ErrorCode::Unimplemented(format!(
"table {}, of engine type {}, does not support compact",
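
The old single compact entry point is split in two here: compact_segments still drives segment-level compaction entirely inside the storage layer, while compact_blocks only plans the work and hands back the partitions to rewrite together with the snapshot they were read from. Below is a minimal sketch of how a caller might drive the new pair of methods; the helper name plan_compact and its bindings are placeholders, not code from this commit.

// Hypothetical helper, not part of the commit; assumes the usual imports for
// Table, TableContext, CompactTarget, Partitions and TableSnapshot are in scope.
async fn plan_compact(
    table: Arc<dyn Table>,
    ctx: Arc<dyn TableContext>,
    target: CompactTarget,
    limit: Option<usize>,
) -> Result<Option<(Partitions, Arc<TableSnapshot>)>> {
    match target {
        // Segment compaction still runs entirely inside the storage engine.
        CompactTarget::Segments => {
            table.compact_segments(ctx, limit).await?;
            Ok(None)
        }
        // Block compaction only plans: the caller turns the returned partitions
        // and snapshot into a CompactPartial -> CommitSink physical plan.
        CompactTarget::Blocks => table.compact_blocks(ctx, limit).await,
    }
}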
@@ -21,6 +21,7 @@ use common_pipeline_core::processors::port::InputPort;
use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::Event;
use common_pipeline_core::processors::Processor;

#[async_trait::async_trait]
pub trait AsyncAccumulatingTransform: Send {
const NAME: &'static str;
11 changes: 7 additions & 4 deletions src/query/service/src/interpreters/interpreter_delete.rs
@@ -69,7 +69,8 @@ use crate::schedulers::build_query_pipeline;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;
use crate::sql::executor::FinalCommit;
use crate::sql::executor::CommitSink;
use crate::sql::executor::MutationKind;
use crate::sql::plans::DeletePlan;
use crate::stream::PullingExecutorStream;

@@ -275,7 +276,7 @@ impl DeleteInterpreter {
partitions: Partitions,
table_info: TableInfo,
col_indices: Vec<usize>,
snapshot: TableSnapshot,
snapshot: Arc<TableSnapshot>,
catalog_info: CatalogInfo,
is_distributed: bool,
query_row_id_col: bool,
@@ -300,12 +301,14 @@ impl DeleteInterpreter {
});
}

Ok(PhysicalPlan::FinalCommit(Box::new(FinalCommit {
Ok(PhysicalPlan::CommitSink(CommitSink {
input: Box::new(root),
snapshot,
table_info,
catalog_info,
})))
mutation_kind: MutationKind::Delete,
merge_meta: true,
}))
}
}

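The delete, merge-into, replace and optimize interpreters now all terminate their plans with a single CommitSink node in place of the former FinalCommit and MutationAggregate variants. The struct definition itself is not part of this excerpt; reconstructed from the fields the call sites populate, it presumably looks roughly like the sketch below (field order and derives are assumptions).

// Assumed shape of the new plan node, inferred from its call sites in this commit.
pub struct CommitSink {
    pub input: Box<PhysicalPlan>,
    pub snapshot: Arc<TableSnapshot>,
    pub table_info: TableInfo,
    pub catalog_info: CatalogInfo,
    pub mutation_kind: MutationKind,
    pub merge_meta: bool,
}
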
9 changes: 5 additions & 4 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
@@ -24,9 +24,9 @@ use common_expression::FieldIndex;
use common_expression::RemoteExpr;
use common_functions::BUILTIN_FUNCTIONS;
use common_meta_app::schema::TableInfo;
use common_sql::executor::CommitSink;
use common_sql::executor::MergeInto;
use common_sql::executor::MergeIntoSource;
use common_sql::executor::MutationAggregate;
use common_sql::executor::MutationKind;
use common_sql::executor::PhysicalPlan;
use common_sql::executor::PhysicalPlanBuilder;
@@ -297,14 +297,15 @@ impl MergeIntoInterpreter {
});

// build mutation_aggregate
let physical_plan = PhysicalPlan::MutationAggregate(Box::new(MutationAggregate {
let physical_plan = PhysicalPlan::CommitSink(CommitSink {
input: Box::new(merge_into),
snapshot: (*base_snapshot).clone(),
snapshot: base_snapshot,
table_info: table_info.clone(),
catalog_info: catalog_.info(),
// let's use update first, we will do some optimizations and select the exact strategy
mutation_kind: MutationKind::Update,
}));
merge_meta: false,
});

Ok((physical_plan, table_info.clone()))
}
19 changes: 9 additions & 10 deletions src/query/service/src/interpreters/interpreter_replace.rs
@@ -21,9 +21,9 @@ use common_exception::Result;
use common_expression::DataSchemaRef;
use common_meta_app::principal::StageInfo;
use common_sql::executor::AsyncSourcerPlan;
use common_sql::executor::CommitSink;
use common_sql::executor::Deduplicate;
use common_sql::executor::Exchange;
use common_sql::executor::MutationAggregate;
use common_sql::executor::MutationKind;
use common_sql::executor::OnConflictField;
use common_sql::executor::PhysicalPlan;
@@ -238,15 +238,14 @@ impl ReplaceInterpreter {
ignore_exchange: false,
}));
}
root = Box::new(PhysicalPlan::MutationAggregate(Box::new(
MutationAggregate {
input: root,
snapshot: (*base_snapshot).clone(),
table_info: table_info.clone(),
catalog_info: catalog.info(),
mutation_kind: MutationKind::Replace,
},
)));
root = Box::new(PhysicalPlan::CommitSink(CommitSink {
input: root,
snapshot: base_snapshot,
table_info: table_info.clone(),
catalog_info: catalog.info(),
mutation_kind: MutationKind::Replace,
merge_meta: false,
}));
Ok((root, purge_info))
}

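MutationKind tells the commit sink which kind of mutation produced the segments it is about to commit. Only its usages are visible in this excerpt; the enum sketch below is inferred from them, and the real definition may carry additional variants.

// Variants inferred from the interpreters touched by this commit; not verbatim.
pub enum MutationKind {
    Delete,
    Update,
    Replace,
    Compact,
}
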
88 changes: 77 additions & 11 deletions src/query/service/src/interpreters/interpreter_table_optimize.rs
@@ -16,20 +16,31 @@ use std::sync::Arc;
use std::time::SystemTime;

use common_base::runtime::GlobalIORuntime;
use common_catalog::plan::Partitions;
use common_catalog::table::CompactTarget;
use common_catalog::table::TableExt;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_app::schema::CatalogInfo;
use common_meta_app::schema::TableInfo;
use common_pipeline_core::Pipeline;
use common_sql::executor::CommitSink;
use common_sql::executor::CompactPartial;
use common_sql::executor::Exchange;
use common_sql::executor::FragmentKind;
use common_sql::executor::MutationKind;
use common_sql::executor::PhysicalPlan;
use common_sql::plans::OptimizeTableAction;
use common_sql::plans::OptimizeTablePlan;
use common_storages_factory::NavigationPoint;
use storages_common_table_meta::meta::TableSnapshot;

use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterClusteringHistory;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;

@@ -71,6 +82,41 @@ impl Interpreter for OptimizeTableInterpreter {
}

impl OptimizeTableInterpreter {
pub fn build_physical_plan(
parts: Partitions,
table_info: TableInfo,
snapshot: Arc<TableSnapshot>,
catalog_info: CatalogInfo,
is_distributed: bool,
) -> Result<PhysicalPlan> {
let merge_meta = parts.is_lazy;
let mut root = PhysicalPlan::CompactPartial(CompactPartial {
parts,
table_info: table_info.clone(),
catalog_info: catalog_info.clone(),
column_ids: snapshot.schema.to_leaf_column_id_set(),
});

if is_distributed {
root = PhysicalPlan::Exchange(Exchange {
plan_id: 0,
input: Box::new(root),
kind: FragmentKind::Merge,
keys: vec![],
ignore_exchange: false,
});
}

Ok(PhysicalPlan::CommitSink(CommitSink {
input: Box::new(root),
table_info,
catalog_info,
snapshot,
mutation_kind: MutationKind::Compact,
merge_meta,
}))
}

async fn build_pipeline(
&self,
target: CompactTarget,
@@ -80,13 +126,12 @@
.ctx
.get_table(&self.plan.catalog, &self.plan.database, &self.plan.table)
.await?;
let need_recluster = !table.cluster_keys(self.ctx.clone()).is_empty()
&& matches!(target, CompactTarget::Blocks);

let table_info = table.get_table_info().clone();
// check if the table is locked.
let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
let reply = catalog
.list_table_lock_revs(table.get_table_info().ident.table_id)
.list_table_lock_revs(table_info.ident.table_id)
.await?;
if !reply.is_empty() {
return Err(ErrorCode::TableAlreadyLocked(format!(
Expand All @@ -95,19 +140,40 @@ impl OptimizeTableInterpreter {
)));
}

let mut compact_pipeline = Pipeline::create();
table
.compact(
self.ctx.clone(),
target,
self.plan.limit,
&mut compact_pipeline,
)
if matches!(target, CompactTarget::Segments) {
table
.compact_segments(self.ctx.clone(), self.plan.limit)
.await?;
return Ok(PipelineBuildResult::create());
}

let res = table
.compact_blocks(self.ctx.clone(), self.plan.limit)
.await?;

let is_distributed = !self.ctx.get_cluster().is_empty();
let catalog_info = catalog.info();
let mut compact_pipeline = if let Some((parts, snapshot)) = res {
let physical_plan = Self::build_physical_plan(
parts,
table_info,
snapshot,
catalog_info,
is_distributed,
)?;

let build_res =
build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan, false)
.await?;
build_res.main_pipeline
} else {
Pipeline::create()
};

let mut build_res = PipelineBuildResult::create();
let settings = self.ctx.get_settings();
let mut reclustered_block_count = 0;
let need_recluster = !table.cluster_keys(self.ctx.clone()).is_empty();
if need_recluster {
if !compact_pipeline.is_empty() {
compact_pipeline.set_max_threads(settings.get_max_threads()? as usize);
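
Condensing the optimize interpreter changes above into one flow: the table plans the block compaction, CompactPartial rewrites the returned partitions, a merging Exchange is inserted only when the session runs on a cluster, and CommitSink writes the new snapshot once, with merge_meta taken from parts.is_lazy. A hypothetical end-to-end sketch, with error handling and the recluster branch omitted; not verbatim from the commit.

// Hypothetical condensation of OptimizeTableInterpreter::build_pipeline above.
if let Some((parts, snapshot)) = table.compact_blocks(ctx.clone(), limit).await? {
    let is_distributed = !ctx.get_cluster().is_empty();
    // CompactPartial -> (Exchange::Merge, only if distributed) -> CommitSink
    let physical_plan = OptimizeTableInterpreter::build_physical_plan(
        parts,
        table.get_table_info().clone(),
        snapshot,
        catalog.info(),
        is_distributed,
    )?;
    // Every node rewrites its share of blocks; the snapshot is committed once.
    let build_res =
        build_query_pipeline_without_render_result_set(&ctx, &physical_plan, false).await?;
    let _compact_pipeline = build_res.main_pipeline;
}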