Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

local_run refactoring #508

Merged
110 commits merged into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
110 commits
Select commit Hold shift + click to select a range
1faaa67
moving sync operation inside SyncDir
chkeita Feb 1, 2021
19e3fae
Refactor queue management
chkeita Feb 4, 2021
359969e
more refactoring
chkeita Feb 5, 2021
216571e
WIP local queue implementation
chkeita Feb 6, 2021
7f54250
Fix build
chkeita Feb 16, 2021
d6ae489
build fix
chkeita Feb 16, 2021
ebc797c
removed unused imports
chkeita Feb 16, 2021
74e6c9b
move auth to onefuzz
chkeita Feb 16, 2021
10f2b2e
handle queue authentication
chkeita Feb 16, 2021
8be75a8
WIP converting local fuzzing templates
chkeita Feb 16, 2021
68a5fd9
build fix
chkeita Feb 17, 2021
b3d352e
local queue sender and receiver are now create per requests
chkeita Feb 18, 2021
bacb202
fix url scheme check
chkeita Feb 18, 2021
ad12f53
fixing unit tests
chkeita Feb 18, 2021
04a0de2
formatting
chkeita Feb 18, 2021
c8c4951
utlis::download_inputs supports file url
chkeita Feb 18, 2021
a2a11a2
support local files in BlobUrl
chkeita Feb 18, 2021
9503f0d
updating local lib fuzzer crash report
chkeita Feb 20, 2021
b14ab1a
build fix
chkeita Feb 20, 2021
6de303a
formatting
chkeita Feb 20, 2021
bb8d459
formatting
chkeita Feb 20, 2021
a6ee53f
fix tests
chkeita Feb 20, 2021
1d5485a
convert blobUrl to enum
chkeita Feb 20, 2021
21bf38b
formatting
chkeita Feb 20, 2021
9d7f395
fix clippy issues
chkeita Feb 20, 2021
6e1802d
fix test
chkeita Feb 20, 2021
b55f5a3
introducing DirectoryMonitorQueue struct
chkeita Feb 22, 2021
2cd5231
bring back message ids
chkeita Feb 22, 2021
6b80377
cargo lock update
chkeita Feb 22, 2021
3b91722
wiring up radamsa, generic_crash_report
chkeita Feb 22, 2021
590f6f9
adding local generic analyis
chkeita Feb 22, 2021
f0417be
build fix
chkeita Feb 23, 2021
88505e7
remove byte order mark when reading mesasge from a queue
chkeita Feb 24, 2021
2c4732b
formatting
chkeita Feb 24, 2021
efeb0df
Merge remote-tracking branch 'upstream/main' into chkeita/local_run
chkeita Feb 24, 2021
58a8cf5
revert change in file upload
chkeita Feb 24, 2021
0ca077f
Introducing synchronization accros local folders
chkeita Feb 25, 2021
cdfc710
build fix
chkeita Feb 25, 2021
0798e52
- generating new task id for each task
chkeita Feb 25, 2021
584cfc9
clippy fix
chkeita Feb 25, 2021
c420af1
disabling radamsa test
chkeita Feb 25, 2021
0080d17
reenabling radamsa test
chkeita Feb 25, 2021
6041c7e
fix test
chkeita Feb 25, 2021
2ee8d55
test fix
chkeita Feb 25, 2021
1e0efc6
Merge remote-tracking branch 'upstream/main' into chkeita/local_run
chkeita Feb 25, 2021
e8f31db
another test fix
chkeita Feb 25, 2021
ca54fd9
formatting
chkeita Feb 25, 2021
be9b02f
test fix
chkeita Feb 25, 2021
52a13d8
- passing refernce to queue client instead of just the url
chkeita Feb 27, 2021
d5dcdd3
Merge remote-tracking branch 'upstream/main' into chkeita/local_run
chkeita Feb 27, 2021
8b72651
clippy fix
chkeita Feb 27, 2021
bdd64c5
fix clippy
chkeita Feb 27, 2021
73f7a27
adding crash analysis to local libfuzzer
chkeita Mar 1, 2021
66b2c05
adding libfuzzer merge to local run
chkeita Mar 1, 2021
0874cef
Merge remote-tracking branch 'upstream/main' into chkeita/local_run
chkeita Mar 1, 2021
3cd8f3d
Merge remote-tracking branch 'upstream/main' into chkeita/local_run
chkeita Mar 1, 2021
c785a5b
waiting for the crash_dir to be created before monitoring it
chkeita Mar 2, 2021
1dd00b7
Merge remote-tracking branch 'upstream/main' into chkeita/local_run
chkeita Mar 2, 2021
054a6ba
cleanup comments
chkeita Mar 2, 2021
154f04e
formatting
chkeita Mar 2, 2021
3a1ff1e
Merge remote-tracking branch 'upstream/main' into chkeita/local_run
chkeita Mar 2, 2021
df35f3a
regenerate lock files
chkeita Mar 2, 2021
2e1f4e0
removing workset file
chkeita Mar 2, 2021
29ea9bb
removing queue file creation
chkeita Mar 2, 2021
1365779
remove unused parameter
chkeita Mar 2, 2021
e73fea6
formatting
chkeita Mar 2, 2021
a6542e3
- address PR comments
chkeita Mar 3, 2021
b45a990
clippy fix
chkeita Mar 3, 2021
fbdaf21
adding trailing '/' to container file path
chkeita Mar 3, 2021
047246d
using json for local queue serialization
chkeita Mar 3, 2021
2530447
copying instead renaming to void issues when source and destination a…
chkeita Mar 3, 2021
a8cef11
absolutize paths
chkeita Mar 3, 2021
d17cbd9
revert back to xml for the serilization of local messages
chkeita Mar 4, 2021
1403f8d
libfuzzer local now exists as soon as there a task returns an error
chkeita Mar 4, 2021
ebf7da6
adding context information
chkeita Mar 4, 2021
15f1d84
clippy fix
chkeita Mar 4, 2021
47137fd
warn instead of bail when file copy fail
chkeita Mar 4, 2021
8b3523d
using backoff to wait for file
chkeita Mar 4, 2021
b58bc44
warn instead of bail on fail upload
chkeita Mar 4, 2021
e5650e9
using select_all instead of try_join_all
chkeita Mar 4, 2021
532f4fa
fix tests
chkeita Mar 4, 2021
857eec7
Merge branch 'main' into chkeita/local_run
chkeita Mar 4, 2021
5bd48b4
adding comment
chkeita Mar 4, 2021
020764b
build fix
chkeita Mar 5, 2021
1de393f
fix sync push
chkeita Mar 5, 2021
5509eea
Merge branch 'main' into chkeita/local_run
bmc-msft Mar 8, 2021
5e5b6ca
Update src/agent/onefuzz/src/blob/url.rs
chkeita Mar 8, 2021
cd2ce06
Update src/agent/onefuzz-agent/src/local/common.rs
chkeita Mar 8, 2021
d0a0081
Update src/agent/onefuzz-agent/src/local/cmd.rs
chkeita Mar 8, 2021
1cc7e0c
Merge branch 'main' into chkeita/local_run
bmc-msft Mar 10, 2021
707981d
Merge branch 'main' into chkeita/local_run
chkeita Mar 11, 2021
341645f
fix upload
chkeita Mar 11, 2021
ad19f31
build fix
chkeita Mar 11, 2021
b3aac9b
rename _sync to sync_impl
chkeita Mar 12, 2021
7a503e6
added unit tests
chkeita Mar 12, 2021
9b3b730
Merge branch 'main' into chkeita/local_run
bmc-msft Mar 12, 2021
741660c
bringing back bail when upload fail but
chkeita Mar 12, 2021
6a5dd24
introduced a type to distinguish between dir and file path
chkeita Mar 12, 2021
ca62a9b
bring back comment
chkeita Mar 12, 2021
2bde835
update to yaque 0.5.1
chkeita Mar 13, 2021
4c4286f
Merge remote-tracking branch 'upstream/main' into chkeita/local_run
chkeita Mar 13, 2021
747fa6d
Creating temporary folder inside the libfuzzer task dir
chkeita Mar 15, 2021
0fe1240
Merge branch 'main' into chkeita/local_run
chkeita Mar 15, 2021
f911db0
Merge branch 'main' into chkeita/local_run
bmc-msft Mar 15, 2021
3f7aee3
Update src/agent/reqwest-retry/src/lib.rs
bmc-msft Mar 15, 2021
43d11fb
merge fix
chkeita Mar 15, 2021
53ab0b7
Merge branch 'main' into chkeita/local_run
bmc-msft Mar 16, 2021
0c05a2f
add send_retry_default to local run's download_input
demoray Mar 16, 2021
e9f53a4
Merge pull request #2 from bmc-msft/add-retry-to-download_input-local…
chkeita Mar 16, 2021
44bd9bb
Merge remote-tracking branch 'upstream/main' into chkeita/local_run
chkeita Mar 17, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 185 additions & 38 deletions src/agent/Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/agent/onefuzz-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ integration_test=[]
anyhow = "1.0"
appinsights = "0.1"
async-trait = "0.1"
backoff = { version = "0.3", features = ["async-std"] }
clap = "2.33"
tempfile = "3.2"
env_logger = "0.8"
Expand All @@ -32,6 +33,7 @@ onefuzz = { path = "../onefuzz" }
storage-queue = { path = "../storage-queue" }
reqwest-retry = { path = "../reqwest-retry" }
onefuzz-telemetry = { path = "../onefuzz-telemetry" }
path-absolutize = "3.0.6"

