Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Refactor internal node event schemas #29

Merged
merged 12 commits into from
Sep 29, 2020
46 changes: 37 additions & 9 deletions src/agent/onefuzz-supervisor/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,49 @@ pub struct NodeEventEnvelope {
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case", untagged)]
#[serde(rename_all = "snake_case")]
pub enum NodeEvent {
StateUpdate { state: NodeState },
WorkerEvent { event: WorkerEvent },
StateUpdate(StateUpdateEvent),
ranweiler marked this conversation as resolved.
Show resolved Hide resolved
WorkerEvent(WorkerEvent),
}

impl From<NodeState> for NodeEvent {
fn from(state: NodeState) -> Self {
NodeEvent::StateUpdate { state }
impl From<WorkerEvent> for NodeEvent {
fn from(event: WorkerEvent) -> Self {
NodeEvent::WorkerEvent(event)
}
}

impl From<WorkerEvent> for NodeEvent {
fn from(event: WorkerEvent) -> Self {
NodeEvent::WorkerEvent { event }
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case", tag = "state")]
pub enum StateUpdateEvent {
ranweiler marked this conversation as resolved.
Show resolved Hide resolved
Init,
Free,
SettingUp,
Rebooting,
Ready,
Busy,
Done,
}

impl From<StateUpdateEvent> for NodeEvent {
fn from(event: StateUpdateEvent) -> Self {
NodeEvent::StateUpdate(event)
}
}

impl From<NodeState> for NodeEvent {
fn from(state: NodeState) -> Self {
let event = match state {
NodeState::Init => StateUpdateEvent::Init,
NodeState::Free => StateUpdateEvent::Free,
NodeState::SettingUp => StateUpdateEvent::SettingUp,
NodeState::Rebooting => StateUpdateEvent::Rebooting,
NodeState::Ready => StateUpdateEvent::Ready,
NodeState::Busy => StateUpdateEvent::Busy,
NodeState::Done => StateUpdateEvent::Done,
};

event.into()
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/agent/onefuzz-supervisor/src/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ fn debug_node_event(opt: NodeEventOpt) -> Result<()> {
}

fn debug_node_event_state_update(state: NodeState) -> Result<()> {
let event = NodeEvent::StateUpdate { state };
let event = state.into();
print_json(into_envelope(event))
}

Expand Down Expand Up @@ -95,7 +95,7 @@ fn debug_node_event_worker_event(opt: WorkerEventOpt) -> Result<()> {
}
}
};
let event = NodeEvent::WorkerEvent { event };
let event = NodeEvent::WorkerEvent(event);

print_json(into_envelope(event))
}
Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-supervisor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::process::*;
use crate::work::*;

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case", untagged)]
#[serde(rename_all = "snake_case")]
pub enum WorkerEvent {
Running {
task_id: TaskId,
Expand Down
46 changes: 30 additions & 16 deletions src/api-service/__app__/agent_events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
from onefuzztypes.enums import ErrorCode, NodeState, NodeTaskState, TaskState
from onefuzztypes.models import (
Error,
NodeEvent,
NodeEventEnvelope,
NodeStateUpdate,
WorkerDoneEvent,
WorkerEvent,
WorkerRunningEvent,
)
from onefuzztypes.responses import BoolResult

Expand Down Expand Up @@ -55,37 +54,42 @@ def on_state_update(machine_id: UUID, state: NodeState) -> func.HttpResponse:
return ok(BoolResult(result=True))


def on_worker_event(machine_id: UUID, worker_event: WorkerEvent) -> func.HttpResponse:
task_id = worker_event.event.task_id
def on_worker_event(machine_id: UUID, event: WorkerEvent) -> func.HttpResponse:
if event.running:
task_id = event.running.task_id
elif event.done:
task_id = event.done.task_id

task = get_task_checked(task_id)
node = get_node_checked(machine_id)
node_task = NodeTasks(
machine_id=machine_id, task_id=task_id, state=NodeTaskState.running
)

if isinstance(worker_event.event, WorkerRunningEvent):
if event.running:
if task.state not in TaskState.shutting_down():
task.state = TaskState.running
if node.state not in NodeState.ready_for_reset():
node.state = NodeState.busy
node_task.save()
task.on_start()
elif isinstance(worker_event.event, WorkerDoneEvent):
# only record exit status if the task isn't already shutting down.
elif event.done:
# Only record exit status if the task isn't already shutting down.
#
# the agent failing because resources vanish out from underneath it during
# deletion is OK
# It's ok for the agent to fail because resources vanish out from underneath
# it during deletion.
if task.state not in TaskState.shutting_down():
exit_status = worker_event.event.exit_status
exit_status = event.done.exit_status

if not exit_status.success:
logging.error("task failed: status = %s", exit_status)

task.error = Error(
code=ErrorCode.TASK_FAILED,
errors=[
"task failed. exit_status = %s" % exit_status,
worker_event.event.stdout,
worker_event.event.stderr,
event.done.stdout,
event.done.stderr,
],
)

Expand All @@ -103,7 +107,7 @@ def on_worker_event(machine_id: UUID, worker_event: WorkerEvent) -> func.HttpRes
task.save()
node.save()
task_event = TaskEvent(
task_id=task_id, machine_id=machine_id, event_data=worker_event
task_id=task_id, machine_id=machine_id, event_data=event
)
task_event.save()
return ok(BoolResult(result=True))
Expand All @@ -120,10 +124,20 @@ def post(req: func.HttpRequest) -> func.HttpResponse:
envelope.event,
)

if isinstance(envelope.event, NodeStateUpdate):
return on_state_update(envelope.machine_id, envelope.event.state)
if isinstance(envelope.event, NodeEvent):
event = envelope.event
elif isinstance(envelope.event, NodeStateUpdate):
event = NodeEvent(state_update=envelope.event)
elif isinstance(envelope.event, WorkerEvent):
return on_worker_event(envelope.machine_id, envelope.event)
event = NodeEvent(worker_event=envelope.event)
else:
err = Error(code=ErrorCode.INVALID_REQUEST, errors=["invalid node event"])
return not_ok(err, context=ERROR_CONTEXT)

if event.state_update:
return on_state_update(envelope.machine_id, event.state_update.state)
elif event.worker_event:
return on_worker_event(envelope.machine_id, event.worker_event)
else:
err = Error(code=ErrorCode.INVALID_REQUEST, errors=["invalid node event"])
return not_ok(err, context=ERROR_CONTEXT)
Expand Down
37 changes: 20 additions & 17 deletions src/api-service/__app__/onefuzzlib/task_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,7 @@
from uuid import UUID

from onefuzztypes.models import TaskEvent as BASE_TASK_EVENT
from onefuzztypes.models import (
TaskEventSummary,
WorkerDoneEvent,
WorkerEvent,
WorkerRunningEvent,
)
from onefuzztypes.models import TaskEventSummary, WorkerEvent

from .orm import ORMMixin

Expand All @@ -26,8 +21,8 @@ def get_summary(cls, task_id: UUID) -> List[TaskEventSummary]:
return [
TaskEventSummary(
timestamp=e.Timestamp,
event_data=cls.get_event_data(e.event_data),
event_type=type(e.event_data.event).__name__,
event_data=get_event_data(e.event_data),
event_type=get_event_type(e.event_data),
)
for e in events
]
Expand All @@ -36,12 +31,20 @@ def get_summary(cls, task_id: UUID) -> List[TaskEventSummary]:
def key_fields(cls) -> Tuple[str, Optional[str]]:
return ("task_id", None)

@classmethod
def get_event_data(cls, worker_event: WorkerEvent) -> str:
event = worker_event.event
if isinstance(event, WorkerDoneEvent):
return "exit status: %s" % event.exit_status
elif isinstance(event, WorkerRunningEvent):
return ""
else:
return "Unrecognized event: %s" % event

def get_event_data(event: WorkerEvent) -> str:
ranweiler marked this conversation as resolved.
Show resolved Hide resolved
if event.done:
return "exit status: %s" % event.done.exit_status
elif event.running:
return ""
else:
return "Unrecognized event: %s" % event


def get_event_type(event: WorkerEvent) -> str:
if event.done:
return type(event.done).__name__
elif event.running:
return type(event.running).__name__
else:
return "Unrecognized event: %s" % event
17 changes: 13 additions & 4 deletions src/pytypes/onefuzztypes/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,20 +495,29 @@ class WorkerDoneEvent(BaseModel):
stdout: str


class WorkerEvent(BaseModel):
event: Union[WorkerDoneEvent, WorkerRunningEvent]
class WorkerEvent(EnumModel):
done: Optional[WorkerDoneEvent]
running: Optional[WorkerRunningEvent]


class NodeStateUpdate(BaseModel):
state: NodeState


NodeEvent = Union[WorkerEvent, NodeStateUpdate]
class NodeEvent(EnumModel):
worker_event: Optional[WorkerEvent]
ranweiler marked this conversation as resolved.
Show resolved Hide resolved
state_update: Optional[NodeStateUpdate]


# Temporary shim type to support hot upgrade of 1.0.0 nodes.
#
# We want future variants to use an externally-tagged repr.
NodeEventShim = Union[NodeEvent, WorkerEvent, NodeStateUpdate]


class NodeEventEnvelope(BaseModel):
machine_id: UUID
event: NodeEvent
event: NodeEventShim


class StopNodeCommand(BaseModel):
Expand Down