Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Store the heartbeat data in the task and node tables #164

Merged
merged 18 commits into from
Oct 20, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/api-service/__app__/node/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from onefuzztypes.requests import NodeGet, NodeSearch
from onefuzztypes.responses import BoolResult

from ..onefuzzlib.heartbeat import NodeHeartbeat
from ..onefuzzlib.pools import Node, NodeTasks
from ..onefuzzlib.request import not_ok, ok, parse_request

Expand All @@ -32,7 +31,6 @@ def get(req: func.HttpRequest) -> func.HttpResponse:

node_tasks = NodeTasks.get_by_machine_id(request.machine_id)
node.tasks = [(t.task_id, t.state) for t in node_tasks]
node.heartbeats = NodeHeartbeat.get_heartbeats(request.machine_id)

return ok(node)

Expand Down
97 changes: 0 additions & 97 deletions src/api-service/__app__/onefuzzlib/heartbeat.py

This file was deleted.

20 changes: 18 additions & 2 deletions src/api-service/__app__/onefuzzlib/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
)
from onefuzztypes.models import AutoScaleConfig, Error
from onefuzztypes.models import Node as BASE_NODE
from onefuzztypes.models import NodeAssignment, NodeCommand
from onefuzztypes.models import NodeAssignment, NodeCommand, NodeHeartbeatEntry
from onefuzztypes.models import NodeTasks as BASE_NODE_TASK
from onefuzztypes.models import Pool as BASE_POOL
from onefuzztypes.models import Scaleset as BASE_SCALESET
Expand All @@ -31,7 +31,7 @@
WorkUnitSummary,
)
from onefuzztypes.primitives import PoolName, Region
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, ValidationError

from .__version__ import __version__
from .azure.auth import build_auth
Expand Down Expand Up @@ -244,6 +244,22 @@ def set_halt(self) -> None:
self.set_shutdown()
self.stop()

@classmethod
def try_add_heartbeat(cls, raw: Dict) -> bool:
    """Record a heartbeat on the node named in a raw heartbeat message.

    Args:
        raw: Decoded heartbeat payload; it is validated against
            ``NodeHeartbeatEntry``.

    Returns:
        True when the payload validated, the node was found through
        ``get_by_machine_id`` and its ``heartbeat`` field was saved;
        False when validation failed or no matching node exists.
    """
    try:
        # parse_obj either returns a populated model or raises
        # ValidationError, so a separate "is it empty" check on the
        # result can never fire and is not needed.
        entry = NodeHeartbeatEntry.parse_obj(raw)

        node = cls.get_by_machine_id(entry.node_id)
        if not node:
            return False

        # Stamp the node with the current (naive) UTC time and persist it.
        node.heartbeat = datetime.datetime.utcnow()
        node.save()
        return True
    except ValidationError:
        return False


class NodeTasks(BASE_NODE_TASK, ORMMixin):
@classmethod
Expand Down
24 changes: 22 additions & 2 deletions src/api-service/__app__/onefuzzlib/tasks/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@

import logging
from datetime import datetime, timedelta
from typing import List, Optional, Tuple, Union
from typing import Dict, List, Optional, Tuple, Union
from uuid import UUID

from onefuzztypes.enums import ErrorCode, TaskState
from onefuzztypes.models import Error
from onefuzztypes.models import Task as BASE_TASK
from onefuzztypes.models import TaskConfig, TaskVm
from onefuzztypes.models import TaskConfig, TaskHeartbeatEntry, TaskVm
from pydantic import ValidationError

from ..azure.creds import get_fuzz_storage
from ..azure.image import get_os
Expand Down Expand Up @@ -140,6 +141,13 @@ def search_expired(cls) -> List["Task"]:
query={"state": TaskState.available()}, raw_unchecked_filter=time_filter
)

@classmethod
def try_get_by_task_id(cls, task_id: UUID) -> Optional["Task"]:
    """Return the first task matching ``task_id``, or None if none is found.

    Args:
        task_id: Identifier passed to ``search`` as the ``task_id`` query.

    Returns:
        The first search result, or None when the search result is empty.
    """
    matches = cls.search(query={"task_id": [task_id]})
    return matches[0] if matches else None

@classmethod
def get_by_task_id(cls, task_id: UUID) -> Union[Error, "Task"]:
tasks = cls.search(query={"task_id": [task_id]})
Expand Down Expand Up @@ -261,3 +269,15 @@ def on_start(self) -> None:
@classmethod
def key_fields(cls) -> Tuple[str, str]:
    """Return the pair of key field names: ``job_id`` then ``task_id``."""
    first_key, second_key = "job_id", "task_id"
    return (first_key, second_key)

