From bae61d95c01460268e02846a82f77d330c20193d Mon Sep 17 00:00:00 2001 From: xwjiang2010 <87673679+xwjiang2010@users.noreply.github.com> Date: Tue, 7 Feb 2023 22:44:17 -0800 Subject: [PATCH] [release] Improve handle_result in case of empty fetched result. (#32055) Improve handle_result (result alert logic) for release tests in case when the fetched result is empty due to infra issues. For example if job server on the cluster is down (which we rely on to get files back to buildkite runners). Without this, the error code indicates application error, which is misleading. See an example here: https://buildkite.com/ray-project/release-tests-branch/builds/1318#0185fc29-1d4c-483a-999b-ede500781c7a Signed-off-by: xwjiang2010 --- release/ray_release/alerts/handle.py | 25 +++++++++++---- .../command_runner/client_runner.py | 4 +-- .../ray_release/command_runner/job_runner.py | 4 +-- .../ray_release/command_runner/sdk_runner.py | 4 +-- release/ray_release/exception.py | 4 +-- release/ray_release/glue.py | 29 ++++++++++------- release/ray_release/result.py | 3 +- release/ray_release/tests/test_glue.py | 31 +++++++++++++++---- 8 files changed, 71 insertions(+), 33 deletions(-) diff --git a/release/ray_release/alerts/handle.py b/release/ray_release/alerts/handle.py index ab2f20829b34..d880043a7edb 100644 --- a/release/ray_release/alerts/handle.py +++ b/release/ray_release/alerts/handle.py @@ -12,15 +12,28 @@ ) +# The second bit in the tuple indicates whether a result is required to pass the alert. +# If true, the release test will throw a FetchResultError when result cannot be fetched +# successfully. result_to_handle_map = { - "default": default.handle_result, - "long_running_tests": long_running_tests.handle_result, - "rllib_tests": rllib_tests.handle_result, - "tune_tests": tune_tests.handle_result, - "xgboost_tests": xgboost_tests.handle_result, + "default": (default.handle_result, False), + "long_running_tests": ( + long_running_tests.handle_result, + True, + ), + "rllib_tests": (rllib_tests.handle_result, False), + "tune_tests": (tune_tests.handle_result, True), + "xgboost_tests": (xgboost_tests.handle_result, True), } +def require_result(test: Test) -> bool: + alert_suite = test.get("alert", "default") + if alert_suite not in result_to_handle_map: + raise ReleaseTestConfigError(f"Alert suite {alert_suite} not found.") + return result_to_handle_map[alert_suite][1] + + def handle_result(test: Test, result: Result): alert_suite = test.get("alert", "default") @@ -32,7 +45,7 @@ def handle_result(test: Test, result: Result): if alert_suite not in result_to_handle_map: raise ReleaseTestConfigError(f"Alert suite {alert_suite} not found.") - handler = result_to_handle_map[alert_suite] + handler = result_to_handle_map[alert_suite][0] error = handler(test, result) if error: diff --git a/release/ray_release/command_runner/client_runner.py b/release/ray_release/command_runner/client_runner.py index 27901e3d39b8..dbf84586068b 100644 --- a/release/ray_release/command_runner/client_runner.py +++ b/release/ray_release/command_runner/client_runner.py @@ -13,7 +13,7 @@ from ray_release.cluster_manager.cluster_manager import ClusterManager from ray_release.exception import ( - ResultsError, + FetchResultError, LocalEnvSetupError, ClusterNodesWaitTimeout, CommandTimeout, @@ -121,7 +121,7 @@ def _fetch_json(self, path: str) -> Dict[str, Any]: with open(path, "rt") as fp: return json.load(fp) except Exception as e: - raise ResultsError( + raise FetchResultError( f"Could not load local results from client command: {e}" ) from e diff --git a/release/ray_release/command_runner/job_runner.py b/release/ray_release/command_runner/job_runner.py index c199bd8682af..50af91fef683 100644 --- a/release/ray_release/command_runner/job_runner.py +++ b/release/ray_release/command_runner/job_runner.py @@ -11,7 +11,7 @@ CommandTimeout, LocalEnvSetupError, LogsError, - ResultsError, + FetchResultError, ) from ray_release.file_manager.file_manager import FileManager from ray_release.job_manager import JobManager @@ -138,7 +138,7 @@ def _fetch_json(self, path: str) -> Dict[str, Any]: os.unlink(tmpfile) return data except Exception as e: - raise ResultsError(f"Could not fetch results from session: {e}") from e + raise FetchResultError(f"Could not fetch results from session: {e}") from e def fetch_results(self) -> Dict[str, Any]: return self._fetch_json(self.result_output_json) diff --git a/release/ray_release/command_runner/sdk_runner.py b/release/ray_release/command_runner/sdk_runner.py index ef40f0d3bbcf..6a68454f3ce3 100644 --- a/release/ray_release/command_runner/sdk_runner.py +++ b/release/ray_release/command_runner/sdk_runner.py @@ -13,7 +13,7 @@ CommandTimeout, LogsError, RemoteEnvSetupError, - ResultsError, + FetchResultError, ) from ray_release.file_manager.file_manager import FileManager from ray_release.logger import logger @@ -189,7 +189,7 @@ def _fetch_json(self, path: str) -> Dict[str, Any]: os.unlink(tmpfile) return data except Exception as e: - raise ResultsError(f"Could not fetch results from session: {e}") from e + raise FetchResultError(f"Could not fetch results from session: {e}") from e def fetch_results(self) -> Dict[str, Any]: return self._fetch_json(self.result_output_json) diff --git a/release/ray_release/exception.py b/release/ray_release/exception.py index e00b9c69a4d8..eccf038a0719 100644 --- a/release/ray_release/exception.py +++ b/release/ray_release/exception.py @@ -129,8 +129,8 @@ class TestCommandError(CommandError): exit_code = ExitCode.COMMAND_ERROR -class ResultsError(CommandError): - pass +class FetchResultError(FileManagerError): + exit_code = ExitCode.FETCH_RESULT_ERROR class LogsError(CommandError): diff --git a/release/ray_release/glue.py b/release/ray_release/glue.py index 2771fb35e767..e8b924c02ca3 100644 --- a/release/ray_release/glue.py +++ b/release/ray_release/glue.py @@ -2,7 +2,7 @@ import time from typing import Optional, List -from ray_release.alerts.handle import handle_result +from ray_release.alerts.handle import handle_result, require_result from ray_release.anyscale_util import get_cluster_name from ray_release.buildkite.output import buildkite_group, buildkite_open_last from ray_release.cluster_manager.full import FullClusterManager @@ -167,6 +167,8 @@ def run_release_test( raise ReleaseTestSetupError(f"Error setting up release test: {e}") from e pipeline_exception = None + # non critical for some tests. So separate it from the general one. + fetch_result_exception = None try: # Load configs cluster_env = load_test_cluster_env(test, ray_wheels_url=ray_wheels_url) @@ -321,9 +323,9 @@ def run_release_test( try: command_results = command_runner.fetch_results() except Exception as e: - logger.error("Could not fetch results for test command") - logger.exception(e) + logger.exception(f"Could not fetch results for test command: {e}") command_results = {} + fetch_result_exception = e # Postprocess result: if "last_update" in command_results: @@ -357,7 +359,7 @@ def run_release_test( try: last_logs = command_runner.get_last_logs() except Exception as e: - logger.error(f"Error fetching logs: {e}") + logger.exception(f"Error fetching logs: {e}") last_logs = "No logs could be retrieved." result.last_logs = last_logs @@ -367,7 +369,7 @@ def run_release_test( try: cluster_manager.terminate_cluster(wait=False) except Exception as e: - logger.error(f"Could not terminate cluster: {e}") + logger.exception(f"Could not terminate cluster: {e}") time_taken = time.monotonic() - start_time result.runtime = time_taken @@ -376,12 +378,15 @@ def run_release_test( os.chdir(old_wd) if not pipeline_exception: - buildkite_group(":mag: Interpreting results") - # Only handle results if we didn't run into issues earlier - try: - handle_result(test, result) - except Exception as e: - pipeline_exception = e + if require_result(test) and fetch_result_exception: + pipeline_exception = fetch_result_exception + else: + buildkite_group(":mag: Interpreting results") + # Only handle results if we didn't run into issues earlier + try: + handle_result(test, result) + except Exception as e: + pipeline_exception = e if pipeline_exception: buildkite_group(":rotating_light: Handling errors") @@ -398,7 +403,7 @@ def run_release_test( try: reporter.report_result(test, result) except Exception as e: - logger.error(f"Error reporting results via {type(reporter)}: {e}") + logger.exception(f"Error reporting results via {type(reporter)}: {e}") if pipeline_exception: raise pipeline_exception diff --git a/release/ray_release/result.py b/release/ray_release/result.py index 6acf371550cd..8f24339b0380 100644 --- a/release/ray_release/result.py +++ b/release/ray_release/result.py @@ -44,6 +44,7 @@ class ExitCode(enum.Enum): CLUSTER_STARTUP_ERROR = 15 LOCAL_ENV_SETUP_ERROR = 16 REMOTE_ENV_SETUP_ERROR = 17 + FETCH_RESULT_ERROR = 18 # ANYSCALE_SDK_ERROR = 19 # Infra timeouts (retryable) @@ -52,7 +53,7 @@ class ExitCode(enum.Enum): CLUSTER_STARTUP_TIMEOUT = 32 CLUSTER_WAIT_TIMEOUT = 33 - # Command errors + # Command errors - these are considered application errors COMMAND_ERROR = 40 COMMAND_ALERT = 41 COMMAND_TIMEOUT = 42 diff --git a/release/ray_release/tests/test_glue.py b/release/ray_release/tests/test_glue.py index 630219965996..654516889be7 100644 --- a/release/ray_release/tests/test_glue.py +++ b/release/ray_release/tests/test_glue.py @@ -1,10 +1,11 @@ import os +import pytest import shutil import sys import tempfile import time -import unittest from typing import Type, Callable +import unittest from unittest.mock import patch from ray_release.alerts.handle import result_to_handle_map @@ -33,7 +34,7 @@ PrepareCommandTimeout, TestCommandError, TestCommandTimeout, - ResultsError, + FetchResultError, LogsError, ResultsAlert, ClusterNodesWaitTimeout, @@ -153,7 +154,7 @@ def __init__(self, cluster_manager: ClusterManager): def mock_alerter(test: Test, result: Result): return self.mock_alert_return - result_to_handle_map["unit_test_alerter"] = mock_alerter + result_to_handle_map["unit_test_alerter"] = (mock_alerter, False) type_str_to_command_runner["unit_test"] = MockCommandRunner command_runner_to_cluster_manager[MockCommandRunner] = MockClusterManager @@ -583,7 +584,7 @@ def testFetchResultFails(self): self._succeed_until("test_command") - self.command_runner_return["fetch_results"] = _fail_on_call(ResultsError) + self.command_runner_return["fetch_results"] = _fail_on_call(FetchResultError) with self.assertLogs(logger, "ERROR") as cm: self._run(result) self.assertTrue(any("Could not fetch results" in o for o in cm.output)) @@ -593,6 +594,26 @@ def testFetchResultFails(self): # Ensure cluster was terminated self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1) + def testFetchResultFailsReqNonEmptyResult(self): + # set `require_result` bit. + new_handler = (result_to_handle_map["unit_test_alerter"], True) + result_to_handle_map["unit_test_alerter"] = new_handler + + result = Result() + + self._succeed_until("test_command") + + self.command_runner_return["fetch_results"] = _fail_on_call(FetchResultError) + with self.assertRaisesRegex(FetchResultError, "Fail"): + with self.assertLogs(logger, "ERROR") as cm: + self._run(result) + self.assertTrue(any("Could not fetch results" in o for o in cm.output)) + self.assertEqual(result.return_code, ExitCode.FETCH_RESULT_ERROR.value) + self.assertEqual(result.status, "infra_error") + + # Ensure cluster was terminated, no matter what + self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1) + def testLastLogsFails(self): result = Result() @@ -647,6 +668,4 @@ def report_result(self, test: Test, result: Result): if __name__ == "__main__": - import pytest - sys.exit(pytest.main(["-v", __file__]))