Skip to content

Commit

Permalink
fix issue of header sync stuck after rejoining: clear all headers at …
Browse files Browse the repository at this point in the history
…heights from the current body head height all the way up to the current header height
  • Loading branch information
ppca committed Apr 5, 2023
1 parent 8f0d2a1 commit 31147be
Showing 1 changed file with 47 additions and 37 deletions.
84 changes: 47 additions & 37 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1957,41 +1957,20 @@ impl<'a> ChainStoreUpdate<'a> {
self.chunk_tail = Some(height);
}

fn clear_chunk_data_and_headers_at_height(&mut self, height: BlockHeight) -> Result<(), Error> {
let chunk_hashes = self.chain_store.get_all_chunk_hashes_by_height(height)?;
for chunk_hash in chunk_hashes {
// 1. Delete chunk-related data
let chunk = self.get_chunk(&chunk_hash)?.clone();
debug_assert_eq!(chunk.cloned_header().height_created(), height);
for transaction in chunk.transactions() {
self.gc_col(DBCol::Transactions, transaction.get_hash().as_bytes());
}
for receipt in chunk.receipts() {
self.gc_col(DBCol::Receipts, receipt.get_hash().as_bytes());
fn clear_header_data_for_heights(&mut self, start: BlockHeight, end: BlockHeight) -> Result<(), Error> {
for height in start..end+1 {
let header_hashes = self.chain_store.get_all_header_hashes_by_height(height)?;
for header_hash in header_hashes {
// Delete header_hash-indexed data: block header
let mut store_update = self.store().store_update();
let key: &[u8] = header_hash.as_bytes();
store_update.delete(DBCol::BlockHeader, key);
self.chain_store.headers.pop(key);
self.merge(store_update);
}

// 2. Delete chunk_hash-indexed data
let chunk_hash = chunk_hash.as_bytes();
self.gc_col(DBCol::Chunks, chunk_hash);
self.gc_col(DBCol::PartialChunks, chunk_hash);
self.gc_col(DBCol::InvalidChunks, chunk_hash);
}

let header_hashes = self.chain_store.get_all_header_hashes_by_height(height)?;
for header_hash in header_hashes {
// 3. Delete header_hash-indexed data: block header
let mut store_update = self.store().store_update();
let key: &[u8] = header_hash.as_bytes();
store_update.delete(DBCol::BlockHeader, key);
self.chain_store.headers.pop(key);
self.merge(store_update);
let key = index_to_bytes(height);
self.gc_col(DBCol::HeaderHashesByHeight, &key);
}

// 4. Delete chunks_tail-related data
let key = index_to_bytes(height);
self.gc_col(DBCol::ChunkHashesByHeight, &key);
self.gc_col(DBCol::HeaderHashesByHeight, &key);

Ok(())
}

Expand Down Expand Up @@ -2250,14 +2229,16 @@ impl<'a> ChainStoreUpdate<'a> {
&mut self,
runtime_adapter: &dyn RuntimeWithEpochManagerAdapter,
) -> Result<(), Error> {
let header_head = self.header_head().unwrap();
let header_head_height = header_head.height;
let block_hash = self.head().unwrap().last_block_hash;

let block =
self.get_block(&block_hash).expect("block data is not expected to be already cleaned");

let epoch_id = block.header().epoch_id();

let height = block.header().height();
let head_height = block.header().height();

// 1. Delete shard_id-indexed data (TrieChanges, Receipts, ChunkExtra, State Headers and Parts, FlatStorage data)
for shard_id in 0..block.header().chunk_mask().len() as ShardId {
Expand Down Expand Up @@ -2318,9 +2299,38 @@ impl<'a> ChainStoreUpdate<'a> {
self.gc_col(DBCol::NextBlockHashes, block.header().prev_hash().as_bytes());

// 4. Update or delete block_hash_per_height
self.gc_col_block_per_height(&block_hash, height, block.header().epoch_id())?;
self.gc_col_block_per_height(&block_hash, head_height, block.header().epoch_id())?;

self.clear_chunk_data_at_height(head_height)?;

self.clear_header_data_for_heights(head_height, header_head_height)?;

Ok(())
}

/// Garbage-collects all chunk data stored at the given `height`:
/// per-chunk transactions and receipts, the chunk bodies themselves
/// (full, partial and invalid), and finally the height -> chunk-hashes
/// index.
fn clear_chunk_data_at_height(&mut self, height: BlockHeight) -> Result<(), Error> {
    let chunk_hashes = self.chain_store.get_all_chunk_hashes_by_height(height)?;
    for chunk_hash in chunk_hashes {
        // 1. Delete chunk-related data.
        let chunk = self.get_chunk(&chunk_hash)?.clone();
        // Every chunk indexed under `height` must have been created there.
        debug_assert_eq!(chunk.cloned_header().height_created(), height);
        for transaction in chunk.transactions() {
            self.gc_col(DBCol::Transactions, transaction.get_hash().as_bytes());
        }
        for receipt in chunk.receipts() {
            self.gc_col(DBCol::Receipts, receipt.get_hash().as_bytes());
        }

        // 2. Delete chunk_hash-indexed data.
        let chunk_hash = chunk_hash.as_bytes();
        self.gc_col(DBCol::Chunks, chunk_hash);
        self.gc_col(DBCol::PartialChunks, chunk_hash);
        self.gc_col(DBCol::InvalidChunks, chunk_hash);
    }

    // 3. Delete the height -> chunk hashes index.
    let key = index_to_bytes(height);
    self.gc_col(DBCol::ChunkHashesByHeight, &key);

    Ok(())
}
Expand Down Expand Up @@ -2422,7 +2432,7 @@ impl<'a> ChainStoreUpdate<'a> {
Ok(())
}

pub fn gc_col(&mut self, col: DBCol, key: &[u8]) {
fn gc_col(&mut self, col: DBCol, key: &[u8]) {
let mut store_update = self.store().store_update();
match col {
DBCol::OutgoingReceipts => {
Expand Down

0 comments on commit 31147be

Please sign in to comment.