Skip to content

Commit

Permalink
fix: ensure files can only be picked once (apache#995)
Browse files Browse the repository at this point in the history
## Rationale
In the current design, SST files may be picked for compaction multiple times.

## Detailed Changes
- Mark files as being compacted when they are picked as candidates, and reset
the flag to false when the CompactionTask is dropped.

## Test Plan
Manually
  • Loading branch information
jiacai2050 authored and MichaelLeeHZ committed Jun 21, 2023
1 parent f7f169d commit 6628639
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 83 deletions.
108 changes: 72 additions & 36 deletions analytic_engine/src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,13 +318,26 @@ pub struct ExpiredFiles {

#[derive(Default, Clone)]
pub struct CompactionTask {
pub compaction_inputs: Vec<CompactionInputFiles>,
pub expired: Vec<ExpiredFiles>,
inputs: Vec<CompactionInputFiles>,
expired: Vec<ExpiredFiles>,
}

// Clears the being-compacted flag of the task's files when the task goes away.
//
// NOTE(review): `CompactionTask` also derives `Clone`, so dropping a clone
// clears the flag as well — confirm that no clone is dropped while the
// original task is still running.
impl Drop for CompactionTask {
fn drop(&mut self) {
// A CompactionTask is dropped when either
// 1. the task finished successfully, or
// 2. the task was cancelled for some reason, e.g. hitting the memory limit.
//
// In case 2, the files must be marked as not being compacted so that they
// can be scheduled again. In case 1, the files will be moved out of the
// level controller, so the value of the flag no longer matters and it is
// safe to reset it to false here as well.
self.mark_files_being_compacted(false);
}
}

impl CompactionTask {
pub fn mark_files_being_compacted(&self, being_compacted: bool) {
for input in &self.compaction_inputs {
fn mark_files_being_compacted(&self, being_compacted: bool) {
for input in &self.inputs {
for file in &input.files {
file.set_being_compacted(being_compacted);
}
Expand All @@ -337,29 +350,76 @@ impl CompactionTask {
}

// Estimate the size of the total input files.
#[inline]
pub fn estimated_total_input_file_size(&self) -> usize {
let total_input_size: u64 = self
.compaction_inputs
.inputs
.iter()
.map(|v| v.files.iter().map(|f| f.size()).sum::<u64>())
.sum();

total_input_size as usize
}

#[inline]
pub fn num_compact_files(&self) -> usize {
self.compaction_inputs.iter().map(|v| v.files.len()).sum()
self.inputs.iter().map(|v| v.files.len()).sum()
}

pub fn num_expired_files(&self) -> usize {
self.expired.iter().map(|v| v.files.len()).sum()
/// Returns `true` when the task has no file to process, i.e. neither the
/// compaction inputs nor the expired sets contain any file.
#[inline]
pub fn is_empty(&self) -> bool {
    // Look at the files rather than at the entries: the picker may hand over
    // `ExpiredFiles` entries whose `files` list is empty, and a task made only
    // of such entries has nothing to compact or delete.
    let no_input_files = self.inputs.iter().all(|v| v.files.is_empty());
    let no_expired_files = self.expired.iter().all(|v| v.files.is_empty());

    no_input_files && no_expired_files
}

/// Returns `true` when the task holds no compaction input entry.
#[inline]
pub fn is_input_empty(&self) -> bool {
    let pending_inputs = self.inputs();
    pending_inputs.is_empty()
}

/// Borrows the expired file sets carried by this task.
#[inline]
pub fn expired(&self) -> &[ExpiredFiles] {
    self.expired.as_slice()
}

/// Borrows the compaction input file sets carried by this task.
#[inline]
pub fn inputs(&self) -> &[CompactionInputFiles] {
    self.inputs.as_slice()
}
}

/// Builder that assembles a [`CompactionTask`] from expired file sets and
/// compaction inputs.
///
/// The task is only produced by `build`, which also marks the task's files as
/// being compacted.
pub struct CompactionTaskBuilder {
/// Expired file sets supplied through `with_expired`.
expired: Vec<ExpiredFiles>,
/// Compaction input file sets appended through `add_inputs`.
inputs: Vec<CompactionInputFiles>,
}

impl CompactionTaskBuilder {
    /// Starts a builder from the given expired file sets, with no compaction
    /// inputs yet.
    pub fn with_expired(expired: Vec<ExpiredFiles>) -> Self {
        let inputs = Vec::new();

        Self { expired, inputs }
    }

    /// Appends one set of input files to be compacted by the resulting task.
    pub fn add_inputs(&mut self, files: CompactionInputFiles) {
        self.inputs.push(files);
    }

    /// Consumes the builder and returns the task.
    ///
    /// The task's files are marked as being compacted before the task is
    /// returned; the flag is reset when the task is dropped.
    pub fn build(self) -> CompactionTask {
        let Self { expired, inputs } = self;
        let built = CompactionTask { expired, inputs };

        // Flag the files right away so that they are not picked a second time.
        built.mark_files_being_compacted(true);

        built
    }
}

impl fmt::Debug for CompactionTask {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("CompactionTask")
.field("inputs", &self.compaction_inputs)
.field("inputs", &self.inputs)
.field(
"expired",
&self
Expand All @@ -380,36 +440,12 @@ impl fmt::Debug for CompactionTask {
}
}

pub struct PickerManager {
default_picker: CompactionPickerRef,
time_window_picker: CompactionPickerRef,
size_tiered_picker: CompactionPickerRef,
}

impl Default for PickerManager {
fn default() -> Self {
let size_tiered_picker = Arc::new(CommonCompactionPicker::new(
CompactionStrategy::SizeTiered(SizeTieredCompactionOptions::default()),
));
let time_window_picker = Arc::new(CommonCompactionPicker::new(
CompactionStrategy::TimeWindow(TimeWindowCompactionOptions::default()),
));

Self {
default_picker: time_window_picker.clone(),
size_tiered_picker,
time_window_picker,
}
}
}
#[derive(Default)]
pub struct PickerManager;

impl PickerManager {
pub fn get_picker(&self, strategy: CompactionStrategy) -> CompactionPickerRef {
match strategy {
CompactionStrategy::Default => self.default_picker.clone(),
CompactionStrategy::SizeTiered(_) => self.size_tiered_picker.clone(),
CompactionStrategy::TimeWindow(_) => self.time_window_picker.clone(),
}
Arc::new(CommonCompactionPicker::new(strategy))
}
}

Expand Down
64 changes: 31 additions & 33 deletions analytic_engine/src/compaction/picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use snafu::Snafu;

use crate::{
compaction::{
CompactionInputFiles, CompactionStrategy, CompactionTask, SizeTieredCompactionOptions,
TimeWindowCompactionOptions,
CompactionInputFiles, CompactionStrategy, CompactionTask, CompactionTaskBuilder,
SizeTieredCompactionOptions, TimeWindowCompactionOptions,
},
sst::{
file::{FileHandle, Level},
Expand Down Expand Up @@ -60,7 +60,7 @@ pub trait CompactionPicker {
fn pick_compaction(
&self,
ctx: PickerContext,
levels_controller: &LevelsController,
levels_controller: &mut LevelsController,
) -> Result<CompactionTask>;
}

Expand All @@ -86,10 +86,10 @@ pub struct CommonCompactionPicker {
impl CommonCompactionPicker {
pub fn new(strategy: CompactionStrategy) -> Self {
let level_picker: LevelPickerRef = match strategy {
CompactionStrategy::SizeTiered(_) | CompactionStrategy::Default => {
Arc::new(SizeTieredPicker::default())
CompactionStrategy::SizeTiered(_) => Arc::new(SizeTieredPicker::default()),
CompactionStrategy::TimeWindow(_) | CompactionStrategy::Default => {
Arc::new(TimeWindowPicker::default())
}
CompactionStrategy::TimeWindow(_) => Arc::new(TimeWindowPicker::default()),
};
Self { level_picker }
}
Expand Down Expand Up @@ -123,13 +123,11 @@ impl CompactionPicker for CommonCompactionPicker {
fn pick_compaction(
&self,
ctx: PickerContext,
levels_controller: &LevelsController,
levels_controller: &mut LevelsController,
) -> Result<CompactionTask> {
let expire_time = ctx.ttl.map(Timestamp::expire_time);
let mut compaction_task = CompactionTask {
expired: levels_controller.expired_ssts(expire_time),
..Default::default()
};
let mut builder =
CompactionTaskBuilder::with_expired(levels_controller.expired_ssts(expire_time));

if let Some(input_files) =
self.pick_compact_candidates(&ctx, levels_controller, expire_time)
Expand All @@ -139,10 +137,10 @@ impl CompactionPicker for CommonCompactionPicker {
ctx.strategy, input_files
);

compaction_task.compaction_inputs = vec![input_files];
builder.add_inputs(input_files);
}

Ok(compaction_task)
Ok(builder.build())
}
}

Expand Down Expand Up @@ -734,39 +732,39 @@ mod tests {
};
let now = Timestamp::now();
{
let lc = build_old_bucket_case(now.as_i64());
let task = twp.pick_compaction(ctx.clone(), &lc).unwrap();
assert_eq!(task.compaction_inputs[0].files.len(), 2);
assert_eq!(task.compaction_inputs[0].files[0].id(), 0);
assert_eq!(task.compaction_inputs[0].files[1].id(), 1);
let mut lc = build_old_bucket_case(now.as_i64());
let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap();
assert_eq!(task.inputs[0].files.len(), 2);
assert_eq!(task.inputs[0].files[0].id(), 0);
assert_eq!(task.inputs[0].files[1].id(), 1);
assert_eq!(task.expired[0].files.len(), 1);
assert_eq!(task.expired[0].files[0].id(), 3);
}

{
let lc = build_newest_bucket_case(now.as_i64());
let task = twp.pick_compaction(ctx.clone(), &lc).unwrap();
assert_eq!(task.compaction_inputs[0].files.len(), 4);
assert_eq!(task.compaction_inputs[0].files[0].id(), 2);
assert_eq!(task.compaction_inputs[0].files[1].id(), 3);
assert_eq!(task.compaction_inputs[0].files[2].id(), 4);
assert_eq!(task.compaction_inputs[0].files[3].id(), 5);
let mut lc = build_newest_bucket_case(now.as_i64());
let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap();
assert_eq!(task.inputs[0].files.len(), 4);
assert_eq!(task.inputs[0].files[0].id(), 2);
assert_eq!(task.inputs[0].files[1].id(), 3);
assert_eq!(task.inputs[0].files[2].id(), 4);
assert_eq!(task.inputs[0].files[3].id(), 5);
}

{
let lc = build_newest_bucket_no_match_case(now.as_i64());
let task = twp.pick_compaction(ctx.clone(), &lc).unwrap();
assert_eq!(task.compaction_inputs.len(), 0);
let mut lc = build_newest_bucket_no_match_case(now.as_i64());
let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap();
assert_eq!(task.inputs.len(), 0);
}

// If ttl is None, then no file is expired.
ctx.ttl = None;
{
let lc = build_old_bucket_case(now.as_i64());
let task = twp.pick_compaction(ctx, &lc).unwrap();
assert_eq!(task.compaction_inputs[0].files.len(), 2);
assert_eq!(task.compaction_inputs[0].files[0].id(), 0);
assert_eq!(task.compaction_inputs[0].files[1].id(), 1);
let mut lc = build_old_bucket_case(now.as_i64());
let task = twp.pick_compaction(ctx, &mut lc).unwrap();
assert_eq!(task.inputs[0].files.len(), 2);
assert_eq!(task.inputs[0].files[0].id(), 0);
assert_eq!(task.inputs[0].files[1].id(), 1);
assert!(task.expired[0].files.is_empty());
}
}
Expand Down
10 changes: 2 additions & 8 deletions analytic_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl OngoingTaskLimit {

if dropped > 0 {
warn!(
"Too many compaction pending tasks, limit: {}, dropped {} older tasks.",
"Too many compaction pending tasks, limit:{}, dropped:{}.",
self.max_pending_compaction_tasks, dropped,
);
}
Expand Down Expand Up @@ -462,10 +462,7 @@ impl ScheduleWorker {
waiter_notifier: WaiterNotifier,
token: MemoryUsageToken,
) {
// Mark files being in compaction.
compaction_task.mark_files_being_compacted(true);

let keep_scheduling_compaction = !compaction_task.compaction_inputs.is_empty();
let keep_scheduling_compaction = !compaction_task.is_input_empty();

let runtime = self.runtime.clone();
let space_store = self.space_store.clone();
Expand Down Expand Up @@ -503,9 +500,6 @@ impl ScheduleWorker {
.await;

if let Err(e) = &res {
// Compaction is failed, we need to unset the compaction mark.
compaction_task.mark_files_being_compacted(false);

error!(
"Failed to compact table, table_name:{}, table_id:{}, request_id:{}, err:{}",
table_data.name, table_data.id, request_id, e
Expand Down
9 changes: 5 additions & 4 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,22 +669,23 @@ impl SpaceStore {
"Begin compact table, table_name:{}, id:{}, task:{:?}",
table_data.name, table_data.id, task
);
let inputs = task.inputs();
let mut edit_meta = VersionEditMeta {
space_id: table_data.space_id,
table_id: table_data.id,
flushed_sequence: 0,
// Use the number of compaction inputs as the estimated number of files to add.
files_to_add: Vec::with_capacity(task.compaction_inputs.len()),
files_to_add: Vec::with_capacity(inputs.len()),
files_to_delete: vec![],
mems_to_remove: vec![],
};

if task.num_expired_files() == 0 && task.num_compact_files() == 0 {
if task.is_empty() {
// Nothing to compact.
return Ok(());
}

for files in &task.expired {
for files in task.expired() {
self.delete_expired_files(table_data, request_id, files, &mut edit_meta);
}

Expand All @@ -696,7 +697,7 @@ impl SpaceStore {
task.num_compact_files(),
);

for input in &task.compaction_inputs {
for input in inputs {
self.compact_input_files(
request_id,
table_data,
Expand Down
4 changes: 2 additions & 2 deletions analytic_engine/src/table/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,9 +727,9 @@ impl TableVersion {
picker_ctx: PickerContext,
picker: &CompactionPickerRef,
) -> picker::Result<CompactionTask> {
let inner = self.inner.read().unwrap();
let mut inner = self.inner.write().unwrap();

picker.pick_compaction(picker_ctx, &inner.levels_controller)
picker.pick_compaction(picker_ctx, &mut inner.levels_controller)
}

pub fn has_expired_sst(&self, expire_time: Option<Timestamp>) -> bool {
Expand Down

0 comments on commit 6628639

Please sign in to comment.