Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added next level of output level SST's metadata for compactions #760

Merged
merged 11 commits into from
Sep 11, 2023
30 changes: 30 additions & 0 deletions librocksdb_sys/crocksdb/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6601,6 +6601,36 @@ int crocksdb_sst_partitioner_context_output_level(
return context->rep->output_level;
}

int crocksdb_sst_partitioner_context_next_level_segment_count(
crocksdb_sst_partitioner_context_t* context) {
return context->rep->OutputNextLevelSegmentCount();
}

size_t crocksdb_sst_partitioner_context_get_next_level_size(
crocksdb_sst_partitioner_context_t* context, int index) {
return context->rep->output_next_level_size[index];
}

void crocksdb_sst_partitioner_context_get_next_level_boundary(
crocksdb_sst_partitioner_context_t* context, int index, const char** key,
size_t* key_len) {
const auto s = context->rep->output_next_level_boundaries[index];
*key = s.data();
*key_len = s.size();
}

void crocksdb_sst_partitioner_context_push_bounary_and_size(
crocksdb_sst_partitioner_context_t* context, const char* boundary_key,
size_t boundary_key_len, size_t size) {
if (!context->rep->output_next_level_boundaries.empty()) {
// The first boundary means the left-bondary, which isn't a segment.
// Its size should be ignored.
context->rep->output_next_level_size.push_back(size);
}
context->rep->output_next_level_boundaries.emplace_back(boundary_key,
boundary_key_len);
}

