Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Hold read lock during startup shrink (#17309)
Browse files Browse the repository at this point in the history
* hold read lock during account scan of shrink

* rename and improve rusty
  • Loading branch information
jeffwashington authored May 21, 2021
1 parent 96cde36 commit 3f33242
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 35 deletions.
74 changes: 45 additions & 29 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1911,7 +1911,7 @@ impl AccountsDb {
);
}

fn do_shrink_slot_stores<'a, I>(&'a self, slot: Slot, stores: I)
fn do_shrink_slot_stores<'a, I>(&'a self, slot: Slot, stores: I, is_startup: bool)
where
I: Iterator<Item = &'a Arc<AccountStorageEntry>>,
{
Expand Down Expand Up @@ -1950,31 +1950,47 @@ impl AccountsDb {

let mut index_read_elapsed = Measure::start("index_read_elapsed");
let mut alive_total = 0;
let alive_accounts: Vec<_> = {
stored_accounts
.iter()
.filter(|(pubkey, stored_account)| {
if let Some(locked_entry) = self.accounts_index.get_account_read_entry(pubkey) {
let is_alive = locked_entry.slot_list().iter().any(|(_slot, i)| {
i.store_id == stored_account.store_id
&& i.offset == stored_account.account.offset
});
if !is_alive {
// This pubkey was found in the storage, but no longer exists in the index.
// It would have had a ref to the storage from the initial store, but it will
// not exist in the re-written slot. Unref it to keep the index consistent with
// rewriting the storage entries.
locked_entry.unref()
} else {
alive_total += stored_account.account_size as u64;
}
is_alive
let accounts_index_map_lock = if is_startup {
// at startup, there is nobody else to contend with the accounts_index read lock, so it is more efficient for us to keep it held
Some(self.accounts_index.get_account_maps_read_lock())
} else {
None
};
let accounts_index_map_lock_ref = accounts_index_map_lock.as_ref();

let alive_accounts: Vec<_> = stored_accounts
.iter()
.filter(|(pubkey, stored_account)| {
let lookup = if is_startup {
self.accounts_index.get_account_read_entry_with_lock(
pubkey,
accounts_index_map_lock_ref.unwrap(),
)
} else {
self.accounts_index.get_account_read_entry(pubkey)
};

if let Some(locked_entry) = lookup {
let is_alive = locked_entry.slot_list().iter().any(|(_slot, i)| {
i.store_id == stored_account.store_id
&& i.offset == stored_account.account.offset
});
if !is_alive {
// This pubkey was found in the storage, but no longer exists in the index.
// It would have had a ref to the storage from the initial store, but it will
// not exist in the re-written slot. Unref it to keep the index consistent with
// rewriting the storage entries.
locked_entry.unref()
} else {
false
alive_total += stored_account.account_size as u64;
}
})
.collect()
};
is_alive
} else {
false
}
})
.collect();
drop(accounts_index_map_lock);
index_read_elapsed.stop();
let aligned_total: u64 = self.page_align(alive_total);

Expand Down Expand Up @@ -2136,7 +2152,7 @@ impl AccountsDb {

// Reads all accounts in given slot's AppendVecs and filter only to alive,
// then create a minimum AppendVec filled with the alive.
fn shrink_slot_forced(&self, slot: Slot) -> usize {
fn shrink_slot_forced(&self, slot: Slot, is_startup: bool) -> usize {
debug!("shrink_slot_forced: slot: {}", slot);

if let Some(stores_lock) = self.storage.get_slot_stores(slot) {
Expand All @@ -2157,7 +2173,7 @@ impl AccountsDb {
);
return 0;
}
self.do_shrink_slot_stores(slot, stores.iter());
self.do_shrink_slot_stores(slot, stores.iter(), is_startup);
alive_count
} else {
0
Expand All @@ -2177,7 +2193,7 @@ impl AccountsDb {
let num_candidates = shrink_slots.len();
for (slot, slot_shrink_candidates) in shrink_slots {
let mut measure = Measure::start("shrink_candidate_slots-ms");
self.do_shrink_slot_stores(slot, slot_shrink_candidates.values());
self.do_shrink_slot_stores(slot, slot_shrink_candidates.values(), false);
measure.stop();
inc_new_counter_info!("shrink_candidate_slots-ms", measure.as_ms() as usize);
}
Expand All @@ -2190,13 +2206,13 @@ impl AccountsDb {
let chunk_size = std::cmp::max(slots.len() / 8, 1); // approximately 400k slots in a snapshot
slots.par_chunks(chunk_size).for_each(|slots| {
for slot in slots {
self.shrink_slot_forced(*slot);
self.shrink_slot_forced(*slot, is_startup);
}
});
} else {
for slot in self.all_slots_in_storage() {
if self.caching_enabled {
self.shrink_slot_forced(slot);
self.shrink_slot_forced(slot, false);
} else {
self.do_shrink_slot_forced_v1(slot);
}
Expand Down
35 changes: 29 additions & 6 deletions runtime/src/accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -838,10 +838,16 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
}

pub fn get_account_read_entry(&self, pubkey: &Pubkey) -> Option<ReadAccountMapEntry<T>> {
self.account_maps
.read()
.unwrap()
.get(pubkey)
let lock = self.get_account_maps_read_lock();
self.get_account_read_entry_with_lock(pubkey, &lock)
}

pub fn get_account_read_entry_with_lock(
&self,
pubkey: &Pubkey,
lock: &AccountMapsReadLock<'_, T>,
) -> Option<ReadAccountMapEntry<T>> {
lock.get(pubkey)
.cloned()
.map(ReadAccountMapEntry::from_account_map_entry)
}
Expand Down Expand Up @@ -1184,6 +1190,10 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
self.account_maps.write().unwrap()
}

pub(crate) fn get_account_maps_read_lock(&self) -> AccountMapsReadLock<T> {
self.account_maps.read().unwrap()
}

// Same functionally to upsert, but:
// 1. operates on a batch of items
// 2. holds the write lock for the duration of adding the items
Expand Down Expand Up @@ -2411,8 +2421,21 @@ pub mod tests {
}
assert!(gc.is_empty());

{
let entry = index.get_account_read_entry(&key).unwrap();
for lock in &[false, true] {
let read_lock = if *lock {
Some(index.get_account_maps_read_lock())
} else {
None
};

let entry = if *lock {
index
.get_account_read_entry_with_lock(&key, read_lock.as_ref().unwrap())
.unwrap()
} else {
index.get_account_read_entry(&key).unwrap()
};

assert_eq!(
entry.ref_count().load(Ordering::Relaxed),
if is_cached { 0 } else { 2 }
Expand Down

0 comments on commit 3f33242

Please sign in to comment.