Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Refactor internal node event schemas #29

Merged
merged 12 commits into from
Sep 29, 2020
46 changes: 37 additions & 9 deletions src/agent/onefuzz-supervisor/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,49 @@ pub struct NodeEventEnvelope {
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case", untagged)]
#[serde(rename_all = "snake_case")]
pub enum NodeEvent {
StateUpdate { state: NodeState },
WorkerEvent { event: WorkerEvent },
StateUpdate(StateUpdateEvent),
ranweiler marked this conversation as resolved.
Show resolved Hide resolved
WorkerEvent(WorkerEvent),
}

impl From<NodeState> for NodeEvent {
fn from(state: NodeState) -> Self {
NodeEvent::StateUpdate { state }
impl From<WorkerEvent> for NodeEvent {
fn from(event: WorkerEvent) -> Self {
NodeEvent::WorkerEvent(event)
}
}

impl From<WorkerEvent> for NodeEvent {
fn from(event: WorkerEvent) -> Self {
NodeEvent::WorkerEvent { event }
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case", tag = "state")]
pub enum StateUpdateEvent {
ranweiler marked this conversation as resolved.
Show resolved Hide resolved
Init,
Free,
SettingUp,
Rebooting,
Ready,
Busy,
Done,
}

impl From<StateUpdateEvent> for NodeEvent {
fn from(event: StateUpdateEvent) -> Self {
NodeEvent::StateUpdate(event)
}
}

impl From<NodeState> for NodeEvent {
fn from(state: NodeState) -> Self {
let event = match state {
NodeState::Init => StateUpdateEvent::Init,
NodeState::Free => StateUpdateEvent::Free,
NodeState::SettingUp => StateUpdateEvent::SettingUp,
NodeState::Rebooting => StateUpdateEvent::Rebooting,
NodeState::Ready => StateUpdateEvent::Ready,
NodeState::Busy => StateUpdateEvent::Busy,
NodeState::Done => StateUpdateEvent::Done,
};

event.into()
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/agent/onefuzz-supervisor/src/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn debug_node_event(opt: NodeEventOpt) -> Result<()> {
}

fn debug_node_event_state_update(state: NodeState) -> Result<()> {
let event = NodeEvent::StateUpdate { state };
let event = state.into();
print_json(into_envelope(event))
}

Expand Down Expand Up @@ -94,7 +94,7 @@ fn debug_node_event_worker_event(opt: WorkerEventOpt) -> Result<()> {
}
}
};
let event = NodeEvent::WorkerEvent { event };
let event = NodeEvent::WorkerEvent(event);

print_json(into_envelope(event))
}
Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-supervisor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::process::*;
use crate::work::*;

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case", untagged)]
#[serde(rename_all = "snake_case")]
pub enum WorkerEvent {
Running {
task_id: TaskId,
Expand Down
44 changes: 21 additions & 23 deletions src/api-service/__app__/agent_events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,7 @@

import azure.functions as func
from onefuzztypes.enums import ErrorCode, NodeState, NodeTaskState, TaskState
from onefuzztypes.models import (
Error,
NodeEventEnvelope,
NodeStateUpdate,
WorkerDoneEvent,
WorkerEvent,
WorkerRunningEvent,
)
from onefuzztypes.models import Error, NodeEventEnvelope, WorkerEvent
from onefuzztypes.responses import BoolResult

from ..onefuzzlib.agent_authorization import verify_token
Expand Down Expand Up @@ -55,37 +48,42 @@ def on_state_update(machine_id: UUID, state: NodeState) -> func.HttpResponse:
return ok(BoolResult(result=True))


def on_worker_event(machine_id: UUID, worker_event: WorkerEvent) -> func.HttpResponse:
task_id = worker_event.event.task_id
def on_worker_event(machine_id: UUID, event: WorkerEvent) -> func.HttpResponse:
if event.running:
task_id = event.running.task_id
elif event.done:
task_id = event.done.task_id

task = get_task_checked(task_id)
node = get_node_checked(machine_id)
node_task = NodeTasks(
machine_id=machine_id, task_id=task_id, state=NodeTaskState.running
)

if isinstance(worker_event.event, WorkerRunningEvent):
if event.running:
if task.state not in TaskState.shutting_down():
task.state = TaskState.running
if node.state not in NodeState.ready_for_reset():
node.state = NodeState.busy
node_task.save()
task.on_start()
elif isinstance(worker_event.event, WorkerDoneEvent):
# only record exit status if the task isn't already shutting down.
elif event.done:
# Only record exit status if the task isn't already shutting down.
#
# the agent failing because resources vanish out from underneath it during
# deletion is OK
# It's ok for the agent to fail because resources vanish out from underneath
# it during deletion.
if task.state not in TaskState.shutting_down():
exit_status = worker_event.event.exit_status
exit_status = event.done.exit_status

if not exit_status.success:
logging.error("task failed: status = %s", exit_status)

task.error = Error(
code=ErrorCode.TASK_FAILED,
errors=[
"task failed. exit_status = %s" % exit_status,
worker_event.event.stdout,
worker_event.event.stderr,
event.done.stdout,
event.done.stderr,
],
)

Expand All @@ -103,7 +101,7 @@ def on_worker_event(machine_id: UUID, worker_event: WorkerEvent) -> func.HttpRes
task.save()
node.save()
task_event = TaskEvent(
task_id=task_id, machine_id=machine_id, event_data=worker_event
task_id=task_id, machine_id=machine_id, event_data=event
)
task_event.save()
return ok(BoolResult(result=True))
Expand All @@ -120,10 +118,10 @@ def post(req: func.HttpRequest) -> func.HttpResponse:
envelope.event,
)

if isinstance(envelope.event, NodeStateUpdate):
return on_state_update(envelope.machine_id, envelope.event.state)
elif isinstance(envelope.event, WorkerEvent):
return on_worker_event(envelope.machine_id, envelope.event)
if envelope.event.state_update:
return on_state_update(envelope.machine_id, envelope.event.state_update.state)
elif envelope.event.worker_event:
return on_worker_event(envelope.machine_id, envelope.event.worker_event)
else:
err = Error(code=ErrorCode.INVALID_REQUEST, errors=["invalid node event"])
return not_ok(err, context=ERROR_CONTEXT)
Expand Down
37 changes: 20 additions & 17 deletions src/api-service/__app__/onefuzzlib/task_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,7 @@
from uuid import UUID

from onefuzztypes.models import TaskEvent as BASE_TASK_EVENT
from onefuzztypes.models import (
TaskEventSummary,
WorkerDoneEvent,
WorkerEvent,
WorkerRunningEvent,
)
from onefuzztypes.models import TaskEventSummary, WorkerEvent

from .orm import ORMMixin

Expand All @@ -26,8 +21,8 @@ def get_summary(cls, task_id: UUID) -> List[TaskEventSummary]:
return [
TaskEventSummary(
timestamp=e.Timestamp,
event_data=cls.get_event_data(e.event_data),
event_type=type(e.event_data.event).__name__,
event_data=get_event_data(e.event_data),
event_type=get_event_type(e.event_data),
)
for e in events
]
Expand All @@ -36,12 +31,20 @@ def get_summary(cls, task_id: UUID) -> List[TaskEventSummary]:
def key_fields(cls) -> Tuple[str, Optional[str]]:
return ("task_id", None)

@classmethod
def get_event_data(cls, worker_event: WorkerEvent) -> str:
event = worker_event.event
if isinstance(event, WorkerDoneEvent):
return "exit status: %s" % event.exit_status
elif isinstance(event, WorkerRunningEvent):
return ""
else:
return "Unrecognized event: %s" % event

def get_event_data(event: WorkerEvent) -> str:
ranweiler marked this conversation as resolved.
Show resolved Hide resolved
if event.done:
return "exit status: %s" % event.done.exit_status
elif event.running:
return ""
else:
return "Unrecognized event: %s" % event


def get_event_type(event: WorkerEvent) -> str:
if event.done:
return type(event.done).__name__
elif event.running:
return type(event.running).__name__
else:
return "Unrecognized event: %s" % event
34 changes: 28 additions & 6 deletions src/pytypes/onefuzztypes/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
# Licensed under the MIT License.

from datetime import datetime
from typing import Dict, List, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple, Union
from uuid import UUID, uuid4

from pydantic import BaseModel, Field, validator
from pydantic import BaseModel, Field, root_validator, validator

from .consts import ONE_HOUR, SEVEN_DAYS
from .enums import (
Expand All @@ -32,6 +32,24 @@
from .primitives import Container, PoolName, Region


class EnumModel(BaseModel):
@root_validator(pre=True)
def exactly_one(cls: Any, values: Any) -> Any:
some = []

for field, val in values.items():
if val is not None:
some.append(field)
ranweiler marked this conversation as resolved.
Show resolved Hide resolved

if not some:
raise ValueError('no variant set for enum')

if len(some) > 1:
raise ValueError('multiple values set for enum: %s' % some)

return values


class Error(BaseModel):
code: ErrorCode
errors: List[str]
Expand Down Expand Up @@ -476,15 +494,18 @@ class WorkerDoneEvent(BaseModel):
stdout: str


class WorkerEvent(BaseModel):
event: Union[WorkerDoneEvent, WorkerRunningEvent]
class WorkerEvent(EnumModel):
done: Optional[WorkerDoneEvent]
running: Optional[WorkerRunningEvent]


class NodeStateUpdate(BaseModel):
state: NodeState


NodeEvent = Union[WorkerEvent, NodeStateUpdate]
class NodeEvent(EnumModel):
worker_event: Optional[WorkerEvent]
ranweiler marked this conversation as resolved.
Show resolved Hide resolved
state_update: Optional[NodeStateUpdate]


class NodeEventEnvelope(BaseModel):
Expand All @@ -496,7 +517,8 @@ class NodeCommandStopTask(BaseModel):
task_id: UUID


NodeCommand = Union[NodeCommandStopTask]
class NodeCommand(EnumModel):
stop_task: NodeCommandStopTask


class NodeCommandEnvelope(BaseModel):
Expand Down