Skip to content

Commit

Permalink
fix(pruning): Fix DB pruner responsiveness during shutdown (#2058)
Browse files Browse the repository at this point in the history
## What ❔

Makes DB pruner more responsive during node shutdown.

## Why ❔

Improves UX for node operators.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
  • Loading branch information
slowli authored May 29, 2024
1 parent 3202461 commit 0a07312
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 42 deletions.
50 changes: 40 additions & 10 deletions core/lib/db_connection/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
collections::HashMap,
fmt,
fmt, io,
panic::Location,
sync::{
atomic::{AtomicUsize, Ordering},
Expand Down Expand Up @@ -203,18 +203,48 @@ impl<'a, DB: DbMarker> Connection<'a, DB> {
matches!(self.inner, ConnectionInner::Transaction { .. })
}

/// Commits a transactional connection (one which was created by calling [`Self::start_transaction()`]).
/// If this connection is not transactional, returns an error.
pub async fn commit(self) -> DalResult<()> {
if let ConnectionInner::Transaction {
transaction: postgres,
tags,
} = self.inner
{
postgres
match self.inner {
ConnectionInner::Transaction {
transaction: postgres,
tags,
} => postgres
.commit()
.await
.map_err(|err| DalConnectionError::commit_transaction(err, tags.cloned()).into())
} else {
panic!("Connection::commit can only be invoked after calling Connection::begin_transaction");
.map_err(|err| DalConnectionError::commit_transaction(err, tags.cloned()).into()),
ConnectionInner::Pooled(conn) => {
let err = io::Error::new(
io::ErrorKind::Other,
"`Connection::commit()` can only be invoked after calling `Connection::begin_transaction()`",
);
Err(DalConnectionError::commit_transaction(sqlx::Error::Io(err), conn.tags).into())
}
}
}

/// Rolls back a transactional connection (one which was created by calling [`Self::start_transaction()`]).
/// If this connection is not transactional, returns an error.
pub async fn rollback(self) -> DalResult<()> {
match self.inner {
ConnectionInner::Transaction {
transaction: postgres,
tags,
} => postgres
.rollback()
.await
.map_err(|err| DalConnectionError::rollback_transaction(err, tags.cloned()).into()),
ConnectionInner::Pooled(conn) => {
let err = io::Error::new(
io::ErrorKind::Other,
"`Connection::rollback()` can only be invoked after calling `Connection::begin_transaction()`",
);
Err(
DalConnectionError::rollback_transaction(sqlx::Error::Io(err), conn.tags)
.into(),
)
}
}
}

Expand Down
13 changes: 13 additions & 0 deletions core/lib/db_connection/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ enum ConnectionAction {
AcquireConnection,
StartTransaction,
CommitTransaction,
RollbackTransaction,
}

impl ConnectionAction {
Expand All @@ -108,6 +109,7 @@ impl ConnectionAction {
Self::AcquireConnection => "acquiring DB connection",
Self::StartTransaction => "starting DB transaction",
Self::CommitTransaction => "committing DB transaction",
Self::RollbackTransaction => "rolling back DB transaction",
}
}
}
Expand Down Expand Up @@ -165,6 +167,17 @@ impl DalConnectionError {
connection_tags,
}
}

/// Wraps an `sqlx` error encountered while rolling back a DB transaction,
/// attaching the tags of the connection it occurred on (if any).
pub(crate) fn rollback_transaction(
    inner: sqlx::Error,
    connection_tags: Option<ConnectionTags>,
) -> Self {
    Self {
        action: ConnectionAction::RollbackTransaction,
        inner,
        connection_tags,
    }
}
}

/// Extension trait to create `sqlx::Result`s, similar to `anyhow::Context`.
Expand Down
73 changes: 51 additions & 22 deletions core/node/db_pruner/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
//! Postgres pruning component.
use std::{fmt, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};

use anyhow::Context as _;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use zksync_dal::{pruning_dal::PruningInfo, Connection, ConnectionPool, Core, CoreDal};
Expand All @@ -14,7 +13,7 @@ use self::{
metrics::{MetricPruneType, METRICS},
prune_conditions::{
ConsistencyCheckerProcessedBatch, L1BatchExistsCondition, L1BatchOlderThanPruneCondition,
NextL1BatchHasMetadataCondition, NextL1BatchWasExecutedCondition,
NextL1BatchHasMetadataCondition, NextL1BatchWasExecutedCondition, PruneCondition,
},
};

Expand Down Expand Up @@ -59,6 +58,17 @@ impl From<PruningInfo> for DbPrunerHealth {
}
}

/// Outcome of a single pruning iteration. Consumed by the pruner's run loop to decide
/// whether to sleep before the next iteration, continue immediately, or terminate.
#[derive(Debug)]
enum PruningIterationOutcome {
/// Nothing to prune (the loop should sleep before retrying).
NoOp,
/// Iteration resulted in pruning.
Pruned,
/// Pruning was interrupted because of a stop signal; the run loop terminates.
Interrupted,
}

/// Postgres database pruning component.
#[derive(Debug)]
pub struct DbPruner {
Expand All @@ -68,12 +78,6 @@ pub struct DbPruner {
prune_conditions: Vec<Arc<dyn PruneCondition>>,
}

/// Interface to be used for health checks.
#[async_trait]
trait PruneCondition: fmt::Debug + fmt::Display + Send + Sync + 'static {
async fn is_batch_prunable(&self, l1_batch_number: L1BatchNumber) -> anyhow::Result<bool>;
}

