Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storge): write progress in compact hook #16901

Merged
merged 2 commits into from
Nov 21, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 32 additions & 29 deletions src/query/service/src/interpreters/hook/compact_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,43 +78,50 @@ async fn do_hook_compact(
}

pipeline.set_on_finished(move |info: &ExecutionInfo| {
let compaction_limits = match compact_target.mutation_kind {
MutationKind::Insert => {
let compaction_num_block_hint = ctx.get_compaction_num_block_hint(&compact_target.table);
info!("table {} hint number of blocks need to be compacted {}", compact_target.table, compaction_num_block_hint);
if compaction_num_block_hint == 0 {
return Ok(());
}
CompactionLimits {
segment_limit: None,
block_limit: Some(compaction_num_block_hint as usize),
if info.res.is_ok() {
let op_name = &trace_ctx.operation_name;
metrics_inc_compact_hook_main_operation_time_ms(op_name, trace_ctx.start.elapsed().as_millis() as u64);
info!("execute {op_name} finished successfully. running table optimization job.");

let compact_start_at = Instant::now();
let compaction_limits = match compact_target.mutation_kind {
MutationKind::Insert => {
let compaction_num_block_hint = ctx.get_compaction_num_block_hint(&compact_target.table);
info!("table {} hint number of blocks need to be compacted {}", compact_target.table, compaction_num_block_hint);
if compaction_num_block_hint == 0 {
return Ok(());
}
CompactionLimits {
segment_limit: None,
block_limit: Some(compaction_num_block_hint as usize),
}
}
}
_ => {
let auto_compaction_segments_limit = ctx.get_settings().get_auto_compaction_segments_limit()?;
CompactionLimits {
segment_limit: Some(auto_compaction_segments_limit as usize),
block_limit: None,
_ => {
let auto_compaction_segments_limit = ctx.get_settings().get_auto_compaction_segments_limit()?;
CompactionLimits {
segment_limit: Some(auto_compaction_segments_limit as usize),
block_limit: None,
}
}
}
};
};

let op_name = &trace_ctx.operation_name;
metrics_inc_compact_hook_main_operation_time_ms(op_name, trace_ctx.start.elapsed().as_millis() as u64);
// keep the original progress value
let progress = ctx.get_write_progress();
let progress_value = progress.as_ref().get_values();

let compact_start_at = Instant::now();
if info.res.is_ok() {
info!("execute {op_name} finished successfully. running table optimization job.");
match GlobalIORuntime::instance().block_on({
compact_table(ctx, compact_target, compaction_limits, lock_opt)
}) {
Ok(_) => {
info!("execute {op_name} finished successfully. table optimization job finished.");
}
Err(e) => { info!("execute {op_name} finished successfully. table optimization job failed. {:?}", e) }
Err(e) => { info!("execute {op_name} finished successfully. table optimization job failed. {:?}", e); }
}

// reset the progress value
progress.set(&progress_value);
metrics_inc_compact_hook_compact_time_ms(&trace_ctx.operation_name, compact_start_at.elapsed().as_millis() as u64);
}
metrics_inc_compact_hook_compact_time_ms(&trace_ctx.operation_name, compact_start_at.elapsed().as_millis() as u64);

Ok(())
});
Expand All @@ -139,8 +146,6 @@ async fn compact_table(
)
.await?;
let settings = ctx.get_settings();
// keep the original progress value
let progress_value = ctx.get_write_progress_value();

let do_recluster = !table.cluster_keys(ctx.clone()).is_empty();
let do_compact = compaction_limits.block_limit.is_some() || !do_recluster;
Expand Down Expand Up @@ -203,7 +208,5 @@ async fn compact_table(
assert!(build_res.main_pipeline.is_empty());
}

// reset the progress value
ctx.get_write_progress().set(&progress_value);
Ok(())
}
Loading