Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Allow the local run to work without copying the synced dirs #794

Merged
12 commits merged into from
Apr 13, 2021
76 changes: 35 additions & 41 deletions src/agent/onefuzz-agent/src/local/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ use anyhow::Result;
use backoff::{future::retry, Error as BackoffError, ExponentialBackoff};
use clap::{App, Arg, ArgMatches};
use onefuzz::jitter::delay_with_jitter;
use onefuzz::{blob::BlobContainerUrl, monitor::DirectoryMonitor, syncdir::SyncedDir};
use onefuzz::{blob::url::BlobContainerUrl, monitor::DirectoryMonitor, syncdir::SyncedDir};
use path_absolutize::Absolutize;
use remove_dir_all::remove_dir_all;
chkeita marked this conversation as resolved.
Show resolved Hide resolved
use reqwest::Url;
use std::task::Poll;
use std::{
Expand Down Expand Up @@ -55,6 +54,8 @@ pub const ANALYSIS_INPUTS: &str = "analysis_inputs";
pub const ANALYSIS_UNIQUE_INPUTS: &str = "analysis_unique_inputs";
pub const PRESERVE_EXISTING_OUTPUTS: &str = "preserve_existing_outputs";

pub const CREATE_JOB_DIR: &str = "create_job_dir";

const WAIT_FOR_MAX_WAIT: Duration = Duration::from_secs(10);
const WAIT_FOR_DIR_DELAY: Duration = Duration::from_secs(1);

Expand Down Expand Up @@ -139,10 +140,10 @@ pub fn add_common_config(app: App<'static, 'static>) -> App<'static, 'static> {
.required(false),
)
.arg(
Arg::with_name("keep_job_dir")
.long("keep_job_dir")
Arg::with_name(CREATE_JOB_DIR)
.long(CREATE_JOB_DIR)
.required(false)
.help("keep the local directory created for running the task"),
.help("create a local job directory to sync the files"),
)
}

Expand All @@ -158,54 +159,51 @@ pub fn get_synced_dirs(
task_id: Uuid,
args: &ArgMatches<'_>,
) -> Result<Vec<SyncedDir>> {
let create_job_dir = args.is_present(CREATE_JOB_DIR);
let current_dir = std::env::current_dir()?;
args.values_of_os(name)
.ok_or_else(|| anyhow!("argument '{}' not specified", name))?
.enumerate()
.map(|(index, remote_path)| {
let path = PathBuf::from(remote_path);
let remote_path = path.absolutize()?;
let remote_url = Url::from_file_path(remote_path).expect("invalid file path");
let remote_blob_url = BlobContainerUrl::new(remote_url).expect("invalid url");
let path = current_dir.join(format!("{}/{}/{}_{}", job_id, task_id, name, index));
Ok(SyncedDir {
url: remote_blob_url,
path,
})
if create_job_dir {
let remote_path = path.absolutize()?;
let remote_url = Url::from_file_path(remote_path).expect("invalid file path");
let remote_blob_url = BlobContainerUrl::new(remote_url).expect("invalid url");
let path = current_dir.join(format!("{}/{}/{}_{}", job_id, task_id, name, index));
Ok(SyncedDir {
url: Some(remote_blob_url),
path,
})
} else {
Ok(SyncedDir { url: None, path })
}
})
.collect()
}

fn register_cleanup(job_id: Uuid) -> Result<()> {
let path = std::env::current_dir()?.join(job_id.to_string());
atexit::register(move || {
// only cleaing up if the path exists upon exit
if std::fs::metadata(&path).is_ok() {
let result = remove_dir_all(&path);

// don't panic if the remove failed but the path is gone
if result.is_err() && std::fs::metadata(&path).is_ok() {
result.expect("cleanup failed");
}
}
});
Ok(())
}