impl DbPruner {
pub fn new(config: DbPrunerConfig, connection_pool: ConnectionPool<Core>) -> Self {
let mut conditions: Vec<Arc<dyn PruneCondition>> = vec![
Expand Down Expand Up @@ -207,7 +211,11 @@ impl DbPruner {
Ok(true)
}

async fn hard_prune(&self, storage: &mut Connection<'_, Core>) -> anyhow::Result<()> {
async fn hard_prune(
&self,
storage: &mut Connection<'_, Core>,
stop_receiver: &mut watch::Receiver<bool>,
) -> anyhow::Result<PruningIterationOutcome> {
let latency = METRICS.pruning_chunk_duration[&MetricPruneType::Hard].start();
let mut transaction = storage.start_transaction().await?;

Expand All @@ -221,10 +229,21 @@ impl DbPruner {
format!("bogus pruning info {current_pruning_info:?}: trying to hard-prune data, but there is no soft-pruned L2 block")
})?;

let stats = transaction
.pruning_dal()
.hard_prune_batches_range(last_soft_pruned_l1_batch, last_soft_pruned_l2_block)
.await?;
let mut dal = transaction.pruning_dal();
let stats = tokio::select! {
result = dal.hard_prune_batches_range(
last_soft_pruned_l1_batch,
last_soft_pruned_l2_block,
) => result?,

_ = stop_receiver.changed() => {
// `hard_prune_batches_range()` can take a long time. It looks better to roll it back explicitly here if the node is being shut down
// rather than waiting for the node to force-exit after a timeout, which would interrupt the DB connection and lead to an implicit rollback.
tracing::info!("Hard pruning interrupted; rolling back pruning transaction");
transaction.rollback().await?;
return Ok(PruningIterationOutcome::Interrupted);
}
};
METRICS.observe_hard_pruning(stats);
transaction.commit().await?;

Expand All @@ -236,10 +255,13 @@ impl DbPruner {
current_pruning_info.last_hard_pruned_l1_batch = Some(last_soft_pruned_l1_batch);
current_pruning_info.last_hard_pruned_l2_block = Some(last_soft_pruned_l2_block);
self.update_health(current_pruning_info);
Ok(())
Ok(PruningIterationOutcome::Pruned)
}

async fn run_single_iteration(&self) -> anyhow::Result<bool> {
async fn run_single_iteration(
&self,
stop_receiver: &mut watch::Receiver<bool>,
) -> anyhow::Result<PruningIterationOutcome> {
let mut storage = self.connection_pool.connection_tagged("db_pruner").await?;
let current_pruning_info = storage.pruning_dal().get_pruning_info().await?;
self.update_health(current_pruning_info);
Expand All @@ -250,15 +272,20 @@ impl DbPruner {
{
let pruning_done = self.soft_prune(&mut storage).await?;
if !pruning_done {
return Ok(false);
return Ok(PruningIterationOutcome::NoOp);
}
}
drop(storage); // Don't hold a connection across a timeout

tokio::time::sleep(self.config.removal_delay).await;
if tokio::time::timeout(self.config.removal_delay, stop_receiver.changed())
.await
.is_ok()
{
return Ok(PruningIterationOutcome::Interrupted);
}

let mut storage = self.connection_pool.connection_tagged("db_pruner").await?;
self.hard_prune(&mut storage).await?;
Ok(true)
self.hard_prune(&mut storage, stop_receiver).await
}

pub async fn run(self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
Expand All @@ -277,7 +304,7 @@ impl DbPruner {
tracing::warn!("Error updating DB pruning metrics: {err:?}");
}

let should_sleep = match self.run_single_iteration().await {
let should_sleep = match self.run_single_iteration(&mut stop_receiver).await {
Err(err) => {
// As this component is not really mission-critical, all errors are generally ignored
tracing::warn!(
Expand All @@ -290,7 +317,9 @@ impl DbPruner {
self.health_updater.update(health);
true
}
Ok(pruning_done) => !pruning_done,
Ok(PruningIterationOutcome::Interrupted) => break,
Ok(PruningIterationOutcome::Pruned) => false,
Ok(PruningIterationOutcome::NoOp) => true,
};

if should_sleep
Expand Down
5 changes: 4 additions & 1 deletion core/node/db_pruner/src/prune_conditions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use chrono::Utc;
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_types::L1BatchNumber;

use crate::PruneCondition;
/// Condition gating whether an L1 batch may be pruned. `DbPruner` holds a collection
/// of these conditions and consults them before pruning a batch.
#[async_trait]
pub(crate) trait PruneCondition: fmt::Debug + fmt::Display + Send + Sync + 'static {
/// Returns `Ok(true)` if, per this condition, the batch with the given number can be pruned.
async fn is_batch_prunable(&self, l1_batch_number: L1BatchNumber) -> anyhow::Result<bool>;
}

#[derive(Debug)]
pub(super) struct L1BatchOlderThanPruneCondition {
Expand Down
Loading

0 comments on commit 0a07312

Please sign in to comment.