Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Link VMSS nodes and tasks when setting up #43

Merged
merged 26 commits into from
Sep 30, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e40ed93
Refactor state update events
ranweiler Sep 23, 2020
9a2a968
Update models, event handler
ranweiler Sep 23, 2020
134cd38
Fix worker event enum model
ranweiler Sep 23, 2020
8228137
Fix enum repr of worker event
ranweiler Sep 23, 2020
10df008
Fix up comment
ranweiler Sep 23, 2020
3c978f0
Merge remote-tracking branch 'microsoft/main' into event-types
ranweiler Sep 28, 2020
954705a
Add event shim to support hot upgrades
ranweiler Sep 28, 2020
05e8aa8
Fix imports
ranweiler Sep 28, 2020
f06aaf2
Support data for `setting_up` state update
ranweiler Sep 29, 2020
a874c23
Support agent-side `setting_up` event data
ranweiler Sep 29, 2020
2906198
Pass entire state update to handler
ranweiler Sep 29, 2020
b2c0ebb
Add `setting_up` state to `Task`, `NodeTasks`
ranweiler Sep 29, 2020
2984059
Set task, node tasks `setting_up` state
ranweiler Sep 29, 2020
f81c8ea
Add comments
ranweiler Sep 29, 2020
9bff106
Merge branch 'main' into setting-up
bmc-msft Sep 29, 2020
afa1e68
Merge remote-tracking branch 'microsoft/main' into setting-up
ranweiler Sep 29, 2020
b18ffad
Merge branch 'setting-up' of github.com:ranweiler/onefuzz into settin…
ranweiler Sep 29, 2020
c8e83f4
Add missing return
ranweiler Sep 29, 2020
7a75dc7
Linting
ranweiler Sep 29, 2020
54c0373
Fix node state num
ranweiler Sep 29, 2020
7b6f00d
Emit new style of state update events
ranweiler Sep 29, 2020
7017d87
Format
ranweiler Sep 29, 2020
41f62eb
Run formatter
ranweiler Sep 29, 2020
6ddc054
Don't update task state if running
ranweiler Sep 29, 2020
63f4200
Merge remote-tracking branch 'microsoft/main' into setting-up
ranweiler Sep 29, 2020
17c7e20
Fix comment widths
ranweiler Sep 29, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 39 additions & 10 deletions src/agent/onefuzz-supervisor/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub struct ClaimNodeCommandRequest {
}

