diff --git a/Cargo.toml b/Cargo.toml index b81da6f6..0fb928b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ default = ["atomic64", "quanta"] sync = [] # Enable this feature to use `moka::future::Cache`. -future = ["async-lock", "async-trait", "event-listener", "futures-util"] +future = ["async-lock", "event-listener", "futures-util"] # Enable this feature to activate optional logging from caches. # Currently cache will emit log only when it encounters a panic in user provided @@ -60,7 +60,6 @@ quanta = { version = "0.12.2", optional = true } # Optional dependencies (future) async-lock = { version = "3.3", optional = true } -async-trait = { version = "0.1.58", optional = true } event-listener = { version = "5.3", optional = true } futures-util = { version = "0.3.17", optional = true } diff --git a/src/future/base_cache.rs b/src/future/base_cache.rs index f495bba1..0c9d0a5f 100644 --- a/src/future/base_cache.rs +++ b/src/future/base_cache.rs @@ -1,6 +1,6 @@ use super::{ - housekeeper::{Housekeeper, InnerSync}, - invalidator::{GetOrRemoveEntry, Invalidator, KeyDateLite, PredicateFun}, + housekeeper::Housekeeper, + invalidator::{Invalidator, KeyDateLite, PredicateFun}, key_lock::{KeyLock, KeyLockMap}, notifier::RemovalNotifier, InterruptedOp, PredicateId, @@ -36,7 +36,6 @@ use crate::{ use common::concurrent::debug_counters::CacheDebugStats; use async_lock::{Mutex, MutexGuard, RwLock}; -use async_trait::async_trait; use crossbeam_channel::{Receiver, Sender, TrySendError}; use crossbeam_utils::atomic::AtomicCell; use futures_util::future::BoxFuture; @@ -384,7 +383,7 @@ where #[inline] pub(crate) async fn apply_reads_writes_if_needed( - inner: &Arc, + inner: &Arc>, ch: &Sender>, now: Instant, housekeeper: Option<&HouseKeeperArc>, @@ -620,7 +619,7 @@ where #[inline] pub(crate) async fn schedule_write_op( - inner: &Arc, + inner: &Arc>, ch: &Sender>, ch_ready_event: &event_listener::Event<()>, op: WriteOp, @@ -1042,7 +1041,7 @@ pub(crate) struct Inner { max_capacity: Option, entry_count: AtomicCell, weighted_size: AtomicCell, - cache: CacheStore, + pub(crate) cache: CacheStore, build_hasher: S, deques: Mutex>, timer_wheel: Mutex>, @@ -1050,7 +1049,7 @@ pub(crate) struct Inner { frequency_sketch_enabled: AtomicBool, read_op_ch: Receiver>, write_op_ch: Receiver>, - write_op_ch_ready_event: event_listener::Event, + pub(crate) write_op_ch_ready_event: event_listener::Event, eviction_policy: EvictionPolicyConfig, expiration_policy: ExpirationPolicy, valid_after: AtomicInstant, @@ -1101,7 +1100,7 @@ impl Inner { } #[inline] - fn is_removal_notifier_enabled(&self) -> bool { + pub(crate) fn is_removal_notifier_enabled(&self) -> bool { self.removal_notifier.is_some() } @@ -1118,7 +1117,7 @@ impl Inner { ) } - fn maybe_key_lock(&self, key: &Arc) -> Option> + pub(crate) fn maybe_key_lock(&self, key: &Arc) -> Option> where K: Hash + Eq, S: BuildHasher, @@ -1127,7 +1126,7 @@ impl Inner { } #[inline] - fn current_time_from_expiration_clock(&self) -> Instant { + pub(crate) fn current_time_from_expiration_clock(&self) -> Instant { if self.clocks.has_expiration_clock.load(Ordering::Relaxed) { Instant::new( self.clocks @@ -1191,7 +1190,7 @@ impl Inner where K: Hash + Eq + Send + Sync + 'static, V: Send + Sync + 'static, - S: BuildHasher + Clone, + S: BuildHasher + Send + Sync + Clone + 'static, { // Disable a Clippy warning for having more than seven arguments. // https://rust-lang.github.io/rust-clippy/master/index.html#too_many_arguments @@ -1354,75 +1353,6 @@ where } } -#[async_trait] -impl GetOrRemoveEntry for Inner -where - K: Hash + Eq, - S: BuildHasher + Send + Sync + 'static, -{ - fn get_value_entry(&self, key: &Arc, hash: u64) -> Option>> { - self.cache.get(hash, |k| k == key) - } - - async fn remove_key_value_if( - &self, - key: &Arc, - hash: u64, - condition: F, - ) -> Option>> - where - K: Send + Sync + 'static, - V: Clone + Send + Sync + 'static, - F: for<'a, 'b> FnMut(&'a Arc, &'b TrioArc>) -> bool + Send, - { - // Lock the key for removal if blocking removal notification is enabled. - let kl = self.maybe_key_lock(key); - let _klg = if let Some(lock) = &kl { - Some(lock.lock().await) - } else { - None - }; - - let maybe_entry = self.cache.remove_if(hash, |k| k == key, condition); - if let Some(entry) = &maybe_entry { - if self.is_removal_notifier_enabled() { - self.notify_single_removal(Arc::clone(key), entry, RemovalCause::Explicit) - .await; - } - } - maybe_entry - } -} - -#[async_trait] -impl InnerSync for Inner -where - K: Hash + Eq + Send + Sync + 'static, - V: Clone + Send + Sync + 'static, - S: BuildHasher + Clone + Send + Sync + 'static, -{ - /// Runs the pending tasks. Returns `true` if there are more entries to evict. - async fn run_pending_tasks( - &self, - timeout: Option, - max_log_sync_repeats: u32, - eviction_batch_size: u32, - ) -> bool { - self.do_run_pending_tasks(timeout, max_log_sync_repeats, eviction_batch_size) - .await - } - - /// Notifies all the async tasks waiting in `BaseCache::schedule_write_op` method - /// for the write op channel to have enough room. - fn notify_write_op_ch_is_ready(&self) { - self.write_op_ch_ready_event.notify(usize::MAX); - } - - fn now(&self) -> Instant { - self.current_time_from_expiration_clock() - } -} - impl Inner where K: Hash + Eq + Send + Sync + 'static, @@ -1430,7 +1360,7 @@ where S: BuildHasher + Clone + Send + Sync + 'static, { /// Runs the pending tasks. Returns `true` if there are more entries to evict. - async fn do_run_pending_tasks( + pub(crate) async fn do_run_pending_tasks( &self, timeout: Option, max_log_sync_repeats: u32, @@ -2643,7 +2573,7 @@ where K: Send + Sync + 'static, V: Clone + Send + Sync + 'static, { - async fn notify_single_removal( + pub(crate) async fn notify_single_removal( &self, key: Arc, entry: &TrioArc>, diff --git a/src/future/cache.rs b/src/future/cache.rs index bd7b3f46..77785d1f 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -1,6 +1,6 @@ use super::{ base_cache::BaseCache, - value_initializer::{GetOrInsert, InitResult, ValueInitializer}, + value_initializer::{InitResult, ValueInitializer}, CacheBuilder, CancelGuard, Iter, OwnedKeyEntrySelector, PredicateId, RefKeyEntrySelector, WriteOp, }; @@ -15,7 +15,6 @@ use crate::{ #[cfg(feature = "unstable-debug-counters")] use crate::common::concurrent::debug_counters::CacheDebugStats; -use async_trait::async_trait; use std::{ borrow::Borrow, collections::hash_map::RandomState, @@ -631,7 +630,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; /// [builder-name-method]: ./struct.CacheBuilder.html#method.name /// pub struct Cache { - base: BaseCache, + pub(crate) base: BaseCache, value_initializer: Arc>, #[cfg(test)] @@ -1806,7 +1805,7 @@ where } } - async fn insert_with_hash(&self, key: Arc, hash: u64, value: V) { + pub(crate) async fn insert_with_hash(&self, key: Arc, hash: u64, value: V) { if self.base.is_map_disabled() { return; } @@ -1901,7 +1900,12 @@ where } } - async fn invalidate_with_hash(&self, key: &Q, hash: u64, need_value: bool) -> Option + pub(crate) async fn invalidate_with_hash( + &self, + key: &Q, + hash: u64, + need_value: bool, + ) -> Option where K: Borrow, Q: Hash + Eq + ?Sized, @@ -2015,44 +2019,6 @@ where } } -#[async_trait] -impl GetOrInsert for Cache -where - K: Hash + Eq + Send + Sync + 'static, - V: Clone + Send + Sync + 'static, - S: BuildHasher + Clone + Send + Sync + 'static, -{ - async fn get_without_recording( - &self, - key: &Arc, - hash: u64, - replace_if: Option<&mut I>, - ) -> Option - where - I: for<'i> FnMut(&'i V) -> bool + Send, - { - self.base - .get_with_hash(key, hash, replace_if, false, false) - .await - .map(Entry::into_value) - } - - async fn get_entry(&self, key: &Arc, hash: u64) -> Option> { - let ignore_if = None as Option<&mut fn(&V) -> bool>; - self.base - .get_with_hash(key, hash, ignore_if, true, true) - .await - } - - async fn insert(&self, key: Arc, hash: u64, value: V) { - self.insert_with_hash(key.clone(), hash, value).await; - } - - async fn remove(&self, key: &Arc, hash: u64) -> Option { - self.invalidate_with_hash(key, hash, true).await - } -} - // For unit tests. // For unit tests. #[cfg(test)] diff --git a/src/future/housekeeper.rs b/src/future/housekeeper.rs index 49cf334d..7149019a 100644 --- a/src/future/housekeeper.rs +++ b/src/future/housekeeper.rs @@ -8,6 +8,7 @@ use crate::common::{ }; use std::{ + hash::{BuildHasher, Hash}, sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -19,26 +20,9 @@ use std::{ use std::sync::atomic::AtomicUsize; use async_lock::Mutex; -use async_trait::async_trait; use futures_util::future::{BoxFuture, Shared}; -#[async_trait] -pub(crate) trait InnerSync { - /// Runs the pending tasks. Returns `true` if there are more entries to evict in - /// next run. - async fn run_pending_tasks( - &self, - timeout: Option, - max_log_sync_repeats: u32, - eviction_batch_size: u32, - ) -> bool; - - /// Notifies all the async tasks waiting in `BaseCache::schedule_write_op` method - /// for the write op channel to have enough room. - fn notify_write_op_ch_is_ready(&self); - - fn now(&self) -> Instant; -} +use super::base_cache::Inner; pub(crate) struct Housekeeper { /// A shared `Future` of the maintenance task that is currently being resolved. @@ -124,9 +108,11 @@ impl Housekeeper { && (ch_len >= ch_flush_point || now >= self.run_after.instant().unwrap()) } - pub(crate) async fn run_pending_tasks(&self, cache: Arc) + pub(crate) async fn run_pending_tasks(&self, cache: Arc>) where - T: InnerSync + Send + Sync + 'static, + K: Hash + Eq + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, + S: BuildHasher + Clone + Send + Sync + 'static, { let mut current_task = self.current_task.lock().await; self.do_run_pending_tasks(Arc::clone(&cache), &mut current_task) @@ -136,14 +122,16 @@ impl Housekeeper { // If there are any async tasks waiting in `BaseCache::schedule_write_op` // method for the write op channel, notify them. - cache.notify_write_op_ch_is_ready(); + cache.write_op_ch_ready_event.notify(usize::MAX); } /// Tries to run the pending tasks if the lock is free. Returns `true` if there /// are more entries to evict in next run. - pub(crate) async fn try_run_pending_tasks(&self, cache: &Arc) -> bool + pub(crate) async fn try_run_pending_tasks(&self, cache: &Arc>) -> bool where - T: InnerSync + Send + Sync + 'static, + K: Hash + Eq + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, + S: BuildHasher + Clone + Send + Sync + 'static, { if let Some(mut current_task) = self.current_task.try_lock() { self.do_run_pending_tasks(Arc::clone(cache), &mut current_task) @@ -156,20 +144,22 @@ impl Housekeeper { // If there are any async tasks waiting in `BaseCache::schedule_write_op` // method for the write op channel, notify them. - cache.notify_write_op_ch_is_ready(); + cache.write_op_ch_ready_event.notify(usize::MAX); true } - async fn do_run_pending_tasks( + async fn do_run_pending_tasks( &self, - cache: Arc, + cache: Arc>, current_task: &mut Option>>, ) where - T: InnerSync + Send + Sync + 'static, + K: Hash + Eq + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, + S: BuildHasher + Clone + Send + Sync + 'static, { use futures_util::FutureExt; - let now = cache.now(); + let now = cache.current_time_from_expiration_clock(); let more_to_evict; // Async Cancellation Safety: Our maintenance task is cancellable as we save // it in the lock. If it is canceled, we will resume it in the next run. @@ -183,9 +173,13 @@ impl Housekeeper { let repeats = self.max_log_sync_repeats; let batch_size = self.eviction_batch_size; // Create a new maintenance task and await it. - let task = async move { cache.run_pending_tasks(timeout, repeats, batch_size).await } - .boxed() - .shared(); + let task = async move { + cache + .do_run_pending_tasks(timeout, repeats, batch_size) + .await + } + .boxed() + .shared(); *current_task = Some(task.clone()); #[cfg(test)] diff --git a/src/future/invalidator.rs b/src/future/invalidator.rs index d34138bb..4c159f6c 100644 --- a/src/future/invalidator.rs +++ b/src/future/invalidator.rs @@ -1,14 +1,14 @@ -use super::{PredicateId, PredicateIdStr}; +use super::{base_cache::Inner, PredicateId, PredicateIdStr}; use crate::{ common::{ concurrent::{AccessTime, KvEntry, ValueEntry}, time::Instant, }, + notification::RemovalCause, PredicateError, }; use async_lock::{Mutex, MutexGuard}; -use async_trait::async_trait; use std::{ hash::{BuildHasher, Hash}, sync::{ @@ -23,22 +23,6 @@ pub(crate) type PredicateFun = Arc bool + Send + Sync + const PREDICATE_MAP_NUM_SEGMENTS: usize = 16; -#[async_trait] -pub(crate) trait GetOrRemoveEntry { - fn get_value_entry(&self, key: &Arc, hash: u64) -> Option>>; - - async fn remove_key_value_if( - &self, - key: &Arc, - hash: u64, - condition: F, - ) -> Option>> - where - K: Send + Sync + 'static, - V: Clone + Send + Sync + 'static, - F: for<'a, 'b> FnMut(&'a Arc, &'b TrioArc>) -> bool + Send; -} - pub(crate) struct KeyDateLite { key: Arc, hash: u64, @@ -161,7 +145,7 @@ impl Invalidator { where K: Hash + Eq + Send + Sync + 'static, V: Clone + Send + Sync + 'static, - S: BuildHasher, + S: BuildHasher + Send + Sync + 'static, { if self.is_empty() { false @@ -177,17 +161,16 @@ impl Invalidator { } } - pub(crate) async fn scan_and_invalidate( + pub(crate) async fn scan_and_invalidate( &self, - cache: &C, + cache: &Inner, candidates: Vec>, is_truncated: bool, ) -> (Vec>, bool) where - C: GetOrRemoveEntry, K: Hash + Eq + Send + Sync + 'static, V: Clone + Send + Sync + 'static, - S: BuildHasher, + S: BuildHasher + Send + Sync + 'static, { let mut predicates = self.scan_context.predicates.lock().await; if predicates.is_empty() { @@ -221,7 +204,11 @@ impl Invalidator { // // Private methods. // -impl Invalidator { +impl Invalidator +where + K: Hash + Eq, + S: BuildHasher + Send + Sync + 'static, +{ #[inline] fn do_apply_predicates(predicates: I, key: &K, value: &V, ts: Instant) -> bool where @@ -280,18 +267,15 @@ impl Invalidator { } } - fn apply( + fn apply( &self, predicates: &[Predicate], - cache: &C, + cache: &Inner, key: &Arc, hash: u64, ts: Instant, - ) -> bool - where - C: GetOrRemoveEntry, - { - if let Some(entry) = cache.get_value_entry(key, hash) { + ) -> bool { + if let Some(entry) = cache.cache.get(hash, |k| k == key) { if let Some(lm) = entry.last_modified() { if lm == ts { return Self::do_apply_predicates( @@ -307,26 +291,43 @@ impl Invalidator { false } - async fn invalidate( - cache: &C, + async fn invalidate( + cache: &Inner, key: &Arc, hash: u64, ts: Instant, ) -> Option>> where - C: GetOrRemoveEntry, K: Send + Sync + 'static, V: Clone + Send + Sync + 'static, { - cache - .remove_key_value_if(key, hash, |_, v| { + // Lock the key for removal if blocking removal notification is enabled. + let kl = cache.maybe_key_lock(key); + let _klg = if let Some(lock) = &kl { + Some(lock.lock().await) + } else { + None + }; + + let maybe_entry = cache.cache.remove_if( + hash, + |k| k == key, + |_, v| { if let Some(lm) = v.last_modified() { lm == ts } else { false } - }) - .await + }, + ); + if let Some(entry) = &maybe_entry { + if cache.is_removal_notifier_enabled() { + cache + .notify_single_removal(Arc::clone(key), entry, RemovalCause::Explicit) + .await; + } + } + maybe_entry } } diff --git a/src/future/value_initializer.rs b/src/future/value_initializer.rs index 1af827da..63b7a341 100644 --- a/src/future/value_initializer.rs +++ b/src/future/value_initializer.rs @@ -1,5 +1,4 @@ use async_lock::{RwLock, RwLockWriteGuard}; -use async_trait::async_trait; use futures_util::FutureExt; use std::{ any::{Any, TypeId}, @@ -16,37 +15,10 @@ use crate::{ Entry, }; -use super::{ComputeNone, OptionallyNone}; +use super::{Cache, ComputeNone, OptionallyNone}; const WAITER_MAP_NUM_SEGMENTS: usize = 64; -#[async_trait] -pub(crate) trait GetOrInsert { - /// Gets a value for the given key without recording the access to the cache - /// policies. - async fn get_without_recording( - &self, - key: &Arc, - hash: u64, - replace_if: Option<&mut I>, - ) -> Option - where - V: 'static, - I: for<'i> FnMut(&'i V) -> bool + Send; - - /// Gets an entry for the given key _with_ recording the access to the cache - /// policies. - async fn get_entry(&self, key: &Arc, hash: u64) -> Option> - where - V: 'static; - - /// Inserts a value for the given key. - async fn insert(&self, key: Arc, hash: u64, value: V); - - /// Removes a value for the given key. Returns the removed value. - async fn remove(&self, key: &Arc, hash: u64) -> Option; -} - type ErrorObject = Arc; pub(crate) enum InitResult { @@ -151,7 +123,7 @@ impl ValueInitializer where K: Eq + Hash + Send + Sync + 'static, V: Clone + Send + Sync + 'static, - S: BuildHasher + Send + Sync + 'static, + S: BuildHasher + Clone + Send + Sync + 'static, { pub(crate) fn with_hasher(hasher: S) -> Self { Self { @@ -175,12 +147,12 @@ where /// # Panics /// Panics if the `init` future has been panicked. #[allow(clippy::too_many_arguments)] - pub(crate) async fn try_init_or_read<'a, C, I, O, E>( - &'a self, + pub(crate) async fn try_init_or_read( + &self, c_key: &Arc, c_hash: u64, type_id: TypeId, - cache: &C, + cache: &Cache, mut ignore_if: Option, // Future to initialize a new value. init: Pin<&mut impl Future>, @@ -189,7 +161,6 @@ where post_init: fn(O) -> Result, ) -> InitResult where - C: GetOrInsert + Send + 'a, I: FnMut(&V) -> bool + Send, E: Send + Sync + 'static, { @@ -251,8 +222,10 @@ where // Check if the value has already been inserted by other thread. if let Some(value) = cache - .get_without_recording(c_key, c_hash, ignore_if.as_mut()) + .base + .get_with_hash(c_key, c_hash, ignore_if.as_mut(), false, false) .await + .map(Entry::into_value) { // Yes. Set the waiter value, remove our waiter, and return // the existing value. @@ -267,7 +240,9 @@ where // Resolved. Ok(value) => match post_init(value) { Ok(value) => { - cache.insert(Arc::clone(c_key), c_hash, value.clone()).await; + cache + .insert_with_hash(Arc::clone(c_key), c_hash, value.clone()) + .await; waiter_guard.set_waiter_value(WaiterValue::Ready(Ok(value.clone()))); Initialized(value) } @@ -288,17 +263,16 @@ where /// # Panics /// Panics if the `init` future has been panicked. - pub(crate) async fn try_compute<'a, C, F, Fut, O, E>( + pub(crate) async fn try_compute<'a, F, Fut, O, E>( &'a self, c_key: Arc, c_hash: u64, - cache: &C, + cache: &Cache, f: F, post_init: fn(O) -> Result, E>, allow_nop: bool, ) -> Result, E> where - C: GetOrInsert + Send + 'a, F: FnOnce(Option>) -> Fut, Fut: Future + 'a, E: Send + Sync + 'static, @@ -344,7 +318,11 @@ where let waiter_guard = WaiterGuard::new(w_key, w_hash, &self.waiters, lock); // Get the current value. - let maybe_entry = cache.get_entry(&c_key, c_hash).await; + let ignore_if = None as Option<&mut fn(&V) -> bool>; + let maybe_entry = cache + .base + .get_with_hash(&c_key, c_hash, ignore_if, true, true) + .await; let maybe_value = if allow_nop { maybe_entry.as_ref().map(|ent| ent.value().clone()) } else { @@ -394,7 +372,7 @@ where } Op::Put(value) => { cache - .insert(Arc::clone(&c_key), c_hash, value.clone()) + .insert_with_hash(Arc::clone(&c_key), c_hash, value.clone()) .await; if entry_existed { crossbeam_epoch::pin().flush(); @@ -406,7 +384,7 @@ where } } Op::Remove => { - let maybe_prev_v = cache.remove(&c_key, c_hash).await; + let maybe_prev_v = cache.invalidate_with_hash(&c_key, c_hash, true).await; if let Some(prev_v) = maybe_prev_v { crossbeam_epoch::pin().flush(); let entry = Entry::new(Some(c_key), prev_v, false, false); diff --git a/src/sync/cache.rs b/src/sync/cache.rs index 03a7c3ac..789855a1 100644 --- a/src/sync/cache.rs +++ b/src/sync/cache.rs @@ -1,5 +1,5 @@ use super::{ - value_initializer::{GetOrInsert, InitResult, ValueInitializer}, + value_initializer::{InitResult, ValueInitializer}, CacheBuilder, OwnedKeyEntrySelector, RefKeyEntrySelector, }; use crate::{ @@ -572,7 +572,7 @@ use std::{ /// pub struct Cache { - base: BaseCache, + pub(crate) base: BaseCache, value_initializer: Arc>, } @@ -1848,27 +1848,6 @@ where } } -impl GetOrInsert for Cache -where - K: Hash + Eq + Send + Sync + 'static, - V: Clone + Send + Sync + 'static, - S: BuildHasher + Clone + Send + Sync + 'static, -{ - fn get_entry(&self, key: &Arc, hash: u64) -> Option> { - let ignore_if = None as Option<&mut fn(&V) -> bool>; - self.base - .get_with_hash_and_ignore_if(key, hash, ignore_if, true) - } - - fn insert(&self, key: Arc, hash: u64, value: V) { - self.insert_with_hash(key.clone(), hash, value); - } - - fn remove(&self, key: &Arc, hash: u64) -> Option { - self.invalidate_with_hash(key, hash, true) - } -} - // For unit tests. #[cfg(test)] impl Cache { diff --git a/src/sync/value_initializer.rs b/src/sync/value_initializer.rs index ca7c8b3a..f66aa9ef 100644 --- a/src/sync/value_initializer.rs +++ b/src/sync/value_initializer.rs @@ -12,24 +12,10 @@ use crate::{ Entry, }; -use super::{ComputeNone, OptionallyNone}; +use super::{Cache, ComputeNone, OptionallyNone}; const WAITER_MAP_NUM_SEGMENTS: usize = 64; -pub(crate) trait GetOrInsert { - /// Gets an entry for the given key _with_ recording the access to the cache - /// policies. - fn get_entry(&self, key: &Arc, hash: u64) -> Option> - where - V: 'static; - - /// Inserts a value for the given key. - fn insert(&self, key: Arc, hash: u64, value: V); - - /// Removes a value for the given key. Returns the removed value. - fn remove(&self, key: &Arc, hash: u64) -> Option; -} - type ErrorObject = Arc; // type WaiterValue = Option>; @@ -70,9 +56,9 @@ pub(crate) struct ValueInitializer { impl ValueInitializer where - K: Eq + Hash, - V: Clone, - S: BuildHasher, + K: Hash + Eq + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, + S: BuildHasher + Clone + Send + Sync + 'static, { pub(crate) fn with_hasher(hasher: S) -> Self { Self { @@ -190,18 +176,17 @@ where /// # Panics /// Panics if the `init` closure has been panicked. - pub(crate) fn try_compute<'a, C, F, O, E>( - &'a self, + pub(crate) fn try_compute( + &self, c_key: Arc, c_hash: u64, - cache: &C, + cache: &Cache, f: F, post_init: fn(O) -> Result, E>, allow_nop: bool, ) -> Result, E> where V: 'static, - C: GetOrInsert + Send + 'a, F: FnOnce(Option>) -> O, E: Send + Sync + 'static, { @@ -240,7 +225,10 @@ where // Our waiter was inserted. // Get the current value. - let maybe_entry = cache.get_entry(&c_key, c_hash); + let ignore_if = None as Option<&mut fn(&V) -> bool>; + let maybe_entry = cache + .base + .get_with_hash_and_ignore_if(&c_key, c_hash, ignore_if, true); let maybe_value = if allow_nop { maybe_entry.as_ref().map(|ent| ent.value().clone()) } else { @@ -287,7 +275,7 @@ where } } Op::Put(value) => { - cache.insert(Arc::clone(&c_key), c_hash, value.clone()); + cache.insert_with_hash(Arc::clone(&c_key), c_hash, value.clone()); if entry_existed { crossbeam_epoch::pin().flush(); let entry = Entry::new(Some(c_key), value, true, true); @@ -298,7 +286,7 @@ where } } Op::Remove => { - let maybe_prev_v = cache.remove(&c_key, c_hash); + let maybe_prev_v = cache.invalidate_with_hash(&c_key, c_hash, true); if let Some(prev_v) = maybe_prev_v { crossbeam_epoch::pin().flush(); let entry = Entry::new(Some(c_key), prev_v, false, false); diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index 64468479..fe951d29 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -1,5 +1,5 @@ use super::{ - invalidator::{GetOrRemoveEntry, Invalidator, KeyDateLite, PredicateFun}, + invalidator::{Invalidator, KeyDateLite, PredicateFun}, iter::ScanningGet, key_lock::{KeyLock, KeyLockMap}, PredicateId, @@ -910,7 +910,7 @@ pub(crate) struct Inner { max_capacity: Option, entry_count: AtomicCell, weighted_size: AtomicCell, - cache: CacheStore, + pub(crate) cache: CacheStore, build_hasher: S, deques: Mutex>, timer_wheel: Mutex>, @@ -968,11 +968,11 @@ impl Inner { } #[inline] - fn is_removal_notifier_enabled(&self) -> bool { + pub(crate) fn is_removal_notifier_enabled(&self) -> bool { self.removal_notifier.is_some() } - fn maybe_key_lock(&self, key: &Arc) -> Option> + pub(crate) fn maybe_key_lock(&self, key: &Arc) -> Option> where K: Hash + Eq, S: BuildHasher, @@ -1208,39 +1208,6 @@ where } } -impl GetOrRemoveEntry for Inner -where - K: Hash + Eq, - S: BuildHasher, -{ - fn get_value_entry(&self, key: &Arc, hash: u64) -> Option>> { - self.cache.get(hash, |k| k == key) - } - - fn remove_key_value_if( - &self, - key: &Arc, - hash: u64, - condition: impl FnMut(&Arc, &TrioArc>) -> bool, - ) -> Option>> - where - K: Send + Sync + 'static, - V: Clone + Send + Sync + 'static, - { - // Lock the key for removal if blocking removal notification is enabled. - let kl = self.maybe_key_lock(key); - let _klg = &kl.as_ref().map(|kl| kl.lock()); - - let maybe_entry = self.cache.remove_if(hash, |k| k == key, condition); - if let Some(entry) = &maybe_entry { - if self.is_removal_notifier_enabled() { - self.notify_single_removal(Arc::clone(key), entry, RemovalCause::Explicit); - } - } - maybe_entry - } -} - impl InnerSync for Inner where K: Hash + Eq + Send + Sync + 'static, @@ -2398,7 +2365,7 @@ where K: Send + Sync + 'static, V: Clone + Send + Sync + 'static, { - fn notify_single_removal( + pub(crate) fn notify_single_removal( &self, key: Arc, entry: &TrioArc>, diff --git a/src/sync_base/invalidator.rs b/src/sync_base/invalidator.rs index c1ba0284..dfb62c42 100644 --- a/src/sync_base/invalidator.rs +++ b/src/sync_base/invalidator.rs @@ -1,9 +1,10 @@ -use super::{PredicateId, PredicateIdStr}; +use super::{base_cache::Inner, PredicateId, PredicateIdStr}; use crate::{ common::{ concurrent::{AccessTime, KvEntry, ValueEntry}, time::Instant, }, + notification::RemovalCause, PredicateError, }; @@ -22,20 +23,6 @@ pub(crate) type PredicateFun = Arc bool + Send + Sync + const PREDICATE_MAP_NUM_SEGMENTS: usize = 16; -pub(crate) trait GetOrRemoveEntry { - fn get_value_entry(&self, key: &Arc, hash: u64) -> Option>>; - - fn remove_key_value_if( - &self, - key: &Arc, - hash: u64, - condition: impl FnMut(&Arc, &TrioArc>) -> bool, - ) -> Option>> - where - K: Send + Sync + 'static, - V: Clone + Send + Sync + 'static; -} - pub(crate) struct KeyDateLite { key: Arc, hash: u64, @@ -174,14 +161,13 @@ impl Invalidator { } } - pub(crate) fn scan_and_invalidate( + pub(crate) fn scan_and_invalidate( &self, - cache: &C, + cache: &Inner, candidates: Vec>, is_truncated: bool, ) -> (Vec>, bool) where - C: GetOrRemoveEntry, K: Hash + Eq + Send + Sync + 'static, V: Clone + Send + Sync + 'static, S: BuildHasher, @@ -218,7 +204,11 @@ impl Invalidator { // // Private methods. // -impl Invalidator { +impl Invalidator +where + K: Hash + Eq, + S: BuildHasher, +{ #[inline] fn do_apply_predicates(predicates: I, key: &K, value: &V, ts: Instant) -> bool where @@ -277,18 +267,15 @@ impl Invalidator { } } - fn apply( + fn apply( &self, predicates: &[Predicate], - cache: &C, + cache: &Inner, key: &Arc, hash: u64, ts: Instant, - ) -> bool - where - C: GetOrRemoveEntry, - { - if let Some(entry) = cache.get_value_entry(key, hash) { + ) -> bool { + if let Some(entry) = cache.cache.get(hash, |k| k == key) { if let Some(lm) = entry.last_modified() { if lm == ts { return Invalidator::<_, _, S>::do_apply_predicates( @@ -304,24 +291,37 @@ impl Invalidator { false } - fn invalidate( - cache: &C, + fn invalidate( + cache: &Inner, key: &Arc, hash: u64, ts: Instant, ) -> Option>> where - C: GetOrRemoveEntry, K: Send + Sync + 'static, V: Clone + Send + Sync + 'static, { - cache.remove_key_value_if(key, hash, |_, v| { - if let Some(lm) = v.last_modified() { - lm == ts - } else { - false + // Lock the key for removal if blocking removal notification is enabled. + let kl = cache.maybe_key_lock(key); + let _klg = &kl.as_ref().map(|kl| kl.lock()); + + let maybe_entry = cache.cache.remove_if( + hash, + |k| k == key, + |_, v| { + if let Some(lm) = v.last_modified() { + lm == ts + } else { + false + } + }, + ); + if let Some(entry) = &maybe_entry { + if cache.is_removal_notifier_enabled() { + cache.notify_single_removal(Arc::clone(key), entry, RemovalCause::Explicit); } - }) + } + maybe_entry } }