From f1283698d0bc1a71fd5d745660a14b336a53f05d Mon Sep 17 00:00:00 2001 From: visig Date: Sat, 21 May 2022 16:20:47 +0800 Subject: [PATCH 1/2] refactor: PollWatcher Refactor PollWatcher to make it clean for further develop. This commit would not change any behaviors user observable (not include Debug implementation) and public documentation. --- src/poll.rs | 773 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 459 insertions(+), 314 deletions(-) diff --git a/src/poll.rs b/src/poll.rs index b8913f70..3d9656ae 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -3,46 +3,400 @@ //! Checks the `watch`ed paths periodically to detect changes. This implementation only uses //! Rust stdlib APIs and should work on all of the platforms it supports. -use super::event::*; -use super::{Error, EventHandler, RecursiveMode, Result, Watcher}; -use filetime::FileTime; -use std::collections::hash_map::RandomState; -use std::collections::HashMap; -use std::fmt::Debug; -use std::fs::Metadata; -use std::hash::BuildHasher; -use std::hash::Hasher; -use std::io::{ErrorKind, Read}; -use std::path::{Path, PathBuf}; -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, Mutex, +use crate::{EventHandler, RecursiveMode, Watcher}; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }, + thread, + time::Duration, }; -use std::thread; -use std::time::{Duration, Instant}; -use std::{fs, io}; -use walkdir::WalkDir; -#[derive(Debug, Clone)] -struct PathData { - mtime: i64, - hash: Option, - last_check: Instant, -} +use data::{DataBuilder, WatchData}; +mod data { + use crate::{ + event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind}, + EventHandler, + }; + use filetime::FileTime; + use std::{ + collections::{hash_map::RandomState, HashMap}, + fmt::{self, Debug}, + fs::{self, File, Metadata}, + hash::{BuildHasher, Hasher}, + io::{self, Read}, + path::{Path, PathBuf}, + sync::{Arc, Mutex}, + time::Instant, + }; + use walkdir::WalkDir; + + /// Builder for [`WatchData`] & [`PathData`]. + pub(super) struct DataBuilder { + emitter: EventEmitter, + + // TODO: May allow user setup their custom BuildHasher / BuildHasherDefault + // in future. + build_hasher: Option, + + // current timestamp for building Data. + now: Instant, + } -#[derive(Debug)] -struct WatchData { - is_recursive: bool, - paths: HashMap, + impl DataBuilder { + pub(super) fn new(event_handler: F, compare_content: bool) -> Self + where + F: EventHandler, + { + Self { + emitter: EventEmitter::new(event_handler), + build_hasher: compare_content.then(|| RandomState::default()), + now: Instant::now(), + } + } + + /// Update internal timestamp. + pub(super) fn update_timestamp(&mut self) { + self.now = Instant::now(); + } + + /// Create [`WatchData`]. + /// + /// This function will return `Err(_)` if can not retrieve metadata from + /// the path location. (e.g., not found). + pub(super) fn build_watch_data( + &self, + root: PathBuf, + is_recursive: bool, + ) -> Option { + WatchData::new(self, root, is_recursive) + } + + /// Create [`PathData`]. + fn build_path_data(&self, meta_path: &MetaPath) -> PathData { + PathData::new(self, meta_path) + } + } + + impl Debug for DataBuilder { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("DataBuilder") + .field("build_hasher", &self.build_hasher) + .field("now", &self.now) + .finish() + } + } + + #[derive(Debug)] + pub(super) struct WatchData { + // config part, won't change. + root: PathBuf, + is_recursive: bool, + + // current status part. + all_path_data: HashMap, + } + + impl WatchData { + /// Scan filesystem and create a new `WatchData`. + /// + /// # Side effect + /// + /// This function may send event by `data_builder.emitter`. + fn new(data_builder: &DataBuilder, root: PathBuf, is_recursive: bool) -> Option { + // If metadata read error at `root` path, it will emit + // a error event and stop to create the whole `WatchData`. + // + // QUESTION: inconsistent? + // + // When user try to *CREATE* a watch by `poll_watcher.watch(root, ..)`, + // if `root` path hit an io error, then watcher will reject to + // create this new watch. + // + // This may inconsistent with *POLLING* a watch. When watcher + // continue polling, io error at root path will not delete + // a existing watch. polling still working. + // + // So, consider a config file may not exists at first time but may + // create after a while, developer cannot watch it. + // + // FIXME: Can we always allow to watch a path, even file not + // found at this path? + if let Err(e) = fs::metadata(&root) { + data_builder.emitter.emit_io_err(e, &root); + return None; + } + + let all_path_data = + Self::scan_all_path_data(data_builder, root.clone(), is_recursive).collect(); + + Some(Self { + root, + is_recursive, + all_path_data, + }) + } + + /// Rescan filesystem and update this `WatchData`. + /// + /// # Side effect + /// + /// This function may emit event by `data_builder.emitter`. + pub(super) fn rescan(&mut self, data_builder: &mut DataBuilder) { + // scan current filesystem. + for (path, new_path_data) in + Self::scan_all_path_data(data_builder, self.root.clone(), self.is_recursive) + { + let old_path_data = self + .all_path_data + .insert(path.clone(), new_path_data.clone()); + + // emit event + let event = + PathData::compare_to_event(path, old_path_data.as_ref(), Some(&new_path_data)); + if let Some(event) = event { + data_builder.emitter.emit_ok(event); + } + } + + // scan for disappeared paths. + let mut disappeared_paths = Vec::new(); + for (path, path_data) in self.all_path_data.iter() { + if path_data.last_check < data_builder.now { + disappeared_paths.push(path.clone()); + } + } + + // remove disappeared paths + for path in disappeared_paths { + let old_path_data = self.all_path_data.remove(&path); + + // emit event + let event = PathData::compare_to_event(path, old_path_data.as_ref(), None); + if let Some(event) = event { + data_builder.emitter.emit_ok(event); + } + } + } + + /// Get all `PathData` by given configuration. + /// + /// # Side Effect + /// + /// This function may emit some IO Error events by `data_builder.emitter`. + fn scan_all_path_data<'a>( + data_builder: &'a DataBuilder, + root: PathBuf, + is_recursive: bool, + ) -> impl Iterator + 'a { + // WalkDir return only one entry if root is a file (not a folder), + // so we can use single logic to do the both file & dir's jobs. + // + // See: https://docs.rs/walkdir/2.0.1/walkdir/struct.WalkDir.html#method.new + WalkDir::new(root) + .follow_links(true) + .max_depth(Self::dir_scan_depth(is_recursive)) + .into_iter() + // + // QUESTION: should we ignore IO Error? + // + // current implementation ignore some IO error, e.g., + // + // - `.filter_map(|entry| entry.ok())` + // - all read error when hashing + // + // but the code also interest with `fs::metadata()` error and + // propagate to event handler. It may not consistent. + // + // FIXME: Should we emit all IO error events? Or ignore them all? + .filter_map(|entry| entry.ok()) + .filter_map(|entry| match entry.metadata() { + Ok(metadata) => { + let path = entry.into_path(); + + let meta_path = MetaPath::from_parts_unchecked(path, metadata); + let data_path = data_builder.build_path_data(&meta_path); + + Some((meta_path.into_path(), data_path)) + } + Err(e) => { + // emit event. + let path = entry.into_path(); + data_builder.emitter.emit_io_err(e, path); + + None + } + }) + } + + fn dir_scan_depth(is_recursive: bool) -> usize { + if is_recursive { + usize::max_value() + } else { + 1 + } + } + } + + /// Stored data for a one path locations. + /// + /// See [`WatchData`] for more detail. + #[derive(Debug, Clone)] + struct PathData { + /// File updated time. + mtime: i64, + + /// Content's hash value, only available if user request compare file + /// contents and read successful. + hash: Option, + + /// Checked time. + last_check: Instant, + } + + impl PathData { + /// Create a new `PathData`. + fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData { + let metadata = meta_path.metadata(); + + PathData { + mtime: FileTime::from_last_modification_time(&metadata).seconds(), + hash: data_builder + .build_hasher + .as_ref() + .filter(|_| metadata.is_file()) + .and_then(|build_hasher| { + Self::get_content_hash(build_hasher, meta_path.path()).ok() + }), + + last_check: data_builder.now, + } + } + + /// Get hash value for the data content in given file `path`. + fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result { + let mut hasher = build_hasher.build_hasher(); + let mut file = File::open(path)?; + let mut buf = [0; 512]; + + loop { + let n = match file.read(&mut buf) { + Ok(0) => break, + Ok(len) => len, + Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(e) => return Err(e), + }; + + hasher.write(&buf[..n]); + } + + Ok(hasher.finish()) + } + + /// Get [`Event`] by compare two optional [`PathData`]. + fn compare_to_event

