Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
refactor SyncDir and blob container url (#809)
Browse files Browse the repository at this point in the history
  • Loading branch information
chkeita authored Apr 19, 2021
1 parent 8d5a7d1 commit 85f606a
Show file tree
Hide file tree
Showing 14 changed files with 187 additions and 146 deletions.
17 changes: 10 additions & 7 deletions src/agent/onefuzz-agent/src/local/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,14 @@ pub fn get_synced_dirs(
let remote_blob_url = BlobContainerUrl::new(remote_url).expect("invalid url");
let path = current_dir.join(format!("{}/{}/{}_{}", job_id, task_id, name, index));
Ok(SyncedDir {
url: Some(remote_blob_url),
path,
remote_path: Some(remote_blob_url),
local_path: path,
})
} else {
Ok(SyncedDir { url: None, path })
Ok(SyncedDir {
remote_path: None,
local_path: path,
})
}
})
.collect()
Expand All @@ -195,13 +198,13 @@ pub fn get_synced_dir(
let remote_blob_url = BlobContainerUrl::new(remote_url)?;
let path = std::env::current_dir()?.join(format!("{}/{}/{}", job_id, task_id, name));
Ok(SyncedDir {
url: Some(remote_blob_url),
path,
remote_path: Some(remote_blob_url),
local_path: path,
})
} else {
Ok(SyncedDir {
url: None,
path: remote_path,
remote_path: None,
local_path: remote_path,
})
}
}
Expand Down
20 changes: 10 additions & 10 deletions src/agent/onefuzz-agent/src/tasks/analysis/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct Config {
pub async fn run(config: Config) -> Result<()> {
let task_dir = config
.analysis
.path
.local_path
.parent()
.ok_or_else(|| anyhow!("Invalid input path"))?;
let temp_path = task_dir.join(".temp");
Expand Down Expand Up @@ -94,7 +94,7 @@ pub async fn run(config: Config) -> Result<()> {
(None, None)
};

set_executable(&config.tools.path).await?;
set_executable(&config.tools.local_path).await?;
run_existing(&config, &reports_path).await?;
let poller = poll_inputs(&config, tmp, &reports_path);

Expand All @@ -114,7 +114,7 @@ async fn run_existing(config: &Config, reports_dir: &Option<PathBuf>) -> Result<
if let Some(crashes) = &config.crashes {
crashes.init_pull().await?;
let mut count: u64 = 0;
let mut read_dir = fs::read_dir(&crashes.path).await?;
let mut read_dir = fs::read_dir(&crashes.local_path).await?;
while let Some(file) = read_dir.next_entry().await? {
debug!("Processing file {:?}", file);
run_tool(file.path(), &config, &reports_dir).await?;
Expand All @@ -128,9 +128,9 @@ async fn run_existing(config: &Config, reports_dir: &Option<PathBuf>) -> Result<

async fn already_checked(config: &Config, input: &BlobUrl) -> Result<bool> {
let result = if let Some(crashes) = &config.crashes {
crashes.url.clone().and_then(|u| u.account()) == input.account()
&& crashes.url.clone().and_then(|u| u.container()) == input.container()
&& crashes.path.join(input.name()).exists()
crashes.remote_path.clone().and_then(|u| u.account()) == input.account()
&& crashes.remote_path.clone().and_then(|u| u.container()) == input.container()
&& crashes.local_path.join(input.name()).exists()
} else {
false
};
Expand Down Expand Up @@ -193,8 +193,8 @@ pub async fn run_tool(
.target_options(&config.target_options)
.analyzer_exe(&config.analyzer_exe)
.analyzer_options(&config.analyzer_options)
.output_dir(&config.analysis.path)
.tools_dir(&config.tools.path)
.output_dir(&config.analysis.local_path)
.tools_dir(&config.tools.local_path)
.setup_dir(&config.common.setup_dir)
.job_id(&config.common.job_id)
.task_id(&config.common.task_id)
Expand All @@ -210,11 +210,11 @@ pub async fn run_tool(
.set_optional_ref(&config.crashes, |tester, crashes| {
tester
.set_optional_ref(
&crashes.url.clone().and_then(|u| u.account()),
&crashes.remote_path.clone().and_then(|u| u.account()),
|tester, account| tester.crashes_account(account),
)
.set_optional_ref(
&crashes.url.clone().and_then(|u| u.container()),
&crashes.remote_path.clone().and_then(|u| u.container()),
|tester, container| tester.crashes_container(container),
)
});
Expand Down
20 changes: 11 additions & 9 deletions src/agent/onefuzz-agent/src/tasks/coverage/libfuzzer_coverage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl CoverageTask {
let mut seen_inputs = false;
// Update the total with the coverage from each seed corpus.
for dir in &self.config.readonly_inputs {
debug!("recording coverage for {}", dir.path.display());
debug!("recording coverage for {}", dir.local_path.display());
dir.init_pull().await?;
if self.record_corpus_coverage(&mut processor, dir).await? {
seen_inputs = true;
Expand Down Expand Up @@ -150,12 +150,14 @@ impl CoverageTask {
processor: &mut CoverageProcessor,
corpus_dir: &SyncedDir,
) -> Result<bool> {
let mut corpus = fs::read_dir(&corpus_dir.path).await.with_context(|| {
format!(
"unable to read corpus coverage directory: {}",
corpus_dir.path.display()
)
})?;
let mut corpus = fs::read_dir(&corpus_dir.local_path)
.await
.with_context(|| {
format!(
"unable to read corpus coverage directory: {}",
corpus_dir.local_path.display()
)
})?;
let mut seen_inputs = false;

loop {
Expand Down Expand Up @@ -187,7 +189,7 @@ pub struct CoverageProcessor {
impl CoverageProcessor {
pub async fn new(config: Arc<Config>) -> Result<Self> {
let heartbeat_client = config.common.init_heartbeat().await?;
let total = TotalCoverage::new(config.coverage.path.join(TOTAL_COVERAGE));
let total = TotalCoverage::new(config.coverage.local_path.join(TOTAL_COVERAGE));
let recorder = CoverageRecorder::new(config.clone()).await?;
let module_totals = BTreeMap::default();

Expand All @@ -209,7 +211,7 @@ impl CoverageProcessor {
debug!("updating module info {:?}", module);

if !self.module_totals.contains_key(&module) {
let parent = &self.config.coverage.path.join("by-module");
let parent = &self.config.coverage.local_path.join("by-module");
fs::create_dir_all(parent).await.with_context(|| {
format!(
"unable to create by-module coverage directory: {}",
Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-agent/src/tasks/coverage/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl CoverageRecorder {

let coverage_path = {
let digest = digest_file(test_input).await?;
self.config.coverage.path.join("inputs").join(digest)
self.config.coverage.local_path.join("inputs").join(digest)
};

fs::create_dir_all(&coverage_path).await.with_context(|| {
Expand Down
20 changes: 10 additions & 10 deletions src/agent/onefuzz-agent/src/tasks/fuzz/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl GeneratorTask {
self.config.crashes.init().await?;
if let Some(tools) = &self.config.tools {
tools.init_pull().await?;
set_executable(&tools.path).await?;
set_executable(&tools.local_path).await?;
}

let hb_client = self.config.common.init_heartbeat().await?;
Expand Down Expand Up @@ -104,7 +104,7 @@ impl GeneratorTask {
loop {
for corpus_dir in &self.config.readonly_inputs {
heartbeat_client.alive();
let corpus_dir = &corpus_dir.path;
let corpus_dir = &corpus_dir.local_path;
let generated_inputs = tempdir()?;
let generated_inputs_path = generated_inputs.path();

Expand All @@ -131,7 +131,7 @@ impl GeneratorTask {
file.file_name()
};

let destination_file = self.config.crashes.path.join(destination_file);
let destination_file = self.config.crashes.local_path.join(destination_file);
if tester.is_crash(file.path()).await? {
fs::rename(file.path(), &destination_file).await?;
debug!("crash found {}", destination_file.display());
Expand Down Expand Up @@ -162,7 +162,7 @@ impl GeneratorTask {
tester.instance_telemetry_key(&key)
})
.set_optional_ref(&self.config.tools, |expand, tools| {
expand.tools_dir(&tools.path)
expand.tools_dir(&tools.local_path)
});

let generator_path = expand.evaluate_value(&self.config.generator_exe)?;
Expand Down Expand Up @@ -240,20 +240,20 @@ mod tests {
generator_exe: String::from("{tools_dir}/radamsa"),
generator_options,
readonly_inputs: vec![SyncedDir {
path: readonly_inputs_local,
url: Some(BlobContainerUrl::parse(
local_path: readonly_inputs_local,
remote_path: Some(BlobContainerUrl::parse(
Url::from_directory_path(inputs).unwrap(),
)?),
}],
crashes: SyncedDir {
path: crashes_local,
url: Some(BlobContainerUrl::parse(
local_path: crashes_local,
remote_path: Some(BlobContainerUrl::parse(
Url::from_directory_path(crashes).unwrap(),
)?),
},
tools: Some(SyncedDir {
path: tools_local,
url: Some(BlobContainerUrl::parse(
local_path: tools_local,
remote_path: Some(BlobContainerUrl::parse(
Url::from_directory_path(radamsa_dir).unwrap(),
)?),
}),
Expand Down
24 changes: 17 additions & 7 deletions src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,12 @@ impl LibFuzzerFuzzTask {
}

pub async fn verify(&self) -> Result<()> {
let mut directories = vec![self.config.inputs.path.clone()];
let mut directories = vec![self.config.inputs.local_path.clone()];
if let Some(readonly_inputs) = &self.config.readonly_inputs {
let mut dirs = readonly_inputs.iter().map(|x| x.path.clone()).collect();
let mut dirs = readonly_inputs
.iter()
.map(|x| x.local_path.clone())
.collect();
directories.append(&mut dirs);
}

Expand Down Expand Up @@ -136,7 +139,7 @@ impl LibFuzzerFuzzTask {
let task_dir = self
.config
.inputs
.path
.local_path
.parent()
.ok_or_else(|| anyhow!("Invalid input path"))?;
let temp_path = task_dir.join(".temp");
Expand All @@ -161,7 +164,12 @@ impl LibFuzzerFuzzTask {

let mut entries = tokio::fs::read_dir(local_input_dir.path()).await?;
while let Ok(Some(entry)) = entries.next_entry().await {
let destination_path = self.config.inputs.path.clone().join(entry.file_name());
let destination_path = self
.config
.inputs
.local_path
.clone()
.join(entry.file_name());
tokio::fs::rename(&entry.path(), &destination_path)
.await
.with_context(|| {
Expand Down Expand Up @@ -189,9 +197,11 @@ impl LibFuzzerFuzzTask {

debug!("starting fuzzer run, run_id = {}", run_id);

let mut inputs = vec![&self.config.inputs.path];
let mut inputs = vec![&self.config.inputs.local_path];
if let Some(readonly_inputs) = &self.config.readonly_inputs {
readonly_inputs.iter().for_each(|d| inputs.push(&d.path));
readonly_inputs
.iter()
.for_each(|d| inputs.push(&d.local_path));
}

let fuzzer = LibFuzzer::new(
Expand Down Expand Up @@ -262,7 +272,7 @@ impl LibFuzzerFuzzTask {

for file in &files {
if let Some(filename) = file.file_name() {
let dest = self.config.crashes.path.join(filename);
let dest = self.config.crashes.local_path.join(filename);
if let Err(e) = tokio::fs::rename(file.clone(), dest.clone()).await {
if !dest.exists() {
bail!(e)
Expand Down
40 changes: 23 additions & 17 deletions src/agent/onefuzz-agent/src/tasks/fuzz/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
// setup tools
if let Some(tools) = &config.tools {
tools.init_pull().await?;
set_executable(&tools.path).await?;
set_executable(&tools.local_path).await?;
}

// setup crashes
let crashes = SyncedDir {
path: runtime_dir.path().join("crashes"),
url: config.crashes.url.clone(),
local_path: runtime_dir.path().join("crashes"),
remote_path: config.crashes.remote_path.clone(),
};
crashes.init().await?;
let monitor_crashes = crashes.monitor_results(new_result, false);
Expand All @@ -92,8 +92,8 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
);

let inputs = SyncedDir {
path: runtime_dir.path().join("inputs"),
url: config.inputs.url.clone(),
local_path: runtime_dir.path().join("inputs"),
remote_path: config.inputs.remote_path.clone(),
};

inputs.init().await?;
Expand All @@ -105,7 +105,7 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {
let delay = std::time::Duration::from_secs(10);
loop {
dir.sync_pull().await?;
if has_files(&dir.path).await? {
if has_files(&dir.local_path).await? {
break;
}
delay_with_jitter(delay).await;
Expand Down Expand Up @@ -177,13 +177,15 @@ async fn start_supervisor(
.supervisor_exe(&config.supervisor_exe)
.supervisor_options(&config.supervisor_options)
.runtime_dir(&runtime_dir)
.crashes(&crashes.path)
.input_corpus(&inputs.path)
.crashes(&crashes.local_path)
.input_corpus(&inputs.local_path)
.reports_dir(&reports_dir)
.setup_dir(&config.common.setup_dir)
.job_id(&config.common.job_id)
.task_id(&config.common.task_id)
.set_optional_ref(&config.tools, |expand, tools| expand.tools_dir(&tools.path))
.set_optional_ref(&config.tools, |expand, tools| {
expand.tools_dir(&tools.local_path)
})
.set_optional_ref(&config.target_exe, |expand, target_exe| {
expand.target_exe(target_exe)
})
Expand All @@ -200,11 +202,15 @@ async fn start_supervisor(
tester.instance_telemetry_key(&key)
})
.set_optional_ref(
&config.crashes.url.clone().and_then(|u| u.account()),
&config.crashes.remote_path.clone().and_then(|u| u.account()),
|tester, account| tester.crashes_account(account),
)
.set_optional_ref(
&config.crashes.url.clone().and_then(|u| u.container()),
&config
.crashes
.remote_path
.clone()
.and_then(|u| u.container()),
|tester, container| tester.crashes_container(container),
);

Expand Down Expand Up @@ -286,21 +292,21 @@ mod tests {
let crashes_local = tempfile::tempdir().unwrap().path().into();
let corpus_dir_local = tempfile::tempdir().unwrap().path().into();
let crashes = SyncedDir {
path: crashes_local,
url: Some(
local_path: crashes_local,
remote_path: Some(
BlobContainerUrl::parse(Url::from_directory_path(fault_dir_temp).unwrap()).unwrap(),
),
};

let corpus_dir_temp = tempfile::tempdir().unwrap();
let corpus_dir = SyncedDir {
path: corpus_dir_local,
url: Some(
local_path: corpus_dir_local,
remote_path: Some(
BlobContainerUrl::parse(Url::from_directory_path(corpus_dir_temp).unwrap())
.unwrap(),
),
};
let seed_file_name = corpus_dir.path.join("seed.txt");
let seed_file_name = corpus_dir.local_path.join("seed.txt");
tokio::fs::write(seed_file_name, "xyz").await.unwrap();

let target_options = Some(vec!["{input}".to_owned()]);
Expand Down Expand Up @@ -349,7 +355,7 @@ mod tests {
let notify = Notify::new();
let _fuzzing_monitor =
monitor_process(process, "supervisor".to_string(), false, Some(&notify));
let stat_output = crashes.path.join("fuzzer_stats");
let stat_output = crashes.local_path.join("fuzzer_stats");
let start = Instant::now();
loop {
if has_stats(&stat_output).await {
Expand Down
Loading

0 comments on commit 85f606a

Please sign in to comment.