Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retire sync head (and sync MMR) #3556

Merged
merged 1 commit into from
Feb 24, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 5 additions & 60 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
@@ -151,7 +151,6 @@ pub struct Chain {
orphans: Arc<OrphanBlockPool>,
txhashset: Arc<RwLock<txhashset::TxHashSet>>,
header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
sync_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
verifier_cache: Arc<RwLock<dyn VerifierCache>>,
pibd_segmenter: Arc<RwLock<Option<Segmenter>>>,
// POW verification function
@@ -187,20 +186,8 @@ impl Chain {
ProtocolVersion(1),
None,
)?;
let mut sync_pmmr = PMMRHandle::new(
Path::new(&db_root).join("header").join("sync_head"),
false,
ProtocolVersion(1),
None,
)?;

setup_head(
&genesis,
&store,
&mut header_pmmr,
&mut sync_pmmr,
&mut txhashset,
)?;
setup_head(&genesis, &store, &mut header_pmmr, &mut txhashset)?;

// Initialize the output_pos index based on UTXO set
// and NRD kernel_pos index based recent kernel history.
@@ -218,7 +205,6 @@ impl Chain {
orphans: Arc::new(OrphanBlockPool::new()),
txhashset: Arc::new(RwLock::new(txhashset)),
header_pmmr: Arc::new(RwLock::new(header_pmmr)),
sync_pmmr: Arc::new(RwLock::new(sync_pmmr)),
pibd_segmenter: Arc::new(RwLock::new(None)),
pow_verifier,
verifier_cache,
@@ -273,7 +259,6 @@ impl Chain {
};
log_head("head", self.head()?);
log_head("header_head", self.header_head()?);
log_head("sync_head", self.get_sync_head()?);
Ok(())
}

@@ -497,23 +482,14 @@ impl Chain {
/// This is only ever used during sync and is based on sync_head.
/// We update header_head here if our total work increases.
pub fn sync_block_headers(&self, headers: &[BlockHeader], opts: Options) -> Result<(), Error> {
let mut sync_pmmr = self.sync_pmmr.write();
let mut header_pmmr = self.header_pmmr.write();
let mut txhashset = self.txhashset.write();

// Sync the chunk of block headers, updating sync_head as necessary.
// Sync the chunk of block headers, updating header_head if total work increases.
{
let batch = self.store.batch()?;
let mut ctx = self.new_ctx(opts, batch, &mut sync_pmmr, &mut txhashset)?;
pipe::sync_block_headers(headers, &mut ctx)?;
ctx.batch.commit()?;
}

// Now "process" the last block header, updating header_head to match sync_head.
if let Some(header) = headers.last() {
let batch = self.store.batch()?;
let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?;
pipe::process_block_header(header, &mut ctx)?;
pipe::process_block_headers(headers, &mut ctx)?;
ctx.batch.commit()?;
}

@@ -944,21 +920,6 @@ impl Chain {
Ok(())
}

/// Rebuild the sync MMR based on current header_head.
/// We rebuild the sync MMR when first entering sync mode so ensure we
/// have an MMR we can safely rewind based on the headers received from a peer.
pub fn rebuild_sync_mmr(&self, head: &Tip) -> Result<(), Error> {
let mut sync_pmmr = self.sync_pmmr.write();
let mut batch = self.store.batch()?;
let header = batch.get_block_header(&head.hash())?;
txhashset::header_extending(&mut sync_pmmr, &mut batch, |ext, batch| {
pipe::rewind_and_apply_header_fork(&header, ext, batch)?;
Ok(())
})?;
batch.commit()?;
Ok(())
}

/// Finds the "fork point" where header chain diverges from full block chain.
/// If we are syncing this will correspond to the last full block where
/// the next header is known but we do not yet have the full block.
@@ -1572,18 +1533,9 @@ impl Chain {
}
}

/// Get the tip of the current "sync" header chain.
/// This may be significantly different to current header chain.
pub fn get_sync_head(&self) -> Result<Tip, Error> {
let hash = self.sync_pmmr.read().head_hash()?;
let header = self.store.get_block_header(&hash)?;
Ok(Tip::from_header(&header))
}

/// Gets multiple headers at the provided heights.
/// Note: Uses the sync pmmr, not the header pmmr.
pub fn get_locator_hashes(&self, heights: &[u64]) -> Result<Vec<Hash>, Error> {
let pmmr = self.sync_pmmr.read();
let pmmr = self.header_pmmr.read();
heights
.iter()
.map(|h| pmmr.get_header_hash_by_height(*h))
@@ -1611,12 +1563,11 @@ fn setup_head(
genesis: &Block,
store: &store::ChainStore,
header_pmmr: &mut txhashset::PMMRHandle<BlockHeader>,
sync_pmmr: &mut txhashset::PMMRHandle<BlockHeader>,
txhashset: &mut txhashset::TxHashSet,
) -> Result<(), Error> {
let mut batch = store.batch()?;

// Apply the genesis header to header and sync MMRs.
// Apply the genesis header to header MMR.
{
if batch.get_block_header(&genesis.hash()).is_err() {
batch.save_block_header(&genesis.header)?;
@@ -1627,12 +1578,6 @@ fn setup_head(
ext.apply_header(&genesis.header)
})?;
}

if sync_pmmr.last_pos == 0 {
txhashset::header_extending(sync_pmmr, &mut batch, |ext, _| {
ext.apply_header(&genesis.header)
})?;
}
}

// Make sure our header PMMR is consistent with header_head from db if it exists.
16 changes: 11 additions & 5 deletions chain/src/pipe.rs
Original file line number Diff line number Diff line change
@@ -179,9 +179,9 @@ pub fn process_block(
}
}

/// Sync a chunk of block headers.
/// Process a batch of sequential block headers.
/// This is only used during header sync.
pub fn sync_block_headers(
pub fn process_block_headers(
headers: &[BlockHeader],
ctx: &mut BlockContext<'_>,
) -> Result<(), Error> {
@@ -193,14 +193,14 @@ pub fn sync_block_headers(
// Check if we know about all these headers. If so we can accept them quickly.
// If they *do not* increase total work on the sync chain we are done.
// If they *do* increase total work then we should process them to update sync_head.
let sync_head = {
let head = {
let hash = ctx.header_pmmr.head_hash()?;
let header = ctx.batch.get_block_header(&hash)?;
Tip::from_header(&header)
};

if let Ok(existing) = ctx.batch.get_block_header(&last_header.hash()) {
if !has_more_work(&existing, &sync_head) {
if !has_more_work(&existing, &head) {
return Ok(());
}
}
@@ -216,7 +216,13 @@ pub fn sync_block_headers(
txhashset::header_extending(&mut ctx.header_pmmr, &mut ctx.batch, |ext, batch| {
rewind_and_apply_header_fork(&last_header, ext, batch)?;
Ok(())
})
})?;

if has_more_work(last_header, &head) {
update_header_head(&Tip::from_header(last_header), &mut ctx.batch)?;
}

Ok(())
}

/// Process a block header. Update the header MMR and corresponding header_head if this header
34 changes: 7 additions & 27 deletions servers/src/grin/sync/header_sync.rs
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@ use std::sync::Arc;

use crate::chain::{self, SyncState, SyncStatus};
use crate::common::types::Error;
use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::hash::Hash;
use crate::core::pow::Difficulty;
use crate::p2p::{self, types::ReasonForBan, Capabilities, Peer};

@@ -59,28 +59,10 @@ impl HeaderSync {
let enable_header_sync = match self.sync_state.status() {
SyncStatus::BodySync { .. }
| SyncStatus::HeaderSync { .. }
| SyncStatus::TxHashsetDone => true,
SyncStatus::NoSync | SyncStatus::Initial | SyncStatus::AwaitingPeers(_) => {
let sync_head = self.chain.get_sync_head()?;
debug!(
"sync: initial transition to HeaderSync. sync_head: {} at {}, resetting to: {} at {}",
sync_head.hash(),
sync_head.height,
header_head.hash(),
header_head.height,
);

// Reset sync_head to header_head on transition to HeaderSync,
// but ONLY on initial transition to HeaderSync state.
//
// The header_head and sync_head may diverge here in the presence of a fork
// in the header chain. Ensure we track the new advertised header chain here
// correctly, so reset any previous (and potentially stale) sync_head to match
// our last known "good" header_head.
//
self.chain.rebuild_sync_mmr(&header_head)?;
true
}
| SyncStatus::TxHashsetDone
| SyncStatus::NoSync
| SyncStatus::Initial
| SyncStatus::AwaitingPeers(_) => true,
_ => false,
};

@@ -211,11 +193,9 @@ impl HeaderSync {
return None;
}

/// We build a locator based on sync_head.
/// Even if sync_head is significantly out of date we will "reset" it once we
/// start getting headers back from a peer.
/// Build a locator based on header_head.
fn get_locator(&mut self) -> Result<Vec<Hash>, Error> {
let tip = self.chain.get_sync_head()?;
let tip = self.chain.header_head()?;
let heights = get_locator_heights(tip.height);
let locator = self.chain.get_locator_hashes(&heights)?;
Ok(locator)