Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Example for cascading drop triggered by eviction #350

Merged
merged 4 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ rustdoc-args = ["--cfg", "docsrs"]
name = "async_example"
required-features = ["future"]

[[example]]
name = "cascading_drop_async"
required-features = ["future"]

[[example]]
name = "sync_example"
required-features = ["sync"]
Expand Down
189 changes: 189 additions & 0 deletions examples/cascading_drop_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use moka::future::Cache;
use std::collections::btree_map;
use std::collections::BTreeMap;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::thread::sleep;
use std::time::Duration;

#[derive(Debug)]
pub struct User {
user_id: u64, // Needed as key in BTreeMap when executing a recursive Drop of a Session
name: String,
friends: Vec<Arc<Mutex<User>>>,
}

impl User {
pub fn print_friends(&self) {
print!("User {} has friends ", self.name);
for f in &self.friends {
print!("{}, ", f.lock().unwrap().name);
}
println!();
}
}

impl Drop for User {
fn drop(&mut self) {
println!("Dropping user {}", self.name);
}
}

pub struct Session {
ptr: Option<Arc<Mutex<User>>>,
sender: std::sync::mpsc::Sender<u64>,
}

impl Drop for Session {
fn drop(&mut self) {
let user_id;
user_id = self.ptr.as_ref().unwrap().lock().unwrap().user_id;
println!("Dropping session holding a reference to user {}", user_id);
self.ptr = None; // Must drop Arc before verify Btree!!!
let _ = self.sender.send(user_id);
}
}

#[tokio::main]
async fn main() {
// For a webserver you may want to access the users via their cached session
// or via their user number, as they can be friends of each other.
// Using the Drop trait for a Session, orphaned users will get pruned.
//
// Create some users.
let user1 = Arc::new(Mutex::new(User {
user_id: 1,
name: String::from("Alice"),
friends: vec![],
}));
let user2 = Arc::new(Mutex::new(User {
user_id: 2,
name: String::from("Bob"),
friends: vec![],
}));
// There will be no session of user Charlie, but he will connected as friend.
let user3 = Arc::new(Mutex::new(User {
user_id: 3,
name: String::from("Charlie"),
friends: vec![],
}));
// Connect their friends to them.
user2.lock().unwrap().friends.push(user1.clone());
user2.lock().unwrap().friends.push(user3.clone());
user2.lock().unwrap().print_friends();

// Store users names in a B-tree by number.
let mut group_tree = BTreeMap::new();
group_tree.insert(1, user1.clone());
group_tree.insert(2, user2.clone());
// The group_tree MUST consume user3 here, and not a clone, otherwise
// strong_count() reports that user3 still has another (unused) reference!
group_tree.insert(3, user3);

// Create mpsc channel for pruning user-ids in B-tree.
let (send, recv) = mpsc::channel::<u64>();
let send_cl = send.clone();
let group_tree = Arc::new(Mutex::new(group_tree));
let group_tree_cl = group_tree.clone();
thread::spawn(move || loop {
for u in recv.iter() {
println!(
"user id {} has strong count: {}",
u,
Arc::strong_count(group_tree_cl.lock().unwrap().get(&u).unwrap())
);
let mut verify_queue = Vec::new();
match group_tree_cl.lock().unwrap().entry(u) {
btree_map::Entry::Occupied(e) if Arc::strong_count(e.get()) < 2 => {
let u = e.remove();
for f in u.lock().unwrap().friends.iter() {
let u = f.lock().unwrap().user_id;
verify_queue.push(u);
}
}
_ => {}
};
// drop here:
if !verify_queue.is_empty() {
println!("Send users to verification queue: {:?}", verify_queue);
for i in verify_queue {
let _ = send_cl.send(i);
}
}
}
});

// Make an artificially small cache and 1-second ttl to observe pruning of the tree.
let ttl = 3;
let sessions_cache = Cache::builder()
.max_capacity(10)
.time_to_live(Duration::from_secs(ttl))
.eviction_listener(|key, value: Arc<Mutex<Session>>, cause| {
println!(
"Evicted session with key {:08X} of user_id {:?} because {:?}",
*key,
value
.lock()
.unwrap()
.ptr
.as_ref()
.unwrap()
.lock()
.unwrap()
.user_id,
cause
)
})
.build();
// To create some simple CRC-32 session keys with Bash do:
// for ((i = 1; i < 4 ; i++)); do rhash <(echo "$i")|tail -1; done

// Alice's session on browser
let session1 = Session {
ptr: Some(user1.clone()),
sender: send.clone(),
};
sessions_cache
.insert(0x6751FC53, Arc::new(Mutex::new(session1)))
.await;

// Alice's second session on smartphone
let session2 = Session {
ptr: Some(user1),
sender: send.clone(),
};
sessions_cache
.insert(0x4C7CAF90, Arc::new(Mutex::new(session2)))
.await;

// Add also Bob's session
let session3 = Session {
ptr: Some(user2),
sender: send.clone(),
};
sessions_cache
.insert(0x55679ED1, Arc::new(Mutex::new(session3)))
.await;

// Show cache content
for (key, value) in sessions_cache.iter() {
let session = value.lock().unwrap();
println!(
"Found session {:08X} from user_id: {}",
*key,
session.ptr.as_ref().unwrap().lock().unwrap().user_id
);
}

println!("Waiting");
for t in 1..=ttl + 1 {
sleep(Duration::from_secs(1));
sessions_cache.get(&0).await;
sessions_cache.run_pending_tasks().await;
println!("t = {}, pending: {}", t, sessions_cache.entry_count());
}
assert!(group_tree.lock().unwrap().is_empty());
println!("Exit program.");
}
23 changes: 15 additions & 8 deletions examples/eviction_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use std::time::Duration;

