Skip to content

Commit

Permalink
refactor(storage): sstable and block level interfaces use fullkey str…
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu authored Mar 13, 2023
1 parent 32100f3 commit c0aa78b
Show file tree
Hide file tree
Showing 13 changed files with 180 additions and 128 deletions.
4 changes: 2 additions & 2 deletions src/ctl/src/cmd_impl/hummock/sst_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ fn print_kv_pairs(

while block_iter.is_valid() {
let raw_full_key = block_iter.key();
let full_key = FullKey::decode(block_iter.key());
let full_key = block_iter.key();
let raw_user_key = full_key.user_key.encode();

let full_val = block_iter.value();
Expand All @@ -245,7 +245,7 @@ fn print_kv_pairs(
println!(
"\t\t full key: {:02x?}, len={}",
raw_full_key,
raw_full_key.len()
raw_full_key.encoded_len()
);
println!("\t\tfull value: {:02x?}, len={}", full_val, full_val.len());
println!("\t\t user key: {:02x?}", raw_user_key);
Expand Down
5 changes: 3 additions & 2 deletions src/storage/benches/bench_block_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use bytes::{BufMut, Bytes, BytesMut};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use risingwave_hummock_sdk::key::FullKey;
use risingwave_storage::hummock::{
Block, BlockBuilder, BlockBuilderOptions, BlockHolder, BlockIterator, CompressionAlgorithm,
};
Expand Down Expand Up @@ -88,7 +89,7 @@ fn bench_block_iter(c: &mut Criterion) {
iter.seek_to_first();
for t in 1..=TABLES_PER_SSTABLE {
for i in 1..=KEYS_PER_TABLE {
assert_eq!(iter.key(), key(t, i).to_vec());
assert_eq!(iter.key(), FullKey::decode(&key(t, i)));
assert_eq!(iter.value(), value(i).to_vec());
iter.next();
}
Expand All @@ -108,7 +109,7 @@ fn build_block_data(t: u32, i: u64) -> Bytes {
let mut builder = BlockBuilder::new(options);
for tt in 1..=t {
for ii in 1..=i {
builder.add(&key(tt, ii), &value(ii));
builder.add(FullKey::decode(&key(tt, ii)), &value(ii));
}
}
Bytes::from(builder.build().to_vec())
Expand Down
6 changes: 5 additions & 1 deletion src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ async fn build_table(
let end = start + 8;
full_key.user_key.table_key[table_key_len - 8..].copy_from_slice(&i.to_be_bytes());
builder
.add(&full_key, HummockValue::put(&value[start..end]), true)
.add(
full_key.to_ref(),
HummockValue::put(&value[start..end]),
true,
)
.await
.unwrap();
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/benches/bench_multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async fn build_tables<F: SstableWriterFactory>(
for i in RANGE {
builder
.add_full_key(
&FullKey::from_user_key(test_user_key_of(i).as_ref(), 1),
FullKey::from_user_key(test_user_key_of(i).as_ref(), 1),
HummockValue::put(VALUE),
true,
)
Expand Down
51 changes: 29 additions & 22 deletions src/storage/src/hummock/compactor/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl SstableStreamIterator {
/// key >= `seek_key`. If that block does not contain such a KV-pair, the iterator continues to
/// the first KV-pair of the next block. If `seek_key` is not given, the iterator will move to
/// the very first KV-pair of the stream's first block.
pub async fn seek(&mut self, seek_key: Option<&[u8]>) -> HummockResult<()> {
pub async fn seek(&mut self, seek_key: Option<FullKey<&[u8]>>) -> HummockResult<()> {
// Load first block.
self.next_block().await?;

Expand Down Expand Up @@ -156,7 +156,7 @@ impl SstableStreamIterator {
Ok(())
}

fn key(&self) -> &[u8] {
fn key(&self) -> FullKey<&[u8]> {
self.block_iter
.as_ref()
.unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
Expand Down Expand Up @@ -225,18 +225,20 @@ impl ConcatSstableIterator {
}

/// Resets the iterator, loads the specified SST, and seeks in that SST to `seek_key` if given.
async fn seek_idx(&mut self, idx: usize, seek_key: Option<&[u8]>) -> HummockResult<()> {
async fn seek_idx(
&mut self,
idx: usize,
seek_key: Option<FullKey<&[u8]>>,
) -> HummockResult<()> {
self.sstable_iter.take();
let seek_key: Option<&[u8]> = match (seek_key, self.key_range.left.is_empty()) {
(Some(seek_key), false) => {
match KeyComparator::compare_encoded_full_key(seek_key, &self.key_range.left) {
Ordering::Less | Ordering::Equal => Some(&self.key_range.left),
Ordering::Greater => Some(seek_key),
}
}
let seek_key: Option<FullKey<&[u8]>> = match (seek_key, self.key_range.left.is_empty()) {
(Some(seek_key), false) => match seek_key.cmp(&FullKey::decode(&self.key_range.left)) {
Ordering::Less | Ordering::Equal => Some(FullKey::decode(&self.key_range.left)),
Ordering::Greater => Some(seek_key),
},
(Some(seek_key), true) => Some(seek_key),
(None, true) => None,
(None, false) => Some(&self.key_range.left),
(None, false) => Some(FullKey::decode(&self.key_range.left)),
};

if idx < self.tables.len() {
Expand All @@ -252,8 +254,7 @@ impl ConcatSstableIterator {
// start_index points to the greatest block whose smallest_key <= seek_key.
block_metas
.partition_point(|block| {
KeyComparator::compare_encoded_full_key(&block.smallest_key, seek_key)
!= Ordering::Greater
seek_key.cmp(&FullKey::decode(&block.smallest_key)) != Ordering::Less
})
.saturating_sub(1)
}
Expand Down Expand Up @@ -323,7 +324,7 @@ impl HummockIterator for ConcatSstableIterator {
}

fn key(&self) -> FullKey<&[u8]> {
FullKey::decode(self.sstable_iter.as_ref().expect("no table iter").key())
self.sstable_iter.as_ref().expect("no table iter").key()
}

fn value(&self) -> HummockValue<&[u8]> {
Expand Down Expand Up @@ -362,7 +363,7 @@ impl HummockIterator for ConcatSstableIterator {
KeyComparator::compare_encoded_full_key(max_sst_key, seek_key) == Ordering::Less
});

self.seek_idx(table_idx, Some(key_slice)).await
self.seek_idx(table_idx, Some(key)).await
}
}

Expand Down Expand Up @@ -521,20 +522,22 @@ mod tests {
let block_2_smallest_key = block_metas[2].smallest_key.clone();
// Use block_1_smallest_key as seek key and result in the first KV of block 1.
let seek_key = block_1_smallest_key.clone();
iter.seek_idx(0, Some(seek_key.as_slice())).await.unwrap();
iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
.await
.unwrap();
assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
// Use prev_full_key(block_1_smallest_key) as seek key and result in the first KV of block
// 1.
let seek_key = prev_full_key(block_1_smallest_key.as_slice());
iter.seek_idx(0, Some(seek_key.as_slice())).await.unwrap();
iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
.await
.unwrap();
assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
iter.next().await.unwrap();
let block_1_second_key = iter.key().to_vec();
// Use a big enough seek key and result in invalid iterator.
let seek_key = test_key_of(30001);
iter.seek_idx(0, Some(seek_key.encode().as_slice()))
.await
.unwrap();
iter.seek_idx(0, Some(seek_key.to_ref())).await.unwrap();
assert!(!iter.is_valid());

// Test seek_idx. Result is dominated by key range rather than given seek key.
Expand All @@ -547,11 +550,15 @@ mod tests {
// Use block_2_smallest_key as seek key and result in invalid iterator.
let seek_key = block_2_smallest_key.clone();
assert!(KeyComparator::compare_encoded_full_key(&seek_key, &kr.right) == Ordering::Greater);
iter.seek_idx(0, Some(seek_key.as_slice())).await.unwrap();
iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
.await
.unwrap();
assert!(!iter.is_valid());
// Use a small enough seek key and result in the second KV of block 1.
let seek_key = test_key_of(0).encode();
iter.seek_idx(0, Some(seek_key.as_slice())).await.unwrap();
iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
.await
.unwrap();
assert!(iter.is_valid());
assert_eq!(iter.key(), block_1_second_key.to_ref());
// Use None seek key and result in the second KV of block 1.
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ impl Compactor {

// Don't allow two SSTs to share same user key
sst_builder
.add_full_key(&iter_key, value, is_new_user_key)
.add_full_key(iter_key, value, is_new_user_key)
.await?;

iter.next().await?;
Expand Down
10 changes: 7 additions & 3 deletions src/storage/src/hummock/sstable/backward_sstable_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ impl BackwardSstableIterator {
}

/// Seeks to a block, and then seeks to the key if `seek_key` is given.
async fn seek_idx(&mut self, idx: isize, seek_key: Option<&[u8]>) -> HummockResult<()> {
async fn seek_idx(
&mut self,
idx: isize,
seek_key: Option<FullKey<&[u8]>>,
) -> HummockResult<()> {
if idx >= self.sst.value().block_count() as isize || idx < 0 {
self.block_iter = None;
} else {
Expand Down Expand Up @@ -104,7 +108,7 @@ impl HummockIterator for BackwardSstableIterator {
}

fn key(&self) -> FullKey<&[u8]> {
FullKey::decode(self.block_iter.as_ref().expect("no block iter").key())
self.block_iter.as_ref().expect("no block iter").key()
}

fn value(&self) -> HummockValue<&[u8]> {
Expand Down Expand Up @@ -148,7 +152,7 @@ impl HummockIterator for BackwardSstableIterator {
.saturating_sub(1); // considering the boundary of 0
let block_idx = block_idx as isize;

self.seek_idx(block_idx, Some(encoded_key_slice)).await?;
self.seek_idx(block_idx, Some(key)).await?;
if !self.is_valid() {
// Seek to prev block
self.seek_idx(block_idx - 1, None).await?;
Expand Down
Loading

0 comments on commit c0aa78b

Please sign in to comment.