Skip to content

Commit

Permalink
[release] Improve handle_result in case of empty fetched result. (#32055)
Browse files Browse the repository at this point in the history

Improve handle_result (the result alert logic) for release tests in the case where the fetched result is empty due to infra issues, for example when the job server on the cluster is down (we rely on it to get files back to the Buildkite runners).

Without this change, the error code indicates an application error, which is misleading.
See an example here: https://buildkite.com/ray-project/release-tests-branch/builds/1318#0185fc29-1d4c-483a-999b-ede500781c7a

Signed-off-by: xwjiang2010 <xwjiang2010@gmail.com>
  • Loading branch information
xwjiang2010 authored Feb 8, 2023
1 parent e84fcb1 commit bae61d9
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 33 deletions.
25 changes: 19 additions & 6 deletions release/ray_release/alerts/handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,28 @@
)


# The second element of each tuple indicates whether a result is required to pass
# the alert. If True, the release test raises a FetchResultError when the result
# cannot be fetched successfully.
result_to_handle_map = {
"default": default.handle_result,
"long_running_tests": long_running_tests.handle_result,
"rllib_tests": rllib_tests.handle_result,
"tune_tests": tune_tests.handle_result,
"xgboost_tests": xgboost_tests.handle_result,
"default": (default.handle_result, False),
"long_running_tests": (
long_running_tests.handle_result,
True,
),
"rllib_tests": (rllib_tests.handle_result, False),
"tune_tests": (tune_tests.handle_result, True),
"xgboost_tests": (xgboost_tests.handle_result, True),
}


def require_result(test: Test) -> bool:
    """Tell whether the alert suite of ``test`` needs a fetched result.

    The suite name is read from the test's ``"alert"`` entry and falls back to
    ``"default"``. The answer is the second element of that suite's entry in
    ``result_to_handle_map``.

    Raises:
        ReleaseTestConfigError: If the alert suite is not registered in
            ``result_to_handle_map``.
    """
    alert_suite = test.get("alert", "default")
    if alert_suite in result_to_handle_map:
        # Entries are (handler, result_required) pairs; only the flag matters here.
        return result_to_handle_map[alert_suite][1]
    raise ReleaseTestConfigError(f"Alert suite {alert_suite} not found.")


def handle_result(test: Test, result: Result):
alert_suite = test.get("alert", "default")

Expand All @@ -32,7 +45,7 @@ def handle_result(test: Test, result: Result):
if alert_suite not in result_to_handle_map:
raise ReleaseTestConfigError(f"Alert suite {alert_suite} not found.")

handler = result_to_handle_map[alert_suite]
handler = result_to_handle_map[alert_suite][0]
error = handler(test, result)

if error:
Expand Down
4 changes: 2 additions & 2 deletions release/ray_release/command_runner/client_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from ray_release.cluster_manager.cluster_manager import ClusterManager
from ray_release.exception import (
ResultsError,
FetchResultError,
LocalEnvSetupError,
ClusterNodesWaitTimeout,
CommandTimeout,
Expand Down Expand Up @@ -121,7 +121,7 @@ def _fetch_json(self, path: str) -> Dict[str, Any]:
with open(path, "rt") as fp:
return json.load(fp)
except Exception as e:
raise ResultsError(
raise FetchResultError(
f"Could not load local results from client command: {e}"
) from e

Expand Down
4 changes: 2 additions & 2 deletions release/ray_release/command_runner/job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
CommandTimeout,
LocalEnvSetupError,
LogsError,
ResultsError,
FetchResultError,
)
from ray_release.file_manager.file_manager import FileManager
from ray_release.job_manager import JobManager
Expand Down Expand Up @@ -138,7 +138,7 @@ def _fetch_json(self, path: str) -> Dict[str, Any]:
os.unlink(tmpfile)
return data
except Exception as e:
raise ResultsError(f"Could not fetch results from session: {e}") from e
raise FetchResultError(f"Could not fetch results from session: {e}") from e

def fetch_results(self) -> Dict[str, Any]:
return self._fetch_json(self.result_output_json)
Expand Down
4 changes: 2 additions & 2 deletions release/ray_release/command_runner/sdk_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
CommandTimeout,
LogsError,
RemoteEnvSetupError,
ResultsError,
FetchResultError,
)
from ray_release.file_manager.file_manager import FileManager
from ray_release.logger import logger
Expand Down Expand Up @@ -189,7 +189,7 @@ def _fetch_json(self, path: str) -> Dict[str, Any]:
os.unlink(tmpfile)
return data
except Exception as e:
raise ResultsError(f"Could not fetch results from session: {e}") from e
raise FetchResultError(f"Could not fetch results from session: {e}") from e

