Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Add NoNewWork node command to tell free nodes to stop asking for new work #866

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/agent/onefuzz-supervisor/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub enum NodeCommand {
AddSshKey(SshKeyInfo),
StopTask(StopTask),
Stop {},
StopIfFree {},
}

#[derive(Debug, Deserialize, Eq, PartialEq, Serialize)]
Expand Down
9 changes: 9 additions & 0 deletions src/agent/onefuzz-supervisor/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ impl Scheduler {
};
*self = state.into();
}
NodeCommand::StopIfFree {} => {
if let Scheduler::Free(_) = self {
let cause = DoneCause::Stopped;
let state = State {
ctx: Done { cause },
};
*self = state.into();
}
}
}

Ok(())
Expand Down
6 changes: 6 additions & 0 deletions src/api-service/__app__/onefuzzlib/versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os
from typing import Dict

import semver
from memoization import cached
from onefuzztypes.responses import Version

Expand All @@ -29,3 +30,8 @@ def versions() -> Dict[str, Version]:
version=__version__,
)
return {"onefuzz": entry}


def is_minimum_version(*, version: str, minimum: str) -> bool:
# check if version is at least (or higher) than minimum
return bool(semver.VersionInfo.parse(version).compare(minimum) >= 0)
17 changes: 16 additions & 1 deletion src/api-service/__app__/onefuzzlib/workers/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
)
from onefuzztypes.models import Error
from onefuzztypes.models import Node as BASE_NODE
from onefuzztypes.models import NodeAssignment, NodeCommand, NodeCommandAddSshKey
from onefuzztypes.models import (
NodeAssignment,
NodeCommand,
NodeCommandAddSshKey,
NodeCommandStopIfFree,
)
from onefuzztypes.models import NodeTasks as BASE_NODE_TASK
from onefuzztypes.models import Result, StopNodeCommand, StopTaskNodeCommand
from onefuzztypes.primitives import PoolName
Expand All @@ -26,6 +31,7 @@
from ..azure.vmss import get_instance_id
from ..events import send_event
from ..orm import MappingIntStrAny, ORMMixin, QueryFilter
from ..versions import is_minimum_version

NODE_EXPIRATION_TIME: datetime.timedelta = datetime.timedelta(hours=1)
NODE_REIMAGE_TIME: datetime.timedelta = datetime.timedelta(days=7)
Expand Down Expand Up @@ -338,6 +344,11 @@ def to_reimage(self, done: bool = False) -> None:
if not self.reimage_requested and not self.delete_requested:
logging.info("setting reimage_requested: %s", self.machine_id)
self.reimage_requested = True

# if we're going to reimage, make sure the node doesn't pick up new work
# too.
self.send_stop_if_free()

self.save()

def add_ssh_public_key(self, public_key: str) -> Result[None]:
Expand All @@ -355,6 +366,10 @@ def add_ssh_public_key(self, public_key: str) -> Result[None]:
)
return None

def send_stop_if_free(self) -> None:
if is_minimum_version(version=self.version, minimum="2.16.1"):
bmc-msft marked this conversation as resolved.
Show resolved Hide resolved
self.send_message(NodeCommand(stop_if_free=NodeCommandStopIfFree()))

def stop(self, done: bool = False) -> None:
self.to_reimage(done=done)
self.send_message(NodeCommand(stop=StopNodeCommand()))
Expand Down
9 changes: 9 additions & 0 deletions src/api-service/__app__/onefuzzlib/workers/scalesets.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,10 +440,19 @@ def _resize_grow(self) -> None:
return

def _resize_shrink(self, to_remove: int) -> None:
logging.info(
SCALESET_LOG_PREFIX + "shrinking scaleset. scaleset_id:%s to_remove:%d",
self.scaleset_id,
to_remove,
)
queue = ScalesetShrinkQueue(self.scaleset_id)
for _ in range(to_remove):
queue.add_entry()

nodes = Node.search_states(scaleset_id=self.scaleset_id)
for node in nodes:
node.send_stop_if_free()

def resize(self) -> None:
# no longer needing to resize
if self.state != ScalesetState.resize:
Expand Down
1 change: 1 addition & 0 deletions src/api-service/__app__/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ memoization~=0.3.1
github3.py~=1.3.0
typing-extensions~=3.7.4.3
jsonpatch==1.28
semver==2.13.0
# onefuzz types version is set during build
onefuzztypes==0.0.0
5 changes: 4 additions & 1 deletion src/api-service/mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,7 @@ ignore_missing_imports = True
ignore_missing_imports = True

[mypy-jsonpatch.*]
ignore_missing_imports = True
ignore_missing_imports = True

[mypy-semver.*]
ignore_missing_imports = True
20 changes: 20 additions & 0 deletions src/api-service/tests/test_version_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import unittest

from __app__.onefuzzlib.versions import is_minimum_version


class TestMinVersion(unittest.TestCase):
def test_basic(self) -> None:
self.assertEqual(is_minimum_version(version="1.0.0", minimum="1.0.0"), True)
self.assertEqual(is_minimum_version(version="2.0.0", minimum="1.0.0"), True)
self.assertEqual(is_minimum_version(version="2.0.0", minimum="3.0.0"), False)
self.assertEqual(is_minimum_version(version="1.0.0", minimum="1.6.0"), False)


if __name__ == "__main__":
unittest.main()
5 changes: 5 additions & 0 deletions src/pytypes/onefuzztypes/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,10 @@ class NodeHeartbeatEntry(BaseModel):
data: List[Dict[str, HeartbeatType]]


class NodeCommandStopIfFree(BaseModel):
pass


class StopNodeCommand(BaseModel):
pass

Expand All @@ -573,6 +577,7 @@ class NodeCommand(EnumModel):
stop: Optional[StopNodeCommand]
stop_task: Optional[StopTaskNodeCommand]
add_ssh_key: Optional[NodeCommandAddSshKey]
stop_if_free: Optional[NodeCommandStopIfFree]


class NodeCommandEnvelope(BaseModel):
Expand Down