Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage): write progress in compact hook #16901

Merged
merged 2 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 32 additions & 29 deletions src/query/service/src/interpreters/hook/compact_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,43 +78,50 @@ async fn do_hook_compact(
}

pipeline.set_on_finished(move |info: &ExecutionInfo| {
let compaction_limits = match compact_target.mutation_kind {
MutationKind::Insert => {
let compaction_num_block_hint = ctx.get_compaction_num_block_hint(&compact_target.table);
info!("table {} hint number of blocks need to be compacted {}", compact_target.table, compaction_num_block_hint);
if compaction_num_block_hint == 0 {
return Ok(());
}
CompactionLimits {
segment_limit: None,
block_limit: Some(compaction_num_block_hint as usize),
if info.res.is_ok() {
let op_name = &trace_ctx.operation_name;
metrics_inc_compact_hook_main_operation_time_ms(op_name, trace_ctx.start.elapsed().as_millis() as u64);
info!("execute {op_name} finished successfully. running table optimization job.");

let compact_start_at = Instant::now();
let compaction_limits = match compact_target.mutation_kind {
MutationKind::Insert => {
let compaction_num_block_hint = ctx.get_compaction_num_block_hint(&compact_target.table);
info!("table {} hint number of blocks need to be compacted {}", compact_target.table, compaction_num_block_hint);
if compaction_num_block_hint == 0 {
return Ok(());
}
CompactionLimits {
segment_limit: None,
block_limit: Some(compaction_num_block_hint as usize),
}
}
}
_ => {
let auto_compaction_segments_limit = ctx.get_settings().get_auto_compaction_segments_limit()?;
CompactionLimits {
segment_limit: Some(auto_compaction_segments_limit as usize),
block_limit: None,
_ => {
let auto_compaction_segments_limit = ctx.get_settings().get_auto_compaction_segments_limit()?;
CompactionLimits {
segment_limit: Some(auto_compaction_segments_limit as usize),
block_limit: None,
}
}
}
};
};

let op_name = &trace_ctx.operation_name;
metrics_inc_compact_hook_main_operation_time_ms(op_name, trace_ctx.start.elapsed().as_millis() as u64);
// keep the original progress value
let progress = ctx.get_write_progress();
let progress_value = progress.as_ref().get_values();

let compact_start_at = Instant::now();
if info.res.is_ok() {
info!("execute {op_name} finished successfully. running table optimization job.");
match GlobalIORuntime::instance().block_on({
compact_table(ctx, compact_target, compaction_limits, lock_opt)
}) {
Ok(_) => {
info!("execute {op_name} finished successfully. table optimization job finished.");
}
Err(e) => { info!("execute {op_name} finished successfully. table optimization job failed. {:?}", e) }
Err(e) => { info!("execute {op_name} finished successfully. table optimization job failed. {:?}", e); }
}

// reset the progress value
progress.set(&progress_value);
metrics_inc_compact_hook_compact_time_ms(&trace_ctx.operation_name, compact_start_at.elapsed().as_millis() as u64);
}
metrics_inc_compact_hook_compact_time_ms(&trace_ctx.operation_name, compact_start_at.elapsed().as_millis() as u64);

Ok(())
});
Expand All @@ -139,8 +146,6 @@ async fn compact_table(
)
.await?;
let settings = ctx.get_settings();
// keep the original progress value
let progress_value = ctx.get_write_progress_value();

let do_recluster = !table.cluster_keys(ctx.clone()).is_empty();
let do_compact = compaction_limits.block_limit.is_some() || !do_recluster;
Expand Down Expand Up @@ -203,7 +208,5 @@ async fn compact_table(
assert!(build_res.main_pipeline.is_empty());
}

// reset the progress value
ctx.get_write_progress().set(&progress_value);
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,19 @@ insert into t1 values(3),(5),(8);
statement ok
insert into t1 values(4),(6);

query III
select segment_count, block_count, row_count from fuse_snapshot('i15760', 't1') limit 10;
query II
select segment_count, row_count from fuse_snapshot('i15760', 't1') limit 10;
----
1 1 8
1 2 8
3 3 8
2 2 6
1 1 3
1 8
1 8
3 8
2 6
1 3

query F
select average_depth from clustering_information('i15760', 't1')
----
1.0

statement ok
drop table t1 all;
Loading