Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
implementing heartbeat for the supervisor
Browse files Browse the repository at this point in the history
  • Loading branch information
chkeita committed Sep 29, 2020
1 parent 5cab62b commit fa358e3
Show file tree
Hide file tree
Showing 30 changed files with 395 additions and 171 deletions.
2 changes: 2 additions & 0 deletions src/agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/agent/onefuzz-agent/src/debug/generic_crash_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use uuid::Uuid;
async fn run_impl(input: String, config: Config) -> Result<()> {
let input_path = Path::new(&input);
let test_url = Url::parse("https://contoso.com/sample-container/blob.txt")?;
let processor = GenericReportProcessor::new(&config);
let heartbeat_client = config.common.init_heartbeat().await?;
let processor = GenericReportProcessor::new(&config, heartbeat_client);
let result = processor.test_input(test_url, input_path).await?;
println!("{:#?}", result);
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use url::Url;
use uuid::Uuid;

async fn run_impl(input: String, config: Config) -> Result<()> {
let task = AsanProcessor::new(Arc::new(config));
let task = AsanProcessor::new(Arc::new(config)).await?;

let test_url = Url::parse("https://contoso.com/sample-container/blob.txt")?;
let input_path = Path::new(&input);
Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-agent/src/tasks/analysis/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async fn already_checked(config: &Config, input: &BlobUrl) -> Result<bool> {
}

async fn poll_inputs(config: &Config, tmp_dir: OwnedDir) -> Result<()> {
let heartbeat = config.common.init_heartbeat();
let heartbeat = config.common.init_heartbeat().await?;
if let Some(queue) = &config.input_queue {
let mut input_queue = QueueClient::new(queue.clone());

Expand Down
12 changes: 8 additions & 4 deletions src/agent/onefuzz-agent/src/tasks/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@ pub struct CommonConfig {
}

// NOTE(review): this span is a diff hunk rendered without +/- markers. The hunk header
// reports 8 additions and 4 deletions: the 4-line synchronous `init_heartbeat` is the
// removed version, and the 8-line `async` one that follows is its replacement.
impl CommonConfig {
// Removed version: built the client synchronously and returned a bare `Option`,
// so it had no way to report an initialization failure.
pub fn init_heartbeat(&self) -> Option<HeartbeatClient> {
self.heartbeat_queue
.clone()
.map(|url| HeartbeatClient::init(url, self.task_id))
/// Creates the task heartbeat client when `heartbeat_queue` is set.
///
/// Returns `Ok(None)` when no queue is configured, and propagates any error
/// returned by `init_task_heartbeat` to the caller.
pub async fn init_heartbeat(&self) -> Result<Option<TaskHeartbeatClient>> {
match &self.heartbeat_queue {
Some(url) => {
// The URL is cloned because `init_task_heartbeat` takes it by value.
let hb = init_task_heartbeat(url.clone(), self.task_id).await?;
Ok(Some(hb))
}
None => Ok(None),
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,12 @@ pub struct CoverageProcessor {
pub recorder: CoverageRecorder,
pub total: TotalCoverage,
pub module_totals: HashMap<OsString, TotalCoverage>,
heartbeat_client: Option<HeartbeatClient>,
heartbeat_client: Option<TaskHeartbeatClient>,
}

impl CoverageProcessor {
pub async fn new(config: Arc<Config>) -> Result<Self> {
let heartbeat_client = config.common.init_heartbeat();
let heartbeat_client = config.common.init_heartbeat().await?;
let total = TotalCoverage::new(config.coverage.path.join(TOTAL_COVERAGE));
let recorder = CoverageRecorder::new(config.clone());
let module_totals = HashMap::default();
Expand Down
6 changes: 3 additions & 3 deletions src/agent/onefuzz-agent/src/tasks/fuzz/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub async fn spawn(config: Arc<GeneratorConfig>) -> Result<(), Error> {
utils::init_dir(&config.tools.path).await?;
utils::sync_remote_dir(&config.tools, utils::SyncOperation::Pull).await?;
set_executable(&config.tools.path).await?;
let hb_client = config.common.init_heartbeat();
let hb_client = config.common.init_heartbeat().await?;

for sync_dir in &config.readonly_inputs {
utils::init_dir(&sync_dir.path).await?;
Expand Down Expand Up @@ -128,14 +128,14 @@ async fn start_fuzzing<'a>(
config: &GeneratorConfig,
corpus_dirs: Vec<impl AsRef<Path>>,
tester: Tester<'a>,
heartbeat_sender: Option<HeartbeatClient>,
heartbeat_client: Option<TaskHeartbeatClient>,
) -> Result<()> {
let generator_tmp = "generator_tmp";

info!("Starting generator fuzzing loop");

loop {
heartbeat_sender.alive();
heartbeat_client.alive();

for corpus_dir in &corpus_dirs {
let corpus_dir = corpus_dir.as_ref();
Expand Down
6 changes: 3 additions & 3 deletions src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl LibFuzzerFuzzTask {

self.init_directories().await?;
self.sync_all_corpuses().await?;
let hb_client = self.config.common.init_heartbeat();
let hb_client = self.config.common.init_heartbeat().await?;

// To be scheduled.
let resync = self.resync_all_corpuses();
Expand Down Expand Up @@ -343,7 +343,7 @@ impl Timer {
async fn report_runtime_stats(
workers: usize,
mut stats_channel: mpsc::UnboundedReceiver<RuntimeStats>,
heartbeat_sender: impl HeartbeatSender,
heartbeat_client: impl HeartbeatSender,
) -> Result<()> {
// Cache the last-reported stats for a given worker.
//
Expand All @@ -357,7 +357,7 @@ async fn report_runtime_stats(
loop {
tokio::select! {
Some(stats) = stats_channel.next() => {
heartbeat_sender.alive();
heartbeat_client.alive();
stats.report();

let idx = stats.worker_id as usize;
Expand Down
5 changes: 3 additions & 2 deletions src/agent/onefuzz-agent/src/tasks/fuzz/supervisor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.


#![allow(clippy::too_many_arguments)]
use crate::tasks::{
config::{CommonConfig, ContainerType, SyncedDir},
Expand Down Expand Up @@ -105,7 +106,7 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {

let stopped = Notify::new();
let monitor_process = monitor_process(process, &stopped);
let hb = config.common.init_heartbeat();
let hb = config.common.init_heartbeat().await?;

let heartbeat_process = heartbeat_process(&stopped, hb);

Expand Down Expand Up @@ -135,7 +136,7 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {

async fn heartbeat_process(
stopped: &Notify,
heartbeat_client: Option<HeartbeatClient>,
heartbeat_client: Option<TaskHeartbeatClient>,
) -> Result<()> {
while !stopped.is_notified(HEARTBEAT_PERIOD).await {
heartbeat_client.alive();
Expand Down
149 changes: 39 additions & 110 deletions src/agent/onefuzz-agent/src/tasks/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,12 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

use crate::onefuzz::heartbeat::HeartbeatClient;
use crate::onefuzz::machine_id::{get_machine_id, get_machine_name};
use crate::tasks::utils::CheckNotify;
use anyhow::Result;
use reqwest::Url;
use serde::{self, Deserialize, Serialize};
use std::{
collections::HashSet,
sync::{Arc, Mutex},
time::Duration,
};
use storage_queue::QueueClient;
use tokio::{
sync::Notify,
task::{self, JoinHandle},
};
use std::time::Duration;
use uuid::Uuid;

const DEFAULT_HEARTBEAT_PERIOD: Duration = Duration::from_secs(60 * 5);
Expand All @@ -27,115 +18,53 @@ pub enum HeartbeatData {
}

// Message body handed to `enqueue` on the heartbeat queue: the reporting task, the
// machine it runs on, and the batch of `HeartbeatData` entries being reported.
// NOTE(review): diff hunk without +/- markers. The `<'a>` / `&'a str` lines look like
// the removed borrowed form and the plain / `String` lines like their owned
// replacement; confirm against the repository.
#[derive(Debug, Deserialize, Serialize, Clone)]
struct Heartbeat<'a> {
struct Heartbeat {
task_id: Uuid,
machine_id: Uuid,
machine_name: &'a str,
machine_name: String,
data: Vec<HeartbeatData>,
}

pub struct HeartbeatClient {
cancelled: Arc<Notify>,
messages: Arc<Mutex<HashSet<HeartbeatData>>>,
_heartbeat_process: JoinHandle<Result<()>>,
}

impl Drop for HeartbeatClient {
fn drop(&mut self) {
self.cancelled.notify();
}
/// Identity attached to every task heartbeat: which task is reporting and from
/// which machine. It is passed as the state argument to
/// `HeartbeatClient::init_heartbeat` and read back through `context.state`.
#[derive(Clone)]
pub struct TaskContext {
task_id: Uuid,
machine_id: Uuid,
machine_name: String,
}

impl HeartbeatClient {
pub fn init(queue_url: Url, task_id: Uuid) -> Self {
HeartbeatClient::init_with_period(queue_url, task_id, DEFAULT_HEARTBEAT_PERIOD)
}
/// `HeartbeatClient` specialised for agent tasks: `TaskContext` as its state and
/// `HeartbeatData` as its message type.
pub type TaskHeartbeatClient = HeartbeatClient<TaskContext, HeartbeatData>;

pub fn init_with_period(queue_url: Url, task_id: Uuid, heartbeat_period: Duration) -> Self {
let messages = Arc::new(Mutex::new(HashSet::new()));
let cancelled = Arc::new(Notify::new());
let _heartbeat_process = HeartbeatClient::start_background_process(
/// Builds the heartbeat client used by agent tasks.
///
/// Looks up the machine id and machine name once, stores them together with
/// `task_id` in a `TaskContext`, and hands `HeartbeatClient::init_heartbeat` a
/// closure that drains the pending messages, appends `HeartbeatData::MachineAlive`,
/// and enqueues the resulting `Heartbeat` on the context's queue client. The
/// result of that enqueue is discarded (`let _ =`).
///
/// # Errors
/// Returns an error if `get_machine_id` or `get_machine_name` fails.
//
// NOTE(review): this span is a diff hunk rendered without +/- markers. Lines from the
// removed `impl HeartbeatClient` (`init_with_period`, `drain_current_messages`,
// `flush`, `start_background_process`) are interleaved with the body of the new
// function, so the text below is not compilable as shown.
pub async fn init_task_heartbeat(queue_url: Url, task_id: Uuid) -> Result<TaskHeartbeatClient> {
let machine_id = get_machine_id().await?;
let machine_name = get_machine_name().await?;
let hb = HeartbeatClient::init_heartbeat(
TaskContext {
task_id,
queue_url,
messages.clone(),
cancelled.clone(),
heartbeat_period,
);
HeartbeatClient {
messages,
_heartbeat_process,
cancelled,
}
}

fn drain_current_messages(messages: Arc<Mutex<HashSet<HeartbeatData>>>) -> Vec<HeartbeatData> {
let lock = messages.lock();
let mut messages = lock.unwrap();
let drain = messages.iter().cloned().collect::<Vec<HeartbeatData>>();
messages.clear();
drain
}
machine_id,
machine_name,
},
queue_url,
DEFAULT_HEARTBEAT_PERIOD,
|context| async move {
let task_id = context.state.task_id;
let machine_id = context.state.machine_id;
let machine_name = context.state.machine_name.clone();

async fn flush(
task_id: Uuid,
machine_id: Uuid,
machine_name: &str,
queue_client: &QueueClient,
messages: Arc<Mutex<HashSet<HeartbeatData>>>,
) {
let mut data = HeartbeatClient::drain_current_messages(messages.clone());
data.push(HeartbeatData::MachineAlive);
let _ = queue_client
.enqueue(Heartbeat {
task_id,
data,
machine_id,
machine_name,
})
.await;
}

pub fn start_background_process(
task_id: Uuid,
queue_url: Url,
messages: Arc<Mutex<HashSet<HeartbeatData>>>,
cancelled: Arc<Notify>,
heartbeat_period: Duration,
) -> JoinHandle<Result<()>> {
let queue_client = QueueClient::new(queue_url);
task::spawn(async move {
let machine_id = get_machine_id().await?;
let machine_name = get_machine_name().await?;

HeartbeatClient::flush(
task_id,
machine_id,
&machine_name,
&queue_client,
messages.clone(),
)
.await;
while !cancelled.is_notified(heartbeat_period).await {
HeartbeatClient::flush(
let mut data =
HeartbeatClient::<TaskContext, _>::drain_current_messages(context.clone());
data.push(HeartbeatData::MachineAlive);
let _ = context
.queue_client
.enqueue(Heartbeat {
task_id,
data,
machine_id,
&machine_name,
&queue_client,
messages.clone(),
)
machine_name,
})
.await;
}
HeartbeatClient::flush(
task_id,
machine_id,
&machine_name,
&queue_client,
messages.clone(),
)
.await;
Ok(())
})
}
},
);
Ok(hb)
}

pub trait HeartbeatSender {
Expand All @@ -146,15 +75,15 @@ pub trait HeartbeatSender {
}
}

// `send` inserts `data` into the client's pending-message collection and always
// returns `Ok(())`. Nothing is enqueued here.
// It panics if the mutex is poisoned, because of the `lock().unwrap()`.
// NOTE(review): diff hunk without +/- markers. Each `HeartbeatClient` / `self.messages`
// line is immediately followed by what appears to be its replacement
// (`TaskHeartbeatClient` / `self.context.pending_messages`).
impl HeartbeatSender for HeartbeatClient {
impl HeartbeatSender for TaskHeartbeatClient {
fn send(&self, data: HeartbeatData) -> Result<()> {
let mut messages_lock = self.messages.lock().unwrap();
let mut messages_lock = self.context.pending_messages.lock().unwrap();
messages_lock.insert(data);
Ok(())
}
}

impl HeartbeatSender for Option<HeartbeatClient> {
impl HeartbeatSender for Option<TaskHeartbeatClient> {
fn send(&self, data: HeartbeatData) -> Result<()> {
match self {
Some(client) => client.send(data),
Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-agent/src/tasks/merge/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub async fn spawn(config: Arc<Config>) -> Result<()> {
set_executable(&config.tools.path).await?;

utils::init_dir(&config.unique_inputs.path).await?;
let hb_client = config.common.init_heartbeat();
let hb_client = config.common.init_heartbeat().await?;
loop {
hb_client.alive();
let tmp_dir = PathBuf::from("./tmp");
Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-agent/src/tasks/merge/libfuzzer_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct Config {
}

pub async fn spawn(config: Arc<Config>) -> Result<()> {
let hb_client = config.common.init_heartbeat();
let hb_client = config.common.init_heartbeat().await?;
utils::init_dir(&config.unique_inputs.path).await?;
loop {
hb_client.alive();
Expand Down
8 changes: 4 additions & 4 deletions src/agent/onefuzz-agent/src/tasks/report/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ impl<'a> ReportTask<'a> {

pub async fn run(&mut self) -> Result<()> {
info!("Starting generic crash report task");
let mut processor = GenericReportProcessor::new(&self.config);
let heartbeat_client = self.config.common.init_heartbeat().await?;
let mut processor = GenericReportProcessor::new(&self.config, heartbeat_client);

if let Some(crashes) = &self.config.crashes {
self.poller.batch_process(&mut processor, &crashes).await?;
Expand All @@ -82,12 +83,11 @@ impl<'a> ReportTask<'a> {
/// Crash-report processor state: the borrowed task `Config`, a `Tester`, and an
/// optional heartbeat client.
// NOTE(review): diff hunk without +/- markers. The `Option<HeartbeatClient>` field
// line appears to be the removed one and `Option<TaskHeartbeatClient>` its replacement.
pub struct GenericReportProcessor<'a> {
config: &'a Config,
tester: Tester<'a>,
heartbeat_client: Option<HeartbeatClient>,
heartbeat_client: Option<TaskHeartbeatClient>,
}

impl<'a> GenericReportProcessor<'a> {
pub fn new(config: &'a Config) -> Self {
let heartbeat_client = config.common.init_heartbeat();
pub fn new(config: &'a Config, heartbeat_client: Option<TaskHeartbeatClient>) -> Self {
let tester = Tester::new(
&config.target_exe,
&config.target_options,
Expand Down
Loading

0 comments on commit fa358e3

Please sign in to comment.