Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Serve] Fail early for user app failure and expose failure reasons #3411

Merged
merged 41 commits into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
2ed2419
expose detailed replica failure and rename service failure to crash loop
Michaelvll Apr 1, 2024
6ee1b3b
fix path in test
Michaelvll Apr 1, 2024
a6594f3
add target qps
Michaelvll Apr 1, 2024
a0c7359
shorter wait time
Michaelvll Apr 1, 2024
f6a89c2
fix smoke
Michaelvll Apr 2, 2024
6b9348b
fix smoke test
Michaelvll Apr 2, 2024
12db511
add `;` back
Michaelvll Apr 2, 2024
e4936fc
Revert crash loop
Michaelvll Apr 2, 2024
2beb015
update failed_status
Michaelvll Apr 2, 2024
c20fa14
typo
Michaelvll Apr 2, 2024
6256b71
format
Michaelvll Apr 2, 2024
e558410
Add initial delay failure
Michaelvll Apr 2, 2024
5a6a091
Add initial delay failure
Michaelvll Apr 2, 2024
837ac83
format
Michaelvll Apr 2, 2024
75c5069
do not scale when user app fails
Michaelvll Apr 2, 2024
c3fff15
Merge branch 'service-ux' of github.com:skypilot-org/skypilot into fa…
Michaelvll Apr 2, 2024
d8f812e
format
Michaelvll Apr 2, 2024
6e6a662
Add tests for failure statuses
Michaelvll Apr 3, 2024
f63f5e9
Merge branch 'master' of github.com:skypilot-org/skypilot into fail-e…
Michaelvll Apr 3, 2024
4260891
syntax error
Michaelvll Apr 3, 2024
6b0ed01
make service termination more robust
Michaelvll Apr 3, 2024
0e89159
fix smoke test
Michaelvll Apr 3, 2024
afefa2d
Fix permission issue when TPU is not needed
Michaelvll Apr 3, 2024
8ca9a8a
fix test
Michaelvll Apr 3, 2024
b91b28c
fail early for initial delay timeout
Michaelvll Apr 3, 2024
40866bf
format
Michaelvll Apr 3, 2024
d3cedaa
format
Michaelvll Apr 3, 2024
b0d9e23
remove unnecessary logger
Michaelvll Apr 3, 2024
7957567
fix
Michaelvll Apr 3, 2024
ebfd136
explicit annotation of scale down
Michaelvll Apr 3, 2024
f8a8d7d
fix logs
Michaelvll Apr 3, 2024
8841b4c
format
Michaelvll Apr 3, 2024
137db2a
fix test with non spot
Michaelvll Apr 4, 2024
d7ea851
Address comments
Michaelvll Apr 4, 2024
5bf863f
add comments
Michaelvll Apr 4, 2024
f2b3545
longer time
Michaelvll Apr 4, 2024
8d9bcd7
Update sky/serve/autoscalers.py
Michaelvll Apr 5, 2024
23d5a67
Update sky/serve/autoscalers.py
Michaelvll Apr 5, 2024
7eb30ef
Update sky/serve/replica_managers.py
Michaelvll Apr 5, 2024
9ec2924
Merge branch 'master' of github.com:skypilot-org/skypilot into fail-e…
Michaelvll Apr 5, 2024
1ef7388
Merge branch 'fail-early-for-user-error' of github.com:skypilot-org/s…
Michaelvll Apr 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sky/provision/gcp/instance_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,8 @@ def filter(
# Return empty dict instead of raising exception to not break.
if 'is not found or access is unauthorized.' in str(e):
return {}
if 'Permission \'tpu.nodes.list\' denied on' in str(e):
return {}
logger.debug(f'filter: googleapiclient.errors.HttpError: {e}')
raise

Expand Down
34 changes: 31 additions & 3 deletions sky/serve/autoscalers.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ def __init__(self, service_name: str,
# Target number of replicas is initialized to min replicas
self.target_num_replicas: int = spec.min_replicas
self.latest_version: int = constants.INITIAL_VERSION
# The latest_version_ever_ready should be smaller than the
# latest_version, so we can fail early if the initial version got
# unrecoverable failure.
self.latest_version_ever_ready: int = self.latest_version - 1
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
self.update_mode = serve_utils.DEFAULT_UPDATE_MODE

def update_version(self, version: int, spec: 'service_spec.SkyServiceSpec',
Expand Down Expand Up @@ -113,13 +117,25 @@ def from_spec(cls, service_name: str,
else:
return RequestRateAutoscaler(service_name, spec)

def _dump_dynamic_states(self) -> Dict[str, Any]:
"""Dump dynamic states from autoscaler."""
raise NotImplementedError

def dump_dynamic_states(self) -> Dict[str, Any]:
"""Dump dynamic states from autoscaler."""
states = {'latest_version_ever_ready': self.latest_version_ever_ready}
states.update(self._dump_dynamic_states())
return states

def _load_dynamic_states(self, dynamic_states: Dict[str, Any]) -> None:
"""Load dynamic states to autoscaler."""
raise NotImplementedError

def load_dynamic_states(self, dynamic_states: Dict[str, Any]) -> None:
"""Load dynamic states to autoscaler."""
raise NotImplementedError
self.latest_version_ever_ready = dynamic_states.pop(
'latest_version_ever_ready', constants.INITIAL_VERSION)
self._load_dynamic_states(dynamic_states)


class RequestRateAutoscaler(Autoscaler):
Expand Down Expand Up @@ -376,12 +392,24 @@ def evaluate_scaling(
override dict. Active migration could require returning both SCALE_UP
and SCALE_DOWN.
"""
latest_replicas: List['replica_managers.ReplicaInfo'] = []
latest_nonterminal_replicas: List['replica_managers.ReplicaInfo'] = []

for info in replica_infos:
if info.version == self.latest_version:
latest_replicas.append(info)
if not info.is_terminal:
latest_nonterminal_replicas.append(info)
if info.is_ready:
self.latest_version_ever_ready = self.latest_version
if self.latest_version_ever_ready < self.latest_version:
for info in latest_replicas:
if info.status_property.unrecoverable_failure():
# Stop scaling if one replica of the latest version
# failed; it is likely that a fatal error happened in the
# user application and may lead to an infinite cycle of
# termination and restart.
return []

self._set_target_num_replica_with_hysteresis()

Expand Down Expand Up @@ -433,12 +461,12 @@ def evaluate_scaling(
logger.info('No scaling needed.')
return scaling_options

def dump_dynamic_states(self) -> Dict[str, Any]:
def _dump_dynamic_states(self) -> Dict[str, Any]:
return {
'request_timestamps': self.request_timestamps,
}

def load_dynamic_states(self, dynamic_states: Dict[str, Any]) -> None:
def _load_dynamic_states(self, dynamic_states: Dict[str, Any]) -> None:
if 'request_timestamps' in dynamic_states:
self.request_timestamps = dynamic_states.pop('request_timestamps')
if dynamic_states:
Expand Down
99 changes: 55 additions & 44 deletions sky/serve/replica_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,52 +235,59 @@ class ReplicaStatusProperty:
sky_launch_status: Optional[ProcessStatus] = None
user_app_failed: bool = False
service_ready_now: bool = False
# None means readiness probe is not executed yet;
# None means the readiness probe has not succeeded yet;
# -1 means the initial delay seconds is exceeded.
first_ready_time: Optional[float] = None
# None means sky.down is not called yet.
sky_down_status: Optional[ProcessStatus] = None
# Whether the termination is caused by autoscaler's decision
is_scale_down: bool = False
# The replica's spot instance was preempted.
preempted: bool = False

def is_scale_down_succeeded(self, initial_delay_seconds: int) -> bool:
def remove_terminated_replica(self) -> bool:
"""Whether to remove the replica record from the replica table.

If not, the replica will stay in the replica table permanently to
notify the user that something is wrong with the user code / setup.
"""
if self.sky_launch_status == ProcessStatus.INTERRUPTED:
return True
if self.sky_launch_status != ProcessStatus.SUCCEEDED:
# sky_launch_status == RUNNING: a scale down happened before
# the sky.launch finished.
return self.sky_launch_status != ProcessStatus.FAILED
if self.sky_down_status != ProcessStatus.SUCCEEDED:
return self.is_scale_down

def unrecoverable_failure(self) -> bool:
"""Whether the replica fails and cannot be recovered.

Autoscaler should stop scaling if any of the replica has unrecoverable
failure, e.g., the user app fails before the service endpoint being
ready for the current version.
"""
replica_status = self.to_replica_status()
logger.info(
'Check replica unrecorverable: first_ready_time '
f'{self.first_ready_time}, user_app_failed {self.user_app_failed}, '
f'status {replica_status}')
if replica_status not in serve_state.ReplicaStatus.terminal_statuses():
return False
if self.preempted:
return True
if (self.first_ready_time is not None and
time.time() - self.first_ready_time > initial_delay_seconds):
# If the service is up for more than `initial_delay_seconds`,
# we assume there is no bug in the user code and the scale down
# is successful, thus enabling the controller to remove the
# replica from the replica table and auto restart the replica.
# Here we assume that initial_delay_seconds is larger than
# consecutive_failure_threshold_seconds, so if a replica is not
# teardown for initial_delay_seconds, it is safe to assume that
# it is UP for initial_delay_seconds.
# For replica with a failed sky.launch, it is likely due to some
# misconfigured resources, so we don't want to auto restart it.
# For replica with a failed sky.down, we cannot restart it since
# otherwise we will have a resource leak.
return True
if self.first_ready_time is not None:
if self.first_ready_time >= 0:
# If the service is ever up, we assume there is no bug in the
# user code and the scale down is successful, thus enabling the
# controller to remove the replica from the replica table and
# auto restart the replica.
# For replica with a failed sky.launch, it is likely due to some
# misconfigured resources, so we don't want to auto restart it.
# For replica with a failed sky.down, we cannot restart it since
# otherwise we will have a resource leak.
return False
else:
# If the initial delay exceeded, it is likely the service is not
# recoverable.
return True
if self.user_app_failed:
return False
if self.first_ready_time is None:
return True
if not self.service_ready_now:
return False
return self.first_ready_time >= 0.0
# TODO(zhwu): launch failures not related to resource unavailability
# should be considered as unrecoverable failure. (refer to
# `spot.recovery_strategy.StrategyExecutor::_launch`)
return False

def should_track_service_status(self) -> bool:
"""Should we track the status of the replica.
Expand Down Expand Up @@ -333,17 +340,17 @@ def to_replica_status(self) -> serve_state.ReplicaStatus:
return serve_state.ReplicaStatus.FAILED
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return serve_state.ReplicaStatus.FAILED
return serve_state.ReplicaStatus.FAILED_USER_APP

should we update this to a status w/ more information as well?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking to use FAILED to always mean FAILED_USER_APP, as FAILED represents user app error in managed spot job. Wdyt? cc'ing @concretevitamin @romilbhardwaj for some input here as well : )

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh if it is the case, agreed that we keep align w/ spot jobs ;)

if self.sky_launch_status == ProcessStatus.FAILED:
# sky.launch failed
return serve_state.ReplicaStatus.FAILED
return serve_state.ReplicaStatus.FAILED_PROVISION
if self.first_ready_time is None:
# readiness probe is not executed yet, but a scale down is
# triggered.
return serve_state.ReplicaStatus.SHUTTING_DOWN
if self.first_ready_time == -1:
# initial delay seconds exceeded
return serve_state.ReplicaStatus.FAILED
return serve_state.ReplicaStatus.FAILED_INITIAL_DELAY
if not self.service_ready_now:
# Max continuous failure exceeded
return serve_state.ReplicaStatus.FAILED
return serve_state.ReplicaStatus.FAILED_PROBING
# This indicate it is a scale_down with correct teardown.
# Should have been cleaned from the replica table.
return serve_state.ReplicaStatus.UNKNOWN
Expand Down Expand Up @@ -641,8 +648,11 @@ def scale_up(self,
self._launch_replica(self._next_replica_id, resources_override)
self._next_replica_id += 1

def _terminate_replica(self, replica_id: int, sync_down_logs: bool,
replica_drain_delay_seconds: int) -> None:
def _terminate_replica(self,
replica_id: int,
sync_down_logs: bool,
replica_drain_delay_seconds: int,
is_scale_down: bool = False) -> None:

if replica_id in self._launch_process_pool:
info = serve_state.get_replica_info_from_id(self._service_name,
Expand Down Expand Up @@ -725,6 +735,7 @@ def _download_and_stream_logs(info: ReplicaInfo):
args=(info.cluster_name, replica_drain_delay_seconds),
)
info.status_property.sky_down_status = ProcessStatus.RUNNING
info.status_property.is_scale_down = is_scale_down
serve_state.add_or_update_replica(self._service_name, replica_id, info)
p.start()
self._down_process_pool[replica_id] = p
Expand All @@ -733,7 +744,8 @@ def scale_down(self, replica_id: int) -> None:
self._terminate_replica(
replica_id,
sync_down_logs=False,
replica_drain_delay_seconds=_DEFAULT_DRAIN_SECONDS)
replica_drain_delay_seconds=_DEFAULT_DRAIN_SECONDS,
is_scale_down=True)

def _handle_preemption(self, info: ReplicaInfo) -> bool:
"""Handle preemption of the replica if any error happened.
Expand Down Expand Up @@ -774,7 +786,8 @@ def _handle_preemption(self, info: ReplicaInfo) -> bool:
info)
self._terminate_replica(info.replica_id,
sync_down_logs=False,
replica_drain_delay_seconds=0)
replica_drain_delay_seconds=0,
is_scale_down=True)
return True

#################################
Expand Down Expand Up @@ -859,12 +872,10 @@ def _refresh_process_pool(self) -> None:
# initial_delay_seconds is not supported. We should add it
# later when we support `sky serve update`.
removal_reason = None
if info.status_property.is_scale_down_succeeded(
self._get_initial_delay_seconds(info.version)):
# This means the cluster is deleted due to
# a scale down or the cluster is recovering
# from preemption. Delete the replica info
# so it won't count as a replica.
if info.status_property.is_scale_down:
# This means the cluster is deleted due to an autoscaler
# decision or the cluster is recovering from preemption.
# Delete the replica info so it won't count as a replica.
if info.status_property.preempted:
removal_reason = 'for preemption recovery'
else:
Expand Down
25 changes: 19 additions & 6 deletions sky/serve/serve_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,18 @@ class ReplicaStatus(enum.Enum):
# The replica VM is being shut down. i.e., the `sky down` is still running.
SHUTTING_DOWN = 'SHUTTING_DOWN'

# The replica VM is once failed and has been deleted.
# The replica fails due to user's run/setup.
FAILED = 'FAILED'
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved

# The replica fails due to initial delay exceeded.
FAILED_INITIAL_DELAY = 'FAILED_INITIAL_DELAY'

# The replica fails due to repeated readiness probe failures.
FAILED_PROBING = 'FAILED_PROBING'

# The replica fails during launching
FAILED_PROVISION = 'FAILED_PROVISION'

# `sky.down` failed during service teardown.
# This could mean resource leakage.
# TODO(tian): This status should be removed in the future, at which point
Expand All @@ -115,14 +124,15 @@ class ReplicaStatus(enum.Enum):

@classmethod
def failed_statuses(cls) -> List['ReplicaStatus']:
return [cls.FAILED, cls.FAILED_CLEANUP, cls.UNKNOWN]
return [
cls.FAILED, cls.FAILED_CLEANUP, cls.FAILED_INITIAL_DELAY,
cls.FAILED_PROBING, cls.FAILED_PROVISION, cls.UNKNOWN
]

@classmethod
def terminal_statuses(cls) -> List['ReplicaStatus']:
return [
cls.SHUTTING_DOWN, cls.FAILED, cls.FAILED_CLEANUP, cls.PREEMPTED,
cls.UNKNOWN
]
return [cls.SHUTTING_DOWN, cls.PREEMPTED, cls.UNKNOWN
] + cls.failed_statuses()

@classmethod
def scale_down_decision_order(cls) -> List['ReplicaStatus']:
Expand All @@ -145,6 +155,9 @@ def colored_str(self) -> str:
ReplicaStatus.NOT_READY: colorama.Fore.YELLOW,
ReplicaStatus.SHUTTING_DOWN: colorama.Fore.MAGENTA,
ReplicaStatus.FAILED: colorama.Fore.RED,
ReplicaStatus.FAILED_INITIAL_DELAY: colorama.Fore.RED,
ReplicaStatus.FAILED_PROBING: colorama.Fore.RED,
ReplicaStatus.FAILED_PROVISION: colorama.Fore.RED,
ReplicaStatus.FAILED_CLEANUP: colorama.Fore.RED,
ReplicaStatus.PREEMPTED: colorama.Fore.MAGENTA,
ReplicaStatus.UNKNOWN: colorama.Fore.RED,
Expand Down
2 changes: 1 addition & 1 deletion sky/serve/serve_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ def _get_replicas(service_record: Dict[str, Any]) -> str:
if info['status'] == serve_state.ReplicaStatus.READY:
ready_replica_num += 1
# TODO(MaoZiming): add a column showing failed replicas number.
if info['status'] != serve_state.ReplicaStatus.FAILED:
if info['status'] not in serve_state.ReplicaStatus.failed_statuses():
total_replica_num += 1
return f'{ready_replica_num}/{total_replica_num}'

Expand Down
12 changes: 12 additions & 0 deletions tests/skyserve/failures/initial_delay.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
service:
readiness_probe:
path: /health
initial_delay_seconds: 10
replicas: 2

resources:
cpus: 2
ports: 8081

run: |
sleep 1000
38 changes: 38 additions & 0 deletions tests/skyserve/failures/probing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import argparse
import http.server
import socketserver


class MyHttpRequestHandler(http.server.SimpleHTTPRequestHandler):

def do_GET(self):
# Return 200 for all paths
# Therefore, readiness_probe will return 200 at path '/health'
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
html = """
<html>
<head>
<title>SkyPilot Test Page</title>
</head>
<body>
<h1>Hi, SkyPilot here!</h1>
</body>
</html>
"""
self.wfile.write(bytes(html, 'utf8'))
return


if __name__ == '__main__':
parser = argparse.ArgumentParser(description='SkyServe HTTP Test Server')
parser.add_argument('--port', type=int, required=False, default=8081)
args = parser.parse_args()

Handler = MyHttpRequestHandler
with socketserver.TCPServer(('', args.port), Handler) as httpd:
print('serving at port', args.port)
for i in range(20):
# Ends after handle 20 requests
httpd.handle_request()
13 changes: 13 additions & 0 deletions tests/skyserve/failures/probing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
service:
readiness_probe:
path: /health
initial_delay_seconds: 10
replicas: 1

resources:
cpus: 2
ports: 8081

workdir: tests/skyserve/failures

run: python3 probing.py
Loading
Loading