Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release] Improve handle_result in case of empty fetched result. #32055

Merged
merged 8 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions release/ray_release/alerts/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from ray_release.config import Test
from ray_release.result import Result

REQ_NON_EMPTY_RESULT = False


def handle_result(
test: Test,
Expand Down
22 changes: 16 additions & 6 deletions release/ray_release/alerts/handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,24 @@


result_to_handle_map = {
"default": default.handle_result,
"long_running_tests": long_running_tests.handle_result,
"rllib_tests": rllib_tests.handle_result,
"tune_tests": tune_tests.handle_result,
"xgboost_tests": xgboost_tests.handle_result,
"default": (default.handle_result, default.REQ_NON_EMPTY_RESULT),
"long_running_tests": (
long_running_tests.handle_result,
long_running_tests.REQ_NON_EMPTY_RESULT,
),
"rllib_tests": (rllib_tests.handle_result, rllib_tests.REQ_NON_EMPTY_RESULT),
"tune_tests": (tune_tests.handle_result, tune_tests.REQ_NON_EMPTY_RESULT),
"xgboost_tests": (xgboost_tests.handle_result, xgboost_tests.REQ_NON_EMPTY_RESULT),
xwjiang2010 marked this conversation as resolved.
Show resolved Hide resolved
}


def require_non_empty_result(test: Test) -> bool:
xwjiang2010 marked this conversation as resolved.
Show resolved Hide resolved
alert_suite = test.get("alert", "default")
if alert_suite not in result_to_handle_map:
raise ReleaseTestConfigError(f"Alert suite {alert_suite} not found.")
return result_to_handle_map[alert_suite][1]


def handle_result(test: Test, result: Result):
alert_suite = test.get("alert", "default")

Expand All @@ -32,7 +42,7 @@ def handle_result(test: Test, result: Result):
if alert_suite not in result_to_handle_map:
raise ReleaseTestConfigError(f"Alert suite {alert_suite} not found.")

handler = result_to_handle_map[alert_suite]
handler = result_to_handle_map[alert_suite][0]
error = handler(test, result)

if error:
Expand Down
3 changes: 3 additions & 0 deletions release/ray_release/alerts/long_running_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
from ray_release.config import Test
from ray_release.result import Result

REQ_NON_EMPTY_RESULT = True


def handle_result(
test: Test,
result: Result,
) -> Optional[str]:

last_update_diff = result.results.get("last_update_diff", float("inf"))

test_name = test["legacy"]["test_name"]
Expand Down
2 changes: 2 additions & 0 deletions release/ray_release/alerts/rllib_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from typing import Dict, Optional

REQ_NON_EMPTY_RESULT = False


def handle_result(
created_on: datetime.datetime,
Expand Down
2 changes: 2 additions & 0 deletions release/ray_release/alerts/tune_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from ray_release.config import Test
from ray_release.result import Result

REQ_NON_EMPTY_RESULT = True


def handle_result(
test: Test,
Expand Down
4 changes: 4 additions & 0 deletions release/ray_release/alerts/xgboost_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
from ray_release.result import Result


REQ_NON_EMPTY_RESULT = True


def handle_result(
test: Test,
result: Result,
) -> Optional[str]:

test_name = test["legacy"]["test_name"]

time_taken = result.results.get("time_taken", float("inf"))
Expand Down
4 changes: 2 additions & 2 deletions release/ray_release/command_runner/client_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from ray_release.cluster_manager.cluster_manager import ClusterManager
from ray_release.exception import (
ResultsError,
FetchResultError,
LocalEnvSetupError,
ClusterNodesWaitTimeout,
CommandTimeout,
Expand Down Expand Up @@ -121,7 +121,7 @@ def _fetch_json(self, path: str) -> Dict[str, Any]:
with open(path, "rt") as fp:
return json.load(fp)
except Exception as e:
raise ResultsError(
raise FetchResultError(
f"Could not load local results from client command: {e}"
) from e

Expand Down
4 changes: 2 additions & 2 deletions release/ray_release/command_runner/job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
CommandTimeout,
LocalEnvSetupError,
LogsError,
ResultsError,
FetchResultError,
)
from ray_release.file_manager.file_manager import FileManager
from ray_release.job_manager import JobManager
Expand Down Expand Up @@ -138,7 +138,7 @@ def _fetch_json(self, path: str) -> Dict[str, Any]:
os.unlink(tmpfile)
return data
except Exception as e:
raise ResultsError(f"Could not fetch results from session: {e}") from e
raise FetchResultError(f"Could not fetch results from session: {e}") from e

def fetch_results(self) -> Dict[str, Any]:
return self._fetch_json(self.result_output_json)
Expand Down
4 changes: 2 additions & 2 deletions release/ray_release/command_runner/sdk_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
CommandTimeout,
LogsError,
RemoteEnvSetupError,
ResultsError,
FetchResultError,
)
from ray_release.file_manager.file_manager import FileManager
from ray_release.logger import logger
Expand Down Expand Up @@ -189,7 +189,7 @@ def _fetch_json(self, path: str) -> Dict[str, Any]:
os.unlink(tmpfile)
return data
except Exception as e:
raise ResultsError(f"Could not fetch results from session: {e}") from e
raise FetchResultError(f"Could not fetch results from session: {e}") from e

def fetch_results(self) -> Dict[str, Any]:
return self._fetch_json(self.result_output_json)
Expand Down
4 changes: 2 additions & 2 deletions release/ray_release/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ class TestCommandError(CommandError):
exit_code = ExitCode.COMMAND_ERROR


class ResultsError(CommandError):
pass
class FetchResultError(FileManagerError):
exit_code = ExitCode.FETCH_RESULT_ERROR


class LogsError(CommandError):
Expand Down
29 changes: 17 additions & 12 deletions release/ray_release/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import time
from typing import Optional, List

from ray_release.alerts.handle import handle_result
from ray_release.alerts.handle import handle_result, require_non_empty_result
from ray_release.anyscale_util import get_cluster_name
from ray_release.buildkite.output import buildkite_group, buildkite_open_last
from ray_release.cluster_manager.full import FullClusterManager
Expand Down Expand Up @@ -167,6 +167,8 @@ def run_release_test(
raise ReleaseTestSetupError(f"Error setting up release test: {e}") from e

pipeline_exception = None
# non critical for some tests. So separate it from the general one.
fetch_result_exception = None
try:
# Load configs
cluster_env = load_test_cluster_env(test, ray_wheels_url=ray_wheels_url)
Expand Down Expand Up @@ -321,9 +323,9 @@ def run_release_test(
try:
command_results = command_runner.fetch_results()
except Exception as e:
logger.error("Could not fetch results for test command")
logger.exception(e)
logger.exception(f"Could not fetch results for test command: {e}")
command_results = {}
fetch_result_exception = e

# Postprocess result:
if "last_update" in command_results:
Expand Down Expand Up @@ -357,7 +359,7 @@ def run_release_test(
try:
last_logs = command_runner.get_last_logs()
except Exception as e:
logger.error(f"Error fetching logs: {e}")
logger.exception(f"Error fetching logs: {e}")
last_logs = "No logs could be retrieved."

result.last_logs = last_logs
Expand All @@ -367,7 +369,7 @@ def run_release_test(
try:
cluster_manager.terminate_cluster(wait=False)
except Exception as e:
logger.error(f"Could not terminate cluster: {e}")
logger.exception(f"Could not terminate cluster: {e}")

time_taken = time.monotonic() - start_time
result.runtime = time_taken
Expand All @@ -376,12 +378,15 @@ def run_release_test(
os.chdir(old_wd)

if not pipeline_exception:
buildkite_group(":mag: Interpreting results")
# Only handle results if we didn't run into issues earlier
try:
handle_result(test, result)
except Exception as e:
pipeline_exception = e
if require_non_empty_result(test) and fetch_result_exception:
pipeline_exception = fetch_result_exception
else:
buildkite_group(":mag: Interpreting results")
# Only handle results if we didn't run into issues earlier
try:
handle_result(test, result)
except Exception as e:
pipeline_exception = e

if pipeline_exception:
buildkite_group(":rotating_light: Handling errors")
Expand All @@ -398,7 +403,7 @@ def run_release_test(
try:
reporter.report_result(test, result)
except Exception as e:
logger.error(f"Error reporting results via {type(reporter)}: {e}")
logger.exception(f"Error reporting results via {type(reporter)}: {e}")

if pipeline_exception:
raise pipeline_exception
Expand Down
3 changes: 2 additions & 1 deletion release/ray_release/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class ExitCode(enum.Enum):
CLUSTER_STARTUP_ERROR = 15
LOCAL_ENV_SETUP_ERROR = 16
REMOTE_ENV_SETUP_ERROR = 17
FETCH_RESULT_ERROR = 18
# ANYSCALE_SDK_ERROR = 19

# Infra timeouts (retryable)
Expand All @@ -52,7 +53,7 @@ class ExitCode(enum.Enum):
CLUSTER_STARTUP_TIMEOUT = 32
CLUSTER_WAIT_TIMEOUT = 33

# Command errors
# Command errors - these are considered application errors
COMMAND_ERROR = 40
COMMAND_ALERT = 41
COMMAND_TIMEOUT = 42
Expand Down
31 changes: 25 additions & 6 deletions release/ray_release/tests/test_glue.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import os
import pytest
import shutil
import sys
import tempfile
import time
import unittest
from typing import Type, Callable
import unittest
from unittest.mock import patch

from ray_release.alerts.handle import result_to_handle_map
Expand Down Expand Up @@ -33,7 +34,7 @@
PrepareCommandTimeout,
TestCommandError,
TestCommandTimeout,
ResultsError,
FetchResultError,
LogsError,
ResultsAlert,
ClusterNodesWaitTimeout,
Expand Down Expand Up @@ -153,7 +154,7 @@ def __init__(self, cluster_manager: ClusterManager):
def mock_alerter(test: Test, result: Result):
return self.mock_alert_return

result_to_handle_map["unit_test_alerter"] = mock_alerter
result_to_handle_map["unit_test_alerter"] = (mock_alerter, False)

type_str_to_command_runner["unit_test"] = MockCommandRunner
command_runner_to_cluster_manager[MockCommandRunner] = MockClusterManager
Expand Down Expand Up @@ -583,7 +584,7 @@ def testFetchResultFails(self):

self._succeed_until("test_command")

self.command_runner_return["fetch_results"] = _fail_on_call(ResultsError)
self.command_runner_return["fetch_results"] = _fail_on_call(FetchResultError)
with self.assertLogs(logger, "ERROR") as cm:
self._run(result)
self.assertTrue(any("Could not fetch results" in o for o in cm.output))
Expand All @@ -593,6 +594,26 @@ def testFetchResultFails(self):
# Ensure cluster was terminated
self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)

def testFetchResultFailsReqNonEmptyResult(self):
# set `require_non_empty_result` bit.
new_handler = (result_to_handle_map["unit_test_alerter"], True)
result_to_handle_map["unit_test_alerter"] = new_handler

result = Result()

self._succeed_until("test_command")

self.command_runner_return["fetch_results"] = _fail_on_call(FetchResultError)
with self.assertRaisesRegex(FetchResultError, "Fail"):
with self.assertLogs(logger, "ERROR") as cm:
self._run(result)
self.assertTrue(any("Could not fetch results" in o for o in cm.output))
self.assertEqual(result.return_code, ExitCode.FETCH_RESULT_ERROR.value)
self.assertEqual(result.status, "infra_error")

# Ensure cluster was terminated, no matter what
self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)

def testLastLogsFails(self):
result = Result()

Expand Down Expand Up @@ -647,6 +668,4 @@ def report_result(self, test: Test, result: Result):


if __name__ == "__main__":
import pytest

sys.exit(pytest.main(["-v", __file__]))