Skip to content

Commit

Permalink
Read all cgroup v2 metrics that can be read
Browse files Browse the repository at this point in the history
This commit removes any arbitration of the cgroup v2 hierarchy for a
given process. We instead read anything that can be read, looping over
all cgroup files present but not following the hierarchy down.

Signed-off-by: Brian L. Troutwine <brian.troutwine@datadoghq.com>
  • Loading branch information
blt committed Dec 3, 2024
1 parent 63f1ce0 commit 6575d56
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 158 deletions.
30 changes: 2 additions & 28 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion lading/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ uuid = { workspace = true }
zstd = "0.13.1"

[target.'cfg(target_os = "linux")'.dependencies]
cgroups-rs = { version = "0.3", default-features = false, features = [] }
procfs = { version = "0.17", default-features = false, features = [] }
async-pidfd = "0.1"

Expand Down
137 changes: 8 additions & 129 deletions lading/src/observer/linux.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{collections::VecDeque, io, path::Path, sync::atomic::Ordering};
mod cgroup;

use std::{collections::VecDeque, io, sync::atomic::Ordering};

use cgroups_rs::cgroup::Cgroup;
use metrics::gauge;
use nix::errno::Errno;
use procfs::ProcError::PermissionDenied;
Expand All @@ -9,6 +10,7 @@ use rustc_hash::{FxHashMap, FxHashSet};
use tracing::{error, warn};

use crate::observer::memory::{Regions, Rollup};
use cgroup::v2;

use super::RSS_BYTES;

