Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Support choosing cloud for Spot controller #3363

Merged
merged 16 commits into from
Apr 23, 2024
Merged
2 changes: 1 addition & 1 deletion sky/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def get_cloud_credential_file_mounts(
file_mounts = {}
for cloud in enabled_clouds:
if (excluded_clouds is not None and
clouds.cloud_in_list(cloud, excluded_clouds)):
clouds.cloud_in_iterable(cloud, excluded_clouds)):
continue
cloud_file_mounts = cloud.get_credential_file_mounts()
file_mounts.update(cloud_file_mounts)
Expand Down
4 changes: 2 additions & 2 deletions sky/clouds/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Clouds in Sky."""

from sky.clouds.cloud import Cloud
from sky.clouds.cloud import cloud_in_list
from sky.clouds.cloud import cloud_in_iterable
from sky.clouds.cloud import CloudImplementationFeatures
from sky.clouds.cloud import ProvisionerVersion
from sky.clouds.cloud import Region
Expand Down Expand Up @@ -47,5 +47,5 @@
'StatusVersion',
'Fluidstack',
# Utility functions
'cloud_in_list',
'cloud_in_iterable',
]
2 changes: 1 addition & 1 deletion sky/clouds/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,6 @@ def __getstate__(self):


# === Helper functions ===
def cloud_in_list(cloud: Cloud, cloud_list: Iterable[Cloud]) -> bool:
def cloud_in_iterable(cloud: Cloud, cloud_list: Iterable[Cloud]) -> bool:
"""Returns whether the cloud is in the given cloud list."""
return any(cloud.is_same_cloud(c) for c in cloud_list)
3 changes: 3 additions & 0 deletions sky/clouds/fluidstack.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class Fluidstack(clouds.Cloud):
clouds.CloudImplementationFeatures.CUSTOM_DISK_TIER:
'Custom disk tiers'
f' is not supported in {_REPR}.',
clouds.CloudImplementationFeatures.HOST_CONTROLLERS:
'Host controllers'
f' are not supported in {_REPR}.',
}
# Using the latest SkyPilot provisioner API to provision and check status.
PROVISIONER_VERSION = clouds.ProvisionerVersion.SKYPILOT
Expand Down
1 change: 1 addition & 0 deletions sky/clouds/lambda_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class Lambda(clouds.Cloud):
clouds.CloudImplementationFeatures.IMAGE_ID: f'Specifying image ID is not supported in {_REPR}.',
clouds.CloudImplementationFeatures.CUSTOM_DISK_TIER: f'Custom disk tiers are not supported in {_REPR}.',
clouds.CloudImplementationFeatures.OPEN_PORTS: f'Opening ports is currently not supported on {_REPR}.',
clouds.CloudImplementationFeatures.HOST_CONTROLLERS: f'Host controllers are not supported in {_REPR}.',
}

@classmethod
Expand Down
2 changes: 2 additions & 0 deletions sky/clouds/runpod.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class RunPod(clouds.Cloud):
('Mounting object stores is not supported on RunPod. To read data '
'from object stores on RunPod, use `mode: COPY` to copy the data '
'to local disk.'),
clouds.CloudImplementationFeatures.HOST_CONTROLLERS:
('Host controllers are not supported on RunPod.'),
}
_MAX_CLUSTER_NAME_LEN_LIMIT = 120
_regions: List[clouds.Region] = []
Expand Down
5 changes: 3 additions & 2 deletions sky/optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ def _estimate_nodes_cost_or_time(
# in the error message.
enabled_clouds = (
sky_check.get_cached_enabled_clouds_or_refresh())
if clouds.cloud_in_list(clouds.Kubernetes(), enabled_clouds):
if clouds.cloud_in_iterable(clouds.Kubernetes(),
enabled_clouds):
if any(orig_resources.cloud is None
for orig_resources in node.resources):
source_hint = 'catalog and kubernetes cluster'
Expand Down Expand Up @@ -1169,7 +1170,7 @@ def _fill_in_launchable_resources(
blocked_resources = []
for resources in task.resources:
if (resources.cloud is not None and
not clouds.cloud_in_list(resources.cloud, enabled_clouds)):
not clouds.cloud_in_iterable(resources.cloud, enabled_clouds)):
if try_fix_with_sky_check:
# Explicitly check again to update the enabled cloud list.
sky_check.check(quiet=True)
Expand Down
36 changes: 5 additions & 31 deletions sky/serve/core.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
"""SkyServe core APIs."""
import re
import tempfile
import typing
from typing import Any, Dict, List, Optional, Set, Union
from typing import Any, Dict, List, Optional, Union

import colorama

import sky
from sky import backends
from sky import exceptions
from sky import global_user_state
from sky import sky_logging
from sky import task as task_lib
from sky.backends import backend_utils
Expand All @@ -26,9 +24,6 @@
from sky.utils import subprocess_utils
from sky.utils import ux_utils

if typing.TYPE_CHECKING:
from sky import clouds

logger = sky_logging.init_logger(__name__)


Expand Down Expand Up @@ -127,14 +122,6 @@ def up(
controller_utils.maybe_translate_local_file_mounts_and_sync_up(task,
path='serve')

# If the controller and replicas are from the same cloud, it should
# provide better connectivity. We will let the controller choose from
# the clouds of the resources if the controller does not exist.
requested_clouds: Set['clouds.Cloud'] = set()
for resources in task.resources:
if resources.cloud is not None:
requested_clouds.add(resources.cloud)

with tempfile.NamedTemporaryFile(
prefix=f'service-task-{service_name}-',
mode='w',
Expand All @@ -151,11 +138,8 @@ def up(
serve_utils.generate_remote_config_yaml_file_name(service_name))
controller_log_file = (
serve_utils.generate_remote_controller_log_file_name(service_name))
controller_resources_in_config = (
controller_utils.get_controller_resources(
controller_type='serve',
controller_resources_config=serve_constants.CONTROLLER_RESOURCES
))
controller_resources = controller_utils.get_controller_resources(
controller_type='serve', task_resources=task.resources)

vars_to_fill = {
'remote_task_yaml_path': remote_tmp_task_yaml_path,
Expand All @@ -174,23 +158,13 @@ def up(
vars_to_fill,
output_path=controller_file.name)
controller_task = task_lib.Task.from_yaml(controller_file.name)
controller_exist = (
global_user_state.get_cluster_from_name(controller_name)
is not None)
if (not controller_exist and
controller_resources_in_config.cloud is None):
controller_clouds = requested_clouds
else:
controller_clouds = {controller_resources_in_config.cloud}
# TODO(tian): Probably run another sky.launch after we get the load
# balancer port from the controller? So we don't need to open so many
# ports here. Or, we should have a nginx traffic control to refuse
# any connection to the unregistered ports.
controller_resources = {
controller_resources_in_config.copy(
cloud=controller_cloud,
ports=[serve_constants.LOAD_BALANCER_PORT_RANGE])
for controller_cloud in controller_clouds
r.copy(ports=[serve_constants.LOAD_BALANCER_PORT_RANGE])
for r in controller_resources
}
controller_task.set_resources(controller_resources)

Expand Down
2 changes: 1 addition & 1 deletion sky/spot/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def launch(
remote_user_config_path = f'{prefix}/{dag.name}-{dag_uuid}.config_yaml'
controller_resources = controller_utils.get_controller_resources(
controller_type='spot',
controller_resources_config=constants.CONTROLLER_RESOURCES)
task_resources=sum([list(t.resources) for t in dag.tasks], []))

vars_to_fill = {
'remote_user_yaml_path': remote_user_yaml_path,
Expand Down
93 changes: 82 additions & 11 deletions sky/utils/controller_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,24 @@
import os
import tempfile
import typing
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Iterable, List, Optional, Set

import colorama

from sky import check as sky_check
from sky import clouds
from sky import exceptions
from sky import global_user_state
from sky import resources
from sky import sky_logging
from sky import skypilot_config
from sky.clouds import gcp
from sky.data import data_utils
from sky.data import storage as storage_lib
from sky.serve import constants as serve_constants
from sky.serve import serve_utils
from sky.skylet import constants
from sky.spot import constants as spot_constants
from sky.spot import spot_utils
from sky.utils import common_utils
from sky.utils import env_options
Expand All @@ -47,6 +50,7 @@
@dataclasses.dataclass
class _ControllerSpec:
"""Spec for skypilot controllers."""
controller_type: str
name: str
cluster_name: str
in_progress_hint: str
Expand All @@ -56,13 +60,15 @@ class _ControllerSpec:
check_cluster_name_hint: str
default_hint_if_non_existent: str
connection_error_hint: str
default_resources_config: Dict[str, Any]


class Controllers(enum.Enum):
"""Skypilot controllers."""
# NOTE(dev): Keep this align with
# sky/cli.py::_CONTROLLER_TO_HINT_OR_RAISE
SPOT_CONTROLLER = _ControllerSpec(
controller_type='spot',
name='managed spot controller',
cluster_name=spot_utils.SPOT_CONTROLLER_NAME,
in_progress_hint=(
Expand All @@ -89,8 +95,10 @@ class Controllers(enum.Enum):
'managed spot controller.'),
default_hint_if_non_existent='No in-progress spot jobs.',
connection_error_hint=(
'Failed to connect to spot controller, please try again later.'))
'Failed to connect to spot controller, please try again later.'),
default_resources_config=spot_constants.CONTROLLER_RESOURCES)
SKY_SERVE_CONTROLLER = _ControllerSpec(
controller_type='serve',
name='serve controller',
cluster_name=serve_utils.SKY_SERVE_CONTROLLER_NAME,
in_progress_hint=(
Expand Down Expand Up @@ -118,7 +126,8 @@ class Controllers(enum.Enum):
'sky serve controller.'),
default_hint_if_non_existent='No live services.',
connection_error_hint=(
'Failed to connect to serve controller, please try again later.'))
'Failed to connect to serve controller, please try again later.'),
default_resources_config=serve_constants.CONTROLLER_RESOURCES)

@classmethod
def from_name(cls, name: Optional[str]) -> Optional['Controllers']:
Expand All @@ -133,6 +142,19 @@ def from_name(cls, name: Optional[str]) -> Optional['Controllers']:
return controller
return None

@classmethod
def from_type(cls, controller_type: str) -> Optional['Controllers']:
"""Get the controller by controller type.

