Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

ledger_cleanup_service: compact at a slower rate than purging #10414

Merged
merged 3 commits into from
Jun 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 49 additions & 10 deletions core/src/ledger_cleanup_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use solana_ledger::blockstore::{Blockstore, PurgeType};
use solana_ledger::blockstore_db::Result as BlockstoreResult;
use solana_measure::measure::Measure;
use solana_sdk::clock::Slot;
use solana_sdk::clock::{Slot, DEFAULT_TICKS_PER_SLOT, TICKS_PER_DAY};
use std::string::ToString;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
Expand Down Expand Up @@ -32,6 +32,10 @@ pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512;
// Delay between purges to cooperate with other blockstore users
pub const DEFAULT_DELAY_BETWEEN_PURGES: Duration = Duration::from_millis(500);

// Compacting at a slower interval than purging helps keep IOPS down.
// Once a day should be ample
const DEFAULT_COMPACTION_SLOT_INTERVAL: u64 = TICKS_PER_DAY / DEFAULT_TICKS_PER_SLOT;

pub struct LedgerCleanupService {
t_cleanup: JoinHandle<()>,
}
Expand All @@ -49,6 +53,8 @@ impl LedgerCleanupService {
);
let exit = exit.clone();
let mut last_purge_slot = 0;
let mut last_compaction_slot = 0;

