Fix a storage leak in parachains db (#5594)
* Fix cleanup of old votes.

* Cleanup.

* Get rid of redundant import

* Tests + logging

* Fix db key name.

* Add some reasoning to batch size.

* Add dispute data to indexed columns

* Fix fmt

* Add helper function.

* Fix typos.

* Update node/core/dispute-coordinator/src/db/v1.rs

Co-authored-by: Andronik <write@reusable.software>

* Update node/core/dispute-coordinator/src/db/v1.rs

Co-authored-by: Andronik <write@reusable.software>

* Add metric for how long cleanup takes.

Co-authored-by: Andronik <write@reusable.software>
2 people authored and al3mart committed Jul 14, 2022
1 parent c88246d commit 80c1adb
Showing 7 changed files with 249 additions and 50 deletions.
7 changes: 0 additions & 7 deletions node/core/dispute-coordinator/src/backend.rs
@@ -149,13 +149,6 @@ impl<'a, B: 'a + Backend> OverlayedBackend<'a, B> {
self.candidate_votes.insert((session, candidate_hash), Some(votes));
}

/// Prepare a deletion of the candidate votes under the indicated candidate.
///
/// Later calls to this function for the same candidate will override earlier ones.
pub fn delete_candidate_votes(&mut self, session: SessionIndex, candidate_hash: CandidateHash) {
self.candidate_votes.insert((session, candidate_hash), None);
}

/// Transform this backend into a set of write-ops to be written to the inner backend.
pub fn into_write_ops(self) -> impl Iterator<Item = BackendWriteOp> {
let earliest_session_ops = self
240 changes: 204 additions & 36 deletions node/core/dispute-coordinator/src/db/v1.rs
@@ -30,22 +30,80 @@ use parity_scale_codec::{Decode, Encode};
use crate::{
backend::{Backend, BackendWriteOp, OverlayedBackend},
error::{FatalError, FatalResult},
metrics::Metrics,
status::DisputeStatus,
DISPUTE_WINDOW,
DISPUTE_WINDOW, LOG_TARGET,
};

const RECENT_DISPUTES_KEY: &[u8; 15] = b"recent-disputes";
const EARLIEST_SESSION_KEY: &[u8; 16] = b"earliest-session";
const CANDIDATE_VOTES_SUBKEY: &[u8; 15] = b"candidate-votes";
/// Until what session have votes been cleaned up already?
const CLEANED_VOTES_WATERMARK_KEY: &[u8; 23] = b"cleaned-votes-watermark";

/// Restrict number of cleanup operations.
///
/// On the first run we start at session 0 and work our way up to the current session. This
/// should not be done in one go, but rather in smaller batches so nodes won't get stalled by
/// it.
///
/// With a session duration of 1 hour and 30 parachains, a batch of 300 sessions amounts to
/// fewer than 3_000_000 key purges in the worst case. That is already quite a lot; at the same
/// time we have around 21_000 sessions on Kusama, so at 300 purged sessions per session,
/// cleaning everything up will take around 3 days. Depending on how severe disk usage becomes,
/// we might want to bump the batch size, at the cost of risking issues at session boundaries
/// (performance).
#[cfg(test)]
const MAX_CLEAN_BATCH_SIZE: u32 = 10;
#[cfg(not(test))]
const MAX_CLEAN_BATCH_SIZE: u32 = 300;
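
// A sketch of the arithmetic behind the numbers above - illustrative only, not part of the
// commit. It assumes ~12 s parachain block times, i.e. ~300 included candidates per parachain
// per one-hour session.
#[allow(dead_code)]
mod batch_size_arithmetic {
    pub const CANDIDATES_PER_PARA_PER_SESSION: u32 = 3_600 / 12; // = 300
    pub const PARACHAINS: u32 = 30;
    pub const BATCH: u32 = 300; // MAX_CLEAN_BATCH_SIZE in production

    // 300 * 30 * 300 = 2_700_000 keys purged per batch in the worst case ("< 3_000_000").
    pub const WORST_CASE_PURGES_PER_BATCH: u32 =
        BATCH * PARACHAINS * CANDIDATES_PER_PARA_PER_SESSION;

    // ~21_000 existing Kusama sessions / 300 cleaned per session = 70 sessions to catch up,
    // i.e. roughly 3 days with one-hour sessions.
    pub const SESSIONS_TO_CATCH_UP: u32 = 21_000 / BATCH;
}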

pub struct DbBackend {
inner: Arc<dyn Database>,
config: ColumnConfiguration,
metrics: Metrics,
}

impl DbBackend {
pub fn new(db: Arc<dyn Database>, config: ColumnConfiguration) -> Self {
Self { inner: db, config }
pub fn new(db: Arc<dyn Database>, config: ColumnConfiguration, metrics: Metrics) -> Self {
Self { inner: db, config, metrics }
}

/// Clean up old votes.
///
/// Should be called whenever a new earliest session gets written.
fn add_vote_cleanup_tx(
&mut self,
tx: &mut DBTransaction,
earliest_session: SessionIndex,
) -> FatalResult<()> {
// Cleanup old votes in db:
let watermark = load_cleaned_votes_watermark(&*self.inner, &self.config)?.unwrap_or(0);
let clean_until = if earliest_session.saturating_sub(watermark) > MAX_CLEAN_BATCH_SIZE {
watermark + MAX_CLEAN_BATCH_SIZE
} else {
earliest_session
};
gum::trace!(
target: LOG_TARGET,
?watermark,
?clean_until,
?earliest_session,
?MAX_CLEAN_BATCH_SIZE,
"WriteEarliestSession"
);

for index in watermark..clean_until {
gum::trace!(
target: LOG_TARGET,
?index,
encoded = ?candidate_votes_session_prefix(index),
"Cleaning votes for session index"
);
tx.delete_prefix(self.config.col_data, &candidate_votes_session_prefix(index));
}
// New watermark:
tx.put_vec(self.config.col_data, CLEANED_VOTES_WATERMARK_KEY, clean_until.encode());
Ok(())
}
}
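
// How the batching advances, sketched (illustrative only, not part of the commit). This
// mirrors the `clean_until` computation in `add_vote_cleanup_tx` above.
#[allow(dead_code)]
fn clean_until_sketch(watermark: u32, earliest_session: u32, max_batch: u32) -> u32 {
    if earliest_session.saturating_sub(watermark) > max_batch {
        watermark + max_batch
    } else {
        earliest_session
    }
}
// E.g. starting at watermark 0 with earliest_session 1_000 and a batch size of 300, successive
// `write` calls advance the watermark 0 -> 300 -> 600 -> 900 -> 1_000, after which cleanup
// simply keeps pace with session pruning.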

@@ -71,20 +129,32 @@ impl Backend for DbBackend {

/// Atomically writes the list of operations, with later operations taking precedence over
/// prior.
///
/// This also takes care of purging old votes (of obsolete sessions).
fn write<I>(&mut self, ops: I) -> FatalResult<()>
where
I: IntoIterator<Item = BackendWriteOp>,
{
let mut tx = DBTransaction::new();
// Make sure the whole process is timed, including the actual transaction flush:
let mut cleanup_timer = None;
for op in ops {
match op {
BackendWriteOp::WriteEarliestSession(session) => {
cleanup_timer = match cleanup_timer.take() {
None => Some(self.metrics.time_vote_cleanup()),
Some(t) => Some(t),
};
self.add_vote_cleanup_tx(&mut tx, session)?;

// Actually write the earliest session.
tx.put_vec(self.config.col_data, EARLIEST_SESSION_KEY, session.encode());
},
BackendWriteOp::WriteRecentDisputes(recent_disputes) => {
tx.put_vec(self.config.col_data, RECENT_DISPUTES_KEY, recent_disputes.encode());
},
BackendWriteOp::WriteCandidateVotes(session, candidate_hash, votes) => {
gum::trace!(target: LOG_TARGET, ?session, "Writing candidate votes");
tx.put_vec(
self.config.col_data,
&candidate_votes_key(session, &candidate_hash),
@@ -112,6 +182,15 @@ fn candidate_votes_key(session: SessionIndex, candidate_hash: &CandidateHash) ->
buf
}

fn candidate_votes_session_prefix(session: SessionIndex) -> [u8; 15 + 4] {
let mut buf = [0u8; 15 + 4];
buf[..15].copy_from_slice(CANDIDATE_VOTES_SUBKEY);

// big-endian encoding is used to ensure lexicographic ordering.
buf[15..][..4].copy_from_slice(&session.to_be_bytes());
buf
}
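
// Why the per-session key prefix matters, sketched (illustrative only, not part of the
// commit): every per-candidate key produced by `candidate_votes_key` starts with the
// per-session prefix, so a single `delete_prefix` call in `add_vote_cleanup_tx` removes all
// candidate votes stored for that session.
#[allow(dead_code)]
fn key_is_covered_by_session_prefix(session: SessionIndex, candidate_hash: &CandidateHash) -> bool {
    candidate_votes_key(session, candidate_hash)
        .starts_with(&candidate_votes_session_prefix(session))
}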

/// Column configuration information for the DB.
#[derive(Debug, Clone)]
pub struct ColumnConfiguration {
@@ -244,9 +323,7 @@ pub(crate) fn note_current_session(

if pruned_disputes.len() != 0 {
overlay_db.write_recent_disputes(new_recent_disputes);
for ((session, candidate_hash), _) in pruned_disputes {
overlay_db.delete_candidate_votes(session, candidate_hash);
}
// Note: Deleting old candidate votes is handled in `write` based on the earliest session.
}
}
},
@@ -258,18 +335,114 @@ pub(crate) fn note_current_session(
Ok(())
}

/// The session up to which votes have already been cleaned up.
///
/// That is, the db has already been purged of votes for sessions older than the returned
/// `SessionIndex`.
fn load_cleaned_votes_watermark(
db: &dyn Database,
config: &ColumnConfiguration,
) -> FatalResult<Option<SessionIndex>> {
load_decode(db, config.col_data, CLEANED_VOTES_WATERMARK_KEY)
.map_err(|e| FatalError::DbReadFailed(e))
}

#[cfg(test)]
mod tests {

use super::*;
use ::test_helpers::{dummy_candidate_receipt, dummy_hash};
use polkadot_primitives::v2::{Hash, Id as ParaId};

fn make_db() -> DbBackend {
let db = kvdb_memorydb::create(1);
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]);
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[0]);
let store = Arc::new(db);
let config = ColumnConfiguration { col_data: 0 };
DbBackend::new(store, config)
DbBackend::new(store, config, Metrics::default())
}

#[test]
fn max_clean_batch_size_is_honored() {
let mut backend = make_db();

let mut overlay_db = OverlayedBackend::new(&backend);
let current_session = MAX_CLEAN_BATCH_SIZE + DISPUTE_WINDOW.get() + 3;
let earliest_session = current_session - DISPUTE_WINDOW.get();

overlay_db.write_earliest_session(0);
let candidate_hash = CandidateHash(Hash::repeat_byte(1));

for session in 0..current_session + 1 {
overlay_db.write_candidate_votes(
session,
candidate_hash,
CandidateVotes {
candidate_receipt: dummy_candidate_receipt(dummy_hash()),
valid: Vec::new(),
invalid: Vec::new(),
},
);
}
assert!(overlay_db.load_candidate_votes(0, &candidate_hash).unwrap().is_some());
assert!(overlay_db
.load_candidate_votes(MAX_CLEAN_BATCH_SIZE - 1, &candidate_hash)
.unwrap()
.is_some());
assert!(overlay_db
.load_candidate_votes(MAX_CLEAN_BATCH_SIZE, &candidate_hash)
.unwrap()
.is_some());

// Cleanup only works for votes that have been written already - so write.
let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();

let mut overlay_db = OverlayedBackend::new(&backend);

gum::trace!(target: LOG_TARGET, ?current_session, "Noting current session");
note_current_session(&mut overlay_db, current_session).unwrap();

let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();

let mut overlay_db = OverlayedBackend::new(&backend);

assert!(overlay_db
.load_candidate_votes(MAX_CLEAN_BATCH_SIZE - 1, &candidate_hash)
.unwrap()
.is_none());
// After batch size votes should still be there:
assert!(overlay_db
.load_candidate_votes(MAX_CLEAN_BATCH_SIZE, &candidate_hash)
.unwrap()
.is_some());

let current_session = current_session + 1;
let earliest_session = earliest_session + 1;

note_current_session(&mut overlay_db, current_session).unwrap();

let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();

let overlay_db = OverlayedBackend::new(&backend);

// All should be gone now:
assert!(overlay_db
.load_candidate_votes(earliest_session - 1, &candidate_hash)
.unwrap()
.is_none());
// Earliest session should still be there:
assert!(overlay_db
.load_candidate_votes(earliest_session, &candidate_hash)
.unwrap()
.is_some());
// Old current session should still be there as well:
assert!(overlay_db
.load_candidate_votes(current_session - 1, &candidate_hash)
.unwrap()
.is_some());
}

#[test]
Expand Down Expand Up @@ -368,57 +541,40 @@ mod tests {
let mut backend = make_db();

let mut overlay_db = OverlayedBackend::new(&backend);
overlay_db.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1)));

overlay_db.write_candidate_votes(
1,
CandidateHash(Hash::repeat_byte(1)),
CandidateVotes {
candidate_receipt: dummy_candidate_receipt(dummy_hash()),
candidate_receipt: dummy_candidate_receipt(Hash::random()),
valid: Vec::new(),
invalid: Vec::new(),
},
);

let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();

assert_eq!(
backend
.load_candidate_votes(1, &CandidateHash(Hash::repeat_byte(1)))
.unwrap()
.unwrap()
.candidate_receipt
.descriptor
.para_id,
ParaId::from(1),
);
let receipt = dummy_candidate_receipt(dummy_hash());

let mut overlay_db = OverlayedBackend::new(&backend);
overlay_db.write_candidate_votes(
1,
CandidateHash(Hash::repeat_byte(1)),
CandidateVotes {
candidate_receipt: {
let mut receipt = dummy_candidate_receipt(dummy_hash());
receipt.descriptor.para_id = ParaId::from(5_u32);

receipt
},
candidate_receipt: receipt.clone(),
valid: Vec::new(),
invalid: Vec::new(),
},
);

overlay_db.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1)));

let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();

assert!(backend
.load_candidate_votes(1, &CandidateHash(Hash::repeat_byte(1)))
.unwrap()
.is_none());
assert_eq!(
backend
.load_candidate_votes(1, &CandidateHash(Hash::repeat_byte(1)))
.unwrap()
.unwrap()
.candidate_receipt,
receipt,
);
}

#[test]
@@ -434,6 +590,7 @@
let new_earliest_session = 5;
let current_session = 5 + DISPUTE_WINDOW.get();

let super_old_no_dispute = 1;
let very_old = 3;
let slightly_old = 4;
let very_recent = current_session - 1;
@@ -457,6 +614,7 @@
.collect(),
);

overlay_db.write_candidate_votes(super_old_no_dispute, hash_a, blank_candidate_votes());
overlay_db.write_candidate_votes(very_old, hash_a, blank_candidate_votes());

overlay_db.write_candidate_votes(slightly_old, hash_b, blank_candidate_votes());
@@ -483,6 +641,16 @@
.collect(),
);

// Votes are only cleaned up after actual write:
let write_ops = overlay_db.into_write_ops();
backend.write(write_ops).unwrap();

let overlay_db = OverlayedBackend::new(&backend);

assert!(overlay_db
.load_candidate_votes(super_old_no_dispute, &hash_a)
.unwrap()
.is_none());
assert!(overlay_db.load_candidate_votes(very_old, &hash_a).unwrap().is_none());
assert!(overlay_db.load_candidate_votes(slightly_old, &hash_b).unwrap().is_none());
assert!(overlay_db
6 changes: 5 additions & 1 deletion node/core/dispute-coordinator/src/error.rs
@@ -20,7 +20,7 @@ use futures::channel::oneshot;
use polkadot_node_subsystem::{errors::ChainApiError, SubsystemError};
use polkadot_node_subsystem_util::{rolling_session_window::SessionsUnavailable, runtime};

use crate::{participation, LOG_TARGET};
use crate::{db, participation, LOG_TARGET};
use parity_scale_codec::Error as CodecError;

pub type Result<T> = std::result::Result<T, Error>;
@@ -53,6 +53,10 @@ pub enum Error {
#[error("Writing to database failed: {0}")]
DbWriteFailed(std::io::Error),

#[fatal]
#[error("Reading from database failed: {0}")]
DbReadFailed(db::v1::Error),

#[fatal]
#[error("Oneshot for receiving block number from chain API got cancelled")]
CanceledBlockNumber,
