Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serve] Cherry-pick queue length deadline configuration changes #42176

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions python/ray/serve/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,13 @@
RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS = (
os.environ.get("RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS", "0") == "1"
)

# Starting deadline (in seconds) that the router gives a replica to report its
# queue length. Overridable through the environment variable of the same name.
RAY_SERVE_QUEUE_LENGTH_RESPONSE_DEADLINE_S = float(
    os.getenv("RAY_SERVE_QUEUE_LENGTH_RESPONSE_DEADLINE_S", "0.1")
)

# Upper bound (in seconds) that the queue length response deadline may reach
# while the router is backing off.
RAY_SERVE_MAX_QUEUE_LENGTH_RESPONSE_DEADLINE_S = float(
    os.getenv("RAY_SERVE_MAX_QUEUE_LENGTH_RESPONSE_DEADLINE_S", "1.0")
)
53 changes: 42 additions & 11 deletions python/ray/serve/_private/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
from ray.serve._private.common import DeploymentID, RequestProtocol, RunningReplicaInfo
from ray.serve._private.constants import (
HANDLE_METRIC_PUSH_INTERVAL_S,
RAY_SERVE_MAX_QUEUE_LENGTH_RESPONSE_DEADLINE_S,
RAY_SERVE_MULTIPLEXED_MODEL_ID_MATCHING_TIMEOUT_S,
RAY_SERVE_PROXY_PREFER_LOCAL_AZ_ROUTING,
RAY_SERVE_QUEUE_LENGTH_RESPONSE_DEADLINE_S,
SERVE_LOGGER_NAME,
)
from ray.serve._private.deployment_info import DeploymentInfo
Expand Down Expand Up @@ -158,8 +160,11 @@ def multiplexed_model_ids(self) -> Set[str]:
"""Set of model IDs on this replica."""
pass

async def get_queue_state(self) -> Tuple[int, bool]:
"""Returns tuple of (queue_len, accepted)."""
async def get_queue_state(self, *, deadline_s: float) -> Tuple[int, bool]:
"""Returns tuple of (queue_len, accepted).

`deadline_s` is passed to verify backoff for testing.
"""
pass