( + path: P, + old: Option<&PathData>, + new: Option<&PathData>, + ) -> Option + where + P: Into, + { + match (old, new) { + (Some(old), Some(new)) => { + if new.mtime > old.mtime { + Some(EventKind::Modify(ModifyKind::Metadata( + MetadataKind::WriteTime, + ))) + } else if new.hash != old.hash { + Some(EventKind::Modify(ModifyKind::Data(DataChange::Any))) + } else { + None + } + } + (None, Some(_new)) => Some(EventKind::Create(CreateKind::Any)), + (Some(_old), None) => Some(EventKind::Remove(RemoveKind::Any)), + (None, None) => None, + } + .map(|event_kind| Event::new(event_kind).add_path(path.into())) + } + } + + /// Compose path and its metadata. + /// + /// This data structure designed for make sure path and its metadata can be + /// transferred in consistent way, and may avoid some duplicated + /// `fs::metadata()` function call in some situations. + #[derive(Debug)] + pub(super) struct MetaPath { + path: PathBuf, + metadata: Metadata, + } + + impl MetaPath { + /// Create `MetaPath` by given parts. + /// + /// # Invariant + /// + /// User must make sure the input `metadata` are associated with `path`. + fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self { + Self { path, metadata } + } + + fn path(&self) -> &Path { + &self.path + } + + fn metadata(&self) -> &Metadata { + &self.metadata + } + + fn into_path(self) -> PathBuf { + self.path + } + } + + /// Thin wrapper for outer event handler, for easy to use. + struct EventEmitter(Arc>); + + impl EventEmitter { + fn new(event_handler: F) -> Self { + Self(Arc::new(Mutex::new(event_handler))) + } + + /// Emit single event. + fn emit(&self, event: crate::Result) { + // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore. + if let Ok(mut guard) = self.0.lock() { + guard.handle_event(event); + } + } + + fn emit_ok(&self, event: Event) { + self.emit(Ok(event)) + } + + /// Emit io error event. + fn emit_io_err(&self, err: E, path: P) + where + E: Into, + P: Into, + { + self.emit(Err(crate::Error::io(err.into()).add_path(path.into()))) + } + } } /// Polling based `Watcher` implementation +#[derive(Debug)] pub struct PollWatcher { - event_handler: Arc>, watches: Arc>>, - open: Arc, + data_builder: Arc>, + want_to_stop: Arc, delay: Duration, - compare_contents: bool, } /// General purpose configuration for [`PollWatcher`] specifically. Can be used to tune @@ -71,314 +425,103 @@ impl Default for PollWatcherConfig { } } -impl Debug for PollWatcher { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PollWatcher") - .field("event_handler", &Arc::as_ptr(&self.watches)) - .field("watches", &self.watches) - .field("open", &self.open) - .field("delay", &self.delay) - .field("compare_contents", &self.compare_contents) - .finish() - } -} - -fn emit_event(event_handler: &Mutex, res: Result) { - if let Ok(mut guard) = event_handler.lock() { - let f: &mut dyn EventHandler = &mut *guard; - f.handle_event(res); - } -} - -impl PathData { - pub fn collect( - path: &Path, - metadata: &Metadata, - build_hasher: Option<&BH>, - last_check: Instant, - ) -> Self { - let mtime = FileTime::from_last_modification_time(metadata).seconds(); - let hash = metadata - .is_file() - .then(|| build_hasher.and_then(|bh| Self::hash_file(path, bh).ok())) - .flatten(); - Self { - mtime, - hash, - last_check, - } - } - - fn hash_file, BH: BuildHasher>(path: P, build_hasher: &BH) -> io::Result { - let mut hasher = build_hasher.build_hasher(); - let mut file = fs::File::open(path)?; - let mut buf = [0; 512]; - loop { - let n = match file.read(&mut buf) { - Ok(0) => break, - Ok(len) => len, - Err(ref e) if e.kind() == ErrorKind::Interrupted => continue, - Err(e) => return Err(e), - }; - hasher.write(&buf[..n]); - } - Ok(hasher.finish()) - } - - pub fn detect_change(&self, other: &PathData) -> Option { - if self.mtime > other.mtime { - Some(EventKind::Modify(ModifyKind::Metadata( - MetadataKind::WriteTime, - ))) - } else if self.hash != other.hash { - Some(EventKind::Modify(ModifyKind::Data(DataChange::Any))) - } else { - None - } - } -} - impl PollWatcher { /// Create a new [PollWatcher], configured as needed. pub fn with_config( event_handler: F, config: PollWatcherConfig, - ) -> Result { - let mut p = PollWatcher { - event_handler: Arc::new(Mutex::new(event_handler)), - watches: Arc::new(Mutex::new(HashMap::new())), - open: Arc::new(AtomicBool::new(true)), + ) -> crate::Result { + let data_builder = DataBuilder::new(event_handler, config.compare_contents); + + let poll_watcher = PollWatcher { + watches: Default::default(), + data_builder: Arc::new(Mutex::new(data_builder)), + want_to_stop: Arc::new(AtomicBool::new(false)), delay: config.poll_interval, - compare_contents: config.compare_contents, }; - p.run(); - Ok(p) + + poll_watcher.run(); + + Ok(poll_watcher) } - fn run(&mut self) { - let watches = self.watches.clone(); - let open = self.open.clone(); + fn run(&self) { + let watches = Arc::clone(&self.watches); + let data_builder = Arc::clone(&self.data_builder); + let want_to_stop = Arc::clone(&self.want_to_stop); let delay = self.delay; - let build_hasher = self.compare_contents.then(RandomState::default); - let event_handler = self.event_handler.clone(); - let event_handler = move |res| emit_event(&event_handler, res); let _ = thread::Builder::new() .name("notify-rs poll loop".to_string()) .spawn(move || { - // In order of priority: - // TODO: handle metadata events - // TODO: handle renames - // TODO: DRY it up - loop { - if !open.load(Ordering::SeqCst) { + if want_to_stop.load(Ordering::SeqCst) { break; } - if let Ok(mut watches) = watches.lock() { - let current_time = Instant::now(); - - for ( - watch, - &mut WatchData { - is_recursive, - ref mut paths, - }, - ) in watches.iter_mut() - { - match fs::metadata(watch) { - Err(e) => { - let err = Err(Error::io(e).add_path(watch.clone())); - event_handler(err); - continue; - } - Ok(metadata) => { - // this is a file type watching point - if !metadata.is_dir() { - let path_data = PathData::collect( - watch, - &metadata, - build_hasher.as_ref(), - current_time, - ); - - // Update `path_data` for this watching point. In file - // type watching point, only has single one path need - // to update. - match paths.insert(watch.clone(), path_data.clone()) { - // `old_path_data` not exists, this is a new path - None => { - let kind = EventKind::Create(CreateKind::Any); - let ev = Event::new(kind).add_path(watch.clone()); - event_handler(Ok(ev)); - } - - // path already exists, need further check to see - // what's the difference. - Some(old_path_data) => { - if let Some(kind) = - path_data.detect_change(&old_path_data) - { - let ev = - Event::new(kind).add_path(watch.clone()); - event_handler(Ok(ev)); - } - } - } - - // this is a dir type watching point - } else { - let depth = - if is_recursive { usize::max_value() } else { 1 }; - for entry in WalkDir::new(watch) - .follow_links(true) - .max_depth(depth) - .into_iter() - .filter_map(|e| e.ok()) - { - let path = entry.path(); - - // TODO: duplicate logic, considering refactor following lines to a function. - match entry.metadata() { - Err(e) => { - let err = Error::io(e.into()) - .add_path(path.to_path_buf()); - event_handler(Err(err)); - } - Ok(m) => { - let path_data = PathData::collect( - path, - &m, - build_hasher.as_ref(), - current_time, - ); - match paths.insert( - path.to_path_buf(), - path_data.clone(), - ) { - None => { - let kind = - EventKind::Create(CreateKind::Any); - let ev = Event::new(kind) - .add_path(path.to_path_buf()); - event_handler(Ok(ev)); - } - Some(old_path_data) => { - if let Some(kind) = path_data - .detect_change(&old_path_data) - { - // TODO add new mtime as attr - let ev = Event::new(kind) - .add_path(path.to_path_buf()); - event_handler(Ok(ev)); - } - } - } - } - } - } - } - } - } - } - - // clear out all paths which not updated by this round. - for (_, &mut WatchData { ref mut paths, .. }) in watches.iter_mut() { - // find which paths should be removed in this watching point. - let mut removed = Vec::new(); - for (path, &PathData { last_check, .. }) in paths.iter() { - if last_check < current_time { - let ev = Event::new(EventKind::Remove(RemoveKind::Any)) - .add_path(path.clone()); - event_handler(Ok(ev)); - removed.push(path.clone()); - } - } - - // remove actually. - for path in removed { - (*paths).remove(&path); - } + // HINT: Make sure always lock in the same order to avoid deadlock. + // + // FIXME: inconsistent: some place mutex poison cause panic, + // some place just ignore. + if let (Ok(mut watches), Ok(mut data_builder)) = + (watches.lock(), data_builder.lock()) + { + data_builder.update_timestamp(); + + for watch_data in watches.values_mut() { + watch_data.rescan(&mut data_builder); } } + // QUESTION: `actual_delay == process_time + delay`. Is it intended to? + // + // If not, consider fix it to: + // + // ```rust + // let still_need_to_delay = delay.checked_sub(data_builder.now.elapsed()); + // if let Some(delay) = still_need_to_delay { + // thread::sleep(delay); + // } + // ``` thread::sleep(delay); } }); } - fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> { - let build_hasher = self.compare_contents.then(RandomState::default); - - if let Ok(mut watches) = self.watches.lock() { - let current_time = Instant::now(); - - let watch = path.to_owned(); - - match fs::metadata(path) { - Err(e) => { - let err = Error::io(e).add_path(watch); - emit_event(&self.event_handler, Err(err)); - } - Ok(metadata) => { - let mut paths = HashMap::new(); - - if !metadata.is_dir() { - let path_data = - PathData::collect(path, &metadata, build_hasher.as_ref(), current_time); - paths.insert(watch.clone(), path_data); - } else { - let depth = if recursive_mode.is_recursive() { - usize::max_value() - } else { - 1 - }; - for entry in WalkDir::new(watch.clone()) - .follow_links(true) - .max_depth(depth) - .into_iter() - .filter_map(|e| e.ok()) - { - let path = entry.path(); - - match entry.metadata() { - Err(e) => { - let err = Error::io(e.into()).add_path(path.to_path_buf()); - emit_event(&self.event_handler, Err(err)); - } - Ok(m) => { - let path_data = PathData::collect( - path, - &m, - build_hasher.as_ref(), - current_time, - ); - paths.insert(path.to_path_buf(), path_data); - } - } - } - } - - watches.insert( - watch, - WatchData { - is_recursive: recursive_mode.is_recursive(), - paths, - }, - ); - } + /// Watch a path location. + /// + /// QUESTION: this function never return an Error, is it as intend? + /// Please also consider the IO Error event problem. + fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) { + // HINT: Make sure always lock in the same order to avoid deadlock. + // + // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore. + if let (Ok(mut watches), Ok(mut data_builder)) = + (self.watches.lock(), self.data_builder.lock()) + { + data_builder.update_timestamp(); + + let watch_data = + data_builder.build_watch_data(path.to_path_buf(), recursive_mode.is_recursive()); + + // if create watch_data successful, add it to watching list. + if let Some(watch_data) = watch_data { + watches.insert(path.to_path_buf(), watch_data); } } - Ok(()) } - fn unwatch_inner(&mut self, path: &Path) -> Result<()> { - if (*self.watches).lock().unwrap().remove(path).is_some() { - Ok(()) - } else { - Err(Error::watch_not_found()) - } + /// Unwatch a path. + /// + /// Return `Err(_)` if given path has't be monitored. + fn unwatch_inner(&mut self, path: &Path) -> crate::Result<()> { + // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore. + self.watches + .lock() + .unwrap() + .remove(path) + .map(|_| ()) + .ok_or_else(|| crate::Error::watch_not_found()) } } @@ -387,15 +530,17 @@ impl Watcher for PollWatcher { /// /// The default poll frequency is 30 seconds. /// Use [with_config] to manually set the poll frequency. - fn new(event_handler: F) -> Result { + fn new(event_handler: F) -> crate::Result { Self::with_config(event_handler, PollWatcherConfig::default()) } - fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> { - self.watch_inner(path, recursive_mode) + fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> crate::Result<()> { + self.watch_inner(path, recursive_mode); + + Ok(()) } - fn unwatch(&mut self, path: &Path) -> Result<()> { + fn unwatch(&mut self, path: &Path) -> crate::Result<()> { self.unwatch_inner(path) } @@ -406,7 +551,7 @@ impl Watcher for PollWatcher { impl Drop for PollWatcher { fn drop(&mut self) { - self.open.store(false, Ordering::Relaxed); + self.want_to_stop.store(true, Ordering::Relaxed); } } From 574c7a5f2d2da738dc9ded5c101aee5683d54548 Mon Sep 17 00:00:00 2001 From: visig Date: Sun, 22 May 2022 13:41:12 +0800 Subject: [PATCH 2/2] PollWatcher: remove Mutex::lock() when emit event --- src/poll.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/poll.rs b/src/poll.rs index 3d9656ae..f9dd9ed1 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -23,13 +23,13 @@ mod data { }; use filetime::FileTime; use std::{ + cell::RefCell, collections::{hash_map::RandomState, HashMap}, fmt::{self, Debug}, fs::{self, File, Metadata}, hash::{BuildHasher, Hasher}, io::{self, Read}, path::{Path, PathBuf}, - sync::{Arc, Mutex}, time::Instant, }; use walkdir::WalkDir; @@ -360,21 +360,23 @@ mod data { } /// Thin wrapper for outer event handler, for easy to use. - struct EventEmitter(Arc>); + struct EventEmitter( + // Use `RefCell` to make sure `emit()` only need shared borrow of self (&self). + // Use `Box` to make sure EventEmitter is Sized. + Box>, + ); impl EventEmitter { fn new(event_handler: F) -> Self { - Self(Arc::new(Mutex::new(event_handler))) + Self(Box::new(RefCell::new(event_handler))) } /// Emit single event. fn emit(&self, event: crate::Result) { - // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore. - if let Ok(mut guard) = self.0.lock() { - guard.handle_event(event); - } + self.0.borrow_mut().handle_event(event); } + /// Emit event. fn emit_ok(&self, event: Event) { self.emit(Ok(event)) }