Skip to content

Commit

Permalink
Bump SkyPilot to 0.6.0 and move autostop logic to AutostopServlet
Browse files Browse the repository at this point in the history
  • Loading branch information
dongreenberg committed Jun 4, 2024
1 parent 82a9b2a commit 66d97e7
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 52 deletions.
113 changes: 67 additions & 46 deletions runhouse/servers/cluster_servlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,26 @@ async def __init__(
self._initialized_env_servlet_names: Set[str] = set()
self._key_to_env_servlet_name: Dict[Any, str] = {}
self._auth_cache: AuthCache = AuthCache(cluster_config)
self.autostop_servlet = None

if cluster_config.get("resource_subtype", None) == "OnDemandCluster":
if cluster_config.get("autostop_mins") > 0:
try:
from sky.skylet import configs as sky_configs # noqa
except ImportError:
raise ImportError(
"skypilot must be installed on the cluster environment to support cluster autostop. "
"Install using cluster.run('pip install skypilot') or adding `skypilot` to the env requirements."
)
self._last_activity = time.time()
self._last_register = None
autostop_thread = threading.Thread(target=self.update_autostop, daemon=True)
autostop_thread.start()
import ray

current_ip = ray.get_runtime_context().worker.node_ip_address
self.autostop_servlet = (
ray.remote(AutostopServlet)
.options(
name="autostop_servlet",
get_if_exists=True,
lifetime="detached",
namespace="runhouse",
max_concurrency=1000,
resources={f"node:{current_ip}": 0.001},
num_cpus=0,
runtime_env={"env_vars": {"VIRTUAL_ENV": "skypilot-runtime"}},
)
.remote()
)

# Only send for clusters that have den_auth enabled and if we are logged in with a user's token
# to authenticate the request
Expand All @@ -86,34 +92,6 @@ async def __init__(
)
post_status_thread.start()

##############################################
# Cluster autostop
##############################################
def update_autostop(self):
    """Keep SkyPilot's autostop clock in sync with this servlet's last activity.

    Loops forever, so it is meant to be the target of a daemon thread. Every
    30 seconds it reads the autostop configuration and the last registered
    active time from SkyPilot's skylet config store. When autostop is enabled
    and either no active time is registered yet, or the cluster is within two
    minutes of autostopping while newer activity has been seen, it writes
    ``self._last_activity`` back as the new last-active time.

    A missing config entry is treated as "autostop disabled" / "nothing
    registered" rather than raising, and a failed iteration is logged and
    retried so the keep-alive thread does not die silently.
    """
    import logging
    import pickle

    from sky.skylet import configs as sky_configs

    log = logging.getLogger(__name__)

    while True:
        try:
            raw_config = sky_configs.get_config("autostop_config")
            raw_last_active = sky_configs.get_config("autostop_last_active_time")

            # NOTE(review): pickle.loads is applied to SkyPilot's own local
            # config store; this is only safe while that store is trusted.
            autostop_mins = (
                pickle.loads(raw_config).autostop_idle_minutes
                if raw_config is not None
                else -1
            )
            # get_config returns None for a key that was never set; float(None)
            # would raise before the "not registered yet" branch could run.
            self._last_register = (
                float(raw_last_active) if raw_last_active is not None else None
            )

            if autostop_mins > 0 and (
                not self._last_register
                or (
                    # within 2 min of autostop and there's more recent activity
                    60 * autostop_mins - (time.time() - self._last_register) < 120
                    and self._last_activity > self._last_register
                )
            ):
                sky_configs.set_config(
                    "autostop_last_active_time", self._last_activity
                )
                self._last_register = self._last_activity
        except Exception:
            # Thread boundary: log and retry on the next tick instead of
            # letting one transient failure end autostop bookkeeping.
            log.exception("Failed to refresh autostop last-active time; retrying.")

        time.sleep(30)

##############################################
# Cluster config state storage methods
##############################################
Expand Down Expand Up @@ -159,11 +137,8 @@ async def aset_cluster_config(self, cluster_config: Dict[str, Any]):
return self.cluster_config

async def aset_cluster_config_value(self, key: str, value: Any):
if key == "autostop_mins" and value > -1:
from sky.skylet import configs as sky_configs

self._last_activity = time.time()
sky_configs.set_config("autostop_last_active_time", self._last_activity)
if self.autostop_servlet and key == "autostop_mins" and value > -1:
await self.autostop_servlet.set_auto_stop.remote(value)
self.cluster_config[key] = value

# Propagate the changes to all other process's obj_stores
Expand Down Expand Up @@ -249,7 +224,8 @@ async def aget_key_to_env_servlet_name_dict(self) -> Dict[Any, str]:
return self._key_to_env_servlet_name

async def aget_env_servlet_name_for_key(self, key: Any) -> str:
    """Return the name of the env servlet that holds ``key``.

    The lookup counts as cluster activity: the local activity timestamp is
    refreshed and, when an autostop servlet is attached, it is told to mark
    the current time as the last active time.

    Args:
        key: The object-store key to look up.

    Returns:
        The env servlet name registered for ``key``, or ``None`` if the key
        is not registered.
    """
    self._last_activity = time.time()

    servlet = self.autostop_servlet
    if servlet:
        await servlet.set_last_active_time_to_now.remote()

    owner_by_key = self._key_to_env_servlet_name
    return owner_by_key.get(key)

async def aput_env_servlet_name_for_key(self, key: Any, env_servlet_name: str):
Expand Down Expand Up @@ -444,3 +420,48 @@ async def astatus(self):

def status(self):
    """Blocking counterpart of ``astatus``: run it via ``sync_function`` and return its result."""
    blocking_status = sync_function(self.astatus)
    return blocking_status()


class AutostopServlet:
    """A helper class strictly to run SkyPilot methods on OnDemandClusters inside SkyPilot's conda env.

    On construction it starts a daemon thread that periodically pushes the
    most recent activity timestamp into SkyPilot's skylet config store, so
    that an active cluster is not autostopped.
    """

    def __init__(self):
        # Timestamp of the most recent activity reported to this servlet.
        self._last_activity = time.time()
        # Last-active time currently registered with SkyPilot (None = unknown/unset).
        self._last_register = None
        autostop_thread = threading.Thread(target=self.update_autostop, daemon=True)
        autostop_thread.start()

    def set_last_active_time_to_now(self):
        """Record that the cluster was active just now."""
        self._last_activity = time.time()

    def set_autostop(self, value=None):
        """Update SkyPilot's autostop setting and count the call as activity.

        Args:
            value: Passed through as the first argument of
                ``autostop_lib.set_autostop``.
                NOTE(review): presumably the idle minutes before autostop, with
                the remaining positional args being (backend, down) — confirm
                against the pinned SkyPilot version.
        """
        from sky.skylet import autostop_lib

        self.set_last_active_time_to_now()
        autostop_lib.set_autostop(value, None, True)

    # ClusterServlet.aset_cluster_config_value calls `set_auto_stop`; keep both
    # spellings available so that call resolves on the actor handle.
    set_auto_stop = set_autostop

    @staticmethod
    def _needs_refresh(autostop_mins, last_register, last_activity, now):
        """Decide whether the registered last-active time should be overwritten.

        Returns True when autostop is enabled (``autostop_mins > 0``) and either
        nothing is registered yet, or the cluster is within 2 minutes of
        autostopping and ``last_activity`` is newer than ``last_register``.
        """
        if autostop_mins <= 0:
            return False
        if not last_register:
            return True
        secs_until_autostop = 60 * autostop_mins - (now - last_register)
        return secs_until_autostop < 120 and last_activity > last_register

    def update_autostop(self):
        """Poll SkyPilot's autostop state every 30 seconds and refresh it when needed.

        Loops forever; intended to run in the daemon thread started by
        ``__init__``. A missing config entry is treated as "autostop disabled" /
        "nothing registered" instead of raising, and a failed iteration is
        logged and retried so the keep-alive thread does not die silently.
        """
        import logging
        import pickle

        from sky.skylet import configs as sky_configs

        log = logging.getLogger(__name__)

        while True:
            try:
                raw_config = sky_configs.get_config("autostop_config")
                raw_last_active = sky_configs.get_config("autostop_last_active_time")

                # NOTE(review): pickle.loads is applied to SkyPilot's own local
                # config store; this is only safe while that store is trusted.
                autostop_mins = (
                    pickle.loads(raw_config).autostop_idle_minutes
                    if raw_config is not None
                    else -1
                )
                # get_config returns None for a key that was never set;
                # float(None) would raise before the "not registered" check.
                self._last_register = (
                    float(raw_last_active) if raw_last_active is not None else None
                )

                if self._needs_refresh(
                    autostop_mins,
                    self._last_register,
                    self._last_activity,
                    time.time(),
                ):
                    sky_configs.set_config(
                        "autostop_last_active_time", self._last_activity
                    )
                    self._last_register = self._last_activity
            except Exception:
                # Thread boundary: log and retry on the next tick instead of
                # letting one transient failure end autostop bookkeeping.
                log.exception(
                    "Failed to refresh autostop last-active time; retrying."
                )

            time.sleep(30)
12 changes: 6 additions & 6 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,30 +88,30 @@ def parse_readme(readme: str) -> str:
# NOTE: Change the templates/spot-controller.yaml.j2 file if any of the following
# packages dependencies are changed.
# Single source of truth for the SkyPilot pin, so every extra stays on the
# same version and an extra can never request two conflicting versions.
_SKYPILOT_VERSION = "0.6.0"

extras_require = {
    "sky": [f"skypilot=={_SKYPILOT_VERSION}"],
    "data": ["pandas", "pyarrow"],
    "aws": [
        f"skypilot[aws]=={_SKYPILOT_VERSION}",
        # https://github.com/aio-libs/aiobotocore/issues/983
        # If you don't want to use these exact versions, you can install runhouse without the aws extras, then
        # install your desired versions of awscli and boto3
        "pycryptodome==3.12.0",
        "sshtunnel>=0.3.0",  # required for sagemaker
    ],
    "azure": [f"skypilot[azure]=={_SKYPILOT_VERSION}"],
    "gcp": [
        f"skypilot[gcp]=={_SKYPILOT_VERSION}",
        "gcsfs",
    ],
    "docker": ["docker"],
    "sagemaker": [
        f"skypilot=={_SKYPILOT_VERSION}",
        # https://github.com/aws-samples/sagemaker-ssh-helper
        "sagemaker_ssh_helper",
        "sagemaker",
        "paramiko>=3.2.0",
    ],
    "kubernetes": [f"skypilot=={_SKYPILOT_VERSION}", "kubernetes"],
}

# "all" is the concatenation of every extra's requirement list.
extras_require["all"] = sum(extras_require.values(), [])
Expand Down

0 comments on commit 66d97e7

Please sign in to comment.