Skip to content

Commit

Permalink
AcctIdx: insert goes directly to disk to avoid unnecessary allocations (
Browse files Browse the repository at this point in the history
solana-labs#21490)

* AcctIdx: upsert avoids unnecessary allocation (during startup)

* feedback
  • Loading branch information
jeffwashington authored and pull[bot] committed Jun 23, 2022
1 parent f06bf7d commit 7ca5365
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 54 deletions.
11 changes: 11 additions & 0 deletions runtime/src/accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,17 @@ pub enum PreAllocatedAccountMapEntry<T: IndexValue> {
Raw((Slot, T)),
}

impl<T: IndexValue> ZeroLamport for PreAllocatedAccountMapEntry<T> {
fn is_zero_lamport(&self) -> bool {
match self {
PreAllocatedAccountMapEntry::Entry(entry) => {
entry.slot_list.read().unwrap()[0].1.is_zero_lamport()
}
PreAllocatedAccountMapEntry::Raw(raw) => raw.1.is_zero_lamport(),
}
}
}

impl<T: IndexValue> From<PreAllocatedAccountMapEntry<T>> for (Slot, T) {
fn from(source: PreAllocatedAccountMapEntry<T>) -> (Slot, T) {
match source {
Expand Down
140 changes: 86 additions & 54 deletions runtime/src/in_mem_accounts_index.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::accounts_index::{
AccountMapEntry, AccountMapEntryInner, AccountMapEntryMeta, IndexValue,
PreAllocatedAccountMapEntry, RefCount, SlotList, SlotSlice,
PreAllocatedAccountMapEntry, RefCount, SlotList, SlotSlice, ZeroLamport,
};
use crate::bucket_map_holder::{Age, BucketMapHolder};
use crate::bucket_map_holder_stats::BucketMapHolderStats;
Expand All @@ -9,7 +9,10 @@ use rand::Rng;
use solana_bucket_map::bucket_api::BucketApi;
use solana_measure::measure::Measure;
use solana_sdk::{clock::Slot, pubkey::Pubkey};
use std::collections::{hash_map::Entry, HashMap};
use std::collections::{
hash_map::{Entry, VacantEntry},
HashMap,
};
use std::ops::{Bound, RangeBounds, RangeInclusive};
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
use std::sync::{Arc, RwLock, RwLockWriteGuard};
Expand Down Expand Up @@ -351,24 +354,41 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
}
Entry::Vacant(vacant) => {
// not in cache, look on disk
let disk_entry = self.load_account_entry_from_disk(vacant.key());
let new_value = if let Some(disk_entry) = disk_entry {
// on disk, so merge new_value with what was on disk
Self::lock_and_update_slot_list(
&disk_entry,
new_value.into(),
let directly_to_disk = self.storage.get_startup();
if directly_to_disk {
// We may like this to always run, but it is unclear.
// If disk bucket needs to resize, then this call can stall for a long time.
// Right now, we know it is safe during startup.
let already_existed = self.upsert_on_disk(
vacant,
new_value,
reclaims,
previous_slot_entry_was_cached,
);
disk_entry
if !already_existed {
self.stats().insert_or_delete(true, self.bin);
}
} else {
// not on disk, so insert new thing
self.stats().insert_or_delete(true, self.bin);
new_value.into_account_map_entry(&self.storage)
};
assert!(new_value.dirty());
vacant.insert(new_value);
self.stats().insert_or_delete_mem(true, self.bin);
// go to in-mem cache first
let disk_entry = self.load_account_entry_from_disk(vacant.key());
let new_value = if let Some(disk_entry) = disk_entry {
// on disk, so merge new_value with what was on disk
Self::lock_and_update_slot_list(
&disk_entry,
new_value.into(),
reclaims,
previous_slot_entry_was_cached,
);
disk_entry
} else {
// not on disk, so insert new thing
self.stats().insert_or_delete(true, self.bin);
new_value.into_account_map_entry(&self.storage)
};
assert!(new_value.dirty());
vacant.insert(new_value);
self.stats().insert_or_delete_mem(true, self.bin);
}
}
}

Expand Down Expand Up @@ -490,12 +510,11 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
let mut map = self.map().write().unwrap();
let entry = map.entry(pubkey);
m.stop();
let mut new_entry_zero_lamports = false;
let new_entry_zero_lamports = new_entry.is_zero_lamport();
let (found_in_mem, already_existed) = match entry {
Entry::Occupied(occupied) => {
// in cache, so merge into cache
let (slot, account_info) = new_entry.into();
new_entry_zero_lamports = account_info.is_zero_lamport();
InMemAccountsIndex::lock_and_update_slot_list(
occupied.get(),
(slot, account_info),
Expand All @@ -509,41 +528,9 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
}
Entry::Vacant(vacant) => {
// not in cache, look on disk
let mut existed = false;
if let Some(disk) = self.bucket.as_ref() {
let (slot, account_info) = new_entry.into();
new_entry_zero_lamports = account_info.is_zero_lamport();
disk.update(vacant.key(), |current| {
if let Some((slot_list, mut ref_count)) = current {
// on disk, so merge and update disk
let mut slot_list = slot_list.to_vec();
let addref = Self::update_slot_list(
&mut slot_list,
slot,
account_info,
&mut Vec::default(),
false,
);
if addref {
ref_count += 1
};
existed = true;
Some((slot_list, ref_count))
} else {
// doesn't exist on disk yet, so insert it
let ref_count = if account_info.is_cached() { 0 } else { 1 };
Some((vec![(slot, account_info)], ref_count))
}
});
} else {
// not using disk, so insert into mem
self.stats().insert_or_delete_mem(true, self.bin);
let new_entry: AccountMapEntry<T> =
new_entry.into_account_map_entry(&self.storage);
assert!(new_entry.dirty());
vacant.insert(new_entry);
}
(false, existed)
let already_existed =
self.upsert_on_disk(vacant, new_entry, &mut Vec::default(), false);
(false, already_existed)
}
};
drop(map);
Expand All @@ -563,6 +550,51 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
}
}

/// return tuple:
/// true if item already existed in the index
fn upsert_on_disk(
&self,
vacant: VacantEntry<K, AccountMapEntry<T>>,
new_entry: PreAllocatedAccountMapEntry<T>,
reclaims: &mut SlotList<T>,
previous_slot_entry_was_cached: bool,
) -> bool {
if let Some(disk) = self.bucket.as_ref() {
let mut existed = false;
let (slot, account_info) = new_entry.into();
disk.update(vacant.key(), |current| {
if let Some((slot_list, mut ref_count)) = current {
// on disk, so merge and update disk
let mut slot_list = slot_list.to_vec();
let addref = Self::update_slot_list(
&mut slot_list,
slot,
account_info,
reclaims,
previous_slot_entry_was_cached,
);
if addref {
ref_count += 1
};
existed = true; // found on disk, so it did exist
Some((slot_list, ref_count))
} else {
// doesn't exist on disk yet, so insert it
let ref_count = if account_info.is_cached() { 0 } else { 1 };
Some((vec![(slot, account_info)], ref_count))
}
});
existed
} else {
// not using disk, so insert into mem
self.stats().insert_or_delete_mem(true, self.bin);
let new_entry: AccountMapEntry<T> = new_entry.into_account_map_entry(&self.storage);
assert!(new_entry.dirty());
vacant.insert(new_entry);
false // not using disk, not in mem, so did not exist
}
}

pub fn just_set_hold_range_in_memory<R>(&self, range: &R, start_holding: bool)
where
R: RangeBounds<Pubkey>,
Expand Down Expand Up @@ -643,8 +675,8 @@ impl<T: IndexValue> InMemAccountsIndex<T> {

// load from disk
if let Some(disk) = self.bucket.as_ref() {
let items = disk.items_in_range(range);
let mut map = self.map().write().unwrap();
let items = disk.items_in_range(range); // map's lock has to be held while we are getting items from disk
let future_age = self.storage.future_age_to_flush();
for item in items {
let entry = map.entry(item.pubkey);
Expand Down

0 comments on commit 7ca5365

Please sign in to comment.