diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs index ba4f112576ba8d..9a19b02a3865f5 100644 --- a/accounts-db/src/accounts_db.rs +++ b/accounts-db/src/accounts_db.rs @@ -511,6 +511,26 @@ struct LoadAccountsIndexForShrink<'a, T: ShrinkCollectRefs<'a>> { index_entries_being_shrunk: Vec>, } +impl<'a, T: ShrinkCollectRefs<'a>> LoadAccountsIndexForShrink<'a, T> { + fn reduce(mut a: Self, mut b: Self) -> Self { + a.alive_accounts.collect(b.alive_accounts); + a.unrefed_pubkeys.append(&mut b.unrefed_pubkeys); + a.index_entries_being_shrunk + .append(&mut b.index_entries_being_shrunk); + a.all_are_zero_lamports = a.all_are_zero_lamports && b.all_are_zero_lamports; + a + } + + fn new(capacity: usize, slot: u64) -> Self { + Self { + alive_accounts: T::with_capacity(capacity, slot), + unrefed_pubkeys: Vec::with_capacity(capacity), + all_are_zero_lamports: true, + index_entries_being_shrunk: Vec::with_capacity(capacity), + } + } +} + pub struct GetUniqueAccountsResult<'a> { pub stored_accounts: Vec>, pub capacity: u64, @@ -3980,49 +4000,33 @@ impl AccountsDb { capacity, } = unique_accounts; - let mut index_read_elapsed = Measure::start("index_read_elapsed"); - let len = stored_accounts.len(); - let alive_accounts_collect = Mutex::new(T::with_capacity(len, slot)); - let unrefed_pubkeys_collect = Mutex::new(Vec::with_capacity(len)); stats .accounts_loaded .fetch_add(len as u64, Ordering::Relaxed); - let all_are_zero_lamports_collect = Mutex::new(true); - let index_entries_being_shrunk_outer = Mutex::new(Vec::default()); - self.thread_pool_clean.install(|| { + + let mut index_read_elapsed = Measure::start("index_read_elapsed"); + let LoadAccountsIndexForShrink { + alive_accounts, + unrefed_pubkeys, + all_are_zero_lamports, + index_entries_being_shrunk, + } = self.thread_pool_clean.install(|| { stored_accounts .par_chunks(SHRINK_COLLECT_CHUNK_SIZE) - .for_each(|stored_accounts| { - let LoadAccountsIndexForShrink { - alive_accounts, - mut unrefed_pubkeys, - all_are_zero_lamports, - mut index_entries_being_shrunk, - } = self.load_accounts_index_for_shrink(stored_accounts, stats, slot); - - // collect - alive_accounts_collect - .lock() - .unwrap() - .collect(alive_accounts); - unrefed_pubkeys_collect - .lock() - .unwrap() - .append(&mut unrefed_pubkeys); - index_entries_being_shrunk_outer - .lock() - .unwrap() - .append(&mut index_entries_being_shrunk); - if !all_are_zero_lamports { - *all_are_zero_lamports_collect.lock().unwrap() = false; - } - }); + .fold( + || LoadAccountsIndexForShrink::new(0, slot), + |accum: LoadAccountsIndexForShrink<'_, T>, stored_accounts| { + let to_shrink: LoadAccountsIndexForShrink<'_, T> = + self.load_accounts_index_for_shrink(stored_accounts, stats, slot); + LoadAccountsIndexForShrink::reduce(accum, to_shrink) + }, + ) + .reduce( + || LoadAccountsIndexForShrink::new(len, slot), + LoadAccountsIndexForShrink::reduce, + ) }); - - let alive_accounts = alive_accounts_collect.into_inner().unwrap(); - let unrefed_pubkeys = unrefed_pubkeys_collect.into_inner().unwrap(); - index_read_elapsed.stop(); stats .index_read_elapsed @@ -4050,8 +4054,8 @@ impl AccountsDb { alive_accounts, alive_total_bytes, total_starting_accounts: len, - all_are_zero_lamports: all_are_zero_lamports_collect.into_inner().unwrap(), - _index_entries_being_shrunk: index_entries_being_shrunk_outer.into_inner().unwrap(), + all_are_zero_lamports, + _index_entries_being_shrunk: index_entries_being_shrunk, } }