Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add a threshold for compact block #8322

Merged
merged 8 commits into from
Oct 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/query/ast/src/ast/format/ast_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1349,7 +1349,7 @@ impl<'ast> Visitor<'ast> for AstFormatVisitor {
let mut children = Vec::new();
self.visit_table_ref(&stmt.catalog, &stmt.database, &stmt.table);
children.push(self.children.pop().unwrap());
if let Some(action) = stmt.action {
if let Some(action) = &stmt.action {
let action_name = format!("Action {}", action);
let action_format_ctx = AstFormatContext::new(action_name);
children.push(FormatTreeNode::new(action_format_ctx));
Expand Down
33 changes: 21 additions & 12 deletions src/query/ast/src/ast/statements/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ pub struct OptimizeTableStmt<'a> {
pub catalog: Option<Identifier<'a>>,
pub database: Option<Identifier<'a>>,
pub table: Identifier<'a>,
pub action: Option<OptimizeTableAction>,
pub action: Option<OptimizeTableAction<'a>>,
}

impl Display for OptimizeTableStmt<'_> {
Expand Down Expand Up @@ -444,32 +444,41 @@ impl Display for Engine {
}
}

/// What an `OPTIMIZE TABLE ... COMPACT` statement targets:
/// whole blocks or only the segment metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CompactTarget {
    /// Compact data blocks (and, implicitly, the segments that hold them).
    Block,
    /// Compact only segment metadata, leaving blocks untouched.
    Segment,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OptimizeTableAction {
#[derive(Debug, Clone, PartialEq)]
pub enum OptimizeTableAction<'a> {
All,
Purge,
Compact(CompactTarget),
Compact {
target: CompactTarget,
limit: Option<Expr<'a>>,
},
}

impl Display for OptimizeTableAction {
impl<'a> Display for OptimizeTableAction<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OptimizeTableAction::All => write!(f, "ALL"),
OptimizeTableAction::Purge => write!(f, "PURGE"),
OptimizeTableAction::Compact(action) => match action {
CompactTarget::Block => {
write!(f, "COMPACT BLOCK")
OptimizeTableAction::Compact { target, limit } => {
match target {
CompactTarget::Block => {
write!(f, "COMPACT BLOCK")?;
}
CompactTarget::Segment => {
write!(f, "COMPACT SEGMENT")?;
}
}
CompactTarget::Segment => {
write!(f, "COMPACT SEGMENT")
if let Some(limit) = limit {
write!(f, " LIMIT {limit}")?;
}
},
Ok(())
}
}
}
}
Expand Down
13 changes: 6 additions & 7 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1344,13 +1344,12 @@ pub fn optimize_table_action(i: Input) -> IResult<OptimizeTableAction> {
alt((
value(OptimizeTableAction::All, rule! { ALL }),
value(OptimizeTableAction::Purge, rule! { PURGE }),
value(
OptimizeTableAction::Compact(CompactTarget::Segment),
rule! { COMPACT ~ SEGMENT},
),
value(
OptimizeTableAction::Compact(CompactTarget::Block),
rule! { COMPACT},
map(
rule! { COMPACT ~ (SEGMENT)? ~ ( LIMIT ~ ^#expr )?},
|(_, opt_segment, opt_limit)| OptimizeTableAction::Compact {
target: opt_segment.map_or(CompactTarget::Block, |_| CompactTarget::Segment),
limit: opt_limit.map(|(_, limit)| limit),
},
),
))(i)
}
Expand Down
3 changes: 2 additions & 1 deletion src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,10 @@ pub trait Table: Sync + Send {
&self,
ctx: Arc<dyn TableContext>,
target: CompactTarget,
limit: Option<usize>,
pipeline: &mut Pipeline,
) -> Result<Option<Box<dyn TableMutator>>> {
let (_, _, _) = (ctx, target, pipeline);
let (_, _, _, _) = (ctx, target, limit, pipeline);

Err(ErrorCode::UnImplement(format!(
"table {}, of engine type {}, does not support compact",
Expand Down
4 changes: 2 additions & 2 deletions src/query/planner/src/plans/optimize_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ impl OptimizeTablePlan {
/// Bound (post-name-resolution) form of the `OPTIMIZE TABLE` action.
/// The compact variants carry the already-evaluated `LIMIT` as a count
/// of segments, or `None` when no limit was given.
pub enum OptimizeTableAction {
    All,
    Purge,
    CompactBlocks(Option<usize>),
    CompactSegments(Option<usize>),
}
19 changes: 15 additions & 4 deletions src/query/service/src/interpreters/interpreter_table_optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,26 @@ impl Interpreter for OptimizeTableInterpreter {
);
let do_compact_blocks = matches!(
action,
OptimizeTableAction::CompactBlocks | OptimizeTableAction::All
OptimizeTableAction::CompactBlocks(_) | OptimizeTableAction::All
);

let do_compact_segments_only = matches!(action, OptimizeTableAction::CompactSegments);
let do_compact_segments_only = matches!(action, OptimizeTableAction::CompactSegments(_));

let limit_opt = match action {
OptimizeTableAction::CompactBlocks(limit_opt) => *limit_opt,
OptimizeTableAction::CompactSegments(limit_opt) => *limit_opt,
_ => None,
};

if do_compact_segments_only {
let mut pipeline = Pipeline::create();
if let Some(mutator) = table
.compact(ctx.clone(), CompactTarget::Segments, &mut pipeline)
.compact(
ctx.clone(),
CompactTarget::Segments,
limit_opt,
&mut pipeline,
)
.await?
{
mutator.try_commit(table).await?;
Expand All @@ -78,7 +89,7 @@ impl Interpreter for OptimizeTableInterpreter {
if do_compact_blocks {
let mut pipeline = Pipeline::create();
let mutator = table
.compact(ctx.clone(), CompactTarget::Blocks, &mut pipeline)
.compact(ctx.clone(), CompactTarget::Blocks, limit_opt, &mut pipeline)
.await?;

if let Some(mutator) = mutator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ async fn build_mutator(
settings.set_max_threads(1)?;
let mut pipeline = common_pipeline_core::Pipeline::create();
let mutator = fuse_table
.compact(ctx.clone(), CompactTarget::Blocks, &mut pipeline)
.compact(ctx.clone(), CompactTarget::Blocks, None, &mut pipeline)
.await?;
assert!(mutator.is_some());
let mutator = mutator.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async fn test_compact_segment_normal_case() -> Result<()> {
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let mut pipeline = common_pipeline_core::Pipeline::create();
let mutator = fuse_table
.compact(ctx.clone(), CompactTarget::Segments, &mut pipeline)
.compact(ctx.clone(), CompactTarget::Segments, None, &mut pipeline)
.await?;
assert!(mutator.is_some());
let mutator = mutator.unwrap();
Expand Down Expand Up @@ -102,7 +102,7 @@ async fn test_compact_segment_resolvable_conflict() -> Result<()> {
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let mut pipeline = common_pipeline_core::Pipeline::create();
let mutator = fuse_table
.compact(ctx.clone(), CompactTarget::Segments, &mut pipeline)
.compact(ctx.clone(), CompactTarget::Segments, None, &mut pipeline)
.await?;
assert!(mutator.is_some());
let mutator = mutator.unwrap();
Expand Down Expand Up @@ -165,7 +165,7 @@ async fn test_compact_segment_unresolvable_conflict() -> Result<()> {
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let mut pipeline = common_pipeline_core::Pipeline::create();
let mutator = fuse_table
.compact(ctx.clone(), CompactTarget::Segments, &mut pipeline)
.compact(ctx.clone(), CompactTarget::Segments, None, &mut pipeline)
.await?;
assert!(mutator.is_some());
let mutator = mutator.unwrap();
Expand Down Expand Up @@ -200,7 +200,7 @@ async fn compact_segment(ctx: Arc<QueryContext>, table: &Arc<dyn Table>) -> Resu
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let mut pipeline = common_pipeline_core::Pipeline::create();
let mutator = fuse_table
.compact(ctx, CompactTarget::Segments, &mut pipeline)
.compact(ctx, CompactTarget::Segments, None, &mut pipeline)
.await?
.unwrap();
mutator.try_commit(table.clone()).await
Expand Down
46 changes: 0 additions & 46 deletions src/query/service/tests/it/storages/fuse/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@

use common_base::base::tokio;
use common_exception::Result;
use futures::TryStreamExt;

use crate::storages::fuse::table_test_fixture::append_sample_data;
use crate::storages::fuse::table_test_fixture::append_sample_data_overwrite;
use crate::storages::fuse::table_test_fixture::check_data_dir;
use crate::storages::fuse::table_test_fixture::execute_command;
use crate::storages::fuse::table_test_fixture::execute_query;
use crate::storages::fuse::table_test_fixture::expects_ok;
use crate::storages::fuse::table_test_fixture::history_should_have_only_one_item;
use crate::storages::fuse::table_test_fixture::TestFixture;

Expand Down Expand Up @@ -66,46 +63,3 @@ async fn do_insertions(fixture: &TestFixture) -> Result<()> {
append_sample_data_overwrite(1, true, fixture).await?;
Ok(())
}

#[tokio::test]
async fn test_fuse_snapshot_optimize_compact() -> Result<()> {
let fixture = TestFixture::new().await;
let db = fixture.default_db_name();
let tbl = fixture.default_table_name();
fixture.create_default_table().await?;

// insert 5 blocks
let n = 5;
for _ in 0..n {
let table = fixture.latest_default_table().await?;
let num_blocks = 1;
let stream = TestFixture::gen_sample_blocks_stream(num_blocks, 1);

let blocks = stream.try_collect().await?;
fixture
.append_commit_blocks(table.clone(), blocks, false, true)
.await?;
}

// optimize compact
let qry = format!("optimize table {}.{} compact", db, tbl);
execute_command(fixture.ctx(), qry.as_str()).await?;

// optimize compact should keep the histories
// there should be 6 history items there, 5 for the above insertions, 1 for that compaction
let expected = vec![
"+----------+",
"| count(*) |",
"+----------+",
"| 6 |",
"+----------+",
];
let qry = format!("select count(*) from fuse_snapshot('{}', '{}')", db, tbl);

expects_ok(
"count_should_be_1",
execute_query(fixture.ctx(), qry.as_str()).await,
expected,
)
.await
}
32 changes: 24 additions & 8 deletions src/query/sql/src/planner/binder/ddl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -778,14 +778,30 @@ impl<'a> Binder {
.map(|ident| normalize_identifier(ident, &self.name_resolution_ctx).name)
.unwrap_or_else(|| self.ctx.get_current_database());
let table = normalize_identifier(table, &self.name_resolution_ctx).name;
let action = action.map_or(OptimizeTableAction::Purge, |v| match v {
AstOptimizeTableAction::All => OptimizeTableAction::All,
AstOptimizeTableAction::Purge => OptimizeTableAction::Purge,
AstOptimizeTableAction::Compact(target) => match target {
CompactTarget::Block => OptimizeTableAction::CompactBlocks,
CompactTarget::Segment => OptimizeTableAction::CompactSegments,
},
});
let action = if let Some(ast_action) = action {
match ast_action {
AstOptimizeTableAction::All => OptimizeTableAction::All,
AstOptimizeTableAction::Purge => OptimizeTableAction::Purge,
AstOptimizeTableAction::Compact { target, limit } => {
let limit_cnt = match limit {
Some(Expr::Literal {
lit: Literal::Integer(uint),
..
}) => Some(*uint as usize),
Some(_) => {
return Err(ErrorCode::IllegalDataType("Unsupported limit type"));
}
_ => None,
};
match target {
CompactTarget::Block => OptimizeTableAction::CompactBlocks(limit_cnt),
CompactTarget::Segment => OptimizeTableAction::CompactSegments(limit_cnt),
}
}
}
} else {
OptimizeTableAction::Purge
};

Ok(Plan::OptimizeTable(Box::new(OptimizeTablePlan {
catalog,
Expand Down
3 changes: 2 additions & 1 deletion src/query/storages/fuse/src/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,10 @@ impl Table for FuseTable {
&self,
ctx: Arc<dyn TableContext>,
target: CompactTarget,
limit: Option<usize>,
pipeline: &mut Pipeline,
) -> Result<Option<Box<dyn TableMutator>>> {
self.do_compact(ctx, target, pipeline).await
self.do_compact(ctx, target, limit, pipeline).await
}

async fn recluster(
Expand Down
16 changes: 9 additions & 7 deletions src/query/storages/fuse/src/operations/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,19 @@ use crate::TableMutator;
use crate::DEFAULT_BLOCK_PER_SEGMENT;
use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;

struct CompactOptions {
base_snapshot: Arc<TableSnapshot>,
block_per_seg: usize,
pub struct CompactOptions {
// the snapshot that compactor working on, it never changed during phases compaction.
pub base_snapshot: Arc<TableSnapshot>,
pub block_per_seg: usize,
pub limit: Option<usize>,
}

impl FuseTable {
pub(crate) async fn do_compact(
&self,
ctx: Arc<dyn TableContext>,
target: CompactTarget,
limit: Option<usize>,
pipeline: &mut Pipeline,
) -> Result<Option<Box<dyn TableMutator>>> {
let snapshot_opt = self.read_table_snapshot(ctx.clone()).await?;
Expand All @@ -63,6 +66,7 @@ impl FuseTable {
let compact_params = CompactOptions {
base_snapshot,
block_per_seg,
limit,
};

match target {
Expand All @@ -79,9 +83,8 @@ impl FuseTable {
) -> Result<Option<Box<dyn TableMutator>>> {
let mut segment_mutator = SegmentCompactMutator::try_create(
ctx.clone(),
options.base_snapshot,
options,
self.meta_location_generator().clone(),
options.block_per_seg,
self.operator.clone(),
)?;

Expand All @@ -103,10 +106,9 @@ impl FuseTable {
let block_per_seg = options.block_per_seg;
let mut mutator = FullCompactMutator::try_create(
ctx.clone(),
options.base_snapshot,
options,
block_compactor.clone(),
self.meta_location_generator().clone(),
block_per_seg,
self.cluster_key_meta.is_some(),
self.operator.clone(),
)?;
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ mod truncate;

pub mod util;

pub(crate) use compact::CompactOptions;
pub use fuse_sink::FuseTableSink;
pub use mutation::delete_from_block;
pub use mutation::DeletionMutator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use opendal::Operator;

use crate::io::Files;

#[derive(Default, Clone, Debug)]
#[derive(Default)]
pub struct AbortOperation {
pub segments: Vec<String>,
pub blocks: Vec<String>,
Expand Down
Loading