diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock index 1040be270b..912254364e 100644 --- a/src/agent/Cargo.lock +++ b/src/agent/Cargo.lock @@ -38,7 +38,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -122,7 +122,7 @@ checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ "hermit-abi", "libc", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -419,7 +419,7 @@ dependencies = [ "num-traits", "serde", "time", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -511,7 +511,7 @@ dependencies = [ "symbolic", "uuid", "win-util", - "winapi 0.3.9", + "winapi", "xml-rs", ] @@ -609,7 +609,7 @@ dependencies = [ "parking_lot", "signal-hook", "signal-hook-mio", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -625,7 +625,7 @@ dependencies = [ "parking_lot", "signal-hook", "signal-hook-mio", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -634,7 +634,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a6966607622438301997d3dac0d2f6e9a90c68bb6bc1785ea98456ab93c0507" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -643,7 +643,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2ae1b35a484aa10e07fe0638d02301c5ad24de82d310ccbd2f3693da5f09bf1c" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -672,7 +672,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a19c6cedffdc8c03a3346d723eb20bd85a13362bb96dc2ac000842c6381ec7bf" dependencies = [ "nix", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -702,7 +702,7 @@ dependencies = [ "rand 0.8.4", "serde", "win-util", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -791,7 +791,7 @@ dependencies = [ "regex", "structopt", "thiserror", - "winapi 0.3.9", + "winapi", "winreg 0.10.1", ] @@ -854,7 +854,7 @@ checksum = "fa68f2fb9cae9d37c9b2b3584aba698a2e97f72d7aef7b9f7aa71d8b54ce46fe" dependencies = [ "errno-dragonfly", "libc", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -897,7 +897,7 @@ dependencies = [ "cfg-if 1.0.0", "libc", "redox_syscall", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -989,41 +989,15 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "fsevent" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ab7d1bd1bd33cc98b0889831b72da23c0aa4df9cec7e0702f46ecea04b35db6" -dependencies = [ - "bitflags", - "fsevent-sys", -] - [[package]] name = "fsevent-sys" -version = "2.0.1" +version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f41b048a94555da0f42f1d632e2e19510084fb8e303b0daa2816e733fb3644a0" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" dependencies = [ "libc", ] -[[package]] -name = "fuchsia-zircon" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" -dependencies = [ - "bitflags", - "fuchsia-zircon-sys", -] - -[[package]] -name = "fuchsia-zircon-sys" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" - [[package]] name = "funty" version = "1.1.0" @@ -1264,7 +1238,7 @@ checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867" dependencies = [ "libc", "match_cfg", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -1429,9 +1403,9 @@ checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac" [[package]] name = "inotify" -version = "0.7.1" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4816c66d2c8ae673df83366c18341538f234a26d65a9ecea5c348b453ac1d02f" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" dependencies = [ "bitflags", "inotify-sys", @@ -1462,7 +1436,7 @@ dependencies = [ "rayon", "sha2 0.10.1", "win-util", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -1474,15 +1448,6 @@ dependencies = [ "cfg-if 1.0.0", ] -[[package]] -name = "iovec" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" -dependencies = [ - "libc", -] - [[package]] name = "ipnet" version = "2.3.1" @@ -1517,13 +1482,23 @@ dependencies = [ ] [[package]] -name = "kernel32-sys" -version = "0.2.2" +name = "kqueue" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +checksum = "97caf428b83f7c86809b7450722cd1f2b1fc7fb23aa7b9dee7e72ed14d048352" dependencies = [ - "winapi 0.2.8", - "winapi-build", + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587" +dependencies = [ + "bitflags", + "libc", ] [[package]] @@ -1579,7 +1554,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "afe203d669ec979b7128619bae5a63b7b42e9203c1b29146079ee05e2f604b52" dependencies = [ "cfg-if 1.0.0", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -1650,7 +1625,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" dependencies = [ "libc", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -1693,25 +1668,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "mio" -version = "0.6.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4" -dependencies = [ - "cfg-if 0.1.10", - "fuchsia-zircon", - "fuchsia-zircon-sys", - "iovec", - "kernel32-sys", - "libc", - "log", - "miow 0.2.2", - "net2", - "slab", - "winapi 0.2.8", -] - [[package]] name = "mio" version = "0.7.13" @@ -1720,33 +1676,23 @@ checksum = "8c2bdb6314ec10835cd3293dd268473a835c02b7b352e788be788b3c6ca6bb16" dependencies = [ "libc", "log", - "miow 0.3.7", + "miow", "ntapi", - "winapi 0.3.9", + "winapi", ] [[package]] -name = "mio-extras" -version = "2.0.6" +name = "mio" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19" +checksum = "52da4364ffb0e4fe33a9841a98a3f3014fb964045ce4f7a45a398243c8d6b0c9" dependencies = [ - "lazycell", + "libc", "log", - "mio 0.6.23", - "slab", -] - -[[package]] -name = "miow" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d" -dependencies = [ - "kernel32-sys", - "net2", - "winapi 0.2.8", - "ws2_32-sys", + "miow", + "ntapi", + "wasi 0.11.0+wasi-snapshot-preview1", + "winapi", ] [[package]] @@ -1755,7 +1701,7 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -1794,17 +1740,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "net2" -version = "0.2.37" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae" -dependencies = [ - "cfg-if 0.1.10", - "libc", - "winapi 0.3.9", -] - [[package]] name = "new_debug_unreachable" version = "1.0.4" @@ -1873,20 +1808,20 @@ dependencies = [ [[package]] name = "notify" -version = "4.0.17" +version = "5.0.0-pre.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae03c8c853dba7bfd23e571ff0cff7bc9dceb40a4cd684cd1681824183f45257" +checksum = "d13c22db70a63592e098fb51735bab36646821e6389a0ba171f3549facdf0b74" dependencies = [ "bitflags", + "crossbeam-channel", "filetime", - "fsevent", "fsevent-sys", "inotify", + "kqueue", "libc", - "mio 0.6.23", - "mio-extras", + "mio 0.8.2", "walkdir", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -1895,7 +1830,7 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -2010,7 +1945,7 @@ dependencies = [ "url-escape", "urlparse", "uuid", - "winapi 0.3.9", + "winapi", "winreg 0.10.1", ] @@ -2083,7 +2018,7 @@ dependencies = [ "url", "users", "uuid", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -2146,7 +2081,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e3492ebca331b895fe23ed427dce2013d9b2e00c45964f12040b0db38b8ab27" dependencies = [ "libc", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -2155,7 +2090,7 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53cdc5b785b7a58c5aad8216b3dfa114df64b0b06ae6e1501cef91df2fbdf8f9" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -2186,7 +2121,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -2378,7 +2313,7 @@ dependencies = [ "libc", "libproc", "mach", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -2388,7 +2323,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f58e4014e4044c192e428de63277504298583339889483bcbaf9ba91a532b2ec" dependencies = [ "libc", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -2578,7 +2513,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -2648,7 +2583,7 @@ dependencies = [ "spin 0.5.2", "untrusted", "web-sys", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -2733,7 +2668,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" dependencies = [ "lazy_static", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -3003,7 +2938,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516" dependencies = [ "libc", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -3268,7 +3203,7 @@ dependencies = [ "rand 0.8.4", "redox_syscall", "remove_dir_all", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -3316,7 +3251,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" dependencies = [ "libc", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -3350,7 +3285,7 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "tokio-macros", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -3576,7 +3511,7 @@ checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" dependencies = [ "getrandom 0.2.3", "serde", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -3610,7 +3545,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" dependencies = [ "same-file", - "winapi 0.3.9", + "winapi", "winapi-util", ] @@ -3662,6 +3597,12 @@ version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "wasm-bindgen" version = "0.2.78" @@ -3790,16 +3731,10 @@ dependencies = [ "atexit", "log", "os_pipe", - "winapi 0.3.9", + "winapi", "winreg 0.10.1", ] -[[package]] -name = "winapi" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" - [[package]] name = "winapi" version = "0.3.9" @@ -3810,12 +3745,6 @@ dependencies = [ "winapi-x86_64-pc-windows-gnu", ] -[[package]] -name = "winapi-build" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" - [[package]] name = "winapi-i686-pc-windows-gnu" version = "0.4.0" @@ -3828,7 +3757,7 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -3843,7 +3772,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -3852,7 +3781,7 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -3875,16 +3804,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "ws2_32-sys" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" -dependencies = [ - "winapi 0.2.8", - "winapi-build", -] - [[package]] name = "wyz" version = "0.2.0" diff --git a/src/agent/onefuzz-agent/src/local/common.rs b/src/agent/onefuzz-agent/src/local/common.rs index b1a25ad63d..5c266cdf36 100644 --- a/src/agent/onefuzz-agent/src/local/common.rs +++ b/src/agent/onefuzz-agent/src/local/common.rs @@ -274,10 +274,10 @@ impl DirectoryMonitorQueue { ); let queue = queue_client.clone(); let handle: tokio::task::JoinHandle> = tokio::spawn(async move { - let mut monitor = DirectoryMonitor::new(directory_path_clone.clone()); - monitor.start()?; + let mut monitor = DirectoryMonitor::new(directory_path_clone.clone())?; + monitor.start().await?; - while let Some(file_path) = monitor.next_file().await { + while let Some(file_path) = monitor.next_file().await? { let file_url = Url::from_file_path(file_path).map_err(|_| anyhow!("invalid file path"))?; queue.enqueue(file_url).await?; diff --git a/src/agent/onefuzz-agent/src/tasks/report/crash_report.rs b/src/agent/onefuzz-agent/src/tasks/report/crash_report.rs index ae706f9b38..6d7c180e5f 100644 --- a/src/agent/onefuzz-agent/src/tasks/report/crash_report.rs +++ b/src/agent/onefuzz-agent/src/tasks/report/crash_report.rs @@ -311,9 +311,9 @@ pub async fn monitor_reports( return Ok(()); } - let mut monitor = DirectoryMonitor::new(base_dir); - monitor.start()?; - while let Some(file) = monitor.next_file().await { + let mut monitor = DirectoryMonitor::new(base_dir)?; + monitor.start().await?; + while let Some(file) = monitor.next_file().await? { let result = parse_report_file(file).await?; result.save(unique_reports, reports, no_crash).await?; } diff --git a/src/agent/onefuzz/Cargo.toml b/src/agent/onefuzz/Cargo.toml index a817e79efe..446ae028a5 100644 --- a/src/agent/onefuzz/Cargo.toml +++ b/src/agent/onefuzz/Cargo.toml @@ -18,7 +18,7 @@ futures-util = "0.3" hex = "0.4" lazy_static = "1.4" log = "0.4" -notify = "4.0" +notify = "5.0.0-pre.14" regex = "1.5.5" reqwest = { version = "0.11", features = ["json", "stream", "rustls-tls"], default-features=false } sha2 = "0.10.1" diff --git a/src/agent/onefuzz/examples/dir-monitor.rs b/src/agent/onefuzz/examples/dir-monitor.rs new file mode 100644 index 0000000000..9d54ff1024 --- /dev/null +++ b/src/agent/onefuzz/examples/dir-monitor.rs @@ -0,0 +1,28 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +use anyhow::Result; +use onefuzz::monitor::DirectoryMonitor; +use structopt::StructOpt; + +#[derive(Debug, StructOpt)] +struct Opt { + #[structopt(short, long)] + path: String, +} + +#[tokio::main] +async fn main() -> Result<()> { + let opt = Opt::from_args(); + + let mut monitor = DirectoryMonitor::new(opt.path)?; + monitor.start().await?; + + while let Some(created) = monitor.next_file().await? { + println!("[create] {}", created.display()); + } + + println!("done!"); + + Ok(()) +} diff --git a/src/agent/onefuzz/src/monitor.rs b/src/agent/onefuzz/src/monitor.rs index 1c162e7d8a..2a4fe456b5 100644 --- a/src/agent/onefuzz/src/monitor.rs +++ b/src/agent/onefuzz/src/monitor.rs @@ -2,76 +2,103 @@ // Licensed under the MIT License. use std::path::PathBuf; -use std::sync::{self, mpsc::Receiver as SyncReceiver}; -use std::time::Duration; -use anyhow::Result; -use notify::{DebouncedEvent, Watcher}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; -use tokio::task::{self, JoinHandle}; +use anyhow::{format_err, Result}; +use notify::{Event, EventKind, Watcher}; +use tokio::{ + fs, + sync::mpsc::{unbounded_channel, UnboundedReceiver}, +}; pub struct DirectoryMonitor { dir: PathBuf, - notify_events: UnboundedReceiver, + notify_events: UnboundedReceiver>, watcher: notify::RecommendedWatcher, } impl DirectoryMonitor { - pub fn new(dir: impl Into) -> Self { + pub fn new(dir: impl Into) -> Result { let dir = dir.into(); - let (notify_sender, notify_receiver) = sync::mpsc::channel(); - let delay = Duration::from_millis(100); - let watcher = notify::watcher(notify_sender, delay).unwrap(); - - // We can drop the thread handle, and it will continue to run until it - // errors or we drop the async receiver. - let (notify_events, _handle) = into_async(notify_receiver); - - Self { + let (sender, notify_events) = unbounded_channel(); + let event_handler = move |event_or_err| { + // A send error only occurs when the channel is closed. No remedial + // action is needed (or possible), so ignore it. + let _ = sender.send(event_or_err); + }; + let watcher = notify::recommended_watcher(event_handler)?; + + Ok(Self { dir, notify_events, watcher, - } + }) } - pub fn start(&mut self) -> Result<()> { + pub async fn start(&mut self) -> Result<()> { use notify::RecursiveMode; // Canonicalize so we can compare the watched dir to paths in the events. - self.dir = std::fs::canonicalize(&self.dir)?; + self.dir = fs::canonicalize(&self.dir).await?; + + // Initialize the watcher. self.watcher.watch(&self.dir, RecursiveMode::NonRecursive)?; Ok(()) } pub fn stop(&mut self) -> Result<()> { - self.watcher.unwatch(self.dir.clone())?; + self.watcher.unwatch(&self.dir)?; Ok(()) } - pub async fn next_file(&mut self) -> Option { + pub async fn next_file(&mut self) -> Result> { loop { - let event = self.notify_events.recv().await; + let event = match self.notify_events.recv().await { + Some(Ok(event)) => event, + Some(Err(err)) => { + // A low-level watch error has occurred. Treat as fatal. + warn!( + "error watching for new files. path = {}, error = {}", + self.dir.display(), + err + ); + + // Make sure we try to stop our `Watcher` if we return early. + let _ = self.stop(); + return Ok(None); + } + None => { + // Make sure we try to stop our `Watcher` if we return early. + let _ = self.stop(); + return Ok(None); + } + }; - if event.is_none() { - // Make sure we stop our `Watcher` if we return early. - let _ = self.stop(); - } + match event.kind { + EventKind::Create(..) => { + let path = event + .paths + .get(0) + .ok_or_else(|| format_err!("missing path for file create event"))? + .clone(); - match event? { - DebouncedEvent::Create(path) => { - return Some(path); + return Ok(Some(path)); } - DebouncedEvent::Remove(path) => { - if path == self.dir { + EventKind::Remove(..) => { + let path = event + .paths + .get(0) + .ok_or_else(|| format_err!("missing path for file remove event"))?; + + if path == &self.dir { // The directory we were watching was removed; we're done. let _ = self.stop(); - return None; + return Ok(None); } else { // Some file _inside_ the watched directory was removed. Ignore. } } - _event => { + _event_kind => { // Other filesystem event. Ignore. } } @@ -79,28 +106,6 @@ impl DirectoryMonitor { } } -/// Convert a `Receiver` from a `std::sync::mpsc` channel into an async receiver. -/// -/// The returned `JoinHandle` does _not_ need to be held by callers. The associated task -/// will continue to run (detached) if dropped. -fn into_async( - sync_receiver: SyncReceiver, -) -> (UnboundedReceiver, JoinHandle<()>) { - let (sender, receiver) = unbounded_channel(); - - let handle = task::spawn_blocking(move || { - while let Ok(msg) = sync_receiver.recv() { - if sender.send(msg).is_err() { - // The async receiver is closed. We can't do anything else, so - // drop this message (and the sync receiver). - break; - } - } - - // We'll never receive any more events. - // - // Drop our `Receiver` and hang up. - }); - - (receiver, handle) -} +#[cfg(not(target_os = "macos"))] +#[cfg(test)] +mod tests; diff --git a/src/agent/onefuzz/src/monitor/tests.rs b/src/agent/onefuzz/src/monitor/tests.rs new file mode 100644 index 0000000000..c932ec053f --- /dev/null +++ b/src/agent/onefuzz/src/monitor/tests.rs @@ -0,0 +1,105 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +use std::time::Duration; + +use anyhow::Result; +use tempfile::tempdir; +use tokio::fs; + +use crate::monitor::DirectoryMonitor; + +const TEST_TIMEOUT: Duration = Duration::from_millis(200); + +macro_rules! timed_test { + ($test_name: ident, $future: expr) => { + #[tokio::test] + async fn $test_name() -> Result<()> { + let result = tokio::time::timeout(TEST_TIMEOUT, $future).await; + result.map_err(|_| anyhow::anyhow!("test timed out after {:?}", TEST_TIMEOUT))? + } + }; +} + +timed_test!(test_monitor_empty_path, async move { + let mut monitor = DirectoryMonitor::new("")?; + + assert!(monitor.start().await.is_err()); + + Ok(()) +}); + +timed_test!(test_monitor_nonexistent_path, async move { + let mut monitor = DirectoryMonitor::new("some-nonexistent-path")?; + + assert!(monitor.start().await.is_err()); + + Ok(()) +}); + +timed_test!(test_monitor_dir, async move { + let dir = tempdir()?; + let mut monitor = DirectoryMonitor::new(dir.path())?; + + assert!(monitor.start().await.is_ok()); + + let _ = monitor.stop(); + + Ok(()) +}); + +timed_test!(test_monitor_dir_symlink, async move { + let parent = tempdir()?; + + let child = parent.path().join("child"); + fs::create_dir(&child).await?; + + let symlink = parent.path().join("link-to-child"); + + #[cfg(target_family = "unix")] + fs::symlink(&child, &symlink).await?; + + #[cfg(target_family = "windows")] + fs::symlink_dir(&child, &symlink).await?; + + let mut monitor = DirectoryMonitor::new(&symlink)?; + + assert!(monitor.start().await.is_ok()); + + let _ = monitor.stop(); + + Ok(()) +}); + +timed_test!(test_monitor_dir_create_files, async move { + use std::fs::canonicalize; + + let dir = tempdir()?; + let mut monitor = DirectoryMonitor::new(dir.path())?; + + assert!(monitor.start().await.is_ok()); + + let file_a = dir.path().join("a.txt"); + let file_b = dir.path().join("b.txt"); + let file_c = dir.path().join("c.txt"); + + fs::write(&file_a, "aaa").await?; + fs::write(&file_b, "bbb").await?; + fs::write(&file_c, "ccc").await?; + + assert_eq!(monitor.next_file().await?, Some(canonicalize(&file_a)?)); + assert_eq!(monitor.next_file().await?, Some(canonicalize(&file_b)?)); + assert_eq!(monitor.next_file().await?, Some(canonicalize(&file_c)?)); + + // TODO: on Windows, `notify` doesn't provide an event for the removal of a + // watched directory, so we can't proactively close our channel. + #[cfg(not(target_os = "windows"))] + { + dir.close()?; + assert_eq!(monitor.next_file().await?, None); + } + + let _ = monitor.stop(); + + Ok(()) +}); diff --git a/src/agent/onefuzz/src/syncdir.rs b/src/agent/onefuzz/src/syncdir.rs index 2ae81e3b0c..90540c0288 100644 --- a/src/agent/onefuzz/src/syncdir.rs +++ b/src/agent/onefuzz/src/syncdir.rs @@ -225,13 +225,13 @@ impl SyncedDir { ) -> Result<()> { debug!("monitoring {}", path.display()); - let mut monitor = DirectoryMonitor::new(path.clone()); - monitor.start()?; + let mut monitor = DirectoryMonitor::new(path.clone())?; + monitor.start().await?; if let Some(path) = url.as_file_path() { fs::create_dir_all(&path).await?; - while let Some(item) = monitor.next_file().await { + while let Some(item) = monitor.next_file().await? { let file_name = item .file_name() .ok_or_else(|| anyhow!("invalid file path"))?; @@ -267,7 +267,7 @@ impl SyncedDir { } else { let mut uploader = BlobUploader::new(url.url()?); - while let Some(item) = monitor.next_file().await { + while let Some(item) = monitor.next_file().await? { let file_name = item .file_name() .ok_or_else(|| anyhow!("invalid file path"))?;