Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blockstore: clean/save old TransactionMemos sensibly #33678

Merged
merged 8 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2154,6 +2154,8 @@ impl Blockstore {
}
if highest_primary_index_slot.is_some() {
self.set_highest_primary_index_slot(highest_primary_index_slot);
} else {
self.db.set_clean_slot_0(true);
}
Ok(())
}
Expand All @@ -2167,6 +2169,9 @@ impl Blockstore {
self.transaction_status_index_cf.delete(1)?;
}
}
if w_highest_primary_index_slot.is_none() {
self.db.set_clean_slot_0(true);
}
Ok(())
}

Expand Down
105 changes: 105 additions & 0 deletions ledger/src/blockstore/blockstore_purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -990,4 +990,109 @@ pub mod tests {
}
assert_eq!(count, max_slot - (oldest_slot - 1));
}

#[test]
fn test_purge_transaction_memos_compaction_filter() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let oldest_slot = 5;

fn random_signature() -> Signature {
use rand::Rng;

let mut key = [0u8; 64];
rand::thread_rng().fill(&mut key[..]);
Signature::from(key)
}

// Insert some deprecated TransactionMemos
blockstore
.transaction_memos_cf
.put_deprecated(random_signature(), &"this is a memo".to_string())
.unwrap();
blockstore
.transaction_memos_cf
.put_deprecated(random_signature(), &"another memo".to_string())
.unwrap();
// Set clean_slot_0 to false, since we have deprecated memos
blockstore.db.set_clean_slot_0(false);

// Insert some current TransactionMemos
blockstore
.transaction_memos_cf
.put(
(random_signature(), oldest_slot - 1),
&"this is a new memo in slot 4".to_string(),
)
.unwrap();
blockstore
.transaction_memos_cf
.put(
(random_signature(), oldest_slot),
&"this is a memo in slot 5 ".to_string(),
)
.unwrap();

let first_index = {
let mut memos_iterator = blockstore
.transaction_memos_cf
.iterator_cf_raw_key(IteratorMode::Start);
memos_iterator.next().unwrap().unwrap().0
};
let last_index = {
let mut memos_iterator = blockstore
.transaction_memos_cf
.iterator_cf_raw_key(IteratorMode::End);
memos_iterator.next().unwrap().unwrap().0
};

// Purge at slot 0 should not affect any memos
blockstore.db.set_oldest_slot(0);
blockstore
.db
.compact_range_cf::<cf::TransactionMemos>(&first_index, &last_index);
let memos_iterator = blockstore
.transaction_memos_cf
.iterator_cf_raw_key(IteratorMode::Start);
let mut count = 0;
for item in memos_iterator {
let _item = item.unwrap();
count += 1;
}
assert_eq!(count, 4);

// Purge at oldest_slot without clean_slot_0 only purges the current memo at slot 4
blockstore.db.set_oldest_slot(oldest_slot);
blockstore
.db
.compact_range_cf::<cf::TransactionMemos>(&first_index, &last_index);
let memos_iterator = blockstore
.transaction_memos_cf
.iterator_cf_raw_key(IteratorMode::Start);
let mut count = 0;
for item in memos_iterator {
let (key, _value) = item.unwrap();
let slot = <cf::TransactionMemos as Column>::index(&key).1;
assert!(slot == 0 || slot >= oldest_slot);
count += 1;
}
assert_eq!(count, 3);

// Purge at oldest_slot with clean_slot_0 purges deprecated memos
blockstore.db.set_clean_slot_0(true);
blockstore
.db
.compact_range_cf::<cf::TransactionMemos>(&first_index, &last_index);
let memos_iterator = blockstore
.transaction_memos_cf
.iterator_cf_raw_key(IteratorMode::Start);
let mut count = 0;
for item in memos_iterator {
let (key, _value) = item.unwrap();
let slot = <cf::TransactionMemos as Column>::index(&key).1;
assert!(slot >= oldest_slot);
count += 1;
}
assert_eq!(count, 1);
}
}
32 changes: 27 additions & 5 deletions ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use {
marker::PhantomData,
path::Path,
sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
},
Expand Down Expand Up @@ -348,15 +348,18 @@ pub mod columns {
}