Expand All @@ -27,8 +29,8 @@ pub enum Error {
/// Wrapper for [`procfs::ProcError`]
#[error("Unable to read procfs: {0}")]
Proc(#[from] procfs::ProcError),
#[error("Unable to read cgroups: {0}")]
CGroups(#[from] cgroups_rs::error::Error),
#[error("Unable to read cgroup: {0}")]
CGroup(#[from] v2::Error),
}

macro_rules! report_status_field {
Expand Down Expand Up @@ -401,96 +403,8 @@ impl Sampler {
// if possible we compute the working set of the cgroup
// using the same heuristic as kubernetes:
// total_usage - inactive_file
let cgroup = get_cgroup(pid as _)?;
if let Some(memory_controller) =
cgroup.controller_of::<cgroups_rs::memory::MemController>()
{
let mem_stat = memory_controller.memory_stat();

let inactive_file = if cgroup.v2() {
mem_stat.stat.inactive_file
} else {
mem_stat.stat.total_inactive_file
};
let usage = mem_stat.usage_in_bytes;
let working_set = if usage < inactive_file {
0
} else {
usage - inactive_file
};

gauge!("working_set_bytes").set(working_set as f64);
gauge!("memory.working_set_bytes").set(working_set as f64);

gauge!("memory.fail_cnt").set(mem_stat.fail_cnt as f64);
gauge!("memory.limit_bytes").set(mem_stat.limit_in_bytes as f64);
gauge!("memory.usage_in_bytes").set(mem_stat.usage_in_bytes as f64);
gauge!("memory.max_usage_in_bytes").set(mem_stat.max_usage_in_bytes as f64);
gauge!("memory.soft_limit_in_bytes").set(mem_stat.soft_limit_in_bytes as f64);

gauge!("memory.stat.cache").set(mem_stat.stat.cache as f64);
gauge!("memory.stat.rss").set(mem_stat.stat.rss as f64);
gauge!("memory.stat.rss_huge").set(mem_stat.stat.rss_huge as f64);
gauge!("memory.stat.shmem").set(mem_stat.stat.shmem as f64);
gauge!("memory.stat.mapped_file").set(mem_stat.stat.mapped_file as f64);
gauge!("memory.stat.dirty").set(mem_stat.stat.dirty as f64);
gauge!("memory.stat.writeback").set(mem_stat.stat.writeback as f64);
gauge!("memory.stat.swap").set(mem_stat.stat.swap as f64);
gauge!("memory.stat.pgpgin").set(mem_stat.stat.pgpgin as f64);
gauge!("memory.stat.pgpgout").set(mem_stat.stat.pgpgout as f64);
gauge!("memory.stat.pgfault").set(mem_stat.stat.pgfault as f64);
gauge!("memory.stat.pgmajfault").set(mem_stat.stat.pgmajfault as f64);
gauge!("memory.stat.inactive_anon").set(mem_stat.stat.inactive_anon as f64);
gauge!("memory.stat.active_anon").set(mem_stat.stat.active_anon as f64);
gauge!("memory.stat.inactive_file").set(mem_stat.stat.inactive_file as f64);
gauge!("memory.stat.active_file").set(mem_stat.stat.active_file as f64);
gauge!("memory.stat.unevictable").set(mem_stat.stat.unevictable as f64);
gauge!("memory.stat.hierarchical_memory_limit")
.set(mem_stat.stat.hierarchical_memory_limit as f64);
gauge!("memory.stat.hierarchical_memsw_limit")
.set(mem_stat.stat.hierarchical_memsw_limit as f64);
gauge!("memory.stat.total_cache").set(mem_stat.stat.total_cache as f64);
gauge!("memory.stat.total_rss").set(mem_stat.stat.total_rss as f64);
gauge!("memory.stat.total_rss_huge").set(mem_stat.stat.total_rss_huge as f64);
gauge!("memory.stat.total_shmem").set(mem_stat.stat.total_shmem as f64);
gauge!("memory.stat.total_mapped_file").set(mem_stat.stat.total_mapped_file as f64);
gauge!("memory.stat.total_dirty").set(mem_stat.stat.total_dirty as f64);
gauge!("memory.stat.total_writeback").set(mem_stat.stat.total_writeback as f64);
gauge!("memory.stat.total_swap").set(mem_stat.stat.total_swap as f64);
gauge!("memory.stat.total_pgpgin").set(mem_stat.stat.total_pgpgin as f64);
gauge!("memory.stat.total_pgpgout").set(mem_stat.stat.total_pgpgout as f64);
gauge!("memory.stat.total_pgfault").set(mem_stat.stat.total_pgfault as f64);
gauge!("memory.stat.total_pgmajfault").set(mem_stat.stat.total_pgmajfault as f64);
gauge!("memory.stat.total_inactive_anon")
.set(mem_stat.stat.total_inactive_anon as f64);
gauge!("memory.stat.total_active_anon").set(mem_stat.stat.total_active_anon as f64);
gauge!("memory.stat.total_inactive_file")
.set(mem_stat.stat.total_inactive_file as f64);
gauge!("memory.stat.total_active_file").set(mem_stat.stat.total_active_file as f64);
gauge!("memory.stat.total_unevictable").set(mem_stat.stat.total_unevictable as f64);
}
// Load the CPU controller and get the cpu.stat String out of the
// cgroup, parse whatever fields are present and report them back
// out as metrics.
if let Some(cpu_controller) = cgroup.controller_of::<cgroups_rs::cpu::CpuController>() {
let cpu = cpu_controller.cpu();
for line in cpu.stat.lines() {
let mut fields = line.split_whitespace();
let metric_name = fields.next().unwrap_or_default();
let value = fields.next().unwrap_or_default();
gauge!(format!("cpu.{metric_name}"))
.set(value.parse::<f64>().unwrap_or_default());
}
if let Ok(shares) = cpu_controller.shares() {
gauge!("cpu.shares").set(shares as f64);
}
if let Ok(cfs_period) = cpu_controller.cfs_period() {
gauge!("cpu.cfs_period").set(cfs_period as f64);
}
if let Ok(cfs_quota) = cpu_controller.cfs_quota() {
gauge!("cpu.cfs_quota").set(cfs_quota as f64);
}
}
let cgroup_path = v2::get_path(pid).await?;
v2::poll(cgroup_path).await?;
}

gauge!("num_processes").set(total_processes as f64);
Expand Down Expand Up @@ -608,38 +522,3 @@ fn percentage(delta_ticks: f64, delta_time: f64, num_cores: f64) -> f64 {

overall_percentage.clamp(0.0, 100.0 * num_cores)
}

/// Loads the [`Cgroup`] that process `pid` belongs to.
///
/// The hierarchy flavour is auto-detected via `cgroups_rs::hierarchies::auto()`.
/// On cgroup v2 the path is derived by hand from the first line of
/// `/proc/<pid>/cgroup`; otherwise the relative paths reported by
/// `cgroups_rs` for the PID are used.
///
/// # Errors
///
/// On the v2 branch, propagates the error from reading `/proc/<pid>/cgroup`
/// and yields an `UnexpectedEof` I/O error when the file is empty, the first
/// line has no `::` separator, or the path has fewer than two components.
/// On the other branch, propagates the error from
/// `get_cgroups_relative_paths_by_pid`.
#[inline]
fn get_cgroup(pid: u32) -> Result<Cgroup, Error> {
let hierarchies = cgroups_rs::hierarchies::auto();
if hierarchies.v2() {
// for cgroups v2, we parse `/proc/<pid>/cgroup` looking for the main cgroup
// relative path. We then use this to load the correct cgroup.
// For unknown reasons, the cgroups_rs lib is not able to do this on its own.
// Heavily inspired by
// https://github.com/containerd/rust-extensions/blob/3d4de340d83aa06dff24fbf73d7d584ebe77c7ec/crates/shim/src/cgroup.rs#L178

let eof = || io::Error::from(io::ErrorKind::UnexpectedEof);
let path = format!("/proc/{pid}/cgroup");
let content = std::fs::read_to_string(path)?;

let first_line = content.lines().next().ok_or_else(eof)?;
let (_, path_part) = first_line.split_once("::").ok_or_else(eof)?;

// `skip(1)` drops the empty segment before the leading '/'. Only the next
// two path components are used; anything deeper in the path is ignored.
let mut path_parts = path_part.split('/').skip(1);
let namespace = path_parts.next().ok_or_else(eof)?;
let cgroup_name = path_parts.next().ok_or_else(eof)?;

Ok(Cgroup::load(
hierarchies,
format!("/sys/fs/cgroup/{namespace}/{cgroup_name}").as_str(),
))
} else {
let relative_paths = cgroups_rs::cgroup::get_cgroups_relative_paths_by_pid(pid)?;
Ok(Cgroup::load_with_relative_paths(
hierarchies,
Path::new("."),
relative_paths,
))
}
}
2 changes: 2 additions & 0 deletions lading/src/observer/linux/cgroup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/// Code to read cgroup information.
pub(crate) mod v2;
93 changes: 93 additions & 0 deletions lading/src/observer/linux/cgroup/v2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use std::{io, path::PathBuf};

use metrics::gauge;
use tokio::fs;

/// Errors produced while locating or reading cgroup v2 data.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Wrapper for [`std::io::Error`]
#[error("IO error: {0}")]
Io(#[from] io::Error),
/// Wrapper for [`std::num::ParseIntError`]
#[error("Parse int error: {0}")]
ParseInt(#[from] std::num::ParseIntError),
/// Wrapper for [`std::num::ParseFloatError`]
#[error("Parse float error: {0}")]
ParseFloat(#[from] std::num::ParseFloatError),
/// No cgroup v2 entry could be determined from `/proc/<pid>/cgroup`.
#[error("Cgroup v2 not found")]
CgroupV2NotFound,
}

/// Determines the cgroup v2 path for a given PID.
///
/// Reads `/proc/{pid}/cgroup` and looks for the unified-hierarchy entry: the
/// line whose hierarchy ID is `0` and whose controller list is empty. The
/// cgroup path from that line is appended to the default cgroup v2 mount
/// point, `/sys/fs/cgroup`.
///
/// # Errors
///
/// Returns [`Error::Io`] if `/proc/{pid}/cgroup` cannot be read, and
/// [`Error::CgroupV2NotFound`] if a line has fewer than three fields or no
/// line describes a cgroup v2 hierarchy.
pub(crate) async fn get_path(pid: i32) -> Result<PathBuf, Error> {
    // Default cgroup v2 mount point.
    const CGROUP_MOUNT_POINT: &str = "/sys/fs/cgroup";

    let content = fs::read_to_string(format!("/proc/{pid}/cgroup")).await?;

    for line in content.lines() {
        // Each line is `hierarchy-ID:controller-list:cgroup-path`. Only the
        // first two colons are separators; the path itself may contain
        // colons, so the split is limited to three fields to keep it intact.
        let mut fields = line.splitn(3, ':');
        let hierarchy_id = fields.next().ok_or(Error::CgroupV2NotFound)?;
        let controllers = fields.next().ok_or(Error::CgroupV2NotFound)?;
        let cgroup_path = fields.next().ok_or(Error::CgroupV2NotFound)?;

        if hierarchy_id == "0" && controllers.is_empty() {
            // cgroup v2 detected. Drop the leading '/' so that `join` appends
            // to the mount point instead of replacing it.
            let relative = cgroup_path.strip_prefix('/').unwrap_or(cgroup_path);
            return Ok(PathBuf::from(CGROUP_MOUNT_POINT).join(relative));
        }
    }

    Err(Error::CgroupV2NotFound)
}

/// Polls for any cgroup metrics that can be read, v2 version.
///
/// Walks the regular files directly inside `path` without descending into
/// sub-directories. Each file's name becomes the metric name. A file whose
/// trimmed content parses as a single `f64` sets one gauge of that name; any
/// other file is handed to `kv_pairs`, which sets `{file_name}.{key}` gauges
/// for `key value` lines.
///
/// Reading is best effort per file: entries whose metadata or content cannot
/// be read, whose name is not valid UTF-8, or whose content is not in a
/// recognised format are skipped so the remaining files are still reported.
///
/// # Errors
///
/// Returns [`Error::Io`] if the directory itself cannot be opened or
/// iterated.
pub(crate) async fn poll(path: PathBuf) -> Result<(), Error> {
    let mut entries = fs::read_dir(&path).await?;

    while let Some(entry) = entries.next_entry().await? {
        // An entry can disappear between being listed and being inspected. A
        // failure on one entry must not stop the remaining files being read.
        let is_file = match entry.metadata().await {
            Ok(metadata) => metadata.is_file(),
            Err(_) => continue,
        };
        if !is_file {
            continue;
        }

        let file_name = entry.file_name();
        let metric_prefix = match file_name.to_str() {
            Some(name) => String::from(name),
            // Skip files with non-UTF-8 names
            None => continue,
        };

        // Not every cgroup interface file is readable: some are write-only
        // and others may be denied by permissions. Skip those rather than
        // aborting the whole poll.
        let content = match fs::read_to_string(entry.path()).await {
            Ok(content) => content,
            Err(_) => continue,
        };
        let content = content.trim();

        // Cgroup files that have values are either single-valued or
        // key-value pairs. For single-valued files, we create a single
        // metric and for key-value pairs, we create metrics with the same
        // scheme as single-valued files but tack on the key to the metric
        // name.
        if let Ok(value) = content.parse::<f64>() {
            // Single-valued
            gauge!(metric_prefix).set(value);
        } else if kv_pairs(content, &metric_prefix).is_err() {
            // File may fail to parse, for instance cgroup.controllers
            // is a list of strings.
            continue;
        }
    }

    Ok(())
}

fn kv_pairs(content: &str, metric_prefix: &str) -> Result<(), Error> {
for line in content.lines() {
let mut parts = line.split_whitespace();
let key = parts.next().expect("malformed key-value pair");
let value_str = parts.next().expect("malformed key-value pair");
let value: f64 = value_str.parse()?;
let metric_name = format!("{metric_prefix}.{key}");
gauge!(metric_name).set(value);
}
Ok(())
}

0 comments on commit 6575d56

Please sign in to comment.