Skip to content

Commit

Permalink
fix: correctly implement check_state_consistency (#3711)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Jul 7, 2022
1 parent 078c2f6 commit 3bb55cf
Showing 1 changed file with 49 additions and 30 deletions.
79 changes: 49 additions & 30 deletions src/meta/src/hummock/hummock_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::BorrowMut;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::future::Future;
use std::ops::DerefMut;
Expand All @@ -36,7 +37,7 @@ use risingwave_pb::hummock::{
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::MetaLeaderInfo;
use tokio::sync::RwLock;
use tokio::sync::{RwLock, RwLockWriteGuard};

use crate::cluster::{ClusterManagerRef, META_NODE_ID};
use crate::hummock::compaction::{CompactStatus, ManualCompactionOption};
Expand Down Expand Up @@ -185,9 +186,22 @@ where
Ok(instance)
}

/// Load state from meta store.
async fn load_meta_store_state(&self) -> Result<()> {
let mut compaction_guard = self.compaction.write().await;
let mut versioning_guard = self.versioning.write().await;
self.load_meta_store_state_impl(
compaction_guard.borrow_mut(),
versioning_guard.borrow_mut(),
)
.await
}

/// Load state from meta store.
async fn load_meta_store_state_impl(
&self,
compaction_guard: &mut RwLockWriteGuard<'_, Compaction>,
versioning_guard: &mut RwLockWriteGuard<'_, Versioning>,
) -> Result<()> {
let compaction_statuses = CompactStatus::list(self.env.meta_store())
.await?
.into_iter()
Expand Down Expand Up @@ -215,7 +229,6 @@ where
.map(|assigned| (assigned.key().unwrap(), assigned))
.collect();

let mut versioning_guard = self.versioning.write().await;
versioning_guard.current_version_id = CurrentHummockVersionId::get(self.env.meta_store())
.await?
.unwrap_or_else(CurrentHummockVersionId::new);
Expand Down Expand Up @@ -1255,33 +1268,39 @@ where
// TODO: use proc macro to call check_state_consistency
#[cfg(test)]
pub async fn check_state_consistency(&self) {
let get_state = || async {
let compaction_guard = self.compaction.read().await;
let versioning_guard = self.versioning.read().await;
let compact_statuses_copy = compaction_guard.compaction_statuses.clone();
let compact_task_assignment_copy = compaction_guard.compact_task_assignment.clone();
let current_version_id_copy = versioning_guard.current_version_id.clone();
// let hummmock_versions_copy = versioning_guard.hummock_versions.clone();
let pinned_versions_copy = versioning_guard.pinned_versions.clone();
let pinned_snapshots_copy = versioning_guard.pinned_snapshots.clone();
let stale_sstables_copy = versioning_guard.stale_sstables.clone();
let sst_id_infos_copy = versioning_guard.sstable_id_infos.clone();
(
compact_statuses_copy,
compact_task_assignment_copy,
current_version_id_copy,
// hummmock_versions_copy,
pinned_versions_copy,
pinned_snapshots_copy,
stale_sstables_copy,
sst_id_infos_copy,
)
};
let mem_state = get_state().await;
self.load_meta_store_state()
.await
.expect("Failed to load state from meta store");
let loaded_state = get_state().await;
use std::borrow::Borrow;
let mut compaction_guard = self.compaction.write().await;
let mut versioning_guard = self.versioning.write().await;
let get_state =
|compaction_guard: &RwLockWriteGuard<'_, Compaction>,
versioning_guard: &RwLockWriteGuard<'_, Versioning>| {
let compact_statuses_copy = compaction_guard.compaction_statuses.clone();
let compact_task_assignment_copy = compaction_guard.compact_task_assignment.clone();
let current_version_id_copy = versioning_guard.current_version_id.clone();
// let hummmock_versions_copy = versioning_guard.hummock_versions.clone();
let pinned_versions_copy = versioning_guard.pinned_versions.clone();
let pinned_snapshots_copy = versioning_guard.pinned_snapshots.clone();
let stale_sstables_copy = versioning_guard.stale_sstables.clone();
let sst_id_infos_copy = versioning_guard.sstable_id_infos.clone();
(
compact_statuses_copy,
compact_task_assignment_copy,
current_version_id_copy,
// hummmock_versions_copy,
pinned_versions_copy,
pinned_snapshots_copy,
stale_sstables_copy,
sst_id_infos_copy,
)
};
let mem_state = get_state(compaction_guard.borrow(), versioning_guard.borrow());
self.load_meta_store_state_impl(
compaction_guard.borrow_mut(),
versioning_guard.borrow_mut(),
)
.await
.expect("Failed to load state from meta store");
let loaded_state = get_state(compaction_guard.borrow(), versioning_guard.borrow());
assert_eq!(
mem_state, loaded_state,
"hummock in-mem state is inconsistent with meta store state",
Expand Down

0 comments on commit 3bb55cf

Please sign in to comment.