Uses fold+reduce for handling duplicate pubkeys during index generation (#34011)
brooksprumo authored Nov 10, 2023
1 parent 69ab8a8 commit 3c71f85
Showing 1 changed file with 64 additions and 31 deletions.
95 changes: 64 additions & 31 deletions accounts-db/src/accounts_db.rs
@@ -9397,51 +9397,84 @@ impl AccountsDb {
             ..GenerateIndexTimings::default()
         };
 
-        // subtract data.len() from accounts_data_len for all old accounts that are in the index twice
-        let mut accounts_data_len_dedup_timer =
-            Measure::start("handle accounts data len duplicates");
-        let uncleaned_roots = Mutex::new(IntSet::default());
         if pass == 0 {
-            let accounts_data_len_from_duplicates = unique_pubkeys_by_bin
+            #[derive(Debug, Default)]
+            struct DuplicatePubkeysVisitedInfo {
+                accounts_data_len_from_duplicates: u64,
+                uncleaned_roots: IntSet<Slot>,
+            }
+            impl DuplicatePubkeysVisitedInfo {
+                fn reduce(mut a: Self, mut b: Self) -> Self {
+                    if a.uncleaned_roots.len() >= b.uncleaned_roots.len() {
+                        a.merge(b);
+                        a
+                    } else {
+                        b.merge(a);
+                        b
+                    }
+                }
+                fn merge(&mut self, other: Self) {
+                    self.accounts_data_len_from_duplicates +=
+                        other.accounts_data_len_from_duplicates;
+                    self.uncleaned_roots.extend(other.uncleaned_roots);
+                }
+            }
+
+            // subtract data.len() from accounts_data_len for all old accounts that are in the index twice
+            let mut accounts_data_len_dedup_timer =
+                Measure::start("handle accounts data len duplicates");
+            let DuplicatePubkeysVisitedInfo {
+                accounts_data_len_from_duplicates,
+                uncleaned_roots,
+            } = unique_pubkeys_by_bin
                 .par_iter()
-                .map(|unique_keys| {
-                    unique_keys
-                        .par_chunks(4096)
-                        .map(|pubkeys| {
-                            let (count, uncleaned_roots_this_group) = self
-                                .visit_duplicate_pubkeys_during_startup(
-                                    pubkeys,
-                                    &rent_collector,
-                                    &timings,
-                                );
-                            let mut uncleaned_roots = uncleaned_roots.lock().unwrap();
-                            uncleaned_roots_this_group.into_iter().for_each(|slot| {
-                                uncleaned_roots.insert(slot);
-                            });
-                            count
-                        })
-                        .sum::<u64>()
-                })
-                .sum();
+                .fold(
+                    DuplicatePubkeysVisitedInfo::default,
+                    |accum, pubkeys_by_bin| {
+                        let intermediate = pubkeys_by_bin
+                            .par_chunks(4096)
+                            .fold(DuplicatePubkeysVisitedInfo::default, |accum, pubkeys| {
+                                let (accounts_data_len_from_duplicates, uncleaned_roots) = self
+                                    .visit_duplicate_pubkeys_during_startup(
+                                        pubkeys,
+                                        &rent_collector,
+                                        &timings,
+                                    );
+                                let intermediate = DuplicatePubkeysVisitedInfo {
+                                    accounts_data_len_from_duplicates,
+                                    uncleaned_roots,
+                                };
+                                DuplicatePubkeysVisitedInfo::reduce(accum, intermediate)
+                            })
+                            .reduce(
+                                DuplicatePubkeysVisitedInfo::default,
+                                DuplicatePubkeysVisitedInfo::reduce,
+                            );
+                        DuplicatePubkeysVisitedInfo::reduce(accum, intermediate)
+                    },
+                )
+                .reduce(
+                    DuplicatePubkeysVisitedInfo::default,
+                    DuplicatePubkeysVisitedInfo::reduce,
+                );
+            accounts_data_len_dedup_timer.stop();
+            timings.accounts_data_len_dedup_time_us = accounts_data_len_dedup_timer.as_us();
+            timings.slots_to_clean = uncleaned_roots.len() as u64;
+
+            self.accounts_index
+                .add_uncleaned_roots(uncleaned_roots.into_iter());
             accounts_data_len.fetch_sub(accounts_data_len_from_duplicates, Ordering::Relaxed);
             info!(
                 "accounts data len: {}",
                 accounts_data_len.load(Ordering::Relaxed)
             );
         }
-        accounts_data_len_dedup_timer.stop();
-        timings.accounts_data_len_dedup_time_us = accounts_data_len_dedup_timer.as_us();
 
-        let uncleaned_roots = uncleaned_roots.into_inner().unwrap();
-        timings.slots_to_clean = uncleaned_roots.len() as u64;
-
         if pass == 0 {
             // Need to add these last, otherwise older updates will be cleaned
             for root in &slots {
                 self.accounts_index.add_root(*root);
             }
-            self.accounts_index
-                .add_uncleaned_roots(uncleaned_roots.into_iter());
 
             self.set_storage_count_and_alive_bytes(storage_info, &mut timings);
         }
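
Note on the pattern: the change swaps a Mutex<IntSet<Slot>> shared by every rayon worker for rayon's fold + reduce. Each worker folds its results into a private DuplicatePubkeysVisitedInfo, and the per-worker partials are merged pairwise afterwards, so the hot path takes no lock. The commit applies the pattern twice, nested: an outer fold over pubkey bins and an inner fold over 4096-key chunks, each followed by its own reduce. Below is a minimal, self-contained sketch of the same fold + reduce shape; the Info type, the (data_len, slot) input, and main are illustrative stand-ins, not code from this repository.

use rayon::prelude::*; // rayon = "1"
use std::collections::HashSet;

#[derive(Debug, Default)]
struct Info {
    data_len: u64,
    slots: HashSet<u64>,
}

impl Info {
    // Pairwise merge; like DuplicatePubkeysVisitedInfo::reduce above, it
    // folds the smaller set into the larger one to move fewer elements.
    fn reduce(mut a: Self, mut b: Self) -> Self {
        if a.slots.len() >= b.slots.len() {
            a.merge(b);
            a
        } else {
            b.merge(a);
            b
        }
    }
    fn merge(&mut self, other: Self) {
        self.data_len += other.data_len;
        self.slots.extend(other.slots);
    }
}

fn main() {
    // Hypothetical (data_len, slot) pairs standing in for duplicate-pubkey visits.
    let items: Vec<(u64, u64)> = (0..10_000u64).map(|i| (i, i % 7)).collect();

    let info = items
        .par_iter()
        // fold: each rayon worker accumulates into its own Info, lock-free.
        .fold(Info::default, |mut accum, &(data_len, slot)| {
            accum.data_len += data_len;
            accum.slots.insert(slot);
            accum
        })
        // reduce: merge the per-worker partial results pairwise.
        .reduce(Info::default, Info::reduce);

    assert_eq!(info.data_len, (0..10_000u64).sum::<u64>());
    assert_eq!(info.slots.len(), 7);
    println!("data_len = {}, distinct slots = {}", info.data_len, info.slots.len());
}

The asymmetric reduce is a small but deliberate choice: extending a set rehashes every entry of the merged-from set into the merged-into set, so always keeping the larger set as the destination minimizes the work done across the O(partials) merge calls.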
