Skip to content

Commit

Permalink
refactor(collator): refactor diffs tail and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick committed Dec 9, 2024
1 parent 964a1a7 commit 4a93855
Show file tree
Hide file tree
Showing 16 changed files with 307 additions and 126 deletions.
2 changes: 1 addition & 1 deletion block-util/src/block/block_stuff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl BlockStuff {
pub const BOOT_OFFSET: Duration = Duration::from_secs(12 * 3600);

pub fn compute_is_persistent(block_utime: u32, prev_utime: u32) -> bool {
block_utime >> 17 != prev_utime >> 17
block_utime >> 10 != prev_utime >> 10
}

pub fn can_use_for_boot(block_utime: u32, now_utime: u32) -> bool {
Expand Down
1 change: 0 additions & 1 deletion block-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@ pub mod config;
pub mod dict;
pub mod queue;
pub mod state;

pub mod tl;
15 changes: 8 additions & 7 deletions block-util/src/queue/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ pub struct QueueDiff {
/// Seqno of the corresponding block.
pub seqno: u32,
/// collator boundaries.
pub processed_upto: BTreeMap<ShardIdent, QueueKey>,
// TODO: should rename field in `proto.tl` on network reset
pub processed_to: BTreeMap<ShardIdent, QueueKey>,
/// Min message queue key.
pub min_message: QueueKey,
/// Max message queue key.
Expand Down Expand Up @@ -55,7 +56,7 @@ impl TlWrite for QueueDiff {
4 + tl::hash_bytes::SIZE_HINT
+ tl::shard_ident::SIZE_HINT
+ 4
+ processed_upto_map::size_hint(&self.processed_upto)
+ processed_to_map::size_hint(&self.processed_to)
+ 2 * QueueKey::SIZE_HINT
+ messages_list::size_hint(&self.messages)
}
Expand All @@ -68,7 +69,7 @@ impl TlWrite for QueueDiff {
tl::hash_bytes::write(&self.prev_hash, packet);
tl::shard_ident::write(&self.shard_ident, packet);
packet.write_u32(self.seqno);
processed_upto_map::write(&self.processed_upto, packet);
processed_to_map::write(&self.processed_to, packet);
self.min_message.write_to(packet);
self.max_message.write_to(packet);
messages_list::write(&self.messages, packet);
Expand All @@ -90,7 +91,7 @@ impl<'tl> TlRead<'tl> for QueueDiff {
prev_hash: tl::hash_bytes::read(data, offset)?,
shard_ident: tl::shard_ident::read(data, offset)?,
seqno: u32::read_from(data, offset)?,
processed_upto: processed_upto_map::read(data, offset)?,
processed_to: processed_to_map::read(data, offset)?,
min_message: QueueKey::read_from(data, offset)?,
max_message: QueueKey::read_from(data, offset)?,
messages: messages_list::read(data, offset)?,
Expand Down Expand Up @@ -205,7 +206,7 @@ impl std::fmt::Display for QueueKey {
}
}

mod processed_upto_map {
mod processed_to_map {
use tl_proto::{TlPacket, TlResult};

use super::*;
Expand Down Expand Up @@ -445,7 +446,7 @@ mod tests {
prev_hash: HashBytes::from([0x33; 32]),
shard_ident: ShardIdent::MASTERCHAIN,
seqno: 123,
processed_upto: BTreeMap::from([
processed_to: BTreeMap::from([
(ShardIdent::MASTERCHAIN, QueueKey {
lt: 1,
hash: HashBytes::from([0x11; 32]),
Expand Down Expand Up @@ -491,7 +492,7 @@ mod tests {
prev_hash,
shard_ident: ShardIdent::MASTERCHAIN,
seqno,
processed_upto: BTreeMap::from([
processed_to: BTreeMap::from([
(ShardIdent::MASTERCHAIN, QueueKey {
lt: 10 * seqno as u64,
hash: HashBytes::from([seqno as u8; 32]),
Expand Down
16 changes: 12 additions & 4 deletions block-util/src/queue/queue_diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl QueueDiffStuffBuilder {
where
I: IntoIterator<Item = (ShardIdent, u64, &'a HashBytes)>,
{
self.inner_mut().diff.processed_upto = processed_to
self.inner_mut().diff.processed_to = processed_to
.into_iter()
.map(|(shard_ident, lt, hash)| (shard_ident, QueueKey { lt, hash: *hash }))
.collect();
Expand Down Expand Up @@ -79,6 +79,14 @@ impl SerializedQueueDiff {
&self.inner.diff.hash
}

/// Iterates over the `processed_to` boundaries recorded in the wrapped
/// queue diff, yielding each shard ident together with a reference to its
/// queue key.
pub fn processed_to(&self) -> impl Iterator<Item = (ShardIdent, &QueueKey)> {
    let boundaries = &self.inner.diff.processed_to;
    boundaries.iter().map(|(shard, key)| (*shard, key))
}

/// Mutable access to the inner state.
///
/// # Panics
/// Panics if the inner `Arc` is currently shared (has more than one strong
/// reference), since `Arc::get_mut` only yields a mutable borrow for a
/// unique owner.
fn inner_mut(&mut self) -> &mut Inner {
Arc::get_mut(&mut self.inner).expect("inner is not shared")
}
Expand All @@ -102,7 +110,7 @@ impl QueueDiffStuff {
prev_hash: HashBytes::ZERO,
shard_ident: block_id.shard,
seqno: block_id.seqno,
processed_upto: BTreeMap::from([(block_id.shard, QueueKey::MIN)]),
processed_to: BTreeMap::from([(block_id.shard, QueueKey::MIN)]),
min_message: QueueKey::MIN,
max_message: QueueKey::MIN,
messages: Vec::new(),
Expand Down Expand Up @@ -134,7 +142,7 @@ impl QueueDiffStuff {
prev_hash: *prev_hash,
shard_ident,
seqno,
processed_upto: Default::default(),
processed_to: Default::default(),
min_message: Default::default(),
max_message: Default::default(),
messages: Default::default(),
Expand Down Expand Up @@ -361,7 +369,7 @@ mod tests {
prev_hash: HashBytes::ZERO,
shard_ident: ShardIdent::BASECHAIN,
seqno: 1,
processed_upto: Default::default(),
processed_to: Default::default(),
min_message: QueueKey {
lt: 0,
hash: message_hashes[0],
Expand Down
26 changes: 9 additions & 17 deletions collator/src/collator/do_collate/finalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,7 @@ impl Phase<FinalizeState> {
.finalize
.clone();

let processed_to = self
.state
.collation_data
.processed_upto
.internals
.iter()
.map(|(shard_ident, processed_upto_stuff)| {
(*shard_ident, processed_upto_stuff.processed_to_msg)
})
.collect();

let processed_to = queue_diff.processed_to().map(|(k, v)| (k, *v)).collect();
let shard = self.state.collation_data.block_id_short.shard;

let labels = &[("workchain", shard.workchain().to_string())];
Expand Down Expand Up @@ -595,11 +585,7 @@ impl Phase<FinalizeState> {

let mut shards_processed_to = FastHashMap::default();

for (shard_id, shard_data) in shards.iter() {
if !shard_data.top_sc_block_updated {
continue;
}

for (shard_id, _) in shards.iter() {
// Extract processed information for updated shards
let shard_processed_to = self
.state
Expand Down Expand Up @@ -653,7 +639,13 @@ impl Phase<FinalizeState> {
block: new_block,
is_key_block: new_block_info.key_block,
prev_blocks_ids: self.state.prev_shard_data.blocks_ids().clone(),
top_shard_blocks: self.state.collation_data.top_shard_blocks.clone(),
top_shard_blocks_ids: self
.state
.collation_data
.top_shard_blocks
.iter()
.map(|b| b.block_id)
.collect(),
collated_data,
collated_file_hash: HashBytes::ZERO,
chain_time: self.state.collation_data.get_gen_chain_time(),
Expand Down
129 changes: 83 additions & 46 deletions collator/src/collator/do_collate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use crate::queue_adapter::MessageQueueAdapter;
use crate::tracing_targets;
use crate::types::{
BlockCollationResult, BlockIdExt, CollationSessionInfo, CollatorConfig,
DisplayBlockIdsIntoIter, DisplayBlockIdsIter, McData, TopBlockDescription, TopShardBlockInfo,
DisplayBlockIdsIntoIter, DisplayBlockIdsIter, McData, ProcessedTo, ShardDescriptionShort,
TopBlockDescription, TopShardBlockInfo,
};

#[cfg(test)]
Expand Down Expand Up @@ -287,53 +288,21 @@ impl CollatorStdImpl {

let finalize_block_timer = std::time::Instant::now();

let mut min_processed_to: Option<QueueKey> = None;

// Get current and masterchain processed values
let current_processed_to = processed_to.get(&shard_id).cloned();
let mc_processed_to = mc_data.processed_upto.internals.get(&shard_id).cloned();

if shard_id.is_masterchain() {
// Iterate through shards to find the minimum processed value from shards
for shard_processed_to in mc_data.shards_processed_to.values() {
if let Some(value) = shard_processed_to.get(&shard_id) {
min_processed_to = min_processed_to.map(|current_min| current_min.min(*value));
}
}

// Combine with current and masterchain values
min_processed_to = [current_processed_to, min_processed_to]
.into_iter()
.flatten()
.min();
} else {
// Iterate through shards for non-masterchain
// replace for current shard processed upto from current collation
for (iter_shard_ident, shard_processed_to) in &mc_data.shards_processed_to {
let shard_processed_to = if iter_shard_ident == &shard_id {
current_processed_to
} else {
shard_processed_to.get(&shard_id).cloned()
};

// find minimum processed upto value
if let Some(value) = shard_processed_to {
min_processed_to = match min_processed_to {
Some(current_min) => Some(current_min.min(value)),
None => Some(value),
};
}
}

// Combine with masterchain processed value
min_processed_to = [
min_processed_to,
mc_processed_to.map(|mc| mc.processed_to_msg),
]
.into_iter()
.flatten()
.min();
}
let mc_processed_to = mc_data
.processed_upto
.internals
.get(&shard_id)
.map(|mc| mc.processed_to_msg);

let min_processed_to = calculate_min_processed_to(
&shard_id,
current_processed_to,
mc_processed_to,
&mc_data.shards,
&mc_data.shards_processed_to,
);

if let Some(value) = min_processed_to {
mq_adapter.trim_diffs(&shard_id, &value)?;
Expand Down Expand Up @@ -1372,3 +1341,71 @@ impl CollatorStdImpl {
);
}
}

/// Computes the minimal `processed_to` queue key for `shard_id` across all
/// relevant sources; the caller uses it to decide how far queue diffs can be
/// trimmed.
///
/// For the masterchain the minimum is taken over every shard's recorded
/// `processed_to` entry (for shards with an updated top shard block) combined
/// with the current collation value. For a regular shard, the shard itself and
/// the masterchain are skipped during the scan, and the masterchain's own
/// `processed_to` value (`mc_processed_to`) is folded in at the end.
///
/// Returns `None` when no source provides a value for `shard_id`.
fn calculate_min_processed_to(
    shard_id: &ShardIdent,
    current_processed_to: Option<QueueKey>,
    mc_processed_to: Option<QueueKey>,
    mc_data_shards: &[(ShardIdent, ShardDescriptionShort)],
    mc_data_shards_processed_to: &FastHashMap<ShardIdent, ProcessedTo>,
) -> Option<QueueKey> {
    // Scans shards whose top shard block was updated and returns the minimal
    // `processed_to` value recorded for `shard_id`, ignoring shards for which
    // `skip` returns true.
    fn find_min_processed_to(
        shards: &[(ShardIdent, ShardDescriptionShort)],
        shards_processed_to: &FastHashMap<ShardIdent, ProcessedTo>,
        shard_id: &ShardIdent,
        skip: impl Fn(&ShardIdent) -> bool,
    ) -> Option<QueueKey> {
        let mut min_processed_to: Option<QueueKey> = None;
        for (shard, descr) in shards {
            // Only shards whose top shard block changed carry fresh data.
            if skip(shard) || !descr.top_sc_block_updated {
                continue;
            }
            if let Some(v) = shards_processed_to
                .get(shard)
                .and_then(|processed_to| processed_to.get(shard_id))
            {
                min_processed_to = Some(min_processed_to.map_or(*v, |m| m.min(*v)));
            }
        }
        min_processed_to
    }

    if shard_id.is_masterchain() {
        // Masterchain: consider every updated shard plus the current
        // collation value.
        let shards_min = find_min_processed_to(
            mc_data_shards,
            mc_data_shards_processed_to,
            shard_id,
            |_| false,
        );
        [current_processed_to, shards_min].into_iter().flatten().min()
    } else {
        // Regular shard: skip itself and the masterchain in the scan, then
        // combine with the masterchain's own `processed_to` value.
        let shards_min = find_min_processed_to(
            mc_data_shards,
            mc_data_shards_processed_to,
            shard_id,
            |shard| shard == shard_id || shard.is_masterchain(),
        );
        [current_processed_to, shards_min, mc_processed_to]
            .into_iter()
            .flatten()
            .min()
    }
}
Loading

0 comments on commit 4a93855

Please sign in to comment.