Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
Switch to misuse-resistant smart ctor (#1865)
Browse files Browse the repository at this point in the history
Use a simpler and less error-prone API for `DirectoryMonitor`.

- Make `new()` ctor async, set watch on construction
- Remove `start()`, validate that target path is a directory in `new()`
  • Loading branch information
ranweiler authored May 2, 2022
1 parent ebe5352 commit 30660c9
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 69 deletions.
53 changes: 27 additions & 26 deletions src/agent/onefuzz-agent/src/local/common.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
use crate::tasks::config::CommonConfig;
use crate::tasks::utils::parse_key_value;
use std::{
collections::HashMap,
env::current_dir,
path::{Path, PathBuf},
time::Duration,
};

use anyhow::Result;
use backoff::{future::retry, Error as BackoffError, ExponentialBackoff};
use clap::{App, Arg, ArgMatches};
use flume::Sender;
use onefuzz::{blob::url::BlobContainerUrl, monitor::DirectoryMonitor, syncdir::SyncedDir};
use path_absolutize::Absolutize;
use reqwest::Url;
use std::{
collections::HashMap,
env::current_dir,
path::{Path, PathBuf},
time::Duration,
};
use storage_queue::{local_queue::ChannelQueueClient, QueueClient};
use uuid::Uuid;

use crate::tasks::config::CommonConfig;
use crate::tasks::utils::parse_key_value;

pub const SETUP_DIR: &str = "setup_dir";
pub const INPUTS_DIR: &str = "inputs_dir";
pub const CRASHES_DIR: &str = "crashes_dir";
Expand Down Expand Up @@ -267,24 +270,10 @@ pub struct DirectoryMonitorQueue {

impl DirectoryMonitorQueue {
pub async fn start_monitoring(directory_path: impl AsRef<Path>) -> Result<Self> {
let directory_path = PathBuf::from(directory_path.as_ref());
let directory_path_clone = directory_path.clone();
let queue_client = storage_queue::QueueClient::Channel(
storage_queue::local_queue::ChannelQueueClient::new()?,
);
let queue = queue_client.clone();
let handle: tokio::task::JoinHandle<Result<()>> = tokio::spawn(async move {
let mut monitor = DirectoryMonitor::new(directory_path_clone.clone())?;
monitor.start().await?;

while let Some(file_path) = monitor.next_file().await? {
let file_url =
Url::from_file_path(file_path).map_err(|_| anyhow!("invalid file path"))?;
queue.enqueue(file_url).await?;
}

Ok(())
});
let directory_path = directory_path.as_ref().to_owned();
let queue_client = QueueClient::Channel(ChannelQueueClient::new()?);
let monitor_task = monitor_directory(queue_client.clone(), directory_path.clone());
let handle = tokio::spawn(monitor_task);

Ok(DirectoryMonitorQueue {
directory_path,
Expand All @@ -294,6 +283,18 @@ impl DirectoryMonitorQueue {
}
}

async fn monitor_directory(queue_client: QueueClient, directory: PathBuf) -> Result<()> {
let mut monitor = DirectoryMonitor::new(&directory).await?;

while let Some(file_path) = monitor.next_file().await? {
let file_url = Url::from_file_path(file_path).map_err(|_| anyhow!("invalid file path"))?;

queue_client.enqueue(file_url).await?;
}

Ok(())
}

pub async fn wait_for_dir(path: impl AsRef<Path>) -> Result<()> {
let op = || async {
if path.as_ref().exists() {
Expand Down
5 changes: 3 additions & 2 deletions src/agent/onefuzz-agent/src/tasks/report/crash_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,13 @@ pub async fn monitor_reports(
return Ok(());
}

let mut monitor = DirectoryMonitor::new(base_dir)?;
monitor.start().await?;
let mut monitor = DirectoryMonitor::new(base_dir).await?;

while let Some(file) = monitor.next_file().await? {
let result = parse_report_file(file).await?;
result.save(unique_reports, reports, no_crash).await?;
}

Ok(())
}

Expand Down
3 changes: 1 addition & 2 deletions src/agent/onefuzz/examples/dir-monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ struct Opt {
async fn main() -> Result<()> {
let opt = Opt::from_args();

let mut monitor = DirectoryMonitor::new(opt.path)?;
monitor.start().await?;
let mut monitor = DirectoryMonitor::new(opt.path).await?;

while let Some(created) = monitor.next_file().await? {
println!("[create] {}", created.display());
Expand Down
43 changes: 20 additions & 23 deletions src/agent/onefuzz/src/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

use std::path::PathBuf;
use std::path::{Path, PathBuf};

use anyhow::{format_err, Result};
use notify::{Event, EventKind, Watcher};
Expand All @@ -10,22 +10,38 @@ use tokio::{
sync::mpsc::{unbounded_channel, UnboundedReceiver},
};

/// Watches a directory, and on file creation, emits the path to the file.
pub struct DirectoryMonitor {
dir: PathBuf,
notify_events: UnboundedReceiver<notify::Result<Event>>,
watcher: notify::RecommendedWatcher,
}

impl DirectoryMonitor {
pub fn new(dir: impl Into<PathBuf>) -> Result<Self> {
let dir = dir.into();
/// Create a new directory monitor.
///
/// The path `dir` must name a directory, not a file.
pub async fn new(dir: impl AsRef<Path>) -> Result<Self> {
use notify::RecursiveMode;

// Canonicalize so we can compare the watched dir to paths in the events.
let dir = fs::canonicalize(dir).await?;

// Make sure we are watching a directory.
//
// This check will pass for symlinks to directories.
if !fs::metadata(&dir).await?.is_dir() {
bail!("monitored path is not a directory: {}", dir.display());
}

let (sender, notify_events) = unbounded_channel();
let event_handler = move |event_or_err| {
// A send error only occurs when the channel is closed. No remedial
// action is needed (or possible), so ignore it.
let _ = sender.send(event_or_err);
};
let watcher = notify::recommended_watcher(event_handler)?;
let mut watcher = notify::recommended_watcher(event_handler)?;
watcher.watch(&dir, RecursiveMode::NonRecursive)?;

Ok(Self {
dir,
Expand All @@ -34,25 +50,6 @@ impl DirectoryMonitor {
})
}

pub async fn start(&mut self) -> Result<()> {
use notify::RecursiveMode;

// Canonicalize so we can compare the watched dir to paths in the events.
self.dir = fs::canonicalize(&self.dir).await?;

// Make sure we are watching a directory.
//
// This check will pass for symlinks to directories.
if !fs::metadata(&self.dir).await?.is_dir() {
bail!("monitored path is not a directory: {}", self.dir.display());
}

// Initialize the watcher.
self.watcher.watch(&self.dir, RecursiveMode::NonRecursive)?;

Ok(())
}

pub fn stop(&mut self) -> Result<()> {
self.watcher.unwatch(&self.dir)?;
Ok(())
Expand Down
26 changes: 12 additions & 14 deletions src/agent/onefuzz/src/monitor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ macro_rules! timed_test {
}

timed_test!(test_monitor_empty_path, async move {
let mut monitor = DirectoryMonitor::new("")?;
let monitor = DirectoryMonitor::new("").await;

assert!(monitor.start().await.is_err());
assert!(monitor.is_err());

Ok(())
});

timed_test!(test_monitor_nonexistent_path, async move {
let mut monitor = DirectoryMonitor::new("some-nonexistent-path")?;
let monitor = DirectoryMonitor::new("some-nonexistent-path").await;

assert!(monitor.start().await.is_err());
assert!(monitor.is_err());

Ok(())
});
Expand All @@ -44,18 +44,19 @@ timed_test!(test_monitor_file, async move {
let file_path = dir.path().join("some-file.txt");
tokio::fs::write(&file_path, "aaaaaa").await?;

let mut monitor = DirectoryMonitor::new(&file_path)?;
let monitor = DirectoryMonitor::new(&file_path).await;

assert!(monitor.start().await.is_err());
// Ctor must fail.
assert!(monitor.is_err());

Ok(())
});

timed_test!(test_monitor_dir, async move {
let dir = tempdir()?;
let mut monitor = DirectoryMonitor::new(dir.path())?;

assert!(monitor.start().await.is_ok());
// Ctor must succeed.
let mut monitor = DirectoryMonitor::new(dir.path()).await?;

let _ = monitor.stop();

Expand All @@ -76,9 +77,8 @@ timed_test!(test_monitor_dir_symlink, async move {
#[cfg(target_family = "windows")]
fs::symlink_dir(&child, &symlink).await?;

let mut monitor = DirectoryMonitor::new(&symlink)?;

assert!(monitor.start().await.is_ok());
// Ctor must succeed.
let mut monitor = DirectoryMonitor::new(&symlink).await?;

let _ = monitor.stop();

Expand All @@ -89,9 +89,7 @@ timed_test!(test_monitor_dir_create_files, async move {
use std::fs::canonicalize;

let dir = tempdir()?;
let mut monitor = DirectoryMonitor::new(dir.path())?;

assert!(monitor.start().await.is_ok());
let mut monitor = DirectoryMonitor::new(dir.path()).await?;

let file_a = dir.path().join("a.txt");
let file_b = dir.path().join("b.txt");
Expand Down
3 changes: 1 addition & 2 deletions src/agent/onefuzz/src/syncdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,7 @@ impl SyncedDir {
) -> Result<()> {
debug!("monitoring {}", path.display());

let mut monitor = DirectoryMonitor::new(path.clone())?;
monitor.start().await?;
let mut monitor = DirectoryMonitor::new(path.clone()).await?;

if let Some(path) = url.as_file_path() {
fs::create_dir_all(&path).await?;
Expand Down

0 comments on commit 30660c9

Please sign in to comment.