From 59d2037c962815c1a65c28d19c39a55888998289 Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Sat, 1 Nov 2025 14:14:36 -0700 Subject: [PATCH 1/4] [core][dashboard] update do_reply to accept status_codes Signed-off-by: iamjustinhsu --- .../ray/dashboard/modules/state/state_head.py | 24 +++++++++++------ python/ray/dashboard/state_api_utils.py | 26 +++++++++++++++---- python/ray/dashboard/utils.py | 2 ++ 3 files changed, 39 insertions(+), 13 deletions(-) diff --git a/python/ray/dashboard/modules/state/state_head.py b/python/ray/dashboard/modules/state/state_head.py index cf7e8fe92cd7..b2a94d660af3 100644 --- a/python/ray/dashboard/modules/state/state_head.py +++ b/python/ray/dashboard/modules/state/state_head.py @@ -86,7 +86,7 @@ def __init__(self, *args, **kwargs): async def limit_handler_(self): return do_reply( - success=False, + status_code=429, error_message=( "Max number of in-progress requests=" f"{self.max_num_call_} reached. " @@ -110,12 +110,16 @@ async def list_jobs(self, req: aiohttp.web.Request) -> aiohttp.web.Response: try: result = await self._state_api.list_jobs(option=options_from_req(req)) return do_reply( - success=True, + status_code=200, error_message="", result=asdict(result), ) except DataSourceUnavailable as e: - return do_reply(success=False, error_message=str(e), result=None) + return do_reply( + status_code=500, + error_message=str(e), + result=None, + ) @routes.get("/api/v0/nodes") @RateLimitedModule.enforce_max_concurrent_calls @@ -171,7 +175,7 @@ async def list_logs(self, req: aiohttp.web.Request) -> aiohttp.web.Response: if not node_id and not node_ip: return do_reply( - success=False, + status_code=400, error_message=( "Both node id and node ip are not provided. " "Please provide at least one of them." @@ -182,7 +186,7 @@ async def list_logs(self, req: aiohttp.web.Request) -> aiohttp.web.Response: node_id = await self._log_api.ip_to_node_id(node_ip) if not node_id: return do_reply( - success=False, + status_code=404, error_message=( f"Cannot find matching node_id for a given node ip {node_ip}" ), @@ -195,12 +199,16 @@ async def list_logs(self, req: aiohttp.web.Request) -> aiohttp.web.Response: ) except DataSourceUnavailable as e: return do_reply( - success=False, + status_code=500, error_message=str(e), result=None, ) - return do_reply(success=True, error_message="", result=result) + return do_reply( + status_code=200, + error_message="", + result=result, + ) @routes.get("/api/v0/logs/{media_type}", resp_type=ResponseType.STREAM) @RateLimitedModule.enforce_max_concurrent_calls @@ -330,7 +338,7 @@ async def delayed_response(self, req: aiohttp.web.Request): delay = int(req.match_info.get("delay_s", 10)) await asyncio.sleep(delay) return do_reply( - success=True, + status_code=200, error_message="", result={}, partial_failure_warning=None, diff --git a/python/ray/dashboard/state_api_utils.py b/python/ray/dashboard/state_api_utils.py index 8ffafa7badb3..dc062a269e64 100644 --- a/python/ray/dashboard/state_api_utils.py +++ b/python/ray/dashboard/state_api_utils.py @@ -23,9 +23,15 @@ from ray.util.state.util import convert_string_to_type -def do_reply(success: bool, error_message: str, result: ListApiResponse, **kwargs): +def do_reply(status_code: int, error_message: str, result: ListApiResponse, **kwargs): + + # Validate status_code and fallback to 500 if invalid + valid_codes = {code.value for code in HTTPStatusCode} + if status_code not in valid_codes: + status_code = HTTPStatusCode.INTERNAL_ERROR + return rest_response( - status_code=HTTPStatusCode.OK if success else HTTPStatusCode.INTERNAL_ERROR, + status_code=status_code, message=error_message, result=result, convert_google_style=False, @@ -40,12 +46,22 @@ async def handle_list_api( try: result = await list_api_fn(option=options_from_req(req)) return do_reply( - success=True, + status_code=200, error_message="", result=asdict(result), ) + except ValueError as e: + return do_reply( + status_code=400, + error_message=str(e), + result=None, + ) except DataSourceUnavailable as e: - return do_reply(success=False, error_message=str(e), result=None) + return do_reply( + status_code=500, + error_message=str(e), + result=None, + ) def _get_filters_from_req( @@ -100,7 +116,7 @@ async def handle_summary_api( ): result = await summary_fn(option=summary_options_from_req(req)) return do_reply( - success=True, + status_code=200, error_message="", result=asdict(result), ) diff --git a/python/ray/dashboard/utils.py b/python/ray/dashboard/utils.py index 4cb60681abeb..f146a682c89c 100644 --- a/python/ray/dashboard/utils.py +++ b/python/ray/dashboard/utils.py @@ -48,7 +48,9 @@ class HTTPStatusCode(IntEnum): OK = 200 # 4xx Client Errors + BAD_REQUEST = 400 NOT_FOUND = 404 + TOO_MANY_REQUESTS = 429 # 5xx Server Errors INTERNAL_ERROR = 500 From a465e56f16d794484f0a3d9c153b62da41dbf956 Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Mon, 3 Nov 2025 13:07:26 -0800 Subject: [PATCH 2/4] use status_code directly Signed-off-by: iamjustinhsu --- .../ray/dashboard/modules/state/state_head.py | 18 +++++++++--------- python/ray/dashboard/state_api_utils.py | 18 +++++++----------- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/python/ray/dashboard/modules/state/state_head.py b/python/ray/dashboard/modules/state/state_head.py index b2a94d660af3..187a11f91f07 100644 --- a/python/ray/dashboard/modules/state/state_head.py +++ b/python/ray/dashboard/modules/state/state_head.py @@ -30,7 +30,7 @@ from ray.dashboard.subprocesses.module import SubprocessModule from ray.dashboard.subprocesses.routes import SubprocessRouteTable as routes from ray.dashboard.subprocesses.utils import ResponseType -from ray.dashboard.utils import RateLimitedModule +from ray.dashboard.utils import HTTPStatusCode, RateLimitedModule from ray.util.state.common import ( DEFAULT_DOWNLOAD_FILENAME, DEFAULT_LOG_LIMIT, @@ -86,7 +86,7 @@ def __init__(self, *args, **kwargs): async def limit_handler_(self): return do_reply( - status_code=429, + status_code=HTTPStatusCode.TOO_MANY_REQUESTS, error_message=( "Max number of in-progress requests=" f"{self.max_num_call_} reached. " @@ -110,13 +110,13 @@ async def list_jobs(self, req: aiohttp.web.Request) -> aiohttp.web.Response: try: result = await self._state_api.list_jobs(option=options_from_req(req)) return do_reply( - status_code=200, + status_code=HTTPStatusCode.OK, error_message="", result=asdict(result), ) except DataSourceUnavailable as e: return do_reply( - status_code=500, + status_code=HTTPStatusCode.INTERNAL_ERROR, error_message=str(e), result=None, ) @@ -175,7 +175,7 @@ async def list_logs(self, req: aiohttp.web.Request) -> aiohttp.web.Response: if not node_id and not node_ip: return do_reply( - status_code=400, + status_code=HTTPStatusCode.BAD_REQUEST, error_message=( "Both node id and node ip are not provided. " "Please provide at least one of them." @@ -186,7 +186,7 @@ async def list_logs(self, req: aiohttp.web.Request) -> aiohttp.web.Response: node_id = await self._log_api.ip_to_node_id(node_ip) if not node_id: return do_reply( - status_code=404, + status_code=HTTPStatusCode.NOT_FOUND, error_message=( f"Cannot find matching node_id for a given node ip {node_ip}" ), @@ -199,13 +199,13 @@ async def list_logs(self, req: aiohttp.web.Request) -> aiohttp.web.Response: ) except DataSourceUnavailable as e: return do_reply( - status_code=500, + status_code=HTTPStatusCode.INTERNAL_ERROR, error_message=str(e), result=None, ) return do_reply( - status_code=200, + status_code=HTTPStatusCode.OK, error_message="", result=result, ) @@ -338,7 +338,7 @@ async def delayed_response(self, req: aiohttp.web.Request): delay = int(req.match_info.get("delay_s", 10)) await asyncio.sleep(delay) return do_reply( - status_code=200, + status_code=HTTPStatusCode.OK, error_message="", result={}, partial_failure_warning=None, diff --git a/python/ray/dashboard/state_api_utils.py b/python/ray/dashboard/state_api_utils.py index be13667d2bbd..b794bf840d32 100644 --- a/python/ray/dashboard/state_api_utils.py +++ b/python/ray/dashboard/state_api_utils.py @@ -23,13 +23,9 @@ from ray.util.state.util import convert_string_to_type -def do_reply(status_code: int, error_message: str, result: ListApiResponse, **kwargs): - - # Validate status_code and fallback to 500 if invalid - valid_codes = {code.value for code in HTTPStatusCode} - if status_code not in valid_codes: - status_code = HTTPStatusCode.INTERNAL_ERROR - +def do_reply( + status_code: HTTPStatusCode, error_message: str, result: ListApiResponse, **kwargs +): return rest_response( status_code=status_code, message=error_message, @@ -46,19 +42,19 @@ async def handle_list_api( try: result = await list_api_fn(option=options_from_req(req)) return do_reply( - status_code=200, + status_code=HTTPStatusCode.OK, error_message="", result=asdict(result), ) except ValueError as e: return do_reply( - status_code=400, + status_code=HTTPStatusCode.BAD_REQUEST, error_message=str(e), result=None, ) except DataSourceUnavailable as e: return do_reply( - status_code=500, + status_code=HTTPStatusCode.INTERNAL_ERROR, error_message=str(e), result=None, ) @@ -118,7 +114,7 @@ async def handle_summary_api( ): result = await summary_fn(option=summary_options_from_req(req)) return do_reply( - status_code=200, + status_code=HTTPStatusCode.OK, error_message="", result=asdict(result), ) From a2d65a40c76330cc5e8579b4cce2759bd302d1ef Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Thu, 6 Nov 2025 10:17:09 -0800 Subject: [PATCH 3/4] test_handle_list_api_status_codes Signed-off-by: iamjustinhsu --- python/ray/tests/test_state_api.py | 54 +++++++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/python/ray/tests/test_state_api.py b/python/ray/tests/test_state_api.py index 487ba294000b..19eac0122b56 100644 --- a/python/ray/tests/test_state_api.py +++ b/python/ray/tests/test_state_api.py @@ -5,7 +5,7 @@ import time from collections import Counter from concurrent.futures import ThreadPoolExecutor -from typing import List +from typing import List, Optional from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -860,6 +860,58 @@ async def test_api_manager_list_workers(state_api_manager): assert exc_info.value.args[0] == GCS_QUERY_FAILURE_WARNING +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("exception", + "status_code"), + [ + (None, 200), + (ValueError("Invalid filter parameter"), 400), + (DataSourceUnavailable("GCS connection failed"), 500), + ], +) +async def test_handle_list_api_status_codes( + exception: Optional[Exception], status_code: int +): + """Test that handle_list_api calls do_reply with correct status codes. + + This directly tests the HTTP layer logic that maps exceptions to status codes: + - Success → HTTP 200 OK + - ValueError → HTTP 400 BAD_REQUEST + - DataSourceUnavailable → HTTP 500 INTERNAL_ERROR + """ + from unittest.mock import AsyncMock, MagicMock + + from ray.dashboard.state_api_utils import handle_list_api + from ray.util.state.common import ListApiResponse + + # Mock aiohttp request with proper query interface + mock_request = MagicMock() + + def mock_get(key, default=None): + return default + + mock_request.query = MagicMock() + mock_request.query.get = mock_get + + # Test 1: Success returns HTTP 200 OK + if exception is None: + mock_backend = AsyncMock( + return_value=ListApiResponse( + result=[], + total=0, + num_after_truncation=0, + num_filtered=0, + partial_failure_warning="", + ) + ) + else: + mock_backend = AsyncMock(side_effect=exception) + + response = await handle_list_api(mock_backend, mock_request) + assert response.status == status_code + + @pytest.mark.asyncio async def test_api_manager_list_tasks(state_api_manager): data_source_client = state_api_manager.data_source_client From 818cd6e2397c5501dbc0431fa36c783810e6c6dd Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Thu, 6 Nov 2025 10:18:20 -0800 Subject: [PATCH 4/4] lint + comments Signed-off-by: iamjustinhsu --- python/ray/tests/test_state_api.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/ray/tests/test_state_api.py b/python/ray/tests/test_state_api.py index 19eac0122b56..b65b187b5e9d 100644 --- a/python/ray/tests/test_state_api.py +++ b/python/ray/tests/test_state_api.py @@ -862,8 +862,7 @@ async def test_api_manager_list_workers(state_api_manager): @pytest.mark.asyncio @pytest.mark.parametrize( - ("exception", - "status_code"), + ("exception", "status_code"), [ (None, 200), (ValueError("Invalid filter parameter"), 400), @@ -885,7 +884,7 @@ async def test_handle_list_api_status_codes( from ray.dashboard.state_api_utils import handle_list_api from ray.util.state.common import ListApiResponse - # Mock aiohttp request with proper query interface + # 1. Mock aiohttp request with proper query interface mock_request = MagicMock() def mock_get(key, default=None): @@ -894,7 +893,7 @@ def mock_get(key, default=None): mock_request.query = MagicMock() mock_request.query.get = mock_get - # Test 1: Success returns HTTP 200 OK + # 2. Mock response whether success or failure. if exception is None: mock_backend = AsyncMock( return_value=ListApiResponse( @@ -909,6 +908,8 @@ def mock_get(key, default=None): mock_backend = AsyncMock(side_effect=exception) response = await handle_list_api(mock_backend, mock_request) + + # 3. Assert status_code is correct. assert response.status == status_code