
Question about using Arc as a cache value #372

Closed
rtdavis22 opened this issue Dec 31, 2023 · 6 comments
Labels
question Further information is requested

Comments

@rtdavis22

rtdavis22 commented Dec 31, 2023

Hello, I have a question about how I can store Arc<Mutex<T>> values in a moka Cache and keep these values in sync with a persistent store.

My problem is evident in the code below. I'm getting an Arc<Mutex<String>> from the cache, then the cache entry is invalidated (causing its value to be written to the DB), and then the value in the Arc is updated (but the updated value is never persisted to the DB).

use std::sync::Arc;

use moka::future::{Cache, FutureExt};
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let cache = Cache::builder()
        .max_capacity(10)
        .async_eviction_listener(move |k, v: Arc<Mutex<String>>, _cause| {
            async move {
                // "Strong count: 4"
                println!("Strong count: {}", Arc::strong_count(&v));

                // "Saving 1, Hello to DB"
                println!("Saving {}, {} to DB", k, v.lock().await);
            }
            .boxed()
        })
        .build();

    let v = cache
        .get_with(1, async move {
            // Read value from DB
            Arc::new(Mutex::new(String::from("Hello")))
        })
        .await;

    cache.invalidate_all();
    cache.run_pending_tasks().await;

    // This change is never reflected in the DB!
    *v.lock().await = String::from("World");
}

One idea I had was to update the eviction listener to wait until the Arc's strong count drops to 1 and then write the value to the DB, but the strong count doesn't seem to drop (presumably because of clones the Cache is using internally?).

Any suggestions for what I can do to keep the values in the Arcs in sync with the DB?

@peter-scholtens
Contributor

I had a similar problem; see the comment of the original author here. With this knowledge, I avoided using the eviction listener and instead wrote a cascading drop example; see this example, and have a look at the README.md file in the same directory too.
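
The general idea is to let a Drop impl do the persisting once the last clone of the Arc goes away. A minimal sketch of that idea (illustrative names only, not the linked example):

use std::sync::{Arc, Mutex};

// Hypothetical wrapper type: when the last Arc<DbBacked> clone is dropped,
// the Drop impl persists the final value.
struct DbBacked {
    key: u64,
    value: Mutex<String>,
}

impl Drop for DbBacked {
    fn drop(&mut self) {
        // The last owner is gone; write the final state to the DB here.
        let value = self.value.get_mut().unwrap();
        println!("Saving {}, {} to DB", self.key, value);
    }
}

fn main() {
    let entry = Arc::new(DbBacked {
        key: 1,
        value: Mutex::new(String::from("Hello")),
    });

    // Clones can mutate the shared value freely...
    let clone = Arc::clone(&entry);
    *clone.value.lock().unwrap() = String::from("World");

    // ...and the write to the DB happens exactly once, when the final
    // clone is dropped ("Saving 1, World to DB").
    drop(clone);
    drop(entry);
}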

@tatsuya6502
Member

tatsuya6502 commented Dec 31, 2023

Hi. After changing the value, you can reinsert it into the cache to ensure the eviction listener will be called later. It is guaranteed that the eviction listener is called for every inserted value.

The following code reinserts the value, so the eviction listener should be called twice for key 1; once for "Hello" and once for "World".

use std::sync::Arc;

use moka::future::{Cache, FutureExt};
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let cache = Cache::builder()
        .max_capacity(10)
        .async_eviction_listener(move |k, v: Arc<Mutex<String>>, _cause| {
            async move {
                println!("Saving {}, {} to DB ({:?})", k, v.lock().await, _cause);
            }
            .boxed()
        })
        .build();

    let v = cache
        .get_with(1, async move {
            // Read value from DB
            Arc::new(Mutex::new(String::from("Hello")))
        })
        .await;

    cache.invalidate_all();
    cache.run_pending_tasks().await;

    // Change the value.
    *v.lock().await = String::from("World");

    // Reinsert it to the cache.
    cache.insert(1, v).await;

    cache.invalidate_all();
    cache.run_pending_tasks().await;
}
Saving 1, Hello to DB (Explicit)
Saving 1, World to DB (Explicit)

One idea I had was to update the eviction listener to wait until the Arc's strong count drops to 1 and then write the value to the DB, but the strong count doesn't seem to drop (presumably because of clones the Cache is using internally?).

In your code, calling run_pending_tasks should call the eviction listener. When the listener runs, run_pending_tasks still has some work to do, so the strong count will be greater than 2 (although I expected it to be 3, not 4).

Even after returning from run_pending_tasks, the strong count will still be 2 because the internal data structure of the Cache uses crossbeam_epoch, which provides an epoch-based garbage collector. crossbeam_epoch will not drop your Arc pointer immediately.

If you call run_pending_tasks once more, the strong count might become 1.

use std::sync::Arc;

use moka::future::{Cache, FutureExt};
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let cache = Cache::builder()
        .max_capacity(10)
        .async_eviction_listener(move |k, v: Arc<Mutex<String>>, _cause| {
            async move {
                // "A: Strong count: 4"
                println!("A: Strong count: {}", Arc::strong_count(&v));

                // "Saving 1, Hello to DB (Explicit)"
                println!("Saving {}, {} to DB ({:?})", k, v.lock().await, _cause);
            }
            .boxed()
        })
        .build();

    let v = cache
        .get_with(1, async move {
            // Read value from DB
            Arc::new(Mutex::new(String::from("Hello")))
        })
        .await;

    cache.invalidate_all();

    // This will do the following:
    // 1. Call the eviction listener for key `1`.
    // 2. Process other pending tasks.
    // 3. Call `crossbeam_epoch::pin::flush()`. It will move the deferred
    //    drop request from the thread local storage to the global queue.
    //    https://docs.rs/crossbeam-epoch/0.9.17/crossbeam_epoch/struct.Guard.html#method.flush
    cache.run_pending_tasks().await;

    // "Strong count: 2"
    println!("B: Strong count: {}", Arc::strong_count(&v));

    // This calls `crossbeam_epoch::pin::flush()`. It will run the deferred
    // drop request on the global queue.
    cache.run_pending_tasks().await;

    // "Strong count: 1"
    println!("C: Strong count: {}", Arc::strong_count(&v));

    // This change is never reflected in the DB!
    *v.lock().await = String::from("World");
}

So, in the above code, the strong count will eventually become 1. However, I would not suggest relying on the strong count, because it is hard to predict exactly when crossbeam_epoch's GC will run and the count will drop to 1.

Instead, I would suggest reinserting the changed value, as in the first code snippet.

@tatsuya6502
Member

see the comment of the original author here.

Just FYI, the following PR explains crossbeam_epoch's GC behavior in more detail.

@tatsuya6502 added the question label Jan 1, 2024
@tatsuya6502
Member

Closing as the original question was answered. Feel free to reopen if needed.

As for this one,

However, I would not suggest relying on the strong count, because it is hard to predict exactly when crossbeam_epoch's GC will run and the count will drop to 1.

we hope to address/mitigate it in the future. I am keeping notes on it here in the project roadmap:

@rtdavis22
Author

Sorry, I meant to follow up here. Thanks for the responses. I wasn't able to use the reinsertion approach, I think because it wasn't really practical to reinsert into the cache after every modification to a cache entry. I ended up implementing my own cache type that stores Arcs and only evicts an entry if its TTL has passed AND its strong count is 1, in which case it can safely unwrap the Arc. This seems to work well for my purposes.
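
Roughly, the shape of it is something like this (a simplified sketch with illustrative names, not the actual implementation; it assumes the map sits behind a single owner or lock):

use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
use std::time::{Duration, Instant};

// Simplified sketch of a cache that hands out Arcs and only evicts an entry
// once its TTL has passed and no one else holds a clone of the Arc.
struct ArcCache<K, V> {
    entries: HashMap<K, (Arc<V>, Instant)>,
    ttl: Duration,
}

impl<K: Hash + Eq + Clone, V> ArcCache<K, V> {
    fn new(ttl: Duration) -> Self {
        Self { entries: HashMap::new(), ttl }
    }

    fn insert(&mut self, key: K, value: Arc<V>) {
        self.entries.insert(key, (value, Instant::now()));
    }

    fn get(&self, key: &K) -> Option<Arc<V>> {
        self.entries.get(key).map(|(v, _)| Arc::clone(v))
    }

    // Evict entries whose TTL has passed *and* whose strong count is 1,
    // returning the unwrapped values so the caller can persist them.
    fn evict_expired(&mut self) -> Vec<(K, V)> {
        let now = Instant::now();
        let ttl = self.ttl;
        let expired: Vec<K> = self
            .entries
            .iter()
            .filter(|(_, (v, inserted))| {
                now.duration_since(*inserted) > ttl && Arc::strong_count(v) == 1
            })
            .map(|(k, _)| k.clone())
            .collect();

        expired
            .into_iter()
            .filter_map(|k| {
                let (v, _) = self.entries.remove(&k)?;
                // The strong count was 1 and we held the only clone, so this
                // unwrap cannot fail; `ok()` just keeps the sketch total.
                Arc::try_unwrap(v).ok().map(|v| (k, v))
            })
            .collect()
    }
}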

@tatsuya6502
Member

tatsuya6502 commented Jan 19, 2024

@rtdavis22 Thank you for the update. Your feedback is very valuable!

I ended up implementing my own cache type that stores Arcs and only evicts an entry if its TTL has passed AND its strong count is 1, in which case it can safely unwrap the Arc.

I am glad you found a solution. Perhaps I should have shown you the sample code in this comment (#344). It uses dashmap::DashMap to store the keys and Arcs, and uses moka::sync::Cache to control the TTL of the entries in the DashMap. It does not store the Arcs in the moka cache, so it does not have the strong count problem.

The sample code does not exactly match your use case, because the moka cache cannot delay eviction until the strong count becomes 1. You would have to reinsert the evicted entry like this one (#298), so it is not an elegant solution.
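
A minimal sketch of that pattern (not the actual #344 sample; the names and the action taken on eviction are only illustrative):

use std::sync::Arc;
use std::time::Duration;

use dashmap::DashMap;
use moka::sync::Cache;

fn main() {
    // The DashMap owns the Arcs; the moka cache only tracks the keys so that
    // its TTL machinery decides when an entry should leave the map.
    let store: Arc<DashMap<u64, Arc<String>>> = Arc::new(DashMap::new());

    let store2 = Arc::clone(&store);
    let ttl_cache: Cache<u64, ()> = Cache::builder()
        .time_to_live(Duration::from_secs(30))
        .eviction_listener(move |key: Arc<u64>, _unit, _cause| {
            // When the key expires in the moka cache, remove (or persist)
            // the corresponding entry in the DashMap.
            if let Some((k, v)) = store2.remove(&*key) {
                println!("Evicting {k}: {v}");
            }
        })
        .build();

    // Inserting registers the value in both structures.
    store.insert(1, Arc::new(String::from("Hello")));
    ttl_cache.insert(1, ());

    // Reads go through the DashMap; the Arc is handed out as usual.
    if let Some(v) = store.get(&1).map(|e| Arc::clone(e.value())) {
        println!("Got {v}");
    }
}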

After #298 was filed, I was thinking of adding support for an event handler closure, which would be called when a cached entry is about to expire via the TTL (but has not yet been evicted). The handler could then check the strong count of the Arc to decide whether to let the entry be evicted or to extend its expiry.
