Skip to content

Commit

Permalink
feat(collator): queue diff sync
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick committed Nov 29, 2024
1 parent a6b115b commit c1b9a26
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 74 deletions.
7 changes: 4 additions & 3 deletions collator/src/collator/do_collate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ use crate::collator::types::{
use crate::internal_queue::types::EnqueuedMessage;
use crate::queue_adapter::MessageQueueAdapter;
use crate::tracing_targets;
use crate::types::{BlockCollationResult, BlockIdExt, CollationSessionInfo, CollatorConfig, DisplayBlockIdsIntoIter, DisplayBlockIdsIter, McData, TopBlockDescription, TopShardBlockInfo};
use crate::types::{
BlockCollationResult, BlockIdExt, CollationSessionInfo, CollatorConfig,
DisplayBlockIdsIntoIter, DisplayBlockIdsIter, McData, TopBlockDescription, TopShardBlockInfo,
};

#[cfg(test)]
#[path = "../tests/do_collate_tests.rs"]
Expand Down Expand Up @@ -1227,8 +1230,6 @@ impl CollatorStdImpl {
.increment(collation_data.execute_count_new_int);
metrics::gauge!("tycho_do_collate_block_seqno", &labels)
.set(collation_data.block_id_short.seqno);
metrics::gauge!("tycho_do_collate_block_seqno", &labels)
.set(collation_data.block_id_short.seqno);
metrics::gauge!("tycho_do_collate_block_diff_tail_len", &labels)
.set(collation_data.diff_tail_len);
}
Expand Down
63 changes: 46 additions & 17 deletions collator/src/internal_queue/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ impl<V: InternalMessageValue> QueueFactory<V> for QueueFactoryStdImpl {
QueueImpl {
session_state: Arc::new(session_state),
persistent_state,
diffs: Default::default(),
session_diffs: Default::default(),
persistent_diffs: Default::default(),
gc,
_phantom_data: Default::default(),
}
Expand All @@ -121,6 +122,7 @@ struct ShortQueueDiff {
pub end_key: QueueKey,
pub hash: HashBytes,
}

pub struct QueueImpl<S, P, V>
where
S: SessionState<V>,
Expand All @@ -129,7 +131,9 @@ where
{
session_state: Arc<S>,
persistent_state: Arc<P>,
diffs: FastDashMap<ShardIdent, BTreeMap<u32, ShortQueueDiff>>,
// diffs: FastDashMap<ShardIdent, BTreeMap<u32, ShortQueueDiff>>,
session_diffs: FastDashMap<ShardIdent, BTreeMap<u32, ShortQueueDiff>>,
persistent_diffs: FastDashMap<ShardIdent, BTreeMap<u32, ShortQueueDiff>>,
gc: GcManager,
_phantom_data: PhantomData<V>,
}
Expand Down Expand Up @@ -162,7 +166,7 @@ where
end_key: QueueKey,
) -> Result<()> {
// Get or insert the shard diffs for the given block_id_short.shard
let mut shard_diffs = self.diffs.entry(block_id_short.shard).or_default();
let mut shard_diffs = self.session_diffs.entry(block_id_short.shard).or_default();

// Check for duplicate diffs based on the block_id_short.seqno and hash
let shard_diff = shard_diffs.get(&block_id_short.seqno);
Expand All @@ -177,7 +181,7 @@ where
}
}

let last_applied_seqno = shard_diffs.last_key_value().map(|(key, _)| *key);
let last_applied_seqno = shard_diffs.keys().next_back().cloned();

if let Some(last_applied_seqno) = last_applied_seqno {
// Check if the diff is already applied
Expand Down Expand Up @@ -214,18 +218,17 @@ where
}

fn commit_diff(&self, mc_top_blocks: &[(BlockIdShort, bool)]) -> Result<()> {
let mut diffs_for_commit = vec![];
let mut shards_to_commit = FastHashMap::default();
let mut gc_ranges = FastHashMap::default();

for (block_id_short, top_shard_block_changed) in mc_top_blocks {
let prev_shard_diffs = self.diffs.get_mut(&block_id_short.shard);
let mut diffs_to_commit = vec![];

if let Some(shard_diffs) = prev_shard_diffs {
shard_diffs
.range(..=block_id_short.seqno)
.for_each(|(block_seqno, shard_diff)| {
diffs_for_commit.push(*block_id_short);
let prev_shard_session_diffs = self.session_diffs.get_mut(&block_id_short.shard);
if let Some(mut shard_session_diffs) = prev_shard_session_diffs {
shard_session_diffs.range(..=block_id_short.seqno).for_each(
|(block_seqno, shard_diff)| {
diffs_to_commit.push(*block_seqno);

let current_last_key = shards_to_commit
.entry(block_id_short.shard)
Expand All @@ -247,13 +250,27 @@ where
}
}
}
});
},
);

for seqno in diffs_to_commit {
if let Some(diff) = shard_session_diffs.remove(&seqno) {
// Move the diff to persistent_diffs
let mut shard_persistent_diffs = self
.persistent_diffs
.entry(block_id_short.shard)
.or_default();
shard_persistent_diffs.insert(seqno, diff);
}
}
}
}