#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
#[serde(rename_all = "snake_case", tag = "state", content = "data")]
pub enum NodeState {
Init,
Free,
Expand All @@ -65,21 +65,49 @@ pub struct NodeEventEnvelope {
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case", untagged)]
#[serde(rename_all = "snake_case")]
pub enum NodeEvent {
StateUpdate { state: NodeState },
WorkerEvent { event: WorkerEvent },
StateUpdate(StateUpdateEvent),
WorkerEvent(WorkerEvent),
}

impl From<NodeState> for NodeEvent {
fn from(state: NodeState) -> Self {
NodeEvent::StateUpdate { state }
impl From<WorkerEvent> for NodeEvent {
fn from(event: WorkerEvent) -> Self {
NodeEvent::WorkerEvent(event)
}
}

impl From<WorkerEvent> for NodeEvent {
fn from(event: WorkerEvent) -> Self {
NodeEvent::WorkerEvent { event }
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum StateUpdateEvent {
Init,
Free,
SettingUp { tasks: Vec<TaskId> },
Rebooting,
Ready,
Busy,
Done,
}

impl From<StateUpdateEvent> for NodeEvent {
fn from(event: StateUpdateEvent) -> Self {
NodeEvent::StateUpdate(event)
}
}

impl From<NodeState> for NodeEvent {
fn from(state: NodeState) -> Self {
let event = match state {
NodeState::Init => StateUpdateEvent::Init,
NodeState::Free => StateUpdateEvent::Free,
NodeState::SettingUp => StateUpdateEvent::SettingUp,
NodeState::Rebooting => StateUpdateEvent::Rebooting,
NodeState::Ready => StateUpdateEvent::Ready,
NodeState::Busy => StateUpdateEvent::Busy,
NodeState::Done => StateUpdateEvent::Done,
};

event.into()
}
}

Expand All @@ -89,6 +117,7 @@ pub enum TaskState {
Init,
Waiting,
Scheduled,
SettingUp,
Running,
Stopping,
Stopped,
Expand Down
4 changes: 2 additions & 2 deletions src/agent/onefuzz-supervisor/src/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ fn debug_node_event(opt: NodeEventOpt) -> Result<()> {
}

fn debug_node_event_state_update(state: NodeState) -> Result<()> {
let event = NodeEvent::StateUpdate { state };
let event = state.into();
print_json(into_envelope(event))
}

Expand Down Expand Up @@ -95,7 +95,7 @@ fn debug_node_event_worker_event(opt: WorkerEventOpt) -> Result<()> {
}
}
};
let event = NodeEvent::WorkerEvent { event };
let event = NodeEvent::WorkerEvent(event);

print_json(into_envelope(event))
}
Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-supervisor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::process::*;
use crate::work::*;

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case", untagged)]
#[serde(rename_all = "snake_case")]
pub enum WorkerEvent {
Running {
task_id: TaskId,
Expand Down
72 changes: 55 additions & 17 deletions src/api-service/__app__/agent_events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
from onefuzztypes.enums import ErrorCode, NodeState, NodeTaskState, TaskState
from onefuzztypes.models import (
Error,
NodeEvent,
NodeEventEnvelope,
NodeStateUpdate,
WorkerDoneEvent,
WorkerEvent,
WorkerRunningEvent,
)
from onefuzztypes.responses import BoolResult

Expand Down Expand Up @@ -42,50 +41,79 @@ def get_node_checked(machine_id: UUID) -> Node:
return node


def on_state_update(machine_id: UUID, state: NodeState) -> func.HttpResponse:
def on_state_update(
machine_id: UUID,
state_update: NodeStateUpdate,
) -> func.HttpResponse:
state = state_update.state
node = get_node_checked(machine_id)

if state == NodeState.init or node.state not in NodeState.ready_for_reset():
if node.state != state:
node.state = state
node.save()

if state == NodeState.setting_up:
# This field will be required in the future.
# For now, it is optional for back compat.
if state_update.data:
for task_id in state_update.data.tasks:
task = get_task_checked(task_id)
task.state = TaskState.setting_up
ranweiler marked this conversation as resolved.
Show resolved Hide resolved

# We don't yet call `on_start()` for the task.
ranweiler marked this conversation as resolved.
Show resolved Hide resolved
# This will happen once we see a worker event that
# reports it as `running`.
task.save()

node_task = NodeTasks(
machine_id=machine_id,
task_id=task_id,
state=NodeTaskState.setting_up,
)
node_task.save()
else:
logging.info("ignoring state updates from the node: %s: %s", machine_id, state)

return ok(BoolResult(result=True))


def on_worker_event(machine_id: UUID, worker_event: WorkerEvent) -> func.HttpResponse:
task_id = worker_event.event.task_id
def on_worker_event(machine_id: UUID, event: WorkerEvent) -> func.HttpResponse:
if event.running:
task_id = event.running.task_id
elif event.done:
task_id = event.done.task_id

task = get_task_checked(task_id)
node = get_node_checked(machine_id)
node_task = NodeTasks(
machine_id=machine_id, task_id=task_id, state=NodeTaskState.running
)

if isinstance(worker_event.event, WorkerRunningEvent):
if event.running:
if task.state not in TaskState.shutting_down():
task.state = TaskState.running
if node.state not in NodeState.ready_for_reset():
node.state = NodeState.busy
node_task.save()
task.on_start()
elif isinstance(worker_event.event, WorkerDoneEvent):
# only record exit status if the task isn't already shutting down.
elif event.done:
# Only record exit status if the task isn't already shutting down.
#
# the agent failing because resources vanish out from underneath it during
# deletion is OK
# It's ok for the agent to fail because resources vanish out from underneath
# it during deletion.
if task.state not in TaskState.shutting_down():
exit_status = worker_event.event.exit_status
exit_status = event.done.exit_status

if not exit_status.success:
logging.error("task failed: status = %s", exit_status)

task.error = Error(
code=ErrorCode.TASK_FAILED,
errors=[
"task failed. exit_status = %s" % exit_status,
worker_event.event.stdout,
worker_event.event.stderr,
event.done.stdout,
event.done.stderr,
],
)

Expand All @@ -103,7 +131,7 @@ def on_worker_event(machine_id: UUID, worker_event: WorkerEvent) -> func.HttpRes
task.save()
node.save()
task_event = TaskEvent(
task_id=task_id, machine_id=machine_id, event_data=worker_event
task_id=task_id, machine_id=machine_id, event_data=event
)
task_event.save()
return ok(BoolResult(result=True))
Expand All @@ -120,10 +148,20 @@ def post(req: func.HttpRequest) -> func.HttpResponse:
envelope.event,
)

if isinstance(envelope.event, NodeStateUpdate):
return on_state_update(envelope.machine_id, envelope.event.state)
if isinstance(envelope.event, NodeEvent):
event = envelope.event
elif isinstance(envelope.event, NodeStateUpdate):
event = NodeEvent(state_update=envelope.event)
elif isinstance(envelope.event, WorkerEvent):
return on_worker_event(envelope.machine_id, envelope.event)
event = NodeEvent(worker_event=envelope.event)
else:
err = Error(code=ErrorCode.INVALID_REQUEST, errors=["invalid node event"])
return not_ok(err, context=ERROR_CONTEXT)

if event.state_update:
return on_state_update(envelope.machine_id, event.state_update)
elif event.worker_event:
return on_worker_event(envelope.machine_id, event.worker_event)
else:
err = Error(code=ErrorCode.INVALID_REQUEST, errors=["invalid node event"])
return not_ok(err, context=ERROR_CONTEXT)
Expand Down
37 changes: 20 additions & 17 deletions src/api-service/__app__/onefuzzlib/task_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,7 @@
from uuid import UUID

from onefuzztypes.models import TaskEvent as BASE_TASK_EVENT
from onefuzztypes.models import (
TaskEventSummary,
WorkerDoneEvent,
WorkerEvent,
WorkerRunningEvent,
)
from onefuzztypes.models import TaskEventSummary, WorkerEvent

from .orm import ORMMixin

Expand All @@ -26,8 +21,8 @@ def get_summary(cls, task_id: UUID) -> List[TaskEventSummary]:
return [
TaskEventSummary(
timestamp=e.Timestamp,
event_data=cls.get_event_data(e.event_data),
event_type=type(e.event_data.event).__name__,
event_data=get_event_data(e.event_data),
event_type=get_event_type(e.event_data),
)
for e in events
]
Expand All @@ -36,12 +31,20 @@ def get_summary(cls, task_id: UUID) -> List[TaskEventSummary]:
def key_fields(cls) -> Tuple[str, Optional[str]]:
return ("task_id", None)

@classmethod
def get_event_data(cls, worker_event: WorkerEvent) -> str:
event = worker_event.event
if isinstance(event, WorkerDoneEvent):
return "exit status: %s" % event.exit_status
elif isinstance(event, WorkerRunningEvent):
return ""
else:
return "Unrecognized event: %s" % event

def get_event_data(event: WorkerEvent) -> str:
if event.done:
return "exit status: %s" % event.done.exit_status
elif event.running:
return ""
else:
return "Unrecognized event: %s" % event


def get_event_type(event: WorkerEvent) -> str:
if event.done:
return type(event.done).__name__
elif event.running:
return type(event.running).__name__
else:
return "Unrecognized event: %s" % event
2 changes: 2 additions & 0 deletions src/pytypes/onefuzztypes/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class TaskState(Enum):
init = "init"
waiting = "waiting"
scheduled = "scheduled"
setting_up = "setting_up"
running = "running"
stopping = "stopping"
stopped = "stopped"
Expand Down Expand Up @@ -286,6 +287,7 @@ class Architecture(Enum):

class NodeTaskState(Enum):
init = "init"
setting_up = "setting_up"
running = "running"


Expand Down
31 changes: 27 additions & 4 deletions src/pytypes/onefuzztypes/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,20 +494,43 @@ class WorkerDoneEvent(BaseModel):
stdout: str


class WorkerEvent(BaseModel):
event: Union[WorkerDoneEvent, WorkerRunningEvent]
class WorkerEvent(EnumModel):
done: Optional[WorkerDoneEvent]
running: Optional[WorkerRunningEvent]


class SettingUpEventData(BaseModel):
tasks: List[UUID]


class NodeStateUpdate(BaseModel):
state: NodeState
data: Optional[SettingUpEventData]

@validator("data")
def check_data(cls, data: Optional[SettingUpEventData], values: Any) -> Optional[SettingUpEventData]:
if data:
state = values.get("state")
if state and state != NodeState.setting_up:
raise ValueError("data for node state update event does not match state = %s" % state)

data
ranweiler marked this conversation as resolved.
Show resolved Hide resolved


NodeEvent = Union[WorkerEvent, NodeStateUpdate]
class NodeEvent(EnumModel):
state_update: Optional[NodeStateUpdate]
worker_event: Optional[WorkerEvent]


# Union type to support hot upgrade of 1.0.0 nodes.
#
# We want future variants to use an externally-tagged repr.
NodeEventShim = Union[NodeEvent, WorkerEvent, NodeStateUpdate]


class NodeEventEnvelope(BaseModel):
machine_id: UUID
event: NodeEvent
event: NodeEventShim


class StopNodeCommand(BaseModel):
Expand Down