Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup rebuild sync MMR logic #3001

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 6 additions & 18 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,18 +226,6 @@ impl Chain {
Ok(())
}

/// Reset sync_head to current header_head.
/// We do this when we first transition to header_sync to ensure we extend
/// the "sync" header MMR from a known consistent state and to ensure we track
/// the header chain correctly at the fork point.
pub fn reset_sync_head(&self) -> Result<Tip, Error> {
let batch = self.store.batch()?;
batch.reset_sync_head()?;
let head = batch.get_sync_head()?;
batch.commit()?;
Ok(head)
}

/// Processes a single block, then checks for orphans, processing
/// those as well if they're found
pub fn process_block(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
Expand Down Expand Up @@ -708,14 +696,15 @@ impl Chain {
/// Rebuild the sync MMR based on current header_head.
/// We rebuild the sync MMR when first entering sync mode to ensure we
/// have an MMR we can safely rewind based on the headers received from a peer.
/// TODO - think about how to optimize this.
pub fn rebuild_sync_mmr(&self, head: &Tip) -> Result<(), Error> {
let mut txhashset = self.txhashset.write();
let mut batch = self.store.batch()?;
let header = batch.get_block_header(&head.hash())?;
txhashset::sync_extending(&mut txhashset, &mut batch, |extension| {
extension.rebuild(head, &self.genesis)?;
pipe::rewind_and_apply_header_fork(&header, extension)?;
Ok(())
})?;
batch.save_sync_head(&head)?;
batch.commit()?;
Ok(())
}
Expand All @@ -724,7 +713,6 @@ impl Chain {
/// We rebuild the header MMR after receiving a txhashset from a peer.
/// The txhashset contains output, rangeproof and kernel MMRs but we construct
/// the header MMR locally based on headers from our db.
/// TODO - think about how to optimize this.
fn rebuild_header_mmr(
&self,
head: &Tip,
Expand Down Expand Up @@ -1425,7 +1413,7 @@ fn setup_head(
let header_head = batch.header_head()?;
if batch.get_block_header(&header_head.last_block_h).is_ok() {
// Reset sync_head to be consistent with current header_head.
batch.reset_sync_head()?;
batch.save_sync_head(&header_head)?;
} else {
// Reset both header_head and sync_head to be consistent with current head.
warn!(
Expand All @@ -1435,8 +1423,8 @@ fn setup_head(
head.last_block_h,
head.height,
);
batch.reset_header_head()?;
batch.reset_sync_head()?;
batch.save_header_head(&head)?;
batch.save_sync_head(&head)?;
}

batch.commit()?;
Expand Down
45 changes: 18 additions & 27 deletions chain/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,11 @@ pub fn sync_block_headers(
if headers.is_empty() {
return Ok(());
}

let first_header = headers.first().expect("first header");
let last_header = headers.last().expect("last header");
let prev_header = ctx.batch.get_previous_header(&first_header)?;

// Check these are the "next" headers, i.e. we already know about the previous one.
ctx.batch.get_previous_header(&first_header)?;

// Check if we know about all these headers. If so we can accept them quickly.
// If they *do not* increase total work on the sync chain we are done.
Expand All @@ -191,22 +192,20 @@ pub fn sync_block_headers(
}
}

txhashset::sync_extending(&mut ctx.txhashset, &mut ctx.batch, |extension| {
rewind_and_apply_header_fork(&prev_header, extension)?;
for header in headers {
extension.validate_root(header)?;
extension.apply_header(header)?;
add_block_header(header, &extension.batch)?;
}
Ok(())
})?;

// Validate all our headers now that we have added each "previous"
// header to the db in this batch above.
// Validate and add all headers in this chunk to our current batch.
// If anything fails to validate the entire batch will rollback.
for header in headers {
validate_header(header, ctx)?;
add_block_header(header, &ctx.batch)?;
}

// Now we can simply rewind and apply all the headers in this chunk
// up to and including the last header.
// If anything fails to validate the entire batch will rollback.
txhashset::sync_extending(&mut ctx.txhashset, &mut ctx.batch, |extension| {
rewind_and_apply_header_fork(&last_header, extension)
})?;

if has_more_work(&last_header, &sync_head) {
update_sync_head(&Tip::from_header(&last_header), &mut ctx.batch)?;
}
Expand All @@ -220,7 +219,7 @@ pub fn sync_block_headers(
/// to allow processing to continue (for header itself).
pub fn process_block_header(header: &BlockHeader, ctx: &mut BlockContext<'_>) -> Result<(), Error> {
// Check this header is not an orphan, we must know about the previous header to continue.
let prev_header = ctx.batch.get_previous_header(&header)?;
ctx.batch.get_previous_header(&header)?;

// If this header is "known" then stop processing the header.
// Do not stop processing with an error though.
Expand All @@ -239,19 +238,17 @@ pub fn process_block_header(header: &BlockHeader, ctx: &mut BlockContext<'_>) ->
}
}

validate_header(header, ctx)?;
add_block_header(header, &ctx.batch)?;

txhashset::header_extending(&mut ctx.txhashset, &mut ctx.batch, |extension| {
rewind_and_apply_header_fork(&prev_header, extension)?;
extension.validate_root(header)?;
extension.apply_header(header)?;
rewind_and_apply_header_fork(&header, extension)?;
if !has_more_work(&header, &header_head) {
extension.force_rollback();
}
Ok(())
})?;

validate_header(header, ctx)?;
add_block_header(header, &ctx.batch)?;

// Update header_head independently of chain head (full blocks).
// If/when we process the corresponding full block we will update the
// chain head to match. This allows our header chain to extend safely beyond
Expand Down Expand Up @@ -527,12 +524,6 @@ pub fn rewind_and_apply_header_fork(
header: &BlockHeader,
ext: &mut txhashset::HeaderExtension<'_>,
) -> Result<(), Error> {
let head = ext.head();
if header.hash() == head.last_block_h {
// Nothing to rewind and nothing to reapply. Done.
return Ok(());
}

let mut fork_hashes = vec![];
let mut current = header.clone();
while current.height > 0 && !ext.is_on_current_chain(&current).is_ok() {
Expand Down
12 changes: 0 additions & 12 deletions chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,18 +180,6 @@ impl<'a> Batch<'a> {
self.db.put_ser(&vec![SYNC_HEAD_PREFIX], t)
}

/// Reset sync_head to the current head of the header chain.
pub fn reset_sync_head(&self) -> Result<(), Error> {
let head = self.header_head()?;
self.save_sync_head(&head)
}

/// Reset header_head to the current head of the body chain.
pub fn reset_header_head(&self) -> Result<(), Error> {
let tip = self.head()?;
self.save_header_head(&tip)
}

/// get block
pub fn get_block(&self, h: &Hash) -> Result<Block, Error> {
option_to_not_found(
Expand Down
6 changes: 1 addition & 5 deletions servers/src/grin/sync/header_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,13 @@ impl HeaderSync {
header_head.height,
);

// Reset sync_head to header_head on transition to HeaderSync,
// Reset sync MMR based on current header MMR on transition to HeaderSync,
// but ONLY on initial transition to HeaderSync state.
//
// The header_head and sync_head may diverge here in the presence of a fork
// in the header chain. Ensure we track the new advertised header chain here
// correctly, so reset any previous (and potentially stale) sync_head to match
// our last known "good" header_head.
//
self.chain.reset_sync_head()?;

// Rebuild the sync MMR to match our updated sync_head.
self.chain.rebuild_sync_mmr(&header_head)?;

self.history_locator.retain(|&x| x.0 == 0);
Expand Down