Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
storing the heartbeat in the task and node tables
Browse files Browse the repository at this point in the history
  • Loading branch information
chkeita committed Oct 16, 2020
1 parent eee0e58 commit b12325a
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 110 deletions.
2 changes: 0 additions & 2 deletions src/api-service/__app__/node/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from onefuzztypes.requests import NodeGet, NodeSearch
from onefuzztypes.responses import BoolResult

from ..onefuzzlib.heartbeat import NodeHeartbeat
from ..onefuzzlib.pools import Node, NodeTasks
from ..onefuzzlib.request import not_ok, ok, parse_request

Expand All @@ -32,7 +31,6 @@ def get(req: func.HttpRequest) -> func.HttpResponse:

node_tasks = NodeTasks.get_by_machine_id(request.machine_id)
node.tasks = [(t.task_id, t.state) for t in node_tasks]
node.heartbeats = NodeHeartbeat.get_heartbeats(request.machine_id)

return ok(node)

Expand Down
97 changes: 0 additions & 97 deletions src/api-service/__app__/onefuzzlib/heartbeat.py

This file was deleted.

10 changes: 10 additions & 0 deletions src/api-service/__app__/onefuzzlib/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,16 @@ def set_halt(self) -> None:
self.set_shutdown()
self.stop()

@classmethod
def try_add_heartbeat(cls, raw: Dict) -> bool:
return True
# try:
# entry = NodeHeartbeatEntry.parse_obj(raw)
# cls.add(entry)
# return True
# except ValidationError:
# return False


class NodeTasks(BASE_NODE_TASK, ORMMixin):
@classmethod
Expand Down
54 changes: 52 additions & 2 deletions src/api-service/__app__/onefuzzlib/tasks/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@

import logging
from datetime import datetime, timedelta
from typing import List, Optional, Tuple, Union
from typing import Dict, List, Optional, Tuple, Union
from uuid import UUID

from onefuzztypes.enums import ErrorCode, TaskState
from onefuzztypes.models import Error
from onefuzztypes.models import Task as BASE_TASK
from onefuzztypes.models import TaskConfig, TaskVm
from onefuzztypes.models import (
TaskConfig,
TaskVm,
TaskHeartbeatEntry,
TaskHeartbeat
)

from ..azure.creds import get_fuzz_storage
from ..azure.image import get_os
Expand All @@ -20,6 +25,8 @@
from ..pools import Node, Pool, Scaleset
from ..proxy_forward import ProxyForward

from pydantic import ValidationError


class Task(BASE_TASK, ORMMixin):
def ready_to_schedule(self) -> bool:
Expand Down Expand Up @@ -140,6 +147,13 @@ def search_expired(cls) -> List["Task"]:
query={"state": TaskState.available()}, raw_unchecked_filter=time_filter
)

@classmethod
def try_get_by_task_id(cls, task_id: UUID) -> Optional["Task"]:
tasks = cls.search(query={"task_id": [task_id]})
if not tasks:
return None
return tasks[0]

@classmethod
def get_by_task_id(cls, task_id: UUID) -> Union[Error, "Task"]:
tasks = cls.search(query={"task_id": [task_id]})
Expand Down Expand Up @@ -261,3 +275,39 @@ def on_start(self) -> None:
@classmethod
def key_fields(cls) -> Tuple[str, str]:
return ("job_id", "task_id")

@classmethod
def try_add_heartbeat(cls, raw: Dict) -> bool:
now = datetime.utcnow()

# class TaskHeartbeatEntry(BaseModel):
# task_id: UUID
# machine_id: UUID
# data: List[Dict[str, HeartbeatType]]

# class TaskHeartbeatSummary(BaseModel):
# machine_id: UUID
# timestamp: Optional[datetime]

try:
entry = TaskHeartbeatEntry.parse_obj(raw)
task = cls.try_get_by_task_id(entry.task_id)
# heartbeats: Optional[Dict[UUID, List[TaskHeartbeatSummary]]]
if task:

