Skip to content

Commit

Permalink
feat(meta): refactor compaction scheduler and selector (#7830)
Browse files Browse the repository at this point in the history
Approved-By: Li0k
  • Loading branch information
Little-Wallace authored Feb 13, 2023
1 parent e1164f2 commit d0b90fb
Show file tree
Hide file tree
Showing 18 changed files with 350 additions and 625 deletions.
342 changes: 90 additions & 252 deletions src/meta/src/hummock/compaction/level_selector.rs

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{CompactTask, CompactionConfig, InputLevel, KeyRange, LevelType};

pub use crate::hummock::compaction::level_selector::{
selector_option, DynamicLevelSelector, LevelSelector, ManualCompactionSelector, SelectorOption,
default_level_selector, DynamicLevelSelector, LevelSelector, ManualCompactionSelector,
SpaceReclaimCompactionSelector, TtlCompactionSelector,
};
use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlapStrategy};
use crate::hummock::compaction_group::CompactionGroup;
use crate::hummock::level_handler::LevelHandler;
use crate::rpc::metrics::MetaMetrics;

Expand Down Expand Up @@ -118,14 +119,15 @@ impl CompactStatus {
&mut self,
levels: &Levels,
task_id: HummockCompactionTaskId,
compaction_group_id: CompactionGroupId,
group: &CompactionGroup,
stats: &mut LocalSelectorStatistic,
selector: &mut Box<dyn LevelSelector>,
) -> Option<CompactTask> {
// When we compact the files, we must make the result of compaction meet the following
// conditions, for any user key, the epoch of it in the file existing in the lower
// layer must be larger.
let ret = selector.pick_compaction(task_id, levels, &mut self.level_handlers, stats)?;
let ret =
selector.pick_compaction(task_id, group, levels, &mut self.level_handlers, stats)?;
let target_level_id = ret.input.target_level;

let compression_algorithm = match ret.compression_algorithm.as_str() {
Expand All @@ -145,7 +147,7 @@ impl CompactStatus {
// level.
gc_delete_keys: target_level_id == self.level_handlers.len() - 1,
task_status: TaskStatus::Pending as i32,
compaction_group_id,
compaction_group_id: group.group_id,
existing_table_ids: vec![],
compression_algorithm,
target_file_size: ret.target_file_size,
Expand Down
63 changes: 37 additions & 26 deletions src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ pub mod tests {
use crate::hummock::compaction::level_selector::{LevelSelector, ManualCompactionSelector};
use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
use crate::hummock::compaction::LocalSelectorStatistic;
use crate::hummock::compaction_group::CompactionGroup;
use crate::hummock::test_utils::iterator_test_key_of_epoch;

fn clean_task_state(level_handler: &mut LevelHandler) {
Expand Down Expand Up @@ -1139,7 +1140,8 @@ pub mod tests {

#[test]
fn test_manual_compaction_selector_l0() {
let config = Arc::new(CompactionConfigBuilder::new().max_level(4).build());
let config = CompactionConfigBuilder::new().max_level(4).build();
let group_config = CompactionGroup::new(1, config);
let l0 = generate_l0_nonoverlapping_sublevels(vec![
generate_table(0, 1, 0, 500, 1),
generate_table(1, 1, 0, 500, 1),
Expand Down Expand Up @@ -1181,13 +1183,15 @@ pub mod tests {
internal_table_id: HashSet::default(),
level: 0,
};
let mut selector = ManualCompactionSelector::new(
config.clone(),
Arc::new(RangeOverlapStrategy::default()),
option,
);
let mut selector = ManualCompactionSelector::new(option);
let task = selector
.pick_compaction(1, &levels, &mut levels_handler, &mut local_stats)
.pick_compaction(
1,
&group_config,
&levels,
&mut levels_handler,
&mut local_stats,
)
.unwrap();
assert_compaction_task(&task, &levels_handler);
assert_eq!(task.input.input_levels.len(), 2);
Expand All @@ -1214,13 +1218,15 @@ pub mod tests {
internal_table_id: HashSet::default(),
level: 0,
};
let mut selector = ManualCompactionSelector::new(
config,
Arc::new(RangeOverlapStrategy::default()),
option,
);
let mut selector = ManualCompactionSelector::new(option);
let task = selector
.pick_compaction(2, &levels, &mut levels_handler, &mut local_stats)
.pick_compaction(
2,
&group_config,
&levels,
&mut levels_handler,
&mut local_stats,
)
.unwrap();
assert_compaction_task(&task, &levels_handler);
assert_eq!(task.input.input_levels.len(), 3);
Expand All @@ -1234,7 +1240,8 @@ pub mod tests {
/// tests `DynamicLevelSelector::manual_pick_compaction`
#[test]
fn test_manual_compaction_selector() {
let config = Arc::new(CompactionConfigBuilder::new().max_level(4).build());
let config = CompactionConfigBuilder::new().max_level(4).build();
let group_config = CompactionGroup::new(1, config);
let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
assert_eq!(l0.sub_levels.len(), 0);
let levels = vec![
Expand Down Expand Up @@ -1282,13 +1289,15 @@ pub mod tests {
internal_table_id: HashSet::default(),
level: 3,
};
let mut selector = ManualCompactionSelector::new(
config.clone(),
Arc::new(RangeOverlapStrategy::default()),
option,
);
let mut selector = ManualCompactionSelector::new(option);
let task = selector
.pick_compaction(1, &levels, &mut levels_handler, &mut local_stats)
.pick_compaction(
1,
&group_config,
&levels,
&mut levels_handler,
&mut local_stats,
)
.unwrap();
assert_compaction_task(&task, &levels_handler);
assert_eq!(task.input.input_levels.len(), 2);
Expand Down Expand Up @@ -1317,13 +1326,15 @@ pub mod tests {
internal_table_id: HashSet::default(),
level: 4,
};
let mut selector = ManualCompactionSelector::new(
config,
Arc::new(RangeOverlapStrategy::default()),
option,
);
let mut selector = ManualCompactionSelector::new(option);
let task = selector
.pick_compaction(1, &levels, &mut levels_handler, &mut local_stats)
.pick_compaction(
1,
&group_config,
&levels,
&mut levels_handler,
&mut local_stats,
)
.unwrap();
assert_compaction_task(&task, &levels_handler);
assert_eq!(task.input.input_levels.len(), 2);
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/compaction/picker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ mod ttl_reclaim_compaction_picker;
pub use base_level_compaction_picker::LevelCompactionPicker;
pub use manual_compaction_picker::ManualCompactionPicker;
pub use min_overlap_compaction_picker::MinOverlappingPicker;
pub use space_reclaim_compaction_picker::SpaceReclaimCompactionPicker;
pub use space_reclaim_compaction_picker::{SpaceReclaimCompactionPicker, SpaceReclaimPickerState};
pub use tier_compaction_picker::TierCompactionPicker;
pub use ttl_reclaim_compaction_picker::TtlReclaimCompactionPicker;
pub use ttl_reclaim_compaction_picker::{TtlPickerState, TtlReclaimCompactionPicker};
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,24 @@ use std::collections::HashSet;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{InputLevel, SstableInfo};

use crate::hummock::compaction::{CompactionInput, CompactionPicker, LocalPickerStatistic};
use crate::hummock::compaction::CompactionInput;
use crate::hummock::level_handler::LevelHandler;

pub struct SpaceReclaimCompactionPicker {
// config
pub max_space_reclaim_bytes: u64,
pub all_table_ids: HashSet<u32>,
}

// state
#[derive(Default)]
pub struct SpaceReclaimPickerState {
pub last_select_index: usize,
}

impl SpaceReclaimCompactionPicker {
pub fn new(max_space_reclaim_bytes: u64, all_table_ids: HashSet<u32>) -> Self {
Self {
max_space_reclaim_bytes,
last_select_index: 0,
all_table_ids,
}
}
Expand All @@ -47,27 +48,27 @@ impl SpaceReclaimCompactionPicker {
}
}

impl CompactionPicker for SpaceReclaimCompactionPicker {
fn pick_compaction(
impl SpaceReclaimCompactionPicker {
pub fn pick_compaction(
&mut self,
levels: &Levels,
level_handlers: &[LevelHandler],
_stats: &mut LocalPickerStatistic,
state: &mut SpaceReclaimPickerState,
) -> Option<CompactionInput> {
assert!(!levels.levels.is_empty());
let reclaimed_level = levels.levels.last().unwrap();
let mut select_input_ssts = vec![];
let level_handler = &level_handlers[reclaimed_level.level_idx as usize];

if self.last_select_index >= reclaimed_level.table_infos.len() {
self.last_select_index = 0;
if state.last_select_index >= reclaimed_level.table_infos.len() {
state.last_select_index = 0;
}

let start_indedx = self.last_select_index;
let start_indedx = state.last_select_index;
let mut select_file_size = 0;

for sst in &reclaimed_level.table_infos[start_indedx..] {
self.last_select_index += 1;
state.last_select_index += 1;
if level_handler.is_pending_compact(&sst.id) || self.filter(sst) {
continue;
}
Expand Down Expand Up @@ -105,8 +106,6 @@ impl CompactionPicker for SpaceReclaimCompactionPicker {
#[cfg(test)]
mod test {

use std::sync::Arc;

use itertools::Itertools;
use risingwave_pb::hummock::compact_task;
pub use risingwave_pb::hummock::{KeyRange, Level, LevelType};
Expand All @@ -117,20 +116,19 @@ mod test {
assert_compaction_task, generate_l0_nonoverlapping_sublevels, generate_level,
generate_table_with_table_ids,
};
use crate::hummock::compaction::level_selector::{
LevelSelector, SpaceReclaimCompactionSelector,
};
use crate::hummock::compaction::{selector_option, LocalSelectorStatistic, SelectorOption};
use crate::hummock::compaction::level_selector::SpaceReclaimCompactionSelector;
use crate::hummock::compaction::{LevelSelector, LocalSelectorStatistic};
use crate::hummock::compaction_group::CompactionGroup;

#[test]
fn test_space_reclaim_compaction_selector() {
let max_space_reclaim_bytes = 400;
let config = Arc::new(
CompactionConfigBuilder::new()
.max_level(4)
.max_space_reclaim_bytes(max_space_reclaim_bytes)
.build(),
);
let config = CompactionConfigBuilder::new()
.max_level(4)
.max_space_reclaim_bytes(max_space_reclaim_bytes)
.build();
let mut group_config = CompactionGroup::new(1, config);

let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
assert_eq!(l0.sub_levels.len(), 0);
let levels = vec![
Expand Down Expand Up @@ -168,15 +166,18 @@ mod test {
};
let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
let mut local_stats = LocalSelectorStatistic::default();
let selector_option = selector_option::SpaceReclaimCompactionSelectorOption {
compaction_config: config.clone(),
all_table_ids: HashSet::default(),
};
let mut selector = SpaceReclaimCompactionSelector::new(selector_option);

let mut selector = SpaceReclaimCompactionSelector::default();
{
// pick space reclaim
let task = selector
.pick_compaction(1, &levels, &mut levels_handler, &mut local_stats)
.pick_compaction(
1,
&group_config,
&levels,
&mut levels_handler,
&mut local_stats,
)
.unwrap();
assert_compaction_task(&task, &levels_handler);
assert_eq!(task.input.input_levels.len(), 2);
Expand Down Expand Up @@ -215,7 +216,13 @@ mod test {

// pick space reclaim
let task = selector
.pick_compaction(1, &levels, &mut levels_handler, &mut local_stats)
.pick_compaction(
1,
&group_config,
&levels,
&mut levels_handler,
&mut local_stats,
)
.unwrap();
assert_compaction_task(&task, &levels_handler);
assert_eq!(task.input.input_levels.len(), 2);
Expand Down Expand Up @@ -250,14 +257,15 @@ mod test {
}
}

let selector_option = selector_option::SpaceReclaimCompactionSelectorOption {
compaction_config: config.clone(),
all_table_ids: HashSet::from_iter([2, 3, 4, 5, 6, 7, 8, 9, 10]),
};
selector.try_update(SelectorOption::SpaceReclaim(selector_option));

group_config.member_table_ids = HashSet::from_iter([2, 3, 4, 5, 6, 7, 8, 9, 10]);
// pick space reclaim
let task = selector.pick_compaction(1, &levels, &mut levels_handler, &mut local_stats);
let task = selector.pick_compaction(
1,
&group_config,
&levels,
&mut levels_handler,
&mut local_stats,
);
assert!(task.is_none());
}

Expand All @@ -268,14 +276,16 @@ mod test {
}
}

let selector_option = selector_option::SpaceReclaimCompactionSelectorOption {
compaction_config: config,
all_table_ids: HashSet::from_iter([2, 3, 4, 5, 6, 7, 8, 9]),
};
selector.try_update(SelectorOption::SpaceReclaim(selector_option));
group_config.member_table_ids = HashSet::from_iter([2, 3, 4, 5, 6, 7, 8, 9]);
// pick space reclaim
let task = selector
.pick_compaction(1, &levels, &mut levels_handler, &mut local_stats)
.pick_compaction(
1,
&group_config,
&levels,
&mut levels_handler,
&mut local_stats,
)
.unwrap();
assert_compaction_task(&task, &levels_handler);
assert_eq!(task.input.input_levels.len(), 2);
Expand Down
Loading

0 comments on commit d0b90fb

Please sign in to comment.