Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(collator): fix collators lags and panics after sync #494

Merged
merged 4 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions collator/src/collator/do_collate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use super::types::{
AnchorsCache, BlockCollationDataBuilder, CollationResult, ExecuteResult, FinalizedBlock,
FinalizedCollationResult, ParsedExternals, PrevData, ReadNextExternalsMode, WorkingState,
};
use super::CollatorStdImpl;
use super::{CollatorStdImpl, ForceMasterCollation};
use crate::collator::types::{
AnchorInfo, BlockCollationData, FinalResult, ParsedMessage, ShardDescriptionExt,
UpdateQueueDiffResult,
Expand All @@ -45,6 +45,7 @@ mod phase;
mod prepare;

impl CollatorStdImpl {
/// [`force_next_mc_block`] - should force next master block collation after this block
#[tracing::instrument(
parent = None,
skip_all,
Expand All @@ -57,6 +58,7 @@ impl CollatorStdImpl {
&mut self,
working_state: Box<WorkingState>,
top_shard_blocks_info: Option<Vec<TopBlockDescription>>,
force_next_mc_block: ForceMasterCollation,
) -> Result<()> {
let labels: [(&str, String); 1] = [("workchain", self.shard_id.workchain().to_string())];
let total_collation_histogram =
Expand Down Expand Up @@ -161,7 +163,12 @@ impl CollatorStdImpl {
let FinalizedCollationResult {
handle_block_candidate_elapsed,
} = self
.finalize_collation(final_result.has_unprocessed_messages, finalized, tracker)
.finalize_collation(
final_result.has_unprocessed_messages,
finalized,
tracker,
force_next_mc_block,
)
.await?;

let total_elapsed = total_collation_histogram.finish();
Expand Down Expand Up @@ -1038,6 +1045,7 @@ impl CollatorStdImpl {
has_unprocessed_messages: bool,
finalized: FinalizedBlock,
tracker: MinRefMcStateTracker,
force_next_mc_block: ForceMasterCollation,
) -> Result<FinalizedCollationResult> {
let labels = [("workchain", self.shard_id.workchain().to_string())];

Expand Down Expand Up @@ -1088,6 +1096,7 @@ impl CollatorStdImpl {
prev_mc_block_id: finalized.old_mc_data.block_id,
mc_data: finalized.mc_data.clone(),
collation_config: collation_config.clone(),
force_next_mc_block,
})
.await?;

Expand Down
31 changes: 25 additions & 6 deletions collator/src/collator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ impl CollatorStdImpl {
));
} else {
// last processed_to anchor in shard can be before last procssed in master
// it is normal, so we should not cancel collation buy we unable to import init anchors
// it is normal, so we should not cancel collation, but we are unable to import init anchors
false
}
} else {
Expand Down Expand Up @@ -1035,7 +1035,9 @@ impl CollatorStdImpl {

// do not import anchor if mempool may be paused
// needs to process more anchors in collator first
if prev_anchor_id.saturating_sub(top_processed_to_anchor) > max_consensus_lag_rounds / 2 {
if prev_anchor_id.saturating_sub(top_processed_to_anchor)
> max_consensus_lag_rounds.saturating_mul(2).saturating_div(3)
{
metrics::counter!("tycho_collator_anchor_import_skipped_count", &labels).increment(1);
return Ok(ImportNextAnchor::Skipped);
}
Expand Down Expand Up @@ -1413,8 +1415,12 @@ impl CollatorStdImpl {
}
}

self.do_collate(working_state, Some(top_shard_blocks_info))
.await
self.do_collate(
working_state,
Some(top_shard_blocks_info),
ForceMasterCollation::No,
)
.await
}

/// Run collation if there are internals,
Expand Down Expand Up @@ -1692,6 +1698,9 @@ impl CollatorStdImpl {
last_imported_chain_time,
"there are no pending internals or externals, will import next anchor",
);

// drop used wu when anchor was not imported by used wu
working_state.wu_used_from_last_anchor = 0;
}
(_, _, true) => {
tracing::info!(target: tracing_targets::COLLATOR,
Expand Down Expand Up @@ -1790,14 +1799,15 @@ impl CollatorStdImpl {

imported_anchors_has_externals |= has_our_externals;

// reduce used wu by one imported anchor
working_state.wu_used_from_last_anchor = working_state
.wu_used_from_last_anchor
.saturating_sub(wu_used_to_import_next_anchor);

tracing::debug!(target: tracing_targets::COLLATOR,
wu_used_from_last_anchor = working_state.wu_used_from_last_anchor,
force_import_anchor_by_used_wu,
"wu_used_from_last_anchor dropped to",
"used wu dropped to",
);

if working_state.wu_used_from_last_anchor
Expand Down Expand Up @@ -1868,8 +1878,17 @@ impl CollatorStdImpl {
"will collate next shard block",
);

// should force next master block collation after this shard block
// when anchor import was skipped
let force_next_mc_block = if anchor_import_skipped && uncommitted_chain_length > 4 {
ForceMasterCollation::ByAnchorImportSkipped
} else {
ForceMasterCollation::No
};

drop(histogram);
self.do_collate(working_state, None).await?;
self.do_collate(working_state, None, force_next_mc_block)
.await?;
}
TryCollateCheck::NoPendingMessages
| TryCollateCheck::ForceMcBlockByUncommittedChainLength => {
Expand Down
23 changes: 22 additions & 1 deletion collator/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ where
&collation_result.prev_mc_block_id,
block_id.shard,
candidate_chain_time,
ForceMasterCollation::No,
collation_result.force_next_mc_block,
Some(block_id),
collation_result.collation_config.mc_block_min_interval_ms as _,
)
Expand Down Expand Up @@ -1324,6 +1324,11 @@ where
&mc_block_entry.top_shard_blocks_info,
)?;

// when we run sync for any reason we should drop uncommitted queue updates
// after restoring the required state
// to avoid panics if the next block was already collated before and it is incorrect
self.mq_adapter.clear_session_state()?;

let state = mc_block_entry.cached_state()?;

// HACK: do not need to set master block latest chain time from zerostate when using mempool stub
Expand All @@ -1335,6 +1340,8 @@ where
);
}

