From 30660c9a63e9dddf74574929118052298f2234be Mon Sep 17 00:00:00 2001 From: Joe Ranweiler Date: Sun, 1 May 2022 20:58:43 -0700 Subject: [PATCH] Switch to misuse-resistant smart ctor (#1865) Use a simpler and less error-prone API for `DirectoryMonitor`. - Make `new()` ctor async, set watch on construction - Remove `start()`, validate that target path is a directory in `new()` --- src/agent/onefuzz-agent/src/local/common.rs | 53 ++++++++++--------- .../src/tasks/report/crash_report.rs | 5 +- src/agent/onefuzz/examples/dir-monitor.rs | 3 +- src/agent/onefuzz/src/monitor.rs | 43 +++++++-------- src/agent/onefuzz/src/monitor/tests.rs | 26 +++++---- src/agent/onefuzz/src/syncdir.rs | 3 +- 6 files changed, 64 insertions(+), 69 deletions(-) diff --git a/src/agent/onefuzz-agent/src/local/common.rs b/src/agent/onefuzz-agent/src/local/common.rs index 5c266cdf36..7472e404ca 100644 --- a/src/agent/onefuzz-agent/src/local/common.rs +++ b/src/agent/onefuzz-agent/src/local/common.rs @@ -1,5 +1,10 @@ -use crate::tasks::config::CommonConfig; -use crate::tasks::utils::parse_key_value; +use std::{ + collections::HashMap, + env::current_dir, + path::{Path, PathBuf}, + time::Duration, +}; + use anyhow::Result; use backoff::{future::retry, Error as BackoffError, ExponentialBackoff}; use clap::{App, Arg, ArgMatches}; @@ -7,14 +12,12 @@ use flume::Sender; use onefuzz::{blob::url::BlobContainerUrl, monitor::DirectoryMonitor, syncdir::SyncedDir}; use path_absolutize::Absolutize; use reqwest::Url; -use std::{ - collections::HashMap, - env::current_dir, - path::{Path, PathBuf}, - time::Duration, -}; +use storage_queue::{local_queue::ChannelQueueClient, QueueClient}; use uuid::Uuid; +use crate::tasks::config::CommonConfig; +use crate::tasks::utils::parse_key_value; + pub const SETUP_DIR: &str = "setup_dir"; pub const INPUTS_DIR: &str = "inputs_dir"; pub const CRASHES_DIR: &str = "crashes_dir"; @@ -267,24 +270,10 @@ pub struct DirectoryMonitorQueue { impl DirectoryMonitorQueue { pub async fn start_monitoring(directory_path: impl AsRef) -> Result { - let directory_path = PathBuf::from(directory_path.as_ref()); - let directory_path_clone = directory_path.clone(); - let queue_client = storage_queue::QueueClient::Channel( - storage_queue::local_queue::ChannelQueueClient::new()?, - ); - let queue = queue_client.clone(); - let handle: tokio::task::JoinHandle> = tokio::spawn(async move { - let mut monitor = DirectoryMonitor::new(directory_path_clone.clone())?; - monitor.start().await?; - - while let Some(file_path) = monitor.next_file().await? { - let file_url = - Url::from_file_path(file_path).map_err(|_| anyhow!("invalid file path"))?; - queue.enqueue(file_url).await?; - } - - Ok(()) - }); + let directory_path = directory_path.as_ref().to_owned(); + let queue_client = QueueClient::Channel(ChannelQueueClient::new()?); + let monitor_task = monitor_directory(queue_client.clone(), directory_path.clone()); + let handle = tokio::spawn(monitor_task); Ok(DirectoryMonitorQueue { directory_path, @@ -294,6 +283,18 @@ impl DirectoryMonitorQueue { } } +async fn monitor_directory(queue_client: QueueClient, directory: PathBuf) -> Result<()> { + let mut monitor = DirectoryMonitor::new(&directory).await?; + + while let Some(file_path) = monitor.next_file().await? { + let file_url = Url::from_file_path(file_path).map_err(|_| anyhow!("invalid file path"))?; + + queue_client.enqueue(file_url).await?; + } + + Ok(()) +} + pub async fn wait_for_dir(path: impl AsRef) -> Result<()> { let op = || async { if path.as_ref().exists() { diff --git a/src/agent/onefuzz-agent/src/tasks/report/crash_report.rs b/src/agent/onefuzz-agent/src/tasks/report/crash_report.rs index 6d7c180e5f..ee7b1c6e76 100644 --- a/src/agent/onefuzz-agent/src/tasks/report/crash_report.rs +++ b/src/agent/onefuzz-agent/src/tasks/report/crash_report.rs @@ -311,12 +311,13 @@ pub async fn monitor_reports( return Ok(()); } - let mut monitor = DirectoryMonitor::new(base_dir)?; - monitor.start().await?; + let mut monitor = DirectoryMonitor::new(base_dir).await?; + while let Some(file) = monitor.next_file().await? { let result = parse_report_file(file).await?; result.save(unique_reports, reports, no_crash).await?; } + Ok(()) } diff --git a/src/agent/onefuzz/examples/dir-monitor.rs b/src/agent/onefuzz/examples/dir-monitor.rs index 9d54ff1024..97d5a836b4 100644 --- a/src/agent/onefuzz/examples/dir-monitor.rs +++ b/src/agent/onefuzz/examples/dir-monitor.rs @@ -15,8 +15,7 @@ struct Opt { async fn main() -> Result<()> { let opt = Opt::from_args(); - let mut monitor = DirectoryMonitor::new(opt.path)?; - monitor.start().await?; + let mut monitor = DirectoryMonitor::new(opt.path).await?; while let Some(created) = monitor.next_file().await? { println!("[create] {}", created.display()); diff --git a/src/agent/onefuzz/src/monitor.rs b/src/agent/onefuzz/src/monitor.rs index eb5f96bb88..fa38d09a1d 100644 --- a/src/agent/onefuzz/src/monitor.rs +++ b/src/agent/onefuzz/src/monitor.rs @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use anyhow::{format_err, Result}; use notify::{Event, EventKind, Watcher}; @@ -10,6 +10,7 @@ use tokio::{ sync::mpsc::{unbounded_channel, UnboundedReceiver}, }; +/// Watches a directory, and on file creation, emits the path to the file. pub struct DirectoryMonitor { dir: PathBuf, notify_events: UnboundedReceiver>, @@ -17,15 +18,30 @@ pub struct DirectoryMonitor { } impl DirectoryMonitor { - pub fn new(dir: impl Into) -> Result { - let dir = dir.into(); + /// Create a new directory monitor. + /// + /// The path `dir` must name a directory, not a file. + pub async fn new(dir: impl AsRef) -> Result { + use notify::RecursiveMode; + + // Canonicalize so we can compare the watched dir to paths in the events. + let dir = fs::canonicalize(dir).await?; + + // Make sure we are watching a directory. + // + // This check will pass for symlinks to directories. + if !fs::metadata(&dir).await?.is_dir() { + bail!("monitored path is not a directory: {}", dir.display()); + } + let (sender, notify_events) = unbounded_channel(); let event_handler = move |event_or_err| { // A send error only occurs when the channel is closed. No remedial // action is needed (or possible), so ignore it. let _ = sender.send(event_or_err); }; - let watcher = notify::recommended_watcher(event_handler)?; + let mut watcher = notify::recommended_watcher(event_handler)?; + watcher.watch(&dir, RecursiveMode::NonRecursive)?; Ok(Self { dir, @@ -34,25 +50,6 @@ impl DirectoryMonitor { }) } - pub async fn start(&mut self) -> Result<()> { - use notify::RecursiveMode; - - // Canonicalize so we can compare the watched dir to paths in the events. - self.dir = fs::canonicalize(&self.dir).await?; - - // Make sure we are watching a directory. - // - // This check will pass for symlinks to directories. - if !fs::metadata(&self.dir).await?.is_dir() { - bail!("monitored path is not a directory: {}", self.dir.display()); - } - - // Initialize the watcher. - self.watcher.watch(&self.dir, RecursiveMode::NonRecursive)?; - - Ok(()) - } - pub fn stop(&mut self) -> Result<()> { self.watcher.unwatch(&self.dir)?; Ok(()) diff --git a/src/agent/onefuzz/src/monitor/tests.rs b/src/agent/onefuzz/src/monitor/tests.rs index 5d1594cbe9..1021ddf965 100644 --- a/src/agent/onefuzz/src/monitor/tests.rs +++ b/src/agent/onefuzz/src/monitor/tests.rs @@ -22,17 +22,17 @@ macro_rules! timed_test { } timed_test!(test_monitor_empty_path, async move { - let mut monitor = DirectoryMonitor::new("")?; + let monitor = DirectoryMonitor::new("").await; - assert!(monitor.start().await.is_err()); + assert!(monitor.is_err()); Ok(()) }); timed_test!(test_monitor_nonexistent_path, async move { - let mut monitor = DirectoryMonitor::new("some-nonexistent-path")?; + let monitor = DirectoryMonitor::new("some-nonexistent-path").await; - assert!(monitor.start().await.is_err()); + assert!(monitor.is_err()); Ok(()) }); @@ -44,18 +44,19 @@ timed_test!(test_monitor_file, async move { let file_path = dir.path().join("some-file.txt"); tokio::fs::write(&file_path, "aaaaaa").await?; - let mut monitor = DirectoryMonitor::new(&file_path)?; + let monitor = DirectoryMonitor::new(&file_path).await; - assert!(monitor.start().await.is_err()); + // Ctor must fail. + assert!(monitor.is_err()); Ok(()) }); timed_test!(test_monitor_dir, async move { let dir = tempdir()?; - let mut monitor = DirectoryMonitor::new(dir.path())?; - assert!(monitor.start().await.is_ok()); + // Ctor must succeed. + let mut monitor = DirectoryMonitor::new(dir.path()).await?; let _ = monitor.stop(); @@ -76,9 +77,8 @@ timed_test!(test_monitor_dir_symlink, async move { #[cfg(target_family = "windows")] fs::symlink_dir(&child, &symlink).await?; - let mut monitor = DirectoryMonitor::new(&symlink)?; - - assert!(monitor.start().await.is_ok()); + // Ctor must succeed. + let mut monitor = DirectoryMonitor::new(&symlink).await?; let _ = monitor.stop(); @@ -89,9 +89,7 @@ timed_test!(test_monitor_dir_create_files, async move { use std::fs::canonicalize; let dir = tempdir()?; - let mut monitor = DirectoryMonitor::new(dir.path())?; - - assert!(monitor.start().await.is_ok()); + let mut monitor = DirectoryMonitor::new(dir.path()).await?; let file_a = dir.path().join("a.txt"); let file_b = dir.path().join("b.txt"); diff --git a/src/agent/onefuzz/src/syncdir.rs b/src/agent/onefuzz/src/syncdir.rs index 90540c0288..621e5070ad 100644 --- a/src/agent/onefuzz/src/syncdir.rs +++ b/src/agent/onefuzz/src/syncdir.rs @@ -225,8 +225,7 @@ impl SyncedDir { ) -> Result<()> { debug!("monitoring {}", path.display()); - let mut monitor = DirectoryMonitor::new(path.clone())?; - monitor.start().await?; + let mut monitor = DirectoryMonitor::new(path.clone()).await?; if let Some(path) = url.as_file_path() { fs::create_dir_all(&path).await?;