diff --git a/doc/source/serve/advanced-guides/advanced-autoscaling.md b/doc/source/serve/advanced-guides/advanced-autoscaling.md
index 75047a283ad3..1055b03257ef 100644
--- a/doc/source/serve/advanced-guides/advanced-autoscaling.md
+++ b/doc/source/serve/advanced-guides/advanced-autoscaling.md
@@ -126,6 +126,34 @@ initializes slowly, you can increase `downscale_delay_s` to make the
 downscaling happen more infrequently and avoid reinitialization when the
 application needs to upscale again in the future.
 
+:::{note}
+Upscaling and downscaling calculations are made periodically by the Serve
+Controller, which by default runs every `0.1` seconds. You can set and check
+this value with the `RAY_SERVE_CONTROL_LOOP_INTERVAL_S` environment variable.
+`upscale_delay_s` and `downscale_delay_s` prevent the autoscaler from adding
+or removing replicas unless *all* calculations over the delay period
+unanimously agree to upscale or downscale. Any single inconsistent calculation
+resets the decision timer. In general, don't change
+`RAY_SERVE_CONTROL_LOOP_INTERVAL_S` to tune autoscaling behavior.
+:::
+
+* **scaling_function [default_value="last"]**: One of `"last"`, `"mean"`,
+`"min"`, or `"max"`. This parameter determines how delayed scaling decisions
+are made. Scaling calculations are cached over the `upscale_delay_s` and
+`downscale_delay_s` periods, and once they are *consistent*, this parameter
+determines the final number of replicas:
+  * `"last"`: Use the most recent scaling calculation.
+  * `"mean"`: Use the rounded mean of all cached calculations.
+  * `"min"`: Use the minimum of all cached calculations.
+  * `"max"`: Use the maximum of all cached calculations.
+
+:::{note}
+Previously, the autoscaler always used the most recent (`"last"`) scaling
+calculation. This remains the default for backwards compatibility; however,
+`"mean"` or `"max"` is recommended to better accommodate varying traffic
+patterns.
+:::
+
 * **upscale_smoothing_factor [default_value=1.0] (DEPRECATED)**: This parameter
 is renamed to `upscaling_factor`. `upscale_smoothing_factor` will be removed in
 a future release.
diff --git a/python/ray/serve/_private/constants.py b/python/ray/serve/_private/constants.py
index df0029de0902..8535e99d0af7 100644
--- a/python/ray/serve/_private/constants.py
+++ b/python/ray/serve/_private/constants.py
@@ -38,6 +38,9 @@
     "RAY_SERVE_CONTROL_LOOP_INTERVAL_S cannot be negative."
 )
 
+# Max length of the scaling decision history, to bound memory usage.
+MAX_SCALING_HISTORY_LENGTH = int(1e6)
+
 #: Max time to wait for HTTP proxy in `serve.start()`.
 HTTP_PROXY_TIMEOUT = 60
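A quick illustration of the arithmetic described in the docs note above (not part of the patch): the delay settings translate into a number of consecutive, unanimous control-loop calculations. The function name here is hypothetical; the threshold mirrors the strict `>` comparison in the policy code below.

```python
# Sketch only: how upscale/downscale delays map to consecutive unanimous
# control-loop calculations, assuming the default 0.1 s loop interval.
CONTROL_LOOP_INTERVAL_S = 0.1  # overridable via RAY_SERVE_CONTROL_LOOP_INTERVAL_S

def decisions_required(delay_s: float, interval_s: float = CONTROL_LOOP_INTERVAL_S) -> int:
    # The policy scales once its decision counter exceeds
    # int(delay_s / interval_s), i.e. one more than the quotient.
    return int(delay_s / interval_s) + 1

print(decisions_required(30.0))   # default upscale_delay_s   -> 301
print(decisions_required(600.0))  # default downscale_delay_s -> 6001
```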
diff --git a/python/ray/serve/autoscaling_policy.py b/python/ray/serve/autoscaling_policy.py
index 2cabe736a870..0b0bff6ab718 100644
--- a/python/ray/serve/autoscaling_policy.py
+++ b/python/ray/serve/autoscaling_policy.py
@@ -2,7 +2,11 @@
 import math
 from typing import Any, Dict, Optional
 
-from ray.serve._private.constants import CONTROL_LOOP_INTERVAL_S, SERVE_LOGGER_NAME
+from ray.serve._private.constants import (
+    CONTROL_LOOP_INTERVAL_S,
+    MAX_SCALING_HISTORY_LENGTH,
+    SERVE_LOGGER_NAME,
+)
 from ray.serve.config import AutoscalingConfig
 from ray.util.annotations import PublicAPI
 
@@ -57,7 +61,10 @@ def _calculate_desired_num_replicas(
     # Multiply the distance to 1 by the smoothing ("gain") factor (default=1).
     smoothed_error_ratio = 1 + ((error_ratio - 1) * scaling_factor)
-    desired_num_replicas = math.ceil(num_running_replicas * smoothed_error_ratio)
+    # Round first so that floating-point error can't push ceil() up a replica.
+    desired_num_replicas = math.ceil(
+        round(num_running_replicas * smoothed_error_ratio, 6)
+    )
 
     # If desired num replicas is "stuck" because of the smoothing factor
     # (meaning the traffic is low enough for the replicas to downscale
@@ -101,6 +107,8 @@ def replica_queue_length_autoscaling_policy(
         seconds.
     """
     decision_counter = policy_state.get("decision_counter", 0)
+    decision_history = policy_state.get("decision_history", [])
+
     if num_running_replicas == 0:
         # When 0 replicas and queries are queued, scale up the replicas
         if total_num_requests > 0:
@@ -126,13 +134,16 @@
         # Otherwise, just increment.
         if decision_counter < 0:
             decision_counter = 0
+            decision_history = []
 
         decision_counter += 1
+        decision_history.append(desired_num_replicas)
 
         # Only actually scale the replicas if we've made this decision for
         # 'scale_up_consecutive_periods' in a row.
         if decision_counter > int(config.upscale_delay_s / CONTROL_LOOP_INTERVAL_S):
+            decision_num_replicas = config.decide_num_replicas(decision_history)
             decision_counter = 0
-            decision_num_replicas = desired_num_replicas
+            decision_history = []
 
     # Scale down.
     elif desired_num_replicas < curr_target_num_replicas:
@@ -140,19 +151,30 @@
         # positive), reset it to zero before decrementing.
         if decision_counter > 0:
             decision_counter = 0
+            decision_history = []
+
         decision_counter -= 1
+        decision_history.append(desired_num_replicas)
 
         # Only actually scale the replicas if we've made this decision for
         # 'scale_down_consecutive_periods' in a row.
         if decision_counter < -int(config.downscale_delay_s / CONTROL_LOOP_INTERVAL_S):
+            decision_num_replicas = config.decide_num_replicas(decision_history)
             decision_counter = 0
-            decision_num_replicas = desired_num_replicas
+            decision_history = []
 
     # Do nothing.
     else:
         decision_counter = 0
+        decision_history = []
 
     policy_state["decision_counter"] = decision_counter
+
+    # Limit the length of the decision history to avoid unbounded memory usage.
+    if len(decision_history) > MAX_SCALING_HISTORY_LENGTH:
+        decision_history = decision_history[-(MAX_SCALING_HISTORY_LENGTH - 10) :]
+    policy_state["decision_history"] = decision_history
+
     return decision_num_replicas
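For reference, the aggregation step that `config.decide_num_replicas(decision_history)` performs can be sketched standalone. This mirrors the four modes wired up in `config.py` below; the `aggregate` function here is illustrative, not the actual implementation.

```python
from typing import List

# Illustrative sketch of the history aggregation, mirroring the four modes.
def aggregate(history: List[int], scaling_function: str = "last") -> int:
    if scaling_function == "last":
        return history[-1]
    if scaling_function == "mean":
        return round(sum(history) / len(history))
    if scaling_function == "min":
        return min(history)
    return max(history)  # "max"

assert aggregate([1, 9, 2], "last") == 2
assert aggregate([1, 9, 2], "mean") == 4
assert aggregate([1, 9, 2], "min") == 1
assert aggregate([1, 9, 2], "max") == 9
```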
diff --git a/python/ray/serve/config.py b/python/ray/serve/config.py
index b2b0fbdbefa9..f4f68f1ac46a 100644
--- a/python/ray/serve/config.py
+++ b/python/ray/serve/config.py
@@ -71,6 +71,16 @@ class AutoscalingConfig(BaseModel):
     # How long to wait before scaling up replicas
     upscale_delay_s: NonNegativeFloat = 30.0
 
+    # Determines how a scaling decision is made from the replica counts
+    # calculated over the upscale/downscale delay period.
+    scaling_function: str = "last"
+    _scaling_functions = {
+        "last": lambda history: history[-1],
+        "mean": lambda history: round(sum(history) / len(history)),
+        "max": max,
+        "min": min,
+    }
+
     # Cloudpickled policy definition.
     _serialized_policy_def: bytes = PrivateAttr(default=b"")
@@ -101,6 +111,14 @@ def replicas_settings_valid(cls, max_replicas, values):
 
         return max_replicas
 
+    @validator("scaling_function", always=True)
+    def scaling_function_valid(cls, scaling_function: str):
+        if scaling_function not in cls._scaling_functions:
+            raise ValueError(
+                f"scaling_function must be one of {list(cls._scaling_functions)}"
+            )
+        return scaling_function
+
     def __init__(self, **kwargs):
         super().__init__(**kwargs)
         self.serialize_policy()
@@ -153,6 +171,11 @@ def get_downscaling_factor(self) -> PositiveFloat:
     def get_target_ongoing_requests(self) -> PositiveFloat:
         return self.target_ongoing_requests
 
+    def decide_num_replicas(self, history: List[int]) -> PositiveInt:
+        # Look up by name at call time so each config instance uses its own
+        # setting, rather than mutating shared class state in the validator.
+        return round(self._scaling_functions[self.scaling_function](history))
+
 
 # Keep in sync with ServeDeploymentMode in dashboard/client/src/type/serve.ts
 @Deprecated
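Assuming this patch is applied, a user would opt into the new behavior as in the sketch below. Only `scaling_function` is new; the other fields are existing config options.

```python
from ray.serve.config import AutoscalingConfig

config = AutoscalingConfig(
    min_replicas=1,
    max_replicas=10,
    target_ongoing_requests=1.0,
    upscale_delay_s=30.0,
    downscale_delay_s=600.0,
    scaling_function="mean",  # new: "last" (default), "mean", "min", or "max"
)

# With "mean", a cached history of [1, 9, 2] yields round(12 / 3) == 4.
assert config.decide_num_replicas([1, 9, 2]) == 4
```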
diff --git a/python/ray/serve/tests/test_autoscaling_policy.py b/python/ray/serve/tests/test_autoscaling_policy.py
index c39af8740690..94ea97cd405a 100644
--- a/python/ray/serve/tests/test_autoscaling_policy.py
+++ b/python/ray/serve/tests/test_autoscaling_policy.py
@@ -23,6 +23,7 @@
     ReplicaState,
 )
 from ray.serve._private.constants import (
+    CONTROL_LOOP_INTERVAL_S,
     RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE,
     SERVE_DEFAULT_APP_NAME,
     SERVE_NAMESPACE,
@@ -1498,6 +1499,80 @@ def check_expected_statuses(
     print("Statuses are as expected.")
 
 
+@pytest.mark.parametrize("scaling_function", ["min", "max", "mean", "last"])
+def test_autoscaling_decision_functions(serve_instance_with_signal, scaling_function):
+    client, signal = serve_instance_with_signal
+
+    # Sample over multiple control-loop cycles to correct for misalignment.
+    interval = CONTROL_LOOP_INTERVAL_S * 15
+    decision_history = [1, 9, 2]
+    scaling_delay = interval * len(decision_history)
+    app_config = {
+        "import_path": "ray.serve.tests.test_config_files.get_signal.app",
+        "deployments": [
+            {
+                "name": "A",
+                "autoscaling_config": {
+                    "metrics_interval_s": 0.05,
+                    "min_replicas": 0,
+                    "max_replicas": 100,
+                    "initial_replicas": 1,
+                    "look_back_period_s": 0.2,
+                    "downscale_delay_s": scaling_delay,
+                    "upscale_delay_s": scaling_delay,
+                    "target_ongoing_requests": 1.0,
+                    "scaling_function": scaling_function,
+                },
+                "graceful_shutdown_timeout_s": 1,
+                "max_ongoing_requests": 1000,
+            }
+        ],
+    }
+
+    # Wait for downscaling from 1 to 0 to sync with CONTROL_LOOP_INTERVAL_S.
+    client.deploy_apps(ServeDeploySchema(**{"applications": [app_config]}))
+    wait_for_condition(
+        check_deployment_status,
+        name="A",
+        expected_status=DeploymentStatus.HEALTHY,
+        retry_interval_ms=5,
+    )
+    h = serve.get_app_handle(SERVE_DEFAULT_APP_NAME)
+    print("Waiting for downscaling signal to align with control loop.")
+    wait_for_condition(
+        check_deployment_status,
+        name="A",
+        expected_status=DeploymentStatus.DOWNSCALING,
+        retry_interval_ms=5,
+        timeout=scaling_delay + 1,
+    )
+    print("Downscaling detected.")
+
+    for replica_count in decision_history:
+        ray.get(signal.send.remote(clear=True))
+        [h.remote() for _ in range(replica_count)]
+        time.sleep(interval)
+
+    # Check that the scaling function works as expected.
+    if scaling_function == "min":
+        expected_replicas = min(decision_history)
+    elif scaling_function == "max":
+        expected_replicas = max(decision_history)
+    elif scaling_function == "mean":
+        expected_replicas = round(sum(decision_history) / len(decision_history))
+    elif scaling_function == "last":
+        expected_replicas = decision_history[-1]
+
+    print("Expected replicas:", expected_replicas)
+    wait_for_condition(
+        check_num_replicas_eq, name="A", target=expected_replicas, timeout=scaling_delay
+    )
+
+    # Release the signal so the replica doesn't log an ugly error when the
+    # signal actor goes out of scope and gets killed.
+    ray.get(signal.send.remote())
+
+
 if __name__ == "__main__":
     import sys
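The unit tests below exercise the history cap. The trimming behavior is small enough to sketch in isolation (illustrative; the real code lives in `replica_queue_length_autoscaling_policy` above, and `trim_history` is a hypothetical name):

```python
# Mirrors MAX_SCALING_HISTORY_LENGTH from the constants diff above.
MAX_SCALING_HISTORY_LENGTH = int(1e6)

def trim_history(history: list) -> list:
    # Trim to slightly below the cap so the slice doesn't re-run on every
    # subsequent control-loop iteration.
    if len(history) > MAX_SCALING_HISTORY_LENGTH:
        return history[-(MAX_SCALING_HISTORY_LENGTH - 10):]
    return history

over_cap = [0] * (MAX_SCALING_HISTORY_LENGTH + 1)
assert len(trim_history(over_cap)) == MAX_SCALING_HISTORY_LENGTH - 10
```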
"decision_counter": MAX_SCALING_HISTORY_LENGTH, + } + + for i in itertools.repeat(2, iterations): + _ = replica_queue_length_autoscaling_policy( + config=config, + total_num_requests=i, + num_running_replicas=1, + curr_target_num_replicas=1, + capacity_adjusted_min_replicas=min_replicas, + capacity_adjusted_max_replicas=max_replicas, + policy_state=policy_state, + ) + # Decision history should be bounded, while decision counter increases + assert len(policy_state["decision_history"]) <= MAX_SCALING_HISTORY_LENGTH + assert policy_state["decision_counter"] > MAX_SCALING_HISTORY_LENGTH + if __name__ == "__main__": sys.exit(pytest.main(["-v", "-s", __file__])) diff --git a/src/ray/protobuf/serve.proto b/src/ray/protobuf/serve.proto index 1efe5336f171..1477cedb6964 100644 --- a/src/ray/protobuf/serve.proto +++ b/src/ray/protobuf/serve.proto @@ -72,6 +72,9 @@ message AutoscalingConfig { // The multiplicative "gain" factor to limit downscale. optional double downscaling_factor = 15; + + // How scaling decisions are made based on the calculated replica counts. + string scaling_function = 16; } //[Begin] LOGGING CONFIG