const char* crocksdb_sst_partitioner_context_smallest_key(
crocksdb_sst_partitioner_context_t* context, size_t* key_len) {
auto& smallest_key = context->rep->smallest_user_key;
Expand Down
14 changes: 14 additions & 0 deletions librocksdb_sys/crocksdb/crocksdb/c.h
Original file line number Diff line number Diff line change
Expand Up @@ -2595,6 +2595,20 @@ crocksdb_sst_partitioner_context_smallest_key(
extern C_ROCKSDB_LIBRARY_API const char*
crocksdb_sst_partitioner_context_largest_key(
crocksdb_sst_partitioner_context_t* context, size_t* key_len);
extern C_ROCKSDB_LIBRARY_API int
crocksdb_sst_partitioner_context_next_level_segment_count(
crocksdb_sst_partitioner_context_t* context);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_sst_partitioner_context_get_next_level_boundary(
crocksdb_sst_partitioner_context_t* context, int index, const char** key,
size_t* key_len);
extern C_ROCKSDB_LIBRARY_API size_t
crocksdb_sst_partitioner_context_get_next_level_size(
crocksdb_sst_partitioner_context_t* context, int index);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_sst_partitioner_context_push_bounary_and_size(
crocksdb_sst_partitioner_context_t* context, const char* boundary_key,
size_t boundary_key_len, size_t size);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_sst_partitioner_context_set_is_full_compaction(
crocksdb_sst_partitioner_context_t* context,
Expand Down
19 changes: 19 additions & 0 deletions librocksdb_sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2833,6 +2833,25 @@ extern "C" {
largest_key: *const c_char,
key_len: size_t,
);
pub fn crocksdb_sst_partitioner_context_get_next_level_boundary(
context: *mut DBSstPartitionerContext,
index: c_int,
key: *mut *const c_char,
key_len: *mut size_t,
);
pub fn crocksdb_sst_partitioner_context_get_next_level_size(
context: *mut DBSstPartitionerContext,
index: c_int,
) -> size_t;
pub fn crocksdb_sst_partitioner_context_push_bounary_and_size(
context: *mut DBSstPartitionerContext,
boundary_key: *const c_char,
boundary_key_len: size_t,
size: size_t,
);
pub fn crocksdb_sst_partitioner_context_next_level_segment_count(
context: *mut DBSstPartitionerContext,
) -> c_int;

pub fn crocksdb_sst_partitioner_factory_create(
underlying: *mut c_void,
Expand Down
84 changes: 84 additions & 0 deletions src/sst_partitioner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub struct SstPartitionerContext<'a> {
pub output_level: i32,
pub smallest_key: &'a [u8],
pub largest_key: &'a [u8],
pub next_level_boundaries: Vec<&'a [u8]>,
pub next_level_sizes: Vec<usize>,
}

pub trait SstPartitioner {
Expand Down Expand Up @@ -103,6 +105,15 @@ extern "C" fn sst_partitioner_factory_create_partitioner<F: SstPartitionerFactor
context: *mut DBSstPartitionerContext,
) -> *mut DBSstPartitioner {
let factory = unsafe { &*(ctx as *mut F) };
let segment_size =
unsafe { crocksdb_ffi::crocksdb_sst_partitioner_context_next_level_segment_count(context) };
let boundary_size = if segment_size == 0 {
0
} else {
segment_size + 1
};
let mut next_level_boundaries = Vec::with_capacity(boundary_size as usize);
let mut next_level_sizes = Vec::with_capacity(segment_size as usize);
let context = unsafe {
let mut smallest_key_len: usize = 0;
let smallest_key = crocksdb_ffi::crocksdb_sst_partitioner_context_smallest_key(
Expand All @@ -114,6 +125,37 @@ extern "C" fn sst_partitioner_factory_create_partitioner<F: SstPartitionerFactor
context,
&mut largest_key_len,
) as *const u8;
if segment_size > 0 {
let mut first_boundary_key = std::ptr::null();
let mut first_boundary_key_len = 0usize;
crocksdb_ffi::crocksdb_sst_partitioner_context_get_next_level_boundary(
context,
0,
&mut first_boundary_key,
&mut first_boundary_key_len as _,
);
next_level_boundaries.push(slice::from_raw_parts(
first_boundary_key as *const u8,
first_boundary_key_len,
))
}
for i in 0..segment_size {
let mut boundary_key_len = 0usize;
let mut boundary_key = std::ptr::null();
crocksdb_ffi::crocksdb_sst_partitioner_context_get_next_level_boundary(
context,
i + 1,
&mut boundary_key as _,
&mut boundary_key_len as _,
);
let size =
crocksdb_ffi::crocksdb_sst_partitioner_context_get_next_level_size(context, i);
next_level_boundaries.push(slice::from_raw_parts(
boundary_key as *const u8,
boundary_key_len,
));
next_level_sizes.push(size);
}
SstPartitionerContext {
is_full_compaction: crocksdb_ffi::crocksdb_sst_partitioner_context_is_full_compaction(
context,
Expand All @@ -123,6 +165,8 @@ extern "C" fn sst_partitioner_factory_create_partitioner<F: SstPartitionerFactor
output_level: crocksdb_ffi::crocksdb_sst_partitioner_context_output_level(context),
smallest_key: slice::from_raw_parts(smallest_key, smallest_key_len),
largest_key: slice::from_raw_parts(largest_key, largest_key_len),
next_level_boundaries,
next_level_sizes,
}
};
match factory.create_partitioner(&context) {
Expand Down Expand Up @@ -188,6 +232,9 @@ mod test {
pub output_level: Option<i32>,
pub smallest_key: Option<Vec<u8>>,
pub largest_key: Option<Vec<u8>>,

pub next_level_boundaries: Vec<Vec<u8>>,
pub next_level_sizes: Vec<usize>,
}

impl Default for TestState {
Expand All @@ -211,6 +258,8 @@ mod test {
output_level: None,
smallest_key: None,
largest_key: None,
next_level_boundaries: vec![],
next_level_sizes: vec![],
}
}
}
Expand Down Expand Up @@ -273,6 +322,12 @@ mod test {
s.output_level = Some(context.output_level);
s.smallest_key = Some(context.smallest_key.to_vec());
s.largest_key = Some(context.largest_key.to_vec());
s.next_level_boundaries = context
.next_level_boundaries
.iter()
.map(|v| v.to_vec())
.collect();
s.next_level_sizes = context.next_level_sizes.clone();

Some(TestSstPartitioner {
state: self.state.clone(),
Expand Down Expand Up @@ -305,6 +360,12 @@ mod test {
const OUTPUT_LEVEL: i32 = 3;
const SMALLEST_KEY: &[u8] = b"aaaa";
const LARGEST_KEY: &[u8] = b"bbbb";
const BOUNDARIES_AND_SIZES: [(&[u8], usize); 4] = [
(b"aaaa", 0usize),
(b"aaab", 42usize),
(b"aaba", 96usize),
(b"bbbb", 256usize),
];

let s = Arc::new(Mutex::new(TestState::default()));
let factory = new_sst_partitioner_factory(TestSstPartitionerFactory { state: s.clone() });
Expand All @@ -329,6 +390,14 @@ mod test {
LARGEST_KEY.as_ptr() as *const c_char,
LARGEST_KEY.len(),
);
for (boundary_key, size) in BOUNDARIES_AND_SIZES {
crocksdb_ffi::crocksdb_sst_partitioner_context_push_bounary_and_size(
context,
boundary_key.as_ptr() as *const i8,
boundary_key.len(),
size,
);
}
}
let partitioner = unsafe {
crocksdb_ffi::crocksdb_sst_partitioner_factory_create_partitioner(factory, context)
Expand All @@ -341,6 +410,21 @@ mod test {
assert_eq!(OUTPUT_LEVEL, sl.output_level.unwrap());
assert_eq!(SMALLEST_KEY, sl.smallest_key.as_ref().unwrap().as_slice());
assert_eq!(LARGEST_KEY, sl.largest_key.as_ref().unwrap().as_slice());
assert_eq!(
BOUNDARIES_AND_SIZES
.iter()
.map(|x| x.0)
.collect::<Vec<&'static [u8]>>(),
sl.next_level_boundaries
);
assert_eq!(
BOUNDARIES_AND_SIZES
.iter()
.skip(1)
.map(|x| x.1)
.collect::<Vec<usize>>(),
sl.next_level_sizes
);
}
unsafe {
crocksdb_ffi::crocksdb_sst_partitioner_destroy(partitioner);
Expand Down