Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIBD_IMPL] PIBD tree sync via network and kill/resume functionality #3691

Merged
merged 5 commits into from
Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,10 @@ impl Chain {
}
}
// If no desegmenter or headers don't match init
// Stop previous thread if running
if let Some(d) = self.pibd_desegmenter.read().as_ref() {
d.stop_validation_thread();
}
// TODO: (Check whether we can do this.. we *should* be able to modify this as the desegmenter
// is in flight and we cross a horizon boundary, but needs more thinking)
let desegmenter = self.init_desegmenter(archive_header)?;
Expand Down Expand Up @@ -1631,9 +1635,33 @@ fn setup_head(
// Note: We are rewinding and validating against a writeable extension.
// If validation is successful we will truncate the backend files
// to match the provided block header.
let header = batch.get_block_header(&head.last_block_h)?;
let mut pibd_in_progress = false;
let header = {
let head = batch.get_block_header(&head.last_block_h)?;
let pibd_tip = store.pibd_head()?;
let pibd_head = batch.get_block_header(&pibd_tip.last_block_h)?;
if pibd_head.height > head.height {
pibd_in_progress = true;
pibd_head
} else {
head
}
};

let res = txhashset::extending(header_pmmr, txhashset, &mut batch, |ext, batch| {
// If we're still downloading via PIBD, don't worry about sums and validations just yet
// We still want to rewind to the last completed block to ensure a consistent state
if pibd_in_progress {
debug!(
"init: PIBD appears to be in progress at height {}, hash {}, not validating, will attempt to continue",
header.height,
header.hash()
);
let extension = &mut ext.extension;
extension.rewind_mmrs_to_last_inserted_leaves()?;
return Ok(());
}

pipe::rewind_and_apply_fork(&header, ext, batch, &|_| Ok(()))?;

let extension = &mut ext.extension;
Expand Down
27 changes: 27 additions & 0 deletions chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::{Block, BlockHeader, BlockSums};
use crate::core::pow::Difficulty;
use crate::core::ser::{DeserializationMode, ProtocolVersion, Readable, Writeable};
use crate::core::{genesis, global, global::ChainTypes};
use crate::linked_list::MultiIndex;
use crate::types::{CommitPos, Tip};
use crate::util::secp::pedersen::Commitment;
Expand All @@ -35,6 +36,7 @@ const BLOCK_HEADER_PREFIX: u8 = b'h';
const BLOCK_PREFIX: u8 = b'b';
const HEAD_PREFIX: u8 = b'H';
const TAIL_PREFIX: u8 = b'T';
const PIBD_HEAD_PREFIX: u8 = b'I';
const HEADER_HEAD_PREFIX: u8 = b'G';
const OUTPUT_POS_PREFIX: u8 = b'p';

Expand Down Expand Up @@ -75,6 +77,26 @@ impl ChainStore {
option_to_not_found(self.db.get_ser(&[TAIL_PREFIX], None), || "TAIL".to_owned())
}

/// The current PIBD head (will differ from the other heads. Return genesis block if PIBD head doesn't exist).
pub fn pibd_head(&self) -> Result<Tip, Error> {
let res = option_to_not_found(self.db.get_ser(&[PIBD_HEAD_PREFIX], None), || {
"PIBD_HEAD".to_owned()
});

// todo: fix duplication in batch below
match res {
Ok(r) => Ok(r),
Err(_) => {
let gen = match global::get_chain_type() {
ChainTypes::Mainnet => genesis::genesis_main(),
ChainTypes::Testnet => genesis::genesis_test(),
_ => genesis::genesis_dev(),
};
Ok(Tip::from_header(&gen.header))
}
}
}

/// Header of the block at the head of the block chain (not the same thing as header_head).
pub fn head_header(&self) -> Result<BlockHeader, Error> {
self.get_block_header(&self.head()?.last_block_h)
Expand Down Expand Up @@ -201,6 +223,11 @@ impl<'a> Batch<'a> {
self.db.put_ser(&[HEADER_HEAD_PREFIX], t)
}

/// Save PIBD head to db.
pub fn save_pibd_head(&self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&[PIBD_HEAD_PREFIX], t)
}

/// get block
pub fn get_block(&self, h: &Hash) -> Result<Block, Error> {
option_to_not_found(self.db.get_ser(&to_key(BLOCK_PREFIX, h), None), || {
Expand Down
111 changes: 98 additions & 13 deletions chain/src/txhashset/desegmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
//! segmenter

use std::sync::Arc;
use std::thread;
use std::time::Duration;

use crate::core::core::hash::Hash;
use crate::core::core::{pmmr, pmmr::ReadablePMMR};
Expand All @@ -25,8 +27,9 @@ use crate::core::core::{
};
use crate::error::Error;
use crate::txhashset::{BitmapAccumulator, BitmapChunk, TxHashSet};
use crate::types::Tip;
use crate::util::secp::pedersen::RangeProof;
use crate::util::RwLock;
use crate::util::{RwLock, StopState};

use crate::store;
use crate::txhashset;
Expand All @@ -41,6 +44,8 @@ pub struct Desegmenter {
archive_header: BlockHeader,
store: Arc<store::ChainStore>,

validator_stop_state: Arc<StopState>,

default_bitmap_segment_height: u8,
default_output_segment_height: u8,
default_rangeproof_segment_height: u8,
Expand Down Expand Up @@ -75,6 +80,7 @@ impl Desegmenter {
header_pmmr,
archive_header,
store,
validator_stop_state: Arc::new(StopState::new()),
bitmap_accumulator: BitmapAccumulator::new(),
default_bitmap_segment_height: 9,
default_output_segment_height: 11,
Expand Down Expand Up @@ -111,6 +117,84 @@ impl Desegmenter {
self.all_segments_complete
}

/// Launch a separate validation thread, which will update and validate the body head
/// as we go
pub fn launch_validation_thread(&self) {
let stop_state = self.validator_stop_state.clone();
let txhashset = self.txhashset.clone();
let header_pmmr = self.header_pmmr.clone();
let store = self.store.clone();
let _ = thread::Builder::new()
.name("pibd-validation".to_string())
.spawn(move || {
Desegmenter::validation_loop(stop_state, txhashset, store, header_pmmr);
});
}

/// Stop the validation loop
pub fn stop_validation_thread(&self) {
self.validator_stop_state.stop();
}

/// Validation loop
fn validation_loop(
stop_state: Arc<StopState>,
txhashset: Arc<RwLock<TxHashSet>>,
store: Arc<store::ChainStore>,
header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
) {
let mut latest_block_height = 0;
loop {
if stop_state.is_stopped() {
break;
}
thread::sleep(Duration::from_millis(1000));

trace!("In Desegmenter Validation Loop");
let local_output_mmr_size;
let local_kernel_mmr_size;
let local_rangeproof_mmr_size;
{
let txhashset = txhashset.read();
local_output_mmr_size = txhashset.output_mmr_size();
local_kernel_mmr_size = txhashset.kernel_mmr_size();
local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size();
}

trace!("Output MMR Size: {}", local_output_mmr_size);
trace!("Rangeproof MMR Size: {}", local_rangeproof_mmr_size);
trace!("Kernel MMR Size: {}", local_kernel_mmr_size);

// Find latest 'complete' header.
// First take lesser of rangeproof and output mmr sizes
let latest_output_size =
std::cmp::min(local_output_mmr_size, local_rangeproof_mmr_size);
// Find first header in which 'output_mmr_size' and 'kernel_mmr_size' are greater than
// given sizes

{
let header_pmmr = header_pmmr.read();
let res = header_pmmr.get_first_header_with(
latest_output_size,
local_kernel_mmr_size,
latest_block_height,
store.clone(),
);
if let Some(h) = res {
latest_block_height = h.height;
debug!("PIBD Desegmenter Validation Loop: Latest block is: {:?}", h);
// TODO: 'In-flight' validation. At the moment the entire tree
// will be presented for validation after all segments are downloaded
// TODO: Unwraps
let tip = Tip::from_header(&h);
let batch = store.batch().unwrap();
batch.save_pibd_head(&tip).unwrap();
batch.commit().unwrap();
}
}
}
}

/// Apply next set of segments that are ready to be appended to their respective trees,
/// and kick off any validations that can happen. TODO: figure out where and how
/// this should be called considering any thread blocking implications
Expand Down Expand Up @@ -297,7 +381,7 @@ impl Desegmenter {
/// TODO: Accumulator will likely need to be stored locally to deal with server
/// being shut down and restarted
pub fn finalize_bitmap(&mut self) -> Result<(), Error> {
debug!(
trace!(
"pibd_desegmenter: finalizing and caching bitmap - accumulator root: {}",
self.bitmap_accumulator.root()
);
Expand Down Expand Up @@ -326,7 +410,7 @@ impl Desegmenter {
// Number of leaves (BitmapChunks)
self.bitmap_mmr_leaf_count =
(pmmr::n_leaves(self.archive_header.output_mmr_size) + 1023) / 1024;
debug!(
trace!(
"pibd_desegmenter - expected number of leaves in bitmap MMR: {}",
self.bitmap_mmr_leaf_count
);
Expand All @@ -343,7 +427,7 @@ impl Desegmenter {
)
.clone();

debug!(
trace!(
"pibd_desegmenter - expected size of bitmap MMR: {}",
self.bitmap_mmr_size
);
Expand Down Expand Up @@ -393,7 +477,7 @@ impl Desegmenter {
segment: Segment<BitmapChunk>,
output_root_hash: Hash,
) -> Result<(), Error> {
debug!("pibd_desegmenter: add bitmap segment");
trace!("pibd_desegmenter: add bitmap segment");
segment.validate_with(
self.bitmap_mmr_size, // Last MMR pos at the height being validated, in this case of the bitmap root
None,
Expand All @@ -402,7 +486,7 @@ impl Desegmenter {
output_root_hash, // Other root
true,
)?;
debug!("pibd_desegmenter: adding segment to cache");
trace!("pibd_desegmenter: adding segment to cache");
// All okay, add to our cached list of bitmap segments
self.cache_bitmap_segment(segment);
Ok(())
Expand All @@ -411,7 +495,7 @@ impl Desegmenter {
/// Apply a bitmap segment at the index cache
pub fn apply_bitmap_segment(&mut self, idx: usize) -> Result<(), Error> {
let segment = self.bitmap_segment_cache.remove(idx);
debug!(
trace!(
"pibd_desegmenter: apply bitmap segment at segment idx {}",
segment.identifier().idx
);
Expand Down Expand Up @@ -446,7 +530,7 @@ impl Desegmenter {
/// Apply an output segment at the index cache
pub fn apply_output_segment(&mut self, idx: usize) -> Result<(), Error> {
let segment = self.output_segment_cache.remove(idx);
debug!(
trace!(
"pibd_desegmenter: applying output segment at segment idx {}",
segment.identifier().idx
);
Expand All @@ -460,6 +544,7 @@ impl Desegmenter {
|ext, _batch| {
let extension = &mut ext.extension;
extension.apply_output_segment(segment)?;
debug!("Returning Ok");
Ok(())
},
)?;
Expand Down Expand Up @@ -509,7 +594,7 @@ impl Desegmenter {
segment: Segment<OutputIdentifier>,
bitmap_root: Option<Hash>,
) -> Result<(), Error> {
debug!("pibd_desegmenter: add output segment");
trace!("pibd_desegmenter: add output segment");
// TODO: This, something very wrong, probably need to reset entire body sync
// check bitmap root matches what we already have
/*if bitmap_root != Some(self.bitmap_accumulator.root()) {
Expand Down Expand Up @@ -552,7 +637,7 @@ impl Desegmenter {
/// Apply a rangeproof segment at the index cache
pub fn apply_rangeproof_segment(&mut self, idx: usize) -> Result<(), Error> {
let segment = self.rangeproof_segment_cache.remove(idx);
debug!(
trace!(
"pibd_desegmenter: applying rangeproof segment at segment idx {}",
segment.identifier().idx
);
Expand Down Expand Up @@ -611,7 +696,7 @@ impl Desegmenter {

/// Adds a Rangeproof segment
pub fn add_rangeproof_segment(&mut self, segment: Segment<RangeProof>) -> Result<(), Error> {
debug!("pibd_desegmenter: add rangeproof segment");
trace!("pibd_desegmenter: add rangeproof segment");
segment.validate(
self.archive_header.output_mmr_size, // Last MMR pos at the height being validated
self.bitmap_cache.as_ref(),
Expand Down Expand Up @@ -644,7 +729,7 @@ impl Desegmenter {
/// Apply a kernel segment at the index cache
pub fn apply_kernel_segment(&mut self, idx: usize) -> Result<(), Error> {
let segment = self.kernel_segment_cache.remove(idx);
debug!(
trace!(
"pibd_desegmenter: applying kernel segment at segment idx {}",
segment.identifier().idx
);
Expand Down Expand Up @@ -699,7 +784,7 @@ impl Desegmenter {

/// Adds a Kernel segment
pub fn add_kernel_segment(&mut self, segment: Segment<TxKernel>) -> Result<(), Error> {
debug!("pibd_desegmenter: add kernel segment");
trace!("pibd_desegmenter: add kernel segment");
segment.validate(
self.archive_header.kernel_mmr_size, // Last MMR pos at the height being validated
None,
Expand Down
Loading