#[derive(Default, Clone, Debug)]
struct OldestSlot(Arc<AtomicU64>);
struct OldestSlot {
slot: Arc<AtomicU64>,
clean_slot_0: Arc<AtomicBool>,
}

impl OldestSlot {
pub fn set(&self, oldest_slot: Slot) {
// this is independently used for compaction_filter without any data dependency.
// also, compaction_filters are created via its factories, creating short-lived copies of
// this atomic value for the single job of compaction. So, Relaxed store can be justified
// in total
self.0.store(oldest_slot, Ordering::Relaxed);
self.slot.store(oldest_slot, Ordering::Relaxed);
}

pub fn get(&self) -> Slot {
Expand All @@ -365,7 +368,15 @@ impl OldestSlot {
// requirement at the moment
// also eventual propagation (very Relaxed) load is Ok, because compaction by nature doesn't
// require strictly synchronized semantics in this regard
self.0.load(Ordering::Relaxed)
self.slot.load(Ordering::Relaxed)
}

pub(crate) fn set_clean_slot_0(&self, clean_slot_0: bool) {
self.clean_slot_0.store(clean_slot_0, Ordering::Relaxed);
}

pub(crate) fn get_clean_slot_0(&self) -> bool {
self.clean_slot_0.load(Ordering::Relaxed)
}
}

Expand Down Expand Up @@ -1427,6 +1438,10 @@ impl Database {
self.backend.oldest_slot.set(oldest_slot);
}

pub(crate) fn set_clean_slot_0(&self, clean_slot_0: bool) {
self.backend.oldest_slot.set_clean_slot_0(clean_slot_0);
}

pub fn live_files_metadata(&self) -> Result<Vec<LiveFile>> {
self.backend.live_files_metadata()
}
Expand Down Expand Up @@ -1835,6 +1850,10 @@ impl<'a> WriteBatch<'a> {
struct PurgedSlotFilter<C: Column + ColumnName> {
/// The oldest slot to keep; any slot < oldest_slot will be removed
oldest_slot: Slot,
/// Whether to preserve keys that return slot 0, even when oldest_slot > 0.
// This is used to delete old column data that wasn't keyed with a Slot, and so always returns
// `C::slot() == 0`
clean_slot_0: bool,
name: CString,
_phantom: PhantomData<C>,
}
Expand All @@ -1844,7 +1863,7 @@ impl<C: Column + ColumnName> CompactionFilter for PurgedSlotFilter<C> {
use rocksdb::CompactionDecision::*;

let slot_in_key = C::slot(C::index(key));
if slot_in_key >= self.oldest_slot {
if slot_in_key >= self.oldest_slot || (slot_in_key == 0 && !self.clean_slot_0) {
Keep
} else {
Remove
Expand All @@ -1867,8 +1886,10 @@ impl<C: Column + ColumnName> CompactionFilterFactory for PurgedSlotFilterFactory

fn create(&mut self, _context: CompactionFilterContext) -> Self::Filter {
let copied_oldest_slot = self.oldest_slot.get();
let copied_clean_slot_0 = self.oldest_slot.get_clean_slot_0();
PurgedSlotFilter::<C> {
oldest_slot: copied_oldest_slot,
clean_slot_0: copied_clean_slot_0,
name: CString::new(format!(
"purged_slot_filter({}, {:?})",
C::NAME,
Expand Down Expand Up @@ -2113,6 +2134,7 @@ pub mod tests {
is_manual_compaction: true,
};
let oldest_slot = OldestSlot::default();
oldest_slot.set_clean_slot_0(true);

let mut factory = PurgedSlotFilterFactory::<ShredData> {
oldest_slot: oldest_slot.clone(),
Expand Down