Skip to content

Commit

Permalink
[Serve] Fail early for user app failure and expose failure reasons (#…
Browse files Browse the repository at this point in the history
…3411)

* expose detailed replica failure and rename service failure to crash loop

* fix path in test

* add target qps

* shorter wait time

* fix smoke

* fix smoke test

* add `;` back

* Revert crash loop

* update failed_status

* typo

* format

* Add initial delay failure

* Add initial delay failure

* format

* do not scale when user app fails

* format

* Add tests for failure statuses

* syntax error

* make service termination more robust

* fix smoke test

* Fix permission issue if not tpu is not needed

* fix test

* fail early for initial delay timeout

* format

* format

* remove unecessary logger

* fix

* explicit annotation of scale down

* fix logs

* format

* fix test with non spot

* Address comments

* add comments

* longer time

* Update sky/serve/autoscalers.py

Co-authored-by: Tian Xia <cblmemo@gmail.com>

* Update sky/serve/autoscalers.py

Co-authored-by: Tian Xia <cblmemo@gmail.com>

* Update sky/serve/replica_managers.py

Co-authored-by: Tian Xia <cblmemo@gmail.com>

---------

Co-authored-by: Tian Xia <cblmemo@gmail.com>
  • Loading branch information
Michaelvll and cblmemo authored Apr 7, 2024
1 parent 48b8ca9 commit 48a5c63
Show file tree
Hide file tree
Showing 9 changed files with 232 additions and 72 deletions.
2 changes: 2 additions & 0 deletions sky/provision/gcp/instance_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,8 @@ def filter(
# Return empty dict instead of raising exception to not break.
if 'is not found or access is unauthorized.' in str(e):
return {}
if 'Permission \'tpu.nodes.list\' denied on' in str(e):
return {}
logger.debug(f'filter: googleapiclient.errors.HttpError: {e}')
raise

Expand Down
34 changes: 31 additions & 3 deletions sky/serve/autoscalers.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ def __init__(self, service_name: str,
# Target number of replicas is initialized to min replicas
self.target_num_replicas: int = spec.min_replicas
self.latest_version: int = constants.INITIAL_VERSION
# The latest_version_ever_ready should be smaller than the
# latest_version, so we can fail early if the initial version got
# unrecoverable failure.
self.latest_version_ever_ready: int = self.latest_version - 1
self.update_mode = serve_utils.DEFAULT_UPDATE_MODE

def update_version(self, version: int, spec: 'service_spec.SkyServiceSpec',
Expand Down Expand Up @@ -113,13 +117,25 @@ def from_spec(cls, service_name: str,
else:
return RequestRateAutoscaler(service_name, spec)

def _dump_dynamic_states(self) -> Dict[str, Any]:
"""Dump dynamic states from autoscaler."""
raise NotImplementedError

def dump_dynamic_states(self) -> Dict[str, Any]:
"""Dump dynamic states from autoscaler."""
states = {'latest_version_ever_ready': self.latest_version_ever_ready}
states.update(self._dump_dynamic_states())
return states

def _load_dynamic_states(self, dynamic_states: Dict[str, Any]) -> None:
"""Load dynamic states to autoscaler."""
raise NotImplementedError

def load_dynamic_states(self, dynamic_states: Dict[str, Any]) -> None:
"""Load dynamic states to autoscaler."""
raise NotImplementedError
self.latest_version_ever_ready = dynamic_states.pop(
'latest_version_ever_ready', constants.INITIAL_VERSION)
self._load_dynamic_states(dynamic_states)


class RequestRateAutoscaler(Autoscaler):
Expand Down Expand Up @@ -376,12 +392,24 @@ def evaluate_scaling(
override dict. Active migration could require returning both SCALE_UP
and SCALE_DOWN.
"""
latest_replicas: List['replica_managers.ReplicaInfo'] = []
latest_nonterminal_replicas: List['replica_managers.ReplicaInfo'] = []

for info in replica_infos:
if info.version == self.latest_version:
latest_replicas.append(info)
if not info.is_terminal:
latest_nonterminal_replicas.append(info)
if info.is_ready:
self.latest_version_ever_ready = self.latest_version
if self.latest_version_ever_ready < self.latest_version:
for info in latest_replicas:
if info.status_property.unrecoverable_failure():
# Stop scaling if one of replica of the latest version
# failed, it is likely that a fatal error happens to the
# user application and may lead to a infinte termination
# and restart.
return []

self._set_target_num_replica_with_hysteresis()

Expand Down Expand Up @@ -433,12 +461,12 @@ def evaluate_scaling(
logger.info('No scaling needed.')
return scaling_options

def dump_dynamic_states(self) -> Dict[str, Any]:
def _dump_dynamic_states(self) -> Dict[str, Any]:
return {
'request_timestamps': self.request_timestamps,
}

def load_dynamic_states(self, dynamic_states: Dict[str, Any]) -> None:
def _load_dynamic_states(self, dynamic_states: Dict[str, Any]) -> None:
if 'request_timestamps' in dynamic_states:
self.request_timestamps = dynamic_states.pop('request_timestamps')
if dynamic_states:
Expand Down
99 changes: 55 additions & 44 deletions sky/serve/replica_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,52 +235,59 @@ class ReplicaStatusProperty:
sky_launch_status: Optional[ProcessStatus] = None
user_app_failed: bool = False
service_ready_now: bool = False
# None means readiness probe is not executed yet;
# None means readiness probe is not succeeded yet;
# -1 means the initial delay seconds is exceeded.
first_ready_time: Optional[float] = None
# None means sky.down is not called yet.
sky_down_status: Optional[ProcessStatus] = None
# Whether the termination is caused by autoscaler's decision
is_scale_down: bool = False
# The replica's spot instance was preempted.
preempted: bool = False

def is_scale_down_succeeded(self, initial_delay_seconds: int) -> bool:
def remove_terminated_replica(self) -> bool:
"""Whether to remove the replica record from the replica table.
If not, the replica will stay in the replica table permanently to
notify the user that something is wrong with the user code / setup.
"""
if self.sky_launch_status == ProcessStatus.INTERRUPTED:
return True
if self.sky_launch_status != ProcessStatus.SUCCEEDED:
# sky_launch_status == RUNNING: a scale down happened before
# the sky.launch finished.
return self.sky_launch_status != ProcessStatus.FAILED
if self.sky_down_status != ProcessStatus.SUCCEEDED:
return self.is_scale_down

def unrecoverable_failure(self) -> bool:
"""Whether the replica fails and cannot be recovered.
Autoscaler should stop scaling if any of the replica has unrecoverable
failure, e.g., the user app fails before the service endpoint being
ready for the current version.
"""
replica_status = self.to_replica_status()
logger.info(
'Check replica unrecorverable: first_ready_time '
f'{self.first_ready_time}, user_app_failed {self.user_app_failed}, '
f'status {replica_status}')
if replica_status not in serve_state.ReplicaStatus.terminal_statuses():
return False
if self.preempted:
return True
if (self.first_ready_time is not None and
time.time() - self.first_ready_time > initial_delay_seconds):
# If the service is up for more than `initial_delay_seconds`,
# we assume there is no bug in the user code and the scale down
# is successful, thus enabling the controller to remove the
# replica from the replica table and auto restart the replica.
# Here we assume that initial_delay_seconds is larger than
# consecutive_failure_threshold_seconds, so if a replica is not
# teardown for initial_delay_seconds, it is safe to assume that
# it is UP for initial_delay_seconds.
# For replica with a failed sky.launch, it is likely due to some
# misconfigured resources, so we don't want to auto restart it.
# For replica with a failed sky.down, we cannot restart it since
# otherwise we will have a resource leak.
return True
if self.first_ready_time is not None:
if self.first_ready_time >= 0:
# If the service is ever up, we assume there is no bug in the
# user code and the scale down is successful, thus enabling the
# controller to remove the replica from the replica table and
# auto restart the replica.
# For replica with a failed sky.launch, it is likely due to some
# misconfigured resources, so we don't want to auto restart it.
# For replica with a failed sky.down, we cannot restart it since
# otherwise we will have a resource leak.
return False
else:
# If the initial delay exceeded, it is likely the service is not
# recoverable.
return True
if self.user_app_failed:
return False
if self.first_ready_time is None:
return True
if not self.service_ready_now:
return False
return self.first_ready_time >= 0.0
# TODO(zhwu): launch failures not related to resource unavailability
# should be considered as unrecoverable failure. (refer to
# `spot.recovery_strategy.StrategyExecutor::_launch`)
return False

def should_track_service_status(self) -> bool:
"""Should we track the status of the replica.
Expand Down Expand Up @@ -333,17 +340,17 @@ def to_replica_status(self) -> serve_state.ReplicaStatus:
return serve_state.ReplicaStatus.FAILED
if self.sky_launch_status == ProcessStatus.FAILED:
# sky.launch failed
return serve_state.ReplicaStatus.FAILED
return serve_state.ReplicaStatus.FAILED_PROVISION
if self.first_ready_time is None:
# readiness probe is not executed yet, but a scale down is
# triggered.
return serve_state.ReplicaStatus.SHUTTING_DOWN
if self.first_ready_time == -1:
# initial delay seconds exceeded
return serve_state.ReplicaStatus.FAILED
return serve_state.ReplicaStatus.FAILED_INITIAL_DELAY
if not self.service_ready_now:
# Max continuous failure exceeded
return serve_state.ReplicaStatus.FAILED
return serve_state.ReplicaStatus.FAILED_PROBING
# This indicate it is a scale_down with correct teardown.
# Should have been cleaned from the replica table.
return serve_state.ReplicaStatus.UNKNOWN
Expand Down Expand Up @@ -641,8 +648,11 @@ def scale_up(self,
self._launch_replica(self._next_replica_id, resources_override)
self._next_replica_id += 1

def _terminate_replica(self, replica_id: int, sync_down_logs: bool,
replica_drain_delay_seconds: int) -> None:
def _terminate_replica(self,
replica_id: int,
sync_down_logs: bool,
replica_drain_delay_seconds: int,
is_scale_down: bool = False) -> None:

if replica_id in self._launch_process_pool:
info = serve_state.get_replica_info_from_id(self._service_name,
Expand Down Expand Up @@ -725,6 +735,7 @@ def _download_and_stream_logs(info: ReplicaInfo):
args=(info.cluster_name, replica_drain_delay_seconds),
)
info.status_property.sky_down_status = ProcessStatus.RUNNING
info.status_property.is_scale_down = is_scale_down
serve_state.add_or_update_replica(self._service_name, replica_id, info)
p.start()
self._down_process_pool[replica_id] = p
Expand All @@ -733,7 +744,8 @@ def scale_down(self, replica_id: int) -> None:
self._terminate_replica(
replica_id,
sync_down_logs=False,
replica_drain_delay_seconds=_DEFAULT_DRAIN_SECONDS)
replica_drain_delay_seconds=_DEFAULT_DRAIN_SECONDS,
is_scale_down=True)

def _handle_preemption(self, info: ReplicaInfo) -> bool:
"""Handle preemption of the replica if any error happened.
Expand Down Expand Up @@ -774,7 +786,8 @@ def _handle_preemption(self, info: ReplicaInfo) -> bool:
info)
self._terminate_replica(info.replica_id,
sync_down_logs=False,
replica_drain_delay_seconds=0)
replica_drain_delay_seconds=0,
is_scale_down=True)
return True

#################################
Expand Down Expand Up @@ -859,12 +872,10 @@ def _refresh_process_pool(self) -> None:
# initial_delay_seconds is not supported. We should add it
# later when we support `sky serve update`.
removal_reason = None
if info.status_property.is_scale_down_succeeded(
self._get_initial_delay_seconds(info.version)):
# This means the cluster is deleted due to
# a scale down or the cluster is recovering
# from preemption. Delete the replica info
# so it won't count as a replica.
if info.status_property.is_scale_down:
# This means the cluster is deleted due to an autoscaler
# decision or the cluster is recovering from preemption.
# Delete the replica info so it won't count as a replica.
if info.status_property.preempted:
removal_reason = 'for preemption recovery'
else:
Expand Down
25 changes: 19 additions & 6 deletions sky/serve/serve_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,18 @@ class ReplicaStatus(enum.Enum):
# The replica VM is being shut down. i.e., the `sky down` is still running.
SHUTTING_DOWN = 'SHUTTING_DOWN'

# The replica VM is once failed and has been deleted.
# The replica fails due to user's run/setup.
FAILED = 'FAILED'

# The replica fails due to initial delay exceeded.
FAILED_INITIAL_DELAY = 'FAILED_INITIAL_DELAY'

# The replica fails due to healthiness check.
FAILED_PROBING = 'FAILED_PROBING'

# The replica fails during launching
FAILED_PROVISION = 'FAILED_PROVISION'

# `sky.down` failed during service teardown.
# This could mean resource leakage.
# TODO(tian): This status should be removed in the future, at which point
Expand All @@ -115,14 +124,15 @@ class ReplicaStatus(enum.Enum):

@classmethod
def failed_statuses(cls) -> List['ReplicaStatus']:
return [cls.FAILED, cls.FAILED_CLEANUP, cls.UNKNOWN]
return [
cls.FAILED, cls.FAILED_CLEANUP, cls.FAILED_INITIAL_DELAY,
cls.FAILED_PROBING, cls.FAILED_PROVISION, cls.UNKNOWN
]

@classmethod
def terminal_statuses(cls) -> List['ReplicaStatus']:
return [
cls.SHUTTING_DOWN, cls.FAILED, cls.FAILED_CLEANUP, cls.PREEMPTED,
cls.UNKNOWN
]
return [cls.SHUTTING_DOWN, cls.PREEMPTED, cls.UNKNOWN
] + cls.failed_statuses()

@classmethod
def scale_down_decision_order(cls) -> List['ReplicaStatus']:
Expand All @@ -145,6 +155,9 @@ def colored_str(self) -> str:
ReplicaStatus.NOT_READY: colorama.Fore.YELLOW,
ReplicaStatus.SHUTTING_DOWN: colorama.Fore.MAGENTA,
ReplicaStatus.FAILED: colorama.Fore.RED,
ReplicaStatus.FAILED_INITIAL_DELAY: colorama.Fore.RED,
ReplicaStatus.FAILED_PROBING: colorama.Fore.RED,
ReplicaStatus.FAILED_PROVISION: colorama.Fore.RED,
ReplicaStatus.FAILED_CLEANUP: colorama.Fore.RED,
ReplicaStatus.PREEMPTED: colorama.Fore.MAGENTA,
ReplicaStatus.UNKNOWN: colorama.Fore.RED,
Expand Down
2 changes: 1 addition & 1 deletion sky/serve/serve_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ def _get_replicas(service_record: Dict[str, Any]) -> str:
if info['status'] == serve_state.ReplicaStatus.READY:
ready_replica_num += 1
# TODO(MaoZiming): add a column showing failed replicas number.
if info['status'] != serve_state.ReplicaStatus.FAILED:
if info['status'] not in serve_state.ReplicaStatus.failed_statuses():
total_replica_num += 1
return f'{ready_replica_num}/{total_replica_num}'

Expand Down
12 changes: 12 additions & 0 deletions tests/skyserve/failures/initial_delay.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
service:
readiness_probe:
path: /health
initial_delay_seconds: 10
replicas: 2

resources:
cpus: 2
ports: 8081

run: |
sleep 1000
38 changes: 38 additions & 0 deletions tests/skyserve/failures/probing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import argparse
import http.server
import socketserver


class MyHttpRequestHandler(http.server.SimpleHTTPRequestHandler):

def do_GET(self):
# Return 200 for all paths
# Therefore, readiness_probe will return 200 at path '/health'
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
html = """
<html>
<head>
<title>SkyPilot Test Page</title>
</head>
<body>
<h1>Hi, SkyPilot here!</h1>
</body>
</html>
"""
self.wfile.write(bytes(html, 'utf8'))
return


if __name__ == '__main__':
parser = argparse.ArgumentParser(description='SkyServe HTTP Test Server')
parser.add_argument('--port', type=int, required=False, default=8081)
args = parser.parse_args()

Handler = MyHttpRequestHandler
with socketserver.TCPServer(('', args.port), Handler) as httpd:
print('serving at port', args.port)
for i in range(20):
# Ends after handle 20 requests
httpd.handle_request()
13 changes: 13 additions & 0 deletions tests/skyserve/failures/probing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
service:
readiness_probe:
path: /health
initial_delay_seconds: 10
replicas: 1

resources:
cpus: 2
ports: 8081

workdir: tests/skyserve/failures

run: python3 probing.py
Loading

0 comments on commit 48a5c63

Please sign in to comment.