Skip to content

Commit

Permalink
refactor(pruning): Improve pruning metrics and logs (#2297)
Browse files Browse the repository at this point in the history
## What ❔

- Adds counters for outcomes (success, failure, error) for all pruning
conditions.
- Distinguishes between no-op and soft pruning latency.
- Logs latencies and stats in tree pruning.
- Other misc improvements.

## Why ❔

Improves pruning observability.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
slowli authored Jun 21, 2024
1 parent c2412cf commit e467028
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 34 deletions.
34 changes: 17 additions & 17 deletions core/lib/merkle_tree/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,14 +309,29 @@ enum Bound {
End,
}

const LARGE_NODE_COUNT_BUCKETS: Buckets = Buckets::values(&[
1_000.0,
2_000.0,
5_000.0,
10_000.0,
20_000.0,
50_000.0,
100_000.0,
200_000.0,
500_000.0,
1_000_000.0,
2_000_000.0,
5_000_000.0,
]);

#[derive(Debug, Metrics)]
#[metrics(prefix = "merkle_tree_pruning")]
struct PruningMetrics {
/// Minimum Merkle tree version targeted after a single pruning iteration. The iteration
/// may not remove all stale keys to this version if there are too many.
target_retained_version: Gauge<u64>,
/// Number of pruned node keys on a specific pruning iteration.
#[metrics(buckets = NODE_COUNT_BUCKETS)]
#[metrics(buckets = LARGE_NODE_COUNT_BUCKETS)]
key_count: Histogram<usize>,
/// Lower and upper boundaries on the new stale key versions deleted
/// during a pruning iteration. The lower boundary is inclusive, the upper one is exclusive.
Expand Down Expand Up @@ -368,26 +383,11 @@ pub(crate) enum RecoveryStage {
ParallelPersistence,
}

const CHUNK_SIZE_BUCKETS: Buckets = Buckets::values(&[
1_000.0,
2_000.0,
5_000.0,
10_000.0,
20_000.0,
50_000.0,
100_000.0,
200_000.0,
500_000.0,
1_000_000.0,
2_000_000.0,
5_000_000.0,
]);

#[derive(Debug, Metrics)]
#[metrics(prefix = "merkle_tree_recovery")]
pub(crate) struct RecoveryMetrics {
/// Number of entries in a recovered chunk.
#[metrics(buckets = CHUNK_SIZE_BUCKETS)]
#[metrics(buckets = LARGE_NODE_COUNT_BUCKETS)]
pub chunk_size: Histogram<usize>,
/// Latency of a specific stage of recovery for a single chunk.
#[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
Expand Down
8 changes: 5 additions & 3 deletions core/lib/merkle_tree/src/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,15 @@ impl<DB: PruneDatabase> MerkleTreePruner<DB> {
break;
}
}
load_stale_keys_latency.observe();
let load_stale_keys_latency = load_stale_keys_latency.observe();

if pruned_keys.is_empty() {
tracing::debug!("No stale keys to remove; skipping");
return Ok(None);
}
let deleted_stale_key_versions = min_stale_key_version..(max_stale_key_version + 1);
tracing::info!(
"Collected {} stale keys with new versions in {deleted_stale_key_versions:?}",
"Collected {} stale keys with new versions in {deleted_stale_key_versions:?} in {load_stale_keys_latency:?}",
pruned_keys.len()
);

Expand All @@ -186,7 +186,8 @@ impl<DB: PruneDatabase> MerkleTreePruner<DB> {
let patch = PrunePatchSet::new(pruned_keys, deleted_stale_key_versions);
let apply_patch_latency = PRUNING_TIMINGS.apply_patch.start();
self.db.prune(patch)?;
apply_patch_latency.observe();
let apply_patch_latency = apply_patch_latency.observe();
tracing::info!("Pruned stale keys in {apply_patch_latency:?}: {stats:?}");
Ok(Some(stats))
}

Expand Down Expand Up @@ -230,6 +231,7 @@ impl<DB: PruneDatabase> MerkleTreePruner<DB> {
self.poll_interval
};
}
tracing::info!("Stop signal received, tree pruning is shut down");
Ok(())
}
}
Expand Down
33 changes: 23 additions & 10 deletions core/node/db_pruner/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
//! Postgres pruning component.
use std::{sync::Arc, time::Duration};
use std::{
sync::Arc,
time::{Duration, Instant},
};

use anyhow::Context as _;
use serde::{Deserialize, Serialize};
Expand All @@ -10,7 +13,7 @@ use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthChe
use zksync_types::{L1BatchNumber, L2BlockNumber};

