Skip to content

Commit

Permalink
add compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Jul 3, 2023
1 parent 777087d commit 6f47274
Showing 1 changed file with 49 additions and 8 deletions.
57 changes: 49 additions & 8 deletions analytic_engine/src/compaction/picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,26 @@ fn trim_to_threshold(

/// Size tiered compaction strategy
/// See https://github.com/jeffjirsa/twcs/blob/master/src/main/java/com/jeffjirsa/cassandra/db/compaction/SizeTieredCompactionStrategy.java
#[derive(Default)]
pub struct SizeTieredPicker {}
pub struct SizeTieredPicker {
// Origin solution will only consider file size, but this will cause data corrupt, see
// https://github.com/CeresDB/ceresdb/pull/1041
//
// So could only compact files with adjacent seq, or sst without overlapping range.
// Currently solution is relative simple, only pick adjacent sst.
// Maybe a better, but more complex solution could be introduced later.
prefer_seq: bool,
}

impl Default for SizeTieredPicker {
fn default() -> Self {
Self {
// Remove this when pick_by_seq is stable.
prefer_seq: std::env::var("CERESDB_SIZE_PICKER_PREFER_BY_SEQ")
.unwrap_or_else(|_| "true".to_string())
== "true",
}
}
}

/// Similar size files group
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -244,12 +262,16 @@ impl LevelPicker for SizeTieredPicker {
let opts = ctx.size_tiered_opts();
// Iterate the segment in reverse order, so newest segment is examined first.
for (idx, (segment_key, segment)) in files_by_segment.iter().rev().enumerate() {
let files = Self::pick_by_seq(
segment.to_vec(),
opts.min_threshold,
opts.max_threshold,
opts.max_input_sstable_size.as_byte(),
);
let files = if self.prefer_seq {
Self::pick_by_seq(
segment.to_vec(),
opts.min_threshold,
opts.max_threshold,
opts.max_input_sstable_size.as_byte(),
)
} else {
Self::pick_by_size(segment.to_vec(), &opts)
};

if files.is_some() {
info!(
Expand Down Expand Up @@ -303,6 +325,25 @@ impl SizeTieredPicker {
None
}

fn pick_by_size(
files: Vec<FileHandle>,
opts: &SizeTieredCompactionOptions,
) -> Option<Vec<FileHandle>> {
let buckets = Self::get_buckets(
files,
opts.bucket_high,
opts.bucket_low,
opts.min_sstable_size.as_byte() as f32,
);

Self::most_interesting_bucket(
buckets,
opts.min_threshold,
opts.max_threshold,
opts.max_input_sstable_size.as_byte(),
)
}

/// Group files of similar size into buckets.
fn get_buckets(
mut files: Vec<FileHandle>,
Expand Down

0 comments on commit 6f47274

Please sign in to comment.