Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

v1.6: Ensure cluster-confirmed roots are set on boot #17442

Merged
merged 3 commits into from
May 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ pub struct JsonRpcConfig {
pub rpc_threads: usize,
pub rpc_bigtable_timeout: Option<Duration>,
pub minimal_api: bool,
pub rpc_scan_and_fix_roots: bool,
}

#[derive(Clone)]
Expand Down
26 changes: 24 additions & 2 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ use std::{
sync::atomic::{AtomicBool, AtomicU64, Ordering},
sync::mpsc::Receiver,
sync::{Arc, Mutex, RwLock},
thread::sleep,
thread::{sleep, Builder},
time::Duration,
};

Expand Down Expand Up @@ -1086,6 +1086,23 @@ fn new_banks_from_ledger(
});
}

let blockstore = Arc::new(blockstore);
let blockstore_root_scan = if config.rpc_addrs.is_some()
&& config.rpc_config.enable_rpc_transaction_history
&& config.rpc_config.rpc_scan_and_fix_roots
{
let blockstore = blockstore.clone();
let exit = exit.clone();
Some(
Builder::new()
.name("blockstore-root-scan".to_string())
.spawn(move || blockstore.scan_and_fix_roots(&exit))
.unwrap(),
)
} else {
None
};

let process_options = blockstore_processor::ProcessOptions {
bpf_jit: config.bpf_jit,
poh_verify,
Expand All @@ -1098,7 +1115,6 @@ fn new_banks_from_ledger(
..blockstore_processor::ProcessOptions::default()
};

let blockstore = Arc::new(blockstore);
let transaction_history_services =
if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
initialize_rpc_transaction_history_services(
Expand Down Expand Up @@ -1190,6 +1206,12 @@ fn new_banks_from_ledger(
bank_forks.set_snapshot_config(config.snapshot_config.clone());
bank_forks.set_accounts_hash_interval_slots(config.accounts_hash_interval_slots);

if let Some(blockstore_root_scan) = blockstore_root_scan {
if let Err(err) = blockstore_root_scan.join() {
warn!("blockstore_root_scan failed to join {:?}", err);
}
}

(
genesis_config,
bank_forks,
Expand Down
49 changes: 49 additions & 0 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use std::{
path::{Path, PathBuf},
rc::Rc,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
Arc, Mutex, RwLock,
},
Expand Down Expand Up @@ -2882,13 +2883,61 @@ impl Blockstore {
self.last_root()
}

pub fn lowest_cleanup_slot(&self) -> Slot {
*self.lowest_cleanup_slot.read().unwrap()
}

pub fn storage_size(&self) -> Result<u64> {
self.db.storage_size()
}

pub fn is_primary_access(&self) -> bool {
self.db.is_primary_access()
}

pub fn scan_and_fix_roots(&self, exit: &Arc<AtomicBool>) -> Result<()> {
let ancestor_iterator = AncestorIterator::new(self.last_root(), &self)
.take_while(|&slot| slot >= self.lowest_cleanup_slot());

let mut find_missing_roots = Measure::start("find_missing_roots");
let mut roots_to_fix = vec![];
for slot in ancestor_iterator.filter(|slot| !self.is_root(*slot)) {
if exit.load(Ordering::Relaxed) {
return Ok(());
}
roots_to_fix.push(slot);
}
find_missing_roots.stop();
let mut fix_roots = Measure::start("fix_roots");
if !roots_to_fix.is_empty() {
info!("{} slots to be rooted", roots_to_fix.len());
for chunk in roots_to_fix.chunks(100) {
if exit.load(Ordering::Relaxed) {
return Ok(());
}
trace!("{:?}", chunk);
self.set_roots(&roots_to_fix)?;
}
} else {
debug!(
"No missing roots found in range {} to {}",
self.lowest_cleanup_slot(),
self.last_root()
);
}
fix_roots.stop();
datapoint_info!(
"blockstore-scan_and_fix_roots",
(
"find_missing_roots_us",
find_missing_roots.as_us() as i64,
i64
),
("num_roots_to_fix", roots_to_fix.len() as i64, i64),
("fix_roots_us", fix_roots.as_us() as i64, i64),
);
Ok(())
}
}

// Update the `completed_data_indexes` with a new shred `new_shred_index`. If a
Expand Down
17 changes: 16 additions & 1 deletion ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -977,14 +977,29 @@ fn load_frozen_forks(
).and_then(|supermajority_root| {
if supermajority_root > *root {
// If there's a cluster confirmed root greater than our last
// replayed root, then beccause the cluster confirmed root should
// replayed root, then because the cluster confirmed root should
// be descended from our last root, it must exist in `all_banks`
let cluster_root_bank = all_banks.get(&supermajority_root).unwrap();

// cluster root must be a descendant of our root, otherwise something
// is drastically wrong
assert!(cluster_root_bank.ancestors.contains_key(root));
info!("blockstore processor found new cluster confirmed root: {}, observed in bank: {}", cluster_root_bank.slot(), bank.slot());

// Ensure cluster-confirmed root and parents are set as root in blockstore
let mut rooted_slots = vec![];
let mut new_root_bank = cluster_root_bank.clone();
loop {
if new_root_bank.slot() == *root { break; } // Found the last root in the chain, yay!
assert!(new_root_bank.slot() > *root);

rooted_slots.push(new_root_bank.slot());
// As noted, the cluster confirmed root should be descended from
// our last root; therefore parent should be set
new_root_bank = new_root_bank.parent().unwrap();
}
inc_new_counter_info!("load_frozen_forks-cluster-confirmed-root", rooted_slots.len());
blockstore.set_roots(&rooted_slots).expect("Blockstore::set_roots should succeed");
Some(cluster_root_bank)
} else {
None
Expand Down
8 changes: 8 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1607,6 +1607,13 @@ pub fn main() {
.default_value(&default_rpc_send_transaction_leader_forward_count)
.help("The number of upcoming leaders to which to forward transactions sent via rpc service."),
)
.arg(
Arg::with_name("rpc_scan_and_fix_roots")
.long("rpc-scan-and-fix-roots")
.takes_value(false)
.requires("enable_rpc_transaction_history")
.help("Verifies blockstore roots on boot and fixes any gaps"),
)
.arg(
Arg::with_name("halt_on_trusted_validators_accounts_hash_mismatch")
.long("halt-on-trusted-validators-accounts-hash-mismatch")
Expand Down Expand Up @@ -2093,6 +2100,7 @@ pub fn main() {
.ok()
.map(Duration::from_secs),
account_indexes: account_indexes.clone(),
rpc_scan_and_fix_roots: matches.is_present("rpc_scan_and_fix_roots"),
},
rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
(
Expand Down