use self::{
metrics::{MetricPruneType, METRICS},
metrics::{ConditionOutcome, PruneType, METRICS},
prune_conditions::{
ConsistencyCheckerProcessedBatch, L1BatchExistsCondition, L1BatchOlderThanPruneCondition,
NextL1BatchHasMetadataCondition, NextL1BatchWasExecutedCondition, PruneCondition,
Expand Down Expand Up @@ -128,15 +131,24 @@ impl DbPruner {
let mut errored_conditions = vec![];

for condition in &self.prune_conditions {
match condition.is_batch_prunable(l1_batch_number).await {
Ok(true) => successful_conditions.push(condition.to_string()),
Ok(false) => failed_conditions.push(condition.to_string()),
let outcome = match condition.is_batch_prunable(l1_batch_number).await {
Ok(true) => {
successful_conditions.push(condition.to_string());
ConditionOutcome::Success
}
Ok(false) => {
failed_conditions.push(condition.to_string());
ConditionOutcome::Fail
}
Err(error) => {
errored_conditions.push(condition.to_string());
tracing::warn!("Pruning condition '{condition}' resulted in an error: {error}");
ConditionOutcome::Error
}
}
};
METRICS.observe_condition(condition.as_ref(), outcome);
}

let result = failed_conditions.is_empty() && errored_conditions.is_empty();
if !result {
tracing::debug!(
Expand Down Expand Up @@ -172,7 +184,7 @@ impl DbPruner {
}

async fn soft_prune(&self, storage: &mut Connection<'_, Core>) -> anyhow::Result<bool> {
let latency = METRICS.pruning_chunk_duration[&MetricPruneType::Soft].start();
let start = Instant::now();
let mut transaction = storage.start_transaction().await?;

let mut current_pruning_info = transaction.pruning_dal().get_pruning_info().await?;
Expand All @@ -184,7 +196,7 @@ impl DbPruner {
+ self.config.pruned_batch_chunk_size,
);
if !self.is_l1_batch_prunable(next_l1_batch_to_prune).await {
latency.observe();
METRICS.pruning_chunk_duration[&PruneType::NoOp].observe(start.elapsed());
return Ok(false);
}

Expand All @@ -200,7 +212,8 @@ impl DbPruner {

transaction.commit().await?;

let latency = latency.observe();
let latency = start.elapsed();
METRICS.pruning_chunk_duration[&PruneType::Soft].observe(latency);
tracing::info!(
"Soft pruned db l1_batches up to {next_l1_batch_to_prune} and L2 blocks up to {next_l2_block_to_prune}, operation took {latency:?}",
);
Expand All @@ -216,7 +229,7 @@ impl DbPruner {
storage: &mut Connection<'_, Core>,
stop_receiver: &mut watch::Receiver<bool>,
) -> anyhow::Result<PruningIterationOutcome> {
let latency = METRICS.pruning_chunk_duration[&MetricPruneType::Hard].start();
let latency = METRICS.pruning_chunk_duration[&PruneType::Hard].start();
let mut transaction = storage.start_transaction().await?;

let mut current_pruning_info = transaction.pruning_dal().get_pruning_info().await?;
Expand Down
36 changes: 33 additions & 3 deletions core/node/db_pruner/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use std::time::Duration;

use vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, Metrics, Unit};
use vise::{
Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, Metrics, Unit,
};
use zksync_dal::pruning_dal::HardPruningStats;

use crate::prune_conditions::PruneCondition;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
#[metrics(label = "prune_type", rename_all = "snake_case")]
pub(super) enum MetricPruneType {
pub(super) enum PruneType {
NoOp,
Soft,
Hard,
}
Expand All @@ -21,21 +26,38 @@ enum PrunedEntityType {
CallTrace,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue)]
#[metrics(rename_all = "snake_case")]
pub(crate) enum ConditionOutcome {
Success,
Fail,
Error,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet)]
struct ConditionOutcomeLabels {
condition: &'static str,
outcome: ConditionOutcome,
}

const ENTITY_COUNT_BUCKETS: Buckets = Buckets::values(&[
1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1_000.0, 2_000.0, 5_000.0, 10_000.0,
20_000.0, 50_000.0, 100_000.0,
]);

#[derive(Debug, Metrics)]
#[metrics(prefix = "db_pruner")]
pub(super) struct DbPrunerMetrics {
/// Total latency of pruning chunk of L1 batches.
#[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
pub pruning_chunk_duration: Family<MetricPruneType, Histogram<Duration>>,
pub pruning_chunk_duration: Family<PruneType, Histogram<Duration>>,
/// Number of not-pruned L1 batches.
pub not_pruned_l1_batches_count: Gauge<u64>,
/// Number of entities deleted during a single hard pruning iteration, grouped by entity type.
#[metrics(buckets = ENTITY_COUNT_BUCKETS)]
deleted_entities: Family<PrunedEntityType, Histogram<u64>>,
/// Number of times a certain condition has resulted in a specific outcome (succeeded, failed, or errored).
condition_outcomes: Family<ConditionOutcomeLabels, Counter>,
}

impl DbPrunerMetrics {
Expand All @@ -61,6 +83,14 @@ impl DbPrunerMetrics {
self.deleted_entities[&PrunedEntityType::L2ToL1Log].observe(deleted_l2_to_l1_logs);
self.deleted_entities[&PrunedEntityType::CallTrace].observe(deleted_call_traces);
}

pub fn observe_condition(&self, condition: &dyn PruneCondition, outcome: ConditionOutcome) {
let labels = ConditionOutcomeLabels {
condition: condition.metric_label(),
outcome,
};
self.condition_outcomes[&labels].inc();
}
}

#[vise::register]
Expand Down
22 changes: 22 additions & 0 deletions core/node/db_pruner/src/prune_conditions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use zksync_types::L1BatchNumber;

#[async_trait]
pub(crate) trait PruneCondition: fmt::Debug + fmt::Display + Send + Sync + 'static {
fn metric_label(&self) -> &'static str;

async fn is_batch_prunable(&self, l1_batch_number: L1BatchNumber) -> anyhow::Result<bool>;
}

Expand All @@ -24,6 +26,10 @@ impl fmt::Display for L1BatchOlderThanPruneCondition {

#[async_trait]
impl PruneCondition for L1BatchOlderThanPruneCondition {
fn metric_label(&self) -> &'static str {
"l1_batch_older_than_minimum_age"
}

async fn is_batch_prunable(&self, l1_batch_number: L1BatchNumber) -> anyhow::Result<bool> {
let mut storage = self.pool.connection_tagged("db_pruner").await?;
let l1_batch_header = storage
Expand All @@ -50,6 +56,10 @@ impl fmt::Display for NextL1BatchWasExecutedCondition {

#[async_trait]
impl PruneCondition for NextL1BatchWasExecutedCondition {
fn metric_label(&self) -> &'static str {
"next_l1_batch_was_executed"
}

async fn is_batch_prunable(&self, l1_batch_number: L1BatchNumber) -> anyhow::Result<bool> {
let mut storage = self.pool.connection_tagged("db_pruner").await?;
let next_l1_batch_number = L1BatchNumber(l1_batch_number.0 + 1);
Expand All @@ -76,6 +86,10 @@ impl fmt::Display for NextL1BatchHasMetadataCondition {

#[async_trait]
impl PruneCondition for NextL1BatchHasMetadataCondition {
fn metric_label(&self) -> &'static str {
"next_l1_batch_has_metadata"
}

async fn is_batch_prunable(&self, l1_batch_number: L1BatchNumber) -> anyhow::Result<bool> {
let mut storage = self.pool.connection_tagged("db_pruner").await?;
let next_l1_batch_number = L1BatchNumber(l1_batch_number.0 + 1);
Expand Down Expand Up @@ -117,6 +131,10 @@ impl fmt::Display for L1BatchExistsCondition {

#[async_trait]
impl PruneCondition for L1BatchExistsCondition {
fn metric_label(&self) -> &'static str {
"l1_batch_exists"
}

async fn is_batch_prunable(&self, l1_batch_number: L1BatchNumber) -> anyhow::Result<bool> {
let mut storage = self.pool.connection_tagged("db_pruner").await?;
let l1_batch_header = storage
Expand All @@ -140,6 +158,10 @@ impl fmt::Display for ConsistencyCheckerProcessedBatch {

#[async_trait]
impl PruneCondition for ConsistencyCheckerProcessedBatch {
fn metric_label(&self) -> &'static str {
"l1_batch_consistency_checked"
}

async fn is_batch_prunable(&self, l1_batch_number: L1BatchNumber) -> anyhow::Result<bool> {
let mut storage = self.pool.connection_tagged("db_pruner").await?;
let last_processed_l1_batch = storage
Expand Down
6 changes: 5 additions & 1 deletion core/node/db_pruner/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@ impl fmt::Display for ConditionMock {

#[async_trait]
impl PruneCondition for ConditionMock {
fn metric_label(&self) -> &'static str {
"mock"
}

async fn is_batch_prunable(&self, l1_batch_number: L1BatchNumber) -> anyhow::Result<bool> {
self.is_batch_prunable_responses
.get(&l1_batch_number)
.cloned()
.copied()
.context("error!")
}
}
Expand Down

0 comments on commit e467028

Please sign in to comment.