self.session_state.commit_messages(&shards_to_commit)?;

let uncommitted_diffs_count: usize = self.diffs.iter().map(|r| r.value().len()).sum();
let uncommitted_diffs_count: usize =
self.session_diffs.iter().map(|r| r.value().len()).sum();

metrics::counter!("tycho_internal_queue_uncommitted_diffs_count")
.increment(uncommitted_diffs_count as u64);

Expand All @@ -265,17 +282,29 @@ where
}

fn clear_session_state(&self) -> Result<()> {
self.session_diffs.clear();
self.session_state.truncate()
}

fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize {
self.diffs.get(shard_ident).map_or(0, |diffs| diffs.len())
let session_count = self
.session_diffs
.get(shard_ident)
.map_or(0, |diffs| diffs.len());
let persistent_count = self
.persistent_diffs
.get(shard_ident)
.map_or(0, |diffs| diffs.len());
session_count + persistent_count
}

fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()> {
let shard_diffs = self.diffs.get_mut(source_shard);

if let Some(mut shard_diffs) = shard_diffs {
if let Some(mut shard_diffs) = self.session_diffs.get_mut(source_shard) {
shard_diffs
.value_mut()
.retain(|_, diff| &diff.end_key > inclusive_until);
}
if let Some(mut shard_diffs) = self.persistent_diffs.get_mut(source_shard) {
shard_diffs
.value_mut()
.retain(|_, diff| &diff.end_key > inclusive_until);
Expand Down
10 changes: 3 additions & 7 deletions collator/src/manager/blocks_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,10 @@ impl BlocksCache {
Ok(consensus_info)
}

pub fn reset_top_shard_blocks_additional_info(&self) -> Result<()> {
pub fn reset_top_shard_blocks_additional_info(&self) {
for mut shard_cache in self.shards.iter_mut() {
shard_cache.data.reset_top_shard_block_additional_info()?;
shard_cache.data.reset_top_shard_block_additional_info();
}
Ok(())
}

/// Find shard block in cache and then get containing master block id if link exists
Expand Down Expand Up @@ -874,7 +873,6 @@ impl BlocksCacheData for MasterBlocksCacheData {
type NewCollated = ();
type NewReceived = ();

// TODO !!! remove result
fn on_update_collated(&mut self, candidate: &BlockCandidate) -> Result<()> {
self.update_last_collated_block_id(candidate.block.id());
Ok(())
Expand Down Expand Up @@ -921,14 +919,12 @@ impl ShardBlocksCacheData {
Ok(())
}

fn reset_top_shard_block_additional_info(&mut self) -> Result<()> {
fn reset_top_shard_block_additional_info(&mut self) {
self.value_flow = Default::default();
self.proof_funds = Default::default();

#[cfg(feature = "block-creator-stats")]
self.creators.clear();

Ok(())
}
}

Expand Down
2 changes: 1 addition & 1 deletion collator/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1364,7 +1364,7 @@ where

// reset top shard blocks info
// because next we will start to collate new shard blocks after the sync
self.blocks_cache.reset_top_shard_blocks_additional_info()?;
self.blocks_cache.reset_top_shard_blocks_additional_info();

let mc_data = McData::load_from_state(state, processed_to_by_shards)?;

Expand Down
31 changes: 2 additions & 29 deletions core/src/block_strider/subscriber/ps_subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use anyhow::Result;
use everscale_types::models::ShardIdent;
use tycho_block_util::block::BlockStuff;
use tycho_block_util::queue::QueueKey;
use tycho_block_util::state::RefMcStateHandle;
use tycho_storage::{BlockHandle, Storage};

Expand Down Expand Up @@ -145,18 +142,6 @@ impl Inner {

let mut shard_block_handles = Vec::new();

// Compute the minimal referenced LT for each shard
let mut min_processed_upto = BTreeMap::new();
let merge = |block_handle: BlockHandle,
mut processed_upto: BTreeMap<ShardIdent, QueueKey>| async move {
let queue_diff = blocks.load_queue_diff(&block_handle).await?;
for (&shard, &key) in &queue_diff.as_ref().processed_upto {
let existing = processed_upto.entry(shard).or_insert(key);
*existing = std::cmp::min(*existing, key);
}
Ok::<_, anyhow::Error>(processed_upto)
};

for entry in mc_block.load_custom()?.shards.latest_blocks() {
let block_id = entry?;
if block_id.seqno == 0 {
Expand All @@ -172,32 +157,20 @@ impl Inner {
// first, without waiting for other states or queues to be saved.
block_handles.set_block_persistent(&block_handle);

min_processed_upto = merge(block_handle.clone(), min_processed_upto).await?;
shard_block_handles.push(block_handle);
}
min_processed_upto = merge(mc_block_handle.clone(), min_processed_upto).await?;

// Store queue state for each shard
let mc_seqno = mc_block_handle.id().seqno;
for block_handle in shard_block_handles {
let block = blocks.load_block_data(&block_handle).await?;

let min_shard = min_processed_upto
.get(&block.id().shard)
.copied()
.unwrap_or_default();
persistent_states
.store_queue_state(mc_seqno, &block_handle, block, min_shard.lt)
.store_queue_state(mc_seqno, &block_handle, block)
.await?;
}

// Store queue state for masterchain
let min_mc = min_processed_upto
.get(&ShardIdent::MASTERCHAIN)
.copied()
.unwrap_or_default();
persistent_states
.store_queue_state(mc_seqno, &mc_block_handle, mc_block, min_mc.lt)
.store_queue_state(mc_seqno, &mc_block_handle, mc_block)
.await
}
}
20 changes: 5 additions & 15 deletions storage/src/store/persistent_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,14 +400,12 @@ impl PersistentStateStorage {
.await?
}

// TODO: Remove `min_lt` and simplify (see https://github.com/broxus/tycho/issues/358)
#[tracing::instrument(skip_all, fields(mc_seqno = mc_seqno, block_id = %block.id()))]
pub async fn store_queue_state(
&self,
mc_seqno: u32,
handle: &BlockHandle,
block: BlockStuff,
min_lt: u64,
) -> Result<()> {
if self
.try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue)
Expand All @@ -425,7 +423,10 @@ impl PersistentStateStorage {

let mut top_block_handle = handle.clone();
let mut top_block = block;
loop {

let mut tail_len = top_block.block().out_msg_queue_updates.tail_len as usize;

while tail_len > 0 {
let queue_diff = this.blocks.load_queue_diff(&top_block_handle).await?;
let top_block_info = top_block.load_info()?;

Expand All @@ -435,30 +436,19 @@ impl PersistentStateStorage {
messages.push(queue_diff.zip(&out_messages));
queue_diffs.push(queue_diff.diff().clone());

// NOTE: Load blocks while their `end_lt` is greater than the lowest required LT
// across all shards. We also must include an additional block before the
// `min_lt` to be able to verify the full range.
if top_block_info.end_lt <= min_lt {
break;
}

let prev_block_id = match top_block_info.load_prev_ref()? {
PrevBlockRef::Single(block_ref) => block_ref.as_block_id(shard_ident),
PrevBlockRef::AfterMerge { .. } => anyhow::bail!("merge not supported yet"),
};

// Stop on zerostate
if prev_block_id.seqno == 0 {
break;
}

let Some(prev_block_handle) = this.block_handles.load_handle(&prev_block_id) else {
anyhow::bail!("prev block handle not found for: {prev_block_id}");
};
let prev_block = this.blocks.load_block_data(&prev_block_handle).await?;

top_block_handle = prev_block_handle;
top_block = prev_block;
tail_len -= 1;
}

let state = QueueStateHeader {
Expand Down
5 changes: 4 additions & 1 deletion storage/src/store/persistent_state/queue_state/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ impl<'a> QueueStateReader<'a> {
"top queue diff hash mismatch"
);

// TODO: Check `queue_diffs` length (https://github.com/broxus/tycho/issues/358)
anyhow::ensure!(
state.header.queue_diffs.len() == top_update.tail_len as usize,
"queue diffs count mismatch"
);

Ok(Self {
state,
Expand Down
8 changes: 7 additions & 1 deletion storage/src/store/persistent_state/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,16 @@ async fn persistent_queue_state_read_write() -> Result<()> {
written
};

let tail_len = blocks
.iter()
.rev()
.map(|block| block.queue_diff.as_ref().clone())
.len() as u32;

// Read queue state from file
let top_update = OutMsgQueueUpdates {
diff_hash: *blocks.last().unwrap().queue_diff.diff_hash(),
tail_len: 0,
tail_len,
};
let mut reader = QueueStateReader::begin_from_mapped(&decompressed, &top_update)?;
assert_eq!(reader.state().header, target_header);
Expand Down

0 comments on commit c1b9a26

Please sign in to comment.