Skip to content

Commit

Permalink
add manual polling for PollWatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
0xpr03 committed Aug 12, 2023
1 parent 22f84ca commit dfe3272
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 22 deletions.
24 changes: 21 additions & 3 deletions notify/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl RecursiveMode {
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
pub struct Config {
/// See [BackendConfig::with_poll_interval]
poll_interval: Duration,
poll_interval: Option<Duration>,

/// See [BackendConfig::with_compare_contents]
compare_contents: bool,
Expand All @@ -52,15 +52,33 @@ impl Config {
///
/// The default poll frequency is 30 seconds.
pub fn with_poll_interval(mut self, dur: Duration) -> Self {
self.poll_interval = dur;
// TODO: v7.0 break signature to option
self.poll_interval = Some(dur);
self
}

/// Returns current setting
#[deprecated(
since = "6.1.0",
note = "use poll_interval_v2 to account for disabled automatic polling"
)]
pub fn poll_interval(&self) -> Duration {
// TODO: v7.0 break signature to option
self.poll_interval.unwrap_or_default()
}

/// Returns current setting
pub fn poll_interval_v2(&self) -> Option<Duration> {
// TODO: v7.0 break signature to option
self.poll_interval
}

/// Disable automatic polling. Requires calling [crate::PollWatcher::poll] manually.
pub fn with_manual_polling(mut self) -> Self {
self.poll_interval = None;
self
}

/// For [crate::PollWatcher]
///
/// Optional feature that will evaluate the contents of changed files to determine if
Expand All @@ -85,7 +103,7 @@ impl Config {
impl Default for Config {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(30),
poll_interval: Some(Duration::from_secs(5)),
compare_contents: false,
}
}
Expand Down
45 changes: 26 additions & 19 deletions notify/src/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! Checks the `watch`ed paths periodically to detect changes. This implementation only uses
//! Rust stdlib APIs and should work on all of the platforms it supports.

use crate::{Config, EventHandler, RecursiveMode, Watcher};
use crate::{unbounded, Config, Error, EventHandler, Receiver, RecursiveMode, Sender, Watcher};
use std::{
collections::HashMap,
path::{Path, PathBuf},
Expand All @@ -22,7 +22,6 @@ pub type ScanEvent = crate::Result<PathBuf>;
/// Very much the same as [EventHandler], but including the Result.
///
/// See the full example for more information.
/// ```
pub trait ScanEventHandler: Send + 'static {
/// Handles an event.
fn handle_event(&mut self, event: ScanEvent);
Expand Down Expand Up @@ -477,7 +476,10 @@ pub struct PollWatcher {
watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
data_builder: Arc<Mutex<DataBuilder>>,
want_to_stop: Arc<AtomicBool>,
delay: Duration,
/// channel to the poll loop
/// currently used only for active polling
message_channel: Sender<()>,
delay: Option<Duration>,
}

impl PollWatcher {
Expand All @@ -486,6 +488,14 @@ impl PollWatcher {
Self::with_opt::<_, ()>(event_handler, config, None)
}

/// Actively poll for changes. Can be combined with a timeout of 0 to perform only manual polling.
pub fn poll(&self) -> crate::Result<()> {
self.message_channel
.send(())
.map_err(|_| Error::generic("failed to send poll message"))?;
Ok(())
}

/// Create a new [PollWatcher] with an scan event handler.
///
/// `scan_fallback` is called on the initial scan with all files seen by the pollwatcher.
Expand All @@ -497,7 +507,7 @@ impl PollWatcher {
Self::with_opt(event_handler, config, Some(scan_callback))
}

/// create a new pollwatcher with all options
/// create a new PollWatcher with all options
fn with_opt<F: EventHandler, G: ScanEventHandler>(
event_handler: F,
config: Config,
Expand All @@ -506,19 +516,22 @@ impl PollWatcher {
let data_builder =
DataBuilder::new(event_handler, config.compare_contents(), scan_callback);

let (tx, rx) = unbounded();

let poll_watcher = PollWatcher {
watches: Default::default(),
data_builder: Arc::new(Mutex::new(data_builder)),
want_to_stop: Arc::new(AtomicBool::new(false)),
delay: config.poll_interval(),
delay: config.poll_interval_v2(),
message_channel: tx,
};

poll_watcher.run();
poll_watcher.run(rx);

Ok(poll_watcher)
}

fn run(&self) {
fn run(&self, rx: Receiver<()>) {
let watches = Arc::clone(&self.watches);
let data_builder = Arc::clone(&self.data_builder);
let want_to_stop = Arc::clone(&self.want_to_stop);
Expand Down Expand Up @@ -546,18 +559,12 @@ impl PollWatcher {
watch_data.rescan(&mut data_builder);
}
}

// QUESTION: `actual_delay == process_time + delay`. Is it intended to?
//
// If not, consider fix it to:
//
// ```rust
// let still_need_to_delay = delay.checked_sub(data_builder.now.elapsed());
// if let Some(delay) = still_need_to_delay {
// thread::sleep(delay);
// }
// ```
thread::sleep(delay);
// TODO: v7.0 use delay - (Instant::now().saturating_duration_since(start))
if let Some(delay) = delay {
let _ = rx.recv_timeout(delay);
} else {
let _ = rx.recv();
}
}
});
}
Expand Down

0 comments on commit dfe3272

Please sign in to comment.