Skip to content

Commit

Permalink
chore(storage): remove state store v1 (#8102)
Browse files Browse the repository at this point in the history
remove unused code state_store_v1 and local_version_manager

Approved-By: wenym1
  • Loading branch information
Li0k authored Feb 21, 2023
1 parent 7a0316b commit 4835160
Show file tree
Hide file tree
Showing 13 changed files with 31 additions and 1,565 deletions.
8 changes: 0 additions & 8 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,6 @@ pub struct StorageConfig {

#[serde(default = "default::storage::max_concurrent_compaction_task_number")]
pub max_concurrent_compaction_task_number: u64,

/// Whether to enable state_store_v1 for hummock
#[serde(default = "default::storage::enable_state_store_v1")]
pub enable_state_store_v1: bool,
}

impl Default for StorageConfig {
Expand Down Expand Up @@ -614,10 +610,6 @@ mod default {
pub fn max_concurrent_compaction_task_number() -> u64 {
16
}

pub fn enable_state_store_v1() -> bool {
false
}
}

pub mod streaming {
Expand Down
2 changes: 0 additions & 2 deletions src/storage/hummock_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ mod compactor_tests;
#[cfg(all(test, feature = "failpoints"))]
mod failpoint_tests;
#[cfg(test)]
mod local_version_manager_tests;
#[cfg(test)]
mod snapshot_tests;
#[cfg(test)]
mod state_store_tests;
Expand Down
3 changes: 0 additions & 3 deletions src/storage/hummock_test/src/local_version_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ use risingwave_pb::hummock::HummockVersion;
use risingwave_storage::hummock::compactor::CompactorContext;
use risingwave_storage::hummock::event_handler::hummock_event_handler::BufferTracker;
use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store;
use risingwave_storage::hummock::local_version::local_version_manager::{
LocalVersionManager, LocalVersionManagerRef,
};
use risingwave_storage::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch;
use risingwave_storage::hummock::shared_buffer::UncommittedData;
use risingwave_storage::hummock::test_utils::{
Expand Down
179 changes: 2 additions & 177 deletions src/storage/hummock_test/src/snapshot_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,12 @@ use std::sync::Arc;
use bytes::Bytes;
use futures::TryStreamExt;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_meta::hummock::test_utils::setup_compute_env;
use risingwave_meta::hummock::MockHummockMetaClient;
use risingwave_rpc_client::HummockMetaClient;
use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store;
use risingwave_storage::hummock::test_utils::default_opts_for_test;
use risingwave_storage::hummock::*;
use risingwave_storage::monitor::{CompactorMetrics, HummockStateStoreMetrics};
use risingwave_storage::storage_value::StorageValue;
use risingwave_storage::store::{ReadOptions, StateStoreWrite, WriteOptions};
use risingwave_storage::StateStore;
use risingwave_storage::store::{ReadOptions, WriteOptions};

use crate::get_notification_client_for_test;
use crate::test_utils::{
with_hummock_storage_v1, with_hummock_storage_v2, HummockStateStoreTestTrait,
};
use crate::test_utils::{with_hummock_storage_v2, HummockStateStoreTestTrait};

macro_rules! assert_count_range_scan {
($storage:expr, $range:expr, $expect_count:expr, $epoch:expr) => {{
Expand Down Expand Up @@ -271,204 +262,38 @@ async fn test_snapshot_range_scan_inner(
assert_count_range_scan!(hummock_storage, .., 4, epoch);
}

#[ignore]
async fn test_snapshot_backward_range_scan_inner(enable_sync: bool, enable_commit: bool) {
let sstable_store = mock_sstable_store();
let hummock_options = Arc::new(default_opts_for_test());
let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) =
setup_compute_env(8080).await;
let mock_hummock_meta_client = Arc::new(MockHummockMetaClient::new(
hummock_manager_ref.clone(),
worker_node.id,
));

// TODO: may also test for v2 when the unit test is enabled.
let hummock_storage = HummockStorageV1::new(
hummock_options,
sstable_store,
mock_hummock_meta_client.clone(),
get_notification_client_for_test(env, hummock_manager_ref, worker_node),
Arc::new(HummockStateStoreMetrics::unused()),
Arc::new(risingwave_tracing::RwTracingService::disabled()),
Arc::new(CompactorMetrics::unused()),
)
.await
.unwrap();

let epoch = 1;
hummock_storage
.ingest_batch(
vec![
(Bytes::from("1"), StorageValue::new_put("test")),
(Bytes::from("2"), StorageValue::new_put("test")),
(Bytes::from("3"), StorageValue::new_put("test")),
(Bytes::from("4"), StorageValue::new_put("test")),
(Bytes::from("5"), StorageValue::new_put("test")),
(Bytes::from("6"), StorageValue::new_put("test")),
],
vec![],
WriteOptions {
epoch,
table_id: Default::default(),
},
)
.await
.unwrap();
if enable_sync {
let ssts = hummock_storage
.seal_and_sync_epoch(epoch)
.await
.unwrap()
.uncommitted_ssts;
if enable_commit {
mock_hummock_meta_client
.commit_epoch(epoch, ssts)
.await
.unwrap();
hummock_storage
.try_wait_epoch(HummockReadEpoch::Committed(epoch))
.await
.unwrap();
}
}
hummock_storage
.ingest_batch(
vec![
(Bytes::from("5"), StorageValue::new_put("test")),
(Bytes::from("6"), StorageValue::new_put("test")),
(Bytes::from("7"), StorageValue::new_put("test")),
(Bytes::from("8"), StorageValue::new_put("test")),
],
vec![],
WriteOptions {
epoch: epoch + 1,
table_id: Default::default(),
},
)
.await
.unwrap();
if enable_sync {
let ssts = hummock_storage
.seal_and_sync_epoch(epoch + 1)
.await
.unwrap()
.uncommitted_ssts;
if enable_commit {
mock_hummock_meta_client
.commit_epoch(epoch + 1, ssts)
.await
.unwrap();
hummock_storage
.try_wait_epoch(HummockReadEpoch::Committed(epoch + 1))
.await
.unwrap();
}
}

#[allow(unused_macros)]
macro_rules! key {
($idx:expr) => {
Bytes::from(stringify!($idx))
};
}

// TODO: re-enable it when backward range scan is supported again on hummock
// assert_count_backward_range_scan!(hummock_storage, key!(3)..=key!(2), 2, epoch);
// assert_count_backward_range_scan!(hummock_storage, key!(3)..key!(2), 1, epoch);
// assert_count_backward_range_scan!(hummock_storage, key!(3)..key!(1), 2, epoch);
// assert_count_backward_range_scan!(hummock_storage, key!(3)..=key!(1), 3, epoch);
// assert_count_backward_range_scan!(hummock_storage, key!(3)..key!(0), 3, epoch);
// assert_count_backward_range_scan!(hummock_storage, .., 6, epoch);
// assert_count_backward_range_scan!(hummock_storage, .., 8, epoch + 1);
// assert_count_backward_range_scan!(hummock_storage, key!(7)..key!(2), 5, epoch + 1);
}

#[tokio::test]
async fn test_snapshot_v1() {
let (storage, meta_client) = with_hummock_storage_v1().await;
test_snapshot_inner(storage, meta_client, false, false).await;
}

#[tokio::test]
async fn test_snapshot_v2() {
let (storage, meta_client) = with_hummock_storage_v2(Default::default()).await;
test_snapshot_inner(storage, meta_client, false, false).await;
}

#[tokio::test]
async fn test_snapshot_with_sync_v1() {
let (storage, meta_client) = with_hummock_storage_v1().await;
test_snapshot_inner(storage, meta_client, true, false).await;
}

#[tokio::test]
async fn test_snapshot_with_sync_v2() {
let (storage, meta_client) = with_hummock_storage_v2(Default::default()).await;
test_snapshot_inner(storage, meta_client, true, false).await;
}

#[tokio::test]
async fn test_snapshot_with_commit_v1() {
let (storage, meta_client) = with_hummock_storage_v1().await;
test_snapshot_inner(storage, meta_client, true, true).await;
}

#[tokio::test]
async fn test_snapshot_with_commit_v2() {
let (storage, meta_client) = with_hummock_storage_v2(Default::default()).await;
test_snapshot_inner(storage, meta_client, true, true).await;
}

#[tokio::test]
async fn test_snapshot_range_scan_v1() {
let (storage, meta_client) = with_hummock_storage_v1().await;
test_snapshot_range_scan_inner(storage, meta_client, false, false).await;
}

#[tokio::test]
async fn test_snapshot_range_scan_v2() {
let (storage, meta_client) = with_hummock_storage_v2(Default::default()).await;
test_snapshot_range_scan_inner(storage, meta_client, false, false).await;
}

#[tokio::test]
async fn test_snapshot_range_scan_with_sync_v1() {
let (storage, meta_client) = with_hummock_storage_v1().await;
test_snapshot_range_scan_inner(storage, meta_client, true, false).await;
}

#[tokio::test]
async fn test_snapshot_range_scan_with_sync_v2() {
let (storage, meta_client) = with_hummock_storage_v2(Default::default()).await;
test_snapshot_range_scan_inner(storage, meta_client, true, false).await;
}

#[tokio::test]
async fn test_snapshot_range_scan_with_commit_v1() {
let (storage, meta_client) = with_hummock_storage_v1().await;
test_snapshot_range_scan_inner(storage, meta_client, true, true).await;
}

#[tokio::test]
async fn test_snapshot_range_scan_with_commit_v2() {
let (storage, meta_client) = with_hummock_storage_v2(Default::default()).await;
test_snapshot_range_scan_inner(storage, meta_client, true, true).await;
}

#[ignore]
#[tokio::test]
async fn test_snapshot_backward_range_scan() {
test_snapshot_backward_range_scan_inner(false, false).await;
}

#[ignore]
#[tokio::test]
async fn test_snapshot_backward_range_scan_with_sync() {
test_snapshot_backward_range_scan_inner(true, false).await;
}

#[ignore]
#[tokio::test]
async fn test_snapshot_backward_range_scan_with_commit() {
test_snapshot_backward_range_scan_inner(true, true).await;
}
58 changes: 3 additions & 55 deletions src/storage/hummock_test/src/state_store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,15 @@ use risingwave_meta::hummock::MockHummockMetaClient;
use risingwave_rpc_client::HummockMetaClient;
use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store;
use risingwave_storage::hummock::test_utils::{count_stream, default_opts_for_test};
use risingwave_storage::hummock::{HummockStorage, HummockStorageV1};
use risingwave_storage::monitor::{CompactorMetrics, HummockStateStoreMetrics};
use risingwave_storage::hummock::HummockStorage;
use risingwave_storage::storage_value::StorageValue;
use risingwave_storage::store::{
ReadOptions, StateStore, StateStoreRead, StateStoreWrite, SyncResult, WriteOptions,
};

use crate::get_notification_client_for_test;
use crate::test_utils::{
with_hummock_storage_v1, with_hummock_storage_v2, HummockStateStoreTestTrait,
HummockV2MixedStateStore,
with_hummock_storage_v2, HummockStateStoreTestTrait, HummockV2MixedStateStore,
};

#[tokio::test]
Expand Down Expand Up @@ -75,12 +73,6 @@ async fn test_empty_read_v2() {
assert!(stream.try_next().await.unwrap().is_none());
}

#[tokio::test]
async fn test_basic_v1() {
let (hummock_storage, meta_client) = with_hummock_storage_v1().await;
test_basic_inner(hummock_storage, meta_client).await;
}

#[tokio::test]
async fn test_basic_v2() {
let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await;
Expand Down Expand Up @@ -427,12 +419,6 @@ async fn test_basic_inner(
assert!(value.is_none());
}

#[tokio::test]
async fn test_state_store_sync_v1() {
let (hummock_storage, meta_client) = with_hummock_storage_v1().await;
test_state_store_sync_inner(hummock_storage, meta_client).await;
}

#[tokio::test]
async fn test_state_store_sync_v2() {
let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await;
Expand Down Expand Up @@ -538,27 +524,7 @@ async fn test_reload_storage() {
let hummock_options = Arc::new(default_opts_for_test());
let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) =
setup_compute_env(8080).await;
let meta_client = Arc::new(MockHummockMetaClient::new(
hummock_manager_ref.clone(),
worker_node.id,
));

// TODO: may also test for v2 when the unit test is enabled.
let hummock_storage = HummockStorageV1::new(
hummock_options.clone(),
sstable_store.clone(),
meta_client.clone(),
get_notification_client_for_test(
env.clone(),
hummock_manager_ref.clone(),
worker_node.clone(),
),
Arc::new(HummockStateStoreMetrics::unused()),
Arc::new(risingwave_tracing::RwTracingService::disabled()),
Arc::new(CompactorMetrics::unused()),
)
.await
.unwrap();
let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await;
let anchor = Bytes::from("aa");

// First batch inserts the anchor and others.
Expand Down Expand Up @@ -754,12 +720,6 @@ async fn test_reload_storage() {
assert_eq!(len, 3);
}

#[tokio::test]
async fn test_write_anytime_v1() {
let (hummock_storage, meta_client) = with_hummock_storage_v1().await;
test_write_anytime_inner(hummock_storage, meta_client).await;
}

#[tokio::test]
async fn test_write_anytime_v2() {
let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await;
Expand Down Expand Up @@ -1054,12 +1014,6 @@ async fn test_write_anytime_inner(
assert!(!ssts2.is_empty());
}

#[tokio::test]
async fn test_delete_get_v1() {
let (hummock_storage, meta_client) = with_hummock_storage_v1().await;
test_delete_get_inner(hummock_storage, meta_client).await;
}

#[tokio::test]
async fn test_delete_get_v2() {
let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await;
Expand Down Expand Up @@ -1134,12 +1088,6 @@ async fn test_delete_get_inner(
.is_none());
}

#[tokio::test]
async fn test_multiple_epoch_sync_v1() {
let (hummock_storage, meta_client) = with_hummock_storage_v1().await;
test_multiple_epoch_sync_inner(hummock_storage, meta_client).await;
}

#[tokio::test]
async fn test_multiple_epoch_sync_v2() {
let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await;
Expand Down
Loading

0 comments on commit 4835160

Please sign in to comment.