From 8433b32ac76572f5b1c9a12c3773cc5378358f24 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Mon, 19 Oct 2020 15:00:48 -0400 Subject: [PATCH 1/3] only emit events on a change of state --- src/agent/onefuzz-supervisor/src/agent.rs | 16 ++++++++++---- src/agent/onefuzz-supervisor/src/scheduler.rs | 22 +++++++++++++++++++ 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/src/agent/onefuzz-supervisor/src/agent.rs b/src/agent/onefuzz-supervisor/src/agent.rs index 8ea46d69ca..ada9afe930 100644 --- a/src/agent/onefuzz-supervisor/src/agent.rs +++ b/src/agent/onefuzz-supervisor/src/agent.rs @@ -21,6 +21,7 @@ pub struct Agent { work_queue: Box, worker_runner: Box, _heartbeat: Option, + previous_state: Option, } impl Agent { @@ -34,6 +35,7 @@ impl Agent { heartbeat: Option, ) -> Self { let scheduler = Some(scheduler); + let previous_state = None; Self { coordinator, @@ -43,6 +45,7 @@ impl Agent { work_queue, worker_runner, _heartbeat: heartbeat, + previous_state, } } @@ -92,6 +95,7 @@ impl Agent { Scheduler::Done(s) => self.done(s).await?, }; + self.previous_state = Some(StateValue::from(&next)); let done = matches!(next, Scheduler::Done(..)); self.scheduler = Some(next); @@ -100,8 +104,10 @@ impl Agent { } async fn free(&mut self, state: State) -> Result { - let event = StateUpdateEvent::Free.into(); - self.coordinator.emit_event(event).await?; + if !matches!(self.previous_state, Some(StateValue::Free)) { + let event = StateUpdateEvent::Free.into(); + self.coordinator.emit_event(event).await?; + } let msg = self.work_queue.poll().await?; @@ -208,8 +214,10 @@ impl Agent { } async fn busy(&mut self, state: State) -> Result { - let event = StateUpdateEvent::Busy.into(); - self.coordinator.emit_event(event).await?; + if !matches!(self.previous_state, Some(StateValue::Busy)) { + let event = StateUpdateEvent::Busy.into(); + self.coordinator.emit_event(event).await?; + } let mut events = vec![]; let updated = state diff --git a/src/agent/onefuzz-supervisor/src/scheduler.rs b/src/agent/onefuzz-supervisor/src/scheduler.rs index 62cd213300..e1070e9a74 100644 --- a/src/agent/onefuzz-supervisor/src/scheduler.rs +++ b/src/agent/onefuzz-supervisor/src/scheduler.rs @@ -35,6 +35,28 @@ impl fmt::Display for Scheduler { } } +pub enum StateValue { + Free, + SettingUp, + PendingReboot, + Ready, + Busy, + Done, +} + +impl From<&Scheduler> for StateValue { + fn from(value: &Scheduler) -> Self { + match value { + Scheduler::Free(_) => Self::Free, + Scheduler::SettingUp(_) => Self::SettingUp, + Scheduler::PendingReboot(_) => Self::PendingReboot, + Scheduler::Ready(_) => Self::Ready, + Scheduler::Busy(_) => Self::Busy, + Scheduler::Done(_) => Self::Done, + } + } +} + impl Scheduler { pub fn new() -> Self { Self::default() From 807c0f3084242c27755e887dbfe95e45285b2c58 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Mon, 19 Oct 2020 18:22:02 -0400 Subject: [PATCH 2/3] address comments --- src/agent/onefuzz-supervisor/src/agent.rs | 54 ++++++++++--------- src/agent/onefuzz-supervisor/src/scheduler.rs | 37 +++++-------- 2 files changed, 44 insertions(+), 47 deletions(-) diff --git a/src/agent/onefuzz-supervisor/src/agent.rs b/src/agent/onefuzz-supervisor/src/agent.rs index ada9afe930..c08ae9da18 100644 --- a/src/agent/onefuzz-supervisor/src/agent.rs +++ b/src/agent/onefuzz-supervisor/src/agent.rs @@ -21,7 +21,7 @@ pub struct Agent { work_queue: Box, worker_runner: Box, _heartbeat: Option, - previous_state: Option, + previous_state: NodeState, } impl Agent { @@ -35,7 +35,7 @@ impl Agent { heartbeat: Option, ) -> Self { let scheduler = Some(scheduler); - let previous_state = None; + let previous_state = NodeState::Init; Self { coordinator, @@ -95,7 +95,7 @@ impl Agent { Scheduler::Done(s) => self.done(s).await?, }; - self.previous_state = Some(StateValue::from(&next)); + self.previous_state = NodeState::from(&next); let done = matches!(next, Scheduler::Done(..)); self.scheduler = Some(next); @@ -103,12 +103,26 @@ impl Agent { Ok(done) } - async fn free(&mut self, state: State) -> Result { - if !matches!(self.previous_state, Some(StateValue::Free)) { - let event = StateUpdateEvent::Free.into(); - self.coordinator.emit_event(event).await?; + async fn emit_state_update_if_changed(&mut self, event: StateUpdateEvent) -> Result<()> { + match (&event, self.previous_state) { + (StateUpdateEvent::Free, NodeState::Free) + | (StateUpdateEvent::Busy, NodeState::Busy) + | (StateUpdateEvent::SettingUp{..}, NodeState::SettingUp) + | (StateUpdateEvent::Rebooting, NodeState::Rebooting) + | (StateUpdateEvent::Ready, NodeState::Ready) + | (StateUpdateEvent::Done{..}, NodeState::Done) => {} + _ => { + self.coordinator.emit_event(event.into()).await?; + } } + Ok(()) + } + + async fn free(&mut self, state: State) -> Result { + self.emit_state_update_if_changed(StateUpdateEvent::Free) + .await?; + let msg = self.work_queue.poll().await?; let next = if let Some(msg) = msg { @@ -179,8 +193,8 @@ impl Agent { verbose!("agent setting up"); let tasks = state.work_set().task_ids(); - let event = StateUpdateEvent::SettingUp { tasks }; - self.coordinator.emit_event(event.into()).await?; + self.emit_state_update_if_changed(StateUpdateEvent::SettingUp { tasks }) + .await?; let scheduler = match state.finish(self.setup_runner.as_mut()).await? { SetupDone::Ready(s) => s.into(), @@ -193,9 +207,8 @@ impl Agent { async fn pending_reboot(&mut self, state: State) -> Result { verbose!("agent pending reboot"); - - let event = StateUpdateEvent::Rebooting.into(); - self.coordinator.emit_event(event).await?; + self.emit_state_update_if_changed(StateUpdateEvent::Rebooting) + .await?; let ctx = state.reboot_context(); self.reboot.save_context(ctx).await?; @@ -206,19 +219,14 @@ impl Agent { async fn ready(&mut self, state: State) -> Result { verbose!("agent ready"); - - let event = StateUpdateEvent::Ready.into(); - self.coordinator.emit_event(event).await?; - + self.emit_state_update_if_changed(StateUpdateEvent::Ready) + .await?; Ok(state.run().await?.into()) } async fn busy(&mut self, state: State) -> Result { - if !matches!(self.previous_state, Some(StateValue::Busy)) { - let event = StateUpdateEvent::Busy.into(); - self.coordinator.emit_event(event).await?; - } - + self.emit_state_update_if_changed(StateUpdateEvent::Busy) + .await?; let mut events = vec![]; let updated = state .update(&mut events, self.worker_runner.as_mut()) @@ -249,9 +257,7 @@ impl Agent { }, }; - let event = event.into(); - self.coordinator.emit_event(event).await?; - + self.emit_state_update_if_changed(event).await?; // `Done` is a final state. Ok(state.into()) } diff --git a/src/agent/onefuzz-supervisor/src/scheduler.rs b/src/agent/onefuzz-supervisor/src/scheduler.rs index e1070e9a74..5676eb6944 100644 --- a/src/agent/onefuzz-supervisor/src/scheduler.rs +++ b/src/agent/onefuzz-supervisor/src/scheduler.rs @@ -6,7 +6,7 @@ use std::fmt; use anyhow::Result; use onefuzz::process::Output; -use crate::coordinator::NodeCommand; +use crate::coordinator::{NodeCommand, NodeState}; use crate::reboot::RebootContext; use crate::setup::ISetupRunner; use crate::work::*; @@ -21,6 +21,19 @@ pub enum Scheduler { Done(State), } +impl From<&Scheduler> for NodeState { + fn from(value: &Scheduler) -> Self { + match value { + Scheduler::Free(_) => Self::Free, + Scheduler::SettingUp(_) => Self::SettingUp, + Scheduler::PendingReboot(_) => Self::Rebooting, + Scheduler::Ready(_) => Self::Ready, + Scheduler::Busy(_) => Self::Busy, + Scheduler::Done(_) => Self::Done, + } + } +} + impl fmt::Display for Scheduler { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let s = match self { @@ -35,28 +48,6 @@ impl fmt::Display for Scheduler { } } -pub enum StateValue { - Free, - SettingUp, - PendingReboot, - Ready, - Busy, - Done, -} - -impl From<&Scheduler> for StateValue { - fn from(value: &Scheduler) -> Self { - match value { - Scheduler::Free(_) => Self::Free, - Scheduler::SettingUp(_) => Self::SettingUp, - Scheduler::PendingReboot(_) => Self::PendingReboot, - Scheduler::Ready(_) => Self::Ready, - Scheduler::Busy(_) => Self::Busy, - Scheduler::Done(_) => Self::Done, - } - } -} - impl Scheduler { pub fn new() -> Self { Self::default() From a1744047f0e8499894410ae5015f56d437f9b32c Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Mon, 19 Oct 2020 18:25:49 -0400 Subject: [PATCH 3/3] cargo fmt --- src/agent/onefuzz-supervisor/src/agent.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/agent/onefuzz-supervisor/src/agent.rs b/src/agent/onefuzz-supervisor/src/agent.rs index c08ae9da18..e0d4422ad4 100644 --- a/src/agent/onefuzz-supervisor/src/agent.rs +++ b/src/agent/onefuzz-supervisor/src/agent.rs @@ -107,10 +107,10 @@ impl Agent { match (&event, self.previous_state) { (StateUpdateEvent::Free, NodeState::Free) | (StateUpdateEvent::Busy, NodeState::Busy) - | (StateUpdateEvent::SettingUp{..}, NodeState::SettingUp) + | (StateUpdateEvent::SettingUp { .. }, NodeState::SettingUp) | (StateUpdateEvent::Rebooting, NodeState::Rebooting) | (StateUpdateEvent::Ready, NodeState::Ready) - | (StateUpdateEvent::Done{..}, NodeState::Done) => {} + | (StateUpdateEvent::Done { .. }, NodeState::Done) => {} _ => { self.coordinator.emit_event(event.into()).await?; }