From de4e7d898a43ec169a325b5756d9603d191926ed Mon Sep 17 00:00:00 2001 From: Cheick Keita Date: Wed, 14 Oct 2020 17:08:19 -0700 Subject: [PATCH 1/5] Reimage dead nodes --- .../__app__/onefuzzlib/heartbeat.py | 26 ++++++++++++++++++- src/api-service/__app__/onefuzzlib/pools.py | 5 ++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/src/api-service/__app__/onefuzzlib/heartbeat.py b/src/api-service/__app__/onefuzzlib/heartbeat.py index ab4b674db9..11133331bd 100644 --- a/src/api-service/__app__/onefuzzlib/heartbeat.py +++ b/src/api-service/__app__/onefuzzlib/heartbeat.py @@ -3,9 +3,11 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +from datetime import datetime, timedelta from typing import Dict, List, Tuple from uuid import UUID +from onefuzztypes.enums import HeartbeatType from onefuzztypes.models import NodeHeartbeat as BASE_NODE_HEARTBEAT from onefuzztypes.models import NodeHeartbeatEntry, NodeHeartbeatSummary from onefuzztypes.models import TaskHeartbeat as BASE_TASK_HEARTBEAT @@ -58,10 +60,14 @@ def key_fields(cls) -> Tuple[str, str]: class NodeHeartbeat(BASE_NODE_HEARTBEAT, ORMMixin): + @classmethod + def get_heartbeat_id(cls, node_id: UUID, heartbeat_type: HeartbeatType): + return "-".join([str(node_id), heartbeat_type.name]) + @classmethod def add(cls, entry: NodeHeartbeatEntry) -> None: for value in entry.data: - heartbeat_id = "-".join([str(entry.node_id), value["type"].name]) + heartbeat_id = cls.get_heartbeat_id(entry.node_id, value["type"].name) heartbeat = cls( heartbeat_id=heartbeat_id, node_id=entry.node_id, @@ -92,6 +98,24 @@ def get_heartbeats(cls, node_id: UUID) -> List[NodeHeartbeatSummary]: ) return result + @classmethod + def get_dead_nodes( + cls, node_ids: List[UUID], expiration_period: timedelta + ) -> List[UUID]: + time_filter = "Timestamp lt datetime'%s'" % ( + datetime.utcnow().isoformat() - expiration_period + ) + entries = cls.search( + query={ + "heartbeat_id": [ + cls.get_heartbeat_id(n, 
HeartbeatType.MachineAlive) for n in node_ids ] }, raw_unchecked_filter=time_filter, ) return [e.node_id for e in entries] + @classmethod def key_fields(cls) -> Tuple[str, str]: return ("node_id", "heartbeat_id") diff --git a/src/api-service/__app__/onefuzzlib/pools.py b/src/api-service/__app__/onefuzzlib/pools.py index 2df8d895ff..883684edb8 100644 --- a/src/api-service/__app__/onefuzzlib/pools.py +++ b/src/api-service/__app__/onefuzzlib/pools.py @@ -60,6 +60,7 @@ update_extensions, ) from .extension import fuzz_extensions +from .heartbeat import NodeHeartbeat from .orm import MappingIntStrAny, ORMMixin, QueryFilter # Future work: @@ -722,6 +723,10 @@ def cleanup_nodes(self) -> bool: else: to_reimage.append(node) + idle_nodes = NodeHeartbeat.get_dead_nodes(azure_nodes) + for node in idle_nodes: + to_reimage.append(node) + # Perform operations until they fail due to scaleset getting locked try: if to_delete: From 745abca441ee17bb33e7d768ea2302beb771653a Mon Sep 17 00:00:00 2001 From: Cheick Keita Date: Wed, 14 Oct 2020 17:36:23 -0700 Subject: [PATCH 2/5] fixing compile errors --- src/api-service/__app__/onefuzzlib/heartbeat.py | 6 +++--- src/api-service/__app__/onefuzzlib/pools.py | 11 +++++++++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/api-service/__app__/onefuzzlib/heartbeat.py b/src/api-service/__app__/onefuzzlib/heartbeat.py index 11133331bd..34cbcec6fd 100644 --- a/src/api-service/__app__/onefuzzlib/heartbeat.py +++ b/src/api-service/__app__/onefuzzlib/heartbeat.py @@ -61,13 +61,13 @@ def key_fields(cls) -> Tuple[str, str]: class NodeHeartbeat(BASE_NODE_HEARTBEAT, ORMMixin): @classmethod - def get_heartbeat_id(cls, node_id: UUID, heartbeat_type: HeartbeatType): + def get_heartbeat_id(cls, node_id: UUID, heartbeat_type: HeartbeatType) -> str: return "-".join([str(node_id), heartbeat_type.name]) @classmethod def add(cls, entry: NodeHeartbeatEntry) -> None: for value in entry.data: - heartbeat_id = 
cls.get_heartbeat_id(entry.node_id, value["type"].name) + heartbeat_id = cls.get_heartbeat_id(entry.node_id, value["type"]) heartbeat = cls( heartbeat_id=heartbeat_id, node_id=entry.node_id, @@ -103,7 +103,7 @@ def get_dead_nodes( cls, node_ids: List[UUID], expiration_period: timedelta ) -> List[UUID]: time_filter = "Timestamp lt datetime'%s'" % ( - datetime.utcnow().isoformat() - expiration_period + (datetime.utcnow() - expiration_period).isoformat() ) entries = cls.search( query={ diff --git a/src/api-service/__app__/onefuzzlib/pools.py b/src/api-service/__app__/onefuzzlib/pools.py index 883684edb8..ca5bc89418 100644 --- a/src/api-service/__app__/onefuzzlib/pools.py +++ b/src/api-service/__app__/onefuzzlib/pools.py @@ -109,9 +109,13 @@ def search_outdated( version_query = "not (version eq '%s')" % __version__ return cls.search(query=query, raw_unchecked_filter=version_query) + @classmethod + def get_by_machine_ids(cls, machine_id: List[UUID]) -> List["Node"]: + return cls.search(query={"machine_id": machine_id}) + @classmethod def get_by_machine_id(cls, machine_id: UUID) -> Optional["Node"]: - nodes = cls.search(query={"machine_id": [machine_id]}) + nodes = cls.get_by_machine_ids([machine_id]) if not nodes: return None @@ -723,7 +727,10 @@ def cleanup_nodes(self) -> bool: else: to_reimage.append(node) - idle_nodes = NodeHeartbeat.get_dead_nodes(azure_nodes) + idle_nodes_ids = NodeHeartbeat.get_dead_nodes( + [node_id for node_id in azure_nodes.keys()], datetime.timedelta(hours=1) + ) + idle_nodes = Node.get_by_machine_ids(idle_nodes_ids) for node in idle_nodes: to_reimage.append(node) From 07bfade40ed6357a3c43700a7f6bae25872c7cb4 Mon Sep 17 00:00:00 2001 From: Cheick Keita Date: Wed, 14 Oct 2020 21:07:58 -0700 Subject: [PATCH 3/5] address PR comments --- src/api-service/__app__/onefuzzlib/pools.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/api-service/__app__/onefuzzlib/pools.py b/src/api-service/__app__/onefuzzlib/pools.py 
index ca5bc89418..676c9e8dde 100644 --- a/src/api-service/__app__/onefuzzlib/pools.py +++ b/src/api-service/__app__/onefuzzlib/pools.py @@ -63,6 +63,8 @@ from .heartbeat import NodeHeartbeat from .orm import MappingIntStrAny, ORMMixin, QueryFilter +NODE_EXPIRATION_TIME: datetime.timedelta = datetime.timedelta(hours=1) + # Future work: # # Enabling autoscaling for the scalesets based on the pool work queues. @@ -727,11 +729,11 @@ def cleanup_nodes(self) -> bool: else: to_reimage.append(node) - idle_nodes_ids = NodeHeartbeat.get_dead_nodes( - [node_id for node_id in azure_nodes.keys()], datetime.timedelta(hours=1) + dead_nodes_ids = NodeHeartbeat.get_dead_nodes( + [node_id for node_id in azure_nodes.keys()], NODE_EXPIRATION_TIME ) - idle_nodes = Node.get_by_machine_ids(idle_nodes_ids) - for node in idle_nodes: + dead_nodes = Node.get_by_machine_ids(dead_nodes_ids) + for node in dead_nodes: to_reimage.append(node) # Perform operations until they fail due to scaleset getting locked From 19ce0b2c1114a35ffa03dc54550e95b8f32ce29c Mon Sep 17 00:00:00 2001 From: Cheick Keita Date: Tue, 20 Oct 2020 11:46:12 -0700 Subject: [PATCH 4/5] simplify query to find dead nodes --- src/api-service/__app__/onefuzzlib/pools.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/api-service/__app__/onefuzzlib/pools.py b/src/api-service/__app__/onefuzzlib/pools.py index 6a9f1d65ed..34026d8287 100644 --- a/src/api-service/__app__/onefuzzlib/pools.py +++ b/src/api-service/__app__/onefuzzlib/pools.py @@ -60,7 +60,6 @@ update_extensions, ) from .extension import fuzz_extensions -from .heartbeat import NodeHeartbeat from .orm import MappingIntStrAny, ORMMixin, QueryFilter NODE_EXPIRATION_TIME: datetime.timedelta = datetime.timedelta(hours=1) @@ -132,7 +131,7 @@ def mark_outdated_nodes(cls) -> None: @classmethod def get_by_machine_id(cls, machine_id: UUID) -> Optional["Node"]: - nodes = cls.get_by_machine_ids([machine_id]) + nodes = 
cls.search(query={"machine_id": [machine_id]}) if not nodes: return None @@ -281,6 +280,18 @@ def set_halt(self) -> None: self.set_shutdown() self.stop() + @classmethod + def get_dead_nodes( + cls, scaleset_id: UUID, expiration_period: datetime.timedelta + ) -> List["Node"]: + time_filter = "heartbeat lt datetime'%s'" % ( + (datetime.datetime.utcnow() - expiration_period).isoformat() + ) + return cls.search( + query={"scaleset_id": [scaleset_id]}, + raw_unchecked_filter=time_filter, + ) + class NodeTasks(BASE_NODE_TASK, ORMMixin): @classmethod @@ -746,10 +757,7 @@ def cleanup_nodes(self) -> bool: # only add nodes that are not already set to reschedule to_reimage.append(node) - dead_nodes_ids = NodeHeartbeat.get_dead_nodes( - [node_id for node_id in azure_nodes.keys()], NODE_EXPIRATION_TIME - ) - dead_nodes = Node.get_by_machine_ids(dead_nodes_ids) + dead_nodes = Node.get_dead_nodes(self.scaleset_id, NODE_EXPIRATION_TIME) for node in dead_nodes: to_reimage.append(node) From 72234cbcbd4433e510a57728d33e6f3f603ad62f Mon Sep 17 00:00:00 2001 From: Cheick Keita Date: Tue, 20 Oct 2020 12:48:42 -0700 Subject: [PATCH 5/5] calling set_halt on the node before reimage --- src/api-service/__app__/onefuzzlib/pools.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/api-service/__app__/onefuzzlib/pools.py b/src/api-service/__app__/onefuzzlib/pools.py index 34026d8287..b16a03e0a0 100644 --- a/src/api-service/__app__/onefuzzlib/pools.py +++ b/src/api-service/__app__/onefuzzlib/pools.py @@ -759,6 +759,7 @@ def cleanup_nodes(self) -> bool: dead_nodes = Node.get_dead_nodes(self.scaleset_id, NODE_EXPIRATION_TIME) for node in dead_nodes: + node.set_halt() to_reimage.append(node) # Perform operations until they fail due to scaleset getting locked