Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework disk usage limitation #1549

Merged
merged 7 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from neptune.internal.operation_processors.operations_errors_processor import OperationsErrorsProcessor
from neptune.internal.operation_processors.utils import common_metadata
from neptune.internal.threading.daemon import Daemon
from neptune.internal.utils.disk_full import ensure_disk_not_full
from neptune.internal.utils.disk_utilization import ensure_disk_not_overutilize
from neptune.internal.utils.logger import logger
from neptune.internal.utils.monotonic_inc_batch_size import MonotonicIncBatchSize

Expand Down Expand Up @@ -121,7 +121,7 @@ def _init_data_path(container_id: "UniqueId", container_type: "ContainerType") -
path_suffix = f"exec-{now.timestamp()}-{now.strftime('%Y-%m-%d_%H.%M.%S.%f')}-{os.getpid()}"
return get_container_dir(ASYNC_DIRECTORY, container_id, container_type, path_suffix)

@ensure_disk_not_full
@ensure_disk_not_overutilize
def enqueue_operation(self, op: Operation, *, wait: bool) -> None:
self._last_version = self._queue.put(op)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
get_container_dir,
)
from neptune.internal.operation_processors.utils import common_metadata
from neptune.internal.utils.disk_full import ensure_disk_not_full
from neptune.internal.utils.disk_utilization import ensure_disk_not_overutilize

if TYPE_CHECKING:
import threading
Expand All @@ -60,7 +60,7 @@ def __init__(self, container_id: "UniqueId", container_type: "ContainerType", lo
def _init_data_path(container_id: "UniqueId", container_type: "ContainerType") -> "Path":
return get_container_dir(OFFLINE_DIRECTORY, container_id, container_type)

@ensure_disk_not_full
@ensure_disk_not_overutilize
def enqueue_operation(self, op: Operation, *, wait: bool) -> None:
self._queue.put(op)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
get_container_dir,
)
from neptune.internal.operation_processors.utils import common_metadata
from neptune.internal.utils.disk_full import ensure_disk_not_full
from neptune.internal.utils.disk_utilization import ensure_disk_not_overutilize

if TYPE_CHECKING:
from pathlib import Path
Expand Down Expand Up @@ -60,7 +60,7 @@ def _init_data_path(container_id: "UniqueId", container_type: "ContainerType") -
process_path = f"exec-{now.timestamp()}-{now.strftime('%Y-%m-%d_%H.%M.%S.%f')}-{os.getpid()}"
return get_container_dir(SYNC_DIRECTORY, container_id, container_type, process_path)

