diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock index a0effc4705..6ceacae669 100644 --- a/src/agent/Cargo.lock +++ b/src/agent/Cargo.lock @@ -1764,7 +1764,6 @@ dependencies = [ "onefuzz", "onefuzz-telemetry", "path-absolutize", - "remove_dir_all 0.7.0", "reqwest", "reqwest-retry", "serde", @@ -2317,19 +2316,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "remove_dir_all" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "882f368737489ea543bc5c340e6f3d34a28c39980bd9a979e47322b26f60ac40" -dependencies = [ - "libc", - "log", - "num_cpus", - "rayon", - "winapi 0.3.9", -] - [[package]] name = "reqwest" version = "0.10.10" @@ -2835,7 +2821,7 @@ dependencies = [ "libc", "rand 0.8.3", "redox_syscall", - "remove_dir_all 0.5.3", + "remove_dir_all", "winapi 0.3.9", ] diff --git a/src/agent/onefuzz-agent/Cargo.toml b/src/agent/onefuzz-agent/Cargo.toml index 2cd841f543..e641d139fe 100644 --- a/src/agent/onefuzz-agent/Cargo.toml +++ b/src/agent/onefuzz-agent/Cargo.toml @@ -31,7 +31,6 @@ onefuzz = { path = "../onefuzz" } onefuzz-telemetry = { path = "../onefuzz-telemetry" } path-absolutize = "3.0.6" reqwest-retry = { path = "../reqwest-retry" } -remove_dir_all = "0.7" stacktrace-parser = { path = "../stacktrace-parser" } storage-queue = { path = "../storage-queue" } tempfile = "3.2" diff --git a/src/agent/onefuzz-agent/src/local/common.rs b/src/agent/onefuzz-agent/src/local/common.rs index ddd7e1b8d1..33d1383bad 100644 --- a/src/agent/onefuzz-agent/src/local/common.rs +++ b/src/agent/onefuzz-agent/src/local/common.rs @@ -4,9 +4,8 @@ use anyhow::Result; use backoff::{future::retry, Error as BackoffError, ExponentialBackoff}; use clap::{App, Arg, ArgMatches}; use onefuzz::jitter::delay_with_jitter; -use onefuzz::{blob::BlobContainerUrl, monitor::DirectoryMonitor, syncdir::SyncedDir}; +use onefuzz::{blob::url::BlobContainerUrl, monitor::DirectoryMonitor, syncdir::SyncedDir}; use path_absolutize::Absolutize; -use remove_dir_all::remove_dir_all; use reqwest::Url; use std::task::Poll; use std::{ @@ -55,6 +54,8 @@ pub const ANALYSIS_INPUTS: &str = "analysis_inputs"; pub const ANALYSIS_UNIQUE_INPUTS: &str = "analysis_unique_inputs"; pub const PRESERVE_EXISTING_OUTPUTS: &str = "preserve_existing_outputs"; +pub const CREATE_JOB_DIR: &str = "create_job_dir"; + const WAIT_FOR_MAX_WAIT: Duration = Duration::from_secs(10); const WAIT_FOR_DIR_DELAY: Duration = Duration::from_secs(1); @@ -139,10 +140,10 @@ pub fn add_common_config(app: App<'static, 'static>) -> App<'static, 'static> { .required(false), ) .arg( - Arg::with_name("keep_job_dir") - .long("keep_job_dir") + Arg::with_name(CREATE_JOB_DIR) + .long(CREATE_JOB_DIR) .required(false) - .help("keep the local directory created for running the task"), + .help("create a local job directory to sync the files"), ) } @@ -158,40 +159,29 @@ pub fn get_synced_dirs( task_id: Uuid, args: &ArgMatches<'_>, ) -> Result> { + let create_job_dir = args.is_present(CREATE_JOB_DIR); let current_dir = std::env::current_dir()?; args.values_of_os(name) .ok_or_else(|| anyhow!("argument '{}' not specified", name))? .enumerate() .map(|(index, remote_path)| { let path = PathBuf::from(remote_path); - let remote_path = path.absolutize()?; - let remote_url = Url::from_file_path(remote_path).expect("invalid file path"); - let remote_blob_url = BlobContainerUrl::new(remote_url).expect("invalid url"); - let path = current_dir.join(format!("{}/{}/{}_{}", job_id, task_id, name, index)); - Ok(SyncedDir { - url: remote_blob_url, - path, - }) + if create_job_dir { + let remote_path = path.absolutize()?; + let remote_url = Url::from_file_path(remote_path).expect("invalid file path"); + let remote_blob_url = BlobContainerUrl::new(remote_url).expect("invalid url"); + let path = current_dir.join(format!("{}/{}/{}_{}", job_id, task_id, name, index)); + Ok(SyncedDir { + url: Some(remote_blob_url), + path, + }) + } else { + Ok(SyncedDir { url: None, path }) + } }) .collect() } -fn register_cleanup(job_id: Uuid) -> Result<()> { - let path = std::env::current_dir()?.join(job_id.to_string()); - atexit::register(move || { - // only cleaing up if the path exists upon exit - if std::fs::metadata(&path).is_ok() { - let result = remove_dir_all(&path); - - // don't panic if the remove failed but the path is gone - if result.is_err() && std::fs::metadata(&path).is_ok() { - result.expect("cleanup failed"); - } - } - }); - Ok(()) -} - pub fn get_synced_dir( name: &str, job_id: Uuid, @@ -199,13 +189,21 @@ pub fn get_synced_dir( args: &ArgMatches<'_>, ) -> Result { let remote_path = value_t!(args, name, PathBuf)?.absolutize()?.into_owned(); - let remote_url = Url::from_file_path(remote_path).map_err(|_| anyhow!("invalid file path"))?; - let remote_blob_url = BlobContainerUrl::new(remote_url)?; - let path = std::env::current_dir()?.join(format!("{}/{}/{}", job_id, task_id, name)); - Ok(SyncedDir { - url: remote_blob_url, - path, - }) + if args.is_present(CREATE_JOB_DIR) { + let remote_url = + Url::from_file_path(remote_path).map_err(|_| anyhow!("invalid file path"))?; + let remote_blob_url = BlobContainerUrl::new(remote_url)?; + let path = std::env::current_dir()?.join(format!("{}/{}/{}", job_id, task_id, name)); + Ok(SyncedDir { + url: Some(remote_blob_url), + path, + }) + } else { + Ok(SyncedDir { + url: None, + path: remote_path, + }) + } } // NOTE: generate_task_id is intended to change the default behavior for local @@ -238,10 +236,6 @@ pub fn build_local_context( PathBuf::default() }; - if !args.is_present("keep_job_dir") { - register_cleanup(job_id)?; - } - let common_config = CommonConfig { job_id, task_id, @@ -333,7 +327,7 @@ pub trait SyncCountDirMonitor { impl SyncCountDirMonitor for SyncedDir { fn monitor_count(self, event_sender: &Option>) -> Result { - if let (Some(event_sender), Some(p)) = (event_sender, self.url.as_file_path()) { + if let (Some(event_sender), Some(p)) = (event_sender, self.remote_url()?.as_file_path()) { event_sender.send(UiEvent::MonitorDir(p))?; } Ok(self) diff --git a/src/agent/onefuzz-agent/src/local/libfuzzer.rs b/src/agent/onefuzz-agent/src/local/libfuzzer.rs index 4b46c757d1..30c7d44996 100644 --- a/src/agent/onefuzz-agent/src/local/libfuzzer.rs +++ b/src/agent/onefuzz-agent/src/local/libfuzzer.rs @@ -36,7 +36,7 @@ pub async fn run( let fuzz_config = build_fuzz_config(args, context.common_config.clone(), event_sender.clone())?; let crash_dir = fuzz_config .crashes - .url + .remote_url()? .as_file_path() .expect("invalid crash dir remote location"); diff --git a/src/agent/onefuzz-agent/src/local/radamsa.rs b/src/agent/onefuzz-agent/src/local/radamsa.rs index 44c5663a48..c89d12cb22 100644 --- a/src/agent/onefuzz-agent/src/local/radamsa.rs +++ b/src/agent/onefuzz-agent/src/local/radamsa.rs @@ -25,7 +25,7 @@ pub async fn run( let fuzz_config = build_fuzz_config(args, context.common_config.clone(), event_sender.clone())?; let crash_dir = fuzz_config .crashes - .url + .remote_url()? .as_file_path() .expect("invalid crash dir remote location"); diff --git a/src/agent/onefuzz-agent/src/tasks/analysis/generic.rs b/src/agent/onefuzz-agent/src/tasks/analysis/generic.rs index d598f13a43..8507d58c38 100644 --- a/src/agent/onefuzz-agent/src/tasks/analysis/generic.rs +++ b/src/agent/onefuzz-agent/src/tasks/analysis/generic.rs @@ -130,8 +130,8 @@ async fn run_existing(config: &Config, reports_dir: &Option) -> Result< async fn already_checked(config: &Config, input: &BlobUrl) -> Result { let result = if let Some(crashes) = &config.crashes { - crashes.url.account() == input.account() - && crashes.url.container() == input.container() + crashes.url.clone().and_then(|u| u.account()) == input.account() + && crashes.url.clone().and_then(|u| u.container()) == input.container() && crashes.path.join(input.name()).exists() } else { false @@ -211,12 +211,14 @@ pub async fn run_tool( }) .set_optional_ref(&config.crashes, |tester, crashes| { tester - .set_optional_ref(&crashes.url.account(), |tester, account| { - tester.crashes_account(account) - }) - .set_optional_ref(&crashes.url.container(), |tester, container| { - tester.crashes_container(container) - }) + .set_optional_ref( + &crashes.url.clone().and_then(|u| u.account()), + |tester, account| tester.crashes_account(account), + ) + .set_optional_ref( + &crashes.url.clone().and_then(|u| u.container()), + |tester, container| tester.crashes_container(container), + ) }); let analyzer_path = expand.evaluate_value(&config.analyzer_exe)?; diff --git a/src/agent/onefuzz-agent/src/tasks/fuzz/generator.rs b/src/agent/onefuzz-agent/src/tasks/fuzz/generator.rs index d427141613..efe385eabf 100644 --- a/src/agent/onefuzz-agent/src/tasks/fuzz/generator.rs +++ b/src/agent/onefuzz-agent/src/tasks/fuzz/generator.rs @@ -244,15 +244,21 @@ mod tests { generator_options, readonly_inputs: vec![SyncedDir { path: readonly_inputs_local, - url: BlobContainerUrl::parse(Url::from_directory_path(inputs).unwrap())?, + url: Some(BlobContainerUrl::parse( + Url::from_directory_path(inputs).unwrap(), + )?), }], crashes: SyncedDir { path: crashes_local, - url: BlobContainerUrl::parse(Url::from_directory_path(crashes).unwrap())?, + url: Some(BlobContainerUrl::parse( + Url::from_directory_path(crashes).unwrap(), + )?), }, tools: Some(SyncedDir { path: tools_local, - url: BlobContainerUrl::parse(Url::from_directory_path(radamsa_dir).unwrap())?, + url: Some(BlobContainerUrl::parse( + Url::from_directory_path(radamsa_dir).unwrap(), + )?), }), target_exe: Default::default(), target_env: Default::default(), diff --git a/src/agent/onefuzz-agent/src/tasks/fuzz/supervisor.rs b/src/agent/onefuzz-agent/src/tasks/fuzz/supervisor.rs index 43fc7d192a..234d83cb14 100644 --- a/src/agent/onefuzz-agent/src/tasks/fuzz/supervisor.rs +++ b/src/agent/onefuzz-agent/src/tasks/fuzz/supervisor.rs @@ -199,12 +199,14 @@ async fn start_supervisor( .set_optional_ref(&config.common.instance_telemetry_key, |tester, key| { tester.instance_telemetry_key(&key) }) - .set_optional_ref(&config.crashes.url.account(), |tester, account| { - tester.crashes_account(account) - }) - .set_optional_ref(&config.crashes.url.container(), |tester, container| { - tester.crashes_container(container) - }); + .set_optional_ref( + &config.crashes.url.clone().and_then(|u| u.account()), + |tester, account| tester.crashes_account(account), + ) + .set_optional_ref( + &config.crashes.url.clone().and_then(|u| u.container()), + |tester, container| tester.crashes_container(container), + ); let supervisor_path = expand.evaluate_value(&config.supervisor_exe)?; let mut cmd = Command::new(supervisor_path); @@ -285,15 +287,18 @@ mod tests { let corpus_dir_local = tempfile::tempdir().unwrap().path().into(); let crashes = SyncedDir { path: crashes_local, - url: BlobContainerUrl::parse(Url::from_directory_path(fault_dir_temp).unwrap()) - .unwrap(), + url: Some( + BlobContainerUrl::parse(Url::from_directory_path(fault_dir_temp).unwrap()).unwrap(), + ), }; let corpus_dir_temp = tempfile::tempdir().unwrap(); let corpus_dir = SyncedDir { path: corpus_dir_local, - url: BlobContainerUrl::parse(Url::from_directory_path(corpus_dir_temp).unwrap()) - .unwrap(), + url: Some( + BlobContainerUrl::parse(Url::from_directory_path(corpus_dir_temp).unwrap()) + .unwrap(), + ), }; let seed_file_name = corpus_dir.path.join("seed.txt"); tokio::fs::write(seed_file_name, "xyz").await.unwrap(); diff --git a/src/agent/onefuzz-agent/src/tasks/generic/input_poller.rs b/src/agent/onefuzz-agent/src/tasks/generic/input_poller.rs index ff7ff91039..ef91b7894d 100644 --- a/src/agent/onefuzz-agent/src/tasks/generic/input_poller.rs +++ b/src/agent/onefuzz-agent/src/tasks/generic/input_poller.rs @@ -149,7 +149,7 @@ impl InputPoller { let dir_relative = input_path.strip_prefix(&dir_path)?; dir_relative.display().to_string() }; - let url = to_process.try_url().map(|x| x.blob(blob_name).url()).ok(); + let url = to_process.try_url().map(|x| x.blob(blob_name).url()); processor.process(url, &path).await?; } @@ -160,8 +160,8 @@ impl InputPoller { pub async fn seen_in_batch(&self, url: &Url) -> Result { let result = if let Some(batch_dir) = &self.batch_dir { if let Ok(blob) = BlobUrl::new(url.clone()) { - batch_dir.try_url()?.account() == blob.account() - && batch_dir.try_url()?.container() == blob.container() + batch_dir.try_url().and_then(|u| u.account()) == blob.account() + && batch_dir.try_url().and_then(|u| u.container()) == blob.container() && batch_dir.path.join(blob.name()).exists() } else { false diff --git a/src/agent/onefuzz-agent/src/tasks/regression/common.rs b/src/agent/onefuzz-agent/src/tasks/regression/common.rs index 4cc124afcf..3a1ae3a162 100644 --- a/src/agent/onefuzz-agent/src/tasks/regression/common.rs +++ b/src/agent/onefuzz-agent/src/tasks/regression/common.rs @@ -88,7 +88,7 @@ pub async fn handle_inputs( .to_string_lossy() .to_string(); - let input_url = readonly_inputs.url.url().join(&file_name)?; + let input_url = readonly_inputs.remote_url()?.url().join(&file_name)?; let crash_test_result = handler.get_crash_result(file_path, input_url).await?; RegressionReport { @@ -150,7 +150,7 @@ pub async fn handle_crash_reports( } .ok_or_else(|| format_err!("crash report is missing input blob: {}", file_name))?; - let input_url = crashes.url.blob(&input_blob.name).url(); + let input_url = crashes.remote_url()?.url().clone(); let input = crashes.path.join(&input_blob.name); let crash_test_result = handler.get_crash_result(input, input_url).await?; diff --git a/src/agent/onefuzz/src/syncdir.rs b/src/agent/onefuzz/src/syncdir.rs index ba6d74335b..3b94d3e781 100644 --- a/src/agent/onefuzz/src/syncdir.rs +++ b/src/agent/onefuzz/src/syncdir.rs @@ -12,7 +12,7 @@ use crate::{ use anyhow::{Context, Result}; use futures::stream::StreamExt; use onefuzz_telemetry::{Event, EventData}; -use reqwest::StatusCode; +use reqwest::{StatusCode, Url}; use reqwest_retry::{RetryCheck, SendRetry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS}; use serde::{Deserialize, Serialize}; use std::{path::PathBuf, str, time::Duration}; @@ -30,13 +30,21 @@ const DEFAULT_CONTINUOUS_SYNC_DELAY_SECONDS: u64 = 60; #[derive(Debug, Deserialize, Clone, PartialEq)] pub struct SyncedDir { pub path: PathBuf, - pub url: BlobContainerUrl, + pub url: Option, } impl SyncedDir { + pub fn remote_url(&self) -> Result { + let url = self.url.clone().unwrap_or(BlobContainerUrl::new( + Url::from_file_path(self.path.clone()).map_err(|_| anyhow!("invalid path"))?, + )?); + Ok(url) + } + pub async fn sync(&self, operation: SyncOperation, delete_dst: bool) -> Result<()> { let dir = &self.path.join(""); - if let Some(dest) = self.url.as_file_path() { + + if let Some(dest) = self.url.clone().and_then(|u| u.as_file_path()) { debug!("syncing {:?} {}", operation, dest.display()); match operation { SyncOperation::Push => { @@ -56,19 +64,20 @@ impl SyncedDir { .await } } - } else { - let url = self.url.url(); + } else if let Some(url) = self.url.clone().map(|u| u.url().clone()) { let url = url.as_ref(); debug!("syncing {:?} {}", operation, dir.display()); match operation { SyncOperation::Push => az_copy::sync(dir, url, delete_dst).await, SyncOperation::Pull => az_copy::sync(url, dir, delete_dst).await, } + } else { + Ok(()) } } - pub fn try_url(&self) -> Result<&BlobContainerUrl> { - Ok(&self.url) + pub fn try_url(&self) -> Option { + self.url.clone() } pub async fn init_pull(&self) -> Result<()> { @@ -77,7 +86,7 @@ impl SyncedDir { } pub async fn init(&self) -> Result<()> { - if let Some(remote_path) = self.url.as_file_path() { + if let Some(remote_path) = self.url.clone().and_then(|u| u.as_file_path()) { fs::create_dir_all(remote_path).await?; } @@ -122,48 +131,64 @@ impl SyncedDir { // Conditionally upload a report, if it would not be a duplicate. pub async fn upload(&self, name: &str, data: &T) -> Result { - match self.url.as_file_path() { - Some(path) => { - let path = path.join(name); - if !exists(&path).await? { - let data = serde_json::to_vec(&data)?; - fs::write(path, data).await?; - Ok(true) - } else { - Ok(false) + if let Some(url) = self.url.clone() { + match url.as_file_path() { + Some(path) => { + let path = path.join(name); + if !exists(&path).await? { + let data = serde_json::to_vec(&data)?; + fs::write(path, data).await?; + Ok(true) + } else { + Ok(false) + } } - } - None => { - let url = self.url.blob(name).url(); - let blob = BlobClient::new(); - let result = blob - .put(url.clone()) - .json(data) - // Conditional PUT, only if-not-exists. - // https://docs.microsoft.com/en-us/rest/api/storageservices/specifying-conditional-headers-for-blob-service-operations - .header("If-None-Match", "*") - .send_retry( - |code| match code { - StatusCode::CONFLICT => RetryCheck::Succeed, - _ => RetryCheck::Retry, - }, - DEFAULT_RETRY_PERIOD, - MAX_RETRY_ATTEMPTS, - ) - .await - .context("SyncedDir.upload")?; + None => { + let url = url.blob(name).url(); + let blob = BlobClient::new(); + let result = blob + .put(url.clone()) + .json(data) + // Conditional PUT, only if-not-exists. + // https://docs.microsoft.com/en-us/rest/api/storageservices/specifying-conditional-headers-for-blob-service-operations + .header("If-None-Match", "*") + .send_retry( + |code| match code { + StatusCode::CONFLICT => RetryCheck::Succeed, + _ => RetryCheck::Retry, + }, + DEFAULT_RETRY_PERIOD, + MAX_RETRY_ATTEMPTS, + ) + .await + .context("SyncedDir.upload")?; - Ok(result.status() == StatusCode::CREATED) + Ok(result.status() == StatusCode::CREATED) + } + } + } else { + let path = self.path.join(name); + if !exists(&path).await? { + let data = serde_json::to_vec(&data)?; + fs::write(path, data).await?; + Ok(true) + } else { + Ok(false) } } } - async fn file_monitor_event(&self, event: Event, ignore_dotfiles: bool) -> Result<()> { - debug!("monitoring {}", self.path.display()); - let mut monitor = DirectoryMonitor::new(self.path.clone()); + async fn file_monitor_event( + path: PathBuf, + url: BlobContainerUrl, + event: Event, + ignore_dotfiles: bool, + ) -> Result<()> { + debug!("monitoring {}", path.display()); + let mut monitor = DirectoryMonitor::new(path.clone()); monitor.start()?; - if let Some(path) = self.url.as_file_path() { + if let Some(path) = url.as_file_path() { fs::create_dir_all(&path).await?; while let Some(item) = monitor.next().await { @@ -192,7 +217,7 @@ impl SyncedDir { } } } else { - let mut uploader = BlobUploader::new(self.url.url().clone()); + let mut uploader = BlobUploader::new(url.url().clone()); while let Some(item) = monitor.next().await { let file_name = item @@ -207,7 +232,7 @@ impl SyncedDir { let error_message = format!( "Couldn't upload file. path:{} dir:{} err:{}", item.display(), - self.path.display(), + path.display(), err ); @@ -235,17 +260,26 @@ impl SyncedDir { /// to be initialized, but a user-supplied binary, (such as AFL) logically owns /// a directory, and may reset it. pub async fn monitor_results(&self, event: Event, ignore_dotfiles: bool) -> Result<()> { - loop { - debug!("waiting to monitor {}", self.path.display()); + if let Some(url) = self.url.clone() { + loop { + debug!("waiting to monitor {}", self.path.display()); - while fs::metadata(&self.path).await.is_err() { - debug!("dir {} not ready to monitor, delaying", self.path.display()); - delay_with_jitter(DELAY).await; - } + while fs::metadata(&self.path).await.is_err() { + debug!("dir {} not ready to monitor, delaying", self.path.display()); + delay_with_jitter(DELAY).await; + } - debug!("starting monitor for {}", self.path.display()); - self.file_monitor_event(event.clone(), ignore_dotfiles) + debug!("starting monitor for {}", self.path.display()); + Self::file_monitor_event( + self.path.clone(), + url.clone(), + event.clone(), + ignore_dotfiles, + ) .await?; + } + } else { + Ok(()) } } }