Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

implementing heartbeat for the supervisor #30

Merged
merged 9 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/agent/onefuzz-agent/src/debug/generic_crash_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use uuid::Uuid;
async fn run_impl(input: String, config: Config) -> Result<()> {
let input_path = Path::new(&input);
let test_url = Url::parse("https://contoso.com/sample-container/blob.txt")?;
let processor = GenericReportProcessor::new(&config);
let heartbeat_client = config.common.init_heartbeat().await?;
let processor = GenericReportProcessor::new(&config, heartbeat_client);
let result = processor.test_input(test_url, input_path).await?;
println!("{:#?}", result);
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use url::Url;
use uuid::Uuid;

async fn run_impl(input: String, config: Config) -> Result<()> {
let task = AsanProcessor::new(Arc::new(config));
let task = AsanProcessor::new(Arc::new(config)).await?;

let test_url = Url::parse("https://contoso.com/sample-container/blob.txt")?;
let input_path = Path::new(&input);
Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-agent/src/tasks/analysis/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async fn already_checked(config: &Config, input: &BlobUrl) -> Result<bool> {
}

async fn poll_inputs(config: &Config, tmp_dir: OwnedDir) -> Result<()> {
let heartbeat = config.common.init_heartbeat();
let heartbeat = config.common.init_heartbeat().await?;
if let Some(queue) = &config.input_queue {
let mut input_queue = QueueClient::new(queue.clone());

Expand Down
12 changes: 8 additions & 4 deletions src/agent/onefuzz-agent/src/tasks/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@ pub struct CommonConfig {
}

impl CommonConfig {
pub fn init_heartbeat(&self) -> Option<HeartbeatClient> {
self.heartbeat_queue
.clone()
.map(|url| HeartbeatClient::init(url, self.task_id))
/// Creates the task heartbeat client when a heartbeat queue is configured.
///
/// Returns `Ok(None)` when `heartbeat_queue` is `None`.
///
/// # Errors
///
/// Propagates any error returned by `init_task_heartbeat`.
pub async fn init_heartbeat(&self) -> Result<Option<TaskHeartbeatClient>> {
match &self.heartbeat_queue {
Some(url) => {
// The URL is cloned because `init_task_heartbeat` takes it by value.
let hb = init_task_heartbeat(url.clone(), self.task_id).await?;
Ok(Some(hb))
}
None => Ok(None),
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,12 @@ pub struct CoverageProcessor {
pub recorder: CoverageRecorder,
pub total: TotalCoverage,
pub module_totals: HashMap<OsString, TotalCoverage>,
heartbeat_client: Option<HeartbeatClient>,
heartbeat_client: Option<TaskHeartbeatClient>,
}

impl CoverageProcessor {
pub async fn new(config: Arc<Config>) -> Result<Self> {
let heartbeat_client = config.common.init_heartbeat();
let heartbeat_client = config.common.init_heartbeat().await?;
let total = TotalCoverage::new(config.coverage.path.join(TOTAL_COVERAGE));
let recorder = CoverageRecorder::new(config.clone());
let module_totals = HashMap::default();
Expand Down
6 changes: 3 additions & 3 deletions src/agent/onefuzz-agent/src/tasks/fuzz/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub async fn spawn(config: Arc<GeneratorConfig>) -> Result<(), Error> {
utils::init_dir(&config.tools.path).await?;
utils::sync_remote_dir(&config.tools, utils::SyncOperation::Pull).await?;
set_executable(&config.tools.path).await?;
let hb_client = config.common.init_heartbeat();
let hb_client = config.common.init_heartbeat().await?;

for sync_dir in &config.readonly_inputs {
utils::init_dir(&sync_dir.path).await?;
Expand Down Expand Up @@ -129,14 +129,14 @@ async fn start_fuzzing<'a>(
config: &GeneratorConfig,
corpus_dirs: Vec<impl AsRef<Path>>,
tester: Tester<'a>,
heartbeat_sender: Option<HeartbeatClient>,
heartbeat_client: Option<TaskHeartbeatClient>,
) -> Result<()> {
let generator_tmp = "generator_tmp";

info!("Starting generator fuzzing loop");

loop {
heartbeat_sender.alive();
heartbeat_client.alive();

for corpus_dir in &corpus_dirs {
let corpus_dir = corpus_dir.as_ref();
Expand Down
6 changes: 3 additions & 3 deletions src/agent/onefuzz-agent/src/tasks/fuzz/libfuzzer_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl LibFuzzerFuzzTask {

self.init_directories().await?;
self.sync_all_corpuses().await?;
let hb_client = self.config.common.init_heartbeat();
let hb_client = self.config.common.init_heartbeat().await?;

// To be scheduled.
let resync = self.resync_all_corpuses();
Expand Down Expand Up @@ -355,7 +355,7 @@ impl Timer {
async fn report_runtime_stats(
workers: usize,
mut stats_channel: mpsc::UnboundedReceiver<RuntimeStats>,
heartbeat_sender: impl HeartbeatSender,
heartbeat_client: impl HeartbeatSender,
) -> Result<()> {
// Cache the last-reported stats for a given worker.
//
Expand All @@ -369,7 +369,7 @@ async fn report_runtime_stats(
loop {
tokio::select! {
Some(stats) = stats_channel.next() => {
heartbeat_sender.alive();
heartbeat_client.alive();
stats.report();

let idx = stats.worker_id as usize;
Expand Down
5 changes: 3 additions & 2 deletions src/agent/onefuzz-agent/src/tasks/fuzz/supervisor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.


#![allow(clippy::too_many_arguments)]
use crate::tasks::{
config::{CommonConfig, ContainerType, SyncedDir},
Expand Down Expand Up @@ -104,7 +105,7 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {

let stopped = Notify::new();
let monitor_process = monitor_process(process, &stopped);
let hb = config.common.init_heartbeat();
let hb = config.common.init_heartbeat().await?;

let heartbeat_process = heartbeat_process(&stopped, hb);

Expand Down Expand Up @@ -134,7 +135,7 @@ pub async fn spawn(config: SupervisorConfig) -> Result<(), Error> {

async fn heartbeat_process(
stopped: &Notify,
heartbeat_client: Option<HeartbeatClient>,
heartbeat_client: Option<TaskHeartbeatClient>,
) -> Result<()> {
while !stopped.is_notified(HEARTBEAT_PERIOD).await {
heartbeat_client.alive();
Expand Down
149 changes: 38 additions & 111 deletions src/agent/onefuzz-agent/src/tasks/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,13 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

use crate::onefuzz::heartbeat::HeartbeatClient;
use crate::onefuzz::machine_id::{get_machine_id, get_machine_name};
use crate::tasks::utils::CheckNotify;
use anyhow::Result;
use reqwest::Url;
use serde::{self, Deserialize, Serialize};
use std::{
collections::HashSet,
sync::{Arc, Mutex},
time::Duration,
};
use storage_queue::QueueClient;
use tokio::{
sync::Notify,
task::{self, JoinHandle},
};
use uuid::Uuid;

const DEFAULT_HEARTBEAT_PERIOD: Duration = Duration::from_secs(60 * 5);
#[derive(Debug, Deserialize, Serialize, Hash, Eq, PartialEq, Clone)]
#[serde(tag = "type")]
pub enum HeartbeatData {
Expand All @@ -27,115 +16,53 @@ pub enum HeartbeatData {
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct Heartbeat<'a> {
struct Heartbeat {
task_id: Uuid,
machine_id: Uuid,
machine_name: &'a str,
machine_name: String,
data: Vec<HeartbeatData>,
}

pub struct HeartbeatClient {
cancelled: Arc<Notify>,
messages: Arc<Mutex<HashSet<HeartbeatData>>>,
_heartbeat_process: JoinHandle<Result<()>>,
}

impl Drop for HeartbeatClient {
fn drop(&mut self) {
self.cancelled.notify();
}
/// Identity copied into every task heartbeat message: which task is
/// reporting and from which machine.
#[derive(Clone)]
pub struct TaskContext {
/// Id of the task the heartbeat is sent for.
task_id: Uuid,
/// Machine id, obtained from `get_machine_id` when the client is created.
machine_id: Uuid,
/// Machine name, obtained from `get_machine_name` when the client is created.
machine_name: String,
}

impl HeartbeatClient {
pub fn init(queue_url: Url, task_id: Uuid) -> Self {
HeartbeatClient::init_with_period(queue_url, task_id, DEFAULT_HEARTBEAT_PERIOD)
}
/// Heartbeat client used by tasks: its state is a `TaskContext` and its
/// pending messages are `HeartbeatData` values.
pub type TaskHeartbeatClient = HeartbeatClient<TaskContext, HeartbeatData>;

pub fn init_with_period(queue_url: Url, task_id: Uuid, heartbeat_period: Duration) -> Self {
let messages = Arc::new(Mutex::new(HashSet::new()));
let cancelled = Arc::new(Notify::new());
let _heartbeat_process = HeartbeatClient::start_background_process(
pub async fn init_task_heartbeat(queue_url: Url, task_id: Uuid) -> Result<TaskHeartbeatClient> {
let machine_id = get_machine_id().await?;
let machine_name = get_machine_name().await?;
let hb = HeartbeatClient::init_heartbeat(
TaskContext {
task_id,
queue_url,
messages.clone(),
cancelled.clone(),
heartbeat_period,
);
HeartbeatClient {
messages,
_heartbeat_process,
cancelled,
}
}

fn drain_current_messages(messages: Arc<Mutex<HashSet<HeartbeatData>>>) -> Vec<HeartbeatData> {
let lock = messages.lock();
let mut messages = lock.unwrap();
let drain = messages.iter().cloned().collect::<Vec<HeartbeatData>>();
messages.clear();
drain
}
machine_id,
machine_name,
},
queue_url,
None,
|context| async move {
let task_id = context.state.task_id;
let machine_id = context.state.machine_id;
let machine_name = context.state.machine_name.clone();

async fn flush(
task_id: Uuid,
machine_id: Uuid,
machine_name: &str,
queue_client: &QueueClient,
messages: Arc<Mutex<HashSet<HeartbeatData>>>,
) {
let mut data = HeartbeatClient::drain_current_messages(messages.clone());
data.push(HeartbeatData::MachineAlive);
let _ = queue_client
.enqueue(Heartbeat {
task_id,
data,
machine_id,
machine_name,
})
.await;
}

pub fn start_background_process(
task_id: Uuid,
queue_url: Url,
messages: Arc<Mutex<HashSet<HeartbeatData>>>,
cancelled: Arc<Notify>,
heartbeat_period: Duration,
) -> JoinHandle<Result<()>> {
let queue_client = QueueClient::new(queue_url);
task::spawn(async move {
let machine_id = get_machine_id().await?;
let machine_name = get_machine_name().await?;

HeartbeatClient::flush(
task_id,
machine_id,
&machine_name,
&queue_client,
messages.clone(),
)
.await;
while !cancelled.is_notified(heartbeat_period).await {
HeartbeatClient::flush(
let mut data =
HeartbeatClient::<TaskContext, _>::drain_current_messages(context.clone());
data.push(HeartbeatData::MachineAlive);
let _ = context
.queue_client
.enqueue(Heartbeat {
task_id,
data,
machine_id,
&machine_name,
&queue_client,
messages.clone(),
)
machine_name,
})
.await;
}
HeartbeatClient::flush(
task_id,
machine_id,
&machine_name,
&queue_client,
messages.clone(),
)
.await;
Ok(())
})
}
},
);
Ok(hb)
}

pub trait HeartbeatSender {
Expand All @@ -146,15 +73,15 @@ pub trait HeartbeatSender {
}
}

impl HeartbeatSender for HeartbeatClient {
impl HeartbeatSender for TaskHeartbeatClient {
fn send(&self, data: HeartbeatData) -> Result<()> {
let mut messages_lock = self.messages.lock().unwrap();
let mut messages_lock = self.context.pending_messages.lock().unwrap();
messages_lock.insert(data);
Ok(())
}
}

impl HeartbeatSender for Option<HeartbeatClient> {
impl HeartbeatSender for Option<TaskHeartbeatClient> {
fn send(&self, data: HeartbeatData) -> Result<()> {
match self {
Some(client) => client.send(data),
Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-agent/src/tasks/merge/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub async fn spawn(config: Arc<Config>) -> Result<()> {
set_executable(&config.tools.path).await?;

utils::init_dir(&config.unique_inputs.path).await?;
let hb_client = config.common.init_heartbeat();
let hb_client = config.common.init_heartbeat().await?;
loop {
hb_client.alive();
let tmp_dir = PathBuf::from("./tmp");
Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-agent/src/tasks/merge/libfuzzer_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct Config {
}

pub async fn spawn(config: Arc<Config>) -> Result<()> {
let hb_client = config.common.init_heartbeat();
let hb_client = config.common.init_heartbeat().await?;
utils::init_dir(&config.unique_inputs.path).await?;
loop {
hb_client.alive();
Expand Down
8 changes: 4 additions & 4 deletions src/agent/onefuzz-agent/src/tasks/report/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ impl<'a> ReportTask<'a> {

pub async fn run(&mut self) -> Result<()> {
info!("Starting generic crash report task");
let mut processor = GenericReportProcessor::new(&self.config);
let heartbeat_client = self.config.common.init_heartbeat().await?;
let mut processor = GenericReportProcessor::new(&self.config, heartbeat_client);

if let Some(crashes) = &self.config.crashes {
self.poller.batch_process(&mut processor, &crashes).await?;
Expand All @@ -82,12 +83,11 @@ impl<'a> ReportTask<'a> {
pub struct GenericReportProcessor<'a> {
config: &'a Config,
tester: Tester<'a>,
heartbeat_client: Option<HeartbeatClient>,
heartbeat_client: Option<TaskHeartbeatClient>,
}

impl<'a> GenericReportProcessor<'a> {
pub fn new(config: &'a Config) -> Self {
let heartbeat_client = config.common.init_heartbeat();
pub fn new(config: &'a Config, heartbeat_client: Option<TaskHeartbeatClient>) -> Self {
let tester = Tester::new(
&config.target_exe,
&config.target_options,
Expand Down
Loading