Skip to content

Commit

Permalink
add manual polling for PollWatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
0xpr03 committed Aug 16, 2023
1 parent 6e2f1f4 commit 2e0fb36
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 26 deletions.
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ path = "watcher_kind.rs"
name = "pollwatcher_scan"
path = "pollwatcher_scan.rs"

[[example]]
name = "pollwatcher_manual"
path = "pollwatcher_manual.rs"

# specifically in its own sub folder
# to prevent cargo audit from complaining
#[[example]]
Expand Down
3 changes: 2 additions & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ Examples for notify and the debouncers.

- **monitor_raw** basic example for using notify
- **async_monitor** example for using `futures::channel` to receive events in async code
- **poll_sysfs** example for observing linux `/sys` events using PollWatcher and the hasing mode
- **poll_sysfs** example for observing linux `/sys` events using PollWatcher and the hashing mode
- **watcher_kind** example for detecting the kind of watcher used and running specific configurations
- **hot_reload_tide** large example for async notify using the crates tide and async-std
- **pollwatcher_scan** example using `PollWatcher::with_initial_scan` to listen for files found during initial scanning
- **pollwatcher_manual** example using `PollWatcher::poll` without automatic polling for manual triggered polling

### Notify Debouncer Full (debouncer)

Expand Down
49 changes: 49 additions & 0 deletions examples/pollwatcher_manual.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use notify::{Config, PollWatcher, RecursiveMode, Watcher};
use std::path::Path;

// Example for the PollWatcher with manual polling.
// Call with cargo run -p examples --example pollwatcher_manual -- path/to/watch
fn main() {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();

let path = std::env::args()
.nth(1)
.expect("Argument 1 needs to be a path");

log::info!("Watching {path}");

if let Err(error) = watch(path) {
log::error!("Error: {error:?}");
}
}

fn watch<P: AsRef<Path>>(path: P) -> notify::Result<()> {
let (tx, rx) = std::sync::mpsc::channel();
// use the PollWatcher and disable automatic polling
let mut watcher = PollWatcher::new(
tx,
Config::default().with_manual_polling(),
)?;

// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
watcher.watch(path.as_ref(), RecursiveMode::Recursive)?;

// run event receiver on a different thread, we want this one for user input
std::thread::spawn(move ||{for res in rx {
match res {
Ok(event) => println!("changed: {:?}", event),
Err(e) => println!("watch error: {:?}", e),
}
}});

// wait for any input and poll
loop {
println!("Press enter to poll for changes");
let mut buffer = String::new();
std::io::stdin().read_line(&mut buffer)?;
println!("polling..");
// manually poll for changes, received by the spawned thread
watcher.poll().unwrap();
}
}
36 changes: 30 additions & 6 deletions notify/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,30 +38,54 @@ impl RecursiveMode {
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
pub struct Config {
/// See [BackendConfig::with_poll_interval]
poll_interval: Duration,
poll_interval: Option<Duration>,

/// See [BackendConfig::with_compare_contents]
compare_contents: bool,
}

impl Config {
/// For [crate::PollWatcher]
/// For the [PollWatcher](crate::PollWatcher) backend.
///
/// Interval between each rescan attempt. This can be extremely expensive for large
/// Interval between each re-scan attempt. This can be extremely expensive for large
/// file trees so it is recommended to measure and tune accordingly.
///
/// The default poll frequency is 30 seconds.
///
/// This will enable automatic polling, overwriting [with_manual_polling](Config::with_manual_polling).
pub fn with_poll_interval(mut self, dur: Duration) -> Self {
self.poll_interval = dur;
// TODO: v7.0 break signature to option
self.poll_interval = Some(dur);
self
}

/// Returns current setting
#[deprecated(
since = "6.1.0",
note = "use poll_interval_v2 to account for disabled automatic polling"
)]
pub fn poll_interval(&self) -> Duration {
// TODO: v7.0 break signature to option
self.poll_interval.unwrap_or_default()
}

/// Returns current setting
pub fn poll_interval_v2(&self) -> Option<Duration> {
// TODO: v7.0 break signature to option
self.poll_interval
}

