From f3f1a030896fdb597d1b1a4785f067ff8e3cde5a Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Thu, 4 Jun 2020 11:33:53 -0700 Subject: [PATCH 1/3] ledger_cleanup_service: compact at a slower rate than purging --- core/src/ledger_cleanup_service.rs | 52 ++++++++++++++++++++--- core/tests/ledger_cleanup.rs | 7 ++- ledger/src/blockstore/blockstore_purge.rs | 24 ++++++----- 3 files changed, 63 insertions(+), 20 deletions(-) diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index ffcb7facb52ae5..dd6d92b48c3cf9 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -3,7 +3,7 @@ use solana_ledger::blockstore::{Blockstore, PurgeType}; use solana_ledger::blockstore_db::Result as BlockstoreResult; use solana_measure::measure::Measure; -use solana_sdk::clock::Slot; +use solana_sdk::clock::{Slot, DEFAULT_SLOTS_PER_EPOCH}; use std::string::ToString; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError}; @@ -32,6 +32,9 @@ pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512; // Delay between purges to cooperate with other blockstore users pub const DEFAULT_DELAY_BETWEEN_PURGES: Duration = Duration::from_millis(500); +// Compacting at a slower interval than purging helps keep IOPS down +const DEFAULT_COMPACTION_SLOT_INTERVAL: u64 = DEFAULT_SLOTS_PER_EPOCH; + pub struct LedgerCleanupService { t_cleanup: JoinHandle<()>, } @@ -49,6 +52,8 @@ impl LedgerCleanupService { ); let exit = exit.clone(); let mut last_purge_slot = 0; + let mut last_compaction_slot = 0; + let t_cleanup = Builder::new() .name("solana-ledger-cleanup".to_string()) .spawn(move || loop { @@ -62,6 +67,8 @@ impl LedgerCleanupService { &mut last_purge_slot, DEFAULT_PURGE_SLOT_INTERVAL, Some(DEFAULT_DELAY_BETWEEN_PURGES), + &mut last_compaction_slot, + DEFAULT_COMPACTION_SLOT_INTERVAL, ) { match e { RecvTimeoutError::Disconnected => break, @@ -135,6 +142,8 @@ impl LedgerCleanupService { last_purge_slot: &mut u64, purge_interval: u64, delay_between_purges: Option, + last_compaction_slot: &mut u64, + compaction_interval: u64, ) -> Result<(), RecvTimeoutError> { let root = Self::receive_new_roots(new_root_receiver)?; if root - *last_purge_slot <= purge_interval { @@ -143,8 +152,8 @@ impl LedgerCleanupService { let disk_utilization_pre = blockstore.storage_size(); info!( - "purge: last_root={}, last_purge_slot={}, purge_interval={}, disk_utilization={:?}", - root, last_purge_slot, purge_interval, disk_utilization_pre + "purge: last_root={}, last_purge_slot={}, purge_interval={}, last_compaction_slot={}, disk_utilization={:?}", + root, last_purge_slot, purge_interval, last_compaction_slot, disk_utilization_pre ); *last_purge_slot = root; @@ -152,10 +161,11 @@ impl LedgerCleanupService { Self::find_slots_to_clean(&blockstore, root, max_ledger_shreds); if slots_to_clean { - info!( - "purging data from slots {} to {}", - first_slot, lowest_cleanup_slot - ); + let mut compact_first_slot = std::u64::MAX; + if root.saturating_sub(*last_compaction_slot) > compaction_interval { + compact_first_slot = *last_compaction_slot; + *last_compaction_slot = lowest_cleanup_slot; + } let purge_complete = Arc::new(AtomicBool::new(false)); let blockstore = blockstore.clone(); @@ -167,6 +177,11 @@ impl LedgerCleanupService { *blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot; slot_update_time.stop(); + info!( + "purging data from slots {} to {}", + first_slot, lowest_cleanup_slot + ); + let mut purge_time = Measure::start("purge_slots_with_delay"); blockstore.purge_slots_with_delay( first_slot, @@ -176,6 +191,23 @@ impl LedgerCleanupService { ); purge_time.stop(); info!("{}", purge_time); + + if compact_first_slot < lowest_cleanup_slot { + info!( + "compacting data from slots {} to {}", + compact_first_slot, lowest_cleanup_slot + ); + if let Err(err) = + blockstore.compact_storage(compact_first_slot, lowest_cleanup_slot) + { + // This error is not fatal and indicates an internal error? + error!( + "Error: {:?}; Couldn't compact storage from {:?} to {:?}", + err, compact_first_slot, lowest_cleanup_slot + ); + } + } + purge_complete1.store(true, Ordering::Relaxed); }) .unwrap(); @@ -234,6 +266,7 @@ mod tests { //send a signal to kill all but 5 shreds, which will be in the newest slots let mut last_purge_slot = 0; + let mut last_compaction_slot = 0; sender.send(50).unwrap(); LedgerCleanupService::cleanup_ledger( &receiver, @@ -242,6 +275,8 @@ mod tests { &mut last_purge_slot, 10, None, + &mut last_compaction_slot, + 10, ) .unwrap(); @@ -273,6 +308,7 @@ mod tests { info!("{}", first_insert); let mut last_purge_slot = 0; + let mut last_compaction_slot = 0; let mut slot = initial_slots; let mut num_slots = 6; for _ in 0..5 { @@ -297,6 +333,8 @@ mod tests { &mut last_purge_slot, 10, None, + &mut last_compaction_slot, + 10, ) .unwrap(); time.stop(); diff --git a/core/tests/ledger_cleanup.rs b/core/tests/ledger_cleanup.rs index d7f8a2ad061bff..ab0029b2fe067f 100644 --- a/core/tests/ledger_cleanup.rs +++ b/core/tests/ledger_cleanup.rs @@ -374,14 +374,17 @@ mod tests { // send signal to cleanup slots let (sender, receiver) = channel(); sender.send(n).unwrap(); - let mut next_purge_batch = 0; + let mut last_purge_slot = 0; + let mut last_compaction_slot = 0; LedgerCleanupService::cleanup_ledger( &receiver, &blockstore, max_ledger_shreds, - &mut next_purge_batch, + &mut last_purge_slot, 10, None, + &mut last_compaction_slot, + 10, ) .unwrap(); diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index 7e705195f937f6..bfef9aa6dcd0ff 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -35,20 +35,18 @@ impl Blockstore { } } } - - if !self.no_compaction { - if let Err(e) = self.compact_storage(from_slot, to_slot) { - // This error is not fatal and indicates an internal error - error!( - "Error: {:?}; Couldn't compact storage from {:?} to {:?}", - e, from_slot, to_slot - ); - } - } } + // TODO: rename purge_slots() to purge_and_compact_slots() pub fn purge_slots(&self, from_slot: Slot, to_slot: Slot) { - self.purge_slots_with_delay(from_slot, to_slot, None, PurgeType::Exact) + self.purge_slots_with_delay(from_slot, to_slot, None, PurgeType::Exact); + if let Err(e) = self.compact_storage(from_slot, to_slot) { + // This error is not fatal and indicates an internal error? + error!( + "Error: {:?}; Couldn't compact storage from {:?} to {:?}", + e, from_slot, to_slot + ); + } } /// Ensures that the SlotMeta::next_slots vector for all slots contain no references in the @@ -169,6 +167,10 @@ impl Blockstore { } pub fn compact_storage(&self, from_slot: Slot, to_slot: Slot) -> Result { + if self.no_compaction { + info!("compact_storage: compaction disabled"); + return Ok(false); + } info!("compact_storage: from {} to {}", from_slot, to_slot); let mut compact_timer = Measure::start("compact_range"); let result = self From 5658a6cbb9e90f72e7876470c485f08a2753f678 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Thu, 4 Jun 2020 12:49:13 -0700 Subject: [PATCH 2/3] fixup --- core/src/ledger_cleanup_service.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index dd6d92b48c3cf9..2868a4d5e41f4e 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -123,7 +123,7 @@ impl LedgerCleanupService { } } - (true, lowest_cleanup_slot, first_slot, total_shreds) + (true, first_slot, lowest_cleanup_slot, total_shreds) } fn receive_new_roots(new_root_receiver: &Receiver) -> Result { @@ -157,12 +157,12 @@ impl LedgerCleanupService { ); *last_purge_slot = root; - let (slots_to_clean, lowest_cleanup_slot, first_slot, total_shreds) = + let (slots_to_clean, purge_first_slot, lowest_cleanup_slot, total_shreds) = Self::find_slots_to_clean(&blockstore, root, max_ledger_shreds); if slots_to_clean { let mut compact_first_slot = std::u64::MAX; - if root.saturating_sub(*last_compaction_slot) > compaction_interval { + if lowest_cleanup_slot.saturating_sub(*last_compaction_slot) > compaction_interval { compact_first_slot = *last_compaction_slot; *last_compaction_slot = lowest_cleanup_slot; } @@ -179,12 +179,12 @@ impl LedgerCleanupService { info!( "purging data from slots {} to {}", - first_slot, lowest_cleanup_slot + purge_first_slot, lowest_cleanup_slot ); let mut purge_time = Measure::start("purge_slots_with_delay"); blockstore.purge_slots_with_delay( - first_slot, + purge_first_slot, lowest_cleanup_slot, delay_between_purges, PurgeType::PrimaryIndex, From 630407a8fc596697fd5a4bc6723ac428c673c83f Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Thu, 4 Jun 2020 18:51:25 -0700 Subject: [PATCH 3/3] Compact once a day --- core/src/ledger_cleanup_service.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index 2868a4d5e41f4e..9b111e2e44e7ad 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -3,7 +3,7 @@ use solana_ledger::blockstore::{Blockstore, PurgeType}; use solana_ledger::blockstore_db::Result as BlockstoreResult; use solana_measure::measure::Measure; -use solana_sdk::clock::{Slot, DEFAULT_SLOTS_PER_EPOCH}; +use solana_sdk::clock::{Slot, DEFAULT_TICKS_PER_SLOT, TICKS_PER_DAY}; use std::string::ToString; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError}; @@ -32,8 +32,9 @@ pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512; // Delay between purges to cooperate with other blockstore users pub const DEFAULT_DELAY_BETWEEN_PURGES: Duration = Duration::from_millis(500); -// Compacting at a slower interval than purging helps keep IOPS down -const DEFAULT_COMPACTION_SLOT_INTERVAL: u64 = DEFAULT_SLOTS_PER_EPOCH; +// Compacting at a slower interval than purging helps keep IOPS down. +// Once a day should be ample +const DEFAULT_COMPACTION_SLOT_INTERVAL: u64 = TICKS_PER_DAY / DEFAULT_TICKS_PER_SLOT; pub struct LedgerCleanupService { t_cleanup: JoinHandle<()>,