5 changes: 0 additions & 5 deletions ci/lint/pydoclint-baseline.txt
@@ -1729,11 +1729,6 @@ python/ray/serve/api.py
DOC103: Function `get_deployment_handle`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [_check_exists: bool, _record_telemetry: bool].
DOC201: Function `get_deployment_handle` does not have a return section in docstring
--------------------
python/ray/serve/autoscaling_policy.py
DOC101: Function `_calculate_desired_num_replicas`: Docstring contains fewer arguments than in function signature.
DOC111: Function `_calculate_desired_num_replicas`: The option `--arg-type-hints-in-docstring` is `False` but there are type hints in the docstring arg list
DOC103: Function `_calculate_desired_num_replicas`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [num_running_replicas: int, total_num_requests: int]. Arguments in the docstring but not in the function signature: [current_num_ongoing_requests: List[float]].
--------------------
python/ray/serve/batching.py
DOC111: Method `_BatchQueue.__init__`: The option `--arg-type-hints-in-docstring` is `False` but there are type hints in the docstring arg list
DOC101: Function `batch`: Docstring contains fewer arguments than in function signature.
17 changes: 17 additions & 0 deletions doc/source/serve/advanced-guides/advanced-autoscaling.md
@@ -154,6 +154,23 @@ or equal to the upscale and downscale delay values. For instance, if you set
`upscale_delay_s = 3`, but keep `metrics_interval_s = 10`, the autoscaler only
upscales roughly every 10 seconds.

* **aggregation_function [default_value="mean"]**: One of `"mean"`, `"min"`, or `"max"`.
This parameter determines how time-series autoscaling metrics (i.e.,
`ongoing_requests`) are aggregated over the `look_back_period_s` window. Autoscaling
decisions are only made when the output of this function stays *consistently* above
or below `target_ongoing_requests` for `upscale_delay_s` or `downscale_delay_s`.
The function output is the number of requests per replica.
* `"mean"`: The rounded mean of the windowed timeseries is used.
* `"min"`: The minimum value of the windowed timeseries is used.
* `"max"`: The maximum value of the windowed timeseries is used.

:::{note}
This parameter only modifies how autoscaling metrics are aggregated. It
impacts autoscaling decisions, but doesn't affect node-level Prometheus
metrics or logs (e.g., `running_requests_per_replica`). See the usage sketch below.
:::

* **look_back_period_s [default_value=30]**: This is the window over which the
average number of ongoing requests per replica is calculated.
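
Editor's note, for illustration only (not part of this diff): a minimal sketch of how the new `aggregation_function` option might be set through the dict form of `autoscaling_config`, assuming it is accepted alongside the existing keys exactly as documented above. The deployment class and handler are hypothetical.

```python
from ray import serve


@serve.deployment(
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 10,
        "target_ongoing_requests": 2,
        "look_back_period_s": 30,
        # New parameter documented above (key name assumed from the docs):
        # "max" reacts to the worst spike in the look-back window,
        # "min" to the quietest point, "mean" (default) to the average.
        "aggregation_function": "max",
    }
)
class SpikeSensitiveModel:  # hypothetical deployment
    async def __call__(self, request) -> str:
        return "ok"


app = SpikeSensitiveModel.bind()
```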

1 change: 1 addition & 0 deletions doc/source/serve/api/index.md
@@ -83,6 +83,7 @@ See the [model composition guide](serve-model-composition) for how to update cod
serve.config.ProxyLocation
serve.config.gRPCOptions
serve.config.HTTPOptions
serve.config.AggregationFunction
serve.config.AutoscalingConfig
serve.config.AutoscalingPolicy
serve.config.RequestRouterConfig
146 changes: 87 additions & 59 deletions python/ray/serve/_private/autoscaling_state.py
@@ -1,7 +1,7 @@
import logging
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Set
from typing import Any, DefaultDict, Dict, Hashable, List, Optional, Set, Tuple

from ray.serve._private.common import (
DeploymentHandleSource,
@@ -14,6 +14,12 @@
SERVE_LOGGER_NAME,
)
from ray.serve._private.deployment_info import DeploymentInfo
from ray.serve._private.metrics_utils import (
InMemoryMetricsStore,
TimeStampedValue,
merge_timeseries_dicts,
)
from ray.serve._private.router import QUEUED_REQUESTS_KEY
from ray.serve._private.utils import get_capacity_adjusted_num_replicas

logger = logging.getLogger(SERVE_LOGGER_NAME)
@@ -32,22 +38,21 @@ class HandleMetricReport:
queued_requests: The current number of queued requests at the
handle, i.e. requests that haven't been assigned to any
replica yet.
running_requests: A map of replica ID to the average number of
requests, assigned through the handle, running at that
replica.
metrics_dict: A map of replica ID to *ascending* timeseries of requests
assigned through the handle to the replica.
timestamp: The time at which this report was received.
"""

actor_id: Optional[str]
handle_source: DeploymentHandleSource
queued_requests: float
running_requests: Dict[ReplicaID, float]
metrics_dict: DefaultDict[Hashable, List[TimeStampedValue]]
timestamp: float

@property
def total_requests(self) -> float:
"""Total number of queued and running requests."""
return self.queued_requests + sum(self.running_requests.values())
def approximate_total_requests(self) -> float:
"""Approximation of the total running requests, computed from the
latest value at each data key."""
return sum(ts[-1].value for ts in self.metrics_dict.values() if ts)

@property
def is_serve_component_source(self) -> bool:
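
Editor's note, for illustration only (not part of this diff): a self-contained sketch of the new `metrics_dict` shape and the reduction performed by `approximate_total_requests`. The `TimeStampedValue` defined here is a stand-in with assumed fields (`timestamp`, `value`); only the `.value` access on the latest element is confirmed by the diff.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import DefaultDict, Hashable, List


@dataclass
class TimeStampedValue:  # stand-in for ray.serve._private.metrics_utils.TimeStampedValue
    timestamp: float
    value: float


# One ascending timeseries per key (e.g. per replica ID), replacing the old
# single pre-averaged float per replica.
metrics_dict: DefaultDict[Hashable, List[TimeStampedValue]] = defaultdict(list)
metrics_dict["replica_a"] = [TimeStampedValue(1.0, 2.0), TimeStampedValue(2.0, 3.0)]
metrics_dict["replica_b"] = [TimeStampedValue(1.5, 1.0)]

# Same reduction as the property: sum the most recent value of each key.
approx_total = sum(ts[-1].value for ts in metrics_dict.values() if ts)
assert approx_total == 4.0  # 3.0 (replica_a latest) + 1.0 (replica_b latest)
```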
@@ -69,12 +74,12 @@ class ReplicaMetricReport:
"""Report from a replica on ongoing requests.

Args:
running_requests: Average number of running requests at the
replica.
metrics_dict: A map of replica ID to *ascending* timeseries of total
requests currently running at the replica.
timestamp: The time at which this report was received.
"""

running_requests: float
metrics_dict: DefaultDict[Hashable, List[TimeStampedValue]]
timestamp: float


@@ -214,20 +219,18 @@ def apply_bounds(self, num_replicas: int) -> int:
)

def record_request_metrics_for_replica(
self, replica_id: ReplicaID, window_avg: Optional[float], send_timestamp: float
self,
replica_id: ReplicaID,
metrics_dict: DefaultDict[Hashable, List[TimeStampedValue]],
send_timestamp: float,
) -> None:
"""Records average number of ongoing requests at a replica."""

if window_avg is None:
return

"""Records the raw autoscaling metrics for a replica."""
if (
replica_id not in self._replica_requests
or send_timestamp > self._replica_requests[replica_id].timestamp
):
self._replica_requests[replica_id] = ReplicaMetricReport(
running_requests=window_avg,
timestamp=send_timestamp,
metrics_dict=metrics_dict, timestamp=send_timestamp
)

def record_request_metrics_for_handle(
@@ -236,13 +239,10 @@ def record_request_metrics_for_handle(
handle_id: str,
actor_id: Optional[str],
handle_source: DeploymentHandleSource,
queued_requests: float,
running_requests: Dict[ReplicaID, float],
metrics_dict: DefaultDict[Hashable, List[TimeStampedValue]],
send_timestamp: float,
) -> None:
"""Records average number of queued and running requests at a handle for this
deployment.
"""
"""Records the raw autoscaling metrics for a deployment handle."""

if (
handle_id not in self._handle_requests
@@ -251,8 +251,7 @@
self._handle_requests[handle_id] = HandleMetricReport(
actor_id=actor_id,
handle_source=handle_source,
queued_requests=queued_requests,
running_requests=running_requests,
metrics_dict=metrics_dict,
timestamp=send_timestamp,
)

@@ -277,24 +276,24 @@ def drop_stale_handle_metrics(self, alive_serve_actor_ids: Set[str]) -> None:
and handle_metric.actor_id not in alive_serve_actor_ids
):
del self._handle_requests[handle_id]
if handle_metric.total_requests > 0:
if handle_metric.approximate_total_requests > 0:
logger.debug(
f"Dropping metrics for handle '{handle_id}' because the Serve "
f"actor it was on ({handle_metric.actor_id}) is no longer "
f"alive. It had {handle_metric.total_requests} ongoing requests"
f"alive. It had ~{handle_metric.approximate_total_requests} ongoing requests"
)
# Drop metrics for handles that haven't sent an update in a while.
# This is expected behavior for handles that were on replicas or
# proxies that have been shut down.
elif time.time() - handle_metric.timestamp >= timeout_s:
del self._handle_requests[handle_id]
if handle_metric.total_requests > 0:
if handle_metric.approximate_total_requests > 0:
actor_id = handle_metric.actor_id
actor_info = f"on actor '{actor_id}' " if actor_id else ""
logger.info(
f"Dropping stale metrics for handle '{handle_id}' {actor_info}"
f"because no update was received for {timeout_s:.1f}s. "
f"Ongoing requests was: {handle_metric.total_requests}."
f"Ongoing requests was : ~{handle_metric.approximate_total_requests}."
)

def get_decision_num_replicas(
@@ -337,34 +336,62 @@ def get_decision_num_replicas(

return self.apply_bounds(decision_num_replicas)

def get_total_num_requests(self) -> float:
"""Get average total number of requests aggregated over the past
`look_back_period_s` number of seconds.

If there are 0 running replicas, then returns the total number
of requests queued at handles

This code assumes that the metrics are either emmited on handles
or on replicas, but not both. Its the responsibility of the writer
to ensure enclusivity of the metrics.
def _get_request_count(self) -> Tuple[float, int]:
"""Helper function to aggregate request counts from replicas and handles.
If there are 0 running replicas, then returns the number
of requests queued at handles.
"""

total_requests = 0
if self._replica_requests:
merged_metrics = merge_timeseries_dicts(
*[
replica_report.metrics_dict
for replica_report in self._replica_requests.values()
],
window_s=1,
Review comment (Contributor Author, on `window_s=1`): This needs to be set to a variable. I would think it should be set to the same rate at which metrics are collected, not simply pushed. I didn't have time today to find it.

)
elif self._handle_requests:
merged_metrics = merge_timeseries_dicts(
*[
handle_metric.metrics_dict
for handle_metric in self._handle_requests.values()
],
window_s=1,
)
else:
logger.debug("No metrics stores detected in autoscaler.")
return total_requests, 0
metrics_store = InMemoryMetricsStore()
metrics_store.data = merged_metrics

queued_requests = metrics_store.get_latest(QUEUED_REQUESTS_KEY) or 0.0
queued_per_replica = (
queued_requests / len(self._running_replicas)
if self._running_replicas
else queued_requests
)
total_requests, report_count = metrics_store.aggregate(
self._running_replicas, self._config.get_aggregation_function()
)
total_requests = 0.0 if total_requests is None else total_requests
total_requests += queued_per_replica

for id in self._running_replicas:
if id in self._replica_requests:
total_requests += self._replica_requests[id].running_requests
return total_requests, report_count

metrics_collected_on_replicas = total_requests > 0
for handle_metric in self._handle_requests.values():
total_requests += handle_metric.queued_requests
def get_total_num_requests(self) -> float:
"""Get total number of requests aggregated over the past
`look_back_period_s` number of seconds.

if not metrics_collected_on_replicas:
for id in self._running_replicas:
if id in handle_metric.running_requests:
total_requests += handle_metric.running_requests[id]
If there are 0 running replicas, then returns the aggregated number
of requests queued at handles.

return total_requests
This code assumes that the metrics are either emitted on handles
or on replicas, but not both. It's the responsibility of the writer
to ensure exclusivity of the metrics.
"""
aggregated_requests, report_count = self._get_request_count()
return aggregated_requests * max(1, report_count)
Review thread on `return aggregated_requests * max(1, report_count)`:

Contributor: What exactly is `report_count`? Is it the number of replicas for the deployment, the number of handles that hold request metrics for this deployment, the number of data points per handle, or the number of data points per replica? Without knowing this, it's unclear whether the calculation `aggregated_requests * max(1, report_count)` makes sense.

Contributor Author: `report_count` is the number of replicas which have reported valid data to a valid `handle_metric`. This is not the same as `running_replicas`, because there are cases where running replicas have not yet reported metrics to the handle. So `report_count` ensures that the total value is calculated based only on the number of reports collected. I'll make this clearer.

Contributor: Is this true? In `_aggregate_reduce`, you define:

        values = (
            timeseries.value for key in keys for timeseries in self.data.get(key, ())
        )

and then return `report_count` as `len(values)`. Doesn't that mean `report_count` is something along the lines of (#replicas) * (#data points per replica)? Let me know if I'm understanding correctly.

Contributor Author: @zcin Correct, it was implemented incorrectly here, but the tests didn't catch it. I fixed this earlier today in the metrics PR, so it will be resolved by that. As soon as that's approved I'll rebase this PR and resolve the metrics-related issues in this PR.
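
Editor's note, for illustration only: a tiny numeric sketch of the distinction the reviewers are drawing. It assumes `aggregated_requests` is a per-replica aggregate, as the surrounding code and the docs above suggest; all numbers are invented.

```python
# Why the semantics of report_count matter for
# aggregated_requests * max(1, report_count).
num_replicas = 3
points_per_replica = 10
avg_requests_per_replica = 2.0  # per-replica aggregate (e.g. windowed mean)

# Intended: scale the per-replica aggregate by the number of replicas that
# actually reported, giving a deployment-wide total.
report_count_replicas = num_replicas
total_ok = avg_requests_per_replica * max(1, report_count_replicas)  # 6.0

# Bug described in the thread: counting every data point instead of every
# reporting replica inflates the total by the number of samples per replica.
report_count_points = num_replicas * points_per_replica
total_inflated = avg_requests_per_replica * max(1, report_count_points)  # 60.0

print(total_ok, total_inflated)
```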



class AutoscalingStateManager:
@@ -431,27 +458,29 @@ def is_within_bounds(
)

def record_request_metrics_for_replica(
self, replica_id: ReplicaID, window_avg: Optional[float], send_timestamp: float
self,
replica_id: ReplicaID,
metrics_dict: DefaultDict[Hashable, List[TimeStampedValue]],
send_timestamp: float,
) -> None:
deployment_id = replica_id.deployment_id
# Defensively guard against delayed replica metrics arriving
# after the deployment's been deleted
if deployment_id in self._autoscaling_states:
self._autoscaling_states[deployment_id].record_request_metrics_for_replica(
replica_id=replica_id,
window_avg=window_avg,
metrics_dict=metrics_dict,
send_timestamp=send_timestamp,
)

def record_request_metrics_for_handle(
self,
*,
deployment_id: str,
deployment_id: DeploymentID,
handle_id: str,
actor_id: Optional[str],
handle_source: DeploymentHandleSource,
queued_requests: float,
running_requests: Dict[ReplicaID, float],
metrics_dict: DefaultDict[Hashable, List[TimeStampedValue]],
send_timestamp: float,
) -> None:
"""Update request metric for a specific handle."""
Expand All @@ -461,8 +490,7 @@ def record_request_metrics_for_handle(
handle_id=handle_id,
actor_id=actor_id,
handle_source=handle_source,
queued_requests=queued_requests,
running_requests=running_requests,
metrics_dict=metrics_dict,
send_timestamp=send_timestamp,
)

33 changes: 21 additions & 12 deletions python/ray/serve/_private/controller.py
@@ -3,7 +3,17 @@
import os
import pickle
import time
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from typing import (
Any,
DefaultDict,
Dict,
Hashable,
Iterable,
List,
Optional,
Tuple,
Union,
)

import ray
from ray._common.network_utils import build_address
@@ -46,6 +56,7 @@
get_component_logger_file_path,
)
from ray.serve._private.long_poll import LongPollHost, LongPollNamespace
from ray.serve._private.metrics_utils import TimeStampedValue
from ray.serve._private.proxy_state import ProxyStateManager
from ray.serve._private.storage.kv_store import RayInternalKVStore
from ray.serve._private.usage import ServeUsageTag
@@ -260,13 +271,14 @@ def get_pid(self) -> int:
return os.getpid()

def record_autoscaling_metrics(
self, replica_id: str, window_avg: Optional[float], send_timestamp: float
self,
replica_id: str,
metrics_dict: DefaultDict[Hashable, List[TimeStampedValue]],
send_timestamp: float,
):
logger.debug(
f"Received metrics from replica {replica_id}: {window_avg} running requests"
)
logger.debug(f"Received metrics from replica {replica_id} for autoscaling.")
self.autoscaling_state_manager.record_request_metrics_for_replica(
replica_id, window_avg, send_timestamp
replica_id, metrics_dict, send_timestamp
)

def record_handle_metrics(
@@ -275,21 +287,18 @@ def record_handle_metrics(
handle_id: str,
actor_id: Optional[str],
handle_source: DeploymentHandleSource,
queued_requests: float,
running_requests: Dict[str, float],
metrics_dict: DefaultDict[Hashable, List[TimeStampedValue]],
send_timestamp: float,
):
logger.debug(
f"Received metrics from handle {handle_id} for deployment {deployment_id}: "
f"{queued_requests} queued requests and {running_requests} running requests"
f"Received metrics from handle {handle_id} for deployment {deployment_id}"
)
self.autoscaling_state_manager.record_request_metrics_for_handle(
deployment_id=deployment_id,
handle_id=handle_id,
actor_id=actor_id,
handle_source=handle_source,
queued_requests=queued_requests,
running_requests=running_requests,
metrics_dict=metrics_dict,
send_timestamp=send_timestamp,
)
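
Editor's note, for illustration only (not part of this diff): a self-contained toy mirroring the new `record_autoscaling_metrics` entry point, where a raw per-key timeseries dict replaces the old pre-averaged `window_avg` float. Everything except the argument names is hypothetical, and `TimeStampedValue` is approximated as a simple (timestamp, value) dataclass.

```python
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import DefaultDict, Dict, Hashable, List, Tuple


@dataclass
class TimeStampedValue:  # stand-in for ray.serve._private.metrics_utils.TimeStampedValue
    timestamp: float
    value: float


class ToyController:
    """Keeps only the freshest report per replica, like the real state manager."""

    def __init__(self) -> None:
        self._reports: Dict[
            str, Tuple[DefaultDict[Hashable, List[TimeStampedValue]], float]
        ] = {}

    def record_autoscaling_metrics(
        self,
        replica_id: str,
        metrics_dict: DefaultDict[Hashable, List[TimeStampedValue]],
        send_timestamp: float,
    ) -> None:
        # Ignore reports that are older than the one already stored.
        prev = self._reports.get(replica_id)
        if prev is None or send_timestamp > prev[1]:
            self._reports[replica_id] = (metrics_dict, send_timestamp)


metrics: DefaultDict[Hashable, List[TimeStampedValue]] = defaultdict(list)
metrics["running_requests"].append(TimeStampedValue(time.time(), 3.0))

controller = ToyController()
controller.record_autoscaling_metrics("replica_a#1", metrics, send_timestamp=time.time())
```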
