diff --git a/release/ray_release/alerts/handle.py b/release/ray_release/alerts/handle.py index ab2f20829b34..d880043a7edb 100644 --- a/release/ray_release/alerts/handle.py +++ b/release/ray_release/alerts/handle.py @@ -12,15 +12,28 @@ ) +# The second bit in the tuple indicates whether a result is required to pass the alert. +# If true, the release test will throw a FetchResultError when result cannot be fetched +# successfully. result_to_handle_map = { - "default": default.handle_result, - "long_running_tests": long_running_tests.handle_result, - "rllib_tests": rllib_tests.handle_result, - "tune_tests": tune_tests.handle_result, - "xgboost_tests": xgboost_tests.handle_result, + "default": (default.handle_result, False), + "long_running_tests": ( + long_running_tests.handle_result, + True, + ), + "rllib_tests": (rllib_tests.handle_result, False), + "tune_tests": (tune_tests.handle_result, True), + "xgboost_tests": (xgboost_tests.handle_result, True), } +def require_result(test: Test) -> bool: + alert_suite = test.get("alert", "default") + if alert_suite not in result_to_handle_map: + raise ReleaseTestConfigError(f"Alert suite {alert_suite} not found.") + return result_to_handle_map[alert_suite][1] + + def handle_result(test: Test, result: Result): alert_suite = test.get("alert", "default") @@ -32,7 +45,7 @@ def handle_result(test: Test, result: Result): if alert_suite not in result_to_handle_map: raise ReleaseTestConfigError(f"Alert suite {alert_suite} not found.") - handler = result_to_handle_map[alert_suite] + handler = result_to_handle_map[alert_suite][0] error = handler(test, result) if error: diff --git a/release/ray_release/command_runner/client_runner.py b/release/ray_release/command_runner/client_runner.py index 27901e3d39b8..dbf84586068b 100644 --- a/release/ray_release/command_runner/client_runner.py +++ b/release/ray_release/command_runner/client_runner.py @@ -13,7 +13,7 @@ from ray_release.cluster_manager.cluster_manager import ClusterManager from ray_release.exception import ( - ResultsError, + FetchResultError, LocalEnvSetupError, ClusterNodesWaitTimeout, CommandTimeout, @@ -121,7 +121,7 @@ def _fetch_json(self, path: str) -> Dict[str, Any]: with open(path, "rt") as fp: return json.load(fp) except Exception as e: - raise ResultsError( + raise FetchResultError( f"Could not load local results from client command: {e}" ) from e diff --git a/release/ray_release/command_runner/job_runner.py b/release/ray_release/command_runner/job_runner.py index c199bd8682af..50af91fef683 100644 --- a/release/ray_release/command_runner/job_runner.py +++ b/release/ray_release/command_runner/job_runner.py @@ -11,7 +11,7 @@ CommandTimeout, LocalEnvSetupError, LogsError, - ResultsError, + FetchResultError, ) from ray_release.file_manager.file_manager import FileManager from ray_release.job_manager import JobManager @@ -138,7 +138,7 @@ def _fetch_json(self, path: str) -> Dict[str, Any]: os.unlink(tmpfile) return data except Exception as e: - raise ResultsError(f"Could not fetch results from session: {e}") from e + raise FetchResultError(f"Could not fetch results from session: {e}") from e def fetch_results(self) -> Dict[str, Any]: return self._fetch_json(self.result_output_json) diff --git a/release/ray_release/command_runner/sdk_runner.py b/release/ray_release/command_runner/sdk_runner.py index ef40f0d3bbcf..6a68454f3ce3 100644 --- a/release/ray_release/command_runner/sdk_runner.py +++ b/release/ray_release/command_runner/sdk_runner.py @@ -13,7 +13,7 @@ CommandTimeout, LogsError, RemoteEnvSetupError, - ResultsError, + FetchResultError, ) from ray_release.file_manager.file_manager import FileManager from ray_release.logger import logger @@ -189,7 +189,7 @@ def _fetch_json(self, path: str) -> Dict[str, Any]: os.unlink(tmpfile) return data except Exception as e: - raise ResultsError(f"Could not fetch results from session: {e}") from e + raise FetchResultError(f"Could not fetch results from session: {e}") from e def fetch_results(self) -> Dict[str, Any]: return self._fetch_json(self.result_output_json) diff --git a/release/ray_release/exception.py b/release/ray_release/exception.py index e00b9c69a4d8..eccf038a0719 100644 --- a/release/ray_release/exception.py +++ b/release/ray_release/exception.py @@ -129,8 +129,8 @@ class TestCommandError(CommandError): exit_code = ExitCode.COMMAND_ERROR -class ResultsError(CommandError): - pass +class FetchResultError(FileManagerError): + exit_code = ExitCode.FETCH_RESULT_ERROR class LogsError(CommandError): diff --git a/release/ray_release/glue.py b/release/ray_release/glue.py index 2771fb35e767..e8b924c02ca3 100644 --- a/release/ray_release/glue.py +++ b/release/ray_release/glue.py @@ -2,7 +2,7 @@ import time from typing import Optional, List -from ray_release.alerts.handle import handle_result +from ray_release.alerts.handle import handle_result, require_result from ray_release.anyscale_util import get_cluster_name from ray_release.buildkite.output import buildkite_group, buildkite_open_last from ray_release.cluster_manager.full import FullClusterManager @@ -167,6 +167,8 @@ def run_release_test( raise ReleaseTestSetupError(f"Error setting up release test: {e}") from e pipeline_exception = None + # non critical for some tests. So separate it from the general one. + fetch_result_exception = None try: # Load configs cluster_env = load_test_cluster_env(test, ray_wheels_url=ray_wheels_url) @@ -321,9 +323,9 @@ def run_release_test( try: command_results = command_runner.fetch_results() except Exception as e: - logger.error("Could not fetch results for test command") - logger.exception(e) + logger.exception(f"Could not fetch results for test command: {e}") command_results = {} + fetch_result_exception = e # Postprocess result: if "last_update" in command_results: @@ -357,7 +359,7 @@ def run_release_test( try: last_logs = command_runner.get_last_logs() except Exception as e: - logger.error(f"Error fetching logs: {e}") + logger.exception(f"Error fetching logs: {e}") last_logs = "No logs could be retrieved." result.last_logs = last_logs @@ -367,7 +369,7 @@ def run_release_test( try: cluster_manager.terminate_cluster(wait=False) except Exception as e: - logger.error(f"Could not terminate cluster: {e}") + logger.exception(f"Could not terminate cluster: {e}") time_taken = time.monotonic() - start_time result.runtime = time_taken @@ -376,12 +378,15 @@ def run_release_test( os.chdir(old_wd) if not pipeline_exception: - buildkite_group(":mag: Interpreting results") - # Only handle results if we didn't run into issues earlier - try: - handle_result(test, result) - except Exception as e: - pipeline_exception = e + if require_result(test) and fetch_result_exception: + pipeline_exception = fetch_result_exception + else: + buildkite_group(":mag: Interpreting results") + # Only handle results if we didn't run into issues earlier + try: + handle_result(test, result) + except Exception as e: + pipeline_exception = e if pipeline_exception: buildkite_group(":rotating_light: Handling errors") @@ -398,7 +403,7 @@ def run_release_test( try: reporter.report_result(test, result) except Exception as e: - logger.error(f"Error reporting results via {type(reporter)}: {e}") + logger.exception(f"Error reporting results via {type(reporter)}: {e}") if pipeline_exception: raise pipeline_exception diff --git a/release/ray_release/result.py b/release/ray_release/result.py index 6acf371550cd..8f24339b0380 100644 --- a/release/ray_release/result.py +++ b/release/ray_release/result.py @@ -44,6 +44,7 @@ class ExitCode(enum.Enum): CLUSTER_STARTUP_ERROR = 15 LOCAL_ENV_SETUP_ERROR = 16 REMOTE_ENV_SETUP_ERROR = 17 + FETCH_RESULT_ERROR = 18 # ANYSCALE_SDK_ERROR = 19 # Infra timeouts (retryable) @@ -52,7 +53,7 @@ class ExitCode(enum.Enum): CLUSTER_STARTUP_TIMEOUT = 32 CLUSTER_WAIT_TIMEOUT = 33 - # Command errors + # Command errors - these are considered application errors COMMAND_ERROR = 40 COMMAND_ALERT = 41 COMMAND_TIMEOUT = 42 diff --git a/release/ray_release/tests/test_glue.py b/release/ray_release/tests/test_glue.py index 630219965996..654516889be7 100644 --- a/release/ray_release/tests/test_glue.py +++ b/release/ray_release/tests/test_glue.py @@ -1,10 +1,11 @@ import os +import pytest import shutil import sys import tempfile import time -import unittest from typing import Type, Callable +import unittest from unittest.mock import patch from ray_release.alerts.handle import result_to_handle_map @@ -33,7 +34,7 @@ PrepareCommandTimeout, TestCommandError, TestCommandTimeout, - ResultsError, + FetchResultError, LogsError, ResultsAlert, ClusterNodesWaitTimeout, @@ -153,7 +154,7 @@ def __init__(self, cluster_manager: ClusterManager): def mock_alerter(test: Test, result: Result): return self.mock_alert_return - result_to_handle_map["unit_test_alerter"] = mock_alerter + result_to_handle_map["unit_test_alerter"] = (mock_alerter, False) type_str_to_command_runner["unit_test"] = MockCommandRunner command_runner_to_cluster_manager[MockCommandRunner] = MockClusterManager @@ -583,7 +584,7 @@ def testFetchResultFails(self): self._succeed_until("test_command") - self.command_runner_return["fetch_results"] = _fail_on_call(ResultsError) + self.command_runner_return["fetch_results"] = _fail_on_call(FetchResultError) with self.assertLogs(logger, "ERROR") as cm: self._run(result) self.assertTrue(any("Could not fetch results" in o for o in cm.output)) @@ -593,6 +594,26 @@ def testFetchResultFails(self): # Ensure cluster was terminated self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1) + def testFetchResultFailsReqNonEmptyResult(self): + # set `require_result` bit. + new_handler = (result_to_handle_map["unit_test_alerter"], True) + result_to_handle_map["unit_test_alerter"] = new_handler + + result = Result() + + self._succeed_until("test_command") + + self.command_runner_return["fetch_results"] = _fail_on_call(FetchResultError) + with self.assertRaisesRegex(FetchResultError, "Fail"): + with self.assertLogs(logger, "ERROR") as cm: + self._run(result) + self.assertTrue(any("Could not fetch results" in o for o in cm.output)) + self.assertEqual(result.return_code, ExitCode.FETCH_RESULT_ERROR.value) + self.assertEqual(result.status, "infra_error") + + # Ensure cluster was terminated, no matter what + self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1) + def testLastLogsFails(self): result = Result() @@ -647,6 +668,4 @@ def report_result(self, test: Test, result: Result): if __name__ == "__main__": - import pytest - sys.exit(pytest.main(["-v", __file__]))