Add intervals to gpu and cpu util calcs (#1120)
Co-authored-by: Alexandra Belousov <sashabelousovrh@Alexandras-MacBook-Pro.local>
BelSasha and Alexandra Belousov committed Sep 4, 2024
1 parent a947d01 commit 5507636
Showing 8 changed files with 318 additions and 109 deletions.
1 change: 1 addition & 0 deletions requirements.txt
@@ -12,3 +12,4 @@ wheel
apispec
httpx
pydantic >=2.5.0
pynvml
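
The only dependency change is the new `pynvml` requirement, which backs the periodic GPU collection added below. As a quick, illustrative sanity check (not part of the commit), pynvml can be exercised like this on a GPU machine:

```python
# Illustrative only: verify that pynvml can talk to the NVIDIA driver.
import pynvml

try:
    pynvml.nvmlInit()
    print("GPUs visible to NVML:", pynvml.nvmlDeviceGetCount())
    pynvml.nvmlShutdown()
except pynvml.NVMLError as err:
    # Raised on CPU-only machines or when the NVIDIA driver is missing.
    print("NVML unavailable:", err)
```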
14 changes: 13 additions & 1 deletion runhouse/constants.py
@@ -73,11 +73,23 @@
# Constants for the status check
DOUBLE_SPACE_UNICODE = "\u00A0\u00A0"
BULLET_UNICODE = "\u2022"
SECOND = 1
MINUTE = 60
HOUR = 3600
DEFAULT_STATUS_CHECK_INTERVAL = 1 * MINUTE
INCREASED_STATUS_CHECK_INTERVAL = 1 * HOUR
STATUS_CHECK_DELAY = 1 * MINUTE
GPU_COLLECTION_INTERVAL = 5 * SECOND

# We collect GPU stats every GPU_COLLECTION_INTERVAL seconds,
# meaning that in one minute we collect (MINUTE / GPU_COLLECTION_INTERVAL) GPU stat samples.
# Currently, we save the GPU info of the last 10 minutes or less.
MAX_GPU_INFO_LEN = (MINUTE / GPU_COLLECTION_INTERVAL) * 10

# If we only collect the GPU stats (and do not send them to Den), the gpu_info dictionary *will not* be reset by the servlets.
# Therefore, we need to cap the gpu_info size so it doesn't consume too much cluster memory.
# Currently, we reduce the size by half, meaning we only keep the gpu_info of the last (MAX_GPU_INFO_LEN / 2) samples (roughly 5 minutes).
REDUCED_GPU_INFO_LEN = int(MAX_GPU_INFO_LEN / 2)


# Constants Surfacing Logs to Den
DEFAULT_LOG_SURFACING_INTERVAL = 2 * MINUTE
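With the values above, the buffer-size arithmetic works out as follows (a worked example using the constants from this diff, not code from the commit):

```python
SECOND, MINUTE = 1, 60
GPU_COLLECTION_INTERVAL = 5 * SECOND                       # one sample every 5 seconds
MAX_GPU_INFO_LEN = (MINUTE / GPU_COLLECTION_INTERVAL) * 10  # 12 samples/minute * 10 minutes = 120 samples
REDUCED_GPU_INFO_LEN = int(MAX_GPU_INFO_LEN / 2)            # 60 samples ~= 5 minutes

# Truncation behavior when the buffer is never reset (i.e. status is not sent to Den):
metrics = list(range(130))                 # pretend per-GPU sample list that outgrew the window
if len(metrics) + 1 > MAX_GPU_INFO_LEN:
    metrics = metrics[REDUCED_GPU_INFO_LEN:]  # drop the oldest half of the window
print(len(metrics))                        # 70
```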
21 changes: 19 additions & 2 deletions runhouse/main.py
@@ -366,14 +366,13 @@ def _print_envs_info(
total_gpu_memory = math.ceil(
float(env_gpu_info.get("total_memory")) / (1024**3)
)
gpu_util_percent = round(float(env_gpu_info.get("utilization_percent")), 2)
used_gpu_memory = round(
float(env_gpu_info.get("used_memory")) / (1024**3), 2
)
gpu_memory_usage_percent = round(
float(used_gpu_memory / total_gpu_memory) * 100, 2
)
gpu_usage_summery = f"{DOUBLE_SPACE_UNICODE}GPU: {gpu_util_percent}% | Memory: {used_gpu_memory} / {total_gpu_memory} Gb ({gpu_memory_usage_percent}%)"
gpu_usage_summery = f"{DOUBLE_SPACE_UNICODE}GPU Memory: {used_gpu_memory} / {total_gpu_memory} Gb ({gpu_memory_usage_percent}%)"
console.print(gpu_usage_summery)

resources_in_env = [
@@ -408,6 +407,8 @@ def _print_status(status_data: dict, current_cluster: Cluster) -> None:
if "name" in cluster_config.keys():
console.print(cluster_config.get("name"))

has_cuda: bool = cluster_config.get("has_cuda")

# print headline
daemon_headline_txt = (
"\N{smiling face with horns} Runhouse Daemon is running \N{Runner}"
@@ -420,6 +421,22 @@ def _print_status(status_data: dict, current_cluster: Cluster) -> None:
# Print relevant info from cluster config.
_print_cluster_config(cluster_config)

# print general cpu and gpu utilization
cluster_gpu_utilization: float = status_data.get("server_gpu_utilization")

# cluster_gpu_utilization can be None if the cluster was not using its GPU at the moment cluster.status() was invoked.
if cluster_gpu_utilization is None and has_cuda:
cluster_gpu_utilization: float = 0.0

cluster_cpu_utilization: float = status_data.get("server_cpu_utilization")

server_util_info = (
f"CPU Utilization: {round(cluster_cpu_utilization, 2)}% | GPU Utilization: {round(cluster_gpu_utilization,2)}%"
if has_cuda
else f"CPU Utilization: {round(cluster_cpu_utilization, 2)}%"
)
console.print(server_util_info)

# print the environments in the cluster, and the resources associated with each environment.
_print_envs_info(env_servlet_processes, current_cluster)

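The CLI now prints a one-line utilization summary built from `status_data`. A minimal sketch of that formatting logic, assuming the `server_cpu_utilization` and `server_gpu_utilization` keys shown in the diff (the helper function itself is illustrative, not part of the commit):

```python
def format_util_line(status_data: dict, has_cuda: bool) -> str:
    # Illustrative helper mirroring the _print_status logic above.
    cpu = round(float(status_data.get("server_cpu_utilization", 0.0)), 2)
    gpu = status_data.get("server_gpu_utilization")
    if gpu is None and has_cuda:
        # GPU was idle when the status snapshot was taken.
        gpu = 0.0
    if has_cuda:
        return f"CPU Utilization: {cpu}% | GPU Utilization: {round(float(gpu), 2)}%"
    return f"CPU Utilization: {cpu}%"

print(format_util_line({"server_cpu_utilization": 7.314, "server_gpu_utilization": None}, has_cuda=True))
# CPU Utilization: 7.31% | GPU Utilization: 0.0%
```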
144 changes: 107 additions & 37 deletions runhouse/servers/cluster_servlet.py
@@ -6,13 +6,19 @@
from typing import Any, Dict, List, Optional, Set, Tuple, Union

import httpx

import psutil
import pynvml
import requests

import runhouse

from runhouse.constants import (
DEFAULT_STATUS_CHECK_INTERVAL,
GPU_COLLECTION_INTERVAL,
INCREASED_STATUS_CHECK_INTERVAL,
MAX_GPU_INFO_LEN,
REDUCED_GPU_INFO_LEN,
SERVER_LOGFILE,
SERVER_LOGS_FILE_NAME,
)
@@ -25,7 +31,13 @@
from runhouse.rns.utils.api import ResourceAccess
from runhouse.servers.autostop_helper import AutostopHelper
from runhouse.servers.http.auth import AuthCache
from runhouse.utils import ColoredFormatter, sync_function
from runhouse.utils import (
ColoredFormatter,
get_gpu_usage,
get_pid,
ServletType,
sync_function,
)

logger = get_logger(__name__)

@@ -47,6 +59,8 @@ async def __init__(
self.cluster_config: Optional[Dict[str, Any]] = (
cluster_config if cluster_config else {}
)
self.cluster_config["has_cuda"] = detect_cuda_version_or_cpu() != "cpu"

self._initialized_env_servlet_names: Set[str] = set()
self._key_to_env_servlet_name: Dict[Any, str] = {}
self._auth_cache: AuthCache = AuthCache(cluster_config)
@@ -65,6 +79,19 @@ async def __init__(
self._api_server_url = self.cluster_config.get(
"api_server_url", rns_client.api_server_url
)
self.pid = get_pid()
self.process = psutil.Process(pid=self.pid)
self.gpu_metrics = None  # will only be updated if this is a GPU cluster.
self.lock = (
threading.Lock()
)  # used when self.gpu_metrics is updated by different threads.

if self.cluster_config.get("has_cuda"):
logger.debug("Creating _periodic_gpu_check thread.")
collect_gpu_thread = threading.Thread(
target=self._periodic_gpu_check, daemon=True
)
collect_gpu_thread.start()

logger.info("Creating periodic_cluster_checks thread.")
cluster_checks_thread = threading.Thread(
@@ -289,7 +316,9 @@ async def aperiodic_cluster_checks(self):
"Cluster has not yet been saved to Den, cannot update status or logs."
)
elif status_code != 200:
logger.error("Failed to send cluster status to Den")
logger.error(
f"Failed to send cluster status to Den, status_code: {status_code}"
)
else:
logger.debug("Successfully sent cluster status to Den")

@@ -382,42 +411,79 @@ async def _status_for_env_servlet(self, env_servlet_name):
except Exception as e:
return {"env_servlet_name": env_servlet_name, "Exception": e}

async def _aperiodic_gpu_check(self):
"""periodically collects cluster gpu usage"""

pynvml.nvmlInit() # init nvidia ml info collection

while True:
try:

gpu_count = pynvml.nvmlDeviceGetCount()
with self.lock:
if not self.gpu_metrics:
self.gpu_metrics = {device: [] for device in range(gpu_count)}

for gpu_index in range(gpu_count):
handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_index)
util_info = pynvml.nvmlDeviceGetUtilizationRates(handle)
memory_info = pynvml.nvmlDeviceGetMemoryInfo(handle)

total_memory = memory_info.total # in bytes
used_memory = memory_info.used # in bytes
free_memory = memory_info.free # in bytes
utilization_percent = util_info.gpu / 1.0 # make it float

# To reduce cluster memory usage (we store the gpu_usage info on the cluster),
# we keep only the most recent GPU usage samples. If updated_gpu_info grows
# too large, we drop the older samples.
# This is relevant when calling cluster.status() directly rather than relying on the status being sent to Den.
updated_gpu_info = self.gpu_metrics[gpu_index]
if len(updated_gpu_info) + 1 > MAX_GPU_INFO_LEN:
updated_gpu_info = updated_gpu_info[REDUCED_GPU_INFO_LEN:]
updated_gpu_info.append(
{
"total_memory": total_memory,
"used_memory": used_memory,
"free_memory": free_memory,
"utilization_percent": utilization_percent,
}
)
self.gpu_metrics[gpu_index] = updated_gpu_info
except Exception as e:
logger.error(str(e))
pynvml.nvmlShutdown()
break
finally:
# collect gpu usage every GPU_COLLECTION_INTERVAL seconds.
await asyncio.sleep(GPU_COLLECTION_INTERVAL)

def _periodic_gpu_check(self):
# This is only ever called once in its own thread, so we can do asyncio.run here instead of
# sync_function.
asyncio.run(self._aperiodic_gpu_check())
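
For reference, a standalone snapshot of the pynvml calls the loop above relies on, showing the units involved (memory in bytes, utilization as an integer percentage). This is an illustrative sketch, not code from the commit:

```python
import pynvml

pynvml.nvmlInit()
for gpu_index in range(pynvml.nvmlDeviceGetCount()):
    handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_index)
    util = pynvml.nvmlDeviceGetUtilizationRates(handle)  # .gpu is an int percent (0-100)
    mem = pynvml.nvmlDeviceGetMemoryInfo(handle)         # .total / .used / .free are bytes
    print(gpu_index, util.gpu, mem.used / mem.total)
pynvml.nvmlShutdown()
```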

def _get_node_gpu_usage(self, server_pid: int):
import subprocess

gpu_general_info = (
subprocess.run(
[
"nvidia-smi",
"--query-gpu=memory.total,memory.used,memory.free,count,utilization.gpu",
"--format=csv,noheader,nounits",
],
stdout=subprocess.PIPE,
)
.stdout.decode("utf-8")
.strip()
.split(", ")

# Currently works correctly only for a single-node GPU cluster. Multinode clusters will be supported shortly.

collected_gpus_info = copy.deepcopy(self.gpu_metrics)

if collected_gpus_info is None or not collected_gpus_info[0]:
return None

cluster_gpu_usage = get_gpu_usage(
collected_gpus_info=collected_gpus_info, servlet_type=ServletType.cluster
)
total_gpu_memory = int(gpu_general_info[0]) * (1024**2) # in bytes
total_used_memory = int(gpu_general_info[1]) * (1024**2) # in bytes
free_memory = int(gpu_general_info[2]) * (1024**2) # in bytes
gpu_count = int(gpu_general_info[3])
gpu_utilization_percent = int(gpu_general_info[4]) / 100

return {
"total_memory": total_gpu_memory,
"used_memory": total_used_memory,
"free_memory": free_memory,
"gpu_count": gpu_count,
"utilization_percent": gpu_utilization_percent,
"server_pid": server_pid, # will be useful for multi-node clusters.
}
cluster_gpu_usage[
"server_pid"
] = server_pid # will be useful for multi-node clusters.

return cluster_gpu_usage

async def astatus(self, send_to_den: bool = False) -> Tuple[Dict, Optional[int]]:
import psutil

from runhouse.utils import get_pid

config_cluster = copy.deepcopy(self.cluster_config)

# Popping out creds because we don't want to show them in the status
@@ -453,7 +519,7 @@ async def astatus(self, send_to_den: bool = False) -> Tuple[Dict, Optional[int]]
env_servlet_utilization_data[env_servlet_name] = env_memory_info

# TODO: decide if we need this info at all: cpu_usage, memory_usage, disk_usage
cpu_utilization = psutil.cpu_percent(interval=1)
cpu_utilization = psutil.cpu_percent(interval=0)

# A dictionary that maps the keys of psutil.virtual_memory()._asdict() to the keys we expect in Den.
relevant_memory_info = {
@@ -472,11 +538,9 @@ async def astatus(self, send_to_den: bool = False) -> Tuple[Dict, Optional[int]]
for k in relevant_memory_info.keys()
}

server_pid: int = get_pid()

# get general gpu usage
server_gpu_usage = (
self._get_node_gpu_usage(server_pid)
self._get_node_gpu_usage(self.pid)
if self.cluster_config.get("has_cuda", False)
else None
)
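
A note on the `interval=1` to `interval=0` change above: with a non-zero interval, `psutil.cpu_percent` blocks for that long on every status call, while `interval=0` returns immediately and reports utilization since the previous call. A quick illustration:

```python
import time
import psutil

psutil.cpu_percent(interval=0)          # first non-blocking call just primes the counters
time.sleep(1)
print(psutil.cpu_percent(interval=0))   # utilization since the priming call, no blocking
print(psutil.cpu_percent(interval=1))   # blocks ~1 second, then reports utilization over that window
```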
@@ -486,10 +550,16 @@ async def astatus(self, send_to_den: bool = False) -> Tuple[Dict, Optional[int]]
else None
)

# Reset the gpu_info only after the status has been sent to Den. If we are not sending the status to Den,
# self.gpu_metrics should not be reset here at all.
if send_to_den:
with self.lock:
self.gpu_metrics = None

status_data = {
"cluster_config": config_cluster,
"runhouse_version": runhouse.__version__,
"server_pid": server_pid,
"server_pid": self.pid,
"env_servlet_processes": env_servlet_utilization_data,
"server_cpu_utilization": cpu_utilization,
"server_gpu_utilization": gpu_utilization,
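The `get_gpu_usage` helper imported from `runhouse.utils` is part of this commit, but its diff is not shown above. Based on how it is called in `_get_node_gpu_usage`, a plausible sketch of the aggregation it performs follows: averaging the collected per-GPU samples into the single cluster-level dict that `_print_envs_info` and `_print_status` consume. The field names follow the diff; the implementation itself is an assumption.

```python
# Hedged sketch: NOT the actual runhouse.utils.get_gpu_usage implementation.
def get_gpu_usage_sketch(collected_gpus_info: dict) -> dict:
    # collected_gpus_info: {gpu_index: [{"total_memory", "used_memory", "free_memory", "utilization_percent"}, ...]}
    total_memory = used_memory = free_memory = 0
    util_samples = []
    for samples in collected_gpus_info.values():
        latest = samples[-1]                 # memory: take the most recent sample per GPU
        total_memory += latest["total_memory"]
        used_memory += latest["used_memory"]
        free_memory += latest["free_memory"]
        util_samples += [s["utilization_percent"] for s in samples]
    return {
        "total_memory": total_memory,        # bytes, summed across GPUs
        "used_memory": used_memory,
        "free_memory": free_memory,
        "gpu_count": len(collected_gpus_info),
        "utilization_percent": sum(util_samples) / len(util_samples),  # mean over the collection window
    }
```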
