diff --git a/src/agent/onefuzz-supervisor/src/agent.rs b/src/agent/onefuzz-supervisor/src/agent.rs index 0ac7cf29f5..6ded0f6adf 100644 --- a/src/agent/onefuzz-supervisor/src/agent.rs +++ b/src/agent/onefuzz-supervisor/src/agent.rs @@ -54,7 +54,8 @@ impl Agent { // `Free`. If it has started up after a work set-requested reboot, the // state will be `Ready`. if let Some(Scheduler::Free(..)) = &self.scheduler { - self.coordinator.emit_event(NodeState::Init.into()).await?; + let event = StateUpdateEvent::Init.into(); + self.coordinator.emit_event(event).await?; } loop { @@ -94,7 +95,8 @@ impl Agent { } async fn free(&mut self, state: State) -> Result { - self.coordinator.emit_event(NodeState::Free.into()).await?; + let event = StateUpdateEvent::Free.into(); + self.coordinator.emit_event(event).await?; let msg = self.work_queue.poll().await?; @@ -165,9 +167,9 @@ impl Agent { async fn setting_up(&mut self, state: State) -> Result { verbose!("agent setting up"); - self.coordinator - .emit_event(NodeState::SettingUp.into()) - .await?; + let tasks = state.work_set().task_ids(); + let event = StateUpdateEvent::SettingUp { tasks }; + self.coordinator.emit_event(event.into()).await?; let scheduler = match state.finish(self.setup_runner.as_mut()).await? { SetupDone::Ready(s) => s.into(), @@ -180,9 +182,8 @@ impl Agent { async fn pending_reboot(&mut self, state: State) -> Result { verbose!("agent pending reboot"); - self.coordinator - .emit_event(NodeState::Rebooting.into()) - .await?; + let event = StateUpdateEvent::Rebooting.into(); + self.coordinator.emit_event(event).await?; let ctx = state.reboot_context(); self.reboot.save_context(ctx).await?; @@ -194,13 +195,15 @@ impl Agent { async fn ready(&mut self, state: State) -> Result { verbose!("agent ready"); - self.coordinator.emit_event(NodeState::Ready.into()).await?; + let event = StateUpdateEvent::Ready.into(); + self.coordinator.emit_event(event).await?; Ok(state.run().await?.into()) } async fn busy(&mut self, state: State) -> Result { - self.coordinator.emit_event(NodeState::Busy.into()).await?; + let event = StateUpdateEvent::Busy.into(); + self.coordinator.emit_event(event).await?; let mut events = vec![]; let updated = state @@ -217,7 +220,8 @@ impl Agent { async fn done(&mut self, state: State) -> Result { verbose!("agent done"); - self.coordinator.emit_event(NodeState::Done.into()).await?; + let event = StateUpdateEvent::Done.into(); + self.coordinator.emit_event(event).await?; // `Done` is a final state. Ok(state.into()) diff --git a/src/agent/onefuzz-supervisor/src/coordinator.rs b/src/agent/onefuzz-supervisor/src/coordinator.rs index d784b5cfe8..9f494ab866 100644 --- a/src/agent/onefuzz-supervisor/src/coordinator.rs +++ b/src/agent/onefuzz-supervisor/src/coordinator.rs @@ -78,11 +78,11 @@ impl From for NodeEvent { } #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] -#[serde(rename_all = "snake_case", tag = "state")] +#[serde(rename_all = "snake_case", tag = "state", content = "data")] pub enum StateUpdateEvent { Init, Free, - SettingUp, + SettingUp { tasks: Vec }, Rebooting, Ready, Busy, @@ -95,28 +95,13 @@ impl From for NodeEvent { } } -impl From for NodeEvent { - fn from(state: NodeState) -> Self { - let event = match state { - NodeState::Init => StateUpdateEvent::Init, - NodeState::Free => StateUpdateEvent::Free, - NodeState::SettingUp => StateUpdateEvent::SettingUp, - NodeState::Rebooting => StateUpdateEvent::Rebooting, - NodeState::Ready => StateUpdateEvent::Ready, - NodeState::Busy => StateUpdateEvent::Busy, - NodeState::Done => StateUpdateEvent::Done, - }; - - event.into() - } -} - #[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)] #[serde(rename_all = "snake_case")] pub enum TaskState { Init, Waiting, Scheduled, + SettingUp, Running, Stopping, Stopped, diff --git a/src/agent/onefuzz-supervisor/src/debug.rs b/src/agent/onefuzz-supervisor/src/debug.rs index 308d954028..879fb10194 100644 --- a/src/agent/onefuzz-supervisor/src/debug.rs +++ b/src/agent/onefuzz-supervisor/src/debug.rs @@ -50,7 +50,19 @@ fn debug_node_event(opt: NodeEventOpt) -> Result<()> { } fn debug_node_event_state_update(state: NodeState) -> Result<()> { - let event = state.into(); + let event = match state { + NodeState::Init => StateUpdateEvent::Init, + NodeState::Free => StateUpdateEvent::Free, + NodeState::SettingUp => { + let tasks = vec![Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4()]; + StateUpdateEvent::SettingUp { tasks } + } + NodeState::Rebooting => StateUpdateEvent::Rebooting, + NodeState::Ready => StateUpdateEvent::Ready, + NodeState::Busy => StateUpdateEvent::Busy, + NodeState::Done => StateUpdateEvent::Done, + }; + let event = event.into(); print_json(into_envelope(event)) } diff --git a/src/agent/onefuzz-supervisor/src/scheduler.rs b/src/agent/onefuzz-supervisor/src/scheduler.rs index 4906d7f584..2980fb2fda 100644 --- a/src/agent/onefuzz-supervisor/src/scheduler.rs +++ b/src/agent/onefuzz-supervisor/src/scheduler.rs @@ -153,6 +153,10 @@ impl State { Ok(done) } + + pub fn work_set(&self) -> &WorkSet { + &self.ctx.work_set + } } impl State { diff --git a/src/agent/onefuzz-supervisor/src/work.rs b/src/agent/onefuzz-supervisor/src/work.rs index 91e2e08006..da5aaf8801 100644 --- a/src/agent/onefuzz-supervisor/src/work.rs +++ b/src/agent/onefuzz-supervisor/src/work.rs @@ -24,6 +24,12 @@ pub struct WorkSet { pub work_units: Vec, } +impl WorkSet { + pub fn task_ids(&self) -> Vec { + self.work_units.iter().map(|w| w.task_id).collect() + } +} + #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] pub struct WorkUnit { /// Job that the work is part of. diff --git a/src/api-service/__app__/agent_events/__init__.py b/src/api-service/__app__/agent_events/__init__.py index f359bed9d5..7a4b956747 100644 --- a/src/api-service/__app__/agent_events/__init__.py +++ b/src/api-service/__app__/agent_events/__init__.py @@ -41,13 +41,47 @@ def get_node_checked(machine_id: UUID) -> Node: return node -def on_state_update(machine_id: UUID, state: NodeState) -> func.HttpResponse: +def on_state_update( + machine_id: UUID, + state_update: NodeStateUpdate, +) -> func.HttpResponse: + state = state_update.state node = get_node_checked(machine_id) if state == NodeState.init or node.state not in NodeState.ready_for_reset(): if node.state != state: node.state = state node.save() + + if state == NodeState.setting_up: + # This field will be required in the future. + # For now, it is optional for back compat. + if state_update.data: + for task_id in state_update.data.tasks: + task = get_task_checked(task_id) + + # The task state may be `running` if it has `vm_count` > 1, and + # another node is concurrently executing the task. If so, leave + # the state as-is, to represent the max progress made. + # + # Other states we would want to preserve are excluded by the + # outermost conditional check. + if task.state != TaskState.running: + task.state = TaskState.setting_up + + # We don't yet call `on_start()` for the task. + # This will happen once we see a worker event that + # reports it as `running`. + task.save() + + # Note: we set the node task state to `setting_up`, even though + # the task itself may be `running`. + node_task = NodeTasks( + machine_id=machine_id, + task_id=task_id, + state=NodeTaskState.setting_up, + ) + node_task.save() else: logging.info("ignoring state updates from the node: %s: %s", machine_id, state) @@ -133,7 +167,7 @@ def post(req: func.HttpRequest) -> func.HttpResponse: return not_ok(err, context=ERROR_CONTEXT) if event.state_update: - return on_state_update(envelope.machine_id, event.state_update.state) + return on_state_update(envelope.machine_id, event.state_update) elif event.worker_event: return on_worker_event(envelope.machine_id, event.worker_event) else: diff --git a/src/pytypes/onefuzztypes/enums.py b/src/pytypes/onefuzztypes/enums.py index 08a3097c90..8934332e88 100644 --- a/src/pytypes/onefuzztypes/enums.py +++ b/src/pytypes/onefuzztypes/enums.py @@ -110,6 +110,7 @@ class TaskState(Enum): init = "init" waiting = "waiting" scheduled = "scheduled" + setting_up = "setting_up" running = "running" stopping = "stopping" stopped = "stopped" @@ -286,6 +287,7 @@ class Architecture(Enum): class NodeTaskState(Enum): init = "init" + setting_up = "setting_up" running = "running" diff --git a/src/pytypes/onefuzztypes/models.py b/src/pytypes/onefuzztypes/models.py index aafa287273..50c22b510d 100644 --- a/src/pytypes/onefuzztypes/models.py +++ b/src/pytypes/onefuzztypes/models.py @@ -500,13 +500,33 @@ class WorkerEvent(EnumModel): running: Optional[WorkerRunningEvent] +class SettingUpEventData(BaseModel): + tasks: List[UUID] + + class NodeStateUpdate(BaseModel): state: NodeState + data: Optional[SettingUpEventData] + + @validator("data") + def check_data( + cls, + data: Optional[SettingUpEventData], + values: Any, + ) -> Optional[SettingUpEventData]: + if data: + state = values.get("state") + if state and state != NodeState.setting_up: + raise ValueError( + "data for node state update event does not match state = %s" % state + ) + + return data class NodeEvent(EnumModel): - worker_event: Optional[WorkerEvent] state_update: Optional[NodeStateUpdate] + worker_event: Optional[WorkerEvent] # Temporary shim type to support hot upgrade of 1.0.0 nodes.