From f55cded125dc0abda6675efbec10b3cea4b9c427 Mon Sep 17 00:00:00 2001 From: Anton Puhach Date: Tue, 13 Dec 2022 14:34:11 +0100 Subject: [PATCH 01/13] Initial implementation --- .../src/trie/prefetching_trie_storage.rs | 92 +++++++++++++++---- 1 file changed, 72 insertions(+), 20 deletions(-) diff --git a/core/store/src/trie/prefetching_trie_storage.rs b/core/store/src/trie/prefetching_trie_storage.rs index efe0fa8311a..fca47a85c87 100644 --- a/core/store/src/trie/prefetching_trie_storage.rs +++ b/core/store/src/trie/prefetching_trie_storage.rs @@ -12,7 +12,7 @@ use near_primitives::shard_layout::ShardUId; use near_primitives::trie_key::TrieKey; use near_primitives::types::{AccountId, ShardId, StateRoot, TrieNodesCount}; use std::collections::HashMap; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Condvar, Mutex, MutexGuard}; use std::thread; const MAX_QUEUED_WORK_ITEMS: usize = 16 * 1024; @@ -107,10 +107,11 @@ pub enum PrefetchError { /// without the prefetcher, because the order in which it sees accesses is /// independent of the prefetcher. #[derive(Clone)] -pub(crate) struct PrefetchStagingArea(Arc>); +pub(crate) struct PrefetchStagingArea(Arc); struct InnerPrefetchStagingArea { - slots: SizeTrackedHashMap, + slots: Mutex, + slots_updated_cvar: Condvar, } /// Result when atomically accessing the prefetch staging area. @@ -310,8 +311,11 @@ impl TriePrefetchingStorage { impl PrefetchStagingArea { fn new(shard_id: ShardId) -> Self { - let inner = InnerPrefetchStagingArea { slots: SizeTrackedHashMap::new(shard_id) }; - Self(Arc::new(Mutex::new(inner))) + let inner = InnerPrefetchStagingArea { + slots: Mutex::new(SizeTrackedHashMap::new(shard_id)), + slots_updated_cvar: Condvar::new(), + }; + Self(Arc::new(inner)) } /// Release a slot in the prefetcher staging area. @@ -322,8 +326,8 @@ impl PrefetchStagingArea { /// 2: IO thread misses in the shard cache on the same key and starts fetching it again. /// 3: Main thread value is inserted in shard cache. pub(crate) fn release(&self, key: &CryptoHash) { - let mut guard = self.lock(); - let dropped = guard.slots.remove(key); + let mut guard = self.lock_slots(); + let dropped = guard.remove(key); // `Done` is the result after a successful prefetch. // `PendingFetch` means the value has been read without a prefetch. // `None` means prefetching was stopped due to memory limits. @@ -335,6 +339,9 @@ impl PrefetchStagingArea { ) || prefetch_state_matches(PrefetchSlot::PendingFetch, dropped.as_ref().unwrap()), ); + if dropped.is_some() { + self.notify_slots_update(guard); + } } /// Block until value is prefetched and then return it. @@ -345,13 +352,14 @@ impl PrefetchStagingArea { /// same data and thus are waiting on each other rather than the DB. 
/// Of course, that would require prefetching to be moved into an async environment, pub(crate) fn blocking_get(&self, key: CryptoHash) -> Option> { + let mut guard = self.lock_slots(); loop { - match self.lock().slots.get(&key) { + match guard.get(&key) { Some(PrefetchSlot::Done(value)) => return Some(value.clone()), Some(_) => (), None => return None, } - thread::sleep(std::time::Duration::from_micros(1)); + guard = self.0.slots_updated_cvar.wait(guard).expect(POISONED_LOCK_ERR) } } @@ -362,7 +370,9 @@ impl PrefetchStagingArea { } fn insert_fetched(&self, key: CryptoHash, value: Arc<[u8]>) { - self.lock().slots.insert(key, PrefetchSlot::Done(value)); + let mut guard = self.lock_slots(); + guard.insert(key, PrefetchSlot::Done(value)); + self.notify_slots_update(guard); } /// Get prefetched value if available and otherwise atomically insert the @@ -372,10 +382,10 @@ impl PrefetchStagingArea { key: CryptoHash, set_if_empty: PrefetchSlot, ) -> PrefetcherResult { - let mut guard = self.lock(); + let mut guard = self.lock_slots(); let full = - guard.slots.size_bytes > MAX_PREFETCH_STAGING_MEMORY - PREFETCH_RESERVED_BYTES_PER_SLOT; - match guard.slots.map.get(&key) { + guard.size_bytes > MAX_PREFETCH_STAGING_MEMORY - PREFETCH_RESERVED_BYTES_PER_SLOT; + match guard.map.get(&key) { Some(value) => match value { PrefetchSlot::Done(value) => PrefetcherResult::Prefetched(value.clone()), PrefetchSlot::PendingPrefetch | PrefetchSlot::PendingFetch => { @@ -386,19 +396,29 @@ impl PrefetchStagingArea { if full { return PrefetcherResult::MemoryLimitReached; } - guard.slots.insert(key, set_if_empty); + guard.insert(key, set_if_empty); + self.notify_slots_update(guard); PrefetcherResult::SlotReserved } } } fn clear(&self) { - self.lock().slots.clear(); + let mut guard = self.lock_slots(); + guard.clear(); + self.notify_slots_update(guard); + } + + /// This consumes locked mutex guard to make sure to unlock it before + /// notifying condition variable. + fn notify_slots_update(&self, guard: MutexGuard) { + std::mem::drop(guard); + self.0.slots_updated_cvar.notify_all(); } #[track_caller] - fn lock(&self) -> std::sync::MutexGuard { - self.0.lock().expect(POISONED_LOCK_ERR) + fn lock_slots(&self) -> MutexGuard { + self.0.slots.lock().expect(POISONED_LOCK_ERR) } } @@ -551,7 +571,7 @@ impl Drop for PrefetchingThreadsHandle { /// a minimal set of functions is required to check the inner /// state of the prefetcher. #[cfg(feature = "test_features")] -mod tests { +mod tests_utils { use super::{PrefetchApi, PrefetchSlot}; use crate::TrieCachingStorage; @@ -559,8 +579,7 @@ mod tests { /// Returns the number of prefetched values currently staged. 
pub fn num_prefetched_and_staged(&self) -> usize { self.prefetching - .lock() - .slots + .lock_slots() .map .iter() .filter(|(_key, slot)| match slot { @@ -581,3 +600,36 @@ mod tests { } } } + +#[cfg(test)] +mod tests { + use super::{PrefetchStagingArea, PrefetcherResult}; + use near_primitives::hash::CryptoHash; + use std::time::{Duration, Instant}; + + #[test] + fn test_prefetch_staging_area_blocking_get_after_update() { + let key = CryptoHash::hash_bytes(&[1, 2, 3]); + let value: std::sync::Arc<[u8]> = vec![4, 5, 6].into(); + let prefetch_staging_area = PrefetchStagingArea::new(0); + assert!(matches!( + prefetch_staging_area.get_or_set_fetching(key), + PrefetcherResult::SlotReserved + )); + let prefetch_staging_area_get = prefetch_staging_area.clone(); + let handle = std::thread::spawn(move || prefetch_staging_area_get.blocking_get(key)); + std::thread::yield_now(); + assert!(!handle.is_finished()); + prefetch_staging_area.insert_fetched(key, value.clone()); + + let wait_start = Instant::now(); + while !handle.is_finished() { + std::thread::yield_now(); + assert!( + wait_start.elapsed() < Duration::from_millis(100), + "timeout while waiting for blocking_get to return" + ); + } + assert_eq!(handle.join().unwrap(), Some(value)); + } +} From 0627abd4eacc448ec9ec64da06bfad52052cb399 Mon Sep 17 00:00:00 2001 From: Anton Puhach Date: Tue, 13 Dec 2022 14:51:39 +0100 Subject: [PATCH 02/13] Minor updates --- core/store/src/trie/prefetching_trie_storage.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/store/src/trie/prefetching_trie_storage.rs b/core/store/src/trie/prefetching_trie_storage.rs index fca47a85c87..b3ece6f0f00 100644 --- a/core/store/src/trie/prefetching_trie_storage.rs +++ b/core/store/src/trie/prefetching_trie_storage.rs @@ -111,7 +111,7 @@ pub(crate) struct PrefetchStagingArea(Arc); struct InnerPrefetchStagingArea { slots: Mutex, - slots_updated_cvar: Condvar, + slots_update_cvar: Condvar, } /// Result when atomically accessing the prefetch staging area. @@ -313,7 +313,7 @@ impl PrefetchStagingArea { fn new(shard_id: ShardId) -> Self { let inner = InnerPrefetchStagingArea { slots: Mutex::new(SizeTrackedHashMap::new(shard_id)), - slots_updated_cvar: Condvar::new(), + slots_update_cvar: Condvar::new(), }; Self(Arc::new(inner)) } @@ -359,7 +359,7 @@ impl PrefetchStagingArea { Some(_) => (), None => return None, } - guard = self.0.slots_updated_cvar.wait(guard).expect(POISONED_LOCK_ERR) + guard = self.0.slots_update_cvar.wait(guard).expect(POISONED_LOCK_ERR) } } @@ -410,10 +410,10 @@ impl PrefetchStagingArea { } /// This consumes locked mutex guard to make sure to unlock it before - /// notifying condition variable. + /// notifying the condition variable. 
fn notify_slots_update(&self, guard: MutexGuard) { std::mem::drop(guard); - self.0.slots_updated_cvar.notify_all(); + self.0.slots_update_cvar.notify_all(); } #[track_caller] From 3b9b27a3be69444f634c7760b58a74b6edffefa6 Mon Sep 17 00:00:00 2001 From: Anton Puhach Date: Thu, 15 Dec 2022 15:35:32 +0100 Subject: [PATCH 03/13] Avoid passing mutex guard to notify_slots_update --- .../src/trie/prefetching_trie_storage.rs | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/core/store/src/trie/prefetching_trie_storage.rs b/core/store/src/trie/prefetching_trie_storage.rs index b3ece6f0f00..038ad46fd33 100644 --- a/core/store/src/trie/prefetching_trie_storage.rs +++ b/core/store/src/trie/prefetching_trie_storage.rs @@ -326,8 +326,7 @@ impl PrefetchStagingArea { /// 2: IO thread misses in the shard cache on the same key and starts fetching it again. /// 3: Main thread value is inserted in shard cache. pub(crate) fn release(&self, key: &CryptoHash) { - let mut guard = self.lock_slots(); - let dropped = guard.remove(key); + let dropped = self.lock_slots().remove(key); // `Done` is the result after a successful prefetch. // `PendingFetch` means the value has been read without a prefetch. // `None` means prefetching was stopped due to memory limits. @@ -340,7 +339,7 @@ impl PrefetchStagingArea { || prefetch_state_matches(PrefetchSlot::PendingFetch, dropped.as_ref().unwrap()), ); if dropped.is_some() { - self.notify_slots_update(guard); + self.notify_slots_update(); } } @@ -370,9 +369,8 @@ impl PrefetchStagingArea { } fn insert_fetched(&self, key: CryptoHash, value: Arc<[u8]>) { - let mut guard = self.lock_slots(); - guard.insert(key, PrefetchSlot::Done(value)); - self.notify_slots_update(guard); + self.lock_slots().insert(key, PrefetchSlot::Done(value)); + self.notify_slots_update(); } /// Get prefetched value if available and otherwise atomically insert the @@ -397,22 +395,18 @@ impl PrefetchStagingArea { return PrefetcherResult::MemoryLimitReached; } guard.insert(key, set_if_empty); - self.notify_slots_update(guard); + self.notify_slots_update(); PrefetcherResult::SlotReserved } } } fn clear(&self) { - let mut guard = self.lock_slots(); - guard.clear(); - self.notify_slots_update(guard); + self.lock_slots().clear(); + self.notify_slots_update(); } - /// This consumes locked mutex guard to make sure to unlock it before - /// notifying the condition variable. - fn notify_slots_update(&self, guard: MutexGuard) { - std::mem::drop(guard); + fn notify_slots_update(&self) { self.0.slots_update_cvar.notify_all(); } From 446b32eeaac1f4450cb605b5139680cf4c099ef8 Mon Sep 17 00:00:00 2001 From: Anton Puhach Date: Thu, 15 Dec 2022 15:38:31 +0100 Subject: [PATCH 04/13] Use explicit guard variable --- core/store/src/trie/prefetching_trie_storage.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/store/src/trie/prefetching_trie_storage.rs b/core/store/src/trie/prefetching_trie_storage.rs index 038ad46fd33..8d91b60e4dc 100644 --- a/core/store/src/trie/prefetching_trie_storage.rs +++ b/core/store/src/trie/prefetching_trie_storage.rs @@ -326,7 +326,8 @@ impl PrefetchStagingArea { /// 2: IO thread misses in the shard cache on the same key and starts fetching it again. /// 3: Main thread value is inserted in shard cache. pub(crate) fn release(&self, key: &CryptoHash) { - let dropped = self.lock_slots().remove(key); + let mut guard = self.lock_slots(); + let dropped = guard.remove(key); // `Done` is the result after a successful prefetch. 
// `PendingFetch` means the value has been read without a prefetch. // `None` means prefetching was stopped due to memory limits. @@ -369,7 +370,8 @@ impl PrefetchStagingArea { } fn insert_fetched(&self, key: CryptoHash, value: Arc<[u8]>) { - self.lock_slots().insert(key, PrefetchSlot::Done(value)); + let mut guard = self.lock_slots(); + guard.insert(key, PrefetchSlot::Done(value)); self.notify_slots_update(); } @@ -402,7 +404,8 @@ impl PrefetchStagingArea { } fn clear(&self) { - self.lock_slots().clear(); + let mut guard = self.lock_slots(); + guard.clear(); self.notify_slots_update(); } From cebbd82559664e14b751ee5d8dbc9e756688110a Mon Sep 17 00:00:00 2001 From: Anton Puhach Date: Mon, 19 Dec 2022 13:01:19 +0100 Subject: [PATCH 05/13] Introduce Monitor --- core/store/src/lib.rs | 1 + core/store/src/sync_utils.rs | 72 +++++++++++++++++++ .../src/trie/prefetching_trie_storage.rs | 72 ++++++++----------- 3 files changed, 104 insertions(+), 41 deletions(-) create mode 100644 core/store/src/sync_utils.rs diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index d9ba0837f7d..7a2436591da 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -51,6 +51,7 @@ pub mod migrations; mod opener; pub mod test_utils; mod trie; +mod sync_utils; pub use crate::config::{Mode, StoreConfig}; pub use crate::opener::{StoreMigrator, StoreOpener, StoreOpenerError}; diff --git a/core/store/src/sync_utils.rs b/core/store/src/sync_utils.rs new file mode 100644 index 00000000000..31401c2ca6f --- /dev/null +++ b/core/store/src/sync_utils.rs @@ -0,0 +1,72 @@ +use std::ops::{Deref, DerefMut}; +use std::sync::{Condvar, Mutex, MutexGuard}; + +const POISONED_LOCK_ERR: &str = "The lock was poisoned."; + +/// A convenience wrapper around a SharedMutex and a Condvar. +/// +/// It enables blocking while waiting for the underlying value to be updated. +/// The implementation ensures that any modification results in all blocked +/// threads being notified. 
+pub(crate) struct Monitor { + cvar: Condvar, + mutex: Mutex, +} + +pub(crate) struct MonitorReadGuard<'a, T> { + guard: MutexGuard<'a, T> +} + +pub(crate) struct MonitorWriteGuard<'a, T> { + guard: MutexGuard<'a, T>, + cvar: &'a Condvar, +} + +impl Monitor { + pub fn new(t: T) -> Self { + Self { mutex: Mutex::new(t), cvar: Condvar::new() } + } + + pub fn lock(&self) -> MonitorReadGuard<'_, T> { + let guard = self.mutex.lock().expect(POISONED_LOCK_ERR); + MonitorReadGuard { guard } + } + + pub fn lock_mut(&self) -> MonitorWriteGuard<'_, T> { + let guard = self.mutex.lock().expect(POISONED_LOCK_ERR); + MonitorWriteGuard { guard, cvar: &self.cvar } + } + + pub fn wait<'a>(&'a self, guard: MonitorReadGuard<'a, T>) -> MonitorReadGuard<'a, T> { + let guard = self.cvar.wait(guard.guard).expect(POISONED_LOCK_ERR); + MonitorReadGuard{ guard } + } +} + +impl Deref for MonitorReadGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + self.guard.deref() + } +} + +impl Deref for MonitorWriteGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + self.guard.deref() + } +} + +impl DerefMut for MonitorWriteGuard<'_, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.guard.deref_mut() + } +} + +impl Drop for MonitorWriteGuard<'_, T> { + fn drop(&mut self) { + self.cvar.notify_all(); + } +} diff --git a/core/store/src/trie/prefetching_trie_storage.rs b/core/store/src/trie/prefetching_trie_storage.rs index 8d91b60e4dc..6bb0b213c41 100644 --- a/core/store/src/trie/prefetching_trie_storage.rs +++ b/core/store/src/trie/prefetching_trie_storage.rs @@ -1,3 +1,4 @@ +use crate::sync_utils::Monitor; use crate::trie::POISONED_LOCK_ERR; use crate::{ metrics, DBCol, StorageError, Store, Trie, TrieCache, TrieCachingStorage, TrieConfig, @@ -12,7 +13,7 @@ use near_primitives::shard_layout::ShardUId; use near_primitives::trie_key::TrieKey; use near_primitives::types::{AccountId, ShardId, StateRoot, TrieNodesCount}; use std::collections::HashMap; -use std::sync::{Arc, Condvar, Mutex, MutexGuard}; +use std::sync::Arc; use std::thread; const MAX_QUEUED_WORK_ITEMS: usize = 16 * 1024; @@ -107,11 +108,10 @@ pub enum PrefetchError { /// without the prefetcher, because the order in which it sees accesses is /// independent of the prefetcher. #[derive(Clone)] -pub(crate) struct PrefetchStagingArea(Arc); +pub(crate) struct PrefetchStagingArea(Arc>); struct InnerPrefetchStagingArea { - slots: Mutex, - slots_update_cvar: Condvar, + slots: SizeTrackedHashMap, } /// Result when atomically accessing the prefetch staging area. @@ -311,11 +311,8 @@ impl TriePrefetchingStorage { impl PrefetchStagingArea { fn new(shard_id: ShardId) -> Self { - let inner = InnerPrefetchStagingArea { - slots: Mutex::new(SizeTrackedHashMap::new(shard_id)), - slots_update_cvar: Condvar::new(), - }; - Self(Arc::new(inner)) + let inner = InnerPrefetchStagingArea { slots: SizeTrackedHashMap::new(shard_id) }; + Self(Arc::new(Monitor::new(inner))) } /// Release a slot in the prefetcher staging area. @@ -326,8 +323,8 @@ impl PrefetchStagingArea { /// 2: IO thread misses in the shard cache on the same key and starts fetching it again. /// 3: Main thread value is inserted in shard cache. pub(crate) fn release(&self, key: &CryptoHash) { - let mut guard = self.lock_slots(); - let dropped = guard.remove(key); + let mut guard = self.0.lock_mut(); + let dropped = guard.slots.remove(key); // `Done` is the result after a successful prefetch. // `PendingFetch` means the value has been read without a prefetch. 
// `None` means prefetching was stopped due to memory limits. @@ -339,9 +336,6 @@ impl PrefetchStagingArea { ) || prefetch_state_matches(PrefetchSlot::PendingFetch, dropped.as_ref().unwrap()), ); - if dropped.is_some() { - self.notify_slots_update(); - } } /// Block until value is prefetched and then return it. @@ -352,14 +346,14 @@ impl PrefetchStagingArea { /// same data and thus are waiting on each other rather than the DB. /// Of course, that would require prefetching to be moved into an async environment, pub(crate) fn blocking_get(&self, key: CryptoHash) -> Option> { - let mut guard = self.lock_slots(); + let mut guard = self.0.lock(); loop { - match guard.get(&key) { + match guard.slots.get(&key) { Some(PrefetchSlot::Done(value)) => return Some(value.clone()), Some(_) => (), None => return None, } - guard = self.0.slots_update_cvar.wait(guard).expect(POISONED_LOCK_ERR) + guard = self.0.wait(guard); } } @@ -370,9 +364,8 @@ impl PrefetchStagingArea { } fn insert_fetched(&self, key: CryptoHash, value: Arc<[u8]>) { - let mut guard = self.lock_slots(); - guard.insert(key, PrefetchSlot::Done(value)); - self.notify_slots_update(); + let mut guard = self.0.lock_mut(); + guard.slots.insert(key, PrefetchSlot::Done(value)); } /// Get prefetched value if available and otherwise atomically insert the @@ -382,10 +375,10 @@ impl PrefetchStagingArea { key: CryptoHash, set_if_empty: PrefetchSlot, ) -> PrefetcherResult { - let mut guard = self.lock_slots(); + let mut guard = self.0.lock_mut(); let full = - guard.size_bytes > MAX_PREFETCH_STAGING_MEMORY - PREFETCH_RESERVED_BYTES_PER_SLOT; - match guard.map.get(&key) { + guard.slots.size_bytes > MAX_PREFETCH_STAGING_MEMORY - PREFETCH_RESERVED_BYTES_PER_SLOT; + match guard.slots.map.get(&key) { Some(value) => match value { PrefetchSlot::Done(value) => PrefetcherResult::Prefetched(value.clone()), PrefetchSlot::PendingPrefetch | PrefetchSlot::PendingFetch => { @@ -396,26 +389,14 @@ impl PrefetchStagingArea { if full { return PrefetcherResult::MemoryLimitReached; } - guard.insert(key, set_if_empty); - self.notify_slots_update(); + guard.slots.insert(key, set_if_empty); PrefetcherResult::SlotReserved } } } fn clear(&self) { - let mut guard = self.lock_slots(); - guard.clear(); - self.notify_slots_update(); - } - - fn notify_slots_update(&self) { - self.0.slots_update_cvar.notify_all(); - } - - #[track_caller] - fn lock_slots(&self) -> MutexGuard { - self.0.slots.lock().expect(POISONED_LOCK_ERR) + self.0.lock_mut().slots.clear(); } } @@ -576,7 +557,9 @@ mod tests_utils { /// Returns the number of prefetched values currently staged. pub fn num_prefetched_and_staged(&self) -> usize { self.prefetching - .lock_slots() + .0 + .lock() + .slots .map .iter() .filter(|(_key, slot)| match slot { @@ -613,9 +596,16 @@ mod tests { prefetch_staging_area.get_or_set_fetching(key), PrefetcherResult::SlotReserved )); - let prefetch_staging_area_get = prefetch_staging_area.clone(); - let handle = std::thread::spawn(move || prefetch_staging_area_get.blocking_get(key)); - std::thread::yield_now(); + let prefetch_staging_area2 = prefetch_staging_area.clone(); + let handle = std::thread::spawn(move || prefetch_staging_area2.blocking_get(key)); + // We need to sleep here to give some time for the thread above to + // spawn and block. Otherwise `insert_fetched` can happen before + // `blocking_get` which results in data being available immediately, + // so it doesn't actually block. 
Please note that this should not + // result in any flakiness since the test would still pass if we don't + // sleep enough, it just won't verify the the synchronization part of + // `blocking_get`. + std::thread::sleep(Duration::from_micros(1000)); assert!(!handle.is_finished()); prefetch_staging_area.insert_fetched(key, value.clone()); From 53412e75526b354069672306f2015e6760b31af4 Mon Sep 17 00:00:00 2001 From: Anton Puhach Date: Mon, 19 Dec 2022 13:11:26 +0100 Subject: [PATCH 06/13] Remove tmp guard --- core/store/src/trie/prefetching_trie_storage.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/store/src/trie/prefetching_trie_storage.rs b/core/store/src/trie/prefetching_trie_storage.rs index 6bb0b213c41..a106ff796a6 100644 --- a/core/store/src/trie/prefetching_trie_storage.rs +++ b/core/store/src/trie/prefetching_trie_storage.rs @@ -364,8 +364,7 @@ impl PrefetchStagingArea { } fn insert_fetched(&self, key: CryptoHash, value: Arc<[u8]>) { - let mut guard = self.0.lock_mut(); - guard.slots.insert(key, PrefetchSlot::Done(value)); + self.0.lock_mut().slots.insert(key, PrefetchSlot::Done(value)); } /// Get prefetched value if available and otherwise atomically insert the From f861cd7106c6b82214631bc8fb2ac519137a4ef5 Mon Sep 17 00:00:00 2001 From: Anton Puhach Date: Mon, 19 Dec 2022 14:11:26 +0100 Subject: [PATCH 07/13] fmt --- core/store/src/sync_utils.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/store/src/sync_utils.rs b/core/store/src/sync_utils.rs index 31401c2ca6f..8af4c16cfd8 100644 --- a/core/store/src/sync_utils.rs +++ b/core/store/src/sync_utils.rs @@ -5,7 +5,7 @@ const POISONED_LOCK_ERR: &str = "The lock was poisoned."; /// A convenience wrapper around a SharedMutex and a Condvar. /// -/// It enables blocking while waiting for the underlying value to be updated. +/// It enables blocking while waiting for the underlying value to be updated. /// The implementation ensures that any modification results in all blocked /// threads being notified. 
pub(crate) struct Monitor { @@ -14,7 +14,7 @@ pub(crate) struct Monitor { } pub(crate) struct MonitorReadGuard<'a, T> { - guard: MutexGuard<'a, T> + guard: MutexGuard<'a, T>, } pub(crate) struct MonitorWriteGuard<'a, T> { @@ -39,11 +39,11 @@ impl Monitor { pub fn wait<'a>(&'a self, guard: MonitorReadGuard<'a, T>) -> MonitorReadGuard<'a, T> { let guard = self.cvar.wait(guard.guard).expect(POISONED_LOCK_ERR); - MonitorReadGuard{ guard } + MonitorReadGuard { guard } } } -impl Deref for MonitorReadGuard<'_, T> { +impl Deref for MonitorReadGuard<'_, T> { type Target = T; fn deref(&self) -> &Self::Target { @@ -65,7 +65,7 @@ impl DerefMut for MonitorWriteGuard<'_, T> { } } -impl Drop for MonitorWriteGuard<'_, T> { +impl Drop for MonitorWriteGuard<'_, T> { fn drop(&mut self) { self.cvar.notify_all(); } From 8e4a16172d55085ccd3264440aaa0949b7d12e1b Mon Sep 17 00:00:00 2001 From: Anton Puhach Date: Mon, 19 Dec 2022 14:52:58 +0100 Subject: [PATCH 08/13] Reorder use --- core/store/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index 7a2436591da..783fa0d8449 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -49,9 +49,9 @@ pub mod metadata; mod metrics; pub mod migrations; mod opener; +mod sync_utils; pub mod test_utils; mod trie; -mod sync_utils; pub use crate::config::{Mode, StoreConfig}; pub use crate::opener::{StoreMigrator, StoreOpener, StoreOpenerError}; From cb59478cc85fb29051efde930e09955266be16fc Mon Sep 17 00:00:00 2001 From: Anton Puhach Date: Mon, 19 Dec 2022 17:49:05 +0100 Subject: [PATCH 09/13] from_micros -> from_millis --- core/store/src/trie/prefetching_trie_storage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/store/src/trie/prefetching_trie_storage.rs b/core/store/src/trie/prefetching_trie_storage.rs index a106ff796a6..9c1df97a9de 100644 --- a/core/store/src/trie/prefetching_trie_storage.rs +++ b/core/store/src/trie/prefetching_trie_storage.rs @@ -604,7 +604,7 @@ mod tests { // result in any flakiness since the test would still pass if we don't // sleep enough, it just won't verify the the synchronization part of // `blocking_get`. - std::thread::sleep(Duration::from_micros(1000)); + std::thread::sleep(Duration::from_millis(1)); assert!(!handle.is_finished()); prefetch_staging_area.insert_fetched(key, value.clone()); From 2ec45462a7693119c92fcf36643192635ec30504 Mon Sep 17 00:00:00 2001 From: Anton Puhach Date: Tue, 20 Dec 2022 10:03:35 +0100 Subject: [PATCH 10/13] Simplify test --- .../src/trie/prefetching_trie_storage.rs | 33 +++++++------------ 1 file changed, 12 insertions(+), 21 deletions(-) diff --git a/core/store/src/trie/prefetching_trie_storage.rs b/core/store/src/trie/prefetching_trie_storage.rs index 9c1df97a9de..bb022a77bfa 100644 --- a/core/store/src/trie/prefetching_trie_storage.rs +++ b/core/store/src/trie/prefetching_trie_storage.rs @@ -584,7 +584,6 @@ mod tests_utils { mod tests { use super::{PrefetchStagingArea, PrefetcherResult}; use near_primitives::hash::CryptoHash; - use std::time::{Duration, Instant}; #[test] fn test_prefetch_staging_area_blocking_get_after_update() { @@ -596,26 +595,18 @@ mod tests { PrefetcherResult::SlotReserved )); let prefetch_staging_area2 = prefetch_staging_area.clone(); - let handle = std::thread::spawn(move || prefetch_staging_area2.blocking_get(key)); - // We need to sleep here to give some time for the thread above to - // spawn and block. 
Otherwise `insert_fetched` can happen before - // `blocking_get` which results in data being available immediately, - // so it doesn't actually block. Please note that this should not - // result in any flakiness since the test would still pass if we don't - // sleep enough, it just won't verify the the synchronization part of - // `blocking_get`. - std::thread::sleep(Duration::from_millis(1)); - assert!(!handle.is_finished()); - prefetch_staging_area.insert_fetched(key, value.clone()); - - let wait_start = Instant::now(); - while !handle.is_finished() { + let value2 = value.clone(); + // We need to execute `blocking_get` before `insert_fetched` so that + // it blocks while waiting for the value to be updated. Spawning + // a new thread + yielding should give enough time for the main + // thread to make progress. Please note that even if `insert_fetched` + // is executed before `blocking_get`, that still wouldn't result in + // any flakiness since the test would still pass, it just won't verify + // the the synchronization part of `blocking_get`. + std::thread::spawn(move || { std::thread::yield_now(); - assert!( - wait_start.elapsed() < Duration::from_millis(100), - "timeout while waiting for blocking_get to return" - ); - } - assert_eq!(handle.join().unwrap(), Some(value)); + prefetch_staging_area.insert_fetched(key, value2); + }); + assert_eq!(prefetch_staging_area2.blocking_get(key), Some(value)); } } From ceec97ca291ea715894c1618040cc6f10dd91353 Mon Sep 17 00:00:00 2001 From: Anton Puhach Date: Tue, 20 Dec 2022 10:10:08 +0100 Subject: [PATCH 11/13] Use prefetch_staging_area2 async --- core/store/src/trie/prefetching_trie_storage.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/store/src/trie/prefetching_trie_storage.rs b/core/store/src/trie/prefetching_trie_storage.rs index bb022a77bfa..cef1830d852 100644 --- a/core/store/src/trie/prefetching_trie_storage.rs +++ b/core/store/src/trie/prefetching_trie_storage.rs @@ -605,8 +605,8 @@ mod tests { // the the synchronization part of `blocking_get`. std::thread::spawn(move || { std::thread::yield_now(); - prefetch_staging_area.insert_fetched(key, value2); + prefetch_staging_area2.insert_fetched(key, value2); }); - assert_eq!(prefetch_staging_area2.blocking_get(key), Some(value)); + assert_eq!(prefetch_staging_area.blocking_get(key), Some(value)); } } From 965a75b935080a0099846b662bba7db9bdee401c Mon Sep 17 00:00:00 2001 From: Anton Puhach Date: Thu, 22 Dec 2022 09:38:46 +0100 Subject: [PATCH 12/13] Fix comment --- core/store/src/sync_utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/store/src/sync_utils.rs b/core/store/src/sync_utils.rs index 8af4c16cfd8..c2abfccb911 100644 --- a/core/store/src/sync_utils.rs +++ b/core/store/src/sync_utils.rs @@ -3,7 +3,7 @@ use std::sync::{Condvar, Mutex, MutexGuard}; const POISONED_LOCK_ERR: &str = "The lock was poisoned."; -/// A convenience wrapper around a SharedMutex and a Condvar. +/// A convenience wrapper around a Mutex and a Condvar. /// /// It enables blocking while waiting for the underlying value to be updated. 
 /// The implementation ensures that any modification results in all blocked

From 04cd64cde7cebe66e0c443bdf15d57b4021e70c8 Mon Sep 17 00:00:00 2001
From: Anton Puhach
Date: Thu, 22 Dec 2022 09:39:57 +0100
Subject: [PATCH 13/13] Fix another comment

---
 core/store/src/trie/prefetching_trie_storage.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/store/src/trie/prefetching_trie_storage.rs b/core/store/src/trie/prefetching_trie_storage.rs
index cef1830d852..7719d4e6c41 100644
--- a/core/store/src/trie/prefetching_trie_storage.rs
+++ b/core/store/src/trie/prefetching_trie_storage.rs
@@ -602,7 +602,7 @@ mod tests {
         // thread to make progress. Please note that even if `insert_fetched`
         // is executed before `blocking_get`, that still wouldn't result in
         // any flakiness since the test would still pass, it just won't verify
-        // the the synchronization part of `blocking_get`.
+        // the synchronization part of `blocking_get`.
         std::thread::spawn(move || {
             std::thread::yield_now();
             prefetch_staging_area2.insert_fetched(key, value2);
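A note on the synchronization pattern this series converges on: every mutation of the prefetch staging area notifies a Condvar, so `blocking_get` can sleep on the condition variable instead of polling with `thread::sleep` (patch 01), and patch 05 packages the idea as the `Monitor` type, whose write guard calls `notify_all` when it is dropped. Below is a minimal, standalone sketch of that pattern for illustration only; the names `Monitor`, `reserve`, `fill`, and `blocking_get` are simplified stand-ins for this sketch and not the nearcore API.

use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Minimal monitor: a Mutex-guarded map plus a Condvar that is notified after
/// every mutation, mirroring how `MonitorWriteGuard::drop` calls `notify_all`
/// in the patch series.
struct Monitor {
    slots: Mutex<HashMap<u64, Option<Vec<u8>>>>,
    cvar: Condvar,
}

impl Monitor {
    fn new() -> Self {
        Self { slots: Mutex::new(HashMap::new()), cvar: Condvar::new() }
    }

    /// Reserve a slot (roughly analogous to `PrefetchSlot::PendingPrefetch`).
    fn reserve(&self, key: u64) {
        self.slots.lock().unwrap().insert(key, None);
        self.cvar.notify_all();
    }

    /// Fill a slot (roughly analogous to `insert_fetched`) and wake all waiters.
    fn fill(&self, key: u64, value: Vec<u8>) {
        self.slots.lock().unwrap().insert(key, Some(value));
        self.cvar.notify_all();
    }

    /// Block until the slot is filled or removed (roughly analogous to `blocking_get`).
    fn blocking_get(&self, key: u64) -> Option<Vec<u8>> {
        let mut guard = self.slots.lock().unwrap();
        loop {
            match guard.get(&key) {
                Some(Some(value)) => return Some(value.clone()),
                Some(None) => {} // still pending: wait for the next notification
                None => return None,
            }
            // `wait` releases the lock atomically and re-acquires it on wake-up,
            // so an update cannot slip in between the check and the wait.
            guard = self.cvar.wait(guard).unwrap();
        }
    }
}

fn main() {
    let monitor = Arc::new(Monitor::new());
    monitor.reserve(1);

    let writer = Arc::clone(&monitor);
    let handle = thread::spawn(move || writer.fill(1, vec![4, 5, 6]));

    // Blocks until the writer thread fills the slot, without busy-waiting.
    assert_eq!(monitor.blocking_get(1), Some(vec![4, 5, 6]));
    handle.join().unwrap();
}

In the actual patches the notification happens in `MonitorWriteGuard::drop`, so any code path that takes the write lock wakes the waiters without having to remember to call `notify_all` itself.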