
[serve] immediately send ping in router when receiving new replica set #47053

Merged · 12 commits · Aug 14, 2024
3 changes: 3 additions & 0 deletions python/ray/serve/_private/replica.py
@@ -321,6 +321,9 @@ def _configure_logger_and_profilers(
            component_id=self._component_id,
        )

    def push_proxy_handle(self, handle):
Contributor:
Should we do something with the handle? Also maybe add a type hint if it's required 🙃

Contributor Author:
Doing something with the handle seems unnecessary for now. If you pass any actor handle as an argument in a Ray remote call like:

x.remote(actor_handle)

then Ray core does some processing under the hood that requires a call to the GCS. If that actor_handle was never "pushed" to the actor beforehand, the call hangs when the GCS is down; "pushing" it once is enough to keep later calls from blocking even if the GCS goes down.

        pass
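In isolation, the pattern looks roughly like this; `Replica`, `Proxy`, and `warm_up` are illustrative names, not the exact Serve internals:

```python
import ray
from ray.actor import ActorHandle


@ray.remote
class Replica:
    def push_proxy_handle(self, handle: ActorHandle) -> None:
        # Receiving the handle is the point: Ray resolves it (a GCS lookup)
        # now, while the GCS is still up, so later calls that pass the same
        # handle do not block if the GCS becomes unavailable.
        pass


@ray.remote
class Proxy:
    def warm_up(self, replica: ActorHandle) -> None:
        # Eagerly push this proxy's own handle to the replica.
        replica.push_proxy_handle.remote(ray.get_runtime_context().current_actor)
```

In the PR itself, the push happens from the router's `update_replicas` path shown further down.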

    def get_num_ongoing_requests(self) -> int:
        """Fetch the number of ongoing requests at this replica (queue length).

15 changes: 15 additions & 0 deletions python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py
@@ -17,8 +17,10 @@
    Tuple,
)

import ray
from ray.exceptions import ActorDiedError, ActorUnavailableError
from ray.serve._private.common import (
    DeploymentHandleSource,
    DeploymentID,
    ReplicaID,
    RequestMetadata,
@@ -89,6 +91,7 @@ def __init__(
        self,
        event_loop: asyncio.AbstractEventLoop,
        deployment_id: DeploymentID,
        handle_source: DeploymentHandleSource,
        prefer_local_node_routing: bool = False,
        prefer_local_az_routing: bool = False,
        self_node_id: Optional[str] = None,
@@ -99,6 +102,7 @@
    ):
        self._loop = event_loop
        self._deployment_id = deployment_id
        self._handle_source = handle_source
        self._prefer_local_node_routing = prefer_local_node_routing
        self._prefer_local_az_routing = prefer_local_az_routing
        self._self_node_id = self_node_id
@@ -240,7 +244,16 @@ def update_replicas(self, replicas: List[ReplicaWrapper]):
        new_replica_id_set = set()
        new_colocated_replica_ids = defaultdict(set)
        new_multiplexed_model_id_to_replica_ids = defaultdict(set)

        for r in replicas:
            # If this handle lives on the proxy, the replica needs to call back
            # into the proxy with `receive_asgi_messages`, which can block when
            # the GCS is down. To prevent that, eagerly push the proxy handle.
            if self._handle_source == DeploymentHandleSource.PROXY:
                r._actor_handle.push_proxy_handle.remote(
Contributor:

let's add a method to the interface; we shouldn't be accessing the _actor_handle private attribute

Contributor:

that way it can be tested as well

                    ray.get_runtime_context().current_actor
                )
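A hedged sketch of that suggestion (the wrapper names are assumptions, not necessarily the real interface): expose the push on the replica wrapper so the scheduler never touches the private attribute and the call can be stubbed in tests.

```python
from abc import ABC, abstractmethod


class ReplicaWrapper(ABC):
    @abstractmethod
    def push_proxy_handle(self, handle) -> None:
        """Eagerly send an actor handle (e.g. the proxy's) to this replica."""


class ActorReplicaWrapper(ReplicaWrapper):
    def __init__(self, actor_handle):
        self._actor_handle = actor_handle

    def push_proxy_handle(self, handle) -> None:
        # Fire-and-forget: the goal is only to have the handle resolved once.
        self._actor_handle.push_proxy_handle.remote(handle)
```

The scheduler could then call `r.push_proxy_handle(ray.get_runtime_context().current_actor)`, and a fake wrapper can stand in for unit tests.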

            new_replicas[r.replica_id] = r
            new_replica_id_set.add(r.replica_id)
            if self._self_node_id is not None and r.node_id == self._self_node_id:
@@ -272,6 +285,8 @@ def update_replicas(self, replicas: List[ReplicaWrapper]):
        self._replica_queue_len_cache.remove_inactive_replicas(
            active_replica_ids=new_replica_id_set
        )
        # Populate cache for all replicas
        self._loop.create_task(self._probe_queue_lens(list(self._replicas.values()), 0))
Contributor:

hm can we do this only for the replicas that were added instead of all?

Contributor Author:

Yes! For some reason I thought it would mess with fault tolerance, but it seems the actor info is stored per-process, not per actor handle. Changed to only ping new replicas.
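A rough sketch of that "ping only new replicas" variant, reusing the names from the surrounding diff (not the exact final code):

```python
# Inside update_replicas: probe queue lengths only for replicas that were
# not in the previous replica set; existing replicas already have cache entries.
added_replicas = [
    r for r in new_replicas.values() if r.replica_id not in self._replicas
]
if added_replicas:
    self._loop.create_task(self._probe_queue_lens(added_replicas, 0))
```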

        self._replicas_updated_event.set()
        self.maybe_start_scheduling_tasks()

1 change: 1 addition & 0 deletions python/ray/serve/_private/router.py
@@ -358,6 +358,7 @@ def __init__(
        replica_scheduler = PowerOfTwoChoicesReplicaScheduler(
            self._event_loop,
            deployment_id,
            handle_source,
            _prefer_local_node_routing,
            RAY_SERVE_PROXY_PREFER_LOCAL_AZ_ROUTING,
            self_node_id,
56 changes: 56 additions & 0 deletions python/ray/serve/tests/test_gcs_failure.py
@@ -11,6 +11,7 @@
from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME
from ray.serve._private.storage.kv_store import KVStoreError, RayInternalKVStore
from ray.serve.context import _get_global_client
from ray.serve.handle import DeploymentHandle
from ray.tests.conftest import external_redis # noqa: F401


@@ -27,6 +28,8 @@ def serve_ha(external_redis, monkeypatch):  # noqa: F811
    serve.start()
    yield (address_info, _get_global_client())
    ray.shutdown()
    # Clear cache and global serve client
    serve.shutdown()


@pytest.mark.skipif(
@@ -105,6 +108,59 @@ def call():
    assert pid == call()


@pytest.mark.parametrize("use_proxy", [True, False])
def test_router_on_gcs_failure(serve_ha, use_proxy: bool):
    """Test that a new router can send requests to replicas when the GCS is down.

    Specifically, if a proxy was just brought up or a deployment handle
    was just created, and the GCS goes down BEFORE the router is able to
    send its first request, new incoming requests should still be sent
    to replicas successfully during GCS downtime.
    """

    def router_populated_with_replicas(handle: DeploymentHandle):
        replicas = handle._router._replica_scheduler._replica_id_set
        assert len(replicas) > 0
        return True

    @serve.deployment
    class Dummy:
        def __call__(self):
            return os.getpid()

    h = serve.run(Dummy.options(num_replicas=2).bind())
    # TODO(zcin): We want to test the behavior when the router hasn't had a
    # chance to send even a single request yet. However, on the very first
    # request we record telemetry for whether the deployment handle API was
    # used, which hangs when the GCS is down. As a workaround for now, avoid
    # recording telemetry so we can properly test router behavior when the
    # GCS is down. We should look into adding a timeout on the KV cache
    # operation. The proxy doesn't run into this because we don't record
    # telemetry on the proxy.
    h._recorded_telemetry = True
    # Eagerly create the router so it receives the replica set instead of
    # waiting for the first request.
    h._get_or_create_router()

    wait_for_condition(router_populated_with_replicas, handle=h)

    # Kill the GCS server before a single request is sent.
    ray.worker._global_node.kill_gcs_server()

    returned_pids = set()
    if use_proxy:
        for _ in range(10):
            returned_pids.add(
                int(requests.get("http://localhost:8000", timeout=0.1).text)
            )
    else:
        for _ in range(10):
            returned_pids.add(int(h.remote().result(timeout_s=0.1)))

    print("Returned pids:", returned_pids)
    assert len(returned_pids) == 2


if __name__ == "__main__":
    # When GCS is down, right now some core worker members are not cleared
    # properly in ray.shutdown. Given that this is not a high-priority issue,