Skip to content

Commit

Permalink
refactor(collator): renaming and refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick committed Nov 29, 2024
1 parent 1184237 commit a888612
Show file tree
Hide file tree
Showing 14 changed files with 89 additions and 101 deletions.
43 changes: 22 additions & 21 deletions collator/src/collator/do_collate/finalize.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, HashMap};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -98,7 +98,7 @@ impl Phase<FinalizeState> {
)
.with_processed_upto(
diff_with_messages
.processed_upto
.processed_to
.iter()
.map(|(k, v)| (*k, v.lt, &v.hash)),
)
Expand All @@ -109,7 +109,7 @@ impl Phase<FinalizeState> {
)
.serialize();

let processed_upto = diff_with_messages.processed_upto.clone();
let diff_processed_to = diff_with_messages.processed_to.clone();

let queue_diff_hash = *queue_diff.hash();
tracing::debug!(target: tracing_targets::COLLATOR, queue_diff_hash = %queue_diff_hash);
Expand Down Expand Up @@ -146,7 +146,7 @@ impl Phase<FinalizeState> {
has_unprocessed_messages,
diff_messages_len,
create_queue_diff_elapsed,
processed_upto,
processed_to: diff_processed_to,
},
update_queue_task,
))
Expand Down Expand Up @@ -175,13 +175,16 @@ impl Phase<FinalizeState> {
.finalize
.clone();

let mut processed_upto = FastHashMap::default();

for (shard_ident, processed_upto_stuff) in
&self.state.collation_data.processed_upto.internals
{
processed_upto.insert(*shard_ident, processed_upto_stuff.processed_to_msg);
}
let processed_to = self
.state
.collation_data
.processed_upto
.internals
.iter()
.map(|(shard_ident, processed_upto_stuff)| {
(*shard_ident, processed_upto_stuff.processed_to_msg)
})
.collect();

let shard = self.state.collation_data.block_id_short.shard;

Expand Down Expand Up @@ -590,33 +593,31 @@ impl Phase<FinalizeState> {
self.state.collation_data.processed_upto.externals.as_ref(),
);

let mut shards_processed_upto = HashMap::default();
let mut shards_processed_to = FastHashMap::default();

for (shard_id, shard_data) in shards.iter() {
if !shard_data.top_sc_block_updated {
continue;
}

// Extract processed information for updated shards
let processed_upto = self
let shard_processed_to = self
.state
.collation_data
.top_shard_blocks
.iter()
.find(|top_block_info| top_block_info.block_id.shard == *shard_id)
.map(|top_block_info| {
FastHashMap::from(top_block_info.processed_upto.clone())
})
.map(|top_block_info| top_block_info.processed_to.clone())
.or_else(|| {
self.state
.mc_data
.shards_processed_upto
.shards_processed_to
.get(shard_id)
.cloned()
});

if let Some(processed_upto) = processed_upto {
shards_processed_upto.insert(*shard_id, processed_upto);
if let Some(value) = shard_processed_to {
shards_processed_to.insert(*shard_id, value);
}
}

Expand All @@ -639,7 +640,7 @@ impl Phase<FinalizeState> {
processed_upto: self.state.collation_data.processed_upto.clone(),
top_processed_to_anchor,
ref_mc_state_handle: self.state.prev_shard_data.ref_mc_state_handle().clone(),
shards_processed_upto,
shards_processed_to,
}))
}
};
Expand Down Expand Up @@ -671,7 +672,7 @@ impl Phase<FinalizeState> {
|| self.state.mc_data.consensus_info,
|mcd| mcd.consensus_info,
),
processed_upto,
processed_to,
});

