From b12325a390bc5b81a79cea61c51de725ba473f05 Mon Sep 17 00:00:00 2001 From: Cheick Keita Date: Thu, 15 Oct 2020 14:51:35 -0700 Subject: [PATCH] storing the heartbeat in the task and node tables --- src/api-service/__app__/node/__init__.py | 2 - .../__app__/onefuzzlib/heartbeat.py | 97 ------------------- src/api-service/__app__/onefuzzlib/pools.py | 10 ++ .../__app__/onefuzzlib/tasks/main.py | 54 ++++++++++- .../__app__/queue_node_heartbeat/__init__.py | 4 +- .../__app__/queue_task_heartbeat/__init__.py | 4 +- src/api-service/__app__/requirements.txt | 2 +- src/api-service/__app__/tasks/__init__.py | 3 +- src/pytypes/onefuzztypes/models.py | 9 +- 9 files changed, 75 insertions(+), 110 deletions(-) delete mode 100644 src/api-service/__app__/onefuzzlib/heartbeat.py diff --git a/src/api-service/__app__/node/__init__.py b/src/api-service/__app__/node/__init__.py index ad68ec136f3..b6dd6447618 100644 --- a/src/api-service/__app__/node/__init__.py +++ b/src/api-service/__app__/node/__init__.py @@ -9,7 +9,6 @@ from onefuzztypes.requests import NodeGet, NodeSearch from onefuzztypes.responses import BoolResult -from ..onefuzzlib.heartbeat import NodeHeartbeat from ..onefuzzlib.pools import Node, NodeTasks from ..onefuzzlib.request import not_ok, ok, parse_request @@ -32,7 +31,6 @@ def get(req: func.HttpRequest) -> func.HttpResponse: node_tasks = NodeTasks.get_by_machine_id(request.machine_id) node.tasks = [(t.task_id, t.state) for t in node_tasks] - node.heartbeats = NodeHeartbeat.get_heartbeats(request.machine_id) return ok(node) diff --git a/src/api-service/__app__/onefuzzlib/heartbeat.py b/src/api-service/__app__/onefuzzlib/heartbeat.py deleted file mode 100644 index ab4b674db96..00000000000 --- a/src/api-service/__app__/onefuzzlib/heartbeat.py +++ /dev/null @@ -1,97 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. - -from typing import Dict, List, Tuple -from uuid import UUID - -from onefuzztypes.models import NodeHeartbeat as BASE_NODE_HEARTBEAT -from onefuzztypes.models import NodeHeartbeatEntry, NodeHeartbeatSummary -from onefuzztypes.models import TaskHeartbeat as BASE_TASK_HEARTBEAT -from onefuzztypes.models import TaskHeartbeatEntry, TaskHeartbeatSummary -from pydantic import ValidationError - -from .orm import ORMMixin - - -class TaskHeartbeat(BASE_TASK_HEARTBEAT, ORMMixin): - @classmethod - def add(cls, entry: TaskHeartbeatEntry) -> None: - for value in entry.data: - heartbeat_id = "-".join([str(entry.machine_id), value["type"].name]) - heartbeat = cls( - task_id=entry.task_id, - heartbeat_id=heartbeat_id, - machine_id=entry.machine_id, - heartbeat_type=value["type"], - ) - heartbeat.save() - - @classmethod - def try_add(cls, raw: Dict) -> bool: - try: - entry = TaskHeartbeatEntry.parse_obj(raw) - cls.add(entry) - return True - except ValidationError: - return False - - @classmethod - def get_heartbeats(cls, task_id: UUID) -> List[TaskHeartbeatSummary]: - entries = cls.search(query={"task_id": [task_id]}) - - result = [] - for entry in entries: - result.append( - TaskHeartbeatSummary( - timestamp=entry.Timestamp, - machine_id=entry.machine_id, - type=entry.heartbeat_type, - ) - ) - return result - - @classmethod - def key_fields(cls) -> Tuple[str, str]: - return ("task_id", "heartbeat_id") - - -class NodeHeartbeat(BASE_NODE_HEARTBEAT, ORMMixin): - @classmethod - def add(cls, entry: NodeHeartbeatEntry) -> None: - for value in entry.data: - heartbeat_id = "-".join([str(entry.node_id), value["type"].name]) - heartbeat = cls( - heartbeat_id=heartbeat_id, - node_id=entry.node_id, - heartbeat_type=value["type"], - ) - heartbeat.save() - - @classmethod - def try_add(cls, raw: Dict) -> bool: - try: - entry = NodeHeartbeatEntry.parse_obj(raw) - cls.add(entry) - return True - except ValidationError: - return False - - @classmethod - def get_heartbeats(cls, node_id: UUID) -> List[NodeHeartbeatSummary]: - entries = cls.search(query={"node_id": [node_id]}) - - result = [] - for entry in entries: - result.append( - NodeHeartbeatSummary( - timestamp=entry.Timestamp, - type=entry.heartbeat_type, - ) - ) - return result - - @classmethod - def key_fields(cls) -> Tuple[str, str]: - return ("node_id", "heartbeat_id") diff --git a/src/api-service/__app__/onefuzzlib/pools.py b/src/api-service/__app__/onefuzzlib/pools.py index 2df8d895ff3..f181498e2e9 100644 --- a/src/api-service/__app__/onefuzzlib/pools.py +++ b/src/api-service/__app__/onefuzzlib/pools.py @@ -244,6 +244,16 @@ def set_halt(self) -> None: self.set_shutdown() self.stop() + @classmethod + def try_add_heartbeat(cls, raw: Dict) -> bool: + return True + # try: + # entry = NodeHeartbeatEntry.parse_obj(raw) + # cls.add(entry) + # return True + # except ValidationError: + # return False + class NodeTasks(BASE_NODE_TASK, ORMMixin): @classmethod diff --git a/src/api-service/__app__/onefuzzlib/tasks/main.py b/src/api-service/__app__/onefuzzlib/tasks/main.py index daff974af75..6a285e4c306 100644 --- a/src/api-service/__app__/onefuzzlib/tasks/main.py +++ b/src/api-service/__app__/onefuzzlib/tasks/main.py @@ -5,13 +5,18 @@ import logging from datetime import datetime, timedelta -from typing import List, Optional, Tuple, Union +from typing import Dict, List, Optional, Tuple, Union from uuid import UUID from onefuzztypes.enums import ErrorCode, TaskState from onefuzztypes.models import Error from onefuzztypes.models import Task as BASE_TASK -from onefuzztypes.models import TaskConfig, TaskVm +from onefuzztypes.models import ( + TaskConfig, + TaskVm, + TaskHeartbeatEntry, + TaskHeartbeat +) from ..azure.creds import get_fuzz_storage from ..azure.image import get_os @@ -20,6 +25,8 @@ from ..pools import Node, Pool, Scaleset from ..proxy_forward import ProxyForward +from pydantic import ValidationError + class Task(BASE_TASK, ORMMixin): def ready_to_schedule(self) -> bool: @@ -140,6 +147,13 @@ def search_expired(cls) -> List["Task"]: query={"state": TaskState.available()}, raw_unchecked_filter=time_filter ) + @classmethod + def try_get_by_task_id(cls, task_id: UUID) -> Optional["Task"]: + tasks = cls.search(query={"task_id": [task_id]}) + if not tasks: + return None + return tasks[0] + @classmethod def get_by_task_id(cls, task_id: UUID) -> Union[Error, "Task"]: tasks = cls.search(query={"task_id": [task_id]}) @@ -261,3 +275,39 @@ def on_start(self) -> None: @classmethod def key_fields(cls) -> Tuple[str, str]: return ("job_id", "task_id") + + @classmethod + def try_add_heartbeat(cls, raw: Dict) -> bool: + now = datetime.utcnow() + +# class TaskHeartbeatEntry(BaseModel): +# task_id: UUID +# machine_id: UUID +# data: List[Dict[str, HeartbeatType]] + +# class TaskHeartbeatSummary(BaseModel): +# machine_id: UUID +# timestamp: Optional[datetime] + + try: + entry = TaskHeartbeatEntry.parse_obj(raw) + task = cls.try_get_by_task_id(entry.task_id) + # heartbeats: Optional[Dict[UUID, List[TaskHeartbeatSummary]]] + if task: + + if not task.heartbeats: + task.heartbeats = {} + summary = task.heartbeats + for hb in entry.data: + for key in hb: + hb_type = hb[key] + + summary[entry.machine_id, hb_type] = TaskHeartbeat( + machine_id=entry.machine_id, timestamp=now, type=hb_type + ) + + task.save() + # cls.add(entry)P + return True + except ValidationError: + return False diff --git a/src/api-service/__app__/queue_node_heartbeat/__init__.py b/src/api-service/__app__/queue_node_heartbeat/__init__.py index 31e81d65719..f2fb907159f 100644 --- a/src/api-service/__app__/queue_node_heartbeat/__init__.py +++ b/src/api-service/__app__/queue_node_heartbeat/__init__.py @@ -9,7 +9,7 @@ import azure.functions as func from ..onefuzzlib.dashboard import get_event -from ..onefuzzlib.heartbeat import NodeHeartbeat +from ..onefuzzlib.pools import Node def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None: @@ -18,7 +18,7 @@ def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None: raw = json.loads(body) - NodeHeartbeat.try_add(raw) + Node.try_add_heartbeat(raw) event = get_event() if event: diff --git a/src/api-service/__app__/queue_task_heartbeat/__init__.py b/src/api-service/__app__/queue_task_heartbeat/__init__.py index ecae1180a3b..33cc78d6ac2 100644 --- a/src/api-service/__app__/queue_task_heartbeat/__init__.py +++ b/src/api-service/__app__/queue_task_heartbeat/__init__.py @@ -9,7 +9,7 @@ import azure.functions as func from ..onefuzzlib.dashboard import get_event -from ..onefuzzlib.heartbeat import TaskHeartbeat +from ..onefuzzlib.tasks.main import Task def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None: @@ -18,7 +18,7 @@ def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None: raw = json.loads(body) - TaskHeartbeat.try_add(raw) + Task.try_add_heartbeat(raw) event = get_event() if event: diff --git a/src/api-service/__app__/requirements.txt b/src/api-service/__app__/requirements.txt index e701f9b6f75..ffc792689c6 100644 --- a/src/api-service/__app__/requirements.txt +++ b/src/api-service/__app__/requirements.txt @@ -33,4 +33,4 @@ requests~=2.24.0 memoization~=0.3.1 github3.py~=1.3.0 # onefuzz types version is set during build -onefuzztypes==0.0.0 \ No newline at end of file +onefuzztypes==0.0.0 diff --git a/src/api-service/__app__/tasks/__init__.py b/src/api-service/__app__/tasks/__init__.py index ec9bc9287e7..1a3ce829cf1 100644 --- a/src/api-service/__app__/tasks/__init__.py +++ b/src/api-service/__app__/tasks/__init__.py @@ -10,7 +10,6 @@ from onefuzztypes.requests import TaskGet, TaskSearch from onefuzztypes.responses import BoolResult -from ..onefuzzlib.heartbeat import TaskHeartbeat from ..onefuzzlib.jobs import Job from ..onefuzzlib.pools import NodeTasks from ..onefuzzlib.request import not_ok, ok, parse_request @@ -72,7 +71,7 @@ def get(req: func.HttpRequest) -> func.HttpResponse: task = Task.get_by_task_id(request.task_id) if isinstance(task, Error): return not_ok(task, context=request.task_id) - task.heartbeats = TaskHeartbeat.get_heartbeats(task.task_id) + # task.heartbeats = TaskHeartbeat.get_heartbeats(task.task_id) task.nodes = NodeTasks.get_node_assignments(request.task_id) task.events = TaskEvent.get_summary(request.task_id) return ok(task) diff --git a/src/pytypes/onefuzztypes/models.py b/src/pytypes/onefuzztypes/models.py index 39399434d1e..8dfcfcb2cc6 100644 --- a/src/pytypes/onefuzztypes/models.py +++ b/src/pytypes/onefuzztypes/models.py @@ -429,12 +429,16 @@ class TaskHeartbeatEntry(BaseModel): data: List[Dict[str, HeartbeatType]] -class TaskHeartbeatSummary(BaseModel): +class TaskHeartbeat(BaseModel): machine_id: UUID timestamp: Optional[datetime] type: HeartbeatType +# class TaskHeartbeatSummary(BaseModel): +# summary: + + class TaskHeartbeat(BaseModel): task_id: UUID heartbeat_id: str @@ -449,6 +453,7 @@ class NodeHeartbeatEntry(BaseModel): class NodeHeartbeatSummary(BaseModel): timestamp: Optional[datetime] + node_id: UUID type: HeartbeatType @@ -711,7 +716,7 @@ class Task(BaseModel): config: TaskConfig error: Optional[Error] auth: Optional[Authentication] - heartbeats: Optional[List[TaskHeartbeatSummary]] + heartbeats: Optional[Dict[Tuple[UUID, HeartbeatType], TaskHeartbeat]] end_time: Optional[datetime] events: Optional[List[TaskEventSummary]] nodes: Optional[List[NodeAssignment]]