A few retention period fixes and suggestions #19

Merged
35 changes: 30 additions & 5 deletions consensus/src/consensus/mod.rs
@@ -307,13 +307,39 @@ impl Consensus {
is_consensus_exiting,
};

// TODO (post HF): remove the upgrade
// Database upgrade to include pruning samples
this.pruning_samples_database_upgrade();
// Run database upgrades if any
this.run_database_upgrades();

this
}

/// A procedure for calling database upgrades which are self-contained (i.e., do not require knowing the DB version)
fn run_database_upgrades(&self) {
// Upgrade to initialize the new retention root field correctly
self.retention_root_database_upgrade();

// TODO (post HF): remove this upgrade
// Database upgrade to include pruning samples
self.pruning_samples_database_upgrade();
}

fn retention_root_database_upgrade(&self) {
let mut pruning_point_store = self.pruning_point_store.write();
if pruning_point_store.retention_period_root().unwrap_option().is_none() {
let mut batch = rocksdb::WriteBatch::default();
if self.config.is_archival {
// The retention checkpoint is what was previously known as history root
let retention_checkpoint = pruning_point_store.retention_checkpoint().unwrap();
pruning_point_store.set_retention_period_root(&mut batch, retention_checkpoint).unwrap();
} else {
// For non-archival nodes the retention root was the pruning point
let pruning_point = pruning_point_store.get().unwrap().pruning_point;
pruning_point_store.set_retention_period_root(&mut batch, pruning_point).unwrap();
}
self.db.write(batch).unwrap();
}
}

fn pruning_samples_database_upgrade(&self) {
//
// For the first time this version runs, make sure we populate pruning samples
@@ -622,8 +648,7 @@ impl ConsensusApi for Consensus {
}

fn get_retention_period_root(&self) -> Hash {
let pruning_point_read = self.pruning_point_store.read();
pruning_point_read.retention_period_root().unwrap_or(pruning_point_read.pruning_point().unwrap())
self.pruning_point_store.read().retention_period_root().unwrap()
}

/// Estimates the number of blocks and headers stored in the node database.
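
For readers following the upgrade path, the sketch below is a hypothetical, simplified restatement of the rule `retention_root_database_upgrade` applies (it uses stand-in types, not the crate's real store API): when the new `retention_period_root` key is missing, archival nodes seed it from the retention checkpoint (previously called history root), while non-archival nodes seed it from the current pruning point.

// A hypothetical, simplified restatement of the upgrade rule (not the crate's
// real store API): seed the new retention root key exactly once, and only if
// it is still missing after the version upgrade.

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct Hash(u64); // stand-in for the real consensus hash type

struct PruningPointState {
    retention_period_root: Option<Hash>, // the new key introduced by this PR
    retention_checkpoint: Hash,          // previously known as "history root"
    pruning_point: Hash,
}

/// Returns the value the upgrade would write, or None if no write is needed.
fn initial_retention_root(state: &PruningPointState, is_archival: bool) -> Option<Hash> {
    if state.retention_period_root.is_some() {
        return None; // already initialized, so the upgrade is a no-op
    }
    Some(if is_archival {
        // Archival nodes keep history back to the retention checkpoint
        state.retention_checkpoint
    } else {
        // Non-archival nodes effectively retained data only from the pruning point
        state.pruning_point
    })
}

fn main() {
    let state = PruningPointState { retention_period_root: None, retention_checkpoint: Hash(1), pruning_point: Hash(2) };
    println!("archival seed:     {:?}", initial_retention_root(&state, true));
    println!("non-archival seed: {:?}", initial_retention_root(&state, false));
}

Writing the rule as a pure function of the stored state also makes the idempotence visible: once the key exists, rerunning the upgrade changes nothing.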
55 changes: 22 additions & 33 deletions consensus/src/pipeline/pruning_processor/processor.rs
@@ -134,24 +134,22 @@ impl PruningProcessor {
fn recover_pruning_workflows_if_needed(&self) {
let pruning_point_read = self.pruning_point_store.read();
let pruning_point = pruning_point_read.pruning_point().unwrap();
let retention_checkpoint = pruning_point_read.retention_checkpoint().unwrap_option();
let retention_period_root = pruning_point_read.retention_period_root().unwrap_or(pruning_point);
let pruning_utxoset_position = self.pruning_utxoset_stores.read().utxoset_position().unwrap_option();
let retention_checkpoint = pruning_point_read.retention_checkpoint().unwrap();
let retention_period_root = pruning_point_read.retention_period_root().unwrap();
let pruning_utxoset_position = self.pruning_utxoset_stores.read().utxoset_position().unwrap();
drop(pruning_point_read);

debug!(
"[PRUNING PROCESSOR] recovery check: current pruning point: {}, retention checkpoint: {:?}, pruning utxoset position: {:?}",
pruning_point, retention_checkpoint, pruning_utxoset_position
);

if let Some(pruning_utxoset_position) = pruning_utxoset_position {
// This indicates the node crashed during a former pruning point move and we need to recover
if pruning_utxoset_position != pruning_point {
info!("Recovering pruning utxo-set from {} to the pruning point {}", pruning_utxoset_position, pruning_point);
if !self.advance_pruning_utxoset(pruning_utxoset_position, pruning_point) {
info!("Interrupted while advancing the pruning point UTXO set: Process is exiting");
return;
}
// This indicates the node crashed during a former pruning point move and we need to recover
if pruning_utxoset_position != pruning_point {
info!("Recovering pruning utxo-set from {} to the pruning point {}", pruning_utxoset_position, pruning_point);
if !self.advance_pruning_utxoset(pruning_utxoset_position, pruning_point) {
info!("Interrupted while advancing the pruning point UTXO set: Process is exiting");
return;
}
}

@@ -161,15 +159,12 @@ impl PruningProcessor {
retention_period_root,
pruning_point
);
if let Some(retention_checkpoint) = retention_checkpoint {
// This indicates the node crashed or was forced to stop during a former data prune operation hence
// we need to complete it
if retention_checkpoint != retention_period_root {
self.prune(pruning_point, retention_period_root);
}
}

// TODO: both `pruning_utxoset_position` and `retention_checkpoint` are new DB keys so for now we assume correct state if the keys are missing
// This indicates the node crashed or was forced to stop during a former data prune operation hence
// we need to complete it
if retention_checkpoint != retention_period_root {
self.prune(pruning_point, retention_period_root);
}
}

fn advance_pruning_point_and_candidate_if_possible(&self, sink_ghostdag_data: CompactGhostdagData) {
@@ -182,8 +177,7 @@ impl PruningProcessor {
);

if !new_pruning_points.is_empty() {
let retention_period_root =
pruning_point_read.retention_period_root().unwrap_or(pruning_point_read.pruning_point().unwrap());
let retention_period_root = pruning_point_read.retention_period_root().unwrap();

// Update past pruning points and pruning point stores
let mut batch = WriteBatch::default();
@@ -549,7 +543,7 @@ impl PruningProcessor {
/// This function is expected to be called only when a new pruning point is determined and right before
/// doing any pruning. Pruning point must be the new pruning point this node is advancing to.
///
/// retention_period_root is guaranteed to be in the past(pruning_point)
/// The returned retention_period_root is guaranteed to be in past(pruning_point) or the pruning point itself.
fn advance_retention_period_root(&self, retention_period_root: Hash, pruning_point: Hash) -> Hash {
match self.config.retention_period_days {
// If the retention period wasn't set, immediately default to the pruning point.
@@ -561,15 +555,19 @@ impl PruningProcessor {
// to this function serves as a clamp.
let retention_period_ms = (retention_period_days * 86400.0 * 1000.0).ceil() as u64;

// The target timestamp we would like to find a point below
let retention_period_root_ts_target = unix_now().saturating_sub(retention_period_ms);

// Iterate from the new pruning point to the prev retention root and search for the first point with enough days above it.
// Note that prev retention root is always a past pruning point, so we can iterate via pruning samples until we reach it.
let mut new_retention_period_root = pruning_point;

trace!(
"Adjusting the retention period root to cover the required retention period. Target timestamp: {}",
retention_period_root_ts_target,
);

while self.reachability_service.is_dag_ancestor_of(retention_period_root, new_retention_period_root) {
while new_retention_period_root != retention_period_root {
let block = new_retention_period_root;

let timestamp = self.headers_store.get_timestamp(block).unwrap();
@@ -580,16 +578,7 @@ impl PruningProcessor {
break;
}

new_retention_period_root =
self.pruning_samples_store.pruning_sample_from_pov(block).unwrap_or(retention_period_root);
}

// We may be at a pruning sample that's in the past or anticone of current retention_period_root. Clamp to retention_period_root here.
// Happens when the node is newly started and retention_period_root itself still doesn't cover the full retention period.
let is_new_root_in_future_of_old =
self.reachability_service.is_dag_ancestor_of(retention_period_root, new_retention_period_root);
if !is_new_root_in_future_of_old {
new_retention_period_root = retention_period_root;
new_retention_period_root = self.pruning_samples_store.pruning_sample_from_pov(block).unwrap();
}

new_retention_period_root
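
The loop in `advance_retention_period_root` can be read as the following rough sketch, under simplified assumptions (`Hash` as a plain integer and lookup closures instead of the header and pruning-sample stores); it is not the PR's exact code. Walk back from the new pruning point along pruning samples until either the previous retention root is reached or a sample old enough to cover the requested retention period is found.

// A rough sketch of the advancement loop above, under simplified assumptions
// (Hash as a plain integer, lookup closures instead of the real stores).

type Hash = u64;

fn advance_retention_root(
    prev_retention_root: Hash,
    pruning_point: Hash,
    target_ts: u64,                           // unix_now - retention_period_ms
    timestamp_of: impl Fn(Hash) -> u64,       // header timestamp lookup
    pruning_sample_of: impl Fn(Hash) -> Hash, // pruning sample from the block's POV
) -> Hash {
    let mut new_root = pruning_point;
    while new_root != prev_retention_root {
        // A candidate at or below the target timestamp already has the requested
        // retention period's worth of data above it, so stop here.
        if timestamp_of(new_root) <= target_ts {
            break;
        }
        // Step back one pruning sample; the previous retention root is itself a
        // past pruning sample, so the walk is guaranteed to terminate there.
        new_root = pruning_sample_of(new_root);
    }
    new_root
}

fn main() {
    // Toy chain: hashes 0..=4 spaced one day apart, previous root 0, pruning point 4.
    let ts = |h: Hash| h * 86_400_000;
    let sample = |h: Hash| h.saturating_sub(1);
    let target_ts = 2 * 86_400_000; // retain at least ~2 days above the root
    println!("new retention root: {}", advance_retention_root(0, 4, target_ts, ts, sample));
}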
16 changes: 8 additions & 8 deletions kaspad/src/daemon.rs
@@ -3,6 +3,7 @@ use std::{fs, path::PathBuf, process::exit, sync::Arc, time::Duration};
use async_channel::unbounded;
use kaspa_consensus_core::{
config::ConfigBuilder,
constants::TRANSIENT_BYTE_TO_MASS_FACTOR,
errors::config::{ConfigError, ConfigResult},
};
use kaspa_consensus_notify::{root::ConsensusNotificationRoot, service::NotifyService};
@@ -50,9 +51,9 @@ pub const DESIRED_DAEMON_SOFT_FD_LIMIT: u64 = 8 * 1024;
/// this value may impact the database performance).
pub const MINIMUM_DAEMON_SOFT_FD_LIMIT: u64 = 4 * 1024;

// If set, the retention period days must at least be this value.
// This value is assumed to be greater than all pruning periods.
const MINIMUM_RETENTION_PERIOD_DAYS: f64 = 3.0;
/// If set, the retention period days must be at least this value
/// (otherwise it is meaningless since pruning periods are typically at least 2 days long)
const MINIMUM_RETENTION_PERIOD_DAYS: f64 = 2.0;
const ONE_GIGABYTE: f64 = 1_000_000_000.0;

use crate::args::Args;
@@ -238,8 +239,6 @@ pub fn create_core_with_runtime(runtime: &Runtime, args: &Args, fd_total_budget:
.build(),
);

// TODO: Validate `config` forms a valid set of properties

let app_dir = get_app_dir_from_args(args);
let db_dir = app_dir.join(network.to_prefixed()).join(DEFAULT_DATA_DIR);

@@ -282,7 +281,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm

if !args.archival && args.retention_period_days.is_some() {
let retention_period_days = args.retention_period_days.unwrap();
// Look at only post-fork values
// Look only at post-fork values (which are the worst-case)
let finality_depth = config.finality_depth().after();
let target_time_per_block = config.target_time_per_block().after(); // in ms

@@ -291,10 +290,11 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
let total_blocks = retention_period_milliseconds / target_time_per_block;
// This worst case usage only considers block space. It does not account for usage of
// other stores (reachability, block status, mempool, etc.)
let worst_case_usage = ((total_blocks + finality_depth) * (config.max_block_mass / 4)) as f64 / ONE_GIGABYTE;
let worst_case_usage =
((total_blocks + finality_depth) * (config.max_block_mass / TRANSIENT_BYTE_TO_MASS_FACTOR)) as f64 / ONE_GIGABYTE;

info!(
"Retention period is set to {} days. Disk usage may be up to {:.2} GB for block space per pruning period.",
"Retention period is set to {} days. Disk usage may be up to {:.2} GB for block space required for this period.",
retention_period_days, worst_case_usage
);
} else {
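
To make the disk-usage warning concrete, here is an illustrative standalone version of the same worst-case arithmetic. Every constant below is a placeholder chosen for readability, not the network's actual post-fork parameters.

// Illustrative, standalone version of the worst-case disk estimate above.
// All constants are placeholders, not real network parameters.

fn main() {
    let retention_period_days: f64 = 2.0;
    let target_time_per_block_ms: u64 = 1_000;  // placeholder block rate
    let finality_depth: u64 = 86_400;           // placeholder finality depth
    let max_block_mass: u64 = 500_000;          // placeholder mass limit
    let transient_byte_to_mass_factor: u64 = 4; // placeholder factor
    const ONE_GIGABYTE: f64 = 1_000_000_000.0;

    let retention_period_ms = (retention_period_days * 86_400.0 * 1_000.0).ceil() as u64;
    let total_blocks = retention_period_ms / target_time_per_block_ms;
    // Worst case: every retained block (plus a finality window) is full, counting
    // only transient block space, mirroring the check in daemon.rs above.
    let worst_case_gb = ((total_blocks + finality_depth)
        * (max_block_mass / transient_byte_to_mass_factor)) as f64
        / ONE_GIGABYTE;
    println!("Retention of {retention_period_days} days -> up to {worst_case_gb:.2} GB of block space");
}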