Skip to content

Commit

Permalink
fix(core): gc range queue diffs tail
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick committed Dec 9, 2024
1 parent fa4d748 commit 40f9f43
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 8 deletions.
6 changes: 2 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ tycho-util = { path = "./util", version = "0.1.4" }

[patch.crates-io]
weedb = { version = "0.3.8", git = "https://github.com/broxus/weedb.git", branch = "next-rocksdb" }
everscale-types = { git = "https://github.com/broxus/everscale-types.git", rev = "8ea6b16e81f1ffd88006263327004b891b5ef47c"}

[workspace.lints.rust]
future_incompatible = "warn"
Expand Down Expand Up @@ -241,7 +242,4 @@ opt-level = 3
[profile.dev.package.hashbrown]
opt-level = 3
[profile.dev.package."*"]
opt-level = 1

[patch.crates-io]
everscale-types = { git = "https://github.com/broxus/everscale-types.git", rev = "8ea6b16e81f1ffd88006263327004b891b5ef47c"}
opt-level = 1
2 changes: 1 addition & 1 deletion block-util/src/block/block_stuff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl BlockStuff {
pub const BOOT_OFFSET: Duration = Duration::from_secs(12 * 3600);

pub fn compute_is_persistent(block_utime: u32, prev_utime: u32) -> bool {
block_utime >> 10 != prev_utime >> 10
block_utime >> 17 != prev_utime >> 17
}

pub fn can_use_for_boot(block_utime: u32, now_utime: u32) -> bool {
Expand Down
4 changes: 2 additions & 2 deletions collator/src/collator/do_collate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,7 @@ impl CollatorStdImpl {
read_ext_msgs={}, read_int_msgs={}, \
read_new_msgs_from_iterator={}, inserted_new_msgs_to_iterator={} has_unprocessed_messages={}, \
total_execute_msgs_time_mc={}, \
wu_used_for_prepare_msgs_groups={}, wu_used_for_execute: {}, wu_used_for_finalize: {}",
wu_used_for_prepare_msgs_groups={}, wu_used_for_execute: {}, wu_used_for_finalize: {}, diffs_tail_len: {}",
block_id,
collation_data.start_lt, collation_data.next_lt, collation_data.execute_count_all,
collation_data.execute_count_ext, collation_data.ext_msgs_error_count,
Expand All @@ -1283,7 +1283,7 @@ impl CollatorStdImpl {
collation_data.read_ext_msgs_count, collation_data.read_int_msgs_from_iterator_count,
collation_data.read_new_msgs_from_iterator_count, collation_data.inserted_new_msgs_to_iterator_count, final_result.has_unprocessed_messages,
collation_data.total_execute_msgs_time_mc,
execute_result.prepare_groups_wu_total, execute_result.execute_groups_wu_total, finalize_wu_total
execute_result.prepare_groups_wu_total, execute_result.execute_groups_wu_total, finalize_wu_total, collation_data.diff_tail_len,
);

tracing::debug!(
Expand Down
16 changes: 15 additions & 1 deletion core/src/block_strider/subscriber/gc_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tycho_block_util::block::BlockStuff;
use tycho_storage::{BlocksGcType, Storage};
use tycho_util::metrics::HistogramGuard;

use crate::block_strider::subscriber::find_longest_diffs_tail;
use crate::block_strider::{
BlockSubscriber, BlockSubscriberContext, StateSubscriber, StateSubscriberContext,
};
Expand Down Expand Up @@ -254,7 +255,19 @@ impl GcSubscriber {
// TODO: Must be in sync with the largest possible archive size (in mc blocks).
const MIN_SAFE_DISTANCE: u32 = 100;

let safe_distance = std::cmp::max(safe_distance, MIN_SAFE_DISTANCE);
let tail_len = find_longest_diffs_tail(tick.mc_block_id, &storage).await;

let tail_len = tail_len.unwrap_or_else(|e| {
tracing::error!(?e, "failed to find longest diffs tail");
0
}) as u32;

tracing::info!(tail_len, "found longest diffs tail");

let safe_distance = [safe_distance, MIN_SAFE_DISTANCE, tail_len + 1]
.into_iter()
.max()
.unwrap();

// Compute the target masterchain block seqno
let target_seqno = match tick.mc_block_id.seqno.checked_sub(safe_distance) {
Expand Down Expand Up @@ -399,6 +412,7 @@ impl GcSubscriber {
tracing::info!("starting GC for target seqno: {}", target_seqno);

let hist = HistogramGuard::begin("tycho_gc_states_time");

if let Err(e) = storage
.shard_state_storage()
.remove_outdated_states(target_seqno)
Expand Down
57 changes: 57 additions & 0 deletions core/src/block_strider/subscriber/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use futures_util::future::{self, BoxFuture};
use tycho_block_util::archive::ArchiveData;
use tycho_block_util::block::BlockStuff;
use tycho_block_util::state::ShardStateStuff;
use tycho_storage::{BlockHandle, Storage};

pub use self::futures::{OptionHandleFut, OptionPrepareFut};
pub use self::gc_subscriber::{GcSubscriber, ManualGcTrigger};
Expand Down Expand Up @@ -420,3 +421,59 @@ pub mod test {
}
}
}

pub async fn find_longest_diffs_tail(mc_block: BlockId, storage: &Storage) -> Result<usize> {
let mc_block_stuff = load_mc_block_stuff(mc_block, storage).await?;

let shard_block_handles = load_shard_block_handles(&mc_block_stuff, storage).await?;

let mut min_tail_len = None;

for block_handle in shard_block_handles {
let block = storage
.block_storage()
.load_block_data(&block_handle)
.await?;
let tail_len = block.block().out_msg_queue_updates.tail_len as usize;

min_tail_len = Some(min_tail_len.map_or(tail_len, |min: usize| min.min(tail_len)));
}

let mc_tail_len = mc_block_stuff.block().out_msg_queue_updates.tail_len as usize;
let result_tail_len = min_tail_len.map_or(mc_tail_len, |min: usize| mc_tail_len.max(min));

Ok(result_tail_len)
}

async fn load_mc_block_stuff(mc_seqno: BlockId, storage: &Storage) -> Result<BlockStuff> {
let mc_handle = storage.block_handle_storage().load_handle(&mc_seqno);
if let Some(mc_handle) = mc_handle {
let mc_block_stuff = storage.block_storage().load_block_data(&mc_handle).await?;
Ok(mc_block_stuff)
} else {
anyhow::bail!("mc block handle not found: {mc_seqno}");
}
}

async fn load_shard_block_handles(
mc_block_stuff: &BlockStuff,
storage: &Storage,
) -> Result<Vec<BlockHandle>> {
let block_handles = storage.block_handle_storage();
let mut shard_block_handles = Vec::new();

for entry in mc_block_stuff.load_custom()?.shards.latest_blocks() {
let block_id = entry?;
if block_id.seqno == 0 {
continue;
}

let Some(block_handle) = block_handles.load_handle(&block_id) else {
anyhow::bail!("top shard block handle not found: {block_id}");
};

shard_block_handles.push(block_handle);
}

Ok(shard_block_handles)
}
4 changes: 4 additions & 0 deletions storage/src/store/persistent_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,10 @@ impl PersistentStateStorage {
messages.push(queue_diff.zip(&out_messages));
queue_diffs.push(queue_diff.diff().clone());

if tail_len == 1 {
break;
}

let prev_block_id = match top_block_info.load_prev_ref()? {
PrevBlockRef::Single(block_ref) => block_ref.as_block_id(shard_ident),
PrevBlockRef::AfterMerge { .. } => anyhow::bail!("merge not supported yet"),
Expand Down

0 comments on commit 40f9f43

Please sign in to comment.