Skip to content

Commit

Permalink
cleanup redundant sync_head and associated MMR (#3556)
Browse files Browse the repository at this point in the history
  • Loading branch information
antiochp authored Feb 24, 2021
1 parent 3583028 commit 03b7518
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 92 deletions.
65 changes: 5 additions & 60 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ pub struct Chain {
orphans: Arc<OrphanBlockPool>,
txhashset: Arc<RwLock<txhashset::TxHashSet>>,
header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
sync_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
verifier_cache: Arc<RwLock<dyn VerifierCache>>,
pibd_segmenter: Arc<RwLock<Option<Segmenter>>>,
// POW verification function
Expand Down Expand Up @@ -187,20 +186,8 @@ impl Chain {
ProtocolVersion(1),
None,
)?;
let mut sync_pmmr = PMMRHandle::new(
Path::new(&db_root).join("header").join("sync_head"),
false,
ProtocolVersion(1),
None,
)?;

setup_head(
&genesis,
&store,
&mut header_pmmr,
&mut sync_pmmr,
&mut txhashset,
)?;
setup_head(&genesis, &store, &mut header_pmmr, &mut txhashset)?;

// Initialize the output_pos index based on UTXO set
// and NRD kernel_pos index based recent kernel history.
Expand All @@ -218,7 +205,6 @@ impl Chain {
orphans: Arc::new(OrphanBlockPool::new()),
txhashset: Arc::new(RwLock::new(txhashset)),
header_pmmr: Arc::new(RwLock::new(header_pmmr)),
sync_pmmr: Arc::new(RwLock::new(sync_pmmr)),
pibd_segmenter: Arc::new(RwLock::new(None)),
pow_verifier,
verifier_cache,
Expand Down Expand Up @@ -273,7 +259,6 @@ impl Chain {
};
log_head("head", self.head()?);
log_head("header_head", self.header_head()?);
log_head("sync_head", self.get_sync_head()?);
Ok(())
}

Expand Down Expand Up @@ -497,23 +482,14 @@ impl Chain {
/// This is only ever used during sync and is based on sync_head.
/// We update header_head here if our total work increases.
pub fn sync_block_headers(&self, headers: &[BlockHeader], opts: Options) -> Result<(), Error> {
let mut sync_pmmr = self.sync_pmmr.write();
let mut header_pmmr = self.header_pmmr.write();
let mut txhashset = self.txhashset.write();

// Sync the chunk of block headers, updating sync_head as necessary.
// Sync the chunk of block headers, updating header_head if total work increases.
{
let batch = self.store.batch()?;
let mut ctx = self.new_ctx(opts, batch, &mut sync_pmmr, &mut txhashset)?;
pipe::sync_block_headers(headers, &mut ctx)?;
ctx.batch.commit()?;
}

// Now "process" the last block header, updating header_head to match sync_head.
if let Some(header) = headers.last() {
let batch = self.store.batch()?;
let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?;
pipe::process_block_header(header, &mut ctx)?;
pipe::process_block_headers(headers, &mut ctx)?;
ctx.batch.commit()?;
}

Expand Down Expand Up @@ -944,21 +920,6 @@ impl Chain {
Ok(())
}

/// Rebuild the sync MMR based on current header_head.
/// We rebuild the sync MMR when first entering sync mode so ensure we
/// have an MMR we can safely rewind based on the headers received from a peer.
pub fn rebuild_sync_mmr(&self, head: &Tip) -> Result<(), Error> {
let mut sync_pmmr = self.sync_pmmr.write();
let mut batch = self.store.batch()?;
let header = batch.get_block_header(&head.hash())?;
txhashset::header_extending(&mut sync_pmmr, &mut batch, |ext, batch| {
pipe::rewind_and_apply_header_fork(&header, ext, batch)?;
Ok(())
})?;
batch.commit()?;
Ok(())
}

/// Finds the "fork point" where header chain diverges from full block chain.
/// If we are syncing this will correspond to the last full block where
/// the next header is known but we do not yet have the full block.
Expand Down Expand Up @@ -1572,18 +1533,9 @@ impl Chain {
}
}

/// Get the tip of the current "sync" header chain.
/// This may be significantly different to current header chain.
pub fn get_sync_head(&self) -> Result<Tip, Error> {
let hash = self.sync_pmmr.read().head_hash()?;
let header = self.store.get_block_header(&hash)?;
Ok(Tip::from_header(&header))
}

/// Gets multiple headers at the provided heights.
/// Note: Uses the sync pmmr, not the header pmmr.
pub fn get_locator_hashes(&self, heights: &[u64]) -> Result<Vec<Hash>, Error> {
let pmmr = self.sync_pmmr.read();
let pmmr = self.header_pmmr.read();
heights
.iter()
.map(|h| pmmr.get_header_hash_by_height(*h))
Expand Down Expand Up @@ -1611,12 +1563,11 @@ fn setup_head(
genesis: &Block,
store: &store::ChainStore,
header_pmmr: &mut txhashset::PMMRHandle<BlockHeader>,
sync_pmmr: &mut txhashset::PMMRHandle<BlockHeader>,
txhashset: &mut txhashset::TxHashSet,
) -> Result<(), Error> {
let mut batch = store.batch()?;

// Apply the genesis header to header and sync MMRs.
// Apply the genesis header to header MMR.
{
if batch.get_block_header(&genesis.hash()).is_err() {
batch.save_block_header(&genesis.header)?;
Expand All @@ -1627,12 +1578,6 @@ fn setup_head(
ext.apply_header(&genesis.header)
})?;
}

if sync_pmmr.last_pos == 0 {
txhashset::header_extending(sync_pmmr, &mut batch, |ext, _| {
ext.apply_header(&genesis.header)
})?;
}
}

// Make sure our header PMMR is consistent with header_head from db if it exists.
Expand Down
16 changes: 11 additions & 5 deletions chain/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ pub fn process_block(
}
}

/// Sync a chunk of block headers.
/// Process a batch of sequential block headers.
/// This is only used during header sync.
pub fn sync_block_headers(
pub fn process_block_headers(
headers: &[BlockHeader],
ctx: &mut BlockContext<'_>,
) -> Result<(), Error> {
Expand All @@ -193,14 +193,14 @@ pub fn sync_block_headers(
// Check if we know about all these headers. If so we can accept them quickly.
// If they *do not* increase total work on the sync chain we are done.
// If they *do* increase total work then we should process them to update sync_head.
let sync_head = {
let head = {
let hash = ctx.header_pmmr.head_hash()?;
let header = ctx.batch.get_block_header(&hash)?;
Tip::from_header(&header)
};

if let Ok(existing) = ctx.batch.get_block_header(&last_header.hash()) {
if !has_more_work(&existing, &sync_head) {
if !has_more_work(&existing, &head) {
return Ok(());
}
}
Expand All @@ -216,7 +216,13 @@ pub fn sync_block_headers(
txhashset::header_extending(&mut ctx.header_pmmr, &mut ctx.batch, |ext, batch| {
rewind_and_apply_header_fork(&last_header, ext, batch)?;
Ok(())
})
})?;

if has_more_work(last_header, &head) {
update_header_head(&Tip::from_header(last_header), &mut ctx.batch)?;
}

Ok(())
}

/// Process a block header. Update the header MMR and corresponding header_head if this header
Expand Down
34 changes: 7 additions & 27 deletions servers/src/grin/sync/header_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;

use crate::chain::{self, SyncState, SyncStatus};
use crate::common::types::Error;
use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::hash::Hash;
use crate::core::pow::Difficulty;
use crate::p2p::{self, types::ReasonForBan, Capabilities, Peer};

Expand Down Expand Up @@ -59,28 +59,10 @@ impl HeaderSync {
let enable_header_sync = match self.sync_state.status() {
SyncStatus::BodySync { .. }
| SyncStatus::HeaderSync { .. }
| SyncStatus::TxHashsetDone => true,
SyncStatus::NoSync | SyncStatus::Initial | SyncStatus::AwaitingPeers(_) => {
let sync_head = self.chain.get_sync_head()?;
debug!(
"sync: initial transition to HeaderSync. sync_head: {} at {}, resetting to: {} at {}",
sync_head.hash(),
sync_head.height,
header_head.hash(),
header_head.height,
);

// Reset sync_head to header_head on transition to HeaderSync,
// but ONLY on initial transition to HeaderSync state.
//
// The header_head and sync_head may diverge here in the presence of a fork
// in the header chain. Ensure we track the new advertised header chain here
// correctly, so reset any previous (and potentially stale) sync_head to match
// our last known "good" header_head.
//
self.chain.rebuild_sync_mmr(&header_head)?;
true
}
| SyncStatus::TxHashsetDone
| SyncStatus::NoSync
| SyncStatus::Initial
| SyncStatus::AwaitingPeers(_) => true,
_ => false,
};

Expand Down Expand Up @@ -211,11 +193,9 @@ impl HeaderSync {
return None;
}

/// We build a locator based on sync_head.
/// Even if sync_head is significantly out of date we will "reset" it once we
/// start getting headers back from a peer.
/// Build a locator based on header_head.
fn get_locator(&mut self) -> Result<Vec<Hash>, Error> {
let tip = self.chain.get_sync_head()?;
let tip = self.chain.header_head()?;
let heights = get_locator_heights(tip.height);
let locator = self.chain.get_locator_hashes(&heights)?;
Ok(locator)
Expand Down

0 comments on commit 03b7518

Please sign in to comment.