Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

parallel get_snapshot_storages (backport #17589) #17679

Merged
merged 1 commit into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 91 additions & 30 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4245,7 +4245,7 @@ impl AccountsDb {
) -> Result<(Hash, u64), BankHashVerificationError> {
if !use_index {
let mut time = Measure::start("collect");
let combined_maps = self.get_snapshot_storages(slot);
let combined_maps = self.get_snapshot_storages(slot, Some(ancestors)).0;
time.stop();

let timings = HashStats {
Expand Down Expand Up @@ -5133,26 +5133,78 @@ impl AccountsDb {
}
}

pub fn get_snapshot_storages(&self, snapshot_slot: Slot) -> SnapshotStorages {
self.storage
pub fn get_snapshot_storages(
&self,
snapshot_slot: Slot,
_ancestors: Option<&Ancestors>,
) -> (SnapshotStorages, Vec<Slot>) {
let mut m = Measure::start("get slots");
let slots = self
.storage
.0
.iter()
.filter(|iter_item| {
let slot = *iter_item.key();
slot <= snapshot_slot && self.accounts_index.is_root(slot)
})
.map(|iter_item| {
iter_item
.value()
.read()
.unwrap()
.values()
.filter(|x| x.has_accounts())
.cloned()
.collect()
.map(|k| *k.key() as Slot)
.collect::<Vec<_>>();
m.stop();
let mut m2 = Measure::start("filter");

let chunk_size = 5_000;
let wide = self.thread_pool_clean.install(|| {
slots
.par_chunks(chunk_size)
.map(|slots| {
slots
.iter()
.filter_map(|slot| {
if *slot <= snapshot_slot && self.accounts_index.is_root(*slot) {
self.storage.0.get(&slot).map_or_else(
|| None,
|item| {
let storages = item
.value()
.read()
.unwrap()
.values()
.filter(|x| x.has_accounts())
.cloned()
.collect::<Vec<_>>();
if !storages.is_empty() {
Some((storages, *slot))
} else {
None
}
},
)
} else {
None
}
})
.collect::<Vec<(SnapshotStorage, Slot)>>()
})
.collect::<Vec<_>>()
});
m2.stop();
let mut m3 = Measure::start("flatten");
// some slots we found above may not have been a root or met the slot # constraint.
// So the resulting 'slots' vector we return will be a subset of the raw keys we got initially.
let mut slots = Vec::with_capacity(slots.len());
let result = wide
.into_iter()
.flatten()
.map(|(storage, slot)| {
slots.push(slot);
storage
})
.filter(|snapshot_storage: &SnapshotStorage| !snapshot_storage.is_empty())
.collect()
.collect::<Vec<_>>();
m3.stop();

debug!(
"hash_total: get slots: {}, filter: {}, flatten: {}",
m.as_us(),
m2.as_us(),
m3.as_us()
);
(result, slots)
}

#[allow(clippy::needless_collect)]
Expand Down Expand Up @@ -5735,7 +5787,16 @@ pub mod tests {
);
accounts.add_root(SLOT);

let storages = accounts.get_snapshot_storages(SLOT);
let (storages, slots) = accounts.get_snapshot_storages(SLOT, None);
assert_eq!(storages.len(), slots.len());
storages
.iter()
.zip(slots.iter())
.for_each(|(storages, slot)| {
for storage in storages {
assert_eq!(&storage.slot(), slot);
}
});
(storages, raw_expected)
}

Expand Down Expand Up @@ -8126,7 +8187,7 @@ pub mod tests {
#[test]
fn test_get_snapshot_storages_empty() {
let db = AccountsDb::new(Vec::new(), &ClusterType::Development);
assert!(db.get_snapshot_storages(0).is_empty());
assert!(db.get_snapshot_storages(0, None).0.is_empty());
}

#[test]
Expand All @@ -8141,10 +8202,10 @@ pub mod tests {

db.add_root(base_slot);
db.store_uncached(base_slot, &[(&key, &account)]);
assert!(db.get_snapshot_storages(before_slot).is_empty());
assert!(db.get_snapshot_storages(before_slot, None).0.is_empty());

assert_eq!(1, db.get_snapshot_storages(base_slot).len());
assert_eq!(1, db.get_snapshot_storages(after_slot).len());
assert_eq!(1, db.get_snapshot_storages(base_slot, None).0.len());
assert_eq!(1, db.get_snapshot_storages(after_slot, None).0.len());
}

#[test]
Expand All @@ -8164,10 +8225,10 @@ pub mod tests {
.unwrap()
.clear();
db.add_root(base_slot);
assert!(db.get_snapshot_storages(after_slot).is_empty());
assert!(db.get_snapshot_storages(after_slot, None).0.is_empty());

db.store_uncached(base_slot, &[(&key, &account)]);
assert_eq!(1, db.get_snapshot_storages(after_slot).len());
assert_eq!(1, db.get_snapshot_storages(after_slot, None).0.len());
}

#[test]
Expand All @@ -8180,10 +8241,10 @@ pub mod tests {
let after_slot = base_slot + 1;

db.store_uncached(base_slot, &[(&key, &account)]);
assert!(db.get_snapshot_storages(after_slot).is_empty());
assert!(db.get_snapshot_storages(after_slot, None).0.is_empty());

db.add_root(base_slot);
assert_eq!(1, db.get_snapshot_storages(after_slot).len());
assert_eq!(1, db.get_snapshot_storages(after_slot, None).0.len());
}

#[test]
Expand All @@ -8197,7 +8258,7 @@ pub mod tests {

db.store_uncached(base_slot, &[(&key, &account)]);
db.add_root(base_slot);
assert_eq!(1, db.get_snapshot_storages(after_slot).len());
assert_eq!(1, db.get_snapshot_storages(after_slot, None).0.len());

db.storage
.get_slot_stores(0)
Expand All @@ -8208,7 +8269,7 @@ pub mod tests {
.next()
.unwrap()
.remove_account(0, true);
assert!(db.get_snapshot_storages(after_slot).is_empty());
assert!(db.get_snapshot_storages(after_slot, None).0.is_empty());
}

#[test]
Expand Down Expand Up @@ -8372,7 +8433,7 @@ pub mod tests {
accounts.store_uncached(current_slot, &[(&pubkey2, &zero_lamport_account)]);
accounts.store_uncached(current_slot, &[(&pubkey3, &zero_lamport_account)]);

let snapshot_stores = accounts.get_snapshot_storages(current_slot);
let snapshot_stores = accounts.get_snapshot_storages(current_slot, None).0;
let total_accounts: usize = snapshot_stores
.iter()
.flatten()
Expand Down
5 changes: 4 additions & 1 deletion runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,10 @@ impl BankRc {
}

pub fn get_snapshot_storages(&self, slot: Slot) -> SnapshotStorages {
self.accounts.accounts_db.get_snapshot_storages(slot)
self.accounts
.accounts_db
.get_snapshot_storages(slot, None)
.0
}
}

Expand Down
13 changes: 9 additions & 4 deletions runtime/src/serde_snapshot/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ fn copy_append_vecs<P: AsRef<Path>>(
accounts_db: &AccountsDb,
output_dir: P,
) -> std::io::Result<UnpackedAppendVecMap> {
let storage_entries = accounts_db.get_snapshot_storages(Slot::max_value());
let storage_entries = accounts_db.get_snapshot_storages(Slot::max_value(), None).0;
let mut unpacked_append_vec_map = UnpackedAppendVecMap::new();
for storage in storage_entries.iter().flatten() {
let storage_path = storage.get_path();
Expand Down Expand Up @@ -142,7 +142,7 @@ fn test_accounts_serialize_style(serde_style: SerdeStyle) {
&mut writer,
&*accounts.accounts_db,
0,
&accounts.accounts_db.get_snapshot_storages(0),
&accounts.accounts_db.get_snapshot_storages(0, None).0,
)
.unwrap();

Expand Down Expand Up @@ -243,7 +243,7 @@ pub(crate) fn reconstruct_accounts_db_via_serialization(
slot: Slot,
) -> AccountsDb {
let mut writer = Cursor::new(vec![]);
let snapshot_storages = accounts.get_snapshot_storages(slot);
let snapshot_storages = accounts.get_snapshot_storages(slot, None).0;
accountsdb_to_stream(
SerdeStyle::Newer,
&mut writer,
Expand Down Expand Up @@ -301,7 +301,12 @@ mod test_bank_serialize {
where
S: serde::Serializer,
{
let snapshot_storages = bank.rc.accounts.accounts_db.get_snapshot_storages(0);
let snapshot_storages = bank
.rc
.accounts
.accounts_db
.get_snapshot_storages(0, None)
.0;
// ensure there is a single snapshot storage example for ABI digesting
assert_eq!(snapshot_storages.len(), 1);

Expand Down