Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Add some metrics to dispute-coordinator. #5884

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 42 additions & 18 deletions node/core/dispute-coordinator/src/initialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,15 @@ impl Initialized {
update: ActiveLeavesUpdate,
now: u64,
) -> Result<()> {
let on_chain_votes =
self.scraper.process_active_leaves_update(ctx.sender(), &update).await?;
self.participation.process_active_leaves_update(ctx, &update).await?;
let _total = self.metrics.time_active_leaves_update("total");
let on_chain_votes = {
let _scraper = self.metrics.time_active_leaves_update("scraper");
self.scraper.process_active_leaves_update(ctx.sender(), &update).await?
};
{
let _scraper = self.metrics.time_active_leaves_update("participation");
self.participation.process_active_leaves_update(ctx, &update).await?;
}

if let Some(new_leaf) = update.activated {
match self
Expand Down Expand Up @@ -306,8 +312,7 @@ impl Initialized {
Ok(SessionWindowUpdate::Unchanged) => {},
};

// The `runtime-api` subsystem has an internal queue which serializes the execution,
// so there is no point in running these in parallel.
let _on_chain_votes = self.metrics.time_active_leaves_update("on-chain-votes");
for votes in on_chain_votes {
let _ = self.process_on_chain_votes(ctx, overlay_db, votes, now).await.map_err(
|error| {
Expand Down Expand Up @@ -392,13 +397,16 @@ impl Initialized {
ValidDisputeStatementKind::BackingValid(relay_parent),
};
debug_assert!(
SignedDisputeStatement::new_checked(
DisputeStatement::Valid(valid_statement_kind),
candidate_hash,
session,
validator_public.clone(),
validator_signature.clone(),
).is_ok(),
{
let _signature_check = self.metrics.time_active_leaves_update("on-chain-vote-signature-check");
SignedDisputeStatement::new_checked(
DisputeStatement::Valid(valid_statement_kind),
candidate_hash,
session,
validator_public.clone(),
validator_signature.clone(),
).is_ok()
},
"Scraped backing votes had invalid signature! candidate: {:?}, session: {:?}, validator_public: {:?}",
candidate_hash,
session,
Expand All @@ -416,16 +424,18 @@ impl Initialized {
})
.collect();

let import_result = self
.handle_import_statements(
let import_result = {
let _vote_import = self.metrics.time_active_leaves_update("backing-vote-import");
self.handle_import_statements(
ctx,
overlay_db,
MaybeCandidateReceipt::Provides(candidate_receipt),
session,
statements,
now,
)
.await?;
.await?
};
match import_result {
ImportStatementsResult::ValidImport => gum::trace!(
target: LOG_TARGET,
Expand Down Expand Up @@ -512,8 +522,10 @@ impl Initialized {
))
})
.collect::<Vec<_>>();
let import_result = self
.handle_import_statements(
let import_result = {
let _dispute_vote_import =
self.metrics.time_active_leaves_update("dispute-vote-import");
self.handle_import_statements(
ctx,
overlay_db,
// TODO <https://github.com/paritytech/polkadot/issues/4011>
Expand All @@ -522,7 +534,8 @@ impl Initialized {
statements,
now,
)
.await?;
.await?
};
match import_result {
ImportStatementsResult::ValidImport => gum::trace!(
target: LOG_TARGET,
Expand All @@ -549,13 +562,15 @@ impl Initialized {
message: DisputeCoordinatorMessage,
now: Timestamp,
) -> Result<Box<dyn FnOnce() -> JfyiResult<()>>> {
let _any_req = self.metrics.time_process_message("total");
match message {
DisputeCoordinatorMessage::ImportStatements {
candidate_receipt,
session,
statements,
pending_confirmation,
} => {
let _req = self.metrics.time_process_message("ImportStatements");
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?candidate_receipt.hash(),
Expand Down Expand Up @@ -588,6 +603,7 @@ impl Initialized {
}
},
DisputeCoordinatorMessage::RecentDisputes(tx) => {
let _req = self.metrics.time_process_message("RecentDisputes");
// Return error if session information is missing.
self.ensure_available_session_info()?;

Expand All @@ -602,6 +618,7 @@ impl Initialized {
let _ = tx.send(recent_disputes.keys().cloned().collect());
},
DisputeCoordinatorMessage::ActiveDisputes(tx) => {
let _req = self.metrics.time_process_message("ActiveDisputes");
// Return error if session information is missing.
self.ensure_available_session_info()?;

Expand All @@ -618,8 +635,10 @@ impl Initialized {
.map(|(k, _)| k)
.collect(),
);
gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::ActiveDisputes DONE");
},
DisputeCoordinatorMessage::QueryCandidateVotes(query, tx) => {
let _req = self.metrics.time_process_message("QueryCandidateVotes");
// Return error if session information is missing.
self.ensure_available_session_info()?;

Expand Down Expand Up @@ -664,6 +683,7 @@ impl Initialized {
block_descriptions,
tx,
} => {
let _req = self.metrics.time_process_message("DetermineUndisputedChain");
// Return error if session information is missing.
self.ensure_available_session_info()?;
gum::trace!(
Expand All @@ -677,6 +697,10 @@ impl Initialized {
base_hash,
block_descriptions,
)?;
gum::trace!(
target: LOG_TARGET,
"DisputeCoordinatorMessage::DeterminedUndisputedChain"
);

let _ = tx.send(undisputed_chain);
},
Expand Down
52 changes: 52 additions & 0 deletions node/core/dispute-coordinator/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ struct MetricsInner {
queued_participations: prometheus::CounterVec<prometheus::U64>,
/// How long vote cleanup batches take.
vote_cleanup_time: prometheus::Histogram,
/// How long processing an `ActiveLeaves` update takes, both in total and for
/// each individual processing step.
active_leaves_update_time: prometheus::HistogramVec,
/// How long processing a certain message takes.
process_message_time: prometheus::HistogramVec,
}

/// Candidate validation metrics.
Expand Down Expand Up @@ -88,6 +94,24 @@ impl Metrics {
/// Start a timer on the `vote_cleanup_time` histogram.
///
/// Yields `None` when no inner metrics are present.
pub(crate) fn time_vote_cleanup(&self) -> Option<prometheus::prometheus::HistogramTimer> {
	let inner = self.0.as_ref()?;
	Some(inner.vote_cleanup_time.start_timer())
}

/// Start a timer on the `active_leaves_update_time` histogram family.
///
/// `label` selects which histogram of the family is timed; it is passed as the
/// single label value. Yields `None` when no inner metrics are present.
pub(crate) fn time_active_leaves_update(
	&self,
	label: &'static str,
) -> Option<prometheus::prometheus::HistogramTimer> {
	let inner = self.0.as_ref()?;
	let histogram = inner.active_leaves_update_time.with_label_values(&[label]);
	Some(histogram.start_timer())
}

/// Start a timer on the `process_message_time` histogram family.
///
/// `label` selects which histogram of the family is timed; it is passed as the
/// single label value. Yields `None` when no inner metrics are present.
pub(crate) fn time_process_message(
	&self,
	label: &'static str,
) -> Option<prometheus::prometheus::HistogramTimer> {
	let inner = self.0.as_ref()?;
	let histogram = inner.process_message_time.with_label_values(&[label]);
	Some(histogram.start_timer())
}
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -147,6 +171,34 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
active_leaves_update_time: prometheus::register(
prometheus::HistogramVec::new(
prometheus::HistogramOpts::new(
"polkadot_parachain_dispute_coordinator_active_leaves_update_time",
"Time spent on tasks on active leaves update",
)
.buckets(vec![
0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.5,
1.0, 2.0, 3.0,
]),
&["task"],
)?,
registry,
)?,
process_message_time: prometheus::register(
prometheus::HistogramVec::new(
prometheus::HistogramOpts::new(
"polkadot_parachain_dispute_coordinator_process_message_time",
"Time spent processing messages",
)
.buckets(vec![
0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.5,
1.0, 2.0, 3.0,
]),
&["msg"],
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
Expand Down