let total_elapsed = histogram.finish();
Expand Down
58 changes: 28 additions & 30 deletions collator/src/collator/do_collate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,73 +274,71 @@ impl CollatorStdImpl {

let (executor, mq_iterator_adapter, mq_adapter) = execution_wrapper.destruct();

// state.mc_data.shards.first().unwrap().pr

let (
UpdateQueueDiffResult {
queue_diff,
has_unprocessed_messages,
diff_messages_len,
create_queue_diff_elapsed,
processed_upto,
processed_to,
},
update_queue_task,
) = finalize_phase.update_queue_diff(mq_iterator_adapter, shard_id, mq_adapter.clone())?;

let finalize_block_timer = std::time::Instant::now();

let mut min_processed_upto: Option<QueueKey> = None;
let mut min_processed_to: Option<QueueKey> = None;

// Get current and masterchain processed values
let current_processed_upto = processed_upto.get(&shard_id).cloned();
let mc_processed_upto = mc_data.processed_upto.internals.get(&shard_id).cloned();
let current_processed_to = processed_to.get(&shard_id).cloned();
let mc_processed_to = mc_data.processed_upto.internals.get(&shard_id).cloned();

if shard_id.is_masterchain() {
// Iterate through shards to find the minimum processed value
for shard_processed_upto in mc_data.shards_processed_upto.values() {
if let Some(processed_upto) = shard_processed_upto.get(&shard_id) {
min_processed_upto = match min_processed_upto {
Some(current_min) => Some(current_min.min(*processed_upto)),
None => Some(*processed_upto),
};
// Iterate through shards to find the minimum processed value from shards
for shard_processed_to in mc_data.shards_processed_to.values() {
if let Some(value) = shard_processed_to.get(&shard_id) {
min_processed_to = min_processed_to.map(|current_min| current_min.min(*value));
}
}

// Combine with current and masterchain values
min_processed_upto = [current_processed_upto, min_processed_upto]
min_processed_to = [current_processed_to, min_processed_to]
.into_iter()
.flatten()
.min();
} else {
// Iterate through shards for non-masterchain
for (iter_shard_ident, shard_processed_upto) in &mc_data.shards_processed_upto {
let processed_upto = if iter_shard_ident == &shard_id {
current_processed_upto
// replace for current shard processed upto from current collation
for (iter_shard_ident, shard_processed_to) in &mc_data.shards_processed_to {
let shard_processed_to = if iter_shard_ident == &shard_id {
current_processed_to
} else {
shard_processed_upto.get(&shard_id).cloned()
shard_processed_to.get(&shard_id).cloned()
};

if let Some(processed_upto) = processed_upto {
min_processed_upto = match min_processed_upto {
Some(current_min) => Some(current_min.min(processed_upto)),
None => Some(processed_upto),
// find minimum processed upto value
if let Some(value) = shard_processed_to {
min_processed_to = match min_processed_to {
Some(current_min) => Some(current_min.min(value)),
None => Some(value),
};
}
}

// Combine with masterchain processed value
min_processed_upto = [
min_processed_upto,
mc_processed_upto.map(|mc| mc.processed_to_msg),
min_processed_to = [
min_processed_to,
mc_processed_to.map(|mc| mc.processed_to_msg),
]
.into_iter()
.flatten()
.min();
}

if let Some(min_processed_upto) = min_processed_upto {
mq_adapter.trim_diffs(&shard_id, &min_processed_upto)?;
if let Some(value) = min_processed_to {
mq_adapter.trim_diffs(&shard_id, &value)?;
};
let diff_tail_len = mq_adapter.get_diff_count_by_shard(&shard_id) as u32;
let diff_tail_len = mq_adapter.get_diff_count_by_shard(&shard_id) as u32 + 1;

let span = tracing::Span::current();
let (finalize_phase_result, update_queue_task_result) = rayon::join(
Expand Down Expand Up @@ -900,7 +898,7 @@ impl CollatorStdImpl {
proof_funds,
#[cfg(feature = "block-creator-stats")]
creators,
processed_upto,
processed_to,
} = top_block_descr;

let mut new_shard_descr = Box::new(ShardDescription::from_block_info(
Expand Down Expand Up @@ -960,7 +958,7 @@ impl CollatorStdImpl {

let top_shard_block_info = TopShardBlockInfo {
block_id,
processed_upto,
processed_to,
};

collation_data_builder
Expand Down
2 changes: 1 addition & 1 deletion collator/src/collator/tests/collator_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ fn test_get_anchors_processing_info() {
consensus_info: Default::default(),
top_processed_to_anchor: 0,
ref_mc_state_handle: tracker.insert(0),
shards_processed_upto: Default::default(),
shards_processed_to: Default::default(),
};

//------
Expand Down
2 changes: 1 addition & 1 deletion collator/src/collator/tests/execution_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ fn gen_stub_working_state(
},
top_processed_to_anchor: 0,
ref_mc_state_handle: prev_shard_data.ref_mc_state_handle().clone(),
shards_processed_upto: Default::default(),
shards_processed_to: Default::default(),
}),
collation_config: Arc::new(Default::default()),
wu_used_from_last_anchor: 0,
Expand Down
2 changes: 1 addition & 1 deletion collator/src/collator/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1279,7 +1279,7 @@ pub struct UpdateQueueDiffResult {
pub has_unprocessed_messages: bool,
pub diff_messages_len: usize,
pub create_queue_diff_elapsed: Duration,
pub processed_upto: BTreeMap<ShardIdent, QueueKey>,
pub processed_to: BTreeMap<ShardIdent, QueueKey>,
}

pub struct FinalizedCollationResult {
Expand Down
4 changes: 2 additions & 2 deletions collator/src/internal_queue/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl<V: InternalMessageValue> QueueIterator<V> for QueueIteratorImpl<V> {
// fill processed_upto
for (shard_id, message_key) in self.last_processed_message.iter() {
// TODO: may be `diff.processed_upto` should be a HashMap and we can consume it from iterator
diff.processed_upto.insert(*shard_id, *message_key);
diff.processed_to.insert(*shard_id, *message_key);
}

// move new messages
Expand Down Expand Up @@ -162,7 +162,7 @@ impl<V: InternalMessageValue> QueueIterator<V> for QueueIteratorImpl<V> {

for (shard_id, message_key) in self.last_processed_message.iter() {
// TODO: may be `diff.processed_upto` should be a HashMap and we can consume it from iterator
diff.processed_upto.insert(*shard_id, *message_key);
diff.processed_to.insert(*shard_id, *message_key);
}

diff.messages = self.new_messages.clone();
Expand Down
18 changes: 9 additions & 9 deletions collator/src/internal_queue/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl<V: InternalMessageValue> QueueFactory<V> for QueueFactoryStdImpl {
}

struct ShortQueueDiff {
pub processed_upto: BTreeMap<ShardIdent, QueueKey>,
pub processed_to: BTreeMap<ShardIdent, QueueKey>,
pub end_key: QueueKey,
pub hash: HashBytes,
}
Expand All @@ -131,7 +131,6 @@ where
{
session_state: Arc<S>,
persistent_state: Arc<P>,
// diffs: FastDashMap<ShardIdent, BTreeMap<u32, ShortQueueDiff>>,
session_diffs: FastDashMap<ShardIdent, BTreeMap<u32, ShortQueueDiff>>,
persistent_diffs: FastDashMap<ShardIdent, BTreeMap<u32, ShortQueueDiff>>,
gc: GcManager,
Expand Down Expand Up @@ -206,7 +205,7 @@ where
}

let short_diff = ShortQueueDiff {
processed_upto: diff.processed_upto,
processed_to: diff.processed_to,
end_key,
hash: *hash,
};
Expand Down Expand Up @@ -238,15 +237,15 @@ where
*current_last_key = shard_diff.end_key;
}

// find min processed_upto for each shard for GC
// find min processed_to for each shard for GC
if *block_seqno == block_id_short.seqno && *top_shard_block_changed {
for processed_upto in shard_diff.processed_upto.iter() {
for (shard_ident, processed_to_key) in shard_diff.processed_to.iter() {
let last_key = gc_ranges
.entry(*processed_upto.0)
.or_insert_with(|| *processed_upto.1);
.entry(*shard_ident)
.or_insert_with(|| *processed_to_key);

if processed_upto.1 < last_key {
*last_key = *processed_upto.1;
if processed_to_key < last_key {
*last_key = *processed_to_key;
}
}
}
Expand Down Expand Up @@ -295,6 +294,7 @@ where
.persistent_diffs
.get(shard_ident)
.map_or(0, |diffs| diffs.len());

session_count + persistent_count
}

Expand Down
5 changes: 1 addition & 4 deletions collator/src/internal_queue/state/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,7 @@ impl<V: InternalMessageValue> SessionState<V> for SessionStateStdImpl {
}

fn commit_messages(&self, ranges: &FastHashMap<ShardIdent, QueueKey>) -> Result<()> {
let ranges = ranges
.iter()
.map(|(shard, key)| (*shard, *key))
.collect::<FastHashMap<_, _>>();
let ranges = ranges.iter().map(|(shard, key)| (*shard, *key)).collect();
self.storage.internal_queue_storage().commit(ranges)
}

Expand Down
8 changes: 4 additions & 4 deletions collator/src/internal_queue/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ use super::state::state_iterator::MessageExt;
#[derive(Default, Debug, Clone)]
pub struct QueueDiffWithMessages<V: InternalMessageValue> {
pub messages: BTreeMap<QueueKey, Arc<V>>,
pub processed_upto: BTreeMap<ShardIdent, QueueKey>,
pub processed_to: BTreeMap<ShardIdent, QueueKey>,
}

impl<V: InternalMessageValue> QueueDiffWithMessages<V> {
pub fn new() -> Self {
Self {
messages: BTreeMap::new(),
processed_upto: BTreeMap::new(),
processed_to: BTreeMap::new(),
}
}
}
Expand All @@ -31,7 +31,7 @@ impl QueueDiffWithMessages<EnqueuedMessage> {
out_msg_description: &OutMsgDescr,
) -> Result<Self> {
let QueueDiff { processed_upto, .. } = queue_diff_stuff.as_ref();
let processed_upto: BTreeMap<ShardIdent, QueueKey> = processed_upto
let processed_to: BTreeMap<ShardIdent, QueueKey> = processed_upto
.iter()
.map(|(shard_ident, key)| (*shard_ident, *key))
.collect();
Expand All @@ -51,7 +51,7 @@ impl QueueDiffWithMessages<EnqueuedMessage> {

Ok(Self {
messages,
processed_upto,
processed_to,
})
}
}
Expand Down
Loading

0 comments on commit a888612

Please sign in to comment.