diff --git a/Cargo.lock b/Cargo.lock
index 084fb8716c4..be5cc65f3a1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2042,6 +2042,7 @@ dependencies = [
  "rocksdb",
  "serde",
  "strum",
+ "strum_macros",
  "thiserror",
  "tracing",
 ]
diff --git a/chain/chain/Cargo.toml b/chain/chain/Cargo.toml
index 68e9e0a5840..887196cbb3b 100644
--- a/chain/chain/Cargo.toml
+++ b/chain/chain/Cargo.toml
@@ -18,6 +18,7 @@ num-rational = "0.2.4"
 tracing = "0.1.13"
 thiserror = "1.0"
 strum = "0.18"
+strum_macros = "0.18"
 borsh = "0.6.2"
diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs
index a6ea772a570..da025a8688c 100644
--- a/chain/chain/src/chain.rs
+++ b/chain/chain/src/chain.rs
@@ -952,13 +952,17 @@ impl Chain {
         chain_store_update.commit()?;
 
         // clear all trie data
-        let mut store_update = StoreUpdate::new_with_tries(self.runtime_adapter.get_tries());
-        let stored_state = self.store().store().iter_prefix(ColState, &[]);
-        for (key, _) in stored_state {
+        let keys: Vec<Vec<u8>> =
+            self.store().store().iter_prefix(ColState, &[]).map(|kv| kv.0.into()).collect();
+        let tries = self.runtime_adapter.get_tries();
+        let mut chain_store_update = self.mut_store().store_update();
+        let mut store_update = StoreUpdate::new_with_tries(tries);
+        for key in keys.iter() {
             store_update.delete(ColState, key.as_ref());
+            chain_store_update.inc_gc_col_state();
         }
-        let mut chain_store_update = self.mut_store().store_update();
         chain_store_update.merge(store_update);
+
+        // Reset the tail here to make sure the Tail is never greater than the Head
         chain_store_update.reset_tail();
         chain_store_update.commit()?;
@@ -1793,12 +1797,9 @@ impl Chain {
         sync_hash: CryptoHash,
         num_parts: u64,
     ) -> Result<(), Error> {
-        let mut store_update = self.store.owned_store().store_update();
-        for part_id in 0..num_parts {
-            let key = StatePartKey(sync_hash, shard_id, part_id).try_to_vec()?;
-            store_update.delete(ColStateParts, &key);
-        }
-        Ok(store_update.commit()?)
+        let mut chain_store_update = self.mut_store().store_update();
+        chain_store_update.gc_col_state_parts(sync_hash, shard_id, num_parts)?;
+        Ok(chain_store_update.commit()?)
     }

    /// Apply transactions in chunks for the next epoch in blocks that were blocked on the state sync
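For context, the keys removed by `gc_col_state_parts` are the Borsh serializations of `StatePartKey(sync_hash, shard_id, part_id)` that the deleted loop above used to build inline. A minimal sketch of that key construction, assuming `StatePartKey` is a Borsh-serializable tuple struct over `(CryptoHash, ShardId, u64)` (the type stand-ins below are hypothetical simplifications, not the nearcore definitions):

use borsh::BorshSerialize;

// Hypothetical stand-ins for nearcore's real CryptoHash and ShardId types.
type CryptoHash = [u8; 32];
type ShardId = u64;

#[derive(BorshSerialize)]
struct StatePartKey(CryptoHash, ShardId, u64);

/// Builds the ColStateParts keys for every part of one shard's synced state,
/// mirroring the loop that moved into gc_col_state_parts.
fn state_part_keys(
    sync_hash: CryptoHash,
    shard_id: ShardId,
    num_parts: u64,
) -> std::io::Result<Vec<Vec<u8>>> {
    (0..num_parts)
        .map(|part_id| StatePartKey(sync_hash, shard_id, part_id).try_to_vec())
        .collect()
}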
diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs
index f51c0945649..b7e8dc2d3dd 100644
--- a/chain/chain/src/store.rs
+++ b/chain/chain/src/store.rs
@@ -8,6 +8,7 @@ use borsh::{BorshDeserialize, BorshSerialize};
 use cached::{Cached, SizedCache};
 use chrono::Utc;
 use serde::Serialize;
+use strum::IntoEnumIterator;
 use tracing::debug;
 
 use near_primitives::block::{Approval, Tip};
@@ -35,19 +36,47 @@ use near_store::{
     ColEpochLightClientBlocks, ColIncomingReceipts, ColInvalidChunks, ColLastBlockWithNewChunk,
     ColNextBlockHashes, ColNextBlockWithNewChunk, ColOutgoingReceipts, ColPartialChunks,
     ColReceiptIdToShardId, ColStateChanges, ColStateDlInfos, ColStateHeaders, ColTransactionResult,
-    ColTransactions, ColTrieChanges, KeyForStateChanges, ShardTries, Store, StoreUpdate,
+    ColTransactions, ColTrieChanges, DBCol, KeyForStateChanges, ShardTries, Store, StoreUpdate,
     TrieChanges, WrappedTrieChanges, CHUNK_TAIL_KEY, HEADER_HEAD_KEY, HEAD_KEY,
-    LARGEST_TARGET_HEIGHT_KEY, LATEST_KNOWN_KEY, SYNC_HEAD_KEY, TAIL_KEY,
+    LARGEST_TARGET_HEIGHT_KEY, LATEST_KNOWN_KEY, NUM_COLS, SYNC_HEAD_KEY, TAIL_KEY,
 };
 
 use crate::byzantine_assert;
 use crate::error::{Error, ErrorKind};
-use crate::types::{Block, BlockHeader, LatestKnown, ReceiptProofResponse, ReceiptResponse};
+use crate::types::{
+    Block, BlockHeader, LatestKnown, ReceiptProofResponse, ReceiptResponse, StatePartKey,
+};
 
 /// lru cache size
 const CACHE_SIZE: usize = 100;
 const CHUNK_CACHE_SIZE: usize = 1024;
 
+lazy_static! {
+    pub static ref SHOULD_COL_GC: Vec<bool> = {
+        let mut col_gc = vec![true; NUM_COLS];
+        col_gc[DBCol::ColDbVersion as usize] = false; // DB version is unrelated to GC
+        col_gc[DBCol::ColBlockMisc as usize] = false;
+        col_gc[DBCol::ColBlockHeader as usize] = false; // header sync needs headers
+        col_gc[DBCol::ColGCCount as usize] = false; // the GC count itself isn't GCed
+        col_gc[DBCol::ColBlockHeight as usize] = false; // block sync needs it + genesis should be accessible
+        col_gc[DBCol::ColTransactionResult as usize] = false;
+        col_gc[DBCol::ColPeers as usize] = false; // Peers is unrelated to GC
+        col_gc[DBCol::ColEpochInfo as usize] = false;
+        col_gc[DBCol::ColStateDlInfos as usize] = false;
+        col_gc[DBCol::ColBlockInfo as usize] = false;
+        col_gc[DBCol::ColBlockMerkleTree as usize] = false;
+        col_gc[DBCol::ColEpochStart as usize] = false;
+        col_gc[DBCol::ColAccountAnnouncements as usize] = false;
+        col_gc[DBCol::ColEpochLightClientBlocks as usize] = false;
+        col_gc[DBCol::ColLastBlockWithNewChunk as usize] = false;
+        col_gc[DBCol::ColPeerComponent as usize] = false; // peer-related info isn't GCed
+        col_gc[DBCol::LastComponentNonce as usize] = false;
+        col_gc[DBCol::ColComponentEdges as usize] = false;
+        col_gc[DBCol::ColBlockOrdinal as usize] = false;
+        col_gc
+    };
+}
+
 #[derive(Debug, PartialEq, BorshSerialize, BorshDeserialize, Serialize)]
 pub struct ShardInfo(pub ShardId, pub ChunkHash);
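The counters that pair with this table are persisted in the new ColGCCount column, keyed by the Borsh serialization of the DBCol value itself (see get_gc_count and the finalize loop below). A minimal self-contained sketch of that key/value round trip, with a HashMap standing in for the column family and a one-variant stand-in for DBCol (the real Store wrapper is elided):

use borsh::{BorshDeserialize, BorshSerialize};
use std::collections::HashMap;

// Simplified stand-in for the real DBCol enum in core/store/src/db.rs.
#[derive(BorshSerialize, BorshDeserialize, Clone, Copy)]
enum DBCol {
    ColBlock,
}

type GCCount = u64;

fn main() -> std::io::Result<()> {
    // A HashMap plays the role of the ColGCCount column family here.
    let mut col_gc_count: HashMap<Vec<u8>, Vec<u8>> = HashMap::new();

    // Write: key = Borsh(DBCol), value = Borsh(count), as finalize() does.
    let key = DBCol::ColBlock.try_to_vec()?;
    let count: GCCount = 7;
    col_gc_count.insert(key.clone(), count.try_to_vec()?);

    // Read it back, as get_gc_count() does when no count is pending in memory.
    let stored = col_gc_count.get(&key).map(|v| GCCount::try_from_slice(v)).transpose()?;
    assert_eq!(stored, Some(7));
    Ok(())
}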
@@ -1064,6 +1093,8 @@ impl ChainStoreCacheUpdate {
     }
 }
 
+type GCCount = u64;
+
 /// Provides a layer to update the chain without touching the underlying database.
 /// This serves a few purposes, the main one being that even if the executable exits/fails during an update, the database stays in a consistent state.
 pub struct ChainStoreUpdate<'a> {
@@ -1086,6 +1117,7 @@ pub struct ChainStoreUpdate<'a> {
     add_state_dl_infos: Vec<StateSyncInfo>,
     remove_state_dl_infos: Vec<CryptoHash>,
     challenged_blocks: HashSet<CryptoHash>,
+    gc_count: Vec<GCCount>,
 }
 
 impl<'a> ChainStoreUpdate<'a> {
@@ -1107,6 +1139,7 @@ impl<'a> ChainStoreUpdate<'a> {
             add_state_dl_infos: vec![],
             remove_state_dl_infos: vec![],
             challenged_blocks: HashSet::default(),
+            gc_count: vec![0; NUM_COLS],
         }
     }
 
@@ -1844,8 +1877,6 @@ impl<'a> ChainStoreUpdate<'a> {
     }
 
     pub fn clear_chunk_data(&mut self, min_chunk_height: BlockHeight) -> Result<(), Error> {
-        let mut store_update = self.store().store_update();
-
         let chunk_tail = self.chunk_tail()?;
         for height in chunk_tail..min_chunk_height {
             if height == self.get_genesis_height() {
@@ -1856,41 +1887,24 @@ impl<'a> ChainStoreUpdate<'a> {
                 // 1. Delete chunk-related data
                 let chunk = self.get_chunk(&chunk_hash)?.clone();
                 debug_assert_eq!(chunk.header.inner.height_created, height);
-                // 1a. Delete from receipt_id_to_shard_id (ColReceiptIdToShardId)
                 for receipt in chunk.receipts {
-                    store_update.delete(ColReceiptIdToShardId, receipt.receipt_id.as_ref());
-                    self.chain_store
-                        .receipt_id_to_shard_id
-                        .cache_remove(&receipt.receipt_id.into());
+                    self.gc_col(ColReceiptIdToShardId, &receipt.receipt_id.into());
                 }
-                // 1b. Delete from ColTransactions
                 for transaction in chunk.transactions {
-                    store_update.delete(ColTransactions, transaction.get_hash().as_ref());
-                    self.chain_store.transactions.cache_remove(&transaction.get_hash().into());
+                    self.gc_col(ColTransactions, &transaction.get_hash().into());
                 }
 
                 // 2. Delete chunk_hash-indexed data
                 let chunk_header_hash = chunk_hash.clone().into();
-                let chunk_header_hash_ref = chunk_hash.as_ref();
-                // 2a. Delete chunks (ColChunks)
-                store_update.delete(ColChunks, chunk_header_hash_ref);
-                self.chain_store.chunks.cache_remove(&chunk_header_hash);
-                // 2b. Delete chunk extras (ColChunkExtra)
-                store_update.delete(ColChunkExtra, chunk_header_hash_ref);
-                self.chain_store.chunk_extras.cache_remove(&chunk_header_hash);
-                // 2c. Delete partial_chunks (ColPartialChunks)
-                store_update.delete(ColPartialChunks, chunk_header_hash_ref);
-                self.chain_store.partial_chunks.cache_remove(&chunk_header_hash);
-                // 2d. Delete invalid chunks (ColInvalidChunks)
-                store_update.delete(ColInvalidChunks, chunk_header_hash_ref);
-                self.chain_store.invalid_chunks.cache_remove(&chunk_header_hash);
+                self.gc_col(ColChunks, &chunk_header_hash);
+                self.gc_col(ColChunkExtra, &chunk_header_hash);
+                self.gc_col(ColPartialChunks, &chunk_header_hash);
+                self.gc_col(ColInvalidChunks, &chunk_header_hash);
             }
 
             // 3. Delete chunks_tail-related data
-            // 3a. Delete from ColChunkHashesByHeight
-            store_update.delete(ColChunkHashesByHeight, &index_to_bytes(height));
+            self.gc_col(ColChunkHashesByHeight, &index_to_bytes(height));
         }
         self.update_chunk_tail(min_chunk_height);
-        self.merge(store_update);
         Ok(())
     }
@@ -1915,10 +1929,11 @@ impl<'a> ChainStoreUpdate<'a> {
                 tries
                     .revert_insertions(&trie_changes, shard_id, &mut store_update)
                     .map(|_| {
-                        store_update.delete(
+                        self.gc_col(
                             ColTrieChanges,
                             &get_block_shard_id(&block_hash, shard_id),
                         );
+                        self.inc_gc_col_state();
                     })
                     .map_err(|err| ErrorKind::Other(err.to_string()))
             })
@@ -1934,10 +1949,11 @@ impl<'a> ChainStoreUpdate<'a> {
                 tries
                     .apply_deletions(&trie_changes, shard_id, &mut store_update)
                     .map(|_| {
-                        store_update.delete(
+                        self.gc_col(
                             ColTrieChanges,
                             &get_block_shard_id(&block_hash, shard_id),
                         );
+                        self.inc_gc_col_state();
                     })
                     .map_err(|err| ErrorKind::Other(err.to_string()))
             })
@@ -1949,7 +1965,7 @@ impl<'a> ChainStoreUpdate<'a> {
             GCMode::StateSync => {
                 // Do not apply the data from ColTrieChanges
                 for shard_id in 0..header.chunk_mask().len() as ShardId {
-                    store_update.delete(ColTrieChanges, &get_block_shard_id(&block_hash, shard_id));
+                    self.gc_col(ColTrieChanges, &get_block_shard_id(&block_hash, shard_id));
                 }
             }
         }
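The three arms above correspond to the three GC modes the chain distinguishes. A hedged sketch of the shape of that enum follows; the real definition lives in chain/chain/src/types.rs, and the ShardTries payloads are an assumption inferred from the `tries.revert_insertions(...)` and `tries.apply_deletions(...)` calls in the surrounding arms:

use near_store::ShardTries;

pub enum GCMode {
    /// Clearing an abandoned fork block: revert the trie insertions it made,
    /// since its state was never part of the canonical chain.
    Fork(ShardTries),
    /// Clearing an old canonical block: apply the recorded trie deletions;
    /// the resulting state itself must survive.
    Canonical(ShardTries),
    /// Clearing after state sync: drop ColTrieChanges without touching the
    /// trie, because the freshly synced state replaces it wholesale.
    StateSync,
}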
@@ -1971,92 +1987,45 @@ impl<'a> ChainStoreUpdate<'a> {
 
         // 2. Delete shard_id-indexed data (shards, receipts, transactions)
         for shard_id in 0..block.header().chunk_mask().len() as ShardId {
-            // 2a. Delete outgoing receipts (ColOutgoingReceipts)
-            store_update.delete(ColOutgoingReceipts, &get_block_shard_id(&block_hash, shard_id));
-            self.chain_store
-                .outgoing_receipts
-                .cache_remove(&get_block_shard_id(&block_hash, shard_id));
-            // 2b. Delete incoming receipts (ColIncomingReceipts)
-            store_update.delete(ColIncomingReceipts, &get_block_shard_id(&block_hash, shard_id));
-            self.chain_store
-                .incoming_receipts
-                .cache_remove(&get_block_shard_id(&block_hash, shard_id));
-            // 2c. Delete from chunk_hash_per_height_shard (ColChunkPerHeightShard)
-            store_update.delete(ColChunkPerHeightShard, &get_height_shard_id(height, shard_id));
-            self.chain_store
-                .chunk_hash_per_height_shard
-                .cache_remove(&get_height_shard_id(height, shard_id));
-            // 2d. Delete from next_block_with_new_chunk (ColNextBlockWithNewChunk)
-            store_update
-                .delete(ColNextBlockWithNewChunk, &get_block_shard_id(&block_hash, shard_id));
-            self.chain_store
-                .next_block_with_new_chunk
-                .cache_remove(&get_block_shard_id(&block_hash, shard_id));
-            // 2e. Delete from ColStateHeaders
+            let height_shard_id = get_block_shard_id(&block_hash, shard_id);
+            self.gc_col(ColOutgoingReceipts, &height_shard_id);
+            self.gc_col(ColIncomingReceipts, &height_shard_id);
+            self.gc_col(ColChunkPerHeightShard, &height_shard_id);
+            self.gc_col(ColNextBlockWithNewChunk, &height_shard_id);
             let key = StateHeaderKey(shard_id, block_hash).try_to_vec()?;
-            store_update.delete(ColStateHeaders, &key);
-            // 2f. Delete from ColStateParts
-            // Already done, check chain.clear_downloaded_parts()
+            self.gc_col(ColStateHeaders, &key);
         }
 
         // 3. Delete block_hash-indexed data
-        let block_hash_ref = block_hash.as_ref();
-        // 3a. Delete block (ColBlock)
-        store_update.delete(ColBlock, block_hash_ref);
-        self.chain_store.blocks.cache_remove(&block_hash.into());
-        // 3b. Delete block header (ColBlockHeader) - don't do because header sync needs headers
-        // 3c. Delete block extras (ColBlockExtra)
-        store_update.delete(ColBlockExtra, block_hash_ref);
-        self.chain_store.block_extras.cache_remove(&block_hash.into());
-        // 3d. Delete from next_block_hashes (ColNextBlockHashes)
-        store_update.delete(ColNextBlockHashes, block_hash_ref);
-        self.chain_store.next_block_hashes.cache_remove(&block_hash.into());
-        // 3e. Delete from ColChallengedBlocks
-        store_update.delete(ColChallengedBlocks, block_hash_ref);
-        // 3f. Delete from ColBlocksToCatchup
-        store_update.delete(ColBlocksToCatchup, block_hash_ref);
-        // 3g. Delete from KV state changes
+        let block_hash_vec: Vec<u8> = block_hash.as_ref().into();
+        self.gc_col(ColBlock, &block_hash_vec);
+        self.gc_col(ColBlockExtra, &block_hash_vec);
+        self.gc_col(ColNextBlockHashes, &block_hash_vec);
+        self.gc_col(ColChallengedBlocks, &block_hash_vec);
+        self.gc_col(ColBlocksToCatchup, &block_hash_vec);
         let storage_key = KeyForStateChanges::get_prefix(&block_hash);
-        // 3g1. We should collect all the keys which key prefix equals to `block_hash`
-        let stored_state_changes =
-            self.chain_store.store().iter_prefix(ColStateChanges, storage_key.as_ref());
-        // 3g2. Remove from ColStateChanges all found State Changes
-        for (key, _) in stored_state_changes {
-            store_update.delete(ColStateChanges, key.as_ref());
+        let stored_state_changes: Vec<Vec<u8>> = self
+            .chain_store
+            .store()
+            .iter_prefix(ColStateChanges, storage_key.as_ref())
+            .map(|key| key.0.into())
+            .collect();
+        for key in stored_state_changes {
+            self.gc_col(ColStateChanges, &key);
         }
-        // 3h. Delete from ColBlockRefCount
-        store_update.delete(ColBlockRefCount, block_hash_ref);
-        self.chain_store.block_refcounts.cache_remove(&block_hash.into());
+        self.gc_col(ColBlockRefCount, &block_hash_vec);
+
+        // 4. Update or delete block_hash_per_height
+        self.gc_col_block_per_height(&block_hash, height, &block.header().epoch_id())?;
 
         match gc_mode {
             GCMode::Fork(_) => {
-                // 4. Forks only clearing
-                // 4a. Update block_hash_per_height
-                let epoch_to_hashes_ref = self.get_all_block_hashes_by_height(height)?;
-                let mut epoch_to_hashes = epoch_to_hashes_ref.clone();
-                let hashes = epoch_to_hashes
-                    .get_mut(&block.header().epoch_id())
-                    .expect("current epoch id should exist");
-                hashes.remove(&block_hash);
-                store_update.set_ser(
-                    ColBlockPerHeight,
-                    &index_to_bytes(height),
-                    &epoch_to_hashes,
-                )?;
-                self.chain_store
-                    .block_hash_per_height
-                    .cache_set(index_to_bytes(height), epoch_to_hashes);
-                // 4b. Decreasing block refcount
+                // 5. Forks only clearing
                 self.dec_block_refcount(block.header().prev_hash())?;
             }
             GCMode::Canonical(_) => {
-                // 5. Canonical Chain clearing
-                // 5a. Delete blocks with current height (ColBlockPerHeight)
-                store_update.delete(ColBlockPerHeight, &index_to_bytes(height));
-                self.chain_store.block_hash_per_height.cache_remove(&index_to_bytes(height));
-                // 5b. Delete from ColBlockHeight - don't do because: block sync needs it + genesis should be accessible
-
-                // 6. Delete chunks and chunk-indexed data
+                // 6. Canonical Chain only clearing
+                // Delete chunks and chunk-indexed data
                 let mut min_chunk_height = self.tail()?;
                 for chunk_header in block.chunks() {
                     if min_chunk_height > chunk_header.inner.height_created {
@@ -2067,11 +2036,6 @@ impl<'a> ChainStoreUpdate<'a> {
             }
             GCMode::StateSync => {
                 // 7. State Sync clearing
-                // 7a. Delete blocks with current height (ColBlockPerHeight)
-                store_update.delete(ColBlockPerHeight, &index_to_bytes(height));
-                self.chain_store.block_hash_per_height.cache_remove(&index_to_bytes(height));
-                // 7b. Delete from ColBlockHeight - don't do because: block sync needs it + genesis should be accessible
-
                 // Chunks deleted separately
             }
         };
@@ -2079,6 +2043,184 @@ impl<'a> ChainStoreUpdate<'a> {
         Ok(())
     }
 
+    fn get_gc_count(&self, col: DBCol) -> GCCount {
+        if self.gc_count[col as usize] != 0 {
+            self.gc_count[col as usize]
+        } else if let Ok(Some(count)) = self
+            .store()
+            .get(DBCol::ColGCCount, &borsh::ser::BorshSerialize::try_to_vec(&col).unwrap())
+        {
+            borsh::de::BorshDeserialize::try_from_slice(&count).unwrap()
+        } else {
+            0
+        }
+    }
+
+    fn inc_gc(&mut self, col: DBCol) {
+        let new_count = self.get_gc_count(col) + 1;
+        self.gc_count[col as usize] = new_count;
+    }
+
+    pub fn gc_col_block_per_height(
+        &mut self,
+        block_hash: &CryptoHash,
+        height: BlockHeight,
+        epoch_id: &EpochId,
+    ) -> Result<(), Error> {
+        let mut store_update = self.store().store_update();
+        let epoch_to_hashes_ref = self.get_all_block_hashes_by_height(height)?;
+        let mut epoch_to_hashes = epoch_to_hashes_ref.clone();
+        let hashes =
+            epoch_to_hashes.get_mut(epoch_id).ok_or("current epoch id should exist".to_string())?;
+        hashes.remove(&block_hash);
+        if hashes.is_empty() {
+            epoch_to_hashes.remove(epoch_id);
+        }
+        if epoch_to_hashes.is_empty() {
+            store_update.delete(ColBlockPerHeight, &index_to_bytes(height));
+            self.chain_store.block_hash_per_height.cache_remove(&index_to_bytes(height));
+        } else {
+            store_update.set_ser(ColBlockPerHeight, &index_to_bytes(height), &epoch_to_hashes)?;
+            self.chain_store
+                .block_hash_per_height
+                .cache_set(index_to_bytes(height), epoch_to_hashes);
+        }
+        self.inc_gc(DBCol::ColBlockPerHeight);
+        self.merge(store_update);
+        Ok(())
+    }
+
+    pub fn inc_gc_col_state(&mut self) {
+        self.inc_gc(DBCol::ColState);
+    }
+
+    pub fn gc_col_state_parts(
+        &mut self,
+        sync_hash: CryptoHash,
+        shard_id: ShardId,
+        num_parts: u64,
+    ) -> Result<(), Error> {
+        for part_id in 0..num_parts {
+            let key = StatePartKey(sync_hash, shard_id, part_id).try_to_vec()?;
+            self.gc_col(DBCol::ColStateParts, &key);
+        }
+        Ok(())
+    }
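Every arm of the gc_col dispatcher that follows obeys the same three-step invariant: delete the key via the store update, evict the matching in-memory cache entry, and bump the column's counter through inc_gc. A self-contained toy illustrating why the cache eviction must accompany the delete (plain HashMaps stand in for the RocksDB column and its SizedCache):

use std::collections::HashMap;

/// Toy model of one GC-ed column: a persistent store fronted by a cache.
struct ToyColumn {
    store: HashMap<Vec<u8>, Vec<u8>>,
    cache: HashMap<Vec<u8>, Vec<u8>>,
    gc_count: u64,
}

impl ToyColumn {
    /// Mirrors the gc_col invariant: delete, evict, and count together.
    fn gc(&mut self, key: &[u8]) {
        self.store.remove(key);
        // Skipping this eviction would let a later read resurrect the
        // deleted value from the cache, silently defeating the GC.
        self.cache.remove(key);
        self.gc_count += 1;
    }

    fn get(&self, key: &[u8]) -> Option<&Vec<u8>> {
        self.cache.get(key).or_else(|| self.store.get(key))
    }
}

fn main() {
    let mut col = ToyColumn { store: HashMap::new(), cache: HashMap::new(), gc_count: 0 };
    col.store.insert(b"k".to_vec(), b"v".to_vec());
    col.cache.insert(b"k".to_vec(), b"v".to_vec());
    col.gc(b"k");
    assert_eq!(col.get(b"k"), None);
    assert_eq!(col.gc_count, 1);
}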
+    fn gc_col(&mut self, col: DBCol, key: &Vec<u8>) {
+        assert!(SHOULD_COL_GC[col as usize]);
+        let mut store_update = self.store().store_update();
+        match col {
+            DBCol::ColOutgoingReceipts => {
+                store_update.delete(col, key);
+                self.chain_store.outgoing_receipts.cache_remove(key);
+            }
+            DBCol::ColIncomingReceipts => {
+                store_update.delete(col, key);
+                self.chain_store.incoming_receipts.cache_remove(key);
+            }
+            DBCol::ColChunkPerHeightShard => {
+                store_update.delete(col, key);
+                self.chain_store.chunk_hash_per_height_shard.cache_remove(key);
+            }
+            DBCol::ColNextBlockWithNewChunk => {
+                store_update.delete(col, key);
+                self.chain_store.next_block_with_new_chunk.cache_remove(key);
+            }
+            DBCol::ColStateHeaders => {
+                store_update.delete(col, key);
+            }
+            DBCol::ColBlock => {
+                store_update.delete(col, key);
+                self.chain_store.blocks.cache_remove(key);
+            }
+            DBCol::ColBlockExtra => {
+                store_update.delete(col, key);
+                self.chain_store.block_extras.cache_remove(key);
+            }
+            DBCol::ColNextBlockHashes => {
+                store_update.delete(col, key);
+                self.chain_store.next_block_hashes.cache_remove(key);
+            }
+            DBCol::ColChallengedBlocks => {
+                store_update.delete(col, key);
+            }
+            DBCol::ColBlocksToCatchup => {
+                store_update.delete(col, key);
+            }
+            DBCol::ColStateChanges => {
+                store_update.delete(col, key);
+            }
+            DBCol::ColBlockRefCount => {
+                store_update.delete(col, key);
+                self.chain_store.block_refcounts.cache_remove(key);
+            }
+            DBCol::ColReceiptIdToShardId => {
+                store_update.delete(col, key);
+                self.chain_store.receipt_id_to_shard_id.cache_remove(key);
+            }
+            DBCol::ColTransactions => {
+                store_update.delete(col, key);
+                self.chain_store.transactions.cache_remove(key);
+            }
+            DBCol::ColChunks => {
+                store_update.delete(col, key);
+                self.chain_store.chunks.cache_remove(key);
+            }
+            DBCol::ColChunkExtra => {
+                store_update.delete(col, key);
+                self.chain_store.chunk_extras.cache_remove(key);
+            }
+            DBCol::ColPartialChunks => {
+                store_update.delete(col, key);
+                self.chain_store.partial_chunks.cache_remove(key);
+            }
+            DBCol::ColInvalidChunks => {
+                store_update.delete(col, key);
+                self.chain_store.invalid_chunks.cache_remove(key);
+            }
+            DBCol::ColChunkHashesByHeight => {
+                store_update.delete(col, key);
+                self.chain_store.chunk_hash_per_height_shard.cache_remove(key);
+            }
+            DBCol::ColStateParts => {
+                store_update.delete(col, key);
+            }
+            DBCol::ColState => {
+                panic!("Actual gc happens elsewhere, call inc_gc_col_state to increase gc count");
+            }
+            DBCol::ColTrieChanges => {
+                store_update.delete(col, key);
+            }
+            DBCol::ColBlockPerHeight => {
+                panic!("Must use gc_col_block_per_height method to gc ColBlockPerHeight");
+            }
+            DBCol::ColDbVersion
+            | DBCol::ColBlockMisc
+            | DBCol::ColBlockHeader
+            | DBCol::ColGCCount
+            | DBCol::ColBlockHeight
+            | DBCol::ColTransactionResult
+            | DBCol::ColPeers
+            | DBCol::ColEpochInfo
+            | DBCol::ColStateDlInfos
+            | DBCol::ColBlockInfo
+            | DBCol::ColBlockMerkleTree
+            | DBCol::ColEpochStart
+            | DBCol::ColAccountAnnouncements
+            | DBCol::ColEpochLightClientBlocks
+            | DBCol::ColLastBlockWithNewChunk
+            | DBCol::ColPeerComponent
+            | DBCol::LastComponentNonce
+            | DBCol::ColComponentEdges
+            | DBCol::ColBlockOrdinal => {
+                unreachable!();
+            }
+        }
+        self.inc_gc(col);
+        self.merge(store_update);
+    }
+
     /// Merge another StoreUpdate into this one
     pub fn merge(&mut self, store_update: StoreUpdate) {
         self.store_updates.push(store_update);
@@ -2340,6 +2482,17 @@ impl<'a> ChainStoreUpdate<'a> {
         for (chunk_hash, chunk) in self.chain_store_cache_update.invalid_chunks.iter() {
             store_update.set_ser(ColInvalidChunks, chunk_hash.as_ref(), chunk)?;
         }
+        for col in DBCol::iter() {
+            // Only write counters that changed; unconditionally writing every
+            // column would overwrite persisted counts with zeros.
+            if self.gc_count[col as usize] != 0 {
+                store_update.set_ser(
+                    DBCol::ColGCCount,
+                    &col.try_to_vec().expect("Failed to serialize DBCol"),
+                    &self.gc_count[col as usize],
+                )?;
+            }
+        }
         for other in self.store_updates.drain(..) {
             store_update.merge(other);
         }
@@ -2476,6 +2625,7 @@ mod tests {
     use std::sync::Arc;
 
     use cached::Cached;
+    use strum::IntoEnumIterator;
 
     use near_chain_configs::GenesisConfig;
     use near_crypto::KeyType;
@@ -2493,6 +2643,8 @@ mod tests {
     use crate::test_utils::KeyValueRuntime;
     use crate::{Chain, ChainGenesis, DoomslugThresholdMode};
 
+    use near_store::DBCol;
+
     fn get_chain() -> Chain {
         get_chain_with_epoch_length(10)
     }
@@ -2757,6 +2909,30 @@ mod tests {
             }
         }
         assert!(check_refcount_map(&mut chain).is_ok());
+
+        let store_update = chain.mut_store().store_update();
+
+        let gced_cols = [
+            DBCol::ColBlock,
+            DBCol::ColOutgoingReceipts,
+            DBCol::ColIncomingReceipts,
+            DBCol::ColBlocksToCatchup,
+            DBCol::ColChallengedBlocks,
+            DBCol::ColStateHeaders,
+            DBCol::ColBlockExtra,
+            DBCol::ColBlockPerHeight,
+            DBCol::ColNextBlockHashes,
+            DBCol::ColNextBlockWithNewChunk,
+            DBCol::ColChunkPerHeightShard,
+            DBCol::ColBlockRefCount,
+        ];
+        for i in DBCol::iter() {
+            if gced_cols.contains(&i) {
+                assert!(store_update.get_gc_count(i) == 7);
+            } else {
+                assert!(store_update.get_gc_count(i) == 0);
+            }
+        }
     }
 
     #[test]
diff --git a/core/primitives/src/version.rs b/core/primitives/src/version.rs
index cb674b3ac37..213928a281d 100644
--- a/core/primitives/src/version.rs
+++ b/core/primitives/src/version.rs
@@ -11,7 +11,7 @@ pub struct Version {
 pub type DbVersion = u32;
 
 /// Current version of the database.
-pub const DB_VERSION: DbVersion = 1;
+pub const DB_VERSION: DbVersion = 2;
 
 /// Protocol version type.
 pub type ProtocolVersion = u32;
diff --git a/core/store/src/db.rs b/core/store/src/db.rs
index 9902d34bf2f..84060c05cb7 100644
--- a/core/store/src/db.rs
+++ b/core/store/src/db.rs
@@ -3,6 +3,8 @@ use std::collections::HashMap;
 use std::io;
 use std::sync::RwLock;
 
+use borsh::{BorshDeserialize, BorshSerialize};
+
 use rocksdb::{
     BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Direction, IteratorMode, Options,
     ReadOptions, WriteBatch, DB,
@@ -34,7 +36,7 @@ impl Into<io::Error> for DBError {
     }
 }
 
-#[derive(PartialEq, Debug, Copy, Clone, EnumIter)]
+#[derive(PartialEq, Debug, Copy, Clone, EnumIter, BorshDeserialize, BorshSerialize)]
 pub enum DBCol {
     /// Column to indicate which version of database this is.
     ColDbVersion = 0,
@@ -91,10 +93,12 @@ pub enum DBCol {
     ColChunkHashesByHeight = 39,
     /// Block ordinals.
     ColBlockOrdinal = 40,
+    /// GC Count for each column
+    ColGCCount = 41,
 }
 
 // Do not move this line from enum DBCol
-const NUM_COLS: usize = 41;
+pub const NUM_COLS: usize = 42;
 
 impl std::fmt::Display for DBCol {
     fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
@@ -140,6 +144,7 @@ impl std::fmt::Display for DBCol {
             Self::ColBlockMerkleTree => "block merkle tree",
             Self::ColChunkHashesByHeight => "chunk hashes indexed by height_created",
             Self::ColBlockOrdinal => "block ordinal",
+            Self::ColGCCount => "gc count",
         };
         write!(formatter, "{}", desc)
     }
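Since NUM_COLS is maintained by hand next to the enum (note the "Do not move this line" comment), it can silently drift when a column is added. DBCol already derives EnumIter, so a test along these lines would catch the drift automatically; this is a suggested sketch, not part of the PR:

#[cfg(test)]
mod col_count_tests {
    use near_store::{DBCol, NUM_COLS};
    use strum::IntoEnumIterator;

    #[test]
    fn num_cols_matches_enum() {
        // EnumIter's DBCol::iter() visits every variant, so this fails the
        // moment a new column is added without bumping NUM_COLS.
        assert_eq!(DBCol::iter().count(), NUM_COLS);
    }
}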
diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs
index a2cd17c08dd..11597f03550 100644
--- a/core/store/src/lib.rs
+++ b/core/store/src/lib.rs
@@ -11,7 +11,7 @@ use cached::{Cached, SizedCache};
 pub use db::DBCol::{self, *};
 pub use db::{
     CHUNK_TAIL_KEY, HEADER_HEAD_KEY, HEAD_KEY, LARGEST_TARGET_HEIGHT_KEY, LATEST_KNOWN_KEY,
-    SYNC_HEAD_KEY, TAIL_KEY,
+    NUM_COLS, SYNC_HEAD_KEY, TAIL_KEY,
 };
 use near_crypto::PublicKey;
 use near_primitives::account::{AccessKey, Account};
diff --git a/neard/src/lib.rs b/neard/src/lib.rs
index 1093c55a2f0..24248075bbd 100644
--- a/neard/src/lib.rs
+++ b/neard/src/lib.rs
@@ -3,7 +3,7 @@ use std::path::Path;
 use std::sync::Arc;
 
 use actix::{Actor, Addr};
-use log::info;
+use log::{error, info};
 use tracing::trace;
 
 use near_chain::ChainGenesis;
@@ -53,8 +53,23 @@ pub fn get_default_home() -> String {
 
 /// Function checks current version of the database and applies migrations to the database.
 pub fn apply_store_migrations(path: &String) {
-    let _db_version = get_store_version(path);
+    let db_version = get_store_version(path);
+    if db_version > near_primitives::version::DB_VERSION {
+        error!(target: "near", "DB version {} is created by a newer version of neard, please update neard or delete data", db_version);
+        std::process::exit(1);
+    }
+    if db_version == near_primitives::version::DB_VERSION {
+        return;
+    }
     // Add migrations here based on `db_version`.
+    if db_version == 1 {
+        // version 1 => 2: add the gc column.
+        // No data needs to be rewritten, since the DB is opened with the
+        // `create_missing_column_families` option; the version still has to
+        // be bumped, because a db_version 1 binary can't open a db_version 2 DB.
+        info!(target: "near", "Migrate DB from version 1 to 2");
+        let store = create_store(&path);
+        set_store_version(&store);
+    }
 }
 
 pub fn init_and_migrate_store(home_dir: &Path) -> Arc<Store> {
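The new apply_store_migrations follows the usual sequential-migration pattern: refuse a DB created by a newer binary, return early when already current, otherwise step the version upward one migration at a time. A self-contained toy of the same control flow, recast as a loop so future `N => N+1` steps slot in as extra match arms (names below are illustrative, not nearcore APIs):

// Toy model of the sequential migration pattern used above.
const DB_VERSION: u32 = 2;

fn apply_migrations(mut db_version: u32) -> Result<u32, String> {
    if db_version > DB_VERSION {
        return Err(format!("DB version {} is newer than this binary", db_version));
    }
    while db_version < DB_VERSION {
        match db_version {
            1 => {
                // 1 => 2: nothing to rewrite; the missing column family is
                // created on open, so only the stored version changes.
            }
            v => return Err(format!("no migration defined from version {}", v)),
        }
        db_version += 1;
    }
    Ok(db_version)
}

fn main() {
    assert_eq!(apply_migrations(1), Ok(2));
    assert_eq!(apply_migrations(2), Ok(2));
    assert!(apply_migrations(3).is_err());
}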