Self::reset_collation_sync_status(&mut self.collation_sync_state.lock());

// TODO: refactor this logic
// replace last collated block id with last applied
self.blocks_cache
Expand Down Expand Up @@ -1894,6 +1901,20 @@ where
}
}

/// Reset collation status from `WaitForMasterStatus` to `AttemptsInProgress` for every shard.
///
/// Use this method before resuming collation after sync to avoid ambiguous situations.
/// If any shard has collation status `WaitForMasterStatus` and a sync was executed,
/// then when the master collation check finishes first it will enqueue one more resume
/// for that shard, leaving us with two parallel collations for the shard, which would
/// cause a panic further on.
fn reset_collation_sync_status(guard: &mut CollationSyncState) {
    // Only the per-shard states matter here, not the shard keys,
    // so iterate with `values_mut()` instead of discarding the key.
    for collation_state in guard.states.values_mut() {
        if collation_state.status == CollationStatus::WaitForMasterStatus {
            collation_state.status = CollationStatus::AttemptsInProgress;
        }
    }
}

/// 1. Store collation status for current shard
/// 2. Detect the next step: wait for master status, resume attempts, run master collation
fn detect_next_collation_step(
Expand Down
2 changes: 2 additions & 0 deletions collator/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tycho_block_util::state::{RefMcStateHandle, ShardStateStuff};
use tycho_network::PeerId;
use tycho_util::FastHashMap;

use crate::collator::ForceMasterCollation;
use crate::mempool::MempoolAnchorId;
use crate::utils::block::detect_top_processed_to_anchor;

Expand Down Expand Up @@ -69,6 +70,7 @@ pub struct BlockCollationResult {
pub prev_mc_block_id: BlockId,
pub mc_data: Option<Arc<McData>>,
pub collation_config: Arc<CollationConfig>,
pub force_next_mc_block: ForceMasterCollation,
}

/// Processed up to info for externals and internals.
Expand Down
Loading