From a58696a984fa1ca46c15846c2c23d190fc24408e Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 7 May 2021 10:24:58 -0400 Subject: [PATCH 1/6] Add NewNewWork node command to tell free nodes to stop asking for new work --- src/agent/onefuzz-supervisor/src/coordinator.rs | 1 + src/agent/onefuzz-supervisor/src/scheduler.rs | 9 +++++++++ src/api-service/__app__/onefuzzlib/versions.py | 12 ++++++++++++ .../__app__/onefuzzlib/workers/nodes.py | 14 +++++++++++++- .../__app__/onefuzzlib/workers/scalesets.py | 9 +++++++++ src/api-service/__app__/requirements.txt | 1 + src/api-service/mypy.ini | 5 ++++- src/pytypes/onefuzztypes/models.py | 5 +++++ 8 files changed, 54 insertions(+), 2 deletions(-) diff --git a/src/agent/onefuzz-supervisor/src/coordinator.rs b/src/agent/onefuzz-supervisor/src/coordinator.rs index 02699c877a..2a7b54b628 100644 --- a/src/agent/onefuzz-supervisor/src/coordinator.rs +++ b/src/agent/onefuzz-supervisor/src/coordinator.rs @@ -25,6 +25,7 @@ pub enum NodeCommand { AddSshKey(SshKeyInfo), StopTask(StopTask), Stop {}, + NoNewWork {}, } #[derive(Debug, Deserialize, Eq, PartialEq, Serialize)] diff --git a/src/agent/onefuzz-supervisor/src/scheduler.rs b/src/agent/onefuzz-supervisor/src/scheduler.rs index 4dd1dc2eec..cf8e8cb6e9 100644 --- a/src/agent/onefuzz-supervisor/src/scheduler.rs +++ b/src/agent/onefuzz-supervisor/src/scheduler.rs @@ -71,6 +71,15 @@ impl Scheduler { }; *self = state.into(); } + NodeCommand::NoNewWork {} => { + if let Scheduler::Free(_) = self { + let cause = DoneCause::Stopped; + let state = State { + ctx: Done { cause }, + }; + *self = state.into(); + } + } } Ok(()) diff --git a/src/api-service/__app__/onefuzzlib/versions.py b/src/api-service/__app__/onefuzzlib/versions.py index 7b822fe10f..9f624f944b 100644 --- a/src/api-service/__app__/onefuzzlib/versions.py +++ b/src/api-service/__app__/onefuzzlib/versions.py @@ -6,6 +6,7 @@ import os from typing import Dict +import semver from memoization import cached from onefuzztypes.responses import Version @@ -29,3 +30,14 @@ def versions() -> Dict[str, Version]: version=__version__, ) return {"onefuzz": entry} + + +def is_minimum_version(*, version: str, minimum: str) -> bool: + # >>> is_minimum_version(version="1.0.0", minimum="1.0.0") + # True + # >>> is_minimum_version(version="1.0.1", minimum="1.0.0") + # True + # >>> is_minimum_version(version="1.0.0", minimum="1.0.1") + # False + # >>> + return bool(semver.compare(version, minimum) >= 0) diff --git a/src/api-service/__app__/onefuzzlib/workers/nodes.py b/src/api-service/__app__/onefuzzlib/workers/nodes.py index 7858688f5d..70d0e2d851 100644 --- a/src/api-service/__app__/onefuzzlib/workers/nodes.py +++ b/src/api-service/__app__/onefuzzlib/workers/nodes.py @@ -16,7 +16,12 @@ ) from onefuzztypes.models import Error from onefuzztypes.models import Node as BASE_NODE -from onefuzztypes.models import NodeAssignment, NodeCommand, NodeCommandAddSshKey +from onefuzztypes.models import ( + NodeAssignment, + NodeCommand, + NodeCommandAddSshKey, + NodeCommandNoNewWork, +) from onefuzztypes.models import NodeTasks as BASE_NODE_TASK from onefuzztypes.models import Result, StopNodeCommand, StopTaskNodeCommand from onefuzztypes.primitives import PoolName @@ -26,6 +31,7 @@ from ..azure.vmss import get_instance_id from ..events import send_event from ..orm import MappingIntStrAny, ORMMixin, QueryFilter +from ..versions import is_minimum_version NODE_EXPIRATION_TIME: datetime.timedelta = datetime.timedelta(hours=1) NODE_REIMAGE_TIME: datetime.timedelta = datetime.timedelta(days=7) @@ -338,6 +344,8 @@ def to_reimage(self, done: bool = False) -> None: if not self.reimage_requested and not self.delete_requested: logging.info("setting reimage_requested: %s", self.machine_id) self.reimage_requested = True + + self.send_no_new_work() self.save() def add_ssh_public_key(self, public_key: str) -> Result[None]: @@ -355,6 +363,10 @@ def add_ssh_public_key(self, public_key: str) -> Result[None]: ) return None + def send_no_new_work(self) -> None: + if is_minimum_version(version=self.version, minimum="2.17.0"): + self.send_message(NodeCommand(no_new_work=NodeCommandNoNewWork())) + def stop(self, done: bool = False) -> None: self.to_reimage(done=done) self.send_message(NodeCommand(stop=StopNodeCommand())) diff --git a/src/api-service/__app__/onefuzzlib/workers/scalesets.py b/src/api-service/__app__/onefuzzlib/workers/scalesets.py index a8b3c1984e..11dd678b13 100644 --- a/src/api-service/__app__/onefuzzlib/workers/scalesets.py +++ b/src/api-service/__app__/onefuzzlib/workers/scalesets.py @@ -440,10 +440,19 @@ def _resize_grow(self) -> None: return def _resize_shrink(self, to_remove: int) -> None: + logging.info( + SCALESET_LOG_PREFIX + "shrinking scaleset. scaleset_id:%s to_remove:%d", + self.scaleset_id, + to_remove, + ) queue = ScalesetShrinkQueue(self.scaleset_id) for _ in range(to_remove): queue.add_entry() + nodes = Node.search_states(scaleset_id=self.scaleset_id) + for node in nodes: + node.send_no_new_work() + def resize(self) -> None: # no longer needing to resize if self.state != ScalesetState.resize: diff --git a/src/api-service/__app__/requirements.txt b/src/api-service/__app__/requirements.txt index 0d3af738a3..4683239e91 100644 --- a/src/api-service/__app__/requirements.txt +++ b/src/api-service/__app__/requirements.txt @@ -30,5 +30,6 @@ memoization~=0.3.1 github3.py~=1.3.0 typing-extensions~=3.7.4.3 jsonpatch==1.28 +semver==2.13.0 # onefuzz types version is set during build onefuzztypes==0.0.0 diff --git a/src/api-service/mypy.ini b/src/api-service/mypy.ini index 330b7bff91..1248542bac 100644 --- a/src/api-service/mypy.ini +++ b/src/api-service/mypy.ini @@ -36,4 +36,7 @@ ignore_missing_imports = True ignore_missing_imports = True [mypy-jsonpatch.*] -ignore_missing_imports = True \ No newline at end of file +ignore_missing_imports = True + +[mypy-semver.*] +ignore_missing_imports = True diff --git a/src/pytypes/onefuzztypes/models.py b/src/pytypes/onefuzztypes/models.py index 2489abc3af..efa2c28b47 100644 --- a/src/pytypes/onefuzztypes/models.py +++ b/src/pytypes/onefuzztypes/models.py @@ -557,6 +557,10 @@ class NodeHeartbeatEntry(BaseModel): data: List[Dict[str, HeartbeatType]] +class NodeCommandNoNewWork(BaseModel): + pass + + class StopNodeCommand(BaseModel): pass @@ -573,6 +577,7 @@ class NodeCommand(EnumModel): stop: Optional[StopNodeCommand] stop_task: Optional[StopTaskNodeCommand] add_ssh_key: Optional[NodeCommandAddSshKey] + no_new_work: Optional[NodeCommandNoNewWork] class NodeCommandEnvelope(BaseModel): From 00ea0d4cb15ec1d2b7bc758aab643667e034ac81 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 7 May 2021 11:50:25 -0400 Subject: [PATCH 2/6] temporarily bump this down for testing --- src/api-service/__app__/onefuzzlib/workers/nodes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api-service/__app__/onefuzzlib/workers/nodes.py b/src/api-service/__app__/onefuzzlib/workers/nodes.py index 70d0e2d851..fb3f9a66d5 100644 --- a/src/api-service/__app__/onefuzzlib/workers/nodes.py +++ b/src/api-service/__app__/onefuzzlib/workers/nodes.py @@ -364,7 +364,7 @@ def add_ssh_public_key(self, public_key: str) -> Result[None]: return None def send_no_new_work(self) -> None: - if is_minimum_version(version=self.version, minimum="2.17.0"): + if is_minimum_version(version=self.version, minimum="2.15.0"): self.send_message(NodeCommand(no_new_work=NodeCommandNoNewWork())) def stop(self, done: bool = False) -> None: From 13d4b8aac0a10557c1dc18dc83041080019bc620 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 7 May 2021 12:31:56 -0400 Subject: [PATCH 3/6] set minimum version to 2.16.1 --- src/api-service/__app__/onefuzzlib/workers/nodes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api-service/__app__/onefuzzlib/workers/nodes.py b/src/api-service/__app__/onefuzzlib/workers/nodes.py index fb3f9a66d5..a11f559547 100644 --- a/src/api-service/__app__/onefuzzlib/workers/nodes.py +++ b/src/api-service/__app__/onefuzzlib/workers/nodes.py @@ -364,7 +364,7 @@ def add_ssh_public_key(self, public_key: str) -> Result[None]: return None def send_no_new_work(self) -> None: - if is_minimum_version(version=self.version, minimum="2.15.0"): + if is_minimum_version(version=self.version, minimum="2.16.1"): self.send_message(NodeCommand(no_new_work=NodeCommandNoNewWork())) def stop(self, done: bool = False) -> None: From 6ff02ba3e8cf60458552e9805f9b2042fa087282 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 7 May 2021 13:19:45 -0400 Subject: [PATCH 4/6] move to tests --- .../__app__/onefuzzlib/versions.py | 10 ++-------- src/api-service/tests/test_version_check.py | 20 +++++++++++++++++++ 2 files changed, 22 insertions(+), 8 deletions(-) create mode 100755 src/api-service/tests/test_version_check.py diff --git a/src/api-service/__app__/onefuzzlib/versions.py b/src/api-service/__app__/onefuzzlib/versions.py index 9f624f944b..19edce7460 100644 --- a/src/api-service/__app__/onefuzzlib/versions.py +++ b/src/api-service/__app__/onefuzzlib/versions.py @@ -33,11 +33,5 @@ def versions() -> Dict[str, Version]: def is_minimum_version(*, version: str, minimum: str) -> bool: - # >>> is_minimum_version(version="1.0.0", minimum="1.0.0") - # True - # >>> is_minimum_version(version="1.0.1", minimum="1.0.0") - # True - # >>> is_minimum_version(version="1.0.0", minimum="1.0.1") - # False - # >>> - return bool(semver.compare(version, minimum) >= 0) + # check if version is at least (or higher) than minimum + return bool(semver.VersionInfo.parse(version).compare(minimum) >= 0) diff --git a/src/api-service/tests/test_version_check.py b/src/api-service/tests/test_version_check.py new file mode 100755 index 0000000000..85023b23c9 --- /dev/null +++ b/src/api-service/tests/test_version_check.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python +# +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import unittest + +from __app__.onefuzzlib.versions import is_minimum_version + + +class TestMinVersion(unittest.TestCase): + def test_basic(self) -> None: + self.assertEqual(is_minimum_version(version="1.0.0", minimum="1.0.0"), True) + self.assertEqual(is_minimum_version(version="2.0.0", minimum="1.0.0"), True) + self.assertEqual(is_minimum_version(version="2.0.0", minimum="3.0.0"), False) + self.assertEqual(is_minimum_version(version="1.0.0", minimum="1.6.0"), False) + + +if __name__ == "__main__": + unittest.main() From 1566f8d9285e3a5efe821e713e20a1fc36010736 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 7 May 2021 13:21:16 -0400 Subject: [PATCH 5/6] document why we're sending no new work. --- src/api-service/__app__/onefuzzlib/workers/nodes.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/api-service/__app__/onefuzzlib/workers/nodes.py b/src/api-service/__app__/onefuzzlib/workers/nodes.py index a11f559547..1bf2cd5f5c 100644 --- a/src/api-service/__app__/onefuzzlib/workers/nodes.py +++ b/src/api-service/__app__/onefuzzlib/workers/nodes.py @@ -345,7 +345,10 @@ def to_reimage(self, done: bool = False) -> None: logging.info("setting reimage_requested: %s", self.machine_id) self.reimage_requested = True + # if we're going to reimage, make sure the node doesn't pick up new work + # too. self.send_no_new_work() + self.save() def add_ssh_public_key(self, public_key: str) -> Result[None]: From 9013801309ae926c4a156be84e3c3176b1c8fe4c Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 7 May 2021 13:24:48 -0400 Subject: [PATCH 6/6] rename no new work to stop if free --- src/agent/onefuzz-supervisor/src/coordinator.rs | 2 +- src/agent/onefuzz-supervisor/src/scheduler.rs | 2 +- src/api-service/__app__/onefuzzlib/workers/nodes.py | 8 ++++---- src/api-service/__app__/onefuzzlib/workers/scalesets.py | 2 +- src/pytypes/onefuzztypes/models.py | 4 ++-- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/agent/onefuzz-supervisor/src/coordinator.rs b/src/agent/onefuzz-supervisor/src/coordinator.rs index 2a7b54b628..c19911a6e5 100644 --- a/src/agent/onefuzz-supervisor/src/coordinator.rs +++ b/src/agent/onefuzz-supervisor/src/coordinator.rs @@ -25,7 +25,7 @@ pub enum NodeCommand { AddSshKey(SshKeyInfo), StopTask(StopTask), Stop {}, - NoNewWork {}, + StopIfFree {}, } #[derive(Debug, Deserialize, Eq, PartialEq, Serialize)] diff --git a/src/agent/onefuzz-supervisor/src/scheduler.rs b/src/agent/onefuzz-supervisor/src/scheduler.rs index cf8e8cb6e9..a3824410d9 100644 --- a/src/agent/onefuzz-supervisor/src/scheduler.rs +++ b/src/agent/onefuzz-supervisor/src/scheduler.rs @@ -71,7 +71,7 @@ impl Scheduler { }; *self = state.into(); } - NodeCommand::NoNewWork {} => { + NodeCommand::StopIfFree {} => { if let Scheduler::Free(_) = self { let cause = DoneCause::Stopped; let state = State { diff --git a/src/api-service/__app__/onefuzzlib/workers/nodes.py b/src/api-service/__app__/onefuzzlib/workers/nodes.py index 1bf2cd5f5c..2919ea7655 100644 --- a/src/api-service/__app__/onefuzzlib/workers/nodes.py +++ b/src/api-service/__app__/onefuzzlib/workers/nodes.py @@ -20,7 +20,7 @@ NodeAssignment, NodeCommand, NodeCommandAddSshKey, - NodeCommandNoNewWork, + NodeCommandStopIfFree, ) from onefuzztypes.models import NodeTasks as BASE_NODE_TASK from onefuzztypes.models import Result, StopNodeCommand, StopTaskNodeCommand @@ -347,7 +347,7 @@ def to_reimage(self, done: bool = False) -> None: # if we're going to reimage, make sure the node doesn't pick up new work # too. - self.send_no_new_work() + self.send_stop_if_free() self.save() @@ -366,9 +366,9 @@ def add_ssh_public_key(self, public_key: str) -> Result[None]: ) return None - def send_no_new_work(self) -> None: + def send_stop_if_free(self) -> None: if is_minimum_version(version=self.version, minimum="2.16.1"): - self.send_message(NodeCommand(no_new_work=NodeCommandNoNewWork())) + self.send_message(NodeCommand(stop_if_free=NodeCommandStopIfFree())) def stop(self, done: bool = False) -> None: self.to_reimage(done=done) diff --git a/src/api-service/__app__/onefuzzlib/workers/scalesets.py b/src/api-service/__app__/onefuzzlib/workers/scalesets.py index 11dd678b13..f05b42f697 100644 --- a/src/api-service/__app__/onefuzzlib/workers/scalesets.py +++ b/src/api-service/__app__/onefuzzlib/workers/scalesets.py @@ -451,7 +451,7 @@ def _resize_shrink(self, to_remove: int) -> None: nodes = Node.search_states(scaleset_id=self.scaleset_id) for node in nodes: - node.send_no_new_work() + node.send_stop_if_free() def resize(self) -> None: # no longer needing to resize diff --git a/src/pytypes/onefuzztypes/models.py b/src/pytypes/onefuzztypes/models.py index efa2c28b47..d3a2d1385f 100644 --- a/src/pytypes/onefuzztypes/models.py +++ b/src/pytypes/onefuzztypes/models.py @@ -557,7 +557,7 @@ class NodeHeartbeatEntry(BaseModel): data: List[Dict[str, HeartbeatType]] -class NodeCommandNoNewWork(BaseModel): +class NodeCommandStopIfFree(BaseModel): pass @@ -577,7 +577,7 @@ class NodeCommand(EnumModel): stop: Optional[StopNodeCommand] stop_task: Optional[StopTaskNodeCommand] add_ssh_key: Optional[NodeCommandAddSshKey] - no_new_work: Optional[NodeCommandNoNewWork] + stop_if_free: Optional[NodeCommandStopIfFree] class NodeCommandEnvelope(BaseModel):