Skip to content

Commit

Permalink
Merge pull request #153 from microsoft/main
Browse files Browse the repository at this point in the history
Fork Sync: Update from parent repository
  • Loading branch information
AdamL-Microsoft authored Dec 21, 2022
2 parents 16bb4be + 04f83e0 commit b1a0367
Show file tree
Hide file tree
Showing 23 changed files with 347 additions and 156 deletions.
4 changes: 2 additions & 2 deletions src/agent/coverage/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ use crate::binary::BinaryCoverage;

pub use crate::binary::Count;

#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct SourceCoverage {
pub files: BTreeMap<FilePath, FileCoverage>,
}

#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct FileCoverage {
pub lines: BTreeMap<Line, Count>,
}
Expand Down
135 changes: 83 additions & 52 deletions src/agent/onefuzz-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::reboot::*;
use crate::scheduler::*;
use crate::setup::*;
use crate::work::IWorkQueue;
use crate::worker::IWorkerRunner;
use crate::worker::{IWorkerRunner, WorkerEvent};

const PENDING_COMMANDS_DELAY: time::Duration = time::Duration::from_secs(10);
const BUSY_DELAY: time::Duration = time::Duration::from_secs(1);
Expand Down Expand Up @@ -62,7 +62,7 @@ impl Agent {
}
}

