Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

refactor SyncDir and blob container url #809

Merged
7 commits merged into from
Apr 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions src/agent/onefuzz-agent/src/local/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,14 @@ pub fn get_synced_dirs(
let remote_blob_url = BlobContainerUrl::new(remote_url).expect("invalid url");
let path = current_dir.join(format!("{}/{}/{}_{}", job_id, task_id, name, index));
Ok(SyncedDir {
url: Some(remote_blob_url),
path,
remote_path: Some(remote_blob_url),
local_path: path,
})
} else {
Ok(SyncedDir { url: None, path })
Ok(SyncedDir {
remote_path: None,
local_path: path,
})
}
})
.collect()
Expand All @@ -195,13 +198,13 @@ pub fn get_synced_dir(
let remote_blob_url = BlobContainerUrl::new(remote_url)?;
let path = std::env::current_dir()?.join(format!("{}/{}/{}", job_id, task_id, name));
Ok(SyncedDir {
url: Some(remote_blob_url),
path,
remote_path: Some(remote_blob_url),
local_path: path,
})
} else {
Ok(SyncedDir {
url: None,
path: remote_path,
remote_path: None,
local_path: remote_path,
})
}
}
Expand Down
20 changes: 10 additions & 10 deletions src/agent/onefuzz-agent/src/tasks/analysis/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct Config {
pub async fn run(config: Config) -> Result<()> {
let task_dir = config
.analysis
.path
.local_path
.parent()
.ok_or_else(|| anyhow!("Invalid input path"))?;
let temp_path = task_dir.join(".temp");
Expand Down Expand Up @@ -94,7 +94,7 @@ pub async fn run(config: Config) -> Result<()> {
(None, None)
};

set_executable(&config.tools.path).await?;
set_executable(&config.tools.local_path).await?;
run_existing(&config, &reports_path).await?;
let poller = poll_inputs(&config, tmp, &reports_path);

Expand All @@ -114,7 +114,7 @@ async fn run_existing(config: &Config, reports_dir: &Option<PathBuf>) -> Result<
if let Some(crashes) = &config.crashes {
crashes.init_pull().await?;
let mut count: u64 = 0;
let mut read_dir = fs::read_dir(&crashes.path).await?;
let mut read_dir = fs::read_dir(&crashes.local_path).await?;
while let Some(file) = read_dir.next_entry().await? {
debug!("Processing file {:?}", file);
run_tool(file.path(), &config, &reports_dir).await?;
Expand All @@ -128,9 +128,9 @@ async fn run_existing(config: &Config, reports_dir: &Option<PathBuf>) -> Result<

async fn already_checked(config: &Config, input: &BlobUrl) -> Result<bool> {
let result = if let Some(crashes) = &config.crashes {
crashes.url.clone().and_then(|u| u.account()) == input.account()
&& crashes.url.clone().and_then(|u| u.container()) == input.container()
&& crashes.path.join(input.name()).exists()
crashes.remote_path.clone().and_then(|u| u.account()) == input.account()
&& crashes.remote_path.clone().and_then(|u| u.container()) == input.container()
&& crashes.local_path.join(input.name()).exists()
} else {
false
};
Expand Down Expand Up @@ -193,8 +193,8 @@ pub async fn run_tool(
.target_options(&config.target_options)
.analyzer_exe(&config.analyzer_exe)
.analyzer_options(&config.analyzer_options)
.output_dir(&config.analysis.path)
.tools_dir(&config.tools.path)
.output_dir(&config.analysis.local_path)
.tools_dir(&config.tools.local_path)
.setup_dir(&config.common.setup_dir)
.job_id(&config.common.job_id)
.task_id(&config.common.task_id)
Expand All @@ -210,11 +210,11 @@ pub async fn run_tool(
.set_optional_ref(&config.crashes, |tester, crashes| {
tester
.set_optional_ref(
&crashes.url.clone().and_then(|u| u.account()),
&crashes.remote_path.clone().and_then(|u| u.account()),
|tester, account| tester.crashes_account(account),
)
.set_optional_ref(
&crashes.url.clone().and_then(|u| u.container()),
&crashes.remote_path.clone().and_then(|u| u.container()),
|tester, container| tester.crashes_container(container),
)
});
Expand Down
20 changes: 11 additions & 9 deletions src/agent/onefuzz-agent/src/tasks/coverage/libfuzzer_coverage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl CoverageTask {
let mut seen_inputs = false;
// Update the total with the coverage from each seed corpus.
for dir in &self.config.readonly_inputs {
debug!("recording coverage for {}", dir.path.display());
debug!("recording coverage for {}", dir.local_path.display());
dir.init_pull().await?;
if self.record_corpus_coverage(&mut processor, dir).await? {
seen_inputs = true;
Expand Down Expand Up @@ -150,12 +150,14 @@ impl CoverageTask {
processor: &mut CoverageProcessor,
corpus_dir: &SyncedDir,
) -> Result<bool> {
let mut corpus = fs::read_dir(&corpus_dir.path).await.with_context(|| {
format!(
"unable to read corpus coverage directory: {}",
corpus_dir.path.display()
)
})?;
let mut corpus = fs::read_dir(&corpus_dir.local_path)
.await
.with_context(|| {
format!(
"unable to read corpus coverage directory: {}",
corpus_dir.local_path.display()
)
})?;
let mut seen_inputs = false;

loop {
Expand Down Expand Up @@ -187,7 +189,7 @@ pub struct CoverageProcessor {
impl CoverageProcessor {
pub async fn new(config: Arc<Config>) -> Result<Self> {
let heartbeat_client = config.common.init_heartbeat().await?;
let total = TotalCoverage::new(config.coverage.path.join(TOTAL_COVERAGE));
let total = TotalCoverage::new(config.coverage.local_path.join(TOTAL_COVERAGE));
let recorder = CoverageRecorder::new(config.clone()).await?;
let module_totals = BTreeMap::default();

Expand All @@ -209,7 +211,7 @@ impl CoverageProcessor {
debug!("updating module info {:?}", module);

if !self.module_totals.contains_key(&module) {
let parent = &self.config.coverage.path.join("by-module");
let parent = &self.config.coverage.local_path.join("by-module");
fs::create_dir_all(parent).await.with_context(|| {
format!(
"unable to create by-module coverage directory: {}",
Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-agent/src/tasks/coverage/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl CoverageRecorder {

let coverage_path = {
let digest = digest_file(test_input).await?;
self.config.coverage.path.join("inputs").join(digest)
self.config.coverage.local_path.join("inputs").join(digest)
};

fs::create_dir_all(&coverage_path).await.with_context(|| {
Expand Down
20 changes: 10 additions & 10 deletions src/agent/onefuzz-agent/src/tasks/fuzz/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl GeneratorTask {
self.config.crashes.init().await?;
if let Some(tools) = &self.config.tools {
tools.init_pull().await?;
set_executable(&tools.path).await?;
set_executable(&tools.local_path).await?;
}

let hb_client = self.config.common.init_heartbeat().await?;
Expand Down Expand Up @@ -104,7 +104,7 @@ impl GeneratorTask {
loop {
for corpus_dir in &self.config.readonly_inputs {
heartbeat_client.alive();
let corpus_dir = &corpus_dir.path;
let corpus_dir = &corpus_dir.local_path;
let generated_inputs = tempdir()?;
let generated_inputs_path = generated_inputs.path();

Expand All @@ -131,7 +131,7 @@ impl GeneratorTask {
file.file_name()
};

let destination_file = self.config.crashes.path.join(destination_file);
let destination_file = self.config.crashes.local_path.join(destination_file);
if tester.is_crash(file.path()).await? {
fs::rename(file.path(), &destination_file).await?;
debug!("crash found {}", destination_file.display());
Expand Down Expand Up @@ -162,7 +162,7 @@ impl GeneratorTask {
tester.instance_telemetry_key(&key)
})
.set_optional_ref(&self.config.tools, |expand, tools| {
expand.tools_dir(&tools.path)
expand.tools_dir(&tools.local_path)
});

let generator_path = expand.evaluate_value(&self.config.generator_exe)?;
Expand Down Expand Up @@ -240,20 +240,20 @@ mod tests {
generator_exe: String::from("{tools_dir}/radamsa"),
generator_options,
readonly_inputs: vec![SyncedDir {
path: readonly_inputs_local,
url: Some(BlobContainerUrl::parse(
local_path: readonly_inputs_local,
remote_path: Some(BlobContainerUrl::parse(
Url::from_directory_path(inputs).unwrap(),
)?),
}],
crashes: SyncedDir {
path: crashes_local,
url: Some(BlobContainerUrl::parse(
local_path: crashes_local,
remote_path: Some(BlobContainerUrl::parse(
Url::from_directory_path(crashes).unwrap(),
)?),
},
tools: Some(SyncedDir {
path: tools_local,
url: Some(BlobContainerUrl::parse(
local_path: tools_local,
remote_path: Some(BlobContainerUrl::parse(
Url::from_directory_path(radamsa_dir).unwrap(),
)?),
}),
Expand Down
24 changes: 17 additions & 7 deletions src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,12 @@ impl LibFuzzerFuzzTask {
}

pub async fn verify(&self) -> Result<()> {
let mut directories = vec![self.config.inputs.path.clone()];
let mut directories = vec![self.config.inputs.local_path.clone()];
if let Some(readonly_inputs) = &self.config.readonly_inputs {
let mut dirs = readonly_inputs.iter().map(|x| x.path.clone()).collect();
let mut dirs = readonly_inputs
.iter()
.map(|x| x.local_path.clone())
.collect();
directories.append(&mut dirs);
}

Expand Down Expand Up @@ -136,7 +139,7 @@ impl LibFuzzerFuzzTask {
let task_dir = self
.config
.inputs
.path
.local_path
.parent()
.ok_or_else(|| anyhow!("Invalid input path"))?;
let temp_path = task_dir.join(".temp");
Expand All @@ -161,7 +164,12 @@ impl LibFuzzerFuzzTask {

let mut entries = tokio::fs::read_dir(local_input_dir.path()).await?;
while let Ok(Some(entry)) = entries.next_entry().await {
let destination_path = self.config.inputs.path.clone().join(entry.file_name());
let destination_path = self
.config
.inputs
.local_path
.clone()
.join(entry.file_name());
tokio::fs::rename(&entry.path(), &destination_path)
.await
.with_context(|| {
Expand Down Expand Up @@ -189,9 +197,11 @@ impl LibFuzzerFuzzTask {

debug!("starting fuzzer run, run_id = {}", run_id);

let mut inputs = vec![&self.config.inputs.path];
let mut inputs = vec![&self.config.inputs.local_path];
if let Some(readonly_inputs) = &self.config.readonly_inputs {
readonly_inputs.iter().for_each(|d| inputs.push(&d.path));
readonly_inputs
.iter()
.for_each(|d| inputs.push(&d.local_path));
}

let fuzzer = LibFuzzer::new(
Expand Down Expand Up @@ -262,7 +272,7 @@ impl LibFuzzerFuzzTask {

for file in &files {
if let Some(filename) = file.file_name() {
let dest = self.config.crashes.path.join(filename);
let dest = self.config.crashes.local_path.join(filename);
if let Err(e) = tokio::fs::rename(file.clone(), dest.clone()).await {
if !dest.exists() {
bail!(e)
Expand Down
40 changes: 23 additions & 17 deletions src/agent/onefuzz-agent/src/tasks/fuzz/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
// setup tools
if let Some(tools) = &config.tools {
tools.init_pull().await?;
set_executable(&tools.path).await?;
set_executable(&tools.local_path).await?;
}

// setup crashes
let crashes = SyncedDir {
path: runtime_dir.path().join("crashes"),
url: config.crashes.url.clone(),
local_path: runtime_dir.path().join("crashes"),
remote_path: config.crashes.remote_path.clone(),
};
crashes.init().await?;
let monitor_crashes = crashes.monitor_results(new_result, false);
Expand All @@ -92,8 +92,8 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
);

let inputs = SyncedDir {
path: runtime_dir.path().join("inputs"),
url: config.inputs.url.clone(),
local_path: runtime_dir.path().join("inputs"),
remote_path: config.inputs.remote_path.clone(),
};

inputs.init().await?;
Expand All @@ -105,7 +105,7 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
let delay = std::time::Duration::from_secs(10);
loop {
dir.sync_pull().await?;
if has_files(&dir.path).await? {
if has_files(&dir.local_path).await? {
break;
}
delay_with_jitter(delay).await;
Expand Down Expand Up @@ -177,13 +177,15 @@ async fn start_supervisor(
.supervisor_exe(&config.supervisor_exe)
.supervisor_options(&config.supervisor_options)
.runtime_dir(&runtime_dir)
.crashes(&crashes.path)
.input_corpus(&inputs.path)
.crashes(&crashes.local_path)
.input_corpus(&inputs.local_path)
.reports_dir(&reports_dir)
.setup_dir(&config.common.setup_dir)
.job_id(&config.common.job_id)
.task_id(&config.common.task_id)
.set_optional_ref(&config.tools, |expand, tools| expand.tools_dir(&tools.path))
.set_optional_ref(&config.tools, |expand, tools| {
expand.tools_dir(&tools.local_path)
})
.set_optional_ref(&config.target_exe, |expand, target_exe| {
expand.target_exe(target_exe)
})
Expand All @@ -200,11 +202,15 @@ async fn start_supervisor(
tester.instance_telemetry_key(&key)
})
.set_optional_ref(
&config.crashes.url.clone().and_then(|u| u.account()),
&config.crashes.remote_path.clone().and_then(|u| u.account()),
|tester, account| tester.crashes_account(account),
)
.set_optional_ref(
&config.crashes.url.clone().and_then(|u| u.container()),
&config
.crashes
.remote_path
.clone()
.and_then(|u| u.container()),
|tester, container| tester.crashes_container(container),
);

Expand Down Expand Up @@ -286,21 +292,21 @@ mod tests {
let crashes_local = tempfile::tempdir().unwrap().path().into();
let corpus_dir_local = tempfile::tempdir().unwrap().path().into();
let crashes = SyncedDir {
path: crashes_local,
url: Some(
local_path: crashes_local,
remote_path: Some(
BlobContainerUrl::parse(Url::from_directory_path(fault_dir_temp).unwrap()).unwrap(),
),
};

let corpus_dir_temp = tempfile::tempdir().unwrap();
let corpus_dir = SyncedDir {
path: corpus_dir_local,
url: Some(
local_path: corpus_dir_local,
remote_path: Some(
BlobContainerUrl::parse(Url::from_directory_path(corpus_dir_temp).unwrap())
.unwrap(),
),
};
let seed_file_name = corpus_dir.path.join("seed.txt");
let seed_file_name = corpus_dir.local_path.join("seed.txt");
tokio::fs::write(seed_file_name, "xyz").await.unwrap();

let target_options = Some(vec!["{input}".to_owned()]);
Expand Down Expand Up @@ -349,7 +355,7 @@ mod tests {
let notify = Notify::new();
let _fuzzing_monitor =
monitor_process(process, "supervisor".to_string(), false, Some(&notify));
let stat_output = crashes.path.join("fuzzer_stats");
let stat_output = crashes.local_path.join("fuzzer_stats");
let start = Instant::now();
loop {
if has_stats(&stat_output).await {
Expand Down
Loading