pub fn get_synced_dir(
name: &str,
job_id: Uuid,
task_id: Uuid,
args: &ArgMatches<'_>,
) -> Result<SyncedDir> {
let remote_path = value_t!(args, name, PathBuf)?.absolutize()?.into_owned();
let remote_url = Url::from_file_path(remote_path).map_err(|_| anyhow!("invalid file path"))?;
let remote_blob_url = BlobContainerUrl::new(remote_url)?;
let path = std::env::current_dir()?.join(format!("{}/{}/{}", job_id, task_id, name));
Ok(SyncedDir {
url: remote_blob_url,
path,
})
if args.is_present(CREATE_JOB_DIR) {
let remote_url =
Url::from_file_path(remote_path).map_err(|_| anyhow!("invalid file path"))?;
let remote_blob_url = BlobContainerUrl::new(remote_url)?;
let path = std::env::current_dir()?.join(format!("{}/{}/{}", job_id, task_id, name));
Ok(SyncedDir {
url: Some(remote_blob_url),
path,
})
} else {
Ok(SyncedDir {
url: None,
path: remote_path,
})
}
}

// NOTE: generate_task_id is intended to change the default behavior for local
Expand Down Expand Up @@ -238,10 +236,6 @@ pub fn build_local_context(
PathBuf::default()
};

if !args.is_present("keep_job_dir") {
register_cleanup(job_id)?;
}

