diff --git a/python/ray/serve/_private/constants.py b/python/ray/serve/_private/constants.py index dc9be4fce2c8..83748d9d118f 100644 --- a/python/ray/serve/_private/constants.py +++ b/python/ray/serve/_private/constants.py @@ -253,3 +253,13 @@ RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS = ( os.environ.get("RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS", "0") == "1" ) + +# Initial deadline for queue length responses in the router. +RAY_SERVE_QUEUE_LENGTH_RESPONSE_DEADLINE_S = float( + os.environ.get("RAY_SERVE_QUEUE_LENGTH_RESPONSE_DEADLINE_S", 0.1) +) + +# Maximum deadline for queue length responses in the router (in backoff). +RAY_SERVE_MAX_QUEUE_LENGTH_RESPONSE_DEADLINE_S = float( + os.environ.get("RAY_SERVE_MAX_QUEUE_LENGTH_RESPONSE_DEADLINE_S", 1.0) +) diff --git a/python/ray/serve/_private/router.py b/python/ray/serve/_private/router.py index 30ccc6a4fce3..b282b50ef1d7 100644 --- a/python/ray/serve/_private/router.py +++ b/python/ray/serve/_private/router.py @@ -29,8 +29,10 @@ from ray.serve._private.common import DeploymentID, RequestProtocol, RunningReplicaInfo from ray.serve._private.constants import ( HANDLE_METRIC_PUSH_INTERVAL_S, + RAY_SERVE_MAX_QUEUE_LENGTH_RESPONSE_DEADLINE_S, RAY_SERVE_MULTIPLEXED_MODEL_ID_MATCHING_TIMEOUT_S, RAY_SERVE_PROXY_PREFER_LOCAL_AZ_ROUTING, + RAY_SERVE_QUEUE_LENGTH_RESPONSE_DEADLINE_S, SERVE_LOGGER_NAME, ) from ray.serve._private.deployment_info import DeploymentInfo @@ -158,8 +160,11 @@ def multiplexed_model_ids(self) -> Set[str]: """Set of model IDs on this replica.""" pass - async def get_queue_state(self) -> Tuple[int, bool]: - """Returns tuple of (queue_len, accepted).""" + async def get_queue_state(self, *, deadline_s: float) -> Tuple[int, bool]: + """Returns tuple of (queue_len, accepted). + + `deadline_s` is passed to verify backoff for testing. 
+ """ pass def send_query( @@ -195,7 +200,7 @@ def availability_zone(self) -> Optional[str]: def multiplexed_model_ids(self) -> Set[str]: return self._multiplexed_model_ids - async def get_queue_state(self) -> Tuple[int, bool]: + async def get_queue_state(self, *, deadline_s: float) -> Tuple[int, bool]: # NOTE(edoakes): the `get_num_ongoing_requests` method name is shared by # the Python and Java replica implementations. If you change it, you need to # change both (or introduce a branch here). @@ -316,7 +321,10 @@ class PowerOfTwoChoicesReplicaScheduler(ReplicaScheduler): # Deadline for replicas to respond with their queue length. If the response isn't # received within this deadline, the replica will not be considered. - queue_len_response_deadline_s = 0.1 + # If this deadline is repeatedly missed, it will be exponentially increased up to + # the maximum configured here. + queue_len_response_deadline_s = RAY_SERVE_QUEUE_LENGTH_RESPONSE_DEADLINE_S + max_queue_len_response_deadline_s = RAY_SERVE_MAX_QUEUE_LENGTH_RESPONSE_DEADLINE_S # Hard limit on the maximum number of scheduling tasks to run. Having too many of # these tasks can cause stability issue due to too much load on the local process @@ -690,35 +698,53 @@ async def choose_two_replicas_with_backoff( ) async def select_from_candidate_replicas( - self, candidates: List[ReplicaWrapper] + self, + candidates: List[ReplicaWrapper], + backoff_index: int, ) -> Optional[ReplicaWrapper]: """Chooses the best replica from the list of candidates. If none of the replicas can be scheduled, returns `None`. The queue length at each replica is queried directly from it. The time waited - for these queries is capped by `self.queue_len_response_deadline_s`; if a - replica doesn't respond within the deadline it is not considered. + for these queries is capped by a response deadline; if a replica doesn't + respond within the deadline it is not considered. The deadline will be + increased exponentially in backoff. 
Among replicas that respond within the deadline and accept the request (don't have full queues), the one with the lowest queue length is chosen. """ + # Ensure the max deadline is always >= the initial deadline. + max_queue_len_response_deadline_s = max( + self.queue_len_response_deadline_s, + self.max_queue_len_response_deadline_s, + ) + queue_len_response_deadline_s = min( + self.queue_len_response_deadline_s * (2**backoff_index), + max_queue_len_response_deadline_s, + ) + get_queue_state_tasks = [] for c in candidates: - t = self._loop.create_task(c.get_queue_state()) + t = self._loop.create_task( + c.get_queue_state(deadline_s=queue_len_response_deadline_s) + ) t.replica_id = c.replica_id get_queue_state_tasks.append(t) done, pending = await asyncio.wait( get_queue_state_tasks, - timeout=self.queue_len_response_deadline_s, + timeout=queue_len_response_deadline_s, return_when=asyncio.ALL_COMPLETED, ) for t in pending: t.cancel() logger.warning( f"Failed to get queue length from replica {t.replica_id} " - f"within {self.queue_len_response_deadline_s}s." + f"within {queue_len_response_deadline_s}s. If this happens repeatedly " + "it's likely caused by high network latency in the cluster. You can " + "configure the deadline using the " + "`RAY_SERVE_QUEUE_LENGTH_RESPONSE_DEADLINE_S` environment variable." 
) chosen_replica_id = None @@ -820,15 +846,20 @@ async def fulfill_pending_requests(self): """ try: while len(self._scheduling_tasks) <= self.target_num_scheduling_tasks: + backoff_index = 0 request_metadata = self._get_next_pending_request_metadata_to_schedule() async for candidates in self.choose_two_replicas_with_backoff( request_metadata ): - replica = await self.select_from_candidate_replicas(candidates) + replica = await self.select_from_candidate_replicas( + candidates, backoff_index + ) if replica is not None: self.fulfill_next_pending_request(replica, request_metadata) break + backoff_index += 1 + except Exception: logger.exception("Unexpected error in fulfill_pending_requests.") finally: diff --git a/python/ray/serve/tests/unit/test_replica_scheduler.py b/python/ray/serve/tests/unit/test_replica_scheduler.py index 2e89a1076627..24a2f7b7a724 100644 --- a/python/ray/serve/tests/unit/test_replica_scheduler.py +++ b/python/ray/serve/tests/unit/test_replica_scheduler.py @@ -43,6 +43,7 @@ def __init__( self._sleep_time_s = sleep_time_s self.get_queue_state_was_cancelled = False + self.queue_len_deadline_history = list() @property def replica_id(self) -> str: @@ -71,7 +72,8 @@ def set_queue_state_response( self._exception = exception self._has_queue_len_response.set() - async def get_queue_state(self) -> Tuple[int, bool]: + async def get_queue_state(self, *, deadline_s: float) -> Tuple[int, bool]: + self.queue_len_deadline_history.append(deadline_s) try: while not self._has_queue_len_response.is_set(): await self._has_queue_len_response.wait() @@ -1177,6 +1179,76 @@ async def test_get_queue_state_cancelled_on_timeout(pow_2_scheduler, fake_query) assert (await task) == r1 +@pytest.mark.asyncio +async def test_queue_len_response_deadline_backoff(pow_2_scheduler, fake_query): + """ + Verify that the response deadline is exponentially backed off up to the max. 
+ """ + s = pow_2_scheduler + s.queue_len_response_deadline_s = 0.001 + s.max_queue_len_response_deadline_s = 0.005 + loop = get_or_create_event_loop() + + r1 = FakeReplicaWrapper("r1") + s.update_replicas([r1]) + + # Attempt to schedule; the replica will be attempted and a timeout will occur + # due to the short timeout set above. + task = loop.create_task(s.choose_replica_for_query(fake_query)) + done, _ = await asyncio.wait([task], timeout=0.2) + assert len(done) == 0 + + # Verify that the deadline never exceeds the max and deadline_n+1 is equal to + # the max or 2*deadline_n. + for i, j in zip( + range(0, len(r1.queue_len_deadline_history) - 1), + range(1, len(r1.queue_len_deadline_history)), + ): + deadline_i = r1.queue_len_deadline_history[i] + deadline_j = r1.queue_len_deadline_history[j] + print(deadline_i, deadline_j) + assert ( + deadline_i <= deadline_j + and deadline_j <= s.max_queue_len_response_deadline_s + ) + if deadline_i < s.max_queue_len_response_deadline_s: + assert ( + deadline_j == s.max_queue_len_response_deadline_s + or deadline_j == 2 * deadline_i + ) + + r1.set_queue_state_response(0, accepted=True) + assert (await task) == r1 + + +@pytest.mark.asyncio +async def test_max_queue_len_response_deadline(pow_2_scheduler, fake_query): + """ + Verify that if the max response deadline is < the initial deadline, the initial is + always used. + """ + s = pow_2_scheduler + s.queue_len_response_deadline_s = 0.01 + s.max_queue_len_response_deadline_s = 0.001 + loop = get_or_create_event_loop() + + r1 = FakeReplicaWrapper("r1") + s.update_replicas([r1]) + + # Attempt to schedule; the replica will be attempted and a timeout will occur + # due to the short timeout set above. 
+ task = loop.create_task(s.choose_replica_for_query(fake_query)) + done, _ = await asyncio.wait([task], timeout=0.2) + assert len(done) == 0 + + assert all( + d == s.queue_len_response_deadline_s for d in r1.queue_len_deadline_history + ) + + r1.set_queue_state_response(0, accepted=True) + assert (await task) == r1 + + @pytest.mark.asyncio async def test_replicas_updated_event_on_correct_loop(pow_2_scheduler): """See https://github.com/ray-project/ray/issues/40631.