Returns:
The controller if the controller type is valid.
Otherwise, returns None.
"""
for controller in cls:
if controller.value.controller_type == controller_type:
return controller
return None


# Install cli dependencies. Not using SkyPilot wheels because the wheel
# can be cleaned up by another process.
Expand Down Expand Up @@ -266,18 +288,21 @@ def shared_controller_vars_to_fill(

def get_controller_resources(
controller_type: str,
controller_resources_config: Dict[str, Any],
) -> 'resources.Resources':
task_resources: Iterable['resources.Resources'],
) -> Set['resources.Resources']:
"""Read the skypilot config and setup the controller resources.

Returns:
A tuple of (vars_to_fill, controller_resources_config). `var_to_fill`
is a dict of variables that will be filled in the controller template.
The controller_resources_config is the resources config that will be
used to launch the controller.
A set of controller resources that will be used to launch the
controller. All fields are the same except for the cloud. If no
controller exists and the controller resources has no cloud
specified, the controller will be launched on one of the clouds
of the task resources for better connectivity.
"""
controller = Controllers.from_type(controller_type)
assert controller is not None, controller_type
controller_resources_config_copied: Dict[str, Any] = copy.copy(
controller_resources_config)
controller.value.default_resources_config)
if skypilot_config.loaded():
# Override the controller resources with the ones specified in the
# config.
Expand All @@ -297,6 +322,9 @@ def get_controller_resources(
controller_type=controller_type,
err=common_utils.format_exception(e,
use_bracket=True))) from e
# TODO(tian): Support multiple resources for the controller. One blocker
# here is the semantic if controller resources use `ordered` and we want
# to override it with multiple cloud from task resources.
if len(controller_resources) != 1:
with ux_utils.print_exception_no_traceback():
raise ValueError(
Expand All @@ -305,7 +333,50 @@ def get_controller_resources(
err=f'Expected exactly one resource, got '
f'{len(controller_resources)} resources: '
f'{controller_resources}'))
return list(controller_resources)[0]
controller_resources_to_use: resources.Resources = list(
controller_resources)[0]

controller_exist = (global_user_state.get_cluster_from_name(
controller.value.name) is not None)
if controller_exist or controller_resources_to_use.cloud is not None:
return {controller_resources_to_use}
Comment on lines +336 to +342
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are returning a set of resources anyway, why do we return only the first one rather than the whole controller_resources, i.e., allow a user to specify multiple resources for the controller? (If needed, we can add a TODO here.)

Also, just wondering: are there any places that depend on the resources returned by this function to decide how many services we can run on the controller? If so, will changing the return type to a set cause failures?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not allowing a user to specify multiple resources for the controller?

That is a good point! I think it is at least feasible to support a set of resources (i.e. any_of). One thing we need to discuss is what the behaviour should be if the controller specifies ordered resources without a cloud and we want to override them with the clouds from the task resources. Consider the following:

# ~/.sky/config.yaml
serve:
  controller:
    resources:
      ordered:
        - accelerators: L4
        - accelerators: T4
# service.yaml
resources:
  any_of:
    - cloud: aws
    - cloud: gcp

Should the controller resources be a list?

  • If it is a list, what is the order of aws and gcp?
  • If it is a set, does it break the semantics of ordered in the controller resources?

do we have some places depending on the results of the resources returned by this function to decide how many services we can run on the controller?

No, this is automatically calculated from system memory. Reference here:

_SYSTEM_MEMORY_GB = psutil.virtual_memory().total // (1024**3)
NUM_SERVICE_THRESHOLD = (_SYSTEM_MEMORY_GB //
constants.CONTROLLER_MEMORY_USAGE_GB)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a TODO first. Thanks!


# If the controller and replicas are from the same cloud, it should
# provide better connectivity. We will let the controller choose from
# the clouds of the resources if the controller does not exist.
# TODO(tian): Consider respecting the regions/zones specified for the
# resources as well.
requested_clouds: Set['clouds.Cloud'] = set()
for resource in task_resources:
# cloud is an object and will not be able to be distinguished by set.
# Here we manually check if the cloud is in the set.
if resource.cloud is not None:
if not clouds.cloud_in_iterable(resource.cloud, requested_clouds):
try:
resource.cloud.check_features_are_supported(
resources.Resources(),
{clouds.CloudImplementationFeatures.HOST_CONTROLLERS})
except exceptions.NotSupportedError:
# Skip the cloud if it does not support hosting controllers.
continue
requested_clouds.add(resource.cloud)
else:
# if one of the resource.cloud is None, this could represent user
# does not know which cloud is best for the specified resources.
# For example:
# resources:
# - accelerators: L4 # Both available on AWS and GCP
# - cloud: runpod
# accelerators: A40
# In this case, we allow the controller to be launched on any cloud.
requested_clouds.clear()
break
if not requested_clouds:
return {controller_resources_to_use}
return {
controller_resources_to_use.copy(cloud=controller_cloud)
for controller_cloud in requested_clouds
}


def _setup_proxy_command_on_controller(
Expand Down
Loading
Loading