Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Adding option to merge all inputs at once #282

Merged
merged 37 commits into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
8ebc520
adding option to merge all inputs at once
chkeita Nov 6, 2020
a266b44
adding debug command for libfuzzer_merge
chkeita Nov 11, 2020
f8e5ea4
formatting
chkeita Nov 11, 2020
7418ecc
adding libfuzzer merge template
chkeita Nov 11, 2020
b3694f8
Merge branch 'main' into chkeita/254
chkeita Nov 11, 2020
2b3521d
initialize input dir before sync
chkeita Nov 12, 2020
924a7a9
Merge branch 'main' into chkeita/254
chkeita Nov 12, 2020
d194d8b
Merge branch 'main' into chkeita/254
chkeita Nov 12, 2020
e22d449
- set container permissions
chkeita Nov 12, 2020
882f99a
Merge branch 'main' into chkeita/254
chkeita Nov 12, 2020
b198fb0
Add error message when inputs are missing
chkeita Nov 12, 2020
e43159d
formatting
chkeita Nov 12, 2020
2733e5c
missing return value
chkeita Nov 12, 2020
0008f9c
add output_container parameter
chkeita Nov 13, 2020
b608d47
Merge branch 'main' into chkeita/254
chkeita Nov 13, 2020
3986c11
Merge branch 'main' into chkeita/254
bmc-msft Nov 17, 2020
8d8f10a
Merge branch 'main' into chkeita/254
chkeita Nov 18, 2020
24f3f2f
Adding option to overwrite the destination container of the merge
chkeita Nov 18, 2020
8d5d588
formatting
chkeita Nov 18, 2020
f63000b
build fix
chkeita Nov 18, 2020
7883487
updating docs
chkeita Nov 19, 2020
014b2ab
fix api types
chkeita Nov 19, 2020
0f5d339
formatting
chkeita Nov 19, 2020
5fe0e04
rename overwrite_unique_inputs --> overwrite_output_container
chkeita Nov 19, 2020
e029b9f
Merge branch 'main' into chkeita/254
chkeita Nov 19, 2020
d7dbb62
update docs
chkeita Nov 19, 2020
2038e48
Fixing az_copy invocation
chkeita Nov 19, 2020
3ecb17f
Merge branch 'main' into chkeita/254
chkeita Nov 19, 2020
0558281
update docs
chkeita Nov 19, 2020
206fa22
Merge branch 'main' into chkeita/254
bmc-msft Nov 20, 2020
13e3eb1
Merge branch 'main' into chkeita/254
bmc-msft Nov 20, 2020
9efe3d2
renaming overwrite_output_container to preserve_existing_outputs and…
chkeita Nov 21, 2020
a2e7b2d
update docs
chkeita Nov 21, 2020
2189d1f
Merge branch 'main' into chkeita/254
chkeita Nov 21, 2020
877ae91
formatting
chkeita Nov 21, 2020
eeeca77
Merge branch 'main' into chkeita/254
bmc-msft Nov 24, 2020
c3a0372
Merge branch 'main' into chkeita/254
bmc-msft Nov 24, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/webhook_events.md
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ Each event will be submitted via HTTP POST to the user provided URL.
"ensemble_sync_delay": {
"title": "Ensemble Sync Delay",
"type": "integer"
},
"overwrite_unique_inputs": {
"title": "Overwrite Unique Inputs",
"type": "boolean"
}
},
"required": [
Expand Down Expand Up @@ -884,6 +888,10 @@ Each event will be submitted via HTTP POST to the user provided URL.
"ensemble_sync_delay": {
"title": "Ensemble Sync Delay",
"type": "integer"
},
"overwrite_unique_inputs": {
"title": "Overwrite Unique Inputs",
"type": "boolean"
}
},
"required": [
Expand Down
2 changes: 2 additions & 0 deletions src/agent/onefuzz-agent/src/debug/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub fn run(args: &clap::ArgMatches) -> Result<()> {
("libfuzzer-coverage", Some(sub)) => crate::debug::libfuzzer_coverage::run(sub)?,
("libfuzzer-crash-report", Some(sub)) => crate::debug::libfuzzer_crash_report::run(sub)?,
("libfuzzer-fuzz", Some(sub)) => crate::debug::libfuzzer_fuzz::run(sub)?,
("libfuzzer-merge", Some(sub)) => crate::debug::libfuzzer_merge::run(sub)?,
_ => println!("missing subcommand\nUSAGE : {}", args.usage()),
}

Expand All @@ -23,4 +24,5 @@ pub fn args() -> App<'static, 'static> {
.subcommand(crate::debug::libfuzzer_coverage::args())
.subcommand(crate::debug::libfuzzer_crash_report::args())
.subcommand(crate::debug::libfuzzer_fuzz::args())
.subcommand(crate::debug::libfuzzer_merge::args())
}
82 changes: 82 additions & 0 deletions src/agent/onefuzz-agent/src/debug/libfuzzer_merge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

use crate::tasks::{
config::CommonConfig,
merge::libfuzzer_merge::{merge_inputs, Config},
utils::parse_key_value,
};
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use onefuzz::{blob::BlobContainerUrl, syncdir::SyncedDir};
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use tokio::runtime::Runtime;
use url::Url;
use uuid::Uuid;

pub fn run(args: &clap::ArgMatches) -> Result<()> {
let target_exe = value_t!(args, "target_exe", PathBuf)?;
let inputs = value_t!(args, "inputs", String)?;
let unique_inputs = value_t!(args, "unique_inputs", String)?;
let target_options = args.values_of_lossy("target_options").unwrap_or_default();

let mut target_env = HashMap::new();
for opt in args.values_of_lossy("target_env").unwrap_or_default() {
let (k, v) = parse_key_value(opt)?;
target_env.insert(k, v);
}

let config = Arc::new(Config {
target_exe,
target_env,
target_options,
input_queue: None,
inputs: vec![SyncedDir {
path: inputs.into(),
url: BlobContainerUrl::new(Url::parse("https://contoso.com/inputs")?)?,
}],
unique_inputs: SyncedDir {
path: unique_inputs.into(),
url: BlobContainerUrl::new(Url::parse("https://contoso.com/unique_inputs")?)?,
},
common: CommonConfig {
heartbeat_queue: None,
instrumentation_key: None,
telemetry_key: None,
job_id: Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap(),
task_id: Uuid::parse_str("11111111-1111-1111-1111-111111111111").unwrap(),
instance_id: Uuid::parse_str("22222222-2222-2222-2222-222222222222").unwrap(),
},
overwrite_unique_inputs: true,
});

let mut rt = Runtime::new()?;
rt.block_on(merge_inputs(
config.clone(),
vec![config.clone().inputs[0].path.clone()],
))?;

Ok(())
}

pub fn args() -> App<'static, 'static> {
SubCommand::with_name("libfuzzer-merge")
.about("execute a local-only libfuzzer merge task")
.arg(
Arg::with_name("target_exe")
.takes_value(true)
.required(true),
)
.arg(Arg::with_name("inputs").takes_value(true).required(true))
.arg(
Arg::with_name("unique_inputs")
.takes_value(true)
.required(true),
)
.arg(
Arg::with_name("target_env")
.long("target_env")
.takes_value(true)
.multiple(true),
)
}
1 change: 1 addition & 0 deletions src/agent/onefuzz-agent/src/debug/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ pub mod generic_crash_report;
pub mod libfuzzer_coverage;
pub mod libfuzzer_crash_report;
pub mod libfuzzer_fuzz;
pub mod libfuzzer_merge;
125 changes: 75 additions & 50 deletions src/agent/onefuzz-agent/src/tasks/merge/libfuzzer_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use onefuzz::{
http::ResponseExt,
jitter::delay_with_jitter,
libfuzzer::{LibFuzzer, LibFuzzerMergeOutput},
syncdir::SyncedDir,
syncdir::{SyncOperation, SyncedDir},
};
use reqwest::Url;
use reqwest_retry::SendRetry;
Expand All @@ -22,7 +22,6 @@ use storage_queue::{QueueClient, EMPTY_QUEUE_DELAY};
#[derive(Debug, Deserialize)]
struct QueueMessage {
content_length: u32,

url: Url,
}

Expand All @@ -31,39 +30,52 @@ pub struct Config {
pub target_exe: PathBuf,
pub target_env: HashMap<String, String>,
pub target_options: Vec<String>,
pub input_queue: Url,
pub inputs: SyncedDir,
pub input_queue: Option<Url>,
pub inputs: Vec<SyncedDir>,
pub unique_inputs: SyncedDir,
pub overwrite_unique_inputs: bool,

#[serde(flatten)]
pub common: CommonConfig,
}

pub async fn spawn(config: Arc<Config>) -> Result<()> {
let hb_client = config.common.init_heartbeat().await?;
config.unique_inputs.init().await?;
loop {
hb_client.alive();
if let Err(error) = process_message(config.clone()).await {
error!(
"failed to process latest message from notification queue: {}",
error
);
if let Some(url) = config.input_queue.clone() {
loop {
let queue = QueueClient::new(url.clone());
if let Err(error) = process_message(config.clone(), queue).await {
error!(
"failed to process latest message from notification queue: {}",
error
);
}
}
} else {
for input in config.inputs.iter() {
input.init().await?;
input.sync_pull().await?;
}
let input_paths = config.inputs.iter().map(|i| &i.path).collect();
sync_and_merge(
config.clone(),
input_paths,
false,
config.overwrite_unique_inputs,
)
.await?;
Ok(())
}
}

async fn process_message(config: Arc<Config>) -> Result<()> {
async fn process_message(config: Arc<Config>, mut input_queue: QueueClient) -> Result<()> {
let hb_client = config.common.init_heartbeat().await?;
hb_client.alive();
let tmp_dir = "./tmp";

verbose!("tmp dir reset");

utils::reset_tmp_dir(tmp_dir).await?;
config.unique_inputs.sync_pull().await?;

let mut queue = QueueClient::new(config.input_queue.clone());

if let Some(msg) = queue.pop().await? {
if let Some(msg) = input_queue.pop().await? {
let input_url = match utils::parse_url_data(msg.data()) {
Ok(url) => url,
Err(err) => {
Expand All @@ -74,28 +86,11 @@ async fn process_message(config: Arc<Config>) -> Result<()> {

let input_path = utils::download_input(input_url.clone(), tmp_dir).await?;
info!("downloaded input to {}", input_path.display());

info!("Merging corpus");
match merge(
&config.target_exe,
&config.target_options,
&config.target_env,
&config.unique_inputs.path,
&tmp_dir,
)
.await
{
Ok(result) if result.added_files_count > 0 => {
info!("Added {} new files to the corpus", result.added_files_count);
config.unique_inputs.sync_push().await?;
}
Ok(_) => info!("No new files added by the merge"),
Err(e) => error!("Merge failed : {}", e),
}
sync_and_merge(config.clone(), vec![tmp_dir], true, false).await?;

verbose!("will delete popped message with id = {}", msg.id());

queue.delete(msg).await?;
input_queue.delete(msg).await?;

verbose!(
"Attempting to delete {} from the candidate container",
Expand All @@ -113,6 +108,48 @@ async fn process_message(config: Arc<Config>) -> Result<()> {
}
}

async fn sync_and_merge(
config: Arc<Config>,
input_dirs: Vec<impl AsRef<Path>>,
pull_inputs: bool,
overwrite_outputs: bool,
) -> Result<LibFuzzerMergeOutput> {
if pull_inputs {
config.unique_inputs.sync_pull().await?;
}
match merge_inputs(config.clone(), input_dirs).await {
bmc-msft marked this conversation as resolved.
Show resolved Hide resolved
Ok(result) => {
if result.added_files_count > 0 {
info!("Added {} new files to the corpus", result.added_files_count);
config
.unique_inputs
.sync(SyncOperation::Push, overwrite_outputs)
.await?;
} else {
info!("No new files added by the merge")
}
Ok(result)
}
Err(e) => {
error!("Merge failed : {}", e);
Err(e)
}
}
}

pub async fn merge_inputs(
config: Arc<Config>,
candidates: Vec<impl AsRef<Path>>,
) -> Result<LibFuzzerMergeOutput> {
info!("Merging corpus");
let merger = LibFuzzer::new(
&config.target_exe,
&config.target_options,
&config.target_env,
);
merger.merge(&config.unique_inputs.path, &candidates).await
}

async fn try_delete_blob(input_url: Url) -> Result<()> {
let http_client = reqwest::Client::new();
match http_client
Expand All @@ -126,15 +163,3 @@ async fn try_delete_blob(input_url: Url) -> Result<()> {
Err(err) => Err(err.into()),
}
}

async fn merge(
target_exe: &Path,
target_options: &[String],
target_env: &HashMap<String, String>,
corpus_dir: &Path,
candidate_dir: impl AsRef<Path>,
) -> Result<LibFuzzerMergeOutput> {
let merger = LibFuzzer::new(target_exe, target_options, target_env);
let candidates = vec![candidate_dir];
merger.merge(&corpus_dir, &candidates).await
}
2 changes: 1 addition & 1 deletion src/agent/onefuzz-supervisor/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl SetupRunner {

// `azcopy sync` requires the local dir to exist.
fs::create_dir_all(&setup_dir).await?;
az_copy::sync(setup_url.to_string(), &setup_dir).await?;
az_copy::sync(setup_url.to_string(), &setup_dir, false).await?;

verbose!(
"synced setup container from {} to {}",
Expand Down
6 changes: 4 additions & 2 deletions src/agent/onefuzz/src/az_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::Result;
use std::ffi::OsStr;
use tokio::process::Command;

pub async fn sync(src: impl AsRef<OsStr>, dst: impl AsRef<OsStr>) -> Result<()> {
pub async fn sync(src: impl AsRef<OsStr>, dst: impl AsRef<OsStr>, delete_dst: bool) -> Result<()> {
use std::process::Stdio;

let mut cmd = Command::new("azcopy");
Expand All @@ -15,7 +15,9 @@ pub async fn sync(src: impl AsRef<OsStr>, dst: impl AsRef<OsStr>) -> Result<()>
.stderr(Stdio::piped())
.arg("sync")
.arg(&src)
.arg(&dst);
.arg(&dst)
.arg("--delete_destination")
chkeita marked this conversation as resolved.
Show resolved Hide resolved
.arg(delete_dst.to_string());
chkeita marked this conversation as resolved.
Show resolved Hide resolved

let output = cmd.spawn()?.wait_with_output().await?;
if !output.status.success() {
Expand Down
16 changes: 8 additions & 8 deletions src/agent/onefuzz/src/syncdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,20 @@ pub struct SyncedDir {
}

impl SyncedDir {
pub async fn sync(&self, operation: SyncOperation) -> Result<()> {
pub async fn sync(&self, operation: SyncOperation, delete_dst: bool) -> Result<()> {
let dir = &self.path;
let url = self.url.url();
let url = url.as_ref();
verbose!("syncing {:?} {}", operation, dir.display());
match operation {
SyncOperation::Push => az_copy::sync(dir, url).await,
SyncOperation::Pull => az_copy::sync(url, dir).await,
SyncOperation::Push => az_copy::sync(dir, url, delete_dst).await,
SyncOperation::Pull => az_copy::sync(url, dir, delete_dst).await,
}
}

pub async fn init_pull(&self) -> Result<()> {
self.init().await?;
self.sync(SyncOperation::Pull).await
self.sync(SyncOperation::Pull, false).await
}

pub async fn init(&self) -> Result<()> {
Expand All @@ -60,11 +60,11 @@ impl SyncedDir {
}

pub async fn sync_pull(&self) -> Result<()> {
self.sync(SyncOperation::Pull).await
self.sync(SyncOperation::Pull, false).await
}

pub async fn sync_push(&self) -> Result<()> {
self.sync(SyncOperation::Push).await
self.sync(SyncOperation::Push, false).await
}

pub async fn continuous_sync(
Expand All @@ -79,7 +79,7 @@ impl SyncedDir {
let delay = Duration::from_secs(delay_seconds);

loop {
self.sync(operation).await?;
self.sync(operation, false).await?;
delay_with_jitter(delay).await;
}
}
Expand Down Expand Up @@ -146,7 +146,7 @@ pub async fn continuous_sync(

loop {
for dir in dirs {
dir.sync(operation).await?;
dir.sync(operation, false).await?;
}
delay_with_jitter(delay).await;
}
Expand Down
Loading