From 85f606ab6a18003d360c1feb696cd74e7cf54a3f Mon Sep 17 00:00:00 2001 From: Cheick Keita Date: Mon, 19 Apr 2021 10:38:58 -0700 Subject: [PATCH] refactor SyncDir and blob container url (#809) --- src/agent/onefuzz-agent/src/local/common.rs | 17 ++-- .../src/tasks/analysis/generic.rs | 20 ++-- .../src/tasks/coverage/libfuzzer_coverage.rs | 20 ++-- .../src/tasks/coverage/recorder.rs | 2 +- .../onefuzz-agent/src/tasks/fuzz/generator.rs | 20 ++-- .../src/tasks/fuzz/libfuzzer_fuzz.rs | 24 +++-- .../src/tasks/fuzz/supervisor.rs | 40 ++++---- .../src/tasks/generic/input_poller.rs | 8 +- .../onefuzz-agent/src/tasks/merge/generic.rs | 13 +-- .../src/tasks/merge/libfuzzer_merge.rs | 6 +- .../src/tasks/regression/common.rs | 10 +- src/agent/onefuzz-supervisor/src/setup.rs | 2 +- src/agent/onefuzz/src/blob/url.rs | 96 ++++++++++--------- src/agent/onefuzz/src/syncdir.rs | 55 ++++++----- 14 files changed, 187 insertions(+), 146 deletions(-) diff --git a/src/agent/onefuzz-agent/src/local/common.rs b/src/agent/onefuzz-agent/src/local/common.rs index 6d718db712..0d0910f38c 100644 --- a/src/agent/onefuzz-agent/src/local/common.rs +++ b/src/agent/onefuzz-agent/src/local/common.rs @@ -172,11 +172,14 @@ pub fn get_synced_dirs( let remote_blob_url = BlobContainerUrl::new(remote_url).expect("invalid url"); let path = current_dir.join(format!("{}/{}/{}_{}", job_id, task_id, name, index)); Ok(SyncedDir { - url: Some(remote_blob_url), - path, + remote_path: Some(remote_blob_url), + local_path: path, }) } else { - Ok(SyncedDir { url: None, path }) + Ok(SyncedDir { + remote_path: None, + local_path: path, + }) } }) .collect() @@ -195,13 +198,13 @@ pub fn get_synced_dir( let remote_blob_url = BlobContainerUrl::new(remote_url)?; let path = std::env::current_dir()?.join(format!("{}/{}/{}", job_id, task_id, name)); Ok(SyncedDir { - url: Some(remote_blob_url), - path, + remote_path: Some(remote_blob_url), + local_path: path, }) } else { Ok(SyncedDir { - url: None, - path: remote_path, + remote_path: None, + local_path: remote_path, }) } } diff --git a/src/agent/onefuzz-agent/src/tasks/analysis/generic.rs b/src/agent/onefuzz-agent/src/tasks/analysis/generic.rs index 9280d4df90..1de5c23cdb 100644 --- a/src/agent/onefuzz-agent/src/tasks/analysis/generic.rs +++ b/src/agent/onefuzz-agent/src/tasks/analysis/generic.rs @@ -49,7 +49,7 @@ pub struct Config { pub async fn run(config: Config) -> Result<()> { let task_dir = config .analysis - .path + .local_path .parent() .ok_or_else(|| anyhow!("Invalid input path"))?; let temp_path = task_dir.join(".temp"); @@ -94,7 +94,7 @@ pub async fn run(config: Config) -> Result<()> { (None, None) }; - set_executable(&config.tools.path).await?; + set_executable(&config.tools.local_path).await?; run_existing(&config, &reports_path).await?; let poller = poll_inputs(&config, tmp, &reports_path); @@ -114,7 +114,7 @@ async fn run_existing(config: &Config, reports_dir: &Option) -> Result< if let Some(crashes) = &config.crashes { crashes.init_pull().await?; let mut count: u64 = 0; - let mut read_dir = fs::read_dir(&crashes.path).await?; + let mut read_dir = fs::read_dir(&crashes.local_path).await?; while let Some(file) = read_dir.next_entry().await? { debug!("Processing file {:?}", file); run_tool(file.path(), &config, &reports_dir).await?; @@ -128,9 +128,9 @@ async fn run_existing(config: &Config, reports_dir: &Option) -> Result< async fn already_checked(config: &Config, input: &BlobUrl) -> Result { let result = if let Some(crashes) = &config.crashes { - crashes.url.clone().and_then(|u| u.account()) == input.account() - && crashes.url.clone().and_then(|u| u.container()) == input.container() - && crashes.path.join(input.name()).exists() + crashes.remote_path.clone().and_then(|u| u.account()) == input.account() + && crashes.remote_path.clone().and_then(|u| u.container()) == input.container() + && crashes.local_path.join(input.name()).exists() } else { false }; @@ -193,8 +193,8 @@ pub async fn run_tool( .target_options(&config.target_options) .analyzer_exe(&config.analyzer_exe) .analyzer_options(&config.analyzer_options) - .output_dir(&config.analysis.path) - .tools_dir(&config.tools.path) + .output_dir(&config.analysis.local_path) + .tools_dir(&config.tools.local_path) .setup_dir(&config.common.setup_dir) .job_id(&config.common.job_id) .task_id(&config.common.task_id) @@ -210,11 +210,11 @@ pub async fn run_tool( .set_optional_ref(&config.crashes, |tester, crashes| { tester .set_optional_ref( - &crashes.url.clone().and_then(|u| u.account()), + &crashes.remote_path.clone().and_then(|u| u.account()), |tester, account| tester.crashes_account(account), ) .set_optional_ref( - &crashes.url.clone().and_then(|u| u.container()), + &crashes.remote_path.clone().and_then(|u| u.container()), |tester, container| tester.crashes_container(container), ) }); diff --git a/src/agent/onefuzz-agent/src/tasks/coverage/libfuzzer_coverage.rs b/src/agent/onefuzz-agent/src/tasks/coverage/libfuzzer_coverage.rs index 9c39868df5..7c97d4a643 100644 --- a/src/agent/onefuzz-agent/src/tasks/coverage/libfuzzer_coverage.rs +++ b/src/agent/onefuzz-agent/src/tasks/coverage/libfuzzer_coverage.rs @@ -116,7 +116,7 @@ impl CoverageTask { let mut seen_inputs = false; // Update the total with the coverage from each seed corpus. for dir in &self.config.readonly_inputs { - debug!("recording coverage for {}", dir.path.display()); + debug!("recording coverage for {}", dir.local_path.display()); dir.init_pull().await?; if self.record_corpus_coverage(&mut processor, dir).await? { seen_inputs = true; @@ -150,12 +150,14 @@ impl CoverageTask { processor: &mut CoverageProcessor, corpus_dir: &SyncedDir, ) -> Result { - let mut corpus = fs::read_dir(&corpus_dir.path).await.with_context(|| { - format!( - "unable to read corpus coverage directory: {}", - corpus_dir.path.display() - ) - })?; + let mut corpus = fs::read_dir(&corpus_dir.local_path) + .await + .with_context(|| { + format!( + "unable to read corpus coverage directory: {}", + corpus_dir.local_path.display() + ) + })?; let mut seen_inputs = false; loop { @@ -187,7 +189,7 @@ pub struct CoverageProcessor { impl CoverageProcessor { pub async fn new(config: Arc) -> Result { let heartbeat_client = config.common.init_heartbeat().await?; - let total = TotalCoverage::new(config.coverage.path.join(TOTAL_COVERAGE)); + let total = TotalCoverage::new(config.coverage.local_path.join(TOTAL_COVERAGE)); let recorder = CoverageRecorder::new(config.clone()).await?; let module_totals = BTreeMap::default(); @@ -209,7 +211,7 @@ impl CoverageProcessor { debug!("updating module info {:?}", module); if !self.module_totals.contains_key(&module) { - let parent = &self.config.coverage.path.join("by-module"); + let parent = &self.config.coverage.local_path.join("by-module"); fs::create_dir_all(parent).await.with_context(|| { format!( "unable to create by-module coverage directory: {}", diff --git a/src/agent/onefuzz-agent/src/tasks/coverage/recorder.rs b/src/agent/onefuzz-agent/src/tasks/coverage/recorder.rs index df6d7d59f1..0a895ba4ba 100644 --- a/src/agent/onefuzz-agent/src/tasks/coverage/recorder.rs +++ b/src/agent/onefuzz-agent/src/tasks/coverage/recorder.rs @@ -98,7 +98,7 @@ impl CoverageRecorder { let coverage_path = { let digest = digest_file(test_input).await?; - self.config.coverage.path.join("inputs").join(digest) + self.config.coverage.local_path.join("inputs").join(digest) }; fs::create_dir_all(&coverage_path).await.with_context(|| { diff --git a/src/agent/onefuzz-agent/src/tasks/fuzz/generator.rs b/src/agent/onefuzz-agent/src/tasks/fuzz/generator.rs index b648a34dea..727312a937 100644 --- a/src/agent/onefuzz-agent/src/tasks/fuzz/generator.rs +++ b/src/agent/onefuzz-agent/src/tasks/fuzz/generator.rs @@ -64,7 +64,7 @@ impl GeneratorTask { self.config.crashes.init().await?; if let Some(tools) = &self.config.tools { tools.init_pull().await?; - set_executable(&tools.path).await?; + set_executable(&tools.local_path).await?; } let hb_client = self.config.common.init_heartbeat().await?; @@ -104,7 +104,7 @@ impl GeneratorTask { loop { for corpus_dir in &self.config.readonly_inputs { heartbeat_client.alive(); - let corpus_dir = &corpus_dir.path; + let corpus_dir = &corpus_dir.local_path; let generated_inputs = tempdir()?; let generated_inputs_path = generated_inputs.path(); @@ -131,7 +131,7 @@ impl GeneratorTask { file.file_name() }; - let destination_file = self.config.crashes.path.join(destination_file); + let destination_file = self.config.crashes.local_path.join(destination_file); if tester.is_crash(file.path()).await? { fs::rename(file.path(), &destination_file).await?; debug!("crash found {}", destination_file.display()); @@ -162,7 +162,7 @@ impl GeneratorTask { tester.instance_telemetry_key(&key) }) .set_optional_ref(&self.config.tools, |expand, tools| { - expand.tools_dir(&tools.path) + expand.tools_dir(&tools.local_path) }); let generator_path = expand.evaluate_value(&self.config.generator_exe)?; @@ -240,20 +240,20 @@ mod tests { generator_exe: String::from("{tools_dir}/radamsa"), generator_options, readonly_inputs: vec![SyncedDir { - path: readonly_inputs_local, - url: Some(BlobContainerUrl::parse( + local_path: readonly_inputs_local, + remote_path: Some(BlobContainerUrl::parse( Url::from_directory_path(inputs).unwrap(), )?), }], crashes: SyncedDir { - path: crashes_local, - url: Some(BlobContainerUrl::parse( + local_path: crashes_local, + remote_path: Some(BlobContainerUrl::parse( Url::from_directory_path(crashes).unwrap(), )?), }, tools: Some(SyncedDir { - path: tools_local, - url: Some(BlobContainerUrl::parse( + local_path: tools_local, + remote_path: Some(BlobContainerUrl::parse( Url::from_directory_path(radamsa_dir).unwrap(), )?), }), diff --git a/src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs b/src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs index 7329284209..602f46904d 100644 --- a/src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs +++ b/src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs @@ -104,9 +104,12 @@ impl LibFuzzerFuzzTask { } pub async fn verify(&self) -> Result<()> { - let mut directories = vec![self.config.inputs.path.clone()]; + let mut directories = vec![self.config.inputs.local_path.clone()]; if let Some(readonly_inputs) = &self.config.readonly_inputs { - let mut dirs = readonly_inputs.iter().map(|x| x.path.clone()).collect(); + let mut dirs = readonly_inputs + .iter() + .map(|x| x.local_path.clone()) + .collect(); directories.append(&mut dirs); } @@ -136,7 +139,7 @@ impl LibFuzzerFuzzTask { let task_dir = self .config .inputs - .path + .local_path .parent() .ok_or_else(|| anyhow!("Invalid input path"))?; let temp_path = task_dir.join(".temp"); @@ -161,7 +164,12 @@ impl LibFuzzerFuzzTask { let mut entries = tokio::fs::read_dir(local_input_dir.path()).await?; while let Ok(Some(entry)) = entries.next_entry().await { - let destination_path = self.config.inputs.path.clone().join(entry.file_name()); + let destination_path = self + .config + .inputs + .local_path + .clone() + .join(entry.file_name()); tokio::fs::rename(&entry.path(), &destination_path) .await .with_context(|| { @@ -189,9 +197,11 @@ impl LibFuzzerFuzzTask { debug!("starting fuzzer run, run_id = {}", run_id); - let mut inputs = vec![&self.config.inputs.path]; + let mut inputs = vec![&self.config.inputs.local_path]; if let Some(readonly_inputs) = &self.config.readonly_inputs { - readonly_inputs.iter().for_each(|d| inputs.push(&d.path)); + readonly_inputs + .iter() + .for_each(|d| inputs.push(&d.local_path)); } let fuzzer = LibFuzzer::new( @@ -262,7 +272,7 @@ impl LibFuzzerFuzzTask { for file in &files { if let Some(filename) = file.file_name() { - let dest = self.config.crashes.path.join(filename); + let dest = self.config.crashes.local_path.join(filename); if let Err(e) = tokio::fs::rename(file.clone(), dest.clone()).await { if !dest.exists() { bail!(e) diff --git a/src/agent/onefuzz-agent/src/tasks/fuzz/supervisor.rs b/src/agent/onefuzz-agent/src/tasks/fuzz/supervisor.rs index b53b5d6fb6..3ebff8fcd0 100644 --- a/src/agent/onefuzz-agent/src/tasks/fuzz/supervisor.rs +++ b/src/agent/onefuzz-agent/src/tasks/fuzz/supervisor.rs @@ -62,13 +62,13 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> { // setup tools if let Some(tools) = &config.tools { tools.init_pull().await?; - set_executable(&tools.path).await?; + set_executable(&tools.local_path).await?; } // setup crashes let crashes = SyncedDir { - path: runtime_dir.path().join("crashes"), - url: config.crashes.url.clone(), + local_path: runtime_dir.path().join("crashes"), + remote_path: config.crashes.remote_path.clone(), }; crashes.init().await?; let monitor_crashes = crashes.monitor_results(new_result, false); @@ -92,8 +92,8 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> { ); let inputs = SyncedDir { - path: runtime_dir.path().join("inputs"), - url: config.inputs.url.clone(), + local_path: runtime_dir.path().join("inputs"), + remote_path: config.inputs.remote_path.clone(), }; inputs.init().await?; @@ -105,7 +105,7 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> { let delay = std::time::Duration::from_secs(10); loop { dir.sync_pull().await?; - if has_files(&dir.path).await? { + if has_files(&dir.local_path).await? { break; } delay_with_jitter(delay).await; @@ -177,13 +177,15 @@ async fn start_supervisor( .supervisor_exe(&config.supervisor_exe) .supervisor_options(&config.supervisor_options) .runtime_dir(&runtime_dir) - .crashes(&crashes.path) - .input_corpus(&inputs.path) + .crashes(&crashes.local_path) + .input_corpus(&inputs.local_path) .reports_dir(&reports_dir) .setup_dir(&config.common.setup_dir) .job_id(&config.common.job_id) .task_id(&config.common.task_id) - .set_optional_ref(&config.tools, |expand, tools| expand.tools_dir(&tools.path)) + .set_optional_ref(&config.tools, |expand, tools| { + expand.tools_dir(&tools.local_path) + }) .set_optional_ref(&config.target_exe, |expand, target_exe| { expand.target_exe(target_exe) }) @@ -200,11 +202,15 @@ async fn start_supervisor( tester.instance_telemetry_key(&key) }) .set_optional_ref( - &config.crashes.url.clone().and_then(|u| u.account()), + &config.crashes.remote_path.clone().and_then(|u| u.account()), |tester, account| tester.crashes_account(account), ) .set_optional_ref( - &config.crashes.url.clone().and_then(|u| u.container()), + &config + .crashes + .remote_path + .clone() + .and_then(|u| u.container()), |tester, container| tester.crashes_container(container), ); @@ -286,21 +292,21 @@ mod tests { let crashes_local = tempfile::tempdir().unwrap().path().into(); let corpus_dir_local = tempfile::tempdir().unwrap().path().into(); let crashes = SyncedDir { - path: crashes_local, - url: Some( + local_path: crashes_local, + remote_path: Some( BlobContainerUrl::parse(Url::from_directory_path(fault_dir_temp).unwrap()).unwrap(), ), }; let corpus_dir_temp = tempfile::tempdir().unwrap(); let corpus_dir = SyncedDir { - path: corpus_dir_local, - url: Some( + local_path: corpus_dir_local, + remote_path: Some( BlobContainerUrl::parse(Url::from_directory_path(corpus_dir_temp).unwrap()) .unwrap(), ), }; - let seed_file_name = corpus_dir.path.join("seed.txt"); + let seed_file_name = corpus_dir.local_path.join("seed.txt"); tokio::fs::write(seed_file_name, "xyz").await.unwrap(); let target_options = Some(vec!["{input}".to_owned()]); @@ -349,7 +355,7 @@ mod tests { let notify = Notify::new(); let _fuzzing_monitor = monitor_process(process, "supervisor".to_string(), false, Some(¬ify)); - let stat_output = crashes.path.join("fuzzer_stats"); + let stat_output = crashes.local_path.join("fuzzer_stats"); let start = Instant::now(); loop { if has_stats(&stat_output).await { diff --git a/src/agent/onefuzz-agent/src/tasks/generic/input_poller.rs b/src/agent/onefuzz-agent/src/tasks/generic/input_poller.rs index b94287db47..fdcde4a517 100644 --- a/src/agent/onefuzz-agent/src/tasks/generic/input_poller.rs +++ b/src/agent/onefuzz-agent/src/tasks/generic/input_poller.rs @@ -128,10 +128,10 @@ impl InputPoller { info!( "batch processing directory: {} - {}", self.name, - to_process.path.display() + to_process.local_path.display() ); - let mut read_dir = fs::read_dir(&to_process.path).await?; + let mut read_dir = fs::read_dir(&to_process.local_path).await?; while let Some(file) = read_dir.next_entry().await? { let path = file.path(); info!( @@ -143,7 +143,7 @@ impl InputPoller { // Compute the file name relative to the synced directory, and thus the // container. let blob_name = { - let dir_path = to_process.path.canonicalize()?; + let dir_path = to_process.local_path.canonicalize()?; let input_path = path.canonicalize()?; let dir_relative = input_path.strip_prefix(&dir_path)?; dir_relative.display().to_string() @@ -161,7 +161,7 @@ impl InputPoller { if let Ok(blob) = BlobUrl::new(url.clone()) { batch_dir.try_url().and_then(|u| u.account()) == blob.account() && batch_dir.try_url().and_then(|u| u.container()) == blob.container() - && batch_dir.path.join(blob.name()).exists() + && batch_dir.local_path.join(blob.name()).exists() } else { false } diff --git a/src/agent/onefuzz-agent/src/tasks/merge/generic.rs b/src/agent/onefuzz-agent/src/tasks/merge/generic.rs index 66caa6f571..d3e80ed766 100644 --- a/src/agent/onefuzz-agent/src/tasks/merge/generic.rs +++ b/src/agent/onefuzz-agent/src/tasks/merge/generic.rs @@ -46,7 +46,7 @@ pub struct Config { pub async fn spawn(config: Arc) -> Result<()> { config.tools.init_pull().await?; - set_executable(&config.tools.path).await?; + set_executable(&config.tools.local_path).await?; config.unique_inputs.init().await?; let hb_client = config.common.init_heartbeat().await?; @@ -94,7 +94,8 @@ pub async fn spawn(config: Arc) -> Result<()> { } async fn process_message(config: Arc, input_url: &Url, tmp_dir: &Path) -> Result<()> { - let input_path = utils::download_input(input_url.clone(), &config.unique_inputs.path).await?; + let input_path = + utils::download_input(input_url.clone(), &config.unique_inputs.local_path).await?; info!("downloaded input to {}", input_path.display()); info!("Merging corpus"); @@ -105,8 +106,8 @@ async fn process_message(config: Arc, input_url: &Url, tmp_dir: &Path) - queue_dir.push("queue"); let _delete_output = tokio::fs::remove_dir_all(queue_dir).await; let synced_dir = SyncedDir { - path: tmp_dir.to_path_buf(), - url: config.unique_inputs.url.clone(), + local_path: tmp_dir.to_path_buf(), + remote_path: config.unique_inputs.remote_path.clone(), }; synced_dir.sync_push().await? } @@ -131,14 +132,14 @@ async fn try_delete_blob(input_url: Url) -> Result<()> { async fn merge(config: &Config, output_dir: impl AsRef) -> Result<()> { let expand = Expand::new() .input_marker(&config.supervisor_input_marker) - .input_corpus(&config.unique_inputs.path) + .input_corpus(&config.unique_inputs.local_path) .target_options(&config.target_options) .supervisor_exe(&config.supervisor_exe) .supervisor_options(&config.supervisor_options) .generated_inputs(output_dir) .target_exe(&config.target_exe) .setup_dir(&config.common.setup_dir) - .tools_dir(&config.tools.path) + .tools_dir(&config.tools.local_path) .job_id(&config.common.job_id) .task_id(&config.common.task_id) .set_optional_ref(&config.common.microsoft_telemetry_key, |tester, key| { diff --git a/src/agent/onefuzz-agent/src/tasks/merge/libfuzzer_merge.rs b/src/agent/onefuzz-agent/src/tasks/merge/libfuzzer_merge.rs index 8b6b64ccc6..a215cea984 100644 --- a/src/agent/onefuzz-agent/src/tasks/merge/libfuzzer_merge.rs +++ b/src/agent/onefuzz-agent/src/tasks/merge/libfuzzer_merge.rs @@ -70,7 +70,7 @@ pub async fn spawn(config: Arc) -> Result<()> { input.init().await?; input.sync_pull().await?; } - let input_paths = config.inputs.iter().map(|i| &i.path).collect(); + let input_paths = config.inputs.iter().map(|i| &i.local_path).collect(); sync_and_merge( config.clone(), input_paths, @@ -166,7 +166,9 @@ pub async fn merge_inputs( &config.target_env, &config.common.setup_dir, ); - merger.merge(&config.unique_inputs.path, &candidates).await + merger + .merge(&config.unique_inputs.local_path, &candidates) + .await } async fn try_delete_blob(input_url: Url) -> Result<()> { diff --git a/src/agent/onefuzz-agent/src/tasks/regression/common.rs b/src/agent/onefuzz-agent/src/tasks/regression/common.rs index 3a1ae3a162..dd02f4bf94 100644 --- a/src/agent/onefuzz-agent/src/tasks/regression/common.rs +++ b/src/agent/onefuzz-agent/src/tasks/regression/common.rs @@ -73,7 +73,7 @@ pub async fn handle_inputs( heartbeat_client: &Option, ) -> Result<()> { readonly_inputs.init_pull().await?; - let mut input_files = tokio::fs::read_dir(&readonly_inputs.path).await?; + let mut input_files = tokio::fs::read_dir(&readonly_inputs.local_path).await?; while let Some(file) = input_files.next_entry().await? { heartbeat_client.alive(); @@ -88,7 +88,7 @@ pub async fn handle_inputs( .to_string_lossy() .to_string(); - let input_url = readonly_inputs.remote_url()?.url().join(&file_name)?; + let input_url = readonly_inputs.remote_url()?.url()?.join(&file_name)?; let crash_test_result = handler.get_crash_result(file_path, input_url).await?; RegressionReport { @@ -120,7 +120,7 @@ pub async fn handle_crash_reports( for possible_dir in report_dirs { possible_dir.init_pull().await?; - let mut report_files = tokio::fs::read_dir(&possible_dir.path).await?; + let mut report_files = tokio::fs::read_dir(&possible_dir.local_path).await?; while let Some(file) = report_files.next_entry().await? { heartbeat_client.alive(); let file_path = file.path(); @@ -150,8 +150,8 @@ pub async fn handle_crash_reports( } .ok_or_else(|| format_err!("crash report is missing input blob: {}", file_name))?; - let input_url = crashes.remote_url()?.url().clone(); - let input = crashes.path.join(&input_blob.name); + let input_url = crashes.remote_url()?.url()?; + let input = crashes.local_path.join(&input_blob.name); let crash_test_result = handler.get_crash_result(input, input_url).await?; RegressionReport { diff --git a/src/agent/onefuzz-supervisor/src/setup.rs b/src/agent/onefuzz-supervisor/src/setup.rs index 347565913f..87d571fa96 100644 --- a/src/agent/onefuzz-supervisor/src/setup.rs +++ b/src/agent/onefuzz-supervisor/src/setup.rs @@ -41,7 +41,7 @@ impl SetupRunner { work_set.save_context().await?; // Download the setup container. - let setup_url = work_set.setup_url.url(); + let setup_url = work_set.setup_url.url()?; let setup_dir = work_set.setup_dir()?; // `azcopy sync` requires the local dir to exist. diff --git a/src/agent/onefuzz/src/blob/url.rs b/src/agent/onefuzz/src/blob/url.rs index 7826ead5cd..38663c1ff3 100644 --- a/src/agent/onefuzz/src/blob/url.rs +++ b/src/agent/onefuzz/src/blob/url.rs @@ -116,8 +116,9 @@ impl fmt::Display for BlobUrl { /// /// Use to validate a URL and address contained blobs. #[derive(Clone, Eq, PartialEq)] -pub struct BlobContainerUrl { - url: Url, +pub enum BlobContainerUrl { + BlobContainer(Url), + Path(PathBuf), } impl BlobContainerUrl { @@ -126,11 +127,19 @@ impl BlobContainerUrl { bail!("Invalid container URL: {}", url); } - Ok(Self { url }) + if let Ok(path) = url.to_file_path() { + Ok(Self::Path(path)) + } else { + Ok(Self::BlobContainer(url)) + } } pub fn as_file_path(&self) -> Option { - self.url.to_file_path().ok() + if let Self::Path(p) = self { + Some(p.clone()) + } else { + None + } } pub fn parse(url: impl AsRef) -> Result { @@ -139,55 +148,53 @@ impl BlobContainerUrl { Self::new(url) } - pub fn url(&self) -> &Url { - &self.url + pub fn url(&self) -> Result { + match self { + Self::BlobContainer(url) => Ok(url.clone()), + Self::Path(p) => Ok(Url::from_file_path(p).map_err(|_| anyhow!("invalid path"))?), + } } pub fn account(&self) -> Option { - if self.as_file_path().is_some() { - None - } else { - // Ctor checks that domain has at least one subdomain. - Some( - self.url - .domain() - .unwrap() - .split('.') - .next() - .unwrap() - .to_owned(), - ) + match self { + Self::BlobContainer(url) => { + // Ctor checks that domain has at least one subdomain. + Some(url.domain().unwrap().split('.').next().unwrap().to_owned()) + } + Self::Path(_p) => None, } } pub fn container(&self) -> Option { - if self.as_file_path().is_some() { - None - } else { - Some(self.url.path_segments().unwrap().next().unwrap().to_owned()) + match self { + Self::BlobContainer(url) => { + Some(url.path_segments().unwrap().next().unwrap().to_owned()) + } + Self::Path(_p) => None, } } pub fn blob(&self, name: impl AsRef) -> BlobUrl { - let mut url = self.url.clone(); - name.as_ref().split('/').fold( - &mut url.path_segments_mut().unwrap(), // Checked in ctor - |segments, current| segments.push(current), - ); - - BlobUrl::new(url).expect("invalid blob URL from valid container") - } -} - -impl AsRef for BlobContainerUrl { - fn as_ref(&self) -> &Url { - self.url() + match self { + Self::BlobContainer(url) => { + let mut url = url.clone(); + name.as_ref().split('/').fold( + &mut url.path_segments_mut().unwrap(), // Checked in ctor + |segments, current| segments.push(current), + ); + BlobUrl::AzureBlob(url) + } + Self::Path(p) => BlobUrl::LocalFile(p.join(name.as_ref())), + } } } impl fmt::Debug for BlobContainerUrl { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", redact_query_sas_sig(self.url())) + match self { + Self::BlobContainer(url) => write!(f, "{}", redact_query_sas_sig(url)), + Self::Path(p) => write!(f, "{}", p.display()), + } } } @@ -203,12 +210,6 @@ impl fmt::Display for BlobContainerUrl { } } -impl From for Url { - fn from(container: BlobContainerUrl) -> Self { - container.url - } -} - fn redact_query_sas_sig(url: &Url) -> Url { let mut redacted = url.clone(); redacted.set_query(None); @@ -292,8 +293,13 @@ impl Serialize for BlobContainerUrl { where S: Serializer, { - let url = self.url.to_string(); - serializer.serialize_str(&url) + match self { + Self::Path(p) => serializer.serialize_str(p.to_str().unwrap_or_default()), + Self::BlobContainer(url) => { + let url = url.to_string(); + serializer.serialize_str(&url) + } + } } } diff --git a/src/agent/onefuzz/src/syncdir.rs b/src/agent/onefuzz/src/syncdir.rs index 3b94d3e781..5159d24f05 100644 --- a/src/agent/onefuzz/src/syncdir.rs +++ b/src/agent/onefuzz/src/syncdir.rs @@ -29,22 +29,24 @@ const DEFAULT_CONTINUOUS_SYNC_DELAY_SECONDS: u64 = 60; #[derive(Debug, Deserialize, Clone, PartialEq)] pub struct SyncedDir { - pub path: PathBuf, - pub url: Option, + #[serde(alias = "local_path", alias = "path")] + pub local_path: PathBuf, + #[serde(alias = "remote_path", alias = "url")] + pub remote_path: Option, } impl SyncedDir { pub fn remote_url(&self) -> Result { - let url = self.url.clone().unwrap_or(BlobContainerUrl::new( - Url::from_file_path(self.path.clone()).map_err(|_| anyhow!("invalid path"))?, + let url = self.remote_path.clone().unwrap_or(BlobContainerUrl::new( + Url::from_file_path(self.local_path.clone()).map_err(|_| anyhow!("invalid path"))?, )?); Ok(url) } pub async fn sync(&self, operation: SyncOperation, delete_dst: bool) -> Result<()> { - let dir = &self.path.join(""); + let dir = &self.local_path.join(""); - if let Some(dest) = self.url.clone().and_then(|u| u.as_file_path()) { + if let Some(dest) = self.remote_path.clone().and_then(|u| u.as_file_path()) { debug!("syncing {:?} {}", operation, dest.display()); match operation { SyncOperation::Push => { @@ -64,7 +66,7 @@ impl SyncedDir { .await } } - } else if let Some(url) = self.url.clone().map(|u| u.url().clone()) { + } else if let Some(url) = self.remote_path.clone().and_then(|u| u.url().ok()) { let url = url.as_ref(); debug!("syncing {:?} {}", operation, dir.display()); match operation { @@ -77,7 +79,7 @@ impl SyncedDir { } pub fn try_url(&self) -> Option { - self.url.clone() + self.remote_path.clone() } pub async fn init_pull(&self) -> Result<()> { @@ -86,20 +88,26 @@ impl SyncedDir { } pub async fn init(&self) -> Result<()> { - if let Some(remote_path) = self.url.clone().and_then(|u| u.as_file_path()) { + if let Some(remote_path) = self.remote_path.clone().and_then(|u| u.as_file_path()) { fs::create_dir_all(remote_path).await?; } - match fs::metadata(&self.path).await { + match fs::metadata(&self.local_path).await { Ok(m) => { if m.is_dir() { Ok(()) } else { - anyhow::bail!("File with name '{}' already exists", self.path.display()); + anyhow::bail!( + "File with name '{}' already exists", + self.local_path.display() + ); } } - Err(_) => fs::create_dir_all(&self.path).await.with_context(|| { - format!("unable to create local SyncedDir: {}", self.path.display()) + Err(_) => fs::create_dir_all(&self.local_path).await.with_context(|| { + format!( + "unable to create local SyncedDir: {}", + self.local_path.display() + ) }), } } @@ -131,7 +139,7 @@ impl SyncedDir { // Conditionally upload a report, if it would not be a duplicate. pub async fn upload(&self, name: &str, data: &T) -> Result { - if let Some(url) = self.url.clone() { + if let Some(url) = self.remote_path.clone() { match url.as_file_path() { Some(path) => { let path = path.join(name); @@ -167,7 +175,7 @@ impl SyncedDir { } } } else { - let path = self.path.join(name); + let path = self.local_path.join(name); if !exists(&path).await? { let data = serde_json::to_vec(&data)?; fs::write(path, data).await?; @@ -217,7 +225,7 @@ impl SyncedDir { } } } else { - let mut uploader = BlobUploader::new(url.url().clone()); + let mut uploader = BlobUploader::new(url.url()?); while let Some(item) = monitor.next().await { let file_name = item @@ -260,18 +268,21 @@ impl SyncedDir { /// to be initialized, but a user-supplied binary, (such as AFL) logically owns /// a directory, and may reset it. pub async fn monitor_results(&self, event: Event, ignore_dotfiles: bool) -> Result<()> { - if let Some(url) = self.url.clone() { + if let Some(url) = self.remote_path.clone() { loop { - debug!("waiting to monitor {}", self.path.display()); + debug!("waiting to monitor {}", self.local_path.display()); - while fs::metadata(&self.path).await.is_err() { - debug!("dir {} not ready to monitor, delaying", self.path.display()); + while fs::metadata(&self.local_path).await.is_err() { + debug!( + "dir {} not ready to monitor, delaying", + self.local_path.display() + ); delay_with_jitter(DELAY).await; } - debug!("starting monitor for {}", self.path.display()); + debug!("starting monitor for {}", self.local_path.display()); Self::file_monitor_event( - self.path.clone(), + self.local_path.clone(), url.clone(), event.clone(), ignore_dotfiles,