Merge pull request #2077 from FedML-AI/dimitris/autoscaler-hotfix-v4
Disabling replica release after idle secs.
Raphael-Jin authored May 2, 2024
2 parents a271f18 + 2ff516e commit 5267742
Showing 3 changed files with 119 additions and 39 deletions.
32 changes: 32 additions & 0 deletions python/fedml/computing/scheduler/comm_utils/job_monitor.py
@@ -574,6 +574,15 @@ def monitor_slave_endpoint_status(self):
is_endpoint_ready = self._check_and_reset_endpoint_status(
job.job_id, job.edge_id, deployment_result, only_check_inference_ready_status=True)

# [Hotfix] Under high-concurrency situations, the /ready endpoint might not be reachable
# even though the container itself is in a healthy state.
# In that case, we need an explicit 503 status code, rather than a timeout, to decide whether to restart.
# TODO(Raphael): Split the /ready endpoint and predict endpoint traffic
# Note: this deliberately overrides the readiness verdict computed above.
is_endpoint_ready = self._lenient_check_replica_ready(deployment_result)

# Get endpoint container name prefix, prepare for restart
endpoint_container_name_prefix = \
(device_client_constants.ClientConstants.get_endpoint_container_name(
@@ -736,6 +745,28 @@ def monitor_slave_endpoint_status(self):
except Exception as e:
pass

def _lenient_check_replica_ready(
self, deployment_result
):
"""
Double-check the replica's liveness using the /ready API:
if HTTP 200 -> healthy -> return True
[Critical] if timeout -> could be under high pressure -> return True
if HTTP 202 (ACCEPTED) -> unhealthy -> return False
"""
result_json = deployment_result
inference_url = result_json.get("model_url", None)

# Issue an HTTP GET to inference_url with a 5-second timeout.
# TODO(Raphael): Also support PROXY and MQTT to check the readiness
response_ok = asyncio.run(FedMLHttpInference.is_inference_ready(inference_url, timeout=5))
if response_ok is None:
# The server returned 202: the container accepted the probe but is not ready.
return False

# 200 or timeout: treat the replica as ready.
return True
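
For context, a minimal sketch of the readiness-probe semantics this helper assumes. The real FedMLHttpInference.is_inference_ready lives elsewhere in the codebase; httpx, the /ready path, and the helper name here are illustrative assumptions, not the actual implementation:

import asyncio
import httpx

async def is_inference_ready_sketch(inference_url: str, timeout: int = 5):
    # Hypothetical probe semantics: True on 200, None on 202, True on timeout.
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(f"{inference_url}/ready", timeout=timeout)
    except httpx.TimeoutException:
        # A timeout may simply mean high load, so do not report unhealthy
        # (this mirrors the [Critical] note in the docstring above).
        return True
    if response.status_code == 200:
        return True  # healthy
    return None  # e.g. 202: container accepted the probe but the model is not ready

# Usage: asyncio.run(is_inference_ready_sketch("http://replica-host:2345"))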

def _check_and_reset_endpoint_status(
self, endpoint_id, device_id, deployment_result, only_check_inference_ready_status=False,
should_release_gpu_ids=False
@@ -761,6 +792,7 @@ def _check_and_reset_endpoint_status

if self.endpoint_unavailable_counter.get(str(endpoint_id)) is None:
self.endpoint_unavailable_counter[str(endpoint_id)] = 0

if not response_ok:
self.endpoint_unavailable_counter[str(endpoint_id)] += 1
else:
python/fedml/computing/scheduler/model_scheduler/autoscaler/autoscaler.py
@@ -8,7 +8,7 @@
from enum import Enum
from fedml.computing.scheduler.model_scheduler.device_model_cache import FedMLModelCache
from fedml.computing.scheduler.model_scheduler.autoscaler.policies import *
from utils.singleton import Singleton
from fedml.computing.scheduler.model_scheduler.autoscaler.utils.singleton import Singleton


class ScaleOp(Enum):
@@ -38,6 +38,26 @@ def get_current_timestamp_micro_seconds(cls):
# In Redis we record/operate in microseconds, hence dividing nanoseconds by 1e3!
return int(format(time.time_ns() / 1000.0, '.0f'))

@classmethod
def filter_by_timestamp(cls,
metrics,
before_now_minutes=None,
before_now_seconds=None) -> pd.DataFrame:

# We subtract the number of seconds/minutes from the current timestamp, and then we query
# the data frame to fetch all the records whose timestamp is within the given range.
# By default, we return all records.
filtered = metrics
if before_now_minutes:
less_than_ts = \
str(pd.Timestamp.now() - pd.Timedelta(minutes=before_now_minutes))
filtered = metrics.query("'{}' <= {}".format(less_than_ts, "timestamp"))
if before_now_seconds:
less_than_ts = \
str(pd.Timestamp.now() - pd.Timedelta(seconds=before_now_seconds))
# Query the already-filtered frame so both bounds compose when given together.
filtered = filtered.query("'{}' <= {}".format(less_than_ts, "timestamp"))
return filtered
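
For illustration, a small usage sketch of filter_by_timestamp. It assumes the enclosing class is named Autoscaler and that the metrics frame carries a pandas-Timestamp "timestamp" column; the values are made up:

import pandas as pd

now = pd.Timestamp.now()
metrics = pd.DataFrame({
    "timestamp": [now - pd.Timedelta(minutes=10), now - pd.Timedelta(seconds=30)],
    "current_qps": [12.0, 8.5],
    "current_latency": [110.0, 95.0],
})

# Keep only records from the last 5 minutes; the 10-minute-old row is dropped.
recent = Autoscaler.filter_by_timestamp(metrics, before_now_minutes=5)
assert len(recent) == 1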

@classmethod
def scale_operation_predictive(cls,
predictive_policy: PredictivePolicy,
@@ -51,17 +71,23 @@ def scale_operation_ewm(cls,
ewm_policy: EWMPolicy,
metrics: pd.DataFrame) -> ScaleOp:

logging.info("Executing the ExponentialWeightMoving average autoscaling policy.")
# Adding the context below to avoid having a series of warning messages.
with warnings.catch_warnings():
warnings.simplefilter(action='ignore', category=FutureWarning)
period_data = metrics.last("{}min".format(ewm_policy.ewm_mins))
# If the data frame window is empty then do nothing more, just return.
if period_data.empty:
return ScaleOp.NO_OP
metric_name = "current_latency" \
if "ewm_latency" == ewm_policy.metric else "current_qps"
ewm_period = period_data[metric_name] \
.ewm(alpha=ewm_policy.ewm_alpha).mean()
period_data = cls.filter_by_timestamp(metrics,
before_now_minutes=ewm_policy.ewm_mins)

# If the data frame window is empty then it means we
# did not have any incoming request, so we need to scale down.
if period_data.empty:
return ScaleOp.DOWN_IN_OP

# Otherwise, we proceed as normal.
metric_name = "current_latency" \
if "ewm_latency" == ewm_policy.metric else "current_qps"
ewm_period = period_data[metric_name] \
.ewm(alpha=ewm_policy.ewm_alpha).mean()

scale_op = ScaleOp.NO_OP
# If there is no exponential moving average within this
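
For intuition, a standalone sketch of the EWM computation performed above; the alpha and the sample latencies are illustrative, and only pandas is assumed:

import pandas as pd

latencies = pd.Series([100.0, 120.0, 90.0, 200.0])  # e.g. current_latency samples
ewm_latency = latencies.ewm(alpha=0.5).mean()
# Each entry is a weighted mean biased toward recent samples, e.g. the second
# value is (1.0 * 120 + 0.5 * 100) / 1.5 ≈ 113.33.
print(ewm_latency.iloc[-1])  # the policy compares against the latest EWM value
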
@@ -110,15 +136,21 @@ def scale_operation_query_concurrency(cls,
concurrent_query_policy: ConcurrentQueryPolicy,
metrics: pd.DataFrame) -> ScaleOp:

logging.info("Executing the QueryConcurrency autoscaling policy.")
# Adding the context below to avoid having a series of warning messages.
with warnings.catch_warnings():
warnings.simplefilter(action='ignore', category=FutureWarning)
# Here, the number of queries is the number of rows in the short period data frame.
period_data = metrics.last("{}s".format(concurrent_query_policy.window_size_secs))
# If the data frame window is empty then do nothing more, just return.
if period_data.empty:
return ScaleOp.NO_OP
queries_num = period_data.shape[0]
period_data = cls.filter_by_timestamp(
metrics,
before_now_seconds=concurrent_query_policy.window_size_secs)

# If the data frame window is empty then it means we
# did not have any incoming request, so we need to scale down.
if period_data.empty:
return ScaleOp.DOWN_IN_OP

# Otherwise, we proceed as normal.
queries_num = period_data.shape[0]

try:
# QSR: Queries per Second per Replica: (Number of Queries / Number of Current Replicas) / Window Size
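
As a worked numeric example of the QSR formula above (values are illustrative): 120 queries observed over a 60-second window served by 2 replicas gives QSR = (120 / 2) / 60 = 1 query per second per replica, which the policy then compares against its configured concurrency target.
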
@@ -154,15 +186,20 @@ def scale_operation_meet_traffic_demand(cls,
meet_traffic_demand_policy: MeetTrafficDemandPolicy,
metrics: pd.DataFrame) -> ScaleOp:

logging.info("Executing the MeetTrafficDemand autoscaling policy.")
# Adding the context below to avoid having a series of warning messages.
with warnings.catch_warnings():
warnings.simplefilter(action='ignore', category=FutureWarning)
# Here, the number of queries is the number of rows in the short period data frame.
period_data = metrics.last("{}s".format(meet_traffic_demand_policy.window_size_secs))
# If the data frame window is empty then do nothing more, just return.
if period_data.empty:
return ScaleOp.NO_OP
period_data = cls.filter_by_timestamp(
metrics,
before_now_seconds=meet_traffic_demand_policy.window_size_secs)

# If the data frame window is empty then it means we
# did not have any incoming request, so we need to scale down.
if period_data.empty:
return ScaleOp.DOWN_IN_OP

# Otherwise, we proceed as normal.
period_requests_num = period_data.shape[0]
all_latencies = metrics["current_latency"]
# Original value is milliseconds, convert to seconds.
@@ -216,6 +253,7 @@ def run_autoscaling_policy(self,
def validate_scaling_bounds(cls,
scale_op: ScaleOp,
autoscaling_policy: AutoscalingPolicy) -> ScaleOp:
logging.info("Validating scaling bounds.")
# We cannot be lower than the minimum number of replicas,
# nor exceed the maximum number of requested replicas.
new_running_replicas = autoscaling_policy.current_replicas + scale_op.value
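
For example (assuming ScaleOp.DOWN_IN_OP carries the value -1): with current_replicas = 1 and min_replicas = 1, a DOWN_IN_OP yields new_running_replicas = 0, which violates the lower bound, so the operation is presumably clamped back to ScaleOp.NO_OP, as the comment above implies.
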
@@ -242,6 +280,7 @@ def enforce_scaling_down_delay_interval(self,

# If the policy has no scaledown delay then return immediately.
if autoscaling_policy.scaledown_delay_secs == 0:
logging.info("No scale down delay, so scale down immediately.")
return ScaleOp.DOWN_IN_OP

# By default, we return a no operation.
@@ -256,9 +295,12 @@
self.fedml_model_cache.get_endpoint_scaling_down_decision_time(endpoint_id)
diff_secs = (current_timestamp - previous_timestamp) / 1e6
if diff_secs > autoscaling_policy.scaledown_delay_secs:
logging.info("Scaling down since the time difference: {}secs, "
"is above the delay period: {} secs.".format(
diff_secs, autoscaling_policy.scaledown_delay_secs))
# At this point, we will perform the scaling down operation, hence
# we need to delete the previously stored scaling down timestamp (if any).
self.fedml_model_cache.delete_endpoint_scaling_down_decision_time(endpoint_id)
self.clean_up_scaling_down_operation_state(endpoint_id)
scale_op = ScaleOp.DOWN_IN_OP
else:
# Record the timestamp of the scaling down operation.
Expand All @@ -268,7 +310,8 @@ def enforce_scaling_down_delay_interval(self,
return scale_op
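
As a concrete timeline with scaledown_delay_secs = 60: the first scale-down decision only records a timestamp and returns ScaleOp.NO_OP; if another scale-down decision arrives 75 seconds later, diff_secs = 75 > 60, so the stored timestamp is cleaned up and ScaleOp.DOWN_IN_OP is returned.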

def clean_up_scaling_down_operation_state(self, endpoint_id) -> bool:
# We return True if the clean up operation succeeded, else False.
# We return True if the clean-up operation succeeded, else False.
logging.info("Cleaning up scale down state from Redis.")
to_clean_up = \
self.fedml_model_cache.exists_endpoint_scaling_down_decision_time(endpoint_id)
if to_clean_up:
@@ -293,35 +336,40 @@ def scale_operation_endpoint(self,
0: do nothing
"""

# Fetch most recent metric record from the database.
# Fetch all metric records from the database.
metrics = self.fedml_model_cache.get_endpoint_metrics(
endpoint_id=endpoint_id)

# Default to nothing.
scale_op = ScaleOp.NO_OP
if not metrics:
# If no metric exists then no scaling operation.
logging.info("No existing metric, so no scaling operation.")
return scale_op

# If we continue here, then it means that there was at least one request.
# The `most_recent_metric` is of type list, hence we need to access index 0.
most_recent_metric = metrics[-1]
latest_request_timestamp_micro_secs = most_recent_metric["timestamp"]
# The time module does not have a micro-second function built-in, so we need to
# divide nanoseconds by 1e3 and convert to micro-seconds.
current_time_micro_seconds = time.time_ns() / 1e3
# compute elapsed time and convert to seconds
elapsed_time_secs = \
(current_time_micro_seconds - latest_request_timestamp_micro_secs) / 1e6
if elapsed_time_secs > autoscaling_policy.release_replica_after_idle_secs:
if autoscaling_policy.release_replica_after_idle_secs:
# At this point it means that there was at least one request. Since
# `metrics` is a list of records, we grab the most recent one via index -1.
most_recent_metric = metrics[-1]
latest_request_timestamp_micro_secs = most_recent_metric["timestamp"]
# The time module does not have a micro-second function built-in,
# so we need to divide nanoseconds by 1e3 and convert to micro-seconds.
current_time_micro_seconds = time.time_ns() / 1e3
# Compute the elapsed time and convert to seconds.
elapsed_time_secs = \
(current_time_micro_seconds - latest_request_timestamp_micro_secs) / 1e6
# If the elapsed time is greater than the requested idle time,
# in other words no request arrived within that period, then scale down.
scale_op = ScaleOp.DOWN_IN_OP
if elapsed_time_secs > autoscaling_policy.release_replica_after_idle_secs:
logging.info("Endpoint remained idle for {} seconds, need to scale down.".format(
elapsed_time_secs))
scale_op = ScaleOp.DOWN_IN_OP
else:
# Otherwise, it means there was a request within the elapsed time, then:
# Otherwise, it means there was a request within the elapsed time, then,
# Check if the current number of running replicas is 0 it means
# we need more resources, hence we need to scale up: ScaleOp.UP_OUT_OP.
if autoscaling_policy.current_replicas == 0:
# Check if the current number of running replicas is 0,
# then we need more resources, hence ScaleOp.UP_OUT_OP.
logging.info("Incoming requests but with 0 replicas, scaling up.")
scale_op = ScaleOp.UP_OUT_OP
else:
# Else, trigger the autoscaling policy with all existing values.
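
For a concrete reading of the arithmetic above (illustrative numbers): time.time_ns() / 1e3 yields microseconds, so if the latest request timestamp lies 400,000,000 µs (400 s) in the past and release_replica_after_idle_secs is 300, then elapsed_time_secs = 400 > 300 and the endpoint scales down. With the new default of None, this entire branch is skipped.
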
python/fedml/computing/scheduler/model_scheduler/autoscaler/policies.py
@@ -22,7 +22,7 @@ class AutoscalingPolicy(BaseModel):
min_replicas: NonNegativeInt
max_replicas: NonNegativeInt
previous_triggering_value: float = None
release_replica_after_idle_secs: NonNegativeInt = 300 # default is after 5 minutes
release_replica_after_idle_secs: NonNegativeInt = None # default is None, i.e., idle release disabled
scaledown_delay_secs: NonNegativeInt = 60 # default is 1 minute
scaleup_cost_secs: NonNegativeInt = 300 # default is 5 minutes
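
A minimal sketch of what this default change achieves in scale_operation_endpoint above (plain Python, illustrative):

release_after_idle = None   # the new default
if release_after_idle:      # falsy for None (and for an explicit 0)
    print("idle-release branch runs")
else:
    print("idle-release disabled")  # this is what the hotfix intends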

Expand Down
