Skip to content

Commit

Permalink
Merge pull request #370 from moka-rs/compute-api
Browse files Browse the repository at this point in the history
Add the upsert and compute methods for modifying a cached entry
  • Loading branch information
tatsuya6502 authored Jan 8, 2024
2 parents ada3236 + 472addc commit cc36d72
Show file tree
Hide file tree
Showing 27 changed files with 3,142 additions and 65 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
"Uninit",
"unsync",
"Upsert",
"upserted",
"usize"
],
"files.watcherExclude": {
Expand Down
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Moka Cache — Change Log

## Version 0.12.3

### Added

- Added the upsert and compute methods for modifying a cached entry
([#370][gh-pull-0370]):
- Now the `entry` or `entry_by_ref` APIs have the following methods:
- `and_upsert_with` method to insert or update the entry.
- `and_compute_with` method to insert, update, remove or do nothing on the
entry.
- `and_try_compute_with` method, which is similar to above but returns
`Result`.


## Version 0.12.2

### Fixed
Expand Down Expand Up @@ -781,6 +795,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (Mar 25, 2021).
[gh-issue-0034]: https://github.com/moka-rs/moka/issues/34/
[gh-issue-0031]: https://github.com/moka-rs/moka/issues/31/

[gh-pull-0370]: https://github.com/moka-rs/moka/pull/370/
[gh-pull-0363]: https://github.com/moka-rs/moka/pull/363/
[gh-pull-0350]: https://github.com/moka-rs/moka/pull/350/
[gh-pull-0348]: https://github.com/moka-rs/moka/pull/348/
Expand Down
36 changes: 34 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "moka"
version = "0.12.2"
version = "0.12.3"
edition = "2021"
# Rust 1.65 was released on Nov 3, 2022.
rust-version = "1.65"
Expand Down Expand Up @@ -81,7 +81,7 @@ getrandom = "0.2"
paste = "1.0.9"
reqwest = { version = "0.11.11", default-features = false, features = ["rustls-tls"] }
skeptic = "0.13"
tokio = { version = "1.19", features = ["fs", "macros", "rt-multi-thread", "sync", "time" ] }
tokio = { version = "1.19", features = ["fs", "io-util", "macros", "rt-multi-thread", "sync", "time" ] }

[target.'cfg(trybuild)'.dev-dependencies]
trybuild = "1.0"
Expand All @@ -106,6 +106,14 @@ rustdoc-args = ["--cfg", "docsrs"]

# Examples

[[example]]
name = "append_value_async"
required-features = ["future"]

[[example]]
name = "append_value_sync"
required-features = ["sync"]

[[example]]
name = "basics_async"
required-features = ["future"]
Expand All @@ -114,14 +122,38 @@ required-features = ["future"]
name = "basics_sync"
required-features = ["sync"]

[[example]]
name = "bounded_counter_async"
required-features = ["future"]

[[example]]
name = "bounded_counter_sync"
required-features = ["sync"]

[[example]]
name = "cascading_drop_async"
required-features = ["future"]

[[example]]
name = "counter_async"
required-features = ["future"]

[[example]]
name = "counter_sync"
required-features = ["sync"]

[[example]]
name = "eviction_listener_sync"
required-features = ["sync"]

[[example]]
name = "size_aware_eviction_sync"
required-features = ["sync"]

[[example]]
name = "try_append_value_async"
required-features = ["future"]

[[example]]
name = "try_append_value_sync"
required-features = ["sync"]
13 changes: 6 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -514,24 +514,23 @@ $ cargo +nightly -Z unstable-options --config 'build.rustdocflags="--cfg docsrs"
doc --no-deps --features 'future, sync'
```

## Road Map
## Roadmap

- [x] Size-aware eviction. (`v0.7.0` via [#24][gh-pull-024])
- [x] API stabilization. (Smaller core cache API, shorter names for frequently
used methods) (`v0.8.0` via [#105][gh-pull-105])
- [x] API stabilization. (Smaller core API, shorter names for frequently used
methods) (`v0.8.0` via [#105][gh-pull-105])
- e.g.
- `get_or_insert_with(K, F)``get_with(K, F)`
- `get_or_try_insert_with(K, F)``try_get_with(K, F)`
- `blocking_insert(K, V)``blocking().insert(K, V)`
- `time_to_live()``policy().time_to_live()`
- [x] Notifications on eviction. (`v0.9.0` via [#145][gh-pull-145])
- [x] Variable (per-entry) expiration, using hierarchical timer wheels.
(`v0.11.0` via [#248][gh-pull-248])
- [ ] Cache statistics (Hit rate, etc.). ([details][cache-stats])
- [x] Remove background threads. (`v0.12.0` via [#294][gh-pull-294] and
[#316][gh-pull-316])
- [x] Add upsert and compute methods. (`v0.12.3` via [#370][gh-pull-370])
- [ ] Cache statistics (Hit rate, etc.). ([details][cache-stats])
- [ ] Restore cache from a snapshot. ([details][restore])
- [ ] `and_compute` method. ([details][and-compute])
- [ ] Upgrade TinyLFU to Window-TinyLFU. ([details][tiny-lfu])

[gh-pull-024]: https://github.com/moka-rs/moka/pull/24
Expand All @@ -540,8 +539,8 @@ $ cargo +nightly -Z unstable-options --config 'build.rustdocflags="--cfg docsrs"
[gh-pull-248]: https://github.com/moka-rs/moka/pull/248
[gh-pull-294]: https://github.com/moka-rs/moka/pull/294
[gh-pull-316]: https://github.com/moka-rs/moka/pull/316
[gh-pull-370]: https://github.com/moka-rs/moka/pull/370

[and-compute]: https://github.com/moka-rs/moka/issues/227
[cache-stats]: https://github.com/moka-rs/moka/issues/234
[restore]: https://github.com/moka-rs/moka/issues/314

Expand Down
47 changes: 36 additions & 11 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,54 @@ Each example has a suffix `_async` or `_sync`:
## Basics of the Cache API

- [basics_async](./basics_async.rs) and [basics_sync](./basics_sync.rs)
- Sharing a cache between async tasks or OS threads.
- Shares a cache between async tasks or OS threads.
- Do not wrap a `Cache` with `Arc<Mutex<_>>`! Just clone the `Cache` and you
are all set.
- `insert`, `get` and `invalidate` methods.
- Uses `insert`, `get` and `invalidate` methods.

- [size_aware_eviction_sync](./size_aware_eviction_sync.rs)
- Configuring the max capacity of the cache based on the total size of the cached
- Configures the max capacity of the cache based on the total size of the cached
entries.

## The `Entry` API

Atomically inserts, updates and removes an entry from the cache depending on the
existence of the entry.

- [counter_async](./counter_async.rs) and [counter_sync](./counter_sync.rs)
- Atomically increments a cached `u64` by 1. If the entry does not exist, inserts
a new entry with the value 1.
- Uses `and_upsert_with` method.
- [bounded_counter_async](./bounded_counter_async.rs) and
[bounded_counter_sync](./bounded_counter_sync.rs)
- Same as above except removing the entry when the value is 2.
- `and_compute_with` method.
- [append_value_async](./append_value_async.rs) and
[append_value_sync](./append_value_sync.rs)
- Atomically appends an `i32` to a cached `Arc<RwLock<Vec<i32>>>`. If the entry
does not exist, inserts a new entry.
- Uses `and_upsert_with` method.
- [try_append_value_async](./try_append_value_async.rs) and
[try_append_value_sync](./try_append_value_sync.rs)
- Atomically reads an `char` from a reader and appends it to a cached `Arc<RwLock<String>>`,
but reading may fail by an early EOF.
- Uses `and_try_compute_with` method.

## Expiration and Eviction Listener

- [eviction_listener_sync](./eviction_listener_sync.rs)
- Setting the `time_to_live` expiration policy.
- Registering a listener (closure) to be notified when an entry is evicted from
the cache.
- `insert`, `invalidate` and `invalidate_all` methods.
- Demonstrating when the expired entries will be actually evicted from the cache,
- Configures the `time_to_live` expiration policy.
- Registers a listener (closure) to be notified when an entry is evicted from the
cache.
- Uses `insert`, `invalidate`, `invalidate_all` and `run_pending_tasks` methods.
- Demonstrates when the expired entries will be actually evicted from the cache,
and why the `run_pending_tasks` method could be important in some cases.

- [cascading_drop_async](./cascading_drop_async.rs)
- Controlling the lifetime of the objects in a separate `BTreeMap` collection
from the cache using an eviction listener.
- `BTreeMap`, `Arc` and mpsc channel (multi-producer, single consumer channel).
- Controls the lifetime of the objects in a separate `BTreeMap` collection from
the cache using an eviction listener.
- Beside the cache APIs, uses `BTreeMap`, `Arc` and mpsc channel (multi-producer,
single consumer channel).

## Check out the API Documentation too!

Expand Down
67 changes: 67 additions & 0 deletions examples/append_value_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! This example demonstrates how to append an `i32` value to a cached `Vec<i32>`
//! value. It uses the `and_upsert_with` method of `Cache`.
use std::sync::Arc;

use moka::{future::Cache, Entry};
use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
// We want to store a raw value `Vec<i32>` for each `String` key. We are going to
// append `i32` values to the `Vec` in the cache.
//
// Note that we have to wrap the `Vec` in an `Arc<RwLock<_>>`. We need the `Arc`,
// an atomic reference counted shared pointer, because `and_upsert_with` method
// of `Cache` passes a _clone_ of the value to our closure, instead of passing a
// `&mut` reference. We do not want to clone the `Vec` every time we append a
// value to it, so we wrap it in an `Arc`. Then we need the `RwLock` because we
// mutate the `Vec` when we append a value to it.
//
// The reason that `and_upsert_with` cannot pass a `&mut Vec<_>` to the closure
// is because the internal concurrent hash table of `Cache` is a lock free data
// structure and does not use any mutexes. So it cannot guarantee: (1) the `&mut
// Vec<_>` is unique, and (2) it is not accessed concurrently by other threads.
let cache: Cache<String, Arc<RwLock<Vec<i32>>>> = Cache::new(100);

let key = "key".to_string();

let entry = append_to_cached_vec(&cache, &key, 1).await;
// It was not an update.
assert!(!entry.is_old_value_replaced());
assert!(entry.is_fresh());
assert_eq!(*entry.into_value().read().await, &[1]);

let entry = append_to_cached_vec(&cache, &key, 2).await;
assert!(entry.is_fresh());
// It was an update.
assert!(entry.is_old_value_replaced());
assert_eq!(*entry.into_value().read().await, &[1, 2]);

let entry = append_to_cached_vec(&cache, &key, 3).await;
assert!(entry.is_fresh());
assert!(entry.is_old_value_replaced());
assert_eq!(*entry.into_value().read().await, &[1, 2, 3]);
}

async fn append_to_cached_vec(
cache: &Cache<String, Arc<RwLock<Vec<i32>>>>,
key: &str,
value: i32,
) -> Entry<String, Arc<RwLock<Vec<i32>>>> {
cache
.entry_by_ref(key)
.and_upsert_with(|maybe_entry| async {
if let Some(entry) = maybe_entry {
// The entry exists, append the value to the Vec.
let v = entry.into_value();
v.write().await.push(value);
v
} else {
// The entry does not exist, insert a new Vec containing
// the value.
Arc::new(RwLock::new(vec![value]))
}
})
.await
}
60 changes: 60 additions & 0 deletions examples/append_value_sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//! This example demonstrates how to append an `i32` value to a cached `Vec<i32>`
//! value. It uses the `and_upsert_with` method of `Cache`.
use std::sync::{Arc, RwLock};

use moka::{sync::Cache, Entry};

fn main() {
// We want to store a raw value `Vec<i32>` for each `String` key. We are going to
// append `i32` values to the `Vec` in the cache.
//
// Note that we have to wrap the `Vec` in an `Arc<RwLock<_>>`. We need the `Arc`,
// an atomic reference counted shared pointer, because `and_upsert_with` method
// of `Cache` passes a _clone_ of the value to our closure, instead of passing a
// `&mut` reference. We do not want to clone the `Vec` every time we append a
// value to it, so we wrap it in an `Arc`. Then we need the `RwLock` because we
// mutate the `Vec` when we append a value to it.
//
// The reason that `and_upsert_with` cannot pass a `&mut Vec<_>` to the closure
// is because the internal concurrent hash table of `Cache` is a lock free data
// structure and does not use any mutexes. So it cannot guarantee: (1) the `&mut
// Vec<_>` is unique, and (2) it is not accessed concurrently by other threads.
let cache: Cache<String, Arc<RwLock<Vec<i32>>>> = Cache::new(100);

let key = "key".to_string();

let entry = append_to_cached_vec(&cache, &key, 1);
assert!(entry.is_fresh());
assert!(!entry.is_old_value_replaced());
assert_eq!(*entry.into_value().read().unwrap(), &[1]);

let entry = append_to_cached_vec(&cache, &key, 2);
assert!(entry.is_fresh());
assert!(entry.is_old_value_replaced());
assert_eq!(*entry.into_value().read().unwrap(), &[1, 2]);

let entry = append_to_cached_vec(&cache, &key, 3);
assert!(entry.is_fresh());
assert!(entry.is_old_value_replaced());
assert_eq!(*entry.into_value().read().unwrap(), &[1, 2, 3]);
}

fn append_to_cached_vec(
cache: &Cache<String, Arc<RwLock<Vec<i32>>>>,
key: &str,
value: i32,
) -> Entry<String, Arc<RwLock<Vec<i32>>>> {
cache.entry_by_ref(key).and_upsert_with(|maybe_entry| {
if let Some(entry) = maybe_entry {
// The entry exists, append the value to the Vec.
let v = entry.into_value();
v.write().unwrap().push(value);
v
} else {
// The entry does not exist, insert a new Vec containing
// the value.
Arc::new(RwLock::new(vec![value]))
}
})
}
Loading

0 comments on commit cc36d72

Please sign in to comment.