Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Fix a storage leak in parachains db #5594

Merged
merged 15 commits into from
Jun 13, 2022
7 changes: 0 additions & 7 deletions node/core/dispute-coordinator/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,6 @@ impl<'a, B: 'a + Backend> OverlayedBackend<'a, B> {
self.candidate_votes.insert((session, candidate_hash), Some(votes));
}

/// Stage the removal of the votes recorded for `candidate_hash` in `session`.
///
/// The overlay entry for `(session, candidate_hash)` is set to `None`. Calling this again
/// for the same candidate overrides whatever was staged for it earlier.
pub fn delete_candidate_votes(&mut self, session: SessionIndex, candidate_hash: CandidateHash) {
    let key = (session, candidate_hash);
    self.candidate_votes.insert(key, None);
}

/// Transform this backend into a set of write-ops to be written to the inner backend.
pub fn into_write_ops(self) -> impl Iterator<Item = BackendWriteOp> {
let earliest_session_ops = self
Expand Down
240 changes: 204 additions & 36 deletions node/core/dispute-coordinator/src/db/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,80 @@ use parity_scale_codec::{Decode, Encode};
use crate::{
backend::{Backend, BackendWriteOp, OverlayedBackend},
error::{FatalError, FatalResult},
metrics::Metrics,
status::DisputeStatus,
DISPUTE_WINDOW,
DISPUTE_WINDOW, LOG_TARGET,
};

/// Key under which the encoded recent disputes are stored.
const RECENT_DISPUTES_KEY: &[u8; 15] = b"recent-disputes";
/// Key under which the encoded earliest session is stored.
const EARLIEST_SESSION_KEY: &[u8; 16] = b"earliest-session";
/// Leading part of candidate votes keys; the per-session prefix appends the big-endian
/// session index to it.
const CANDIDATE_VOTES_SUBKEY: &[u8; 15] = b"candidate-votes";
/// Key of the cleanup watermark: up to which session have votes been cleaned up already?
const CLEANED_VOTES_WATERMARK_KEY: &[u8; 23] = b"cleaned-votes-watermark";

/// Maximum number of sessions whose votes get purged in a single cleanup run.
///
/// On the first run we start at session 0 and have to go all the way up to the current
/// session. This should not be done at once, but rather in smaller batches, so nodes won't
/// get stalled by it.
///
/// With a session duration of 1 hour and 30 parachains, a batch of 300 sessions amounts to
/// fewer than roughly 3_000_000 key purges in the worst case, which is already quite a lot.
/// At the same time there are around 21_000 sessions on Kusama, so at 300 purged sessions
/// per session, cleaning everything up will take around 3 days. Depending on how severe disk
/// usage becomes, we might want to bump the batch size, at the cost of risking performance
/// issues at session boundaries.
///
/// Test builds use a much smaller batch size (10).
#[cfg(test)]
const MAX_CLEAN_BATCH_SIZE: u32 = 10;
#[cfg(not(test))]
const MAX_CLEAN_BATCH_SIZE: u32 = 300;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the impact of processing this many items on each earliest session update (for nodes that have a lot of dangling storage items to clean)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I have seen so far, it seems to be pretty fast - although that might have been mostly empty sessions. The worst thing that could happen is that a validator is heavily loaded at a session boundary and fails to do some work. I also don't have any good data yet about how much wasted storage we are actually talking about; if it is tiny, we can go with smaller batch sizes, as then it does not matter if it takes forever.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Burn-in on Kusama will tell.


/// Database-backed implementation of the dispute coordinator's `Backend`.
pub struct DbBackend {
/// Handle to the underlying database.
inner: Arc<dyn Database>,
/// Column configuration; `col_data` is the column the reads and writes here go to.
config: ColumnConfiguration,
/// Metrics handle; used to time the vote cleanup performed during writes.
metrics: Metrics,
}

impl DbBackend {
pub fn new(db: Arc<dyn Database>, config: ColumnConfiguration) -> Self {
Self { inner: db, config }
/// Create a backend operating on `db`.
///
/// `config` and `metrics` are stored unchanged next to the database handle.
pub fn new(db: Arc<dyn Database>, config: ColumnConfiguration, metrics: Metrics) -> Self {
    let inner = db;
    Self { inner, config, metrics }
}

/// Queue the purge of votes belonging to obsolete sessions into `tx`.
///
/// Should be called whenever a new earliest session gets written. Starting at the persisted
/// watermark (`0` if none is stored), the votes of every session below `earliest_session`
/// are scheduled for deletion - limited to `MAX_CLEAN_BATCH_SIZE` sessions per call. The
/// session the cleanup stopped at is then written back as the new watermark.
///
/// Fails if the current watermark cannot be loaded from the database.
fn add_vote_cleanup_tx(
    &mut self,
    tx: &mut DBTransaction,
    earliest_session: SessionIndex,
) -> FatalResult<()> {
    let column = self.config.col_data;
    let watermark = load_cleaned_votes_watermark(&*self.inner, &self.config)?.unwrap_or(0);
    // Stop at `earliest_session`, but never handle more than one batch in a single call.
    let clean_until = earliest_session.min(watermark.saturating_add(MAX_CLEAN_BATCH_SIZE));
    gum::trace!(
        target: LOG_TARGET,
        ?watermark,
        ?clean_until,
        ?earliest_session,
        ?MAX_CLEAN_BATCH_SIZE,
        "WriteEarliestSession"
    );

    for index in watermark..clean_until {
        // All votes of a session share this key prefix.
        let prefix = candidate_votes_session_prefix(index);
        gum::trace!(
            target: LOG_TARGET,
            ?index,
            encoded = ?prefix,
            "Cleaning votes for session index"
        );
        tx.delete_prefix(column, &prefix);
    }

    // Remember how far the cleanup got, so the next call continues from there.
    tx.put_vec(column, CLEANED_VOTES_WATERMARK_KEY, clean_until.encode());
    Ok(())
}
}

Expand All @@ -71,20 +129,32 @@ impl Backend for DbBackend {

/// Atomically writes the list of operations, with later operations taking precedence over
/// prior.
///
/// This also takes care of purging old votes (of obsolete sessions).
fn write<I>(&mut self, ops: I) -> FatalResult<()>
where
I: IntoIterator<Item = BackendWriteOp>,
{
let mut tx = DBTransaction::new();
// Make sure the whole process is timed, including the actual transaction flush:
let mut cleanup_timer = None;
for op in ops {
match op {
BackendWriteOp::WriteEarliestSession(session) => {
cleanup_timer = match cleanup_timer.take() {
None => Some(self.metrics.time_vote_cleanup()),
Some(t) => Some(t),
};
self.add_vote_cleanup_tx(&mut tx, session)?;

// Actually write the earliest session.
tx.put_vec(self.config.col_data, EARLIEST_SESSION_KEY, session.encode());
},
BackendWriteOp::WriteRecentDisputes(recent_disputes) => {
tx.put_vec(self.config.col_data, RECENT_DISPUTES_KEY, recent_disputes.encode());
},
BackendWriteOp::WriteCandidateVotes(session, candidate_hash, votes) => {
gum::trace!(target: LOG_TARGET, ?session, "Writing candidate votes");
tx.put_vec(
self.config.col_data,
&candidate_votes_key(session, &candidate_hash),
Expand Down Expand Up @@ -112,6 +182,15 @@ fn candidate_votes_key(session: SessionIndex, candidate_hash: &CandidateHash) ->
buf
}

/// Build the key prefix shared by all candidate votes entries of `session`.
///
/// Layout: the 15 bytes of `CANDIDATE_VOTES_SUBKEY` followed by the 4 bytes of the session
/// index.
fn candidate_votes_session_prefix(session: SessionIndex) -> [u8; 15 + 4] {
    let mut prefix = [0u8; 15 + 4];
    let (subkey_part, session_part) = prefix.split_at_mut(15);
    subkey_part.copy_from_slice(CANDIDATE_VOTES_SUBKEY);

    // big-endian encoding is used to ensure lexicographic ordering.
    session_part.copy_from_slice(&session.to_be_bytes());
    prefix
}

/// Column configuration information for the DB.
#[derive(Debug, Clone)]
pub struct ColumnConfiguration {
Expand Down Expand Up @@ -244,9 +323,7 @@ pub(crate) fn note_current_session(

if pruned_disputes.len() != 0 {
overlay_db.write_recent_disputes(new_recent_disputes);
for ((session, candidate_hash), _) in pruned_disputes {
overlay_db.delete_candidate_votes(session, candidate_hash);
}
// Note: Deleting old candidate votes is handled in `write` based on the earliest session.
}
}
},
Expand All @@ -258,18 +335,114 @@ pub(crate) fn note_current_session(
Ok(())
}

/// Until what session votes have been cleaned up already.
///
/// That is the db has already been purged of votes for sessions older than the returned
/// `SessionIndex`.
fn load_cleaned_votes_watermark(
db: &dyn Database,
config: &ColumnConfiguration,
) -> FatalResult<Option<SessionIndex>> {
load_decode(db, config.col_data, CLEANED_VOTES_WATERMARK_KEY)
.map_err(|e| FatalError::DbReadFailed(e))
eskimor marked this conversation as resolved.
Show resolved Hide resolved
}

#[cfg(test)]
mod tests {

use super::*;
use ::test_helpers::{dummy_candidate_receipt, dummy_hash};
use polkadot_primitives::v2::{Hash, Id as ParaId};

fn make_db() -> DbBackend {
let db = kvdb_memorydb::create(1);
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]);
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[0]);
let store = Arc::new(db);
let config = ColumnConfiguration { col_data: 0 };
DbBackend::new(store, config)
DbBackend::new(store, config, Metrics::default())
}

/// Checks that a single cleanup run purges the votes of at most `MAX_CLEAN_BATCH_SIZE`
/// sessions, and that a following run catches up to the earliest session.
#[test]
fn max_clean_batch_size_is_honored() {
let mut backend = make_db();

let mut overlay_db = OverlayedBackend::new(&backend);
// Chosen such that `earliest_session` is `MAX_CLEAN_BATCH_SIZE + 3`: starting from session
// 0, the cleanup cannot reach it within a single batch.
let current_session = MAX_CLEAN_BATCH_SIZE + DISPUTE_WINDOW.get() + 3;
let earliest_session = current_session - DISPUTE_WINDOW.get();

overlay_db.write_earliest_session(0);
let candidate_hash = CandidateHash(Hash::repeat_byte(1));

// Record (empty) votes for the same candidate in every session from 0 up to and including
// `current_session`.
for session in 0..current_session + 1 {
overlay_db.write_candidate_votes(
session,
candidate_hash,
CandidateVotes {
candidate_receipt: dummy_candidate_receipt(dummy_hash()),
valid: Vec::new(),
invalid: Vec::new(),
},
);
}
// Sanity check: nothing has been cleaned up at this point.
assert!(overlay_db.load_candidate_votes(0, &candidate_hash).unwrap().is_some());
assert!(overlay_db
.load_candidate_votes(MAX_CLEAN_BATCH_SIZE - 1, &candidate_hash)
.unwrap()
.is_some());
assert!(overlay_db
.load_candidate_votes(MAX_CLEAN_BATCH_SIZE, &candidate_hash)
.unwrap()
.is_some());

// Cleanup only works for votes that have been written already - so write.
let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();

let mut overlay_db = OverlayedBackend::new(&backend);

// NOTE(review): `note_current_session` is expected to move the earliest session to
// `current_session - DISPUTE_WINDOW`, which triggers the cleanup on the next write - its
// body is not fully visible here, confirm.
gum::trace!(target: LOG_TARGET, ?current_session, "Noting current session");
note_current_session(&mut overlay_db, current_session).unwrap();

let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();

let mut overlay_db = OverlayedBackend::new(&backend);

// First cleanup run: only the first `MAX_CLEAN_BATCH_SIZE` sessions got purged.
assert!(overlay_db
.load_candidate_votes(MAX_CLEAN_BATCH_SIZE - 1, &candidate_hash)
.unwrap()
.is_none());
// After batch size votes should still be there:
assert!(overlay_db
.load_candidate_votes(MAX_CLEAN_BATCH_SIZE, &candidate_hash)
.unwrap()
.is_some());

// Move on by one session: the remaining obsolete sessions now fit into a single batch.
let current_session = current_session + 1;
let earliest_session = earliest_session + 1;

note_current_session(&mut overlay_db, current_session).unwrap();

let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();

let overlay_db = OverlayedBackend::new(&backend);

// All should be gone now:
assert!(overlay_db
.load_candidate_votes(earliest_session - 1, &candidate_hash)
.unwrap()
.is_none());
// Earliest session should still be there:
assert!(overlay_db
.load_candidate_votes(earliest_session, &candidate_hash)
.unwrap()
.is_some());
// Old current session should still be there as well:
assert!(overlay_db
.load_candidate_votes(current_session - 1, &candidate_hash)
.unwrap()
.is_some());
}

#[test]
Expand Down Expand Up @@ -368,57 +541,40 @@ mod tests {
let mut backend = make_db();

let mut overlay_db = OverlayedBackend::new(&backend);
overlay_db.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1)));

overlay_db.write_candidate_votes(
1,
CandidateHash(Hash::repeat_byte(1)),
CandidateVotes {
candidate_receipt: dummy_candidate_receipt(dummy_hash()),
candidate_receipt: dummy_candidate_receipt(Hash::random()),
valid: Vec::new(),
invalid: Vec::new(),
},
);

let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();

assert_eq!(
backend
.load_candidate_votes(1, &CandidateHash(Hash::repeat_byte(1)))
.unwrap()
.unwrap()
.candidate_receipt
.descriptor
.para_id,
ParaId::from(1),
);
let receipt = dummy_candidate_receipt(dummy_hash());

let mut overlay_db = OverlayedBackend::new(&backend);
overlay_db.write_candidate_votes(
1,
CandidateHash(Hash::repeat_byte(1)),
CandidateVotes {
candidate_receipt: {
let mut receipt = dummy_candidate_receipt(dummy_hash());
receipt.descriptor.para_id = ParaId::from(5_u32);

receipt
},
candidate_receipt: receipt.clone(),
valid: Vec::new(),
invalid: Vec::new(),
},
);

overlay_db.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1)));

