From 335007ca286c964b8f64d47f20a7f4e808b76596 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Tue, 8 Nov 2022 11:58:02 +0100 Subject: [PATCH] State-db refactoring (#12239) * Prune discarded blocks immediately * state-db refactoring part 1 * Some renames * Get rid of pending state * Revert "Prune discarded blocks immediately" This reverts commit 790f54038b52ff379a573ed0806f38d09af098ec. * Cleanup * Make clippy happy * Minor changes --- client/db/src/lib.rs | 18 +- client/state-db/src/lib.rs | 57 +--- client/state-db/src/noncanonical.rs | 245 ++++------------ client/state-db/src/pruning.rs | 418 +++++++--------------------- 4 files changed, 181 insertions(+), 557 deletions(-) diff --git a/client/db/src/lib.rs b/client/db/src/lib.rs index fc031e2aaba59..3bbff1625f2f9 100644 --- a/client/db/src/lib.rs +++ b/client/db/src/lib.rs @@ -1994,15 +1994,15 @@ impl sc_client_api::backend::Backend for Backend { let usage = operation.old_state.usage_info(); self.state_usage.merge_sm(usage); - match self.try_commit_operation(operation) { - Ok(_) => { - self.storage.state_db.apply_pending(); - Ok(()) - }, - e @ Err(_) => { - self.storage.state_db.revert_pending(); - e - }, + if let Err(e) = self.try_commit_operation(operation) { + let state_meta_db = StateMetaDb(self.storage.db.clone()); + self.storage + .state_db + .reset(state_meta_db) + .map_err(sp_blockchain::Error::from_state_db)?; + Err(e) + } else { + Ok(()) } } diff --git a/client/state-db/src/lib.rs b/client/state-db/src/lib.rs index f21b707a489f0..01a198a1b3c1e 100644 --- a/client/state-db/src/lib.rs +++ b/client/state-db/src/lib.rs @@ -291,6 +291,7 @@ pub struct StateDbSync { non_canonical: NonCanonicalOverlay, pruning: Option>, pinned: HashMap, + ref_counting: bool, } impl @@ -311,7 +312,7 @@ impl PruningMode::ArchiveAll | PruningMode::ArchiveCanonical => None, }; - Ok(StateDbSync { mode, non_canonical, pruning, pinned: Default::default() }) + Ok(StateDbSync { mode, non_canonical, pruning, pinned: 
Default::default(), ref_counting }) } fn insert_block( @@ -372,9 +373,9 @@ impl match self.pruning.as_ref() { None => IsPruned::NotPruned, Some(pruning) => match pruning.have_block(hash, number) { - HaveBlock::NotHave => IsPruned::Pruned, - HaveBlock::Have => IsPruned::NotPruned, - HaveBlock::MayHave => IsPruned::MaybePruned, + HaveBlock::No => IsPruned::Pruned, + HaveBlock::Yes => IsPruned::NotPruned, + HaveBlock::Maybe => IsPruned::MaybePruned, }, } } @@ -444,9 +445,9 @@ impl let have_block = self.non_canonical.have_block(hash) || self.pruning.as_ref().map_or(false, |pruning| { match pruning.have_block(hash, number) { - HaveBlock::NotHave => false, - HaveBlock::Have => true, - HaveBlock::MayHave => hint(), + HaveBlock::No => false, + HaveBlock::Yes => true, + HaveBlock::Maybe => hint(), } }); if have_block { @@ -496,30 +497,6 @@ impl db.get(key.as_ref()).map_err(Error::Db) } - fn apply_pending(&mut self) { - self.non_canonical.apply_pending(); - if let Some(pruning) = &mut self.pruning { - pruning.apply_pending(); - } - let next_hash = self.pruning.as_mut().map(|p| p.next_hash()); - trace!( - target: "forks", - "First available: {:?} ({}), Last canon: {:?} ({}), Best forks: {:?}", - next_hash, - self.pruning.as_ref().map(|p| p.pending()).unwrap_or(0), - self.non_canonical.last_canonicalized_hash(), - self.non_canonical.last_canonicalized_block_number().unwrap_or(0), - self.non_canonical.top_level(), - ); - } - - fn revert_pending(&mut self) { - if let Some(pruning) = &mut self.pruning { - pruning.revert_pending(); - } - self.non_canonical.revert_pending(); - } - fn memory_info(&self) -> StateDbMemoryInfo { StateDbMemoryInfo { non_canonical: MemorySize::from_bytes(malloc_size(&self.non_canonical)), @@ -654,14 +631,11 @@ impl return self.db.read().is_pruned(hash, number) } - /// Apply all pending changes - pub fn apply_pending(&self) { - self.db.write().apply_pending(); - } - - /// Revert all pending changes - pub fn revert_pending(&self) { - 
self.db.write().revert_pending(); + /// Reset in-memory changes to the last disk-backed state. + pub fn reset(&self, db: D) -> Result<(), Error> { + let mut state_db = self.db.write(); + *state_db = StateDbSync::new(state_db.mode.clone(), state_db.ref_counting, db)?; + Ok(()) } /// Returns the current memory statistics of this instance. @@ -766,9 +740,7 @@ mod tests { ) .unwrap(), ); - state_db.apply_pending(); db.commit(&state_db.canonicalize_block(&H256::from_low_u64_be(1)).unwrap()); - state_db.apply_pending(); db.commit( &state_db .insert_block( @@ -779,11 +751,8 @@ mod tests { ) .unwrap(), ); - state_db.apply_pending(); db.commit(&state_db.canonicalize_block(&H256::from_low_u64_be(21)).unwrap()); - state_db.apply_pending(); db.commit(&state_db.canonicalize_block(&H256::from_low_u64_be(3)).unwrap()); - state_db.apply_pending(); (db, state_db) } diff --git a/client/state-db/src/noncanonical.rs b/client/state-db/src/noncanonical.rs index 559fc7ca023fe..3711cf7a42667 100644 --- a/client/state-db/src/noncanonical.rs +++ b/client/state-db/src/noncanonical.rs @@ -19,8 +19,6 @@ //! Canonicalization window. //! Maintains trees of block overlays and allows discarding trees/roots //! The overlays are added in `insert` and removed in `canonicalize`. -//! All pending changes are kept in memory until next call to `apply_pending` or -//! `revert_pending` use super::{to_meta_key, ChangeSet, CommitSet, DBValue, Error, Hash, MetaDb, StateDbError}; use codec::{Decode, Encode}; @@ -37,8 +35,6 @@ pub struct NonCanonicalOverlay { last_canonicalized: Option<(BlockHash, u64)>, levels: VecDeque>, parents: HashMap, - pending_canonicalizations: Vec, - pending_insertions: Vec, values: HashMap, // ref counted // would be deleted but kept around because block is pinned, ref counted. 
pinned: HashMap, @@ -229,8 +225,6 @@ impl NonCanonicalOverlay { last_canonicalized, levels, parents, - pending_canonicalizations: Default::default(), - pending_insertions: Default::default(), pinned: Default::default(), pinned_insertions: Default::default(), values, @@ -316,9 +310,8 @@ impl NonCanonicalOverlay { deleted: changeset.deleted, }; commit.meta.inserted.push((journal_key, journal_record.encode())); - trace!(target: "state-db", "Inserted uncanonicalized changeset {}.{} ({} inserted, {} deleted)", number, index, journal_record.inserted.len(), journal_record.deleted.len()); + trace!(target: "state-db", "Inserted uncanonicalized changeset {}.{} {:?} ({} inserted, {} deleted)", number, index, hash, journal_record.inserted.len(), journal_record.deleted.len()); insert_values(&mut self.values, journal_record.inserted); - self.pending_insertions.push(hash.clone()); Ok(commit) } @@ -355,24 +348,7 @@ impl NonCanonicalOverlay { } pub fn last_canonicalized_block_number(&self) -> Option { - match self.last_canonicalized.as_ref().map(|&(_, n)| n) { - Some(n) => Some(n + self.pending_canonicalizations.len() as u64), - None if !self.pending_canonicalizations.is_empty() => - Some(self.pending_canonicalizations.len() as u64), - _ => None, - } - } - - pub fn last_canonicalized_hash(&self) -> Option { - self.last_canonicalized.as_ref().map(|&(ref h, _)| h.clone()) - } - - pub fn top_level(&self) -> Vec<(BlockHash, u64)> { - let start = self.last_canonicalized_block_number().unwrap_or(0); - self.levels - .get(self.pending_canonicalizations.len()) - .map(|level| level.blocks.iter().map(|r| (r.hash.clone(), start)).collect()) - .unwrap_or_default() + self.last_canonicalized.as_ref().map(|&(_, n)| n) } /// Select a top-level root and canonicalized it. Discards all sibling subtrees and the root. 
@@ -384,10 +360,10 @@ impl NonCanonicalOverlay { commit: &mut CommitSet, ) -> Result { trace!(target: "state-db", "Canonicalizing {:?}", hash); - let level = self - .levels - .get(self.pending_canonicalizations.len()) - .ok_or(StateDbError::InvalidBlock)?; + let level = match self.levels.pop_front() { + Some(level) => level, + None => return Err(StateDbError::InvalidBlock), + }; let index = level .blocks .iter() @@ -396,91 +372,63 @@ impl NonCanonicalOverlay { let mut discarded_journals = Vec::new(); let mut discarded_blocks = Vec::new(); - for (i, overlay) in level.blocks.iter().enumerate() { - if i != index { + for (i, overlay) in level.blocks.into_iter().enumerate() { + let mut pinned_children = 0; + // That's the one we need to canonicalize + if i == index { + commit.data.inserted.extend(overlay.inserted.iter().map(|k| { + ( + k.clone(), + self.values + .get(k) + .expect("For each key in overlays there's a value in values") + .1 + .clone(), + ) + })); + commit.data.deleted.extend(overlay.deleted.clone()); + } else { + // Discard this overlay self.discard_journals( - self.pending_canonicalizations.len() + 1, + 0, &mut discarded_journals, &mut discarded_blocks, &overlay.hash, ); + pinned_children = discard_descendants( + &mut self.levels.as_mut_slices(), + &mut self.values, + &mut self.parents, + &self.pinned, + &mut self.pinned_insertions, + &overlay.hash, + ); + } + if self.pinned.contains_key(&overlay.hash) { + pinned_children += 1; + } + if pinned_children != 0 { + self.pinned_insertions + .insert(overlay.hash.clone(), (overlay.inserted, pinned_children)); + } else { + self.parents.remove(&overlay.hash); + discard_values(&mut self.values, overlay.inserted); } discarded_journals.push(overlay.journal_key.clone()); discarded_blocks.push(overlay.hash.clone()); } - - // get the one we need to canonicalize - let overlay = &level.blocks[index]; - commit.data.inserted.extend(overlay.inserted.iter().map(|k| { - ( - k.clone(), - self.values - .get(k) - .expect("For 
each key in overlays there's a value in values") - .1 - .clone(), - ) - })); - commit.data.deleted.extend(overlay.deleted.clone()); - commit.meta.deleted.append(&mut discarded_journals); - let canonicalized = - (hash.clone(), self.front_block_number() + self.pending_canonicalizations.len() as u64); + + let canonicalized = (hash.clone(), self.front_block_number()); commit .meta .inserted .push((to_meta_key(LAST_CANONICAL, &()), canonicalized.encode())); trace!(target: "state-db", "Discarding {} records", commit.meta.deleted.len()); - self.pending_canonicalizations.push(hash.clone()); - Ok(canonicalized.1) - } - fn apply_canonicalizations(&mut self) { - let last = self.pending_canonicalizations.last().cloned(); - let count = self.pending_canonicalizations.len() as u64; - for hash in self.pending_canonicalizations.drain(..) { - trace!(target: "state-db", "Post canonicalizing {:?}", hash); - let level = - self.levels.pop_front().expect("Hash validity is checked in `canonicalize`"); - let index = level - .blocks - .iter() - .position(|overlay| overlay.hash == hash) - .expect("Hash validity is checked in `canonicalize`"); - - // discard unfinalized overlays and values - for (i, overlay) in level.blocks.into_iter().enumerate() { - let mut pinned_children = if i != index { - discard_descendants( - &mut self.levels.as_mut_slices(), - &mut self.values, - &mut self.parents, - &self.pinned, - &mut self.pinned_insertions, - &overlay.hash, - ) - } else { - 0 - }; - if self.pinned.contains_key(&overlay.hash) { - pinned_children += 1; - } - if pinned_children != 0 { - self.pinned_insertions - .insert(overlay.hash.clone(), (overlay.inserted, pinned_children)); - } else { - self.parents.remove(&overlay.hash); - discard_values(&mut self.values, overlay.inserted); - } - } - } - if let Some(hash) = last { - let last_canonicalized = ( - hash, - self.last_canonicalized.as_ref().map(|(_, n)| n + count).unwrap_or(count - 1), - ); - self.last_canonicalized = Some(last_canonicalized); - } + 
let num = canonicalized.1; + self.last_canonicalized = Some(canonicalized); + Ok(num) } /// Get a value from the node overlay. This searches in every existing changeset. @@ -494,8 +442,7 @@ impl NonCanonicalOverlay { /// Check if the block is in the canonicalization queue. pub fn have_block(&self, hash: &BlockHash) -> bool { - (self.parents.contains_key(hash) || self.pending_insertions.contains(hash)) && - !self.pending_canonicalizations.contains(hash) + self.parents.contains_key(hash) } /// Revert a single level. Returns commit set that deletes the journal or `None` if not @@ -543,50 +490,8 @@ impl NonCanonicalOverlay { } } - fn revert_insertions(&mut self) { - self.pending_insertions.reverse(); - for hash in self.pending_insertions.drain(..) { - self.parents.remove(&hash); - // find a level. When iterating insertions backwards the hash is always last in the - // level. - let level_index = self - .levels - .iter() - .position(|level| { - level.blocks.last().expect("Hash is added in `insert` in reverse order").hash == - hash - }) - .expect("Hash is added in insert"); - - let overlay_index = self.levels[level_index].blocks.len() - 1; - let overlay = self.levels[level_index].remove(overlay_index); - discard_values(&mut self.values, overlay.inserted); - if self.levels[level_index].blocks.is_empty() { - debug_assert_eq!(level_index, self.levels.len() - 1); - self.levels.pop_back(); - } - } - } - - /// Apply all pending changes - pub fn apply_pending(&mut self) { - self.apply_canonicalizations(); - self.pending_insertions.clear(); - } - - /// Revert all pending changes - pub fn revert_pending(&mut self) { - self.pending_canonicalizations.clear(); - self.revert_insertions(); - } - /// Pin state values in memory pub fn pin(&mut self, hash: &BlockHash) { - if self.pending_insertions.contains(hash) { - // Pinning pending state is not implemented. Pending states - // won't be pruned for quite some time anyway, so it's not a big deal. 
- return - } let refs = self.pinned.entry(hash.clone()).or_default(); if *refs == 0 { trace!(target: "state-db-pin", "Pinned non-canon block: {:?}", hash); @@ -779,7 +684,6 @@ mod tests { let mut commit = CommitSet::default(); overlay.canonicalize(&h1, &mut commit).unwrap(); db.commit(&commit); - overlay.apply_pending(); assert_eq!(overlay.levels.len(), 1); let overlay2 = NonCanonicalOverlay::::new(&db).unwrap(); @@ -806,18 +710,13 @@ mod tests { let mut commit = CommitSet::default(); overlay.canonicalize(&h1, &mut commit).unwrap(); db.commit(&commit); - assert!(contains(&overlay, 5)); - assert_eq!(overlay.levels.len(), 2); - assert_eq!(overlay.parents.len(), 2); - overlay.apply_pending(); - assert_eq!(overlay.levels.len(), 1); - assert_eq!(overlay.parents.len(), 1); assert!(!contains(&overlay, 5)); assert!(contains(&overlay, 7)); + assert_eq!(overlay.levels.len(), 1); + assert_eq!(overlay.parents.len(), 1); let mut commit = CommitSet::default(); overlay.canonicalize(&h2, &mut commit).unwrap(); db.commit(&commit); - overlay.apply_pending(); assert_eq!(overlay.levels.len(), 0); assert_eq!(overlay.parents.len(), 0); assert!(db.data_eq(&make_db(&[1, 4, 6, 7, 8]))); @@ -836,13 +735,11 @@ mod tests { let mut commit = CommitSet::default(); overlay.canonicalize(&h_1, &mut commit).unwrap(); db.commit(&commit); - assert!(contains(&overlay, 1)); - overlay.apply_pending(); assert!(!contains(&overlay, 1)); } #[test] - fn insert_with_pending_canonicalization() { + fn insert_and_canonicalize() { let h1 = H256::random(); let h2 = H256::random(); let h3 = H256::random(); @@ -851,13 +748,11 @@ mod tests { let changeset = make_changeset(&[], &[]); db.commit(&overlay.insert(&h1, 1, &H256::default(), changeset.clone()).unwrap()); db.commit(&overlay.insert(&h2, 2, &h1, changeset.clone()).unwrap()); - overlay.apply_pending(); let mut commit = CommitSet::default(); overlay.canonicalize(&h1, &mut commit).unwrap(); overlay.canonicalize(&h2, &mut commit).unwrap(); db.commit(&commit); 
db.commit(&overlay.insert(&h3, 3, &h2, changeset.clone()).unwrap()); - overlay.apply_pending(); assert_eq!(overlay.levels.len(), 1); } @@ -927,7 +822,6 @@ mod tests { let mut commit = CommitSet::default(); overlay.canonicalize(&h_1, &mut commit).unwrap(); db.commit(&commit); - overlay.apply_pending(); assert_eq!(overlay.levels.len(), 2); assert_eq!(overlay.parents.len(), 6); assert!(!contains(&overlay, 1)); @@ -948,7 +842,6 @@ mod tests { let mut commit = CommitSet::default(); overlay.canonicalize(&h_1_2, &mut commit).unwrap(); db.commit(&commit); - overlay.apply_pending(); assert_eq!(overlay.levels.len(), 1); assert_eq!(overlay.parents.len(), 3); assert!(!contains(&overlay, 11)); @@ -965,7 +858,6 @@ mod tests { let mut commit = CommitSet::default(); overlay.canonicalize(&h_1_2_2, &mut commit).unwrap(); db.commit(&commit); - overlay.apply_pending(); assert_eq!(overlay.levels.len(), 0); assert_eq!(overlay.parents.len(), 0); assert!(db.data_eq(&make_db(&[1, 12, 122]))); @@ -994,31 +886,6 @@ mod tests { assert!(overlay.revert_one().is_none()); } - #[test] - fn revert_pending_insertion() { - let h1 = H256::random(); - let h2_1 = H256::random(); - let h2_2 = H256::random(); - let db = make_db(&[]); - let mut overlay = NonCanonicalOverlay::::new(&db).unwrap(); - let changeset1 = make_changeset(&[5, 6], &[2]); - let changeset2 = make_changeset(&[7, 8], &[5, 3]); - let changeset3 = make_changeset(&[9], &[]); - overlay.insert(&h1, 1, &H256::default(), changeset1).unwrap(); - assert!(contains(&overlay, 5)); - overlay.insert(&h2_1, 2, &h1, changeset2).unwrap(); - overlay.insert(&h2_2, 2, &h1, changeset3).unwrap(); - assert!(contains(&overlay, 7)); - assert!(contains(&overlay, 5)); - assert!(contains(&overlay, 9)); - assert_eq!(overlay.levels.len(), 2); - assert_eq!(overlay.parents.len(), 3); - overlay.revert_pending(); - assert!(!contains(&overlay, 5)); - assert_eq!(overlay.levels.len(), 0); - assert_eq!(overlay.parents.len(), 0); - } - #[test] fn keeps_pinned() { let mut db 
= make_db(&[]); @@ -1033,14 +900,12 @@ mod tests { let mut overlay = NonCanonicalOverlay::::new(&db).unwrap(); db.commit(&overlay.insert(&h_1, 1, &H256::default(), c_1).unwrap()); db.commit(&overlay.insert(&h_2, 1, &H256::default(), c_2).unwrap()); - overlay.apply_pending(); overlay.pin(&h_1); let mut commit = CommitSet::default(); overlay.canonicalize(&h_2, &mut commit).unwrap(); db.commit(&commit); - overlay.apply_pending(); assert!(contains(&overlay, 1)); overlay.unpin(&h_1); assert!(!contains(&overlay, 1)); @@ -1064,14 +929,12 @@ mod tests { db.commit(&overlay.insert(&h_1, 1, &H256::default(), c_1).unwrap()); db.commit(&overlay.insert(&h_2, 1, &H256::default(), c_2).unwrap()); db.commit(&overlay.insert(&h_3, 1, &H256::default(), c_3).unwrap()); - overlay.apply_pending(); overlay.pin(&h_1); let mut commit = CommitSet::default(); overlay.canonicalize(&h_3, &mut commit).unwrap(); db.commit(&commit); - overlay.apply_pending(); // 1_2 should be discarded, 1_1 is pinned assert!(contains(&overlay, 1)); overlay.unpin(&h_1); @@ -1094,14 +957,12 @@ mod tests { db.commit(&overlay.insert(&h_11, 1, &H256::default(), c_11).unwrap()); db.commit(&overlay.insert(&h_12, 1, &H256::default(), c_12).unwrap()); db.commit(&overlay.insert(&h_21, 2, &h_11, c_21).unwrap()); - overlay.apply_pending(); overlay.pin(&h_21); let mut commit = CommitSet::default(); overlay.canonicalize(&h_12, &mut commit).unwrap(); db.commit(&commit); - overlay.apply_pending(); // 1_1 and 2_1 should be both pinned assert!(contains(&overlay, 1)); overlay.unpin(&h_21); @@ -1129,12 +990,10 @@ mod tests { overlay.canonicalize(&root, &mut commit).unwrap(); overlay.canonicalize(&h2, &mut commit).unwrap(); // h11 should stay in the DB db.commit(&commit); - overlay.apply_pending(); assert_eq!(overlay.levels.len(), 1); assert!(contains(&overlay, 21)); assert!(!contains(&overlay, 11)); assert!(db.get_meta(&to_journal_key(12, 1)).unwrap().is_some()); - assert!(db.get_meta(&to_journal_key(12, 0)).unwrap().is_none()); // 
Restore into a new overlay and check that journaled value exists. let mut overlay = NonCanonicalOverlay::::new(&db).unwrap(); @@ -1143,7 +1002,6 @@ mod tests { let mut commit = CommitSet::default(); overlay.canonicalize(&h21, &mut commit).unwrap(); // h11 should stay in the DB db.commit(&commit); - overlay.apply_pending(); assert!(!contains(&overlay, 21)); } @@ -1167,7 +1025,6 @@ mod tests { overlay.canonicalize(&root, &mut commit).unwrap(); overlay.canonicalize(&h2, &mut commit).unwrap(); // h11 should stay in the DB db.commit(&commit); - overlay.apply_pending(); // add another block at top level. It should reuse journal index 0 of previously discarded // block diff --git a/client/state-db/src/pruning.rs b/client/state-db/src/pruning.rs index 50a46def59541..458522b8119fd 100644 --- a/client/state-db/src/pruning.rs +++ b/client/state-db/src/pruning.rs @@ -29,11 +29,8 @@ use crate::{ DEFAULT_MAX_BLOCK_CONSTRAINT, }; use codec::{Decode, Encode}; -use log::{error, trace, warn}; -use std::{ - cmp, - collections::{HashMap, HashSet, VecDeque}, -}; +use log::trace; +use std::collections::{HashMap, HashSet, VecDeque}; pub(crate) const LAST_PRUNED: &[u8] = b"last_pruned"; const PRUNING_JOURNAL: &[u8] = b"pruning_journal"; @@ -44,14 +41,8 @@ pub struct RefWindow { /// A queue of blocks keep tracking keys that should be deleted for each block in the /// pruning window. queue: DeathRowQueue, - /// Block number that corresponds to the front of `death_rows`. + /// Block number that is next to be pruned. 
base: u64, - /// Number of call of `note_canonical` after - /// last call `apply_pending` or `revert_pending` - pending_canonicalizations: usize, - /// Number of calls of `prune_one` after - /// last call `apply_pending` or `revert_pending` - pending_prunings: usize, } /// `DeathRowQueue` used to keep track of blocks in the pruning window, there are two flavors: @@ -72,13 +63,13 @@ enum DeathRowQueue { #[ignore_malloc_size_of = "Shared data"] db: D, /// A queue of keys that should be deleted for each block in the pruning window. - /// Only caching the first fews blocks of the pruning window, blocks inside are + /// Only caching the first few blocks of the pruning window, blocks inside are /// successive and ordered by block number cache: VecDeque>, /// A soft limit of the cache's size cache_capacity: usize, - /// The number of blocks in queue that are not loaded into `cache`. - uncached_blocks: usize, + /// Last block number added to the window + last: Option, }, } @@ -99,7 +90,7 @@ impl DeathRowQueue { let record: JournalRecord = Decode::decode(&mut record.as_slice())?; trace!(target: "state-db", "Pruning journal entry {} ({} inserted, {} deleted)", block, record.inserted.len(), record.deleted.len()); - queue.import(base, record); + queue.import(base, block, record); }, None => break, } @@ -113,36 +104,30 @@ impl DeathRowQueue { fn new_db_backed( db: D, base: u64, - mut uncached_blocks: usize, + last: Option, window_size: u32, ) -> Result, Error> { // limit the cache capacity from 1 to `DEFAULT_MAX_BLOCK_CONSTRAINT` let cache_capacity = window_size.clamp(1, DEFAULT_MAX_BLOCK_CONSTRAINT) as usize; let mut cache = VecDeque::with_capacity(cache_capacity); trace!(target: "state-db", "Reading pruning journal for the database-backed queue. 
Pending #{}", base); - // Load block from db - DeathRowQueue::load_batch_from_db( - &db, - &mut uncached_blocks, - &mut cache, - base, - cache_capacity, - )?; - Ok(DeathRowQueue::DbBacked { db, cache, cache_capacity, uncached_blocks }) + DeathRowQueue::load_batch_from_db(&db, &mut cache, base, cache_capacity)?; + Ok(DeathRowQueue::DbBacked { db, cache, cache_capacity, last }) } /// import a new block to the back of the queue - fn import(&mut self, base: u64, journal_record: JournalRecord) { + fn import(&mut self, base: u64, num: u64, journal_record: JournalRecord) { let JournalRecord { hash, inserted, deleted } = journal_record; + trace!(target: "state-db", "Importing {}, base={}", num, base); match self { - DeathRowQueue::DbBacked { uncached_blocks, cache, cache_capacity, .. } => { - // `uncached_blocks` is zero means currently all block are loaded into `cache` - // thus if `cache` is not full, load the next block into `cache` too - if *uncached_blocks == 0 && cache.len() < *cache_capacity { + DeathRowQueue::DbBacked { cache, cache_capacity, last, .. } => { + // If the new block continues the cached range and there is space, load it directly into + // cache. 
+ if num == base + cache.len() as u64 && cache.len() < *cache_capacity { + trace!(target: "state-db", "Adding to DB backed cache {:?} (#{})", hash, num); cache.push_back(DeathRow { hash, deleted: deleted.into_iter().collect() }); - } else { - *uncached_blocks += 1; } + *last = Some(num); }, DeathRowQueue::Mem { death_rows, death_index } => { // remove all re-inserted keys from death rows @@ -168,16 +153,9 @@ impl DeathRowQueue { base: u64, ) -> Result>, Error> { match self { - DeathRowQueue::DbBacked { db, uncached_blocks, cache, cache_capacity } => { - if cache.is_empty() && *uncached_blocks != 0 { - // load more blocks from db since there are still blocks in it - DeathRowQueue::load_batch_from_db( - db, - uncached_blocks, - cache, - base, - *cache_capacity, - )?; + DeathRowQueue::DbBacked { db, cache, cache_capacity, .. } => { + if cache.is_empty() { + DeathRowQueue::load_batch_from_db(db, cache, base, *cache_capacity)?; } Ok(cache.pop_front()) }, @@ -193,113 +171,37 @@ impl DeathRowQueue { } } - /// Revert recent additions to the queue, namely remove `amount` number of blocks from the back - /// of the queue, `base` is the block number of the first block of the queue - fn revert_recent_add(&mut self, base: u64, amout: usize) { - debug_assert!(amout <= self.len()); - match self { - DeathRowQueue::DbBacked { uncached_blocks, cache, .. } => { - // remove from `uncached_blocks` if it can cover - if *uncached_blocks >= amout { - *uncached_blocks -= amout; - return - } - // reset `uncached_blocks` and remove remain blocks from `cache` - let remain = amout - *uncached_blocks; - *uncached_blocks = 0; - cache.truncate(cache.len() - remain); - }, - DeathRowQueue::Mem { death_rows, death_index } => { - // Revert recent addition to the queue - // Note that pending insertions might cause some existing deletions to be removed - // from `death_index` We don't bother to track and revert that for now. 
This means - // that a few nodes might end up no being deleted in case transaction fails and - // `revert_pending` is called. - death_rows.truncate(death_rows.len() - amout); - let new_max_block = death_rows.len() as u64 + base; - death_index.retain(|_, block| *block < new_max_block); - }, - } - } - - /// Load a batch of blocks from the backend database into `cache`, start from (and include) the - /// next block followe the last block of `cache`, `base` is the block number of the first block - /// of the queue + /// Load a batch of blocks from the backend database into `cache`, starting from `base` and up + /// to `base + cache_capacity` fn load_batch_from_db( db: &D, - uncached_blocks: &mut usize, cache: &mut VecDeque>, base: u64, cache_capacity: usize, ) -> Result<(), Error> { - // return if all blocks already loaded into `cache` and there are no other - // blocks in the backend database - if *uncached_blocks == 0 { - return Ok(()) - } let start = base + cache.len() as u64; - let batch_size = cmp::min(*uncached_blocks, cache_capacity); - let mut loaded = 0; + let batch_size = cache_capacity; for i in 0..batch_size as u64 { match load_death_row_from_db::(db, start + i)? 
{ Some(row) => { cache.push_back(row); - loaded += 1; }, - // block may added to the queue but not commit into the db yet, if there are - // data missing in the db `load_death_row_from_db` should return a db error None => break, } } - *uncached_blocks -= loaded; Ok(()) } - /// Get the block in the given index of the queue, `base` is the block number of the - /// first block of the queue - fn get( - &mut self, - base: u64, - index: usize, - ) -> Result>, Error> { - match self { - DeathRowQueue::DbBacked { db, uncached_blocks, cache, cache_capacity } => { - // check if `index` target a block reside on disk - if index >= cache.len() && index < cache.len() + *uncached_blocks { - // if `index` target the next batch of `DeathRow`, load a batch from db - if index - cache.len() < cmp::min(*uncached_blocks, *cache_capacity) { - DeathRowQueue::load_batch_from_db( - db, - uncached_blocks, - cache, - base, - *cache_capacity, - )?; - } else { - // load a single `DeathRow` from db, but do not insert it to `cache` - // because `cache` is a queue of successive `DeathRow` - // NOTE: this branch should not be entered because blocks are visited - // in successive increasing order, just keeping it for robustness - return load_death_row_from_db(db, base + index as u64) - } - } - Ok(cache.get(index).cloned()) - }, - DeathRowQueue::Mem { death_rows, .. } => Ok(death_rows.get(index).cloned()), - } - } - /// Check if the block at the given `index` of the queue exist - /// it is the caller's responsibility to ensure `index` won't be out of bound + /// it is the caller's responsibility to ensure `index` won't be out of bounds fn have_block(&self, hash: &BlockHash, index: usize) -> HaveBlock { match self { DeathRowQueue::DbBacked { cache, .. } => { if cache.len() > index { (cache[index].hash == *hash).into() } else { - // the block not exist in `cache`, but it may exist in the unload - // blocks - HaveBlock::MayHave + // The block is not in the cache but it still may exist on disk. 
+ HaveBlock::Maybe } }, DeathRowQueue::Mem { death_rows, .. } => (death_rows[index].hash == *hash).into(), @@ -307,11 +209,10 @@ } /// Return the number of block in the pruning window - fn len(&self) -> usize { + fn len(&self, base: u64) -> u64 { match self { - DeathRowQueue::DbBacked { uncached_blocks, cache, .. } => - cache.len() + *uncached_blocks, - DeathRowQueue::Mem { death_rows, .. } => death_rows.len(), + DeathRowQueue::DbBacked { last, .. } => last.map_or(0, |l| l + 1 - base), + DeathRowQueue::Mem { death_rows, .. } => death_rows.len() as u64, } } @@ -326,10 +227,11 @@ } #[cfg(test)] - fn get_db_backed_queue_state(&self) -> Option<(&VecDeque>, usize)> { + fn get_db_backed_queue_state( + &self, + ) -> Option<(&VecDeque>, Option)> { match self { - DeathRowQueue::DbBacked { cache, uncached_blocks, .. } => - Some((cache, *uncached_blocks)), + DeathRowQueue::DbBacked { cache, last, .. } => Some((cache, *last)), DeathRowQueue::Mem { .. } => None, } } @@ -369,20 +271,20 @@ fn to_journal_key(block: u64) -> Vec { /// The result return by `RefWindow::have_block` #[derive(Debug, PartialEq, Eq)] pub enum HaveBlock { - /// Definitely not having this block - NotHave, - /// May or may not have this block, need futher checking - MayHave, - /// Definitely having this block - Have, + /// Definitely don't have this block. + No, + /// May or may not have this block; needs further checking. + Maybe, + /// Definitely has this block. + Yes, } impl From for HaveBlock { fn from(have: bool) -> Self { if have { - HaveBlock::Have + HaveBlock::Yes } else { - HaveBlock::NotHave + HaveBlock::No } } } @@ -409,37 +311,36 @@ impl RefWindow { let queue = if count_insertions { DeathRowQueue::new_mem(&db, base)? 
} else { - let unload = match last_canonicalized_number { + let last = match last_canonicalized_number { Some(last_canonicalized_number) => { debug_assert!(last_canonicalized_number + 1 >= base); - last_canonicalized_number + 1 - base + Some(last_canonicalized_number) }, // None means `LAST_CANONICAL` is never been wrote, since the pruning journals are // in the same `CommitSet` as `LAST_CANONICAL`, it means no pruning journal have // ever been committed to the db, thus set `unload` to zero - None => 0, + None => None, }; - DeathRowQueue::new_db_backed(db, base, unload as usize, window_size)? + DeathRowQueue::new_db_backed(db, base, last, window_size)? }; - Ok(RefWindow { queue, base, pending_canonicalizations: 0, pending_prunings: 0 }) + Ok(RefWindow { queue, base }) } pub fn window_size(&self) -> u64 { - (self.queue.len() - self.pending_prunings) as u64 + self.queue.len(self.base) as u64 } /// Get the hash of the next pruning block pub fn next_hash(&mut self) -> Result, Error> { - let res = match &self.queue { - DeathRowQueue::DbBacked { cache, .. } => - if self.pending_prunings < cache.len() { - cache.get(self.pending_prunings).map(|r| r.hash.clone()) - } else { - self.get(self.pending_prunings)?.map(|r| r.hash) - }, - DeathRowQueue::Mem { death_rows, .. } => - death_rows.get(self.pending_prunings).map(|r| r.hash.clone()), + let res = match &mut self.queue { + DeathRowQueue::DbBacked { db, cache, cache_capacity, .. } => { + if cache.is_empty() { + DeathRowQueue::load_batch_from_db(db, cache, self.base, *cache_capacity)?; + } + cache.front().map(|r| r.hash.clone()) + }, + DeathRowQueue::Mem { death_rows, .. 
} => death_rows.front().map(|r| r.hash.clone()), }; Ok(res) } @@ -448,68 +349,34 @@ impl RefWindow { 0 } - // Return the block number of the first block that not been pending pruned - pub fn pending(&self) -> u64 { - self.base + self.pending_prunings as u64 - } - fn is_empty(&self) -> bool { - self.queue.len() <= self.pending_prunings + self.window_size() == 0 } // Check if a block is in the pruning window and not be pruned yet pub fn have_block(&self, hash: &BlockHash, number: u64) -> HaveBlock { // if the queue is empty or the block number exceed the pruning window, we definitely // do not have this block - if self.is_empty() || - number < self.pending() || - number >= self.base + self.queue.len() as u64 - { - return HaveBlock::NotHave + if self.is_empty() || number < self.base || number >= self.base + self.window_size() { + return HaveBlock::No } self.queue.have_block(hash, (number - self.base) as usize) } - fn get(&mut self, index: usize) -> Result>, Error> { - if index >= self.queue.len() { - return Ok(None) - } - match self.queue.get(self.base, index)? { - None => { - if matches!(self.queue, DeathRowQueue::DbBacked { .. }) && - // whether trying to get a pending canonicalize block which may not commit to the db yet - index >= self.queue.len() - self.pending_canonicalizations - { - trace!(target: "state-db", "Trying to get a pending canonicalize block that not commit to the db yet"); - Err(Error::StateDb(StateDbError::BlockUnavailable)) - } else { - // A block of the queue is missing, this may happen if `CommitSet` are commit to - // db concurrently and calling `apply_pending/revert_pending` out of order, this - // should not happen under current implementation but keeping it as a defensive - error!(target: "state-db", "Block record is missing from the pruning window, block number {}", self.base + index as u64); - Err(Error::StateDb(StateDbError::BlockMissing)) - } - }, - s => Ok(s), - } - } - /// Prune next block. Expects at least one block in the window. 
Adds changes to `commit`. pub fn prune_one(&mut self, commit: &mut CommitSet) -> Result<(), Error> { - if let Some(pruned) = self.get(self.pending_prunings)? { + if let Some(pruned) = self.queue.pop_front(self.base)? { trace!(target: "state-db", "Pruning {:?} ({} deleted)", pruned.hash, pruned.deleted.len()); - let index = self.base + self.pending_prunings as u64; + let index = self.base; commit.data.deleted.extend(pruned.deleted.into_iter()); commit.meta.inserted.push((to_meta_key(LAST_PRUNED, &()), index.encode())); - commit - .meta - .deleted - .push(to_journal_key(self.base + self.pending_prunings as u64)); - self.pending_prunings += 1; + commit.meta.deleted.push(to_journal_key(self.base)); + self.base += 1; + Ok(()) } else { - warn!(target: "state-db", "Trying to prune when there's nothing to prune"); + trace!(target: "state-db", "Trying to prune when there's nothing to prune"); + Err(Error::StateDb(StateDbError::BlockUnavailable)) } - Ok(()) } /// Add a change set to the window. Creates a journal record and pushes it to `commit` @@ -519,10 +386,10 @@ impl RefWindow { number: u64, commit: &mut CommitSet, ) -> Result<(), Error> { - if self.base == 0 && self.queue.len() == 0 && number > 0 { + if self.base == 0 && self.is_empty() && number > 0 { // assume that parent was canonicalized self.base = number; - } else if (self.base + self.queue.len() as u64) != number { + } else if (self.base + self.window_size()) != number { return Err(Error::StateDb(StateDbError::InvalidBlockNumber)) } trace!(target: "state-db", "Adding to pruning window: {:?} ({} inserted, {} deleted)", hash, commit.data.inserted.len(), commit.data.deleted.len()); @@ -531,38 +398,12 @@ impl RefWindow { } else { Default::default() }; - let deleted = ::std::mem::take(&mut commit.data.deleted); + let deleted = std::mem::take(&mut commit.data.deleted); let journal_record = JournalRecord { hash: hash.clone(), inserted, deleted }; commit.meta.inserted.push((to_journal_key(number), 
journal_record.encode())); - self.queue.import(self.base, journal_record); - self.pending_canonicalizations += 1; + self.queue.import(self.base, number, journal_record); Ok(()) } - - /// Apply all pending changes - pub fn apply_pending(&mut self) { - self.pending_canonicalizations = 0; - for _ in 0..self.pending_prunings { - let pruned = self - .queue - .pop_front(self.base) - // NOTE: `pop_front` should not return `MetaDb::Error` because blocks are visited - // by `RefWindow::prune_one` first then `RefWindow::apply_pending` and - // `DeathRowQueue::get` should load the blocks into cache already - .expect("block must loaded in cache thus no MetaDb::Error") - .expect("pending_prunings is always < queue.len()"); - trace!(target: "state-db", "Applying pruning {:?} ({} deleted)", pruned.hash, pruned.deleted.len()); - self.base += 1; - } - self.pending_prunings = 0; - } - - /// Revert all pending changes - pub fn revert_pending(&mut self) { - self.queue.revert_recent_add(self.base, self.pending_canonicalizations); - self.pending_canonicalizations = 0; - self.pending_prunings = 0; - } } #[cfg(test)] @@ -601,13 +442,14 @@ mod tests { let mut pruning: RefWindow = RefWindow::new(db, DEFAULT_MAX_BLOCK_CONSTRAINT, true).unwrap(); let mut commit = CommitSet::default(); - pruning.prune_one(&mut commit).unwrap(); + assert_eq!( + Err(Error::StateDb(StateDbError::BlockUnavailable)), + pruning.prune_one(&mut commit) + ); assert_eq!(pruning.base, 0); let (death_rows, death_index) = pruning.queue.get_mem_queue_state().unwrap(); assert!(death_rows.is_empty()); assert!(death_index.is_empty()); - assert!(pruning.pending_prunings == 0); - assert!(pruning.pending_canonicalizations == 0); } #[test] @@ -619,9 +461,8 @@ mod tests { let hash = H256::random(); pruning.note_canonical(&hash, 0, &mut commit).unwrap(); db.commit(&commit); - assert_eq!(pruning.have_block(&hash, 0), HaveBlock::Have); - pruning.apply_pending(); - assert_eq!(pruning.have_block(&hash, 0), HaveBlock::Have); + 
assert_eq!(pruning.have_block(&hash, 0), HaveBlock::Yes); + assert_eq!(pruning.have_block(&hash, 0), HaveBlock::Yes); assert!(commit.data.deleted.is_empty()); let (death_rows, death_index) = pruning.queue.get_mem_queue_state().unwrap(); assert_eq!(death_rows.len(), 1); @@ -631,10 +472,9 @@ mod tests { let mut commit = CommitSet::default(); pruning.prune_one(&mut commit).unwrap(); - assert_eq!(pruning.have_block(&hash, 0), HaveBlock::NotHave); + assert_eq!(pruning.have_block(&hash, 0), HaveBlock::No); db.commit(&commit); - pruning.apply_pending(); - assert_eq!(pruning.have_block(&hash, 0), HaveBlock::NotHave); + assert_eq!(pruning.have_block(&hash, 0), HaveBlock::No); assert!(db.data_eq(&make_db(&[2, 4, 5]))); let (death_rows, death_index) = pruning.queue.get_mem_queue_state().unwrap(); assert!(death_rows.is_empty()); @@ -653,7 +493,6 @@ mod tests { let mut commit = make_commit(&[5], &[2]); pruning.note_canonical(&H256::random(), 1, &mut commit).unwrap(); db.commit(&commit); - pruning.apply_pending(); assert!(db.data_eq(&make_db(&[1, 2, 3, 4, 5]))); check_journal(&pruning, &db); @@ -661,12 +500,10 @@ mod tests { let mut commit = CommitSet::default(); pruning.prune_one(&mut commit).unwrap(); db.commit(&commit); - pruning.apply_pending(); assert!(db.data_eq(&make_db(&[2, 3, 4, 5]))); let mut commit = CommitSet::default(); pruning.prune_one(&mut commit).unwrap(); db.commit(&commit); - pruning.apply_pending(); assert!(db.data_eq(&make_db(&[3, 4, 5]))); assert_eq!(pruning.base, 2); } @@ -690,7 +527,6 @@ mod tests { let mut commit = CommitSet::default(); pruning.prune_one(&mut commit).unwrap(); db.commit(&commit); - pruning.apply_pending(); assert!(db.data_eq(&make_db(&[3, 4, 5]))); assert_eq!(pruning.base, 2); } @@ -710,7 +546,6 @@ mod tests { pruning.note_canonical(&H256::random(), 2, &mut commit).unwrap(); db.commit(&commit); assert!(db.data_eq(&make_db(&[1, 2, 3]))); - pruning.apply_pending(); check_journal(&pruning, &db); @@ -725,7 +560,6 @@ mod tests { 
pruning.prune_one(&mut commit).unwrap(); db.commit(&commit); assert!(db.data_eq(&make_db(&[1, 3]))); - pruning.apply_pending(); assert_eq!(pruning.base, 3); } @@ -756,7 +590,6 @@ mod tests { pruning.prune_one(&mut commit).unwrap(); db.commit(&commit); assert!(db.data_eq(&make_db(&[1, 3]))); - pruning.apply_pending(); assert_eq!(pruning.base, 3); } @@ -775,7 +608,6 @@ mod tests { pruning.note_canonical(&H256::random(), 2, &mut commit).unwrap(); db.commit(&commit); assert!(db.data_eq(&make_db(&[1, 2, 3]))); - pruning.apply_pending(); check_journal(&pruning, &db); @@ -861,9 +693,9 @@ mod tests { let cache_capacity = DEFAULT_MAX_BLOCK_CONSTRAINT as usize; // start as an empty queue - let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap(); + let (cache, last) = pruning.queue.get_db_backed_queue_state().unwrap(); assert_eq!(cache.len(), 0); - assert_eq!(uncached_blocks, 0); + assert_eq!(last, None); // import blocks // queue size and content should match @@ -872,21 +704,19 @@ mod tests { pruning.note_canonical(&(i as u64), i as u64, &mut commit).unwrap(); push_last_canonicalized(i as u64, &mut commit); db.commit(&commit); - // block will fill in cache first - let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap(); + // blocks will fill the cache first + let (cache, last) = pruning.queue.get_db_backed_queue_state().unwrap(); if i < cache_capacity { assert_eq!(cache.len(), i + 1); - assert_eq!(uncached_blocks, 0); } else { assert_eq!(cache.len(), cache_capacity); - assert_eq!(uncached_blocks, i - cache_capacity + 1); } + assert_eq!(last, Some(i as u64)); } - pruning.apply_pending(); - assert_eq!(pruning.queue.len(), cache_capacity + 10); - let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap(); + assert_eq!(pruning.window_size(), cache_capacity as u64 + 10); + let (cache, last) = pruning.queue.get_db_backed_queue_state().unwrap(); assert_eq!(cache.len(), cache_capacity); - 
assert_eq!(uncached_blocks, 10); + assert_eq!(last, Some(cache_capacity as u64 + 10 - 1)); for i in 0..cache_capacity { assert_eq!(cache[i].hash, i as u64); } @@ -897,29 +727,26 @@ mod tests { pruning .note_canonical(&(cache_capacity as u64 + 10), cache_capacity as u64 + 10, &mut commit) .unwrap(); - assert_eq!(pruning.queue.len(), cache_capacity + 11); - let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap(); + assert_eq!(pruning.window_size(), cache_capacity as u64 + 11); + let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap(); assert_eq!(cache.len(), cache_capacity); - assert_eq!(uncached_blocks, 11); // revert the last add that no apply yet // NOTE: do not commit the previous `CommitSet` to db - pruning.revert_pending(); - assert_eq!(pruning.queue.len(), cache_capacity + 10); - let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap(); + pruning = RefWindow::new(db.clone(), DEFAULT_MAX_BLOCK_CONSTRAINT, false).unwrap(); + let cache_capacity = DEFAULT_MAX_BLOCK_CONSTRAINT as usize; + assert_eq!(pruning.window_size(), cache_capacity as u64 + 10); + let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap(); assert_eq!(cache.len(), cache_capacity); - assert_eq!(uncached_blocks, 10); // remove one block from the start of the queue // block is removed from the head of cache let mut commit = CommitSet::default(); pruning.prune_one(&mut commit).unwrap(); db.commit(&commit); - pruning.apply_pending(); - assert_eq!(pruning.queue.len(), cache_capacity + 9); - let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap(); + assert_eq!(pruning.window_size(), cache_capacity as u64 + 9); + let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap(); assert_eq!(cache.len(), cache_capacity - 1); - assert_eq!(uncached_blocks, 10); for i in 0..(cache_capacity - 1) { assert_eq!(cache[i].hash, (i + 1) as u64); } @@ -928,10 +755,9 @@ mod tests { // `cache` is full again but the 
content of the queue should be the same let pruning: RefWindow = RefWindow::new(db, DEFAULT_MAX_BLOCK_CONSTRAINT, false).unwrap(); - assert_eq!(pruning.queue.len(), cache_capacity + 9); - let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap(); + assert_eq!(pruning.window_size(), cache_capacity as u64 + 9); + let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap(); assert_eq!(cache.len(), cache_capacity); - assert_eq!(uncached_blocks, 9); for i in 0..cache_capacity { assert_eq!(cache[i].hash, (i + 1) as u64); } @@ -952,24 +778,13 @@ mod tests { db.commit(&commit); } - // the following operations won't triger loading block from db: + // the following operations won't trigger loading block from db: // - getting block in cache // - getting block not in the queue - let index = cache_capacity; - assert_eq!( - pruning.queue.get(0, index - 1).unwrap().unwrap().hash, - cache_capacity as u64 - 1 - ); - assert_eq!(pruning.queue.get(0, cache_capacity * 2 + 10).unwrap(), None); - let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap(); + assert_eq!(pruning.next_hash().unwrap().unwrap(), 0); + let (cache, last) = pruning.queue.get_db_backed_queue_state().unwrap(); assert_eq!(cache.len(), cache_capacity); - assert_eq!(uncached_blocks, cache_capacity + 10); - - // getting a block not in cache will triger loading block from db - assert_eq!(pruning.queue.get(0, index).unwrap().unwrap().hash, cache_capacity as u64); - let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap(); - assert_eq!(cache.len(), cache_capacity * 2); - assert_eq!(uncached_blocks, 10); + assert_eq!(last, Some(cache_capacity as u64 * 2 + 10 - 1)); // clear all block loaded in cache for _ in 0..cache_capacity * 2 { @@ -977,29 +792,22 @@ mod tests { pruning.prune_one(&mut commit).unwrap(); db.commit(&commit); } - pruning.apply_pending(); - let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap(); + let 
(cache, _) = pruning.queue.get_db_backed_queue_state().unwrap(); assert!(cache.is_empty()); - assert_eq!(uncached_blocks, 10); - // getting the hash of block that not in cache will also triger loading + // getting the hash of block that not in cache will also trigger loading // the remaining blocks from db - assert_eq!( - pruning.queue.get(pruning.base, 0).unwrap().unwrap().hash, - (cache_capacity * 2) as u64 - ); - let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap(); + assert_eq!(pruning.next_hash().unwrap().unwrap(), (cache_capacity * 2) as u64); + let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap(); assert_eq!(cache.len(), 10); - assert_eq!(uncached_blocks, 0); // load a new queue from db // `cache` should be the same let pruning: RefWindow = RefWindow::new(db, DEFAULT_MAX_BLOCK_CONSTRAINT, false).unwrap(); - assert_eq!(pruning.queue.len(), 10); - let (cache, uncached_blocks) = pruning.queue.get_db_backed_queue_state().unwrap(); + assert_eq!(pruning.window_size(), 10); + let (cache, _) = pruning.queue.get_db_backed_queue_state().unwrap(); assert_eq!(cache.len(), 10); - assert_eq!(uncached_blocks, 0); for i in 0..10 { assert_eq!(cache[i].hash, (cache_capacity * 2 + i) as u64); } @@ -1030,11 +838,8 @@ mod tests { assert_eq!(pruning.next_hash().unwrap(), Some(i)); pruning.prune_one(&mut commit).unwrap(); } - // return `BlockUnavailable` for block that did not commit to db - assert_eq!( - pruning.next_hash().unwrap_err(), - Error::StateDb(StateDbError::BlockUnavailable) - ); + // return `None` for block that did not commit to db + assert_eq!(pruning.next_hash().unwrap(), None); assert_eq!( pruning.prune_one(&mut commit).unwrap_err(), Error::StateDb(StateDbError::BlockUnavailable) @@ -1044,12 +849,5 @@ mod tests { assert_eq!(pruning.next_hash().unwrap(), Some(index)); pruning.prune_one(&mut commit).unwrap(); db.commit(&commit); - - // import a block and do not commit it to db before calling `apply_pending` - pruning - 
.note_canonical(&(index + 1), index + 1, &mut make_commit(&[], &[])) - .unwrap(); - pruning.apply_pending(); - assert_eq!(pruning.next_hash().unwrap_err(), Error::StateDb(StateDbError::BlockMissing)); } }