Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
Upgrade to latest stable notify preview (#1849)
Browse files Browse the repository at this point in the history
  • Loading branch information
ranweiler authored Apr 27, 2022
1 parent 44059f2 commit 5d6f5f2
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 231 deletions.
241 changes: 80 additions & 161 deletions src/agent/Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions src/agent/onefuzz-agent/src/local/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,10 @@ impl DirectoryMonitorQueue {
);
let queue = queue_client.clone();
let handle: tokio::task::JoinHandle<Result<()>> = tokio::spawn(async move {
let mut monitor = DirectoryMonitor::new(directory_path_clone.clone());
monitor.start()?;
let mut monitor = DirectoryMonitor::new(directory_path_clone.clone())?;
monitor.start().await?;

while let Some(file_path) = monitor.next_file().await {
while let Some(file_path) = monitor.next_file().await? {
let file_url =
Url::from_file_path(file_path).map_err(|_| anyhow!("invalid file path"))?;
queue.enqueue(file_url).await?;
Expand Down
6 changes: 3 additions & 3 deletions src/agent/onefuzz-agent/src/tasks/report/crash_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,9 @@ pub async fn monitor_reports(
return Ok(());
}

let mut monitor = DirectoryMonitor::new(base_dir);
monitor.start()?;
while let Some(file) = monitor.next_file().await {
let mut monitor = DirectoryMonitor::new(base_dir)?;
monitor.start().await?;
while let Some(file) = monitor.next_file().await? {
let result = parse_report_file(file).await?;
result.save(unique_reports, reports, no_crash).await?;
}
Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ futures-util = "0.3"
hex = "0.4"
lazy_static = "1.4"
log = "0.4"
notify = "4.0"
notify = "5.0.0-pre.14"
regex = "1.5.5"
reqwest = { version = "0.11", features = ["json", "stream", "rustls-tls"], default-features=false }
sha2 = "0.10.1"
Expand Down
28 changes: 28 additions & 0 deletions src/agent/onefuzz/examples/dir-monitor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

use anyhow::Result;
use onefuzz::monitor::DirectoryMonitor;
use structopt::StructOpt;

#[derive(Debug, StructOpt)]
struct Opt {
#[structopt(short, long)]
path: String,
}

#[tokio::main]
async fn main() -> Result<()> {
let opt = Opt::from_args();

let mut monitor = DirectoryMonitor::new(opt.path)?;
monitor.start().await?;

while let Some(created) = monitor.next_file().await? {
println!("[create] {}", created.display());
}

println!("done!");

Ok(())
}
123 changes: 64 additions & 59 deletions src/agent/onefuzz/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,105 +2,110 @@
// Licensed under the MIT License.

use std::path::PathBuf;
use std::sync::{self, mpsc::Receiver as SyncReceiver};
use std::time::Duration;

use anyhow::Result;
use notify::{DebouncedEvent, Watcher};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::task::{self, JoinHandle};
use anyhow::{format_err, Result};
use notify::{Event, EventKind, Watcher};
use tokio::{
fs,
sync::mpsc::{unbounded_channel, UnboundedReceiver},
};

pub struct DirectoryMonitor {
dir: PathBuf,
notify_events: UnboundedReceiver<DebouncedEvent>,
notify_events: UnboundedReceiver<notify::Result<Event>>,
watcher: notify::RecommendedWatcher,
}

impl DirectoryMonitor {
pub fn new(dir: impl Into<PathBuf>) -> Self {
pub fn new(dir: impl Into<PathBuf>) -> Result<Self> {
let dir = dir.into();
let (notify_sender, notify_receiver) = sync::mpsc::channel();
let delay = Duration::from_millis(100);
let watcher = notify::watcher(notify_sender, delay).unwrap();

// We can drop the thread handle, and it will continue to run until it
// errors or we drop the async receiver.
let (notify_events, _handle) = into_async(notify_receiver);

Self {
let (sender, notify_events) = unbounded_channel();
let event_handler = move |event_or_err| {
// A send error only occurs when the channel is closed. No remedial
// action is needed (or possible), so ignore it.
let _ = sender.send(event_or_err);
};
let watcher = notify::recommended_watcher(event_handler)?;

Ok(Self {
dir,
notify_events,
watcher,
}
})
}

pub fn start(&mut self) -> Result<()> {
pub async fn start(&mut self) -> Result<()> {
use notify::RecursiveMode;

// Canonicalize so we can compare the watched dir to paths in the events.
self.dir = std::fs::canonicalize(&self.dir)?;
self.dir = fs::canonicalize(&self.dir).await?;

// Initialize the watcher.
self.watcher.watch(&self.dir, RecursiveMode::NonRecursive)?;

Ok(())
}

pub fn stop(&mut self) -> Result<()> {
self.watcher.unwatch(self.dir.clone())?;
self.watcher.unwatch(&self.dir)?;
Ok(())
}

pub async fn next_file(&mut self) -> Option<PathBuf> {
pub async fn next_file(&mut self) -> Result<Option<PathBuf>> {
loop {
let event = self.notify_events.recv().await;
let event = match self.notify_events.recv().await {
Some(Ok(event)) => event,
Some(Err(err)) => {
// A low-level watch error has occurred. Treat as fatal.
warn!(
"error watching for new files. path = {}, error = {}",
self.dir.display(),
err
);

// Make sure we try to stop our `Watcher` if we return early.
let _ = self.stop();
return Ok(None);
}
None => {
// Make sure we try to stop our `Watcher` if we return early.
let _ = self.stop();
return Ok(None);
}
};

if event.is_none() {
// Make sure we stop our `Watcher` if we return early.
let _ = self.stop();
}
match event.kind {
EventKind::Create(..) => {
let path = event
.paths
.get(0)
.ok_or_else(|| format_err!("missing path for file create event"))?
.clone();

match event? {
DebouncedEvent::Create(path) => {
return Some(path);
return Ok(Some(path));
}
DebouncedEvent::Remove(path) => {
if path == self.dir {
EventKind::Remove(..) => {
let path = event
.paths
.get(0)
.ok_or_else(|| format_err!("missing path for file remove event"))?;

if path == &self.dir {
// The directory we were watching was removed; we're done.
let _ = self.stop();
return None;
return Ok(None);
} else {
// Some file _inside_ the watched directory was removed. Ignore.
}
}
_event => {
_event_kind => {
// Other filesystem event. Ignore.
}
}
}
}
}

/// Convert a `Receiver` from a `std::sync::mpsc` channel into an async receiver.
///
/// The returned `JoinHandle` does _not_ need to be held by callers. The associated task
/// will continue to run (detached) if dropped.
fn into_async<T: Send + 'static>(
sync_receiver: SyncReceiver<T>,
) -> (UnboundedReceiver<T>, JoinHandle<()>) {
let (sender, receiver) = unbounded_channel();

let handle = task::spawn_blocking(move || {
while let Ok(msg) = sync_receiver.recv() {
if sender.send(msg).is_err() {
// The async receiver is closed. We can't do anything else, so
// drop this message (and the sync receiver).
break;
}
}

// We'll never receive any more events.
//
// Drop our `Receiver` and hang up.
});

(receiver, handle)
}
#[cfg(not(target_os = "macos"))]
#[cfg(test)]
mod tests;
105 changes: 105 additions & 0 deletions src/agent/onefuzz/src/monitor/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

use std::time::Duration;

use anyhow::Result;
use tempfile::tempdir;
use tokio::fs;

use crate::monitor::DirectoryMonitor;

const TEST_TIMEOUT: Duration = Duration::from_millis(200);

macro_rules! timed_test {
($test_name: ident, $future: expr) => {
#[tokio::test]
async fn $test_name() -> Result<()> {
let result = tokio::time::timeout(TEST_TIMEOUT, $future).await;
result.map_err(|_| anyhow::anyhow!("test timed out after {:?}", TEST_TIMEOUT))?
}
};
}

timed_test!(test_monitor_empty_path, async move {
let mut monitor = DirectoryMonitor::new("")?;

assert!(monitor.start().await.is_err());

Ok(())
});

timed_test!(test_monitor_nonexistent_path, async move {
let mut monitor = DirectoryMonitor::new("some-nonexistent-path")?;

assert!(monitor.start().await.is_err());

Ok(())
});

timed_test!(test_monitor_dir, async move {
let dir = tempdir()?;
let mut monitor = DirectoryMonitor::new(dir.path())?;

assert!(monitor.start().await.is_ok());

let _ = monitor.stop();

Ok(())
});

timed_test!(test_monitor_dir_symlink, async move {
let parent = tempdir()?;

let child = parent.path().join("child");
fs::create_dir(&child).await?;

let symlink = parent.path().join("link-to-child");

#[cfg(target_family = "unix")]
fs::symlink(&child, &symlink).await?;

#[cfg(target_family = "windows")]
fs::symlink_dir(&child, &symlink).await?;

let mut monitor = DirectoryMonitor::new(&symlink)?;

assert!(monitor.start().await.is_ok());

let _ = monitor.stop();

Ok(())
});

timed_test!(test_monitor_dir_create_files, async move {
use std::fs::canonicalize;

let dir = tempdir()?;
let mut monitor = DirectoryMonitor::new(dir.path())?;

assert!(monitor.start().await.is_ok());

let file_a = dir.path().join("a.txt");
let file_b = dir.path().join("b.txt");
let file_c = dir.path().join("c.txt");

fs::write(&file_a, "aaa").await?;
fs::write(&file_b, "bbb").await?;
fs::write(&file_c, "ccc").await?;

assert_eq!(monitor.next_file().await?, Some(canonicalize(&file_a)?));
assert_eq!(monitor.next_file().await?, Some(canonicalize(&file_b)?));
assert_eq!(monitor.next_file().await?, Some(canonicalize(&file_c)?));

// TODO: on Windows, `notify` doesn't provide an event for the removal of a
// watched directory, so we can't proactively close our channel.
#[cfg(not(target_os = "windows"))]
{
dir.close()?;
assert_eq!(monitor.next_file().await?, None);
}

let _ = monitor.stop();

Ok(())
});
8 changes: 4 additions & 4 deletions src/agent/onefuzz/src/syncdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,13 @@ impl SyncedDir {
) -> Result<()> {
debug!("monitoring {}", path.display());

let mut monitor = DirectoryMonitor::new(path.clone());
monitor.start()?;
let mut monitor = DirectoryMonitor::new(path.clone())?;
monitor.start().await?;

if let Some(path) = url.as_file_path() {
fs::create_dir_all(&path).await?;

while let Some(item) = monitor.next_file().await {
while let Some(item) = monitor.next_file().await? {
let file_name = item
.file_name()
.ok_or_else(|| anyhow!("invalid file path"))?;
Expand Down Expand Up @@ -267,7 +267,7 @@ impl SyncedDir {
} else {
let mut uploader = BlobUploader::new(url.url()?);

while let Some(item) = monitor.next_file().await {
while let Some(item) = monitor.next_file().await? {
let file_name = item
.file_name()
.ok_or_else(|| anyhow!("invalid file path"))?;
Expand Down

0 comments on commit 5d6f5f2

Please sign in to comment.