def send_query(
Expand Down Expand Up @@ -195,7 +200,7 @@ def availability_zone(self) -> Optional[str]:
def multiplexed_model_ids(self) -> Set[str]:
return self._multiplexed_model_ids

async def get_queue_state(self) -> Tuple[int, bool]:
async def get_queue_state(self, *, deadline_s: float) -> Tuple[int, bool]:
# NOTE(edoakes): the `get_num_ongoing_requests` method name is shared by
# the Python and Java replica implementations. If you change it, you need to
# change both (or introduce a branch here).
Expand Down Expand Up @@ -316,7 +321,10 @@ class PowerOfTwoChoicesReplicaScheduler(ReplicaScheduler):

# Deadline for replicas to respond with their queue length. If the response isn't
# received within this deadline, the replica will not be considered.
queue_len_response_deadline_s = 0.1
# If this deadline is repeatedly missed, it will be exponentially increased up to
# the maximum configured here.
queue_len_response_deadline_s = RAY_SERVE_QUEUE_LENGTH_RESPONSE_DEADLINE_S
max_queue_len_response_deadline_s = RAY_SERVE_MAX_QUEUE_LENGTH_RESPONSE_DEADLINE_S

# Hard limit on the maximum number of scheduling tasks to run. Having too many of
# these tasks can cause stability issue due to too much load on the local process
Expand Down Expand Up @@ -690,35 +698,53 @@ async def choose_two_replicas_with_backoff(
)

async def select_from_candidate_replicas(
self, candidates: List[ReplicaWrapper]
self,
candidates: List[ReplicaWrapper],
backoff_index: int,
) -> Optional[ReplicaWrapper]:
"""Chooses the best replica from the list of candidates.

If none of the replicas can be scheduled, returns `None`.

The queue length at each replica is queried directly from it. The time waited
for these queries is capped by `self.queue_len_response_deadline_s`; if a
replica doesn't respond within the deadline it is not considered.
for these queries is capped by a response deadline; if a replica doesn't
respond within the deadline it is not considered. The deadline will be
increased exponentially in backoff.

Among replicas that respond within the deadline and accept the request (don't
have full queues), the one with the lowest queue length is chosen.
"""
# Ensure the max deadline is always >= the initial deadline.
max_queue_len_response_deadline_s = max(
self.queue_len_response_deadline_s,
self.max_queue_len_response_deadline_s,
)
queue_len_response_deadline_s = min(
self.queue_len_response_deadline_s * (2**backoff_index),
max_queue_len_response_deadline_s,
)

get_queue_state_tasks = []
for c in candidates:
t = self._loop.create_task(c.get_queue_state())
t = self._loop.create_task(
c.get_queue_state(deadline_s=queue_len_response_deadline_s)
)
t.replica_id = c.replica_id
get_queue_state_tasks.append(t)

done, pending = await asyncio.wait(
get_queue_state_tasks,
timeout=self.queue_len_response_deadline_s,
timeout=queue_len_response_deadline_s,
return_when=asyncio.ALL_COMPLETED,
)
for t in pending:
t.cancel()
logger.warning(
f"Failed to get queue length from replica {t.replica_id} "
f"within {self.queue_len_response_deadline_s}s."
f"within {queue_len_response_deadline_s}s. If this happens repeatedly "
"it's likely caused by high network latency in the cluster. You can "
"configure the deadline using the "
"`RAY_SERVE_QUEUE_LENGTH_RESPONSE_DEADLINE_S` environment variable."
)

chosen_replica_id = None
Expand Down Expand Up @@ -820,15 +846,20 @@ async def fulfill_pending_requests(self):
"""
try:
while len(self._scheduling_tasks) <= self.target_num_scheduling_tasks:
backoff_index = 0
request_metadata = self._get_next_pending_request_metadata_to_schedule()
async for candidates in self.choose_two_replicas_with_backoff(
request_metadata
):
replica = await self.select_from_candidate_replicas(candidates)
replica = await self.select_from_candidate_replicas(
candidates, backoff_index
)
if replica is not None:
self.fulfill_next_pending_request(replica, request_metadata)
break

backoff_index += 1

except Exception:
logger.exception("Unexpected error in fulfill_pending_requests.")
finally:
Expand Down
74 changes: 73 additions & 1 deletion python/ray/serve/tests/unit/test_replica_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(
self._sleep_time_s = sleep_time_s

self.get_queue_state_was_cancelled = False
self.queue_len_deadline_history = list()

@property
def replica_id(self) -> str:
Expand Down Expand Up @@ -71,7 +72,8 @@ def set_queue_state_response(
self._exception = exception
self._has_queue_len_response.set()

async def get_queue_state(self) -> Tuple[int, bool]:
async def get_queue_state(self, *, deadline_s: float) -> Tuple[int, bool]:
self.queue_len_deadline_history.append(deadline_s)
try:
while not self._has_queue_len_response.is_set():
await self._has_queue_len_response.wait()
Expand Down Expand Up @@ -1177,6 +1179,76 @@ async def test_get_queue_state_cancelled_on_timeout(pow_2_scheduler, fake_query)
assert (await task) == r1


@pytest.mark.asyncio
async def test_queue_len_response_deadline_backoff(pow_2_scheduler, fake_query):
    """
    Verify that the response deadline is exponentially backed off up to the max.

    The fake replica records every `deadline_s` it is queried with, so the
    recorded history can be inspected pairwise once scheduling has been
    retried for a while.
    """
    s = pow_2_scheduler
    s.queue_len_response_deadline_s = 0.001
    s.max_queue_len_response_deadline_s = 0.005
    loop = get_or_create_event_loop()

    r1 = FakeReplicaWrapper("r1")
    s.update_replicas([r1])

    # Attempt to schedule; the replica will be attempted and a timeout will occur
    # due to the short timeout set above.
    task = loop.create_task(s.choose_replica_for_query(fake_query))
    done, _ = await asyncio.wait([task], timeout=0.2)
    assert len(done) == 0

    # Verify that the deadline never exceeds the max and deadline_n+1 is equal to
    # the max or 2*deadline_n. Consecutive entries are compared by zipping the
    # history against itself shifted by one.
    history = r1.queue_len_deadline_history
    max_deadline_s = s.max_queue_len_response_deadline_s
    for deadline_i, deadline_j in zip(history, history[1:]):
        assert deadline_i <= deadline_j <= max_deadline_s
        if deadline_i < max_deadline_s:
            assert deadline_j == max_deadline_s or deadline_j == 2 * deadline_i

    # Let the replica respond so the pending scheduling task can complete.
    r1.set_queue_state_response(0, accepted=True)
    assert (await task) == r1


@pytest.mark.asyncio
async def test_max_queue_len_response_deadline(pow_2_scheduler, fake_query):
    """
    Verify that if the max response deadline is < the initial deadline, the initial
    is always used.
    """
    s = pow_2_scheduler
    # Deliberately configure the max *below* the initial deadline.
    s.queue_len_response_deadline_s = 0.01
    s.max_queue_len_response_deadline_s = 0.001
    loop = get_or_create_event_loop()

    r1 = FakeReplicaWrapper("r1")
    s.update_replicas([r1])

    # Attempt to schedule; the replica will be attempted and a timeout will occur
    # due to the short timeout set above.
    task = loop.create_task(s.choose_replica_for_query(fake_query))
    done, _ = await asyncio.wait([task], timeout=0.2)
    assert len(done) == 0

    # `all()` is vacuously true on an empty iterable, so first make sure the
    # replica was actually queried at least once.
    assert len(r1.queue_len_deadline_history) > 0
    assert all(
        d == s.queue_len_response_deadline_s for d in r1.queue_len_deadline_history
    )

    # Let the replica respond so the pending scheduling task can complete.
    r1.set_queue_state_response(0, accepted=True)
    assert (await task) == r1


@pytest.mark.asyncio
async def test_replicas_updated_event_on_correct_loop(pow_2_scheduler):
"""See https://github.com/ray-project/ray/issues/40631.
Expand Down
Loading