diff --git a/Cargo.lock b/Cargo.lock index e380c7a92..691322085 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -890,8 +890,7 @@ dependencies = [ [[package]] name = "everscale-types" version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b73dfe88b44fb8dab157b67818568764a4e43f049313eecaf58e13c55e913af" +source = "git+https://github.com/broxus/everscale-types.git?rev=8ea6b16e81f1ffd88006263327004b891b5ef47c#8ea6b16e81f1ffd88006263327004b891b5ef47c" dependencies = [ "ahash", "anyhow", @@ -920,8 +919,7 @@ dependencies = [ [[package]] name = "everscale-types-proc" version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "817dbaf10f56aa40db0f83452c51671bd9f0b8741d56f38459ebb911959437e7" +source = "git+https://github.com/broxus/everscale-types.git?rev=8ea6b16e81f1ffd88006263327004b891b5ef47c#8ea6b16e81f1ffd88006263327004b891b5ef47c" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 6f4d6aa62..bc1b58e19 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -136,6 +136,7 @@ tycho-util = { path = "./util", version = "0.1.4" } [patch.crates-io] weedb = { version = "0.3.8", git = "https://github.com/broxus/weedb.git", branch = "next-rocksdb" } +everscale-types = { git = "https://github.com/broxus/everscale-types.git", rev = "8ea6b16e81f1ffd88006263327004b891b5ef47c"} [workspace.lints.rust] future_incompatible = "warn" @@ -241,4 +242,4 @@ opt-level = 3 [profile.dev.package.hashbrown] opt-level = 3 [profile.dev.package."*"] -opt-level = 1 +opt-level = 1 \ No newline at end of file diff --git a/block-util/src/block/block_proof_stuff.rs b/block-util/src/block/block_proof_stuff.rs index 76077ef18..a0e0101cd 100644 --- a/block-util/src/block/block_proof_stuff.rs +++ b/block-util/src/block/block_proof_stuff.rs @@ -43,6 +43,7 @@ impl BlockProofStuff { state_update: Lazy::new(&MerkleUpdate::default()).unwrap(), out_msg_queue_updates: OutMsgQueueUpdates { diff_hash: Default::default(), + tail_len: 0, }, extra: Lazy::new(&BlockExtra::default()).unwrap(), }; diff --git a/block-util/src/block/block_stuff.rs b/block-util/src/block/block_stuff.rs index 640c92742..a0bae6568 100644 --- a/block-util/src/block/block_stuff.rs +++ b/block-util/src/block/block_stuff.rs @@ -55,6 +55,7 @@ impl BlockStuff { state_update: Lazy::new(&MerkleUpdate::default()).unwrap(), out_msg_queue_updates: OutMsgQueueUpdates { diff_hash: Default::default(), + tail_len: 0, }, extra: Lazy::new(&BlockExtra::default()).unwrap(), }; diff --git a/block-util/src/lib.rs b/block-util/src/lib.rs index 4e003ab75..0ea3f82b7 100644 --- a/block-util/src/lib.rs +++ b/block-util/src/lib.rs @@ -4,5 +4,4 @@ pub mod config; pub mod dict; pub mod queue; pub mod state; - pub mod tl; diff --git a/block-util/src/queue/proto.rs b/block-util/src/queue/proto.rs index f7089e665..fc2fa022a 100644 --- a/block-util/src/queue/proto.rs +++ b/block-util/src/queue/proto.rs @@ -22,7 +22,8 @@ pub struct QueueDiff { /// Seqno of the corresponding block. pub seqno: u32, /// collator boundaries. - pub processed_upto: BTreeMap, + // TODO: should rename field in `proto.tl` on network reset + pub processed_to: BTreeMap, /// Min message queue key. pub min_message: QueueKey, /// Max message queue key. @@ -55,7 +56,7 @@ impl TlWrite for QueueDiff { 4 + tl::hash_bytes::SIZE_HINT + tl::shard_ident::SIZE_HINT + 4 - + processed_upto_map::size_hint(&self.processed_upto) + + processed_to_map::size_hint(&self.processed_to) + 2 * QueueKey::SIZE_HINT + messages_list::size_hint(&self.messages) } @@ -68,7 +69,7 @@ impl TlWrite for QueueDiff { tl::hash_bytes::write(&self.prev_hash, packet); tl::shard_ident::write(&self.shard_ident, packet); packet.write_u32(self.seqno); - processed_upto_map::write(&self.processed_upto, packet); + processed_to_map::write(&self.processed_to, packet); self.min_message.write_to(packet); self.max_message.write_to(packet); messages_list::write(&self.messages, packet); @@ -90,7 +91,7 @@ impl<'tl> TlRead<'tl> for QueueDiff { prev_hash: tl::hash_bytes::read(data, offset)?, shard_ident: tl::shard_ident::read(data, offset)?, seqno: u32::read_from(data, offset)?, - processed_upto: processed_upto_map::read(data, offset)?, + processed_to: processed_to_map::read(data, offset)?, min_message: QueueKey::read_from(data, offset)?, max_message: QueueKey::read_from(data, offset)?, messages: messages_list::read(data, offset)?, @@ -205,7 +206,7 @@ impl std::fmt::Display for QueueKey { } } -mod processed_upto_map { +mod processed_to_map { use tl_proto::{TlPacket, TlResult}; use super::*; @@ -222,9 +223,9 @@ mod processed_upto_map { pub fn write(items: &BTreeMap, packet: &mut P) { packet.write_u32(items.len() as u32); - for (shard_ident, processed_upto) in items { + for (shard_ident, processed_to) in items { tl::shard_ident::write(shard_ident, packet); - processed_upto.write_to(packet); + processed_to.write_to(packet); } } @@ -445,7 +446,7 @@ mod tests { prev_hash: HashBytes::from([0x33; 32]), shard_ident: ShardIdent::MASTERCHAIN, seqno: 123, - processed_upto: BTreeMap::from([ + processed_to: BTreeMap::from([ (ShardIdent::MASTERCHAIN, QueueKey { lt: 1, hash: HashBytes::from([0x11; 32]), @@ -491,7 +492,7 @@ mod tests { prev_hash, shard_ident: ShardIdent::MASTERCHAIN, seqno, - processed_upto: BTreeMap::from([ + processed_to: BTreeMap::from([ (ShardIdent::MASTERCHAIN, QueueKey { lt: 10 * seqno as u64, hash: HashBytes::from([seqno as u8; 32]), diff --git a/block-util/src/queue/queue_diff.rs b/block-util/src/queue/queue_diff.rs index 3aadf98a5..e73cb56da 100644 --- a/block-util/src/queue/queue_diff.rs +++ b/block-util/src/queue/queue_diff.rs @@ -27,11 +27,11 @@ impl QueueDiffStuffBuilder { } // TODO: Use iterator of `(ShardIdent, QueueKey)`? - pub fn with_processed_upto<'a, I>(mut self, processed_upto: I) -> Self + pub fn with_processed_to<'a, I>(mut self, processed_to: I) -> Self where I: IntoIterator, { - self.inner_mut().diff.processed_upto = processed_upto + self.inner_mut().diff.processed_to = processed_to .into_iter() .map(|(shard_ident, lt, hash)| (shard_ident, QueueKey { lt, hash: *hash })) .collect(); @@ -79,6 +79,14 @@ impl SerializedQueueDiff { &self.inner.diff.hash } + pub fn processed_to(&self) -> impl Iterator { + self.inner + .diff + .processed_to + .iter() + .map(|(shard_ident, key)| (*shard_ident, key)) + } + fn inner_mut(&mut self) -> &mut Inner { Arc::get_mut(&mut self.inner).expect("inner is not shared") } @@ -102,7 +110,7 @@ impl QueueDiffStuff { prev_hash: HashBytes::ZERO, shard_ident: block_id.shard, seqno: block_id.seqno, - processed_upto: BTreeMap::from([(block_id.shard, QueueKey::MIN)]), + processed_to: BTreeMap::from([(block_id.shard, QueueKey::MIN)]), min_message: QueueKey::MIN, max_message: QueueKey::MIN, messages: Vec::new(), @@ -134,7 +142,7 @@ impl QueueDiffStuff { prev_hash: *prev_hash, shard_ident, seqno, - processed_upto: Default::default(), + processed_to: Default::default(), min_message: Default::default(), max_message: Default::default(), messages: Default::default(), @@ -361,7 +369,7 @@ mod tests { prev_hash: HashBytes::ZERO, shard_ident: ShardIdent::BASECHAIN, seqno: 1, - processed_upto: Default::default(), + processed_to: Default::default(), min_message: QueueKey { lt: 0, hash: message_hashes[0], diff --git a/cli/src/node/mod.rs b/cli/src/node/mod.rs index df1df50b4..49df4b9fd 100644 --- a/cli/src/node/mod.rs +++ b/cli/src/node/mod.rs @@ -11,8 +11,8 @@ use tycho_block_util::block::BlockIdRelation; use tycho_block_util::state::MinRefMcStateTracker; use tycho_collator::collator::CollatorStdImplFactory; use tycho_collator::internal_queue::queue::{QueueConfig, QueueFactory, QueueFactoryStdImpl}; -use tycho_collator::internal_queue::state::persistent_state::PersistentStateImplFactory; -use tycho_collator::internal_queue::state::session_state::SessionStateImplFactory; +use tycho_collator::internal_queue::state::commited_state::CommittedStateImplFactory; +use tycho_collator::internal_queue::state::uncommitted_state::UncommittedStateImplFactory; use tycho_collator::manager::CollationManager; use tycho_collator::mempool::MempoolAdapterStdImpl; use tycho_collator::queue_adapter::MessageQueueAdapterStdImpl; @@ -316,12 +316,12 @@ impl Node { // Create collator tracing::info!("starting collator"); - let session_state_factory = SessionStateImplFactory::new(self.storage.clone()); - let persistent_state_factory = PersistentStateImplFactory::new(self.storage.clone()); + let session_state_factory = UncommittedStateImplFactory::new(self.storage.clone()); + let persistent_state_factory = CommittedStateImplFactory::new(self.storage.clone()); let queue_factory = QueueFactoryStdImpl { - session_state_factory, - persistent_state_factory, + uncommitted_state_factory: session_state_factory, + committed_state_factory: persistent_state_factory, config: self.internal_queue_config, }; let queue = queue_factory.create(); diff --git a/collator/src/collator/do_collate/finalize.rs b/collator/src/collator/do_collate/finalize.rs index 649645946..05cdd1010 100644 --- a/collator/src/collator/do_collate/finalize.rs +++ b/collator/src/collator/do_collate/finalize.rs @@ -15,6 +15,7 @@ use tycho_block_util::queue::{QueueDiffStuff, QueueKey, SerializedQueueDiff}; use tycho_block_util::state::ShardStateStuff; use tycho_consensus::prelude::ConsensusConfigExt; use tycho_util::metrics::HistogramGuard; +use tycho_util::FastHashMap; use super::phase::{Phase, PhaseState}; use super::PrevData; @@ -37,6 +38,16 @@ pub struct FinalizeState { impl PhaseState for FinalizeState {} +pub struct FinalizeBlockContext { + pub collation_session: Arc, + pub wu_used_from_last_anchor: u64, + pub usage_tree: UsageTree, + pub queue_diff: SerializedQueueDiff, + pub collator_config: Arc, + pub executor: MessagesExecutor, + pub diff_tail_len: u32, +} + impl Phase { pub fn update_queue_diff( &mut self, @@ -85,9 +96,9 @@ impl Phase { self.state.collation_data.block_id_short.seqno, &prev_hash, ) - .with_processed_upto( + .with_processed_to( diff_with_messages - .processed_upto + .processed_to .iter() .map(|(k, v)| (*k, v.lt, &v.hash)), ) @@ -98,6 +109,8 @@ impl Phase { ) .serialize(); + let diff_processed_to = diff_with_messages.processed_to.clone(); + let queue_diff_hash = *queue_diff.hash(); tracing::debug!(target: tracing_targets::COLLATOR, queue_diff_hash = %queue_diff_hash); @@ -115,7 +128,12 @@ impl Phase { &labels, ); - mq_adapter.apply_diff(diff_with_messages, block_id_short, &queue_diff_hash)?; + mq_adapter.apply_diff( + diff_with_messages, + block_id_short, + &queue_diff_hash, + max_message, + )?; let apply_queue_diff_elapsed = histogram.finish(); Ok(apply_queue_diff_elapsed) @@ -128,6 +146,7 @@ impl Phase { has_unprocessed_messages, diff_messages_len, create_queue_diff_elapsed, + processed_to: diff_processed_to, }, update_queue_task, )) @@ -135,15 +154,20 @@ impl Phase { pub fn finalize_block( mut self, - collation_session: Arc, - wu_used_from_last_anchor: u64, - usage_tree: UsageTree, - queue_diff: SerializedQueueDiff, - collator_config: Arc, - executor: MessagesExecutor, + ctx: FinalizeBlockContext, ) -> Result<(FinalizedBlock, ExecuteResult)> { tracing::debug!(target: tracing_targets::COLLATOR, "finalize_block()"); + let FinalizeBlockContext { + collation_session, + wu_used_from_last_anchor, + usage_tree, + queue_diff, + collator_config, + executor, + diff_tail_len, + } = ctx; + let wu_params_finalize = self .state .collation_config @@ -151,6 +175,7 @@ impl Phase { .finalize .clone(); + let processed_to = queue_diff.processed_to().map(|(k, v)| (k, *v)).collect(); let shard = self.state.collation_data.block_id_short.shard; let labels = &[("workchain", shard.workchain().to_string())]; @@ -483,6 +508,8 @@ impl Phase { None }; + self.state.collation_data.diff_tail_len = diff_tail_len; + // construct block let block = Block { global_id: self.state.mc_data.global_id, @@ -492,6 +519,7 @@ impl Phase { // do not use out msgs queue updates out_msg_queue_updates: OutMsgQueueUpdates { diff_hash: *queue_diff.hash(), + tail_len: diff_tail_len, }, extra: Lazy::new(&new_block_extra)?, }; @@ -555,6 +583,30 @@ impl Phase { self.state.collation_data.processed_upto.externals.as_ref(), ); + let mut shards_processed_to = FastHashMap::default(); + + for (shard_id, _) in shards.iter() { + // Extract processed information for updated shards + let shard_processed_to = self + .state + .collation_data + .top_shard_blocks + .iter() + .find(|top_block_info| top_block_info.block_id.shard == *shard_id) + .map(|top_block_info| top_block_info.processed_to.clone()) + .or_else(|| { + self.state + .mc_data + .shards_processed_to + .get(shard_id) + .cloned() + }); + + if let Some(value) = shard_processed_to { + shards_processed_to.insert(*shard_id, value); + } + } + Some(Arc::new(McData { global_id: new_block.as_ref().global_id, block_id: *new_block.id(), @@ -573,8 +625,8 @@ impl Phase { processed_upto: self.state.collation_data.processed_upto.clone(), top_processed_to_anchor, - ref_mc_state_handle: self.state.prev_shard_data.ref_mc_state_handle().clone(), + shards_processed_to, })) } }; @@ -587,7 +639,13 @@ impl Phase { block: new_block, is_key_block: new_block_info.key_block, prev_blocks_ids: self.state.prev_shard_data.blocks_ids().clone(), - top_shard_blocks_ids: self.state.collation_data.top_shard_blocks_ids.clone(), + top_shard_blocks_ids: self + .state + .collation_data + .top_shard_blocks + .iter() + .map(|b| b.block_id) + .collect(), collated_data, collated_file_hash: HashBytes::ZERO, chain_time: self.state.collation_data.get_gen_chain_time(), @@ -606,6 +664,7 @@ impl Phase { || self.state.mc_data.consensus_info, |mcd| mcd.consensus_info, ), + processed_to, }); let total_elapsed = histogram.finish(); diff --git a/collator/src/collator/do_collate/mod.rs b/collator/src/collator/do_collate/mod.rs index ddf0be459..3f34bc6da 100644 --- a/collator/src/collator/do_collate/mod.rs +++ b/collator/src/collator/do_collate/mod.rs @@ -10,6 +10,7 @@ use humantime::format_duration; use phase::{ActualState, Phase}; use prepare::PrepareState; use sha2::Digest; +use tycho_block_util::queue::QueueKey; use tycho_block_util::state::MinRefMcStateTracker; use tycho_storage::{NewBlockMeta, StoreStateHint}; use tycho_util::futures::JoinTask; @@ -22,6 +23,7 @@ use super::types::{ FinalizedCollationResult, ParsedExternals, PrevData, ReadNextExternalsMode, WorkingState, }; use super::CollatorStdImpl; +use crate::collator::do_collate::finalize::FinalizeBlockContext; use crate::collator::types::{ AnchorInfo, BlockCollationData, FinalResult, ParsedMessage, ShardDescriptionExt, UpdateQueueDiffResult, @@ -31,7 +33,8 @@ use crate::queue_adapter::MessageQueueAdapter; use crate::tracing_targets; use crate::types::{ BlockCollationResult, BlockIdExt, CollationSessionInfo, CollatorConfig, - DisplayBlockIdsIntoIter, DisplayBlockIdsIter, McData, TopBlockDescription, + DisplayBlockIdsIntoIter, DisplayBlockIdsIter, McData, ProcessedTo, ShardDescriptionShort, + TopBlockDescription, TopShardBlockInfo, }; #[cfg(test)] @@ -214,7 +217,7 @@ impl CollatorStdImpl { } fn run( - config: Arc, + collator_config: Arc, mq_adapter: Arc>, anchors_cache: AnchorsCache, state: Box, @@ -224,12 +227,12 @@ impl CollatorStdImpl { ) -> Result { let shard_id = state.shard_id; let labels: [(&str, String); 1] = [("workchain", shard_id.workchain().to_string())]; - + let mc_data = state.mc_data.clone(); let prepare_histogram = HistogramGuard::begin_with_labels("tycho_do_collate_prepare_time", &labels); let prepare_phase = - Phase::::new(config.clone(), mq_adapter, anchors_cache, state); + Phase::::new(collator_config.clone(), mq_adapter, anchors_cache, state); let mut execute_phase = prepare_phase.run()?; @@ -278,25 +281,48 @@ impl CollatorStdImpl { has_unprocessed_messages, diff_messages_len, create_queue_diff_elapsed, + processed_to, }, update_queue_task, - ) = finalize_phase.update_queue_diff(mq_iterator_adapter, shard_id, mq_adapter)?; + ) = finalize_phase.update_queue_diff(mq_iterator_adapter, shard_id, mq_adapter.clone())?; let finalize_block_timer = std::time::Instant::now(); + // Get current and masterchain processed values + let current_processed_to = processed_to.get(&shard_id).cloned(); + let mc_processed_to = mc_data + .processed_upto + .internals + .get(&shard_id) + .map(|mc| mc.processed_to_msg); + + let min_processed_to = calculate_min_processed_to( + &shard_id, + current_processed_to, + mc_processed_to, + &mc_data.shards, + &mc_data.shards_processed_to, + ); + + if let Some(value) = min_processed_to { + mq_adapter.trim_diffs(&shard_id, &value)?; + }; + let diff_tail_len = mq_adapter.get_diff_count_by_shard(&shard_id) as u32 + 1; + let span = tracing::Span::current(); let (finalize_phase_result, update_queue_task_result) = rayon::join( || { let _span = span.enter(); - finalize_phase.finalize_block( + finalize_phase.finalize_block(FinalizeBlockContext { collation_session, wu_used_from_last_anchor, usage_tree, queue_diff, - config, + collator_config, executor, - ) + diff_tail_len, + }) }, // wait update queue task before returning collation result // to be sure that queue was updated before block commit and next block collation @@ -841,6 +867,7 @@ impl CollatorStdImpl { proof_funds, #[cfg(feature = "block-creator-stats")] creators, + processed_to, } = top_block_descr; let mut new_shard_descr = Box::new(ShardDescription::from_block_info( @@ -898,7 +925,14 @@ impl CollatorStdImpl { collation_data_builder.store_shard_fees(shard_id, proof_funds)?; } - collation_data_builder.top_shard_blocks_ids.push(block_id); + let top_shard_block_info = TopShardBlockInfo { + block_id, + processed_to, + }; + + collation_data_builder + .top_shard_blocks + .push(top_shard_block_info); #[cfg(feature = "block-creator-stats")] collation_data_builder.register_shard_block_creators(creators)?; } @@ -1166,6 +1200,8 @@ impl CollatorStdImpl { .increment(collation_data.execute_count_new_int); metrics::gauge!("tycho_do_collate_block_seqno", &labels) .set(collation_data.block_id_short.seqno); + metrics::gauge!("tycho_do_collate_block_diff_tail_len", &labels) + .set(collation_data.diff_tail_len); } fn wu_metrics( @@ -1236,7 +1272,7 @@ impl CollatorStdImpl { read_ext_msgs={}, read_int_msgs={}, \ read_new_msgs_from_iterator={}, inserted_new_msgs_to_iterator={} has_unprocessed_messages={}, \ total_execute_msgs_time_mc={}, \ - wu_used_for_prepare_msgs_groups={}, wu_used_for_execute: {}, wu_used_for_finalize: {}", + wu_used_for_prepare_msgs_groups={}, wu_used_for_execute: {}, wu_used_for_finalize: {}, diffs_tail_len: {}", block_id, collation_data.start_lt, collation_data.next_lt, collation_data.execute_count_all, collation_data.execute_count_ext, collation_data.ext_msgs_error_count, @@ -1247,7 +1283,7 @@ impl CollatorStdImpl { collation_data.read_ext_msgs_count, collation_data.read_int_msgs_from_iterator_count, collation_data.read_new_msgs_from_iterator_count, collation_data.inserted_new_msgs_to_iterator_count, final_result.has_unprocessed_messages, collation_data.total_execute_msgs_time_mc, - execute_result.prepare_groups_wu_total, execute_result.execute_groups_wu_total, finalize_wu_total + execute_result.prepare_groups_wu_total, execute_result.execute_groups_wu_total, finalize_wu_total, collation_data.diff_tail_len, ); tracing::debug!( @@ -1305,3 +1341,71 @@ impl CollatorStdImpl { ); } } + +fn calculate_min_processed_to( + shard_id: &ShardIdent, + current_processed_to: Option, + mc_processed_to: Option, + mc_data_shards: &Vec<(ShardIdent, ShardDescriptionShort)>, + mc_data_shards_processed_to: &FastHashMap, +) -> Option { + fn find_min_processed_to( + shards: &Vec<(ShardIdent, ShardDescriptionShort)>, + shards_processed_to: &FastHashMap, + shard_id: &ShardIdent, + min_processed_to: &mut Option, + skip_condition: impl Fn(&ShardIdent) -> bool, + ) { + // Iterate through shards with updated top shard blocks and find min processed_to + for (shard, descr) in shards { + if skip_condition(shard) { + continue; + } + + if descr.top_sc_block_updated { + if let Some(value) = shards_processed_to.get(shard) { + if let Some(v) = value.get(shard_id) { + *min_processed_to = match *min_processed_to { + Some(current_min) => Some(current_min.min(*v)), + None => Some(*v), + }; + } + } + } + } + } + + let mut min_processed_to: Option = None; + + if shard_id.is_masterchain() { + find_min_processed_to( + mc_data_shards, + mc_data_shards_processed_to, + shard_id, + &mut min_processed_to, + |_| false, + ); + + // Combine with current and masterchain values + min_processed_to = [current_processed_to, min_processed_to] + .into_iter() + .flatten() + .min(); + } else { + find_min_processed_to( + mc_data_shards, + mc_data_shards_processed_to, + shard_id, + &mut min_processed_to, + |shard| shard == shard_id || shard.is_masterchain(), + ); + + // Combine with current and masterchain values and shard values + min_processed_to = [current_processed_to, min_processed_to, mc_processed_to] + .into_iter() + .flatten() + .min(); + } + + min_processed_to +} diff --git a/collator/src/collator/tests/collator_tests.rs b/collator/src/collator/tests/collator_tests.rs index 992257a0b..1fee3df0a 100644 --- a/collator/src/collator/tests/collator_tests.rs +++ b/collator/src/collator/tests/collator_tests.rs @@ -316,6 +316,7 @@ fn test_get_anchors_processing_info() { consensus_info: Default::default(), top_processed_to_anchor: 0, ref_mc_state_handle: tracker.insert(0), + shards_processed_to: Default::default(), }; //------ diff --git a/collator/src/collator/tests/do_collate_tests.rs b/collator/src/collator/tests/do_collate_tests.rs index e24d824c4..8e9778662 100644 --- a/collator/src/collator/tests/do_collate_tests.rs +++ b/collator/src/collator/tests/do_collate_tests.rs @@ -1,16 +1,19 @@ +use std::collections::BTreeMap; use std::sync::Arc; use everscale_types::models::*; use everscale_types::prelude::*; +use tycho_block_util::queue::QueueKey; +use tycho_util::FastHashMap; +use crate::collator::do_collate::calculate_min_processed_to; use crate::collator::types::{ BlockCollationData, BlockCollationDataBuilder, ParsedExternals, ReadNextExternalsMode, }; use crate::collator::{AnchorsCache, CollatorStdImpl}; use crate::mempool::make_stub_anchor; use crate::test_utils::try_init_test_tracing; -use crate::types::supported_capabilities; - +use crate::types::{supported_capabilities, ShardDescriptionShort}; pub(crate) fn fill_test_anchors_cache(anchors_cache: &mut AnchorsCache, shard_id: ShardIdent) { let mut prev_anchor_id = 0; for anchor_id in 1..=40 { @@ -296,3 +299,138 @@ fn test_read_next_externals() { let kv = anchors_cache.get(0); assert!(kv.is_none()); } + +#[test] +fn test_calculate_min_processed_to_masterchain() { + // Mock data for masterchain test + let shard_id = ShardIdent::MASTERCHAIN; + + let current_processed_to = Some(QueueKey::max_for_lt(5)); + let mc_processed_to = Some(QueueKey::max_for_lt(5)); + + let updated_shard = ShardIdent::new_full(0); + let not_updated_shard = ShardIdent::new_full(1); + + let mc_data_shards = vec![ + (updated_shard, ShardDescriptionShort { + top_sc_block_updated: true, + ..Default::default() + }), + (not_updated_shard, ShardDescriptionShort { + top_sc_block_updated: false, + ..Default::default() + }), + ]; + + let mut mc_data_shards_processed_to = FastHashMap::default(); + let mut processed_to = BTreeMap::new(); + processed_to.insert(shard_id, QueueKey::max_for_lt(4)); + + // check updated + mc_data_shards_processed_to.insert(updated_shard, processed_to); + + let result = calculate_min_processed_to( + &shard_id, + current_processed_to, + mc_processed_to, + &mc_data_shards, + &mc_data_shards_processed_to, + ); + + // updated shard should override current_processed_to + assert_eq!(result, Some(QueueKey::max_for_lt(4))); + + let mut mc_data_shards_processed_to = FastHashMap::default(); + let mut processed_to = BTreeMap::new(); + processed_to.insert(shard_id, QueueKey::max_for_lt(4)); + + // check updated + mc_data_shards_processed_to.insert(not_updated_shard, processed_to); + + let result = calculate_min_processed_to( + &shard_id, + current_processed_to, + mc_processed_to, + &mc_data_shards, + &mc_data_shards_processed_to, + ); + + // not updated shard should not override current_processed_to + assert_eq!(result, Some(QueueKey::max_for_lt(5))); +} + +#[test] +fn test_calculate_min_processed_to_shard() { + // Mock data for shard test + let shard_id = ShardIdent::new_full(2); + + let current_processed_to = Some(QueueKey::max_for_lt(10)); + + let updated_shard = ShardIdent::new_full(3); + let not_updated_shard = ShardIdent::new_full(4); + + let mc_data_shards = vec![ + (updated_shard, ShardDescriptionShort { + top_sc_block_updated: true, + ..Default::default() + }), + (not_updated_shard, ShardDescriptionShort { + top_sc_block_updated: false, + ..Default::default() + }), + ]; + + let mut mc_data_shards_processed_to = FastHashMap::default(); + let mut processed_to = BTreeMap::new(); + processed_to.insert(shard_id, QueueKey::max_for_lt(8)); + let mc_processed_to = Some(QueueKey::max_for_lt(9)); + + // Check updated shard + mc_data_shards_processed_to.insert(updated_shard, processed_to); + + let result = calculate_min_processed_to( + &shard_id, + current_processed_to, + mc_processed_to, + &mc_data_shards, + &mc_data_shards_processed_to, + ); + + // Updated shard should override current_processed_to + assert_eq!(result, Some(QueueKey::max_for_lt(8))); + + // Reset processed_to for not-updated shard + let mut mc_data_shards_processed_to = FastHashMap::default(); + let mut processed_to = BTreeMap::new(); + processed_to.insert(shard_id, QueueKey::max_for_lt(8)); + let mc_processed_to = Some(QueueKey::max_for_lt(11)); + // Check not updated shard + mc_data_shards_processed_to.insert(not_updated_shard, processed_to); + + let result = calculate_min_processed_to( + &shard_id, + current_processed_to, + mc_processed_to, + &mc_data_shards, + &mc_data_shards_processed_to, + ); + + // Not updated shard should not override current_processed_to + assert_eq!(result, Some(QueueKey::max_for_lt(10))); + + // Verify combination with masterchain value + let mc_data_shards_processed_to = FastHashMap::default(); + + let mc_processed_to = Some(QueueKey::max_for_lt(9)); + + let result = calculate_min_processed_to( + &shard_id, + current_processed_to, + mc_processed_to, + &mc_data_shards, + &mc_data_shards_processed_to, + ); + + // Minimum value should be returned + assert_eq!(result, Some(QueueKey::max_for_lt(9))); +} diff --git a/collator/src/collator/tests/execution_manager_tests.rs b/collator/src/collator/tests/execution_manager_tests.rs index 913b8f24f..053d02096 100644 --- a/collator/src/collator/tests/execution_manager_tests.rs +++ b/collator/src/collator/tests/execution_manager_tests.rs @@ -98,6 +98,7 @@ impl MessageQueueAdapter for MessageQueueA _diff: QueueDiffWithMessages, _block_id_short: BlockIdShort, _diff_hash: &HashBytes, + _end_key: QueueKey, ) -> Result<()> { unimplemented!() } @@ -122,7 +123,15 @@ impl MessageQueueAdapter for MessageQueueA unimplemented!() } - fn clear_session_state(&self) -> Result<()> { + fn clear_uncommitted_state(&self) -> Result<()> { + unimplemented!() + } + + fn trim_diffs(&self, _source_shard: &ShardIdent, _inclusive_until: &QueueKey) -> Result<()> { + unimplemented!() + } + + fn get_diff_count_by_shard(&self, _shard_ident: &ShardIdent) -> usize { unimplemented!() } } @@ -229,6 +238,7 @@ fn gen_stub_working_state( }, top_processed_to_anchor: 0, ref_mc_state_handle: prev_shard_data.ref_mc_state_handle().clone(), + shards_processed_to: Default::default(), }), collation_config: Arc::new(Default::default()), wu_used_from_last_anchor: 0, diff --git a/collator/src/collator/types.rs b/collator/src/collator/types.rs index 56103e635..2f060e100 100644 --- a/collator/src/collator/types.rs +++ b/collator/src/collator/types.rs @@ -23,7 +23,9 @@ use tycho_util::FastHashMap; use crate::mempool::{MempoolAnchor, MempoolAnchorId}; use crate::tracing_targets; -use crate::types::{BlockCandidate, McData, ProcessedUptoInfoStuff, ProofFunds}; +use crate::types::{ + BlockCandidate, McData, ProcessedTo, ProcessedUptoInfoStuff, ProofFunds, TopShardBlockInfo, +}; pub(super) struct WorkingState { pub next_block_id_short: BlockIdShort, @@ -204,7 +206,7 @@ pub(super) struct BlockCollationDataBuilder { pub block_create_count: FastHashMap, pub created_by: HashBytes, pub global_version: GlobalVersion, - pub top_shard_blocks_ids: Vec, + pub top_shard_blocks: Vec, /// Mempool config override for a new genesis pub mempool_config_override: Option, @@ -239,7 +241,7 @@ impl BlockCollationDataBuilder { created_by, global_version, shards: None, - top_shard_blocks_ids: vec![], + top_shard_blocks: vec![], mempool_config_override, } } @@ -299,7 +301,7 @@ impl BlockCollationDataBuilder { created_by: self.created_by, global_version: self.global_version, shards: self.shards, - top_shard_blocks_ids: self.top_shard_blocks_ids, + top_shard_blocks: self.top_shard_blocks, shard_fees: self.shard_fees, value_flow: self.value_flow, block_limit, @@ -328,6 +330,7 @@ impl BlockCollationDataBuilder { mempool_config_override: self.mempool_config_override, #[cfg(feature = "block-creator-stats")] block_create_count: self.block_create_count, + diff_tail_len: 0, } } } @@ -373,7 +376,7 @@ pub(super) struct BlockCollationData { pub processed_upto: ProcessedUptoInfoStuff, /// Ids of top blocks from shards that be included in the master block - pub top_shard_blocks_ids: Vec, + pub top_shard_blocks: Vec, shards: Option>>, @@ -398,6 +401,7 @@ pub(super) struct BlockCollationData { #[cfg(feature = "block-creator-stats")] pub block_create_count: FastHashMap, + pub diff_tail_len: u32, } impl BlockCollationData { @@ -1289,6 +1293,7 @@ pub struct UpdateQueueDiffResult { pub has_unprocessed_messages: bool, pub diff_messages_len: usize, pub create_queue_diff_elapsed: Duration, + pub processed_to: ProcessedTo, } pub struct FinalizedCollationResult { diff --git a/collator/src/internal_queue/gc.rs b/collator/src/internal_queue/gc.rs index 4532b8a7e..989b37b7c 100644 --- a/collator/src/internal_queue/gc.rs +++ b/collator/src/internal_queue/gc.rs @@ -7,7 +7,7 @@ use tokio::time::Duration; use tycho_block_util::queue::QueueKey; use tycho_util::metrics::HistogramGuard; -use crate::internal_queue::state::persistent_state::PersistentState; +use crate::internal_queue::state::commited_state::CommittedState; use crate::internal_queue::types::InternalMessageValue; use crate::tracing_targets; @@ -18,7 +18,7 @@ pub struct GcManager { impl GcManager { pub fn start( - persistent_state: Arc>, + committed_state: Arc>, execution_interval: Duration, ) -> Self { let delete_until = Arc::new(Mutex::new(GcRange::new())); @@ -33,11 +33,12 @@ impl GcManager { interval.tick().await; let delete_until = delete_until.lock().unwrap().clone(); - - let gc_state = gc_state.clone(); - let persistent_state = persistent_state.clone(); - tokio::task::spawn_blocking(move || { - gc_task(gc_state, persistent_state, delete_until); + tokio::task::spawn_blocking({ + let gc_state = gc_state.clone(); + let committed_state = committed_state.clone(); + move || { + gc_task(gc_state, committed_state, delete_until); + } }) .await .unwrap(); @@ -65,7 +66,7 @@ impl Drop for GcManager { fn gc_task( gc_state: Arc>, - persistent_state: Arc>, + committed_state: Arc>, delete_until: HashMap, ) { let _histogram = HistogramGuard::begin("tycho_internal_queue_gc_execute_task_time"); @@ -78,7 +79,7 @@ fn gc_task( .map_or(true, |last_key| *current_last_key > *last_key); if can_delete { - if let Err(e) = persistent_state.delete_messages(*shard, current_last_key) { + if let Err(e) = committed_state.delete_messages(*shard, current_last_key) { tracing::error!(target: tracing_targets::MQ, "failed to delete messages: {e:?}"); } diff --git a/collator/src/internal_queue/iterator.rs b/collator/src/internal_queue/iterator.rs index 0fb5d9856..b337e5a37 100644 --- a/collator/src/internal_queue/iterator.rs +++ b/collator/src/internal_queue/iterator.rs @@ -130,7 +130,7 @@ impl QueueIterator for QueueIteratorImpl { // fill processed_upto for (shard_id, message_key) in self.last_processed_message.iter() { // TODO: may be `diff.processed_upto` should be a HashMap and we can consume it from iterator - diff.processed_upto.insert(*shard_id, *message_key); + diff.processed_to.insert(*shard_id, *message_key); } // move new messages @@ -162,7 +162,7 @@ impl QueueIterator for QueueIteratorImpl { for (shard_id, message_key) in self.last_processed_message.iter() { // TODO: may be `diff.processed_upto` should be a HashMap and we can consume it from iterator - diff.processed_upto.insert(*shard_id, *message_key); + diff.processed_to.insert(*shard_id, *message_key); } diff.messages = self.new_messages.clone(); diff --git a/collator/src/internal_queue/queue.rs b/collator/src/internal_queue/queue.rs index e08666d13..53549c03f 100644 --- a/collator/src/internal_queue/queue.rs +++ b/collator/src/internal_queue/queue.rs @@ -11,15 +11,16 @@ use tycho_block_util::queue::QueueKey; use tycho_util::{serde_helpers, FastDashMap, FastHashMap}; use crate::internal_queue::gc::GcManager; -use crate::internal_queue::state::persistent_state::{ - PersistentState, PersistentStateFactory, PersistentStateImplFactory, PersistentStateStdImpl, -}; -use crate::internal_queue::state::session_state::{ - SessionState, SessionStateFactory, SessionStateImplFactory, SessionStateStdImpl, +use crate::internal_queue::state::commited_state::{ + CommittedState, CommittedStateFactory, CommittedStateImplFactory, CommittedStateStdImpl, }; use crate::internal_queue::state::state_iterator::StateIterator; +use crate::internal_queue::state::uncommitted_state::{ + UncommittedState, UncommittedStateFactory, UncommittedStateImplFactory, UncommittedStateStdImpl, +}; use crate::internal_queue::types::{InternalMessageValue, QueueDiffWithMessages}; - +use crate::tracing_targets; +use crate::types::ProcessedTo; // FACTORY #[derive(Debug, Serialize, Deserialize)] @@ -56,8 +57,8 @@ where } pub struct QueueFactoryStdImpl { - pub session_state_factory: SessionStateImplFactory, - pub persistent_state_factory: PersistentStateImplFactory, + pub uncommitted_state_factory: UncommittedStateImplFactory, + pub committed_state_factory: CommittedStateImplFactory, pub config: QueueConfig, } @@ -68,39 +69,49 @@ pub trait LocalQueue where V: InternalMessageValue + Send + Sync, { + /// Create iterator for specified shard and return it fn iterator( &self, ranges: &FastHashMap, for_shard_id: ShardIdent, ) -> Vec>>; + /// Add messages to uncommitted state from `diff.messages` and add diff to the cache fn apply_diff( &self, diff: QueueDiffWithMessages, block_id_short: BlockIdShort, diff_hash: &HashBytes, + end_key: QueueKey, ) -> Result<()>; + /// Move messages from uncommitted state to committed state and update gc ranges fn commit_diff(&self, mc_top_blocks: &[(BlockIdShort, bool)]) -> Result<()>; - fn clear_session_state(&self) -> Result<()>; + /// remove all data in uncommitted state storage + fn clear_uncommitted_state(&self) -> Result<()>; + /// returns the number of diffs in cache for the given shard + fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize; + /// removes all diffs from the cache that are less than `inclusive_until` which source shard is `source_shard` + fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()>; } // IMPLEMENTATION impl QueueFactory for QueueFactoryStdImpl { - type Queue = QueueImpl; + type Queue = QueueImpl; fn create(&self) -> Self::Queue { - let session_state = >::create( - &self.session_state_factory, + let uncommitted_state = >::create( + &self.uncommitted_state_factory, ); - let persistent_state = >::create( - &self.persistent_state_factory, + let committed_state = >::create( + &self.committed_state_factory, ); - let persistent_state = Arc::new(persistent_state); - let gc = GcManager::start::(persistent_state.clone(), self.config.gc_interval); + let committed_state = Arc::new(committed_state); + let gc = GcManager::start::(committed_state.clone(), self.config.gc_interval); QueueImpl { - session_state: Arc::new(session_state), - persistent_state, - diffs: Default::default(), + uncommitted_state: Arc::new(uncommitted_state), + committed_state, + uncommitted_diffs: Default::default(), + committed_diffs: Default::default(), gc, _phantom_data: Default::default(), } @@ -108,38 +119,29 @@ impl QueueFactory for QueueFactoryStdImpl { } struct ShortQueueDiff { - pub processed_upto: BTreeMap, - pub last_key: Option, + pub processed_to: ProcessedTo, + pub end_key: QueueKey, pub hash: HashBytes, } -impl From<(QueueDiffWithMessages, HashBytes)> for ShortQueueDiff { - fn from(value: (QueueDiffWithMessages, HashBytes)) -> Self { - Self { - processed_upto: value.0.processed_upto, - last_key: value.0.messages.last_key_value().map(|(key, _)| *key), - hash: value.1, - } - } -} - pub struct QueueImpl where - S: SessionState, - P: PersistentState, + S: UncommittedState, + P: CommittedState, V: InternalMessageValue, { - session_state: Arc, - persistent_state: Arc

, - diffs: FastDashMap>, + uncommitted_state: Arc, + committed_state: Arc

, + uncommitted_diffs: FastDashMap>, + committed_diffs: FastDashMap>, gc: GcManager, _phantom_data: PhantomData, } impl Queue for QueueImpl where - S: SessionState + Send + Sync, - P: PersistentState + Send + Sync + 'static, + S: UncommittedState + Send + Sync, + P: CommittedState + Send + Sync + 'static, V: InternalMessageValue + Send + Sync, { fn iterator( @@ -147,13 +149,15 @@ where ranges: &FastHashMap, for_shard_id: ShardIdent, ) -> Vec>> { - let snapshot = self.persistent_state.snapshot(); - let persistent_state_iterator = - self.persistent_state + let snapshot = self.committed_state.snapshot(); + let committed_state_iterator = + self.committed_state + .iterator(&snapshot, for_shard_id, ranges); + let uncommitted_state_iterator = + self.uncommitted_state .iterator(&snapshot, for_shard_id, ranges); - let session_state_iterator = self.session_state.iterator(&snapshot, for_shard_id, ranges); - vec![persistent_state_iterator, session_state_iterator] + vec![committed_state_iterator, uncommitted_state_iterator] } fn apply_diff( @@ -161,9 +165,13 @@ where diff: QueueDiffWithMessages, block_id_short: BlockIdShort, hash: &HashBytes, + end_key: QueueKey, ) -> Result<()> { // Get or insert the shard diffs for the given block_id_short.shard - let mut shard_diffs = self.diffs.entry(block_id_short.shard).or_default(); + let mut shard_diffs = self + .uncommitted_diffs + .entry(block_id_short.shard) + .or_default(); // Check for duplicate diffs based on the block_id_short.seqno and hash let shard_diff = shard_diffs.get(&block_id_short.seqno); @@ -178,7 +186,7 @@ where } } - let last_applied_seqno = shard_diffs.last_key_value().map(|(key, _)| *key); + let last_applied_seqno = shard_diffs.keys().next_back().cloned(); if let Some(last_applied_seqno) = last_applied_seqno { // Check if the diff is already applied @@ -196,68 +204,79 @@ where } } - // Add messages to session_state if there are any + // Add messages to uncommitted_state if there are any if !diff.messages.is_empty() { - self.session_state + self.uncommitted_state .add_messages(block_id_short.shard, &diff.messages)?; } + let short_diff = ShortQueueDiff { + processed_to: diff.processed_to, + end_key, + hash: *hash, + }; + // Insert the diff into the shard diffs - shard_diffs.insert(block_id_short.seqno, (diff, *hash).into()); + shard_diffs.insert(block_id_short.seqno, short_diff); Ok(()) } fn commit_diff(&self, mc_top_blocks: &[(BlockIdShort, bool)]) -> Result<()> { - let mut diffs_for_commit = vec![]; let mut shards_to_commit = FastHashMap::default(); let mut gc_ranges = FastHashMap::default(); for (block_id_short, top_shard_block_changed) in mc_top_blocks { - let mut diffs_to_remove = vec![]; - let prev_shard_diffs = self.diffs.get_mut(&block_id_short.shard); + let mut diffs_to_commit = vec![]; - if let Some(mut shard_diffs) = prev_shard_diffs { - shard_diffs + let prev_shard_uncommitted_diffs = + self.uncommitted_diffs.get_mut(&block_id_short.shard); + if let Some(mut shard_uncommitted_diffs) = prev_shard_uncommitted_diffs { + shard_uncommitted_diffs .range(..=block_id_short.seqno) .for_each(|(block_seqno, shard_diff)| { - // find last key to commit for each shard - diffs_for_commit.push(*block_id_short); - let last_key = shard_diff.last_key.unwrap_or_default(); + diffs_to_commit.push(*block_seqno); let current_last_key = shards_to_commit .entry(block_id_short.shard) - .or_insert_with(|| last_key); + .or_insert_with(|| shard_diff.end_key); - if last_key > *current_last_key { - *current_last_key = last_key; + if shard_diff.end_key > *current_last_key { + *current_last_key = shard_diff.end_key; } - // find min processed_upto for each shard for GC + // find min processed_to for each shard for GC if *block_seqno == block_id_short.seqno && *top_shard_block_changed { - for processed_upto in shard_diff.processed_upto.iter() { + for (shard_ident, processed_to_key) in shard_diff.processed_to.iter() { let last_key = gc_ranges - .entry(*processed_upto.0) - .or_insert_with(|| *processed_upto.1); + .entry(*shard_ident) + .or_insert_with(|| *processed_to_key); - if processed_upto.1 < last_key { - *last_key = *processed_upto.1; + if processed_to_key < last_key { + *last_key = *processed_to_key; } } } - - diffs_to_remove.push(*block_seqno); }); - for seqno in diffs_to_remove { - shard_diffs.remove(&seqno); + for seqno in diffs_to_commit { + if let Some(diff) = shard_uncommitted_diffs.remove(&seqno) { + // Move the diff to committed_diffs + let mut shard_committed_diffs = self + .committed_diffs + .entry(block_id_short.shard) + .or_default(); + shard_committed_diffs.insert(seqno, diff); + } } } } - self.session_state.commit_messages(&shards_to_commit)?; + self.uncommitted_state.commit_messages(&shards_to_commit)?; + + let uncommitted_diffs_count: usize = + self.uncommitted_diffs.iter().map(|r| r.value().len()).sum(); - let uncommitted_diffs_count: usize = self.diffs.iter().map(|r| r.value().len()).sum(); metrics::counter!("tycho_internal_queue_uncommitted_diffs_count") .increment(uncommitted_diffs_count as u64); @@ -268,8 +287,44 @@ where Ok(()) } - fn clear_session_state(&self) -> Result<()> { - self.diffs.clear(); - self.session_state.truncate() + fn clear_uncommitted_state(&self) -> Result<()> { + let diffs_before_clear: usize = + self.uncommitted_diffs.iter().map(|r| r.value().len()).sum(); + self.uncommitted_diffs.clear(); + let diffs_after_clear: usize = self.uncommitted_diffs.iter().map(|r| r.value().len()).sum(); + tracing::info!( + target: tracing_targets::MQ, + diffs_before_clear, + diffs_after_clear, + "Cleared uncommitted diffs.", + ); + self.uncommitted_state.truncate() + } + + fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize { + let uncommitted_count = self + .uncommitted_diffs + .get(shard_ident) + .map_or(0, |diffs| diffs.len()); + let committed_count = self + .committed_diffs + .get(shard_ident) + .map_or(0, |diffs| diffs.len()); + + uncommitted_count + committed_count + } + + fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()> { + if let Some(mut shard_diffs) = self.uncommitted_diffs.get_mut(source_shard) { + shard_diffs + .value_mut() + .retain(|_, diff| &diff.end_key > inclusive_until); + } + if let Some(mut shard_diffs) = self.committed_diffs.get_mut(source_shard) { + shard_diffs + .value_mut() + .retain(|_, diff| &diff.end_key > inclusive_until); + } + Ok(()) } } diff --git a/collator/src/internal_queue/state/persistent_state.rs b/collator/src/internal_queue/state/commited_state.rs similarity index 68% rename from collator/src/internal_queue/state/persistent_state.rs rename to collator/src/internal_queue/state/commited_state.rs index 6367c3eef..cb7533b6c 100644 --- a/collator/src/internal_queue/state/persistent_state.rs +++ b/collator/src/internal_queue/state/commited_state.rs @@ -12,52 +12,52 @@ use crate::internal_queue::types::InternalMessageValue; // CONFIG -pub struct PersistentStateConfig { +pub struct CommittedStateConfig { pub storage: Storage, } // FACTORY -impl PersistentStateFactory for F +impl CommittedStateFactory for F where F: Fn() -> R, - R: PersistentState, + R: CommittedState, V: InternalMessageValue, { - type PersistentState = R; + type CommittedState = R; - fn create(&self) -> Self::PersistentState { + fn create(&self) -> Self::CommittedState { self() } } -pub struct PersistentStateImplFactory { +pub struct CommittedStateImplFactory { pub storage: Storage, } -impl PersistentStateImplFactory { +impl CommittedStateImplFactory { pub fn new(storage: Storage) -> Self { Self { storage } } } -impl PersistentStateFactory for PersistentStateImplFactory { - type PersistentState = PersistentStateStdImpl; +impl CommittedStateFactory for CommittedStateImplFactory { + type CommittedState = CommittedStateStdImpl; - fn create(&self) -> Self::PersistentState { - PersistentStateStdImpl::new(self.storage.clone()) + fn create(&self) -> Self::CommittedState { + CommittedStateStdImpl::new(self.storage.clone()) } } -pub trait PersistentStateFactory { - type PersistentState: PersistentState; +pub trait CommittedStateFactory { + type CommittedState: CommittedState; - fn create(&self) -> Self::PersistentState; + fn create(&self) -> Self::CommittedState; } // TRAIT -pub trait PersistentState: Send + Sync { +pub trait CommittedState: Send + Sync { fn snapshot(&self) -> OwnedSnapshot; fn iterator( @@ -72,17 +72,17 @@ pub trait PersistentState: Send + Sync { // IMPLEMENTATION -pub struct PersistentStateStdImpl { +pub struct CommittedStateStdImpl { storage: Storage, } -impl PersistentStateStdImpl { +impl CommittedStateStdImpl { pub fn new(storage: Storage) -> Self { Self { storage } } } -impl PersistentState for PersistentStateStdImpl { +impl CommittedState for CommittedStateStdImpl { fn snapshot(&self) -> OwnedSnapshot { self.storage.internal_queue_storage().snapshot() } @@ -99,7 +99,7 @@ impl PersistentState for PersistentStateStdImpl { let iter = self .storage .internal_queue_storage() - .build_iterator_persistent(snapshot); + .build_iterator_committed(snapshot); shard_iters_with_ranges .insert(shard, ShardIteratorWithRange::new(iter, range.0, range.1)); diff --git a/collator/src/internal_queue/state/mod.rs b/collator/src/internal_queue/state/mod.rs index f663078af..a30ba226b 100644 --- a/collator/src/internal_queue/state/mod.rs +++ b/collator/src/internal_queue/state/mod.rs @@ -1,5 +1,5 @@ -pub mod persistent_state; -pub mod session_state; +pub mod commited_state; pub mod shard_iterator; pub mod state_iterator; pub mod states_iterators_manager; +pub mod uncommitted_state; diff --git a/collator/src/internal_queue/state/states_iterators_manager.rs b/collator/src/internal_queue/state/states_iterators_manager.rs index 6f48bf62d..a89b2cce8 100644 --- a/collator/src/internal_queue/state/states_iterators_manager.rs +++ b/collator/src/internal_queue/state/states_iterators_manager.rs @@ -33,12 +33,9 @@ impl StatesIteratorsManager { } pub fn current_position(&self) -> FastHashMap { - let mut result = FastHashMap::default(); - for iterator in &self.iterators { - for (shard, position) in iterator.current_position() { - result.insert(shard, position); - } - } - result + self.iterators + .iter() + .flat_map(|iterator| iterator.current_position()) + .collect() } } diff --git a/collator/src/internal_queue/state/session_state.rs b/collator/src/internal_queue/state/uncommitted_state.rs similarity index 70% rename from collator/src/internal_queue/state/session_state.rs rename to collator/src/internal_queue/state/uncommitted_state.rs index 35257b131..dfdc29bfa 100644 --- a/collator/src/internal_queue/state/session_state.rs +++ b/collator/src/internal_queue/state/uncommitted_state.rs @@ -17,53 +17,53 @@ use crate::internal_queue::types::InternalMessageValue; // CONFIG -pub struct SessionStateConfig { +pub struct UncommittedStateConfig { pub storage: Storage, } // FACTORY -impl SessionStateFactory for F +impl UncommittedStateFactory for F where F: Fn() -> R, - R: SessionState, + R: UncommittedState, V: InternalMessageValue, { - type SessionState = R; + type UncommittedState = R; - fn create(&self) -> Self::SessionState { + fn create(&self) -> Self::UncommittedState { self() } } -pub struct SessionStateImplFactory { +pub struct UncommittedStateImplFactory { pub storage: Storage, } -impl SessionStateImplFactory { +impl UncommittedStateImplFactory { pub fn new(storage: Storage) -> Self { Self { storage } } } -impl SessionStateFactory for SessionStateImplFactory { - type SessionState = SessionStateStdImpl; +impl UncommittedStateFactory for UncommittedStateImplFactory { + type UncommittedState = UncommittedStateStdImpl; - fn create(&self) -> Self::SessionState { - SessionStateStdImpl::new(self.storage.clone()) + fn create(&self) -> Self::UncommittedState { + UncommittedStateStdImpl::new(self.storage.clone()) } } -pub trait SessionStateFactory { - type SessionState: LocalSessionState; +pub trait UncommittedStateFactory { + type UncommittedState: LocalUncommittedState; - fn create(&self) -> Self::SessionState; + fn create(&self) -> Self::UncommittedState; } // TRAIT -#[trait_variant::make(SessionState: Send)] -pub trait LocalSessionState { +#[trait_variant::make(UncommittedState: Send)] +pub trait LocalUncommittedState { fn add_messages(&self, source: ShardIdent, messages: &BTreeMap>) -> Result<()>; @@ -80,17 +80,17 @@ pub trait LocalSessionState { // IMPLEMENTATION -pub struct SessionStateStdImpl { +pub struct UncommittedStateStdImpl { storage: Storage, } -impl SessionStateStdImpl { +impl UncommittedStateStdImpl { pub fn new(storage: Storage) -> Self { Self { storage } } } -impl SessionState for SessionStateStdImpl { +impl UncommittedState for UncommittedStateStdImpl { /// write new messages to storage fn add_messages( &self, @@ -102,7 +102,7 @@ impl SessionState for SessionStateStdImpl { for (internal_message_key, message) in messages.iter() { self.storage .internal_queue_storage() - .insert_message_session( + .insert_message_uncommitted( &mut batch, tycho_storage::model::ShardsInternalMessagesKey::new( source, @@ -130,7 +130,7 @@ impl SessionState for SessionStateStdImpl { let iter = self .storage .internal_queue_storage() - .build_iterator_session(snapshot); + .build_iterator_uncommitted(snapshot); shard_iters_with_ranges.insert(shard, ShardIteratorWithRange::new(iter, *start, *end)); } @@ -139,14 +139,13 @@ impl SessionState for SessionStateStdImpl { } fn commit_messages(&self, ranges: &FastHashMap) -> Result<()> { - let ranges = ranges - .iter() - .map(|(shard, key)| (*shard, *key)) - .collect::>(); + let ranges = ranges.iter().map(|(shard, key)| (*shard, *key)).collect(); self.storage.internal_queue_storage().commit(ranges) } fn truncate(&self) -> Result<()> { - self.storage.internal_queue_storage().clear_session_queue() + self.storage + .internal_queue_storage() + .clear_uncommitted_queue() } } diff --git a/collator/src/internal_queue/types.rs b/collator/src/internal_queue/types.rs index 2b2214722..db6ad2942 100644 --- a/collator/src/internal_queue/types.rs +++ b/collator/src/internal_queue/types.rs @@ -5,22 +5,23 @@ use std::sync::Arc; use anyhow::{bail, Context, Result}; use everscale_types::boc::Boc; use everscale_types::cell::{Cell, HashBytes, Load}; -use everscale_types::models::{IntAddr, IntMsgInfo, Message, MsgInfo, OutMsgDescr, ShardIdent}; +use everscale_types::models::{IntAddr, IntMsgInfo, Message, MsgInfo, OutMsgDescr}; use tycho_block_util::queue::{QueueDiff, QueueDiffStuff, QueueKey}; use super::state::state_iterator::MessageExt; +use crate::types::ProcessedTo; #[derive(Default, Debug, Clone)] pub struct QueueDiffWithMessages { pub messages: BTreeMap>, - pub processed_upto: BTreeMap, + pub processed_to: ProcessedTo, } impl QueueDiffWithMessages { pub fn new() -> Self { Self { messages: BTreeMap::new(), - processed_upto: BTreeMap::new(), + processed_to: BTreeMap::new(), } } } @@ -30,8 +31,8 @@ impl QueueDiffWithMessages { queue_diff_stuff: &QueueDiffStuff, out_msg_description: &OutMsgDescr, ) -> Result { - let QueueDiff { processed_upto, .. } = queue_diff_stuff.as_ref(); - let processed_upto: BTreeMap = processed_upto + let QueueDiff { processed_to, .. } = queue_diff_stuff.as_ref(); + let processed_to = processed_to .iter() .map(|(shard_ident, key)| (*shard_ident, *key)) .collect(); @@ -51,7 +52,7 @@ impl QueueDiffWithMessages { Ok(Self { messages, - processed_upto, + processed_to, }) } } diff --git a/collator/src/manager/blocks_cache.rs b/collator/src/manager/blocks_cache.rs index c80b68556..b8073711e 100644 --- a/collator/src/manager/blocks_cache.rs +++ b/collator/src/manager/blocks_cache.rs @@ -6,7 +6,7 @@ use everscale_types::models::{ BlockId, BlockIdShort, ConsensusInfo, Lazy, OutMsgDescr, ShardIdent, ValueFlow, }; use parking_lot::Mutex; -use tycho_block_util::queue::{QueueDiffStuff, QueueKey}; +use tycho_block_util::queue::QueueDiffStuff; use tycho_block_util::state::ShardStateStuff; use tycho_util::{FastDashMap, FastHashMap}; @@ -18,7 +18,8 @@ use crate::manager::types::{AdditionalShardBlockCacheInfo, BlockCacheEntryData}; use crate::state_node::StateNodeAdapter; use crate::tracing_targets; use crate::types::{ - BlockCandidate, DisplayIntoIter, DisplayIter, McData, ProofFunds, TopBlockDescription, + BlockCandidate, DisplayIntoIter, DisplayIter, McData, ProcessedTo, ProofFunds, + TopBlockDescription, }; use crate::validator::ValidationStatus; @@ -44,6 +45,12 @@ impl BlocksCache { for mut shard_cache in self.shards.iter_mut() { for (_, entry) in shard_cache.blocks.iter().rev() { if entry.ref_by_mc_seqno == next_mc_block_id_short.seqno { + let processed_to = entry + .int_processed_to() + .iter() + .map(|(shard, queue_key)| (*shard, *queue_key)) + .collect(); + if let Some(additional_info) = entry.data.get_additional_shard_block_cache_info()? { @@ -55,6 +62,7 @@ impl BlocksCache { proof_funds: std::mem::take(&mut shard_cache.data.proof_funds), #[cfg(feature = "block-creator-stats")] creators: std::mem::take(&mut shard_cache.data.creators), + processed_to, }); break; } @@ -85,11 +93,10 @@ impl BlocksCache { Ok(consensus_info) } - pub fn reset_top_shard_blocks_additional_info(&self) -> Result<()> { + pub fn reset_top_shard_blocks_additional_info(&self) { for mut shard_cache in self.shards.iter_mut() { - shard_cache.data.reset_top_shard_block_additional_info()?; + shard_cache.data.reset_top_shard_block_additional_info(); } - Ok(()) } /// Find shard block in cache and then get containing master block id if link exists @@ -133,7 +140,7 @@ impl BlocksCache { pub fn get_all_processed_to_by_mc_block_from_cache( &self, mc_block_key: &BlockCacheKey, - ) -> Result>>> { + ) -> Result>> { let mut all_processed_to = FastHashMap::default(); if mc_block_key.seqno == 0 { @@ -149,6 +156,9 @@ impl BlocksCache { mc_block_key, ) }; + + let processed_to = mc_block_entry.int_processed_to().clone(); + updated_top_shard_block_ids = mc_block_entry .top_shard_blocks_info .iter() @@ -156,10 +166,7 @@ impl BlocksCache { .map(|(id, _)| id) .cloned() .collect::>(); - all_processed_to.insert( - mc_block_entry.block_id, - Some(mc_block_entry.int_processed_to().clone()), - ); + all_processed_to.insert(mc_block_entry.block_id, Some(processed_to)); } for top_sc_block_id in updated_top_shard_block_ids { @@ -167,15 +174,21 @@ impl BlocksCache { continue; } + let mut processed_to = BTreeMap::default(); + // try to find in cache - let mut int_processed_to_opt = None; if let Some(shard_cache) = self.shards.get(&top_sc_block_id.shard) { if let Some(sc_block_entry) = shard_cache.blocks.get(&top_sc_block_id.seqno) { - int_processed_to_opt = Some(sc_block_entry.int_processed_to().clone()); + sc_block_entry + .int_processed_to() + .iter() + .for_each(|(shard, queue_key)| { + processed_to.insert(*shard, *queue_key); + }); } } - all_processed_to.insert(top_sc_block_id, int_processed_to_opt); + all_processed_to.insert(top_sc_block_id, Some(processed_to)); } Ok(all_processed_to) @@ -900,14 +913,12 @@ impl ShardBlocksCacheData { Ok(()) } - fn reset_top_shard_block_additional_info(&mut self) -> Result<()> { + fn reset_top_shard_block_additional_info(&mut self) { self.value_flow = Default::default(); self.proof_funds = Default::default(); #[cfg(feature = "block-creator-stats")] self.creators.clear(); - - Ok(()) } } diff --git a/collator/src/manager/mod.rs b/collator/src/manager/mod.rs index 40cc0e68b..5bdf13122 100644 --- a/collator/src/manager/mod.rs +++ b/collator/src/manager/mod.rs @@ -12,7 +12,6 @@ use everscale_types::models::{ use parking_lot::{Mutex, RwLock}; use tokio::sync::Notify; use tycho_block_util::block::{calc_next_block_id_short, ValidatorSubsetInfo}; -use tycho_block_util::queue::QueueKey; use tycho_block_util::state::ShardStateStuff; use tycho_core::global_config::MempoolGlobalConfig; use tycho_util::metrics::HistogramGuard; @@ -40,7 +39,7 @@ use crate::state_node::{StateNodeAdapter, StateNodeAdapterFactory, StateNodeEven use crate::types::{ BlockCollationResult, BlockIdExt, CollationSessionId, CollationSessionInfo, CollatorConfig, DebugIter, DisplayAsShortId, DisplayBlockIdsIntoIter, DisplayIter, DisplayTuple, McData, - ShardDescriptionExt, ShardDescriptionShort, ShardHashesExt, + ProcessedTo, ShardDescriptionExt, ShardDescriptionShort, ShardHashesExt, }; use crate::utils::async_dispatcher::{AsyncDispatcher, STANDARD_ASYNC_DISPATCHER_BUFFER_SIZE}; use crate::utils::block::detect_top_processed_to_anchor; @@ -480,6 +479,7 @@ where queue_diff_with_msgs, queue_diff.block_id().as_short_id(), queue_diff.diff_hash(), + queue_diff.diff().max_message, ) } @@ -722,7 +722,7 @@ where "collator was cancelled before", ); - self.mq_adapter.clear_session_state()?; + self.mq_adapter.clear_uncommitted_state()?; let (last_collated_mc_block_id, applied_mc_queue_range) = self .blocks_cache .get_last_collated_block_and_applied_mc_queue_range(); @@ -768,7 +768,7 @@ where } } - self.mq_adapter.clear_session_state()?; + self.mq_adapter.clear_uncommitted_state()?; } tracing::debug!( @@ -984,7 +984,7 @@ where } } - self.mq_adapter.clear_session_state()?; + self.mq_adapter.clear_uncommitted_state()?; } tracing::debug!(target: tracing_targets::COLLATION_MANAGER, @@ -1178,15 +1178,31 @@ where new_genesis_time_millis = mp_cfg_override.genesis_time_millis, "will drop uncommitted internal messages from queue on new genesis", ); - self.mq_adapter.clear_session_state()?; + self.mq_adapter.clear_uncommitted_state()?; } } - // get min internals processed upto - let min_processed_to_by_shards = self + // internals processed upto + let processed_to_by_shards = self .read_min_processed_to_for_mc_block(&last_applied_mc_block_key) .await?; + // calc internals processed upto + let mut min_processed_to_by_shards = BTreeMap::default(); + + for min_processed_upto in processed_to_by_shards.values() { + for (shard_id, to_key) in min_processed_upto { + min_processed_to_by_shards + .entry(shard_id) + .and_modify(|min| { + if &to_key < min { + *min = to_key; + } + }) + .or_insert(to_key); + } + } + tracing::debug!(target: tracing_targets::COLLATION_MANAGER, min_processed_to_by_shards = %DisplayIter(min_processed_to_by_shards.iter().map(DisplayTuple)), ); @@ -1211,8 +1227,8 @@ where // try load required previous queue diffs let mut prev_queue_diffs = vec![]; - for (shard_id, min_processed_to) in min_processed_to_by_shards { - let Some((_, prev_block_ids)) = before_tail_block_ids.get(&shard_id) else { + for (shard_id, min_processed_to) in &min_processed_to_by_shards { + let Some((_, prev_block_ids)) = before_tail_block_ids.get(shard_id) else { continue; }; let mut prev_block_ids: VecDeque<_> = prev_block_ids.iter().cloned().collect(); @@ -1250,7 +1266,7 @@ where ); return Ok(false); }; - let diff_required = queue_diff_stuff.as_ref().max_message > min_processed_to; + let diff_required = &queue_diff_stuff.as_ref().max_message > min_processed_to; tracing::debug!(target: tracing_targets::COLLATION_MANAGER, diff_block_id = %prev_block_id.as_short_id(), diff_required, @@ -1272,6 +1288,7 @@ where queue_diff_with_messages, *queue_diff_stuff.diff_hash(), prev_block_id, + queue_diff_stuff.diff().max_message, )); let prev_ids_info = block_stuff.construct_prev_id()?; @@ -1284,9 +1301,17 @@ where } // apply required previous queue diffs - while let Some((diff, diff_hash, block_id)) = prev_queue_diffs.pop() { - self.mq_adapter - .apply_diff(diff, block_id.as_short_id(), &diff_hash)?; + while let Some((diff, diff_hash, block_id, max_message_key)) = prev_queue_diffs.pop() { + self.mq_adapter.apply_diff( + diff, + block_id.as_short_id(), + &diff_hash, + max_message_key, + )?; + } + // trim diffs tails for all shards + for (shard_id, min_processed_to) in min_processed_to_by_shards { + self.mq_adapter.trim_diffs(shard_id, min_processed_to)?; } // sync all applied blocks @@ -1342,9 +1367,9 @@ where // reset top shard blocks info // because next we will start to collate new shard blocks after the sync - self.blocks_cache.reset_top_shard_blocks_additional_info()?; + self.blocks_cache.reset_top_shard_blocks_additional_info(); - let mc_data = McData::load_from_state(state)?; + let mc_data = McData::load_from_state(state, processed_to_by_shards)?; let top_processed_to_anchor = mc_data.top_processed_to_anchor; // remove all previous blocks from cache @@ -1377,8 +1402,8 @@ where async fn read_min_processed_to_for_mc_block( &self, mc_block_key: &BlockCacheKey, - ) -> Result> { - let mut result = BTreeMap::new(); + ) -> Result> { + let mut result = FastHashMap::default(); if mc_block_key.seqno == 0 { return Ok(result); @@ -1393,26 +1418,20 @@ where Some(processed_to) => processed_to, None => { // try get from storage - let loaded = utils::load_only_queue_diff_stuff( + utils::load_only_queue_diff_stuff( self.state_node_adapter.as_ref(), &top_block_id, ) - .await?; - - loaded.as_ref().processed_upto.clone() + .await? + .as_ref() + .processed_to + .clone() + .into_iter() + .collect() } }; - for (shard_id, to_key) in processed_to { - result - .entry(shard_id) - .and_modify(|min| { - if &to_key < min { - *min = to_key; - } - }) - .or_insert(to_key); - } + result.insert(top_block_id.shard, processed_to.clone()); } Ok(result) diff --git a/collator/src/manager/types.rs b/collator/src/manager/types.rs index a9e6b280b..4bc3cd63e 100644 --- a/collator/src/manager/types.rs +++ b/collator/src/manager/types.rs @@ -1,17 +1,17 @@ -use std::collections::BTreeMap; use std::fmt::{Debug, Display}; use std::sync::Arc; use anyhow::{anyhow, Result}; use everscale_types::models::{BlockId, BlockIdShort, BlockInfo, Lazy, OutMsgDescr, ShardIdent}; use tokio::sync::Notify; -use tycho_block_util::queue::{QueueDiffStuff, QueueKey}; +use tycho_block_util::queue::QueueDiffStuff; use tycho_block_util::state::ShardStateStuff; use tycho_network::PeerId; use tycho_util::FastHashMap; use crate::types::{ - ArcSignature, BlockCandidate, BlockStuffForSync, DebugDisplayOpt, McData, ShardDescriptionExt, + ArcSignature, BlockCandidate, BlockStuffForSync, DebugDisplayOpt, McData, ProcessedTo, + ShardDescriptionExt, }; pub(super) type BlockCacheKey = BlockIdShort; @@ -328,18 +328,12 @@ impl BlockCacheEntry { } } - pub fn int_processed_to(&self) -> &BTreeMap { + pub fn int_processed_to(&self) -> &ProcessedTo { match &self.data { BlockCacheEntryData::Collated { candidate_stuff, .. - } => { - &candidate_stuff - .candidate - .queue_diff_aug - .diff() - .processed_upto - } - BlockCacheEntryData::Received { queue_diff, .. } => &queue_diff.diff().processed_upto, + } => &candidate_stuff.candidate.queue_diff_aug.diff().processed_to, + BlockCacheEntryData::Received { queue_diff, .. } => &queue_diff.diff().processed_to, } } } diff --git a/collator/src/queue_adapter.rs b/collator/src/queue_adapter.rs index 983428afb..8b6e4d94f 100644 --- a/collator/src/queue_adapter.rs +++ b/collator/src/queue_adapter.rs @@ -7,15 +7,15 @@ use tycho_util::FastHashMap; use crate::internal_queue::iterator::{QueueIterator, QueueIteratorExt, QueueIteratorImpl}; use crate::internal_queue::queue::{Queue, QueueImpl}; -use crate::internal_queue::state::persistent_state::PersistentStateStdImpl; -use crate::internal_queue::state::session_state::SessionStateStdImpl; +use crate::internal_queue::state::commited_state::CommittedStateStdImpl; use crate::internal_queue::state::states_iterators_manager::StatesIteratorsManager; +use crate::internal_queue::state::uncommitted_state::UncommittedStateStdImpl; use crate::internal_queue::types::{InternalMessageValue, QueueDiffWithMessages}; use crate::tracing_targets; use crate::types::{DisplayIter, DisplayTuple, DisplayTupleRef}; pub struct MessageQueueAdapterStdImpl { - queue: QueueImpl, + queue: QueueImpl, } pub trait MessageQueueAdapter: Send + Sync @@ -29,15 +29,16 @@ where shards_from: FastHashMap, shards_to: FastHashMap, ) -> Result>>; - /// Apply diff to the current queue session state (waiting for the operation to complete) + /// Apply diff to the current queue uncommitted state (waiting for the operation to complete) fn apply_diff( &self, diff: QueueDiffWithMessages, block_id_short: BlockIdShort, diff_hash: &HashBytes, + end_key: QueueKey, ) -> Result<()>; - /// Commit previously applied diff, saving changes to persistent state (waiting for the operation to complete). + /// Commit previously applied diff, saving changes to committed state (waiting for the operation to complete). /// Return `None` if specified diff does not exist. fn commit_diff(&self, mc_top_blocks: Vec<(BlockIdShort, bool)>) -> Result<()>; @@ -55,11 +56,16 @@ where messages: Vec<(ShardIdent, QueueKey)>, ) -> Result<()>; - fn clear_session_state(&self) -> Result<()>; + fn clear_uncommitted_state(&self) -> Result<()>; + /// removes all diffs from the cache that are less than `inclusive_until` which source shard is `source_shard` + fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()>; + + /// returns the number of diffs in cache for the given shard + fn get_diff_count_by_shard(&self, shard_ident: &ShardIdent) -> usize; } impl MessageQueueAdapterStdImpl { - pub fn new(queue: QueueImpl) -> Self { + pub fn new(queue: QueueImpl) -> Self { Self { queue } } } @@ -99,16 +105,17 @@ impl MessageQueueAdapter for MessageQueueAdapterStdI diff: QueueDiffWithMessages, block_id_short: BlockIdShort, hash: &HashBytes, + end_key: QueueKey, ) -> Result<()> { let time = std::time::Instant::now(); let len = diff.messages.len(); - let processed_upto = diff.processed_upto.clone(); - self.queue.apply_diff(diff, block_id_short, hash)?; + let processed_to = diff.processed_to.clone(); + self.queue.apply_diff(diff, block_id_short, hash, end_key)?; tracing::info!(target: tracing_targets::MQ_ADAPTER, new_messages_len = len, elapsed = ?time.elapsed(), - processed_upto = %DisplayIter(processed_upto.iter().map(DisplayTuple)), + processed_to = %DisplayIter(processed_to.iter().map(DisplayTuple)), "Diff applied", ); Ok(()) @@ -150,7 +157,22 @@ impl MessageQueueAdapter for MessageQueueAdapterStdI iterator.commit(messages) } - fn clear_session_state(&self) -> Result<()> { - self.queue.clear_session_state() + fn clear_uncommitted_state(&self) -> Result<()> { + tracing::trace!(target: tracing_targets::MQ_ADAPTER, "Clearing uncommitted state"); + self.queue.clear_uncommitted_state() + } + + fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()> { + tracing::trace!( + target: tracing_targets::MQ_ADAPTER, + source_shard = %source_shard, + inclusive_until = %inclusive_until, + "Trimming diffs" + ); + self.queue.trim_diffs(source_shard, inclusive_until) + } + + fn get_diff_count_by_shard(&self, shard_ident: &ShardIdent) -> usize { + self.queue.get_diffs_count_by_shard(shard_ident) } } diff --git a/collator/src/types.rs b/collator/src/types.rs index e26729ba9..166da2c5f 100644 --- a/collator/src/types.rs +++ b/collator/src/types.rs @@ -214,10 +214,15 @@ pub struct McData { pub top_processed_to_anchor: MempoolAnchorId, pub ref_mc_state_handle: RefMcStateHandle, + + pub shards_processed_to: FastHashMap, } impl McData { - pub fn load_from_state(state_stuff: &ShardStateStuff) -> Result> { + pub fn load_from_state( + state_stuff: &ShardStateStuff, + shards_processed_to: FastHashMap, + ) -> Result> { let block_id = *state_stuff.block_id(); let extra = state_stuff.state_extra()?; let state = state_stuff.as_ref(); @@ -258,6 +263,7 @@ impl McData { top_processed_to_anchor, ref_mc_state_handle: state_stuff.ref_mc_state_handle().clone(), + shards_processed_to, })) } @@ -290,6 +296,7 @@ pub struct BlockCandidate { pub created_by: HashBytes, pub queue_diff_aug: QueueDiffStuffAug, pub consensus_info: ConsensusInfo, + pub processed_to: FastHashMap, } #[derive(Default, Clone)] @@ -431,6 +438,7 @@ pub struct TopBlockDescription { pub proof_funds: ProofFunds, #[cfg(feature = "block-creator-stats")] pub creators: Vec, + pub processed_to: ProcessedTo, } #[derive(Debug)] @@ -615,7 +623,7 @@ impl std::fmt::Display for Display } } -#[derive(Debug, Clone, Copy, Eq, PartialEq)] +#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)] pub struct ShardDescriptionShort { pub ext_processed_to_anchor_id: u32, pub top_sc_block_updated: bool, @@ -668,3 +676,11 @@ where Ok(res) } } + +#[derive(Debug, Clone)] +pub struct TopShardBlockInfo { + pub block_id: BlockId, + pub processed_to: ProcessedTo, +} + +pub type ProcessedTo = BTreeMap; diff --git a/collator/tests/collation_tests.rs b/collator/tests/collation_tests.rs index 8d8ea3175..109c64719 100644 --- a/collator/tests/collation_tests.rs +++ b/collator/tests/collation_tests.rs @@ -8,8 +8,8 @@ use tycho_block_util::block::BlockIdRelation; use tycho_block_util::state::MinRefMcStateTracker; use tycho_collator::collator::CollatorStdImplFactory; use tycho_collator::internal_queue::queue::{QueueFactory, QueueFactoryStdImpl}; -use tycho_collator::internal_queue::state::persistent_state::PersistentStateImplFactory; -use tycho_collator::internal_queue::state::session_state::SessionStateImplFactory; +use tycho_collator::internal_queue::state::commited_state::CommittedStateImplFactory; +use tycho_collator::internal_queue::state::uncommitted_state::UncommittedStateImplFactory; use tycho_collator::manager::CollationManager; use tycho_collator::mempool::MempoolAdapterStubImpl; use tycho_collator::queue_adapter::MessageQueueAdapterStdImpl; @@ -93,12 +93,12 @@ async fn test_collation_process_on_stubs() { tracing::info!("Trying to start CollationManager"); - let session_state_factory = SessionStateImplFactory::new(storage.clone()); - let persistent_state_factory = PersistentStateImplFactory::new(storage.clone()); + let uncommitted_state_factory = UncommittedStateImplFactory::new(storage.clone()); + let committed_state_factory = CommittedStateImplFactory::new(storage.clone()); let queue_factory = QueueFactoryStdImpl { - session_state_factory, - persistent_state_factory, + uncommitted_state_factory, + committed_state_factory, config: Default::default(), }; let queue = queue_factory.create(); diff --git a/collator/tests/internal_queue.rs b/collator/tests/internal_queue.rs index df042cf3f..5d4137b54 100644 --- a/collator/tests/internal_queue.rs +++ b/collator/tests/internal_queue.rs @@ -16,13 +16,13 @@ use tycho_block_util::queue::{QueueDiff, QueueDiffStuff, QueueKey}; use tycho_collator::internal_queue::queue::{ Queue, QueueConfig, QueueFactory, QueueFactoryStdImpl, QueueImpl, }; -use tycho_collator::internal_queue::state::persistent_state::{ - PersistentStateImplFactory, PersistentStateStdImpl, -}; -use tycho_collator::internal_queue::state::session_state::{ - SessionStateImplFactory, SessionStateStdImpl, +use tycho_collator::internal_queue::state::commited_state::{ + CommittedStateImplFactory, CommittedStateStdImpl, }; use tycho_collator::internal_queue::state::states_iterators_manager::StatesIteratorsManager; +use tycho_collator::internal_queue::state::uncommitted_state::{ + UncommittedStateImplFactory, UncommittedStateStdImpl, +}; use tycho_collator::internal_queue::types::{InternalMessageValue, QueueDiffWithMessages}; use tycho_collator::test_utils::prepare_test_storage; use tycho_util::FastHashMap; @@ -94,16 +94,16 @@ async fn test_queue() -> anyhow::Result<()> { let (storage, _tmp_dir) = prepare_test_storage().await.unwrap(); let queue_factory = QueueFactoryStdImpl { - session_state_factory: SessionStateImplFactory { + uncommitted_state_factory: UncommittedStateImplFactory { storage: storage.clone(), }, - persistent_state_factory: PersistentStateImplFactory { storage }, + committed_state_factory: CommittedStateImplFactory { storage }, config: QueueConfig { gc_interval: Duration::from_secs(1), }, }; - let queue: QueueImpl = + let queue: QueueImpl = queue_factory.create(); let block = BlockIdShort { shard: ShardIdent::new_full(0), @@ -139,7 +139,9 @@ async fn test_queue() -> anyhow::Result<()> { .insert(stored_object.key(), stored_object.clone()); } - queue.apply_diff(diff, block, &HashBytes::from([1; 32]))?; + let end_key = *diff.messages.iter().last().unwrap().0; + + queue.apply_diff(diff, block, &HashBytes::from([1; 32]), end_key)?; let top_blocks = vec![(block, true)]; @@ -182,7 +184,8 @@ async fn test_queue() -> anyhow::Result<()> { let top_blocks = vec![(block2, true)]; - queue.apply_diff(diff, block2, &HashBytes::from([0; 32]))?; + let end_key = *diff.messages.iter().last().unwrap().0; + queue.apply_diff(diff, block2, &HashBytes::from([0; 32]), end_key)?; queue.commit_diff(&top_blocks)?; let mut ranges = FastHashMap::default(); @@ -226,16 +229,16 @@ async fn test_queue_clear() -> anyhow::Result<()> { let (storage, _tmp_dir) = prepare_test_storage().await.unwrap(); let queue_factory = QueueFactoryStdImpl { - session_state_factory: SessionStateImplFactory { + uncommitted_state_factory: UncommittedStateImplFactory { storage: storage.clone(), }, - persistent_state_factory: PersistentStateImplFactory { storage }, + committed_state_factory: CommittedStateImplFactory { storage }, config: QueueConfig { gc_interval: Duration::from_secs(1), }, }; - let queue: QueueImpl = + let queue: QueueImpl = queue_factory.create(); let block = BlockIdShort { shard: ShardIdent::new_full(0), @@ -253,7 +256,9 @@ async fn test_queue_clear() -> anyhow::Result<()> { .insert(stored_object.key(), stored_object.clone()); } - queue.apply_diff(diff, block, &HashBytes::from([1; 32]))?; + let end_key = *diff.messages.iter().last().unwrap().0; + + queue.apply_diff(diff, block, &HashBytes::from([1; 32]), end_key)?; let mut ranges = FastHashMap::default(); ranges.insert( @@ -275,7 +280,7 @@ async fn test_queue_clear() -> anyhow::Result<()> { let mut iterator_manager = StatesIteratorsManager::new(iterators); assert!(iterator_manager.next().ok().is_some()); - queue.clear_session_state()?; + queue.clear_uncommitted_state()?; let iterators = queue.iterator(&ranges, ShardIdent::new_full(1)); @@ -395,7 +400,7 @@ fn test_queue_diff_with_messages_from_queue_diff_stuff() -> anyhow::Result<()> { prev_hash: HashBytes::from([0x33; 32]), shard_ident: ShardIdent::MASTERCHAIN, seqno: 123, - processed_upto: BTreeMap::from([ + processed_to: BTreeMap::from([ (ShardIdent::MASTERCHAIN, QueueKey { lt: 1, hash: message1_hash, @@ -428,7 +433,7 @@ fn test_queue_diff_with_messages_from_queue_diff_stuff() -> anyhow::Result<()> { let diff_with_messages = QueueDiffWithMessages::from_queue_diff(&queue_diff_stuff, &out_msg)?; - assert_eq!(diff_with_messages.processed_upto, diff.processed_upto,); + assert_eq!(diff_with_messages.processed_to, diff.processed_to,); assert_eq!( diff_with_messages @@ -451,3 +456,96 @@ fn create_dump_msg_envelope(message: Lazy) -> Lazy { }) .unwrap() } + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_queue_tail() -> anyhow::Result<()> { + let (storage, _tmp_dir) = prepare_test_storage().await.unwrap(); + + let queue_factory = QueueFactoryStdImpl { + uncommitted_state_factory: UncommittedStateImplFactory { + storage: storage.clone(), + }, + committed_state_factory: CommittedStateImplFactory { storage }, + config: QueueConfig { + gc_interval: Duration::from_secs(1), + }, + }; + + let queue: QueueImpl = + queue_factory.create(); + let block_mc1 = BlockIdShort { + shard: ShardIdent::new_full(-1), + seqno: 0, + }; + + let block_mc2 = BlockIdShort { + shard: ShardIdent::new_full(-1), + seqno: 1, + }; + let mut diff_mc1 = QueueDiffWithMessages::new(); + let mut diff_mc2 = QueueDiffWithMessages::new(); + + let stored_objects = vec![ + create_stored_object( + 1, + "-1:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", + )?, + create_stored_object( + 2, + "-1:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", + )?, + create_stored_object( + 3, + "0:7d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", + )?, + create_stored_object( + 4, + "-1:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", + )?, + create_stored_object( + 5, + "-1:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", + )?, + ]; + + if let Some(stored_object) = stored_objects.first() { + diff_mc1 + .messages + .insert(stored_object.key(), stored_object.clone()); + } + + for stored_object in &stored_objects { + diff_mc2 + .messages + .insert(stored_object.key(), stored_object.clone()); + } + + let end_key_mc1 = *diff_mc1.messages.iter().last().unwrap().0; + let end_key_mc2 = *diff_mc2.messages.iter().last().unwrap().0; + + // apply two diffs + queue.apply_diff(diff_mc1, block_mc1, &HashBytes::from([1; 32]), end_key_mc1)?; + queue.apply_diff(diff_mc2, block_mc2, &HashBytes::from([2; 32]), end_key_mc2)?; + + let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::new_full(-1)); + + assert_eq!(diff_len_mc, 2); + + // commit first diff + queue.commit_diff(&[(block_mc1, true)])?; + let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::new_full(-1)); + + assert_eq!(diff_len_mc, 2); + + // trim first diff + queue.trim_diffs(&ShardIdent::new_full(-1), &end_key_mc1)?; + let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::new_full(-1)); + assert_eq!(diff_len_mc, 1); + + // clear uncommitted state with second diff + queue.clear_uncommitted_state()?; + let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::new_full(-1)); + assert_eq!(diff_len_mc, 0); + + Ok(()) +} diff --git a/core/src/block_strider/subscriber/gc_subscriber.rs b/core/src/block_strider/subscriber/gc_subscriber.rs index 5435df095..ec84ee810 100644 --- a/core/src/block_strider/subscriber/gc_subscriber.rs +++ b/core/src/block_strider/subscriber/gc_subscriber.rs @@ -14,6 +14,7 @@ use tycho_block_util::block::BlockStuff; use tycho_storage::{BlocksGcType, Storage}; use tycho_util::metrics::HistogramGuard; +use crate::block_strider::subscriber::find_longest_diffs_tail; use crate::block_strider::{ BlockSubscriber, BlockSubscriberContext, StateSubscriber, StateSubscriberContext, }; @@ -254,7 +255,19 @@ impl GcSubscriber { // TODO: Must be in sync with the largest possible archive size (in mc blocks). const MIN_SAFE_DISTANCE: u32 = 100; - let safe_distance = std::cmp::max(safe_distance, MIN_SAFE_DISTANCE); + let tail_len = find_longest_diffs_tail(tick.mc_block_id, &storage).await; + + let tail_len = tail_len.unwrap_or_else(|e| { + tracing::error!(?e, "failed to find longest diffs tail"); + 0 + }) as u32; + + tracing::info!(tail_len, "found longest diffs tail"); + + let safe_distance = [safe_distance, MIN_SAFE_DISTANCE, tail_len + 1] + .into_iter() + .max() + .unwrap(); // Compute the target masterchain block seqno let target_seqno = match tick.mc_block_id.seqno.checked_sub(safe_distance) { @@ -399,6 +412,7 @@ impl GcSubscriber { tracing::info!("starting GC for target seqno: {}", target_seqno); let hist = HistogramGuard::begin("tycho_gc_states_time"); + if let Err(e) = storage .shard_state_storage() .remove_outdated_states(target_seqno) diff --git a/core/src/block_strider/subscriber/mod.rs b/core/src/block_strider/subscriber/mod.rs index 2491e49f1..bb9a4a973 100644 --- a/core/src/block_strider/subscriber/mod.rs +++ b/core/src/block_strider/subscriber/mod.rs @@ -7,6 +7,7 @@ use futures_util::future::{self, BoxFuture}; use tycho_block_util::archive::ArchiveData; use tycho_block_util::block::BlockStuff; use tycho_block_util::state::ShardStateStuff; +use tycho_storage::{BlockHandle, Storage}; pub use self::futures::{OptionHandleFut, OptionPrepareFut}; pub use self::gc_subscriber::{GcSubscriber, ManualGcTrigger}; @@ -420,3 +421,59 @@ pub mod test { } } } + +pub async fn find_longest_diffs_tail(mc_block: BlockId, storage: &Storage) -> Result { + let mc_block_stuff = load_mc_block_stuff(mc_block, storage).await?; + + let shard_block_handles = load_shard_block_handles(&mc_block_stuff, storage).await?; + + let mut max_tail_len = None; + + for block_handle in shard_block_handles { + let block = storage + .block_storage() + .load_block_data(&block_handle) + .await?; + let tail_len = block.block().out_msg_queue_updates.tail_len as usize; + + max_tail_len = Some(max_tail_len.map_or(tail_len, |max: usize| max.max(tail_len))); + } + + let mc_tail_len = mc_block_stuff.block().out_msg_queue_updates.tail_len as usize; + let result_tail_len = max_tail_len.map_or(mc_tail_len, |max: usize| mc_tail_len.max(max)); + + Ok(result_tail_len) +} + +async fn load_mc_block_stuff(mc_seqno: BlockId, storage: &Storage) -> Result { + let mc_handle = storage.block_handle_storage().load_handle(&mc_seqno); + if let Some(mc_handle) = mc_handle { + let mc_block_stuff = storage.block_storage().load_block_data(&mc_handle).await?; + Ok(mc_block_stuff) + } else { + anyhow::bail!("mc block handle not found: {mc_seqno}"); + } +} + +async fn load_shard_block_handles( + mc_block_stuff: &BlockStuff, + storage: &Storage, +) -> Result> { + let block_handles = storage.block_handle_storage(); + let mut shard_block_handles = Vec::new(); + + for entry in mc_block_stuff.load_custom()?.shards.latest_blocks() { + let block_id = entry?; + if block_id.seqno == 0 { + continue; + } + + let Some(block_handle) = block_handles.load_handle(&block_id) else { + anyhow::bail!("top shard block handle not found: {block_id}"); + }; + + shard_block_handles.push(block_handle); + } + + Ok(shard_block_handles) +} diff --git a/core/src/block_strider/subscriber/ps_subscriber.rs b/core/src/block_strider/subscriber/ps_subscriber.rs index 6dd7f89ec..fc29d0edb 100644 --- a/core/src/block_strider/subscriber/ps_subscriber.rs +++ b/core/src/block_strider/subscriber/ps_subscriber.rs @@ -1,11 +1,8 @@ -use std::collections::BTreeMap; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use anyhow::Result; -use everscale_types::models::ShardIdent; use tycho_block_util::block::BlockStuff; -use tycho_block_util::queue::QueueKey; use tycho_block_util::state::RefMcStateHandle; use tycho_storage::{BlockHandle, Storage}; @@ -145,18 +142,6 @@ impl Inner { let mut shard_block_handles = Vec::new(); - // Compute the minimal referenced LT for each shard - let mut min_processed_upto = BTreeMap::new(); - let merge = |block_handle: BlockHandle, - mut processed_upto: BTreeMap| async move { - let queue_diff = blocks.load_queue_diff(&block_handle).await?; - for (&shard, &key) in &queue_diff.as_ref().processed_upto { - let existing = processed_upto.entry(shard).or_insert(key); - *existing = std::cmp::min(*existing, key); - } - Ok::<_, anyhow::Error>(processed_upto) - }; - for entry in mc_block.load_custom()?.shards.latest_blocks() { let block_id = entry?; if block_id.seqno == 0 { @@ -172,32 +157,20 @@ impl Inner { // first, without waiting for other states or queues to be saved. block_handles.set_block_persistent(&block_handle); - min_processed_upto = merge(block_handle.clone(), min_processed_upto).await?; shard_block_handles.push(block_handle); } - min_processed_upto = merge(mc_block_handle.clone(), min_processed_upto).await?; // Store queue state for each shard let mc_seqno = mc_block_handle.id().seqno; for block_handle in shard_block_handles { let block = blocks.load_block_data(&block_handle).await?; - - let min_shard = min_processed_upto - .get(&block.id().shard) - .copied() - .unwrap_or_default(); persistent_states - .store_queue_state(mc_seqno, &block_handle, block, min_shard.lt) + .store_queue_state(mc_seqno, &block_handle, block) .await?; } - // Store queue state for masterchain - let min_mc = min_processed_upto - .get(&ShardIdent::MASTERCHAIN) - .copied() - .unwrap_or_default(); persistent_states - .store_queue_state(mc_seqno, &mc_block_handle, mc_block, min_mc.lt) + .store_queue_state(mc_seqno, &mc_block_handle, mc_block) .await } } diff --git a/scripts/gen-dashboard.py b/scripts/gen-dashboard.py index e95f46718..e3b10da37 100644 --- a/scripts/gen-dashboard.py +++ b/scripts/gen-dashboard.py @@ -1058,7 +1058,12 @@ def collation_metrics() -> RowPanel: create_counter_panel( "tycho_collator_anchor_import_skipped_count", "Number of anchor import skipped", - labels_selectors=['workchain=~"$workchain"'], + labels_selectors=['workchain=~"$workchain"'] + ), + create_gauge_panel( + "tycho_do_collate_block_diff_tail_len", + "Diff tail length", + labels=['workchain=~"$workchain"'], ), ] return create_row("collator: Collation Metrics", metrics) diff --git a/storage/src/db/kv_db/mod.rs b/storage/src/db/kv_db/mod.rs index 095ca3320..4254e0117 100644 --- a/storage/src/db/kv_db/mod.rs +++ b/storage/src/db/kv_db/mod.rs @@ -133,7 +133,7 @@ weedb::tables! { pub temp_cells: tables::TempCells, pub block_connections: tables::BlockConnections, pub shards_internal_messages: tables::ShardsInternalMessages, - pub shards_internal_messages_session: tables::ShardsInternalMessagesSession, + pub shards_internal_messages_uncommitted: tables::ShardsInternalMessagesSession, } } diff --git a/storage/src/store/internal_queue/mod.rs b/storage/src/store/internal_queue/mod.rs index 6e870a25f..de9ac92e3 100644 --- a/storage/src/store/internal_queue/mod.rs +++ b/storage/src/store/internal_queue/mod.rs @@ -81,11 +81,95 @@ impl InternalQueueStorage { .await? } + pub fn delete_messages( + &self, + source_shard: ShardIdent, + from: &QueueKey, + to: &QueueKey, + ) -> Result<()> { + let start_key = ShardsInternalMessagesKey::new(source_shard, *from); + let end_key = ShardsInternalMessagesKey::new(source_shard, *to); + + let shards_internal_messages_cf = self.db.shards_internal_messages.cf(); + + let mut batch = WriteBatch::default(); + + let start_key = start_key.to_vec(); + let end_key = end_key.to_vec(); + + batch.delete_range_cf(&shards_internal_messages_cf, &start_key, &end_key); + batch.delete_cf(&shards_internal_messages_cf, &end_key); + + self.db.rocksdb().write(batch)?; + self.db.rocksdb().compact_range_cf( + &shards_internal_messages_cf, + Some(&start_key), + Some(&end_key), + ); + + Ok(()) + } + + pub fn commit(&self, ranges: FastHashMap) -> Result<()> { + let snapshot = self.snapshot(); + + let mut batch = WriteBatch::default(); + + for range in ranges { + let from = ShardsInternalMessagesKey { + shard_ident: range.0, + internal_message_key: QueueKey::MIN, + }; + let to = ShardsInternalMessagesKey { + shard_ident: range.0, + internal_message_key: range.1, + }; + + let mut readopts = self + .db + .shards_internal_messages_uncommitted + .new_read_config(); + readopts.set_snapshot(&snapshot); + + let internal_messages_uncommitted_cf = + self.db.shards_internal_messages_uncommitted.cf(); + let internal_messages_cf = self.db.shards_internal_messages.cf(); + let mut iter = self + .db + .rocksdb() + .raw_iterator_cf_opt(&internal_messages_uncommitted_cf, readopts); + + iter.seek(from.to_vec().as_slice()); + + while iter.valid() { + let (mut key, value) = match (iter.key(), iter.value()) { + (Some(key), Some(value)) => (key, value), + _ => break, + }; + + let current_position = ShardsInternalMessagesKey::deserialize(&mut key); + + if current_position > to || current_position < from { + break; + } + let current_position_vec = current_position.to_vec(); + batch.delete_cf(&internal_messages_uncommitted_cf, ¤t_position_vec); + batch.put_cf(&internal_messages_cf, ¤t_position_vec, value); + + iter.next(); + } + } + + self.db.rocksdb().write(batch)?; + + Ok(()) + } + pub fn snapshot(&self) -> OwnedSnapshot { self.db.owned_snapshot() } - pub fn build_iterator_persistent(&self, snapshot: &OwnedSnapshot) -> OwnedIterator { + pub fn build_iterator_committed(&self, snapshot: &OwnedSnapshot) -> OwnedIterator { self.build_iterator( self.db.shards_internal_messages.cf(), self.db.shards_internal_messages.new_read_config(), @@ -93,32 +177,21 @@ impl InternalQueueStorage { ) } - pub fn build_iterator_session(&self, snapshot: &OwnedSnapshot) -> OwnedIterator { + pub fn build_iterator_uncommitted(&self, snapshot: &OwnedSnapshot) -> OwnedIterator { self.build_iterator( - self.db.shards_internal_messages_session.cf(), - self.db.shards_internal_messages_session.new_read_config(), + self.db.shards_internal_messages_uncommitted.cf(), + self.db + .shards_internal_messages_uncommitted + .new_read_config(), snapshot, ) } - - fn clear_queue(&self, cf: &BoundedCfHandle<'_>) -> Result<()> { - let start_key = [0x00; ShardsInternalMessagesKey::SIZE_HINT]; - let end_key = [0xFF; ShardsInternalMessagesKey::SIZE_HINT]; - self.db - .rocksdb() - .delete_range_cf(cf, &start_key, &end_key)?; - self.db - .rocksdb() - .compact_range_cf(cf, Some(start_key), Some(end_key)); - Ok(()) - } - - pub fn clear_session_queue(&self) -> Result<()> { - let cf = self.db.shards_internal_messages_session.cf(); + pub fn clear_uncommitted_queue(&self) -> Result<()> { + let cf = self.db.shards_internal_messages_uncommitted.cf(); self.clear_queue(&cf) } - pub fn clear_persistent_queue(&self) -> Result<()> { + pub fn clear_committed_queue(&self) -> Result<()> { let cf = self.db.shards_internal_messages.cf(); self.clear_queue(&cf) } @@ -132,17 +205,29 @@ impl InternalQueueStorage { WriteBatch::default() } - pub fn insert_message_session( + pub fn insert_message_uncommitted( &self, batch: &mut WriteBatch, key: ShardsInternalMessagesKey, dest: &IntAddr, value: &[u8], ) -> Result<()> { - let cf = self.db.shards_internal_messages_session.cf(); + let cf = self.db.shards_internal_messages_uncommitted.cf(); Self::insert_message(batch, cf, key, dest.workchain() as i8, dest.prefix(), value) } + fn clear_queue(&self, cf: &BoundedCfHandle<'_>) -> Result<()> { + let start_key = [0x00; ShardsInternalMessagesKey::SIZE_HINT]; + let end_key = [0xFF; ShardsInternalMessagesKey::SIZE_HINT]; + self.db + .rocksdb() + .delete_range_cf(cf, &start_key, &end_key)?; + self.db + .rocksdb() + .compact_range_cf(cf, Some(start_key), Some(end_key)); + Ok(()) + } + fn build_iterator( &self, cf: BoundedCfHandle<'_>, @@ -181,95 +266,4 @@ impl InternalQueueStorage { Ok(()) } - - pub fn delete_messages( - &self, - source_shard: ShardIdent, - from: &QueueKey, - to: &QueueKey, - ) -> Result<()> { - let start_key = ShardsInternalMessagesKey::new(source_shard, *from); - let end_key = ShardsInternalMessagesKey::new(source_shard, *to); - - let shards_internal_messages_cf = self.db.shards_internal_messages.cf(); - - let mut batch = WriteBatch::default(); - - let start_key = start_key.to_vec(); - let end_key = end_key.to_vec(); - - batch.delete_range_cf(&shards_internal_messages_cf, &start_key, &end_key); - batch.delete_cf(&shards_internal_messages_cf, &end_key); - - self.db.rocksdb().write(batch)?; - self.db.rocksdb().compact_range_cf( - &shards_internal_messages_cf, - Some(&start_key), - Some(&end_key), - ); - - Ok(()) - } - - pub fn commit(&self, ranges: FastHashMap) -> Result<()> { - let snapshot = self.snapshot(); - - let mut batch = WriteBatch::default(); - - for range in ranges { - let from = ShardsInternalMessagesKey { - shard_ident: range.0, - internal_message_key: QueueKey::MIN, - }; - let to = ShardsInternalMessagesKey { - shard_ident: range.0, - internal_message_key: range.1, - }; - - let mut readopts = self.db.shards_internal_messages_session.new_read_config(); - readopts.set_snapshot(&snapshot); - - let internal_messages_session_cf = self.db.shards_internal_messages_session.cf(); - let internal_messages_cf = self.db.shards_internal_messages.cf(); - let mut iter = self - .db - .rocksdb() - .raw_iterator_cf_opt(&internal_messages_session_cf, readopts); - - iter.seek(from.to_vec().as_slice()); - - while iter.valid() { - let (mut key, value) = match (iter.key(), iter.value()) { - (Some(key), Some(value)) => (key, value), - _ => break, - }; - - let current_position = ShardsInternalMessagesKey::deserialize(&mut key); - - if current_position > to || current_position < from { - break; - } - - let dest_workchain = value[0] as i8; - let dest_prefix = u64::from_be_bytes(value[1..9].try_into().unwrap()); - let cell_bytes = &value[9..]; - - batch.delete_cf(&internal_messages_session_cf, current_position.to_vec()); - Self::insert_message( - &mut batch, - internal_messages_cf, - current_position, - dest_workchain, - dest_prefix, - cell_bytes, - )?; - - iter.next(); - } - } - - self.db.rocksdb().write(batch)?; - - Ok(()) - } } diff --git a/storage/src/store/persistent_state/mod.rs b/storage/src/store/persistent_state/mod.rs index 7747fcaaf..260e4a82f 100644 --- a/storage/src/store/persistent_state/mod.rs +++ b/storage/src/store/persistent_state/mod.rs @@ -400,14 +400,12 @@ impl PersistentStateStorage { .await? } - // TODO: Remove `min_lt` and simplify (see https://github.com/broxus/tycho/issues/358) #[tracing::instrument(skip_all, fields(mc_seqno = mc_seqno, block_id = %block.id()))] pub async fn store_queue_state( &self, mc_seqno: u32, handle: &BlockHandle, block: BlockStuff, - min_lt: u64, ) -> Result<()> { if self .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue) @@ -425,7 +423,10 @@ impl PersistentStateStorage { let mut top_block_handle = handle.clone(); let mut top_block = block; - loop { + + let mut tail_len = top_block.block().out_msg_queue_updates.tail_len as usize; + + while tail_len > 0 { let queue_diff = this.blocks.load_queue_diff(&top_block_handle).await?; let top_block_info = top_block.load_info()?; @@ -435,10 +436,7 @@ impl PersistentStateStorage { messages.push(queue_diff.zip(&out_messages)); queue_diffs.push(queue_diff.diff().clone()); - // NOTE: Load blocks while their `end_lt` is greater than the lowest required LT - // across all shards. We also must include an additional block before the - // `min_lt` to be able to verify the full range. - if top_block_info.end_lt <= min_lt { + if tail_len == 1 { break; } @@ -447,11 +445,6 @@ impl PersistentStateStorage { PrevBlockRef::AfterMerge { .. } => anyhow::bail!("merge not supported yet"), }; - // Stop on zerostate - if prev_block_id.seqno == 0 { - break; - } - let Some(prev_block_handle) = this.block_handles.load_handle(&prev_block_id) else { anyhow::bail!("prev block handle not found for: {prev_block_id}"); }; @@ -459,6 +452,7 @@ impl PersistentStateStorage { top_block_handle = prev_block_handle; top_block = prev_block; + tail_len -= 1; } let state = QueueStateHeader { diff --git a/storage/src/store/persistent_state/queue_state/reader.rs b/storage/src/store/persistent_state/queue_state/reader.rs index aeac8b5f0..0cae254b5 100644 --- a/storage/src/store/persistent_state/queue_state/reader.rs +++ b/storage/src/store/persistent_state/queue_state/reader.rs @@ -30,7 +30,10 @@ impl<'a> QueueStateReader<'a> { "top queue diff hash mismatch" ); - // TODO: Check `queue_diffs` length (https://github.com/broxus/tycho/issues/358) + anyhow::ensure!( + state.header.queue_diffs.len() == top_update.tail_len as usize, + "queue diffs count mismatch" + ); Ok(Self { state, diff --git a/storage/src/store/persistent_state/tests.rs b/storage/src/store/persistent_state/tests.rs index 0be0ccd35..f3bd95aa5 100644 --- a/storage/src/store/persistent_state/tests.rs +++ b/storage/src/store/persistent_state/tests.rs @@ -241,7 +241,7 @@ async fn persistent_queue_state_read_write() -> Result<()> { let out_msgs = OutMsgDescr::try_from_sorted_slice(&messages)?; let queue_diff = QueueDiffStuff::builder(shard, seqno, &prev_hash) - .with_processed_upto([(shard, 0, &HashBytes::ZERO)]) + .with_processed_to([(shard, 0, &HashBytes::ZERO)]) .with_messages( &QueueKey::max_for_lt(0), &QueueKey::max_for_lt(0), @@ -351,9 +351,16 @@ async fn persistent_queue_state_read_write() -> Result<()> { written }; + let tail_len = blocks + .iter() + .rev() + .map(|block| block.queue_diff.as_ref().clone()) + .len() as u32; + // Read queue queue state from file let top_update = OutMsgQueueUpdates { diff_hash: *blocks.last().unwrap().queue_diff.diff_hash(), + tail_len, }; let mut reader = QueueStateReader::begin_from_mapped(&decompressed, &top_update)?; assert_eq!(reader.state().header, target_header);