refactor: clean up busy waiting in prefetcher blocking get #8215

Merged: 14 commits, Dec 22, 2022
1 change: 1 addition & 0 deletions core/store/src/lib.rs
@@ -49,6 +49,7 @@ pub mod metadata;
mod metrics;
pub mod migrations;
mod opener;
mod sync_utils;
pub mod test_utils;
mod trie;

72 changes: 72 additions & 0 deletions core/store/src/sync_utils.rs
@@ -0,0 +1,72 @@
use std::ops::{Deref, DerefMut};
use std::sync::{Condvar, Mutex, MutexGuard};

const POISONED_LOCK_ERR: &str = "The lock was poisoned.";

/// A convenience wrapper around a Mutex and a Condvar.
///
/// It enables blocking while waiting for the underlying value to be updated.
/// The implementation ensures that any modification results in all blocked
/// threads being notified.
pub(crate) struct Monitor<T> {
Collaborator: Not saying that rolling our own implementation is problematic, but curious why we didn't use something off the shelf, e.g. https://github.com/reem/rust-shared-mutex.

Contributor Author: I considered using shared-mutex, and my Monitor implementation is actually inspired by it. Unfortunately, it doesn't provide the API we need here; in particular, we want SharedMutexWriteGuard to notify the condvar when dropped (see this comment).

Collaborator: Got it, thanks for the explanation.

cvar: Condvar,
mutex: Mutex<T>,
}

pub(crate) struct MonitorReadGuard<'a, T> {
guard: MutexGuard<'a, T>,
}

pub(crate) struct MonitorWriteGuard<'a, T> {
guard: MutexGuard<'a, T>,
cvar: &'a Condvar,
}

impl<T> Monitor<T> {
pub fn new(t: T) -> Self {
Self { mutex: Mutex::new(t), cvar: Condvar::new() }
}

pub fn lock(&self) -> MonitorReadGuard<'_, T> {
let guard = self.mutex.lock().expect(POISONED_LOCK_ERR);
MonitorReadGuard { guard }
}

pub fn lock_mut(&self) -> MonitorWriteGuard<'_, T> {
let guard = self.mutex.lock().expect(POISONED_LOCK_ERR);
MonitorWriteGuard { guard, cvar: &self.cvar }
}

pub fn wait<'a>(&'a self, guard: MonitorReadGuard<'a, T>) -> MonitorReadGuard<'a, T> {
let guard = self.cvar.wait(guard.guard).expect(POISONED_LOCK_ERR);
MonitorReadGuard { guard }
}
}

impl<T> Deref for MonitorReadGuard<'_, T> {
type Target = T;

fn deref(&self) -> &Self::Target {
self.guard.deref()
}
}

impl<T> Deref for MonitorWriteGuard<'_, T> {
type Target = T;

fn deref(&self) -> &Self::Target {
self.guard.deref()
}
}

impl<T> DerefMut for MonitorWriteGuard<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.guard.deref_mut()
}
}

impl<T> Drop for MonitorWriteGuard<'_, T> {
fn drop(&mut self) {
self.cvar.notify_all();
}
}
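
For context on the review discussion above, here is a minimal usage sketch of the `Monitor` wrapper (illustrative only, not part of the diff): a reader blocks in `wait` until some writer's `MonitorWriteGuard` is dropped, which notifies all waiters. The `bool` payload and the two helper functions are assumptions made for the example.

```rust
// Illustrative sketch; assumes the `Monitor` defined above is in scope.
fn wait_until_set(monitor: &Monitor<bool>) {
    let mut guard = monitor.lock();
    // Re-check the condition after every wakeup; condvars permit spurious wakeups.
    while !*guard {
        guard = monitor.wait(guard);
    }
}

fn set(monitor: &Monitor<bool>) {
    // The write guard is dropped at the end of this statement, which notifies
    // the condvar and wakes every thread blocked in `wait_until_set`.
    *monitor.lock_mut() = true;
}
```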
59 changes: 44 additions & 15 deletions core/store/src/trie/prefetching_trie_storage.rs
@@ -1,3 +1,4 @@
use crate::sync_utils::Monitor;
use crate::trie::POISONED_LOCK_ERR;
use crate::{
metrics, DBCol, StorageError, Store, Trie, TrieCache, TrieCachingStorage, TrieConfig,
@@ -12,7 +13,7 @@ use near_primitives::shard_layout::ShardUId;
use near_primitives::trie_key::TrieKey;
use near_primitives::types::{AccountId, ShardId, StateRoot, TrieNodesCount};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::thread;

const MAX_QUEUED_WORK_ITEMS: usize = 16 * 1024;
@@ -107,7 +108,7 @@ pub enum PrefetchError {
/// without the prefetcher, because the order in which it sees accesses is
/// independent of the prefetcher.
#[derive(Clone)]
pub(crate) struct PrefetchStagingArea(Arc<Mutex<InnerPrefetchStagingArea>>);
pub(crate) struct PrefetchStagingArea(Arc<Monitor<InnerPrefetchStagingArea>>);

struct InnerPrefetchStagingArea {
slots: SizeTrackedHashMap,
@@ -311,7 +312,7 @@ impl TriePrefetchingStorage {
impl PrefetchStagingArea {
fn new(shard_id: ShardId) -> Self {
let inner = InnerPrefetchStagingArea { slots: SizeTrackedHashMap::new(shard_id) };
Self(Arc::new(Mutex::new(inner)))
Self(Arc::new(Monitor::new(inner)))
}

/// Release a slot in the prefetcher staging area.
@@ -322,7 +323,7 @@ impl PrefetchStagingArea {
/// 2: IO thread misses in the shard cache on the same key and starts fetching it again.
/// 3: Main thread value is inserted in shard cache.
pub(crate) fn release(&self, key: &CryptoHash) {
let mut guard = self.lock();
let mut guard = self.0.lock_mut();
let dropped = guard.slots.remove(key);
// `Done` is the result after a successful prefetch.
// `PendingFetch` means the value has been read without a prefetch.
@@ -345,13 +346,14 @@ impl PrefetchStagingArea {
/// same data and thus are waiting on each other rather than the DB.
/// Of course, that would require prefetching to be moved into an async environment,
pub(crate) fn blocking_get(&self, key: CryptoHash) -> Option<Arc<[u8]>> {
let mut guard = self.0.lock();
loop {
match self.lock().slots.get(&key) {
match guard.slots.get(&key) {
Some(PrefetchSlot::Done(value)) => return Some(value.clone()),
Some(_) => (),
None => return None,
}
thread::sleep(std::time::Duration::from_micros(1));
guard = self.0.wait(guard);
}
}
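
Since the rendered diff interleaves the removed and added lines, the following sketch shows how `blocking_get` reads after this change, reconstructed from the hunk above (indentation restored; not a verbatim copy of the file):

```rust
pub(crate) fn blocking_get(&self, key: CryptoHash) -> Option<Arc<[u8]>> {
    let mut guard = self.0.lock();
    loop {
        match guard.slots.get(&key) {
            Some(PrefetchSlot::Done(value)) => return Some(value.clone()),
            Some(_) => (),
            None => return None,
        }
        // Instead of sleeping and re-locking on every iteration (busy waiting),
        // block on the condvar; dropping any `MonitorWriteGuard` wakes the waiters.
        guard = self.0.wait(guard);
    }
}
```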

@@ -362,7 +364,7 @@ impl PrefetchStagingArea {
}

fn insert_fetched(&self, key: CryptoHash, value: Arc<[u8]>) {
self.lock().slots.insert(key, PrefetchSlot::Done(value));
self.0.lock_mut().slots.insert(key, PrefetchSlot::Done(value));
}

/// Get prefetched value if available and otherwise atomically insert the
@@ -372,7 +374,7 @@ impl PrefetchStagingArea {
key: CryptoHash,
set_if_empty: PrefetchSlot,
) -> PrefetcherResult {
let mut guard = self.lock();
let mut guard = self.0.lock_mut();
let full =
guard.slots.size_bytes > MAX_PREFETCH_STAGING_MEMORY - PREFETCH_RESERVED_BYTES_PER_SLOT;
match guard.slots.map.get(&key) {
@@ -393,12 +395,7 @@ impl PrefetchStagingArea {
}

fn clear(&self) {
self.lock().slots.clear();
}

#[track_caller]
fn lock(&self) -> std::sync::MutexGuard<InnerPrefetchStagingArea> {
self.0.lock().expect(POISONED_LOCK_ERR)
self.0.lock_mut().slots.clear();
}
}

@@ -551,14 +548,15 @@ impl Drop for PrefetchingThreadsHandle {
/// a minimal set of functions is required to check the inner
/// state of the prefetcher.
#[cfg(feature = "test_features")]
mod tests {
mod tests_utils {
use super::{PrefetchApi, PrefetchSlot};
use crate::TrieCachingStorage;

impl PrefetchApi {
/// Returns the number of prefetched values currently staged.
pub fn num_prefetched_and_staged(&self) -> usize {
self.prefetching
.0
.lock()
.slots
.map
@@ -581,3 +579,34 @@ mod tests {
}
}
}

#[cfg(test)]
mod tests {
use super::{PrefetchStagingArea, PrefetcherResult};
use near_primitives::hash::CryptoHash;

#[test]
fn test_prefetch_staging_area_blocking_get_after_update() {
let key = CryptoHash::hash_bytes(&[1, 2, 3]);
let value: std::sync::Arc<[u8]> = vec![4, 5, 6].into();
let prefetch_staging_area = PrefetchStagingArea::new(0);
assert!(matches!(
prefetch_staging_area.get_or_set_fetching(key),
PrefetcherResult::SlotReserved
));
let prefetch_staging_area2 = prefetch_staging_area.clone();
let value2 = value.clone();
// We need to execute `blocking_get` before `insert_fetched` so that
// it blocks while waiting for the value to be updated. Spawning
// a new thread + yielding should give enough time for the main
// thread to make progress. Please note that even if `insert_fetched`
// is executed before `blocking_get`, that still wouldn't result in
// any flakiness since the test would still pass; it just won't verify
// the synchronization part of `blocking_get`.
std::thread::spawn(move || {
std::thread::yield_now();
prefetch_staging_area2.insert_fetched(key, value2);
});
assert_eq!(prefetch_staging_area.blocking_get(key), Some(value));
}
}