pub async fn run(&mut self) -> Result<()> {
pub async fn run(self) -> Result<()> {
let mut instant = time::Instant::now();

// Tell the service that the agent has started.
Expand All @@ -78,42 +78,39 @@ impl Agent {
let event = StateUpdateEvent::Init.into();
self.coordinator.emit_event(event).await?;
}

loop {
self.heartbeat.alive();
let mut state = self;
let mut done = false;
while !done {
state.heartbeat.alive();
if instant.elapsed() >= PENDING_COMMANDS_DELAY {
self.execute_pending_commands().await?;
state = state.execute_pending_commands().await?;
instant = time::Instant::now();
}

let done = self.update().await?;

if done {
debug!("agent done, exiting loop");
break;
}
(state, done) = state.update().await?;
}

info!("agent done, exiting loop");
Ok(())
}

async fn update(&mut self) -> Result<bool> {
async fn update(mut self) -> Result<(Self, bool)> {
let last = self.scheduler.take().ok_or_else(scheduler_error)?;
let previous_state = NodeState::from(&last);
let (next, done) = match last {
Scheduler::Free(s) => (self.free(s).await?, false),
Scheduler::SettingUp(s) => (self.setting_up(s).await?, false),
Scheduler::PendingReboot(s) => (self.pending_reboot(s).await?, false),
Scheduler::Ready(s) => (self.ready(s).await?, false),
Scheduler::Busy(s) => (self.busy(s).await?, false),
Scheduler::Done(s) => (self.done(s).await?, true),
Scheduler::Free(s) => (self.free(s, previous_state).await?, false),
Scheduler::SettingUp(s) => (self.setting_up(s, previous_state).await?, false),
Scheduler::PendingReboot(s) => (self.pending_reboot(s, previous_state).await?, false),
Scheduler::Ready(s) => (self.ready(s, previous_state).await?, false),
Scheduler::Busy(s) => (self.busy(s, previous_state).await?, false),
//todo: introduce a new parameter to allow the agent to restart after this point
Scheduler::Done(s) => (self.done(s, previous_state).await?, true),
};
self.previous_state = previous_state;
self.scheduler = Some(next);
Ok(done)

Ok((next, done))
}

async fn emit_state_update_if_changed(&mut self, event: StateUpdateEvent) -> Result<()> {
async fn emit_state_update_if_changed(&self, event: StateUpdateEvent) -> Result<()> {
match (&event, self.previous_state) {
(StateUpdateEvent::Free, NodeState::Free)
| (StateUpdateEvent::Busy, NodeState::Busy)
Expand All @@ -129,7 +126,7 @@ impl Agent {
Ok(())
}

async fn free(&mut self, state: State<Free>) -> Result<Scheduler> {
async fn free(mut self, state: State<Free>, previous: NodeState) -> Result<Self> {
self.emit_state_update_if_changed(StateUpdateEvent::Free)
.await?;

Expand Down Expand Up @@ -190,7 +187,7 @@ impl Agent {
// Otherwise, the work was not stopped, but we still should not execute it. This is likely
// because our agent version is out of date. Do nothing, so another node can see the work.
// The service will eventually send us a stop command and reimage our node, if appropriate.
debug!(
info!(
"not scheduling active work set, not dropping: {:?}",
msg.work_set
);
Expand All @@ -205,11 +202,15 @@ impl Agent {
state.into()
};

Ok(next)
Ok(Self {
previous_state: previous,
scheduler: Some(next),
..self
})
}

async fn setting_up(&mut self, state: State<SettingUp>) -> Result<Scheduler> {
debug!("agent setting up");
async fn setting_up(mut self, state: State<SettingUp>, previous: NodeState) -> Result<Self> {
info!("agent setting up");

let tasks = state.work_set().task_ids();
self.emit_state_update_if_changed(StateUpdateEvent::SettingUp { tasks })
Expand All @@ -221,11 +222,19 @@ impl Agent {
SetupDone::Done(s) => s.into(),
};

Ok(scheduler)
Ok(Self {
previous_state: previous,
scheduler: Some(scheduler),
..self
})
}

async fn pending_reboot(&mut self, state: State<PendingReboot>) -> Result<Scheduler> {
debug!("agent pending reboot");
async fn pending_reboot(
self,
state: State<PendingReboot>,
_previous: NodeState,
) -> Result<Self> {
info!("agent pending reboot");
self.emit_state_update_if_changed(StateUpdateEvent::Rebooting)
.await?;

Expand All @@ -236,14 +245,18 @@ impl Agent {
unreachable!()
}

async fn ready(&mut self, state: State<Ready>) -> Result<Scheduler> {
debug!("agent ready");
async fn ready(self, state: State<Ready>, previous: NodeState) -> Result<Self> {
info!("agent ready");
self.emit_state_update_if_changed(StateUpdateEvent::Ready)
.await?;
Ok(state.run().await?.into())
Ok(Self {
previous_state: previous,
scheduler: Some(state.run().await?.into()),
..self
})
}

async fn busy(&mut self, state: State<Busy>) -> Result<Scheduler> {
async fn busy(mut self, state: State<Busy>, previous: NodeState) -> Result<Self> {
self.emit_state_update_if_changed(StateUpdateEvent::Busy)
.await?;

Expand All @@ -255,7 +268,7 @@ impl Agent {
// that is done, this sleep should be removed.
time::sleep(BUSY_DELAY).await;

let mut events = vec![];
let mut events: Vec<WorkerEvent> = vec![];
let updated = state
.update(&mut events, self.worker_runner.as_mut())
.await?;
Expand All @@ -264,11 +277,15 @@ impl Agent {
self.coordinator.emit_event(event.into()).await?;
}

Ok(updated.into())
Ok(Self {
previous_state: previous,
scheduler: Some(updated.into()),
..self
})
}

async fn done(&mut self, state: State<Done>) -> Result<Scheduler> {
debug!("agent done");
async fn done(self, state: State<Done>, previous: NodeState) -> Result<Self> {
info!("agent done");
set_done_lock(self.machine_id).await?;

let event = match state.cause() {
Expand All @@ -287,23 +304,41 @@ impl Agent {

self.emit_state_update_if_changed(event).await?;
// `Done` is a final state.
Ok(state.into())
Ok(Self {
previous_state: previous,
scheduler: Some(state.into()),
..self
})
}

async fn execute_pending_commands(&mut self) -> Result<()> {
async fn execute_pending_commands(mut self) -> Result<Self> {
let result = self.coordinator.poll_commands().await;

match &result {
Ok(None) => {}
Ok(None) => Ok(Self {
last_poll_command: result,
..self
}),
Ok(Some(cmd)) => {
info!("agent received node command: {:?}", cmd);
let managed = self.managed;
self.scheduler()?.execute_command(cmd, managed).await?;
let scheduler = self.scheduler.take().ok_or_else(scheduler_error)?;
let new_scheduler = scheduler.execute_command(cmd.clone(), managed).await?;

Ok(Self {
last_poll_command: result,
scheduler: Some(new_scheduler),
..self
})
}
Err(PollCommandError::RequestFailed(err)) => {
// If we failed to request commands, the service could be down.
// Log it, but keep going.
error!("error polling the service for commands: {:?}", err);
Ok(Self {
last_poll_command: result,
..self
})
}
Err(PollCommandError::RequestParseFailed(err)) => {
bail!("poll commands failed: {:?}", err);
Expand All @@ -321,22 +356,18 @@ impl Agent {
bail!("repeated command claim attempt failures: {:?}", err);
}
error!("error claiming command from the service: {:?}", err);
Ok(Self {
last_poll_command: result,
..self
})
}
}

self.last_poll_command = result;

Ok(())
}

async fn sleep(&mut self) {
async fn sleep(&self) {
let delay = time::Duration::from_secs(30);
time::sleep(delay).await;
}

fn scheduler(&mut self) -> Result<&mut Scheduler> {
self.scheduler.as_mut().ok_or_else(scheduler_error)
}
}

// The agent owns a `Scheduler`, which it must consume when driving its state
Expand Down
28 changes: 14 additions & 14 deletions src/agent/onefuzz-agent/src/agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ impl Fixture {

#[tokio::test]
async fn test_update_free_no_work() {
let mut agent = Fixture.agent();
let agent = Fixture.agent();

let done = agent.update().await.unwrap();
let (agent, done) = agent.update().await.unwrap();
assert!(!done);

assert!(matches!(agent.scheduler().unwrap(), Scheduler::Free(..)));
assert!(matches!(agent.scheduler.unwrap(), Scheduler::Free(..)));

let double: &WorkQueueDouble = agent.work_queue.downcast_ref().unwrap();
let claimed_worksets = double
Expand All @@ -109,13 +109,9 @@ async fn test_update_free_has_work() {
.available
.push(Fixture.message());

let done = agent.update().await.unwrap();
let (agent, done) = agent.update().await.unwrap();
assert!(!done);

assert!(matches!(
agent.scheduler().unwrap(),
Scheduler::SettingUp(..)
));
assert!(matches!(agent.scheduler.unwrap(), Scheduler::SettingUp(..)));

let double: &WorkQueueDouble = agent.work_queue.downcast_ref().unwrap();
let claimed_worksets = double
Expand Down Expand Up @@ -149,8 +145,10 @@ async fn test_emitted_state() {
.available
.push(Fixture.message());

let mut done;
for _i in 0..10 {
if agent.update().await.unwrap() {
(agent, done) = agent.update().await.unwrap();
if done {
break;
}
}
Expand Down Expand Up @@ -181,8 +179,8 @@ async fn test_emitted_state() {
}),
];
let coordinator: &CoordinatorDouble = agent.coordinator.downcast_ref().unwrap();
let events = &coordinator.events;
assert_eq!(events, &expected_events);
let events = &coordinator.events.read().await;
assert_eq!(&events.to_vec(), &expected_events);
}

#[tokio::test]
Expand All @@ -206,8 +204,10 @@ async fn test_emitted_state_failed_setup() {
.available
.push(Fixture.message());

let mut done;
for _i in 0..10 {
if agent.update().await.unwrap() {
(agent, done) = agent.update().await.unwrap();
if done {
break;
}
}
Expand All @@ -223,7 +223,7 @@ async fn test_emitted_state_failed_setup() {
}),
];
let coordinator: &CoordinatorDouble = agent.coordinator.downcast_ref().unwrap();
let events = &coordinator.events;
let events = &coordinator.events.read().await.to_vec();
assert_eq!(events, &expected_events);

// TODO: at some point, the underlying tests should be updated to not write
Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-agent/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const ONEFUZZ_SERVICE_USER: &str = "onefuzz";
#[cfg(target_family = "windows")]
static SET_PERMISSION_ONCE: OnceCell<()> = OnceCell::const_new();

#[derive(Debug, Deserialize, Eq, PartialEq, Serialize)]
#[derive(Debug, Deserialize, Eq, PartialEq, Serialize, Clone)]
pub struct SshKeyInfo {
pub public_key: Secret<String>,
}
Expand Down
Loading

0 comments on commit b1a0367

Please sign in to comment.