Add errors processor for async operations (#1539)
Co-authored-by: Artsiom Tserashkovich <artsiom.tserashkovich@gmail.com>
artsiomtserashkovich and Artsiom Tserashkovich authored Oct 19, 2023
1 parent b17d986 commit 79f6ef7
Showing 5 changed files with 116 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@
- Run async callback in new daemon thread ([#1521](https://github.com/neptune-ai/neptune-client/pull/1521))
- Better handle bool values of `git_ref` param in `init_run` ([#1525](https://github.com/neptune-ai/neptune-client/pull/1525))
- Updated management docstrings ([#1500](https://github.com/neptune-ai/neptune-client/pull/1500))
- Sample logging for series errors ([#1539](https://github.com/neptune-ai/neptune-client/pull/1539))

### Changes
- Safety (errors suppressing) execution mode ([#1503](https://github.com/neptune-ai/neptune-client/pull/1503))
3 changes: 3 additions & 0 deletions src/neptune/envs.py
@@ -35,6 +35,7 @@
"NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK",
"NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK",
"NEPTUNE_DISABLE_PARENT_DIR_DELETION",
"NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS",
]

from neptune.common.envs import (
@@ -80,4 +81,6 @@

NEPTUNE_DISABLE_PARENT_DIR_DELETION = "NEPTUNE_DISABLE_PARENT_DIR_DELETION"

NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS = "NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS"

S3_ENDPOINT_URL = "S3_ENDPOINT_URL"
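
The new `NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS` flag is read from the environment when the errors processor is constructed, so it has to be set before logging starts. A minimal sketch of enabling it, mirroring the case-insensitive parsing used by the processor further down (the snippet itself is illustrative, not part of the commit):

```python
import os

# Illustrative: enable per-step sampling of series-step errors before any
# Neptune run (and therefore the async operation processor) is created.
os.environ["NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS"] = "true"

# The processor accepts "true", "1", or "t" (case-insensitive); anything else disables sampling.
sampling_enabled = os.getenv("NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS", "false").lower() in ("true", "1", "t")
print(sampling_enabled)  # True
```
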
3 changes: 3 additions & 5 deletions src/neptune/internal/operation_processors/async_operation_processor.py
@@ -48,6 +48,7 @@
OperationStorage,
get_container_dir,
)
from neptune.internal.operation_processors.operations_errors_processor import OperationsErrorsProcessor
from neptune.internal.operation_processors.utils import common_metadata
from neptune.internal.threading.daemon import Daemon
from neptune.internal.utils.disk_full import ensure_disk_not_full
@@ -280,6 +281,7 @@ def __init__(
):
super().__init__(sleep_time=sleep_time, name="NeptuneAsyncOpProcessor")
self._processor: "AsyncOperationProcessor" = processor
self._errors_processor: OperationsErrorsProcessor = OperationsErrorsProcessor()
self._batch_size: int = batch_size
self._last_flush: float = 0.0
self._no_progress_exceeded: bool = False
@@ -346,11 +348,7 @@ def process_batch(self, batch: List[Operation], version: int) -> None:
self._processor._last_ack = monotonic()
self._processor._lag_exceeded = False

for error in errors:
logger.error(
"Error occurred during asynchronous operation processing: %s",
error,
)
self._errors_processor.handle(errors)

self._processor._consumed_version = version_to_ack

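
For context, a rough sketch of what the delegation above amounts to: the errors returned for a processed batch are handed to `OperationsErrorsProcessor.handle()` instead of being logged one by one in the loop that this commit removes. The error message below is a stand-in, not output captured from the library:

```python
from neptune.exceptions import MetadataInconsistency
from neptune.internal.operation_processors.operations_errors_processor import (
    OperationsErrorsProcessor,
)

# Stand-in for the errors a processed batch might report back.
batch_errors = [
    MetadataInconsistency(
        "X-coordinates (step) must be strictly increasing for series attribute: loss. Invalid point: 7.0"
    ),
]

# With sampling disabled (the default), handle() logs every error,
# which matches the behavior of the removed for-loop.
OperationsErrorsProcessor().handle(batch_errors)
```
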
38 changes: 38 additions & 0 deletions src/neptune/internal/operation_processors/operations_errors_processor.py
@@ -0,0 +1,38 @@
import os
import re
from typing import (
List,
Set,
)

from neptune.common.exceptions import NeptuneException
from neptune.envs import NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS
from neptune.exceptions import MetadataInconsistency
from neptune.internal.utils.logger import logger


class OperationsErrorsProcessor:
def __init__(self) -> None:
self._sampling_enabled = os.getenv(NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS, "false").lower() in ("true", "1", "t")
self._error_sampling_exp = re.compile(
r"X-coordinates \(step\) must be strictly increasing for series attribute: (.*)\. Invalid point: (.*)"
)
self._logged_steps: Set[str] = set()

def handle(self, errors: List[NeptuneException]) -> None:
for error in errors:
if self._sampling_enabled and isinstance(error, MetadataInconsistency):
match_exp = self._error_sampling_exp.match(str(error))
if match_exp:
self._handle_not_increased_error_for_step(error, match_exp.group(2))
continue

logger.error("Error occurred during asynchronous operation processing: %s", str(error))

def _handle_not_increased_error_for_step(self, error: MetadataInconsistency, step: str) -> None:
if step not in self._logged_steps:
self._logged_steps.add(step)
logger.error(
f"Error occurred during asynchronous operation processing: {str(error)}. "
+ f"Suppressing other errors for step: {step}."
)
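
A hedged usage sketch of the sampling behavior defined above: with the flag enabled, only the first "strictly increasing" error for a given invalid step is logged, and later errors reporting the same step are suppressed (even for other attributes), while errors that do not match the pattern still pass through. The attribute names `a` and `b` are illustrative:

```python
import os

os.environ["NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS"] = "true"  # must be set before the processor is built

from neptune.exceptions import MetadataInconsistency
from neptune.internal.operation_processors.operations_errors_processor import (
    OperationsErrorsProcessor,
)

processor = OperationsErrorsProcessor()
processor.handle(
    errors=[
        MetadataInconsistency(
            "X-coordinates (step) must be strictly increasing for series attribute: a. Invalid point: 2.0"
        ),
        MetadataInconsistency(
            "X-coordinates (step) must be strictly increasing for series attribute: b. Invalid point: 2.0"
        ),
    ]
)
# Only the first error is logged (with a note that other errors for step 2.0
# are suppressed); the second reports the same step and is skipped.
```
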
71 changes: 71 additions & 0 deletions tests/unit/neptune/new/internal/operation_processors/test_operations_errors_processor.py
@@ -0,0 +1,71 @@
import os
from unittest.mock import patch

from neptune.common.exceptions import NeptuneException
from neptune.envs import NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS
from neptune.exceptions import MetadataInconsistency
from neptune.internal.operation_processors.operations_errors_processor import OperationsErrorsProcessor


class TestOperationsErrorsProcessor:
@patch.dict(os.environ, {NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS: "True"})
def test_sample_only_repeated_steps(self, capsys):
processor = OperationsErrorsProcessor()
duplicated_errors = [
MetadataInconsistency(
"X-coordinates (step) must be strictly increasing for series attribute: a. Invalid point: 2.0"
),
MetadataInconsistency(
"X-coordinates (step) must be strictly increasing for series attribute: b. Invalid point: 2.0"
),
MetadataInconsistency(
"X-coordinates (step) must be strictly increasing for series attribute: c. Invalid point: 2.0"
),
]

processor.handle(errors=duplicated_errors)

captured = capsys.readouterr()
assert str(duplicated_errors[0]) in captured.out
assert str(duplicated_errors[1]) not in captured.out
assert str(duplicated_errors[2]) not in captured.out

@patch.dict(os.environ, {NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS: "True"})
def test_not_affect_other_errors(self, capsys):
processor = OperationsErrorsProcessor()
duplicated_errors = list(
[
MetadataInconsistency("X-coordinates (step) must be strictly increasing for series attribute: a."),
NeptuneException("General error"),
MetadataInconsistency("X-coordinates (step) must be strictly increasing for series attribute: a."),
]
)

processor.handle(errors=duplicated_errors)

captured = capsys.readouterr()
assert str(duplicated_errors[0]) in captured.out
assert str(duplicated_errors[1]) in captured.out
assert str(duplicated_errors[2]) in captured.out

@patch.dict(os.environ, {NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS: "False"})
def test_not_sample_when_disabled(self, capsys):
processor = OperationsErrorsProcessor()
duplicated_errors = [
MetadataInconsistency(
"X-coordinates (step) must be strictly increasing for series attribute: a. Invalid point: 2.0"
),
MetadataInconsistency(
"X-coordinates (step) must be strictly increasing for series attribute: b. Invalid point: 2.0"
),
MetadataInconsistency(
"X-coordinates (step) must be strictly increasing for series attribute: c. Invalid point: 2.0"
),
]

processor.handle(errors=duplicated_errors)

captured = capsys.readouterr()
assert str(duplicated_errors[0]) in captured.out
assert str(duplicated_errors[1]) in captured.out
assert str(duplicated_errors[2]) in captured.out
