Revert "[WIP] CPU/memory usage per func class (#31234)" (#31328)
Reverts #31234

Fix `test_runtime_env_2` and `test_metrics_agent` Windows CI failures
krfricke authored and AmeerHajAli committed Jan 12, 2023
1 parent 72f7a7d commit 44921ae
Showing 5 changed files with 141 additions and 493 deletions.
4 changes: 0 additions & 4 deletions dashboard/client/src/pages/metrics/Metrics.tsx
@@ -121,10 +121,6 @@ const METRICS_CONFIG = [
title: "Node Memory by Component",
path: "/d-solo/rayDefaultDashboard/default-dashboard?orgId=1&theme=light&panelId=34",
},
{
title: "Node CPU by Component",
path: "/d-solo/rayDefaultDashboard/default-dashboard?orgId=1&theme=light&panelId=37",
},
{
title: "Node GPU Memory (GRAM)",
path: "/d-solo/rayDefaultDashboard/default-dashboard?orgId=1&theme=light&panelId=18",
13 changes: 0 additions & 13 deletions dashboard/modules/metrics/grafana_dashboard_factory.py
@@ -296,19 +296,6 @@ def max_plus_pending(max_resource, pending_resource):
)
],
),
Panel(
id=37,
title="Node CPU by Component",
description="The physical (hardware) CPU usage across the cluster, broken down by component. This reports the summed CPU usage per Ray component.",
unit="cores",
targets=[
Target(
# ray_component_cpu_percentage returns a percentage that can be > 100. It means that it uses more than 1 CPU.
expr="sum(ray_component_cpu_percentage{{{global_filters}}}) by (Component) / 100",
legend="{{Component}}",
)
],
),
Panel(
id=18,
title="Node GPU Memory (GRAM)",
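For reference, the "Node CPU by Component" panel removed above divides the summed ray_component_cpu_percentage by 100 because the metric is reported as a percentage that can exceed 100 when a component uses more than one core. The triple braces in its expr suggest the dashboard factory fills in global_filters via Python's str.format; a minimal, hypothetical sketch of that rendering step (render_expr and the filter value are illustrative, not Ray code):

def render_expr(template: str, global_filters: str) -> str:
    # "{{" / "}}" collapse to literal PromQL braces; {global_filters} is substituted.
    return template.format(global_filters=global_filters)

expr = render_expr(
    "sum(ray_component_cpu_percentage{{{global_filters}}}) by (Component) / 100",
    global_filters='SessionName="session_xyz"',  # hypothetical label filter
)
# -> sum(ray_component_cpu_percentage{SessionName="session_xyz"}) by (Component) / 100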
221 changes: 61 additions & 160 deletions dashboard/modules/reporter/reporter_agent.py
@@ -11,14 +11,14 @@
import psutil

from typing import List, Optional, Tuple
from collections import defaultdict

import ray
import ray._private.services
import ray._private.utils
from ray.dashboard.consts import (
GCS_RPC_TIMEOUT_SECONDS,
COMPONENT_METRICS_TAG_KEYS,
AVAILABLE_COMPONENT_NAMES_FOR_METRICS,
)
from ray.dashboard.modules.reporter.profile_manager import CpuProfilingManager
import ray.dashboard.modules.reporter.reporter_consts as reporter_consts
@@ -287,12 +287,6 @@ def __init__(self, dashboard_agent):
self._hostname = socket.gethostname()
# (pid, created_time) -> psutil.Process
self._workers = {}
# psutil.Process of the parent.
self._raylet_proc = None
# psutil.Process of the current process.
self._agent_proc = None
# The last reported worker proc names (e.g., ray::*).
self._latest_worker_proc_names = set()
self._network_stats_hist = [(0, (0.0, 0.0))] # time, (sent, recv)
self._disk_io_stats_hist = [
(0, (0.0, 0.0, 0, 0))
@@ -512,20 +506,19 @@ def _get_workers(self):
if w.status() != psutil.STATUS_ZOMBIE
]

def _get_raylet_proc(self):
@staticmethod
def _get_raylet_proc():
try:
if not self._raylet_proc:
curr_proc = psutil.Process()
# Here, parent is always raylet because the
# dashboard agent is a child of the raylet process.
self._raylet_proc = curr_proc.parent()

if self._raylet_proc is not None:
if self._raylet_proc.pid == 1:
curr_proc = psutil.Process()
# Here, parent is always raylet because the
# dashboard agent is a child of the raylet process.
parent = curr_proc.parent()
if parent is not None:
if parent.pid == 1:
return None
if self._raylet_proc.status() == psutil.STATUS_ZOMBIE:
if parent.status() == psutil.STATUS_ZOMBIE:
return None
return self._raylet_proc
return parent
except (psutil.AccessDenied, ProcessLookupError):
pass
return None
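The revert turns _get_raylet_proc back into a static method that re-resolves the raylet on every call instead of caching a psutil.Process handle on self. The underlying check — take the agent's parent process, and treat a parent with PID 1 (the agent was reparented to init, so the raylet has exited) or a zombie parent as absent — can be sketched standalone as follows (alive_parent_or_none is an illustrative name, not Ray's API):

import psutil

def alive_parent_or_none():
    # The dashboard agent is spawned by the raylet, so the parent of the
    # current process should be the raylet if it is still alive.
    try:
        parent = psutil.Process().parent()
    except (psutil.AccessDenied, ProcessLookupError):
        return None
    if parent is None or parent.pid == 1:
        # No parent, or reparented to init: the original parent has exited.
        return None
    if parent.status() == psutil.STATUS_ZOMBIE:
        return None
    return parent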
@@ -549,9 +542,8 @@ def _get_raylet(self):

def _get_agent(self):
# Current proc == agent proc
if not self._agent_proc:
self._agent_proc = psutil.Process()
return self._agent_proc.as_dict(
agent_proc = psutil.Process()
return agent_proc.as_dict(
attrs=[
"pid",
"create_time",
@@ -613,137 +605,6 @@ def _get_all_stats(self):
"cmdline": self._get_raylet().get("cmdline", []),
}

def _generate_reseted_stats_record(self, component_name: str) -> List[Record]:
"""Return a list of Record that will reset
the system metrics of a given component name.
Args:
component_name: a component name for a given stats.
Returns:
a list of Record instances of all values 0.
"""
tags = {"ip": self._ip, "Component": component_name}

records = []
records.append(
Record(
gauge=METRICS_GAUGES["component_cpu_percentage"],
value=0.0,
tags=tags,
)
)
records.append(
Record(
gauge=METRICS_GAUGES["component_rss_mb"],
value=0.0,
tags=tags,
)
)
records.append(
Record(
gauge=METRICS_GAUGES["component_uss_mb"],
value=0.0,
tags=tags,
)
)
return records

def _generate_system_stats_record(
self, stats: List[dict], component_name: str, pid: Optional[str] = None
) -> List[Record]:
"""Generate a list of Record class from a given component names.
Args:
stats: a list of stats dict generated by `psutil.as_dict`.
If empty, it will create the metrics of a given "component_name"
which has all 0 values.
component_name: a component name for a given stats.
pid: optionally provided pids.
Returns:
a list of Record class that will be exposed to Prometheus.
"""
total_cpu_percentage = 0.0
total_rss = 0.0
total_uss = 0.0

for stat in stats:
total_cpu_percentage += float(stat.get("cpu_percent", 0.0)) # noqa
memory_info = stat.get("memory_info")
if memory_info:
total_rss += float(stat["memory_info"].rss) / 1.0e6 # noqa
mem_full_info = stat.get("memory_full_info")
if mem_full_info is not None:
total_uss += float(mem_full_info.uss) / 1.0e6

tags = {"ip": self._ip, "Component": component_name}
if pid:
tags["pid"] = pid

records = []
records.append(
Record(
gauge=METRICS_GAUGES["component_cpu_percentage"],
value=total_cpu_percentage,
tags=tags,
)
)
records.append(
Record(
gauge=METRICS_GAUGES["component_rss_mb"],
value=total_rss,
tags=tags,
)
)
if total_uss > 0.0:
records.append(
Record(
gauge=METRICS_GAUGES["component_uss_mb"],
value=total_uss,
tags=tags,
)
)

return records

def generate_worker_stats_record(self, worker_stats: List[dict]) -> List[Record]:
"""Generate a list of Record class for worker proceses.
This API automatically sets the component_name of record as
the name of worker processes. I.e., ray::* so that we can report
per task/actor (grouped by a func/class name) resource usages.
Args:
stats: a list of stats dict generated by `psutil.as_dict`
for worker processes.
"""
# worker cmd name (ray::*) -> stats dict.
proc_name_to_stats = defaultdict(list)
for stat in worker_stats:
cmdline = stat.get("cmdline")
# All ray processes start with ray::
if cmdline and len(cmdline) > 0 and cmdline[0].startswith("ray::"):
proc_name = cmdline[0]
proc_name_to_stats[proc_name].append(stat)
# We will lose worker stats that don't follow the ray worker proc
# naming convention. Theoretically, there should be no data loss here
# because all worker processes are renamed to ray::.

records = []
for proc_name, stats in proc_name_to_stats.items():
records.extend(self._generate_system_stats_record(stats, proc_name))

# Reset worker metrics that are from finished processes.
new_proc_names = set(proc_name_to_stats.keys())
stale_procs = self._latest_worker_proc_names - new_proc_names
self._latest_worker_proc_names = new_proc_names

for stale_proc_name in stale_procs:
records.extend(self._generate_reseted_stats_record(stale_proc_name))

return records

def _record_stats(self, stats, cluster_stats):
records_reported = []
ip = stats["ip"]
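For context, the per-worker reporting deleted above keyed psutil stats on the worker's command line: Ray renames worker processes to ray::<task or actor name>, so cmdline[0] identifies the task or actor a worker is running. A condensed sketch of that grouping step, mirroring the removed code (group_worker_stats is an illustrative name):

from collections import defaultdict
from typing import Dict, List

def group_worker_stats(worker_stats: List[dict]) -> Dict[str, List[dict]]:
    # Group psutil.as_dict() outputs by the "ray::..." process title so usage
    # can be aggregated per task/actor function or class name.
    proc_name_to_stats: Dict[str, List[dict]] = defaultdict(list)
    for stat in worker_stats:
        cmdline = stat.get("cmdline")
        if cmdline and cmdline[0].startswith("ray::"):
            proc_name_to_stats[cmdline[0]].append(stat)
    return proc_name_to_stats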
@@ -954,26 +815,66 @@ def _record_stats(self, stats, cluster_stats):
Record system stats.
"""

def record_system_stats(
stats: List[dict], component_name: str, pid: Optional[str] = None
) -> List[Record]:
assert component_name in AVAILABLE_COMPONENT_NAMES_FOR_METRICS
records = []
total_cpu_percentage = 0.0
total_rss = 0.0
total_uss = 0.0
for stat in stats:
total_cpu_percentage += float(stat["cpu_percent"]) * 100.0
total_rss += float(stat["memory_info"].rss) / 1.0e6
mem_full_info = stat.get("memory_full_info")
if mem_full_info is not None:
total_uss += float(mem_full_info.uss) / 1.0e6

tags = {"ip": ip, "Component": component_name}
if pid:
tags["pid"] = pid

records.append(
Record(
gauge=METRICS_GAUGES["component_cpu_percentage"],
value=total_cpu_percentage,
tags=tags,
)
)
records.append(
Record(
gauge=METRICS_GAUGES["component_rss_mb"],
value=total_rss,
tags=tags,
)
)
if total_uss > 0.0:
records.append(
Record(
gauge=METRICS_GAUGES["component_uss_mb"],
value=total_uss,
tags=tags,
)
)

return records

# Record component metrics.
raylet_stats = stats["raylet"]
if raylet_stats:
raylet_pid = str(raylet_stats["pid"])
records_reported.extend(
self._generate_system_stats_record(
[raylet_stats], "raylet", pid=raylet_pid
)
record_system_stats([raylet_stats], "raylet", pid=raylet_pid)
)
workers_stats = stats["workers"]
logger.info(workers_stats)
if workers_stats:
records_reported.extend(self.generate_worker_stats_record(workers_stats))
# TODO(sang): Maybe we can report per worker memory usage.
records_reported.extend(record_system_stats(workers_stats, "workers"))
agent_stats = stats["agent"]
if agent_stats:
agent_pid = str(agent_stats["pid"])
records_reported.extend(
self._generate_system_stats_record(
[agent_stats], "agent", pid=agent_pid
)
record_system_stats([agent_stats], "agent", pid=agent_pid)
)

# TODO(sang): Record GCS metrics.
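The restored record_system_stats helper consumes plain dicts produced by psutil.Process.as_dict. Attributes that raise AccessDenied — commonly memory_full_info, which may need elevated privileges on some platforms — come back as None, which is why the USS gauge is only emitted when total_uss > 0.0. A minimal sketch of collecting one such stats dict (collect_proc_stats is an illustrative helper, not Ray's API):

import psutil

def collect_proc_stats(proc: psutil.Process) -> dict:
    # Attributes that cannot be read are returned as None (psutil's default
    # ad_value), so callers must guard before touching .uss.
    return proc.as_dict(
        attrs=["pid", "create_time", "cpu_percent", "memory_info", "memory_full_info"]
    )

stats = collect_proc_stats(psutil.Process())
rss_mb = stats["memory_info"].rss / 1.0e6
mem_full_info = stats["memory_full_info"]
uss_mb = mem_full_info.uss / 1.0e6 if mem_full_info is not None else None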