def fetch_results(self) -> Dict[str, Any]:
return self._fetch_json(self.result_output_json)
Expand Down
4 changes: 2 additions & 2 deletions release/ray_release/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ class TestCommandError(CommandError):
exit_code = ExitCode.COMMAND_ERROR


class ResultsError(CommandError):
pass
# Raised by the command runners' `_fetch_json` when the result JSON of a test
# command cannot be loaded or fetched; carries its own FETCH_RESULT_ERROR exit code.
class FetchResultError(FileManagerError):
exit_code = ExitCode.FETCH_RESULT_ERROR


class LogsError(CommandError):
Expand Down
29 changes: 17 additions & 12 deletions release/ray_release/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import time
from typing import Optional, List

from ray_release.alerts.handle import handle_result
from ray_release.alerts.handle import handle_result, require_result
from ray_release.anyscale_util import get_cluster_name
from ray_release.buildkite.output import buildkite_group, buildkite_open_last
from ray_release.cluster_manager.full import FullClusterManager
Expand Down Expand Up @@ -167,6 +167,8 @@ def run_release_test(
raise ReleaseTestSetupError(f"Error setting up release test: {e}") from e

pipeline_exception = None
# A failure to fetch results is non-critical for some tests, so it is tracked separately from the general pipeline exception.
fetch_result_exception = None
try:
# Load configs
cluster_env = load_test_cluster_env(test, ray_wheels_url=ray_wheels_url)
Expand Down Expand Up @@ -321,9 +323,9 @@ def run_release_test(
try:
command_results = command_runner.fetch_results()
except Exception as e:
logger.error("Could not fetch results for test command")
logger.exception(e)
logger.exception(f"Could not fetch results for test command: {e}")
command_results = {}
fetch_result_exception = e

# Postprocess result:
if "last_update" in command_results:
Expand Down Expand Up @@ -357,7 +359,7 @@ def run_release_test(
try:
last_logs = command_runner.get_last_logs()
except Exception as e:
logger.error(f"Error fetching logs: {e}")
logger.exception(f"Error fetching logs: {e}")
last_logs = "No logs could be retrieved."

result.last_logs = last_logs
Expand All @@ -367,7 +369,7 @@ def run_release_test(
try:
cluster_manager.terminate_cluster(wait=False)
except Exception as e:
logger.error(f"Could not terminate cluster: {e}")
logger.exception(f"Could not terminate cluster: {e}")

time_taken = time.monotonic() - start_time
result.runtime = time_taken
Expand All @@ -376,12 +378,15 @@ def run_release_test(
os.chdir(old_wd)

if not pipeline_exception:
buildkite_group(":mag: Interpreting results")
# Only handle results if we didn't run into issues earlier
try:
handle_result(test, result)
except Exception as e:
pipeline_exception = e
if require_result(test) and fetch_result_exception:
pipeline_exception = fetch_result_exception
else:
buildkite_group(":mag: Interpreting results")
# Only handle results if we didn't run into issues earlier
try:
handle_result(test, result)
except Exception as e:
pipeline_exception = e

if pipeline_exception:
buildkite_group(":rotating_light: Handling errors")
Expand All @@ -398,7 +403,7 @@ def run_release_test(
try:
reporter.report_result(test, result)
except Exception as e:
logger.error(f"Error reporting results via {type(reporter)}: {e}")
logger.exception(f"Error reporting results via {type(reporter)}: {e}")

if pipeline_exception:
raise pipeline_exception
Expand Down
3 changes: 2 additions & 1 deletion release/ray_release/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class ExitCode(enum.Enum):
CLUSTER_STARTUP_ERROR = 15
LOCAL_ENV_SETUP_ERROR = 16
REMOTE_ENV_SETUP_ERROR = 17
FETCH_RESULT_ERROR = 18
# ANYSCALE_SDK_ERROR = 19

# Infra timeouts (retryable)
Expand All @@ -52,7 +53,7 @@ class ExitCode(enum.Enum):
CLUSTER_STARTUP_TIMEOUT = 32
CLUSTER_WAIT_TIMEOUT = 33

# Command errors
# Command errors - these are considered application errors
COMMAND_ERROR = 40
COMMAND_ALERT = 41
COMMAND_TIMEOUT = 42
Expand Down
31 changes: 25 additions & 6 deletions release/ray_release/tests/test_glue.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import os
import pytest
import shutil
import sys
import tempfile
import time
import unittest
from typing import Type, Callable
import unittest
from unittest.mock import patch

from ray_release.alerts.handle import result_to_handle_map
Expand Down Expand Up @@ -33,7 +34,7 @@
PrepareCommandTimeout,
TestCommandError,
TestCommandTimeout,
ResultsError,
FetchResultError,
LogsError,
ResultsAlert,
ClusterNodesWaitTimeout,
Expand Down Expand Up @@ -153,7 +154,7 @@ def __init__(self, cluster_manager: ClusterManager):
def mock_alerter(test: Test, result: Result):
return self.mock_alert_return

result_to_handle_map["unit_test_alerter"] = mock_alerter
result_to_handle_map["unit_test_alerter"] = (mock_alerter, False)

type_str_to_command_runner["unit_test"] = MockCommandRunner
command_runner_to_cluster_manager[MockCommandRunner] = MockClusterManager
Expand Down Expand Up @@ -583,7 +584,7 @@ def testFetchResultFails(self):

self._succeed_until("test_command")

self.command_runner_return["fetch_results"] = _fail_on_call(ResultsError)
self.command_runner_return["fetch_results"] = _fail_on_call(FetchResultError)
with self.assertLogs(logger, "ERROR") as cm:
self._run(result)
self.assertTrue(any("Could not fetch results" in o for o in cm.output))
Expand All @@ -593,6 +594,26 @@ def testFetchResultFails(self):
# Ensure cluster was terminated
self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)

def testFetchResultFailsReqNonEmptyResult(self):
    """A fetch failure must surface as FetchResultError when a result is required."""
    # Set the `require_result` flag. The registered entry is already a
    # (handler, require_result) tuple, so reuse only its handler instead of
    # nesting the whole tuple inside a new one.
    alerter = result_to_handle_map["unit_test_alerter"][0]
    result_to_handle_map["unit_test_alerter"] = (alerter, True)

    result = Result()

    self._succeed_until("test_command")

    self.command_runner_return["fetch_results"] = _fail_on_call(FetchResultError)
    with self.assertRaisesRegex(FetchResultError, "Fail"):
        with self.assertLogs(logger, "ERROR") as cm:
            self._run(result)
    # Checked after the `with` blocks: `_run` raises, so a statement placed
    # after it inside them would never execute. The captured log output stays
    # available on `cm` once the context has exited.
    self.assertTrue(any("Could not fetch results" in o for o in cm.output))
    self.assertEqual(result.return_code, ExitCode.FETCH_RESULT_ERROR.value)
    self.assertEqual(result.status, "infra_error")

    # Ensure cluster was terminated, no matter what
    self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)

def testLastLogsFails(self):
result = Result()

Expand Down Expand Up @@ -647,6 +668,4 @@ def report_result(self, test: Test, result: Result):


if __name__ == "__main__":
import pytest

sys.exit(pytest.main(["-v", __file__]))

0 comments on commit bae61d9

Please sign in to comment.