fn main() {
// Make an artificially small cache and 1-second ttl to observe eviction listener.
let ttl = 1;
{
let cache = Cache::builder()
.max_capacity(2)
.time_to_live(Duration::from_secs(1))
.time_to_live(Duration::from_secs(ttl))
.eviction_listener(|key, value, cause| {
println!("Evicted ({key:?},{value:?}) because {cause:?}")
})
Expand All @@ -20,24 +21,30 @@ fn main() {
// Replaced and Size.
cache.insert(&2, "two".to_string());
// With 1-second ttl, keys 0 and 1 will be evicted if we wait long enough.
sleep(Duration::from_secs(2));
sleep(Duration::from_secs(ttl + 1));
println!("Wake up!");
cache.insert(&3, "three".to_string());
cache.insert(&4, "four".to_string());
let _ = cache.remove(&3);

// Remove from cache and return value:
if let Some(v) = cache.remove(&3) {
println!("Removed: {v}")
};
// Or remove from cache without returning the value.
cache.invalidate(&4);

cache.insert(&5, "five".to_string());

// invalidate_all() removes entries using a background thread, so there will
// be some delay before entries are removed and the eviction listener is
// called. If you want to remove all entries immediately, call sync() method
// repeatedly like the loop below.
// called. If you want to remove all entries immediately, call
// run_pending_tasks() method repeatedly like the loop below.
cache.invalidate_all();
loop {
// Synchronization is limited to at most 500 entries for each call.
cache.run_pending_tasks();
// Check if all is done. Calling entry_count() requires calling sync()
// first!
// Check if all is done. Calling entry_count() requires calling
// run_pending_tasks() first!
if cache.entry_count() == 0 {
break;
}
Expand All @@ -46,7 +53,7 @@ fn main() {
cache.insert(&6, "six".to_string());
// When cache is dropped eviction listener is not called. Either call
// invalidate_all() or wait longer than ttl.
sleep(Duration::from_secs(2));
sleep(Duration::from_secs(ttl + 1));
} // cache is dropped here.

println!("Cache structure removed.");
Expand Down
Loading