@ensure_disk_not_full
@ensure_disk_not_overutilize
def enqueue_operation(self, op: "Operation", *, wait: bool) -> None:
_, errors = self._backend.execute_operations(
container_id=self._container_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
__all__ = ["ensure_disk_not_full"]
__all__ = ["ensure_disk_not_overutilize"]


import os
Expand Down Expand Up @@ -44,44 +44,66 @@ def get_neptune_data_directory() -> str:
return os.getenv("NEPTUNE_DATA_DIRECTORY", NEPTUNE_DATA_DIRECTORY)


def get_disk_utilization_percent(path: Optional[str] = None) -> float:
def get_disk_utilization_percent(path: Optional[str] = None) -> Optional[float]:
try:
if path is None:
path = get_neptune_data_directory()

return float(psutil.disk_usage(path).percent)
except (ValueError, UnicodeEncodeError):
return 0
except (ValueError, TypeError, Error):
return None


def get_max_percentage_from_env() -> Optional[float]:
value = os.getenv(NEPTUNE_MAX_DISK_UTILIZATION)
if value is not None:
return float(value)
return None
def get_max_disk_utilization_from_env() -> Optional[float]:
env_limit_disk_utilization = os.getenv(NEPTUNE_MAX_DISK_UTILIZATION)

if env_limit_disk_utilization is None:
return None

try:
limit_disk_utilization = float(env_limit_disk_utilization)
if limit_disk_utilization <= 0 or limit_disk_utilization > 100:
raise ValueError

return limit_disk_utilization
except (ValueError, TypeError):
warn_once(
f"Provided invalid value of '{NEPTUNE_MAX_DISK_UTILIZATION}': '{env_limit_disk_utilization}'. "
"Check of disk utilization will not be applied.",
exception=NeptuneWarning,
)
return None

def ensure_disk_not_full(func: Callable[..., None]) -> Callable[..., None]:
non_raising_on_disk_issue = NEPTUNE_NON_RAISING_ON_DISK_ISSUE in os.environ
max_disk_utilization = get_max_percentage_from_env()

def ensure_disk_not_overutilize(func: Callable[..., None]) -> Callable[..., None]:
non_raising_on_disk_issue = os.getenv(NEPTUNE_NON_RAISING_ON_DISK_ISSUE, "false").lower() in ("true", "t", "1")
max_disk_utilization = get_max_disk_utilization_from_env()

@wraps(func)
def wrapper(*args: Tuple, **kwargs: Dict[str, Any]) -> None:
if non_raising_on_disk_issue:
try:
if max_disk_utilization:
current_utilization = get_disk_utilization_percent()
if current_utilization is None:
warn_once(
"Encountered disk issue during utilization check. Neptune will not save your data.",
exception=NeptuneWarning,
)
return

if current_utilization >= max_disk_utilization:
warn_once(
f"Max disk utilization {max_disk_utilization}% exceeded with {current_utilization}."
f" Neptune will not be saving your data.",
f"Disk usage is at {current_utilization}%, which exceeds the maximum allowed utilization "
+ "of {max_disk_utilization}%. Neptune will not save your data.",
exception=NeptuneWarning,
)
return

func(*args, **kwargs)
except (OSError, Error):
warn_once("Encountered disk issue and Neptune will not be saving your data.", exception=NeptuneWarning)
warn_once("Encountered disk issue. Neptune will not save your data.", exception=NeptuneWarning)

else:
return func(*args, **kwargs)

Expand Down
134 changes: 134 additions & 0 deletions tests/unit/neptune/new/internal/utils/test_disk_utilization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#
# Copyright (c) 2023, Neptune Labs Sp. z o.o.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import unittest
import warnings
from io import UnsupportedOperation

from mock import (
MagicMock,
patch,
)
from psutil import (
AccessDenied,
Error,
)

from neptune.envs import (
NEPTUNE_MAX_DISK_UTILIZATION,
NEPTUNE_NON_RAISING_ON_DISK_ISSUE,
)
from neptune.internal.utils.disk_utilization import ensure_disk_not_overutilize


class TestDiskUtilization(unittest.TestCase):
@patch.dict(os.environ, {NEPTUNE_NON_RAISING_ON_DISK_ISSUE: "True"})
def test_handle_invalid_env_values(self):
for value in ["True", "101", "-1"]:
with patch.dict(os.environ, {NEPTUNE_MAX_DISK_UTILIZATION: value}, clear=True):
mocked_func = MagicMock()
with warnings.catch_warnings(record=True) as warns:
wrapped_func = ensure_disk_not_overutilize(mocked_func)
wrapped_func()

assert len(warns) == 1
assert f"invalid value of '{NEPTUNE_MAX_DISK_UTILIZATION}': '{value}" in str(warns[-1].message)
mocked_func.assert_called_once()

# Catching OSError that's base error for all OS and IO errors. More info here: https://peps.python.org/pep-3151
# Additionally, catching specific psutil's base error - psutil.Error.
# More info about psutil.Error here: https://psutil.readthedocs.io/en/latest/index.html#psutil.Error
@patch.dict(os.environ, {NEPTUNE_NON_RAISING_ON_DISK_ISSUE: "True"})
def test_suppressing_of_func_errors(self):
disk_errors = [
OSError(),
IOError(),
EnvironmentError(),
UnsupportedOperation(),
Error(),
AccessDenied(),
]
for error in disk_errors:
mocked_func = MagicMock()
wrapped_func = ensure_disk_not_overutilize(mocked_func)
mocked_func.side_effect = error

wrapped_func() # asserting is not required as expecting that any error will be caught
mocked_func.assert_called_once()

non_disk_errors = [OverflowError(), AttributeError()]
for error in non_disk_errors:
mocked_func = MagicMock()
wrapped_func = ensure_disk_not_overutilize(mocked_func)
mocked_func.side_effect = error

with self.assertRaises(BaseException):
wrapped_func()
mocked_func.assert_called_once()

@patch.dict(os.environ, {NEPTUNE_NON_RAISING_ON_DISK_ISSUE: "True"})
@patch.dict(os.environ, {NEPTUNE_MAX_DISK_UTILIZATION: "60"})
@patch("psutil.disk_usage")
def test_suppressing_of_checking_utilization_errors(self, disk_usage_mock):
checking_errors = [
TypeError(),
UnsupportedOperation(),
Error(),
AccessDenied(),
]
for error in checking_errors:
mocked_func = MagicMock()
wrapped_func = ensure_disk_not_overutilize(mocked_func)
disk_usage_mock.side_effect = error

wrapped_func() # asserting is not required as expecting that any error will be caught
mocked_func.assert_not_called()

@patch.dict(os.environ, {NEPTUNE_NON_RAISING_ON_DISK_ISSUE: "True"})
@patch.dict(os.environ, {NEPTUNE_MAX_DISK_UTILIZATION: "100"})
@patch("psutil.disk_usage")
def test_not_called_with_usage_100_percent(self, disk_usage_mock):
disk_usage_mock.return_value.percent = 100
mocked_func = MagicMock()
wrapped_func = ensure_disk_not_overutilize(mocked_func)

wrapped_func()

mocked_func.assert_not_called()

@patch.dict(os.environ, {NEPTUNE_NON_RAISING_ON_DISK_ISSUE: "True"})
@patch.dict(os.environ, {NEPTUNE_MAX_DISK_UTILIZATION: "100"})
@patch("psutil.disk_usage")
def test_called_when_usage_less_than_limit(self, disk_usage_mock):
disk_usage_mock.return_value.percent = 99
mocked_func = MagicMock()
wrapped_func = ensure_disk_not_overutilize(mocked_func)

wrapped_func()

mocked_func.assert_called_once()

@patch.dict(os.environ, {NEPTUNE_NON_RAISING_ON_DISK_ISSUE: "False"})
@patch.dict(os.environ, {NEPTUNE_MAX_DISK_UTILIZATION: "60"})
@patch("psutil.disk_usage")
def test_not_called_when_(self, disk_usage_mock):
disk_usage_mock.return_value.percent = 99
mocked_func = MagicMock()
wrapped_func = ensure_disk_not_overutilize(mocked_func)

wrapped_func()

mocked_func.assert_called_once()
87 changes: 0 additions & 87 deletions tests/unit/neptune/new/internal/utils/test_full_disk.py

This file was deleted.

Loading