refactor: clean up busy waiting in prefetcher blocking get #8215

Merged
14 commits merged on Dec 22, 2022
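The gist of the change: `blocking_get` used to re-lock and poll the staging area in a 1 µs sleep loop, and now blocks on a `Condvar` that every mutation of the slots map notifies. Below is a minimal sketch of the two patterns, using a plain `HashMap<u64, Option<Vec<u8>>>` in place of the real `SizeTrackedHashMap`/`PrefetchSlot` types; the `Staging` type and its methods are illustrative only, not nearcore code.

```rust
use std::collections::HashMap;
use std::sync::{Condvar, Mutex};

// Simplified stand-in for the prefetcher's staging area; `None` in the map
// plays the role of a pending `PrefetchSlot`, `Some(bytes)` of `Done`.
struct Staging {
    slots: Mutex<HashMap<u64, Option<Vec<u8>>>>,
    slots_update_cvar: Condvar,
}

impl Staging {
    // Old shape (what the PR removes): re-lock and poll in a sleep loop.
    fn blocking_get_busy_wait(&self, key: u64) -> Option<Vec<u8>> {
        loop {
            match self.slots.lock().unwrap().get(&key) {
                Some(Some(value)) => return Some(value.clone()),
                Some(None) => (), // still being fetched, poll again
                None => return None,
            }
            std::thread::sleep(std::time::Duration::from_micros(1));
        }
    }

    // New shape: hold the lock and let `Condvar::wait` release it while the
    // thread is parked, handing it back once a writer notifies.
    fn blocking_get(&self, key: u64) -> Option<Vec<u8>> {
        let mut guard = self.slots.lock().unwrap();
        loop {
            match guard.get(&key) {
                Some(Some(value)) => return Some(value.clone()),
                Some(None) => (), // still being fetched, wait for an update
                None => return None,
            }
            guard = self.slots_update_cvar.wait(guard).unwrap();
        }
    }

    // Writers wake all waiters so each can re-check its own key.
    fn insert_fetched(&self, key: u64, value: Vec<u8>) {
        self.slots.lock().unwrap().insert(key, Some(value));
        self.slots_update_cvar.notify_all();
    }
}
```

In the diff, each write path (`release`, `insert_fetched`, `get_and_set_if_empty`, `clear`) calls `notify_slots_update`, which uses `notify_all` because different waiters may be blocked on different keys.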
core/store/src/trie/prefetching_trie_storage.rs (89 changes: 69 additions, 20 deletions)
@@ -12,7 +12,7 @@ use near_primitives::shard_layout::ShardUId;
use near_primitives::trie_key::TrieKey;
use near_primitives::types::{AccountId, ShardId, StateRoot, TrieNodesCount};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::thread;

const MAX_QUEUED_WORK_ITEMS: usize = 16 * 1024;
@@ -107,10 +107,11 @@ pub enum PrefetchError {
/// without the prefetcher, because the order in which it sees accesses is
/// independent of the prefetcher.
#[derive(Clone)]
pub(crate) struct PrefetchStagingArea(Arc<Mutex<InnerPrefetchStagingArea>>);
pub(crate) struct PrefetchStagingArea(Arc<InnerPrefetchStagingArea>);

struct InnerPrefetchStagingArea {
slots: SizeTrackedHashMap,
slots: Mutex<SizeTrackedHashMap>,
slots_update_cvar: Condvar,
}

/// Result when atomically accessing the prefetch staging area.
@@ -310,8 +311,11 @@ impl TriePrefetchingStorage {

impl PrefetchStagingArea {
fn new(shard_id: ShardId) -> Self {
let inner = InnerPrefetchStagingArea { slots: SizeTrackedHashMap::new(shard_id) };
Self(Arc::new(Mutex::new(inner)))
let inner = InnerPrefetchStagingArea {
slots: Mutex::new(SizeTrackedHashMap::new(shard_id)),
slots_update_cvar: Condvar::new(),
};
Self(Arc::new(inner))
}

/// Release a slot in the prefetcher staging area.
@@ -322,8 +326,8 @@ impl PrefetchStagingArea {
/// 2: IO thread misses in the shard cache on the same key and starts fetching it again.
/// 3: Main thread value is inserted in shard cache.
pub(crate) fn release(&self, key: &CryptoHash) {
let mut guard = self.lock();
let dropped = guard.slots.remove(key);
let mut guard = self.lock_slots();
let dropped = guard.remove(key);
// `Done` is the result after a successful prefetch.
// `PendingFetch` means the value has been read without a prefetch.
// `None` means prefetching was stopped due to memory limits.
@@ -335,6 +339,9 @@
)
|| prefetch_state_matches(PrefetchSlot::PendingFetch, dropped.as_ref().unwrap()),
);
if dropped.is_some() {
self.notify_slots_update();
}
}

/// Block until value is prefetched and then return it.
@@ -345,13 +352,14 @@ impl PrefetchStagingArea {
/// same data and thus are waiting on each other rather than the DB.
/// Of course, that would require prefetching to be moved into an async environment,
pub(crate) fn blocking_get(&self, key: CryptoHash) -> Option<Arc<[u8]>> {
let mut guard = self.lock_slots();
loop {
match self.lock().slots.get(&key) {
match guard.get(&key) {
Some(PrefetchSlot::Done(value)) => return Some(value.clone()),
Some(_) => (),
None => return None,
}
thread::sleep(std::time::Duration::from_micros(1));
guard = self.0.slots_update_cvar.wait(guard).expect(POISONED_LOCK_ERR)
}
}

@@ -362,7 +370,9 @@ impl PrefetchStagingArea {
}

fn insert_fetched(&self, key: CryptoHash, value: Arc<[u8]>) {
self.lock().slots.insert(key, PrefetchSlot::Done(value));
let mut guard = self.lock_slots();
guard.insert(key, PrefetchSlot::Done(value));
self.notify_slots_update();
}

/// Get prefetched value if available and otherwise atomically insert the
@@ -372,10 +382,10 @@
key: CryptoHash,
set_if_empty: PrefetchSlot,
) -> PrefetcherResult {
let mut guard = self.lock();
let mut guard = self.lock_slots();
let full =
guard.slots.size_bytes > MAX_PREFETCH_STAGING_MEMORY - PREFETCH_RESERVED_BYTES_PER_SLOT;
match guard.slots.map.get(&key) {
guard.size_bytes > MAX_PREFETCH_STAGING_MEMORY - PREFETCH_RESERVED_BYTES_PER_SLOT;
match guard.map.get(&key) {
Some(value) => match value {
PrefetchSlot::Done(value) => PrefetcherResult::Prefetched(value.clone()),
PrefetchSlot::PendingPrefetch | PrefetchSlot::PendingFetch => {
@@ -386,19 +396,26 @@
if full {
return PrefetcherResult::MemoryLimitReached;
}
guard.slots.insert(key, set_if_empty);
guard.insert(key, set_if_empty);
self.notify_slots_update();
PrefetcherResult::SlotReserved
}
}
}

fn clear(&self) {
self.lock().slots.clear();
let mut guard = self.lock_slots();
guard.clear();
self.notify_slots_update();
}

fn notify_slots_update(&self) {
self.0.slots_update_cvar.notify_all();
}

#[track_caller]
fn lock(&self) -> std::sync::MutexGuard<InnerPrefetchStagingArea> {
self.0.lock().expect(POISONED_LOCK_ERR)
fn lock_slots(&self) -> MutexGuard<SizeTrackedHashMap> {
self.0.slots.lock().expect(POISONED_LOCK_ERR)
}
}

@@ -551,16 +568,15 @@ impl Drop for PrefetchingThreadsHandle {
/// a minimal set of functions is required to check the inner
/// state of the prefetcher.
#[cfg(feature = "test_features")]
mod tests {
mod tests_utils {
use super::{PrefetchApi, PrefetchSlot};
use crate::TrieCachingStorage;

impl PrefetchApi {
/// Returns the number of prefetched values currently staged.
pub fn num_prefetched_and_staged(&self) -> usize {
self.prefetching
.lock()
.slots
.lock_slots()
.map
.iter()
.filter(|(_key, slot)| match slot {
@@ -581,3 +597,36 @@ mod tests {
}
}
}

#[cfg(test)]
mod tests {
use super::{PrefetchStagingArea, PrefetcherResult};
use near_primitives::hash::CryptoHash;
use std::time::{Duration, Instant};

#[test]
fn test_prefetch_staging_area_blocking_get_after_update() {
let key = CryptoHash::hash_bytes(&[1, 2, 3]);
let value: std::sync::Arc<[u8]> = vec![4, 5, 6].into();
let prefetch_staging_area = PrefetchStagingArea::new(0);
assert!(matches!(
prefetch_staging_area.get_or_set_fetching(key),
PrefetcherResult::SlotReserved
));
let prefetch_staging_area_get = prefetch_staging_area.clone();
let handle = std::thread::spawn(move || prefetch_staging_area_get.blocking_get(key));
std::thread::yield_now();
assert!(!handle.is_finished());
prefetch_staging_area.insert_fetched(key, value.clone());

let wait_start = Instant::now();
while !handle.is_finished() {
std::thread::yield_now();
assert!(
wait_start.elapsed() < Duration::from_millis(100),
"timeout while waiting for blocking_get to return"
);
}
assert_eq!(handle.join().unwrap(), Some(value));
}
}
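
A closing note on why `blocking_get` keeps its `loop` even with the condition variable: `Condvar::wait` can wake spuriously, and because the staging area uses `notify_all` on every update, a waiter can also be woken by a change to an unrelated key, so the slot has to be re-checked after every wakeup. The small standalone program below demonstrates that wait/re-check pattern with just the standard library; none of it is nearcore code.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    // A boolean "slot is ready" flag guarded by a mutex, plus a condvar that
    // signals updates to it.
    let pair = Arc::new((Mutex::new(false), Condvar::new()));

    let waiter = {
        let pair = Arc::clone(&pair);
        thread::spawn(move || {
            let (lock, cvar) = &*pair;
            let mut ready = lock.lock().unwrap();
            // Re-check the predicate after every wakeup: wakeups may be
            // spurious, or triggered by updates we do not care about.
            while !*ready {
                // `wait` releases the mutex while the thread is parked and
                // re-acquires it before returning, so the writer below is
                // never blocked by a sleeping reader.
                ready = cvar.wait(ready).unwrap();
            }
        })
    };

    let (lock, cvar) = &*pair;
    *lock.lock().unwrap() = true;
    cvar.notify_all();
    waiter.join().unwrap();
}
```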