diff --git a/Cargo.toml b/Cargo.toml index 9430d2d6..44c5750c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -110,6 +110,10 @@ rustdoc-args = ["--cfg", "docsrs"] name = "async_example" required-features = ["future"] +[[example]] +name = "cascading_drop_async" +required-features = ["future"] + [[example]] name = "sync_example" required-features = ["sync"] diff --git a/examples/cascading_drop_async.rs b/examples/cascading_drop_async.rs new file mode 100644 index 00000000..762dbe99 --- /dev/null +++ b/examples/cascading_drop_async.rs @@ -0,0 +1,189 @@ +use moka::future::Cache; +use std::collections::btree_map; +use std::collections::BTreeMap; +use std::sync::mpsc; +use std::sync::Arc; +use std::sync::Mutex; +use std::thread; +use std::thread::sleep; +use std::time::Duration; + +#[derive(Debug)] +pub struct User { + user_id: u64, // Needed as key in BTreeMap when executing a recursive Drop of a Session + name: String, + friends: Vec>>, +} + +impl User { + pub fn print_friends(&self) { + print!("User {} has friends ", self.name); + for f in &self.friends { + print!("{}, ", f.lock().unwrap().name); + } + println!(); + } +} + +impl Drop for User { + fn drop(&mut self) { + println!("Dropping user {}", self.name); + } +} + +pub struct Session { + ptr: Option>>, + sender: std::sync::mpsc::Sender, +} + +impl Drop for Session { + fn drop(&mut self) { + let user_id; + user_id = self.ptr.as_ref().unwrap().lock().unwrap().user_id; + println!("Dropping session holding a reference to user {}", user_id); + self.ptr = None; // Must drop Arc before verify Btree!!! + let _ = self.sender.send(user_id); + } +} + +#[tokio::main] +async fn main() { + // For a webserver you may want to access the users via their cached session + // or via their user number, as they can be friends of each other. + // Using the Drop trait for a Session, orphaned users will get pruned. + // + // Create some users. + let user1 = Arc::new(Mutex::new(User { + user_id: 1, + name: String::from("Alice"), + friends: vec![], + })); + let user2 = Arc::new(Mutex::new(User { + user_id: 2, + name: String::from("Bob"), + friends: vec![], + })); + // There will be no session of user Charlie, but he will connected as friend. + let user3 = Arc::new(Mutex::new(User { + user_id: 3, + name: String::from("Charlie"), + friends: vec![], + })); + // Connect their friends to them. + user2.lock().unwrap().friends.push(user1.clone()); + user2.lock().unwrap().friends.push(user3.clone()); + user2.lock().unwrap().print_friends(); + + // Store users names in a B-tree by number. + let mut group_tree = BTreeMap::new(); + group_tree.insert(1, user1.clone()); + group_tree.insert(2, user2.clone()); + // The group_tree MUST consume user3 here, and not a clone, otherwise + // strong_count() reports that user3 still has another (unused) reference! + group_tree.insert(3, user3); + + // Create mpsc channel for pruning user-ids in B-tree. + let (send, recv) = mpsc::channel::(); + let send_cl = send.clone(); + let group_tree = Arc::new(Mutex::new(group_tree)); + let group_tree_cl = group_tree.clone(); + thread::spawn(move || loop { + for u in recv.iter() { + println!( + "user id {} has strong count: {}", + u, + Arc::strong_count(group_tree_cl.lock().unwrap().get(&u).unwrap()) + ); + let mut verify_queue = Vec::new(); + match group_tree_cl.lock().unwrap().entry(u) { + btree_map::Entry::Occupied(e) if Arc::strong_count(e.get()) < 2 => { + let u = e.remove(); + for f in u.lock().unwrap().friends.iter() { + let u = f.lock().unwrap().user_id; + verify_queue.push(u); + } + } + _ => {} + }; + // drop here: + if !verify_queue.is_empty() { + println!("Send users to verification queue: {:?}", verify_queue); + for i in verify_queue { + let _ = send_cl.send(i); + } + } + } + }); + + // Make an artificially small cache and 1-second ttl to observe pruning of the tree. + let ttl = 3; + let sessions_cache = Cache::builder() + .max_capacity(10) + .time_to_live(Duration::from_secs(ttl)) + .eviction_listener(|key, value: Arc>, cause| { + println!( + "Evicted session with key {:08X} of user_id {:?} because {:?}", + *key, + value + .lock() + .unwrap() + .ptr + .as_ref() + .unwrap() + .lock() + .unwrap() + .user_id, + cause + ) + }) + .build(); + // To create some simple CRC-32 session keys with Bash do: + // for ((i = 1; i < 4 ; i++)); do rhash <(echo "$i")|tail -1; done + + // Alice's session on browser + let session1 = Session { + ptr: Some(user1.clone()), + sender: send.clone(), + }; + sessions_cache + .insert(0x6751FC53, Arc::new(Mutex::new(session1))) + .await; + + // Alice's second session on smartphone + let session2 = Session { + ptr: Some(user1), + sender: send.clone(), + }; + sessions_cache + .insert(0x4C7CAF90, Arc::new(Mutex::new(session2))) + .await; + + // Add also Bob's session + let session3 = Session { + ptr: Some(user2), + sender: send.clone(), + }; + sessions_cache + .insert(0x55679ED1, Arc::new(Mutex::new(session3))) + .await; + + // Show cache content + for (key, value) in sessions_cache.iter() { + let session = value.lock().unwrap(); + println!( + "Found session {:08X} from user_id: {}", + *key, + session.ptr.as_ref().unwrap().lock().unwrap().user_id + ); + } + + println!("Waiting"); + for t in 1..=ttl + 1 { + sleep(Duration::from_secs(1)); + sessions_cache.get(&0).await; + sessions_cache.run_pending_tasks().await; + println!("t = {}, pending: {}", t, sessions_cache.entry_count()); + } + assert!(group_tree.lock().unwrap().is_empty()); + println!("Exit program."); +} diff --git a/examples/eviction_listener.rs b/examples/eviction_listener.rs index a6f04819..05b79949 100644 --- a/examples/eviction_listener.rs +++ b/examples/eviction_listener.rs @@ -4,10 +4,11 @@ use std::time::Duration; fn main() { // Make an artificially small cache and 1-second ttl to observe eviction listener. + let ttl = 1; { let cache = Cache::builder() .max_capacity(2) - .time_to_live(Duration::from_secs(1)) + .time_to_live(Duration::from_secs(ttl)) .eviction_listener(|key, value, cause| { println!("Evicted ({key:?},{value:?}) because {cause:?}") }) @@ -20,24 +21,30 @@ fn main() { // Replaced and Size. cache.insert(&2, "two".to_string()); // With 1-second ttl, keys 0 and 1 will be evicted if we wait long enough. - sleep(Duration::from_secs(2)); + sleep(Duration::from_secs(ttl + 1)); println!("Wake up!"); cache.insert(&3, "three".to_string()); cache.insert(&4, "four".to_string()); - let _ = cache.remove(&3); + + // Remove from cache and return value: + if let Some(v) = cache.remove(&3) { + println!("Removed: {v}") + }; + // Or remove from cache without returning the value. cache.invalidate(&4); + cache.insert(&5, "five".to_string()); // invalidate_all() removes entries using a background thread, so there will // be some delay before entries are removed and the eviction listener is - // called. If you want to remove all entries immediately, call sync() method - // repeatedly like the loop below. + // called. If you want to remove all entries immediately, call + // run_pending_tasks() method repeatedly like the loop below. cache.invalidate_all(); loop { // Synchronization is limited to at most 500 entries for each call. cache.run_pending_tasks(); - // Check if all is done. Calling entry_count() requires calling sync() - // first! + // Check if all is done. Calling entry_count() requires calling + // run_pending_tasks() first! if cache.entry_count() == 0 { break; } @@ -46,7 +53,7 @@ fn main() { cache.insert(&6, "six".to_string()); // When cache is dropped eviction listener is not called. Either call // invalidate_all() or wait longer than ttl. - sleep(Duration::from_secs(2)); + sleep(Duration::from_secs(ttl + 1)); } // cache is dropped here. println!("Cache structure removed.");