/// For [crate::PollWatcher]
/// For the [PollWatcher](crate::PollWatcher) backend.
///
/// Disable automatic polling. Requires calling [crate::PollWatcher::poll] manually.
///
/// This will disable automatic polling, overwriting [with_poll_interval](Config::with_poll_interval).
pub fn with_manual_polling(mut self) -> Self {
self.poll_interval = None;
self
}

/// For the [PollWatcher](crate::PollWatcher) backend.
///
/// Optional feature that will evaluate the contents of changed files to determine if
/// they have indeed changed using a fast hashing algorithm. This is especially important
Expand All @@ -85,7 +109,7 @@ impl Config {
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(30),
poll_interval: Some(Duration::from_secs(30)),
compare_contents: false,
}
}
Expand Down
45 changes: 26 additions & 19 deletions notify/src/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! Checks the `watch`ed paths periodically to detect changes. This implementation only uses
//! Rust stdlib APIs and should work on all of the platforms it supports.

use crate::{Config, EventHandler, RecursiveMode, Watcher};
use crate::{unbounded, Config, Error, EventHandler, Receiver, RecursiveMode, Sender, Watcher};
use std::{
collections::HashMap,
path::{Path, PathBuf},
Expand All @@ -22,7 +22,6 @@ pub type ScanEvent = crate::Result<PathBuf>;
/// Very much the same as [EventHandler], but including the Result.
///
/// See the full example for more information.
/// ```
pub trait ScanEventHandler: Send + 'static {
/// Handles an event.
fn handle_event(&mut self, event: ScanEvent);
Expand Down Expand Up @@ -477,7 +476,10 @@ pub struct PollWatcher {
watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
data_builder: Arc<Mutex<DataBuilder>>,
want_to_stop: Arc<AtomicBool>,
delay: Duration,
/// channel to the poll loop
/// currently used only for manual polling
message_channel: Sender<()>,
delay: Option<Duration>,
}

impl PollWatcher {
Expand All @@ -486,6 +488,14 @@ impl PollWatcher {
Self::with_opt::<_, ()>(event_handler, config, None)
}

/// Actively poll for changes. Can be combined with a timeout of 0 to perform only manual polling.
pub fn poll(&self) -> crate::Result<()> {
self.message_channel
.send(())
.map_err(|_| Error::generic("failed to send poll message"))?;
Ok(())
}

/// Create a new [PollWatcher] with an scan event handler.
///
/// `scan_fallback` is called on the initial scan with all files seen by the pollwatcher.
Expand All @@ -497,7 +507,7 @@ impl PollWatcher {
Self::with_opt(event_handler, config, Some(scan_callback))
}

/// create a new pollwatcher with all options
/// create a new PollWatcher with all options
fn with_opt<F: EventHandler, G: ScanEventHandler>(
event_handler: F,
config: Config,
Expand All @@ -506,19 +516,22 @@ impl PollWatcher {
let data_builder =
DataBuilder::new(event_handler, config.compare_contents(), scan_callback);

let (tx, rx) = unbounded();

let poll_watcher = PollWatcher {
watches: Default::default(),
data_builder: Arc::new(Mutex::new(data_builder)),
want_to_stop: Arc::new(AtomicBool::new(false)),
delay: config.poll_interval(),
delay: config.poll_interval_v2(),
message_channel: tx,
};

poll_watcher.run();
poll_watcher.run(rx);

Ok(poll_watcher)
}

fn run(&self) {
fn run(&self, rx: Receiver<()>) {
let watches = Arc::clone(&self.watches);
let data_builder = Arc::clone(&self.data_builder);
let want_to_stop = Arc::clone(&self.want_to_stop);
Expand Down Expand Up @@ -546,18 +559,12 @@ impl PollWatcher {
watch_data.rescan(&mut data_builder);
}
}

// QUESTION: `actual_delay == process_time + delay`. Is it intended to?
//
// If not, consider fix it to:
//
// ```rust
// let still_need_to_delay = delay.checked_sub(data_builder.now.elapsed());
// if let Some(delay) = still_need_to_delay {
// thread::sleep(delay);
// }
// ```
thread::sleep(delay);
// TODO: v7.0 use delay - (Instant::now().saturating_duration_since(start))
if let Some(delay) = delay {
let _ = rx.recv_timeout(delay);
} else {
let _ = rx.recv();
}
}
});
}
Expand Down

0 comments on commit 2e0fb36

Please sign in to comment.