diff --git a/notify/src/consolidating_path_trie.rs b/notify/src/consolidating_path_trie.rs
new file mode 100644
index 00000000..65ef059f
--- /dev/null
+++ b/notify/src/consolidating_path_trie.rs
@@ -0,0 +1,278 @@
+//! A path trie that consolidates nested paths into their topmost ancestor.
+
+use std::ffi::{OsStr, OsString};
+use std::path::{Path, PathBuf};
+
+/// A trie node: one path component, optionally carrying a value.
+#[derive(Debug)]
+pub struct PathTrieNode<T> {
+    // sorted for binary search
+    children: Vec<(OsString, PathTrieNode<T>)>,
+    value: Option<T>,
+}
+
+// Written by hand because `#[derive(Default)]` would require `T: Default`.
+impl<T> Default for PathTrieNode<T> {
+    fn default() -> Self {
+        Self {
+            children: Vec::new(),
+            value: None,
+        }
+    }
+}
+
+impl<T> PathTrieNode<T> {
+    /// Binary-search the children for `key`.
+    ///
+    /// Returns `Ok(index)` of the matching child, or `Err(index)` of the position at which a
+    /// child with that key has to be inserted to keep the children sorted.
+    pub(crate) fn get_child_index(&self, key: &OsStr) -> Result<usize, usize> {
+        self.children
+            .binary_search_by(|(k, _)| k.as_os_str().cmp(key))
+    }
+
+    /// Remove all descendants of this node; its own value is kept.
+    pub fn remove_children(&mut self) {
+        self.children.clear();
+    }
+
+    /// Iterate over this node and its descendants that have a value.
+    ///
+    /// The yielded paths are relative to this node.
+    pub fn descendants(&self) -> impl Iterator<Item = (PathBuf, &T)> {
+        PathTrieIter::new(self)
+    }
+}
+
+/// A trie keyed by the components of a path.
+#[derive(Debug)]
+pub struct PathTrie<T> {
+    root: PathTrieNode<T>,
+}
+
+impl<T> PathTrie<T> {
+    /// Create an empty trie.
+    pub fn new() -> Self {
+        Self {
+            root: PathTrieNode::default(),
+        }
+    }
+
+    /// Insert a path with the given value.
+    ///
+    /// Missing intermediate nodes are created without a value; a value already stored for `path`
+    /// is replaced. Returns the node of `path`.
+    pub fn insert(&mut self, path: impl AsRef<Path>, value: T) -> &mut PathTrieNode<T> {
+        let path = path.as_ref();
+        let mut current = &mut self.root;
+        for component in path.components() {
+            let key = component.as_os_str();
+            match current.get_child_index(key) {
+                Ok(idx) => {
+                    current = &mut current.children[idx].1;
+                }
+                Err(idx) => {
+                    let new_node = (key.to_os_string(), PathTrieNode::default());
+                    current.children.insert(idx, new_node);
+                    current = &mut current.children[idx].1;
+                }
+            }
+        }
+        current.value = Some(value);
+        current
+    }
+
+    /// Get the node associated with the given path.
+    /// It only returns if the node has a value.
+    #[expect(dead_code)]
+    pub fn get(&self, path: impl AsRef<Path>) -> Option<&PathTrieNode<T>> {
+        let path = path.as_ref();
+        let mut current = &self.root;
+        for component in path.components() {
+            let key = component.as_os_str();
+            let idx = current.get_child_index(key).ok()?;
+            current = &current.children[idx].1;
+        }
+        // Intermediate nodes only connect inserted paths and hold no value; they are not hits.
+        current.value.is_some().then_some(current)
+    }
+
+    /// Get the topmost node with a value on the way from the root to the given path, together
+    /// with the path of that node.
+    /// If only the path itself has a value, its own node is returned.
+    pub fn get_ancestor(&self, path: impl AsRef<Path>) -> Option<(PathBuf, &PathTrieNode<T>)> {
+        let path = path.as_ref();
+        let mut current = &self.root;
+        for (i, component) in path.components().enumerate() {
+            if current.value.is_some() {
+                let ancestor_path = path.components().take(i).collect();
+                return Some((ancestor_path, current));
+            }
+            let key = component.as_os_str();
+            let idx = current.get_child_index(key).ok()?;
+            current = &current.children[idx].1;
+        }
+        current
+            .value
+            .as_ref()
+            .map(|_| (path.to_path_buf(), current))
+    }
+
+    /// Iterate over all paths that have a value.
+    #[cfg(test)]
+    pub fn iter(&self) -> PathTrieIter<'_, T> {
+        PathTrieIter::new(&self.root)
+    }
+}
+
+/// Depth-first iterator over the nodes of a trie that have a value.
+pub struct PathTrieIter<'a, T> {
+    // (node, visit state): 0 = the node itself is unvisited, n > 0 = child n - 1 comes next
+    stack: Vec<(&'a PathTrieNode<T>, usize)>,
+    // keys of the nodes on `stack` (the start node has none); joined into the yielded path
+    components: Vec<&'a OsStr>,
+}
+
+impl<'a, T> Iterator for PathTrieIter<'a, T> {
+    type Item = (PathBuf, &'a T);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        while let Some((node, child_idx)) = self.stack.last_mut() {
+            // check current node
+            if *child_idx == 0 {
+                *child_idx += 1;
+                if let Some(value) = &node.value {
+                    return Some((self.components.iter().collect(), value));
+                }
+            }
+
+            // visit children
+            let current_child_pos = *child_idx - 1;
+            if current_child_pos < node.children.len() {
+                let (key, next_node) = &node.children[current_child_pos];
+                *child_idx += 1;
+                self.components.push(key.as_os_str());
+                self.stack.push((next_node, 0));
+            } else {
+                // Pop the key as well. This is why the path is not kept as one `PathBuf`:
+                // `PathBuf::pop` cannot remove a root component such as `/`.
+                self.stack.pop();
+                self.components.pop();
+            }
+        }
+        None
+    }
+}
+
+impl<'a, T> PathTrieIter<'a, T> {
+    fn new(root: &'a PathTrieNode<T>) -> Self {
+        Self {
+            stack: vec![(root, 0)],
+            components: Vec::new(),
+        }
+    }
+}
+
+/// A set of paths in which a path supersedes all paths below it.
+pub struct ConsolidatingPathTrie {
+    trie: PathTrie<()>,
+}
+
+impl ConsolidatingPathTrie {
+    /// Create an empty set.
+    pub fn new() -> Self {
+        Self {
+            trie: PathTrie::new(),
+        }
+    }
+
+    /// Add `path`, unless the path itself or one of its ancestors is already in the set.
+    /// Paths below `path` that were added before are dropped.
+    pub fn insert(&mut self, path: impl AsRef<Path>) {
+        let path = path.as_ref();
+        if self.trie.get_ancestor(path).is_some() {
+            return;
+        }
+        let inserted = self.trie.insert(path, ());
+        inserted.remove_children();
+    }
+
+    /// The consolidated paths: no returned path is an ancestor of another one.
+    pub fn values(&self) -> Vec<PathBuf> {
+        self.trie
+            .root
+            .descendants()
+            .map(|(path, ())| path)
+            .collect()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn basic() {
+        let mut t = PathTrie::new();
+        t.insert(PathBuf::from("/a"), ());
+        t.insert(PathBuf::from("/a/b/c"), ());
+        t.insert(PathBuf::from("/a/b/c2"), ());
+        assert_eq!(
+            t.iter().collect::<Vec<_>>(),
+            vec![
+                (PathBuf::from("/a"), &()),
+                (PathBuf::from("/a/b/c"), &()),
+                (PathBuf::from("/a/b/c2"), &()),
+            ]
+        );
+    }
+
+    #[test]
+    fn absolute_and_relative() {
+        let mut t = PathTrie::new();
+        t.insert(PathBuf::from("/a"), ());
+        t.insert(PathBuf::from("b/c"), ());
+        assert_eq!(
+            t.iter().collect::<Vec<_>>(),
+            vec![(PathBuf::from("/a"), &()), (PathBuf::from("b/c"), &())]
+        );
+    }
+
+    #[test]
+    fn consolidate_no_siblings() {
+        let mut ct = ConsolidatingPathTrie::new();
+        ct.insert(PathBuf::from("/a/b"));
+        ct.insert(PathBuf::from("/a/c"));
+        assert_eq!(
+            ct.values(),
+            vec![PathBuf::from("/a/b"), PathBuf::from("/a/c")]
+        );
+    }
+
+    #[test]
+    fn consolidate_no_siblings2() {
+        let mut ct = ConsolidatingPathTrie::new();
+        ct.insert(PathBuf::from("/a/b1"));
+        ct.insert(PathBuf::from("/a/b2"));
+        assert_eq!(
+            ct.values(),
+            vec![PathBuf::from("/a/b1"), PathBuf::from("/a/b2")]
+        );
+    }
+
+    #[test]
+    fn consolidate_children() {
+        let mut ct = ConsolidatingPathTrie::new();
+        ct.insert(PathBuf::from("/a/b"));
+        ct.insert(PathBuf::from("/a/b/c"));
+        assert_eq!(ct.values(), vec![PathBuf::from("/a/b")]);
+    }
+
+    #[test]
+    fn consolidate_parent() {
+        let mut ct = ConsolidatingPathTrie::new();
+        ct.insert(PathBuf::from("/a/b/c"));
+        ct.insert(PathBuf::from("/a/b"));
+        assert_eq!(ct.values(), vec![PathBuf::from("/a/b")]);
+    }
+}
diff --git a/notify/src/fsevent.rs b/notify/src/fsevent.rs
index 0aeb13bb..57b0baa5 100644
--- a/notify/src/fsevent.rs
+++ b/notify/src/fsevent.rs
@@ -14,13 +14,15 @@
 
 #![allow(non_upper_case_globals, dead_code)]
 
+use crate::consolidating_path_trie::ConsolidatingPathTrie;
 use crate::{Config, Error, EventHandler, PathsMut, Result, Sender, WatchMode, Watcher, unbounded};
 use crate::{TargetMode, event::*};
 use objc2_core_foundation as cf;
 use objc2_core_services as fs;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::ffi::CStr;
 use std::fmt;
+use std::hash::RandomState;
 use std::path::{Path, PathBuf};
 use std::ptr::NonNull;
 use std::sync::{Arc, Mutex};
@@ -332,26 +334,6 @@ impl FsEventWatcher {
     }
 
     fn remove_path(&mut self, path: &Path) -> Result<()> {
-        let cf_path = cf::CFString::from_str(&path.to_string_lossy());
-
-        let mut to_remove = Vec::new();
-        for (idx, item) in self.paths.iter().enumerate() {
-            if item.compare(
-                Some(&cf_path),
-                cf::CFStringCompareFlags::CompareCaseInsensitive,
-            ) == cf::CFComparisonResult::CompareEqualTo
-            {
-                to_remove.push(cf::CFIndex::try_from(idx).unwrap());
-            }
-        }
-
-        for idx in to_remove.iter().rev() {
-            // SAFETY: `the_array` is not `None` and the generic is correct, `idx` is in-bounds
-            unsafe {
-                cf::CFMutableArray::remove_value_at_index(Some(self.paths.as_opaque()), *idx);
-            };
-        }
-
         let p = if let Ok(canonicalized_path) = path.canonicalize() {
             canonicalized_path
         } else {
@@ -375,19 +357,65 @@ impl FsEventWatcher {
             .canonicalize()
             .unwrap_or(path.to_path_buf());
 
-        let cf_path = cf::CFString::from_str(&path.to_string_lossy());
-        self.paths.append(&cf_path);
-
         self.watches
             .insert(canonical_path, watch_mode.recursive_mode.is_recursive());
         Ok(())
     }
 
+    /// Sync `self.paths` with the consolidated set of the watched paths.
+    ///
+    /// Entries that are no longer needed are removed, missing ones are appended and entries that
+    /// are still needed are left untouched. Paths are compared case-insensitively.
+    fn update_paths_based_on_watches(&mut self) {
+        let paths_to_watch = {
+            let mut trie = ConsolidatingPathTrie::new();
+            for path in self.watches.keys() {
+                trie.insert(path);
+            }
+            trie.values()
+        };
+        tracing::debug!("Watching the following paths: {paths_to_watch:?}");
+        let paths_to_watch_set = paths_to_watch
+            .iter()
+            .map(|p| p.to_string_lossy().to_lowercase())
+            .collect::<HashSet<_>>();
+        let mut already_included_paths =
+            HashSet::<String, RandomState>::with_capacity(self.paths.len());
+
+        // remove no longer watched paths
+        let mut to_remove = Vec::new();
+        for (idx, item) in self.paths.iter().enumerate() {
+            // Both sets hold lowercased paths, so the entry has to be lowercased as well.
+            let item_key = item.to_string().to_lowercase();
+            if paths_to_watch_set.contains(&item_key) {
+                already_included_paths.insert(item_key);
+            } else {
+                to_remove.push(cf::CFIndex::try_from(idx).unwrap());
+            }
+        }
+        for idx in to_remove.iter().rev() {
+            // SAFETY: `the_array` is not `None` and the generic is correct, `idx` is in-bounds
+            unsafe {
+                cf::CFMutableArray::remove_value_at_index(Some(self.paths.as_opaque()), *idx);
+            };
+        }
+
+        // add new paths
+        for path in paths_to_watch {
+            if !already_included_paths.contains(&path.to_string_lossy().to_lowercase()) {
+                self.paths
+                    .append(&cf::CFString::from_str(&path.to_string_lossy()));
+            }
+        }
+    }
+
     fn run(&mut self) -> Result<()> {
-        if self.paths.is_empty() {
+        if self.watches.is_empty() {
             return Ok(());
         }
 
+        self.update_paths_based_on_watches();
+
         // We need to associate the stream context with our callback in order to propagate events
         // to the rest of the system. This will be owned by the stream, and will be freed when the
         // stream is closed. This means we will leak the context if we panic before reaching
diff --git a/notify/src/lib.rs b/notify/src/lib.rs
index 41286146..64deab53 100644
--- a/notify/src/lib.rs
+++ b/notify/src/lib.rs
@@ -234,6 +234,8 @@ pub mod poll;
 
 mod bimap;
 mod config;
+#[cfg(all(target_os = "macos", not(feature = "macos_kqueue")))]
+mod consolidating_path_trie;
 mod error;
 
 #[cfg(test)]