Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Serve] Propagate replica constructor error to deployment status message and print num retries left #48531

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 28 additions & 16 deletions python/ray/serve/_private/deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -1384,6 +1384,13 @@ def deployment_name(self) -> str:
def app_name(self) -> str:
return self._id.app_name

@property
def _failed_to_start_threshold(self) -> int:
    """Number of constructor failures tolerated before the deploy is failed.

    Scales as three attempts per target replica, but never exceeds the
    global cap MAX_DEPLOYMENT_CONSTRUCTOR_RETRY_COUNT.
    """
    per_replica_budget = 3 * self._target_state.target_num_replicas
    return min(MAX_DEPLOYMENT_CONSTRUCTOR_RETRY_COUNT, per_replica_budget)

def get_alive_replica_actor_ids(self) -> Set[str]:
return {replica.actor_id for replica in self._replicas.get()}

Expand Down Expand Up @@ -1845,11 +1852,10 @@ def scale_deployment_replicas(

if to_add > 0:
# Exponential backoff
failed_to_start_threshold = min(
MAX_DEPLOYMENT_CONSTRUCTOR_RETRY_COUNT,
self._target_state.target_num_replicas * 3,
)
if self._replica_constructor_retry_counter >= failed_to_start_threshold:
if (
self._replica_constructor_retry_counter
>= self._failed_to_start_threshold
):
# Wait 1, 2, 4, ... seconds before consecutive retries, with random
# offset added to avoid synchronization
if (
Expand Down Expand Up @@ -1909,17 +1915,13 @@ def check_curr_status(self) -> Tuple[bool, bool]:
)

failed_to_start_count = self._replica_constructor_retry_counter
failed_to_start_threshold = min(
MAX_DEPLOYMENT_CONSTRUCTOR_RETRY_COUNT,
self._target_state.target_num_replicas * 3,
)

# Got to make a call to complete current deploy() goal after
# start failure threshold reached, while we might still have
# pending replicas in current goal.
if (
failed_to_start_count >= failed_to_start_threshold
and failed_to_start_threshold != 0
failed_to_start_count >= self._failed_to_start_threshold
and self._failed_to_start_threshold != 0
):
if running_at_target_version_replica_cnt > 0:
# At least one RUNNING replica at target state, partial
Expand Down Expand Up @@ -2043,17 +2045,27 @@ def record_replica_startup_failure(self, error_msg: str):
self._replica_constructor_retry_counter += 1
self._replica_constructor_error_msg = error_msg

retrying_msg = "Retrying"
if self._failed_to_start_threshold != 0:
remaining_retries = (
self._failed_to_start_threshold
- self._replica_constructor_retry_counter
)
retrying_msg += f" {remaining_retries} more time(s)"

message = (
f"A replica failed to start with exception. {retrying_msg}. Error:\n"
f"{error_msg}"
)
self._curr_status_info = self._curr_status_info.update_message(message)

def update_replica_startup_backoff_time(self):
"""Updates the replica startup backoff time."""

# If replicas have failed enough times, execute exponential backoff
# Wait 1, 2, 4, ... seconds before consecutive retries (or use a custom
# backoff factor by setting EXPONENTIAL_BACKOFF_FACTOR)
failed_to_start_threshold = min(
MAX_DEPLOYMENT_CONSTRUCTOR_RETRY_COUNT,
self._target_state.target_num_replicas * 3,
)
if self._replica_constructor_retry_counter > failed_to_start_threshold:
if self._replica_constructor_retry_counter > self._failed_to_start_threshold:
self._backoff_time_s = min(
EXPONENTIAL_BACKOFF_FACTOR * self._backoff_time_s, MAX_BACKOFF_TIME_S
)
Expand Down
28 changes: 28 additions & 0 deletions python/ray/serve/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,34 @@ def check_for_failed_deployment():
wait_for_condition(check_for_failed_deployment)


@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
def test_status_constructor_retry_error(ray_start_stop):
    """Deploys Serve deployment that errors out in constructor, checks that the
    retry message is surfaced.
    """
    config_path = os.path.join(
        os.path.dirname(__file__), "test_config_files", "deployment_fail_2.yaml"
    )
    subprocess.check_output(["serve", "deploy", config_path])

    def _constructor_error_is_surfaced():
        # Query the status CLI and parse the reported application state.
        raw_status = subprocess.check_output(
            ["serve", "status", "-a", "http://localhost:52365/"]
        )
        app = yaml.safe_load(raw_status)["applications"][SERVE_DEFAULT_APP_NAME]
        assert app["status"] == "DEPLOYING"

        deployment = app["deployments"]["A"]
        assert deployment["status"] == "UPDATING"
        assert deployment["status_trigger"] == "CONFIG_UPDATE_STARTED"
        # The ZeroDivisionError raised in A.__init__ must show up in the
        # deployment status message (the behavior added by this PR).
        assert "ZeroDivisionError" in deployment["message"]
        return True

    wait_for_condition(_constructor_error_is_surfaced)
zcin marked this conversation as resolved.
Show resolved Hide resolved


@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
def test_status_package_unavailable_in_controller(ray_start_stop):
"""Test that exceptions raised from packages that are installed on deployment actors
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Serve config that deploys an app whose replica constructor always fails
# (see fail_2.py); used to test constructor-retry status messages.
applications:
- name: default
  import_path: ray.serve.tests.test_config_files.fail_2.node
13 changes: 13 additions & 0 deletions python/ray/serve/tests/test_config_files/fail_2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import time

from ray import serve


@serve.deployment
class A:
    """Test fixture: a deployment whose constructor always fails.

    The constructor sleeps briefly, then raises ZeroDivisionError so tests
    can check that the error is propagated to the deployment status message.
    """

    def __init__(self):
        time.sleep(5)
        _ = 1 / 0  # deliberate constructor failure


node = A.bind()
10 changes: 4 additions & 6 deletions python/ray/serve/tests/unit/test_deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -2492,9 +2492,7 @@ def create_deployment_state(

check_counts(ds1, total=3, by_state=[(ReplicaState.STOPPING, 3, None)])
assert ds1._replica_constructor_retry_counter == 3
# An error message should show up after
# 3 * num_replicas startup failures.
assert "" == ds1.curr_status_info.message
assert "Retrying 6 more time(s)" in ds1.curr_status_info.message

# Set all of ds1's replicas to stopped.
for replica in ds1._replicas.get():
Expand All @@ -2512,7 +2510,7 @@ def create_deployment_state(
assert ds1.curr_status_info.status == DeploymentStatus.UPDATING
check_counts(ds1, total=3, by_state=[(ReplicaState.STOPPING, 3, None)])
assert ds1._replica_constructor_retry_counter == 6
assert "" == ds1.curr_status_info.message
assert "Retrying 3 more time(s)" in ds1.curr_status_info.message

# Set all of ds1's replicas to stopped.
for replica in ds1._replicas.get():
Expand All @@ -2527,7 +2525,7 @@ def create_deployment_state(
assert ds1.curr_status_info.status == DeploymentStatus.UPDATING
check_counts(ds1, total=3, by_state=[(ReplicaState.STOPPING, 3, None)])
assert ds1._replica_constructor_retry_counter == 9
assert "" == ds1.curr_status_info.message
assert "Retrying 0 more time(s)" in ds1.curr_status_info.message

# Set all of ds1's replicas to stopped.
for replica in ds1._replicas.get():
Expand All @@ -2540,7 +2538,7 @@ def create_deployment_state(
assert ds1.curr_status_info.status == DeploymentStatus.UNHEALTHY
check_counts(ds1, total=0)
assert ds1._replica_constructor_retry_counter == 9
assert "Replica scheduling failed" in ds1.curr_status_info.message
assert "The deployment failed to start" in ds1.curr_status_info.message


def test_deploy_with_transient_constructor_failure(mock_deployment_state_manager):
Expand Down