diff --git a/CHANGELOG.md b/CHANGELOG.md index c8ca1a734..46bd84d91 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,8 @@ - Skip and warn about unsupported float values (infinity, negative infinity, NaN) in logging floats ([#1542](https://github.com/neptune-ai/neptune-client/pull/1542)) - Move error handling to a separate method in `AsyncOperationProcessor` ([#1553](https://github.com/neptune-ai/neptune-client/pull/1553)) - Abstract parts of logic to separate methods for AsyncOperationProcessor ([#1557](https://github.com/neptune-ai/neptune-client/pull/1557)) +- Rework disk utilization check ([#1549](https://github.com/neptune-ai/neptune-client/pull/1549)) + ## neptune 1.8.2 diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index 1aa8a8326..48b66a9e1 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -51,7 +51,7 @@ ) from neptune.internal.operation_processors.utils import common_metadata from neptune.internal.threading.daemon import Daemon -from neptune.internal.utils.disk_full import ensure_disk_not_full +from neptune.internal.utils.disk_utilization import ensure_disk_not_overutilize from neptune.internal.utils.logger import logger if TYPE_CHECKING: @@ -120,7 +120,7 @@ def _init_data_path(container_id: "UniqueId", container_type: "ContainerType") - path_suffix = f"exec-{now.timestamp()}-{now.strftime('%Y-%m-%d_%H.%M.%S.%f')}-{os.getpid()}" return get_container_dir(ASYNC_DIRECTORY, container_id, container_type, path_suffix) - @ensure_disk_not_full + @ensure_disk_not_overutilize def enqueue_operation(self, op: Operation, *, wait: bool) -> None: self._last_version = self._queue.put(op) diff --git a/src/neptune/internal/operation_processors/offline_operation_processor.py b/src/neptune/internal/operation_processors/offline_operation_processor.py index 6d3814ccb..3a31a567d 100644 --- a/src/neptune/internal/operation_processors/offline_operation_processor.py +++ b/src/neptune/internal/operation_processors/offline_operation_processor.py @@ -33,7 +33,7 @@ get_container_dir, ) from neptune.internal.operation_processors.utils import common_metadata -from neptune.internal.utils.disk_full import ensure_disk_not_full +from neptune.internal.utils.disk_utilization import ensure_disk_not_overutilize if TYPE_CHECKING: import threading @@ -60,7 +60,7 @@ def __init__(self, container_id: "UniqueId", container_type: "ContainerType", lo def _init_data_path(container_id: "UniqueId", container_type: "ContainerType") -> "Path": return get_container_dir(OFFLINE_DIRECTORY, container_id, container_type) - @ensure_disk_not_full + @ensure_disk_not_overutilize def enqueue_operation(self, op: Operation, *, wait: bool) -> None: self._queue.put(op) diff --git a/src/neptune/internal/operation_processors/sync_operation_processor.py b/src/neptune/internal/operation_processors/sync_operation_processor.py index fb71a4e23..885fa1251 100644 --- a/src/neptune/internal/operation_processors/sync_operation_processor.py +++ b/src/neptune/internal/operation_processors/sync_operation_processor.py @@ -30,7 +30,7 @@ get_container_dir, ) from neptune.internal.operation_processors.utils import common_metadata -from neptune.internal.utils.disk_full import ensure_disk_not_full +from neptune.internal.utils.disk_utilization import ensure_disk_not_overutilize if TYPE_CHECKING: from pathlib import Path @@ -60,7 +60,7 @@ def _init_data_path(container_id: "UniqueId", container_type: "ContainerType") - process_path = f"exec-{now.timestamp()}-{now.strftime('%Y-%m-%d_%H.%M.%S.%f')}-{os.getpid()}" return get_container_dir(SYNC_DIRECTORY, container_id, container_type, process_path) - @ensure_disk_not_full + @ensure_disk_not_overutilize def enqueue_operation(self, op: "Operation", *, wait: bool) -> None: _, errors = self._backend.execute_operations( container_id=self._container_id, diff --git a/src/neptune/internal/utils/disk_full.py b/src/neptune/internal/utils/disk_utilization.py similarity index 52% rename from src/neptune/internal/utils/disk_full.py rename to src/neptune/internal/utils/disk_utilization.py index 64265981d..a0d138126 100644 --- a/src/neptune/internal/utils/disk_full.py +++ b/src/neptune/internal/utils/disk_utilization.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__all__ = ["ensure_disk_not_full"] +__all__ = ["ensure_disk_not_overutilize"] import os @@ -44,26 +44,40 @@ def get_neptune_data_directory() -> str: return os.getenv("NEPTUNE_DATA_DIRECTORY", NEPTUNE_DATA_DIRECTORY) -def get_disk_utilization_percent(path: Optional[str] = None) -> float: +def get_disk_utilization_percent(path: Optional[str] = None) -> Optional[float]: try: if path is None: path = get_neptune_data_directory() return float(psutil.disk_usage(path).percent) - except (ValueError, UnicodeEncodeError): - return 0 + except (ValueError, TypeError, Error): + return None -def get_max_percentage_from_env() -> Optional[float]: - value = os.getenv(NEPTUNE_MAX_DISK_UTILIZATION) - if value is not None: - return float(value) - return None +def get_max_disk_utilization_from_env() -> Optional[float]: + env_limit_disk_utilization = os.getenv(NEPTUNE_MAX_DISK_UTILIZATION) + if env_limit_disk_utilization is None: + return None + + try: + limit_disk_utilization = float(env_limit_disk_utilization) + if limit_disk_utilization <= 0 or limit_disk_utilization > 100: + raise ValueError + + return limit_disk_utilization + except (ValueError, TypeError): + warn_once( + f"Provided invalid value of '{NEPTUNE_MAX_DISK_UTILIZATION}': '{env_limit_disk_utilization}'. " + "Check of disk utilization will not be applied.", + exception=NeptuneWarning, + ) + return None -def ensure_disk_not_full(func: Callable[..., None]) -> Callable[..., None]: - non_raising_on_disk_issue = NEPTUNE_NON_RAISING_ON_DISK_ISSUE in os.environ - max_disk_utilization = get_max_percentage_from_env() + +def ensure_disk_not_overutilize(func: Callable[..., None]) -> Callable[..., None]: + non_raising_on_disk_issue = os.getenv(NEPTUNE_NON_RAISING_ON_DISK_ISSUE, "false").lower() in ("true", "t", "1") + max_disk_utilization = get_max_disk_utilization_from_env() @wraps(func) def wrapper(*args: Tuple, **kwargs: Dict[str, Any]) -> None: @@ -71,17 +85,25 @@ def wrapper(*args: Tuple, **kwargs: Dict[str, Any]) -> None: try: if max_disk_utilization: current_utilization = get_disk_utilization_percent() + if current_utilization is None: + warn_once( + "Encountered disk issue during utilization check. Neptune will not save your data.", + exception=NeptuneWarning, + ) + return + if current_utilization >= max_disk_utilization: warn_once( - f"Max disk utilization {max_disk_utilization}% exceeded with {current_utilization}." - f" Neptune will not be saving your data.", + f"Disk usage is at {current_utilization}%, which exceeds the maximum allowed utilization " + + "of {max_disk_utilization}%. Neptune will not save your data.", exception=NeptuneWarning, ) return func(*args, **kwargs) except (OSError, Error): - warn_once("Encountered disk issue and Neptune will not be saving your data.", exception=NeptuneWarning) + warn_once("Encountered disk issue. Neptune will not save your data.", exception=NeptuneWarning) + else: return func(*args, **kwargs) diff --git a/tests/unit/neptune/new/internal/utils/test_disk_utilization.py b/tests/unit/neptune/new/internal/utils/test_disk_utilization.py new file mode 100644 index 000000000..40a690654 --- /dev/null +++ b/tests/unit/neptune/new/internal/utils/test_disk_utilization.py @@ -0,0 +1,134 @@ +# +# Copyright (c) 2023, Neptune Labs Sp. z o.o. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import os +import unittest +import warnings +from io import UnsupportedOperation + +from mock import ( + MagicMock, + patch, +) +from psutil import ( + AccessDenied, + Error, +) + +from neptune.envs import ( + NEPTUNE_MAX_DISK_UTILIZATION, + NEPTUNE_NON_RAISING_ON_DISK_ISSUE, +) +from neptune.internal.utils.disk_utilization import ensure_disk_not_overutilize + + +class TestDiskUtilization(unittest.TestCase): + @patch.dict(os.environ, {NEPTUNE_NON_RAISING_ON_DISK_ISSUE: "True"}) + def test_handle_invalid_env_values(self): + for value in ["True", "101", "-1"]: + with patch.dict(os.environ, {NEPTUNE_MAX_DISK_UTILIZATION: value}, clear=True): + mocked_func = MagicMock() + with warnings.catch_warnings(record=True) as warns: + wrapped_func = ensure_disk_not_overutilize(mocked_func) + wrapped_func() + + assert len(warns) == 1 + assert f"invalid value of '{NEPTUNE_MAX_DISK_UTILIZATION}': '{value}" in str(warns[-1].message) + mocked_func.assert_called_once() + + # Catching OSError that's base error for all OS and IO errors. More info here: https://peps.python.org/pep-3151 + # Additionally, catching specific psutil's base error - psutil.Error. + # More info about psutil.Error here: https://psutil.readthedocs.io/en/latest/index.html#psutil.Error + @patch.dict(os.environ, {NEPTUNE_NON_RAISING_ON_DISK_ISSUE: "True"}) + def test_suppressing_of_func_errors(self): + disk_errors = [ + OSError(), + IOError(), + EnvironmentError(), + UnsupportedOperation(), + Error(), + AccessDenied(), + ] + for error in disk_errors: + mocked_func = MagicMock() + wrapped_func = ensure_disk_not_overutilize(mocked_func) + mocked_func.side_effect = error + + wrapped_func() # asserting is not required as expecting that any error will be caught + mocked_func.assert_called_once() + + non_disk_errors = [OverflowError(), AttributeError()] + for error in non_disk_errors: + mocked_func = MagicMock() + wrapped_func = ensure_disk_not_overutilize(mocked_func) + mocked_func.side_effect = error + + with self.assertRaises(BaseException): + wrapped_func() + mocked_func.assert_called_once() + + @patch.dict(os.environ, {NEPTUNE_NON_RAISING_ON_DISK_ISSUE: "True"}) + @patch.dict(os.environ, {NEPTUNE_MAX_DISK_UTILIZATION: "60"}) + @patch("psutil.disk_usage") + def test_suppressing_of_checking_utilization_errors(self, disk_usage_mock): + checking_errors = [ + TypeError(), + UnsupportedOperation(), + Error(), + AccessDenied(), + ] + for error in checking_errors: + mocked_func = MagicMock() + wrapped_func = ensure_disk_not_overutilize(mocked_func) + disk_usage_mock.side_effect = error + + wrapped_func() # asserting is not required as expecting that any error will be caught + mocked_func.assert_not_called() + + @patch.dict(os.environ, {NEPTUNE_NON_RAISING_ON_DISK_ISSUE: "True"}) + @patch.dict(os.environ, {NEPTUNE_MAX_DISK_UTILIZATION: "100"}) + @patch("psutil.disk_usage") + def test_not_called_with_usage_100_percent(self, disk_usage_mock): + disk_usage_mock.return_value.percent = 100 + mocked_func = MagicMock() + wrapped_func = ensure_disk_not_overutilize(mocked_func) + + wrapped_func() + + mocked_func.assert_not_called() + + @patch.dict(os.environ, {NEPTUNE_NON_RAISING_ON_DISK_ISSUE: "True"}) + @patch.dict(os.environ, {NEPTUNE_MAX_DISK_UTILIZATION: "100"}) + @patch("psutil.disk_usage") + def test_called_when_usage_less_than_limit(self, disk_usage_mock): + disk_usage_mock.return_value.percent = 99 + mocked_func = MagicMock() + wrapped_func = ensure_disk_not_overutilize(mocked_func) + + wrapped_func() + + mocked_func.assert_called_once() + + @patch.dict(os.environ, {NEPTUNE_NON_RAISING_ON_DISK_ISSUE: "False"}) + @patch.dict(os.environ, {NEPTUNE_MAX_DISK_UTILIZATION: "60"}) + @patch("psutil.disk_usage") + def test_not_called_when_(self, disk_usage_mock): + disk_usage_mock.return_value.percent = 99 + mocked_func = MagicMock() + wrapped_func = ensure_disk_not_overutilize(mocked_func) + + wrapped_func() + + mocked_func.assert_called_once() diff --git a/tests/unit/neptune/new/internal/utils/test_full_disk.py b/tests/unit/neptune/new/internal/utils/test_full_disk.py deleted file mode 100644 index 2fac3c03e..000000000 --- a/tests/unit/neptune/new/internal/utils/test_full_disk.py +++ /dev/null @@ -1,87 +0,0 @@ -# -# Copyright (c) 2023, Neptune Labs Sp. z o.o. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import os -import unittest -from io import UnsupportedOperation - -from mock import ( - MagicMock, - patch, -) -from psutil import ( - AccessDenied, - Error, -) - -from neptune.envs import NEPTUNE_NON_RAISING_ON_DISK_ISSUE -from neptune.internal.utils.disk_full import ensure_disk_not_full - - -class TestDiskFull(unittest.TestCase): - - # Catching OSError that's base error for all OS and IO errors. More info here: https://peps.python.org/pep-3151 - # Additionally, catching specific psutil's base error - psutil.Error. - # More info about psutil.Error here: https://psutil.readthedocs.io/en/latest/index.html#psutil.Error - @patch.dict(os.environ, {NEPTUNE_NON_RAISING_ON_DISK_ISSUE: "True"}) - @patch("neptune.internal.utils.disk_full.get_max_percentage_from_env") - @patch("neptune.internal.utils.disk_full.get_disk_utilization_percent") - def test_suppressing_of_env_errors(self, get_disk_utilization_percent, get_max_percentage_from_env): - get_max_percentage_from_env.return_value = 42 - - env_errors = [OSError(), IOError(), EnvironmentError(), UnsupportedOperation(), Error(), AccessDenied()] - for error in env_errors: - mocked_func = MagicMock() - wrapped_func = ensure_disk_not_full(mocked_func) - get_disk_utilization_percent.side_effect = error - - wrapped_func() # asserting is not required as expecting that any error will be caught - mocked_func.assert_not_called() - - non_env_errors = [ValueError(), OverflowError()] - for error in non_env_errors: - mocked_func = MagicMock() - wrapped_func = ensure_disk_not_full(mocked_func) - get_disk_utilization_percent.side_effect = error - - with self.assertRaises(BaseException): - wrapped_func() - mocked_func.assert_not_called() - - @patch.dict(os.environ, {NEPTUNE_NON_RAISING_ON_DISK_ISSUE: "True"}) - @patch("neptune.internal.utils.disk_full.get_max_percentage_from_env") - @patch("neptune.internal.utils.disk_full.get_disk_utilization_percent") - def test_not_called_with_usage_100_percent(self, get_disk_utilization_percent, get_max_percentage_from_env): - get_max_percentage_from_env.return_value = 100 - get_disk_utilization_percent.return_value = 100 - mocked_func = MagicMock() - wrapped_func = ensure_disk_not_full(mocked_func) - - wrapped_func() - - mocked_func.assert_not_called() - - @patch.dict(os.environ, {NEPTUNE_NON_RAISING_ON_DISK_ISSUE: "True"}) - @patch("neptune.internal.utils.disk_full.get_max_percentage_from_env") - @patch("neptune.internal.utils.disk_full.get_disk_utilization_percent") - def test_called_when_usage_less_than_limit(self, get_disk_utilization_percent, get_max_percentage_from_env): - get_max_percentage_from_env.return_value = 100 - get_disk_utilization_percent.return_value = 99 - mocked_func = MagicMock() - wrapped_func = ensure_disk_not_full(mocked_func) - - wrapped_func() - - mocked_func.assert_called_once()