if not task.heartbeats:
task.heartbeats = {}
summary = task.heartbeats
for hb in entry.data:
for key in hb:
hb_type = hb[key]

summary[entry.machine_id, hb_type] = TaskHeartbeat(
machine_id=entry.machine_id, timestamp=now, type=hb_type
)

task.save()
# cls.add(entry)P
return True
except ValidationError:
return False
4 changes: 2 additions & 2 deletions src/api-service/__app__/queue_node_heartbeat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import azure.functions as func

from ..onefuzzlib.dashboard import get_event
from ..onefuzzlib.heartbeat import NodeHeartbeat
from ..onefuzzlib.pools import Node


def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None:
Expand All @@ -18,7 +18,7 @@ def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None:

raw = json.loads(body)

NodeHeartbeat.try_add(raw)
Node.try_add_heartbeat(raw)

event = get_event()
if event:
Expand Down
4 changes: 2 additions & 2 deletions src/api-service/__app__/queue_task_heartbeat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import azure.functions as func

from ..onefuzzlib.dashboard import get_event
from ..onefuzzlib.heartbeat import TaskHeartbeat
from ..onefuzzlib.tasks.main import Task


def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None:
Expand All @@ -18,7 +18,7 @@ def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None:

raw = json.loads(body)

TaskHeartbeat.try_add(raw)
Task.try_add_heartbeat(raw)

event = get_event()
if event:
Expand Down
2 changes: 1 addition & 1 deletion src/api-service/__app__/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ requests~=2.24.0
memoization~=0.3.1
github3.py~=1.3.0
# onefuzz types version is set during build
onefuzztypes==0.0.0
onefuzztypes==0.0.0
3 changes: 1 addition & 2 deletions src/api-service/__app__/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from onefuzztypes.requests import TaskGet, TaskSearch
from onefuzztypes.responses import BoolResult

from ..onefuzzlib.heartbeat import TaskHeartbeat
from ..onefuzzlib.jobs import Job
from ..onefuzzlib.pools import NodeTasks
from ..onefuzzlib.request import not_ok, ok, parse_request
Expand Down Expand Up @@ -72,7 +71,7 @@ def get(req: func.HttpRequest) -> func.HttpResponse:
task = Task.get_by_task_id(request.task_id)
if isinstance(task, Error):
return not_ok(task, context=request.task_id)
task.heartbeats = TaskHeartbeat.get_heartbeats(task.task_id)
# task.heartbeats = TaskHeartbeat.get_heartbeats(task.task_id)
task.nodes = NodeTasks.get_node_assignments(request.task_id)
task.events = TaskEvent.get_summary(request.task_id)
return ok(task)
Expand Down
9 changes: 7 additions & 2 deletions src/pytypes/onefuzztypes/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,12 +429,16 @@ class TaskHeartbeatEntry(BaseModel):
data: List[Dict[str, HeartbeatType]]


class TaskHeartbeatSummary(BaseModel):
class TaskHeartbeat(BaseModel):
machine_id: UUID
timestamp: Optional[datetime]
type: HeartbeatType


# class TaskHeartbeatSummary(BaseModel):
# summary:


class TaskHeartbeat(BaseModel):
task_id: UUID
heartbeat_id: str
Expand All @@ -449,6 +453,7 @@ class NodeHeartbeatEntry(BaseModel):

class NodeHeartbeatSummary(BaseModel):
timestamp: Optional[datetime]
node_id: UUID
type: HeartbeatType


Expand Down Expand Up @@ -711,7 +716,7 @@ class Task(BaseModel):
config: TaskConfig
error: Optional[Error]
auth: Optional[Authentication]
heartbeats: Optional[List[TaskHeartbeatSummary]]
heartbeats: Optional[Dict[Tuple[UUID, HeartbeatType], TaskHeartbeat]]
end_time: Optional[datetime]
events: Optional[List[TaskEventSummary]]
nodes: Optional[List[NodeAssignment]]

0 comments on commit b12325a

Please sign in to comment.