-
Notifications
You must be signed in to change notification settings - Fork 632
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: undo-block tool #8681
feat: undo-block tool #8681
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,6 +47,7 @@ use crate::chunks_store::ReadOnlyChunksStore; | |
use crate::types::{Block, BlockHeader, LatestKnown}; | ||
use crate::{byzantine_assert, RuntimeWithEpochManagerAdapter}; | ||
use near_store::db::StoreStatistics; | ||
use near_store::flat::store_helper; | ||
use std::sync::Arc; | ||
|
||
/// lru cache size | ||
|
@@ -1956,6 +1957,27 @@ impl<'a> ChainStoreUpdate<'a> { | |
self.chunk_tail = Some(height); | ||
} | ||
|
||
fn clear_header_data_for_heights( | ||
&mut self, | ||
start: BlockHeight, | ||
end: BlockHeight, | ||
) -> Result<(), Error> { | ||
for height in start..=end { | ||
let header_hashes = self.chain_store.get_all_header_hashes_by_height(height)?; | ||
for header_hash in header_hashes { | ||
// Delete header_hash-indexed data: block header | ||
let mut store_update = self.store().store_update(); | ||
let key: &[u8] = header_hash.as_bytes(); | ||
store_update.delete(DBCol::BlockHeader, key); | ||
self.chain_store.headers.pop(key); | ||
self.merge(store_update); | ||
} | ||
let key = index_to_bytes(height); | ||
self.gc_col(DBCol::HeaderHashesByHeight, &key); | ||
} | ||
Ok(()) | ||
} | ||
|
||
pub fn clear_chunk_data_and_headers( | ||
&mut self, | ||
min_chunk_height: BlockHeight, | ||
|
@@ -2205,6 +2227,118 @@ impl<'a> ChainStoreUpdate<'a> { | |
Ok(()) | ||
} | ||
|
||
// Delete all data in rocksdb that are partially or wholly indexed and can be looked up by hash of the current head of the chain | ||
ppca marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// and that indicates a link between current head and its prev block | ||
pub fn clear_head_block_data( | ||
&mut self, | ||
runtime_adapter: &dyn RuntimeWithEpochManagerAdapter, | ||
) -> Result<(), Error> { | ||
let header_head = self.header_head().unwrap(); | ||
let header_head_height = header_head.height; | ||
let block_hash = self.head().unwrap().last_block_hash; | ||
|
||
let block = | ||
self.get_block(&block_hash).expect("block data is not expected to be already cleaned"); | ||
|
||
let epoch_id = block.header().epoch_id(); | ||
|
||
let head_height = block.header().height(); | ||
|
||
// 1. Delete shard_id-indexed data (TrieChanges, Receipts, ChunkExtra, State Headers and Parts, FlatStorage data) | ||
for shard_id in 0..block.header().chunk_mask().len() as ShardId { | ||
let shard_uid = runtime_adapter.shard_id_to_uid(shard_id, epoch_id).unwrap(); | ||
let block_shard_id = get_block_shard_uid(&block_hash, &shard_uid); | ||
|
||
// delete TrieChanges | ||
self.gc_col(DBCol::TrieChanges, &block_shard_id); | ||
|
||
// delete Receipts | ||
self.gc_outgoing_receipts(&block_hash, shard_id); | ||
self.gc_col(DBCol::IncomingReceipts, &block_shard_id); | ||
|
||
// delete DBCol::ChunkExtra based on shard_uid since it's indexed by shard_uid in the storage | ||
self.gc_col(DBCol::ChunkExtra, &block_shard_id); | ||
|
||
// delete state parts and state headers | ||
if let Ok(shard_state_header) = self.chain_store.get_state_header(shard_id, block_hash) | ||
{ | ||
let state_num_parts = | ||
get_num_state_parts(shard_state_header.state_root_node().memory_usage); | ||
self.gc_col_state_parts(block_hash, shard_id, state_num_parts)?; | ||
let state_header_key = StateHeaderKey(shard_id, block_hash).try_to_vec()?; | ||
self.gc_col(DBCol::StateHeaders, &state_header_key); | ||
} | ||
|
||
// delete flat storage columns: FlatStateChanges and FlatStateDeltaMetadata | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @pugachAG @Longarithm flat state column changes are here, please help review There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Regarding epoch manager, will There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, we don't need anything else. |
||
if cfg!(feature = "protocol_feature_flat_state") { | ||
let mut store_update = self.store().store_update(); | ||
store_helper::remove_delta(&mut store_update, shard_uid, block_hash); | ||
self.merge(store_update); | ||
} | ||
} | ||
|
||
// 2. Delete block_hash-indexed data | ||
self.gc_col(DBCol::Block, block_hash.as_bytes()); | ||
self.gc_col(DBCol::BlockExtra, block_hash.as_bytes()); | ||
self.gc_col(DBCol::NextBlockHashes, block_hash.as_bytes()); | ||
self.gc_col(DBCol::ChallengedBlocks, block_hash.as_bytes()); | ||
self.gc_col(DBCol::BlocksToCatchup, block_hash.as_bytes()); | ||
let storage_key = KeyForStateChanges::for_block(&block_hash); | ||
let stored_state_changes: Vec<Box<[u8]>> = self | ||
.chain_store | ||
.store() | ||
.iter_prefix(DBCol::StateChanges, storage_key.as_ref()) | ||
.map(|item| item.map(|(key, _)| key)) | ||
.collect::<io::Result<Vec<_>>>()?; | ||
for key in stored_state_changes { | ||
self.gc_col(DBCol::StateChanges, &key); | ||
} | ||
self.gc_col(DBCol::BlockRefCount, block_hash.as_bytes()); | ||
self.gc_outcomes(&block)?; | ||
self.gc_col(DBCol::BlockInfo, block_hash.as_bytes()); | ||
self.gc_col(DBCol::StateDlInfos, block_hash.as_bytes()); | ||
|
||
// 3. update columns related to prev block (block refcount and NextBlockHashes) | ||
self.dec_block_refcount(block.header().prev_hash())?; | ||
self.gc_col(DBCol::NextBlockHashes, block.header().prev_hash().as_bytes()); | ||
|
||
// 4. Update or delete block_hash_per_height | ||
self.gc_col_block_per_height(&block_hash, head_height, block.header().epoch_id())?; | ||
|
||
self.clear_chunk_data_at_height(head_height)?; | ||
|
||
self.clear_header_data_for_heights(head_height, header_head_height)?; | ||
|
||
Ok(()) | ||
} | ||
|
||
fn clear_chunk_data_at_height(&mut self, height: BlockHeight) -> Result<(), Error> { | ||
let chunk_hashes = self.chain_store.get_all_chunk_hashes_by_height(height)?; | ||
for chunk_hash in chunk_hashes { | ||
// 1. Delete chunk-related data | ||
let chunk = self.get_chunk(&chunk_hash)?.clone(); | ||
debug_assert_eq!(chunk.cloned_header().height_created(), height); | ||
for transaction in chunk.transactions() { | ||
self.gc_col(DBCol::Transactions, transaction.get_hash().as_bytes()); | ||
} | ||
for receipt in chunk.receipts() { | ||
self.gc_col(DBCol::Receipts, receipt.get_hash().as_bytes()); | ||
} | ||
|
||
// 2. Delete chunk_hash-indexed data | ||
let chunk_hash = chunk_hash.as_bytes(); | ||
self.gc_col(DBCol::Chunks, chunk_hash); | ||
self.gc_col(DBCol::PartialChunks, chunk_hash); | ||
self.gc_col(DBCol::InvalidChunks, chunk_hash); | ||
} | ||
|
||
// 4. Delete chunk hashes per height | ||
let key = index_to_bytes(height); | ||
self.gc_col(DBCol::ChunkHashesByHeight, &key); | ||
|
||
Ok(()) | ||
} | ||
|
||
pub fn gc_col_block_per_height( | ||
&mut self, | ||
block_hash: &CryptoHash, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,3 +9,4 @@ mod runtimes; | |
#[cfg(feature = "sandbox")] | ||
mod sandbox; | ||
mod sharding_upgrade; | ||
mod undo_block; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
use near_chain::{ | ||
ChainGenesis, ChainStore, ChainStoreAccess, Provenance, RuntimeWithEpochManagerAdapter, | ||
}; | ||
use near_chain_configs::Genesis; | ||
use near_client::test_utils::TestEnv; | ||
use near_o11y::testonly::init_test_logger; | ||
use near_store::test_utils::create_test_store; | ||
use near_store::Store; | ||
use near_undo_block::undo_block; | ||
use nearcore::config::GenesisExt; | ||
use std::path::Path; | ||
use std::sync::Arc; | ||
|
||
/// Setup environment with one Near client for testing. | ||
fn setup_env( | ||
genesis: &Genesis, | ||
store: Store, | ||
) -> (TestEnv, Arc<dyn RuntimeWithEpochManagerAdapter>) { | ||
let chain_genesis = ChainGenesis::new(genesis); | ||
let runtime: Arc<dyn RuntimeWithEpochManagerAdapter> = | ||
nearcore::NightshadeRuntime::test(Path::new("../../../.."), store, genesis); | ||
(TestEnv::builder(chain_genesis).runtime_adapters(vec![runtime.clone()]).build(), runtime) | ||
} | ||
|
||
// Checks that Near client can successfully undo block on given height and then produce and process block normally after restart | ||
fn test_undo_block(epoch_length: u64, stop_height: u64) { | ||
ppca marked this conversation as resolved.
Show resolved
Hide resolved
|
||
init_test_logger(); | ||
|
||
let save_trie_changes = true; | ||
|
||
let mut genesis = Genesis::test(vec!["test0".parse().unwrap(), "test1".parse().unwrap()], 1); | ||
genesis.config.epoch_length = epoch_length; | ||
|
||
let store = create_test_store(); | ||
let (mut env, runtime) = setup_env(&genesis, store.clone()); | ||
|
||
for i in 1..=stop_height { | ||
let block = env.clients[0].produce_block(i).unwrap().unwrap(); | ||
env.process_block(0, block, Provenance::PRODUCED); | ||
} | ||
|
||
let mut chain_store = | ||
ChainStore::new(store.clone(), genesis.config.genesis_height, save_trie_changes); | ||
|
||
let current_head = chain_store.head().unwrap(); | ||
let prev_block_hash = current_head.prev_block_hash; | ||
|
||
undo_block(&mut chain_store, &*runtime).unwrap(); | ||
|
||
// after undo, the current head should be the prev_block_hash | ||
assert_eq!(chain_store.head().unwrap().last_block_hash.as_bytes(), prev_block_hash.as_bytes()); | ||
assert_eq!(chain_store.head().unwrap().height, stop_height - 1); | ||
|
||
// set up an environment again with the same store | ||
let (mut env, _) = setup_env(&genesis, store.clone()); | ||
// the new env should be able to produce block normally | ||
let block = env.clients[0].produce_block(stop_height).unwrap().unwrap(); | ||
env.process_block(0, block, Provenance::PRODUCED); | ||
|
||
// after processing the new block, the head should now be at stop_height | ||
assert_eq!(chain_store.head().unwrap().height, stop_height); | ||
} | ||
|
||
#[test] | ||
fn test_undo_block_middle_of_epoch() { | ||
test_undo_block(5, 3) | ||
} | ||
|
||
#[test] | ||
fn test_undo_block_end_of_epoch() { | ||
test_undo_block(5, 5) | ||
} | ||
|
||
#[test] | ||
fn test_undo_block_start_of_epoch() { | ||
test_undo_block(5, 6) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
[package] | ||
name = "near-undo-block" | ||
version = "0.0.0" | ||
authors.workspace = true | ||
publish = false | ||
edition.workspace = true | ||
|
||
[dependencies] | ||
anyhow.workspace = true | ||
clap.workspace = true | ||
tracing.workspace = true | ||
chrono.workspace = true | ||
|
||
near-chain.workspace = true | ||
near-chain-configs.workspace = true | ||
near-store.workspace = true | ||
nearcore.workspace = true | ||
near-primitives.workspace = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment: It sucks that
gc_col
doesn't work here and assumes that it is called only during GC... Later we can consider renaminggc_col
toremove_key
- it doesn't remove column as one may think.