Add metric to measure the time it takes to gather enough assignments (paritytech#4587)

To understand with high granularity how many assignment tranches are
triggered before we conclude that we have enough assignments.

This metric is important because triggering an assignment creates
a lot of work in the system to approve the candidate and gossip
the necessary messages.

---------

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
Co-authored-by: ordian <write@reusable.software>
2 people authored and TarekkMA committed Aug 2, 2024
1 parent a0c4cb3 commit df1a8ae
Showing 3 changed files with 406 additions and 6 deletions.
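For reference, here is a minimal, self-contained sketch of the stage-labelled histogram pattern added by this commit. It assumes the standalone `prometheus` crate rather than the subsystem's Substrate metrics wrappers, and the metric name and the helper `record_gathering_time` are illustrative only:

```rust
use prometheus::{HistogramOpts, HistogramVec, Registry};

fn make_histogram(registry: &Registry) -> prometheus::Result<HistogramVec> {
    // One histogram per gathering stage, distinguished by the "stage" label.
    let hist = HistogramVec::new(
        HistogramOpts::new(
            "assignments_gather_time_by_stage_ms",
            "Time in ms it takes for each stage to gather enough assignments",
        )
        .buckets(vec![0.0, 250.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0]),
        &["stage"],
    )?;
    registry.register(Box::new(hist.clone()))?;
    Ok(hist)
}

fn record_gathering_time(hist: &HistogramVec, stage: usize, elapsed_ms: u64) {
    // Cap label cardinality: every stage >= 10 is recorded under the "inf" label.
    let label = if stage < 10 { stage.to_string() } else { "inf".to_string() };
    hist.with_label_values(&[label.as_str()]).observe(elapsed_ms as f64);
}

fn main() -> prometheus::Result<()> {
    let registry = Registry::new();
    let hist = make_histogram(&registry)?;
    record_gathering_time(&hist, 0, 1_200);   // first stage took 1.2s
    record_gathering_time(&hist, 12, 40_000); // deep stage, capped to "inf"
    Ok(())
}
```

Capping the stage label keeps the number of time series bounded even if gathering drags on for many no-show rounds.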
6 changes: 5 additions & 1 deletion polkadot/node/core/approval-voting/src/import.rs
@@ -607,7 +607,7 @@ pub(crate) mod tests {
use super::*;
use crate::{
approval_db::common::{load_block_entry, DbBackend},
RuntimeInfo, RuntimeInfoConfig,
RuntimeInfo, RuntimeInfoConfig, MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
};
use ::test_helpers::{dummy_candidate_receipt, dummy_hash};
use assert_matches::assert_matches;
@@ -622,6 +622,7 @@ pub(crate) mod tests {
node_features::FeatureIndex, ExecutorParams, Id as ParaId, IndexedVec, NodeFeatures,
SessionInfo, ValidatorId, ValidatorIndex,
};
use schnellru::{ByLength, LruMap};
pub(crate) use sp_consensus_babe::{
digests::{CompatibleDigestItem, PreDigest, SecondaryVRFPreDigest},
AllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch,
@@ -658,6 +659,9 @@ pub(crate) mod tests {
clock: Box::new(MockClock::default()),
assignment_criteria: Box::new(MockAssignmentCriteria::default()),
spans: HashMap::new(),
per_block_assignments_gathering_times: LruMap::new(ByLength::new(
MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
)),
}
}

170 changes: 166 additions & 4 deletions polkadot/node/core/approval-voting/src/lib.rs
Expand Up @@ -63,6 +63,12 @@ use sc_keystore::LocalKeystore;
use sp_application_crypto::Pair;
use sp_consensus::SyncOracle;
use sp_consensus_slots::Slot;
use std::time::Instant;

// The maximum number of blocks for which we keep track of assignment gathering times.
// Normally, this limit is never reached because we prune the data on finalization, but
// we also need to ensure the data does not grow unnecessarily large.
const MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS: u32 = 100;

use futures::{
channel::oneshot,
@@ -182,6 +188,14 @@ struct MetricsInner {
time_recover_and_approve: prometheus::Histogram,
candidate_signatures_requests_total: prometheus::Counter<prometheus::U64>,
unapproved_candidates_in_unfinalized_chain: prometheus::Gauge<prometheus::U64>,
// The time it takes in each stage to gather enough assignments.
// We define a `stage` as one round of the process of gathering enough assignments to
// be able to approve a candidate.
// E.g.:
// - Stage 0: We wait for the needed_approvals assignments to be gathered.
// - Stage 1: We wait for enough tranches to cover all no-shows in stage 0.
// - Stage 2: We wait for enough tranches to cover all no-shows of stage 1.
assignments_gathering_time_by_stage: prometheus::HistogramVec,
}

/// Approval Voting metrics.
@@ -302,6 +316,20 @@ impl Metrics {
metrics.unapproved_candidates_in_unfinalized_chain.set(count as u64);
}
}

pub fn observe_assignment_gathering_time(&self, stage: usize, elapsed_as_millis: usize) {
if let Some(metrics) = &self.0 {
let stage_string = stage.to_string();
// We don't want too many metric entries with this label, so as not to put unnecessary
// pressure on the metrics infrastructure; therefore we cap the stage at 10, which
// already corresponds to a finalization lag of 10 * no_show_slots, so it should
// be more than enough.
metrics
.assignments_gathering_time_by_stage
.with_label_values(&[if stage < 10 { stage_string.as_str() } else { "inf" }])
.observe(elapsed_as_millis as f64);
}
}
}

impl metrics::Metrics for Metrics {
@@ -431,6 +459,17 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
assignments_gathering_time_by_stage: prometheus::register(
prometheus::HistogramVec::new(
prometheus::HistogramOpts::new(
"polkadot_parachain_assignments_gather_time_by_stage_ms",
"The time in ms it takes for each stage to gather enough assignments needed for approval",
)
.buckets(vec![0.0, 250.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0]),
&["stage"],
)?,
registry,
)?,
};

Ok(Metrics(Some(metrics)))
@@ -788,6 +827,28 @@ struct State {
clock: Box<dyn Clock + Send + Sync>,
assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
spans: HashMap<Hash, jaeger::PerLeafSpan>,
// Per block and candidate, records of how long it takes until we gather enough
// assignments; this is relevant because it gives us a good idea of how many
// tranches we trigger and why.
per_block_assignments_gathering_times:
LruMap<BlockNumber, HashMap<(Hash, CandidateHash), AssignmentGatheringRecord>>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct AssignmentGatheringRecord {
// The stage we are in.
// Candidate assignment gathering proceeds in stages: first we wait for `needed_approvals`
// (stage 0); then, if we have no-shows, we move into stage 1 and wait for enough tranches
// to cover all no-shows.
stage: usize,
// The time we started the stage.
stage_start: Option<Instant>,
}

impl Default for AssignmentGatheringRecord {
fn default() -> Self {
AssignmentGatheringRecord { stage: 0, stage_start: Some(Instant::now()) }
}
}

#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
@@ -893,6 +954,96 @@ impl State {
},
}
}

fn mark_begining_of_gathering_assignments(
&mut self,
block_number: BlockNumber,
block_hash: Hash,
candidate: CandidateHash,
) {
if let Some(record) = self
.per_block_assignments_gathering_times
.get_or_insert(block_number, HashMap::new)
.and_then(|records| Some(records.entry((block_hash, candidate)).or_default()))
{
if record.stage_start.is_none() {
record.stage += 1;
gum::debug!(
target: LOG_TARGET,
stage = ?record.stage,
?block_hash,
?candidate,
"Started a new assignment gathering stage",
);
record.stage_start = Some(Instant::now());
}
}
}

fn mark_gathered_enough_assignments(
&mut self,
block_number: BlockNumber,
block_hash: Hash,
candidate: CandidateHash,
) -> AssignmentGatheringRecord {
let record = self
.per_block_assignments_gathering_times
.get(&block_number)
.and_then(|entry| entry.get_mut(&(block_hash, candidate)));
let stage = record.as_ref().map(|record| record.stage).unwrap_or_default();
AssignmentGatheringRecord {
stage,
stage_start: record.and_then(|record| record.stage_start.take()),
}
}

fn cleanup_assignments_gathering_timestamp(&mut self, remove_lower_than: BlockNumber) {
while let Some((block_number, _)) = self.per_block_assignments_gathering_times.peek_oldest()
{
if *block_number < remove_lower_than {
self.per_block_assignments_gathering_times.pop_oldest();
} else {
break
}
}
}

fn observe_assignment_gathering_status(
&mut self,
metrics: &Metrics,
required_tranches: &RequiredTranches,
block_hash: Hash,
block_number: BlockNumber,
candidate_hash: CandidateHash,
) {
match required_tranches {
RequiredTranches::All | RequiredTranches::Pending { .. } => {
self.mark_begining_of_gathering_assignments(
block_number,
block_hash,
candidate_hash,
);
},
RequiredTranches::Exact { .. } => {
let time_to_gather =
self.mark_gathered_enough_assignments(block_number, block_hash, candidate_hash);
if let Some(gathering_started) = time_to_gather.stage_start {
if gathering_started.elapsed().as_millis() > 6000 {
gum::trace!(
target: LOG_TARGET,
?block_hash,
?candidate_hash,
"Long assignment gathering time",
);
}
metrics.observe_assignment_gathering_time(
time_to_gather.stage,
gathering_started.elapsed().as_millis() as usize,
)
}
},
}
}
}

#[derive(Debug, Clone)]
@@ -942,6 +1093,9 @@ where
clock: subsystem.clock,
assignment_criteria,
spans: HashMap::new(),
per_block_assignments_gathering_times: LruMap::new(ByLength::new(
MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
)),
};

// `None` on start-up. Gets initialized/updated on leaf update
@@ -973,7 +1127,7 @@ where
subsystem.metrics.on_wakeup();
process_wakeup(
&mut ctx,
&state,
&mut state,
&mut overlayed_db,
&mut session_info_provider,
woken_block,
@@ -1632,6 +1786,7 @@ async fn handle_from_overseer<Context>(
// `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans
// accordingly.
wakeups.prune_finalized_wakeups(block_number, &mut state.spans);
state.cleanup_assignments_gathering_timestamp(block_number);

// // `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans
// accordingly. let hash_set =
@@ -2478,7 +2633,7 @@ where

async fn check_and_import_approval<T, Sender>(
sender: &mut Sender,
state: &State,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut RuntimeInfo,
metrics: &Metrics,
@@ -2710,7 +2865,7 @@ impl ApprovalStateTransition {
// as necessary and schedules any further wakeups.
async fn advance_approval_state<Sender>(
sender: &mut Sender,
state: &State,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut RuntimeInfo,
metrics: &Metrics,
@@ -2761,6 +2916,13 @@ where
approval_entry,
status.required_tranches.clone(),
);
state.observe_assignment_gathering_status(
&metrics,
&status.required_tranches,
block_hash,
block_entry.block_number(),
candidate_hash,
);

// Check whether this is approved, while allowing a maximum
// assignment tick of `now - APPROVAL_DELAY` - that is, that
@@ -2941,7 +3103,7 @@ fn should_trigger_assignment(
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn process_wakeup<Context>(
ctx: &mut Context,
state: &State,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut RuntimeInfo,
relay_block: Hash,
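To make the bookkeeping above easier to follow, here is a simplified, standalone sketch of the per-block assignment-gathering tracker, assuming the `schnellru` crate. The `Tracker` type, its method names, and the narrowed key types are illustrative and not the subsystem's actual API:

```rust
use schnellru::{ByLength, LruMap};
use std::{collections::HashMap, time::Instant};

type BlockNumber = u32;
type CandidateId = u32;

#[derive(Debug)]
struct GatheringRecord {
    stage: usize,
    stage_start: Option<Instant>,
}

impl Default for GatheringRecord {
    fn default() -> Self {
        GatheringRecord { stage: 0, stage_start: Some(Instant::now()) }
    }
}

struct Tracker {
    // Bounded so the map cannot grow without limit even if pruning lags behind.
    per_block: LruMap<BlockNumber, HashMap<CandidateId, GatheringRecord>>,
}

impl Tracker {
    fn new(max_blocks: u32) -> Self {
        Tracker { per_block: LruMap::new(ByLength::new(max_blocks)) }
    }

    // Called while more assignments are still needed: start a new stage if the
    // previous one has already been closed.
    fn mark_gathering(&mut self, block: BlockNumber, candidate: CandidateId) {
        if let Some(records) = self.per_block.get_or_insert(block, HashMap::new) {
            let record = records.entry(candidate).or_default();
            if record.stage_start.is_none() {
                record.stage += 1;
                record.stage_start = Some(Instant::now());
            }
        }
    }

    // Called once enough assignments were gathered: close the current stage and
    // return its number together with when it started.
    fn mark_gathered(
        &mut self,
        block: BlockNumber,
        candidate: CandidateId,
    ) -> (usize, Option<Instant>) {
        let record = self
            .per_block
            .get(&block)
            .and_then(|records| records.get_mut(&candidate));
        let stage = record.as_ref().map(|r| r.stage).unwrap_or_default();
        (stage, record.and_then(|r| r.stage_start.take()))
    }

    // Drop everything below the finalized block; entries are inserted in ascending
    // block order here, so the LRU-oldest entry has the lowest block number.
    fn prune_finalized(&mut self, finalized: BlockNumber) {
        while let Some((block, _)) = self.per_block.peek_oldest() {
            if *block < finalized {
                self.per_block.pop_oldest();
            } else {
                break
            }
        }
    }
}

fn main() {
    let mut tracker = Tracker::new(100);
    tracker.mark_gathering(10, 1); // stage 0 starts
    let (stage, started) = tracker.mark_gathered(10, 1);
    println!("stage {stage} took {:?}", started.map(|s| s.elapsed()));
    tracker.prune_finalized(11);
}
```

The LRU bound acts purely as a safety net: in normal operation, pruning on finalization removes entries long before a cap like `MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS` is reached.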