diff --git a/.vscode/settings.json b/.vscode/settings.json index 9d1719f1..22ae28b7 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -64,6 +64,7 @@ "Uninit", "unsync", "Upsert", + "upserted", "usize" ], "files.watcherExclude": { diff --git a/CHANGELOG.md b/CHANGELOG.md index 09d64a46..5eb75834 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,19 @@ # Moka Cache — Change Log +## Version 0.12.3 + +### Added + +- Added the upsert and compute methods for modifying a cached entry + ([#370][gh-pull-0370]): + - Now the `entry` or `entry_by_ref` APIs have the following methods: + - `and_upsert_with` method to insert or update the entry. + - `and_compute_with` method to insert, update, remove or do nothing on the + entry. + - `and_try_compute_with` method, which is similar to above but returns + `Result`. + + ## Version 0.12.2 ### Fixed @@ -781,6 +795,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (Mar 25, 2021). [gh-issue-0034]: https://github.com/moka-rs/moka/issues/34/ [gh-issue-0031]: https://github.com/moka-rs/moka/issues/31/ +[gh-pull-0370]: https://github.com/moka-rs/moka/pull/370/ [gh-pull-0363]: https://github.com/moka-rs/moka/pull/363/ [gh-pull-0350]: https://github.com/moka-rs/moka/pull/350/ [gh-pull-0348]: https://github.com/moka-rs/moka/pull/348/ diff --git a/Cargo.toml b/Cargo.toml index d89d6402..ae460947 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "moka" -version = "0.12.2" +version = "0.12.3" edition = "2021" # Rust 1.65 was released on Nov 3, 2022. 
rust-version = "1.65" @@ -81,7 +81,7 @@ getrandom = "0.2" paste = "1.0.9" reqwest = { version = "0.11.11", default-features = false, features = ["rustls-tls"] } skeptic = "0.13" -tokio = { version = "1.19", features = ["fs", "macros", "rt-multi-thread", "sync", "time" ] } +tokio = { version = "1.19", features = ["fs", "io-util", "macros", "rt-multi-thread", "sync", "time" ] } [target.'cfg(trybuild)'.dev-dependencies] trybuild = "1.0" @@ -106,6 +106,14 @@ rustdoc-args = ["--cfg", "docsrs"] # Examples +[[example]] +name = "append_value_async" +required-features = ["future"] + +[[example]] +name = "append_value_sync" +required-features = ["sync"] + [[example]] name = "basics_async" required-features = ["future"] @@ -114,10 +122,26 @@ required-features = ["future"] name = "basics_sync" required-features = ["sync"] +[[example]] +name = "bounded_counter_async" +required-features = ["future"] + +[[example]] +name = "bounded_counter_sync" +required-features = ["sync"] + [[example]] name = "cascading_drop_async" required-features = ["future"] +[[example]] +name = "counter_async" +required-features = ["future"] + +[[example]] +name = "counter_sync" +required-features = ["sync"] + [[example]] name = "eviction_listener_sync" required-features = ["sync"] @@ -125,3 +149,11 @@ required-features = ["sync"] [[example]] name = "size_aware_eviction_sync" required-features = ["sync"] + +[[example]] +name = "try_append_value_async" +required-features = ["future"] + +[[example]] +name = "try_append_value_sync" +required-features = ["sync"] diff --git a/README.md b/README.md index 12798b80..1502097e 100644 --- a/README.md +++ b/README.md @@ -514,24 +514,23 @@ $ cargo +nightly -Z unstable-options --config 'build.rustdocflags="--cfg docsrs" doc --no-deps --features 'future, sync' ``` -## Road Map +## Roadmap - [x] Size-aware eviction. (`v0.7.0` via [#24][gh-pull-024]) -- [x] API stabilization. 
(Smaller core cache API, shorter names for frequently - used methods) (`v0.8.0` via [#105][gh-pull-105]) +- [x] API stabilization. (Smaller core API, shorter names for frequently used + methods) (`v0.8.0` via [#105][gh-pull-105]) - e.g. - `get_or_insert_with(K, F)` → `get_with(K, F)` - `get_or_try_insert_with(K, F)` → `try_get_with(K, F)` - - `blocking_insert(K, V)` → `blocking().insert(K, V)` - `time_to_live()` → `policy().time_to_live()` - [x] Notifications on eviction. (`v0.9.0` via [#145][gh-pull-145]) - [x] Variable (per-entry) expiration, using hierarchical timer wheels. (`v0.11.0` via [#248][gh-pull-248]) -- [ ] Cache statistics (Hit rate, etc.). ([details][cache-stats]) - [x] Remove background threads. (`v0.12.0` via [#294][gh-pull-294] and [#316][gh-pull-316]) +- [x] Add upsert and compute methods. (`v0.12.3` via [#370][gh-pull-370]) +- [ ] Cache statistics (Hit rate, etc.). ([details][cache-stats]) - [ ] Restore cache from a snapshot. ([details][restore]) -- [ ] `and_compute` method. ([details][and-compute]) - [ ] Upgrade TinyLFU to Window-TinyLFU. 
([details][tiny-lfu]) [gh-pull-024]: https://github.com/moka-rs/moka/pull/24 @@ -540,8 +539,8 @@ $ cargo +nightly -Z unstable-options --config 'build.rustdocflags="--cfg docsrs" [gh-pull-248]: https://github.com/moka-rs/moka/pull/248 [gh-pull-294]: https://github.com/moka-rs/moka/pull/294 [gh-pull-316]: https://github.com/moka-rs/moka/pull/316 +[gh-pull-370]: https://github.com/moka-rs/moka/pull/370 -[and-compute]: https://github.com/moka-rs/moka/issues/227 [cache-stats]: https://github.com/moka-rs/moka/issues/234 [restore]: https://github.com/moka-rs/moka/issues/314 diff --git a/examples/README.md b/examples/README.md index b09370bb..591248f5 100644 --- a/examples/README.md +++ b/examples/README.md @@ -17,29 +17,54 @@ Each example has a suffix `_async` or `_sync`: ## Basics of the Cache API - [basics_async](./basics_async.rs) and [basics_sync](./basics_sync.rs) - - Sharing a cache between async tasks or OS threads. + - Shares a cache between async tasks or OS threads. - Do not wrap a `Cache` with `Arc>`! Just clone the `Cache` and you are all set. - - `insert`, `get` and `invalidate` methods. + - Uses `insert`, `get` and `invalidate` methods. - [size_aware_eviction_sync](./size_aware_eviction_sync.rs) - - Configuring the max capacity of the cache based on the total size of the cached + - Configures the max capacity of the cache based on the total size of the cached entries. +## The `Entry` API + +Atomically inserts, updates and removes an entry from the cache depending on the +existence of the entry. + +- [counter_async](./counter_async.rs) and [counter_sync](./counter_sync.rs) + - Atomically increments a cached `u64` by 1. If the entry does not exist, inserts + a new entry with the value 1. + - Uses `and_upsert_with` method. +- [bounded_counter_async](./bounded_counter_async.rs) and + [bounded_counter_sync](./bounded_counter_sync.rs) + - Same as above except removing the entry when the value is 2. + - `and_compute_with` method. 
+- [append_value_async](./append_value_async.rs) and + [append_value_sync](./append_value_sync.rs) + - Atomically appends an `i32` to a cached `Arc>>`. If the entry + does not exist, inserts a new entry. + - Uses `and_upsert_with` method. +- [try_append_value_async](./try_append_value_async.rs) and + [try_append_value_sync](./try_append_value_sync.rs) + - Atomically reads an `char` from a reader and appends it to a cached `Arc>`, + but reading may fail by an early EOF. + - Uses `and_try_compute_with` method. + ## Expiration and Eviction Listener - [eviction_listener_sync](./eviction_listener_sync.rs) - - Setting the `time_to_live` expiration policy. - - Registering a listener (closure) to be notified when an entry is evicted from - the cache. - - `insert`, `invalidate` and `invalidate_all` methods. - - Demonstrating when the expired entries will be actually evicted from the cache, + - Configures the `time_to_live` expiration policy. + - Registers a listener (closure) to be notified when an entry is evicted from the + cache. + - Uses `insert`, `invalidate`, `invalidate_all` and `run_pending_tasks` methods. + - Demonstrates when the expired entries will be actually evicted from the cache, and why the `run_pending_tasks` method could be important in some cases. - [cascading_drop_async](./cascading_drop_async.rs) - - Controlling the lifetime of the objects in a separate `BTreeMap` collection - from the cache using an eviction listener. - - `BTreeMap`, `Arc` and mpsc channel (multi-producer, single consumer channel). + - Controls the lifetime of the objects in a separate `BTreeMap` collection from + the cache using an eviction listener. + - Beside the cache APIs, uses `BTreeMap`, `Arc` and mpsc channel (multi-producer, + single consumer channel). ## Check out the API Documentation too! 
diff --git a/examples/append_value_async.rs b/examples/append_value_async.rs new file mode 100644 index 00000000..b26f9984 --- /dev/null +++ b/examples/append_value_async.rs @@ -0,0 +1,67 @@ +//! This example demonstrates how to append an `i32` value to a cached `Vec` +//! value. It uses the `and_upsert_with` method of `Cache`. + +use std::sync::Arc; + +use moka::{future::Cache, Entry}; +use tokio::sync::RwLock; + +#[tokio::main] +async fn main() { + // We want to store a raw value `Vec` for each `String` key. We are going to + // append `i32` values to the `Vec` in the cache. + // + // Note that we have to wrap the `Vec` in an `Arc>`. We need the `Arc`, + // an atomic reference counted shared pointer, because `and_upsert_with` method + // of `Cache` passes a _clone_ of the value to our closure, instead of passing a + // `&mut` reference. We do not want to clone the `Vec` every time we append a + // value to it, so we wrap it in an `Arc`. Then we need the `RwLock` because we + // mutate the `Vec` when we append a value to it. + // + // The reason that `and_upsert_with` cannot pass a `&mut Vec<_>` to the closure + // is because the internal concurrent hash table of `Cache` is a lock free data + // structure and does not use any mutexes. So it cannot guarantee: (1) the `&mut + // Vec<_>` is unique, and (2) it is not accessed concurrently by other threads. + let cache: Cache>>> = Cache::new(100); + + let key = "key".to_string(); + + let entry = append_to_cached_vec(&cache, &key, 1).await; + // It was not an update. + assert!(!entry.is_old_value_replaced()); + assert!(entry.is_fresh()); + assert_eq!(*entry.into_value().read().await, &[1]); + + let entry = append_to_cached_vec(&cache, &key, 2).await; + assert!(entry.is_fresh()); + // It was an update. 
+ assert!(entry.is_old_value_replaced()); + assert_eq!(*entry.into_value().read().await, &[1, 2]); + + let entry = append_to_cached_vec(&cache, &key, 3).await; + assert!(entry.is_fresh()); + assert!(entry.is_old_value_replaced()); + assert_eq!(*entry.into_value().read().await, &[1, 2, 3]); +} + +async fn append_to_cached_vec( + cache: &Cache>>>, + key: &str, + value: i32, +) -> Entry>>> { + cache + .entry_by_ref(key) + .and_upsert_with(|maybe_entry| async { + if let Some(entry) = maybe_entry { + // The entry exists, append the value to the Vec. + let v = entry.into_value(); + v.write().await.push(value); + v + } else { + // The entry does not exist, insert a new Vec containing + // the value. + Arc::new(RwLock::new(vec![value])) + } + }) + .await +} diff --git a/examples/append_value_sync.rs b/examples/append_value_sync.rs new file mode 100644 index 00000000..23db556b --- /dev/null +++ b/examples/append_value_sync.rs @@ -0,0 +1,60 @@ +//! This example demonstrates how to append an `i32` value to a cached `Vec` +//! value. It uses the `and_upsert_with` method of `Cache`. + +use std::sync::{Arc, RwLock}; + +use moka::{sync::Cache, Entry}; + +fn main() { + // We want to store a raw value `Vec` for each `String` key. We are going to + // append `i32` values to the `Vec` in the cache. + // + // Note that we have to wrap the `Vec` in an `Arc>`. We need the `Arc`, + // an atomic reference counted shared pointer, because `and_upsert_with` method + // of `Cache` passes a _clone_ of the value to our closure, instead of passing a + // `&mut` reference. We do not want to clone the `Vec` every time we append a + // value to it, so we wrap it in an `Arc`. Then we need the `RwLock` because we + // mutate the `Vec` when we append a value to it. + // + // The reason that `and_upsert_with` cannot pass a `&mut Vec<_>` to the closure + // is because the internal concurrent hash table of `Cache` is a lock free data + // structure and does not use any mutexes. 
So it cannot guarantee: (1) the `&mut + // Vec<_>` is unique, and (2) it is not accessed concurrently by other threads. + let cache: Cache>>> = Cache::new(100); + + let key = "key".to_string(); + + let entry = append_to_cached_vec(&cache, &key, 1); + assert!(entry.is_fresh()); + assert!(!entry.is_old_value_replaced()); + assert_eq!(*entry.into_value().read().unwrap(), &[1]); + + let entry = append_to_cached_vec(&cache, &key, 2); + assert!(entry.is_fresh()); + assert!(entry.is_old_value_replaced()); + assert_eq!(*entry.into_value().read().unwrap(), &[1, 2]); + + let entry = append_to_cached_vec(&cache, &key, 3); + assert!(entry.is_fresh()); + assert!(entry.is_old_value_replaced()); + assert_eq!(*entry.into_value().read().unwrap(), &[1, 2, 3]); +} + +fn append_to_cached_vec( + cache: &Cache>>>, + key: &str, + value: i32, +) -> Entry>>> { + cache.entry_by_ref(key).and_upsert_with(|maybe_entry| { + if let Some(entry) = maybe_entry { + // The entry exists, append the value to the Vec. + let v = entry.into_value(); + v.write().unwrap().push(value); + v + } else { + // The entry does not exist, insert a new Vec containing + // the value. + Arc::new(RwLock::new(vec![value])) + } + }) +} diff --git a/examples/bounded_counter_async.rs b/examples/bounded_counter_async.rs new file mode 100644 index 00000000..57ace065 --- /dev/null +++ b/examples/bounded_counter_async.rs @@ -0,0 +1,80 @@ +//! This example demonstrates how to increment a cached `u64` counter. It uses the +//! `and_compute_with` method of `Cache`. + +use moka::{ + future::Cache, + ops::compute::{CompResult, Op}, +}; + +#[tokio::main] +async fn main() { + let cache: Cache = Cache::new(100); + let key = "key".to_string(); + + // This should insert a new counter value 1 to the cache, and return the value + // with the kind of the operation performed. 
+ let result = inclement_or_remove_counter(&cache, &key).await; + let CompResult::Inserted(entry) = result else { + panic!("`Inserted` should be returned: {result:?}"); + }; + assert_eq!(entry.into_value(), 1); + + // This should increment the cached counter value by 1. + let result = inclement_or_remove_counter(&cache, &key).await; + let CompResult::ReplacedWith(entry) = result else { + panic!("`ReplacedWith` should be returned: {result:?}"); + }; + assert_eq!(entry.into_value(), 2); + + // This should remove the cached counter from the cache, and returns the + // _removed_ value. + let result = inclement_or_remove_counter(&cache, &key).await; + let CompResult::Removed(entry) = result else { + panic!("`Removed` should be returned: {result:?}"); + }; + assert_eq!(entry.into_value(), 2); + + // The key should not exist. + assert!(!cache.contains_key(&key)); + + // This should start over; insert a new counter value 1 to the cache. + let result = inclement_or_remove_counter(&cache, &key).await; + let CompResult::Inserted(entry) = result else { + panic!("`Inserted` should be returned: {result:?}"); + }; + assert_eq!(entry.into_value(), 1); +} + +/// Increment a cached `u64` counter. If the counter is greater than or equal to 2, +/// remove it. +/// +/// This method uses cache's `and_compute_with` method. +async fn inclement_or_remove_counter( + cache: &Cache, + key: &str, +) -> CompResult { + // - If the counter does not exist, insert a new value of 1. + // - If the counter is less than 2, increment it by 1. + // - If the counter is greater than or equal to 2, remove it. + cache + .entry_by_ref(key) + .and_compute_with(|maybe_entry| { + let op = if let Some(entry) = maybe_entry { + // The entry exists. + let counter = entry.into_value(); + if counter < 2 { + // Increment the counter by 1. + Op::Put(counter.saturating_add(1)) + } else { + // Remove the entry. + Op::Remove + } + } else { + // The entry does not exist, insert a new value of 1. 
+ Op::Put(1) + }; + // Return a Future that is resolved to `op` immediately. + std::future::ready(op) + }) + .await +} diff --git a/examples/bounded_counter_sync.rs b/examples/bounded_counter_sync.rs new file mode 100644 index 00000000..d067a54f --- /dev/null +++ b/examples/bounded_counter_sync.rs @@ -0,0 +1,71 @@ +//! This example demonstrates how to increment a cached `u64` counter. It uses the +//! `and_compute_with` method of `Cache`. + +use moka::{ + ops::compute::{CompResult, Op}, + sync::Cache, +}; + +fn main() { + let cache: Cache = Cache::new(100); + let key = "key".to_string(); + + // This should insert a new counter value 1 to the cache, and return the value + // with the kind of the operation performed. + let result = inclement_or_remove_counter(&cache, &key); + let CompResult::Inserted(entry) = result else { + panic!("`Inserted` should be returned: {result:?}"); + }; + assert_eq!(entry.into_value(), 1); + + // This should increment the cached counter value by 1. + let result = inclement_or_remove_counter(&cache, &key); + let CompResult::ReplacedWith(entry) = result else { + panic!("`ReplacedWith` should be returned: {result:?}"); + }; + assert_eq!(entry.into_value(), 2); + + // This should remove the cached counter from the cache, and returns the + // _removed_ value. + let result = inclement_or_remove_counter(&cache, &key); + let CompResult::Removed(entry) = result else { + panic!("`Removed` should be returned: {result:?}"); + }; + assert_eq!(entry.into_value(), 2); + + // The key should no longer exist. + assert!(!cache.contains_key(&key)); + + // This should start over; insert a new counter value 1 to the cache. + let result = inclement_or_remove_counter(&cache, &key); + let CompResult::Inserted(entry) = result else { + panic!("`Inserted` should be returned: {result:?}"); + }; + assert_eq!(entry.into_value(), 1); +} + +/// Increment a cached `u64` counter. If the counter is greater than or equal to 2, +/// remove it. 
+/// +/// This method uses cache's `and_compute_with` method. +fn inclement_or_remove_counter(cache: &Cache, key: &str) -> CompResult { + // - If the counter does not exist, insert a new value of 1. + // - If the counter is less than 2, increment it by 1. + // - If the counter is greater than or equal to 2, remove it. + cache.entry_by_ref(key).and_compute_with(|maybe_entry| { + if let Some(entry) = maybe_entry { + // The entry exists. + let counter = entry.into_value(); + if counter < 2 { + // Increment the counter by 1. + Op::Put(counter.saturating_add(1)) + } else { + // Remove the entry. + Op::Remove + } + } else { + // The entry does not exist, insert a new value of 1. + Op::Put(1) + } + }) +} diff --git a/examples/counter_async.rs b/examples/counter_async.rs new file mode 100644 index 00000000..32e53495 --- /dev/null +++ b/examples/counter_async.rs @@ -0,0 +1,42 @@ +//! This example demonstrates how to increment a cached `u64` counter. It uses the +//! `and_upsert_with` method of `Cache`. + +use moka::{future::Cache, Entry}; + +#[tokio::main] +async fn main() { + let cache: Cache = Cache::new(100); + let key = "key".to_string(); + + let entry = increment_counter(&cache, &key).await; + assert!(entry.is_fresh()); + assert!(!entry.is_old_value_replaced()); + assert_eq!(entry.into_value(), 1); + + let entry = increment_counter(&cache, &key).await; + assert!(entry.is_fresh()); + assert!(entry.is_old_value_replaced()); + assert_eq!(entry.into_value(), 2); + + let entry = increment_counter(&cache, &key).await; + assert!(entry.is_fresh()); + assert!(entry.is_old_value_replaced()); + assert_eq!(entry.into_value(), 3); +} + +async fn increment_counter(cache: &Cache, key: &str) -> Entry { + cache + .entry_by_ref(key) + .and_upsert_with(|maybe_entry| { + let counter = if let Some(entry) = maybe_entry { + // The entry exists, increment the value by 1. + entry.into_value().saturating_add(1) + } else { + // The entry does not exist, insert a new value of 1. 
+ 1 + }; + // Return a Future that is resolved to `counter` immediately. + std::future::ready(counter) + }) + .await +} diff --git a/examples/counter_sync.rs b/examples/counter_sync.rs new file mode 100644 index 00000000..5508875d --- /dev/null +++ b/examples/counter_sync.rs @@ -0,0 +1,36 @@ +//! This example demonstrates how to increment a cached `u64` counter. It uses the +//! `and_upsert_with` method of `Cache`. + +use moka::{sync::Cache, Entry}; + +fn main() { + let cache: Cache = Cache::new(100); + let key = "key".to_string(); + + let entry = increment_counter(&cache, &key); + assert!(entry.is_fresh()); + assert!(!entry.is_old_value_replaced()); + assert_eq!(entry.into_value(), 1); + + let entry = increment_counter(&cache, &key); + assert!(entry.is_fresh()); + assert!(entry.is_old_value_replaced()); + assert_eq!(entry.into_value(), 2); + + let entry = increment_counter(&cache, &key); + assert!(entry.is_fresh()); + assert!(entry.is_old_value_replaced()); + assert_eq!(entry.into_value(), 3); +} + +fn increment_counter(cache: &Cache, key: &str) -> Entry { + cache.entry_by_ref(key).and_upsert_with(|maybe_entry| { + if let Some(entry) = maybe_entry { + // The entry exists, increment the value by 1. + entry.into_value().saturating_add(1) + } else { + // The entry does not exist, insert a new value of 1. + 1 + } + }) +} diff --git a/examples/try_append_value_async.rs b/examples/try_append_value_async.rs new file mode 100644 index 00000000..d0d94f59 --- /dev/null +++ b/examples/try_append_value_async.rs @@ -0,0 +1,111 @@ +//! This example demonstrates how to append a `char` to a cached `Vec` value. +//! It uses the `and_upsert_with` method of `Cache`. + +use std::{io::Cursor, pin::Pin, sync::Arc}; + +use moka::{ + future::Cache, + ops::compute::{CompResult, Op}, +}; +use tokio::{ + io::{AsyncRead, AsyncReadExt}, + sync::RwLock, +}; + +/// The type of the cache key. +type Key = i32; + +/// The type of the cache value. 
+/// +/// We want to store a raw value `String` for each `i32` key. We are going to append +/// a `char` to the `String` value in the cache. +/// +/// Note that we have to wrap the `String` in an `Arc>`. We need the `Arc`, +/// an atomic reference counted shared pointer, because `and_try_compute_with` method +/// of `Cache` passes a _clone_ of the value to our closure, instead of passing a +/// `&mut` reference. We do not want to clone the `String` every time we append a +/// `char` to it, so we wrap it in an `Arc`. Then we need the `RwLock` because we +/// mutate the `String` when we append a value to it. +/// +/// The reason that `and_try_compute_with` cannot pass a `&mut String` to the closure +/// is because the internal concurrent hash table of `Cache` is a lock free data +/// structure and does not use any mutexes. So it cannot guarantee: (1) the +/// `&mut String` is unique, and (2) it is not accessed concurrently by other +/// threads. +type Value = Arc>; + +#[tokio::main] +async fn main() -> Result<(), tokio::io::Error> { + let cache: Cache = Cache::new(100); + + let key = 0; + + // We are going read a byte at a time from a byte string (`[u8; 3]`). + let reader = Cursor::new(b"abc"); + tokio::pin!(reader); + + // Read the first char 'a' from the reader, and insert a string "a" to the cache. + let result = append_to_cached_string(&cache, key, &mut reader).await?; + let CompResult::Inserted(entry) = result else { + panic!("`Inserted` should be returned: {result:?}"); + }; + assert_eq!(*entry.into_value().read().await, "a"); + + // Read next char 'b' from the reader, and append it the cached string. + let result = append_to_cached_string(&cache, key, &mut reader).await?; + let CompResult::ReplacedWith(entry) = result else { + panic!("`ReplacedWith` should be returned: {result:?}"); + }; + assert_eq!(*entry.into_value().read().await, "ab"); + + // Read next char 'c' from the reader, and append it the cached string. 
+ let result = append_to_cached_string(&cache, key, &mut reader).await?; + let CompResult::ReplacedWith(entry) = result else { + panic!("`ReplacedWith` should be returned: {result:?}"); + }; + assert_eq!(*entry.into_value().read().await, "abc"); + + // Reading should fail as no more char left. + let err = append_to_cached_string(&cache, key, &mut reader).await; + assert_eq!( + err.expect_err("An error should be returned").kind(), + tokio::io::ErrorKind::UnexpectedEof + ); + + Ok(()) +} + +/// Reads a byte from the `reader``, convert it into a `char`, append it to the +/// cached `String` for the given `key`, and returns the resulting cached entry. +/// +/// If reading from the `reader` fails with an IO error, it returns the error. +/// +/// This method uses cache's `and_try_compute_with` method. +async fn append_to_cached_string( + cache: &Cache, + key: Key, + reader: &mut Pin<&mut impl AsyncRead>, +) -> Result, tokio::io::Error> { + cache + .entry(key) + .and_try_compute_with(|maybe_entry| async { + // Read a char from the reader. + let byte = reader.read_u8().await?; + let char = + char::from_u32(byte as u32).expect("An ASCII byte should be converted into a char"); + + // Check if the entry already exists. + if let Some(entry) = maybe_entry { + // The entry exists, append the char to the Vec. + let v = entry.into_value(); + v.write().await.push(char); + Ok(Op::Put(v)) + } else { + // The entry does not exist, insert a new Vec containing + // the char. + let v = RwLock::new(String::from(char)); + Ok(Op::Put(Arc::new(v))) + } + }) + .await +} diff --git a/examples/try_append_value_sync.rs b/examples/try_append_value_sync.rs new file mode 100644 index 00000000..81eb6be0 --- /dev/null +++ b/examples/try_append_value_sync.rs @@ -0,0 +1,113 @@ +//! This example demonstrates how to append a `char` to a cached `Vec` value. +//! It uses the `and_upsert_with` method of `Cache`. 
+ +use std::{ + io::{self, Cursor, Read}, + sync::{Arc, RwLock}, +}; + +use moka::{ + ops::compute::{CompResult, Op}, + sync::Cache, +}; + +/// The type of the cache key. +type Key = i32; + +/// The type of the cache value. +/// +/// We want to store a raw value `String` for each `i32` key. We are going to append +/// a `char` to the `String` value in the cache. +/// +/// Note that we have to wrap the `String` in an `Arc>`. We need the `Arc`, +/// an atomic reference counted shared pointer, because `and_try_compute_with` method +/// of `Cache` passes a _clone_ of the value to our closure, instead of passing a +/// `&mut` reference. We do not want to clone the `String` every time we append a +/// `char` to it, so we wrap it in an `Arc`. Then we need the `RwLock` because we +/// mutate the `String` when we append a value to it. +/// +/// The reason that `and_try_compute_with` cannot pass a `&mut String` to the closure +/// is because the internal concurrent hash table of `Cache` is a lock free data +/// structure and does not use any mutexes. So it cannot guarantee: (1) the +/// `&mut String` is unique, and (2) it is not accessed concurrently by other +/// threads. +type Value = Arc>; + +fn main() -> Result<(), tokio::io::Error> { + let cache: Cache = Cache::new(100); + + let key = 0; + + // We are going read a byte at a time from a byte string (`[u8; 3]`). + let mut reader = Cursor::new(b"abc"); + + // Read the first char 'a' from the reader, and insert a string "a" to the cache. + let result = append_to_cached_string(&cache, key, &mut reader)?; + let CompResult::Inserted(entry) = result else { + panic!("`Inserted` should be returned: {result:?}"); + }; + assert_eq!(*entry.into_value().read().unwrap(), "a"); + + // Read next char 'b' from the reader, and append it the cached string. 
+ let result = append_to_cached_string(&cache, key, &mut reader)?; + let CompResult::ReplacedWith(entry) = result else { + panic!("`ReplacedWith` should be returned: {result:?}"); + }; + assert_eq!(*entry.into_value().read().unwrap(), "ab"); + + // Read next char 'c' from the reader, and append it the cached string. + let result = append_to_cached_string(&cache, key, &mut reader)?; + let CompResult::ReplacedWith(entry) = result else { + panic!("`ReplacedWith` should be returned: {result:?}"); + }; + assert_eq!(*entry.into_value().read().unwrap(), "abc"); + + // Reading should fail as no more char left. + let err = append_to_cached_string(&cache, key, &mut reader); + assert_eq!( + err.expect_err("An error should be returned").kind(), + io::ErrorKind::UnexpectedEof + ); + + Ok(()) +} + +/// Reads a byte from the `reader``, convert it into a `char`, append it to the +/// cached `String` for the given `key`, and returns the resulting cached entry. +/// +/// If reading from the `reader` fails with an IO error, it returns the error. +/// +/// This method uses cache's `and_try_compute_with` method. +fn append_to_cached_string( + cache: &Cache, + key: Key, + reader: &mut impl Read, +) -> io::Result> { + cache.entry(key).and_try_compute_with(|maybe_entry| { + // Read a char from the reader. + let mut buf = [0u8]; + let len = reader.read(&mut buf)?; + if len == 0 { + // No more char left. + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "No more char left", + )); + } + let char = + char::from_u32(buf[0] as u32).expect("An ASCII byte should be converted into a char"); + + // Check if the entry already exists. + if let Some(entry) = maybe_entry { + // The entry exists, append the char to the Vec. + let v = entry.into_value(); + v.write().unwrap().push(char); + Ok(Op::Put(v)) + } else { + // The entry does not exist, insert a new Vec containing + // the char. 
+ let v = RwLock::new(String::from(char)); + Ok(Op::Put(Arc::new(v))) + } + }) +} diff --git a/src/common/entry.rs b/src/common/entry.rs index 7a762090..a1fb820d 100644 --- a/src/common/entry.rs +++ b/src/common/entry.rs @@ -21,6 +21,7 @@ pub struct Entry { key: Option>, value: V, is_fresh: bool, + is_old_value_replaced: bool, } impl Debug for Entry @@ -33,16 +34,23 @@ where .field("key", self.key()) .field("value", &self.value) .field("is_fresh", &self.is_fresh) + .field("is_old_value_replaced", &self.is_old_value_replaced) .finish() } } impl Entry { - pub(crate) fn new(key: Option>, value: V, is_fresh: bool) -> Self { + pub(crate) fn new( + key: Option>, + value: V, + is_fresh: bool, + is_old_value_replaced: bool, + ) -> Self { Self { key, value, is_fresh, + is_old_value_replaced, } } @@ -72,4 +80,13 @@ impl Entry { pub fn is_fresh(&self) -> bool { self.is_fresh } + + /// Returns `true` if an old value existed in the cache and was replaced by the + /// value in this `Entry`. + /// + /// Note that the new value can be the same as the old value. This method still + /// returns `true` in that case. + pub fn is_old_value_replaced(&self) -> bool { + self.is_old_value_replaced + } } diff --git a/src/future.rs b/src/future.rs index d15cd4ef..654b3eb7 100644 --- a/src/future.rs +++ b/src/future.rs @@ -35,9 +35,12 @@ pub type PredicateId = String; pub(crate) type PredicateIdStr<'a> = &'a str; -// Empty struct to be used in InitResult::InitErr to represent the Option None. +// Empty struct to be used in `InitResult::InitErr` to represent the Option None. pub(crate) struct OptionallyNone; +// Empty struct to be used in `InitResult::InitErr` to represent the Compute None. 
+pub(crate) struct ComputeNone; + impl FutureExt for T where T: Future {} pub trait FutureExt: Future { diff --git a/src/future/base_cache.rs b/src/future/base_cache.rs index c5f5233c..41e94aac 100644 --- a/src/future/base_cache.rs +++ b/src/future/base_cache.rs @@ -326,7 +326,7 @@ where entry.set_last_accessed(now); let maybe_key = if need_key { Some(Arc::clone(k)) } else { None }; - let ent = Entry::new(maybe_key, entry.value.clone(), false); + let ent = Entry::new(maybe_key, entry.value.clone(), false, false); let maybe_op = if record_read { Some(ReadOp::Hit { value_entry: TrioArc::clone(entry), diff --git a/src/future/cache.rs b/src/future/cache.rs index 09ddc8ba..6a683b7e 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -5,7 +5,10 @@ use super::{ WriteOp, }; use crate::{ - common::concurrent::Weigher, notification::AsyncEvictionListener, policy::ExpirationPolicy, + common::concurrent::Weigher, + notification::AsyncEvictionListener, + ops::compute::{self, CompResult}, + policy::ExpirationPolicy, Entry, Policy, PredicateError, }; @@ -1063,6 +1066,7 @@ where .into_value() } + /// TODO: Remove this in v0.13.0. 
/// Deprecated, replaced with /// [`entry()::or_insert_with_if()`](./struct.OwnedKeyEntrySelector.html#method.or_insert_with_if) #[deprecated(since = "0.10.0", note = "Replaced with `entry().or_insert_with_if()`")] @@ -1575,9 +1579,9 @@ where { InitResult::Initialized(v) => { crossbeam_epoch::pin().flush(); - Entry::new(k, v, true) + Entry::new(k, v, true, false) } - InitResult::ReadExisting(v) => Entry::new(k, v, false), + InitResult::ReadExisting(v) => Entry::new(k, v, false, false), InitResult::InitErr(_) => unreachable!(), } } @@ -1598,7 +1602,7 @@ where let value = init(); self.insert_with_hash(Arc::clone(&key), hash, value.clone()) .await; - Entry::new(Some(key), value, true) + Entry::new(Some(key), value, true, false) } } } @@ -1624,7 +1628,7 @@ where let value = init(); self.insert_with_hash(Arc::clone(&key), hash, value.clone()) .await; - Entry::new(Some(key), value, true) + Entry::new(Some(key), value, true, false) } } } @@ -1702,9 +1706,9 @@ where { InitResult::Initialized(v) => { crossbeam_epoch::pin().flush(); - Some(Entry::new(k, v, true)) + Some(Entry::new(k, v, true, false)) } - InitResult::ReadExisting(v) => Some(Entry::new(k, v, false)), + InitResult::ReadExisting(v) => Some(Entry::new(k, v, false, false)), InitResult::InitErr(_) => None, } } @@ -1784,9 +1788,9 @@ where { InitResult::Initialized(v) => { crossbeam_epoch::pin().flush(); - Ok(Entry::new(k, v, true)) + Ok(Entry::new(k, v, true, false)) } - InitResult::ReadExisting(v) => Ok(Entry::new(k, v, false)), + InitResult::ReadExisting(v) => Ok(Entry::new(k, v, false, false)), InitResult::InitErr(e) => { crossbeam_epoch::pin().flush(); Err(e) @@ -1830,6 +1834,65 @@ where cancel_guard.clear(); } + pub(crate) async fn compute_with_hash_and_fun( + &self, + key: Arc, + hash: u64, + f: F, + ) -> compute::CompResult + where + F: FnOnce(Option>) -> Fut, + Fut: Future>, + { + let post_init = ValueInitializer::::post_init_for_compute_with; + match self + .value_initializer + .try_compute(key, hash, self, 
f, post_init, true) + .await + { + Ok(result) => result, + Err(_) => unreachable!(), + } + } + + pub(crate) async fn try_compute_with_hash_and_fun( + &self, + key: Arc, + hash: u64, + f: F, + ) -> Result, E> + where + F: FnOnce(Option>) -> Fut, + Fut: Future, E>>, + E: Send + Sync + 'static, + { + let post_init = ValueInitializer::::post_init_for_try_compute_with; + self.value_initializer + .try_compute(key, hash, self, f, post_init, true) + .await + } + + pub(crate) async fn upsert_with_hash_and_fun( + &self, + key: Arc, + hash: u64, + f: F, + ) -> Entry + where + F: FnOnce(Option>) -> Fut, + Fut: Future, + { + let post_init = ValueInitializer::::post_init_for_upsert_with; + match self + .value_initializer + .try_compute(key, hash, self, f, post_init, false) + .await + { + Ok(CompResult::Inserted(entry) | CompResult::ReplacedWith(entry)) => entry, + _ => unreachable!(), + } + } + async fn invalidate_with_hash(&self, key: &Q, hash: u64, need_value: bool) -> Option where K: Borrow, @@ -1966,9 +2029,20 @@ where .map(Entry::into_value) } + async fn get_entry(&self, key: &Arc, hash: u64) -> Option> { + let ignore_if = None as Option<&mut fn(&V) -> bool>; + self.base + .get_with_hash(key, hash, ignore_if, true, true) + .await + } + async fn insert(&self, key: Arc, hash: u64, value: V) { self.insert_with_hash(key.clone(), hash, value).await; } + + async fn remove(&self, key: &Arc, hash: u64) -> Option { + self.invalidate_with_hash(key, hash, true).await + } } // For unit tests. 
@@ -2032,6 +2106,7 @@ mod tests { common::time::Clock, future::FutureExt, notification::{ListenerFuture, RemovalCause}, + ops::compute, policy::test_utils::ExpiryCallCounters, Expiry, }; @@ -2044,6 +2119,7 @@ mod tests { Arc, }, time::{Duration, Instant as StdInstant}, + vec, }; use tokio::time::sleep; @@ -2069,6 +2145,17 @@ mod tests { is_send(cache.try_get_with_by_ref(&(), async { Err(()) })); // entry fns + is_send( + cache + .entry(()) + .and_compute_with(|_| async { compute::Op::Nop }), + ); + is_send( + cache + .entry(()) + .and_try_compute_with(|_| async { Ok(compute::Op::Nop) as Result<_, Infallible> }), + ); + is_send(cache.entry(()).and_upsert_with(|_| async {})); is_send(cache.entry(()).or_default()); is_send(cache.entry(()).or_insert(())); is_send(cache.entry(()).or_insert_with(async {})); @@ -2077,6 +2164,17 @@ mod tests { is_send(cache.entry(()).or_try_insert_with(async { Err(()) })); // entry_by_ref fns + is_send( + cache + .entry_by_ref(&()) + .and_compute_with(|_| async { compute::Op::Nop }), + ); + is_send( + cache + .entry_by_ref(&()) + .and_try_compute_with(|_| async { Ok(compute::Op::Nop) as Result<_, Infallible> }), + ); + is_send(cache.entry_by_ref(&()).and_upsert_with(|_| async {})); is_send(cache.entry_by_ref(&()).or_default()); is_send(cache.entry_by_ref(&()).or_insert(())); is_send(cache.entry_by_ref(&()).or_insert_with(async {})); @@ -4233,6 +4331,364 @@ mod tests { futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8); } + #[tokio::test] + async fn upsert_with() { + let cache = Cache::new(100); + const KEY: u32 = 0; + + // Spawn three async tasks to call `and_upsert_with` for the same key and + // each task increments the current value by 1. Ensure the key-level lock is + // working by verifying the value is 3 after all tasks finish. 
+ // + // | | task 1 | task 2 | task 3 | + // |--------|----------|----------|----------| + // | 0 ms | get none | | | + // | 100 ms | | blocked | | + // | 200 ms | insert 1 | | | + // | | | get 1 | | + // | 300 ms | | | blocked | + // | 400 ms | | insert 2 | | + // | | | | get 2 | + // | 500 ms | | | insert 3 | + + let task1 = { + let cache1 = cache.clone(); + async move { + cache1 + .entry(KEY) + .and_upsert_with(|maybe_entry| async move { + sleep(Duration::from_millis(200)).await; + assert!(maybe_entry.is_none()); + 1 + }) + .await + } + }; + + let task2 = { + let cache2 = cache.clone(); + async move { + sleep(Duration::from_millis(100)).await; + cache2 + .entry_by_ref(&KEY) + .and_upsert_with(|maybe_entry| async move { + sleep(Duration::from_millis(200)).await; + let entry = maybe_entry.expect("The entry should exist"); + entry.into_value() + 1 + }) + .await + } + }; + + let task3 = { + let cache3 = cache.clone(); + async move { + sleep(Duration::from_millis(300)).await; + cache3 + .entry_by_ref(&KEY) + .and_upsert_with(|maybe_entry| async move { + sleep(Duration::from_millis(100)).await; + let entry = maybe_entry.expect("The entry should exist"); + entry.into_value() + 1 + }) + .await + } + }; + + let (ent1, ent2, ent3) = futures_util::join!(task1, task2, task3); + assert_eq!(ent1.into_value(), 1); + assert_eq!(ent2.into_value(), 2); + assert_eq!(ent3.into_value(), 3); + + assert_eq!(cache.get(&KEY).await, Some(3)); + } + + #[tokio::test] + async fn compute_with() { + use crate::ops::compute; + use tokio::sync::RwLock; + + let cache = Cache::new(100); + const KEY: u32 = 0; + + // Spawn six async tasks to call `and_compute_with` for the same key. Ensure + // the key-level lock is working by verifying the value after all tasks + // finish. 
+ // + // | | task 1 | task 2 | task 3 | task 4 | task 5 | task 6 | + // |---------|------------|---------------|------------|----------|------------|---------| + // | 0 ms | get none | | | | | | + // | 100 ms | | blocked | | | | | + // | 200 ms | insert [1] | | | | | | + // | | | get [1] | | | | | + // | 300 ms | | | blocked | | | | + // | 400 ms | | insert [1, 2] | | | | | + // | | | | get [1, 2] | | | | + // | 500 ms | | | | blocked | | | + // | 600 ms | | | remove | | | | + // | | | | | get none | | | + // | 700 ms | | | | | blocked | | + // | 800 ms | | | | nop | | | + // | | | | | | get none | | + // | 900 ms | | | | | | blocked | + // | 1000 ms | | | | | insert [5] | | + // | | | | | | | get [5] | + // | 1100 ms | | | | | | nop | + + let task1 = { + let cache1 = cache.clone(); + async move { + cache1 + .entry(KEY) + .and_compute_with(|maybe_entry| async move { + sleep(Duration::from_millis(200)).await; + assert!(maybe_entry.is_none()); + compute::Op::Put(Arc::new(RwLock::new(vec![1]))) + }) + .await + } + }; + + let task2 = { + let cache2 = cache.clone(); + async move { + sleep(Duration::from_millis(100)).await; + cache2 + .entry_by_ref(&KEY) + .and_compute_with(|maybe_entry| async move { + let entry = maybe_entry.expect("The entry should exist"); + let value = entry.into_value(); + assert_eq!(*value.read().await, vec![1]); + sleep(Duration::from_millis(200)).await; + value.write().await.push(2); + compute::Op::Put(value) + }) + .await + } + }; + + let task3 = { + let cache3 = cache.clone(); + async move { + sleep(Duration::from_millis(300)).await; + cache3 + .entry(KEY) + .and_compute_with(|maybe_entry| async move { + let entry = maybe_entry.expect("The entry should exist"); + let value = entry.into_value(); + assert_eq!(*value.read().await, vec![1, 2]); + sleep(Duration::from_millis(200)).await; + compute::Op::Remove + }) + .await + } + }; + + let task4 = { + let cache4 = cache.clone(); + async move { + sleep(Duration::from_millis(500)).await; + cache4 + 
.entry(KEY) + .and_compute_with(|maybe_entry| async move { + assert!(maybe_entry.is_none()); + sleep(Duration::from_millis(200)).await; + compute::Op::Nop + }) + .await + } + }; + + let task5 = { + let cache5 = cache.clone(); + async move { + sleep(Duration::from_millis(700)).await; + cache5 + .entry_by_ref(&KEY) + .and_compute_with(|maybe_entry| async move { + assert!(maybe_entry.is_none()); + sleep(Duration::from_millis(200)).await; + compute::Op::Put(Arc::new(RwLock::new(vec![5]))) + }) + .await + } + }; + + let task6 = { + let cache6 = cache.clone(); + async move { + sleep(Duration::from_millis(900)).await; + cache6 + .entry_by_ref(&KEY) + .and_compute_with(|maybe_entry| async move { + let entry = maybe_entry.expect("The entry should exist"); + let value = entry.into_value(); + assert_eq!(*value.read().await, vec![5]); + sleep(Duration::from_millis(100)).await; + compute::Op::Nop + }) + .await + } + }; + + let (res1, res2, res3, res4, res5, res6) = + futures_util::join!(task1, task2, task3, task4, task5, task6); + + let compute::CompResult::Inserted(entry) = res1 else { + panic!("Expected `Inserted`. Got {res1:?}") + }; + assert_eq!( + *entry.into_value().read().await, + vec![1, 2] // The same Vec was modified by task2. + ); + + let compute::CompResult::ReplacedWith(entry) = res2 else { + panic!("Expected `ReplacedWith`. Got {res2:?}") + }; + assert_eq!(*entry.into_value().read().await, vec![1, 2]); + + let compute::CompResult::Removed(entry) = res3 else { + panic!("Expected `Removed`. Got {res3:?}") + }; + assert_eq!(*entry.into_value().read().await, vec![1, 2]); + + let compute::CompResult::StillNone(key) = res4 else { + panic!("Expected `StillNone`. Got {res4:?}") + }; + assert_eq!(*key, KEY); + + let compute::CompResult::Inserted(entry) = res5 else { + panic!("Expected `Inserted`. Got {res5:?}") + }; + assert_eq!(*entry.into_value().read().await, vec![5]); + + let compute::CompResult::Unchanged(entry) = res6 else { + panic!("Expected `Unchanged`. 
Got {res6:?}") + }; + assert_eq!(*entry.into_value().read().await, vec![5]); + } + + #[tokio::test] + async fn try_compute_with() { + use crate::ops::compute; + use tokio::sync::RwLock; + + let cache: Cache>>> = Cache::new(100); + const KEY: u32 = 0; + + // Spawn four async tasks to call `and_try_compute_with` for the same key. + // Ensure the key-level lock is working by verifying the value after all + // tasks finish. + // + // | | task 1 | task 2 | task 3 | task 4 | + // |---------|------------|---------------|------------|------------| + // | 0 ms | get none | | | | + // | 100 ms | | blocked | | | + // | 200 ms | insert [1] | | | | + // | | | get [1] | | | + // | 300 ms | | | blocked | | + // | 400 ms | | insert [1, 2] | | | + // | | | | get [1, 2] | | + // | 500 ms | | | | blocked | + // | 600 ms | | | err | | + // | | | | | get [1, 2] | + // | 700 ms | | | | remove | + // + // This test is shorter than `compute_with` test because this one omits `Nop` + // cases. + + let task1 = { + let cache1 = cache.clone(); + async move { + cache1 + .entry(KEY) + .and_try_compute_with(|maybe_entry| async move { + sleep(Duration::from_millis(200)).await; + assert!(maybe_entry.is_none()); + Ok(compute::Op::Put(Arc::new(RwLock::new(vec![1])))) as Result<_, ()> + }) + .await + } + }; + + let task2 = { + let cache2 = cache.clone(); + async move { + sleep(Duration::from_millis(100)).await; + cache2 + .entry_by_ref(&KEY) + .and_try_compute_with(|maybe_entry| async move { + let entry = maybe_entry.expect("The entry should exist"); + let value = entry.into_value(); + assert_eq!(*value.read().await, vec![1]); + sleep(Duration::from_millis(200)).await; + value.write().await.push(2); + Ok(compute::Op::Put(value)) as Result<_, ()> + }) + .await + } + }; + + let task3 = { + let cache3 = cache.clone(); + async move { + sleep(Duration::from_millis(300)).await; + cache3 + .entry(KEY) + .and_try_compute_with(|maybe_entry| async move { + let entry = maybe_entry.expect("The entry should 
exist"); + let value = entry.into_value(); + assert_eq!(*value.read().await, vec![1, 2]); + sleep(Duration::from_millis(200)).await; + Err(()) + }) + .await + } + }; + + let task4 = { + let cache4 = cache.clone(); + async move { + sleep(Duration::from_millis(500)).await; + cache4 + .entry(KEY) + .and_try_compute_with(|maybe_entry| async move { + let entry = maybe_entry.expect("The entry should exist"); + let value = entry.into_value(); + assert_eq!(*value.read().await, vec![1, 2]); + sleep(Duration::from_millis(100)).await; + Ok(compute::Op::Remove) as Result<_, ()> + }) + .await + } + }; + + let (res1, res2, res3, res4) = futures_util::join!(task1, task2, task3, task4); + + let Ok(compute::CompResult::Inserted(entry)) = res1 else { + panic!("Expected `Inserted`. Got {res1:?}") + }; + assert_eq!( + *entry.into_value().read().await, + vec![1, 2] // The same Vec was modified by task2. + ); + + let Ok(compute::CompResult::ReplacedWith(entry)) = res2 else { + panic!("Expected `ReplacedWith`. Got {res2:?}") + }; + assert_eq!(*entry.into_value().read().await, vec![1, 2]); + + assert!(res3.is_err()); + + let Ok(compute::CompResult::Removed(entry)) = res4 else { + panic!("Expected `Removed`. Got {res4:?}") + }; + assert_eq!( + *entry.into_value().read().await, + vec![1, 2] // Removed value. + ); + } + #[tokio::test] // https://github.com/moka-rs/moka/issues/43 async fn handle_panic_in_get_with() { diff --git a/src/future/entry_selector.rs b/src/future/entry_selector.rs index d4954b34..f1db19f0 100644 --- a/src/future/entry_selector.rs +++ b/src/future/entry_selector.rs @@ -1,4 +1,4 @@ -use crate::Entry; +use crate::{ops::compute, Entry}; use super::Cache; @@ -39,6 +39,296 @@ where } } + /// Performs a compute operation on a cached entry by using the given closure + /// `f`. A compute operation is either put, remove or no-operation (nop). 
+ /// + /// The closure `f` should take the current entry of `Option>` for + /// the key, and return a `Future` that resolves to an `ops::compute::Op` + /// enum. + /// + /// This method works as the followings: + /// + /// 1. Apply the closure `f` to the current cached `Entry`, and get a `Future`. + /// 2. Resolve the `Future`, and get an `ops::compute::Op`. + /// 3. Execute the op on the cache: + /// - `Op::Put(V)`: Put the new value `V` to the cache. + /// - `Op::Remove`: Remove the current cached entry. + /// - `Op::Nop`: Do nothing. + /// 4. Return an `ops::compute::CompResult` as the followings: + /// + /// | [`Op`] | [`Entry`] already exists? | [`CompResult`] | Notes | + /// |:--------- |:--- |:--------------------------- |:------------------------------- | + /// | `Put(V)` | no | `Inserted(Entry)` | The new entry is returned. | + /// | `Put(V)` | yes | `ReplacedWith(Entry)` | The new entry is returned. | + /// | `Remove` | no | `StillNone(Arc)` | | + /// | `Remove` | yes | `Removed(Entry)` | The removed entry is returned. | + /// | `Nop` | no | `StillNone(Arc)` | | + /// | `Nop` | yes | `Unchanged(Entry)` | The existing entry is returned. | + /// + /// # See Also + /// + /// - If you want the `Future` resolve to `Result>` instead of `Op`, and + /// modify entry only when resolved to `Ok(V)`, use the + /// [`and_try_compute_with`] method. + /// - If you only want to update or insert, use the [`and_upsert_with`] method. 
+ /// + /// [`Entry`]: ../struct.Entry.html + /// [`Op`]: ../ops/compute/enum.Op.html + /// [`CompResult`]: ../ops/compute/enum.CompResult.html + /// [`and_upsert_with`]: #method.and_upsert_with + /// [`and_try_compute_with`]: #method.and_try_compute_with + /// + /// # Example + /// + /// ```rust + /// // Cargo.toml + /// // + /// // [dependencies] + /// // moka = { version = "0.12.3", features = ["future"] } + /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } + /// + /// use moka::{ + /// future::Cache, + /// ops::compute::{CompResult, Op}, + /// }; + /// + /// #[tokio::main] + /// async fn main() { + /// let cache: Cache = Cache::new(100); + /// let key = "key1".to_string(); + /// + /// /// Increment a cached `u64` counter. If the counter is greater than or + /// /// equal to 2, remove it. + /// async fn inclement_or_remove_counter( + /// cache: &Cache, + /// key: &str, + /// ) -> CompResult { + /// cache + /// .entry(key.to_string()) + /// .and_compute_with(|maybe_entry| { + /// let op = if let Some(entry) = maybe_entry { + /// let counter = entry.into_value(); + /// if counter < 2 { + /// Op::Put(counter.saturating_add(1)) // Update + /// } else { + /// Op::Remove + /// } + /// } else { + /// Op::Put(1) // Insert + /// }; + /// // Return a Future that is resolved to `op` immediately. + /// std::future::ready(op) + /// }) + /// .await + /// } + /// + /// // This should insert a new counter value 1 to the cache, and return the + /// // value with the kind of the operation performed. + /// let result = inclement_or_remove_counter(&cache, &key).await; + /// let CompResult::Inserted(entry) = result else { + /// panic!("`Inserted` should be returned: {result:?}"); + /// }; + /// assert_eq!(entry.into_value(), 1); + /// + /// // This should increment the cached counter value by 1. 
+ /// let result = inclement_or_remove_counter(&cache, &key).await; + /// let CompResult::ReplacedWith(entry) = result else { + /// panic!("`ReplacedWith` should be returned: {result:?}"); + /// }; + /// assert_eq!(entry.into_value(), 2); + /// + /// // This should remove the cached counter from the cache, and returns the + /// // _removed_ value. + /// let result = inclement_or_remove_counter(&cache, &key).await; + /// let CompResult::Removed(entry) = result else { + /// panic!("`Removed` should be returned: {result:?}"); + /// }; + /// assert_eq!(entry.into_value(), 2); + /// + /// // The key should not exist. + /// assert!(!cache.contains_key(&key)); + /// + /// // This should start over; insert a new counter value 1 to the cache. + /// let result = inclement_or_remove_counter(&cache, &key).await; + /// let CompResult::Inserted(entry) = result else { + /// panic!("`Inserted` should be returned: {result:?}"); + /// }; + /// assert_eq!(entry.into_value(), 1); + /// } + /// ``` + /// + /// # Concurrent calls on the same key + /// + /// This method guarantees that concurrent calls on the same key are executed + /// serially. That is, `and_compute_with` calls on the same key never run + /// concurrently. The calls are serialized by the order of their invocation. It + /// uses a key-level lock to achieve this. + pub async fn and_compute_with(self, f: F) -> compute::CompResult + where + F: FnOnce(Option>) -> Fut, + Fut: Future>, + { + let key = Arc::new(self.owned_key); + self.cache + .compute_with_hash_and_fun(key, self.hash, f) + .await + } + + /// Performs a compute operation on a cached entry by using the given closure + /// `f`. A compute operation is either put, remove or no-operation (nop). + /// + /// The closure `f` should take the current entry of `Option>` for + /// the key, and return a `Future` that resolves to a + /// `Result, E>`. + /// + /// This method works as the followings: + /// + /// 1. 
Apply the closure `f` to the current cached `Entry`, and get a `Future`. + /// 2. Resolve the `Future`, and get a `Result, E>`. + /// 3. If resolved to `Err(E)`, return it. + /// 4. Else, execute the op on the cache: + /// - `Ok(Op::Put(V))`: Put the new value `V` to the cache. + /// - `Ok(Op::Remove)`: Remove the current cached entry. + /// - `Ok(Op::Nop)`: Do nothing. + /// 5. Return an `Ok(ops::compute::CompResult)` as the followings: + /// + /// | [`Op`] | [`Entry`] already exists? | [`CompResult`] | Notes | + /// |:--------- |:--- |:--------------------------- |:------------------------------- | + /// | `Put(V)` | no | `Inserted(Entry)` | The new entry is returned. | + /// | `Put(V)` | yes | `ReplacedWith(Entry)` | The new entry is returned. | + /// | `Remove` | no | `StillNone(Arc)` | | + /// | `Remove` | yes | `Removed(Entry)` | The removed entry is returned. | + /// | `Nop` | no | `StillNone(Arc)` | | + /// | `Nop` | yes | `Unchanged(Entry)` | The existing entry is returned. | + /// + /// # See Also + /// + /// - If you want the `Future` resolve to `Op` instead of `Result>`, use + /// the [`and_compute_with`] method. + /// - If you only want to update or insert, use the [`and_upsert_with`] method. + /// + /// [`Entry`]: ../struct.Entry.html + /// [`Op`]: ../ops/compute/enum.Op.html + /// [`CompResult`]: ../ops/compute/enum.CompResult.html + /// [`and_upsert_with`]: #method.and_upsert_with + /// [`and_compute_with`]: #method.and_compute_with + /// + /// # Example + /// + /// See [`try_append_value_async.rs`] in the `examples` directory. + /// + /// [`try_append_value_async.rs`]: + /// https://github.com/moka-rs/moka/tree/main/examples/try_append_value_async.rs + /// + /// # Concurrent calls on the same key + /// + /// This method guarantees that concurrent calls on the same key are executed + /// serially. That is, `and_try_compute_with` calls on the same key never run + /// concurrently. The calls are serialized by the order of their invocation. 
It + /// uses a key-level lock to achieve this. + pub async fn and_try_compute_with(self, f: F) -> Result, E> + where + F: FnOnce(Option>) -> Fut, + Fut: Future, E>>, + E: Send + Sync + 'static, + { + let key = Arc::new(self.owned_key); + self.cache + .try_compute_with_hash_and_fun(key, self.hash, f) + .await + } + + /// Performs an upsert of an [`Entry`] by using the given closure `f`. The word + /// "upsert" here means "update" or "insert". + /// + /// The closure `f` should take the current entry of `Option>` for + /// the key, and return a `Future` that resolves to a new value `V`. + /// + /// This method works as the followings: + /// + /// 1. Apply the closure `f` to the current cached `Entry`, and get a `Future`. + /// 2. Resolve the `Future`, and get a new value `V`. + /// 3. Upsert the new value to the cache. + /// 4. Return the `Entry` having the upserted value. + /// + /// # See Also + /// + /// - If you want to optionally upsert, that is to upsert only when certain + /// conditions meet, use the [`and_compute_with`] method. + /// - If you try to upsert, that is to make the `Future` resolve to `Result` + /// instead of `V`, and upsert only when resolved to `Ok(V)`, use the + /// [`and_try_compute_with`] method. 
+ /// + /// [`Entry`]: ../struct.Entry.html + /// [`and_compute_with`]: #method.and_compute_with + /// [`and_try_compute_with`]: #method.and_try_compute_with + /// + /// # Example + /// + /// ```rust + /// // Cargo.toml + /// // + /// // [dependencies] + /// // moka = { version = "0.12.3", features = ["future"] } + /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } + /// + /// use moka::future::Cache; + /// + /// #[tokio::main] + /// async fn main() { + /// let cache: Cache = Cache::new(100); + /// let key = "key1".to_string(); + /// + /// let entry = cache + /// .entry(key.clone()) + /// .and_upsert_with(|maybe_entry| { + /// let counter = if let Some(entry) = maybe_entry { + /// entry.into_value().saturating_add(1) // Update + /// } else { + /// 1 // Insert + /// }; + /// // Return a Future that is resolved to `counter` immediately. + /// std::future::ready(counter) + /// }) + /// .await; + /// // It was not an update. + /// assert!(!entry.is_old_value_replaced()); + /// assert_eq!(entry.key(), &key); + /// assert_eq!(entry.into_value(), 1); + /// + /// let entry = cache + /// .entry(key.clone()) + /// .and_upsert_with(|maybe_entry| { + /// let counter = if let Some(entry) = maybe_entry { + /// entry.into_value().saturating_add(1) + /// } else { + /// 1 + /// }; + /// std::future::ready(counter) + /// }) + /// .await; + /// // It was an update. + /// assert!(entry.is_old_value_replaced()); + /// assert_eq!(entry.key(), &key); + /// assert_eq!(entry.into_value(), 2); + /// } + /// ``` + /// + /// # Concurrent calls on the same key + /// + /// This method guarantees that concurrent calls on the same key are executed + /// serially. That is, `and_upsert_with` calls on the same key never run + /// concurrently. The calls are serialized by the order of their invocation. It + /// uses a key-level lock to achieve this. 
+ pub async fn and_upsert_with(self, f: F) -> Entry + where + F: FnOnce(Option>) -> Fut, + Fut: Future, + { + let key = Arc::new(self.owned_key); + self.cache.upsert_with_hash_and_fun(key, self.hash, f).await + } + /// Returns the corresponding [`Entry`] for the key given when this entry /// selector was constructed. If the entry does not exist, inserts one by calling /// the [`default`][std-default-function] function of the value type `V`. @@ -389,6 +679,296 @@ where } } + /// Performs a compute operation on a cached entry by using the given closure + /// `f`. A compute operation is either put, remove or no-operation (nop). + /// + /// The closure `f` should take the current entry of `Option>` for + /// the key, and return a `Future` that resolves to an `ops::compute::Op` + /// enum. + /// + /// This method works as the followings: + /// + /// 1. Apply the closure `f` to the current cached `Entry`, and get a `Future`. + /// 2. Resolve the `Future`, and get an `ops::compute::Op`. + /// 3. Execute the op on the cache: + /// - `Op::Put(V)`: Put the new value `V` to the cache. + /// - `Op::Remove`: Remove the current cached entry. + /// - `Op::Nop`: Do nothing. + /// 4. Return an `ops::compute::CompResult` as the followings: + /// + /// | [`Op`] | [`Entry`] already exists? | [`CompResult`] | Notes | + /// |:--------- |:--- |:--------------------------- |:------------------------------- | + /// | `Put(V)` | no | `Inserted(Entry)` | The new entry is returned. | + /// | `Put(V)` | yes | `ReplacedWith(Entry)` | The new entry is returned. | + /// | `Remove` | no | `StillNone(Arc)` | | + /// | `Remove` | yes | `Removed(Entry)` | The removed entry is returned. | + /// | `Nop` | no | `StillNone(Arc)` | | + /// | `Nop` | yes | `Unchanged(Entry)` | The existing entry is returned. 
| + /// + /// # See Also + /// + /// - If you want the `Future` resolve to `Result>` instead of `Op`, and + /// modify entry only when resolved to `Ok(V)`, use the + /// [`and_try_compute_with`] method. + /// - If you only want to update or insert, use the [`and_upsert_with`] method. + /// + /// [`Entry`]: ../struct.Entry.html + /// [`Op`]: ../ops/compute/enum.Op.html + /// [`CompResult`]: ../ops/compute/enum.CompResult.html + /// [`and_upsert_with`]: #method.and_upsert_with + /// [`and_try_compute_with`]: #method.and_try_compute_with + /// + /// # Example + /// + /// ```rust + /// // Cargo.toml + /// // + /// // [dependencies] + /// // moka = { version = "0.12.3", features = ["future"] } + /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } + /// + /// use moka::{ + /// future::Cache, + /// ops::compute::{CompResult, Op}, + /// }; + /// + /// #[tokio::main] + /// async fn main() { + /// let cache: Cache = Cache::new(100); + /// let key = "key1"; + /// + /// /// Increment a cached `u64` counter. If the counter is greater than or + /// /// equal to 2, remove it. + /// async fn inclement_or_remove_counter( + /// cache: &Cache, + /// key: &str, + /// ) -> CompResult { + /// cache + /// .entry_by_ref(key) + /// .and_compute_with(|maybe_entry| { + /// let op = if let Some(entry) = maybe_entry { + /// let counter = entry.into_value(); + /// if counter < 2 { + /// Op::Put(counter.saturating_add(1)) // Update + /// } else { + /// Op::Remove + /// } + /// } else { + /// Op::Put(1) // Insert + /// }; + /// // Return a Future that is resolved to `op` immediately. + /// std::future::ready(op) + /// }) + /// .await + /// } + /// + /// // This should insert a now counter value 1 to the cache, and return the + /// // value with the kind of the operation performed. 
+ /// let result = inclement_or_remove_counter(&cache, &key).await; + /// let CompResult::Inserted(entry) = result else { + /// panic!("`Inserted` should be returned: {result:?}"); + /// }; + /// assert_eq!(entry.into_value(), 1); + /// + /// // This should increment the cached counter value by 1. + /// let result = inclement_or_remove_counter(&cache, &key).await; + /// let CompResult::ReplacedWith(entry) = result else { + /// panic!("`ReplacedWith` should be returned: {result:?}"); + /// }; + /// assert_eq!(entry.into_value(), 2); + /// + /// // This should remove the cached counter from the cache, and returns the + /// // _removed_ value. + /// let result = inclement_or_remove_counter(&cache, &key).await; + /// let CompResult::Removed(entry) = result else { + /// panic!("`Removed` should be returned: {result:?}"); + /// }; + /// assert_eq!(entry.into_value(), 2); + /// + /// // The key should no longer exist. + /// assert!(!cache.contains_key(key)); + /// + /// // This should start over; insert a new counter value 1 to the cache. + /// let result = inclement_or_remove_counter(&cache, &key).await; + /// let CompResult::Inserted(entry) = result else { + /// panic!("`Inserted` should be returned: {result:?}"); + /// }; + /// assert_eq!(entry.into_value(), 1); + /// } + /// ``` + /// + /// # Concurrent calls on the same key + /// + /// This method guarantees that concurrent calls on the same key are executed + /// serially. That is, `and_compute_with` calls on the same key never run + /// concurrently. The calls are serialized by the order of their invocation. It + /// uses a key-level lock to achieve this. + pub async fn and_compute_with(self, f: F) -> compute::CompResult + where + F: FnOnce(Option>) -> Fut, + Fut: Future>, + { + let key = Arc::new(self.ref_key.to_owned()); + self.cache + .compute_with_hash_and_fun(key, self.hash, f) + .await + } + + /// Performs a compute operation on a cached entry by using the given closure + /// `f`. 
A compute operation is either put, remove or no-operation (nop). + /// + /// The closure `f` should take the current entry of `Option>` for + /// the key, and return a `Future` that resolves to a + /// `Result, E>`. + /// + /// This method works as the followings: + /// + /// 1. Apply the closure `f` to the current cached `Entry`, and get a `Future`. + /// 2. Resolve the `Future`, and get a `Result, E>`. + /// 3. If resolved to `Err(E)`, return it. + /// 4. Else, execute the op on the cache: + /// - `Ok(Op::Put(V))`: Put the new value `V` to the cache. + /// - `Ok(Op::Remove)`: Remove the current cached entry. + /// - `Ok(Op::Nop)`: Do nothing. + /// 5. Return an `Ok(ops::compute::CompResult)` as the followings: + /// + /// | [`Op`] | [`Entry`] already exists? | [`CompResult`] | Notes | + /// |:--------- |:--- |:--------------------------- |:------------------------------- | + /// | `Put(V)` | no | `Inserted(Entry)` | The new entry is returned. | + /// | `Put(V)` | yes | `ReplacedWith(Entry)` | The new entry is returned. | + /// | `Remove` | no | `StillNone(Arc)` | | + /// | `Remove` | yes | `Removed(Entry)` | The removed entry is returned. | + /// | `Nop` | no | `StillNone(Arc)` | | + /// | `Nop` | yes | `Unchanged(Entry)` | The existing entry is returned. | + /// + /// # See Also + /// + /// - If you want the `Future` resolve to `Op` instead of `Result>`, use + /// the [`and_compute_with`] method. + /// - If you only want to update or insert, use the [`and_upsert_with`] method. + /// + /// [`Entry`]: ../struct.Entry.html + /// [`Op`]: ../ops/compute/enum.Op.html + /// [`CompResult`]: ../ops/compute/enum.CompResult.html + /// [`and_upsert_with`]: #method.and_upsert_with + /// [`and_compute_with`]: #method.and_compute_with + /// + /// # Example + /// + /// See [`try_append_value_async.rs`] in the `examples` directory. 
+ /// + /// [`try_append_value_async.rs`]: + /// https://github.com/moka-rs/moka/tree/main/examples/try_append_value_async.rs + /// + /// # Concurrent calls on the same key + /// + /// This method guarantees that concurrent calls on the same key are executed + /// serially. That is, `and_try_compute_with` calls on the same key never run + /// concurrently. The calls are serialized by the order of their invocation. It + /// uses a key-level lock to achieve this. + pub async fn and_try_compute_with(self, f: F) -> Result, E> + where + F: FnOnce(Option>) -> Fut, + Fut: Future, E>>, + E: Send + Sync + 'static, + { + let key = Arc::new(self.ref_key.to_owned()); + self.cache + .try_compute_with_hash_and_fun(key, self.hash, f) + .await + } + + /// Performs an upsert of an [`Entry`] by using the given closure `f`. The word + /// "upsert" here means "update" or "insert". + /// + /// The closure `f` should take the current entry of `Option>` for + /// the key, and return a `Future` that resolves to a new value `V`. + /// + /// This method works as the followings: + /// + /// 1. Apply the closure `f` to the current cached `Entry`, and get a `Future`. + /// 2. Resolve the `Future`, and get a new value `V`. + /// 3. Upsert the new value to the cache. + /// 4. Return the `Entry` having the upserted value. + /// + /// # See Also + /// + /// - If you want to optionally upsert, that is to upsert only when certain + /// conditions meet, use the [`and_compute_with`] method. + /// - If you try to upsert, that is to make the `Future` resolve to `Result` + /// instead of `V`, and upsert only when resolved to `Ok(V)`, use the + /// [`and_try_compute_with`] method. 
+ /// + /// [`Entry`]: ../struct.Entry.html + /// [`and_compute_with`]: #method.and_compute_with + /// [`and_try_compute_with`]: #method.and_try_compute_with + /// + /// # Example + /// + /// ```rust + /// // Cargo.toml + /// // + /// // [dependencies] + /// // moka = { version = "0.12.3", features = ["future"] } + /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] } + /// + /// use moka::future::Cache; + /// + /// #[tokio::main] + /// async fn main() { + /// let cache: Cache = Cache::new(100); + /// let key = "key1"; + /// + /// let entry = cache + /// .entry_by_ref(key) + /// .and_upsert_with(|maybe_entry| { + /// let counter = if let Some(entry) = maybe_entry { + /// entry.into_value().saturating_add(1) // Update + /// } else { + /// 1 // Insert + /// }; + /// // Return a Future that is resolved to `counter` immediately. + /// std::future::ready(counter) + /// }) + /// .await; + /// // It was not an update. + /// assert!(!entry.is_old_value_replaced()); + /// assert_eq!(entry.key(), &key); + /// assert_eq!(entry.into_value(), 1); + /// + /// let entry = cache + /// .entry_by_ref(key) + /// .and_upsert_with(|maybe_entry| { + /// let counter = if let Some(entry) = maybe_entry { + /// entry.into_value().saturating_add(1) + /// } else { + /// 1 + /// }; + /// std::future::ready(counter) + /// }) + /// .await; + /// // It was an update. + /// assert!(entry.is_old_value_replaced()); + /// assert_eq!(entry.key(), &key); + /// assert_eq!(entry.into_value(), 2); + /// } + /// ``` + /// + /// # Concurrent calls on the same key + /// + /// This method guarantees that concurrent calls on the same key are executed + /// serially. That is, `and_upsert_with` calls on the same key never run + /// concurrently. The calls are serialized by the order of their invocation. It + /// uses a key-level lock to achieve this. 
+ pub async fn and_upsert_with(self, f: F) -> Entry + where + F: FnOnce(Option>) -> Fut, + Fut: Future, + { + let key = Arc::new(self.ref_key.to_owned()); + self.cache.upsert_with_hash_and_fun(key, self.hash, f).await + } + /// Returns the corresponding [`Entry`] for the reference of the key given when /// this entry selector was constructed. If the entry does not exist, inserts one /// by cloning the key and calling the [`default`][std-default-function] function diff --git a/src/future/value_initializer.rs b/src/future/value_initializer.rs index 45a6f0b2..b7392750 100644 --- a/src/future/value_initializer.rs +++ b/src/future/value_initializer.rs @@ -3,6 +3,7 @@ use async_trait::async_trait; use futures_util::FutureExt; use std::{ any::{Any, TypeId}, + fmt, future::Future, hash::{BuildHasher, Hash}, pin::Pin, @@ -10,12 +11,19 @@ use std::{ }; use triomphe::Arc as TrioArc; -use super::OptionallyNone; +use crate::{ + ops::compute::{CompResult, Op}, + Entry, +}; + +use super::{ComputeNone, OptionallyNone}; const WAITER_MAP_NUM_SEGMENTS: usize = 64; #[async_trait] pub(crate) trait GetOrInsert { + /// Gets a value for the given key without recording the access to the cache + /// policies. async fn get_without_recording( &self, key: &Arc, @@ -26,7 +34,17 @@ pub(crate) trait GetOrInsert { V: 'static, I: for<'i> FnMut(&'i V) -> bool + Send; + /// Gets an entry for the given key _with_ recording the access to the cache + /// policies. + async fn get_entry(&self, key: &Arc, hash: u64) -> Option> + where + V: 'static; + + /// Inserts a value for the given key. async fn insert(&self, key: Arc, hash: u64, value: V); + + /// Removes a value for the given key. Returns the removed value. 
+ async fn remove(&self, key: &Arc, hash: u64) -> Option; } type ErrorObject = Arc; @@ -40,12 +58,25 @@ pub(crate) enum InitResult { enum WaiterValue { Computing, Ready(Result), + ReadyNone, // https://github.com/moka-rs/moka/issues/43 InitFuturePanicked, // https://github.com/moka-rs/moka/issues/59 EnclosingFutureAborted, } +impl fmt::Debug for WaiterValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + WaiterValue::Computing => write!(f, "Computing"), + WaiterValue::Ready(_) => write!(f, "Ready"), + WaiterValue::ReadyNone => write!(f, "ReadyNone"), + WaiterValue::InitFuturePanicked => write!(f, "InitFuturePanicked"), + WaiterValue::EnclosingFutureAborted => write!(f, "EnclosingFutureAborted"), + } + } +} + type Waiter = TrioArc>>; type WaiterMap = crate::cht::SegmentedHashMap<(Arc, TypeId), Waiter, S>; @@ -179,6 +210,7 @@ where let Some(existing_waiter) = try_insert_waiter(&self.waiters, w_key.clone(), w_hash, &waiter) else { + // Inserted. break; }; @@ -203,8 +235,8 @@ where continue; } // Unexpected state. - WaiterValue::Computing => panic!( - "Got unexpected state `Computing` after resolving `init` future. \ + s @ (WaiterValue::Computing | WaiterValue::ReadyNone) => panic!( + "Got unexpected state `{s:?}` after resolving `init` future. \ This might be a bug in Moka" ), } @@ -254,6 +286,140 @@ where // The lock will be unlocked here. } + /// # Panics + /// Panics if the `init` future has been panicked. 
+ pub(crate) async fn try_compute<'a, C, F, Fut, O, E>( + &'a self, + c_key: Arc, + c_hash: u64, + cache: &C, + f: F, + post_init: fn(O) -> Result, E>, + allow_nop: bool, + ) -> Result, E> + where + C: GetOrInsert + Send + 'a, + F: FnOnce(Option>) -> Fut, + Fut: Future + 'a, + E: Send + Sync + 'static, + { + use std::panic::{resume_unwind, AssertUnwindSafe}; + + let type_id = TypeId::of::(); + let (w_key, w_hash) = waiter_key_hash(&self.waiters, &c_key, type_id); + let waiter = TrioArc::new(RwLock::new(WaiterValue::Computing)); + // NOTE: We have to acquire a write lock before `try_insert_waiter`, + // so that any concurrent attempt will get our lock and wait on it. + let lock = waiter.write().await; + + loop { + let Some(existing_waiter) = + try_insert_waiter(&self.waiters, w_key.clone(), w_hash, &waiter) + else { + // Inserted. + break; + }; + + // Somebody else's waiter already exists, so wait for it to finish + // (wait for it to release the write lock). + let waiter_result = existing_waiter.read().await; + match &*waiter_result { + // Unexpected state. + WaiterValue::Computing => panic!( + "Got unexpected state `Computing` after resolving `init` future. \ + This might be a bug in Moka" + ), + _ => { + // Try to insert our waiter again. + continue; + } + } + } + + // Our waiter was inserted. + + // Create a guard. This will ensure to remove our waiter when the + // enclosing future has been aborted: + // https://github.com/moka-rs/moka/issues/59 + let waiter_guard = WaiterGuard::new(w_key, w_hash, &self.waiters, lock); + + // Get the current value. + let maybe_entry = cache.get_entry(&c_key, c_hash).await; + let maybe_value = if allow_nop { + maybe_entry.as_ref().map(|ent| ent.value().clone()) + } else { + None + }; + let entry_existed = maybe_entry.is_some(); + + // Evaluate the `f` closure and get a future. Catching panic is safe here as + // we will not evaluate the closure again. 
+ let fut = match std::panic::catch_unwind(AssertUnwindSafe(|| f(maybe_entry))) { + // Evaluated. + Ok(fut) => fut, + // Panicked. + Err(payload) => { + waiter_guard.set_waiter_value(WaiterValue::InitFuturePanicked); + resume_unwind(payload); + } + }; + + // Resolve the `fut` future. Catching panic is safe here as we will not + // resolve the future again. + let output = match AssertUnwindSafe(fut).catch_unwind().await { + // Resolved. + Ok(output) => { + waiter_guard.set_waiter_value(WaiterValue::ReadyNone); + output + } + // Panicked. + Err(payload) => { + waiter_guard.set_waiter_value(WaiterValue::InitFuturePanicked); + resume_unwind(payload); + } + }; + + match post_init(output)? { + Op::Nop => { + if let Some(value) = maybe_value { + Ok(CompResult::Unchanged(Entry::new( + Some(c_key), + value, + false, + false, + ))) + } else { + Ok(CompResult::StillNone(c_key)) + } + } + Op::Put(value) => { + cache + .insert(Arc::clone(&c_key), c_hash, value.clone()) + .await; + if entry_existed { + crossbeam_epoch::pin().flush(); + let entry = Entry::new(Some(c_key), value, true, true); + Ok(CompResult::ReplacedWith(entry)) + } else { + let entry = Entry::new(Some(c_key), value, true, false); + Ok(CompResult::Inserted(entry)) + } + } + Op::Remove => { + let maybe_prev_v = cache.remove(&c_key, c_hash).await; + if let Some(prev_v) = maybe_prev_v { + crossbeam_epoch::pin().flush(); + let entry = Entry::new(Some(c_key), prev_v, false, false); + Ok(CompResult::Removed(entry)) + } else { + Ok(CompResult::StillNone(c_key)) + } + } + } + + // The lock will be unlocked here. + } + /// The `post_init` function for the `get_with` method of cache. pub(crate) fn post_init_for_get_with(value: V) -> Result { Ok(value) @@ -275,6 +441,24 @@ where result } + /// The `post_init` function for the `and_upsert_with` method of cache. 
+ pub(crate) fn post_init_for_upsert_with(value: V) -> Result, ()> { + Ok(Op::Put(value)) + } + + /// The `post_init` function for the `and_compute_with` method of cache. + pub(crate) fn post_init_for_compute_with(op: Op) -> Result, ()> { + Ok(op) + } + + /// The `post_init` function for the `and_try_compute_with` method of cache. + pub(crate) fn post_init_for_try_compute_with(op: Result, E>) -> Result, E> + where + E: Send + Sync + 'static, + { + op + } + /// Returns the `type_id` for `get_with` method of cache. pub(crate) fn type_id_for_get_with() -> TypeId { // NOTE: We use a regular function here instead of a const fn because TypeId diff --git a/src/lib.rs b/src/lib.rs index e05e09cb..c56f4c69 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -109,6 +109,10 @@ pub(crate) mod cht; #[cfg(any(feature = "sync", feature = "future"))] pub(crate) mod common; +#[cfg(any(feature = "sync", feature = "future"))] +#[cfg_attr(docsrs, doc(cfg(any(feature = "sync", feature = "future"))))] +pub mod ops; + #[cfg(any(feature = "sync", feature = "future"))] pub(crate) mod policy; diff --git a/src/ops.rs b/src/ops.rs new file mode 100644 index 00000000..11598330 --- /dev/null +++ b/src/ops.rs @@ -0,0 +1,71 @@ +//! Cache operations. + +/// Operations used by the `and_compute_with` and similar methods. +pub mod compute { + use std::sync::Arc; + + use crate::Entry; + + /// Instructs the `and_compute_with` and similar methods how to modify the cached + /// entry. + #[derive(Debug, Clone, PartialEq, Eq)] + pub enum Op { + /// No-operation. Do not modify the cached entry. + Nop, + /// Insert or update the value of the cached entry. + Put(V), + /// Remove the cached entry. + Remove, + } + + /// The result of the `and_compute_with` and similar methods. + #[derive(Debug)] + pub enum CompResult { + /// The entry did not exist and still does not exist. + StillNone(Arc), + /// The entry already existed and was not modified. The returned entry + /// contains the existing value. 
+ Unchanged(Entry), + /// The entry did not exist and was inserted. The returned entry contains + /// the inserted value. + Inserted(Entry), + /// The entry already existed and its value was replaced with a new one. The + /// returned entry contains the new value (not the replaced value). + ReplacedWith(Entry), + /// The entry already existed and was removed. The returned entry contains + /// the removed value. + /// + /// Note: `StillNone` is returned instead of `Removed` if `Op::Remove` was + /// requested but the entry did not exist. + Removed(Entry), + } + + impl CompResult { + /// Returns the contained `Some(Entry)` if any. Otherwise returns `None`. + /// Consumes the `self` value. + pub fn into_entry(self) -> Option> { + match self { + CompResult::StillNone(_) => None, + CompResult::Unchanged(entry) => Some(entry), + CompResult::Inserted(entry) => Some(entry), + CompResult::ReplacedWith(entry) => Some(entry), + CompResult::Removed(entry) => Some(entry), + } + } + + /// Unwraps the contained `Entry`, consuming the `self` value. + /// + /// # Panics + /// + /// Panics if the `self` value is `StillNone`. + pub fn unwrap(self) -> Entry { + match self { + CompResult::StillNone(_) => panic!("`CompResult::unwrap` called on `StillNone`"), + CompResult::Unchanged(entry) => entry, + CompResult::Inserted(entry) => entry, + CompResult::ReplacedWith(entry) => entry, + CompResult::Removed(entry) => entry, + } + } + } +} diff --git a/src/sync.rs b/src/sync.rs index 67529062..ec3e3988 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -20,6 +20,8 @@ pub trait ConcurrentCacheExt { fn sync(&self); } -// Empty internal struct to be used in optionally_get_with to represent the None -// results. +// Empty struct to be used in `InitResult::InitErr` to represent the Option None. pub(crate) struct OptionallyNone; + +// Empty struct to be used in `InitResult::InitErr`` to represent the Compute None. 
+pub(crate) struct ComputeNone; diff --git a/src/sync/cache.rs b/src/sync/cache.rs index 3ab1677d..b8f430ad 100644 --- a/src/sync/cache.rs +++ b/src/sync/cache.rs @@ -1,5 +1,5 @@ use super::{ - value_initializer::{InitResult, ValueInitializer}, + value_initializer::{GetOrInsert, InitResult, ValueInitializer}, CacheBuilder, OwnedKeyEntrySelector, RefKeyEntrySelector, }; use crate::{ @@ -10,6 +10,7 @@ use crate::{ time::Instant, }, notification::EvictionListener, + ops::compute::{self, CompResult}, policy::ExpirationPolicy, sync::{Iter, PredicateId}, sync_base::{ @@ -965,6 +966,7 @@ where .into_value() } + /// TODO: Remove this in v0.13.0. /// Deprecated, replaced with /// [`entry()::or_insert_with_if()`](./struct.OwnedKeyEntrySelector.html#method.or_insert_with_if) #[deprecated(since = "0.10.0", note = "Replaced with `entry().or_insert_with_if()`")] @@ -1047,9 +1049,9 @@ where { InitResult::Initialized(v) => { crossbeam_epoch::pin().flush(); - Entry::new(k, v, true) + Entry::new(k, v, true, false) } - InitResult::ReadExisting(v) => Entry::new(k, v, false), + InitResult::ReadExisting(v) => Entry::new(k, v, false, false), InitResult::InitErr(_) => unreachable!(), } } @@ -1065,7 +1067,7 @@ where None => { let value = init(); self.insert_with_hash(Arc::clone(&key), hash, value.clone()); - Entry::new(Some(key), value, true) + Entry::new(Some(key), value, true, false) } } } @@ -1086,7 +1088,7 @@ where let key = Arc::new(key.to_owned()); let value = init(); self.insert_with_hash(Arc::clone(&key), hash, value.clone()); - Entry::new(Some(key), value, true) + Entry::new(Some(key), value, true, false) } } } @@ -1271,9 +1273,9 @@ where { InitResult::Initialized(v) => { crossbeam_epoch::pin().flush(); - Some(Entry::new(k, v, true)) + Some(Entry::new(k, v, true, false)) } - InitResult::ReadExisting(v) => Some(Entry::new(k, v, false)), + InitResult::ReadExisting(v) => Some(Entry::new(k, v, false, false)), InitResult::InitErr(_) => { crossbeam_epoch::pin().flush(); None @@ -1465,9 
+1467,9 @@ where { InitResult::Initialized(v) => { crossbeam_epoch::pin().flush(); - Ok(Entry::new(k, v, true)) + Ok(Entry::new(k, v, true, false)) } - InitResult::ReadExisting(v) => Ok(Entry::new(k, v, false)), + InitResult::ReadExisting(v) => Ok(Entry::new(k, v, false, false)), InitResult::InitErr(e) => { crossbeam_epoch::pin().flush(); Err(e) @@ -1501,6 +1503,54 @@ where .expect("Failed to insert"); } + pub(crate) fn compute_with_hash_and_fun( + &self, + key: Arc, + hash: u64, + f: F, + ) -> compute::CompResult + where + F: FnOnce(Option>) -> compute::Op, + { + let post_init = ValueInitializer::::post_init_for_compute_with; + match self + .value_initializer + .try_compute(key, hash, self, f, post_init, true) + { + Ok(result) => result, + Err(_) => unreachable!(), + } + } + + pub(crate) fn try_compute_with_hash_and_fun( + &self, + key: Arc, + hash: u64, + f: F, + ) -> Result, E> + where + F: FnOnce(Option>) -> Result, E>, + E: Send + Sync + 'static, + { + let post_init = ValueInitializer::::post_init_for_try_compute_with; + self.value_initializer + .try_compute(key, hash, self, f, post_init, true) + } + + pub(crate) fn upsert_with_hash_and_fun(&self, key: Arc, hash: u64, f: F) -> Entry + where + F: FnOnce(Option>) -> V, + { + let post_init = ValueInitializer::::post_init_for_upsert_with; + match self + .value_initializer + .try_compute(key, hash, self, f, post_init, false) + { + Ok(CompResult::Inserted(entry) | CompResult::ReplacedWith(entry)) => entry, + _ => unreachable!(), + } + } + /// Discards any cached value for the key. 
/// /// If you need to get a the value that has been discarded, use the @@ -1792,6 +1842,27 @@ where } } +impl GetOrInsert for Cache +where + K: Hash + Eq + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, + S: BuildHasher + Clone + Send + Sync + 'static, +{ + fn get_entry(&self, key: &Arc, hash: u64) -> Option> { + let ignore_if = None as Option<&mut fn(&V) -> bool>; + self.base + .get_with_hash_and_ignore_if(key, hash, ignore_if, true) + } + + fn insert(&self, key: Arc, hash: u64, value: V) { + self.insert_with_hash(key.clone(), hash, value); + } + + fn remove(&self, key: &Arc, hash: u64) -> Option { + self.invalidate_with_hash(key, hash, true) + } +} + // For unit tests. #[cfg(test)] impl Cache @@ -3896,6 +3967,343 @@ mod tests { } } + #[test] + fn upsert_with() { + use std::thread::{sleep, spawn}; + + let cache = Cache::new(100); + const KEY: u32 = 0; + + // Spawn three threads to call `and_upsert_with` for the same key and each + // task increments the current value by 1. Ensure the key-level lock is + // working by verifying the value is 3 after all threads finish. 
+ // + // | | thread 1 | thread 2 | thread 3 | + // |--------|----------|----------|----------| + // | 0 ms | get none | | | + // | 100 ms | | blocked | | + // | 200 ms | insert 1 | | | + // | | | get 1 | | + // | 300 ms | | | blocked | + // | 400 ms | | insert 2 | | + // | | | | get 2 | + // | 500 ms | | | insert 3 | + + let thread1 = { + let cache1 = cache.clone(); + spawn(move || { + cache1.entry(KEY).and_upsert_with(|maybe_entry| { + sleep(Duration::from_millis(200)); + assert!(maybe_entry.is_none()); + 1 + }) + }) + }; + + let thread2 = { + let cache2 = cache.clone(); + spawn(move || { + sleep(Duration::from_millis(100)); + cache2.entry_by_ref(&KEY).and_upsert_with(|maybe_entry| { + sleep(Duration::from_millis(200)); + let entry = maybe_entry.expect("The entry should exist"); + entry.into_value() + 1 + }) + }) + }; + + let thread3 = { + let cache3 = cache.clone(); + spawn(move || { + sleep(Duration::from_millis(300)); + cache3.entry_by_ref(&KEY).and_upsert_with(|maybe_entry| { + sleep(Duration::from_millis(100)); + let entry = maybe_entry.expect("The entry should exist"); + entry.into_value() + 1 + }) + }) + }; + + let ent1 = thread1.join().expect("Thread 1 should finish"); + let ent2 = thread2.join().expect("Thread 2 should finish"); + let ent3 = thread3.join().expect("Thread 3 should finish"); + assert_eq!(ent1.into_value(), 1); + assert_eq!(ent2.into_value(), 2); + assert_eq!(ent3.into_value(), 3); + + assert_eq!(cache.get(&KEY), Some(3)); + } + + #[test] + fn compute_with() { + use crate::ops::compute; + use std::{ + sync::RwLock, + thread::{sleep, spawn}, + }; + + let cache = Cache::new(100); + const KEY: u32 = 0; + + // Spawn six threads to call `and_compute_with` for the same key. Ensure the + // key-level lock is working by verifying the value after all threads finish. 
+ // + // | | thread 1 | thread 2 | thread 3 | thread 4 | thread 5 | thread 6 | + // |---------|------------|---------------|------------|----------|------------|----------| + // | 0 ms | get none | | | | | | + // | 100 ms | | blocked | | | | | + // | 200 ms | insert [1] | | | | | | + // | | | get [1] | | | | | + // | 300 ms | | | blocked | | | | + // | 400 ms | | insert [1, 2] | | | | | + // | | | | get [1, 2] | | | | + // | 500 ms | | | | blocked | | | + // | 600 ms | | | remove | | | | + // | | | | | get none | | | + // | 700 ms | | | | | blocked | | + // | 800 ms | | | | nop | | | + // | | | | | | get none | | + // | 900 ms | | | | | | blocked | + // | 1000 ms | | | | | insert [5] | | + // | | | | | | | get [5] | + // | 1100 ms | | | | | | nop | + + let thread1 = { + let cache1 = cache.clone(); + spawn(move || { + cache1.entry(KEY).and_compute_with(|maybe_entry| { + sleep(Duration::from_millis(200)); + assert!(maybe_entry.is_none()); + compute::Op::Put(Arc::new(RwLock::new(vec![1]))) + }) + }) + }; + + let thread2 = { + let cache2 = cache.clone(); + spawn(move || { + sleep(Duration::from_millis(100)); + cache2.entry_by_ref(&KEY).and_compute_with(|maybe_entry| { + let entry = maybe_entry.expect("The entry should exist"); + let value = entry.into_value(); + assert_eq!(*value.read().unwrap(), vec![1]); + sleep(Duration::from_millis(200)); + value.write().unwrap().push(2); + compute::Op::Put(value) + }) + }) + }; + + let thread3 = { + let cache3 = cache.clone(); + spawn(move || { + sleep(Duration::from_millis(300)); + cache3.entry(KEY).and_compute_with(|maybe_entry| { + let entry = maybe_entry.expect("The entry should exist"); + let value = entry.into_value(); + assert_eq!(*value.read().unwrap(), vec![1, 2]); + sleep(Duration::from_millis(200)); + compute::Op::Remove + }) + }) + }; + + let thread4 = { + let cache4 = cache.clone(); + spawn(move || { + sleep(Duration::from_millis(500)); + cache4.entry(KEY).and_compute_with(|maybe_entry| { + 
assert!(maybe_entry.is_none()); + sleep(Duration::from_millis(200)); + compute::Op::Nop + }) + }) + }; + + let thread5 = { + let cache5 = cache.clone(); + spawn(move || { + sleep(Duration::from_millis(700)); + cache5.entry_by_ref(&KEY).and_compute_with(|maybe_entry| { + assert!(maybe_entry.is_none()); + sleep(Duration::from_millis(200)); + compute::Op::Put(Arc::new(RwLock::new(vec![5]))) + }) + }) + }; + + let thread6 = { + let cache6 = cache.clone(); + spawn(move || { + sleep(Duration::from_millis(900)); + cache6.entry_by_ref(&KEY).and_compute_with(|maybe_entry| { + let entry = maybe_entry.expect("The entry should exist"); + let value = entry.into_value(); + assert_eq!(*value.read().unwrap(), vec![5]); + sleep(Duration::from_millis(100)); + compute::Op::Nop + }) + }) + }; + + let res1 = thread1.join().expect("Thread 1 should finish"); + let res2 = thread2.join().expect("Thread 2 should finish"); + let res3 = thread3.join().expect("Thread 3 should finish"); + let res4 = thread4.join().expect("Thread 4 should finish"); + let res5 = thread5.join().expect("Thread 5 should finish"); + let res6 = thread6.join().expect("Thread 6 should finish"); + + let compute::CompResult::Inserted(entry) = res1 else { + panic!("Expected `Inserted`. Got {res1:?}") + }; + assert_eq!( + *entry.into_value().read().unwrap(), + vec![1, 2] // The same Vec was modified by task2. + ); + + let compute::CompResult::ReplacedWith(entry) = res2 else { + panic!("Expected `ReplacedWith`. Got {res2:?}") + }; + assert_eq!(*entry.into_value().read().unwrap(), vec![1, 2]); + + let compute::CompResult::Removed(entry) = res3 else { + panic!("Expected `Removed`. Got {res3:?}") + }; + assert_eq!(*entry.into_value().read().unwrap(), vec![1, 2]); + + let compute::CompResult::StillNone(key) = res4 else { + panic!("Expected `StillNone`. Got {res4:?}") + }; + assert_eq!(*key, KEY); + + let compute::CompResult::Inserted(entry) = res5 else { + panic!("Expected `Inserted`. 
Got {res5:?}") + }; + assert_eq!(*entry.into_value().read().unwrap(), vec![5]); + + let compute::CompResult::Unchanged(entry) = res6 else { + panic!("Expected `Unchanged`. Got {res6:?}") + }; + assert_eq!(*entry.into_value().read().unwrap(), vec![5]); + } + + #[test] + fn try_compute_with() { + use crate::ops::compute; + use std::{ + sync::RwLock, + thread::{sleep, spawn}, + }; + + let cache: Cache>>> = Cache::new(100); + const KEY: u32 = 0; + + // Spawn four threads to call `and_try_compute_with` for the same key. Ensure + // the key-level lock is working by verifying the value after all threads + // finish. + // + // | | thread 1 | thread 2 | thread 3 | thread 4 | + // |---------|------------|---------------|------------|------------| + // | 0 ms | get none | | | | + // | 100 ms | | blocked | | | + // | 200 ms | insert [1] | | | | + // | | | get [1] | | | + // | 300 ms | | | blocked | | + // | 400 ms | | insert [1, 2] | | | + // | | | | get [1, 2] | | + // | 500 ms | | | | blocked | + // | 600 ms | | | err | | + // | | | | | get [1, 2] | + // | 700 ms | | | | remove | + // + // This test is shorter than `compute_with` test because this one omits `Nop` + // cases. 
+ + let thread1 = { + let cache1 = cache.clone(); + spawn(move || { + cache1.entry(KEY).and_try_compute_with(|maybe_entry| { + sleep(Duration::from_millis(200)); + assert!(maybe_entry.is_none()); + Ok(compute::Op::Put(Arc::new(RwLock::new(vec![1])))) as Result<_, ()> + }) + }) + }; + + let thread2 = { + let cache2 = cache.clone(); + spawn(move || { + sleep(Duration::from_millis(100)); + cache2 + .entry_by_ref(&KEY) + .and_try_compute_with(|maybe_entry| { + let entry = maybe_entry.expect("The entry should exist"); + let value = entry.into_value(); + assert_eq!(*value.read().unwrap(), vec![1]); + sleep(Duration::from_millis(200)); + value.write().unwrap().push(2); + Ok(compute::Op::Put(value)) as Result<_, ()> + }) + }) + }; + + let thread3 = { + let cache3 = cache.clone(); + spawn(move || { + sleep(Duration::from_millis(300)); + cache3.entry(KEY).and_try_compute_with(|maybe_entry| { + let entry = maybe_entry.expect("The entry should exist"); + let value = entry.into_value(); + assert_eq!(*value.read().unwrap(), vec![1, 2]); + sleep(Duration::from_millis(200)); + Err(()) + }) + }) + }; + + let thread4 = { + let cache4 = cache.clone(); + spawn(move || { + sleep(Duration::from_millis(500)); + cache4.entry(KEY).and_try_compute_with(|maybe_entry| { + let entry = maybe_entry.expect("The entry should exist"); + let value = entry.into_value(); + assert_eq!(*value.read().unwrap(), vec![1, 2]); + sleep(Duration::from_millis(100)); + Ok(compute::Op::Remove) as Result<_, ()> + }) + }) + }; + + let res1 = thread1.join().expect("Thread 1 should finish"); + let res2 = thread2.join().expect("Thread 2 should finish"); + let res3 = thread3.join().expect("Thread 3 should finish"); + let res4 = thread4.join().expect("Thread 4 should finish"); + + let Ok(compute::CompResult::Inserted(entry)) = res1 else { + panic!("Expected `Inserted`. Got {res1:?}") + }; + assert_eq!( + *entry.into_value().read().unwrap(), + vec![1, 2] // The same Vec was modified by task2. 
+ ); + + let Ok(compute::CompResult::ReplacedWith(entry)) = res2 else { + panic!("Expected `ReplacedWith`. Got {res2:?}") + }; + assert_eq!(*entry.into_value().read().unwrap(), vec![1, 2]); + + assert!(res3.is_err()); + + let Ok(compute::CompResult::Removed(entry)) = res4 else { + panic!("Expected `Removed`. Got {res4:?}") + }; + assert_eq!( + *entry.into_value().read().unwrap(), + vec![1, 2] // Removed value. + ); + } + #[test] // https://github.com/moka-rs/moka/issues/43 fn handle_panic_in_get_with() { diff --git a/src/sync/entry_selector.rs b/src/sync/entry_selector.rs index 11d65363..2d156176 100644 --- a/src/sync/entry_selector.rs +++ b/src/sync/entry_selector.rs @@ -1,4 +1,4 @@ -use crate::Entry; +use crate::{ops::compute, Entry}; use super::Cache; @@ -38,6 +38,261 @@ where } } + /// Performs a compute operation on a cached entry by using the given closure + /// `f`. A compute operation is either put, remove or no-operation (nop). + /// + /// The closure `f` should take the current entry of `Option>` for + /// the key, and return an `ops::compute::Op` enum. + /// + /// This method works as the followings: + /// + /// 1. Apply the closure `f` to the current cached `Entry`, and get an + /// `ops::compute::Op`. + /// 2. Execute the op on the cache: + /// - `Op::Put(V)`: Put the new value `V` to the cache. + /// - `Op::Remove`: Remove the current cached entry. + /// - `Op::Nop`: Do nothing. + /// 3. Return an `ops::compute::CompResult` as the followings: + /// + /// | [`Op`] | [`Entry`] already exists? | [`CompResult`] | Notes | + /// |:--------- |:--- |:--------------------------- |:------------------------------- | + /// | `Put(V)` | no | `Inserted(Entry)` | The new entry is returned. | + /// | `Put(V)` | yes | `ReplacedWith(Entry)` | The new entry is returned. | + /// | `Remove` | no | `StillNone(Arc)` | | + /// | `Remove` | yes | `Removed(Entry)` | The removed entry is returned. 
| + /// | `Nop` | no | `StillNone(Arc)` | | + /// | `Nop` | yes | `Unchanged(Entry)` | The existing entry is returned. | + /// + /// # See Also + /// + /// - If you want the `Future` resolve to `Result>` instead of `Op`, and + /// modify entry only when resolved to `Ok(V)`, use the + /// [`and_try_compute_with`] method. + /// - If you only want to update or insert, use the [`and_upsert_with`] method. + /// + /// [`Entry`]: ../struct.Entry.html + /// [`Op`]: ../ops/compute/enum.Op.html + /// [`CompResult`]: ../ops/compute/enum.CompResult.html + /// [`and_upsert_with`]: #method.and_upsert_with + /// [`and_try_compute_with`]: #method.and_try_compute_with + /// + /// # Example + /// + /// ```rust + /// use moka::{ + /// sync::Cache, + /// ops::compute::{CompResult, Op}, + /// }; + /// + /// let cache: Cache = Cache::new(100); + /// let key = "key1".to_string(); + /// + /// /// Increment a cached `u64` counter. If the counter is greater than or + /// /// equal to 2, remove it. + /// fn inclement_or_remove_counter( + /// cache: &Cache, + /// key: &str, + /// ) -> CompResult { + /// cache + /// .entry(key.to_string()) + /// .and_compute_with(|maybe_entry| { + /// if let Some(entry) = maybe_entry { + /// let counter = entry.into_value(); + /// if counter < 2 { + /// Op::Put(counter.saturating_add(1)) // Update + /// } else { + /// Op::Remove + /// } + /// } else { + /// Op::Put(1) // Insert + /// } + /// }) + /// } + /// + /// // This should insert a new counter value 1 to the cache, and return the + /// // value with the kind of the operation performed. + /// let result = inclement_or_remove_counter(&cache, &key); + /// let CompResult::Inserted(entry) = result else { + /// panic!("`Inserted` should be returned: {result:?}"); + /// }; + /// assert_eq!(entry.into_value(), 1); + /// + /// // This should increment the cached counter value by 1. 
+ /// let result = inclement_or_remove_counter(&cache, &key); + /// let CompResult::ReplacedWith(entry) = result else { + /// panic!("`ReplacedWith` should be returned: {result:?}"); + /// }; + /// assert_eq!(entry.into_value(), 2); + /// + /// // This should remove the cached counter from the cache, and returns the + /// // _removed_ value. + /// let result = inclement_or_remove_counter(&cache, &key); + /// let CompResult::Removed(entry) = result else { + /// panic!("`Removed` should be returned: {result:?}"); + /// }; + /// assert_eq!(entry.into_value(), 2); + /// + /// // The key should no longer exist. + /// assert!(!cache.contains_key(&key)); + /// + /// // This should start over; insert a new counter value 1 to the cache. + /// let result = inclement_or_remove_counter(&cache, &key); + /// let CompResult::Inserted(entry) = result else { + /// panic!("`Inserted` should be returned: {result:?}"); + /// }; + /// assert_eq!(entry.into_value(), 1); + /// ``` + /// + /// # Concurrent calls on the same key + /// + /// This method guarantees that concurrent calls on the same key are executed + /// serially. That is, `and_compute_with` calls on the same key never run + /// concurrently. The calls are serialized by the order of their invocation. It + /// uses a key-level lock to achieve this. + pub fn and_compute_with(self, f: F) -> compute::CompResult + where + F: FnOnce(Option>) -> compute::Op, + { + let key = Arc::new(self.owned_key); + self.cache.compute_with_hash_and_fun(key, self.hash, f) + } + + /// Performs a compute operation on a cached entry by using the given closure + /// `f`. A compute operation is either put, remove or no-operation (nop). + /// + /// The closure `f` should take the current entry of `Option>` for + /// the key, and return a `Result, E>`. + /// + /// This method works as the followings: + /// + /// 1. Apply the closure `f` to the current cached `Entry`, and get a + /// `Result, E>`. + /// 2. If resolved to `Err(E)`, return it. + /// 3. 
Else, execute the op on the cache:
+ /// - `Ok(Op::Put(V))`: Put the new value `V` to the cache.
+ /// - `Ok(Op::Remove)`: Remove the current cached entry.
+ /// - `Ok(Op::Nop)`: Do nothing.
+ /// 4. Return an `Ok(ops::compute::CompResult)` as the followings:
+ ///
+ /// | [`Op`] | [`Entry`] already exists? | [`CompResult`] | Notes |
+ /// |:--------- |:--- |:--------------------------- |:------------------------------- |
+ /// | `Put(V)` | no | `Inserted(Entry)` | The new entry is returned. |
+ /// | `Put(V)` | yes | `ReplacedWith(Entry)` | The new entry is returned. |
+ /// | `Remove` | no | `StillNone(Arc)` | |
+ /// | `Remove` | yes | `Removed(Entry)` | The removed entry is returned. |
+ /// | `Nop` | no | `StillNone(Arc)` | |
+ /// | `Nop` | yes | `Unchanged(Entry)` | The existing entry is returned. |
+ ///
+ /// # See Also
+ ///
+ /// - If you want the closure to return `Op` instead of `Result>`, use
+ /// the [`and_compute_with`] method.
+ /// - If you only want to put, use the [`and_upsert_with`] method.
+ ///
+ /// [`Entry`]: ../struct.Entry.html
+ /// [`Op`]: ../ops/compute/enum.Op.html
+ /// [`CompResult`]: ../ops/compute/enum.CompResult.html
+ /// [`and_upsert_with`]: #method.and_upsert_with
+ /// [`and_compute_with`]: #method.and_compute_with
+ ///
+ /// # Example
+ ///
+ /// See [`try_append_value_sync.rs`] in the `examples` directory.
+ ///
+ /// [`try_append_value_sync.rs`]:
+ /// https://github.com/moka-rs/moka/tree/main/examples/try_append_value_sync.rs
+ ///
+ /// # Concurrent calls on the same key
+ ///
+ /// This method guarantees that concurrent calls on the same key are executed
+ /// serially. That is, `and_try_compute_with` calls on the same key never run
+ /// concurrently. The calls are serialized by the order of their invocation. It
+ /// uses a key-level lock to achieve this. 
+ pub fn and_try_compute_with(self, f: F) -> Result, E> + where + F: FnOnce(Option>) -> Result, E>, + E: Send + Sync + 'static, + { + let key = Arc::new(self.owned_key); + self.cache.try_compute_with_hash_and_fun(key, self.hash, f) + } + + /// Performs an upsert of an [`Entry`] by using the given closure `f`. The word + /// "upsert" here means "update" or "insert". + /// + /// The closure `f` should take the current entry of `Option>` for + /// the key, and return a new value `V`. + /// + /// This method works as the followings: + /// + /// 1. Apply the closure `f` to the current cached `Entry`, and get a new value + /// `V`. + /// 2. Upsert the new value to the cache. + /// 3. Return the `Entry` having the upserted value. + /// + /// # See Also + /// + /// - If you want to optionally upsert, that is to upsert only when certain + /// conditions meet, use the [`and_compute_with`] method. + /// - If you try to upsert, that is to make the `Future` resolve to `Result` + /// instead of `V`, and upsert only when resolved to `Ok(V)`, use the + /// [`and_try_compute_with`] method. + /// + /// [`Entry`]: ../struct.Entry.html + /// [`and_compute_with`]: #method.and_compute_with + /// [`and_try_compute_with`]: #method.and_try_compute_with + /// + /// # Example + /// + /// ```rust + /// use moka::sync::Cache; + /// + /// let cache: Cache = Cache::new(100); + /// let key = "key1".to_string(); + /// + /// let entry = cache + /// .entry(key.clone()) + /// .and_upsert_with(|maybe_entry| { + /// if let Some(entry) = maybe_entry { + /// entry.into_value().saturating_add(1) // Update + /// } else { + /// 1 // Insert + /// } + /// }); + /// // It was not an update. 
+ /// assert!(!entry.is_old_value_replaced()); + /// assert_eq!(entry.key(), &key); + /// assert_eq!(entry.into_value(), 1); + /// + /// let entry = cache + /// .entry(key.clone()) + /// .and_upsert_with(|maybe_entry| { + /// if let Some(entry) = maybe_entry { + /// entry.into_value().saturating_add(1) + /// } else { + /// 1 + /// } + /// }); + /// // It was an update. + /// assert!(entry.is_old_value_replaced()); + /// assert_eq!(entry.key(), &key); + /// assert_eq!(entry.into_value(), 2); + /// ``` + /// + /// # Concurrent calls on the same key + /// + /// This method guarantees that concurrent calls on the same key are executed + /// serially. That is, `and_upsert_with` calls on the same key never run + /// concurrently. The calls are serialized by the order of their invocation. It + /// uses a key-level lock to achieve this. + pub fn and_upsert_with(self, f: F) -> Entry + where + F: FnOnce(Option>) -> V, + { + let key = Arc::new(self.owned_key); + self.cache.upsert_with_hash_and_fun(key, self.hash, f) + } + /// Returns the corresponding [`Entry`] for the key given when this entry /// selector was constructed. If the entry does not exist, inserts one by calling /// the [`default`][std-default-function] function of the value type `V`. @@ -325,6 +580,261 @@ where } } + /// Performs a compute operation on a cached entry by using the given closure + /// `f`. A compute operation is either put, remove or no-operation (nop). + /// + /// The closure `f` should take the current entry of `Option>` for + /// the key, and return an `ops::compute::Op` enum. + /// + /// This method works as the followings: + /// + /// 1. Apply the closure `f` to the current cached `Entry`, and get an + /// `ops::compute::Op`. + /// 2. Execute the op on the cache: + /// - `Op::Put(V)`: Put the new value `V` to the cache. + /// - `Op::Remove`: Remove the current cached entry. + /// - `Op::Nop`: Do nothing. + /// 3. 
Return an `ops::compute::CompResult` as the followings: + /// + /// | [`Op`] | [`Entry`] already exists? | [`CompResult`] | Notes | + /// |:--------- |:--- |:--------------------------- |:------------------------------- | + /// | `Put(V)` | no | `Inserted(Entry)` | The new entry is returned. | + /// | `Put(V)` | yes | `ReplacedWith(Entry)` | The new entry is returned. | + /// | `Remove` | no | `StillNone(Arc)` | | + /// | `Remove` | yes | `Removed(Entry)` | The removed entry is returned. | + /// | `Nop` | no | `StillNone(Arc)` | | + /// | `Nop` | yes | `Unchanged(Entry)` | The existing entry is returned. | + /// + /// # See Also + /// + /// - If you want the `Future` resolve to `Result>` instead of `Op`, and + /// modify entry only when resolved to `Ok(V)`, use the + /// [`and_try_compute_with`] method. + /// - If you only want to update or insert, use the [`and_upsert_with`] method. + /// + /// [`Entry`]: ../struct.Entry.html + /// [`Op`]: ../ops/compute/enum.Op.html + /// [`CompResult`]: ../ops/compute/enum.CompResult.html + /// [`and_upsert_with`]: #method.and_upsert_with + /// [`and_try_compute_with`]: #method.and_try_compute_with + /// + /// # Example + /// + /// ```rust + /// use moka::{ + /// sync::Cache, + /// ops::compute::{CompResult, Op}, + /// }; + /// + /// let cache: Cache = Cache::new(100); + /// let key = "key1".to_string(); + /// + /// /// Increment a cached `u64` counter. If the counter is greater than or + /// /// equal to 2, remove it. 
+ /// fn inclement_or_remove_counter(
+ /// cache: &Cache,
+ /// key: &str,
+ /// ) -> CompResult {
+ /// cache
+ /// .entry_by_ref(key)
+ /// .and_compute_with(|maybe_entry| {
+ /// if let Some(entry) = maybe_entry {
+ /// let counter = entry.into_value();
+ /// if counter < 2 {
+ /// Op::Put(counter.saturating_add(1)) // Update
+ /// } else {
+ /// Op::Remove
+ /// }
+ /// } else {
+ /// Op::Put(1) // Insert
+ /// }
+ /// })
+ /// }
+ ///
+ /// // This should insert a new counter value 1 to the cache, and return the
+ /// // value with the kind of the operation performed.
+ /// let result = inclement_or_remove_counter(&cache, &key);
+ /// let CompResult::Inserted(entry) = result else {
+ /// panic!("`Inserted` should be returned: {result:?}");
+ /// };
+ /// assert_eq!(entry.into_value(), 1);
+ ///
+ /// // This should increment the cached counter value by 1.
+ /// let result = inclement_or_remove_counter(&cache, &key);
+ /// let CompResult::ReplacedWith(entry) = result else {
+ /// panic!("`ReplacedWith` should be returned: {result:?}");
+ /// };
+ /// assert_eq!(entry.into_value(), 2);
+ ///
+ /// // This should remove the cached counter from the cache, and return the
+ /// // _removed_ value.
+ /// let result = inclement_or_remove_counter(&cache, &key);
+ /// let CompResult::Removed(entry) = result else {
+ /// panic!("`Removed` should be returned: {result:?}");
+ /// };
+ /// assert_eq!(entry.into_value(), 2);
+ ///
+ /// // The key should no longer exist.
+ /// assert!(!cache.contains_key(&key));
+ ///
+ /// // This should start over; insert a new counter value 1 to the cache.
+ /// let result = inclement_or_remove_counter(&cache, &key);
+ /// let CompResult::Inserted(entry) = result else {
+ /// panic!("`Inserted` should be returned: {result:?}");
+ /// };
+ /// assert_eq!(entry.into_value(), 1);
+ /// ```
+ ///
+ /// # Concurrent calls on the same key
+ ///
+ /// This method guarantees that concurrent calls on the same key are executed
+ /// serially. 
That is, `and_compute_with` calls on the same key never run + /// concurrently. The calls are serialized by the order of their invocation. It + /// uses a key-level lock to achieve this. + pub fn and_compute_with(self, f: F) -> compute::CompResult + where + F: FnOnce(Option>) -> compute::Op, + { + let key = Arc::new(self.ref_key.to_owned()); + self.cache.compute_with_hash_and_fun(key, self.hash, f) + } + + /// Performs a compute operation on a cached entry by using the given closure + /// `f`. A compute operation is either put, remove or no-operation (nop). + /// + /// The closure `f` should take the current entry of `Option>` for + /// the key, and return a `Result, E>`. + /// + /// This method works as the followings: + /// + /// 1. Apply the closure `f` to the current cached `Entry`, and get a + /// `Result, E>`. + /// 2. If resolved to `Err(E)`, return it. + /// 3. Else, execute the op on the cache: + /// - `Ok(Op::Put(V))`: Put the new value `V` to the cache. + /// - `Ok(Op::Remove)`: Remove the current cached entry. + /// - `Ok(Op::Nop)`: Do nothing. + /// 4. Return an `Ok(ops::compute::CompResult)` as the followings: + /// + /// | [`Op`] | [`Entry`] already exists? | [`CompResult`] | Notes | + /// |:--------- |:--- |:--------------------------- |:------------------------------- | + /// | `Put(V)` | no | `Inserted(Entry)` | The new entry is returned. | + /// | `Put(V)` | yes | `ReplacedWith(Entry)` | The new entry is returned. | + /// | `Remove` | no | `StillNone(Arc)` | | + /// | `Remove` | yes | `Removed(Entry)` | The removed entry is returned. | + /// | `Nop` | no | `StillNone(Arc)` | | + /// | `Nop` | yes | `Unchanged(Entry)` | The existing entry is returned. | + /// + /// # Similar Methods + /// + /// - If you want the `Future` resolve to `Op` instead of `Result>`, use + /// the [`and_compute_with`] method. + /// - If you only want to update or insert, use the [`and_upsert_with`] method. 
+ ///
+ /// [`Entry`]: ../struct.Entry.html
+ /// [`Op`]: ../ops/compute/enum.Op.html
+ /// [`CompResult`]: ../ops/compute/enum.CompResult.html
+ /// [`and_upsert_with`]: #method.and_upsert_with
+ /// [`and_compute_with`]: #method.and_compute_with
+ ///
+ /// # Example
+ ///
+ /// See [`try_append_value_sync.rs`] in the `examples` directory.
+ ///
+ /// [`try_append_value_sync.rs`]:
+ /// https://github.com/moka-rs/moka/tree/main/examples/try_append_value_sync.rs
+ ///
+ /// # Concurrent calls on the same key
+ ///
+ /// This method guarantees that concurrent calls on the same key are executed
+ /// serially. That is, `and_try_compute_with` calls on the same key never run
+ /// concurrently. The calls are serialized by the order of their invocation. It
+ /// uses a key-level lock to achieve this.
+ pub fn and_try_compute_with(self, f: F) -> Result, E>
+ where
+ F: FnOnce(Option>) -> Result, E>,
+ E: Send + Sync + 'static,
+ {
+ let key = Arc::new(self.ref_key.to_owned());
+ self.cache.try_compute_with_hash_and_fun(key, self.hash, f)
+ }
+
+ /// Performs an upsert of an [`Entry`] by using the given closure `f`. The word
+ /// "upsert" here means "update" or "insert".
+ ///
+ /// The closure `f` should take the current entry of `Option>` for
+ /// the key, and return a new value `V`.
+ ///
+ /// This method works as the followings:
+ ///
+ /// 1. Apply the closure `f` to the current cached `Entry`, and get a new value
+ /// `V`.
+ /// 2. Upsert the new value to the cache.
+ /// 3. Return the `Entry` having the upserted value.
+ ///
+ /// # Similar Methods
+ ///
+ /// - If you want to optionally upsert, that is to upsert only when certain
+ /// conditions meet, use the [`and_compute_with`] method.
+ /// - If you try to upsert, that is to have the closure return a `Result`
+ /// instead of `V`, and upsert only when resolved to `Ok(V)`, use the
+ /// [`and_try_compute_with`] method. 
+ /// + /// [`Entry`]: ../struct.Entry.html + /// [`and_compute_with`]: #method.and_compute_with + /// [`and_try_compute_with`]: #method.and_try_compute_with + /// + /// # Example + /// + /// ```rust + /// use moka::sync::Cache; + /// + /// let cache: Cache = Cache::new(100); + /// let key = "key1".to_string(); + /// + /// let entry = cache + /// .entry_by_ref(&key) + /// .and_upsert_with(|maybe_entry| { + /// if let Some(entry) = maybe_entry { + /// entry.into_value().saturating_add(1) // Update + /// } else { + /// 1 // Insert + /// } + /// }); + /// // It was not an update. + /// assert!(!entry.is_old_value_replaced()); + /// assert_eq!(entry.key(), &key); + /// assert_eq!(entry.into_value(), 1); + /// + /// let entry = cache + /// .entry_by_ref(&key) + /// .and_upsert_with(|maybe_entry| { + /// if let Some(entry) = maybe_entry { + /// entry.into_value().saturating_add(1) + /// } else { + /// 1 + /// } + /// }); + /// // It was an update. + /// assert!(entry.is_old_value_replaced()); + /// assert_eq!(entry.key(), &key); + /// assert_eq!(entry.into_value(), 2); + /// ``` + /// + /// # Concurrent calls on the same key + /// + /// This method guarantees that concurrent calls on the same key are executed + /// serially. That is, `and_upsert_with` calls on the same key never run + /// concurrently. The calls are serialized by the order of their invocation. It + /// uses a key-level lock to achieve this. + pub fn and_upsert_with(self, f: F) -> Entry + where + F: FnOnce(Option>) -> V, + { + let key = Arc::new(self.ref_key.to_owned()); + self.cache.upsert_with_hash_and_fun(key, self.hash, f) + } + /// Returns the corresponding [`Entry`] for the reference of the key given when /// this entry selector was constructed. 
If the entry does not exist, inserts one /// by cloning the key and calling the [`default`][std-default-function] function diff --git a/src/sync/segment.rs b/src/sync/segment.rs index 4f84f404..aafdc777 100644 --- a/src/sync/segment.rs +++ b/src/sync/segment.rs @@ -284,12 +284,14 @@ where RefKeyEntrySelector::new(key, hash, cache) } + /// TODO: Remove this in v0.13.0. /// Deprecated, replaced with [`get_with`](#method.get_with) #[deprecated(since = "0.8.0", note = "Replaced with `get_with`")] pub fn get_or_insert_with(&self, key: K, init: impl FnOnce() -> V) -> V { self.get_with(key, init) } + /// TODO: Remove this in v0.13.0. /// Deprecated, replaced with [`try_get_with`](#method.try_get_with) #[deprecated(since = "0.8.0", note = "Replaced with `try_get_with`")] pub fn get_or_try_insert_with(&self, key: K, init: F) -> Result> diff --git a/src/sync/value_initializer.rs b/src/sync/value_initializer.rs index 8bbdc045..e802317a 100644 --- a/src/sync/value_initializer.rs +++ b/src/sync/value_initializer.rs @@ -1,17 +1,57 @@ use parking_lot::RwLock; use std::{ any::{Any, TypeId}, + fmt, hash::{BuildHasher, Hash}, sync::Arc, }; use triomphe::Arc as TrioArc; -use super::OptionallyNone; +use crate::{ + ops::compute::{CompResult, Op}, + Entry, +}; + +use super::{ComputeNone, OptionallyNone}; const WAITER_MAP_NUM_SEGMENTS: usize = 64; +pub(crate) trait GetOrInsert { + /// Gets an entry for the given key _with_ recording the access to the cache + /// policies. + fn get_entry(&self, key: &Arc, hash: u64) -> Option> + where + V: 'static; + + /// Inserts a value for the given key. + fn insert(&self, key: Arc, hash: u64, value: V); + + /// Removes a value for the given key. Returns the removed value. 
+ fn remove(&self, key: &Arc, hash: u64) -> Option; +} + type ErrorObject = Arc; -type WaiterValue = Option>; + +// type WaiterValue = Option>; +enum WaiterValue { + Computing, + Ready(Result), + ReadyNone, + // https://github.com/moka-rs/moka/issues/43 + InitClosurePanicked, +} + +impl fmt::Debug for WaiterValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + WaiterValue::Computing => write!(f, "Computing"), + WaiterValue::Ready(_) => write!(f, "Ready"), + WaiterValue::ReadyNone => write!(f, "ReadyNone"), + WaiterValue::InitClosurePanicked => write!(f, "InitFuturePanicked"), + } + } +} + type Waiter = TrioArc>>; pub(crate) enum InitResult { @@ -70,22 +110,23 @@ where let (w_key, w_hash) = self.waiter_key_hash(key, type_id); - let waiter = TrioArc::new(RwLock::new(None)); + let waiter = TrioArc::new(RwLock::new(WaiterValue::Computing)); let mut lock = waiter.write(); loop { let Some(existing_waiter) = self.try_insert_waiter(w_key.clone(), w_hash, &waiter) else { + // Inserted. break; }; // Somebody else's waiter already exists, so wait for its result to become available. let waiter_result = existing_waiter.read(); match &*waiter_result { - Some(Ok(value)) => return ReadExisting(value.clone()), - Some(Err(e)) => return InitErr(Arc::clone(e).downcast().unwrap()), - // None means somebody else's init closure has been panicked. - None => { + WaiterValue::Ready(Ok(value)) => return ReadExisting(value.clone()), + WaiterValue::Ready(Err(e)) => return InitErr(Arc::clone(e).downcast().unwrap()), + // Somebody else's init closure has been panicked. + WaiterValue::InitClosurePanicked => { retries += 1; assert!( retries < MAX_RETRIES, @@ -96,6 +137,11 @@ where // Retry from the beginning. continue; } + // Unexpected state. + s @ (WaiterValue::Computing | WaiterValue::ReadyNone) => panic!( + "Got unexpected state `{s:?}` after resolving `init` future. 
\ + This might be a bug in Moka" + ), } } @@ -105,7 +151,7 @@ where if let Some(value) = get() { // Yes. Set the waiter value, remove our waiter, and return // the existing value. - *lock = Some(Ok(value.clone())); + *lock = WaiterValue::Ready(Ok(value.clone())); self.remove_waiter(w_key, w_hash); return InitResult::ReadExisting(value); } @@ -116,26 +162,24 @@ where match catch_unwind(AssertUnwindSafe(init)) { // Evaluated. Ok(value) => { - let (waiter_val, init_res) = match post_init(value) { + let init_res = match post_init(value) { Ok(value) => { insert(value.clone()); - (Some(Ok(value.clone())), InitResult::Initialized(value)) + *lock = WaiterValue::Ready(Ok(value.clone())); + InitResult::Initialized(value) } Err(e) => { let err: ErrorObject = Arc::new(e); - ( - Some(Err(Arc::clone(&err))), - InitResult::InitErr(err.downcast().unwrap()), - ) + *lock = WaiterValue::Ready(Err(Arc::clone(&err))); + InitResult::InitErr(err.downcast().unwrap()) } }; - *lock = waiter_val; self.remove_waiter(w_key, w_hash); init_res } // Panicked. Err(payload) => { - *lock = None; + *lock = WaiterValue::InitClosurePanicked; // Remove the waiter so that others can retry. self.remove_waiter(w_key, w_hash); resume_unwind(payload); @@ -144,6 +188,132 @@ where // The write lock will be unlocked here. } + /// # Panics + /// Panics if the `init` closure has been panicked. 
+ pub(crate) fn try_compute<'a, C, F, O, E>( + &'a self, + c_key: Arc, + c_hash: u64, + cache: &C, + f: F, + post_init: fn(O) -> Result, E>, + allow_nop: bool, + ) -> Result, E> + where + V: 'static, + C: GetOrInsert + Send + 'a, + F: FnOnce(Option>) -> O, + E: Send + Sync + 'static, + { + use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe}; + + let type_id = TypeId::of::(); + let (w_key, w_hash) = self.waiter_key_hash(&c_key, type_id); + let waiter = TrioArc::new(RwLock::new(WaiterValue::Computing)); + // NOTE: We have to acquire a write lock before `try_insert_waiter`, + // so that any concurrent attempt will get our lock and wait on it. + let mut lock = waiter.write(); + + loop { + let Some(existing_waiter) = self.try_insert_waiter(w_key.clone(), w_hash, &waiter) + else { + // Inserted. + break; + }; + + // Somebody else's waiter already exists, so wait for it to finish + // (wait for it to release the write lock). + let waiter_result = existing_waiter.read(); + match &*waiter_result { + // Unexpected state. + WaiterValue::Computing => panic!( + "Got unexpected state `Computing` after resolving `init` future. \ + This might be a bug in Moka" + ), + _ => { + // Try to insert our waiter again. + continue; + } + } + } + + // Our waiter was inserted. + + // Get the current value. + let maybe_entry = cache.get_entry(&c_key, c_hash); + let maybe_value = if allow_nop { + maybe_entry.as_ref().map(|ent| ent.value().clone()) + } else { + None + }; + let entry_existed = maybe_entry.is_some(); + + // Evaluate the `f` closure. Catching panic is safe here as we will not + // evaluate the closure again. + let output = match catch_unwind(AssertUnwindSafe(|| f(maybe_entry))) { + // Evaluated. + Ok(output) => { + *lock = WaiterValue::ReadyNone; + output + } + // Panicked. + Err(payload) => { + *lock = WaiterValue::InitClosurePanicked; + // Remove the waiter so that others can retry. 
+ self.remove_waiter(w_key, w_hash); + resume_unwind(payload); + } + }; + + let op = match post_init(output) { + Ok(op) => op, + Err(e) => { + self.remove_waiter(w_key, w_hash); + return Err(e); + } + }; + + let result = match op { + Op::Nop => { + if let Some(value) = maybe_value { + Ok(CompResult::Unchanged(Entry::new( + Some(c_key), + value, + false, + false, + ))) + } else { + Ok(CompResult::StillNone(c_key)) + } + } + Op::Put(value) => { + cache.insert(Arc::clone(&c_key), c_hash, value.clone()); + if entry_existed { + crossbeam_epoch::pin().flush(); + let entry = Entry::new(Some(c_key), value, true, true); + Ok(CompResult::ReplacedWith(entry)) + } else { + let entry = Entry::new(Some(c_key), value, true, false); + Ok(CompResult::Inserted(entry)) + } + } + Op::Remove => { + let maybe_prev_v = cache.remove(&c_key, c_hash); + if let Some(prev_v) = maybe_prev_v { + crossbeam_epoch::pin().flush(); + let entry = Entry::new(Some(c_key), prev_v, false, false); + Ok(CompResult::Removed(entry)) + } else { + Ok(CompResult::StillNone(c_key)) + } + } + }; + self.remove_waiter(w_key, w_hash); + result + + // The lock will be unlocked here. + } + /// The `post_init` function for the `get_with` method of cache. pub(crate) fn post_init_for_get_with(value: V) -> Result { Ok(value) @@ -165,6 +335,24 @@ where result } + /// The `post_init` function for the `and_upsert_with` method of cache. + pub(crate) fn post_init_for_upsert_with(value: V) -> Result, ()> { + Ok(Op::Put(value)) + } + + /// The `post_init` function for the `and_compute_with` method of cache. + pub(crate) fn post_init_for_compute_with(op: Op) -> Result, ()> { + Ok(op) + } + + /// The `post_init` function for the `and_try_compute_with` method of cache. + pub(crate) fn post_init_for_try_compute_with(op: Result, E>) -> Result, E> + where + E: Send + Sync + 'static, + { + op + } + /// Returns the `type_id` for `get_with` method of cache. 
pub(crate) fn type_id_for_get_with() -> TypeId { // NOTE: We use a regular function here instead of a const fn because TypeId diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index ad41b056..0d4571e1 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -357,7 +357,7 @@ where is_expiry_modified, }; read_recorder(op, now); - Some(Entry::new(maybe_key, v, false)) + Some(Entry::new(maybe_key, v, false, false)) } else { read_recorder(ReadOp::Miss(hash), now); None