[dev-dependencies]
tempfile = "3.2"
24 changes: 0 additions & 24 deletions src/agent/onefuzz-agent/src/debug/cmd.rs

This file was deleted.

58 changes: 0 additions & 58 deletions src/agent/onefuzz-agent/src/debug/libfuzzer_merge.rs

This file was deleted.

5 changes: 0 additions & 5 deletions src/agent/onefuzz-agent/src/debug/mod.rs

This file was deleted.

11 changes: 9 additions & 2 deletions src/agent/onefuzz-agent/src/local/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@ use anyhow::Result;
use clap::{App, SubCommand};

use crate::local::{
common::add_common_config, generic_crash_report, generic_generator, libfuzzer,
libfuzzer_coverage, libfuzzer_crash_report, libfuzzer_fuzz, radamsa,
common::add_common_config, generic_analysis, generic_crash_report, generic_generator,
libfuzzer, libfuzzer_coverage, libfuzzer_crash_report, libfuzzer_fuzz, libfuzzer_merge,
radamsa,
};

const RADAMSA: &str = "radamsa";
const LIBFUZZER: &str = "libfuzzer";
const LIBFUZZER_FUZZ: &str = "libfuzzer-fuzz";
const LIBFUZZER_CRASH_REPORT: &str = "libfuzzer-crash-report";
const LIBFUZZER_COVERAGE: &str = "libfuzzer-coverage";
const LIBFUZZER_MERGE: &str = "libfuzzer-merge";
const GENERIC_CRASH_REPORT: &str = "generic-crash-report";
const GENERIC_GENERATOR: &str = "generic-generator";
const GENERIC_ANALYSIS: &str = "generic-analysis";

pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
match args.subcommand() {
Expand All @@ -24,6 +27,8 @@ pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
(LIBFUZZER_FUZZ, Some(sub)) => libfuzzer_fuzz::run(sub).await,
(LIBFUZZER_COVERAGE, Some(sub)) => libfuzzer_coverage::run(sub).await,
(LIBFUZZER_CRASH_REPORT, Some(sub)) => libfuzzer_crash_report::run(sub).await,
(LIBFUZZER_MERGE, Some(sub)) => libfuzzer_merge::run(sub).await,
(GENERIC_ANALYSIS, Some(sub)) => generic_analysis::run(sub).await,
(GENERIC_CRASH_REPORT, Some(sub)) => generic_crash_report::run(sub).await,
(GENERIC_GENERATOR, Some(sub)) => generic_generator::run(sub).await,
_ => {
Expand All @@ -41,6 +46,7 @@ pub fn args(name: &str) -> App<'static, 'static> {
.subcommand(add_common_config(libfuzzer_coverage::args(
LIBFUZZER_COVERAGE,
)))
.subcommand(add_common_config(libfuzzer_merge::args(LIBFUZZER_MERGE)))
.subcommand(add_common_config(libfuzzer_crash_report::args(
LIBFUZZER_CRASH_REPORT,
)))
Expand All @@ -50,4 +56,5 @@ pub fn args(name: &str) -> App<'static, 'static> {
.subcommand(add_common_config(generic_generator::args(
GENERIC_GENERATOR,
)))
.subcommand(add_common_config(generic_analysis::args(GENERIC_ANALYSIS)))
}
191 changes: 141 additions & 50 deletions src/agent/onefuzz-agent/src/local/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,20 @@ use crate::tasks::config::CommonConfig;
use crate::tasks::utils::parse_key_value;
use anyhow::Result;
use clap::{App, Arg, ArgMatches};
use std::{collections::HashMap, path::PathBuf};

use onefuzz::jitter::delay_with_jitter;
use onefuzz::{blob::BlobContainerUrl, monitor::DirectoryMonitor, syncdir::SyncedDir};
use reqwest::Url;
use std::{
collections::HashMap,
path::{Path, PathBuf},
time::Duration,
};
use uuid::Uuid;

use backoff::{future::retry, Error as BackoffError, ExponentialBackoff};
use path_absolutize::Absolutize;
use std::task::Poll;

pub const SETUP_DIR: &str = "setup_dir";
pub const INPUTS_DIR: &str = "inputs_dir";
pub const CRASHES_DIR: &str = "crashes_dir";
Expand Down Expand Up @@ -33,46 +43,30 @@ pub const GENERATOR_EXE: &str = "generator_exe";
pub const GENERATOR_ENV: &str = "generator_env";
pub const GENERATOR_OPTIONS: &str = "generator_options";

pub const ANALYZER_EXE: &str = "analyzer_exe";
pub const ANALYZER_OPTIONS: &str = "analyzer_options";
pub const ANALYZER_ENV: &str = "analyzer_env";
pub const ANALYSIS_DIR: &str = "analysis_dir";
pub const ANALYSIS_INPUTS: &str = "analysis_inputs";
pub const ANALYSIS_UNIQUE_INPUTS: &str = "analysis_unique_inputs";
pub const PRESERVE_EXISTING_OUTPUTS: &str = "preserve_existing_outputs";

const WAIT_FOR_MAX_WAIT: Duration = Duration::from_secs(10);
const WAIT_FOR_DIR_DELAY: Duration = Duration::from_secs(1);

pub enum CmdType {
Target,
Generator,
// Supervisor,
}

pub fn add_cmd_options(
cmd_type: CmdType,
exe: bool,
arg: bool,
env: bool,
mut app: App<'static, 'static>,
) -> App<'static, 'static> {
let (exe_name, env_name, arg_name) = match cmd_type {
CmdType::Target => (TARGET_EXE, TARGET_ENV, TARGET_OPTIONS),
// CmdType::Supervisor => (SUPERVISOR_EXE, SUPERVISOR_ENV, SUPERVISOR_OPTIONS),
CmdType::Generator => (GENERATOR_EXE, GENERATOR_ENV, GENERATOR_OPTIONS),
};

if exe {
app = app.arg(Arg::with_name(exe_name).takes_value(true).required(true));
}
if env {
app = app.arg(
Arg::with_name(env_name)
.long(env_name)
.takes_value(true)
.multiple(true),
)
}
if arg {
app = app.arg(
Arg::with_name(arg_name)
.long(arg_name)
.takes_value(true)
.value_delimiter(" ")
.help("Use a quoted string with space separation to denote multiple arguments"),
)
pub fn get_hash_map(args: &clap::ArgMatches<'_>, name: &str) -> Result<HashMap<String, String>> {
chkeita marked this conversation as resolved.
Show resolved Hide resolved
let mut env = HashMap::new();
for opt in args.values_of_lossy(name).unwrap_or_default() {
let (k, v) = parse_key_value(opt)?;
env.insert(k, v);
}
app
Ok(env)
}

pub fn get_cmd_exe(cmd_type: CmdType, args: &clap::ArgMatches<'_>) -> Result<String> {
Expand Down Expand Up @@ -105,13 +99,7 @@ pub fn get_cmd_env(
// CmdType::Supervisor => SUPERVISOR_ENV,
CmdType::Generator => GENERATOR_ENV,
};

let mut env = HashMap::new();
for opt in args.values_of_lossy(env_name).unwrap_or_default() {
let (k, v) = parse_key_value(opt)?;
env.insert(k, v);
}
Ok(env)
get_hash_map(args, env_name)
}

pub fn add_common_config(app: App<'static, 'static>) -> App<'static, 'static> {
Expand Down Expand Up @@ -142,17 +130,56 @@ pub fn add_common_config(app: App<'static, 'static>) -> App<'static, 'static> {
}

fn get_uuid(name: &str, args: &ArgMatches<'_>) -> Result<Uuid> {
match value_t!(args, name, String) {
Ok(x) => Uuid::parse_str(&x)
.map_err(|x| format_err!("invalid {}. uuid expected. {})", name, x)),
Err(_) => Ok(Uuid::nil()),
}
value_t!(args, name, String).map(|x| {
Uuid::parse_str(&x).map_err(|x| format_err!("invalid {}. uuid expected. {})", name, x))
})?
}

pub fn get_synced_dirs(
name: &str,
job_id: Uuid,
task_id: Uuid,
args: &ArgMatches<'_>,
) -> Result<Vec<SyncedDir>> {
let current_dir = std::env::current_dir()?;
let dirs: Result<Vec<SyncedDir>> = value_t!(args, name, PathBuf)?
.iter()
.enumerate()
.map(|(index, remote_path)| {
let path = PathBuf::from(remote_path);
let remote_path = path.absolutize()?;
chkeita marked this conversation as resolved.
Show resolved Hide resolved
let remote_url = Url::from_file_path(remote_path).expect("invalid file path");
let remote_blob_url = BlobContainerUrl::new(remote_url).expect("invalid url");
let path = current_dir.join(format!("{}/{}/{}_{}", job_id, task_id, name, index));
Ok(SyncedDir {
url: remote_blob_url,
path,
})
})
.collect();
Ok(dirs?)
}

pub fn get_synced_dir(
name: &str,
job_id: Uuid,
task_id: Uuid,
args: &ArgMatches<'_>,
) -> Result<SyncedDir> {
let remote_path = value_t!(args, name, PathBuf)?.absolutize()?.into_owned();
let remote_url = Url::from_file_path(remote_path).map_err(|_| anyhow!("invalid file path"))?;
let remote_blob_url = BlobContainerUrl::new(remote_url)?;
let path = std::env::current_dir()?.join(format!("{}/{}/{}", job_id, task_id, name));
Ok(SyncedDir {
url: remote_blob_url,
path,
})
}

pub fn build_common_config(args: &ArgMatches<'_>) -> Result<CommonConfig> {
let job_id = get_uuid("job_id", args)?;
let task_id = get_uuid("task_id", args)?;
let instance_id = get_uuid("instance_id", args)?;
let job_id = get_uuid("job_id", args).unwrap_or_else(|_| Uuid::nil());
let task_id = get_uuid("task_id", args).unwrap_or_else(|_| Uuid::new_v4());
let instance_id = get_uuid("instance_id", args).unwrap_or_else(|_| Uuid::nil());

let setup_dir = if args.is_present(SETUP_DIR) {
value_t!(args, SETUP_DIR, PathBuf)?
Expand All @@ -174,3 +201,67 @@ pub fn build_common_config(args: &ArgMatches<'_>) -> Result<CommonConfig> {
};
Ok(config)
}

/// Information about a local path being monitored
/// A new notification will be received on the queue url
/// For each new file added to the directory
pub struct DirectoryMonitorQueue {
pub directory_path: PathBuf,
pub queue_client: storage_queue::QueueClient,
pub handle: tokio::task::JoinHandle<Result<()>>,
}

impl DirectoryMonitorQueue {
pub async fn start_monitoring(directory_path: impl AsRef<Path>) -> Result<Self> {
let directory_path = PathBuf::from(directory_path.as_ref());
let directory_path_clone = directory_path.clone();
let queue_client = storage_queue::QueueClient::Channel(
storage_queue::local_queue::ChannelQueueClient::new()?,
);
let queue = queue_client.clone();
let handle: tokio::task::JoinHandle<Result<()>> = tokio::spawn(async move {
let mut monitor = DirectoryMonitor::new(directory_path_clone.clone());
monitor.start()?;
loop {
match monitor.poll_file() {
Poll::Ready(Some(file_path)) => {
let file_url = Url::from_file_path(file_path)
.map_err(|_| anyhow!("invalid file path"))?;
queue.enqueue(file_url).await?;
}
Poll::Ready(None) => break,
Poll::Pending => delay_with_jitter(Duration::from_secs(1)).await,
}
}
Ok(())
});

Ok(DirectoryMonitorQueue {
directory_path,
queue_client,
handle,
})
}
}

pub async fn wait_for_dir(path: impl AsRef<Path>) -> Result<()> {
chkeita marked this conversation as resolved.
Show resolved Hide resolved
let op = || async {
if path.as_ref().exists() {
Ok(())
} else {
Err(BackoffError::Transient(anyhow::anyhow!(
"path '{:?}' does not exisit",
path.as_ref()
)))
}
};
retry(
ExponentialBackoff {
max_elapsed_time: Some(WAIT_FOR_MAX_WAIT),
max_interval: WAIT_FOR_DIR_DELAY,
..ExponentialBackoff::default()
},
op,
)
.await
}
Loading