Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Reduce mutation in the agent state machine #2710

Merged
merged 9 commits into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 83 additions & 52 deletions src/agent/onefuzz-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::reboot::*;
use crate::scheduler::*;
use crate::setup::*;
use crate::work::IWorkQueue;
use crate::worker::IWorkerRunner;
use crate::worker::{IWorkerRunner, WorkerEvent};

const PENDING_COMMANDS_DELAY: time::Duration = time::Duration::from_secs(10);
const BUSY_DELAY: time::Duration = time::Duration::from_secs(1);
Expand Down Expand Up @@ -62,7 +62,7 @@ impl Agent {
}
}

pub async fn run(&mut self) -> Result<()> {
pub async fn run(self) -> Result<()> {
let mut instant = time::Instant::now();

// Tell the service that the agent has started.
Expand All @@ -78,42 +78,39 @@ impl Agent {
let event = StateUpdateEvent::Init.into();
self.coordinator.emit_event(event).await?;
}

loop {
self.heartbeat.alive();
let mut state = self;
let mut done = false;
while !done {
state.heartbeat.alive();
if instant.elapsed() >= PENDING_COMMANDS_DELAY {
self.execute_pending_commands().await?;
state = state.execute_pending_commands().await?;
instant = time::Instant::now();
}

let done = self.update().await?;

if done {
debug!("agent done, exiting loop");
break;
}
(state, done) = state.update().await?;
}

info!("agent done, exiting loop");
Ok(())
}

async fn update(&mut self) -> Result<bool> {
async fn update(mut self) -> Result<(Self, bool)> {
let last = self.scheduler.take().ok_or_else(scheduler_error)?;
let previous_state = NodeState::from(&last);
let (next, done) = match last {
Scheduler::Free(s) => (self.free(s).await?, false),
Scheduler::SettingUp(s) => (self.setting_up(s).await?, false),
Scheduler::PendingReboot(s) => (self.pending_reboot(s).await?, false),
Scheduler::Ready(s) => (self.ready(s).await?, false),
Scheduler::Busy(s) => (self.busy(s).await?, false),
Scheduler::Done(s) => (self.done(s).await?, true),
Scheduler::Free(s) => (self.free(s, previous_state).await?, false),
Scheduler::SettingUp(s) => (self.setting_up(s, previous_state).await?, false),
Scheduler::PendingReboot(s) => (self.pending_reboot(s, previous_state).await?, false),
Scheduler::Ready(s) => (self.ready(s, previous_state).await?, false),
Scheduler::Busy(s) => (self.busy(s, previous_state).await?, false),
// TODO: introduce a new parameter to allow the agent to restart after this point
Scheduler::Done(s) => (self.done(s, previous_state).await?, true),
};
self.previous_state = previous_state;
self.scheduler = Some(next);
Ok(done)

Ok((next, done))
}

async fn emit_state_update_if_changed(&mut self, event: StateUpdateEvent) -> Result<()> {
async fn emit_state_update_if_changed(&self, event: StateUpdateEvent) -> Result<()> {
match (&event, self.previous_state) {
(StateUpdateEvent::Free, NodeState::Free)
| (StateUpdateEvent::Busy, NodeState::Busy)
Expand All @@ -129,7 +126,7 @@ impl Agent {
Ok(())
}

async fn free(&mut self, state: State<Free>) -> Result<Scheduler> {
async fn free(mut self, state: State<Free>, previous: NodeState) -> Result<Self> {
self.emit_state_update_if_changed(StateUpdateEvent::Free)
.await?;

Expand Down Expand Up @@ -190,7 +187,7 @@ impl Agent {
// Otherwise, the work was not stopped, but we still should not execute it. This is likely
// because our agent version is out of date. Do nothing, so another node can see the work.
// The service will eventually send us a stop command and reimage our node, if appropriate.
debug!(
info!(
"not scheduling active work set, not dropping: {:?}",
msg.work_set
);
Expand All @@ -205,11 +202,15 @@ impl Agent {
state.into()
};

Ok(next)
Ok(Self {
previous_state: previous,
scheduler: Some(next),
..self
})
}

async fn setting_up(&mut self, state: State<SettingUp>) -> Result<Scheduler> {
debug!("agent setting up");
async fn setting_up(mut self, state: State<SettingUp>, previous: NodeState) -> Result<Self> {
info!("agent setting up");

let tasks = state.work_set().task_ids();
self.emit_state_update_if_changed(StateUpdateEvent::SettingUp { tasks })
Expand All @@ -221,11 +222,19 @@ impl Agent {
SetupDone::Done(s) => s.into(),
};

Ok(scheduler)
Ok(Self {
previous_state: previous,
scheduler: Some(scheduler),
..self
})
}

async fn pending_reboot(&mut self, state: State<PendingReboot>) -> Result<Scheduler> {
debug!("agent pending reboot");
async fn pending_reboot(
self,
state: State<PendingReboot>,
_previous: NodeState,
) -> Result<Self> {
info!("agent pending reboot");
self.emit_state_update_if_changed(StateUpdateEvent::Rebooting)
.await?;

Expand All @@ -236,14 +245,18 @@ impl Agent {
unreachable!()
}

async fn ready(&mut self, state: State<Ready>) -> Result<Scheduler> {
debug!("agent ready");
async fn ready(self, state: State<Ready>, previous: NodeState) -> Result<Self> {
info!("agent ready");
self.emit_state_update_if_changed(StateUpdateEvent::Ready)
.await?;
Ok(state.run().await?.into())
Ok(Self {
previous_state: previous,
scheduler: Some(state.run().await?.into()),
..self
})
}

async fn busy(&mut self, state: State<Busy>) -> Result<Scheduler> {
async fn busy(mut self, state: State<Busy>, previous: NodeState) -> Result<Self> {
self.emit_state_update_if_changed(StateUpdateEvent::Busy)
.await?;

Expand All @@ -255,7 +268,7 @@ impl Agent {
// that is done, this sleep should be removed.
time::sleep(BUSY_DELAY).await;

let mut events = vec![];
let mut events: Vec<WorkerEvent> = vec![];
let updated = state
.update(&mut events, self.worker_runner.as_mut())
.await?;
Expand All @@ -264,11 +277,15 @@ impl Agent {
self.coordinator.emit_event(event.into()).await?;
}

Ok(updated.into())
Ok(Self {
previous_state: previous,
scheduler: Some(updated.into()),
..self
})
}

async fn done(&mut self, state: State<Done>) -> Result<Scheduler> {
debug!("agent done");
async fn done(self, state: State<Done>, previous: NodeState) -> Result<Self> {
info!("agent done");
set_done_lock(self.machine_id).await?;

let event = match state.cause() {
Expand All @@ -287,23 +304,41 @@ impl Agent {

self.emit_state_update_if_changed(event).await?;
// `Done` is a final state.
Ok(state.into())
Ok(Self {
previous_state: previous,
scheduler: Some(state.into()),
..self
})
}

async fn execute_pending_commands(&mut self) -> Result<()> {
async fn execute_pending_commands(mut self) -> Result<Self> {
let result = self.coordinator.poll_commands().await;

match &result {
Ok(None) => {}
Ok(None) => Ok(Self {
last_poll_command: result,
..self
}),
Ok(Some(cmd)) => {
info!("agent received node command: {:?}", cmd);
let managed = self.managed;
self.scheduler()?.execute_command(cmd, managed).await?;
let scheduler = self.scheduler.take().ok_or_else(scheduler_error)?;
let new_scheduler = scheduler.execute_command(cmd.clone(), managed).await?;

Ok(Self {
last_poll_command: result,
scheduler: Some(new_scheduler),
..self
})
}
Err(PollCommandError::RequestFailed(err)) => {
// If we failed to request commands, the service could be down.
// Log it, but keep going.
error!("error polling the service for commands: {:?}", err);
Ok(Self {
last_poll_command: result,
..self
})
}
Err(PollCommandError::RequestParseFailed(err)) => {
bail!("poll commands failed: {:?}", err);
Expand All @@ -321,22 +356,18 @@ impl Agent {
bail!("repeated command claim attempt failures: {:?}", err);
}
error!("error claiming command from the service: {:?}", err);
Ok(Self {
last_poll_command: result,
..self
})
}
}

self.last_poll_command = result;

Ok(())
}

async fn sleep(&mut self) {
async fn sleep(&self) {
let delay = time::Duration::from_secs(30);
time::sleep(delay).await;
}

fn scheduler(&mut self) -> Result<&mut Scheduler> {
self.scheduler.as_mut().ok_or_else(scheduler_error)
}
}

// The agent owns a `Scheduler`, which it must consume when driving its state
Expand Down
28 changes: 14 additions & 14 deletions src/agent/onefuzz-agent/src/agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ impl Fixture {

#[tokio::test]
async fn test_update_free_no_work() {
let mut agent = Fixture.agent();
let agent = Fixture.agent();

let done = agent.update().await.unwrap();
let (agent, done) = agent.update().await.unwrap();
assert!(!done);

assert!(matches!(agent.scheduler().unwrap(), Scheduler::Free(..)));
assert!(matches!(agent.scheduler.unwrap(), Scheduler::Free(..)));

let double: &WorkQueueDouble = agent.work_queue.downcast_ref().unwrap();
let claimed_worksets = double
Expand All @@ -109,13 +109,9 @@ async fn test_update_free_has_work() {
.available
.push(Fixture.message());

let done = agent.update().await.unwrap();
let (agent, done) = agent.update().await.unwrap();
assert!(!done);

assert!(matches!(
agent.scheduler().unwrap(),
Scheduler::SettingUp(..)
));
assert!(matches!(agent.scheduler.unwrap(), Scheduler::SettingUp(..)));

let double: &WorkQueueDouble = agent.work_queue.downcast_ref().unwrap();
let claimed_worksets = double
Expand Down Expand Up @@ -149,8 +145,10 @@ async fn test_emitted_state() {
.available
.push(Fixture.message());

let mut done;
for _i in 0..10 {
if agent.update().await.unwrap() {
(agent, done) = agent.update().await.unwrap();
if done {
break;
}
}
Expand Down Expand Up @@ -181,8 +179,8 @@ async fn test_emitted_state() {
}),
];
let coordinator: &CoordinatorDouble = agent.coordinator.downcast_ref().unwrap();
let events = &coordinator.events;
assert_eq!(events, &expected_events);
let events = &coordinator.events.read().await;
assert_eq!(&events.to_vec(), &expected_events);
}

#[tokio::test]
Expand All @@ -206,8 +204,10 @@ async fn test_emitted_state_failed_setup() {
.available
.push(Fixture.message());

let mut done;
for _i in 0..10 {
if agent.update().await.unwrap() {
(agent, done) = agent.update().await.unwrap();
if done {
break;
}
}
Expand All @@ -223,7 +223,7 @@ async fn test_emitted_state_failed_setup() {
}),
];
let coordinator: &CoordinatorDouble = agent.coordinator.downcast_ref().unwrap();
let events = &coordinator.events;
let events = &coordinator.events.read().await.to_vec();
assert_eq!(events, &expected_events);

// TODO: at some point, the underlying tests should be updated to not write
Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-agent/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const ONEFUZZ_SERVICE_USER: &str = "onefuzz";
#[cfg(target_family = "windows")]
static SET_PERMISSION_ONCE: OnceCell<()> = OnceCell::const_new();

#[derive(Debug, Deserialize, Eq, PartialEq, Serialize)]
#[derive(Debug, Deserialize, Eq, PartialEq, Serialize, Clone)]
pub struct SshKeyInfo {
pub public_key: Secret<String>,
}
Expand Down
17 changes: 10 additions & 7 deletions src/agent/onefuzz-agent/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,12 @@ impl Registration {
pub async fn load_existing(config: StaticConfig) -> Result<Self> {
let dynamic_config = DynamicConfig::load().await?;
let machine_id = config.machine_identity.machine_id;
let mut registration = Self {
let registration = Self {
config,
dynamic_config,
machine_id,
};
registration.renew().await?;
Ok(registration)
registration.renew().await
}

pub async fn create_managed(config: StaticConfig) -> Result<Self> {
Expand All @@ -336,7 +335,7 @@ impl Registration {
Self::create(config, false, DEFAULT_REGISTRATION_CREATE_TIMEOUT).await
}

pub async fn renew(&mut self) -> Result<()> {
pub async fn renew(&self) -> Result<Self> {
info!("renewing registration");
let token = self.config.credentials.access_token().await?;

Expand All @@ -355,9 +354,13 @@ impl Registration {
.await
.context("Registration.renew request body")?;

self.dynamic_config = response.json().await?;
self.dynamic_config.save().await?;
let dynamic_config: DynamicConfig = response.json().await?;
dynamic_config.save().await?;

Ok(())
Ok(Self {
dynamic_config,
config: self.config.clone(),
machine_id: self.machine_id,
})
}
}
Loading