let common_config = CommonConfig {
job_id,
task_id,
Expand Down Expand Up @@ -333,7 +327,7 @@ pub trait SyncCountDirMonitor<T: Sized> {

impl SyncCountDirMonitor<SyncedDir> for SyncedDir {
fn monitor_count(self, event_sender: &Option<UnboundedSender<UiEvent>>) -> Result<Self> {
if let (Some(event_sender), Some(p)) = (event_sender, self.url.as_file_path()) {
if let (Some(event_sender), Some(p)) = (event_sender, self.remote_url()?.as_file_path()) {
event_sender.send(UiEvent::MonitorDir(p))?;
}
Ok(self)
Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-agent/src/local/libfuzzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub async fn run(
let fuzz_config = build_fuzz_config(args, context.common_config.clone(), event_sender.clone())?;
let crash_dir = fuzz_config
.crashes
.url
.remote_url()?
.as_file_path()
.expect("invalid crash dir remote location");

Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-agent/src/local/radamsa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub async fn run(
let fuzz_config = build_fuzz_config(args, context.common_config.clone(), event_sender.clone())?;
let crash_dir = fuzz_config
.crashes
.url
.remote_url()?
.as_file_path()
.expect("invalid crash dir remote location");

Expand Down
18 changes: 10 additions & 8 deletions src/agent/onefuzz-agent/src/tasks/analysis/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ async fn run_existing(config: &Config, reports_dir: &Option<PathBuf>) -> Result<

async fn already_checked(config: &Config, input: &BlobUrl) -> Result<bool> {
let result = if let Some(crashes) = &config.crashes {
crashes.url.account() == input.account()
&& crashes.url.container() == input.container()
crashes.url.clone().and_then(|u| u.account()) == input.account()
&& crashes.url.clone().and_then(|u| u.container()) == input.container()
&& crashes.path.join(input.name()).exists()
} else {
false
Expand Down Expand Up @@ -211,12 +211,14 @@ pub async fn run_tool(
})
.set_optional_ref(&config.crashes, |tester, crashes| {
tester
.set_optional_ref(&crashes.url.account(), |tester, account| {
tester.crashes_account(account)
})
.set_optional_ref(&crashes.url.container(), |tester, container| {
tester.crashes_container(container)
})
.set_optional_ref(
&crashes.url.clone().and_then(|u| u.account()),
|tester, account| tester.crashes_account(account),
)
.set_optional_ref(
&crashes.url.clone().and_then(|u| u.container()),
|tester, container| tester.crashes_container(container),
)
});

let analyzer_path = expand.evaluate_value(&config.analyzer_exe)?;
Expand Down
12 changes: 9 additions & 3 deletions src/agent/onefuzz-agent/src/tasks/fuzz/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,21 @@ mod tests {
generator_options,
readonly_inputs: vec![SyncedDir {
path: readonly_inputs_local,
url: BlobContainerUrl::parse(Url::from_directory_path(inputs).unwrap())?,
url: Some(BlobContainerUrl::parse(
Url::from_directory_path(inputs).unwrap(),
)?),
}],
crashes: SyncedDir {
path: crashes_local,
url: BlobContainerUrl::parse(Url::from_directory_path(crashes).unwrap())?,
url: Some(BlobContainerUrl::parse(
Url::from_directory_path(crashes).unwrap(),
)?),
},
tools: Some(SyncedDir {
path: tools_local,
url: BlobContainerUrl::parse(Url::from_directory_path(radamsa_dir).unwrap())?,
url: Some(BlobContainerUrl::parse(
Url::from_directory_path(radamsa_dir).unwrap(),
)?),
}),
target_exe: Default::default(),
target_env: Default::default(),
Expand Down
25 changes: 15 additions & 10 deletions src/agent/onefuzz-agent/src/tasks/fuzz/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,14 @@ async fn start_supervisor(
.set_optional_ref(&config.common.instance_telemetry_key, |tester, key| {
tester.instance_telemetry_key(&key)
})
.set_optional_ref(&config.crashes.url.account(), |tester, account| {
tester.crashes_account(account)
})
.set_optional_ref(&config.crashes.url.container(), |tester, container| {
tester.crashes_container(container)
});
.set_optional_ref(
&config.crashes.url.clone().and_then(|u| u.account()),
|tester, account| tester.crashes_account(account),
)
.set_optional_ref(
&config.crashes.url.clone().and_then(|u| u.container()),
|tester, container| tester.crashes_container(container),
);

let supervisor_path = expand.evaluate_value(&config.supervisor_exe)?;
let mut cmd = Command::new(supervisor_path);
Expand Down Expand Up @@ -285,15 +287,18 @@ mod tests {
let corpus_dir_local = tempfile::tempdir().unwrap().path().into();
let crashes = SyncedDir {
path: crashes_local,
url: BlobContainerUrl::parse(Url::from_directory_path(fault_dir_temp).unwrap())
.unwrap(),
url: Some(
BlobContainerUrl::parse(Url::from_directory_path(fault_dir_temp).unwrap()).unwrap(),
),
};

let corpus_dir_temp = tempfile::tempdir().unwrap();
let corpus_dir = SyncedDir {
path: corpus_dir_local,
url: BlobContainerUrl::parse(Url::from_directory_path(corpus_dir_temp).unwrap())
.unwrap(),
url: Some(
BlobContainerUrl::parse(Url::from_directory_path(corpus_dir_temp).unwrap())
.unwrap(),
),
};
let seed_file_name = corpus_dir.path.join("seed.txt");
tokio::fs::write(seed_file_name, "xyz").await.unwrap();
Expand Down
6 changes: 3 additions & 3 deletions src/agent/onefuzz-agent/src/tasks/generic/input_poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl<M> InputPoller<M> {
let dir_relative = input_path.strip_prefix(&dir_path)?;
dir_relative.display().to_string()
};
let url = to_process.try_url().map(|x| x.blob(blob_name).url()).ok();
let url = to_process.try_url().map(|x| x.blob(blob_name).url());

processor.process(url, &path).await?;
}
Expand All @@ -160,8 +160,8 @@ impl<M> InputPoller<M> {
pub async fn seen_in_batch(&self, url: &Url) -> Result<bool> {
let result = if let Some(batch_dir) = &self.batch_dir {
if let Ok(blob) = BlobUrl::new(url.clone()) {
batch_dir.try_url()?.account() == blob.account()
&& batch_dir.try_url()?.container() == blob.container()
batch_dir.try_url().and_then(|u| u.account()) == blob.account()
&& batch_dir.try_url().and_then(|u| u.container()) == blob.container()
&& batch_dir.path.join(blob.name()).exists()
} else {
false
Expand Down
4 changes: 2 additions & 2 deletions src/agent/onefuzz-agent/src/tasks/regression/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub async fn handle_inputs(
.to_string_lossy()
.to_string();

let input_url = readonly_inputs.url.url().join(&file_name)?;
let input_url = readonly_inputs.remote_url()?.url().join(&file_name)?;

let crash_test_result = handler.get_crash_result(file_path, input_url).await?;
RegressionReport {
Expand Down Expand Up @@ -150,7 +150,7 @@ pub async fn handle_crash_reports(
}
.ok_or_else(|| format_err!("crash report is missing input blob: {}", file_name))?;

let input_url = crashes.url.blob(&input_blob.name).url();
let input_url = crashes.remote_url()?.url().clone();
let input = crashes.path.join(&input_blob.name);
let crash_test_result = handler.get_crash_result(input, input_url).await?;

Expand Down
Loading