Merge pull request #2041 from FedML-AI/dimitris/autoscaler-hotfix-v2
[Deploy] Refine Autoscaling Algorithm
Raphael-Jin authored Apr 26, 2024
2 parents b18f85d + a4aac59 commit cdbacbf
Showing 8 changed files with 428 additions and 189 deletions.
13 changes: 10 additions & 3 deletions python/fedml/computing/scheduler/comm_utils/job_monitor.py
@@ -28,7 +28,8 @@
 from .job_utils import JobRunnerUtils
 from ..model_scheduler.device_http_proxy_inference_protocol import FedMLHttpProxyInference
 from ..model_scheduler.device_model_cache import FedMLModelCache
-from ..model_scheduler.autoscaler.autoscaler import Autoscaler, EWMPolicy, ConcurrentQueryPolicy
+from ..model_scheduler.autoscaler.autoscaler import Autoscaler
+from ..model_scheduler.autoscaler.policies import ConcurrentQueryPolicy
 from ..model_scheduler.device_model_db import FedMLModelDatabase
 from ..model_scheduler.device_mqtt_inference_protocol import FedMLMqttInference
 from ..slave import client_constants
@@ -109,8 +110,14 @@ def autoscaler_reconcile_after_interval(self):
         # Set the policy, here we use latency, but other metrics are possible as well, such as qps.
         # For more advanced use cases look for the testing scripts under the autoscaler/test directory.
         autoscaling_policy_config = \
-            {"queries_per_replica": endpoint_settings["target_queries_per_replica"],
-             "window_size_secs": endpoint_settings["aggregation_window_size_seconds"]}
+            {
+                "current_replicas": int(endpoint_settings["replica_num"]),
+                "min_replicas": int(endpoint_settings["scale_min"]),
+                "max_replicas": int(endpoint_settings["scale_max"]),
+                "queries_per_replica": int(endpoint_settings["target_queries_per_replica"]),
+                "window_size_secs": int(endpoint_settings["aggregation_window_size_seconds"]),
+                "scaledown_delay_secs": int(endpoint_settings["scale_down_delay_seconds"]),
+            }
         autoscaling_policy = ConcurrentQueryPolicy(**autoscaling_policy_config)

         e_id, e_name, model_name = endpoint_settings["endpoint_id"], endpoint_settings["endpoint_name"], \
[diff truncated]
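For context, the refined configuration threads the endpoint's replica bounds and scale-down delay into the policy object, so pydantic can validate everything up front. The sketch below shows that wiring in isolation; it assumes the fedml package is installed, and the endpoint_settings values are made up for illustration.

# Illustrative sketch of the new policy wiring (hypothetical endpoint_settings values).
from fedml.computing.scheduler.model_scheduler.autoscaler.policies import ConcurrentQueryPolicy

endpoint_settings = {
    "replica_num": 2,                          # currently running replicas
    "scale_min": 1,                            # lower replica bound
    "scale_max": 5,                            # upper replica bound
    "target_queries_per_replica": 10,
    "aggregation_window_size_seconds": 60,
    "scale_down_delay_seconds": 120,
}

autoscaling_policy = ConcurrentQueryPolicy(
    current_replicas=int(endpoint_settings["replica_num"]),
    min_replicas=int(endpoint_settings["scale_min"]),
    max_replicas=int(endpoint_settings["scale_max"]),
    queries_per_replica=int(endpoint_settings["target_queries_per_replica"]),
    window_size_secs=int(endpoint_settings["aggregation_window_size_seconds"]),
    scaledown_delay_secs=int(endpoint_settings["scale_down_delay_seconds"]),
)
# The NonNegativeInt fields raise a pydantic ValidationError on bad input,
# e.g. a negative scale_min, instead of failing later inside the autoscaler.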
271 changes: 142 additions & 129 deletions python/fedml/computing/scheduler/model_scheduler/autoscaler/autoscaler.py

Large diffs are not rendered by default.

98 changes: 98 additions & 0 deletions python/fedml/computing/scheduler/model_scheduler/autoscaler/policies.py
@@ -0,0 +1,98 @@
from pydantic import BaseModel, field_validator, NonNegativeInt, NonNegativeFloat


class AutoscalingPolicy(BaseModel):
"""
Below are some default values for every endpoint.
The following parameters refer to:
- current_replicas: the number of currently running replicas of the endpoint
- min_replicas: the minimum number of replicas of the endpoint in the instance group
- max_replicas: the maximum number of replicas of the endpoint in the instance group
- release_replica_after_idle_secs: when to release a single idle replica
- scaledown_delay_secs: how many seconds to wait before performing a scale down operation
- scaleup_cost_secs: how many seconds it takes/costs to perform a scale up operation
- previous_triggering_value: the last value that triggered a scaling operation
The `release_replica_after_idle_secs` parameter is used as
the monitoring interval after which a running replica
of an idle endpoint should be released.
"""
current_replicas: NonNegativeInt
min_replicas: NonNegativeInt
max_replicas: NonNegativeInt
previous_triggering_value: float = None
release_replica_after_idle_secs: NonNegativeInt = 300 # default is after 5 minutes
scaledown_delay_secs: NonNegativeInt = 60 # default is 1 minute
scaleup_cost_secs: NonNegativeInt = 300 # default is 5 minutes


class EWMPolicy(AutoscalingPolicy):
"""
Configuration parameters for the reactive autoscaling policy.
EWM stands for Exponentially Weighted Mean, since we use
the pandas.DataFrame.ewm() functionality.
For pandas' EWM, using alpha = 0.1 indicates that the most recent
values are weighted more. The reason is that the exponentially weighted
mean formula in pandas is computed as:
Y_t = (X_t + (1-a) * X_{t-1} + (1-a)^2 * X_{t-2} + ...) / (1 + (1-a) + (1-a)^2 + ...)
The following parameters refer to:
- ewm_mins: the length of the interval we consider for reactive decision
- ewm_alpha: the decay factor for the exponential weighted interval
- ewm_latest: the latest recorded value of the metric's exponential weighted mean
- ub_threshold: the upper bound scaling factor threshold for reactive decision
- lb_threshold: the lower bound scaling factor threshold for reactive decision
Example:
Let's say that we consider 15 minutes as the length of our interval and a
decay factor alpha with a value of 0.5:
Original Sequence: [0.1, 0.2, 0.4, 3, 5, 10]
EWM Sequence: [0.1, 0.166, 0.3, 1.74, 3.422, 6.763]
If we assume that our previous scaling operation was triggered at value Y,
then the conditions we use to decide whether to scale up or down are:
Latency:
ScaleUP: X > ((1 + ub_threshold) * Y)
ScaleDown: X < (lb_threshold * Y)
QPS:
ScaleUP: X < (lb_threshold * Y)
ScaleDown: X > ((1 + ub_threshold) * Y)
In other words, QPS is the inverse of Latency and vice versa.
"""
metric: str # possible values: ["ewm_latency", "ewm_qps"]
ewm_mins: NonNegativeInt # recommended value: 15 minutes
ewm_alpha: NonNegativeFloat # recommended value: 0.1
ewm_latest: NonNegativeFloat = None # will be filled by the algorithm
ub_threshold: NonNegativeFloat # recommended value: 0.5
lb_threshold: NonNegativeFloat # recommended value: 0.5

@field_validator("metric")
def validate_option(cls, v):
assert v in ["ewm_latency", "ewm_qps"]
return v


class ConcurrentQueryPolicy(AutoscalingPolicy):
"""
This policy captures the number of queries we want to support
per replica over the defined window length in seconds.
"""
queries_per_replica: NonNegativeInt # recommended is at least 1 query
window_size_secs: NonNegativeInt # recommended is at least 60 seconds


class MeetTrafficDemandPolicy(AutoscalingPolicy):
"""
This policy scales the endpoint so that its serving capacity
meets the observed traffic demand over the defined window length in seconds.
"""
window_size_secs: NonNegativeInt


class PredictivePolicy(AutoscalingPolicy):
# TODO(fedml-dimitris): TO BE COMPLETED!
pass
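As a quick sanity check of the EWMPolicy docstring above, the following self-contained snippet reproduces the documented EWM sequence with the same pandas ewm() machinery and then evaluates the latency scale-up condition; only pandas is required, and the previous triggering value Y below is a made-up number for illustration.

# Reproduce the docstring's EWM example (pandas' default adjust=True matches
# the normalized weighted-sum formula shown above).
import pandas as pd

observed_latency = pd.Series([0.1, 0.2, 0.4, 3, 5, 10])
ewm_mean = observed_latency.ewm(alpha=0.5).mean()
print([round(v, 3) for v in ewm_mean])  # [0.1, 0.167, 0.3, 1.74, 3.423, 6.763]

# Latency scale-up rule from the docstring: scale up when X > (1 + ub_threshold) * Y.
ub_threshold = 0.5   # recommended value from EWMPolicy
previous_y = 1.0     # hypothetical previous_triggering_value
latest_x = ewm_mean.iloc[-1]
print(latest_x > (1 + ub_threshold) * previous_y)  # True -> scale up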
python/fedml/computing/scheduler/model_scheduler/autoscaler/test/autoscaler_test.py
@@ -3,19 +3,33 @@
 import time

 from collections import namedtuple
-from fedml.computing.scheduler.model_scheduler.autoscaler.autoscaler import Autoscaler, EWMPolicy, ConcurrentQueryPolicy
+from fedml.computing.scheduler.model_scheduler.autoscaler.policies import *
+from fedml.computing.scheduler.model_scheduler.autoscaler.autoscaler import Autoscaler, ScaleOp
 from fedml.computing.scheduler.model_scheduler.device_model_cache import FedMLModelCache
 from fedml.core.mlops.mlops_runtime_log import MLOpsRuntimeLog

 ENV_REDIS_ADDR = "local"
 ENV_REDIS_PORT = 6379
 ENV_REDIS_PASSWD = "fedml_default"
 ENV_ENDPOINT_ID_1 = 12345
 ENV_ENDPOINT_ID_2 = 77777


 class AutoscalerTest(unittest.TestCase):

+    @classmethod
+    def populate_redis_with_dummy_metrics(cls):
+        fedml_model_cache = FedMLModelCache.get_instance()
+        fedml_model_cache.set_redis_params(ENV_REDIS_ADDR, ENV_REDIS_PORT, ENV_REDIS_PASSWD)
+        fedml_model_cache.set_monitor_metrics(
+            ENV_ENDPOINT_ID_1, "", "", "", 5, 5, 5, 10, 100, 100, int(time.time_ns() / 1000), 0)
+
+    @classmethod
+    def clear_redis(cls):
+        fedml_model_cache = FedMLModelCache.get_instance()
+        # Clean up redis after test.
+        fedml_model_cache.delete_endpoint_metrics(
+            endpoint_ids=[ENV_ENDPOINT_ID_1])
+
     def test_autoscaler_singleton_pattern(self):
         autoscaler_1 = Autoscaler.get_instance()
         autoscaler_2 = Autoscaler.get_instance()
@@ -24,72 +38,118 @@ def test_autoscaler_singleton_pattern(self):
         self.assertTrue(autoscaler_1 is autoscaler_2)

     def test_scale_operation_single_endpoint_ewm_policy(self):
-
-        # Populate redis with some dummy values for each endpoint before running the test.
-        fedml_model_cache = FedMLModelCache.get_instance()
-        fedml_model_cache.set_redis_params(ENV_REDIS_ADDR, ENV_REDIS_PORT, ENV_REDIS_PASSWD)
-        fedml_model_cache.set_monitor_metrics(
-            ENV_ENDPOINT_ID_1, "", "", "", 5, 5, 5, 10, 100, 100, int(time.time_ns() / 1000), 0)
-        fedml_model_cache.set_monitor_metrics(
-            ENV_ENDPOINT_ID_1, "", "", "", 5, 5, 5, 10, 100, 100, int(time.time_ns() / 1000), 0)
-
+        self.populate_redis_with_dummy_metrics()
         # Create autoscaler instance and define policy.
         autoscaler = Autoscaler.get_instance()
         latency_reactive_policy_default = {
+            "current_replicas": 1,
             "min_replicas": 1,
             "max_replicas": 1,
-            "current_replicas": 1,
             "metric": "ewm_latency",
             "ewm_mins": 15,
             "ewm_alpha": 0.5,
             "ub_threshold": 0.5,
             "lb_threshold": 0.5
         }

         autoscaling_policy = EWMPolicy(**latency_reactive_policy_default)
         scale_op_1 = autoscaler.scale_operation_endpoint(
             autoscaling_policy,
             endpoint_id=ENV_ENDPOINT_ID_1)
         scale_op_2 = autoscaler.scale_operation_endpoint(
             autoscaling_policy,
             endpoint_id=ENV_ENDPOINT_ID_2)

-        # Clean up redis after test.
-        fedml_model_cache.delete_model_endpoint_metrics(
-            endpoint_ids=[ENV_ENDPOINT_ID_1, ENV_ENDPOINT_ID_2])
-
         # TODO Change to ScaleUP or ScaleDown not only not None.
         self.assertIsNotNone(scale_op_1)
         self.assertIsNotNone(scale_op_2)
+        self.clear_redis()

     def test_scale_operation_single_endpoint_concurrency_query_policy(self):
+        self.populate_redis_with_dummy_metrics()
         # Create autoscaler instance and define policy.
         autoscaler = Autoscaler.get_instance()
         concurrent_query_policy = {
+            "current_replicas": 1,
             "min_replicas": 1,
             "max_replicas": 1,
+            "queries_per_replica": 1,
+            "window_size_secs": 60
         }
+        autoscaling_policy = ConcurrentQueryPolicy(**concurrent_query_policy)
         scale_op_1 = autoscaler.scale_operation_endpoint(
             autoscaling_policy,
             endpoint_id=ENV_ENDPOINT_ID_1)

-        # Populate redis with some dummy values for each endpoint before running the test.
-        fedml_model_cache = FedMLModelCache.get_instance()
-        fedml_model_cache.set_redis_params(ENV_REDIS_ADDR, ENV_REDIS_PORT, ENV_REDIS_PASSWD)
-        fedml_model_cache.set_monitor_metrics(
-            ENV_ENDPOINT_ID_1, "", "", "", 5, 5, 5, 10, 100, 100, int(time.time_ns() / 1000), 0)
         # TODO Change to ScaleUP or ScaleDown not only not None.
         self.assertIsNotNone(scale_op_1)
+        self.clear_redis()

     def test_scale_operation_single_endpoint_meet_traffic_demand_query_policy(self):
+        self.populate_redis_with_dummy_metrics()
         # Create autoscaler instance and define policy.
         autoscaler = Autoscaler.get_instance()
         concurrent_query_policy = {
+            "current_replicas": 1,
             "min_replicas": 1,
             "max_replicas": 1,
-            "current_replicas": 1,
-            "queries_per_replica": 2, "window_size_secs": 60
+            "window_size_secs": 60
         }

-        autoscaling_policy = EWMPolicy(**concurrent_query_policy)
+        autoscaling_policy = MeetTrafficDemandPolicy(**concurrent_query_policy)
         scale_op_1 = autoscaler.scale_operation_endpoint(
             autoscaling_policy,
             endpoint_id=ENV_ENDPOINT_ID_1)

-        # Clean up redis after test.
-        fedml_model_cache.delete_model_endpoint_metrics(
-            endpoint_ids=[ENV_ENDPOINT_ID_1])
-
         # TODO Change to ScaleUP or ScaleDown not only not None.
         self.assertIsNotNone(scale_op_1)
+        self.clear_redis()

+    def test_validate_scaling_bounds(self):
+        # Create autoscaler instance and define policy.
+        autoscaler = Autoscaler.get_instance()
+        autoscaling_policy = {
+            "current_replicas": 2,
+            "min_replicas": 1,
+            "max_replicas": 3,
+        }
+        autoscaling_policy = AutoscalingPolicy(**autoscaling_policy)
+
+        # Validate scale up.
+        scale_up = autoscaler.validate_scaling_bounds(ScaleOp.UP_OUT_OP, autoscaling_policy)
+        self.assertEquals(scale_up, ScaleOp.UP_OUT_OP)
+
+        # Validate scale down.
+        scale_down = autoscaler.validate_scaling_bounds(ScaleOp.DOWN_IN_OP, autoscaling_policy)
+        self.assertEquals(scale_down, ScaleOp.DOWN_IN_OP)
+
+        # Validate max out-of-bounds.
+        autoscaling_policy.current_replicas = 3
+        scale_oob_max = autoscaler.validate_scaling_bounds(ScaleOp.UP_OUT_OP, autoscaling_policy)
+        self.assertEquals(scale_oob_max, ScaleOp.NO_OP)
+
+        # Validate min out-of-bounds.
+        autoscaling_policy.current_replicas = 1
+        scale_oob_min = autoscaler.validate_scaling_bounds(ScaleOp.DOWN_IN_OP, autoscaling_policy)
+        self.assertEquals(scale_oob_min, ScaleOp.NO_OP)

+    def test_enforce_scaling_down_delay_interval(self):
+        self.populate_redis_with_dummy_metrics()
+        # Create autoscaler instance and define policy.
+        autoscaler = Autoscaler.get_instance()
+        autoscaling_policy = {
+            "current_replicas": 1,
+            "min_replicas": 1,
+            "max_replicas": 1,
+        }
+        autoscaling_policy = AutoscalingPolicy(**autoscaling_policy)
+
+        autoscaling_policy.scaledown_delay_secs = 0.0
+        scale_down = autoscaler.enforce_scaling_down_delay_interval(ENV_ENDPOINT_ID_1, autoscaling_policy)
+        self.assertEquals(scale_down, ScaleOp.DOWN_IN_OP)
+
+        autoscaling_policy.scaledown_delay_secs = 1
+        scale_noop = autoscaler.enforce_scaling_down_delay_interval(ENV_ENDPOINT_ID_1, autoscaling_policy)
+        self.assertEquals(scale_noop, ScaleOp.NO_OP)
+
+        time.sleep(2)
+        scale_down = autoscaler.enforce_scaling_down_delay_interval(ENV_ENDPOINT_ID_1, autoscaling_policy)
+        self.assertEquals(scale_down, ScaleOp.DOWN_IN_OP)
+        self.clear_redis()


if __name__ == "__main__":
[diff truncated]
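Assuming a reachable Redis instance with the credentials hard-coded above and an installed fedml package, the test suite can be run with the standard unittest runner, for example:

python -m unittest discover -s python/fedml/computing/scheduler/model_scheduler/autoscaler/test -p "*_test.py"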
[the remaining 4 changed files are not rendered]