@classmethod
def try_add_heartbeat(cls, raw: Dict) -> bool:
    """Record a heartbeat on the task named in a raw heartbeat message.

    Args:
        raw: Decoded heartbeat payload; it is validated against
            ``TaskHeartbeatEntry``.

    Returns:
        True when the payload validated, the task was found and its
        ``heartbeat`` field was saved; False when validation failed or
        no task with that id exists.
    """
    try:
        entry = TaskHeartbeatEntry.parse_obj(raw)
        task = cls.try_get_by_task_id(entry.task_id)
        if not task:
            # Unknown task: report failure explicitly so the declared
            # bool return always holds, matching how
            # Node.try_add_heartbeat treats an unknown node.
            return False

        # Stamp the task with the current (naive) UTC time and persist it.
        task.heartbeat = datetime.utcnow()
        task.save()
        return True
    except ValidationError:
        return False
4 changes: 2 additions & 2 deletions src/api-service/__app__/queue_node_heartbeat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import azure.functions as func

from ..onefuzzlib.dashboard import get_event
from ..onefuzzlib.heartbeat import NodeHeartbeat
from ..onefuzzlib.pools import Node


def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None:
Expand All @@ -18,7 +18,7 @@ def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None:

raw = json.loads(body)

NodeHeartbeat.try_add(raw)
Node.try_add_heartbeat(raw)

event = get_event()
if event:
Expand Down
4 changes: 2 additions & 2 deletions src/api-service/__app__/queue_task_heartbeat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import azure.functions as func

from ..onefuzzlib.dashboard import get_event
from ..onefuzzlib.heartbeat import TaskHeartbeat
from ..onefuzzlib.tasks.main import Task


def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None:
Expand All @@ -18,7 +18,7 @@ def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None:

raw = json.loads(body)

TaskHeartbeat.try_add(raw)
Task.try_add_heartbeat(raw)

event = get_event()
if event:
Expand Down
2 changes: 1 addition & 1 deletion src/api-service/__app__/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ requests~=2.24.0
memoization~=0.3.1
github3.py~=1.3.0
# onefuzz types version is set during build
onefuzztypes==0.0.0
onefuzztypes==0.0.0
2 changes: 0 additions & 2 deletions src/api-service/__app__/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from onefuzztypes.requests import TaskGet, TaskSearch
from onefuzztypes.responses import BoolResult

from ..onefuzzlib.heartbeat import TaskHeartbeat
from ..onefuzzlib.jobs import Job
from ..onefuzzlib.pools import NodeTasks
from ..onefuzzlib.request import not_ok, ok, parse_request
Expand Down Expand Up @@ -72,7 +71,6 @@ def get(req: func.HttpRequest) -> func.HttpResponse:
task = Task.get_by_task_id(request.task_id)
if isinstance(task, Error):
return not_ok(task, context=request.task_id)
task.heartbeats = TaskHeartbeat.get_heartbeats(task.task_id)
task.nodes = NodeTasks.get_node_assignments(request.task_id)
task.events = TaskEvent.get_summary(request.task_id)
return ok(task)
Expand Down
28 changes: 2 additions & 26 deletions src/pytypes/onefuzztypes/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,42 +429,18 @@ class TaskHeartbeatEntry(BaseModel):
data: List[Dict[str, HeartbeatType]]


class TaskHeartbeatSummary(BaseModel):
    # Summary record for a task heartbeat: the machine it is attributed to,
    # an optional timestamp, and the heartbeat type.
    machine_id: UUID
    timestamp: Optional[datetime]  # may be absent
    type: HeartbeatType


class TaskHeartbeat(BaseModel):
    # Single task heartbeat record, identified by task and heartbeat id and
    # tagged with the reporting machine and the heartbeat type.
    task_id: UUID
    heartbeat_id: str
    machine_id: UUID
    heartbeat_type: HeartbeatType


class NodeHeartbeatEntry(BaseModel):
    # Heartbeat message for a node: the node's id plus a list of
    # string-keyed mappings whose values are heartbeat types.
    node_id: UUID
    data: List[Dict[str, HeartbeatType]]


class NodeHeartbeatSummary(BaseModel):
    # Summary record for a node heartbeat: an optional timestamp and the
    # heartbeat type.
    timestamp: Optional[datetime]  # may be absent
    type: HeartbeatType


class NodeHeartbeat(BaseModel):
    # Single node heartbeat record, identified by heartbeat id and node id
    # and tagged with the heartbeat type.
    heartbeat_id: str
    node_id: UUID
    heartbeat_type: HeartbeatType


class Node(BaseModel):
pool_name: PoolName
machine_id: UUID
state: NodeState = Field(default=NodeState.init)
scaleset_id: Optional[UUID] = None
tasks: Optional[List[Tuple[UUID, NodeTaskState]]] = None
heartbeats: Optional[List[NodeHeartbeatSummary]]
heartbeat: Optional[datetime]
version: str = Field(default="1.0.0")
reimage_requested: bool = Field(default=False)
delete_requested: bool = Field(default=False)
Expand Down Expand Up @@ -711,7 +687,7 @@ class Task(BaseModel):
config: TaskConfig
error: Optional[Error]
auth: Optional[Authentication]
heartbeats: Optional[List[TaskHeartbeatSummary]]
heartbeat: Optional[datetime]
end_time: Optional[datetime]
events: Optional[List[TaskEventSummary]]
nodes: Optional[List[NodeAssignment]]