let t_cleanup = Builder::new()
.name("solana-ledger-cleanup".to_string())
.spawn(move || loop {
Expand All @@ -62,6 +68,8 @@ impl LedgerCleanupService {
&mut last_purge_slot,
DEFAULT_PURGE_SLOT_INTERVAL,
Some(DEFAULT_DELAY_BETWEEN_PURGES),
&mut last_compaction_slot,
DEFAULT_COMPACTION_SLOT_INTERVAL,
) {
match e {
RecvTimeoutError::Disconnected => break,
Expand Down Expand Up @@ -116,7 +124,7 @@ impl LedgerCleanupService {
}
}

(true, lowest_cleanup_slot, first_slot, total_shreds)
(true, first_slot, lowest_cleanup_slot, total_shreds)
}

fn receive_new_roots(new_root_receiver: &Receiver<Slot>) -> Result<Slot, RecvTimeoutError> {
Expand All @@ -135,6 +143,8 @@ impl LedgerCleanupService {
last_purge_slot: &mut u64,
purge_interval: u64,
delay_between_purges: Option<Duration>,
last_compaction_slot: &mut u64,
compaction_interval: u64,
) -> Result<(), RecvTimeoutError> {
let root = Self::receive_new_roots(new_root_receiver)?;
if root - *last_purge_slot <= purge_interval {
Expand All @@ -143,19 +153,20 @@ impl LedgerCleanupService {

let disk_utilization_pre = blockstore.storage_size();
info!(
"purge: last_root={}, last_purge_slot={}, purge_interval={}, disk_utilization={:?}",
root, last_purge_slot, purge_interval, disk_utilization_pre
"purge: last_root={}, last_purge_slot={}, purge_interval={}, last_compaction_slot={}, disk_utilization={:?}",
root, last_purge_slot, purge_interval, last_compaction_slot, disk_utilization_pre
);
*last_purge_slot = root;

let (slots_to_clean, lowest_cleanup_slot, first_slot, total_shreds) =
let (slots_to_clean, purge_first_slot, lowest_cleanup_slot, total_shreds) =
Self::find_slots_to_clean(&blockstore, root, max_ledger_shreds);

if slots_to_clean {
info!(
"purging data from slots {} to {}",
first_slot, lowest_cleanup_slot
);
let mut compact_first_slot = std::u64::MAX;
if lowest_cleanup_slot.saturating_sub(*last_compaction_slot) > compaction_interval {
compact_first_slot = *last_compaction_slot;
*last_compaction_slot = lowest_cleanup_slot;
}

let purge_complete = Arc::new(AtomicBool::new(false));
let blockstore = blockstore.clone();
Expand All @@ -167,15 +178,37 @@ impl LedgerCleanupService {
*blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot;
slot_update_time.stop();

info!(
"purging data from slots {} to {}",
purge_first_slot, lowest_cleanup_slot
);

let mut purge_time = Measure::start("purge_slots_with_delay");
blockstore.purge_slots_with_delay(
first_slot,
purge_first_slot,
lowest_cleanup_slot,
delay_between_purges,
PurgeType::PrimaryIndex,
);
purge_time.stop();
info!("{}", purge_time);

if compact_first_slot < lowest_cleanup_slot {
info!(
"compacting data from slots {} to {}",
compact_first_slot, lowest_cleanup_slot
);
if let Err(err) =
blockstore.compact_storage(compact_first_slot, lowest_cleanup_slot)
{
// This error is not fatal and indicates an internal error?
error!(
"Error: {:?}; Couldn't compact storage from {:?} to {:?}",
err, compact_first_slot, lowest_cleanup_slot
);
}
}

purge_complete1.store(true, Ordering::Relaxed);
})
.unwrap();
Expand Down Expand Up @@ -234,6 +267,7 @@ mod tests {

//send a signal to kill all but 5 shreds, which will be in the newest slots
let mut last_purge_slot = 0;
let mut last_compaction_slot = 0;
sender.send(50).unwrap();
LedgerCleanupService::cleanup_ledger(
&receiver,
Expand All @@ -242,6 +276,8 @@ mod tests {
&mut last_purge_slot,
10,
None,
&mut last_compaction_slot,
10,
)
.unwrap();

Expand Down Expand Up @@ -273,6 +309,7 @@ mod tests {
info!("{}", first_insert);

let mut last_purge_slot = 0;
let mut last_compaction_slot = 0;
let mut slot = initial_slots;
let mut num_slots = 6;
for _ in 0..5 {
Expand All @@ -297,6 +334,8 @@ mod tests {
&mut last_purge_slot,
10,
None,
&mut last_compaction_slot,
10,
)
.unwrap();
time.stop();
Expand Down
7 changes: 5 additions & 2 deletions core/tests/ledger_cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,14 +374,17 @@ mod tests {
// send signal to cleanup slots
let (sender, receiver) = channel();
sender.send(n).unwrap();
let mut next_purge_batch = 0;
let mut last_purge_slot = 0;
let mut last_compaction_slot = 0;
LedgerCleanupService::cleanup_ledger(
&receiver,
&blockstore,
max_ledger_shreds,
&mut next_purge_batch,
&mut last_purge_slot,
10,
None,
&mut last_compaction_slot,
10,
)
.unwrap();

Expand Down
24 changes: 13 additions & 11 deletions ledger/src/blockstore/blockstore_purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,18 @@ impl Blockstore {
}
}
}

if !self.no_compaction {
if let Err(e) = self.compact_storage(from_slot, to_slot) {
// This error is not fatal and indicates an internal error
error!(
"Error: {:?}; Couldn't compact storage from {:?} to {:?}",
e, from_slot, to_slot
);
}
}
}

// TODO: rename purge_slots() to purge_and_compact_slots()
pub fn purge_slots(&self, from_slot: Slot, to_slot: Slot) {
self.purge_slots_with_delay(from_slot, to_slot, None, PurgeType::Exact)
self.purge_slots_with_delay(from_slot, to_slot, None, PurgeType::Exact);
if let Err(e) = self.compact_storage(from_slot, to_slot) {
// This error is not fatal and indicates an internal error?
error!(
"Error: {:?}; Couldn't compact storage from {:?} to {:?}",
e, from_slot, to_slot
);
}
}

/// Ensures that the SlotMeta::next_slots vector for all slots contain no references in the
Expand Down Expand Up @@ -169,6 +167,10 @@ impl Blockstore {
}

pub fn compact_storage(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
if self.no_compaction {
info!("compact_storage: compaction disabled");
return Ok(false);
}
info!("compact_storage: from {} to {}", from_slot, to_slot);
let mut compact_timer = Measure::start("compact_range");
let result = self
Expand Down