Skip to content

Commit

Permalink
add cluster utilization data to status endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
jlewitt1 authored and Alexandra Belousov committed Apr 7, 2024
1 parent 8e59b23 commit 3a3d62a
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 20 deletions.
97 changes: 87 additions & 10 deletions runhouse/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
import logging
import shlex
import subprocess
Expand Down Expand Up @@ -120,13 +121,40 @@ def ssh(cluster_name: str, up: bool = typer.Option(False, help="Start the cluste
c.ssh()


def _print_status(config):
###############################
# Status helping functions
###############################


def _adjust_resource_type(resource_type: str):
"""
status helping function. transforms a str form runhouse.resources.{X.Y...}.resource_type to runhouse.resource_type
"""
resource_type = resource_type.split(".")
resource_type = ".".join([resource_type[0], resource_type[-1]])
return resource_type


def _resource_name_to_rns(name: str, current_cluster: Cluster):
"""
If possible, transform the resource name to a rns address.
If not, return the name as is (it is the key in the object store).
"""
resource = current_cluster.get(name)
if not resource.rns_address:
return name
return resource.rns_address


def _print_status(config: dict, current_cluster: Cluster):
"""
Prints the status of the cluster to the console
:param config: cluster's config
:return: cluster's config
"""
envs = config["envs"]
envs_pid = config.pop("envs_pid", [])

config.pop("envs", [])

# print headlines
Expand All @@ -138,7 +166,12 @@ def _print_status(config):
if "name" in config.keys():
console.print(config["name"])

first_info_to_print = ["den_auth", "server_connection_type", "server_port"]
first_info_to_print = [
"den_auth",
"server_connection_type",
"server_port",
"server_pid",
]

for info in config:
if info in first_info_to_print:
Expand All @@ -160,7 +193,10 @@ def _print_status(config):
for env_name in envs:
resources_in_env = envs[env_name]
if len(resources_in_env) == 0:
console.print(f"{env_name} (Env):", style="italic underline")
console.print(
f"{env_name} (runhouse.Env) | {envs_pid[env_name]} :",
style="italic underline",
)
console.print("This environment has no resources.")

else:
Expand All @@ -172,12 +208,18 @@ def _print_status(config):

# sometimes the env itself is not a resource (key) inside the env's servlet.
if len(current_env) == 0:
env_name_txt = f"{env_name} (Env):"
env_name_print = _resource_name_to_rns(env_name, current_cluster)
env_name_txt = (
f"{env_name_print} (runhouse.Env) | {envs_pid[env_name]} :"
)
else:
current_env = current_env[0]
env_name_txt = (
f"{current_env['name']} ({current_env['resource_type']}):"
env_name_print = _resource_name_to_rns(
current_env["name"], current_cluster
)
env_type = _adjust_resource_type(current_env["resource_type"])

env_name_txt = f"{env_name_print} ({env_type}) | {envs_pid[env_name]} :"

console.print(
env_name_txt,
Expand All @@ -189,8 +231,8 @@ def _print_status(config):
]

for resource in resources_in_env:
resource_name = resource["name"]
resource_type = resource["resource_type"]
resource_name = _resource_name_to_rns(resource["name"], current_cluster)
resource_type = _adjust_resource_type(resource["resource_type"])
console.print(f"\u2022{resource_name} ({resource_type})")

return config
Expand All @@ -204,11 +246,31 @@ def status(
)
):
"""Load the status of the Runhouse daemon running on a cluster."""

get_pid_commands = ["import os", "print(os.getpid())"]

cluster_or_local = rh.here
if cluster_or_local != "file":
# If we are on the cluster load status directly from the object store
cluster_status: dict = obj_store.status()
return _print_status(cluster_status)
cluster_config = copy.deepcopy(cluster_status)
cluster_config.pop("envs")
current_cluster: Cluster = Cluster.from_config(cluster_config)
server_pid = current_cluster.run_python(commands=get_pid_commands)
cluster_status["server_pid"] = server_pid
envs_names = cluster_status.get("envs").keys()
envs_pid = [
{
env_name: current_cluster.run_python(
commands=get_pid_commands, env=env_name
)
}
for env_name in envs_names
if env_name != "base"
]
envs_pid.append({"base": {}})
cluster_status["envs_pid"] = envs_pid
return _print_status(cluster_status, current_cluster)

if cluster_name is None:
# If running outside the cluster must specify a cluster name
Expand All @@ -221,6 +283,21 @@ def status(
cluster_status: dict = current_cluster.status(
resource_address=current_cluster.rns_address
)
server_pid = current_cluster.run_python(commands=get_pid_commands)
cluster_status["server_pid"] = server_pid
envs_names = cluster_status.get("envs").keys()
envs_pid = [
{
env_name: current_cluster.run_python(
commands=get_pid_commands, env=env_name
)
}
for env_name in envs_names
if env_name != "base"
]
envs_pid.append({"base": {}})
cluster_status["envs_pid"] = envs_pid

except ValueError:
console.print("Failed to load status for cluster.")
return
Expand All @@ -230,7 +307,7 @@ def status(
)
return

return _print_status(cluster_status)
return _print_status(cluster_status, current_cluster)


def load_cluster(cluster_name: str):
Expand Down
64 changes: 62 additions & 2 deletions runhouse/servers/http/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,64 @@ async def get_keys(request: Request, env_name: Optional[str] = None):
@app.get("/status")
@validate_cluster_access
async def get_status(request: Request):
return await obj_store.astatus()
import psutil
import ray

# Fields: `CPU`, `object_memory_store`, `memory`, `node`
ray_available_resources = {}
ray_total_resources = {}

system_gpu_data = {}

# Note: Ray resource data does not necessarily the real-time utilization of these resources
# Ex: a task may be allocated a CPU but might not be using it at 100% capacity
try:
ray_available_resources = ray.available_resources()
ray_total_resources = ray.cluster_resources()
except ray.exceptions.RaySystemError as e:
# If ray is not initialized
logger.warning(e)

# System wide data
cpu_usage = psutil.cpu_percent(interval=1)

# Fields: `available`, `percent`, `used`, `free`, `active`, `inactive`, `buffers`, `cached`, `shared`, `slab`
memory_usage = psutil.virtual_memory()._asdict()

# Fields: `total`, `used`, `free`, `percent`
disk_usage = psutil.disk_usage("/")._asdict()

try:
# Try loading GPU data (if relevant)
import torch

if torch.cuda.is_available():
system_gpu_data = {}
gpu_count = torch.cuda.device_count()
system_gpu_data["gpu_count"] = gpu_count
for i in range(gpu_count):
device_name = torch.cuda.get_device_name(i)
device_properties = torch.cuda.get_device_properties(i)
device_data = {
"device_properties": device_properties,
"total_memory": device_properties.total_memory,
}
system_gpu_data[device_name] = device_data

except:
pass

cluster_config = await obj_store.status()

return {
"cluster_config": cluster_config,
"ray_available_resources": ray_available_resources,
"ray_total_resources": ray_total_resources,
"system_cpu_usage": cpu_usage,
"system_memory_usage": memory_usage,
"system_disk_usage": disk_usage,
"system_gpu_data": system_gpu_data,
}

@staticmethod
def _collect_cluster_stats():
Expand Down Expand Up @@ -1180,4 +1237,7 @@ async def main():


if __name__ == "__main__":
asyncio.run(main())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
# asyncio.run(main())
17 changes: 11 additions & 6 deletions runhouse/servers/obj_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1442,7 +1442,7 @@ def status_local(self):
# The objects in env can be of any type, and not only runhouse resources,
# therefore we need to distinguish them when creating the list of the resources in each env.
if self.has_local_storage:
resources_in_env_modified = []
objects_in_env_modified = []
for k, v in self._kv_store.items():
cls = type(v)
py_module = cls.__module__
Expand All @@ -1452,27 +1452,32 @@ def status_local(self):
else (py_module + "." + cls.__qualname__)
)
if isinstance(v, runhouse.Resource):
resources_in_env_modified.append(
objects_in_env_modified.append(
{"name": k, "resource_type": cls_name}
)
else:
resources_in_env_modified.append(
objects_in_env_modified.append(
{"name": k, "resource_type": cls_name}
)
return resources_in_env_modified
return objects_in_env_modified
else:
return []

async def astatus(self):
config_cluster = self.get_cluster_config()

# poping creds because we don't want to show them in the status
config_cluster.pop("creds", None)

# getting cluster servlets (envs) and their related objects
cluster_servlets = {}
for env in await self.aget_all_initialized_env_servlet_names():
resources_in_env_modified = await self.acall_actor_method(
objects_in_env_modified = await self.acall_actor_method(
self.get_env_servlet(env), "astatus_local"
)
cluster_servlets[env] = resources_in_env_modified
cluster_servlets[env] = objects_in_env_modified
config_cluster["envs"] = cluster_servlets

return config_cluster

def status(self):
Expand Down
22 changes: 20 additions & 2 deletions tests/test_resources/test_clusters/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,24 @@ def test_cluster_endpoint(self, cluster):
headers=rh.globals.rns_client.request_headers(),
)
assert r.status_code == 200
assert r.json()["resource_type"] == "cluster"
assert r.json().get("cluster_config")["resource_type"] == "cluster"

@pytest.mark.level("local")
def test_load_cluster_status(self, cluster):
endpoint = cluster.endpoint()
verify = cluster.client.verify
r = requests.get(
f"{endpoint}/status",
verify=verify,
headers=rh.globals.rns_client.request_headers(),
)

assert r.status_code == 200
status_data = r.json()
assert status_data["cluster_config"]["resource_type"] == "cluster"
assert status_data["ray_available_resources"]
assert status_data["ray_total_resources"]
assert not status_data["system_gpu_data"]

@pytest.mark.level("local")
def test_cluster_objects(self, cluster):
Expand Down Expand Up @@ -219,7 +236,8 @@ def test_rh_here_objects(self, cluster):
@pytest.mark.level("local")
def test_rh_status_pythonic(self, cluster):
cluster.put(key="status_key1", obj="status_value1", env="numpy_env")
res = cluster.status()
cluster_data = cluster.status()
res = cluster_data.get("cluster_config")
assert res.get("creds") is None
assert res.get("server_port") == (cluster.server_port or DEFAULT_SERVER_PORT)
assert res.get("server_connection_type") == cluster.server_connection_type
Expand Down

0 comments on commit 3a3d62a

Please sign in to comment.