Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): implement read pruning by vnode #2882

Merged
merged 18 commits into from
Jun 9, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ mod tests {
// Data will be materialized in associated streaming task.
let epoch = u64::MAX;
let full_range = (Bound::<Vec<u8>>::Unbounded, Bound::<Vec<u8>>::Unbounded);
let store_content = store.scan(full_range, None, epoch).await?;
let store_content = store.scan(full_range, None, epoch, vec![]).await?;
assert!(store_content.is_empty());

handle.await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/bench/ss_bench/operations/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl Operations {
let mut sizes: Vec<usize> = vec![];
for key in keys {
let start = Instant::now();
let val_size = match store.get(&key, u64::MAX).await.unwrap() {
let val_size = match store.get(&key, u64::MAX, None).await.unwrap() {
Some(v) => v.len(),
None => 0,
};
Expand Down
1 change: 1 addition & 0 deletions src/bench/ss_bench/operations/prefix_scan_random.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl Operations {
prefix.chunk().to_vec()..next_key(prefix.chunk()),
None,
u64::MAX,
vec![],
)
.await
.unwrap();
Expand Down
8 changes: 6 additions & 2 deletions src/ctl/src/cmd_impl/hummock/list_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,18 @@ pub async fn list_kv(epoch: u64, table_id: u32) -> anyhow::Result<()> {
let scan_result = match table_id {
u32::MAX => {
tracing::info!("using .. as range");
hummock.scan::<_, Vec<u8>>(.., None, u64::MAX).await?
hummock
.scan::<_, Vec<u8>>(.., None, u64::MAX, vec![])
.await?
}
_ => {
let mut buf = BytesMut::with_capacity(5);
buf.put_u8(b't');
buf.put_u32(table_id);
let range = buf.to_vec()..next_key(buf.to_vec().as_slice());
hummock.scan::<_, Vec<u8>>(range, None, u64::MAX).await?
hummock
.scan::<_, Vec<u8>>(range, None, u64::MAX, vec![])
.await?
}
};
for (k, v) in scan_result {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/hummock/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ mod tests {
storage
.local_version_manager()
.try_update_pinned_version(version);
let get_val = storage.get(&key, epoch).await.unwrap().unwrap();
let get_val = storage.get(&key, epoch, None).await.unwrap().unwrap();
assert_eq!(get_val, val);

// 6. get compact task and there should be none
Expand Down
8 changes: 6 additions & 2 deletions src/storage/src/hummock/snapshot_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@ use crate::storage_value::StorageValue;
use crate::store::StateStoreIter;
use crate::StateStore;

// TODO(Yuanxin): Might rewrite this later.
macro_rules! assert_count_range_scan {
($storage:expr, $range:expr, $expect_count:expr, $epoch:expr) => {{
let mut it = $storage.iter::<_, Vec<u8>>($range, $epoch).await.unwrap();
let mut it = $storage
.iter::<_, Vec<u8>>($range, $epoch, vec![])
.await
.unwrap();
let mut count = 0;
loop {
match it.next().await.unwrap() {
Expand All @@ -42,7 +46,7 @@ macro_rules! assert_count_range_scan {
macro_rules! assert_count_backward_range_scan {
($storage:expr, $range:expr, $expect_count:expr, $epoch:expr) => {{
let mut it = $storage
.backward_iter::<_, Vec<u8>>($range, $epoch)
.backward_iter::<_, Vec<u8>>($range, $epoch, vec![])
.await
.unwrap();
let mut count = 0;
Expand Down
33 changes: 28 additions & 5 deletions src/storage/src/hummock/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;

use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::key_with_epoch;
use risingwave_pb::hummock::{SstableInfo, VNodeBitmap};

Expand Down Expand Up @@ -296,7 +297,12 @@ impl StateStore for HummockStorage {

define_state_store_associated_type!();

fn get<'a>(&'a self, key: &'a [u8], epoch: u64) -> Self::GetFuture<'_> {
fn get<'a>(
&'a self,
key: &'a [u8],
epoch: u64,
_vnode: Option<VirtualNode>,
Copy link
Member

@fuyufjh fuyufjh May 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering there might not be such a "PointGet" operator, I think the type of vnodes should also be Vec<VirtualNode>. Nevermind, it's not a big problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer always using &'a VirtualNode, so that it will function efficiently even when vnode mapping is large.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer always using &'a VirtualNode, so that it will function efficiently even when vnode mapping is large.

Will Option<&'a VirtualNode> still cause some overhead due to the construction of Option?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, it has exactly the same value size as &'a VirtualNode.

) -> Self::GetFuture<'_> {
xx01cyx marked this conversation as resolved.
Show resolved Hide resolved
async move { self.get_with_vnode_set(key, epoch, None).await }
}

Expand All @@ -305,26 +311,33 @@ impl StateStore for HummockStorage {
key_range: R,
limit: Option<usize>,
epoch: u64,
vnodes: Vec<VirtualNode>,
) -> Self::ScanFuture<'_, R, B>
where
R: RangeBounds<B> + Send,
B: AsRef<[u8]> + Send,
{
async move { self.iter(key_range, epoch).await?.collect(limit).await }
async move {
self.iter(key_range, epoch, vnodes)
.await?
.collect(limit)
.await
}
}

fn backward_scan<R, B>(
&self,
key_range: R,
limit: Option<usize>,
epoch: u64,
vnodes: Vec<VirtualNode>,
) -> Self::BackwardScanFuture<'_, R, B>
where
R: RangeBounds<B> + Send,
B: AsRef<[u8]> + Send,
{
async move {
self.backward_iter(key_range, epoch)
self.backward_iter(key_range, epoch, vnodes)
.await?
.collect(limit)
.await
Expand Down Expand Up @@ -371,7 +384,12 @@ impl StateStore for HummockStorage {

/// Returns an iterator that scan from the begin key to the end key
/// The result is based on a snapshot corresponding to the given `epoch`.
fn iter<R, B>(&self, key_range: R, epoch: u64) -> Self::IterFuture<'_, R, B>
fn iter<R, B>(
&self,
key_range: R,
epoch: u64,
_vnodes: Vec<VirtualNode>,
) -> Self::IterFuture<'_, R, B>
where
R: RangeBounds<B> + Send,
B: AsRef<[u8]> + Send,
Expand All @@ -381,7 +399,12 @@ impl StateStore for HummockStorage {

/// Returns a backward iterator that scans from the end key to the begin key
/// The result is based on a snapshot corresponding to the given `epoch`.
fn backward_iter<R, B>(&self, key_range: R, epoch: u64) -> Self::BackwardIterFuture<'_, R, B>
fn backward_iter<R, B>(
&self,
key_range: R,
epoch: u64,
_vnodes: Vec<VirtualNode>,
) -> Self::BackwardIterFuture<'_, R, B>
where
R: RangeBounds<B> + Send,
B: AsRef<[u8]> + Send,
Expand Down
75 changes: 54 additions & 21 deletions src/storage/src/hummock/state_store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,22 @@ async fn test_basic() {
hummock_storage.ingest_batch(batch1, epoch1).await.unwrap();

// Get the value after flushing to remote.
let value = hummock_storage.get(&anchor, epoch1).await.unwrap().unwrap();
let value = hummock_storage
.get(&anchor, epoch1, None)
.await
.unwrap()
.unwrap();
assert_eq!(value, Bytes::from("111"));
let value = hummock_storage
.get(&Bytes::from("bb"), epoch1)
.get(&Bytes::from("bb"), epoch1, None)
.await
.unwrap()
.unwrap();
assert_eq!(value, Bytes::from("222"));

// Test looking for a nonexistent key. `next()` would return the next key.
let value = hummock_storage
.get(&Bytes::from("ab"), epoch1)
.get(&Bytes::from("ab"), epoch1, None)
.await
.unwrap();
assert_eq!(value, None);
Expand All @@ -105,50 +109,62 @@ async fn test_basic() {
hummock_storage.ingest_batch(batch2, epoch2).await.unwrap();

// Get the value after flushing to remote.
let value = hummock_storage.get(&anchor, epoch2).await.unwrap().unwrap();
let value = hummock_storage
.get(&anchor, epoch2, None)
.await
.unwrap()
.unwrap();
assert_eq!(value, Bytes::from("111111"));

// Write the third batch.
let epoch3 = epoch2 + 1;
hummock_storage.ingest_batch(batch3, epoch3).await.unwrap();

// Get the value after flushing to remote.
let value = hummock_storage.get(&anchor, epoch3).await.unwrap();
let value = hummock_storage.get(&anchor, epoch3, None).await.unwrap();
assert_eq!(value, None);

// Get non-existent maximum key.
let value = hummock_storage
.get(&Bytes::from("ff"), epoch3)
.get(&Bytes::from("ff"), epoch3, None)
.await
.unwrap();
assert_eq!(value, None);

// Write aa bb
let mut iter = hummock_storage
.iter(..=b"ee".to_vec(), epoch1)
.iter(..=b"ee".to_vec(), epoch1, vec![])
.await
.unwrap();
let len = count_iter(&mut iter).await;
assert_eq!(len, 2);

// Get the anchor value at the first snapshot
let value = hummock_storage.get(&anchor, epoch1).await.unwrap().unwrap();
let value = hummock_storage
.get(&anchor, epoch1, None)
.await
.unwrap()
.unwrap();
assert_eq!(value, Bytes::from("111"));

// Get the anchor value at the second snapshot
let value = hummock_storage.get(&anchor, epoch2).await.unwrap().unwrap();
let value = hummock_storage
.get(&anchor, epoch2, None)
.await
.unwrap()
.unwrap();
assert_eq!(value, Bytes::from("111111"));
// Update aa, write cc
let mut iter = hummock_storage
.iter(..=b"ee".to_vec(), epoch2)
.iter(..=b"ee".to_vec(), epoch2, vec![])
.await
.unwrap();
let len = count_iter(&mut iter).await;
assert_eq!(len, 3);

// Delete aa, write dd,ee
let mut iter = hummock_storage
.iter(..=b"ee".to_vec(), epoch3)
.iter(..=b"ee".to_vec(), epoch3, vec![])
.await
.unwrap();
let len = count_iter(&mut iter).await;
Expand All @@ -165,20 +181,21 @@ async fn test_basic() {
.unwrap();
hummock_storage.wait_epoch(epoch1).await.unwrap();
let value = hummock_storage
.get(&Bytes::from("bb"), epoch2)
.get(&Bytes::from("bb"), epoch2, None)
.await
.unwrap()
.unwrap();
assert_eq!(value, Bytes::from("222"));
let value = hummock_storage
.get(&Bytes::from("dd"), epoch2)
.get(&Bytes::from("dd"), epoch2, None)
.await
.unwrap();
assert!(value.is_none());
}

#[tokio::test]
async fn test_vnode_filter() {
// TODO(Yuanxin): rewrite this test
let sstable_store = mock_sstable_store();
let hummock_options = Arc::new(default_config_for_test());
let (_env, hummock_manager_ref, _cluster_manager_ref, worker_node) =
Expand Down Expand Up @@ -341,7 +358,7 @@ async fn test_state_store_sync() {
// hummock_storage.shared_buffer_manager().size() as u64
// );

// triger a sync
// trigger a sync
hummock_storage.sync(Some(epoch)).await.unwrap();

// TODO: Uncomment the following lines after flushed sstable can be accessed.
Expand Down Expand Up @@ -408,12 +425,16 @@ async fn test_reload_storage() {
.unwrap();

// Get the value after flushing to remote.
let value = hummock_storage.get(&anchor, epoch1).await.unwrap().unwrap();
let value = hummock_storage
.get(&anchor, epoch1, None)
.await
.unwrap()
.unwrap();
assert_eq!(value, Bytes::from("111"));

// Test looking for a nonexistent key. `next()` would return the next key.
let value = hummock_storage
.get(&Bytes::from("ab"), epoch1)
.get(&Bytes::from("ab"), epoch1, None)
.await
.unwrap();
assert_eq!(value, None);
Expand All @@ -423,27 +444,39 @@ async fn test_reload_storage() {
hummock_storage.ingest_batch(batch2, epoch2).await.unwrap();

// Get the value after flushing to remote.
let value = hummock_storage.get(&anchor, epoch2).await.unwrap().unwrap();
let value = hummock_storage
.get(&anchor, epoch2, None)
.await
.unwrap()
.unwrap();
assert_eq!(value, Bytes::from("111111"));

// Write aa bb
let mut iter = hummock_storage
.iter(..=b"ee".to_vec(), epoch1)
.iter(..=b"ee".to_vec(), epoch1, vec![])
.await
.unwrap();
let len = count_iter(&mut iter).await;
assert_eq!(len, 2);

// Get the anchor value at the first snapshot
let value = hummock_storage.get(&anchor, epoch1).await.unwrap().unwrap();
let value = hummock_storage
.get(&anchor, epoch1, None)
.await
.unwrap()
.unwrap();
assert_eq!(value, Bytes::from("111"));

// Get the anchor value at the second snapshot
let value = hummock_storage.get(&anchor, epoch2).await.unwrap().unwrap();
let value = hummock_storage
.get(&anchor, epoch2, None)
.await
.unwrap()
.unwrap();
assert_eq!(value, Bytes::from("111111"));
// Update aa, write cc
let mut iter = hummock_storage
.iter(..=b"ee".to_vec(), epoch2)
.iter(..=b"ee".to_vec(), epoch2, vec![])
.await
.unwrap();
let len = count_iter(&mut iter).await;
Expand Down
Loading