diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock index c35a68673a..38f52e7e89 100644 --- a/src/agent/Cargo.lock +++ b/src/agent/Cargo.lock @@ -1812,6 +1812,7 @@ version = "0.1.0" dependencies = [ "appinsights", "iced-x86", + "lazy_static", "log", "serde", "tokio 0.2.25", diff --git a/src/agent/onefuzz-agent/src/local/cmd.rs b/src/agent/onefuzz-agent/src/local/cmd.rs index d4ecced5dd..12382c276a 100644 --- a/src/agent/onefuzz-agent/src/local/cmd.rs +++ b/src/agent/onefuzz-agent/src/local/cmd.rs @@ -39,18 +39,20 @@ pub async fn run(args: clap::ArgMatches<'static>) -> Result<()> { let event_sender = terminal.as_ref().map(|t| t.task_events.clone()); let command_run = tokio::spawn(async move { match args.subcommand() { - (RADAMSA, Some(sub)) => radamsa::run(sub).await, + (RADAMSA, Some(sub)) => radamsa::run(sub, event_sender).await, (LIBFUZZER, Some(sub)) => libfuzzer::run(sub, event_sender).await, - (LIBFUZZER_FUZZ, Some(sub)) => libfuzzer_fuzz::run(sub).await, - (LIBFUZZER_COVERAGE, Some(sub)) => libfuzzer_coverage::run(sub).await, - (LIBFUZZER_CRASH_REPORT, Some(sub)) => libfuzzer_crash_report::run(sub).await, - (LIBFUZZER_MERGE, Some(sub)) => libfuzzer_merge::run(sub).await, - (GENERIC_ANALYSIS, Some(sub)) => generic_analysis::run(sub).await, - (GENERIC_CRASH_REPORT, Some(sub)) => generic_crash_report::run(sub).await, - (GENERIC_GENERATOR, Some(sub)) => generic_generator::run(sub).await, - (GENERIC_TEST_INPUT, Some(sub)) => test_input::run(sub).await, - (LIBFUZZER_TEST_INPUT, Some(sub)) => libfuzzer_test_input::run(sub).await, - (LIBFUZZER_REGRESSION, Some(sub)) => libfuzzer_regression::run(sub).await, + (LIBFUZZER_FUZZ, Some(sub)) => libfuzzer_fuzz::run(sub, event_sender).await, + (LIBFUZZER_COVERAGE, Some(sub)) => libfuzzer_coverage::run(sub, event_sender).await, + (LIBFUZZER_CRASH_REPORT, Some(sub)) => { + libfuzzer_crash_report::run(sub, event_sender).await + } + (LIBFUZZER_MERGE, Some(sub)) => libfuzzer_merge::run(sub, event_sender).await, + (GENERIC_ANALYSIS, Some(sub)) => generic_analysis::run(sub, event_sender).await, + (GENERIC_CRASH_REPORT, Some(sub)) => generic_crash_report::run(sub, event_sender).await, + (GENERIC_GENERATOR, Some(sub)) => generic_generator::run(sub, event_sender).await, + (GENERIC_TEST_INPUT, Some(sub)) => test_input::run(sub, event_sender).await, + (LIBFUZZER_TEST_INPUT, Some(sub)) => libfuzzer_test_input::run(sub, event_sender).await, + (LIBFUZZER_REGRESSION, Some(sub)) => libfuzzer_regression::run(sub, event_sender).await, _ => { anyhow::bail!("missing subcommand\nUSAGE: {}", args.usage()); } diff --git a/src/agent/onefuzz-agent/src/local/common.rs b/src/agent/onefuzz-agent/src/local/common.rs index 348e0a037b..ddd7e1b8d1 100644 --- a/src/agent/onefuzz-agent/src/local/common.rs +++ b/src/agent/onefuzz-agent/src/local/common.rs @@ -15,8 +15,7 @@ use std::{ path::{Path, PathBuf}, time::Duration, }; -use tokio::stream::StreamExt; -use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle, time::delay_for}; +use tokio::sync::mpsc::UnboundedSender; use uuid::Uuid; pub const SETUP_DIR: &str = "setup_dir"; @@ -69,6 +68,7 @@ pub enum CmdType { pub struct LocalContext { pub job_path: PathBuf, pub common_config: CommonConfig, + pub event_sender: Option>, } pub fn get_hash_map(args: &clap::ArgMatches<'_>, name: &str) -> Result> { @@ -212,7 +212,11 @@ pub fn get_synced_dir( // fuzzing tasks from generating random task id to using UUID::nil(). This // enables making the one-shot crash report generation, which isn't really a task, // consistent across multiple runs. -pub fn build_local_context(args: &ArgMatches<'_>, generate_task_id: bool) -> Result { +pub fn build_local_context( + args: &ArgMatches<'_>, + generate_task_id: bool, + event_sender: Option>, +) -> Result { let job_id = get_uuid("job_id", args).unwrap_or_else(|_| Uuid::nil()); let task_id = get_uuid("task_id", args).unwrap_or_else(|_| { if generate_task_id { @@ -250,6 +254,7 @@ pub fn build_local_context(args: &ArgMatches<'_>, generate_task_id: bool) -> Res Ok(LocalContext { job_path, common_config, + event_sender, }) } @@ -317,48 +322,31 @@ pub async fn wait_for_dir(path: impl AsRef) -> Result<()> { .await } -pub fn spawn_file_count_monitor( - dir: PathBuf, - sender: UnboundedSender, -) -> JoinHandle> { - tokio::spawn(async move { - wait_for_dir(&dir).await?; - - loop { - let mut rd = tokio::fs::read_dir(&dir).await?; - let mut count: usize = 0; - - while let Some(Ok(entry)) = rd.next().await { - if entry.path().is_file() { - count += 1; - } - } +#[derive(Debug)] +pub enum UiEvent { + MonitorDir(PathBuf), +} - if sender - .send(UiEvent::FileCount { - dir: dir.clone(), - count, - }) - .is_err() - { - return Ok(()); - } - delay_for(Duration::from_secs(5)).await; - } - }) +pub trait SyncCountDirMonitor { + fn monitor_count(self, event_sender: &Option>) -> Result; } -pub fn monitor_file_urls( - urls: &[Option>], - event_sender: UnboundedSender, -) -> Vec>> { - urls.iter() - .filter_map(|x| x.as_ref()) - .map(|path| spawn_file_count_monitor(path.as_ref().into(), event_sender.clone())) - .collect::>() +impl SyncCountDirMonitor for SyncedDir { + fn monitor_count(self, event_sender: &Option>) -> Result { + if let (Some(event_sender), Some(p)) = (event_sender, self.url.as_file_path()) { + event_sender.send(UiEvent::MonitorDir(p))?; + } + Ok(self) + } } -#[derive(Debug)] -pub enum UiEvent { - FileCount { dir: PathBuf, count: usize }, +impl SyncCountDirMonitor> for Option { + fn monitor_count(self, event_sender: &Option>) -> Result { + if let Some(sd) = self { + let sd = sd.monitor_count(event_sender)?; + Ok(Some(sd)) + } else { + Ok(self) + } + } } diff --git a/src/agent/onefuzz-agent/src/local/generic_analysis.rs b/src/agent/onefuzz-agent/src/local/generic_analysis.rs index ef21bcb9c8..4b0495512e 100644 --- a/src/agent/onefuzz-agent/src/local/generic_analysis.rs +++ b/src/agent/onefuzz-agent/src/local/generic_analysis.rs @@ -4,8 +4,9 @@ use crate::{ local::common::{ build_local_context, get_cmd_arg, get_cmd_exe, get_hash_map, get_synced_dir, CmdType, - ANALYSIS_DIR, ANALYZER_ENV, ANALYZER_EXE, ANALYZER_OPTIONS, CRASHES_DIR, NO_REPRO_DIR, - REPORTS_DIR, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, TOOLS_DIR, UNIQUE_REPORTS_DIR, + SyncCountDirMonitor, UiEvent, ANALYSIS_DIR, ANALYZER_ENV, ANALYZER_EXE, ANALYZER_OPTIONS, + CRASHES_DIR, NO_REPRO_DIR, REPORTS_DIR, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, TOOLS_DIR, + UNIQUE_REPORTS_DIR, }, tasks::{ analysis::generic::{run as run_analysis, Config}, @@ -15,11 +16,13 @@ use crate::{ use anyhow::Result; use clap::{App, Arg, SubCommand}; use storage_queue::QueueClient; +use tokio::sync::mpsc::UnboundedSender; pub fn build_analysis_config( args: &clap::ArgMatches<'_>, input_queue: Option, common: CommonConfig, + event_sender: Option>, ) -> Result { let target_exe = get_cmd_exe(CmdType::Target, args)?.into(); let target_options = get_cmd_arg(CmdType::Target, args); @@ -27,18 +30,25 @@ pub fn build_analysis_config( let analyzer_exe = value_t!(args, ANALYZER_EXE, String)?; let analyzer_options = args.values_of_lossy(ANALYZER_OPTIONS).unwrap_or_default(); let analyzer_env = get_hash_map(args, ANALYZER_ENV)?; - let analysis = get_synced_dir(ANALYSIS_DIR, common.job_id, common.task_id, args)?; + let analysis = get_synced_dir(ANALYSIS_DIR, common.job_id, common.task_id, args)? + .monitor_count(&event_sender)?; let tools = get_synced_dir(TOOLS_DIR, common.job_id, common.task_id, args)?; let crashes = if input_queue.is_none() { - get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args).ok() + get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args) + .ok() + .monitor_count(&event_sender)? } else { None }; - - let reports = get_synced_dir(REPORTS_DIR, common.job_id, common.task_id, args).ok(); - let no_repro = get_synced_dir(NO_REPRO_DIR, common.job_id, common.task_id, args).ok(); - let unique_reports = - get_synced_dir(UNIQUE_REPORTS_DIR, common.job_id, common.task_id, args).ok(); + let reports = get_synced_dir(REPORTS_DIR, common.job_id, common.task_id, args) + .ok() + .monitor_count(&event_sender)?; + let no_repro = get_synced_dir(NO_REPRO_DIR, common.job_id, common.task_id, args) + .ok() + .monitor_count(&event_sender)?; + let unique_reports = get_synced_dir(UNIQUE_REPORTS_DIR, common.job_id, common.task_id, args) + .ok() + .monitor_count(&event_sender)?; let config = Config { target_exe, @@ -59,9 +69,12 @@ pub fn build_analysis_config( Ok(config) } -pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> { - let context = build_local_context(args, true)?; - let config = build_analysis_config(args, None, context.common_config.clone())?; +pub async fn run( + args: &clap::ArgMatches<'_>, + event_sender: Option>, +) -> Result<()> { + let context = build_local_context(args, true, event_sender.clone())?; + let config = build_analysis_config(args, None, context.common_config.clone(), event_sender)?; run_analysis(config).await } diff --git a/src/agent/onefuzz-agent/src/local/generic_crash_report.rs b/src/agent/onefuzz-agent/src/local/generic_crash_report.rs index 56b10fdac7..8039f4ed9a 100644 --- a/src/agent/onefuzz-agent/src/local/generic_crash_report.rs +++ b/src/agent/onefuzz-agent/src/local/generic_crash_report.rs @@ -4,9 +4,9 @@ use crate::{ local::common::{ build_local_context, get_cmd_arg, get_cmd_env, get_cmd_exe, get_synced_dir, CmdType, - CHECK_ASAN_LOG, CHECK_RETRY_COUNT, CRASHES_DIR, DISABLE_CHECK_DEBUGGER, - DISABLE_CHECK_QUEUE, NO_REPRO_DIR, REPORTS_DIR, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, - TARGET_TIMEOUT, UNIQUE_REPORTS_DIR, + SyncCountDirMonitor, UiEvent, CHECK_ASAN_LOG, CHECK_RETRY_COUNT, CRASHES_DIR, + DISABLE_CHECK_DEBUGGER, DISABLE_CHECK_QUEUE, NO_REPRO_DIR, REPORTS_DIR, TARGET_ENV, + TARGET_EXE, TARGET_OPTIONS, TARGET_TIMEOUT, UNIQUE_REPORTS_DIR, }, tasks::{ config::CommonConfig, @@ -16,11 +16,13 @@ use crate::{ use anyhow::Result; use clap::{App, Arg, SubCommand}; use storage_queue::QueueClient; +use tokio::sync::mpsc::UnboundedSender; pub fn build_report_config( args: &clap::ArgMatches<'_>, input_queue: Option, common: CommonConfig, + event_sender: Option>, ) -> Result { let target_exe = get_cmd_exe(CmdType::Target, args)?.into(); let target_env = get_cmd_env(CmdType::Target, args)?; @@ -31,16 +33,22 @@ pub fn build_report_config( common.job_id, common.task_id, args, - )?); - let reports = get_synced_dir(REPORTS_DIR, common.job_id, common.task_id, args).ok(); - let no_repro = get_synced_dir(NO_REPRO_DIR, common.job_id, common.task_id, args).ok(); + )?) + .monitor_count(&event_sender)?; + let reports = get_synced_dir(REPORTS_DIR, common.job_id, common.task_id, args) + .ok() + .monitor_count(&event_sender)?; + let no_repro = get_synced_dir(NO_REPRO_DIR, common.job_id, common.task_id, args) + .ok() + .monitor_count(&event_sender)?; let unique_reports = Some(get_synced_dir( UNIQUE_REPORTS_DIR, common.job_id, common.task_id, args, - )?); + )?) + .monitor_count(&event_sender)?; let target_timeout = value_t!(args, TARGET_TIMEOUT, u64).ok(); @@ -70,9 +78,12 @@ pub fn build_report_config( Ok(config) } -pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> { - let context = build_local_context(args, true)?; - let config = build_report_config(args, None, context.common_config.clone())?; +pub async fn run( + args: &clap::ArgMatches<'_>, + event_sender: Option>, +) -> Result<()> { + let context = build_local_context(args, true, event_sender.clone())?; + let config = build_report_config(args, None, context.common_config.clone(), event_sender)?; ReportTask::new(config).managed_run().await } diff --git a/src/agent/onefuzz-agent/src/local/generic_generator.rs b/src/agent/onefuzz-agent/src/local/generic_generator.rs index 07dae5562d..1f2bc4f055 100644 --- a/src/agent/onefuzz-agent/src/local/generic_generator.rs +++ b/src/agent/onefuzz-agent/src/local/generic_generator.rs @@ -4,9 +4,10 @@ use crate::{ local::common::{ build_local_context, get_cmd_arg, get_cmd_env, get_cmd_exe, get_synced_dir, - get_synced_dirs, CmdType, CHECK_ASAN_LOG, CHECK_RETRY_COUNT, CRASHES_DIR, - DISABLE_CHECK_DEBUGGER, GENERATOR_ENV, GENERATOR_EXE, GENERATOR_OPTIONS, READONLY_INPUTS, - RENAME_OUTPUT, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, TARGET_TIMEOUT, TOOLS_DIR, + get_synced_dirs, CmdType, SyncCountDirMonitor, UiEvent, CHECK_ASAN_LOG, CHECK_RETRY_COUNT, + CRASHES_DIR, DISABLE_CHECK_DEBUGGER, GENERATOR_ENV, GENERATOR_EXE, GENERATOR_OPTIONS, + READONLY_INPUTS, RENAME_OUTPUT, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, TARGET_TIMEOUT, + TOOLS_DIR, }, tasks::{ config::CommonConfig, @@ -15,9 +16,15 @@ use crate::{ }; use anyhow::Result; use clap::{App, Arg, SubCommand}; +use tokio::sync::mpsc::UnboundedSender; -pub fn build_fuzz_config(args: &clap::ArgMatches<'_>, common: CommonConfig) -> Result { - let crashes = get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args)?; +pub fn build_fuzz_config( + args: &clap::ArgMatches<'_>, + common: CommonConfig, + event_sender: Option>, +) -> Result { + let crashes = get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args)? + .monitor_count(&event_sender)?; let target_exe = get_cmd_exe(CmdType::Target, args)?.into(); let target_options = get_cmd_arg(CmdType::Target, args); let target_env = get_cmd_env(CmdType::Target, args)?; @@ -25,7 +32,10 @@ pub fn build_fuzz_config(args: &clap::ArgMatches<'_>, common: CommonConfig) -> R let generator_exe = get_cmd_exe(CmdType::Generator, args)?; let generator_options = get_cmd_arg(CmdType::Generator, args); let generator_env = get_cmd_env(CmdType::Generator, args)?; - let readonly_inputs = get_synced_dirs(READONLY_INPUTS, common.job_id, common.task_id, args)?; + let readonly_inputs = get_synced_dirs(READONLY_INPUTS, common.job_id, common.task_id, args)? + .into_iter() + .map(|sd| sd.monitor_count(&event_sender)) + .collect::>>()?; let rename_output = args.is_present(RENAME_OUTPUT); let check_asan_log = args.is_present(CHECK_ASAN_LOG); @@ -33,7 +43,9 @@ pub fn build_fuzz_config(args: &clap::ArgMatches<'_>, common: CommonConfig) -> R let check_retry_count = value_t!(args, CHECK_RETRY_COUNT, u64)?; let target_timeout = Some(value_t!(args, TARGET_TIMEOUT, u64)?); - let tools = get_synced_dir(TOOLS_DIR, common.job_id, common.task_id, args).ok(); + let tools = get_synced_dir(TOOLS_DIR, common.job_id, common.task_id, args) + .ok() + .monitor_count(&event_sender)?; let ensemble_sync_delay = None; @@ -59,9 +71,12 @@ pub fn build_fuzz_config(args: &clap::ArgMatches<'_>, common: CommonConfig) -> R Ok(config) } -pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> { - let context = build_local_context(args, true)?; - let config = build_fuzz_config(args, context.common_config.clone())?; +pub async fn run( + args: &clap::ArgMatches<'_>, + event_sender: Option>, +) -> Result<()> { + let context = build_local_context(args, true, event_sender.clone())?; + let config = build_fuzz_config(args, context.common_config.clone(), event_sender)?; GeneratorTask::new(config).run().await } diff --git a/src/agent/onefuzz-agent/src/local/libfuzzer.rs b/src/agent/onefuzz-agent/src/local/libfuzzer.rs index 1a811e2b99..4b46c757d1 100644 --- a/src/agent/onefuzz-agent/src/local/libfuzzer.rs +++ b/src/agent/onefuzz-agent/src/local/libfuzzer.rs @@ -4,8 +4,8 @@ use crate::{ local::{ common::{ - build_local_context, monitor_file_urls, wait_for_dir, DirectoryMonitorQueue, - ANALYZER_EXE, COVERAGE_DIR, REGRESSION_REPORTS_DIR, UNIQUE_REPORTS_DIR, + build_local_context, wait_for_dir, DirectoryMonitorQueue, UiEvent, ANALYZER_EXE, + COVERAGE_DIR, REGRESSION_REPORTS_DIR, UNIQUE_REPORTS_DIR, }, generic_analysis::{build_analysis_config, build_shared_args as build_analysis_args}, libfuzzer_coverage::{build_coverage_config, build_shared_args as build_coverage_args}, @@ -28,24 +28,12 @@ use std::collections::HashSet; use tokio::{sync::mpsc::UnboundedSender, task::spawn}; use uuid::Uuid; -use super::common::UiEvent; - pub async fn run( args: &clap::ArgMatches<'_>, event_sender: Option>, ) -> Result<()> { - let mut task_handles = vec![]; - let context = build_local_context(args, true)?; - let fuzz_config = build_fuzz_config(args, context.common_config.clone())?; - if let Some(event_sender) = event_sender.clone() { - task_handles.append(&mut monitor_file_urls( - &[ - fuzz_config.crashes.url.as_file_path(), - fuzz_config.inputs.url.as_file_path(), - ], - event_sender, - )); - } + let context = build_local_context(args, true, event_sender.clone())?; + let fuzz_config = build_fuzz_config(args, context.common_config.clone(), event_sender.clone())?; let crash_dir = fuzz_config .crashes .url @@ -72,26 +60,8 @@ pub async fn run( task_id: Uuid::new_v4(), ..context.common_config.clone() }, + event_sender.clone(), )?; - if let Some(event_sender) = event_sender.clone() { - task_handles.append(&mut monitor_file_urls( - &[ - report_config - .no_repro - .clone() - .and_then(|u| u.url.as_file_path()), - report_config - .reports - .clone() - .and_then(|u| u.url.as_file_path()), - report_config - .unique_reports - .clone() - .and_then(|u| u.url.as_file_path()), - ], - event_sender, - )); - } let mut report = ReportTask::new(report_config); let report_task = spawn(async move { report.managed_run().await }); @@ -111,24 +81,9 @@ pub async fn run( task_id: Uuid::new_v4(), ..context.common_config.clone() }, + event_sender.clone(), )?; - if let Some(event_sender) = event_sender { - task_handles.append(&mut monitor_file_urls( - &coverage_config - .readonly_inputs - .iter() - .cloned() - .map(|input| input.url.as_file_path()) - .collect::>(), - event_sender.clone(), - )); - task_handles.append(&mut monitor_file_urls( - &[coverage_config.coverage.url.as_file_path()], - event_sender, - )); - } - let mut coverage = CoverageTask::new(coverage_config); let coverage_task = spawn(async move { coverage.managed_run().await }); @@ -145,6 +100,7 @@ pub async fn run( task_id: Uuid::new_v4(), ..context.common_config.clone() }, + event_sender.clone(), )?; let analysis_task = spawn(async move { run_analysis(analysis_config).await }); @@ -159,6 +115,7 @@ pub async fn run( task_id: Uuid::new_v4(), ..context.common_config.clone() }, + event_sender, )?; let regression = LibFuzzerRegressionTask::new(regression_config); let regression_task = spawn(async move { regression.run().await }); diff --git a/src/agent/onefuzz-agent/src/local/libfuzzer_coverage.rs b/src/agent/onefuzz-agent/src/local/libfuzzer_coverage.rs index 48d70dc358..a7ab89ce78 100644 --- a/src/agent/onefuzz-agent/src/local/libfuzzer_coverage.rs +++ b/src/agent/onefuzz-agent/src/local/libfuzzer_coverage.rs @@ -15,29 +15,35 @@ use crate::{ use anyhow::Result; use clap::{App, Arg, SubCommand}; use storage_queue::QueueClient; +use tokio::sync::mpsc::UnboundedSender; + +use super::common::{SyncCountDirMonitor, UiEvent}; pub fn build_coverage_config( args: &clap::ArgMatches<'_>, local_job: bool, input_queue: Option, common: CommonConfig, + event_sender: Option>, ) -> Result { let target_exe = get_cmd_exe(CmdType::Target, args)?.into(); let target_env = get_cmd_env(CmdType::Target, args)?; let target_options = get_cmd_arg(CmdType::Target, args); let readonly_inputs = if local_job { - vec![get_synced_dir( - INPUTS_DIR, - common.job_id, - common.task_id, - args, - )?] + vec![ + get_synced_dir(INPUTS_DIR, common.job_id, common.task_id, args)? + .monitor_count(&event_sender)?, + ] } else { get_synced_dirs(READONLY_INPUTS, common.job_id, common.task_id, args)? + .into_iter() + .map(|sd| sd.monitor_count(&event_sender)) + .collect::>>()? }; - let coverage = get_synced_dir(COVERAGE_DIR, common.job_id, common.task_id, args)?; + let coverage = get_synced_dir(COVERAGE_DIR, common.job_id, common.task_id, args)? + .monitor_count(&event_sender)?; let check_fuzzer_help = args.is_present(CHECK_FUZZER_HELP); let config = Config { @@ -51,12 +57,22 @@ pub fn build_coverage_config( common, check_queue: false, }; + Ok(config) } -pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> { - let context = build_local_context(args, true)?; - let config = build_coverage_config(args, false, None, context.common_config.clone())?; +pub async fn run( + args: &clap::ArgMatches<'_>, + event_sender: Option>, +) -> Result<()> { + let context = build_local_context(args, true, event_sender.clone())?; + let config = build_coverage_config( + args, + false, + None, + context.common_config.clone(), + event_sender, + )?; let mut task = CoverageTask::new(config); task.managed_run().await diff --git a/src/agent/onefuzz-agent/src/local/libfuzzer_crash_report.rs b/src/agent/onefuzz-agent/src/local/libfuzzer_crash_report.rs index d3f3a88237..d217090694 100644 --- a/src/agent/onefuzz-agent/src/local/libfuzzer_crash_report.rs +++ b/src/agent/onefuzz-agent/src/local/libfuzzer_crash_report.rs @@ -4,8 +4,9 @@ use crate::{ local::common::{ build_local_context, get_cmd_arg, get_cmd_env, get_cmd_exe, get_synced_dir, CmdType, - CHECK_FUZZER_HELP, CHECK_RETRY_COUNT, CRASHES_DIR, DISABLE_CHECK_QUEUE, NO_REPRO_DIR, - REPORTS_DIR, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, TARGET_TIMEOUT, UNIQUE_REPORTS_DIR, + SyncCountDirMonitor, UiEvent, CHECK_FUZZER_HELP, CHECK_RETRY_COUNT, CRASHES_DIR, + DISABLE_CHECK_QUEUE, NO_REPRO_DIR, REPORTS_DIR, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, + TARGET_TIMEOUT, UNIQUE_REPORTS_DIR, }, tasks::{ config::CommonConfig, @@ -15,23 +16,32 @@ use crate::{ use anyhow::Result; use clap::{App, Arg, SubCommand}; use storage_queue::QueueClient; +use tokio::sync::mpsc::UnboundedSender; pub fn build_report_config( args: &clap::ArgMatches<'_>, input_queue: Option, common: CommonConfig, + event_sender: Option>, ) -> Result { let target_exe = get_cmd_exe(CmdType::Target, args)?.into(); let target_env = get_cmd_env(CmdType::Target, args)?; let target_options = get_cmd_arg(CmdType::Target, args); - let crashes = get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args).ok(); - let reports = get_synced_dir(REPORTS_DIR, common.job_id, common.task_id, args).ok(); + let crashes = get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args) + .ok() + .monitor_count(&event_sender)?; + let reports = get_synced_dir(REPORTS_DIR, common.job_id, common.task_id, args) + .ok() + .monitor_count(&event_sender)?; - let no_repro = get_synced_dir(NO_REPRO_DIR, common.job_id, common.task_id, args).ok(); + let no_repro = get_synced_dir(NO_REPRO_DIR, common.job_id, common.task_id, args) + .ok() + .monitor_count(&event_sender)?; - let unique_reports = - get_synced_dir(UNIQUE_REPORTS_DIR, common.job_id, common.task_id, args).ok(); + let unique_reports = get_synced_dir(UNIQUE_REPORTS_DIR, common.job_id, common.task_id, args) + .ok() + .monitor_count(&event_sender)?; let target_timeout = value_t!(args, TARGET_TIMEOUT, u64).ok(); @@ -59,12 +69,16 @@ pub fn build_report_config( unique_reports, common, }; + Ok(config) } -pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> { - let context = build_local_context(args, true)?; - let config = build_report_config(args, None, context.common_config.clone())?; +pub async fn run( + args: &clap::ArgMatches<'_>, + event_sender: Option>, +) -> Result<()> { + let context = build_local_context(args, true, event_sender.clone())?; + let config = build_report_config(args, None, context.common_config.clone(), event_sender)?; ReportTask::new(config).managed_run().await } diff --git a/src/agent/onefuzz-agent/src/local/libfuzzer_fuzz.rs b/src/agent/onefuzz-agent/src/local/libfuzzer_fuzz.rs index 31a88d81c2..68d71c941a 100644 --- a/src/agent/onefuzz-agent/src/local/libfuzzer_fuzz.rs +++ b/src/agent/onefuzz-agent/src/local/libfuzzer_fuzz.rs @@ -4,8 +4,8 @@ use crate::{ local::common::{ build_local_context, get_cmd_arg, get_cmd_env, get_cmd_exe, get_synced_dir, CmdType, - CHECK_FUZZER_HELP, CRASHES_DIR, INPUTS_DIR, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, - TARGET_WORKERS, + SyncCountDirMonitor, UiEvent, CHECK_FUZZER_HELP, CRASHES_DIR, INPUTS_DIR, TARGET_ENV, + TARGET_EXE, TARGET_OPTIONS, TARGET_WORKERS, }, tasks::{ config::CommonConfig, @@ -14,12 +14,19 @@ use crate::{ }; use anyhow::Result; use clap::{App, Arg, SubCommand}; +use tokio::sync::mpsc::UnboundedSender; const DISABLE_EXPECT_CRASH_ON_FAILURE: &str = "disable_expect_crash_on_failure"; -pub fn build_fuzz_config(args: &clap::ArgMatches<'_>, common: CommonConfig) -> Result { - let crashes = get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args)?; - let inputs = get_synced_dir(INPUTS_DIR, common.job_id, common.task_id, args)?; +pub fn build_fuzz_config( + args: &clap::ArgMatches<'_>, + common: CommonConfig, + event_sender: Option>, +) -> Result { + let crashes = get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args)? + .monitor_count(&event_sender)?; + let inputs = get_synced_dir(INPUTS_DIR, common.job_id, common.task_id, args)? + .monitor_count(&event_sender)?; let target_exe = get_cmd_exe(CmdType::Target, args)?.into(); let target_env = get_cmd_env(CmdType::Target, args)?; @@ -49,9 +56,12 @@ pub fn build_fuzz_config(args: &clap::ArgMatches<'_>, common: CommonConfig) -> R Ok(config) } -pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> { - let context = build_local_context(args, true)?; - let config = build_fuzz_config(args, context.common_config.clone())?; +pub async fn run( + args: &clap::ArgMatches<'_>, + event_sender: Option>, +) -> Result<()> { + let context = build_local_context(args, true, event_sender.clone())?; + let config = build_fuzz_config(args, context.common_config.clone(), event_sender)?; LibFuzzerFuzzTask::new(config)?.run().await } diff --git a/src/agent/onefuzz-agent/src/local/libfuzzer_merge.rs b/src/agent/onefuzz-agent/src/local/libfuzzer_merge.rs index cf352f0b09..9b8d91e1d4 100644 --- a/src/agent/onefuzz-agent/src/local/libfuzzer_merge.rs +++ b/src/agent/onefuzz-agent/src/local/libfuzzer_merge.rs @@ -4,8 +4,9 @@ use crate::{ local::common::{ build_local_context, get_cmd_arg, get_cmd_env, get_cmd_exe, get_synced_dir, - get_synced_dirs, CmdType, ANALYSIS_INPUTS, ANALYSIS_UNIQUE_INPUTS, CHECK_FUZZER_HELP, - INPUTS_DIR, PRESERVE_EXISTING_OUTPUTS, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, + get_synced_dirs, CmdType, SyncCountDirMonitor, UiEvent, ANALYSIS_INPUTS, + ANALYSIS_UNIQUE_INPUTS, CHECK_FUZZER_HELP, INPUTS_DIR, PRESERVE_EXISTING_OUTPUTS, + TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, }, tasks::{ config::CommonConfig, @@ -15,19 +16,25 @@ use crate::{ use anyhow::Result; use clap::{App, Arg, SubCommand}; use storage_queue::QueueClient; +use tokio::sync::mpsc::UnboundedSender; pub fn build_merge_config( args: &clap::ArgMatches<'_>, input_queue: Option, common: CommonConfig, + event_sender: Option>, ) -> Result { let target_exe = get_cmd_exe(CmdType::Target, args)?.into(); let target_env = get_cmd_env(CmdType::Target, args)?; let target_options = get_cmd_arg(CmdType::Target, args); let check_fuzzer_help = args.is_present(CHECK_FUZZER_HELP); - let inputs = get_synced_dirs(ANALYSIS_INPUTS, common.job_id, common.task_id, args)?; + let inputs = get_synced_dirs(ANALYSIS_INPUTS, common.job_id, common.task_id, args)? + .into_iter() + .map(|sd| sd.monitor_count(&event_sender)) + .collect::>>()?; let unique_inputs = - get_synced_dir(ANALYSIS_UNIQUE_INPUTS, common.job_id, common.task_id, args)?; + get_synced_dir(ANALYSIS_UNIQUE_INPUTS, common.job_id, common.task_id, args)? + .monitor_count(&event_sender)?; let preserve_existing_outputs = value_t!(args, PRESERVE_EXISTING_OUTPUTS, bool)?; let config = Config { @@ -41,12 +48,16 @@ pub fn build_merge_config( unique_inputs, preserve_existing_outputs, }; + Ok(config) } -pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> { - let context = build_local_context(args, true)?; - let config = build_merge_config(args, None, context.common_config.clone())?; +pub async fn run( + args: &clap::ArgMatches<'_>, + event_sender: Option>, +) -> Result<()> { + let context = build_local_context(args, true, event_sender.clone())?; + let config = build_merge_config(args, None, context.common_config.clone(), event_sender)?; spawn(std::sync::Arc::new(config)).await } diff --git a/src/agent/onefuzz-agent/src/local/libfuzzer_regression.rs b/src/agent/onefuzz-agent/src/local/libfuzzer_regression.rs index 823c27bdea..7c916ace6f 100644 --- a/src/agent/onefuzz-agent/src/local/libfuzzer_regression.rs +++ b/src/agent/onefuzz-agent/src/local/libfuzzer_regression.rs @@ -4,9 +4,9 @@ use crate::{ local::common::{ build_local_context, get_cmd_arg, get_cmd_env, get_cmd_exe, get_synced_dir, CmdType, - CHECK_FUZZER_HELP, CHECK_RETRY_COUNT, COVERAGE_DIR, CRASHES_DIR, NO_REPRO_DIR, - REGRESSION_REPORTS_DIR, REPORTS_DIR, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, - TARGET_TIMEOUT, UNIQUE_REPORTS_DIR, + SyncCountDirMonitor, UiEvent, CHECK_FUZZER_HELP, CHECK_RETRY_COUNT, COVERAGE_DIR, + CRASHES_DIR, NO_REPRO_DIR, REGRESSION_REPORTS_DIR, REPORTS_DIR, TARGET_ENV, TARGET_EXE, + TARGET_OPTIONS, TARGET_TIMEOUT, UNIQUE_REPORTS_DIR, }, tasks::{ config::CommonConfig, @@ -15,26 +15,35 @@ use crate::{ }; use anyhow::Result; use clap::{App, Arg, SubCommand}; +use tokio::sync::mpsc::UnboundedSender; const REPORT_NAMES: &str = "report_names"; pub fn build_regression_config( args: &clap::ArgMatches<'_>, common: CommonConfig, + event_sender: Option>, ) -> Result { let target_exe = get_cmd_exe(CmdType::Target, args)?.into(); let target_env = get_cmd_env(CmdType::Target, args)?; let target_options = get_cmd_arg(CmdType::Target, args); let target_timeout = value_t!(args, TARGET_TIMEOUT, u64).ok(); - let crashes = get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args)?; + let crashes = get_synced_dir(CRASHES_DIR, common.job_id, common.task_id, args)? + .monitor_count(&event_sender)?; let regression_reports = - get_synced_dir(REGRESSION_REPORTS_DIR, common.job_id, common.task_id, args)?; + get_synced_dir(REGRESSION_REPORTS_DIR, common.job_id, common.task_id, args)? + .monitor_count(&event_sender)?; let check_retry_count = value_t!(args, CHECK_RETRY_COUNT, u64)?; - let reports = get_synced_dir(REPORTS_DIR, common.job_id, common.task_id, args).ok(); - let no_repro = get_synced_dir(NO_REPRO_DIR, common.job_id, common.task_id, args).ok(); - let unique_reports = - get_synced_dir(UNIQUE_REPORTS_DIR, common.job_id, common.task_id, args).ok(); + let reports = get_synced_dir(REPORTS_DIR, common.job_id, common.task_id, args) + .ok() + .monitor_count(&event_sender)?; + let no_repro = get_synced_dir(NO_REPRO_DIR, common.job_id, common.task_id, args) + .ok() + .monitor_count(&event_sender)?; + let unique_reports = get_synced_dir(UNIQUE_REPORTS_DIR, common.job_id, common.task_id, args) + .ok() + .monitor_count(&event_sender)?; let report_list = if args.is_present(REPORT_NAMES) { Some(values_t!(args, REPORT_NAMES, String)?) @@ -64,9 +73,12 @@ pub fn build_regression_config( Ok(config) } -pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> { - let context = build_local_context(args, true)?; - let config = build_regression_config(args, context.common_config.clone())?; +pub async fn run( + args: &clap::ArgMatches<'_>, + event_sender: Option>, +) -> Result<()> { + let context = build_local_context(args, true, event_sender.clone())?; + let config = build_regression_config(args, context.common_config.clone(), event_sender)?; LibFuzzerRegressionTask::new(config).run().await } diff --git a/src/agent/onefuzz-agent/src/local/libfuzzer_test_input.rs b/src/agent/onefuzz-agent/src/local/libfuzzer_test_input.rs index 323110ea5c..4970a4192e 100644 --- a/src/agent/onefuzz-agent/src/local/libfuzzer_test_input.rs +++ b/src/agent/onefuzz-agent/src/local/libfuzzer_test_input.rs @@ -3,17 +3,21 @@ use crate::{ local::common::{ - build_local_context, get_cmd_arg, get_cmd_env, CmdType, CHECK_RETRY_COUNT, TARGET_ENV, - TARGET_EXE, TARGET_OPTIONS, TARGET_TIMEOUT, + build_local_context, get_cmd_arg, get_cmd_env, CmdType, UiEvent, CHECK_RETRY_COUNT, + TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, TARGET_TIMEOUT, }, tasks::report::libfuzzer_report::{test_input, TestInputArgs}, }; use anyhow::Result; use clap::{App, Arg, SubCommand}; use std::path::PathBuf; +use tokio::sync::mpsc::UnboundedSender; -pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> { - let context = build_local_context(args, true)?; +pub async fn run( + args: &clap::ArgMatches<'_>, + event_sender: Option>, +) -> Result<()> { + let context = build_local_context(args, true, event_sender)?; let target_exe = value_t!(args, TARGET_EXE, PathBuf)?; let target_env = get_cmd_env(CmdType::Target, args)?; diff --git a/src/agent/onefuzz-agent/src/local/radamsa.rs b/src/agent/onefuzz-agent/src/local/radamsa.rs index 3e69d2aba9..44c5663a48 100644 --- a/src/agent/onefuzz-agent/src/local/radamsa.rs +++ b/src/agent/onefuzz-agent/src/local/radamsa.rs @@ -3,7 +3,7 @@ use crate::{ local::{ - common::{build_local_context, DirectoryMonitorQueue}, + common::{build_local_context, DirectoryMonitorQueue, UiEvent}, generic_crash_report::{build_report_config, build_shared_args as build_crash_args}, generic_generator::{build_fuzz_config, build_shared_args as build_fuzz_args}, }, @@ -13,13 +13,16 @@ use anyhow::Result; use clap::{App, SubCommand}; use onefuzz::utils::try_wait_all_join_handles; use std::collections::HashSet; +use tokio::sync::mpsc::UnboundedSender; use tokio::task::spawn; - use uuid::Uuid; -pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> { - let context = build_local_context(args, true)?; - let fuzz_config = build_fuzz_config(args, context.common_config.clone())?; +pub async fn run( + args: &clap::ArgMatches<'_>, + event_sender: Option>, +) -> Result<()> { + let context = build_local_context(args, true, event_sender.clone())?; + let fuzz_config = build_fuzz_config(args, context.common_config.clone(), event_sender.clone())?; let crash_dir = fuzz_config .crashes .url @@ -37,6 +40,7 @@ pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> { task_id: Uuid::new_v4(), ..context.common_config.clone() }, + event_sender, )?; let report_task = spawn(async move { ReportTask::new(report_config).managed_run().await }); diff --git a/src/agent/onefuzz-agent/src/local/test_input.rs b/src/agent/onefuzz-agent/src/local/test_input.rs index 95973dcbb5..225abdfca9 100644 --- a/src/agent/onefuzz-agent/src/local/test_input.rs +++ b/src/agent/onefuzz-agent/src/local/test_input.rs @@ -3,17 +3,22 @@ use crate::{ local::common::{ - build_local_context, get_cmd_arg, get_cmd_env, CmdType, CHECK_ASAN_LOG, CHECK_RETRY_COUNT, - DISABLE_CHECK_DEBUGGER, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, TARGET_TIMEOUT, + build_local_context, get_cmd_arg, get_cmd_env, CmdType, UiEvent, CHECK_ASAN_LOG, + CHECK_RETRY_COUNT, DISABLE_CHECK_DEBUGGER, TARGET_ENV, TARGET_EXE, TARGET_OPTIONS, + TARGET_TIMEOUT, }, tasks::report::generic::{test_input, TestInputArgs}, }; use anyhow::Result; use clap::{App, Arg, SubCommand}; use std::path::PathBuf; +use tokio::sync::mpsc::UnboundedSender; -pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> { - let context = build_local_context(args, false)?; +pub async fn run( + args: &clap::ArgMatches<'_>, + event_sender: Option>, +) -> Result<()> { + let context = build_local_context(args, false, event_sender)?; let target_exe = value_t!(args, TARGET_EXE, PathBuf)?; let target_env = get_cmd_env(CmdType::Target, args)?; diff --git a/src/agent/onefuzz-agent/src/local/tui.rs b/src/agent/onefuzz-agent/src/local/tui.rs index b188ee534e..af27d73d93 100644 --- a/src/agent/onefuzz-agent/src/local/tui.rs +++ b/src/agent/onefuzz-agent/src/local/tui.rs @@ -2,7 +2,7 @@ // Licensed under the MIT License. use crate::local::common::UiEvent; -use anyhow::{Context, Result}; +use anyhow::Result; use crossterm::{ event::{self, Event, KeyCode}, execute, @@ -11,29 +11,37 @@ use crossterm::{ use futures::{StreamExt, TryStreamExt}; use log::Level; use onefuzz::utils::try_wait_all_join_handles; +use onefuzz_telemetry::{self, EventData}; use std::{ collections::HashMap, io::{self, Stdout, Write}, + iter::once, + mem::{discriminant, Discriminant}, path::PathBuf, thread::{self, JoinHandle}, time::Duration, }; use tokio::{ - sync::mpsc::{self, UnboundedReceiver}, - time, + sync::{ + broadcast::{self, TryRecvError}, + mpsc::{self, UnboundedSender}, + }, + time::delay_for, }; use tui::{ backend::CrosstermBackend, - layout::{Constraint, Corner, Direction, Layout}, + layout::{Alignment, Constraint, Corner, Direction, Layout}, style::{Color, Modifier, Style}, text::{Span, Spans}, widgets::{Block, Borders}, - widgets::{List, ListItem, ListState}, + widgets::{Gauge, List, ListItem, ListState, Paragraph, Wrap}, Terminal, }; use arraydeque::{ArrayDeque, Wrapping}; +use super::common::wait_for_dir; + #[derive(Debug, thiserror::Error)] enum UiLoopError { #[error("program exiting")] @@ -57,6 +65,15 @@ impl From for UiLoopError { /// Maximum number of log message to display, arbitrarily chosen const LOGS_BUFFER_SIZE: usize = 100; const TICK_RATE: Duration = Duration::from_millis(250); +const FILE_MONITOR_POLLING_PERIOD: Duration = Duration::from_secs(5); +const EVENT_POLLING_PERIOD: Duration = Duration::from_secs(1); + +#[derive(Debug, Default)] +struct CoverageData { + covered: Option, + features: Option, + rate: Option, +} /// Event driving the refresh of the UI #[derive(Debug)] @@ -65,15 +82,19 @@ enum TerminalEvent { Tick, FileCount { dir: PathBuf, count: usize }, Quit, + MonitorDir(PathBuf), + Telemetry(Vec), } struct UiLoopState { pub logs: ArrayDeque<[(Level, String); LOGS_BUFFER_SIZE], Wrapping>, pub file_count: HashMap, pub file_count_state: ListState, - pub file_monitors: Vec>>, + pub file_monitors: HashMap>>, pub log_event_receiver: mpsc::UnboundedReceiver<(Level, String)>, pub terminal: Terminal>, + pub cancellation_tx: broadcast::Sender<()>, + pub events: HashMap, EventData>, } impl UiLoopState { @@ -81,6 +102,8 @@ impl UiLoopState { terminal: Terminal>, log_event_receiver: mpsc::UnboundedReceiver<(Level, String)>, ) -> Self { + let (cancellation_tx, _) = broadcast::channel(1); + let events = HashMap::new(); Self { log_event_receiver, logs: Default::default(), @@ -88,6 +111,8 @@ impl UiLoopState { file_count_state: Default::default(), file_monitors: Default::default(), terminal, + cancellation_tx, + events, } } } @@ -130,40 +155,109 @@ impl TerminalUi { .init(); let tick_event_tx_clone = self.ui_event_tx.clone(); - let tick_event_handle = - tokio::spawn(async { Self::ticking(tick_event_tx_clone).await.context("ticking") }); + let tick_event_handle = tokio::spawn(Self::ticking( + tick_event_tx_clone, + initial_state.cancellation_tx.subscribe(), + )); let keyboard_ui_event_tx = self.ui_event_tx.clone(); - let _keyboard_event_handle = Self::read_keyboard_events(keyboard_ui_event_tx); + let _keyboard_event_handle = Self::read_keyboard_events( + keyboard_ui_event_tx, + initial_state.cancellation_tx.subscribe(), + ); let task_event_receiver = self.task_event_receiver; let ui_event_tx = self.ui_event_tx.clone(); - let external_event_handle = - tokio::spawn(Self::read_commands(ui_event_tx, task_event_receiver)); - let ui_loop = tokio::spawn(Self::ui_loop(initial_state, self.ui_event_rx)); + let external_event_handle = tokio::spawn(Self::read_commands( + ui_event_tx, + task_event_receiver, + initial_state.cancellation_tx.subscribe(), + )); - let mut task_handles = vec![tick_event_handle, ui_loop, external_event_handle]; + let mut task_handles = vec![tick_event_handle, external_event_handle]; + + let ui_event_tx = self.ui_event_tx.clone(); + let telemetry = tokio::spawn(Self::listen_telemetry_event( + ui_event_tx, + initial_state.cancellation_tx.subscribe(), + )); + + task_handles.push(telemetry); + + let ui_loop = tokio::spawn(Self::ui_loop( + initial_state, + self.ui_event_rx, + self.ui_event_tx.clone(), + )); + + task_handles.push(ui_loop); if let Some(timeout) = timeout { let ui_event_tx = self.ui_event_tx.clone(); - let timeout_task = tokio::spawn(async move { - time::delay_for(timeout).await; + tokio::spawn(async move { + tokio::time::delay_for(timeout).await; let _ = ui_event_tx.send(TerminalEvent::Quit); - Ok(()) }); - task_handles.push(timeout_task); } - try_wait_all_join_handles(task_handles) - .await - .context("ui_loop")?; + try_wait_all_join_handles(task_handles).await?; Ok(()) } - async fn ticking(ui_event_tx: mpsc::UnboundedSender) -> Result<()> { + fn filter_event(event: &EventData) -> bool { + !matches!( + event, + EventData::WorkerId(_) + | EventData::InstanceId(_) + | EventData::JobId(_) + | EventData::TaskId(_) + | EventData::ScalesetId(_) + | EventData::MachineId(_) + | EventData::Version(_) + | EventData::CommandLine(_) + | EventData::Type(_) + | EventData::Mode(_) + | EventData::Path(_) + | EventData::Count(_) + | EventData::ExecsSecond(_) + | EventData::Pid(_) + | EventData::RunId(_) + | EventData::Name(_) + | EventData::ToolName(_) + | EventData::ProcessStatus(_) + ) + } + + async fn listen_telemetry_event( + ui_event_tx: UnboundedSender, + mut cancellation_rx: broadcast::Receiver<()>, + ) -> Result<()> { + let mut rx = onefuzz_telemetry::subscribe_to_events(); + + while cancellation_rx.try_recv() == Err(broadcast::TryRecvError::Empty) { + match rx.try_recv() { + Ok((_event, data)) => { + let data = data + .into_iter() + .filter(Self::filter_event) + .collect::>(); + let _ = ui_event_tx.send(TerminalEvent::Telemetry(data)); + } + Err(TryRecvError::Empty) => delay_for(EVENT_POLLING_PERIOD).await, + Err(TryRecvError::Lagged(_)) => continue, + Err(TryRecvError::Closed) => break, + } + } + Ok(()) + } + + async fn ticking( + ui_event_tx: mpsc::UnboundedSender, + mut cancellation_rx: broadcast::Receiver<()>, + ) -> Result<()> { let mut interval = tokio::time::interval(TICK_RATE); - loop { + while Err(broadcast::TryRecvError::Empty) == cancellation_rx.try_recv() { interval.tick().await; if let Err(_err) = ui_event_tx.send(TerminalEvent::Tick) { break; @@ -174,34 +268,42 @@ impl TerminalUi { fn read_keyboard_events( ui_event_tx: mpsc::UnboundedSender, + mut cancellation_rx: broadcast::Receiver<()>, ) -> JoinHandle> { - thread::spawn(move || loop { - if event::poll(Duration::from_secs(1))? { - let event = event::read()?; - if let Err(_err) = ui_event_tx.send(TerminalEvent::Input(event)) { - return Ok(()); + thread::spawn(move || { + while Err(broadcast::TryRecvError::Empty) == cancellation_rx.try_recv() { + if event::poll(EVENT_POLLING_PERIOD)? { + let event = event::read()?; + if let Err(_err) = ui_event_tx.send(TerminalEvent::Input(event)) { + return Ok(()); + } } } + Ok(()) }) } async fn read_commands( ui_event_tx: mpsc::UnboundedSender, mut external_event_rx: mpsc::UnboundedReceiver, + mut cancellation_rx: broadcast::Receiver<()>, ) -> Result<()> { - while let Some(UiEvent::FileCount { dir, count }) = external_event_rx.recv().await { - if ui_event_tx - .send(TerminalEvent::FileCount { dir, count }) - .is_err() - { - break; + while Err(broadcast::TryRecvError::Empty) == cancellation_rx.try_recv() { + match external_event_rx.try_recv() { + Ok(UiEvent::MonitorDir(dir)) => { + if ui_event_tx.send(TerminalEvent::MonitorDir(dir)).is_err() { + break; + } + } + Err(mpsc::error::TryRecvError::Empty) => delay_for(EVENT_POLLING_PERIOD).await, + Err(mpsc::error::TryRecvError::Closed) => break, } } Ok(()) } fn take_available_logs( - receiver: &mut UnboundedReceiver, + receiver: &mut mpsc::UnboundedReceiver, size: usize, buffer: &mut ArrayDeque<[T; LOGS_BUFFER_SIZE], Wrapping>, ) { @@ -215,77 +317,184 @@ impl TerminalUi { } } - async fn refresh_ui(ui_state: UiLoopState) -> Result { - let mut logs = ui_state.logs; - let mut file_count_state = ui_state.file_count_state; - let file_count = ui_state.file_count; - let mut log_event_receiver = ui_state.log_event_receiver; - let mut terminal = ui_state.terminal; + fn create_coverage_gauge<'a>(rate: f64) -> Gauge<'a> { + let label = format!("coverage {:.2}%", rate * 100.0); + Gauge::default() + .gauge_style( + Style::default() + .fg(Color::White) + .bg(Color::Black) + .add_modifier(Modifier::ITALIC | Modifier::BOLD), + ) + .label(label) + .ratio(rate) + } - Self::take_available_logs(&mut log_event_receiver, 10, &mut logs); - terminal.draw(|f| { - let chunks = Layout::default() - .direction(Direction::Vertical) - .constraints([Constraint::Percentage(25), Constraint::Percentage(75)].as_ref()) - .split(f.size()); + fn create_stats_paragraph( + events: &HashMap, EventData>, + ) -> Paragraph<'_> { + let mut event_values = events.values().map(|v| v.as_values()).collect::>(); + + event_values.sort_by(|(a, _), (b, _)| a.cmp(b)); + + let mut stats_spans = once(Span::styled( + "Stats: ", + Style::default().add_modifier(Modifier::BOLD), + )) + .chain( + event_values + .into_iter() + .map(|(name, value)| { + vec![ + Span::raw(name), + Span::raw(" "), + Span::styled(value, Style::default().add_modifier(Modifier::BOLD)), + Span::raw(", "), + ] + }) + .flatten(), + ) + .collect::>(); + + if stats_spans.len() > 1 { + // removing the last "," + stats_spans.pop(); + } + + Paragraph::new(Spans::from(stats_spans)) + .style(Style::default()) + .alignment(Alignment::Left) + .wrap(Wrap { trim: true }) + } - let mut sorted_file_count = file_count.iter().collect::>(); + fn create_file_count_paragraph(file_count: &HashMap) -> Paragraph<'_> { + let mut sorted_file_count = file_count.iter().collect::>(); - sorted_file_count.sort_by(|(p1, _), (p2, _)| p1.cmp(p2)); + sorted_file_count.sort_by(|(p1, _), (p2, _)| p1.cmp(p2)); - let files = sorted_file_count + let mut files_spans = once(Span::styled( + "Files: ", + Style::default().add_modifier(Modifier::BOLD), + )) + .chain( + sorted_file_count .iter() .map(|(path, count)| { - ListItem::new(Spans::from(vec![ + vec![ Span::raw( path.file_name() .map(|f| f.to_string_lossy()) .unwrap_or_default(), ), - Span::raw(": "), - Span::raw(format!("{}", count)), - ])) + Span::raw(" "), + Span::styled( + format!("{}", count), + Style::default().add_modifier(Modifier::BOLD), + ), + Span::raw(", "), + ] }) - .collect::>(); + .flatten(), + ) + .collect::>(); + + if files_spans.len() > 1 { + files_spans.pop(); + } // removing the last "," + + Paragraph::new(Spans::from(files_spans)) + .style(Style::default()) + .alignment(Alignment::Left) + .wrap(Wrap { trim: true }) + } + + fn create_log_list( + logs: &ArrayDeque<[(Level, String); LOGS_BUFFER_SIZE], Wrapping>, + ) -> List<'_> { + let log_items = logs + .iter() + .map(|(level, log)| { + let style = match level { + Level::Debug => Style::default().fg(Color::Magenta), + Level::Error => Style::default().fg(Color::Red), + Level::Warn => Style::default().fg(Color::Yellow), + Level::Info => Style::default().fg(Color::Blue), + Level::Trace => Style::default(), + }; + + ListItem::new(Spans::from(vec![ + Span::styled(format!("{:<9}", level), style), + Span::raw(" "), + Span::raw(log), + ])) + }) + .collect::>(); - let log_list = List::new(files) - .block(Block::default().borders(Borders::ALL).title("files")) - .highlight_style(Style::default().add_modifier(Modifier::BOLD)) - .start_corner(Corner::TopLeft); + List::new(log_items) + .block(Block::default().borders(Borders::TOP).title("Logs")) + .start_corner(Corner::BottomLeft) + } - f.render_stateful_widget(log_list, chunks[0], &mut file_count_state); + async fn refresh_ui(ui_state: UiLoopState) -> Result { + let mut logs = ui_state.logs; + let file_count = ui_state.file_count; + let mut log_event_receiver = ui_state.log_event_receiver; + let mut terminal = ui_state.terminal; + let rate = ui_state + .events + .get(&discriminant(&EventData::Rate(0.0))) + .and_then(|x| { + if let EventData::Rate(r) = x { + Some(*r) + } else { + None + } + }); - let log_items = logs - .iter() - .map(|(level, log)| { - let style = match level { - Level::Debug => Style::default().fg(Color::Magenta), - Level::Error => Style::default().fg(Color::Red), - Level::Warn => Style::default().fg(Color::Yellow), - Level::Info => Style::default().fg(Color::Blue), - Level::Trace => Style::default(), - }; - - ListItem::new(Spans::from(vec![ - Span::styled(format!("{:<9}", level), style), - Span::raw(" "), - Span::raw(log), - ])) - }) - .collect::>(); + let events = ui_state.events; - let log_list = List::new(log_items) - .block(Block::default().borders(Borders::ALL).title("Logs")) - .start_corner(Corner::BottomLeft); + Self::take_available_logs(&mut log_event_receiver, 10, &mut logs); + terminal.draw(|f| { + let chunks = Layout::default() + .direction(Direction::Vertical) + .constraints([Constraint::Percentage(25), Constraint::Percentage(75)].as_ref()) + .split(f.size()); + + let log_area = chunks[1]; + let top_area = Layout::default() + .direction(Direction::Vertical) + .constraints([Constraint::Percentage(50), Constraint::Percentage(50)].as_ref()) + .split(chunks[0]); + + let file_count_area = top_area[0]; + + if let Some(rate) = rate { + let coverage_area = Layout::default() + .direction(Direction::Vertical) + .constraints([Constraint::Percentage(25), Constraint::Percentage(75)].as_ref()) + .split(top_area[1]); + + let gauge = Self::create_coverage_gauge(rate); + f.render_widget(gauge, coverage_area[0]); + let stats_paragraph = Self::create_stats_paragraph(&events); + f.render_widget(stats_paragraph, coverage_area[1]); + } else { + let stats_paragraph = Self::create_stats_paragraph(&events); + f.render_widget(stats_paragraph, top_area[1]); + } + + let file_count_paragraph = Self::create_file_count_paragraph(&file_count); + f.render_widget(file_count_paragraph, file_count_area); - f.render_widget(log_list, chunks[1]); + let log_list = Self::create_log_list(&logs); + f.render_widget(log_list, log_area); })?; Ok(UiLoopState { logs, - file_count_state, file_count, terminal, log_event_receiver, + events, ..ui_state }) } @@ -331,11 +540,16 @@ impl TerminalUi { }) } - async fn on_quit(ui_state: UiLoopState) -> Result { + async fn on_quit( + ui_state: UiLoopState, + cancellation_tx: broadcast::Sender<()>, + ) -> Result { + let _ = cancellation_tx.send(()); let mut terminal = ui_state.terminal; disable_raw_mode().map_err(|e| anyhow!("{:?}", e))?; execute!(terminal.backend_mut(), LeaveAlternateScreen).map_err(|e| anyhow!("{:?}", e))?; terminal.show_cursor()?; + Err(UiLoopError::Exit) } @@ -352,17 +566,38 @@ impl TerminalUi { }) } + async fn on_monitor_dir( + ui_state: UiLoopState, + path: PathBuf, + ui_event_tx: mpsc::UnboundedSender, + cancellation_rx: broadcast::Receiver<()>, + ) -> Result { + let mut file_monitors = ui_state.file_monitors; + + file_monitors.entry(path).or_insert_with_key(|path| { + Self::spawn_file_count_monitor(path.clone(), ui_event_tx, cancellation_rx) + }); + + Ok(UiLoopState { + file_monitors, + ..ui_state + }) + } + async fn ui_loop( initial_state: UiLoopState, ui_event_rx: mpsc::UnboundedReceiver, + ui_event_tx: mpsc::UnboundedSender, ) -> Result<()> { let loop_result = ui_event_rx .map(Ok) .try_fold(initial_state, |ui_state, event| async { + let ui_event_tx = ui_event_tx.clone(); + let cancellation_tx = ui_state.cancellation_tx.clone(); match event { TerminalEvent::Tick => Self::refresh_ui(ui_state).await, TerminalEvent::Input(Event::Key(k)) => match k.code { - KeyCode::Char('q') => Self::on_quit(ui_state).await, + KeyCode::Char('q') => Self::on_quit(ui_state, cancellation_tx).await, KeyCode::Down => Self::on_key_down(ui_state).await, KeyCode::Up => Self::on_key_up(ui_state).await, _ => Ok(ui_state), @@ -370,7 +605,24 @@ impl TerminalUi { TerminalEvent::FileCount { dir, count } => { Self::on_file_count(ui_state, dir, count).await } - TerminalEvent::Quit => Self::on_quit(ui_state).await, + TerminalEvent::Quit => Self::on_quit(ui_state, cancellation_tx).await, + TerminalEvent::MonitorDir(path) => { + Self::on_monitor_dir( + ui_state, + path, + ui_event_tx, + cancellation_tx.subscribe(), + ) + .await + } + TerminalEvent::Telemetry(event_data) => { + let mut events = ui_state.events; + for e in event_data { + events.insert(discriminant(&e), e); + } + + Ok(UiLoopState { events, ..ui_state }) + } _ => Ok(ui_state), } }) @@ -381,4 +633,37 @@ impl TerminalUi { Err(UiLoopError::Anyhow(e)) => Err(e), } } + + fn spawn_file_count_monitor( + dir: PathBuf, + sender: mpsc::UnboundedSender, + mut cancellation_rx: broadcast::Receiver<()>, + ) -> tokio::task::JoinHandle> { + tokio::spawn(async move { + wait_for_dir(&dir).await?; + while cancellation_rx.try_recv() == Err(broadcast::TryRecvError::Empty) { + let mut rd = tokio::fs::read_dir(&dir).await?; + let mut count: usize = 0; + + while let Some(Ok(entry)) = rd.next().await { + if entry.path().is_file() { + count += 1; + } + } + + if sender + .send(TerminalEvent::FileCount { + dir: dir.clone(), + count, + }) + .is_err() + { + break; + } + + delay_for(FILE_MONITOR_POLLING_PERIOD).await; + } + Ok(()) + }) + } } diff --git a/src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs b/src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs index 27f6aa8c93..0a06af67a9 100644 --- a/src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs +++ b/src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs @@ -250,7 +250,11 @@ impl LibFuzzerFuzzTask { for file in &files { if let Some(filename) = file.file_name() { let dest = self.config.crashes.path.join(filename); - tokio::fs::rename(file, dest).await?; + if let Err(e) = tokio::fs::rename(file.clone(), dest.clone()).await { + if !dest.exists() { + bail!(e) + } + } } } diff --git a/src/agent/onefuzz-telemetry/Cargo.toml b/src/agent/onefuzz-telemetry/Cargo.toml index 60e0827ca0..3b607f42d2 100644 --- a/src/agent/onefuzz-telemetry/Cargo.toml +++ b/src/agent/onefuzz-telemetry/Cargo.toml @@ -17,7 +17,7 @@ uuid = { version = "0.8", features = ["serde", "v4"] } serde = { version = "1.0", features = ["derive"] } z3-sys = { version = "0.6", optional = true} iced-x86 = { version = "1.1", optional = true} +tokio = { version = "0.2" } +lazy_static = "1.4" -[dev-dependencies] -tokio = { version = "0.2" } diff --git a/src/agent/onefuzz-telemetry/src/lib.rs b/src/agent/onefuzz-telemetry/src/lib.rs index 72ef089baa..d6956168fd 100644 --- a/src/agent/onefuzz-telemetry/src/lib.rs +++ b/src/agent/onefuzz-telemetry/src/lib.rs @@ -11,6 +11,9 @@ use uuid::Uuid; use z3_sys::ErrorCode as Z3ErrorCode; pub use appinsights::telemetry::SeverityLevel::{Critical, Error, Information, Verbose, Warning}; +use tokio::sync::broadcast::{self, Receiver}; +#[macro_use] +extern crate lazy_static; #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] #[serde(transparent)] @@ -338,6 +341,8 @@ mod global { RwLock, }; + use tokio::sync::broadcast::Sender; + use super::*; #[derive(Default)] @@ -350,6 +355,14 @@ mod global { instance: None, microsoft: None, }; + + lazy_static! { + pub static ref EVENT_SOURCE: Sender<(Event, Vec)> = { + let (telemetry_event_source, _) = broadcast::channel::<_>(100); + telemetry_event_source + }; + } + const UNSET: usize = 0; const SETTING: usize = 1; const SET: usize = 2; @@ -501,6 +514,17 @@ pub fn format_events(events: &[EventData]) -> String { .join(" ") } +fn try_broadcast_event(event: &Event, properties: &[EventData]) -> bool { + // we ignore any send error here because they indicate that + // there are no receivers on the other end + let (event, properties) = (event.clone(), properties.to_vec()); + global::EVENT_SOURCE.send((event, properties)).is_ok() +} + +pub fn subscribe_to_events() -> Receiver<(Event, Vec)> { + global::EVENT_SOURCE.subscribe() +} + pub fn track_event(event: &Event, properties: &[EventData]) { use appinsights::telemetry::Telemetry; @@ -526,6 +550,7 @@ pub fn track_event(event: &Event, properties: &[EventData]) { } client.track(evt); } + try_broadcast_event(event, properties); } pub fn to_log_level(level: &appinsights::telemetry::SeverityLevel) -> log::Level {