Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Store the heartbeat data in the task and node tables #164

Merged
merged 18 commits into from
Oct 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/api-service/__app__/node/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from onefuzztypes.requests import NodeGet, NodeSearch
from onefuzztypes.responses import BoolResult

from ..onefuzzlib.heartbeat import NodeHeartbeat
from ..onefuzzlib.pools import Node, NodeTasks
from ..onefuzzlib.request import not_ok, ok, parse_request

Expand All @@ -32,7 +31,6 @@ def get(req: func.HttpRequest) -> func.HttpResponse:

node_tasks = NodeTasks.get_by_machine_id(request.machine_id)
node.tasks = [(t.task_id, t.state) for t in node_tasks]
node.heartbeats = NodeHeartbeat.get_heartbeats(request.machine_id)

return ok(node)

Expand Down
97 changes: 0 additions & 97 deletions src/api-service/__app__/onefuzzlib/heartbeat.py

This file was deleted.

18 changes: 14 additions & 4 deletions src/api-service/__app__/queue_node_heartbeat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,32 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import datetime
import json
import logging

import azure.functions as func
from onefuzztypes.models import NodeHeartbeatEntry
from pydantic import ValidationError

from ..onefuzzlib.dashboard import get_event
from ..onefuzzlib.heartbeat import NodeHeartbeat
from ..onefuzzlib.pools import Node


def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None:
body = msg.get_body()
logging.info("heartbeat: %s", body)

raw = json.loads(body)

NodeHeartbeat.try_add(raw)
try:
entry = NodeHeartbeatEntry.parse_obj(raw)
node = Node.get_by_machine_id(entry.node_id)
if not node:
logging.error("invalid node id: %s", entry.node_id)
return
node.heartbeat = datetime.datetime.utcnow()
node.save()
except ValidationError:
logging.error("invalid node heartbeat: %s", raw)

event = get_event()
if event:
Expand Down
18 changes: 15 additions & 3 deletions src/api-service/__app__/queue_task_heartbeat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,32 @@

import json
import logging
from datetime import datetime

import azure.functions as func
from onefuzztypes.models import Error, TaskHeartbeatEntry
from pydantic import ValidationError

from ..onefuzzlib.dashboard import get_event
from ..onefuzzlib.heartbeat import TaskHeartbeat
from ..onefuzzlib.tasks.main import Task


def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None:
body = msg.get_body()
logging.info("heartbeat: %s", body)

raw = json.loads(body)

TaskHeartbeat.try_add(raw)
try:
entry = TaskHeartbeatEntry.parse_obj(raw)
task = Task.get_by_task_id(entry.task_id)
if isinstance(task, Error):
logging.error(task)
return
if task:
task.heartbeat = datetime.utcnow()
task.save()
except ValidationError:
logging.error("invalid task heartbat: %s", raw)

event = get_event()
if event:
Expand Down
2 changes: 0 additions & 2 deletions src/api-service/__app__/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from onefuzztypes.requests import TaskGet, TaskSearch
from onefuzztypes.responses import BoolResult

from ..onefuzzlib.heartbeat import TaskHeartbeat
from ..onefuzzlib.jobs import Job
from ..onefuzzlib.pools import NodeTasks
from ..onefuzzlib.request import not_ok, ok, parse_request
Expand Down Expand Up @@ -72,7 +71,6 @@ def get(req: func.HttpRequest) -> func.HttpResponse:
task = Task.get_by_task_id(request.task_id)
if isinstance(task, Error):
return not_ok(task, context=request.task_id)
task.heartbeats = TaskHeartbeat.get_heartbeats(task.task_id)
task.nodes = NodeTasks.get_node_assignments(request.task_id)
task.events = TaskEvent.get_summary(request.task_id)
return ok(task)
Expand Down
28 changes: 2 additions & 26 deletions src/pytypes/onefuzztypes/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,42 +429,18 @@ class TaskHeartbeatEntry(BaseModel):
data: List[Dict[str, HeartbeatType]]


class TaskHeartbeatSummary(BaseModel):
machine_id: UUID
timestamp: Optional[datetime]
type: HeartbeatType


class TaskHeartbeat(BaseModel):
task_id: UUID
heartbeat_id: str
machine_id: UUID
heartbeat_type: HeartbeatType


class NodeHeartbeatEntry(BaseModel):
node_id: UUID
data: List[Dict[str, HeartbeatType]]


class NodeHeartbeatSummary(BaseModel):
timestamp: Optional[datetime]
type: HeartbeatType


class NodeHeartbeat(BaseModel):
heartbeat_id: str
node_id: UUID
heartbeat_type: HeartbeatType


class Node(BaseModel):
pool_name: PoolName
machine_id: UUID
state: NodeState = Field(default=NodeState.init)
scaleset_id: Optional[UUID] = None
tasks: Optional[List[Tuple[UUID, NodeTaskState]]] = None
heartbeats: Optional[List[NodeHeartbeatSummary]]
heartbeat: Optional[datetime]
version: str = Field(default="1.0.0")
reimage_requested: bool = Field(default=False)
delete_requested: bool = Field(default=False)
Expand Down Expand Up @@ -712,7 +688,7 @@ class Task(BaseModel):
config: TaskConfig
error: Optional[Error]
auth: Optional[Authentication]
heartbeats: Optional[List[TaskHeartbeatSummary]]
heartbeat: Optional[datetime]
end_time: Optional[datetime]
events: Optional[List[TaskEventSummary]]
nodes: Optional[List[NodeAssignment]]