let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();

assert!(backend
.load_candidate_votes(1, &CandidateHash(Hash::repeat_byte(1)))
.unwrap()
.is_none());
assert_eq!(
backend
.load_candidate_votes(1, &CandidateHash(Hash::repeat_byte(1)))
.unwrap()
.unwrap()
.candidate_receipt,
receipt,
);
}

#[test]
Expand All @@ -434,6 +590,7 @@ mod tests {
let new_earliest_session = 5;
let current_session = 5 + DISPUTE_WINDOW.get();

let super_old_no_dispute = 1;
let very_old = 3;
let slightly_old = 4;
let very_recent = current_session - 1;
Expand All @@ -457,6 +614,7 @@ mod tests {
.collect(),
);

overlay_db.write_candidate_votes(super_old_no_dispute, hash_a, blank_candidate_votes());
overlay_db.write_candidate_votes(very_old, hash_a, blank_candidate_votes());

overlay_db.write_candidate_votes(slightly_old, hash_b, blank_candidate_votes());
Expand All @@ -483,6 +641,16 @@ mod tests {
.collect(),
);

// Votes are only cleaned up after actual write:
let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();

let overlay_db = OverlayedBackend::new(&backend);

assert!(overlay_db
.load_candidate_votes(super_old_no_dispute, &hash_a)
.unwrap()
.is_none());
assert!(overlay_db.load_candidate_votes(very_old, &hash_a).unwrap().is_none());
assert!(overlay_db.load_candidate_votes(slightly_old, &hash_b).unwrap().is_none());
assert!(overlay_db
Expand Down
6 changes: 5 additions & 1 deletion node/core/dispute-coordinator/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use futures::channel::oneshot;
use polkadot_node_subsystem::{errors::ChainApiError, SubsystemError};
use polkadot_node_subsystem_util::{rolling_session_window::SessionsUnavailable, runtime};

use crate::{participation, LOG_TARGET};
use crate::{db, participation, LOG_TARGET};
use parity_scale_codec::Error as CodecError;

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -53,6 +53,10 @@ pub enum Error {
#[error("Writing to database failed: {0}")]
DbWriteFailed(std::io::Error),

#[fatal]
#[error("Reading from database failed: {0}")]
DbReadFailed(db::v1::Error),

#[fatal]
#[error("Oneshot for receiving block number from chain API got cancelled")]
CanceledBlockNumber,
Expand Down
Loading