-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Serve] Shared LongPollClient for Routers
#48807
Changes from all commits
5169e86
ae170d3
53f97fc
d586f08
6ce1b3e
9fa746b
83eb5cf
cfc7d19
c673bec
67384eb
1b4712f
62bb4c3
56b7df5
102fb09
ca4a1f7
9737dbd
78d2b77
9f14f71
a6676ed
d7dd6de
4f93687
0c62830
49cc679
3dfce1f
0611dae
d62e598
dc5138c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,10 +3,13 @@ | |
import logging | ||
import threading | ||
import time | ||
import weakref | ||
from abc import ABC, abstractmethod | ||
from asyncio import AbstractEventLoop | ||
from collections import defaultdict | ||
from collections.abc import MutableMapping | ||
from contextlib import contextmanager | ||
from functools import partial | ||
from functools import lru_cache, partial | ||
from typing import Any, Coroutine, DefaultDict, Dict, List, Optional, Tuple, Union | ||
|
||
import ray | ||
|
@@ -399,6 +402,14 @@ def __init__( | |
), | ||
) | ||
|
||
# The Router needs to stay informed about changes to the target deployment's | ||
# running replicas and deployment config. We do this via the long poll system. | ||
# However, for efficiency, we don't want to create a LongPollClient for every | ||
# DeploymentHandle, so we use a shared LongPollClient that all Routers | ||
# register themselves with. But first, the router needs to get a fast initial | ||
# update so that it can start serving requests, which we do with a dedicated | ||
# LongPollClient that stops running once the shared client takes over. | ||
|
||
self.long_poll_client = LongPollClient( | ||
controller_handle, | ||
{ | ||
|
@@ -414,6 +425,11 @@ def __init__( | |
call_in_event_loop=self._event_loop, | ||
) | ||
|
||
shared = SharedRouterLongPollClient.get_or_create( | ||
controller_handle, self._event_loop | ||
) | ||
shared.register(self) | ||
|
||
def running_replicas_populated(self) -> bool:
    """Return the current value of the ``_running_replicas_populated`` flag."""
    populated: bool = self._running_replicas_populated
    return populated
|
||
|
@@ -690,3 +706,72 @@ def shutdown(self) -> concurrent.futures.Future: | |
return asyncio.run_coroutine_threadsafe( | ||
self._asyncio_router.shutdown(), loop=self._asyncio_loop | ||
) | ||
|
||
|
||
class SharedRouterLongPollClient: | ||
def __init__(self, controller_handle: ActorHandle, event_loop: AbstractEventLoop):
    """Create the router registry and start the shared long poll client.

    Args:
        controller_handle: Handle forwarded to the ``LongPollClient``.
        event_loop: Loop forwarded to the ``LongPollClient`` as
            ``call_in_event_loop``.
    """
    self.controller_handler = controller_handle

    # Routers are grouped by deployment ID and held in WeakSets, so this
    # registry never keeps a Router alive once nothing else references it.
    registry: MutableMapping[
        DeploymentID, weakref.WeakSet[AsyncioRouter]
    ] = defaultdict(weakref.WeakSet)
    self.routers = registry

    # Constructing the LongPollClient implicitly starts it. It begins with
    # no key listeners; register() adds them as Routers sign up.
    self.long_poll_client = LongPollClient(
        controller_handle,
        call_in_event_loop=event_loop,
        key_listeners={},
    )
|
||
@classmethod
@lru_cache(maxsize=None)
def get_or_create(
    cls, controller_handle: ActorHandle, event_loop: AbstractEventLoop
) -> "SharedRouterLongPollClient":
    """Return the shared client for a controller handle / event loop pair.

    The result is memoized by ``lru_cache`` on the call arguments, so the
    client is constructed (and the "Started" message logged) only on the
    first call for a given combination; later calls get the cached instance.
    The cache is unbounded (``maxsize=None``).
    """
    client = cls(event_loop=event_loop, controller_handle=controller_handle)
    logger.info(f"Started {client}.")
    return client
|
||
def update_deployment_targets(
    self,
    deployment_target_info: DeploymentTargetInfo,
    deployment_id: DeploymentID,
) -> None:
    """Forward new target info to every router registered for a deployment.

    Args:
        deployment_target_info: The update to pass to each router's
            ``update_deployment_targets``.
        deployment_id: Key into ``self.routers`` selecting which routers
            receive the update.
    """
    # Read with .get() instead of indexing: self.routers is a defaultdict, so
    # indexing an ID with no entry (e.g. one that register() already pruned)
    # would silently re-insert an empty WeakSet.
    for router in self.routers.get(deployment_id, ()):
        router.update_deployment_targets(deployment_target_info)
        # The shared client has now delivered an update to this router, so
        # its dedicated initial-update client can stop.
        router.long_poll_client.stop()
Comment on lines +742 to +744

Calling [inline code lost in extraction] […] When there are a lot of applications/deployments and the controller is slowed down, will there be race conditions with multiple long poll clients updating the same state?

Correct, the router will get both updates - I guess I was assuming that those updates are and will continue to be idempotent. Is that not the case?

Hmm yes, I believe that's true. I just want to make sure it's thought through carefully, since I haven't touched the long poll code before. So the router's own long poll client will likely make 2 [inline code lost in extraction] […]

Sounds right to me! |
||
|
||
def update_deployment_config(
    self, deployment_config: DeploymentConfig, deployment_id: DeploymentID
) -> None:
    """Forward a new deployment config to every router for a deployment.

    Args:
        deployment_config: The config to pass to each router's
            ``update_deployment_config``.
        deployment_id: Key into ``self.routers`` selecting which routers
            receive the update.
    """
    # Read with .get() instead of indexing: self.routers is a defaultdict, so
    # indexing an ID with no entry (e.g. one that register() already pruned)
    # would silently re-insert an empty WeakSet.
    for router in self.routers.get(deployment_id, ()):
        router.update_deployment_config(deployment_config)
        # The shared client has now delivered an update to this router, so
        # its dedicated initial-update client can stop.
        router.long_poll_client.stop()
|
||
def register(self, router: AsyncioRouter) -> None:
    """Track ``router`` and subscribe the shared client for its deployment.

    The router is added to the WeakSet for its ``deployment_id``, empty
    per-deployment entries are pruned, and target/config listeners for every
    remaining deployment ID are (re-)added to the shared long poll client.
    """
    self.routers[router.deployment_id].add(router)

    # A WeakSet drops Routers on its own as they are garbage-collected, but
    # the enclosing dict keeps the now-empty entry. Collect the empty keys
    # first so the dict is not mutated while it is being iterated.
    stale_ids = [dep_id for dep_id, members in self.routers.items() if not members]
    for dep_id in stale_ids:
        self.routers.pop(dep_id)

    # Build listeners for every deployment that still has routers. Some may
    # already be registered on the client; adding them again is safe.
    target_listeners = {
        (LongPollNamespace.DEPLOYMENT_TARGETS, dep_id): partial(
            self.update_deployment_targets, deployment_id=dep_id
        )
        for dep_id in self.routers
    }
    config_listeners = {
        (LongPollNamespace.DEPLOYMENT_CONFIG, dep_id): partial(
            self.update_deployment_config, deployment_id=dep_id
        )
        for dep_id in self.routers
    }
    self.long_poll_client.add_key_listeners({**target_listeners, **config_listeners})
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This case is now handled by https://github.com/ray-project/ray/pull/48807/files#diff-f138b21f7ddcd7d61c0b2704c8b828b9bbe7eb5021531e2c7fabeb20ec322e1aR280-R288 (and is necessary - when the shared client boots up for the first time it will send an RPC with no keys in it)