From f1f4d69df3c797723c5a24a32a96017a3bb7ea05 Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Thu, 18 May 2023 10:10:27 -0700 Subject: [PATCH] [serve] Shutdown http proxy state (cherry-pick) (#35446) * [serve] Shutdown http proxy state (#35395) Shut down the HTTP proxy state so that it won't run anything in its update loop once a shutdown signal is received. Signed-off-by: Cindy Zhang * [serve] Remove print statement + fix lint (#35439) Signed-off-by: Cindy Zhang --------- Signed-off-by: Cindy Zhang --- python/ray/serve/_private/http_state.py | 12 +++++-- python/ray/serve/tests/test_http_state.py | 39 +++++++++++++++++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/python/ray/serve/_private/http_state.py b/python/ray/serve/_private/http_state.py index 3b52a931b0170..ecff9d0ed5398 100644 --- a/python/ray/serve/_private/http_state.py +++ b/python/ray/serve/_private/http_state.py @@ -41,6 +41,7 @@ def __init__( self._status = HTTPProxyStatus.STARTING self._health_check_obj_ref = None self._last_health_check_time: float = 0 + self._shutting_down = False self._actor_details = HTTPProxyDetails( node_id=node_id, @@ -78,6 +79,9 @@ def update_actor_details(self, **kwargs) -> None: self._actor_details = HTTPProxyDetails(**details_kwargs) def update(self): + if self._shutting_down: + return + if self._status == HTTPProxyStatus.STARTING: finished, _ = ray.wait([self._ready_obj_ref], timeout=0) if finished: @@ -128,6 +132,10 @@ def update(self): self._health_check_obj_ref = self._actor_handle.check_health.remote() self._last_health_check_time = time.time() + def shutdown(self): + self._shutting_down = True + ray.kill(self.actor_handle, no_restart=True) + class HTTPState: """Manages all state for HTTP proxies in the system. 
@@ -164,8 +172,8 @@ def __init__( self._start_proxies_if_needed() def shutdown(self) -> None: - for proxy in self.get_http_proxy_handles().values(): - ray.kill(proxy, no_restart=True) + for proxy_state in self._proxy_states.values(): + proxy_state.shutdown() def get_config(self): return self._config diff --git a/python/ray/serve/tests/test_http_state.py b/python/ray/serve/tests/test_http_state.py index ae3870d292d58..6288961a9b0b6 100644 --- a/python/ray/serve/tests/test_http_state.py +++ b/python/ray/serve/tests/test_http_state.py @@ -1,9 +1,11 @@ import json from unittest.mock import patch +import asyncio import pytest import ray +from ray.experimental.state.api import list_actors from ray._private.test_utils import SignalActor, wait_for_condition from ray.serve.config import DeploymentMode, HTTPOptions from ray.serve._private.common import HTTPProxyStatus @@ -132,6 +134,43 @@ def check_proxy(status): ray.shutdown() +def test_http_proxy_shutdown(): + ray.init() + + @ray.remote(num_cpus=0) + class MockHTTPProxyActor: + async def ready(self): + return json.dumps(["mock_worker_id", "mock_log_file_path"]) + + async def check_health(self): + await asyncio.sleep(100) + + proxy = MockHTTPProxyActor.options(lifetime="detached").remote() + state = HTTPProxyState(proxy, "alice", "mock_node_id", "mock_node_ip") + assert state.status == HTTPProxyStatus.STARTING + + def check_proxy(status): + state.update() + return state.status == status + + # Proxy actor is ready, so status should transition STARTING -> HEALTHY + wait_for_condition(check_proxy, status=HTTPProxyStatus.HEALTHY, timeout=2) + + # Confirm that a new health check has been started + state.update() + assert state._health_check_obj_ref + + # Shutdown the http proxy state. 
Wait for the http proxy actor to be killed + state.shutdown() + wait_for_condition(lambda: len(list_actors(filters=[("state", "=", "ALIVE")])) == 0) + + # Make sure that the state doesn't try to check on the status of the dead actor + state.update() + assert state.status == HTTPProxyStatus.HEALTHY + + ray.shutdown() + + if __name__ == "__main__": import sys